Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 8bd7bc4

Browse files
jridgewellshuding
authored andcommittedJul 8, 2023
Fix stream cancellation in RenderResult.pipe() and sendResponse() (#52157)
### What? I've found 2 more spots that didn't properly cancel the streaming response when the client disconnects. This fixes `RenderResult.pipe()` (called during dynamic render results) and `sendResponse()` (used during Route Handlers using `nodejs` runtime). It also (finally) adds tests for all the cases I'm aware of. ### Why? To allow disconnecting from an AI service when a client disconnects. $$$ ### How? Just checks for `response.closed`, which will be closed when the client's connection disconnects.
1 parent 4e77bf2 commit 8bd7bc4

File tree

14 files changed

+333
-21
lines changed

14 files changed

+333
-21
lines changed
 

‎packages/next-swc/crates/next-core/js/src/entry/app/edge-page-bootstrap.ts

+11-2
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,20 @@ async function render(request: NextRequest, event: NextFetchEvent) {
8080
response.headers.append('Vary', RSC_VARY_HEADER)
8181

8282
const writer = tranform.writable.getWriter()
83-
result.pipe({
83+
const target = {
8484
write: (chunk: Uint8Array) => writer.write(chunk),
8585
end: () => writer.close(),
8686
destroy: (reason?: Error) => writer.abort(reason),
87-
})
87+
closed: false,
88+
}
89+
const onClose = () => {
90+
target.closed = true
91+
}
92+
// No, this cannot be replaced with `finally`, because early cancelling
93+
// the stream will create a rejected promise, and finally will create an
94+
// unhandled rejection.
95+
writer.closed.then(onClose, onClose)
96+
result.pipe(target)
8897

8998
return response
9099
}

‎packages/next/src/server/render-result.ts

+8-13
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ export interface PipeTarget {
1616
end: () => unknown
1717
flush?: () => unknown
1818
destroy: (err?: Error) => unknown
19+
get closed(): boolean
1920
}
2021

2122
export default class RenderResult {
@@ -111,31 +112,25 @@ export default class RenderResult {
111112
: () => {}
112113
const reader = this.response.getReader()
113114

114-
let shouldFatalError = false
115115
try {
116-
let result = await reader.read()
117-
if (!result.done) {
118-
// As we're going to write to the response, we should destroy the
119-
// response if an error occurs.
120-
shouldFatalError = true
121-
}
116+
while (true) {
117+
const result = await reader.read()
118+
119+
if (res.closed || result.done) {
120+
break
121+
}
122122

123-
while (!result.done) {
124123
// Write the data to the response.
125124
res.write(result.value)
126125

127126
// Flush it to the client (if it supports flushing).
128127
flush()
129-
130-
// Read the next chunk.
131-
result = await reader.read()
132128
}
133129

134130
// We're done writing to the response, so we can end it.
135131
res.end()
136132
} catch (err) {
137-
// If we've written to the response, we should destroy it.
138-
if (shouldFatalError) {
133+
if (!res.closed) {
139134
res.destroy(err as any)
140135
}
141136

‎packages/next/src/server/send-response.ts

+1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ export async function sendResponse(
4949
const iterator = consumeUint8ArrayReadableStream(response.body)
5050
try {
5151
for await (const chunk of iterator) {
52+
if (originalResponse.closed) break
5253
originalResponse.write(chunk)
5354
}
5455
} finally {

‎packages/next/src/server/stream-utils/node-web-streams-helper.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@ export const streamToBufferedResult = async (
2525
},
2626
end() {},
2727
destroy() {},
28+
closed: false,
2829
}
29-
await renderResult.pipe(writable as any)
30+
await renderResult.pipe(writable)
3031
return renderChunks.join('')
3132
}
3233

‎packages/next/src/server/web-server.ts

+11-5
Original file line numberDiff line numberDiff line change
@@ -417,14 +417,20 @@ export default class NextWebServer extends BaseServer<WebServerOptions> {
417417

418418
if (options.result.isDynamic) {
419419
const writer = res.transformStream.writable.getWriter()
420-
options.result.pipe({
420+
const target = {
421421
write: (chunk: Uint8Array) => writer.write(chunk),
422422
end: () => writer.close(),
423423
destroy: (err: Error) => writer.abort(err),
424-
cork: () => {},
425-
uncork: () => {},
426-
// Not implemented: on/removeListener
427-
} as any)
424+
closed: false,
425+
} as any
426+
const onClose = () => {
427+
target.closed = true
428+
}
429+
// No, this cannot be replaced with `finally`, because early cancelling
430+
// the stream will create a rejected promise, and finally will create an
431+
// unhandled rejection.
432+
writer.closed.then(onClose, onClose)
433+
options.result.pipe(target)
428434
} else {
429435
const payload = await options.result.toUnchunkedString()
430436
res.setHeader('Content-Length', String(byteLength(payload)))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import { Streamable } from '../../streamable'
2+
3+
export const runtime = 'edge'
4+
5+
let streamable
6+
let requestAborted = false
7+
8+
export function GET(req: Request): Response {
9+
// The 2nd request should render the stats. We don't use a query param
10+
// because edge rendering will create a different bundle for that.
11+
if (streamable) {
12+
return new Response(
13+
JSON.stringify({
14+
requestAborted,
15+
i: streamable.i,
16+
streamCleanedUp: streamable.streamCleanedUp,
17+
})
18+
)
19+
}
20+
21+
streamable = Streamable()
22+
req.signal.onabort = () => {
23+
requestAborted = true
24+
}
25+
return new Response(streamable.stream)
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import { Streamable } from '../../streamable'
2+
3+
export const runtime = 'nodejs'
4+
// Next thinks it can statically compile this route, which breaks the test.
5+
export const dynamic = 'force-dynamic'
6+
7+
let streamable
8+
let requestAborted = false
9+
10+
export function GET(req: Request): Response {
11+
// The 2nd request should render the stats. We don't use a query param
12+
// because edge rendering will create a different bundle for that.
13+
if (streamable) {
14+
return new Response(
15+
JSON.stringify({
16+
requestAborted,
17+
i: streamable.i,
18+
streamCleanedUp: streamable.streamCleanedUp,
19+
})
20+
)
21+
}
22+
23+
streamable = Streamable()
24+
req.signal.onabort = () => {
25+
requestAborted = true
26+
}
27+
return new Response(streamable.stream)
28+
}

‎test/e2e/cancel-request/middleware.ts

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import { Streamable } from './streamable'
2+
3+
export const config = {
4+
matcher: '/middleware',
5+
}
6+
7+
let streamable
8+
let requestAborted = false
9+
10+
export default function handler(req: Request): Response {
11+
// The 2nd request should render the stats. We don't use a query param
12+
// because edge rendering will create a different bundle for that.
13+
if (streamable) {
14+
return new Response(
15+
JSON.stringify({
16+
requestAborted,
17+
i: streamable.i,
18+
streamCleanedUp: streamable.streamCleanedUp,
19+
})
20+
)
21+
}
22+
23+
streamable = Streamable()
24+
req.signal.onabort = () => {
25+
requestAborted = true
26+
}
27+
return new Response(streamable.stream)
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import { Streamable } from '../../streamable'
2+
3+
export const config = {
4+
runtime: 'edge',
5+
}
6+
7+
let streamable
8+
let requestAborted = false
9+
10+
export default function handler(req: Request): Response {
11+
// The 2nd request should render the stats. We don't use a query param
12+
// because edge rendering will create a different bundle for that.
13+
if (streamable) {
14+
return new Response(
15+
JSON.stringify({
16+
requestAborted,
17+
i: streamable.i,
18+
streamCleanedUp: streamable.streamCleanedUp,
19+
})
20+
)
21+
}
22+
23+
streamable = Streamable()
24+
req.signal.onabort = () => {
25+
requestAborted = true
26+
}
27+
return new Response(streamable.stream)
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { IncomingMessage, ServerResponse } from 'http'
2+
import { pipeline } from 'stream'
3+
import { Readable } from '../../readable'
4+
5+
export const config = {
6+
runtime: 'nodejs',
7+
}
8+
9+
let readable
10+
let requestAborted = false
11+
12+
export default function handler(
13+
_req: IncomingMessage,
14+
res: ServerResponse
15+
): void {
16+
// The 2nd request should render the stats. We don't use a query param
17+
// because edge rendering will create a different bundle for that.
18+
if (readable) {
19+
res.end(
20+
JSON.stringify({
21+
requestAborted,
22+
i: readable.i,
23+
streamCleanedUp: readable.streamCleanedUp,
24+
})
25+
)
26+
return
27+
}
28+
29+
readable = Readable()
30+
res.on('close', () => {
31+
requestAborted = true
32+
})
33+
pipeline(readable.stream, res, () => {
34+
res.end()
35+
})
36+
}

‎test/e2e/cancel-request/readable.ts

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import * as stream from 'stream'
2+
import { sleep } from './sleep'
3+
4+
export function Readable() {
5+
const encoder = new TextEncoder()
6+
const readable = {
7+
i: 0,
8+
streamCleanedUp: false,
9+
stream: new stream.Readable({
10+
async read() {
11+
await sleep(100)
12+
this.push(encoder.encode(String(readable.i++)))
13+
14+
if (readable.i >= 25) this.push(null)
15+
},
16+
destroy() {
17+
readable.streamCleanedUp = true
18+
},
19+
}),
20+
}
21+
return readable
22+
}

‎test/e2e/cancel-request/sleep.ts

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
export function sleep(ms: number) {
2+
return new Promise((res) => setTimeout(res, ms))
3+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
import { createNextDescribe } from 'e2e-utils'
2+
import { sleep } from './sleep'
3+
import { get } from 'http'
4+
5+
createNextDescribe(
6+
'streaming responses cancel inner stream after disconnect',
7+
{
8+
files: __dirname,
9+
},
10+
({ next }) => {
11+
type CancelState = {
12+
requestAborted: boolean
13+
streamCleanedUp: boolean
14+
i: number
15+
}
16+
17+
function prime(url: string) {
18+
return new Promise<void>((resolve) => {
19+
url = new URL(url, next.url).href
20+
21+
// There's a bug in node-fetch v2 where aborting the fetch will never abort
22+
// the connection, because the body is a transformed stream that doesn't
23+
// close the connection stream.
24+
// https://github.com/node-fetch/node-fetch/pull/670
25+
const req = get(url, async (res) => {
26+
while (true) {
27+
const value = res.read(1)
28+
if (value) break
29+
await sleep(5)
30+
}
31+
32+
res.destroy()
33+
34+
// make sure the connection has finished
35+
await sleep(100)
36+
37+
resolve()
38+
})
39+
req.end()
40+
})
41+
}
42+
43+
// The disconnect from our prime request to the server isn't instant, and
44+
// there's no good signal on the client end for when it happens. So we just
45+
// fetch multiple times waiting for it to happen.
46+
async function getTillCancelled(url: string) {
47+
while (true) {
48+
const res = await next.fetch(url)
49+
const json = (await res.json()) as CancelState
50+
if (json.streamCleanedUp === true) {
51+
return json
52+
}
53+
54+
await sleep(10)
55+
}
56+
}
57+
58+
it('Midddleware cancels inner ReadableStream', async () => {
59+
await prime('/middleware')
60+
const json = await getTillCancelled('/middleware')
61+
expect(json).toMatchObject({
62+
requestAborted: true,
63+
streamCleanedUp: true,
64+
i: (expect as any).toBeWithin(0, 5),
65+
})
66+
})
67+
68+
it('App Route Handler Edge cancels inner ReadableStream', async () => {
69+
await prime('/edge-route')
70+
const json = await getTillCancelled('/edge-route')
71+
expect(json).toMatchObject({
72+
requestAborted: true,
73+
streamCleanedUp: true,
74+
i: (expect as any).toBeWithin(0, 5),
75+
})
76+
})
77+
78+
it('App Route Handler NodeJS cancels inner ReadableStream', async () => {
79+
await prime('/node-route')
80+
const json = await getTillCancelled('/node-route')
81+
expect(json).toMatchObject({
82+
requestAborted: true,
83+
streamCleanedUp: true,
84+
i: (expect as any).toBeWithin(0, 5),
85+
})
86+
})
87+
88+
it('Pages Api Route Edge cancels inner ReadableStream', async () => {
89+
await prime('/api/edge-api')
90+
const json = await getTillCancelled('/api/edge-api')
91+
expect(json).toMatchObject({
92+
requestAborted: true,
93+
streamCleanedUp: true,
94+
i: (expect as any).toBeWithin(0, 5),
95+
})
96+
})
97+
98+
it('Pages Api Route NodeJS cancels inner ReadableStream', async () => {
99+
await prime('/api/node-api')
100+
const json = await getTillCancelled('/api/node-api')
101+
expect(json).toMatchObject({
102+
requestAborted: true,
103+
streamCleanedUp: true,
104+
i: (expect as any).toBeWithin(0, 5),
105+
})
106+
})
107+
}
108+
)

‎test/e2e/cancel-request/streamable.ts

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import { sleep } from './sleep'
2+
3+
export function Streamable() {
4+
const encoder = new TextEncoder()
5+
const streamable = {
6+
i: 0,
7+
streamCleanedUp: false,
8+
stream: new ReadableStream({
9+
async pull(controller) {
10+
await sleep(100)
11+
controller.enqueue(encoder.encode(String(streamable.i++)))
12+
13+
if (streamable.i >= 25) controller.close()
14+
},
15+
cancel() {
16+
streamable.streamCleanedUp = true
17+
},
18+
}),
19+
}
20+
return streamable
21+
}

0 commit comments

Comments
 (0)
Please sign in to comment.