Skip to content

Commit 31d2b72

Browse files
authoredJul 26, 2023
Reimplement stream cancellation (#52281)
### What? This reimplements our stream cancellation code for a few more cases: 1. Adds support in all stream-returning APIs 2. Fixes cancellation detection in node 16 3. Implements out-of-band detection, so can cancel in the middle of a read It also (finally) adds tests for all the cases I'm aware of. ### Why? To allow disconnecting from an AI service when a client disconnects. $$$ ### How? 1. Reuses a single pipe function in all paths to push data from the dev's `ReadableStream` into our `ServerResponse` 2. Uses `ServerResponse` to detect disconnect, instead of the `IncomingMessage` (request) - The `close` event fire once all incoming body data is read - The request `abort` event will not fire after the incoming body data has been fully read 3. Using `on('close')` on the writable destination allows us to detect close - Checking for `res.destroyed` in the body of the loop meant we had to wait for the `await stream.read()` to complete before we could possibly cancel the stream - - - #52157 (and #51594) had an issue with Node 16, because I was using `res.closed` to detect when the server response was closed by the client disconnecting. But, `closed` wasn't [added](nodejs/node#45672) until [v18.13.0](https://nodejs.org/en/blog/release/v18.13.0#:~:text=%5Bcbd710bbf4%5D%20%2D%20http%3A%20make%20OutgoingMessage%20more%20streamlike%20(Robert%20Nagy)%20%2345672). This fixes it by using `res.destroyed`. Reverts #52277 Relands #52157 Fixes #52809 ---------
1 parent 39fd917 commit 31d2b72

File tree

29 files changed

+667
-176
lines changed

29 files changed

+667
-176
lines changed
 

‎packages/next-swc/crates/next-core/js/src/entry/app/edge-page-bootstrap.ts

+19-3
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,27 @@ async function render(request: NextRequest, event: NextFetchEvent) {
8080
response.headers.append('Vary', RSC_VARY_HEADER)
8181

8282
const writer = tranform.writable.getWriter()
83-
result.pipe({
83+
84+
let innerClose: undefined | (() => void)
85+
const target = {
8486
write: (chunk: Uint8Array) => writer.write(chunk),
8587
end: () => writer.close(),
86-
destroy: (reason?: Error) => writer.abort(reason),
87-
})
88+
89+
on(_event: 'close', cb: () => void) {
90+
innerClose = cb
91+
},
92+
off(_event: 'close', _cb: () => void) {
93+
innerClose = undefined
94+
},
95+
}
96+
const onClose = () => {
97+
innerClose?.()
98+
}
99+
// No, this cannot be replaced with `finally`, because early cancelling
100+
// the stream will create a rejected promise, and finally will create an
101+
// unhandled rejection.
102+
writer.closed.then(onClose, onClose)
103+
result.pipe(target)
88104

89105
return response
90106
}

‎packages/next-swc/crates/next-core/js/src/internal/nodejs-proxy-handler.ts

+8-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ import {
1111
NodeNextResponse,
1212
} from 'next/dist/server/base-http/node'
1313
import { sendResponse } from 'next/dist/server/send-response'
14-
import { NextRequestAdapter } from 'next/dist/server/web/spec-extension/adapters/next-request'
14+
import {
15+
NextRequestAdapter,
16+
signalFromNodeResponse,
17+
} from 'next/dist/server/web/spec-extension/adapters/next-request'
1518
import { RouteHandlerManagerContext } from 'next/dist/server/future/route-handler-managers/route-handler-manager'
1619

1720
import { attachRequestMeta } from './next-request-helpers'
@@ -43,7 +46,10 @@ export default (routeModule: RouteModule) => {
4346
}
4447

4548
const routeResponse = await routeModule.handle(
46-
NextRequestAdapter.fromNodeNextRequest(req),
49+
NextRequestAdapter.fromNodeNextRequest(
50+
req,
51+
signalFromNodeResponse(response)
52+
),
4753
context
4854
)
4955

‎packages/next/src/export/worker.ts

+6-2
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,10 @@ import { NodeNextRequest } from '../server/base-http/node'
4545
import { isAppRouteRoute } from '../lib/is-app-route-route'
4646
import { toNodeOutgoingHttpHeaders } from '../server/web/utils'
4747
import { RouteModuleLoader } from '../server/future/helpers/module-loader/route-module-loader'
48-
import { NextRequestAdapter } from '../server/web/spec-extension/adapters/next-request'
48+
import {
49+
NextRequestAdapter,
50+
signalFromNodeResponse,
51+
} from '../server/web/spec-extension/adapters/next-request'
4952
import * as ciEnvironment from '../telemetry/ci-info'
5053

5154
const envConfig = require('../shared/lib/runtime-config')
@@ -388,7 +391,8 @@ export default async function exportPage({
388391
// Ensure that the url for the page is absolute.
389392
req.url = `http://localhost:3000${req.url}`
390393
const request = NextRequestAdapter.fromNodeNextRequest(
391-
new NodeNextRequest(req)
394+
new NodeNextRequest(req),
395+
signalFromNodeResponse(res)
392396
)
393397

394398
// Create the context for the handler. This contains the params from

‎packages/next/src/server/base-server.ts

+7-1
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ import {
116116
type RouteMatch,
117117
} from './future/route-matches/route-match'
118118
import { normalizeLocalePath } from '../shared/lib/i18n/normalize-locale-path'
119+
import { signalFromNodeResponse } from './web/spec-extension/adapters/next-request'
119120

120121
export type FindComponentsResult = {
121122
components: LoadComponentsReturnType
@@ -1837,7 +1838,12 @@ export default abstract class Server<ServerOptions extends Options = Options> {
18371838

18381839
try {
18391840
// Handle the match and collect the response if it's a static response.
1840-
const response = await this.handlers.handle(match, req, context)
1841+
const response = await this.handlers.handle(
1842+
match,
1843+
req,
1844+
context,
1845+
signalFromNodeResponse((res as NodeNextResponse).originalResponse)
1846+
)
18411847

18421848
;(req as any).fetchMetrics = (
18431849
context.staticGenerationContext as any

‎packages/next/src/server/future/route-handler-managers/route-handler-manager.ts

+3-2
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ export class RouteHandlerManager {
2424
public async handle(
2525
match: AppRouteRouteMatch,
2626
req: BaseNextRequest,
27-
context: RouteHandlerManagerContext
27+
context: RouteHandlerManagerContext,
28+
signal: AbortSignal
2829
): Promise<Response> {
2930
// The module supports minimal mode, load the minimal module.
3031
const module = await RouteModuleLoader.load<RouteModule>(
@@ -33,7 +34,7 @@ export class RouteHandlerManager {
3334
)
3435

3536
// Convert the BaseNextRequest to a NextRequest.
36-
const request = NextRequestAdapter.fromBaseNextRequest(req)
37+
const request = NextRequestAdapter.fromBaseNextRequest(req, signal)
3738

3839
// Get the response from the handler and send it back.
3940
return await module.handle(request, context)

‎packages/next/src/server/lib/route-resolver.ts

+13-8
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ import { proxyRequest } from './router-utils/proxy-request'
1717
import { getResolveRoutes } from './router-utils/resolve-routes'
1818
import { PERMANENT_REDIRECT_STATUS } from '../../shared/lib/constants'
1919
import { splitCookiesString, toNodeOutgoingHttpHeaders } from '../web/utils'
20-
import { signalFromNodeRequest } from '../web/spec-extension/adapters/next-request'
20+
import { signalFromNodeResponse } from '../web/spec-extension/adapters/next-request'
2121
import { getMiddlewareRouteMatcher } from '../../shared/lib/router/utils/middleware-route-matcher'
22-
import { pipeReadable } from './server-ipc/invoke-request'
22+
import { pipeReadable } from '../pipe-readable'
2323

2424
type RouteResult =
2525
| {
@@ -132,7 +132,7 @@ export async function makeResolver(
132132
serverAddr.port || 3000
133133
}${req.url}`,
134134
body: cloneableBody,
135-
signal: signalFromNodeRequest(req),
135+
signal: signalFromNodeResponse(res),
136136
},
137137
useCache: true,
138138
onWarning: console.warn,
@@ -160,11 +160,11 @@ export async function makeResolver(
160160
}
161161
res.statusCode = result.response.status
162162

163-
for await (const chunk of result.response.body || ([] as any)) {
164-
if (res.closed) break
165-
res.write(chunk)
163+
if (result.response.body) {
164+
await pipeReadable(result.response.body, res)
165+
} else {
166+
res.end()
166167
}
167-
res.end()
168168
} catch (err) {
169169
console.error(err)
170170
res.statusCode = 500
@@ -218,7 +218,12 @@ export async function makeResolver(
218218
req: IncomingMessage,
219219
res: ServerResponse
220220
): Promise<RouteResult | void> {
221-
const routeResult = await resolveRoutes(req, new Set(), false)
221+
const routeResult = await resolveRoutes(
222+
req,
223+
new Set(),
224+
false,
225+
signalFromNodeResponse(res)
226+
)
222227
const {
223228
matchedOutput,
224229
bodyStream,

‎packages/next/src/server/lib/router-server.ts

+31-11
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ import { filterReqHeaders } from './server-ipc/utils'
1515
import { findPagesDir } from '../../lib/find-pages-dir'
1616
import { setupFsCheck } from './router-utils/filesystem'
1717
import { proxyRequest } from './router-utils/proxy-request'
18-
import { invokeRequest, pipeReadable } from './server-ipc/invoke-request'
18+
import { invokeRequest } from './server-ipc/invoke-request'
19+
import { isAbortError, pipeReadable } from '../pipe-readable'
1920
import { createRequestResponseMocks } from './mock-request'
2021
import { createIpcServer, createWorker } from './server-ipc'
2122
import { UnwrapPromise } from '../../lib/coalesced-function'
@@ -29,6 +30,7 @@ import {
2930
PHASE_DEVELOPMENT_SERVER,
3031
PERMANENT_REDIRECT_STATUS,
3132
} from '../../shared/lib/constants'
33+
import { signalFromNodeResponse } from '../web/spec-extension/adapters/next-request'
3234

3335
let initializeResult:
3436
| undefined
@@ -331,14 +333,26 @@ export async function initialize(opts: {
331333

332334
debug('invokeRender', renderUrl, invokeHeaders)
333335

334-
const invokeRes = await invokeRequest(
335-
renderUrl,
336-
{
337-
headers: invokeHeaders,
338-
method: req.method,
339-
},
340-
getRequestMeta(req, '__NEXT_CLONABLE_BODY')?.cloneBodyStream()
341-
)
336+
let invokeRes
337+
try {
338+
invokeRes = await invokeRequest(
339+
renderUrl,
340+
{
341+
headers: invokeHeaders,
342+
method: req.method,
343+
signal: signalFromNodeResponse(res),
344+
},
345+
getRequestMeta(req, '__NEXT_CLONABLE_BODY')?.cloneBodyStream()
346+
)
347+
} catch (e) {
348+
// If the client aborts before we can receive a response object (when
349+
// the headers are flushed), then we can early exit without further
350+
// processing.
351+
if (isAbortError(e)) {
352+
return
353+
}
354+
throw e
355+
}
342356

343357
debug('invokeRender res', invokeRes.status, invokeRes.headers)
344358

@@ -419,7 +433,12 @@ export async function initialize(opts: {
419433
resHeaders,
420434
bodyStream,
421435
matchedOutput,
422-
} = await resolveRoutes(req, matchedDynamicRoutes, false)
436+
} = await resolveRoutes(
437+
req,
438+
matchedDynamicRoutes,
439+
false,
440+
signalFromNodeResponse(res)
441+
)
423442

424443
if (devInstance && matchedOutput?.type === 'devVirtualFsItem') {
425444
const origUrl = req.url || '/'
@@ -687,7 +706,8 @@ export async function initialize(opts: {
687706
const { matchedOutput, parsedUrl } = await resolveRoutes(
688707
req,
689708
new Set(),
690-
true
709+
true,
710+
signalFromNodeResponse(socket)
691711
)
692712

693713
// TODO: allow upgrade requests to pages/app paths?

‎packages/next/src/server/lib/router-utils/proxy-request.ts

+19-1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,24 @@ export async function proxyRequest(
3434
await new Promise((proxyResolve, proxyReject) => {
3535
let finished = false
3636

37+
// http-proxy does not properly detect a client disconnect in newer
38+
// versions of Node.js. This is caused because it only listens for the
39+
// `aborted` event on the our request object, but it also fully reads
40+
// and closes the request object. Node **will not** fire `aborted` when
41+
// the request is already closed. Listening for `close` on our response
42+
// object will detect the disconnect, and we can abort the proxy's
43+
// connection.
44+
proxy.on('proxyReq', (proxyReq) => {
45+
res.on('close', () => proxyReq.destroy())
46+
})
47+
proxy.on('proxyRes', (proxyRes) => {
48+
if (res.destroyed) {
49+
proxyRes.destroy()
50+
} else {
51+
res.on('close', () => proxyRes.destroy())
52+
}
53+
})
54+
3755
proxy.on('proxyRes', (proxyRes, innerReq, innerRes) => {
3856
const cleanup = (err: any) => {
3957
// cleanup event listeners to allow clean garbage collection
@@ -59,7 +77,7 @@ export async function proxyRequest(
5977
finished = true
6078
proxyReject(err)
6179

62-
if (!res.closed) {
80+
if (!res.destroyed) {
6381
res.statusCode = 500
6482
res.end('Internal Server Error')
6583
}

‎packages/next/src/server/lib/router-utils/resolve-routes.ts

+28-9
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { Header } from '../../../lib/load-custom-routes'
1313
import { stringifyQuery } from '../../server-route-utils'
1414
import { toNodeOutgoingHttpHeaders } from '../../web/utils'
1515
import { invokeRequest } from '../server-ipc/invoke-request'
16+
import { isAbortError } from '../../pipe-readable'
1617
import { getCookieParser, setLazyProp } from '../../api-utils'
1718
import { getHostname } from '../../../shared/lib/get-hostname'
1819
import { UnwrapPromise } from '../../../lib/coalesced-function'
@@ -93,7 +94,8 @@ export function getResolveRoutes(
9394
async function resolveRoutes(
9495
req: IncomingMessage,
9596
matchedDynamicRoutes: Set<string>,
96-
isUpgradeReq?: boolean
97+
isUpgradeReq: boolean,
98+
signal: AbortSignal
9799
): Promise<{
98100
finished: boolean
99101
statusCode?: number
@@ -453,14 +455,31 @@ export function getResolveRoutes(
453455

454456
debug('invoking middleware', renderUrl, invokeHeaders)
455457

456-
const middlewareRes = await invokeRequest(
457-
renderUrl,
458-
{
459-
headers: invokeHeaders,
460-
method: req.method,
461-
},
462-
getRequestMeta(req, '__NEXT_CLONABLE_BODY')?.cloneBodyStream()
463-
)
458+
let middlewareRes
459+
try {
460+
middlewareRes = await invokeRequest(
461+
renderUrl,
462+
{
463+
headers: invokeHeaders,
464+
method: req.method,
465+
signal,
466+
},
467+
getRequestMeta(req, '__NEXT_CLONABLE_BODY')?.cloneBodyStream()
468+
)
469+
} catch (e) {
470+
// If the client aborts before we can receive a response object
471+
// (when the headers are flushed), then we can early exit without
472+
// further processing.
473+
if (isAbortError(e)) {
474+
return {
475+
parsedUrl,
476+
resHeaders,
477+
finished: true,
478+
}
479+
}
480+
throw e
481+
}
482+
464483
const middlewareHeaders = toNodeOutgoingHttpHeaders(
465484
middlewareRes.headers
466485
) as Record<string, string | string[] | undefined>
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
import '../../node-polyfill-fetch'
22

33
import type { IncomingMessage } from 'http'
4-
import type { Writable, Readable } from 'stream'
4+
import type { Readable } from 'stream'
55
import { filterReqHeaders } from './utils'
66

77
export const invokeRequest = async (
88
targetUrl: string,
99
requestInit: {
1010
headers: IncomingMessage['headers']
1111
method: IncomingMessage['method']
12+
signal?: AbortSignal
1213
},
1314
readableBody?: Readable | ReadableStream
1415
) => {
@@ -22,10 +23,11 @@ export const invokeRequest = async (
2223
...requestInit.headers,
2324
}) as IncomingMessage['headers']
2425

25-
const invokeRes = await fetch(parsedTargetUrl.toString(), {
26+
return await fetch(parsedTargetUrl.toString(), {
2627
headers: invokeHeaders as any as Headers,
2728
method: requestInit.method,
2829
redirect: 'manual',
30+
signal: requestInit.signal,
2931

3032
...(requestInit.method !== 'GET' &&
3133
requestInit.method !== 'HEAD' &&
@@ -41,31 +43,4 @@ export const invokeRequest = async (
4143
internal: true,
4244
},
4345
})
44-
45-
return invokeRes
46-
}
47-
48-
export async function pipeReadable(
49-
readable: ReadableStream,
50-
writable: Writable
51-
) {
52-
const reader = readable.getReader()
53-
54-
async function doRead() {
55-
const item = await reader.read()
56-
57-
if (item?.value) {
58-
writable.write(Buffer.from(item?.value))
59-
60-
if ('flush' in writable) {
61-
;(writable as any).flush()
62-
}
63-
}
64-
65-
if (!item?.done) {
66-
return doRead()
67-
}
68-
}
69-
await doRead()
70-
writable.end()
7146
}

‎packages/next/src/server/lib/start-server.ts

+23-9
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@ import { getCloneableBody } from '../body-streams'
1313
import { filterReqHeaders } from './server-ipc/utils'
1414
import setupCompression from 'next/dist/compiled/compression'
1515
import { normalizeRepeatedSlashes } from '../../shared/lib/utils'
16-
import { invokeRequest, pipeReadable } from './server-ipc/invoke-request'
16+
import { invokeRequest } from './server-ipc/invoke-request'
17+
import { isAbortError, pipeReadable } from '../pipe-readable'
1718
import {
1819
genRouterWorkerExecArgv,
1920
getDebugPort,
2021
getNodeOptionsWithoutInspect,
2122
} from './utils'
23+
import { signalFromNodeResponse } from '../web/spec-extension/adapters/next-request'
2224

2325
const debug = setupDebug('next:start-server')
2426

@@ -388,14 +390,26 @@ export async function startServer({
388390
targetHost === 'localhost' ? '127.0.0.1' : targetHost
389391
}:${routerPort}${req.url || '/'}`
390392

391-
const invokeRes = await invokeRequest(
392-
targetUrl,
393-
{
394-
headers: req.headers,
395-
method: req.method,
396-
},
397-
getCloneableBody(req).cloneBodyStream()
398-
)
393+
let invokeRes
394+
try {
395+
invokeRes = await invokeRequest(
396+
targetUrl,
397+
{
398+
headers: req.headers,
399+
method: req.method,
400+
signal: signalFromNodeResponse(res),
401+
},
402+
getCloneableBody(req).cloneBodyStream()
403+
)
404+
} catch (e) {
405+
// If the client aborts before we can receive a response object (when
406+
// the headers are flushed), then we can early exit without further
407+
// processing.
408+
if (isAbortError(e)) {
409+
return
410+
}
411+
throw e
412+
}
399413

400414
res.statusCode = invokeRes.status
401415
res.statusMessage = invokeRes.statusText

‎packages/next/src/server/next-server.ts

+14-26
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,13 @@ import { getTracer } from './lib/trace/tracer'
9191
import { NextNodeServerSpan } from './lib/trace/constants'
9292
import { nodeFs } from './lib/node-fs-methods'
9393
import { getRouteRegex } from '../shared/lib/router/utils/route-regex'
94-
import { invokeRequest, pipeReadable } from './lib/server-ipc/invoke-request'
94+
import { invokeRequest } from './lib/server-ipc/invoke-request'
95+
import { pipeReadable } from './pipe-readable'
9596
import { filterReqHeaders } from './lib/server-ipc/utils'
9697
import { createRequestResponseMocks } from './lib/mock-request'
9798
import chalk from 'next/dist/compiled/chalk'
9899
import { NEXT_RSC_UNION_QUERY } from '../client/components/app-router-headers'
99-
import { signalFromNodeRequest } from './web/spec-extension/adapters/next-request'
100+
import { signalFromNodeResponse } from './web/spec-extension/adapters/next-request'
100101
import { RouteModuleLoader } from './future/helpers/module-loader/route-module-loader'
101102
import { loadManifest } from './load-manifest'
102103

@@ -520,6 +521,7 @@ export default class NextNodeServer extends BaseServer {
520521
{
521522
method: newReq.method || 'GET',
522523
headers: newReq.headers,
524+
signal: signalFromNodeResponse(res.originalResponse),
523525
}
524526
)
525527
const filteredResHeaders = filterReqHeaders(
@@ -1522,8 +1524,8 @@ export default class NextNodeServer extends BaseServer {
15221524
url: url,
15231525
page: page,
15241526
body: getRequestMeta(params.request, '__NEXT_CLONABLE_BODY'),
1525-
signal: signalFromNodeRequest(
1526-
(params.request as NodeNextRequest).originalRequest
1527+
signal: signalFromNodeResponse(
1528+
(params.response as NodeNextResponse).originalResponse
15271529
),
15281530
},
15291531
useCache: true,
@@ -1624,14 +1626,12 @@ export default class NextNodeServer extends BaseServer {
16241626
res.statusCode = result.response.status
16251627

16261628
const { originalResponse } = res as NodeNextResponse
1627-
for await (const chunk of result.response.body || ([] as any)) {
1628-
if (originalResponse.closed) break
1629-
this.streamResponseChunk(originalResponse, chunk)
1630-
}
1631-
res.send()
1632-
return {
1633-
finished: true,
1629+
if (result.response.body) {
1630+
await pipeReadable(result.response.body, originalResponse)
1631+
} else {
1632+
originalResponse.end()
16341633
}
1634+
return { finished: true }
16351635
}
16361636
} catch (err) {
16371637
if (isError(err) && err.code === 'ENOENT') {
@@ -1805,8 +1805,8 @@ export default class NextNodeServer extends BaseServer {
18051805
...(params.params && { params: params.params }),
18061806
},
18071807
body: getRequestMeta(params.req, '__NEXT_CLONABLE_BODY'),
1808-
signal: signalFromNodeRequest(
1809-
(params.req as NodeNextRequest).originalRequest
1808+
signal: signalFromNodeResponse(
1809+
(params.res as NodeNextResponse).originalResponse
18101810
),
18111811
},
18121812
useCache: true,
@@ -1835,19 +1835,7 @@ export default class NextNodeServer extends BaseServer {
18351835

18361836
const nodeResStream = (params.res as NodeNextResponse).originalResponse
18371837
if (result.response.body) {
1838-
// TODO(gal): not sure that we always need to stream
1839-
const { consumeUint8ArrayReadableStream } =
1840-
require('next/dist/compiled/edge-runtime') as typeof import('next/dist/compiled/edge-runtime')
1841-
try {
1842-
for await (const chunk of consumeUint8ArrayReadableStream(
1843-
result.response.body
1844-
)) {
1845-
if (nodeResStream.closed) break
1846-
nodeResStream.write(chunk)
1847-
}
1848-
} finally {
1849-
nodeResStream.end()
1850-
}
1838+
await pipeReadable(result.response.body, nodeResStream)
18511839
} else {
18521840
nodeResStream.end()
18531841
}
+100
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
export function isAbortError(e: any): e is Error & { name: 'AbortError' } {
2+
return e?.name === 'AbortError'
3+
}
4+
5+
/**
6+
* This is a minimal implementation of a Writable with just enough
7+
* functionality to handle stream cancellation.
8+
*/
9+
export interface PipeTarget {
10+
/**
11+
* Called when new data is read from readable source.
12+
*/
13+
write: (chunk: Uint8Array) => unknown
14+
15+
/**
16+
* Always called once we read all data (if the writable isn't already
17+
* destroyed by a client disconnect).
18+
*/
19+
end: () => unknown
20+
21+
/**
22+
* An optional method which is called after every write, to support
23+
* immediately streaming in gzip responses.
24+
*/
25+
flush?: () => unknown
26+
27+
/**
28+
* The close event listener is necessary for us to detect an early client
29+
* disconnect while we're attempting to read data. This must be done
30+
* out-of-band so that we can cancel the readable (else we'd have to wait for
31+
* the readable to produce more data before we could tell it to cancel).
32+
*/
33+
on: (event: 'close', cb: () => void) => void
34+
35+
/**
36+
* Allows us to cleanup our onClose listener.
37+
*/
38+
off: (event: 'close', cb: () => void) => void
39+
}
40+
41+
export async function pipeReadable(
42+
readable: ReadableStream,
43+
writable: PipeTarget
44+
) {
45+
const reader = readable.getReader()
46+
let readerDone = false
47+
let writableClosed = false
48+
49+
// It's not enough just to check for `writable.destroyed`, because the client
50+
// may disconnect while we're waiting for a read. We need to immediately
51+
// cancel the readable, and that requires an out-of-band listener.
52+
function onClose() {
53+
writableClosed = true
54+
writable.off('close', onClose)
55+
56+
// If the reader is not yet done, we need to cancel it so that the stream
57+
// source's resources can be cleaned up. If a read is in-progress, this
58+
// will also ensure the read promise rejects and frees our resources.
59+
if (!readerDone) {
60+
readerDone = true
61+
reader.cancel().catch(() => {})
62+
}
63+
}
64+
writable.on('close', onClose)
65+
66+
try {
67+
while (true) {
68+
const { done, value } = await reader.read()
69+
readerDone = done
70+
71+
if (done || writableClosed) {
72+
break
73+
}
74+
75+
if (value) {
76+
writable.write(Buffer.from(value))
77+
writable.flush?.()
78+
}
79+
}
80+
} catch (e) {
81+
// If the client disconnects, we don't want to emit an unhandled error.
82+
if (!isAbortError(e)) {
83+
throw e
84+
}
85+
} finally {
86+
writable.off('close', onClose)
87+
88+
// If we broke out of the loop because of a client disconnect, and the
89+
// close event hasn't yet fired, we can early cancel.
90+
if (!readerDone) {
91+
reader.cancel().catch(() => {})
92+
}
93+
94+
// If the client hasn't disconnected yet, end the writable so that the
95+
// response sends the final bytes.
96+
if (!writableClosed) {
97+
writable.end()
98+
}
99+
}
100+
}

‎packages/next/src/server/render-result.ts

+3-43
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { pipeReadable, PipeTarget } from './pipe-readable'
2+
13
type ContentTypeOption = string | undefined
24

35
export type RenderResultMetadata = {
@@ -11,13 +13,6 @@ export type RenderResultMetadata = {
1113

1214
type RenderResultResponse = string | ReadableStream<Uint8Array> | null
1315

14-
export interface PipeTarget {
15-
write: (chunk: Uint8Array) => unknown
16-
end: () => unknown
17-
flush?: () => unknown
18-
destroy: (err?: Error) => unknown
19-
}
20-
2116
export default class RenderResult {
2217
/**
2318
* The detected content type for the response. This is used to set the
@@ -105,41 +100,6 @@ export default class RenderResult {
105100
)
106101
}
107102

108-
const flush =
109-
'flush' in res && typeof res.flush === 'function'
110-
? res.flush.bind(res)
111-
: () => {}
112-
const reader = this.response.getReader()
113-
114-
let shouldFatalError = false
115-
try {
116-
let result = await reader.read()
117-
if (!result.done) {
118-
// As we're going to write to the response, we should destroy the
119-
// response if an error occurs.
120-
shouldFatalError = true
121-
}
122-
123-
while (!result.done) {
124-
// Write the data to the response.
125-
res.write(result.value)
126-
127-
// Flush it to the client (if it supports flushing).
128-
flush()
129-
130-
// Read the next chunk.
131-
result = await reader.read()
132-
}
133-
134-
// We're done writing to the response, so we can end it.
135-
res.end()
136-
} catch (err) {
137-
// If we've written to the response, we should destroy it.
138-
if (shouldFatalError) {
139-
res.destroy(err as any)
140-
}
141-
142-
throw err
143-
}
103+
return await pipeReadable(this.response, res)
144104
}
145105
}

‎packages/next/src/server/send-response.ts

+2-10
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { BaseNextRequest, BaseNextResponse } from './base-http'
22
import type { NodeNextResponse } from './base-http/node'
3+
import { pipeReadable } from './pipe-readable'
34
import { splitCookiesString } from './web/utils'
45

56
/**
@@ -44,16 +45,7 @@ export async function sendResponse(
4445

4546
// A response body must not be sent for HEAD requests. See https://httpwg.org/specs/rfc9110.html#HEAD
4647
if (response.body && req.method !== 'HEAD') {
47-
const { consumeUint8ArrayReadableStream } =
48-
require('next/dist/compiled/edge-runtime') as typeof import('next/dist/compiled/edge-runtime')
49-
const iterator = consumeUint8ArrayReadableStream(response.body)
50-
try {
51-
for await (const chunk of iterator) {
52-
originalResponse.write(chunk)
53-
}
54-
} finally {
55-
originalResponse.end()
56-
}
48+
await pipeReadable(response.body, originalResponse)
5749
} else {
5850
originalResponse.end()
5951
}

‎packages/next/src/server/stream-utils/node-web-streams-helper.ts

+5-2
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,12 @@ export const streamToBufferedResult = async (
2424
renderChunks.push(decodeText(chunk, textDecoder))
2525
},
2626
end() {},
27-
destroy() {},
27+
28+
// We do not support stream cancellation
29+
on() {},
30+
off() {},
2831
}
29-
await renderResult.pipe(writable as any)
32+
await renderResult.pipe(writable)
3033
return renderChunks.join('')
3134
}
3235

‎packages/next/src/server/web-server.ts

+19-6
Original file line numberDiff line numberDiff line change
@@ -374,14 +374,27 @@ export default class NextWebServer extends BaseServer<WebServerOptions> {
374374

375375
if (options.result.isDynamic) {
376376
const writer = res.transformStream.writable.getWriter()
377-
options.result.pipe({
377+
378+
let innerClose: undefined | (() => void)
379+
const target = {
378380
write: (chunk: Uint8Array) => writer.write(chunk),
379381
end: () => writer.close(),
380-
destroy: (err: Error) => writer.abort(err),
381-
cork: () => {},
382-
uncork: () => {},
383-
// Not implemented: on/removeListener
384-
} as any)
382+
383+
on(_event: 'close', cb: () => void) {
384+
innerClose = cb
385+
},
386+
off(_event: 'close', _cb: () => void) {
387+
innerClose = undefined
388+
},
389+
}
390+
const onClose = () => {
391+
innerClose?.()
392+
}
393+
// No, this cannot be replaced with `finally`, because early cancelling
394+
// the stream will create a rejected promise, and finally will create an
395+
// unhandled rejection.
396+
writer.closed.then(onClose, onClose)
397+
options.result.pipe(target)
385398
} else {
386399
const payload = await options.result.toUnchunkedString()
387400
res.setHeader('Content-Length', String(byteLength(payload)))

‎packages/next/src/server/web/spec-extension/adapters/next-request.ts

+40-11
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,62 @@
11
import type { BaseNextRequest } from '../../../base-http'
22
import type { NodeNextRequest } from '../../../base-http/node'
33
import type { WebNextRequest } from '../../../base-http/web'
4-
import type { IncomingMessage } from 'node:http'
4+
import type { Writable } from 'node:stream'
55

66
import { getRequestMeta } from '../../../request-meta'
77
import { fromNodeOutgoingHttpHeaders } from '../../utils'
88
import { NextRequest } from '../request'
99

10-
export function signalFromNodeRequest(request: IncomingMessage) {
11-
const { errored } = request
12-
if (errored) return AbortSignal.abort(errored)
10+
/**
11+
* Creates an AbortSignal tied to the closing of a ServerResponse (or other
12+
* appropriate Writable).
13+
*
14+
* This cannot be done with the request (IncomingMessage or Readable) because
15+
* the `abort` event will not fire if to data has been fully read (because that
16+
* will "close" the readable stream and nothing fires after that).
17+
*/
18+
export function signalFromNodeResponse(response: Writable) {
19+
const { errored, destroyed } = response
20+
if (errored || destroyed) return AbortSignal.abort(errored)
21+
1322
const controller = new AbortController()
14-
request.on('error', (e) => {
15-
controller.abort(e)
16-
})
23+
// If `finish` fires first, then `res.end()` has been called and the close is
24+
// just us finishing the stream on our side. If `close` fires first, then we
25+
// know the client disconnected before we finished.
26+
function onClose() {
27+
controller.abort()
28+
// eslint-disable-next-line @typescript-eslint/no-use-before-define
29+
response.off('finish', onFinish)
30+
}
31+
function onFinish() {
32+
response.off('close', onClose)
33+
}
34+
response.once('close', onClose)
35+
response.once('finish', onFinish)
36+
1737
return controller.signal
1838
}
1939

2040
export class NextRequestAdapter {
21-
public static fromBaseNextRequest(request: BaseNextRequest): NextRequest {
41+
public static fromBaseNextRequest(
42+
request: BaseNextRequest,
43+
signal: AbortSignal
44+
): NextRequest {
2245
// TODO: look at refining this check
2346
if ('request' in request && (request as WebNextRequest).request) {
2447
return NextRequestAdapter.fromWebNextRequest(request as WebNextRequest)
2548
}
2649

27-
return NextRequestAdapter.fromNodeNextRequest(request as NodeNextRequest)
50+
return NextRequestAdapter.fromNodeNextRequest(
51+
request as NodeNextRequest,
52+
signal
53+
)
2854
}
2955

30-
public static fromNodeNextRequest(request: NodeNextRequest): NextRequest {
56+
public static fromNodeNextRequest(
57+
request: NodeNextRequest,
58+
signal: AbortSignal
59+
): NextRequest {
3160
// HEAD and GET requests can not have a body.
3261
let body: BodyInit | null = null
3362
if (request.method !== 'GET' && request.method !== 'HEAD' && request.body) {
@@ -57,7 +86,7 @@ export class NextRequestAdapter {
5786
headers: fromNodeOutgoingHttpHeaders(request.headers),
5887
// @ts-expect-error - see https://github.com/whatwg/fetch/pull/1457
5988
duplex: 'half',
60-
signal: signalFromNodeRequest(request.originalRequest),
89+
signal,
6190
// geo
6291
// ip
6392
// nextConfig
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import { Streamable } from '../../streamable'
2+
3+
export const runtime = 'edge'
4+
5+
let streamable: ReturnType<typeof Streamable> | undefined
6+
7+
export async function GET(req: Request): Promise<Response> {
8+
// Consume the entire request body.
9+
// This is so we don't confuse the request close with the connection close.
10+
await req.text()
11+
12+
// The 2nd request should render the stats. We don't use a query param
13+
// because edge rendering will create a different bundle for that.
14+
if (streamable) {
15+
const old = streamable
16+
streamable = undefined
17+
const i = await old.finished
18+
return new Response(`${i}`)
19+
}
20+
21+
const write = new URL(req.url!, 'http://localhost/').searchParams.get('write')
22+
const s = (streamable = Streamable(+write!))
23+
req.signal.onabort = () => {
24+
s.abort()
25+
}
26+
return new Response(s.stream)
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import { Streamable } from '../../streamable'
2+
3+
export const runtime = 'nodejs'
4+
// Next thinks it can statically compile this route, which breaks the test.
5+
export const dynamic = 'force-dynamic'
6+
7+
let streamable: ReturnType<typeof Streamable> | undefined
8+
9+
export async function GET(req: Request): Promise<Response> {
10+
// Consume the entire request body.
11+
// This is so we don't confuse the request close with the connection close.
12+
await req.text()
13+
14+
// The 2nd request should render the stats. We don't use a query param
15+
// because edge rendering will create a different bundle for that.
16+
if (streamable) {
17+
const old = streamable
18+
streamable = undefined
19+
const i = await old.finished
20+
return new Response(`${i}`)
21+
}
22+
23+
const write = new URL(req.url!, 'http://localhost/').searchParams.get('write')
24+
const s = (streamable = Streamable(+write!))
25+
req.signal.onabort = () => {
26+
s.abort()
27+
}
28+
return new Response(s.stream)
29+
}

‎test/e2e/cancel-request/middleware.ts

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import { Streamable } from './streamable'
2+
3+
export const config = {
4+
matcher: '/middleware',
5+
}
6+
7+
let streamable: ReturnType<typeof Streamable> | undefined
8+
9+
export default async function handler(req: Request): Promise<Response> {
10+
// Consume the entire request body.
11+
// This is so we don't confuse the request close with the connection close.
12+
await req.text()
13+
14+
// The 2nd request should render the stats. We don't use a query param
15+
// because edge rendering will create a different bundle for that.
16+
if (streamable) {
17+
const old = streamable
18+
streamable = undefined
19+
const i = await old.finished
20+
return new Response(`${i}`)
21+
}
22+
23+
const write = new URL(req.url!, 'http://localhost/').searchParams.get('write')
24+
const s = (streamable = Streamable(+write!))
25+
req.signal.onabort = () => {
26+
s.abort()
27+
}
28+
return new Response(s.stream)
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import { Streamable } from '../../streamable'
2+
3+
export const config = {
4+
runtime: 'edge',
5+
}
6+
7+
let streamable: ReturnType<typeof Streamable> | undefined
8+
9+
export default async function handler(req: Request): Promise<Response> {
10+
// Consume the entire request body.
11+
// This is so we don't confuse the request close with the connection close.
12+
await req.text()
13+
14+
// The 2nd request should render the stats. We don't use a query param
15+
// because edge rendering will create a different bundle for that.
16+
if (streamable) {
17+
const old = streamable
18+
streamable = undefined
19+
const i = await old.finished
20+
return new Response(`${i}`)
21+
}
22+
23+
const write = new URL(req.url!, 'http://localhost/').searchParams.get('write')
24+
const s = (streamable = Streamable(+write!))
25+
req.signal.onabort = () => {
26+
s.abort()
27+
}
28+
return new Response(s.stream)
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import { IncomingMessage, ServerResponse } from 'http'
2+
import { pipeline } from 'stream'
3+
import { Readable } from '../../readable'
4+
5+
export const config = {
6+
runtime: 'nodejs',
7+
}
8+
9+
let readable: ReturnType<typeof Readable> | undefined
10+
11+
export default function handler(
12+
req: IncomingMessage,
13+
res: ServerResponse
14+
): Promise<void> {
15+
// Pages API requests have already consumed the body.
16+
// This is so we don't confuse the request close with the connection close.
17+
18+
// The 2nd request should render the stats. We don't use a query param
19+
// because edge rendering will create a different bundle for that.
20+
if (readable) {
21+
const old = readable
22+
readable = undefined
23+
return old.finished.then((i) => {
24+
res.end(`${i}`)
25+
})
26+
}
27+
28+
const write = new URL(req.url!, 'http://localhost/').searchParams.get('write')
29+
const r = (readable = Readable(+write!))
30+
res.on('close', () => {
31+
r.abort()
32+
})
33+
return new Promise((resolve) => {
34+
pipeline(r.stream, res, () => {
35+
resolve()
36+
res.end()
37+
})
38+
})
39+
}
+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
export default function Home() {
2+
return 'index'
3+
}

‎test/e2e/cancel-request/readable.ts

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import * as stream from 'stream'
2+
import { Deferred, sleep } from './sleep'
3+
4+
export function Readable(write: number) {
5+
const encoder = new TextEncoder()
6+
const cleanedUp = new Deferred()
7+
const aborted = new Deferred()
8+
let i = 0
9+
10+
const readable = {
11+
finished: Promise.all([cleanedUp.promise, aborted.promise]).then(() => i),
12+
13+
abort() {
14+
aborted.resolve()
15+
},
16+
stream: new stream.Readable({
17+
async read() {
18+
if (i >= write) {
19+
return
20+
}
21+
22+
await sleep(100)
23+
this.push(encoder.encode(String(i++)))
24+
},
25+
destroy() {
26+
cleanedUp.resolve()
27+
},
28+
}),
29+
}
30+
return readable
31+
}

‎test/e2e/cancel-request/sleep.ts

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
export function sleep(ms: number) {
2+
return new Promise((res) => setTimeout(res, ms))
3+
}
4+
5+
export class Deferred<T> {
6+
declare promise: Promise<T>
7+
declare resolve: (v?: T | PromiseLike<T>) => void
8+
declare reject: (r?: any) => void
9+
10+
constructor() {
11+
this.promise = new Promise((res, rej) => {
12+
this.resolve = res
13+
this.reject = rej
14+
})
15+
}
16+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import { createNextDescribe } from 'e2e-utils'
2+
import { sleep } from './sleep'
3+
import { get } from 'http'
4+
5+
createNextDescribe(
6+
'streaming responses cancel inner stream after disconnect',
7+
{
8+
files: __dirname,
9+
},
10+
({ next }) => {
11+
// For some reason, it's flakey. Try a few times.
12+
jest.retryTimes(3)
13+
14+
function prime(url: string, noData?: boolean) {
15+
return new Promise<void>((resolve) => {
16+
url = new URL(url, next.url).href
17+
18+
// There's a bug in node-fetch v2 where aborting the fetch will never abort
19+
// the connection, because the body is a transformed stream that doesn't
20+
// close the connection stream.
21+
// https://github.com/node-fetch/node-fetch/pull/670
22+
const req = get(url, async (res) => {
23+
while (true) {
24+
const value = res.read(1)
25+
if (value) break
26+
await sleep(5)
27+
}
28+
29+
res.destroy()
30+
resolve()
31+
})
32+
req.end()
33+
34+
if (noData) {
35+
req.on('error', (e) => {
36+
// Swallow the "socket hang up" message that happens if you abort
37+
// before the a response connection is received.
38+
if ((e as any).code !== 'ECONNRESET') {
39+
throw e
40+
}
41+
})
42+
43+
setTimeout(() => {
44+
req.abort()
45+
resolve()
46+
}, 100)
47+
}
48+
})
49+
}
50+
51+
describe.each([
52+
['middleware', '/middleware'],
53+
['edge app route handler', '/edge-route'],
54+
['node app route handler', '/node-route'],
55+
['edge pages api', '/api/edge-api'],
56+
['node pages api', '/api/node-api'],
57+
])('%s', (_name, path) => {
58+
it('cancels stream making progress', async () => {
59+
// If the stream is making regular progress, then we'll eventually hit
60+
// the break because `res.destroyed` is true.
61+
const url = path + '?write=25'
62+
await prime(url)
63+
const res = await next.fetch(url)
64+
const i = +(await res.text())
65+
expect(i).toBeWithin(1, 5)
66+
}, 2500)
67+
68+
it('cancels stalled stream', async () => {
69+
// If the stream is stalled, we'll never hit the `res.destroyed` break
70+
// point, so this ensures we handle it with an out-of-band cancellation.
71+
const url = path + '?write=1'
72+
await prime(url)
73+
const res = await next.fetch(url)
74+
const i = +(await res.text())
75+
expect(i).toBe(1)
76+
}, 2500)
77+
78+
it('cancels stream that never sent data', async () => {
79+
// If the client has never sent any data (including headers), then we
80+
// haven't even established the response object yet.
81+
const url = path + '?write=0'
82+
await prime(url, true)
83+
const res = await next.fetch(url)
84+
const i = +(await res.text())
85+
expect(i).toBe(0)
86+
}, 2500)
87+
})
88+
}
89+
)

‎test/e2e/cancel-request/streamable.ts

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import { Deferred, sleep } from './sleep'
2+
3+
export function Streamable(write: number) {
4+
const encoder = new TextEncoder()
5+
const cleanedUp = new Deferred()
6+
const aborted = new Deferred()
7+
let i = 0
8+
9+
const streamable = {
10+
finished: Promise.all([cleanedUp.promise, aborted.promise]).then(() => i),
11+
12+
abort() {
13+
aborted.resolve()
14+
},
15+
stream: new ReadableStream({
16+
async pull(controller) {
17+
if (i >= write) {
18+
return
19+
}
20+
21+
await sleep(100)
22+
controller.enqueue(encoder.encode(String(i++)))
23+
},
24+
cancel() {
25+
cleanedUp.resolve()
26+
},
27+
}),
28+
}
29+
return streamable
30+
}

‎test/integration/edge-runtime-streaming-error/test/index.test.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ function test(context: ReturnType<typeof createContext>) {
2222
await waitFor(200)
2323
await check(
2424
() => stripAnsi(context.output),
25-
new RegExp(`This ReadableStream did not return bytes.`, 'm')
25+
new RegExp(`The first argument must be of type string`, 'm')
2626
)
2727
expect(stripAnsi(context.output)).not.toContain('webpack-internal:')
2828
}

0 commit comments

Comments
 (0)
Please sign in to comment.