Skip to content

Commit 24acd53

Browse files
addaleaxrvagg
authored andcommitted
net,http2: merge after-write code
PR-URL: #24380 Refs: #19060 Reviewed-By: James M Snell <[email protected]> Reviewed-By: Daniel Bevenius <[email protected]>
1 parent 2565ff0 commit 24acd53

File tree

4 files changed

+47
-54
lines changed

4 files changed

+47
-54
lines changed

lib/internal/http2/core.js

+9-16
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ const {
108108
writeGeneric,
109109
writevGeneric,
110110
onStreamRead,
111+
kAfterAsyncWrite,
111112
kMaybeDestroy,
112113
kUpdateTimer
113114
} = require('internal/stream_base_commons');
@@ -1515,21 +1516,6 @@ function trackWriteState(stream, bytes) {
15151516
session[kHandle].chunksSentSinceLastWrite = 0;
15161517
}
15171518

1518-
function afterDoStreamWrite(status, handle) {
1519-
const stream = handle[kOwner];
1520-
const session = stream[kSession];
1521-
1522-
stream[kUpdateTimer]();
1523-
1524-
const { bytes } = this;
1525-
stream[kState].writeQueueSize -= bytes;
1526-
1527-
if (session !== undefined)
1528-
session[kState].writeQueueSize -= bytes;
1529-
if (typeof this.callback === 'function')
1530-
this.callback(null);
1531-
}
1532-
15331519
function streamOnResume() {
15341520
if (!this.destroyed)
15351521
this[kHandle].readStart();
@@ -1782,6 +1768,13 @@ class Http2Stream extends Duplex {
17821768
'bug in Node.js');
17831769
}
17841770

1771+
[kAfterAsyncWrite]({ bytes }) {
1772+
this[kState].writeQueueSize -= bytes;
1773+
1774+
if (this.session !== undefined)
1775+
this.session[kState].writeQueueSize -= bytes;
1776+
}
1777+
17851778
[kWriteGeneric](writev, data, encoding, cb) {
17861779
// When the Http2Stream is first created, it is corked until the
17871780
// handle and the stream ID is assigned. However, if the user calls
@@ -1808,7 +1801,7 @@ class Http2Stream extends Duplex {
18081801
if (!this.headersSent)
18091802
this[kProceed]();
18101803

1811-
const req = createWriteWrap(this[kHandle], afterDoStreamWrite);
1804+
const req = createWriteWrap(this[kHandle]);
18121805
req.stream = this[kID];
18131806

18141807
if (writev)

lib/internal/stream_base_commons.js

+26-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ const { owner_symbol } = require('internal/async_hooks').symbols;
1616

1717
const kMaybeDestroy = Symbol('kMaybeDestroy');
1818
const kUpdateTimer = Symbol('kUpdateTimer');
19+
const kAfterAsyncWrite = Symbol('kAfterAsyncWrite');
1920

2021
function handleWriteReq(req, data, encoding) {
2122
const { handle } = req;
@@ -52,11 +53,33 @@ function handleWriteReq(req, data, encoding) {
5253
}
5354
}
5455

55-
function createWriteWrap(handle, oncomplete) {
56+
function onWriteComplete(status) {
57+
const stream = this.handle[owner_symbol];
58+
59+
if (stream.destroyed) {
60+
if (typeof this.callback === 'function')
61+
this.callback(null);
62+
return;
63+
}
64+
65+
if (status < 0) {
66+
const ex = errnoException(status, 'write', this.error);
67+
stream.destroy(ex, this.callback);
68+
return;
69+
}
70+
71+
stream[kUpdateTimer]();
72+
stream[kAfterAsyncWrite](this);
73+
74+
if (typeof this.callback === 'function')
75+
this.callback(null);
76+
}
77+
78+
function createWriteWrap(handle) {
5679
var req = new WriteWrap();
5780

5881
req.handle = handle;
59-
req.oncomplete = oncomplete;
82+
req.oncomplete = onWriteComplete;
6083
req.async = false;
6184
req.bytes = 0;
6285
req.buffer = null;
@@ -160,6 +183,7 @@ module.exports = {
160183
writevGeneric,
161184
writeGeneric,
162185
onStreamRead,
186+
kAfterAsyncWrite,
163187
kMaybeDestroy,
164188
kUpdateTimer,
165189
};

lib/net.js

+6-34
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ const {
6262
writevGeneric,
6363
writeGeneric,
6464
onStreamRead,
65+
kAfterAsyncWrite,
6566
kUpdateTimer
6667
} = require('internal/stream_base_commons');
6768
const {
@@ -685,6 +686,10 @@ protoGetter('localPort', function localPort() {
685686
});
686687

687688

689+
Socket.prototype[kAfterAsyncWrite] = function() {
690+
this[kLastWriteQueueSize] = 0;
691+
};
692+
688693
Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
689694
// If we are still connecting, then buffer this for later.
690695
// The Writable logic will buffer up any more writes while
@@ -707,7 +712,7 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
707712

708713
this._unrefTimer();
709714

710-
var req = createWriteWrap(this._handle, afterWrite);
715+
var req = createWriteWrap(this._handle);
711716
if (writev)
712717
writevGeneric(this, req, data, cb);
713718
else
@@ -771,39 +776,6 @@ protoGetter('bytesWritten', function bytesWritten() {
771776
});
772777

773778

774-
function afterWrite(status, handle, err) {
775-
var self = handle[owner_symbol];
776-
if (self !== process.stderr && self !== process.stdout)
777-
debug('afterWrite', status);
778-
779-
if (this.async)
780-
self[kLastWriteQueueSize] = 0;
781-
782-
// callback may come after call to destroy.
783-
if (self.destroyed) {
784-
debug('afterWrite destroyed');
785-
if (this.callback)
786-
this.callback(null);
787-
return;
788-
}
789-
790-
if (status < 0) {
791-
var ex = errnoException(status, 'write', this.error);
792-
debug('write failure', ex);
793-
self.destroy(ex, this.callback);
794-
return;
795-
}
796-
797-
self._unrefTimer();
798-
799-
if (self !== process.stderr && self !== process.stdout)
800-
debug('afterWrite call cb');
801-
802-
if (this.callback)
803-
this.callback.call(undefined);
804-
}
805-
806-
807779
function checkBindError(err, port, handle) {
808780
// EADDRINUSE may not be reported until we call listen() or connect().
809781
// To complicate matters, a failed bind() followed by listen() or connect()

src/node_http2.cc

+6-2
Original file line numberDiff line numberDiff line change
@@ -1571,8 +1571,12 @@ void Http2Session::ClearOutgoing(int status) {
15711571
current_outgoing_buffers_.swap(outgoing_buffers_);
15721572
for (const nghttp2_stream_write& wr : current_outgoing_buffers_) {
15731573
WriteWrap* wrap = wr.req_wrap;
1574-
if (wrap != nullptr)
1575-
wrap->Done(status);
1574+
if (wrap != nullptr) {
1575+
// TODO(addaleax): Pass `status` instead of 0, so that we actually error
1576+
// out with the error from the write to the underlying protocol,
1577+
// if one occurred.
1578+
wrap->Done(0);
1579+
}
15761580
}
15771581
}
15781582

0 commit comments

Comments
 (0)