Replies: 2 comments
-
Could you elaborate why Closure Agent can't be used for your usage case? Your closure agent can write messages to file or printing them. |
Beta Was this translation helpful? Give feedback.
-
Thanks, @ekzhu, for the prompt response. I think my question was a bit confusing. I apologize for that. The idea is not to print or save the communications, as it would be straightforward to do in the agents' classes. 1- I want to access those communications in runtime to do something with them, but I wasn't sure if the following example of using ClosureAgent is good practice or not? from autogen_core import DefaultTopicId, SingleThreadedAgentRuntime, ClosureAgent, ClosureContext, DefaultSubscription
from typing import Union
import asyncio
runtime = SingleThreadedAgentRuntime()
# Create a queue to collect results
result_queue = asyncio.Queue()
# Define a closure that can handle both result types
async def collect_results(
_ctx: ClosureContext,
message: Union[CodeWritingResult, CodeReviewResult, CodeReviewTask, CodeWritingTask],
ctx: MessageContext
) -> None:
await result_queue.put(message)
# Register a single ClosureAgent that handles all types of communication
await ClosureAgent.register_closure(
runtime,
"ResultCollector",
collect_results,
subscriptions=lambda: [DefaultSubscription()],
)
await ReviewerAgent.register(
runtime, "ReviewerAgent", lambda: ReviewerAgent(model_client=model_client)
)
await CoderAgent.register(
runtime, "CoderAgent", lambda: CoderAgent(model_client=model_client)
)
runtime.start()
await runtime.publish_message(
message=CodeWritingTask(task="Write a function to find the sum of all even numbers in a list."),
topic_id=DefaultTopicId(),
)
# Keep processing messages until idle.
await runtime.stop_when_idle()
# Print the results just for the sake of this example
while not result_queue.empty():
result = await result_queue.get()
if isinstance(result, CodeWritingResult):
print("Writing Result:", result)
elif isinstance(result, CodeReviewResult):
print("Review Result:", result)
elif isinstance(result, CodeReviewTask):
print("Review Task:", result)
elif isinstance(result, CodeWritingTask):
print("Writing Task:", result) |
Beta Was this translation helpful? Give feedback.
-
I'm planning to use the Reflection design pattern, similar to what is shown in this notebook: https://github.com/microsoft/autogen/blob/main/python/packages/autogen-core/docs/src/user-guide/core-user-guide/design-patterns/reflection.ipynb.
I understand that I can use ClosureAgent to obtain the final results, which is not sufficient in my case. Additionally, it's possible to output communications from the agent classes, either by saving them to a file or printing them. However, is there a better practice for capturing the communications during runtime? In:
Beta Was this translation helpful? Give feedback.
All reactions