Skip to content

Commit 7f6a0ed

Browse files
ronagBridgeAR
authored andcommitted
fs: allow overriding fs for streams
Allow overriding open, write, and close when using createReadStream() and createWriteStream(). PR-URL: #29083 Refs: #29050 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Rich Trott <[email protected]>
1 parent 3753f47 commit 7f6a0ed

File tree

4 files changed

+164
-41
lines changed

4 files changed

+164
-41
lines changed

doc/api/fs.md

+24-2
Original file line numberDiff line numberDiff line change
@@ -1674,6 +1674,10 @@ changes:
16741674
- version: v2.3.0
16751675
pr-url: https://github.com/nodejs/node/pull/1845
16761676
description: The passed `options` object can be a string now.
1677+
- version: REPLACEME
1678+
pr-url: https://github.com/nodejs/node/pull/REPLACEME
1679+
description: The `fs` options allow overriding the used `fs`
1680+
implementation.
16771681
-->
16781682

16791683
* `path` {string|Buffer|URL}
@@ -1688,7 +1692,8 @@ changes:
16881692
* `start` {integer}
16891693
* `end` {integer} **Default:** `Infinity`
16901694
* `highWaterMark` {integer} **Default:** `64 * 1024`
1691-
* Returns: {fs.ReadStream}
1695+
* `fs` {Object|null} **Default:** `null`
1696+
* Returns: {fs.ReadStream} See [Readable Stream][].
16921697

16931698
Unlike the 16 kb default `highWaterMark` for a readable stream, the stream
16941699
returned by this method has a default `highWaterMark` of 64 kb.
@@ -1715,6 +1720,10 @@ By default, the stream will not emit a `'close'` event after it has been
17151720
destroyed. This is the opposite of the default for other `Readable` streams.
17161721
Set the `emitClose` option to `true` to change this behavior.
17171722

1723+
By providing the `fs` option it is possible to override the corresponding `fs`
1724+
implementations for `open`, `read` and `close`. When providing the `fs` option,
1725+
you must override `open`, `close` and `read`.
1726+
17181727
```js
17191728
const fs = require('fs');
17201729
// Create a stream from some character device.
@@ -1768,6 +1777,10 @@ changes:
17681777
- version: v2.3.0
17691778
pr-url: https://github.com/nodejs/node/pull/1845
17701779
description: The passed `options` object can be a string now.
1780+
- version: REPLACEME
1781+
pr-url: https://github.com/nodejs/node/pull/REPLACEME
1782+
description: The `fs` options allow overriding the used `fs`
1783+
implementation.
17711784
-->
17721785

17731786
* `path` {string|Buffer|URL}
@@ -1780,7 +1793,8 @@ changes:
17801793
* `autoClose` {boolean} **Default:** `true`
17811794
* `emitClose` {boolean} **Default:** `false`
17821795
* `start` {integer}
1783-
* Returns: {fs.WriteStream}
1796+
* `fs` {Object|null} **Default:** `null`
1797+
* Returns: {fs.WriteStream} See [Writable Stream][].
17841798

17851799
`options` may also include a `start` option to allow writing data at
17861800
some position past the beginning of the file, allowed values are in the
@@ -1799,6 +1813,12 @@ By default, the stream will not emit a `'close'` event after it has been
17991813
destroyed. This is the opposite of the default for other `Writable` streams.
18001814
Set the `emitClose` option to `true` to change this behavior.
18011815

1816+
By providing the `fs` option it is possible to override the corresponding `fs`
1817+
implementations for `open`, `write`, `writev` and `close`. Overriding `write()`
1818+
without `writev()` can reduce performance as some optimizations (`_writev()`)
1819+
will be disabled. When providing the `fs` option, you must override `open`,
1820+
`close` and at least one of `write` and `writev`.
1821+
18021822
Like [`ReadStream`][], if `fd` is specified, [`WriteStream`][] will ignore the
18031823
`path` argument and will use the specified file descriptor. This means that no
18041824
`'open'` event will be emitted. `fd` should be blocking; non-blocking `fd`s
@@ -5520,6 +5540,7 @@ the file contents.
55205540
[`Number.MAX_SAFE_INTEGER`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Number/MAX_SAFE_INTEGER
55215541
[`ReadDirectoryChangesW`]: https://docs.microsoft.com/en-us/windows/desktop/api/winbase/nf-winbase-readdirectorychangesw
55225542
[`ReadStream`]: #fs_class_fs_readstream
5543+
[Readable Stream]: #stream_class_stream_readable
55235544
[`URL`]: url.html#url_the_whatwg_url_api
55245545
[`UV_THREADPOOL_SIZE`]: cli.html#cli_uv_threadpool_size_size
55255546
[`WriteStream`]: #fs_class_fs_writestream
@@ -5577,3 +5598,4 @@ the file contents.
55775598
[chcp]: https://ss64.com/nt/chcp.html
55785599
[inode]: https://en.wikipedia.org/wiki/Inode
55795600
[support of file system `flags`]: #fs_file_system_flags
5601+
[Writable Stream]: #stream_class_stream_writable

lib/internal/fs/streams.js

+91-37
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ const {
1111
} = primordials;
1212

1313
const {
14+
ERR_INVALID_ARG_TYPE,
1415
ERR_OUT_OF_RANGE,
1516
ERR_STREAM_DESTROYED
1617
} = require('internal/errors').codes;
@@ -28,6 +29,7 @@ const kIoDone = Symbol('kIoDone');
2829
const kIsPerformingIO = Symbol('kIsPerformingIO');
2930

3031
const kMinPoolSpace = 128;
32+
const kFs = Symbol('kFs');
3133

3234
let pool;
3335
// It can happen that we expect to read a large chunk of data, and reserve
@@ -76,6 +78,23 @@ function ReadStream(path, options) {
7678
options.emitClose = false;
7779
}
7880

81+
this[kFs] = options.fs || fs;
82+
83+
if (typeof this[kFs].open !== 'function') {
84+
throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function',
85+
this[kFs].open);
86+
}
87+
88+
if (typeof this[kFs].read !== 'function') {
89+
throw new ERR_INVALID_ARG_TYPE('options.fs.read', 'function',
90+
this[kFs].read);
91+
}
92+
93+
if (typeof this[kFs].close !== 'function') {
94+
throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function',
95+
this[kFs].close);
96+
}
97+
7998
Readable.call(this, options);
8099

81100
// Path will be ignored when fd is specified, so it can be falsy
@@ -136,7 +155,7 @@ function _openReadFs(stream) {
136155
return;
137156
}
138157

139-
fs.open(stream.path, stream.flags, stream.mode, (er, fd) => {
158+
stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => {
140159
if (er) {
141160
if (stream.autoClose) {
142161
stream.destroy();
@@ -186,42 +205,43 @@ ReadStream.prototype._read = function(n) {
186205

187206
// the actual read.
188207
this[kIsPerformingIO] = true;
189-
fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
190-
this[kIsPerformingIO] = false;
191-
// Tell ._destroy() that it's safe to close the fd now.
192-
if (this.destroyed) return this.emit(kIoDone, er);
193-
194-
if (er) {
195-
if (this.autoClose) {
196-
this.destroy();
197-
}
198-
this.emit('error', er);
199-
} else {
200-
let b = null;
201-
// Now that we know how much data we have actually read, re-wind the
202-
// 'used' field if we can, and otherwise allow the remainder of our
203-
// reservation to be used as a new pool later.
204-
if (start + toRead === thisPool.used && thisPool === pool) {
205-
const newUsed = thisPool.used + bytesRead - toRead;
206-
thisPool.used = roundUpToMultipleOf8(newUsed);
208+
this[kFs].read(
209+
this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
210+
this[kIsPerformingIO] = false;
211+
// Tell ._destroy() that it's safe to close the fd now.
212+
if (this.destroyed) return this.emit(kIoDone, er);
213+
214+
if (er) {
215+
if (this.autoClose) {
216+
this.destroy();
217+
}
218+
this.emit('error', er);
207219
} else {
208-
// Round down to the next lowest multiple of 8 to ensure the new pool
209-
// fragment start and end positions are aligned to an 8 byte boundary.
210-
const alignedEnd = (start + toRead) & ~7;
211-
const alignedStart = roundUpToMultipleOf8(start + bytesRead);
212-
if (alignedEnd - alignedStart >= kMinPoolSpace) {
213-
poolFragments.push(thisPool.slice(alignedStart, alignedEnd));
220+
let b = null;
221+
// Now that we know how much data we have actually read, re-wind the
222+
// 'used' field if we can, and otherwise allow the remainder of our
223+
// reservation to be used as a new pool later.
224+
if (start + toRead === thisPool.used && thisPool === pool) {
225+
const newUsed = thisPool.used + bytesRead - toRead;
226+
thisPool.used = roundUpToMultipleOf8(newUsed);
227+
} else {
228+
// Round down to the next lowest multiple of 8 to ensure the new pool
229+
// fragment start and end positions are aligned to an 8 byte boundary.
230+
const alignedEnd = (start + toRead) & ~7;
231+
const alignedStart = roundUpToMultipleOf8(start + bytesRead);
232+
if (alignedEnd - alignedStart >= kMinPoolSpace) {
233+
poolFragments.push(thisPool.slice(alignedStart, alignedEnd));
234+
}
214235
}
215-
}
216236

217-
if (bytesRead > 0) {
218-
this.bytesRead += bytesRead;
219-
b = thisPool.slice(start, start + bytesRead);
220-
}
237+
if (bytesRead > 0) {
238+
this.bytesRead += bytesRead;
239+
b = thisPool.slice(start, start + bytesRead);
240+
}
221241

222-
this.push(b);
223-
}
224-
});
242+
this.push(b);
243+
}
244+
});
225245

226246
// Move the pool positions, and internal position for reading.
227247
if (this.pos !== undefined)
@@ -245,7 +265,7 @@ ReadStream.prototype._destroy = function(err, cb) {
245265
};
246266

247267
function closeFsStream(stream, cb, err) {
248-
fs.close(stream.fd, (er) => {
268+
stream[kFs].close(stream.fd, (er) => {
249269
er = er || err;
250270
cb(er);
251271
stream.closed = true;
@@ -279,6 +299,40 @@ function WriteStream(path, options) {
279299
options.emitClose = false;
280300
}
281301

302+
this[kFs] = options.fs || fs;
303+
if (typeof this[kFs].open !== 'function') {
304+
throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function',
305+
this[kFs].open);
306+
}
307+
308+
if (!this[kFs].write && !this[kFs].writev) {
309+
throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function',
310+
this[kFs].write);
311+
}
312+
313+
if (this[kFs].write && typeof this[kFs].write !== 'function') {
314+
throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function',
315+
this[kFs].write);
316+
}
317+
318+
if (this[kFs].writev && typeof this[kFs].writev !== 'function') {
319+
throw new ERR_INVALID_ARG_TYPE('options.fs.writev', 'function',
320+
this[kFs].writev);
321+
}
322+
323+
if (typeof this[kFs].close !== 'function') {
324+
throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function',
325+
this[kFs].close);
326+
}
327+
328+
// It's enough to override either, in which case only one will be used.
329+
if (!this[kFs].write) {
330+
this._write = null;
331+
}
332+
if (!this[kFs].writev) {
333+
this._writev = null;
334+
}
335+
282336
Writable.call(this, options);
283337

284338
// Path will be ignored when fd is specified, so it can be falsy
@@ -335,7 +389,7 @@ function _openWriteFs(stream) {
335389
return;
336390
}
337391

338-
fs.open(stream.path, stream.flags, stream.mode, (er, fd) => {
392+
stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => {
339393
if (er) {
340394
if (stream.autoClose) {
341395
stream.destroy();
@@ -361,7 +415,7 @@ WriteStream.prototype._write = function(data, encoding, cb) {
361415
if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));
362416

363417
this[kIsPerformingIO] = true;
364-
fs.write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
418+
this[kFs].write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
365419
this[kIsPerformingIO] = false;
366420
// Tell ._destroy() that it's safe to close the fd now.
367421
if (this.destroyed) {
@@ -405,7 +459,7 @@ WriteStream.prototype._writev = function(data, cb) {
405459
}
406460

407461
this[kIsPerformingIO] = true;
408-
fs.writev(this.fd, chunks, this.pos, (er, bytes) => {
462+
this[kFs].writev(this.fd, chunks, this.pos, (er, bytes) => {
409463
this[kIsPerformingIO] = false;
410464
// Tell ._destroy() that it's safe to close the fd now.
411465
if (this.destroyed) {

test/parallel/test-fs-read-stream.js

+11-2
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,11 @@ const fixtures = require('../common/fixtures');
3131
const fn = fixtures.path('elipses.txt');
3232
const rangeFile = fixtures.path('x.txt');
3333

34-
{
34+
function test1(options) {
3535
let paused = false;
3636
let bytesRead = 0;
3737

38-
const file = fs.createReadStream(fn);
38+
const file = fs.createReadStream(fn, options);
3939
const fileSize = fs.statSync(fn).size;
4040

4141
assert.strictEqual(file.bytesRead, 0);
@@ -88,6 +88,15 @@ const rangeFile = fixtures.path('x.txt');
8888
});
8989
}
9090

91+
test1({});
92+
test1({
93+
fs: {
94+
open: common.mustCall(fs.open),
95+
read: common.mustCallAtLeast(fs.read, 1),
96+
close: common.mustCall(fs.close),
97+
}
98+
});
99+
91100
{
92101
const file = fs.createReadStream(fn, { encoding: 'utf8' });
93102
file.length = 0;
+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
'use strict';
2+
const common = require('../common');
3+
const path = require('path');
4+
const fs = require('fs');
5+
6+
const tmpdir = require('../common/tmpdir');
7+
tmpdir.refresh();
8+
9+
{
10+
const file = path.join(tmpdir.path, 'write-end-test0.txt');
11+
const stream = fs.createWriteStream(file, {
12+
fs: {
13+
open: common.mustCall(fs.open),
14+
write: common.mustCallAtLeast(fs.write, 1),
15+
close: common.mustCall(fs.close),
16+
}
17+
});
18+
stream.end('asd');
19+
stream.on('close', common.mustCall());
20+
}
21+
22+
23+
{
24+
const file = path.join(tmpdir.path, 'write-end-test1.txt');
25+
const stream = fs.createWriteStream(file, {
26+
fs: {
27+
open: common.mustCall(fs.open),
28+
write: fs.write,
29+
writev: common.mustCallAtLeast(fs.writev, 1),
30+
close: common.mustCall(fs.close),
31+
}
32+
});
33+
stream.write('asd');
34+
stream.write('asd');
35+
stream.write('asd');
36+
stream.end();
37+
stream.on('close', common.mustCall());
38+
}

0 commit comments

Comments
 (0)