Skip to content

Commit f64bebf

Browse files
committed
stream: add pipeline and finished
PR-URL: nodejs#19828 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent 5cc948b commit f64bebf

File tree

9 files changed

+917
-0
lines changed

9 files changed

+917
-0
lines changed

doc/api/errors.md

+6
Original file line numberDiff line numberDiff line change
@@ -1431,6 +1431,12 @@ An attempt was made to call [`stream.pipe()`][] on a [`Writable`][] stream.
14311431

14321432
An attempt was made to call [`stream.write()`][] with a `null` chunk.
14331433

1434+
<a id="ERR_STREAM_PREMATURE_CLOSE"></a>
1435+
### ERR_STREAM_PREMATURE_CLOSE
1436+
1437+
An error returned by `stream.finished()` and `stream.pipeline()`, when a stream
1438+
or a pipeline ends non gracefully with no explicit error.
1439+
14341440
<a id="ERR_STREAM_PUSH_AFTER_EOF"></a>
14351441
### ERR_STREAM_PUSH_AFTER_EOF
14361442

doc/api/stream.md

+106
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ There are four fundamental stream types within Node.js:
4646
* [Transform][] - Duplex streams that can modify or transform the data as it
4747
is written and read (for example [`zlib.createDeflate()`][]).
4848

49+
Additionally this module includes the utility functions [pipeline][] and
50+
[finished][].
51+
4952
### Object Mode
5053

5154
All streams created by Node.js APIs operate exclusively on strings and `Buffer`
@@ -1287,6 +1290,107 @@ implementors should not override this method, but instead implement
12871290
[`readable._destroy()`][readable-_destroy].
12881291
The default implementation of `_destroy()` for `Transform` also emit `'close'`.
12891292

1293+
### stream.finished(stream, callback)
1294+
<!-- YAML
1295+
added: REPLACEME
1296+
-->
1297+
1298+
* `stream` {Stream} A readable and/or writable stream.
1299+
* `callback` {Function} A callback function that takes an optional error
1300+
argument.
1301+
1302+
A function to get notified when a stream is no longer readable, writable
1303+
or has experienced an error or a premature close event.
1304+
1305+
```js
1306+
const { finished } = require('stream');
1307+
1308+
const rs = fs.createReadStream('archive.tar');
1309+
1310+
finished(rs, (err) => {
1311+
if (err) {
1312+
console.error('Stream failed', err);
1313+
} else {
1314+
console.log('Stream is done reading');
1315+
}
1316+
});
1317+
1318+
rs.resume(); // drain the stream
1319+
```
1320+
1321+
Especially useful in error handling scenarios where a stream is destroyed
1322+
prematurely (like an aborted HTTP request), and will not emit `'end'`
1323+
or `'finish'`.
1324+
1325+
The `finished` API is promisify'able as well;
1326+
1327+
```js
1328+
const finished = util.promisify(stream.finished);
1329+
1330+
const rs = fs.createReadStream('archive.tar');
1331+
1332+
async function run() {
1333+
await finished(rs);
1334+
console.log('Stream is done reading');
1335+
}
1336+
1337+
run().catch(console.error);
1338+
rs.resume(); // drain the stream
1339+
```
1340+
1341+
### stream.pipeline(...streams[, callback])
1342+
<!-- YAML
1343+
added: REPLACEME
1344+
-->
1345+
1346+
* `...streams` {Stream} Two or more streams to pipe between.
1347+
* `callback` {Function} A callback function that takes an optional error
1348+
argument.
1349+
1350+
A module method to pipe between streams forwarding errors and properly cleaning
1351+
up and provide a callback when the pipeline is complete.
1352+
1353+
```js
1354+
const { pipeline } = require('stream');
1355+
const fs = require('fs');
1356+
const zlib = require('zlib');
1357+
1358+
// Use the pipeline API to easily pipe a series of streams
1359+
// together and get notified when the pipeline is fully done.
1360+
1361+
// A pipeline to gzip a potentially huge tar file efficiently:
1362+
1363+
pipeline(
1364+
fs.createReadStream('archive.tar'),
1365+
zlib.createGzip(),
1366+
fs.createWriteStream('archive.tar.gz'),
1367+
(err) => {
1368+
if (err) {
1369+
console.error('Pipeline failed', err);
1370+
} else {
1371+
console.log('Pipeline succeeded');
1372+
}
1373+
}
1374+
);
1375+
```
1376+
1377+
The `pipeline` API is promisify'able as well:
1378+
1379+
```js
1380+
const pipeline = util.promisify(stream.pipeline);
1381+
1382+
async function run() {
1383+
await pipeline(
1384+
fs.createReadStream('archive.tar'),
1385+
zlib.createGzip(),
1386+
fs.createWriteStream('archive.tar.gz')
1387+
);
1388+
console.log('Pipeline succeeded');
1389+
}
1390+
1391+
run().catch(console.error);
1392+
```
1393+
12901394
## API for Stream Implementers
12911395

12921396
<!--type=misc-->
@@ -2397,6 +2501,8 @@ contain multi-byte characters.
23972501
[http-incoming-message]: http.html#http_class_http_incomingmessage
23982502
[zlib]: zlib.html
23992503
[hwm-gotcha]: #stream_highwatermark_discrepancy_after_calling_readable_setencoding
2504+
[pipeline]: #stream_stream_pipeline_streams_callback
2505+
[finished]: #stream_stream_finished_stream_callback
24002506
[stream-_flush]: #stream_transform_flush_callback
24012507
[stream-_read]: #stream_readable_read_size_1
24022508
[stream-_transform]: #stream_transform_transform_chunk_encoding_callback

lib/internal/errors.js

+1
Original file line numberDiff line numberDiff line change
@@ -961,6 +961,7 @@ E('ERR_STDOUT_CLOSE', 'process.stdout cannot be closed', Error);
961961
E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error);
962962
E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed', Error);
963963
E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError);
964+
E('ERR_STREAM_PREMATURE_CLOSE', 'Premature close', Error);
964965
E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error);
965966
E('ERR_STREAM_UNSHIFT_AFTER_END_EVENT',
966967
'stream.unshift() after end event', Error);

lib/internal/streams/end-of-stream.js

+96
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Ported from https://github.com/mafintosh/end-of-stream with
2+
// permission from the author, Mathias Buus (@mafintosh).
3+
4+
'use strict';
5+
6+
const {
7+
ERR_STREAM_PREMATURE_CLOSE
8+
} = require('internal/errors').codes;
9+
10+
function noop() {}
11+
12+
function isRequest(stream) {
13+
return stream.setHeader && typeof stream.abort === 'function';
14+
}
15+
16+
function once(callback) {
17+
let called = false;
18+
return function(err) {
19+
if (called) return;
20+
called = true;
21+
callback.call(this, err);
22+
};
23+
}
24+
25+
function eos(stream, opts, callback) {
26+
if (typeof opts === 'function') return eos(stream, null, opts);
27+
if (!opts) opts = {};
28+
29+
callback = once(callback || noop);
30+
31+
const ws = stream._writableState;
32+
const rs = stream._readableState;
33+
let readable = opts.readable || (opts.readable !== false && stream.readable);
34+
let writable = opts.writable || (opts.writable !== false && stream.writable);
35+
36+
const onlegacyfinish = () => {
37+
if (!stream.writable) onfinish();
38+
};
39+
40+
const onfinish = () => {
41+
writable = false;
42+
if (!readable) callback.call(stream);
43+
};
44+
45+
const onend = () => {
46+
readable = false;
47+
if (!writable) callback.call(stream);
48+
};
49+
50+
const onerror = (err) => {
51+
callback.call(stream, err);
52+
};
53+
54+
const onclose = () => {
55+
if (readable && !(rs && rs.ended)) {
56+
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
57+
}
58+
if (writable && !(ws && ws.ended)) {
59+
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
60+
}
61+
};
62+
63+
const onrequest = () => {
64+
stream.req.on('finish', onfinish);
65+
};
66+
67+
if (isRequest(stream)) {
68+
stream.on('complete', onfinish);
69+
stream.on('abort', onclose);
70+
if (stream.req) onrequest();
71+
else stream.on('request', onrequest);
72+
} else if (writable && !ws) { // legacy streams
73+
stream.on('end', onlegacyfinish);
74+
stream.on('close', onlegacyfinish);
75+
}
76+
77+
stream.on('end', onend);
78+
stream.on('finish', onfinish);
79+
if (opts.error !== false) stream.on('error', onerror);
80+
stream.on('close', onclose);
81+
82+
return function() {
83+
stream.removeListener('complete', onfinish);
84+
stream.removeListener('abort', onclose);
85+
stream.removeListener('request', onrequest);
86+
if (stream.req) stream.req.removeListener('finish', onfinish);
87+
stream.removeListener('end', onlegacyfinish);
88+
stream.removeListener('close', onlegacyfinish);
89+
stream.removeListener('finish', onfinish);
90+
stream.removeListener('end', onend);
91+
stream.removeListener('error', onerror);
92+
stream.removeListener('close', onclose);
93+
};
94+
}
95+
96+
module.exports = eos;

lib/internal/streams/pipeline.js

+95
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Ported from https://github.com/mafintosh/pump with
2+
// permission from the author, Mathias Buus (@mafintosh).
3+
4+
'use strict';
5+
6+
const eos = require('internal/streams/end-of-stream');
7+
8+
const {
9+
ERR_MISSING_ARGS,
10+
ERR_STREAM_DESTROYED
11+
} = require('internal/errors').codes;
12+
13+
function once(callback) {
14+
let called = false;
15+
return function(err) {
16+
if (called) return;
17+
called = true;
18+
callback(err);
19+
};
20+
}
21+
22+
function noop() {}
23+
24+
function isRequest(stream) {
25+
return stream.setHeader && typeof stream.abort === 'function';
26+
}
27+
28+
function destroyer(stream, reading, writing, callback) {
29+
callback = once(callback);
30+
31+
let closed = false;
32+
stream.on('close', () => {
33+
closed = true;
34+
});
35+
36+
eos(stream, { readable: reading, writable: writing }, (err) => {
37+
if (err) return callback(err);
38+
closed = true;
39+
callback();
40+
});
41+
42+
let destroyed = false;
43+
return (err) => {
44+
if (closed) return;
45+
if (destroyed) return;
46+
destroyed = true;
47+
48+
// request.destroy just do .end - .abort is what we want
49+
if (isRequest(stream)) return stream.abort();
50+
if (typeof stream.destroy === 'function') return stream.destroy();
51+
52+
callback(err || new ERR_STREAM_DESTROYED('pipe'));
53+
};
54+
}
55+
56+
function call(fn) {
57+
fn();
58+
}
59+
60+
function pipe(from, to) {
61+
return from.pipe(to);
62+
}
63+
64+
function popCallback(streams) {
65+
if (!streams.length) return noop;
66+
if (typeof streams[streams.length - 1] !== 'function') return noop;
67+
return streams.pop();
68+
}
69+
70+
function pipeline(...streams) {
71+
const callback = popCallback(streams);
72+
73+
if (Array.isArray(streams[0])) streams = streams[0];
74+
75+
if (streams.length < 2) {
76+
throw new ERR_MISSING_ARGS('streams');
77+
}
78+
79+
let error;
80+
const destroys = streams.map(function(stream, i) {
81+
const reading = i < streams.length - 1;
82+
const writing = i > 0;
83+
return destroyer(stream, reading, writing, function(err) {
84+
if (!error) error = err;
85+
if (err) destroys.forEach(call);
86+
if (reading) return;
87+
destroys.forEach(call);
88+
callback(error);
89+
});
90+
});
91+
92+
return streams.reduce(pipe);
93+
}
94+
95+
module.exports = pipeline;

lib/stream.js

+5
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
'use strict';
2323

2424
const { Buffer } = require('buffer');
25+
const pipeline = require('internal/streams/pipeline');
26+
const eos = require('internal/streams/end-of-stream');
2527

2628
// Note: export Stream before Readable/Writable/Duplex/...
2729
// to avoid a cross-reference(require) issues
@@ -33,6 +35,9 @@ Stream.Duplex = require('_stream_duplex');
3335
Stream.Transform = require('_stream_transform');
3436
Stream.PassThrough = require('_stream_passthrough');
3537

38+
Stream.pipeline = pipeline;
39+
Stream.finished = eos;
40+
3641
// Backwards-compat with node 0.4.x
3742
Stream.Stream = Stream;
3843

node.gyp

+2
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,8 @@
154154
'lib/internal/streams/legacy.js',
155155
'lib/internal/streams/destroy.js',
156156
'lib/internal/streams/state.js',
157+
'lib/internal/streams/pipeline.js',
158+
'lib/internal/streams/end-of-stream.js',
157159
'lib/internal/wrap_js_stream.js',
158160
'deps/v8/tools/splaytree.js',
159161
'deps/v8/tools/codemap.js',

0 commit comments

Comments
 (0)