Skip to content

Commit b1f7bf0

Browse files
committed
fs: synchronize close with other I/O for streams
Part of the flakiness in the parallel/test-readline-async-iterators-destroy test comes from fs streams starting `_read()` and `_destroy()` without waiting for the other to finish, which can lead to the `fs.read()` call resulting in `EBADF` if timing is bad. Fix this by synchronizing write and read operations with `close()`. Refs: #30660
1 parent d7b8ae7 commit b1f7bf0

File tree

1 file changed

+45
-6
lines changed

1 file changed

+45
-6
lines changed

lib/internal/fs/streams.js

+45-6
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ const {
1010
} = primordials;
1111

1212
const {
13-
ERR_OUT_OF_RANGE
13+
ERR_OUT_OF_RANGE,
14+
ERR_STREAM_DESTROYED
1415
} = require('internal/errors').codes;
1516
const internalUtil = require('internal/util');
1617
const { validateNumber } = require('internal/validators');
@@ -22,6 +23,8 @@ const {
2223
} = require('internal/fs/utils');
2324
const { Readable, Writable } = require('stream');
2425
const { toPathIfFileURL } = require('internal/url');
26+
const kIoDone = Symbol('kIoDone');
27+
const kIsPerformingIO = Symbol('kIsPerformingIO');
2528

2629
const kMinPoolSpace = 128;
2730

@@ -86,6 +89,7 @@ function ReadStream(path, options) {
8689
this.pos = undefined;
8790
this.bytesRead = 0;
8891
this.closed = false;
92+
this[kIsPerformingIO] = false;
8993

9094
if (this.start !== undefined) {
9195
checkPosition(this.start, 'start');
@@ -155,6 +159,8 @@ ReadStream.prototype._read = function(n) {
155159
});
156160
}
157161

162+
if (this.destroyed) return;
163+
158164
if (!pool || pool.length - pool.used < kMinPoolSpace) {
159165
// Discard the old pool.
160166
allocNewPool(this.readableHighWaterMark);
@@ -178,7 +184,12 @@ ReadStream.prototype._read = function(n) {
178184
return this.push(null);
179185

180186
// the actual read.
187+
this[kIsPerformingIO] = true;
181188
fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
189+
this[kIsPerformingIO] = false;
190+
// Tell ._destroy() that it's safe to close the fd now.
191+
if (this.destroyed) return this.emit(kIoDone, er);
192+
182193
if (er) {
183194
if (this.autoClose) {
184195
this.destroy();
@@ -224,8 +235,12 @@ ReadStream.prototype._destroy = function(err, cb) {
224235
return;
225236
}
226237

238+
if (this[kIsPerformingIO]) {
239+
this.once(kIoDone, (er) => closeFsStream(this, cb, err || er));
240+
return;
241+
}
242+
227243
closeFsStream(this, cb, err);
228-
this.fd = null;
229244
};
230245

231246
function closeFsStream(stream, cb, err) {
@@ -236,6 +251,8 @@ function closeFsStream(stream, cb, err) {
236251
if (!er)
237252
stream.emit('close');
238253
});
254+
255+
stream.fd = null;
239256
}
240257

241258
ReadStream.prototype.close = function(cb) {
@@ -274,6 +291,7 @@ function WriteStream(path, options) {
274291
this.pos = undefined;
275292
this.bytesWritten = 0;
276293
this.closed = false;
294+
this[kIsPerformingIO] = false;
277295

278296
if (this.start !== undefined) {
279297
checkPosition(this.start, 'start');
@@ -339,7 +357,17 @@ WriteStream.prototype._write = function(data, encoding, cb) {
339357
});
340358
}
341359

360+
if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));
361+
362+
this[kIsPerformingIO] = true;
342363
fs.write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
364+
this[kIsPerformingIO] = false;
365+
// Tell ._destroy() that it's safe to close the fd now.
366+
if (this.destroyed) {
367+
cb(er);
368+
return this.emit(kIoDone, er);
369+
}
370+
343371
if (er) {
344372
if (this.autoClose) {
345373
this.destroy();
@@ -362,7 +390,8 @@ WriteStream.prototype._writev = function(data, cb) {
362390
});
363391
}
364392

365-
const self = this;
393+
if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));
394+
366395
const len = data.length;
367396
const chunks = new Array(len);
368397
let size = 0;
@@ -374,12 +403,22 @@ WriteStream.prototype._writev = function(data, cb) {
374403
size += chunk.length;
375404
}
376405

377-
fs.writev(this.fd, chunks, this.pos, function(er, bytes) {
406+
this[kIsPerformingIO] = true;
407+
fs.writev(this.fd, chunks, this.pos, (er, bytes) => {
408+
this[kIsPerformingIO] = false;
409+
// Tell ._destroy() that it's safe to close the fd now.
410+
if (this.destroyed) {
411+
cb(er);
412+
return this.emit(kIoDone, er);
413+
}
414+
378415
if (er) {
379-
self.destroy();
416+
if (this.autoClose) {
417+
this.destroy();
418+
}
380419
return cb(er);
381420
}
382-
self.bytesWritten += bytes;
421+
this.bytesWritten += bytes;
383422
cb();
384423
});
385424

0 commit comments

Comments
 (0)