Skip to content

Commit a475312

Browse files
Introduce AudioStream.from_participant + VideoStream.from_participant (#229)
1 parent 86a0229 commit a475312

File tree

3 files changed

+185
-37
lines changed

3 files changed

+185
-37
lines changed

livekit-rtc/livekit/rtc/audio_stream.py

+93-18
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,20 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from __future__ import annotations
16+
1517
import asyncio
16-
from typing import Optional, AsyncIterator
18+
from dataclasses import dataclass
19+
from typing import Any, AsyncIterator, Optional
1720

18-
from ._ffi_client import FfiHandle, FfiClient
21+
from ._ffi_client import FfiClient, FfiHandle
1922
from ._proto import audio_frame_pb2 as proto_audio_frame
2023
from ._proto import ffi_pb2 as proto_ffi
24+
from ._proto.track_pb2 import TrackSource
2125
from ._utils import RingQueue, task_done_logger
2226
from .audio_frame import AudioFrame
27+
from .participant import Participant
2328
from .track import Track
24-
from dataclasses import dataclass
2529

2630

2731
@dataclass
@@ -39,30 +43,95 @@ def __init__(
3943
capacity: int = 0,
4044
sample_rate: int = 48000,
4145
num_channels: int = 1,
46+
**kwargs,
4247
) -> None:
43-
self._track = track
48+
self._track: Track | None = track
49+
self._sample_rate = sample_rate
50+
self._num_channels = num_channels
4451
self._loop = loop or asyncio.get_event_loop()
4552
self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop)
46-
self._queue: RingQueue[AudioFrameEvent] = RingQueue(capacity)
47-
48-
req = proto_ffi.FfiRequest()
49-
new_audio_stream = req.new_audio_stream
50-
new_audio_stream.track_handle = track._ffi_handle.handle
51-
new_audio_stream.type = proto_audio_frame.AudioStreamType.AUDIO_STREAM_NATIVE
52-
new_audio_stream.sample_rate = sample_rate
53-
new_audio_stream.num_channels = num_channels
54-
resp = FfiClient.instance.request(req)
55-
56-
stream_info = resp.new_audio_stream.stream
57-
self._ffi_handle = FfiHandle(stream_info.handle.id)
58-
self._info = stream_info
53+
self._queue: RingQueue[AudioFrameEvent | None] = RingQueue(capacity)
5954

6055
self._task = self._loop.create_task(self._run())
6156
self._task.add_done_callback(task_done_logger)
6257

58+
stream: Any = None
59+
if "participant" in kwargs:
60+
stream = self._create_owned_stream_from_participant(
61+
participant=kwargs["participant"], track_source=kwargs["track_source"]
62+
)
63+
else:
64+
stream = self._create_owned_stream()
65+
self._ffi_handle = FfiHandle(stream.handle.id)
66+
self._info = stream.info
67+
68+
@classmethod
69+
def from_participant(
70+
cls,
71+
*,
72+
participant: Participant,
73+
track_source: TrackSource.ValueType,
74+
loop: Optional[asyncio.AbstractEventLoop] = None,
75+
capacity: int = 0,
76+
sample_rate: int = 48000,
77+
num_channels: int = 1,
78+
) -> AudioStream:
79+
return AudioStream(
80+
participant=participant,
81+
track_source=track_source,
82+
loop=loop,
83+
capacity=capacity,
84+
track=None, # type: ignore
85+
sample_rate=sample_rate,
86+
num_channels=num_channels,
87+
)
88+
89+
@classmethod
90+
def from_track(
91+
cls,
92+
*,
93+
track: Track,
94+
loop: Optional[asyncio.AbstractEventLoop] = None,
95+
capacity: int = 0,
96+
sample_rate: int = 48000,
97+
num_channels: int = 1,
98+
) -> AudioStream:
99+
return AudioStream(
100+
track=track,
101+
loop=loop,
102+
capacity=capacity,
103+
sample_rate=sample_rate,
104+
num_channels=num_channels,
105+
)
106+
63107
def __del__(self) -> None:
64108
FfiClient.instance.queue.unsubscribe(self._ffi_queue)
65109

110+
def _create_owned_stream(self) -> Any:
111+
req = proto_ffi.FfiRequest()
112+
new_audio_stream = req.new_audio_stream
113+
new_audio_stream.track_handle = self._track._ffi_handle.handle
114+
new_audio_stream.sample_rate = self._sample_rate
115+
new_audio_stream.num_channels = self._num_channels
116+
new_audio_stream.type = proto_audio_frame.AudioStreamType.AUDIO_STREAM_NATIVE
117+
resp = FfiClient.instance.request(req)
118+
return resp.new_audio_stream.stream
119+
120+
def _create_owned_stream_from_participant(
121+
self, participant: Participant, track_source: TrackSource.ValueType
122+
) -> Any:
123+
req = proto_ffi.FfiRequest()
124+
audio_stream_from_participant = req.audio_stream_from_participant
125+
audio_stream_from_participant.participant_handle = (
126+
participant._ffi_handle.handle
127+
)
128+
audio_stream_from_participant.type = (
129+
proto_audio_frame.AudioStreamType.AUDIO_STREAM_NATIVE
130+
)
131+
audio_stream_from_participant.track_source = track_source
132+
resp = FfiClient.instance.request(req)
133+
return resp.audio_stream_from_participant.stream
134+
66135
async def _run(self):
67136
while True:
68137
event = await self._ffi_queue.wait_for(self._is_event)
@@ -74,6 +143,7 @@ async def _run(self):
74143
event = AudioFrameEvent(frame)
75144
self._queue.put(event)
76145
elif audio_event.HasField("eos"):
146+
self._queue.put(None)
77147
break
78148

79149
FfiClient.instance.queue.unsubscribe(self._ffi_queue)
@@ -91,4 +161,9 @@ def __aiter__(self) -> AsyncIterator[AudioFrameEvent]:
91161
async def __anext__(self) -> AudioFrameEvent:
92162
if self._task.done():
93163
raise StopAsyncIteration
94-
return await self._queue.get()
164+
165+
item = await self._queue.get()
166+
if item is None:
167+
raise StopAsyncIteration
168+
169+
return item

livekit-rtc/livekit/rtc/room.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
from .e2ee import E2EEManager, E2EEOptions
3131
from .participant import LocalParticipant, Participant, RemoteParticipant
3232
from .track import RemoteAudioTrack, RemoteVideoTrack
33-
from .track_publication import TrackPublication, RemoteTrackPublication
33+
from .track_publication import RemoteTrackPublication, TrackPublication
3434
from .transcription import TranscriptionSegment
3535

3636
EventTypes = Literal[

livekit-rtc/livekit/rtc/video_stream.py

+91-18
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,18 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from __future__ import annotations
16+
1517
import asyncio
1618
from dataclasses import dataclass
17-
from typing import Optional, AsyncIterator
19+
from typing import Any, AsyncIterator, Optional
1820

19-
from ._ffi_client import FfiHandle, FfiClient
21+
from ._ffi_client import FfiClient, FfiHandle
2022
from ._proto import ffi_pb2 as proto_ffi
2123
from ._proto import video_frame_pb2 as proto_video_frame
24+
from ._proto.track_pb2 import TrackSource
2225
from ._utils import RingQueue, task_done_logger
26+
from .participant import Participant
2327
from .track import Track
2428
from .video_frame import VideoFrame
2529

@@ -37,34 +41,98 @@ class VideoStream:
3741
def __init__(
3842
self,
3943
track: Track,
40-
*,
4144
loop: Optional[asyncio.AbstractEventLoop] = None,
4245
capacity: int = 0,
4346
format: Optional[proto_video_frame.VideoBufferType.ValueType] = None,
47+
**kwargs,
4448
) -> None:
45-
self._track = track
4649
self._loop = loop or asyncio.get_event_loop()
4750
self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop)
48-
self._queue: RingQueue[VideoFrameEvent] = RingQueue(capacity)
51+
self._queue: RingQueue[VideoFrameEvent | None] = RingQueue(capacity)
52+
self._track: Track | None = track
53+
self._format = format
54+
self._capacity = capacity
55+
self._format = format
56+
stream: Any = None
57+
if "participant" in kwargs:
58+
stream = self._create_owned_stream_from_participant(
59+
participant=kwargs["participant"], track_source=kwargs["track_source"]
60+
)
61+
else:
62+
stream = self._create_owned_stream()
63+
64+
self._ffi_handle = FfiHandle(stream.handle.id)
65+
self._info = stream.info
66+
67+
self._task = self._loop.create_task(self._run())
68+
self._task.add_done_callback(task_done_logger)
69+
70+
@classmethod
71+
def from_participant(
72+
cls,
73+
*,
74+
participant: Participant,
75+
track_source: TrackSource.ValueType,
76+
loop: Optional[asyncio.AbstractEventLoop] = None,
77+
format: Optional[proto_video_frame.VideoBufferType.ValueType] = None,
78+
capacity: int = 0,
79+
) -> VideoStream:
80+
return VideoStream(
81+
participant=participant,
82+
track_source=track_source,
83+
loop=loop,
84+
capacity=capacity,
85+
format=format,
86+
track=None, # type: ignore
87+
)
88+
89+
@classmethod
90+
def from_track(
91+
cls,
92+
*,
93+
track: Track,
94+
loop: Optional[asyncio.AbstractEventLoop] = None,
95+
format: Optional[proto_video_frame.VideoBufferType.ValueType] = None,
96+
capacity: int = 0,
97+
) -> VideoStream:
98+
return VideoStream(
99+
track=track,
100+
loop=loop,
101+
capacity=capacity,
102+
format=format,
103+
)
49104

105+
def __del__(self) -> None:
106+
FfiClient.instance.queue.unsubscribe(self._ffi_queue)
107+
108+
def _create_owned_stream(self) -> Any:
50109
req = proto_ffi.FfiRequest()
51110
new_video_stream = req.new_video_stream
52-
new_video_stream.track_handle = track._ffi_handle.handle
111+
new_video_stream.track_handle = self._track._ffi_handle.handle
53112
new_video_stream.type = proto_video_frame.VideoStreamType.VIDEO_STREAM_NATIVE
54-
if format is not None:
55-
new_video_stream.format = format
113+
if self._format is not None:
114+
new_video_stream.format = self._format
56115
new_video_stream.normalize_stride = True
57-
58116
resp = FfiClient.instance.request(req)
117+
return resp.new_video_stream.stream
59118

60-
stream_info = resp.new_video_stream.stream
61-
self._ffi_handle = FfiHandle(stream_info.handle.id)
62-
self._info = stream_info.info
63-
self._task = self._loop.create_task(self._run())
64-
self._task.add_done_callback(task_done_logger)
65-
66-
def __del__(self) -> None:
67-
FfiClient.instance.queue.unsubscribe(self._ffi_queue)
119+
def _create_owned_stream_from_participant(
120+
self, participant: Participant, track_source: TrackSource.ValueType
121+
) -> Any:
122+
req = proto_ffi.FfiRequest()
123+
video_stream_from_participant = req.video_stream_from_participant
124+
video_stream_from_participant.participant_handle = (
125+
participant._ffi_handle.handle
126+
)
127+
video_stream_from_participant.type = (
128+
proto_video_frame.VideoStreamType.VIDEO_STREAM_NATIVE
129+
)
130+
video_stream_from_participant.track_source = track_source
131+
video_stream_from_participant.normalize_stride = True
132+
if self._format is not None:
133+
video_stream_from_participant.format = self._format
134+
resp = FfiClient.instance.request(req)
135+
return resp.video_stream_from_participant.stream
68136

69137
async def _run(self) -> None:
70138
while True:
@@ -100,4 +168,9 @@ def __aiter__(self) -> AsyncIterator[VideoFrameEvent]:
100168
async def __anext__(self) -> VideoFrameEvent:
101169
if self._task.done():
102170
raise StopAsyncIteration
103-
return await self._queue.get()
171+
172+
item = await self._queue.get()
173+
if item is None:
174+
raise StopAsyncIteration
175+
176+
return item

0 commit comments

Comments
 (0)