Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add data stream support #347

Merged
merged 38 commits into from
Jan 27, 2025
Merged

Add data stream support #347

merged 38 commits into from
Jan 27, 2025

Conversation

lukasIO
Copy link
Contributor

@lukasIO lukasIO commented Jan 15, 2025

No description provided.

@lukasIO lukasIO requested a review from theomonnom January 15, 2025 14:40
@lukasIO lukasIO marked this pull request as ready for review January 17, 2025 18:17
Copy link
Member

@davidzhao davidzhao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

really great work!

@lukasIO lukasIO requested review from longcw and davidzhao January 23, 2025 12:00
Copy link
Member

@davidzhao davidzhao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice! just a few minor things

TOKEN = os.environ.get("LIVEKIT_TOKEN")
URL = os.environ.get("LIVEKIT_URL")


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it'd be good to include some comments on what this example demonstrates, and what other components the user should prepare for in order to see a demo.

it seems like it's built to work with the JS example?

attributes=dict(header.attributes),
name=header.byte_header.name,
)
self._queue: RingQueue[proto_DataStream.Chunk | None] = RingQueue(capacity)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self._queue: RingQueue[proto_DataStream.Chunk | None] = RingQueue(capacity)
self._queue: RingQueue[Optional[proto_DataStream.Chunk]] = RingQueue(capacity)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is how we also do it in other places in the python SDK, will leave it with | None for consistency

Copy link
Member

@davidzhao davidzhao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🍾

@lukasIO lukasIO merged commit 124af3f into main Jan 27, 2025
12 checks passed
@lukasIO lukasIO deleted the lukas/data-streams branch January 27, 2025 10:11
elif which == "stream_chunk_received":
asyncio.gather(self._handle_stream_chunk(event.stream_chunk_received.chunk))
elif which == "stream_trailer_received":
asyncio.gather(
Copy link
Member

@theomonnom theomonnom Jan 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't use asyncio.gather. The correct way to spawn tasks would be to use asyncio.create_task
We should also be careful about always keeping a reference to a task or it may get GC'ed and may not complete.

Let's also cancel them on disconnect

Comment on lines +607 to +608
mime_type: str = "application/octet-stream",
extensions: Optional[Dict[str, str]] = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do like the send_text method
extensions: dict[str, str]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I specifically changed this to optional because @longcw had concerns about defaulting a list to an empty list in method parameters

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mime_type: str = "application/octet-stream",
extensions: Optional[Dict[str, str]] = None,
stream_id: str | None = None,
destination_identities: Optional[List[str]] = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

destination_identities: List[str] = [],


async def send_file(
self,
file_path: str,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would use kwargs here

@lukasIO lukasIO mentioned this pull request Jan 29, 2025
local_participant: LocalParticipant,
*,
topic: str = "",
extensions: Optional[Dict[str, str]] = {},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In BaseStreamWriter it's renamed to attributes, but here is still extensions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks! fixed in #352



@dataclass
class BaseStreamInfo(TypedDict):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be either a dataclass or a TypedDict, but not both. I prefer to make it a dataclass.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise it raise error when trying to access the fields since info is a dict

Traceback (most recent call last):
  File "/home/longc/data/code/agents/examples/avatar/plugin/worker.py", line 112, in _read_audio
    async for frame in self._audio_receiver.stream():
  File "/home/longc/data/code/agents/examples/avatar/plugin/io.py", line 208, in stream
    sample_rate = int(reader.info.attributes["sample_rate"])
                      ^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'dict' object has no attribute 'attributes'

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks! updated in #352

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants