Skip to content

Commit f0be053

Browse files
aks-targos
authored andcommitted
lib: merge onread handlers for http2 streams & net.Socket
Refs: #20993 Co-authored-by: Anna Henningsen <[email protected]> PR-URL: #22449 Reviewed-By: James M Snell <[email protected]> Reviewed-By: Colin Ihrig <[email protected]>
1 parent c7268c4 commit f0be053

File tree

3 files changed

+68
-95
lines changed

3 files changed

+68
-95
lines changed

lib/internal/http2/core.js

+5-33
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,10 @@ const {
105105
const {
106106
createWriteWrap,
107107
writeGeneric,
108-
writevGeneric
108+
writevGeneric,
109+
onStreamRead,
110+
kMaybeDestroy,
111+
kUpdateTimer
109112
} = require('internal/stream_base_commons');
110113
const {
111114
kTimeout,
@@ -142,7 +145,6 @@ const kHandle = Symbol('handle');
142145
const kID = Symbol('id');
143146
const kInit = Symbol('init');
144147
const kInfoHeaders = Symbol('sent-info-headers');
145-
const kMaybeDestroy = Symbol('maybe-destroy');
146148
const kLocalSettings = Symbol('local-settings');
147149
const kOptions = Symbol('options');
148150
const kOwner = owner_symbol;
@@ -156,7 +158,6 @@ const kServer = Symbol('server');
156158
const kSession = Symbol('session');
157159
const kState = Symbol('state');
158160
const kType = Symbol('type');
159-
const kUpdateTimer = Symbol('update-timer');
160161
const kWriteGeneric = Symbol('write-generic');
161162

162163
const kDefaultSocketTimeout = 2 * 60 * 1000;
@@ -374,36 +375,6 @@ function onStreamClose(code) {
374375
}
375376
}
376377

377-
// Receives a chunk of data for a given stream and forwards it on
378-
// to the Http2Stream Duplex for processing.
379-
function onStreamRead(nread, buf) {
380-
const stream = this[kOwner];
381-
if (nread >= 0 && !stream.destroyed) {
382-
debug(`Http2Stream ${stream[kID]} [Http2Session ` +
383-
`${sessionName(stream[kSession][kType])}]: receiving data chunk ` +
384-
`of size ${nread}`);
385-
stream[kUpdateTimer]();
386-
if (!stream.push(buf)) {
387-
if (!stream.destroyed) // we have to check a second time
388-
this.readStop();
389-
}
390-
return;
391-
}
392-
393-
// Last chunk was received. End the readable side.
394-
debug(`Http2Stream ${stream[kID]} [Http2Session ` +
395-
`${sessionName(stream[kSession][kType])}]: ending readable.`);
396-
397-
// defer this until we actually emit end
398-
if (!stream.readable) {
399-
stream[kMaybeDestroy]();
400-
} else {
401-
stream.on('end', stream[kMaybeDestroy]);
402-
stream.push(null);
403-
stream.read(0);
404-
}
405-
}
406-
407378
// Called when the remote peer settings have been updated.
408379
// Resets the cached settings.
409380
function onSettings() {
@@ -2145,6 +2116,7 @@ function afterOpen(session, options, headers, streamOptions, err, fd) {
21452116
class ServerHttp2Stream extends Http2Stream {
21462117
constructor(session, handle, id, options, headers) {
21472118
super(session, options);
2119+
handle.owner = this;
21482120
this[kInit](id, handle);
21492121
this[kProtocol] = headers[HTTP2_HEADER_SCHEME];
21502122
this[kAuthority] = headers[HTTP2_HEADER_AUTHORITY];

lib/internal/stream_base_commons.js

+52-3
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
'use strict';
22

33
const { Buffer } = require('buffer');
4-
const errors = require('internal/errors');
54
const { WriteWrap } = process.binding('stream_wrap');
5+
const { UV_EOF } = process.binding('uv');
6+
const { errnoException } = require('internal/errors');
7+
const { owner_symbol } = require('internal/async_hooks').symbols;
68

7-
const errnoException = errors.errnoException;
9+
const kMaybeDestroy = Symbol('kMaybeDestroy');
10+
const kUpdateTimer = Symbol('kUpdateTimer');
811

912
function handleWriteReq(req, data, encoding) {
1013
const { handle } = req;
@@ -81,8 +84,54 @@ function afterWriteDispatched(self, req, err, cb) {
8184
}
8285
}
8386

87+
function onStreamRead(nread, buf) {
88+
const handle = this;
89+
const stream = this[owner_symbol];
90+
91+
stream[kUpdateTimer]();
92+
93+
if (nread > 0 && !stream.destroyed) {
94+
if (!stream.push(buf)) {
95+
handle.reading = false;
96+
if (!stream.destroyed) {
97+
const err = handle.readStop();
98+
if (err)
99+
stream.destroy(errnoException(err, 'read'));
100+
}
101+
}
102+
103+
return;
104+
}
105+
106+
if (nread === 0) {
107+
return;
108+
}
109+
110+
if (nread !== UV_EOF) {
111+
return stream.destroy(errnoException(nread, 'read'));
112+
}
113+
114+
// defer this until we actually emit end
115+
if (stream._readableState.endEmitted) {
116+
if (stream[kMaybeDestroy])
117+
stream[kMaybeDestroy]();
118+
} else {
119+
if (stream[kMaybeDestroy])
120+
stream.on('end', stream[kMaybeDestroy]);
121+
122+
// push a null to signal the end of data.
123+
// Do it before `maybeDestroy` for correct order of events:
124+
// `end` -> `close`
125+
stream.push(null);
126+
stream.read(0);
127+
}
128+
}
129+
84130
module.exports = {
85131
createWriteWrap,
86132
writevGeneric,
87-
writeGeneric
133+
writeGeneric,
134+
onStreamRead,
135+
kMaybeDestroy,
136+
kUpdateTimer,
88137
};

lib/net.js

+11-59
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,7 @@ const {
3636
const assert = require('assert');
3737
const {
3838
UV_EADDRINUSE,
39-
UV_EINVAL,
40-
UV_EOF
39+
UV_EINVAL
4140
} = process.binding('uv');
4241

4342
const { Buffer } = require('buffer');
@@ -61,7 +60,9 @@ const {
6160
const {
6261
createWriteWrap,
6362
writevGeneric,
64-
writeGeneric
63+
writeGeneric,
64+
onStreamRead,
65+
kUpdateTimer
6566
} = require('internal/stream_base_commons');
6667
const errors = require('internal/errors');
6768
const {
@@ -208,7 +209,7 @@ function initSocketHandle(self) {
208209
// Handle creation may be deferred to bind() or connect() time.
209210
if (self._handle) {
210211
self._handle[owner_symbol] = self;
211-
self._handle.onread = onread;
212+
self._handle.onread = onStreamRead;
212213
self[async_id_symbol] = getNewAsyncId(self._handle);
213214
}
214215
}
@@ -514,6 +515,12 @@ Object.defineProperty(Socket.prototype, 'bufferSize', {
514515
}
515516
});
516517

518+
Object.defineProperty(Socket.prototype, kUpdateTimer, {
519+
get: function() {
520+
return this._unrefTimer;
521+
}
522+
});
523+
517524

518525
// Just call handle.readStart until we have enough in the buffer
519526
Socket.prototype._read = function(n) {
@@ -615,61 +622,6 @@ Socket.prototype._destroy = function(exception, cb) {
615622
}
616623
};
617624

618-
619-
// This function is called whenever the handle gets a
620-
// buffer, or when there's an error reading.
621-
function onread(nread, buffer) {
622-
var handle = this;
623-
var self = handle[owner_symbol];
624-
assert(handle === self._handle, 'handle != self._handle');
625-
626-
self._unrefTimer();
627-
628-
debug('onread', nread);
629-
630-
if (nread > 0) {
631-
debug('got data');
632-
633-
// read success.
634-
// In theory (and in practice) calling readStop right now
635-
// will prevent this from being called again until _read() gets
636-
// called again.
637-
638-
// Optimization: emit the original buffer with end points
639-
var ret = self.push(buffer);
640-
641-
if (handle.reading && !ret) {
642-
handle.reading = false;
643-
debug('readStop');
644-
var err = handle.readStop();
645-
if (err)
646-
self.destroy(errnoException(err, 'read'));
647-
}
648-
return;
649-
}
650-
651-
// if we didn't get any bytes, that doesn't necessarily mean EOF.
652-
// wait for the next one.
653-
if (nread === 0) {
654-
debug('not any data, keep waiting');
655-
return;
656-
}
657-
658-
// Error, possibly EOF.
659-
if (nread !== UV_EOF) {
660-
return self.destroy(errnoException(nread, 'read'));
661-
}
662-
663-
debug('EOF');
664-
665-
// push a null to signal the end of data.
666-
// Do it before `maybeDestroy` for correct order of events:
667-
// `end` -> `close`
668-
self.push(null);
669-
self.read(0);
670-
}
671-
672-
673625
Socket.prototype._getpeername = function() {
674626
if (!this._peername) {
675627
if (!this._handle || !this._handle.getpeername) {

0 commit comments

Comments
 (0)