Skip to content

Commit 6c07a72

Browse files
addaleaxtargos
authored andcommitted
fs: synchronize close with other I/O for streams
Part of the flakiness in the parallel/test-readline-async-iterators-destroy test comes from fs streams starting `_read()` and `_destroy()` without waiting for the other to finish, which can lead to the `fs.read()` call resulting in `EBADF` if timing is bad. Fix this by synchronizing write and read operations with `close()`. Refs: #30660 PR-URL: #30837 Reviewed-By: Luigi Pinca <[email protected]> Reviewed-By: Ben Noordhuis <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Rich Trott <[email protected]>
1 parent cc4a6ed commit 6c07a72

File tree

1 file changed

+46
-6
lines changed

1 file changed

+46
-6
lines changed

lib/internal/fs/streams.js

+46-6
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@ const {
77
NumberIsSafeInteger,
88
ObjectDefineProperty,
99
ObjectSetPrototypeOf,
10+
Symbol,
1011
} = primordials;
1112

1213
const {
13-
ERR_OUT_OF_RANGE
14+
ERR_OUT_OF_RANGE,
15+
ERR_STREAM_DESTROYED
1416
} = require('internal/errors').codes;
1517
const internalUtil = require('internal/util');
1618
const { validateNumber } = require('internal/validators');
@@ -22,6 +24,8 @@ const {
2224
} = require('internal/fs/utils');
2325
const { Readable, Writable } = require('stream');
2426
const { toPathIfFileURL } = require('internal/url');
27+
const kIoDone = Symbol('kIoDone');
28+
const kIsPerformingIO = Symbol('kIsPerformingIO');
2529

2630
const kMinPoolSpace = 128;
2731

@@ -86,6 +90,7 @@ function ReadStream(path, options) {
8690
this.pos = undefined;
8791
this.bytesRead = 0;
8892
this.closed = false;
93+
this[kIsPerformingIO] = false;
8994

9095
if (this.start !== undefined) {
9196
checkPosition(this.start, 'start');
@@ -155,6 +160,8 @@ ReadStream.prototype._read = function(n) {
155160
});
156161
}
157162

163+
if (this.destroyed) return;
164+
158165
if (!pool || pool.length - pool.used < kMinPoolSpace) {
159166
// Discard the old pool.
160167
allocNewPool(this.readableHighWaterMark);
@@ -178,7 +185,12 @@ ReadStream.prototype._read = function(n) {
178185
return this.push(null);
179186

180187
// the actual read.
188+
this[kIsPerformingIO] = true;
181189
fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
190+
this[kIsPerformingIO] = false;
191+
// Tell ._destroy() that it's safe to close the fd now.
192+
if (this.destroyed) return this.emit(kIoDone, er);
193+
182194
if (er) {
183195
if (this.autoClose) {
184196
this.destroy();
@@ -224,8 +236,12 @@ ReadStream.prototype._destroy = function(err, cb) {
224236
return;
225237
}
226238

239+
if (this[kIsPerformingIO]) {
240+
this.once(kIoDone, (er) => closeFsStream(this, cb, err || er));
241+
return;
242+
}
243+
227244
closeFsStream(this, cb, err);
228-
this.fd = null;
229245
};
230246

231247
function closeFsStream(stream, cb, err) {
@@ -236,6 +252,8 @@ function closeFsStream(stream, cb, err) {
236252
if (!er)
237253
stream.emit('close');
238254
});
255+
256+
stream.fd = null;
239257
}
240258

241259
ReadStream.prototype.close = function(cb) {
@@ -274,6 +292,7 @@ function WriteStream(path, options) {
274292
this.pos = undefined;
275293
this.bytesWritten = 0;
276294
this.closed = false;
295+
this[kIsPerformingIO] = false;
277296

278297
if (this.start !== undefined) {
279298
checkPosition(this.start, 'start');
@@ -339,7 +358,17 @@ WriteStream.prototype._write = function(data, encoding, cb) {
339358
});
340359
}
341360

361+
if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));
362+
363+
this[kIsPerformingIO] = true;
342364
fs.write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
365+
this[kIsPerformingIO] = false;
366+
// Tell ._destroy() that it's safe to close the fd now.
367+
if (this.destroyed) {
368+
cb(er);
369+
return this.emit(kIoDone, er);
370+
}
371+
343372
if (er) {
344373
if (this.autoClose) {
345374
this.destroy();
@@ -362,7 +391,8 @@ WriteStream.prototype._writev = function(data, cb) {
362391
});
363392
}
364393

365-
const self = this;
394+
if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));
395+
366396
const len = data.length;
367397
const chunks = new Array(len);
368398
let size = 0;
@@ -374,12 +404,22 @@ WriteStream.prototype._writev = function(data, cb) {
374404
size += chunk.length;
375405
}
376406

377-
fs.writev(this.fd, chunks, this.pos, function(er, bytes) {
407+
this[kIsPerformingIO] = true;
408+
fs.writev(this.fd, chunks, this.pos, (er, bytes) => {
409+
this[kIsPerformingIO] = false;
410+
// Tell ._destroy() that it's safe to close the fd now.
411+
if (this.destroyed) {
412+
cb(er);
413+
return this.emit(kIoDone, er);
414+
}
415+
378416
if (er) {
379-
self.destroy();
417+
if (this.autoClose) {
418+
this.destroy();
419+
}
380420
return cb(er);
381421
}
382-
self.bytesWritten += bytes;
422+
this.bytesWritten += bytes;
383423
cb();
384424
});
385425

0 commit comments

Comments
 (0)