Skip to content

Commit e712f63

Browse files
authored
Merge pull request #1249 from streamich/copilot/fix-streaming-breaks-file-handles
fix: EBADF when calling fileHandle.close() after streaming via pipeline
2 parents 20adf5b + 8e98f10 commit e712f63

4 files changed

Lines changed: 62 additions & 12 deletions

File tree

packages/fs-node-utils/src/types/misc.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,8 @@ export interface IFileHandle extends EventEmitter {
148148
chmod(mode: TMode): Promise<void>;
149149
chown(uid: number, gid: number): Promise<void>;
150150
close(): Promise<void>;
151-
createReadStream(options: IFileHandleReadStreamOptions): IReadStream;
152-
createWriteStream(options: IFileHandleWriteStreamOptions): IWriteStream;
151+
createReadStream(options?: IFileHandleReadStreamOptions): IReadStream;
152+
createWriteStream(options?: IFileHandleWriteStreamOptions): IWriteStream;
153153
datasync(): Promise<void>;
154154
readableWebStream(options?: IReadableWebStreamOptions): ReadableStream;
155155
read(

packages/fs-node/src/FileHandle.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,11 @@ export class FileHandle extends EventEmitter implements IFileHandle {
8686
return promisify(this.fs, 'fdatasync')(this.fd);
8787
}
8888

89-
createReadStream(options: opts.IFileHandleReadStreamOptions): IReadStream {
89+
createReadStream(options?: opts.IFileHandleReadStreamOptions): IReadStream {
9090
return this.fs.createReadStream('', { ...options, fd: this });
9191
}
9292

93-
createWriteStream(options: opts.IFileHandleWriteStreamOptions): IWriteStream {
93+
createWriteStream(options?: opts.IFileHandleWriteStreamOptions): IWriteStream {
9494
return this.fs.createWriteStream('', { ...options, fd: this });
9595
}
9696

packages/fs-node/src/__tests__/volume/FileHandle.test.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,4 +404,38 @@ describe('FileHandle', () => {
404404
await handle.close();
405405
});
406406
});
407+
408+
describe('createReadStream()', () => {
409+
it('allows close after streaming via pipeline', async () => {
410+
const { Writable, pipeline } = await import('node:stream');
411+
const { promisify } = await import('node:util');
412+
const pipelineAsync = promisify(pipeline);
413+
414+
const fs = createFs();
415+
fs.writeFileSync('/test', 'teststring');
416+
const handle = await fs.promises.open('/test', 'r');
417+
418+
const s = handle.createReadStream();
419+
await pipelineAsync(s, new Writable({ write: (_chunk, _encoding, cb) => cb() }));
420+
421+
await expect(handle.close()).resolves.toBeUndefined();
422+
});
423+
});
424+
425+
describe('createWriteStream()', () => {
426+
it('allows close after streaming via pipeline', async () => {
427+
const { Readable, pipeline } = await import('node:stream');
428+
const { promisify } = await import('node:util');
429+
const pipelineAsync = promisify(pipeline);
430+
431+
const fs = createFs();
432+
fs.writeFileSync('/test', '');
433+
const handle = await fs.promises.open('/test', 'w');
434+
435+
const readable = Readable.from(['hello']);
436+
await pipelineAsync(readable, handle.createWriteStream());
437+
438+
await expect(handle.close()).resolves.toBeUndefined();
439+
});
440+
});
407441
});

packages/fs-node/src/volume.ts

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1707,6 +1707,7 @@ function FsReadStream(vol, path, options) {
17071707
Readable.call(this, options);
17081708

17091709
this.path = pathToFilename(path);
1710+
this._fileHandle = options.fd && typeof options.fd !== 'number' ? options.fd : null;
17101711
this.fd = options.fd === undefined ? null : typeof options.fd !== 'number' ? options.fd.fd : options.fd;
17111712
this.flags = options.flags === undefined ? 'r' : options.flags;
17121713
this.mode = options.mode === undefined ? 0o666 : options.mode;
@@ -1840,10 +1841,17 @@ FsReadStream.prototype.close = function (cb) {
18401841
this.closed = true;
18411842
}
18421843

1843-
this._vol.close(this.fd, er => {
1844-
if (er) this.emit('error', er);
1845-
else this.emit('close');
1846-
});
1844+
if (this._fileHandle) {
1845+
this._fileHandle.close().then(
1846+
() => this.emit('close'),
1847+
er => this.emit('error', er),
1848+
);
1849+
} else {
1850+
this._vol.close(this.fd, er => {
1851+
if (er) this.emit('error', er);
1852+
else this.emit('close');
1853+
});
1854+
}
18471855

18481856
this.fd = null;
18491857
};
@@ -1876,6 +1884,7 @@ function FsWriteStream(vol, path, options) {
18761884
Writable.call(this, options);
18771885

18781886
this.path = pathToFilename(path);
1887+
this._fileHandle = options.fd && typeof options.fd !== 'number' ? options.fd : null;
18791888
this.fd = options.fd === undefined ? null : typeof options.fd !== 'number' ? options.fd.fd : options.fd;
18801889
this.flags = options.flags === undefined ? 'w' : options.flags;
18811890
this.mode = options.mode === undefined ? 0o666 : options.mode;
@@ -2006,10 +2015,17 @@ FsWriteStream.prototype.close = function (cb) {
20062015
this.closed = true;
20072016
}
20082017

2009-
this._vol.close(this.fd, er => {
2010-
if (er) this.emit('error', er);
2011-
else this.emit('close');
2012-
});
2018+
if (this._fileHandle) {
2019+
this._fileHandle.close().then(
2020+
() => this.emit('close'),
2021+
er => this.emit('error', er),
2022+
);
2023+
} else {
2024+
this._vol.close(this.fd, er => {
2025+
if (er) this.emit('error', er);
2026+
else this.emit('close');
2027+
});
2028+
}
20132029

20142030
this.fd = null;
20152031
};

0 commit comments

Comments
 (0)