diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index 0f949d40..4e8e98f0 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -16,8 +16,9 @@ import asyncio import json +import weakref from dataclasses import dataclass -from typing import Any, AsyncIterator, Optional +from typing import TYPE_CHECKING, Any, AsyncIterator, Optional from ._ffi_client import FfiClient, FfiHandle from ._proto import audio_frame_pb2 as proto_audio_frame @@ -30,6 +31,9 @@ from .track import Track from .frame_processor import FrameProcessor +if TYPE_CHECKING: + from .room import Room + @dataclass class AudioFrameEvent: @@ -65,6 +69,7 @@ def __init__( num_channels: int = 1, frame_size_ms: int | None = None, noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None, + room: Optional["Room"] = None, **kwargs, ) -> None: """Initialize an `AudioStream` instance. @@ -81,6 +86,9 @@ def __init__( noise_cancellation (Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]], optional): If noise cancellation is used, pass a `NoiseCancellationOptions` or `FrameProcessor[AudioFrame]` instance created by the noise cancellation module. + room (Optional[Room], optional): The room this stream's track belongs to, used to + resolve `room_name`, `participant_identity`, and `publication_sid`. May be `None` + if the track is not (yet) associated with a room. Example: ```python @@ -98,6 +106,7 @@ def __init__( ``` """ self._track: Track | None = track + self._room_ref: "Optional[weakref.ref[Room]]" = None self._sample_rate = sample_rate self._num_channels = num_channels self._frame_size_ms = frame_size_ms @@ -132,6 +141,11 @@ def __init__( self._ffi_handle = FfiHandle(stream.handle.id) self._info = stream.info + self._set_room(room) + + if self._track is not None: + self._track._register_audio_stream(self) + @classmethod def from_participant( cls, @@ -144,6 +158,7 @@ def from_participant( num_channels: int = 1, frame_size_ms: int | None = None, noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None, + room: Optional["Room"] = None, ) -> AudioStream: """Create an `AudioStream` from a participant's audio track. @@ -181,6 +196,7 @@ def from_participant( num_channels=num_channels, noise_cancellation=noise_cancellation, frame_size_ms=frame_size_ms, + room=room, ) @classmethod @@ -194,6 +210,7 @@ def from_track( num_channels: int = 1, frame_size_ms: int | None = None, noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None, + room: Optional["Room"] = None, ) -> AudioStream: """Create an `AudioStream` from an existing audio track. @@ -227,8 +244,59 @@ def from_track( num_channels=num_channels, noise_cancellation=noise_cancellation, frame_size_ms=frame_size_ms, + room=room, ) + def _set_room(self, room: Optional["Room"]) -> None: + old_room = self._resolve_room() + if old_room is not room: + if old_room is not None: + old_room.off("token_refreshed", self._on_room_token_refreshed) + if room is not None: + room.on("token_refreshed", self._on_room_token_refreshed) + + self._room_ref = weakref.ref(room) if room is not None else None + + if self._processor and room is not None: + if room._token is not None and room._server_url is not None: + self._processor._on_credentials_updated(token=room._token, url=room._server_url) + + participant_identity, publication_sid = self._find_publication() or ("", "") + self._processor._on_stream_info_updated( + room_name=room.name, + participant_identity=participant_identity, + publication_sid=publication_sid, + ) + + def _on_room_token_refreshed(self) -> None: + if self._processor is None: + return + room = self._resolve_room() + if room is None or room._token is None or room._server_url is None: + return + self._processor._on_credentials_updated(token=room._token, url=room._server_url) + + def _resolve_room(self) -> Optional["Room"]: + return self._room_ref() if self._room_ref is not None else None + + def _find_publication(self) -> Optional[tuple[str, str]]: + room = self._resolve_room() + if room is None or self._track is None: + return None + track_sid = self._track.sid + if not track_sid: + return None + for participant in room.remote_participants.values(): + publication = participant.track_publications.get(track_sid) + if publication is not None: + return participant.identity, publication.sid + local = room._local_participant + if local is not None: + for publication in local.track_publications.values(): + if publication.sid == track_sid: + return local.identity, publication.sid + return None + def __del__(self) -> None: FfiClient.instance.queue.unsubscribe(self._ffi_queue) @@ -303,6 +371,9 @@ async def aclose(self) -> None: This method cleans up resources associated with the audio stream and waits for any pending operations to complete. """ + if self._track is not None: + self._track._unregister_audio_stream(self) + self._set_room(None) self._ffi_handle.dispose() await self._task diff --git a/livekit-rtc/livekit/rtc/room.py b/livekit-rtc/livekit/rtc/room.py index 5bf9e494..41392a4b 100644 --- a/livekit-rtc/livekit/rtc/room.py +++ b/livekit-rtc/livekit/rtc/room.py @@ -726,10 +726,14 @@ def _on_room_event(self, event: proto_room.RoomEvent): sid = event.local_track_published.track_sid lpublication = self.local_participant.track_publications[sid] ltrack = lpublication.track + if ltrack is not None: + ltrack._set_room(self) self.emit("local_track_published", lpublication, ltrack) elif which == "local_track_unpublished": sid = event.local_track_unpublished.publication_sid lpublication = self.local_participant.track_publications[sid] + if lpublication.track is not None: + lpublication.track._set_room(None) self.emit("local_track_unpublished", lpublication) elif which == "local_track_republished": # The SDK auto-republished a local track during a full @@ -770,10 +774,12 @@ def _on_room_event(self, event: proto_room.RoomEvent): rpublication._subscribed = True if track_info.kind == TrackKind.KIND_VIDEO: remote_video_track = RemoteVideoTrack(owned_track_info) + remote_video_track._set_room(self) rpublication._track = remote_video_track self.emit("track_subscribed", remote_video_track, rpublication, rparticipant) elif track_info.kind == TrackKind.KIND_AUDIO: remote_audio_track = RemoteAudioTrack(owned_track_info) + remote_audio_track._set_room(self) rpublication._track = remote_audio_track self.emit("track_subscribed", remote_audio_track, rpublication, rparticipant) elif which == "track_unsubscribed": @@ -781,6 +787,8 @@ def _on_room_event(self, event: proto_room.RoomEvent): rparticipant = self._remote_participants[identity] rpublication = rparticipant.track_publications[event.track_unsubscribed.track_sid] rtrack = rpublication.track + if rtrack is not None: + rtrack._set_room(None) rpublication._track = None rpublication._subscribed = False self.emit("track_unsubscribed", rtrack, rpublication, rparticipant) diff --git a/livekit-rtc/livekit/rtc/track.py b/livekit-rtc/livekit/rtc/track.py index 8a6fe692..ea74be81 100644 --- a/livekit-rtc/livekit/rtc/track.py +++ b/livekit-rtc/livekit/rtc/track.py @@ -12,7 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING, List, Union +import weakref +from typing import TYPE_CHECKING, List, Optional, Union from ._ffi_client import FfiHandle, FfiClient from ._proto import ffi_pb2 as proto_ffi from ._proto import track_pb2 as proto_track @@ -20,6 +21,8 @@ if TYPE_CHECKING: from .audio_source import AudioSource + from .audio_stream import AudioStream + from .room import Room from .video_source import VideoSource @@ -27,6 +30,25 @@ class Track: def __init__(self, owned_info: proto_track.OwnedTrack): self._info = owned_info.info self._ffi_handle = FfiHandle(owned_info.handle.id) + self._room_ref: "Optional[weakref.ref[Room]]" = None + self._audio_streams: "weakref.WeakSet[AudioStream]" = weakref.WeakSet() + + def _resolve_room(self) -> Optional["Room"]: + return self._room_ref() if self._room_ref is not None else None + + def _set_room(self, room: Optional["Room"]) -> None: + self._room_ref = weakref.ref(room) if room is not None else None + for stream in self._audio_streams: + stream._set_room(room) + + def _register_audio_stream(self, stream: "AudioStream") -> None: + self._audio_streams.add(stream) + room = self._resolve_room() + if room is not None: + stream._set_room(room) + + def _unregister_audio_stream(self, stream: "AudioStream") -> None: + self._audio_streams.discard(stream) @property def sid(self) -> str: