@@ -13,8 +13,10 @@ const {
1313
1414const {
1515 ERR_INVALID_ARG_TYPE,
16- ERR_OUT_OF_RANGE,
1716 ERR_METHOD_NOT_IMPLEMENTED,
17+ ERR_OUT_OF_RANGE,
18+ ERR_STREAM_DESTROYED,
19+ ERR_SYSTEM_ERROR,
1820} = require('internal/errors').codes;
1921const {
2022 deprecate,
@@ -392,22 +394,75 @@ WriteStream.prototype.open = openWriteFs;
392394
393395WriteStream.prototype._construct = _construct;
394396
397+ function writeAll(data, size, pos, cb, retries = 0) {
398+ this[kFs].write(this.fd, data, 0, size, pos, (er, bytesWritten, buffer) => {
399+ // No data currently available and operation should be retried later.
400+ if (er?.code === 'EAGAIN') {
401+ er = null;
402+ bytesWritten = 0;
403+ }
404+
405+ if (this.destroyed || er) {
406+ return cb(er || new ERR_STREAM_DESTROYED('write'));
407+ }
408+
409+ this.bytesWritten += bytesWritten;
410+
411+ retries = bytesWritten ? 0 : retries + 1;
412+ size -= bytesWritten;
413+ pos += bytesWritten;
414+
415+ // Try writing non-zero number of bytes up to 5 times.
416+ if (retries > 5) {
417+ cb(new ERR_SYSTEM_ERROR('write failed'));
418+ } else if (size) {
419+ writeAll.call(this, buffer.slice(bytesWritten), size, pos, cb, retries);
420+ } else {
421+ cb();
422+ }
423+ });
424+ }
425+
426+ function writevAll(chunks, size, pos, cb, retries = 0) {
427+ this[kFs].writev(this.fd, chunks, this.pos, (er, bytesWritten, buffers) => {
428+ // No data currently available and operation should be retried later.
429+ if (er?.code === 'EAGAIN') {
430+ er = null;
431+ bytesWritten = 0;
432+ }
433+
434+ if (this.destroyed || er) {
435+ return cb(er || new ERR_STREAM_DESTROYED('writev'));
436+ }
437+
438+ this.bytesWritten += bytesWritten;
439+
440+ retries = bytesWritten ? 0 : retries + 1;
441+ size -= bytesWritten;
442+ pos += bytesWritten;
443+
444+ // Try writing non-zero number of bytes up to 5 times.
445+ if (retries > 5) {
446+ cb(new ERR_SYSTEM_ERROR('writev failed'));
447+ } else if (size) {
448+ writevAll.call(this, [Buffer.concat(buffers).slice(bytesWritten)], size, pos, cb, retries);
449+ } else {
450+ cb();
451+ }
452+ });
453+ }
454+
395455WriteStream.prototype._write = function(data, encoding, cb) {
396456 this[kIsPerformingIO] = true;
397- this[kFs].write (this.fd , data, 0, data.length, this.pos, (er, bytes ) => {
457+ writeAll.call (this, data, data.length, this.pos, (er) => {
398458 this[kIsPerformingIO] = false;
399459 if (this.destroyed) {
400460 // Tell ._destroy() that it's safe to close the fd now.
401461 cb(er);
402462 return this.emit(kIoDone, er);
403463 }
404464
405- if (er) {
406- return cb(er);
407- }
408-
409- this.bytesWritten += bytes;
410- cb();
465+ cb(er);
411466 });
412467
413468 if (this.pos !== undefined)
@@ -427,20 +482,15 @@ WriteStream.prototype._writev = function(data, cb) {
427482 }
428483
429484 this[kIsPerformingIO] = true;
430- this[kFs].writev (this.fd , chunks, this.pos, (er, bytes ) => {
485+ writevAll.call (this, chunks, size, this.pos, (er) => {
431486 this[kIsPerformingIO] = false;
432487 if (this.destroyed) {
433488 // Tell ._destroy() that it's safe to close the fd now.
434489 cb(er);
435490 return this.emit(kIoDone, er);
436491 }
437492
438- if (er) {
439- return cb(er);
440- }
441-
442- this.bytesWritten += bytes;
443- cb();
493+ cb(er);
444494 });
445495
446496 if (this.pos !== undefined)
0 commit comments