From 17043328a2204f043a307d7a8b9eacc055121bc7 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Mon, 18 May 2026 16:53:50 -0400 Subject: [PATCH 1/7] feat: add MVP of propagating room downwards from room -> track -> audio stream And extracting metadata from that room that can be fed into the frame processor. --- livekit-rtc/livekit/rtc/audio_stream.py | 65 ++++++++++++++++++++++++- livekit-rtc/livekit/rtc/room.py | 8 +++ livekit-rtc/livekit/rtc/track.py | 20 +++++++- 3 files changed, 91 insertions(+), 2 deletions(-) diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index 0f949d40..9053ebd3 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -17,7 +17,7 @@ import asyncio import json from dataclasses import dataclass -from typing import Any, AsyncIterator, Optional +from typing import TYPE_CHECKING, Any, AsyncIterator, Optional from ._ffi_client import FfiClient, FfiHandle from ._proto import audio_frame_pb2 as proto_audio_frame @@ -30,6 +30,9 @@ from .track import Track from .frame_processor import FrameProcessor +if TYPE_CHECKING: + from .room import Room + @dataclass class AudioFrameEvent: @@ -65,6 +68,7 @@ def __init__( num_channels: int = 1, frame_size_ms: int | None = None, noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None, + room: Optional["Room"] = None, **kwargs, ) -> None: """Initialize an `AudioStream` instance. @@ -81,6 +85,9 @@ def __init__( noise_cancellation (Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]], optional): If noise cancellation is used, pass a `NoiseCancellationOptions` or `FrameProcessor[AudioFrame]` instance created by the noise cancellation module. + room (Optional[Room], optional): The room this stream's track belongs to, used to + resolve `room_name`, `participant_identity`, and `publication_sid`. May be `None` + if the track is not (yet) associated with a room. Example: ```python @@ -98,6 +105,8 @@ def __init__( ``` """ self._track: Track | None = track + self._room: Room | None = room + print("ROOM:", room) self._sample_rate = sample_rate self._num_channels = num_channels self._frame_size_ms = frame_size_ms @@ -132,6 +141,9 @@ def __init__( self._ffi_handle = FfiHandle(stream.handle.id) self._info = stream.info + if self._track is not None: + self._track._register_audio_stream(self) + @classmethod def from_participant( cls, @@ -144,6 +156,7 @@ def from_participant( num_channels: int = 1, frame_size_ms: int | None = None, noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None, + room: Optional["Room"] = None, ) -> AudioStream: """Create an `AudioStream` from a participant's audio track. @@ -181,6 +194,7 @@ def from_participant( num_channels=num_channels, noise_cancellation=noise_cancellation, frame_size_ms=frame_size_ms, + room=room, ) @classmethod @@ -194,6 +208,7 @@ def from_track( num_channels: int = 1, frame_size_ms: int | None = None, noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None, + room: Optional["Room"] = None, ) -> AudioStream: """Create an `AudioStream` from an existing audio track. @@ -227,8 +242,54 @@ def from_track( num_channels=num_channels, noise_cancellation=noise_cancellation, frame_size_ms=frame_size_ms, + room=room, ) + def _set_room(self, room: Optional["Room"]) -> None: + self._room = room + print("ROOM UPDATE:", room) + + @property + def room(self) -> Optional["Room"]: + return self._room + + @property + def room_name(self) -> Optional[str]: + return self._room.name if self._room is not None else None + + @property + def participant_identity(self) -> Optional[str]: + pub = self._find_publication() + if pub is None: + return None + identity, _ = pub + return identity + + @property + def publication_sid(self) -> Optional[str]: + pub = self._find_publication() + if pub is None: + return None + _, sid = pub + return sid + + def _find_publication(self) -> Optional[tuple[str, str]]: + if self._room is None or self._track is None: + return None + track_sid = self._track.sid + if not track_sid: + return None + for participant in self._room.remote_participants.values(): + publication = participant.track_publications.get(track_sid) + if publication is not None: + return participant.identity, publication.sid + local = self._room._local_participant + if local is not None: + for publication in local.track_publications.values(): + if publication.sid == track_sid: + return local.identity, publication.sid + return None + def __del__(self) -> None: FfiClient.instance.queue.unsubscribe(self._ffi_queue) @@ -303,6 +364,8 @@ async def aclose(self) -> None: This method cleans up resources associated with the audio stream and waits for any pending operations to complete. """ + if self._track is not None: + self._track._unregister_audio_stream(self) self._ffi_handle.dispose() await self._task diff --git a/livekit-rtc/livekit/rtc/room.py b/livekit-rtc/livekit/rtc/room.py index 5bf9e494..41392a4b 100644 --- a/livekit-rtc/livekit/rtc/room.py +++ b/livekit-rtc/livekit/rtc/room.py @@ -726,10 +726,14 @@ def _on_room_event(self, event: proto_room.RoomEvent): sid = event.local_track_published.track_sid lpublication = self.local_participant.track_publications[sid] ltrack = lpublication.track + if ltrack is not None: + ltrack._set_room(self) self.emit("local_track_published", lpublication, ltrack) elif which == "local_track_unpublished": sid = event.local_track_unpublished.publication_sid lpublication = self.local_participant.track_publications[sid] + if lpublication.track is not None: + lpublication.track._set_room(None) self.emit("local_track_unpublished", lpublication) elif which == "local_track_republished": # The SDK auto-republished a local track during a full @@ -770,10 +774,12 @@ def _on_room_event(self, event: proto_room.RoomEvent): rpublication._subscribed = True if track_info.kind == TrackKind.KIND_VIDEO: remote_video_track = RemoteVideoTrack(owned_track_info) + remote_video_track._set_room(self) rpublication._track = remote_video_track self.emit("track_subscribed", remote_video_track, rpublication, rparticipant) elif track_info.kind == TrackKind.KIND_AUDIO: remote_audio_track = RemoteAudioTrack(owned_track_info) + remote_audio_track._set_room(self) rpublication._track = remote_audio_track self.emit("track_subscribed", remote_audio_track, rpublication, rparticipant) elif which == "track_unsubscribed": @@ -781,6 +787,8 @@ def _on_room_event(self, event: proto_room.RoomEvent): rparticipant = self._remote_participants[identity] rpublication = rparticipant.track_publications[event.track_unsubscribed.track_sid] rtrack = rpublication.track + if rtrack is not None: + rtrack._set_room(None) rpublication._track = None rpublication._subscribed = False self.emit("track_unsubscribed", rtrack, rpublication, rparticipant) diff --git a/livekit-rtc/livekit/rtc/track.py b/livekit-rtc/livekit/rtc/track.py index 8a6fe692..9d1b8cbb 100644 --- a/livekit-rtc/livekit/rtc/track.py +++ b/livekit-rtc/livekit/rtc/track.py @@ -12,7 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING, List, Union +import weakref +from typing import TYPE_CHECKING, List, Optional, Union from ._ffi_client import FfiHandle, FfiClient from ._proto import ffi_pb2 as proto_ffi from ._proto import track_pb2 as proto_track @@ -20,6 +21,8 @@ if TYPE_CHECKING: from .audio_source import AudioSource + from .audio_stream import AudioStream + from .room import Room from .video_source import VideoSource @@ -27,6 +30,21 @@ class Track: def __init__(self, owned_info: proto_track.OwnedTrack): self._info = owned_info.info self._ffi_handle = FfiHandle(owned_info.handle.id) + self._room: Optional["Room"] = None + self._audio_streams: "weakref.WeakSet[AudioStream]" = weakref.WeakSet() + + def _set_room(self, room: Optional["Room"]) -> None: + self._room = room + for stream in self._audio_streams: + stream._set_room(room) + + def _register_audio_stream(self, stream: "AudioStream") -> None: + self._audio_streams.add(stream) + if self._room is not None: + stream._set_room(self._room) + + def _unregister_audio_stream(self, stream: "AudioStream") -> None: + self._audio_streams.discard(stream) @property def sid(self) -> str: From 0e3737001c04a9361360a92b814fc33145a36fc2 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Tue, 19 May 2026 16:41:27 -0400 Subject: [PATCH 2/7] feat: call _on_stream_info_updated with parent room reference on audio_stream --- livekit-rtc/livekit/rtc/audio_stream.py | 46 ++++++++++--------------- livekit-rtc/livekit/rtc/track.py | 12 ++++--- 2 files changed, 27 insertions(+), 31 deletions(-) diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index 9053ebd3..3bd3887f 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -16,6 +16,7 @@ import asyncio import json +import weakref from dataclasses import dataclass from typing import TYPE_CHECKING, Any, AsyncIterator, Optional @@ -105,7 +106,9 @@ def __init__( ``` """ self._track: Track | None = track - self._room: Room | None = room + self._room_ref: "Optional[weakref.ref[Room]]" = ( + weakref.ref(room) if room is not None else None + ) print("ROOM:", room) self._sample_rate = sample_rate self._num_channels = num_channels @@ -246,44 +249,33 @@ def from_track( ) def _set_room(self, room: Optional["Room"]) -> None: - self._room = room + self._room_ref = weakref.ref(room) if room is not None else None print("ROOM UPDATE:", room) - @property - def room(self) -> Optional["Room"]: - return self._room - - @property - def room_name(self) -> Optional[str]: - return self._room.name if self._room is not None else None - - @property - def participant_identity(self) -> Optional[str]: - pub = self._find_publication() - if pub is None: - return None - identity, _ = pub - return identity + if self._processor: + room = self._resolve_room() + participant_identity, publication_sid = self._find_publication() or ("", "") + self._processor._on_stream_info_updated( + room_name=room.name if room is not None else "", # FIXME: default value? + participant_identity=participant_identity, + publication_sid=publication_sid, + ) - @property - def publication_sid(self) -> Optional[str]: - pub = self._find_publication() - if pub is None: - return None - _, sid = pub - return sid + def _resolve_room(self) -> Optional["Room"]: + return self._room_ref() if self._room_ref is not None else None def _find_publication(self) -> Optional[tuple[str, str]]: - if self._room is None or self._track is None: + room = self._resolve_room() + if room is None or self._track is None: return None track_sid = self._track.sid if not track_sid: return None - for participant in self._room.remote_participants.values(): + for participant in room.remote_participants.values(): publication = participant.track_publications.get(track_sid) if publication is not None: return participant.identity, publication.sid - local = self._room._local_participant + local = room._local_participant if local is not None: for publication in local.track_publications.values(): if publication.sid == track_sid: diff --git a/livekit-rtc/livekit/rtc/track.py b/livekit-rtc/livekit/rtc/track.py index 9d1b8cbb..ea74be81 100644 --- a/livekit-rtc/livekit/rtc/track.py +++ b/livekit-rtc/livekit/rtc/track.py @@ -30,18 +30,22 @@ class Track: def __init__(self, owned_info: proto_track.OwnedTrack): self._info = owned_info.info self._ffi_handle = FfiHandle(owned_info.handle.id) - self._room: Optional["Room"] = None + self._room_ref: "Optional[weakref.ref[Room]]" = None self._audio_streams: "weakref.WeakSet[AudioStream]" = weakref.WeakSet() + def _resolve_room(self) -> Optional["Room"]: + return self._room_ref() if self._room_ref is not None else None + def _set_room(self, room: Optional["Room"]) -> None: - self._room = room + self._room_ref = weakref.ref(room) if room is not None else None for stream in self._audio_streams: stream._set_room(room) def _register_audio_stream(self, stream: "AudioStream") -> None: self._audio_streams.add(stream) - if self._room is not None: - stream._set_room(self._room) + room = self._resolve_room() + if room is not None: + stream._set_room(room) def _unregister_audio_stream(self, stream: "AudioStream") -> None: self._audio_streams.discard(stream) From fe09e781d0d1e503a26784a2309bb7fefa44f906 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 20 May 2026 12:58:29 -0400 Subject: [PATCH 3/7] feat: call _on_credentials_updated with token / server url extracted from room --- livekit-rtc/livekit/rtc/audio_stream.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index 3bd3887f..660079b8 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -254,6 +254,9 @@ def _set_room(self, room: Optional["Room"]) -> None: if self._processor: room = self._resolve_room() + if room and room._token is not None and room._server_url is not None: + self._processor._on_credentials_updated(token=room._token, url=room._server_url) + participant_identity, publication_sid = self._find_publication() or ("", "") self._processor._on_stream_info_updated( room_name=room.name if room is not None else "", # FIXME: default value? From 9cb2b49653bf1424ef23b2894667760d8c584621 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 20 May 2026 13:42:18 -0400 Subject: [PATCH 4/7] fix: remove debugging logs --- livekit-rtc/livekit/rtc/audio_stream.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index 660079b8..4de9d73e 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -109,7 +109,6 @@ def __init__( self._room_ref: "Optional[weakref.ref[Room]]" = ( weakref.ref(room) if room is not None else None ) - print("ROOM:", room) self._sample_rate = sample_rate self._num_channels = num_channels self._frame_size_ms = frame_size_ms @@ -250,7 +249,6 @@ def from_track( def _set_room(self, room: Optional["Room"]) -> None: self._room_ref = weakref.ref(room) if room is not None else None - print("ROOM UPDATE:", room) if self._processor: room = self._resolve_room() From 7cf5a400df96e559674548dcb91b48eb179c239a Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 20 May 2026 13:45:05 -0400 Subject: [PATCH 5/7] fix: address lint errors --- livekit-rtc/livekit/rtc/audio_stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index 4de9d73e..2ea64377 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -257,7 +257,7 @@ def _set_room(self, room: Optional["Room"]) -> None: participant_identity, publication_sid = self._find_publication() or ("", "") self._processor._on_stream_info_updated( - room_name=room.name if room is not None else "", # FIXME: default value? + room_name=room.name if room is not None else "", # FIXME: default value? participant_identity=participant_identity, publication_sid=publication_sid, ) From 51b5ba1503b2a987ef981e5f3c34bbfa0e47f74a Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 20 May 2026 13:46:49 -0400 Subject: [PATCH 6/7] feat: only call frame processor handlers if room is set --- livekit-rtc/livekit/rtc/audio_stream.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index 2ea64377..988afdf8 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -252,15 +252,16 @@ def _set_room(self, room: Optional["Room"]) -> None: if self._processor: room = self._resolve_room() - if room and room._token is not None and room._server_url is not None: - self._processor._on_credentials_updated(token=room._token, url=room._server_url) - - participant_identity, publication_sid = self._find_publication() or ("", "") - self._processor._on_stream_info_updated( - room_name=room.name if room is not None else "", # FIXME: default value? - participant_identity=participant_identity, - publication_sid=publication_sid, - ) + if room: + if room._token is not None and room._server_url is not None: + self._processor._on_credentials_updated(token=room._token, url=room._server_url) + + participant_identity, publication_sid = self._find_publication() or ("", "") + self._processor._on_stream_info_updated( + room_name=room.name, + participant_identity=participant_identity, + publication_sid=publication_sid, + ) def _resolve_room(self) -> Optional["Room"]: return self._room_ref() if self._room_ref is not None else None From 3e5a9ab1631a7100ed6872305d5c7582a7900de8 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 20 May 2026 16:58:34 -0400 Subject: [PATCH 7/7] fix: properly intercept room refresh token events --- livekit-rtc/livekit/rtc/audio_stream.py | 42 ++++++++++++++++--------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index 988afdf8..4e8e98f0 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -106,9 +106,7 @@ def __init__( ``` """ self._track: Track | None = track - self._room_ref: "Optional[weakref.ref[Room]]" = ( - weakref.ref(room) if room is not None else None - ) + self._room_ref: "Optional[weakref.ref[Room]]" = None self._sample_rate = sample_rate self._num_channels = num_channels self._frame_size_ms = frame_size_ms @@ -143,6 +141,8 @@ def __init__( self._ffi_handle = FfiHandle(stream.handle.id) self._info = stream.info + self._set_room(room) + if self._track is not None: self._track._register_audio_stream(self) @@ -248,20 +248,33 @@ def from_track( ) def _set_room(self, room: Optional["Room"]) -> None: + old_room = self._resolve_room() + if old_room is not room: + if old_room is not None: + old_room.off("token_refreshed", self._on_room_token_refreshed) + if room is not None: + room.on("token_refreshed", self._on_room_token_refreshed) + self._room_ref = weakref.ref(room) if room is not None else None - if self._processor: - room = self._resolve_room() - if room: - if room._token is not None and room._server_url is not None: - self._processor._on_credentials_updated(token=room._token, url=room._server_url) + if self._processor and room is not None: + if room._token is not None and room._server_url is not None: + self._processor._on_credentials_updated(token=room._token, url=room._server_url) - participant_identity, publication_sid = self._find_publication() or ("", "") - self._processor._on_stream_info_updated( - room_name=room.name, - participant_identity=participant_identity, - publication_sid=publication_sid, - ) + participant_identity, publication_sid = self._find_publication() or ("", "") + self._processor._on_stream_info_updated( + room_name=room.name, + participant_identity=participant_identity, + publication_sid=publication_sid, + ) + + def _on_room_token_refreshed(self) -> None: + if self._processor is None: + return + room = self._resolve_room() + if room is None or room._token is None or room._server_url is None: + return + self._processor._on_credentials_updated(token=room._token, url=room._server_url) def _resolve_room(self) -> Optional["Room"]: return self._room_ref() if self._room_ref is not None else None @@ -360,6 +373,7 @@ async def aclose(self) -> None: """ if self._track is not None: self._track._unregister_audio_stream(self) + self._set_room(None) self._ffi_handle.dispose() await self._task