Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generic streaming support #7856

Open
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

chenmoneygithub
Copy link
Collaborator

@chenmoneygithub chenmoneygithub commented Feb 27, 2025

This PR adds output streaming support in DSPy, which covers:

  • Only string-type output field is qualified for streaming.
  • Users can stream string output field from any submodule of a DSPy program, not necessarily from the final output.
  • The stream chunk is wrapped in class dspy.streaming.StreamResponse, including the actual message and the associated predict's information.

To use the output streaming, users need to:

  • Prepare a list of dspy.streaming.StreamListener, with a required attribute signature_field_name pointing to the output field's name to capture the streaming. predict and predict_name are optional, if not supplied, it will be automatically detected when calling dspy.streamify.
  • Call dspy.streamify() with the list of StreamLIsteners, and process the generator. Please see a code demo below.

So in a nutshell, we just provide users with the ability to capture the token stream for certain fields of certain predicts, but we just return the stream to users. We don't chime in or interfere with how users use these tokens.

Code example:

import asyncio

import dspy
from dspy.streaming import StatusMessage

lm = dspy.LM("openai/gpt-4o-mini")
dspy.settings.configure(lm=lm)


predict = dspy.Predict("question->answer, reasoning")
stream_listeners = [
    dspy.streaming.StreamListener(signature_field_name="answer"),
    dspy.streaming.StreamListener(signature_field_name="reasoning"),
]
stream_predict = dspy.streamify(
    predict, stream_listeners=stream_listeners, include_final_prediction_in_output_stream=False
)

async def main():
    output = stream_predict(question="why did a chicken cross the kitchen?")
    return_value = None
    async for value in output:
        if isinstance(value, StatusMessage):
            print(value.message)
        elif isinstance(value, dspy.Prediction):
            print(value)
        elif isinstance(value, dspy.streaming.StreamResponse):
            print(value)
    return return_value


asyncio.run(main())

It will produce a stream like below:

StreamResponse(predict_name='self', signature_field_name='answer', chunk='To')
StreamResponse(predict_name='self', signature_field_name='answer', chunk=' get')
StreamResponse(predict_name='self', signature_field_name='answer', chunk=' to')
StreamResponse(predict_name='self', signature_field_name='answer', chunk=' the')
StreamResponse(predict_name='self', signature_field_name='answer', chunk=' other side of the kitchen.')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk='This')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' is')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' a')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' playful')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' twist')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' on')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' the')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' classic')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' joke')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' "')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk='Why')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' did')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' the')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' chicken')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' cross')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' the')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' road')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk='?"')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' The')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' humor')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' lies')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' in')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' the')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' unexpected')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' simplicity')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' of')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' the')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' answer')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=',')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' which')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' is')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' a')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' pun')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' on')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' the')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' idea')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' that')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' the')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' chicken')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' is')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' just')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' trying')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' to')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' reach')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' the')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' other')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' side')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' of')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' the')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' kitchen')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=',')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' much')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' like')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' it')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' would')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' cross')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' a')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' road')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk='.')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' It')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' plays')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' on')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' the')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' expectation')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' of')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' a')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' more')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' complex')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' or')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' humorous')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' answer')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=',')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' but')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' instead')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' delivers')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' a')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' straightforward')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' response')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' that')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' is')
StreamResponse(predict_name='self', signature_field_name='reasoning', chunk=' amusing in its simplicity.')

The name is the one you get from program.named_predictors().

@chenmoneygithub chenmoneygithub force-pushed the dspy-streaming-generic branch 2 times, most recently from 7bd112b to b13142b Compare February 28, 2025 18:33
@chenmoneygithub chenmoneygithub requested a review from okhat March 3, 2025 23:36
@chenmoneygithub chenmoneygithub force-pushed the dspy-streaming-generic branch from b13142b to d04eb5d Compare March 3, 2025 23:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant