Skip to content

Commit da95919

Browse files
authoredDec 14, 2024··
Update streaming callable API (#1652)
* Update streaming callable API * Fix linter error * Stream type defaults to unknown * Changelog * Format fix
1 parent 46e6453 commit da95919

File tree

5 files changed

+84
-47
lines changed

5 files changed

+84
-47
lines changed
 

‎CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
- Add an authPolicy callback to CallableOptions for reusable auth middleware as well as helper auth policies (#1650)
2+
- Multiple breaking changes to the not-yet-announced streaming feature for Callable Functions (#1652)

‎spec/common/providers/https.spec.ts

+6-6
Original file line numberDiff line numberDiff line change
@@ -770,7 +770,7 @@ describe("onCallHandler", () => {
770770
cors: { origin: true, methods: "POST" },
771771
},
772772
(req, resp) => {
773-
resp.write("hello");
773+
resp.sendChunk("hello");
774774
return "world";
775775
},
776776
"gcfv2"
@@ -840,10 +840,10 @@ describe("onCallHandler", () => {
840840
{
841841
cors: { origin: true, methods: "POST" },
842842
},
843-
(req, resp) => {
844-
resp.write("initial message");
845-
mockReq.emit("close");
846-
resp.write("should not be sent");
843+
async (req, resp) => {
844+
await resp.sendChunk("initial message");
845+
await mockReq.emit("close");
846+
await resp.sendChunk("should not be sent");
847847
return "done";
848848
},
849849
"gcfv2"
@@ -908,7 +908,7 @@ describe("onCallHandler", () => {
908908
},
909909
async (resp, res) => {
910910
await new Promise((resolve) => setTimeout(resolve, 3_000));
911-
res.write("hello");
911+
res.sendChunk("hello");
912912
await new Promise((resolve) => setTimeout(resolve, 3_000));
913913
return "done";
914914
},

‎spec/helper.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,11 @@ export function runHandler(
8484
}
8585
}
8686

87-
public write(writeBody: any) {
87+
public write(writeBody: any, cb?: () => void) {
8888
this.sentBody += typeof writeBody === "object" ? JSON.stringify(writeBody) : writeBody;
89+
if (cb) {
90+
setImmediate(cb);
91+
}
8992
return true;
9093
}
9194

‎src/common/providers/https.ts

+46-29
Original file line numberDiff line numberDiff line change
@@ -141,23 +141,30 @@ export interface CallableRequest<T = any> {
141141
* The raw request handled by the callable.
142142
*/
143143
rawRequest: Request;
144+
145+
/**
146+
* Whether this is a streaming request.
147+
* Code can be optimized by not trying to generate a stream of chunks to
148+
* call response.sendChunk on if request.acceptsStreaming is false.
149+
* It is always safe, however, to call response.sendChunk as this will
150+
* noop if acceptsStreaming is false.
151+
*/
152+
acceptsStreaming: boolean;
144153
}
145154

146155
/**
147-
* CallableProxyResponse exposes subset of express.Response object
148-
* to allow writing partial, streaming responses back to the client.
156+
* CallableProxyResponse allows streaming response chunks and listening to signals
157+
* triggered in events such as a disconnect.
149158
*/
150-
export interface CallableProxyResponse {
159+
export interface CallableResponse<T = unknown> {
151160
/**
152161
* Writes a chunk of the response body to the client. This method can be called
153162
* multiple times to stream data progressively.
163+
* Returns a promise of whether the data was written. This can be false, for example,
164+
* if the request was not a streaming request. Rejects if there is a network error.
154165
*/
155-
write: express.Response["write"];
156-
/**
157-
* Indicates whether the client has requested and can handle streaming responses.
158-
* This should be checked before attempting to stream data to avoid compatibility issues.
159-
*/
160-
acceptsStreaming: boolean;
166+
sendChunk: (chunk: T) => Promise<boolean>;
167+
161168
/**
162169
* An AbortSignal that is triggered when the client disconnects or the
163170
* request is terminated prematurely.
@@ -586,13 +593,9 @@ async function checkTokens(
586593
auth: "INVALID",
587594
};
588595

589-
await Promise.all([
590-
Promise.resolve().then(async () => {
591-
verifications.auth = await checkAuthToken(req, ctx);
592-
}),
593-
Promise.resolve().then(async () => {
594-
verifications.app = await checkAppCheckToken(req, ctx, options);
595-
}),
596+
[verifications.auth, verifications.app] = await Promise.all([
597+
checkAuthToken(req, ctx),
598+
checkAppCheckToken(req, ctx, options),
596599
]);
597600

598601
const logPayload = {
@@ -697,9 +700,9 @@ async function checkAppCheckToken(
697700
}
698701

699702
type v1CallableHandler = (data: any, context: CallableContext) => any | Promise<any>;
700-
type v2CallableHandler<Req, Res> = (
703+
type v2CallableHandler<Req, Res, Stream> = (
701704
request: CallableRequest<Req>,
702-
response?: CallableProxyResponse
705+
response?: CallableResponse<Stream>
703706
) => Res;
704707

705708
/** @internal **/
@@ -718,9 +721,9 @@ export interface CallableOptions<T = any> {
718721
}
719722

720723
/** @internal */
721-
export function onCallHandler<Req = any, Res = any>(
724+
export function onCallHandler<Req = any, Res = any, Stream = unknown>(
722725
options: CallableOptions<Req>,
723-
handler: v1CallableHandler | v2CallableHandler<Req, Res>,
726+
handler: v1CallableHandler | v2CallableHandler<Req, Res, Stream>,
724727
version: "gcfv1" | "gcfv2"
725728
): (req: Request, res: express.Response) => Promise<void> {
726729
const wrapped = wrapOnCallHandler(options, handler, version);
@@ -739,9 +742,9 @@ function encodeSSE(data: unknown): string {
739742
}
740743

741744
/** @internal */
742-
function wrapOnCallHandler<Req = any, Res = any>(
745+
function wrapOnCallHandler<Req = any, Res = any, Stream = unknown>(
743746
options: CallableOptions<Req>,
744-
handler: v1CallableHandler | v2CallableHandler<Req, Res>,
747+
handler: v1CallableHandler | v2CallableHandler<Req, Res, Stream>,
745748
version: "gcfv1" | "gcfv2"
746749
): (req: Request, res: express.Response) => Promise<void> {
747750
return async (req: Request, res: express.Response): Promise<void> => {
@@ -855,27 +858,41 @@ function wrapOnCallHandler<Req = any, Res = any>(
855858
const arg: CallableRequest<Req> = {
856859
...context,
857860
data,
861+
acceptsStreaming,
858862
};
859863

860-
const responseProxy: CallableProxyResponse = {
861-
write(chunk): boolean {
864+
const responseProxy: CallableResponse<Stream> = {
865+
sendChunk(chunk: Stream): Promise<boolean> {
862866
// if client doesn't accept sse-protocol, response.write() is no-op.
863867
if (!acceptsStreaming) {
864-
return false;
868+
return Promise.resolve(false);
865869
}
866870
// if connection is already closed, response.write() is no-op.
867871
if (abortController.signal.aborted) {
868-
return false;
872+
return Promise.resolve(false);
869873
}
870874
const formattedData = encodeSSE({ message: chunk });
871-
const wrote = res.write(formattedData);
875+
let resolve: (wrote: boolean) => void;
876+
let reject: (err: Error) => void;
877+
const p = new Promise<boolean>((res, rej) => {
878+
resolve = res;
879+
reject = rej;
880+
});
881+
const wrote = res.write(formattedData, (error) => {
882+
if (error) {
883+
reject(error);
884+
return;
885+
}
886+
resolve(wrote);
887+
});
888+
872889
// Reset heartbeat timer after successful write
873890
if (wrote && heartbeatInterval !== null && heartbeatSeconds > 0) {
874891
scheduleHeartbeat();
875892
}
876-
return wrote;
893+
894+
return p;
877895
},
878-
acceptsStreaming,
879896
signal: abortController.signal,
880897
};
881898
if (acceptsStreaming) {

‎src/v2/providers/https.ts

+27-11
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import { isDebugFeatureEnabled } from "../../common/debug";
3333
import { ResetValue } from "../../common/options";
3434
import {
3535
CallableRequest,
36-
CallableProxyResponse,
36+
CallableResponse,
3737
FunctionsErrorCode,
3838
HttpsError,
3939
onCallHandler,
@@ -258,12 +258,17 @@ export type HttpsFunction = ((
258258
/**
259259
* Creates a callable method for clients to call using a Firebase SDK.
260260
*/
261-
export interface CallableFunction<T, Return> extends HttpsFunction {
261+
export interface CallableFunction<T, Return, Stream = unknown> extends HttpsFunction {
262262
/** Executes the handler function with the provided data as input. Used for unit testing.
263263
* @param data - An input for the handler function.
264264
* @returns The output of the handler function.
265265
*/
266-
run(data: CallableRequest<T>): Return;
266+
run(request: CallableRequest<T>): Return;
267+
268+
stream(
269+
request: CallableRequest<T>,
270+
response: CallableResponse<Stream>
271+
): { stream: AsyncIterator<Stream>; output: Return };
267272
}
268273

269274
/**
@@ -387,22 +392,22 @@ export function onRequest(
387392
* @param handler - A function that takes a {@link https.CallableRequest}.
388393
* @returns A function that you can export and deploy.
389394
*/
390-
export function onCall<T = any, Return = any | Promise<any>>(
395+
export function onCall<T = any, Return = any | Promise<any>, Stream = unknown>(
391396
opts: CallableOptions<T>,
392-
handler: (request: CallableRequest<T>, response?: CallableProxyResponse) => Return
393-
): CallableFunction<T, Return extends Promise<unknown> ? Return : Promise<Return>>;
397+
handler: (request: CallableRequest<T>, response?: CallableResponse<Stream>) => Return
398+
): CallableFunction<T, Return extends Promise<unknown> ? Return : Promise<Return>, Stream>;
394399

395400
/**
396401
* Declares a callable method for clients to call using a Firebase SDK.
397402
* @param handler - A function that takes a {@link https.CallableRequest}.
398403
* @returns A function that you can export and deploy.
399404
*/
400-
export function onCall<T = any, Return = any | Promise<any>>(
401-
handler: (request: CallableRequest<T>, response?: CallableProxyResponse) => Return
405+
export function onCall<T = any, Return = any | Promise<any>, Stream = unknown>(
406+
handler: (request: CallableRequest<T>, response?: CallableResponse<Stream>) => Return
402407
): CallableFunction<T, Return extends Promise<unknown> ? Return : Promise<Return>>;
403-
export function onCall<T = any, Return = any | Promise<any>>(
408+
export function onCall<T = any, Return = any | Promise<any>, Stream = unknown>(
404409
optsOrHandler: CallableOptions<T> | ((request: CallableRequest<T>) => Return),
405-
handler?: (request: CallableRequest<T>, response?: CallableProxyResponse) => Return
410+
handler?: (request: CallableRequest<T>, response?: CallableResponse<Stream>) => Return
406411
): CallableFunction<T, Return extends Promise<unknown> ? Return : Promise<Return>> {
407412
let opts: CallableOptions;
408413
if (arguments.length === 1) {
@@ -421,7 +426,7 @@ export function onCall<T = any, Return = any | Promise<any>>(
421426
}
422427

423428
// fix the length of handler to make the call to handler consistent
424-
const fixedLen = (req: CallableRequest<T>, resp?: CallableProxyResponse) => handler(req, resp);
429+
const fixedLen = (req: CallableRequest<T>, resp?: CallableResponse<Stream>) => handler(req, resp);
425430
let func: any = onCallHandler(
426431
{
427432
cors: { origin, methods: "POST" },
@@ -474,6 +479,17 @@ export function onCall<T = any, Return = any | Promise<any>>(
474479
callableTrigger: {},
475480
};
476481

482+
// TODO: in the next major version, do auth/appcheck in these helper methods too.
477483
func.run = withInit(handler);
484+
func.stream = () => {
485+
return {
486+
stream: {
487+
next(): Promise<IteratorResult<Stream>> {
488+
return Promise.reject("Coming soon");
489+
},
490+
},
491+
output: Promise.reject("Coming soon"),
492+
};
493+
};
478494
return func;
479495
}

0 commit comments

Comments
 (0)
Please sign in to comment.