Skip to content

Commit 8eeeec1

Browse files
tekwizGozala
andauthored
Fix premature close with chunked transfer encoding and for async iterators in Node 12 (#1064)
Co-authored-by: Irakli Gozalishvili <[email protected]>
1 parent 6ee9d31 commit 8eeeec1

4 files changed

Lines changed: 186 additions & 0 deletions

File tree

README.md

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,55 @@ if (!response.ok) throw new Error(`unexpected response ${response.statusText}`);
285285
await streamPipeline(response.body, createWriteStream('./octocat.png'));
286286
```
287287

288+
In Node.js 14 you can also use async iterators to read `body`; however, be careful to catch
289+
errors -- the longer a response runs, the more likely it is to encounter an error.
290+
291+
```js
292+
const fetch = require('node-fetch');
293+
294+
const response = await fetch('https://httpbin.org/stream/3');
295+
296+
try {
297+
for await (const chunk of response.body) {
298+
console.dir(JSON.parse(chunk.toString()));
299+
}
300+
} catch (err) {
301+
console.error(err.stack);
302+
}
303+
```
304+
305+
In Node.js 12 you can also use async iterators to read `body`; however, async iterators with streams
306+
did not mature until Node.js 14, so you need to do some extra work to ensure you handle errors
307+
directly from the stream and wait on it response to fully close.
308+
309+
```js
310+
const fetch = require('node-fetch');
311+
312+
const read = async body => {
313+
let error;
314+
body.on('error', err => {
315+
error = err;
316+
});
317+
318+
for await (const chunk of body) {
319+
console.dir(JSON.parse(chunk.toString()));
320+
}
321+
322+
return new Promise((resolve, reject) => {
323+
body.on('close', () => {
324+
error ? reject(error) : resolve();
325+
});
326+
});
327+
};
328+
329+
try {
330+
const response = await fetch('https://httpbin.org/stream/3');
331+
await read(response.body);
332+
} catch (err) {
333+
console.error(err.stack);
334+
}
335+
```
336+
288337
### Buffer
289338

290339
If you prefer to cache binary data in full, use buffer(). (NOTE: buffer() is a `node-fetch` only API)

src/index.js

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,30 @@ export default async function fetch(url, options_) {
9595
finalize();
9696
});
9797

98+
fixResponseChunkedTransferBadEnding(request_, err => {
99+
response.body.destroy(err);
100+
});
101+
102+
/* c8 ignore next 18 */
103+
if (process.version < 'v14') {
104+
// Before Node.js 14, pipeline() does not fully support async iterators and does not always
105+
// properly handle when the socket close/end events are out of order.
106+
request_.on('socket', s => {
107+
let endedWithEventsCount;
108+
s.prependListener('end', () => {
109+
endedWithEventsCount = s._eventsCount;
110+
});
111+
s.prependListener('close', hadError => {
112+
// if end happened before close but the socket didn't emit an error, do it now
113+
if (response && endedWithEventsCount < s._eventsCount && !hadError) {
114+
const err = new Error('Premature close');
115+
err.code = 'ERR_STREAM_PREMATURE_CLOSE';
116+
response.body.emit('error', err);
117+
}
118+
});
119+
});
120+
}
121+
98122
request_.on('response', response_ => {
99123
request_.setTimeout(0);
100124
const headers = fromRawHeaders(response_.rawHeaders);
@@ -265,3 +289,31 @@ export default async function fetch(url, options_) {
265289
writeToStream(request_, request);
266290
});
267291
}
292+
293+
function fixResponseChunkedTransferBadEnding(request, errorCallback) {
294+
const LAST_CHUNK = Buffer.from('0\r\n');
295+
let socket;
296+
297+
request.on('socket', s => {
298+
socket = s;
299+
});
300+
301+
request.on('response', response => {
302+
const {headers} = response;
303+
if (headers['transfer-encoding'] === 'chunked' && !headers['content-length']) {
304+
let properLastChunkReceived = false;
305+
306+
socket.on('data', buf => {
307+
properLastChunkReceived = Buffer.compare(buf.slice(-3), LAST_CHUNK) === 0;
308+
});
309+
310+
socket.prependListener('close', () => {
311+
if (!properLastChunkReceived) {
312+
const err = new Error('Premature close');
313+
err.code = 'ERR_STREAM_PREMATURE_CLOSE';
314+
errorCallback(err);
315+
}
316+
});
317+
}
318+
});
319+
}

test/main.js

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -613,6 +613,74 @@ describe('node-fetch', () => {
613613
});
614614
});
615615

616+
it('should handle network-error in chunked response', () => {
617+
const url = `${base}error/premature/chunked`;
618+
return fetch(url).then(res => {
619+
expect(res.status).to.equal(200);
620+
expect(res.ok).to.be.true;
621+
622+
return expect(new Promise((resolve, reject) => {
623+
res.body.on('error', reject);
624+
res.body.on('close', resolve);
625+
})).to.eventually.be.rejectedWith(Error, 'Premature close')
626+
.and.have.property('code', 'ERR_STREAM_PREMATURE_CLOSE');
627+
});
628+
});
629+
630+
it('should handle network-error in chunked response async iterator', () => {
631+
const url = `${base}error/premature/chunked`;
632+
return fetch(url).then(res => {
633+
expect(res.status).to.equal(200);
634+
expect(res.ok).to.be.true;
635+
636+
const read = async body => {
637+
const chunks = [];
638+
639+
if (process.version < 'v14') {
640+
// In Node.js 12, some errors don't come out in the async iterator; we have to pick
641+
// them up from the event-emitter and then throw them after the async iterator
642+
let error;
643+
body.on('error', err => {
644+
error = err;
645+
});
646+
647+
for await (const chunk of body) {
648+
chunks.push(chunk);
649+
}
650+
651+
if (error) {
652+
throw error;
653+
}
654+
655+
return new Promise(resolve => {
656+
body.on('close', () => resolve(chunks));
657+
});
658+
}
659+
660+
for await (const chunk of body) {
661+
chunks.push(chunk);
662+
}
663+
664+
return chunks;
665+
};
666+
667+
return expect(read(res.body))
668+
.to.eventually.be.rejectedWith(Error, 'Premature close')
669+
.and.have.property('code', 'ERR_STREAM_PREMATURE_CLOSE');
670+
});
671+
});
672+
673+
it('should handle network-error in chunked response in consumeBody', () => {
674+
const url = `${base}error/premature/chunked`;
675+
return fetch(url).then(res => {
676+
expect(res.status).to.equal(200);
677+
expect(res.ok).to.be.true;
678+
679+
return expect(res.text())
680+
.to.eventually.be.rejectedWith(Error, 'Premature close');
681+
});
682+
});
683+
616684
it('should handle DNS-error response', () => {
617685
const url = 'http://domain.invalid';
618686
return expect(fetch(url)).to.eventually.be.rejected

test/utils/server.js

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,23 @@ export default class TestServer {
323323
}, 100);
324324
}
325325

326+
if (p === '/error/premature/chunked') {
327+
res.writeHead(200, {
328+
'Content-Type': 'application/json',
329+
'Transfer-Encoding': 'chunked'
330+
});
331+
332+
res.write(`${JSON.stringify({data: 'hi'})}\n`);
333+
334+
setTimeout(() => {
335+
res.write(`${JSON.stringify({data: 'bye'})}\n`);
336+
}, 200);
337+
338+
setTimeout(() => {
339+
res.destroy();
340+
}, 400);
341+
}
342+
326343
if (p === '/error/json') {
327344
res.statusCode = 200;
328345
res.setHeader('Content-Type', 'application/json');

0 commit comments

Comments
 (0)