Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: compose #39029

Closed
wants to merge 8 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
@@ -1859,6 +1859,48 @@ run().catch(console.error);
after the `callback` has been invoked. In the case of reuse of streams after
failure, this can cause event listener leaks and swallowed errors.

### `stream.compose(...streams)`
<!-- YAML
added: REPLACEME
-->

* `streams` {Stream[]}
* Returns: {stream.Duplex}

Combines two or more streams into a `Duplex` stream that writes to the
first stream and reads from the last. Each provided stream is piped into
the next, using `stream.pipeline`. If any of the streams error then all
are destroyed, including the outer `Duplex` stream.

Because `stream.compose` returns a new stream that in turn can (and
should) be piped into other streams, it enables composition. In contrast,
when passing streams to `stream.pipeline`, typically the first stream is
a readable stream and the last a writable stream, forming a closed
circuit.

```mjs
import { compose, Transform } from 'stream';

const removeSpaces = new Transform({
transform(chunk, encoding, callback) {
callback(null, String(chunk).replaceAll(' ', ''));
}
});

const toUpper = new Transform({
transform(chunk, encoding, callback) {
callback(null, String(chunk).toUpperCase());
}
});

let res = '';
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
res += buf;
}

console.log(res); // prints 'HELLOWORLD'
```

### `stream.Readable.from(iterable, [options])`
<!-- YAML
added:
284 changes: 284 additions & 0 deletions lib/internal/streams/compose.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,284 @@
'use strict';

const pipeline = require('internal/streams/pipeline');
const Duplex = require('internal/streams/duplex');
const { createDeferredPromise } = require('internal/util');
const { destroyer } = require('internal/streams/destroy');
const from = require('internal/streams/from');
const {
isNodeStream,
isIterable,
isReadable,
isWritable,
} = require('internal/streams/utils');
const {
PromiseResolve,
} = primordials;
const {
AbortError,
codes: {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_ARG_VALUE,
ERR_INVALID_RETURN_VALUE,
ERR_MISSING_ARGS,
},
} = require('internal/errors');
const assert = require('internal/assert');

// This is needed for pre node 17.
// Duplex subclass that force-closes a disabled side at construction
// time, so `new ComposeDuplex({ readable: false })` behaves as if the
// readable side had already ended (and likewise for writable).
class ComposeDuplex extends Duplex {
  constructor(options) {
    super(options);

    // https://github.com/nodejs/node/pull/34385

    // Readable side disabled: mark it ended and end-emitted so the
    // stream machinery never expects data to be pushed.
    if (options?.readable === false) {
      this._readableState.readable = false;
      this._readableState.ended = true;
      this._readableState.endEmitted = true;
    }

    // Writable side disabled: mark it fully finished so no writes are
    // accepted or awaited.
    if (options?.writable === false) {
      this._writableState.writable = false;
      this._writableState.ending = true;
      this._writableState.ended = true;
      this._writableState.finished = true;
    }
  }
}

module.exports = function compose(...streams) {
if (streams.length === 0) {
throw new ERR_MISSING_ARGS('streams');
}

if (streams.length === 1) {
return makeDuplex(streams[0], 'streams[0]');
}

const orgStreams = [...streams];

if (typeof streams[0] === 'function') {
streams[0] = makeDuplex(streams[0], 'streams[0]');
}

if (typeof streams[streams.length - 1] === 'function') {
const idx = streams.length - 1;
streams[idx] = makeDuplex(streams[idx], `streams[${idx}]`);
}

for (let n = 0; n < streams.length; ++n) {
if (!isNodeStream(streams[n])) {
// TODO(ronag): Add checks for non streams.
continue;
}
if (n < streams.length - 1 && !isReadable(streams[n])) {
throw new ERR_INVALID_ARG_VALUE(
`streams[${n}]`,
orgStreams[n],
'must be readable'
);
}
if (n > 0 && !isWritable(streams[n])) {
throw new ERR_INVALID_ARG_VALUE(
`streams[${n}]`,
orgStreams[n],
'must be writable'
);
}
}

let ondrain;
let onfinish;
let onreadable;
let onclose;
let d;

function onfinished(err) {
const cb = onclose;
onclose = null;

if (cb) {
cb(err);
} else if (err) {
d.destroy(err);
} else if (!readable && !writable) {
d.destroy();
}
}

const head = streams[0];
const tail = pipeline(streams, onfinished);

const writable = !!isWritable(head);
const readable = !!isReadable(tail);

// TODO(ronag): Avoid double buffering.
// Implement Writable/Readable/Duplex traits.
// See, https://github.com/nodejs/node/pull/33515.
d = new ComposeDuplex({
highWaterMark: 1,
writableObjectMode: !!head?.writableObjectMode,
readableObjectMode: !!tail?.writableObjectMode,
writable,
readable,
});

if (writable) {
d._write = function(chunk, encoding, callback) {
if (head.write(chunk, encoding)) {
callback();
} else {
ondrain = callback;
}
};

d._final = function(callback) {
head.end();
onfinish = callback;
};

head.on('drain', function() {
if (ondrain) {
const cb = ondrain;
ondrain = null;
cb();
}
});

tail.on('finish', function() {
if (onfinish) {
const cb = onfinish;
onfinish = null;
cb();
}
});
}

if (readable) {
tail.on('readable', function() {
if (onreadable) {
const cb = onreadable;
onreadable = null;
cb();
}
});

tail.on('end', function() {
d.push(null);
});

d._read = function() {
while (true) {
const buf = tail.read();

if (buf === null) {
onreadable = d._read;
return;
}

if (!d.push(buf)) {
return;
}
}
};
}

d._destroy = function(err, callback) {
if (!err && onclose !== null) {
err = new AbortError();
}

onreadable = null;
ondrain = null;
onfinish = null;

if (onclose === null) {
callback(err);
} else {
onclose = callback;
destroyer(tail, err);
}
};

return d;
};

// Normalizes a compose() argument into a stream:
//  - async generator function -> readable/transform duplex
//  - plain async function     -> write-only duplex (must resolve nully)
//  - existing node stream     -> returned as-is
//  - iterable / async iterable -> read-only duplex
// Throws ERR_INVALID_ARG_TYPE / ERR_INVALID_RETURN_VALUE otherwise.
// `name` is the argument label (e.g. 'streams[0]') used in errors.
function makeDuplex(stream, name) {
  let ret;
  if (typeof stream === 'function') {
    // The function must accept the source generator as its argument.
    assert(stream.length > 0);

    const { value, write, final } = fromAsyncGen(stream);

    if (isIterable(value)) {
      // Async generator function: its return value is iterated for the
      // readable side, writes are fed in through `write`/`final`.
      ret = from(ComposeDuplex, value, {
        objectMode: true,
        highWaterMark: 1,
        write,
        final
      });
    } else if (typeof value?.then === 'function') {
      // Plain async function: write-only. Its promise must resolve to
      // null/undefined; any other value is a contract violation.
      const promise = PromiseResolve(value)
        .then((val) => {
          if (val != null) {
            throw new ERR_INVALID_RETURN_VALUE('nully', name, val);
          }
        })
        .catch((err) => {
          destroyer(ret, err);
        });

      ret = new ComposeDuplex({
        objectMode: true,
        highWaterMark: 1,
        readable: false,
        write,
        final(cb) {
          // Don't finish until the async function itself has settled.
          final(() => promise.then(cb, cb));
        }
      });
    } else {
      throw new ERR_INVALID_RETURN_VALUE(
        'Iterable, AsyncIterable or AsyncFunction', name, value);
    }
  } else if (isNodeStream(stream)) {
    ret = stream;
  } else if (isIterable(stream)) {
    // Read-only duplex sourced from the (sync or async) iterable.
    ret = from(ComposeDuplex, stream, {
      objectMode: true,
      highWaterMark: 1,
      writable: false
    });
  } else {
    throw new ERR_INVALID_ARG_TYPE(
      name,
      ['Stream', 'Iterable', 'AsyncIterable', 'Function'],
      stream)
    ;
  }
  return ret;
}

// Adapts a user-supplied async (generator) function `fn` into the
// pieces needed to build a duplex. `fn` is invoked with an async
// generator that yields whatever is written via the returned hooks.
// Returns:
//   value - whatever `fn` returned (async iterable for generator
//           functions, a promise for plain async functions).
//   write - Writable write(chunk, encoding, cb) hook.
//   final - Writable final(cb) hook signalling end-of-input.
function fromAsyncGen(fn) {
  // One-slot mailbox: write()/final() resolve the current deferred and
  // the generator creates a fresh one after consuming each message.
  let { promise, resolve } = createDeferredPromise();
  const value = fn(async function*() {
    while (true) {
      const { chunk, done, cb } = await promise;
      // Ack the write on the next tick so the writable side can make
      // progress independently of the generator's consumer.
      // NOTE(review): cb is scheduled before `yield chunk` resumes; if
      // a new write could arrive while the generator is still suspended
      // at the yield, it would resolve the already-settled deferred and
      // be lost. Verify highWaterMark:1 backpressure rules this out.
      process.nextTick(cb);
      if (done) return;
      yield chunk;
      ({ promise, resolve } = createDeferredPromise());
    }
  }());

  return {
    value,
    write(chunk, encoding, cb) {
      resolve({ chunk, done: false, cb });
    },
    final(cb) {
      resolve({ done: true, cb });
    }
  };
}
3 changes: 0 additions & 3 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
@@ -295,9 +295,6 @@ function pipeline(...streams) {
}
}

// TODO(ronag): Consider returning a Duplex proxy if the first argument
// is a writable. Would improve composability.
// See, https://github.com/nodejs/node/issues/32020
return ret;
}

10 changes: 9 additions & 1 deletion lib/internal/streams/utils.js
Original file line number Diff line number Diff line change
@@ -28,7 +28,15 @@ function isWritableNodeStream(obj) {
}

// Duck-type check for a Node.js stream: anything carrying internal
// readable/writable state, or exposing a write()/pipe() method together
// with an on() method. Returns a strict boolean.
function isNodeStream(obj) {
  // `!!` prevents leaking the truthy `_readableState`/`_writableState`
  // object (or `null`/`undefined` for falsy input) to callers expecting
  // a predicate result. Also drops the stale pre-diff `return` line
  // that made the new implementation unreachable.
  return !!(
    obj &&
    (
      obj._readableState ||
      obj._writableState ||
      (typeof obj.write === 'function' && typeof obj.on === 'function') ||
      (typeof obj.pipe === 'function' && typeof obj.on === 'function')
    )
  );
}

function isIterable(obj, isAsync) {
22 changes: 22 additions & 0 deletions lib/stream.js
Original file line number Diff line number Diff line change
@@ -30,9 +30,16 @@ const {
} = require('internal/util');

const pipeline = require('internal/streams/pipeline');
const _compose = require('internal/streams/compose');
const { destroyer } = require('internal/streams/destroy');
const eos = require('internal/streams/end-of-stream');
const internalBuffer = require('internal/buffer');
const { isNodeStream } = require('internal/streams/utils');
const {
codes: {
ERR_INVALID_ARG_VALUE,
},
} = require('internal/errors');

const promises = require('stream/promises');

@@ -48,6 +55,21 @@ Stream.addAbortSignal = addAbortSignal;
Stream.finished = eos;
Stream.destroy = destroyer;

// Public wrapper: only accepts actual streams for now.
Stream.compose = function compose(...streams) {
  // TODO (ronag): Remove this once async function API
  // has been discussed.
  let index = 0;
  for (const stream of streams) {
    if (!isNodeStream(stream)) {
      throw new ERR_INVALID_ARG_VALUE(
        `streams[${index}]`,
        stream,
        'must be stream'
      );
    }
    index += 1;
  }
  return _compose(...streams);
};

ObjectDefineProperty(Stream, 'promises', {
configurable: true,
enumerable: true,
1 change: 1 addition & 0 deletions test/parallel/test-bootstrap-modules.js
Original file line number Diff line number Diff line change
@@ -101,6 +101,7 @@ const expectedModules = new Set([
'NativeModule internal/stream_base_commons',
'NativeModule internal/streams/add-abort-signal',
'NativeModule internal/streams/buffer_list',
'NativeModule internal/streams/compose',
'NativeModule internal/streams/destroy',
'NativeModule internal/streams/duplex',
'NativeModule internal/streams/end-of-stream',
425 changes: 425 additions & 0 deletions test/parallel/test-stream-compose.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,425 @@
// Flags: --expose-internals

'use strict';

const common = require('../common');
const {
Readable,
Transform,
Writable,
finished,
PassThrough
} = require('stream');
const compose = require('internal/streams/compose');
const assert = require('assert');

// Two Transforms composed: chunks are doubled, then upper-cased, via
// the outer duplex's write/read sides.
{
  let res = '';
  compose(
    new Transform({
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk + chunk);
      })
    }),
    new Transform({
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk.toString().toUpperCase());
      })
    })
  )
    .end('asd')
    .on('data', common.mustCall((buf) => {
      res += buf;
    }))
    .on('end', common.mustCall(() => {
      assert.strictEqual(res, 'ASDASD');
    }));
}

// Same pipeline expressed as two async generator functions.
{
  let res = '';
  compose(
    async function*(source) {
      for await (const chunk of source) {
        yield chunk + chunk;
      }
    },
    async function*(source) {
      for await (const chunk of source) {
        yield chunk.toString().toUpperCase();
      }
    }
  )
    .end('asd')
    .on('data', common.mustCall((buf) => {
      res += buf;
    }))
    .on('end', common.mustCall(() => {
      assert.strictEqual(res, 'ASDASD');
    }));
}

// A single async generator function becomes a transform duplex.
{
  let res = '';
  compose(
    async function*(source) {
      for await (const chunk of source) {
        yield chunk + chunk;
      }
    }
  )
    .end('asd')
    .on('data', common.mustCall((buf) => {
      res += buf;
    }))
    .on('end', common.mustCall(() => {
      assert.strictEqual(res, 'asdasd');
    }));
}

// A Readable head feeding a Transform: the composed duplex is
// read-only here.
{
  let res = '';
  compose(
    Readable.from(['asd']),
    new Transform({
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk.toString().toUpperCase());
      })
    })
  )
    .on('data', common.mustCall((buf) => {
      res += buf;
    }))
    .on('end', common.mustCall(() => {
      assert.strictEqual(res, 'ASD');
    }));
}

// An async generator *object* (already invoked) as the head works
// like any other async iterable source.
{
  let res = '';
  compose(
    async function* () {
      yield 'asd';
    }(),
    new Transform({
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk.toString().toUpperCase());
      })
    })
  )
    .on('data', common.mustCall((buf) => {
      res += buf;
    }))
    .on('end', common.mustCall(() => {
      assert.strictEqual(res, 'ASD');
    }));
}

// Transform -> async generator passthrough -> Writable tail: the
// composed duplex is write-only and emits 'finish'.
{
  let res = '';
  compose(
    new Transform({
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk.toString().toUpperCase());
      })
    }),
    async function*(source) {
      for await (const chunk of source) {
        yield chunk;
      }
    },
    new Writable({
      write: common.mustCall((chunk, encoding, callback) => {
        res += chunk;
        callback(null);
      })
    })
  )
    .end('asd')
    .on('finish', common.mustCall(() => {
      assert.strictEqual(res, 'ASD');
    }));
}

// Same, but the tail is a plain async function consuming the source.
{
  let res = '';
  compose(
    new Transform({
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk.toString().toUpperCase());
      })
    }),
    async function*(source) {
      for await (const chunk of source) {
        yield chunk;
      }
    },
    async function(source) {
      for await (const chunk of source) {
        res += chunk;
      }
    }
  )
    .end('asd')
    .on('finish', common.mustCall(() => {
      assert.strictEqual(res, 'ASD');
    }));
}

// Object-mode streams: each stage wraps the chunk in an object, so the
// final value is nested twice.
{
  let res;
  compose(
    new Transform({
      objectMode: true,
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, { chunk });
      })
    }),
    async function*(source) {
      for await (const chunk of source) {
        yield chunk;
      }
    },
    new Transform({
      objectMode: true,
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, { chunk });
      })
    })
  )
    .end(true)
    .on('data', common.mustCall((buf) => {
      res = buf;
    }))
    .on('end', common.mustCall(() => {
      assert.strictEqual(res.chunk.chunk, true);
    }));
}

// An error from the first Transform's callback propagates to the outer
// duplex; later stages must never see data.
{
  const _err = new Error('asd');
  compose(
    new Transform({
      objectMode: true,
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(_err);
      })
    }),
    async function*(source) {
      for await (const chunk of source) {
        yield chunk;
      }
    },
    new Transform({
      objectMode: true,
      transform: common.mustNotCall((chunk, encoding, callback) => {
        callback(null, { chunk });
      })
    })
  )
    .end(true)
    .on('data', common.mustNotCall())
    .on('end', common.mustNotCall())
    .on('error', (err) => {
      assert.strictEqual(err, _err);
    });
}

// A throw inside a middle async generator also surfaces as 'error' on
// the outer duplex.
{
  const _err = new Error('asd');
  compose(
    new Transform({
      objectMode: true,
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk);
      })
    }),
    async function*(source) {
      let tmp = '';
      for await (const chunk of source) {
        tmp += chunk;
        throw _err;
      }
      return tmp;
    },
    new Transform({
      objectMode: true,
      transform: common.mustNotCall((chunk, encoding, callback) => {
        callback(null, { chunk });
      })
    })
  )
    .end(true)
    .on('data', common.mustNotCall())
    .on('end', common.mustNotCall())
    .on('error', (err) => {
      assert.strictEqual(err, _err);
    });
}

// Fully self-contained chain (generator source + transform + consumer):
// the result is neither readable nor writable from the outside.
{
  let buf = '';

  // Convert into readable Duplex.
  const s1 = compose(async function* () {
    yield 'Hello';
    yield 'World';
  }(), async function* (source) {
    for await (const chunk of source) {
      yield String(chunk).toUpperCase();
    }
  }, async function(source) {
    for await (const chunk of source) {
      buf += chunk;
    }
  });

  assert.strictEqual(s1.writable, false);
  assert.strictEqual(s1.readable, false);

  finished(s1.resume(), common.mustCall((err) => {
    assert(!err);
    assert.strictEqual(buf, 'HELLOWORLD');
  }));
}

// A single async generator function yields a transform duplex that can
// be written to and read from.
{
  let buf = '';
  // Convert into transform duplex.
  const s2 = compose(async function* (source) {
    for await (const chunk of source) {
      yield String(chunk).toUpperCase();
    }
  });
  s2.end('helloworld');
  s2.resume();
  s2.on('data', (chunk) => {
    buf += chunk;
  });

  finished(s2.resume(), common.mustCall((err) => {
    assert(!err);
    assert.strictEqual(buf, 'HELLOWORLD');
  }));
}

// Composed duplexes are themselves composable: s1 (readable), s2
// (transform) and s3 (writable) combine into s4.
{
  let buf = '';

  // Convert into readable Duplex.
  const s1 = compose(async function* () {
    yield 'Hello';
    yield 'World';
  }());

  // Convert into transform duplex.
  const s2 = compose(async function* (source) {
    for await (const chunk of source) {
      yield String(chunk).toUpperCase();
    }
  });

  // Convert into writable duplex.
  const s3 = compose(async function(source) {
    for await (const chunk of source) {
      buf += chunk;
    }
  });

  const s4 = compose(s1, s2, s3);

  finished(s4, common.mustCall((err) => {
    assert(!err);
    assert.strictEqual(buf, 'HELLOWORLD');
  }));
}

// Same closed chain as above, composed in a single call.
{
  let buf = '';

  // Convert into readable Duplex.
  const s1 = compose(async function* () {
    yield 'Hello';
    yield 'World';
  }(), async function* (source) {
    for await (const chunk of source) {
      yield String(chunk).toUpperCase();
    }
  }, async function(source) {
    for await (const chunk of source) {
      buf += chunk;
    }
  });

  finished(s1, common.mustCall((err) => {
    assert(!err);
    assert.strictEqual(buf, 'HELLOWORLD');
  }));
}

// Synchronous argument validation. The original used bare try/catch
// blocks, which silently pass when nothing is thrown; assert.throws
// also fails the test if the expected error is never raised.
{
  // No arguments at all.
  assert.throws(
    () => compose(),
    { code: 'ERR_MISSING_ARGS' }
  );
}

{
  // Every stream but the last must be readable; a plain Writable in
  // head position is rejected.
  assert.throws(
    () => compose(new Writable(), new PassThrough()),
    { code: 'ERR_INVALID_ARG_VALUE' }
  );
}

{
  // Every stream but the first must be writable; a plain Readable in
  // the middle is rejected.
  assert.throws(
    () => compose(new PassThrough(), new Readable({ read() {} }), new PassThrough()),
    { code: 'ERR_INVALID_ARG_VALUE' }
  );
}

// A plain async function in tail position must resolve to null or
// undefined; returning a value is an ERR_INVALID_RETURN_VALUE.
{
  let buf = '';

  // Convert into readable Duplex.
  const s1 = compose(async function* () {
    yield 'Hello';
    yield 'World';
  }(), async function* (source) {
    for await (const chunk of source) {
      yield String(chunk).toUpperCase();
    }
  }, async function(source) {
    for await (const chunk of source) {
      buf += chunk;
    }
    return buf;
  });

  finished(s1, common.mustCall((err) => {
    assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE');
  }));
}

// A string head is accepted as an iterable (iterated per character).
{
  let buf = '';

  // Convert into readable Duplex.
  const s1 = compose('HelloWorld', async function* (source) {
    for await (const chunk of source) {
      yield String(chunk).toUpperCase();
    }
  }, async function(source) {
    for await (const chunk of source) {
      buf += chunk;
    }
  });

  finished(s1, common.mustCall((err) => {
    assert(!err);
    assert.strictEqual(buf, 'HELLOWORLD');
  }));
}