Skip to content

Commit c3daa3e

Browse files
addaleaxtargos
authored andcommitted
fs: synchronize close with other I/O for streams
Part of the flakiness in the parallel/test-readline-async-iterators-destroy test comes from fs streams starting `_read()` and `_destroy()` without waiting for the other to finish, which can lead to the `fs.read()` call resulting in `EBADF` if timing is bad. Fix this by synchronizing write and read operations with `close()`. Refs: #30660 PR-URL: #30837 Reviewed-By: Luigi Pinca <[email protected]> Reviewed-By: Ben Noordhuis <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Rich Trott <[email protected]>
1 parent b6688ef commit c3daa3e

File tree

1 file changed

+46
-6
lines changed

1 file changed

+46
-6
lines changed

lib/internal/fs/streams.js

+46-6
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@ const {
77
NumberIsSafeInteger,
88
ObjectDefineProperty,
99
ObjectSetPrototypeOf,
10+
Symbol,
1011
} = primordials;
1112

1213
const {
13-
ERR_OUT_OF_RANGE
14+
ERR_OUT_OF_RANGE,
15+
ERR_STREAM_DESTROYED
1416
} = require('internal/errors').codes;
1517
const { validateNumber } = require('internal/validators');
1618
const fs = require('fs');
@@ -21,6 +23,8 @@ const {
2123
} = require('internal/fs/utils');
2224
const { Readable, Writable } = require('stream');
2325
const { toPathIfFileURL } = require('internal/url');
26+
const kIoDone = Symbol('kIoDone');
27+
const kIsPerformingIO = Symbol('kIsPerformingIO');
2428

2529
const kMinPoolSpace = 128;
2630

@@ -85,6 +89,7 @@ function ReadStream(path, options) {
8589
this.pos = undefined;
8690
this.bytesRead = 0;
8791
this.closed = false;
92+
this[kIsPerformingIO] = false;
8893

8994
if (this.start !== undefined) {
9095
checkPosition(this.start, 'start');
@@ -143,6 +148,8 @@ ReadStream.prototype._read = function(n) {
143148
});
144149
}
145150

151+
if (this.destroyed) return;
152+
146153
if (!pool || pool.length - pool.used < kMinPoolSpace) {
147154
// Discard the old pool.
148155
allocNewPool(this.readableHighWaterMark);
@@ -166,7 +173,12 @@ ReadStream.prototype._read = function(n) {
166173
return this.push(null);
167174

168175
// the actual read.
176+
this[kIsPerformingIO] = true;
169177
fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
178+
this[kIsPerformingIO] = false;
179+
// Tell ._destroy() that it's safe to close the fd now.
180+
if (this.destroyed) return this.emit(kIoDone, er);
181+
170182
if (er) {
171183
if (this.autoClose) {
172184
this.destroy();
@@ -212,8 +224,12 @@ ReadStream.prototype._destroy = function(err, cb) {
212224
return;
213225
}
214226

227+
if (this[kIsPerformingIO]) {
228+
this.once(kIoDone, (er) => closeFsStream(this, cb, err || er));
229+
return;
230+
}
231+
215232
closeFsStream(this, cb, err);
216-
this.fd = null;
217233
};
218234

219235
function closeFsStream(stream, cb, err) {
@@ -224,6 +240,8 @@ function closeFsStream(stream, cb, err) {
224240
if (!er)
225241
stream.emit('close');
226242
});
243+
244+
stream.fd = null;
227245
}
228246

229247
ReadStream.prototype.close = function(cb) {
@@ -262,6 +280,7 @@ function WriteStream(path, options) {
262280
this.pos = undefined;
263281
this.bytesWritten = 0;
264282
this.closed = false;
283+
this[kIsPerformingIO] = false;
265284

266285
if (this.start !== undefined) {
267286
checkPosition(this.start, 'start');
@@ -316,7 +335,17 @@ WriteStream.prototype._write = function(data, encoding, cb) {
316335
});
317336
}
318337

338+
if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));
339+
340+
this[kIsPerformingIO] = true;
319341
fs.write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
342+
this[kIsPerformingIO] = false;
343+
// Tell ._destroy() that it's safe to close the fd now.
344+
if (this.destroyed) {
345+
cb(er);
346+
return this.emit(kIoDone, er);
347+
}
348+
320349
if (er) {
321350
if (this.autoClose) {
322351
this.destroy();
@@ -339,7 +368,8 @@ WriteStream.prototype._writev = function(data, cb) {
339368
});
340369
}
341370

342-
const self = this;
371+
if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));
372+
343373
const len = data.length;
344374
const chunks = new Array(len);
345375
let size = 0;
@@ -351,12 +381,22 @@ WriteStream.prototype._writev = function(data, cb) {
351381
size += chunk.length;
352382
}
353383

354-
fs.writev(this.fd, chunks, this.pos, function(er, bytes) {
384+
this[kIsPerformingIO] = true;
385+
fs.writev(this.fd, chunks, this.pos, (er, bytes) => {
386+
this[kIsPerformingIO] = false;
387+
// Tell ._destroy() that it's safe to close the fd now.
388+
if (this.destroyed) {
389+
cb(er);
390+
return this.emit(kIoDone, er);
391+
}
392+
355393
if (er) {
356-
self.destroy();
394+
if (this.autoClose) {
395+
this.destroy();
396+
}
357397
return cb(er);
358398
}
359-
self.bytesWritten += bytes;
399+
this.bytesWritten += bytes;
360400
cb();
361401
});
362402

0 commit comments

Comments
 (0)