Skip to content

Commit a872b97

Browse files
apapirovskiMylesBorins
authored andcommitted
http2: fix compat stream read handling, add tests
Handle edge case where stream pause is called between resume being called and actually evaluated. Other minor adjustments to avoid various edge cases around stream events. Add new tests that cover all changes. Fixes: #15491 PR-URL: #15503 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent b5faadd commit a872b97

4 files changed

+170
-22
lines changed

lib/internal/http2/compat.js

+25-22
Original file line numberDiff line numberDiff line change
@@ -97,46 +97,50 @@ function onStreamError(error) {
9797
}
9898

9999
function onRequestPause() {
100-
this[kStream].pause();
100+
const stream = this[kStream];
101+
if (stream)
102+
stream.pause();
101103
}
102104

103105
function onRequestResume() {
104-
this[kStream].resume();
105-
}
106-
107-
function onRequestDrain() {
108-
if (this.isPaused())
109-
this.resume();
106+
const stream = this[kStream];
107+
if (stream)
108+
stream.resume();
110109
}
111110

112-
function onStreamResponseDrain() {
111+
function onStreamDrain() {
113112
this[kResponse].emit('drain');
114113
}
115114

115+
// TODO Http2Stream does not emit 'close'
116116
function onStreamClosedRequest() {
117117
this[kRequest].push(null);
118118
}
119119

120+
// TODO Http2Stream does not emit 'close'
120121
function onStreamClosedResponse() {
121-
const res = this[kResponse];
122-
res.writable = false;
123-
res.emit('finish');
122+
this[kResponse].emit('finish');
124123
}
125124

126125
function onStreamAbortedRequest(hadError, code) {
127-
if ((this.writable) ||
128-
(this._readableState && !this._readableState.ended)) {
129-
this.emit('aborted', hadError, code);
130-
this.emit('close');
126+
const request = this[kRequest];
127+
if (request[kState].closed === false) {
128+
request.emit('aborted', hadError, code);
129+
request.emit('close');
131130
}
132131
}
133132

134133
function onStreamAbortedResponse() {
135-
if (this.writable) {
136-
this.emit('close');
134+
const response = this[kResponse];
135+
if (response[kState].closed === false) {
136+
response.emit('close');
137137
}
138138
}
139139

140+
function resumeStream(stream) {
141+
stream.resume();
142+
}
143+
140144
class Http2ServerRequest extends Readable {
141145
constructor(stream, headers, options, rawHeaders) {
142146
super(options);
@@ -158,13 +162,12 @@ class Http2ServerRequest extends Readable {
158162
stream.on('end', onStreamEnd);
159163
stream.on('error', onStreamError);
160164
stream.on('close', onStreamClosedRequest);
161-
stream.on('aborted', onStreamAbortedRequest.bind(this));
165+
stream.on('aborted', onStreamAbortedRequest);
162166
const onfinish = this[kFinish].bind(this);
163167
stream.on('streamClosed', onfinish);
164168
stream.on('finish', onfinish);
165169
this.on('pause', onRequestPause);
166170
this.on('resume', onRequestResume);
167-
this.on('drain', onRequestDrain);
168171
}
169172

170173
get closed() {
@@ -221,7 +224,7 @@ class Http2ServerRequest extends Readable {
221224
_read(nread) {
222225
const stream = this[kStream];
223226
if (stream !== undefined) {
224-
stream.resume();
227+
process.nextTick(resumeStream, stream);
225228
} else {
226229
this.emit('error', new errors.Error('ERR_HTTP2_STREAM_CLOSED'));
227230
}
@@ -279,9 +282,9 @@ class Http2ServerResponse extends Stream {
279282
this[kStream] = stream;
280283
stream[kResponse] = this;
281284
this.writable = true;
282-
stream.on('drain', onStreamResponseDrain);
285+
stream.on('drain', onStreamDrain);
283286
stream.on('close', onStreamClosedResponse);
284-
stream.on('aborted', onStreamAbortedResponse.bind(this));
287+
stream.on('aborted', onStreamAbortedResponse);
285288
const onfinish = this[kFinish].bind(this);
286289
stream.on('streamClosed', onfinish);
287290
stream.on('finish', onfinish);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// Flags: --expose-http2
2+
'use strict';
3+
4+
const common = require('../common');
5+
if (!common.hasCrypto)
6+
common.skip('missing crypto');
7+
const assert = require('assert');
8+
const h2 = require('http2');
9+
10+
// Check that pause & resume work as expected with Http2ServerRequest
11+
12+
const testStr = 'Request Body from Client';
13+
14+
const server = h2.createServer();
15+
16+
server.on('request', common.mustCall((req, res) => {
17+
let data = '';
18+
req.pause();
19+
req.setEncoding('utf8');
20+
req.on('data', common.mustCall((chunk) => (data += chunk)));
21+
setTimeout(common.mustCall(() => {
22+
assert.strictEqual(data, '');
23+
req.resume();
24+
}), common.platformTimeout(100));
25+
req.on('end', common.mustCall(() => {
26+
assert.strictEqual(data, testStr);
27+
res.end();
28+
}));
29+
30+
// shouldn't throw if underlying Http2Stream no longer exists
31+
res.on('finish', common.mustCall(() => process.nextTick(() => {
32+
assert.doesNotThrow(() => req.pause());
33+
assert.doesNotThrow(() => req.resume());
34+
})));
35+
}));
36+
37+
server.listen(0, common.mustCall(() => {
38+
const port = server.address().port;
39+
40+
const client = h2.connect(`http://localhost:${port}`);
41+
const request = client.request({
42+
':path': '/foobar',
43+
':method': 'POST',
44+
':scheme': 'http',
45+
':authority': `localhost:${port}`
46+
});
47+
request.resume();
48+
request.end(testStr);
49+
request.on('end', common.mustCall(function() {
50+
client.destroy();
51+
server.close();
52+
}));
53+
}));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Flags: --expose-http2
2+
'use strict';
3+
4+
const common = require('../common');
5+
if (!common.hasCrypto)
6+
common.skip('missing crypto');
7+
const assert = require('assert');
8+
const http2 = require('http2');
9+
const fs = require('fs');
10+
const path = require('path');
11+
12+
// piping should work as expected with createWriteStream
13+
14+
const loc = path.join(common.fixturesDir, 'person.jpg');
15+
const fn = path.join(common.tmpDir, 'http2pipe.jpg');
16+
common.refreshTmpDir();
17+
18+
const server = http2.createServer();
19+
20+
server.on('request', common.mustCall((req, res) => {
21+
const dest = req.pipe(fs.createWriteStream(fn));
22+
dest.on('finish', common.mustCall(() => {
23+
assert.deepStrictEqual(fs.readFileSync(loc), fs.readFileSync(fn));
24+
fs.unlinkSync(fn);
25+
res.end();
26+
}));
27+
}));
28+
29+
server.listen(0, common.mustCall(() => {
30+
const port = server.address().port;
31+
const client = http2.connect(`http://localhost:${port}`);
32+
33+
let remaining = 2;
34+
function maybeClose() {
35+
if (--remaining === 0) {
36+
server.close();
37+
client.destroy();
38+
}
39+
}
40+
41+
const req = client.request({ ':method': 'POST' });
42+
req.on('response', common.mustCall());
43+
req.resume();
44+
req.on('end', common.mustCall(maybeClose));
45+
const str = fs.createReadStream(loc);
46+
str.on('end', common.mustCall(maybeClose));
47+
str.pipe(req);
48+
}));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Flags: --expose-http2
2+
'use strict';
3+
4+
const common = require('../common');
5+
if (!common.hasCrypto)
6+
common.skip('missing crypto');
7+
const assert = require('assert');
8+
const h2 = require('http2');
9+
10+
// Check that drain event is passed from Http2Stream
11+
12+
const testString = 'tests';
13+
14+
const server = h2.createServer();
15+
16+
server.on('request', common.mustCall((req, res) => {
17+
res.stream._writableState.highWaterMark = testString.length;
18+
assert.strictEqual(res.write(testString), false);
19+
res.on('drain', common.mustCall(() => res.end(testString)));
20+
}));
21+
22+
server.listen(0, common.mustCall(() => {
23+
const port = server.address().port;
24+
25+
const client = h2.connect(`http://localhost:${port}`);
26+
const request = client.request({
27+
':path': '/foobar',
28+
':method': 'POST',
29+
':scheme': 'http',
30+
':authority': `localhost:${port}`
31+
});
32+
request.resume();
33+
request.end();
34+
35+
let data = '';
36+
request.setEncoding('utf8');
37+
request.on('data', (chunk) => (data += chunk));
38+
39+
request.on('end', common.mustCall(function() {
40+
assert.strictEqual(data, testString.repeat(2));
41+
client.destroy();
42+
server.close();
43+
}));
44+
}));

0 commit comments

Comments
 (0)