Skip to content

Retry #2264

@ronag

Description

@ronag

Similar to #2256

Sharing some internal code we have been using in case someone wants to work on it and add it as a util in undici.

const assert = require('node:assert')
const stream = require('node:stream')
const { isReadableNodeStream } = require('../stream')
const { parseHeaders } = require('../http')
const createError = require('http-errors')

function parseRange(range) {
  const m = range ? range.match(/^bytes=(\d+)-(\d+)?$/) : null
  return m ? { start: parseInt(m[1]), end: m[2] ? parseInt(m[2]) : null } : null
}

module.exports = class RetryHandler {
  /** @type {Object} */ opts
  /** @type {Object} */ dispatcher
  /** @type {Error | undefined} */ retryError
  /** @type {Number} */ retryCount = 0
  /** @type {Number} */ retryMax
  /** @type {Array<Number>} */ retryCode
  /** @type {Array<String>} */ retryMessage
  /** @type {Array<Number>} */ retryStatus
  /** @type {Array<String>} */ retryMethod
  /** @type {Boolean} */ idempotent
  /** @type {Number} */ position = 0
  /** @type {NodeJS.Timeout | null} */ timeout
  /** @type {{start: Number, end: Number | null} | null} */ range

  constructor (opts, { dispatcher, handler }) {
    const range = opts.headers?.range ?? opts.headers?.range

    this.dispatcher = dispatcher
    this.handler = handler
    this.opts = opts
    this.statusCode = 0
    this.idempotent = opts.idempotent

    const {
      count: retryMax = 8,
      method: retryMethod = ['GET', 'HEAD', 'OPTIONS', 'PUT', 'DELETE', 'TRACE', 'PATCH'],
      status: retryStatus = [420, 429, 502, 503, 504],
      code: retryCode = [
        'ECONNRESET',
        'ECONNREFUSED',
        'ENOTFOUND',
        'ENETDOWN',
        'ENETUNREACH',
        'EHOSTDOWN',
        'EHOSTUNREACH',
        'EPIPE',
      ],
      message: retryMessage = ['other side closed'],
    } = opts.retry ?? {}

    this.retryMax = retryMax
    this.retryCount = retryCode
    this.retryMessage = retryMessage
    this.retryStatus = retryStatus
    this.retryMethod = retryMethod

    this.range = opts.method === 'GET'
      ? range
        ? parseRange(range)
        : { start: 0, end: null }
      : null
  }

  onConnect(abort) {
    this.handler.onConnect(err => {
      if (this.timeout) {
        clearTimeout(this.timeout)
      }
      abort(err)
    })
  }

  onHeaders(statusCode, rawHeaders, resume, statusMessage) {
    if (statusCode >= 300) {
      throw createError(statusCode, { headers: parseHeaders(rawHeaders) })
    }

    let contentLength
    for (let i = 0; i < rawHeaders.length; i += 2) {
      const key = rawHeaders[i].toString()
      if (key.length === 'content-length'.length && key.toLowerCase() === 'content-length') {
        contentLength = rawHeaders[key]
      }
    }

    if (statusCode === 200 && this.range?.end === null && contentLength) {
      this.range.end = Number(contentLength)
      assert(Number.isFinite(this.range.end), 'invalid content-length')
    }

    if (this.statusCode) {
      assert(this.retryError)

      if (statusCode === 200) {
        // TODO (feat): Resume 200 if distance is not too far?
        throw this.retryError
      } else if (statusCode === 206) {
        // TODO (fix): Check content-range.
        return true // TODO (fix): return false if body is full...
      } else {
        throw this.retryError
      }
    } else {
      this.statusCode = statusCode
      return this.handler.onHeaders(statusCode, rawHeaders, resume, statusMessage)
    }
  }

  onData(chunk) {
    this.position += chunk.length
    return this.handler.onData(chunk)
  }

  onComplete(rawTrailers) {
    return this.handler.onComplete(rawTrailers)
  }

  onError(err) {
    if (this.statusCode) {
      if (this.range && this.retryCount < this.retryMax && this.opts.methid === 'GET') {
        // this.logger?.warn({ err, retryCount: this.retryCount }, 'upstream response retrying')

        this.retryError = err
        this.retryCount += 1

        this.dispatcher.dispatch(
          {
            ...this.opts,
            headers: {
              ...this.opts.headers,
              range: `bytes=${this.range.start + this.position}-${
                this.range.end ? this.range.end : ''
              }`,
            },
          },
          this,
        )
      } else {
        return this.handler.onError(err)
      }
    } else if (
      err.name !== 'AbortError' &&
      this.retryCount < this.retryMax &&
      (this.opts.body == null ||
        typeof this.opts.body === 'string' ||
        Buffer.isBuffer(this.opts.body) ||
        // @ts-ignore: isDisturbed is not in typedefs
        (isReadableNodeStream(this.opts.body) && !stream.isDisturbed(this.opts.body))) &&
      (this.idempotent || this.retryMethod.includes(this.opts.method)) &&
      (this.retryCode.includes(err.code) ||
        this.retryMessage.includes(err.message) ||
        this.retryStatus.includes(err.statusCode))
    ) {
      const delay =
        parseInt(err.headers?.['Retry-After']) * 1e3 ||
        Math.min(10e3, this.retryCount * 1e3 + 1e3)

      // this.logger?.warn({ err, retryCount: this.retryCount, delay }, 'upstream request retrying')

      this.retryError = err
      this.retryCount += 1

      this.timeout = setTimeout(() => {
        this.timeout = null
        this.dispatcher.dispatch(this.opts, this)
      }, delay).unref()
    } else {
      // this.logger?.error({ err }, 'upstream request failed')
      this.handler.onError(err)
    }
  }
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions