Skip to content
This repository was archived by the owner on Jun 24, 2024. It is now read-only.

Commit ac2f73b

Browse files
fix: Fix support for streams without content-length property (#491)
In `pumpify` the `prefinish` event is emitted when an upstream writer has ended
1 parent 985d788 commit ac2f73b

2 files changed

Lines changed: 76 additions & 3 deletions

File tree

src/index.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,10 @@ export class Upload extends Pumpify {
393393
this.upstreamEnded = true;
394394
});
395395

396+
this.on('prefinish', () => {
397+
this.upstreamEnded = true;
398+
});
399+
396400
this.once('writing', () => {
397401
// Now that someone is writing to this object, let's attach
398402
// some duplexes. These duplexes enable this object to be
@@ -517,13 +521,15 @@ export class Upload extends Pumpify {
517521
const removeListeners = () => {
518522
this.removeListener('wroteToChunkBuffer', wroteToChunkBufferCallback);
519523
this.upstream.removeListener('finish', upstreamFinishedCallback);
524+
this.removeListener('prefinish', upstreamFinishedCallback);
520525
};
521526

522527
// If there's data recently written it should be digested
523528
this.once('wroteToChunkBuffer', wroteToChunkBufferCallback);
524529

525530
// If the upstream finishes let's see if there's anything to grab
526531
this.upstream.once('finish', upstreamFinishedCallback);
532+
this.once('prefinish', upstreamFinishedCallback);
527533
});
528534

529535
return willBeMoreChunks;

test/test.ts

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,16 @@ describe('gcs-resumable-upload', () => {
304304
assert.strictEqual(up.chunkSize, 123);
305305
});
306306

307+
it('should set `upstreamEnded` to `true` on `prefinish`', () => {
308+
const up = upload({bucket: BUCKET, file: FILE, chunkSize: 123});
309+
310+
assert.strictEqual(up.upstreamEnded, false);
311+
312+
up.emit('prefinish');
313+
314+
assert.strictEqual(up.upstreamEnded, true);
315+
});
316+
307317
describe('on write', () => {
308318
let uri = '';
309319

@@ -582,7 +592,7 @@ describe('gcs-resumable-upload', () => {
582592

583593
it("should wait for upstream to 'finish' and resolve `true` if data is available", async () => {
584594
const result = await new Promise(resolve => {
585-
up.upstream.once('newListener', (event: string) => {
595+
up.upstream.on('newListener', (event: string) => {
586596
if (event === 'finish') {
587597
// Update the `upstreamChunkBuffer` before emitting 'finish'
588598
up.upstreamChunkBuffer = Buffer.from('abc');
@@ -597,12 +607,46 @@ describe('gcs-resumable-upload', () => {
597607
assert.equal(result, true);
598608
});
599609

610+
it("should wait for 'prefinish' if !`upstreamChunkBuffer.byteLength` && !`upstreamEnded`", async () => {
611+
await new Promise(resolve => {
612+
up.waitForNextChunk().then(resolve);
613+
up.emit('prefinish');
614+
});
615+
});
616+
617+
it("should wait for 'prefinish' and resolve `false` if data is not available", async () => {
618+
const result = await new Promise(resolve => {
619+
up.waitForNextChunk().then(resolve);
620+
up.emit('prefinish');
621+
});
622+
623+
assert.equal(result, false);
624+
});
625+
626+
it("should wait for 'prefinish' and resolve `true` if data is available", async () => {
627+
const result = await new Promise(resolve => {
628+
up.on('newListener', (event: string) => {
629+
if (event === 'prefinish') {
630+
// Update the `upstreamChunkBuffer` before emitting 'prefinish'
631+
up.upstreamChunkBuffer = Buffer.from('abc');
632+
633+
process.nextTick(() => up.emit('prefinish'));
634+
}
635+
});
636+
637+
up.waitForNextChunk().then(resolve);
638+
});
639+
640+
assert.equal(result, true);
641+
});
642+
600643
it('should remove listeners after calling back from `wroteToChunkBuffer`', async () => {
601644
assert.equal(up.listenerCount('finish'), 0);
602645
assert.equal(up.listenerCount('wroteToChunkBuffer'), 0);
646+
assert.equal(up.listenerCount('prefinish'), 1);
603647

604648
await new Promise(resolve => {
605-
up.once('newListener', (event: string) => {
649+
up.on('newListener', (event: string) => {
606650
if (event === 'wroteToChunkBuffer') {
607651
process.nextTick(() => up.emit('wroteToChunkBuffer'));
608652
}
@@ -613,14 +657,16 @@ describe('gcs-resumable-upload', () => {
613657

614658
assert.equal(up.listenerCount('finish'), 0);
615659
assert.equal(up.listenerCount('wroteToChunkBuffer'), 0);
660+
assert.equal(up.listenerCount('prefinish'), 1);
616661
});
617662

618663
it("should remove listeners after calling back from upstream to 'finish'", async () => {
619664
assert.equal(up.listenerCount('finish'), 0);
620665
assert.equal(up.listenerCount('wroteToChunkBuffer'), 0);
666+
assert.equal(up.listenerCount('prefinish'), 1);
621667

622668
await new Promise(resolve => {
623-
up.upstream.once('newListener', (event: string) => {
669+
up.upstream.on('newListener', (event: string) => {
624670
if (event === 'finish') {
625671
process.nextTick(() => up.upstream.emit('finish'));
626672
}
@@ -631,6 +677,27 @@ describe('gcs-resumable-upload', () => {
631677

632678
assert.equal(up.listenerCount('finish'), 0);
633679
assert.equal(up.listenerCount('wroteToChunkBuffer'), 0);
680+
assert.equal(up.listenerCount('prefinish'), 1);
681+
});
682+
683+
it("should remove listeners after calling back from 'prefinish'", async () => {
684+
assert.equal(up.listenerCount('finish'), 0);
685+
assert.equal(up.listenerCount('wroteToChunkBuffer'), 0);
686+
assert.equal(up.listenerCount('prefinish'), 1);
687+
688+
await new Promise(resolve => {
689+
up.on('newListener', (event: string) => {
690+
if (event === 'prefinish') {
691+
process.nextTick(() => up.emit('prefinish'));
692+
}
693+
});
694+
695+
up.waitForNextChunk().then(resolve);
696+
});
697+
698+
assert.equal(up.listenerCount('finish'), 0);
699+
assert.equal(up.listenerCount('wroteToChunkBuffer'), 0);
700+
assert.equal(up.listenerCount('prefinish'), 1);
634701
});
635702
});
636703

0 commit comments

Comments
 (0)