Skip to content

Commit 5f11247

Browse files
authored
fix: retry on body support (#3294)
* test: add testing * refactor: enhance body wrapping * fix: do not mutate original opts * docs: extend documentation
1 parent 18af4b0 commit 5f11247

5 files changed

Lines changed: 281 additions & 9 deletions

File tree

docs/docs/api/RetryHandler.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ It represents the retry state for a given request.
4646
- **dispatch** `(options: Dispatch.DispatchOptions, handlers: Dispatch.DispatchHandlers) => Promise<Dispatch.DispatchResponse>` (required) - Dispatch function to be called after every retry.
4747
- **handler** Extends [`Dispatch.DispatchHandlers`](Dispatcher.md#dispatcherdispatchoptions-handler) (required) - Handler function to be called after the request is successful or the retries are exhausted.
4848

49+
>__Note__: The `RetryHandler` does not retry over stateful bodies (e.g. streams, AsyncIterable) as those, once consumed, are left in an state that cannot be reutilized. For these situations the `RetryHandler` will identify
50+
>the body as stateful and will not retry the request rejecting with the error `UND_ERR_REQ_RETRY`.
51+
4952
Examples:
5053

5154
```js

lib/core/symbols.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ module.exports = {
2020
kHost: Symbol('host'),
2121
kNoRef: Symbol('no ref'),
2222
kBodyUsed: Symbol('used'),
23+
kBody: Symbol('abstracted request body'),
2324
kRunning: Symbol('running'),
2425
kBlocking: Symbol('blocking'),
2526
kPending: Symbol('pending'),

lib/core/util.js

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,72 @@
11
'use strict'
22

33
const assert = require('node:assert')
4-
const { kDestroyed, kBodyUsed, kListeners } = require('./symbols')
4+
const { kDestroyed, kBodyUsed, kListeners, kBody } = require('./symbols')
55
const { IncomingMessage } = require('node:http')
66
const stream = require('node:stream')
77
const net = require('node:net')
8-
const { InvalidArgumentError } = require('./errors')
98
const { Blob } = require('node:buffer')
109
const nodeUtil = require('node:util')
1110
const { stringify } = require('node:querystring')
11+
const { EventEmitter: EE } = require('node:events')
12+
const { InvalidArgumentError } = require('./errors')
1213
const { headerNameLowerCasedRecord } = require('./constants')
1314
const { tree } = require('./tree')
1415

1516
const [nodeMajor, nodeMinor] = process.versions.node.split('.').map(v => Number(v))
1617

18+
class BodyAsyncIterable {
19+
constructor (body) {
20+
this[kBody] = body
21+
this[kBodyUsed] = false
22+
}
23+
24+
async * [Symbol.asyncIterator] () {
25+
assert(!this[kBodyUsed], 'disturbed')
26+
this[kBodyUsed] = true
27+
yield * this[kBody]
28+
}
29+
}
30+
31+
function wrapRequestBody (body) {
32+
if (isStream(body)) {
33+
// TODO (fix): Provide some way for the user to cache the file to e.g. /tmp
34+
// so that it can be dispatched again?
35+
// TODO (fix): Do we need 100-expect support to provide a way to do this properly?
36+
if (bodyLength(body) === 0) {
37+
body
38+
.on('data', function () {
39+
assert(false)
40+
})
41+
}
42+
43+
if (typeof body.readableDidRead !== 'boolean') {
44+
body[kBodyUsed] = false
45+
EE.prototype.on.call(body, 'data', function () {
46+
this[kBodyUsed] = true
47+
})
48+
}
49+
50+
return body
51+
} else if (body && typeof body.pipeTo === 'function') {
52+
// TODO (fix): We can't access ReadableStream internal state
53+
// to determine whether or not it has been disturbed. This is just
54+
// a workaround.
55+
return new BodyAsyncIterable(body)
56+
} else if (
57+
body &&
58+
typeof body !== 'string' &&
59+
!ArrayBuffer.isView(body) &&
60+
isIterable(body)
61+
) {
62+
// TODO: Should we allow re-using iterable if !this.opts.idempotent
63+
// or through some other flag?
64+
return new BodyAsyncIterable(body)
65+
} else {
66+
return body
67+
}
68+
}
69+
1770
function nop () {}
1871

1972
function isStream (obj) {
@@ -634,5 +687,6 @@ module.exports = {
634687
isHttpOrHttpsPrefixed,
635688
nodeMajor,
636689
nodeMinor,
637-
safeHTTPMethods: ['GET', 'HEAD', 'OPTIONS', 'TRACE']
690+
safeHTTPMethods: ['GET', 'HEAD', 'OPTIONS', 'TRACE'],
691+
wrapRequestBody
638692
}

lib/handler/retry-handler.js

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,12 @@ const assert = require('node:assert')
33

44
const { kRetryHandlerDefaultRetry } = require('../core/symbols')
55
const { RequestRetryError } = require('../core/errors')
6-
const { isDisturbed, parseHeaders, parseRangeHeader } = require('../core/util')
6+
const {
7+
isDisturbed,
8+
parseHeaders,
9+
parseRangeHeader,
10+
wrapRequestBody
11+
} = require('../core/util')
712

813
function calculateRetryAfterHeader (retryAfter) {
914
const current = Date.now()
@@ -29,7 +34,7 @@ class RetryHandler {
2934

3035
this.dispatch = handlers.dispatch
3136
this.handler = handlers.handler
32-
this.opts = dispatchOpts
37+
this.opts = { ...dispatchOpts, body: wrapRequestBody(opts.body) }
3338
this.abort = null
3439
this.aborted = false
3540
this.retryOpts = {
@@ -174,7 +179,9 @@ class RetryHandler {
174179
this.abort(
175180
new RequestRetryError('Request failed', statusCode, {
176181
headers,
177-
count: this.retryCount
182+
data: {
183+
count: this.retryCount
184+
}
178185
})
179186
)
180187
return false
@@ -278,7 +285,7 @@ class RetryHandler {
278285

279286
const err = new RequestRetryError('Request failed', statusCode, {
280287
headers,
281-
count: this.retryCount
288+
data: { count: this.retryCount }
282289
})
283290

284291
this.abort(err)

test/retry-handler.js

Lines changed: 209 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ const { tspl } = require('@matteo.collina/tspl')
44
const { test, after } = require('node:test')
55
const { createServer } = require('node:http')
66
const { once } = require('node:events')
7+
const { Readable } = require('node:stream')
78

89
const { RetryHandler, Client } = require('..')
910
const { RequestHandler } = require('../lib/api/api-request')
@@ -204,6 +205,74 @@ test('Should account for network and response errors', async t => {
204205
await t.completed
205206
})
206207

208+
test('Issue #3288 - request with body (asynciterable)', async t => {
209+
t = tspl(t, { plan: 6 })
210+
const server = createServer()
211+
const dispatchOptions = {
212+
method: 'POST',
213+
path: '/',
214+
headers: {
215+
'content-type': 'application/json'
216+
},
217+
body: (function * () {
218+
yield 'hello'
219+
yield 'world'
220+
})()
221+
}
222+
223+
server.on('request', (req, res) => {
224+
res.writeHead(500, {
225+
'content-type': 'application/json'
226+
})
227+
228+
res.end('{"message": "failed"}')
229+
})
230+
231+
server.listen(0, () => {
232+
const client = new Client(`http://localhost:${server.address().port}`)
233+
const handler = new RetryHandler(dispatchOptions, {
234+
dispatch: client.dispatch.bind(client),
235+
handler: {
236+
onConnect () {
237+
t.ok(true, 'pass')
238+
},
239+
onBodySent () {
240+
t.ok(true, 'pass')
241+
},
242+
onHeaders (status, _rawHeaders, resume, _statusMessage) {
243+
t.strictEqual(status, 500)
244+
return true
245+
},
246+
onData (chunk) {
247+
return true
248+
},
249+
onComplete () {
250+
t.fail()
251+
},
252+
onError (err) {
253+
t.equal(err.message, 'Request failed')
254+
t.equal(err.statusCode, 500)
255+
t.equal(err.data.count, 1)
256+
}
257+
}
258+
})
259+
260+
after(async () => {
261+
await client.close()
262+
server.close()
263+
264+
await once(server, 'close')
265+
})
266+
267+
client.dispatch(
268+
dispatchOptions,
269+
handler
270+
)
271+
})
272+
273+
await t.completed
274+
})
275+
207276
test('Should use retry-after header for retries', async t => {
208277
t = tspl(t, { plan: 4 })
209278

@@ -734,6 +803,145 @@ test('retrying a request with a body', async t => {
734803
await t.completed
735804
})
736805

806+
test('retrying a request with a body (stream)', async t => {
807+
let counter = 0
808+
const server = createServer()
809+
const dispatchOptions = {
810+
retryOptions: {
811+
retry: (err, { state, opts }, done) => {
812+
counter++
813+
814+
if (
815+
err.statusCode === 500 ||
816+
err.message.includes('other side closed')
817+
) {
818+
setTimeout(done, 500)
819+
return
820+
}
821+
822+
return done(err)
823+
}
824+
},
825+
method: 'POST',
826+
path: '/',
827+
headers: {
828+
'content-type': 'application/json'
829+
},
830+
body: Readable.from(Buffer.from(JSON.stringify({ hello: 'world' })))
831+
}
832+
833+
t = tspl(t, { plan: 3 })
834+
835+
server.on('request', (req, res) => {
836+
switch (counter) {
837+
case 0:
838+
res.writeHead(500)
839+
res.end('failed')
840+
return
841+
default:
842+
t.fail()
843+
}
844+
})
845+
846+
server.listen(0, () => {
847+
const client = new Client(`http://localhost:${server.address().port}`)
848+
const handler = new RetryHandler(dispatchOptions, {
849+
dispatch: client.dispatch.bind(client),
850+
handler: new RequestHandler(dispatchOptions, (err, data) => {
851+
t.equal(err.statusCode, 500)
852+
t.equal(err.data.count, 1)
853+
t.equal(err.code, 'UND_ERR_REQ_RETRY')
854+
})
855+
})
856+
857+
after(async () => {
858+
await client.close()
859+
server.close()
860+
861+
await once(server, 'close')
862+
})
863+
864+
client.dispatch(
865+
dispatchOptions,
866+
handler
867+
)
868+
})
869+
870+
await t.completed
871+
})
872+
873+
test('retrying a request with a body (buffer)', async t => {
874+
let counter = 0
875+
const server = createServer()
876+
const dispatchOptions = {
877+
retryOptions: {
878+
retry: (err, { state, opts }, done) => {
879+
counter++
880+
881+
if (
882+
err.statusCode === 500 ||
883+
err.message.includes('other side closed')
884+
) {
885+
setTimeout(done, 500)
886+
return
887+
}
888+
889+
return done(err)
890+
}
891+
},
892+
method: 'POST',
893+
path: '/',
894+
headers: {
895+
'content-type': 'application/json'
896+
},
897+
body: Buffer.from(JSON.stringify({ hello: 'world' }))
898+
}
899+
900+
t = tspl(t, { plan: 1 })
901+
902+
server.on('request', (req, res) => {
903+
switch (counter) {
904+
case 0:
905+
req.destroy()
906+
return
907+
case 1:
908+
res.writeHead(500)
909+
res.end('failed')
910+
return
911+
case 2:
912+
res.writeHead(200)
913+
res.end('hello world!')
914+
return
915+
default:
916+
t.fail()
917+
}
918+
})
919+
920+
server.listen(0, () => {
921+
const client = new Client(`http://localhost:${server.address().port}`)
922+
const handler = new RetryHandler(dispatchOptions, {
923+
dispatch: client.dispatch.bind(client),
924+
handler: new RequestHandler(dispatchOptions, (err, data) => {
925+
t.ifError(err)
926+
})
927+
})
928+
929+
after(async () => {
930+
await client.close()
931+
server.close()
932+
933+
await once(server, 'close')
934+
})
935+
936+
client.dispatch(
937+
dispatchOptions,
938+
handler
939+
)
940+
})
941+
942+
await t.completed
943+
})
944+
737945
test('should not error if request is not meant to be retried', async t => {
738946
t = tspl(t, { plan: 3 })
739947

@@ -777,8 +985,7 @@ test('should not error if request is not meant to be retried', async t => {
777985
t.strictEqual(Buffer.concat(chunks).toString('utf-8'), 'Bad request')
778986
},
779987
onError (err) {
780-
console.log({ err })
781-
t.fail()
988+
t.fail(err)
782989
}
783990
}
784991
})

0 commit comments

Comments
 (0)