-
-
Notifications
You must be signed in to change notification settings - Fork 738
Retry #2264
Copy link
Copy link
Closed
Labels
enhancementNew feature or requestNew feature or request
Description
Similar to #2256
Sharing some internal code we have been using in case someone wants to work on it and add it as a util in undici.
const assert = require('node:assert')
const stream = require('node:stream')
const { isReadableNodeStream } = require('../stream')
const { parseHeaders } = require('../http')
const createError = require('http-errors')
function parseRange(range) {
const m = range ? range.match(/^bytes=(\d+)-(\d+)?$/) : null
return m ? { start: parseInt(m[1]), end: m[2] ? parseInt(m[2]) : null } : null
}
module.exports = class RetryHandler {
/** @type {Object} */ opts
/** @type {Object} */ dispatcher
/** @type {Error | undefined} */ retryError
/** @type {Number} */ retryCount = 0
/** @type {Number} */ retryMax
/** @type {Array<Number>} */ retryCode
/** @type {Array<String>} */ retryMessage
/** @type {Array<Number>} */ retryStatus
/** @type {Array<String>} */ retryMethod
/** @type {Boolean} */ idempotent
/** @type {Number} */ position = 0
/** @type {NodeJS.Timeout | null} */ timeout
/** @type {{start: Number, end: Number | null} | null} */ range
constructor (opts, { dispatcher, handler }) {
const range = opts.headers?.range ?? opts.headers?.range
this.dispatcher = dispatcher
this.handler = handler
this.opts = opts
this.statusCode = 0
this.idempotent = opts.idempotent
const {
count: retryMax = 8,
method: retryMethod = ['GET', 'HEAD', 'OPTIONS', 'PUT', 'DELETE', 'TRACE', 'PATCH'],
status: retryStatus = [420, 429, 502, 503, 504],
code: retryCode = [
'ECONNRESET',
'ECONNREFUSED',
'ENOTFOUND',
'ENETDOWN',
'ENETUNREACH',
'EHOSTDOWN',
'EHOSTUNREACH',
'EPIPE',
],
message: retryMessage = ['other side closed'],
} = opts.retry ?? {}
this.retryMax = retryMax
this.retryCount = retryCode
this.retryMessage = retryMessage
this.retryStatus = retryStatus
this.retryMethod = retryMethod
this.range = opts.method === 'GET'
? range
? parseRange(range)
: { start: 0, end: null }
: null
}
onConnect(abort) {
this.handler.onConnect(err => {
if (this.timeout) {
clearTimeout(this.timeout)
}
abort(err)
})
}
onHeaders(statusCode, rawHeaders, resume, statusMessage) {
if (statusCode >= 300) {
throw createError(statusCode, { headers: parseHeaders(rawHeaders) })
}
let contentLength
for (let i = 0; i < rawHeaders.length; i += 2) {
const key = rawHeaders[i].toString()
if (key.length === 'content-length'.length && key.toLowerCase() === 'content-length') {
contentLength = rawHeaders[key]
}
}
if (statusCode === 200 && this.range?.end === null && contentLength) {
this.range.end = Number(contentLength)
assert(Number.isFinite(this.range.end), 'invalid content-length')
}
if (this.statusCode) {
assert(this.retryError)
if (statusCode === 200) {
// TODO (feat): Resume 200 if distance is not too far?
throw this.retryError
} else if (statusCode === 206) {
// TODO (fix): Check content-range.
return true // TODO (fix): return false if body is full...
} else {
throw this.retryError
}
} else {
this.statusCode = statusCode
return this.handler.onHeaders(statusCode, rawHeaders, resume, statusMessage)
}
}
onData(chunk) {
this.position += chunk.length
return this.handler.onData(chunk)
}
onComplete(rawTrailers) {
return this.handler.onComplete(rawTrailers)
}
onError(err) {
if (this.statusCode) {
if (this.range && this.retryCount < this.retryMax && this.opts.methid === 'GET') {
// this.logger?.warn({ err, retryCount: this.retryCount }, 'upstream response retrying')
this.retryError = err
this.retryCount += 1
this.dispatcher.dispatch(
{
...this.opts,
headers: {
...this.opts.headers,
range: `bytes=${this.range.start + this.position}-${
this.range.end ? this.range.end : ''
}`,
},
},
this,
)
} else {
return this.handler.onError(err)
}
} else if (
err.name !== 'AbortError' &&
this.retryCount < this.retryMax &&
(this.opts.body == null ||
typeof this.opts.body === 'string' ||
Buffer.isBuffer(this.opts.body) ||
// @ts-ignore: isDisturbed is not in typedefs
(isReadableNodeStream(this.opts.body) && !stream.isDisturbed(this.opts.body))) &&
(this.idempotent || this.retryMethod.includes(this.opts.method)) &&
(this.retryCode.includes(err.code) ||
this.retryMessage.includes(err.message) ||
this.retryStatus.includes(err.statusCode))
) {
const delay =
parseInt(err.headers?.['Retry-After']) * 1e3 ||
Math.min(10e3, this.retryCount * 1e3 + 1e3)
// this.logger?.warn({ err, retryCount: this.retryCount, delay }, 'upstream request retrying')
this.retryError = err
this.retryCount += 1
this.timeout = setTimeout(() => {
this.timeout = null
this.dispatcher.dispatch(this.opts, this)
}, delay).unref()
} else {
// this.logger?.error({ err }, 'upstream request failed')
this.handler.onError(err)
}
}
}Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
enhancementNew feature or requestNew feature or request