Commit 6189486

Improve body buffering/streaming to avoid possible deadlock
It was possible to deadlock request body streaming with beforeRequest. This could happen because the passthrough handler creates a request stream (with asStream) ready to send upstream (this is required, to ensure that's registered before any possible response is sent, or node can dump the body) but beforeRequest itself isn't called until the completed body is buffered.

This deadlocked because the stream could hit its high watermark (pausing the request stream entirely) before the body was buffered. The stream would never continue, because it wasn't connected to a sink yet, but it couldn't be connected because beforeRequest needed to run before creating the upstream connection.

This is resolved through two changes:

* asStream no longer actually streams data - it only reads into asBuffer for as long as possible (at least until truncation). Real streaming (and backpressure) only begins once the buffer truncates or a sink appears.
* Truncation stops the input entirely and delegates entirely to asStream. This means that backpressure kicks in at this point.

The end result should be that we only store maxSize + highWatermark in our buffers at any time, applying backpressure for anything else, but avoid this deadlock before that point by keeping the stream inactive while buffering is happening. That means if you're just waiting on waitForCompletedRequest then stream backpressure is not happening.

The only remaining possible deadlock would happen if a handler created a stream *and started streaming from it* but then paused it so it couldn't drain fully, and then called waitForCompletedRequest. Don't do that - if you start streaming, make sure you will finish.

This is all very complicated, but I _think_ this should improve things significantly.
Parent: 2df52cb
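To make the failure mode concrete, here is a minimal standalone demonstration of the deadlock shape described above, using plain Node streams rather than Mockttp's actual internals (the chunk sizes, names, and setup are illustrative):

```typescript
import * as stream from 'stream';

// A request body, arriving in many small chunks:
const chunks = Array.from({ length: 1024 }, () => Buffer.alloc(1024));
const source = stream.Readable.from(chunks);

// An output stream set up in advance (like asStream), but with no sink
// attached yet - the upstream connection can't exist until beforeRequest
// completes:
const upstream = new stream.PassThrough({ highWaterMark: 16 * 1024 });
source.pipe(upstream);

// Meanwhile, buffer the body for the beforeRequest callback:
let buffered = 0;
source.on('data', (chunk: Buffer) => { buffered += chunk.length; });

// This never fires: once ~16KB has been written, `upstream` is full,
// pipe() pauses `source` entirely, and nothing ever reads from
// `upstream` to resume it - so buffering never completes either.
source.on('end', () => console.log(`buffered all ${buffered} bytes`));
```

The changes below avoid this by keeping the output stream inert until something actually reads from it.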

3 files changed: +117 −26 lines


src/util/buffer-utils.ts (+78, -23)
```diff
@@ -1,4 +1,5 @@
 import * as _ from 'lodash';
+import { EventEmitter } from 'events';
 import * as stream from 'stream';
 
 import { isNode } from './util';
@@ -16,28 +17,60 @@ export const asBuffer = (input: Buffer | Uint8Array | string) =>
     : Buffer.from(input);
 
 export type BufferInProgress = Promise<Buffer> & {
-    currentChunks: Buffer[] // Stores the body chunks as they arrive
-    failedWith?: Error // Stores the error that killed the stream, if one did
+    currentChunks: Buffer[]; // Stores the body chunks as they arrive
+    failedWith?: Error; // Stores the error that killed the stream, if one did
+    events: EventEmitter; // Emits events - notably 'truncate' if data is truncated
 };
 
-// Takes a buffer and a stream, returns a simple stream that outputs the buffer then the stream.
+// Takes a buffer and a stream, returns a simple stream that outputs the buffer then the stream. The stream
+// is lazy, so doesn't read data in from the buffer or input until something here starts reading.
 export const bufferThenStream = (buffer: BufferInProgress, inputStream: stream.Readable): stream.Readable => {
-    const outputStream = new stream.PassThrough();
+    let active = false;
+
+    const outputStream = new stream.PassThrough({
+        // Note we use the default highWaterMark, which means this applies backpressure, pushing buffering
+        // onto the OS & backpressure on network instead of accepting data before we're ready to stream it.
+
+        // Without changing behaviour, we listen for read() events, and don't start streaming until we get one.
+        read(size) {
+            // On the first actual read of this stream, we pull from the buffer
+            // and then hook it up to the input.
+            if (!active) {
+                if (buffer.failedWith) {
+                    outputStream.destroy(buffer.failedWith);
+                } else {
+                    // First stream everything that's been buffered so far:
+                    outputStream.write(Buffer.concat(buffer.currentChunks));
+
+                    // Then start streaming all future incoming data:
+                    inputStream.pipe(outputStream);
+
+                    if (inputStream.readableEnded) outputStream.end();
+                    if (inputStream.readableAborted) outputStream.destroy();
+
+                    // Forward any future errors from the input stream:
+                    inputStream.on('error', (e) => {
+                        outputStream.emit('error', e)
+                    });
+
+                    // Silence 'unhandled rejection' warnings here, since we'll handle
+                    // them on the stream instead
+                    buffer.catch(() => {});
+                }
+                active = true;
+            }
 
-    // Forward the buffered data so far
-    outputStream.write(Buffer.concat(buffer.currentChunks));
-    // After the data, forward errors from the buffer
-    if (buffer.failedWith) {
-        // Announce async, to ensure listeners have time to get set up
-        setTimeout(() => outputStream.emit('error', buffer.failedWith));
-    } else {
-        // Forward future data as it arrives
-        inputStream.pipe(outputStream);
-        // Forward any future errors from the input stream
-        inputStream.on('error', (e) => outputStream.emit('error', e));
-        // Silence 'unhandled rejection' warnings here, since we'll handle them on the stream instead
-        buffer.catch(() => {});
-    }
+            // Except for the first activation logic (above) do the default transform read() steps just
+            // like a normal PassThrough stream.
+            return stream.Transform.prototype._read.call(this, size);
+        }
+    });
+
+    buffer.events.on('truncate', (chunks) => {
+        // If the stream hasn't started yet, start it now, so it grabs the buffer
+        // data before it gets truncated:
+        if (!active) outputStream.read(0);
+    });
 
     return outputStream;
 };
```
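The heart of the rewritten bufferThenStream is a PassThrough that stays inert until its first read. A stripped-down sketch of just that lazy-activation pattern, assuming only Node's stream module (the name lazyPassThrough is illustrative, not part of this commit):

```typescript
import * as stream from 'stream';

// A PassThrough that connects its input only on first demand. Until a
// consumer actually reads from it, no data flows through here and no
// backpressure builds up, so a separate buffering consumer can run freely.
function lazyPassThrough(input: stream.Readable): stream.PassThrough {
    let active = false;
    const output = new stream.PassThrough({
        read(size) {
            if (!active) {
                active = true;
                input.pipe(output); // Only now does data start flowing here
            }
            // Otherwise behave exactly like a normal PassThrough:
            return stream.Transform.prototype._read.call(this, size);
        }
    });
    return output;
}
```

The real version above additionally flushes buffer.currentChunks on activation, handles errors and already-ended inputs, and listens for 'truncate' so it can activate itself before buffered data is dropped.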
```diff
@@ -63,20 +96,41 @@ export const streamToBuffer = (input: stream.Readable, maxSize = MAX_BUFFER_SIZE
     if (input.readableAborted) return failWithAbortError();
 
     let currentSize = 0;
-    input.on('data', (d: Buffer) => {
+    const onData = (d: Buffer) => {
         currentSize += d.length;
+        chunks.push(d);
 
         // If we go over maxSize, drop the whole stream, so the buffer
         // resolves empty. MaxSize should be large, so this is rare,
         // and only happens as an alternative to crashing the process.
         if (currentSize > maxSize) {
-            chunks = []; // Drop all the data so far
-            return; // Don't save any more data
+            // Announce truncation, so that other mechanisms (asStream) can
+            // capture this data if they're interested in it.
+            bufferPromise.events.emit('truncate', chunks);
+
+            // Drop all the data so far & stop reading
+            bufferPromise.currentChunks = chunks = [];
+            input.removeListener('data', onData);
+
+            // We then resolve immediately - the buffer is done, even if the body
+            // might still be streaming in we're not listening to it. This means
+            // that requests can 'complete' for event/callback purposes while
+            // they're actually still streaming, but only in this scenario where
+            // the data is too large to really be used by the events/callbacks.
+
+            // If we don't resolve, then cases which intentionally don't consume
+            // the raw stream but do consume the buffer (beforeRequest) would
+            // deadlock: beforeRequest must complete to begin streaming the
+            // full body to the target clients.
+
+            resolve(Buffer.from([]));
         }
+    };
+    input.on('data', onData);
 
-        chunks.push(d);
+    input.once('end', () => {
+        resolve(Buffer.concat(chunks));
     });
-    input.once('end', () => resolve(Buffer.concat(chunks)));
     input.once('aborted', failWithAbortError);
     input.on('error', (e) => {
         bufferPromise.failedWith = bufferPromise.failedWith || e;
@@ -85,6 +139,7 @@ export const streamToBuffer = (input: stream.Readable, maxSize = MAX_BUFFER_SIZE
         }
     );
     bufferPromise.currentChunks = chunks;
+    bufferPromise.events = new EventEmitter();
     return bufferPromise;
 };
```
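The two halves coordinate through the BufferInProgress value: a Promise augmented with mutable state and an emitter, so that asStream can react the moment buffering gives up ('truncate') rather than waiting on the promise itself. A sketch of how such a value can be constructed (illustrative shape only; the real construction lives inside streamToBuffer):

```typescript
import { EventEmitter } from 'events';

// The BufferInProgress shape, as defined in buffer-utils.ts above:
type BufferInProgress = Promise<Buffer> & {
    currentChunks: Buffer[]; // Chunks gathered so far, readable mid-flight
    failedWith?: Error;      // Set if the input stream errored
    events: EventEmitter;    // Emits 'truncate' when maxSize is exceeded
};

// Illustrative construction - the real version calls resolve() on the
// input's 'end' event, or with an empty buffer on truncation:
function makeBufferInProgress() {
    let resolve!: (result: Buffer) => void;
    const buffer = new Promise<Buffer>((res) => {
        resolve = res;
    }) as BufferInProgress;
    buffer.currentChunks = [];
    buffer.events = new EventEmitter();
    return { buffer, resolve };
}
```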

src/util/request-utils.ts (+8, -3)
```diff
@@ -174,9 +174,14 @@ const parseBodyStream = (bodyStream: stream.Readable, maxSize: number, getHeader
     // and buffered data, and then continues with the live stream, if active.
     // Listeners to this stream *must* be attached synchronously after this call.
     asStream() {
-        return completedBuffer
-            ? bufferToStream(completedBuffer)
-            : bufferThenStream(body.asBuffer(), bodyStream);
+        // If we've already buffered the whole body, just stream it out:
+        if (completedBuffer) return bufferToStream(completedBuffer);
+
+        // Otherwise, we want to start buffering now, and wrap that with
+        // a stream that can live-stream the buffered data on demand:
+        const buffer = body.asBuffer();
+        buffer.catch(() => {}); // Errors will be handled via the stream, so silence unhandled rejections here.
+        return bufferThenStream(buffer, bodyStream);
     },
     asBuffer() {
         if (!bufferPromise) {
```
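As the commit message warns, one deadlock remains possible with this API: activating the stream and then stalling it before awaiting the buffer. A hypothetical fragment showing that anti-pattern (parsedBody stands in for the body object that parseBodyStream returns; the exact shape is assumed):

```typescript
import * as stream from 'stream';

// Anti-pattern: start streaming, then stall the stream while awaiting
// the complete body.
async function badHandler(parsedBody: {
    asStream(): stream.Readable,
    asBuffer(): Promise<Buffer>
}) {
    const bodyStream = parsedBody.asStream();
    bodyStream.once('data', () => {
        // The stream is now active, so backpressure is live - pausing it
        // indefinitely means it can never drain:
        bodyStream.pause();
    });
    // ...so the raw input gets paused by backpressure before it ends, and
    // awaiting the complete body (as waitForCompletedRequest does) may
    // never resolve:
    await parsedBody.asBuffer();
}
```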

test/integration/proxying/http-proxying.spec.ts (+31)
```diff
@@ -1045,6 +1045,37 @@ nodeOnly(() => {
             expect(await proxiedRequestData.body.getText()).to.equal('');
         });
 
+        it("should still proxy larger request bodies even if beforeRequest is used", async () => {
+            const message = "A large request body";
+
+            const remoteEndpoint = await remoteServer.forAnyRequest().thenReply(200);
+            const proxyEndpoint = await server.forPost(remoteServer.url).thenPassThrough({
+                beforeRequest: async (req) => {
+                    const bodyText = await req.body.getText();
+                    // Body is too long, and so should be truncated when examined (as here)
+                    expect(bodyText).to.equal('');
+                    return {};
+                }
+            });
+
+            let response = await request.post({
+                url: remoteServer.url,
+                body: message,
+                resolveWithFullResponse: true
+            });
+
+            expect(response.statusCode).to.equal(200);
+
+            // Even though it's truncated for buffering, the request data should still be proxied
+            // through successfully to the upstream server:
+            const resultingRequest = (await remoteEndpoint.getSeenRequests())[0];
+            expect(await resultingRequest.body.getText()).to.equal(message);
+
+            // But it's truncated in event data, not buffered
+            const proxiedRequestData = (await proxyEndpoint.getSeenRequests())[0];
+            expect(await proxiedRequestData.body.getText()).to.equal('');
+        });
+
         it("should still proxy larger response bodies", async () => {
             await remoteServer.forAnyRequest().thenReply(200, "A large response body");
             const proxyEndpoint = await server.forGet(remoteServer.url).thenPassThrough();
```
