
Data streams #603

Merged: 80 commits merged from ladvoc/data-streams into main on Apr 7, 2025
Conversation

@ladvoc (Contributor) commented Mar 13, 2025

This PR adds native data streams support in Rust and a high-level interface for FFI clients.

ilo-nanpa bot (Contributor) commented Mar 13, 2025

it seems like you haven't added any nanpa changeset files to this PR.

if this pull request includes changes to code, make sure to add a changeset by writing a file to .nanpa/<unique-name>.kdl:

minor type="added" "Introduce frobnication algorithm"

refer to the manpage for more information.

@bcherry (Contributor) left a comment

lgtm from a protocol level.

@ladvoc ladvoc requested a review from theomonnom March 28, 2025 05:33
@theomonnom (Member) commented Mar 31, 2025

> I missed your previous comment. You mentioned RPC handlers, but I see how the same issue applies to data stream handlers as well. Using data streams as an example, from my current understanding, a race condition can occur when a participant opens a data stream in response to another participant joining, but the newly joined participant hasn't registered a handler for that stream's topic yet.

Yes, it is the same for text streams, data streams, and RPC.

> When a topic is preregistered, any incoming stream matching that topic will be queued until the corresponding handler is registered, at which point each queued stream is dequeued and passed to the handler.

Does this mean that every client will have to re-send all registered handlers after connecting? It would be easier if we didn't have to.

> A key consideration is that when a topic is preregistered, the user must register the corresponding handler within a short window after connecting. Otherwise, incoming streams could accumulate indefinitely without being handled. To prevent this, I propose a timeout interval (e.g., 30 seconds after connect), after which any queued stream readers that don't yet have handlers will be dropped and an error will be logged. In practice, as long as the handler is registered shortly after connect, the number of queued streams should remain minimal.

If you send directly after connecting (assuming the client language already has a registered handler), you could skip this part. We will not be responsible for any buffering.

Another solution would be to not register any callback on the room, since that seems like an anti-pattern with our channel-based events. In this case everything will be buffered, and you get "register before connect" support in all languages for free.
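For reference, here is a minimal, std-only Rust sketch of the queue-until-registered idea quoted above. `StreamReader`, `TopicRegistry`, and `Handler` are illustrative assumptions, not the SDK's actual types, and the timeout is checked lazily when a handler is registered rather than on a background timer as the proposal describes.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Hypothetical stand-in for an incoming stream reader; not the SDK's real type.
struct StreamReader {
    topic: String,
}

type Handler = Box<dyn Fn(StreamReader) + Send>;

/// Per-topic registry: streams that arrive before a handler exists are queued,
/// then flushed once the handler is registered. Queued streams older than
/// `timeout` are dropped with an error message (the proposed 30 s window).
struct TopicRegistry {
    handlers: HashMap<String, Handler>,
    pending: HashMap<String, Vec<(Instant, StreamReader)>>,
    timeout: Duration,
}

impl TopicRegistry {
    fn new(timeout: Duration) -> Self {
        Self {
            handlers: HashMap::new(),
            pending: HashMap::new(),
            timeout,
        }
    }

    /// Called when a stream header arrives from a remote participant.
    fn on_incoming(&mut self, reader: StreamReader) {
        if let Some(handler) = self.handlers.get(&reader.topic) {
            handler(reader);
        } else {
            self.pending
                .entry(reader.topic.clone())
                .or_default()
                .push((Instant::now(), reader));
        }
    }

    /// Registering a handler drains anything already queued for that topic.
    fn register(&mut self, topic: &str, handler: Handler) {
        if let Some(queued) = self.pending.remove(topic) {
            for (received_at, reader) in queued {
                if received_at.elapsed() <= self.timeout {
                    handler(reader);
                } else {
                    eprintln!("dropping queued stream on `{topic}`: no handler registered in time");
                }
            }
        }
        self.handlers.insert(topic.to_string(), handler);
    }
}

fn main() {
    let mut registry = TopicRegistry::new(Duration::from_secs(30));
    // A stream arrives before any handler is registered for "chat": it is queued.
    registry.on_incoming(StreamReader { topic: "chat".into() });
    // Registering the handler later flushes the queue.
    registry.register("chat", Box::new(|r| println!("got stream on `{}`", r.topic)));
}
```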

@lukasIO (Contributor) commented Apr 1, 2025

> Another solution would be to not register any callback on the room, since that seems like an anti-pattern with our channel-based events. In this case everything will be buffered, and you get "register before connect" support in all languages for free.

@theomonnom do I understand your suggestion correctly that we could:

  • buffer all incoming RPC and StreamHeaders (with their subsequent chunks) until connected
  • once connected, allow a short timeout (even 1 s should be enough) for patterns like await room.connect(); registerHandlers();
  • if handlers are registered within that timeout, pipe all buffered packets to them
  • after the timeout has elapsed with no handlers registered, discard all buffered data (see the sketch below)
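A minimal sketch of the buffered-until-connected flow in the list above, under stated assumptions: `Packet`, `ConnectBuffer`, and the caller-driven `tick()` grace-window check are hypothetical names for illustration, not the SDK's API (a real implementation would presumably run the check on a timer).

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

// Hypothetical stand-in for a buffered incoming item (RPC invocation, stream header, or chunk).
struct Packet {
    topic: String,
}

type Handler = Box<dyn Fn(Packet)>;

struct ConnectBuffer {
    buffered: VecDeque<Packet>,
    handler: Option<Handler>,
    connected_at: Option<Instant>,
    grace: Duration,
}

impl ConnectBuffer {
    fn new(grace: Duration) -> Self {
        Self {
            buffered: VecDeque::new(),
            handler: None,
            connected_at: None,
            grace,
        }
    }

    // 1. Everything received before a handler exists is buffered.
    fn on_packet(&mut self, packet: Packet) {
        match &self.handler {
            Some(handler) => handler(packet),
            None => self.buffered.push_back(packet),
        }
    }

    // 2. Connection established: the grace window starts.
    fn on_connected(&mut self) {
        self.connected_at = Some(Instant::now());
    }

    // 3. A handler registered within the window receives everything buffered so far.
    fn register(&mut self, handler: Handler) {
        for packet in self.buffered.drain(..) {
            handler(packet);
        }
        self.handler = Some(handler);
    }

    // 4. Called once the grace window has elapsed with no handler: discard the buffer.
    fn tick(&mut self) {
        if self.handler.is_none() {
            if let Some(connected_at) = self.connected_at {
                if connected_at.elapsed() >= self.grace {
                    self.buffered.clear();
                }
            }
        }
    }
}

fn main() {
    let mut buffer = ConnectBuffer::new(Duration::from_secs(1));
    buffer.on_packet(Packet { topic: "chat".into() }); // arrives before connect
    buffer.on_connected();
    // Rough equivalent of `await room.connect(); registerHandlers();`:
    buffer.register(Box::new(|p| println!("handling buffered packet on `{}`", p.topic)));
    buffer.tick(); // handler arrived in time, so nothing is discarded
}
```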

@theomonnom (Member) commented Apr 3, 2025

I think we don't even need to drop them? If we push the data into the room_events channel, it is buffered but will be consumed as soon as the connection is established (basically a new RoomEvent variant?). No timeout would be needed, IMO.

In the code there is no explicit "buffer until we're connected"; we just push to our event queue and assume the events will be consumed by the user event loop (in this case, livekit-ffi).

The issue with the above pattern is that every language would have to send a registerHandler each time after connecting (we would have to keep track of all registered handlers before connection).

But if we "always" forward what we receive to the other languages, then no registration is needed.

If we do this "always forward" mode, then Python/Node don't even need to send a RegisterRequest; they can just read the data they receive from the FFI server and dispatch their own handlers.
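A rough illustration of the always-forward approach using a plain channel. `RoomEvent::DataStreamOpened` and its fields are hypothetical names, not the SDK's actual enum, and `std::sync::mpsc` stands in for whatever channel the room events actually use; the point is only that the channel itself buffers events until the consumer loop reads them, so no registration or timeout is needed.

```rust
use std::sync::mpsc;

// Hypothetical event variants for illustration; not the SDK's actual RoomEvent enum.
enum RoomEvent {
    Connected,
    DataStreamOpened { topic: String, participant: String },
}

fn main() {
    let (tx, rx) = mpsc::channel::<RoomEvent>();

    // Events can be pushed before the consumer loop is running; the channel
    // does the buffering, so there is no explicit "queue until connected" state.
    tx.send(RoomEvent::DataStreamOpened {
        topic: "chat".into(),
        participant: "remote-participant".into(),
    })
    .unwrap();
    tx.send(RoomEvent::Connected).unwrap();
    drop(tx);

    // The consumer (in this discussion, the FFI layer) reads the event loop and
    // forwards everything to the client language, which dispatches its own
    // handlers; no RegisterRequest round-trip is required.
    for event in rx {
        match event {
            RoomEvent::Connected => println!("connected"),
            RoomEvent::DataStreamOpened { topic, participant } => {
                println!("stream opened on `{topic}` by {participant}");
            }
        }
    }
}
```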

@ladvoc ladvoc force-pushed the ladvoc/data-streams branch from 7c93b9c to 320abfe on April 5, 2025 03:51
@ladvoc ladvoc force-pushed the ladvoc/data-streams branch from 320abfe to 136faa2 on April 5, 2025 03:52
@theomonnom (Member) left a comment

lgtm! very clean impl!

@ladvoc ladvoc merged commit 3f54b6d into main Apr 7, 2025
14 of 19 checks passed
@ladvoc ladvoc deleted the ladvoc/data-streams branch April 7, 2025 22:29
ladvoc added a commit that referenced this pull request Apr 8, 2025
ladvoc added a commit that referenced this pull request Apr 8, 2025