Skip to content

Commit bb8a65d

Browse files
addaleaxBethGriggs
authored andcommitted
stream: fix end-of-stream for HTTP/2
HTTP/2 streams call `.end()` on themselves from their `.destroy()` method, which might be queued (e.g. due to network congestion) and not processed before the stream itself is destroyed. In that case, the `_writableState.ended` property could be set before the stream emits its `'close'` event, and never actually emits the `'finished'` event, confusing the end-of-stream implementation so that it wouldn’t call its callback. This can be fixed by watching for the end events themselves using the existing `'finish'` and `'end'` listeners rather than relying on the `.ended` properties of the `_...State` objects. These properties still need to be checked to know whether stream closure was premature – My understanding is that ideally, streams should not emit `'close'` before `'end'` and/or `'finished'`, so this might be another bug, but changing this would require modifying tests and almost certainly be a breaking change. Fixes: #24456 PR-URL: #24926 Reviewed-By: Rich Trott <[email protected]> Reviewed-By: Luigi Pinca <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Franziska Hinkelmann <[email protected]>
1 parent bdf21c1 commit bb8a65d

File tree

3 files changed

+53
-9
lines changed

3 files changed

+53
-9
lines changed

lib/internal/streams/end-of-stream.js

+14-7
Original file line numberDiff line numberDiff line change
@@ -28,22 +28,24 @@ function eos(stream, opts, callback) {
2828

2929
callback = once(callback || noop);
3030

31-
const ws = stream._writableState;
32-
const rs = stream._readableState;
3331
let readable = opts.readable || (opts.readable !== false && stream.readable);
3432
let writable = opts.writable || (opts.writable !== false && stream.writable);
3533

3634
const onlegacyfinish = () => {
3735
if (!stream.writable) onfinish();
3836
};
3937

38+
var writableEnded = stream._writableState && stream._writableState.finished;
4039
const onfinish = () => {
4140
writable = false;
41+
writableEnded = true;
4242
if (!readable) callback.call(stream);
4343
};
4444

45+
var readableEnded = stream._readableState && stream._readableState.endEmitted;
4546
const onend = () => {
4647
readable = false;
48+
readableEnded = true;
4749
if (!writable) callback.call(stream);
4850
};
4951

@@ -52,11 +54,16 @@ function eos(stream, opts, callback) {
5254
};
5355

5456
const onclose = () => {
55-
if (readable && !(rs && rs.ended)) {
56-
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
57+
let err;
58+
if (readable && !readableEnded) {
59+
if (!stream._readableState || !stream._readableState.ended)
60+
err = new ERR_STREAM_PREMATURE_CLOSE();
61+
return callback.call(stream, err);
5762
}
58-
if (writable && !(ws && ws.ended)) {
59-
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
63+
if (writable && !writableEnded) {
64+
if (!stream._writableState || !stream._writableState.ended)
65+
err = new ERR_STREAM_PREMATURE_CLOSE();
66+
return callback.call(stream, err);
6067
}
6168
};
6269

@@ -69,7 +76,7 @@ function eos(stream, opts, callback) {
6976
stream.on('abort', onclose);
7077
if (stream.req) onrequest();
7178
else stream.on('request', onrequest);
72-
} else if (writable && !ws) { // legacy streams
79+
} else if (writable && !stream._writableState) { // legacy streams
7380
stream.on('end', onlegacyfinish);
7481
stream.on('close', onlegacyfinish);
7582
}

test/parallel/parallel.status

-2
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@ test-net-connect-options-port: PASS,FLAKY
1212
test-http2-pipe: PASS,FLAKY
1313
test-worker-syntax-error: PASS,FLAKY
1414
test-worker-syntax-error-file: PASS,FLAKY
15-
# https://github.com/nodejs/node/issues/24456
16-
test-stream-pipeline-http2: PASS,FLAKY
1715

1816
[$system==linux]
1917

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const { Readable, Duplex, pipeline } = require('stream');
5+
6+
// Test that the callback for pipeline() is called even when the ._destroy()
7+
// method of the stream places an .end() request to itself that does not
8+
// get processed before the destruction of the stream (i.e. the 'close' event).
9+
// Refs: https://github.com/nodejs/node/issues/24456
10+
11+
const readable = new Readable({
12+
read: common.mustCall(() => {})
13+
});
14+
15+
const duplex = new Duplex({
16+
write(chunk, enc, cb) {
17+
// Simulate messages queueing up.
18+
},
19+
read() {},
20+
destroy(err, cb) {
21+
// Call end() from inside the destroy() method, like HTTP/2 streams
22+
// do at the time of writing.
23+
this.end();
24+
cb(err);
25+
}
26+
});
27+
28+
duplex.on('finished', common.mustNotCall());
29+
30+
pipeline(readable, duplex, common.mustCall((err) => {
31+
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
32+
}));
33+
34+
// Write one chunk of data, and destroy the stream later.
35+
// That should trigger the pipeline destruction.
36+
readable.push('foo');
37+
setImmediate(() => {
38+
readable.destroy();
39+
});

0 commit comments

Comments
 (0)