Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix stream cancellation in RenderResult.pipe() and sendResponse() #52157

Merged
merged 7 commits into from
Jul 5, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -80,11 +80,20 @@ async function render(request: NextRequest, event: NextFetchEvent) {
response.headers.append('Vary', RSC_VARY_HEADER)

const writer = tranform.writable.getWriter()
result.pipe({
const target = {
write: (chunk: Uint8Array) => writer.write(chunk),
end: () => writer.close(),
destroy: (reason?: Error) => writer.abort(reason),
})
closed: false,
}
const onClose = () => {
target.closed = true
}
// No, this cannot be replaced with `finally`, because early cancelling
// the stream will create a rejected promise, and finally will create an
// unhandled rejection.
writer.closed.then(onClose, onClose)
result.pipe(target)

return response
}
21 changes: 8 additions & 13 deletions packages/next/src/server/render-result.ts
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ export interface PipeTarget {
end: () => unknown
flush?: () => unknown
destroy: (err?: Error) => unknown
get closed(): boolean
}

export default class RenderResult {
@@ -111,31 +112,25 @@ export default class RenderResult {
: () => {}
const reader = this.response.getReader()

let shouldFatalError = false
try {
let result = await reader.read()
if (!result.done) {
// As we're going to write to the response, we should destroy the
// response if an error occurs.
shouldFatalError = true
}
while (true) {
const result = await reader.read()

if (res.closed || result.done) {
break
}

while (!result.done) {
// Write the data to the response.
res.write(result.value)

// Flush it to the client (if it supports flushing).
flush()

// Read the next chunk.
result = await reader.read()
}

// We're done writing to the response, so we can end it.
res.end()
} catch (err) {
// If we've written to the response, we should destroy it.
if (shouldFatalError) {
if (!res.closed) {
res.destroy(err as any)
}

1 change: 1 addition & 0 deletions packages/next/src/server/send-response.ts
Original file line number Diff line number Diff line change
@@ -49,6 +49,7 @@ export async function sendResponse(
const iterator = consumeUint8ArrayReadableStream(response.body)
try {
for await (const chunk of iterator) {
if (originalResponse.closed) break
originalResponse.write(chunk)
}
} finally {
Original file line number Diff line number Diff line change
@@ -25,8 +25,9 @@ export const streamToBufferedResult = async (
},
end() {},
destroy() {},
closed: false,
}
await renderResult.pipe(writable as any)
await renderResult.pipe(writable)
return renderChunks.join('')
}

16 changes: 11 additions & 5 deletions packages/next/src/server/web-server.ts
Original file line number Diff line number Diff line change
@@ -421,14 +421,20 @@ export default class NextWebServer extends BaseServer<WebServerOptions> {

if (options.result.isDynamic) {
const writer = res.transformStream.writable.getWriter()
options.result.pipe({
const target = {
write: (chunk: Uint8Array) => writer.write(chunk),
end: () => writer.close(),
destroy: (err: Error) => writer.abort(err),
cork: () => {},
uncork: () => {},
// Not implemented: on/removeListener
} as any)
closed: false,
} as any
const onClose = () => {
target.closed = true
}
// No, this cannot be replaced with `finally`, because early cancelling
// the stream will create a rejected promise, and finally will create an
// unhandled rejection.
writer.closed.then(onClose, onClose)
options.result.pipe(target)
} else {
const payload = await options.result.toUnchunkedString()
res.setHeader('Content-Length', String(byteLength(payload)))
26 changes: 26 additions & 0 deletions test/e2e/cancel-request/app/edge-route/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { Streamable } from '../../streamable'

export const runtime = 'edge'

let streamable
let requestAborted = false

export function GET(req: Request): Response {
// The 2nd request should render the stats. We don't use a query param
// because edge rendering will create a different bundle for that.
if (streamable) {
return new Response(
JSON.stringify({
requestAborted,
i: streamable.i,
streamCleanedUp: streamable.streamCleanedUp,
})
)
}

streamable = Streamable()
req.signal.onabort = () => {
requestAborted = true
}
return new Response(streamable.stream)
}
28 changes: 28 additions & 0 deletions test/e2e/cancel-request/app/node-route/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { Streamable } from '../../streamable'

export const runtime = 'nodejs'
// Next thinks it can statically compile this route, which breaks the test.
export const dynamic = 'force-dynamic'

let streamable
let requestAborted = false

export function GET(req: Request): Response {
// The 2nd request should render the stats. We don't use a query param
// because edge rendering will create a different bundle for that.
if (streamable) {
return new Response(
JSON.stringify({
requestAborted,
i: streamable.i,
streamCleanedUp: streamable.streamCleanedUp,
})
)
}

streamable = Streamable()
req.signal.onabort = () => {
requestAborted = true
}
return new Response(streamable.stream)
}
28 changes: 28 additions & 0 deletions test/e2e/cancel-request/middleware.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { Streamable } from './streamable'

export const config = {
matcher: '/middleware',
}

let streamable
let requestAborted = false

export default function handler(req: Request): Response {
// The 2nd request should render the stats. We don't use a query param
// because edge rendering will create a different bundle for that.
if (streamable) {
return new Response(
JSON.stringify({
requestAborted,
i: streamable.i,
streamCleanedUp: streamable.streamCleanedUp,
})
)
}

streamable = Streamable()
req.signal.onabort = () => {
requestAborted = true
}
return new Response(streamable.stream)
}
28 changes: 28 additions & 0 deletions test/e2e/cancel-request/pages/api/edge-api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { Streamable } from '../../streamable'

export const config = {
runtime: 'edge',
}

let streamable
let requestAborted = false

export default function handler(req: Request): Response {
// The 2nd request should render the stats. We don't use a query param
// because edge rendering will create a different bundle for that.
if (streamable) {
return new Response(
JSON.stringify({
requestAborted,
i: streamable.i,
streamCleanedUp: streamable.streamCleanedUp,
})
)
}

streamable = Streamable()
req.signal.onabort = () => {
requestAborted = true
}
return new Response(streamable.stream)
}
36 changes: 36 additions & 0 deletions test/e2e/cancel-request/pages/api/node-api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { IncomingMessage, ServerResponse } from 'http'
import { pipeline } from 'stream'
import { Readable } from '../../readable'

export const config = {
runtime: 'nodejs',
}

let readable
let requestAborted = false

export default function handler(
_req: IncomingMessage,
res: ServerResponse
): void {
// The 2nd request should render the stats. We don't use a query param
// because edge rendering will create a different bundle for that.
if (readable) {
res.end(
JSON.stringify({
requestAborted,
i: readable.i,
streamCleanedUp: readable.streamCleanedUp,
})
)
return
}

readable = Readable()
res.on('close', () => {
requestAborted = true
})
pipeline(readable.stream, res, () => {
res.end()
})
}
22 changes: 22 additions & 0 deletions test/e2e/cancel-request/readable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import * as stream from 'stream'
import { sleep } from './sleep'

export function Readable() {
const encoder = new TextEncoder()
const readable = {
i: 0,
streamCleanedUp: false,
stream: new stream.Readable({
async read() {
await sleep(100)
this.push(encoder.encode(String(readable.i++)))

if (readable.i >= 25) this.push(null)
},
destroy() {
readable.streamCleanedUp = true
},
}),
}
return readable
}
3 changes: 3 additions & 0 deletions test/e2e/cancel-request/sleep.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export function sleep(ms: number) {
return new Promise((res) => setTimeout(res, ms))
}
108 changes: 108 additions & 0 deletions test/e2e/cancel-request/stream-cancel.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import { createNextDescribe } from 'e2e-utils'
import { sleep } from './sleep'
import { get } from 'http'

createNextDescribe(
'streaming responses cancel inner stream after disconnect',
{
files: __dirname,
},
({ next }) => {
type CancelState = {
requestAborted: boolean
streamCleanedUp: boolean
i: number
}

function prime(url: string) {
return new Promise<void>((resolve) => {
url = new URL(url, next.url).href

// There's a bug in node-fetch v2 where aborting the fetch will never abort
// the connection, because the body is a transformed stream that doesn't
// close the connection stream.
// https://github.com/node-fetch/node-fetch/pull/670
const req = get(url, async (res) => {
while (true) {
const value = res.read(1)
if (value) break
await sleep(5)
}

res.destroy()

// make sure the connection has finished
await sleep(100)

resolve()
})
req.end()
})
}

// The disconnect from our prime request to the server isn't instant, and
// there's no good signal on the client end for when it happens. So we just
// fetch multiple times waiting for it to happen.
async function getTillCancelled(url: string) {
while (true) {
const res = await next.fetch(url)
const json = (await res.json()) as CancelState
if (json.streamCleanedUp === true) {
return json
}

await sleep(10)
}
}

it('Midddleware cancels inner ReadableStream', async () => {
await prime('/middleware')
const json = await getTillCancelled('/middleware')
expect(json).toMatchObject({
requestAborted: true,
streamCleanedUp: true,
i: (expect as any).toBeWithin(0, 5),
})
})

it('App Route Handler Edge cancels inner ReadableStream', async () => {
await prime('/edge-route')
const json = await getTillCancelled('/edge-route')
expect(json).toMatchObject({
requestAborted: true,
streamCleanedUp: true,
i: (expect as any).toBeWithin(0, 5),
})
})

it('App Route Handler NodeJS cancels inner ReadableStream', async () => {
await prime('/node-route')
const json = await getTillCancelled('/node-route')
expect(json).toMatchObject({
requestAborted: true,
streamCleanedUp: true,
i: (expect as any).toBeWithin(0, 5),
})
})

it('Pages Api Route Edge cancels inner ReadableStream', async () => {
await prime('/api/edge-api')
const json = await getTillCancelled('/api/edge-api')
expect(json).toMatchObject({
requestAborted: true,
streamCleanedUp: true,
i: (expect as any).toBeWithin(0, 5),
})
})

it('Pages Api Route NodeJS cancels inner ReadableStream', async () => {
await prime('/api/node-api')
const json = await getTillCancelled('/api/node-api')
expect(json).toMatchObject({
requestAborted: true,
streamCleanedUp: true,
i: (expect as any).toBeWithin(0, 5),
})
})
}
)
21 changes: 21 additions & 0 deletions test/e2e/cancel-request/streamable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { sleep } from './sleep'

export function Streamable() {
const encoder = new TextEncoder()
const streamable = {
i: 0,
streamCleanedUp: false,
stream: new ReadableStream({
async pull(controller) {
await sleep(100)
controller.enqueue(encoder.encode(String(streamable.i++)))

if (streamable.i >= 25) controller.close()
},
cancel() {
streamable.streamCleanedUp = true
},
}),
}
return streamable
}