Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 72 additions & 1 deletion livekit-rtc/livekit/rtc/audio_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@

import asyncio
import json
import weakref
from dataclasses import dataclass
from typing import Any, AsyncIterator, Optional
from typing import TYPE_CHECKING, Any, AsyncIterator, Optional

from ._ffi_client import FfiClient, FfiHandle
from ._proto import audio_frame_pb2 as proto_audio_frame
Expand All @@ -30,6 +31,9 @@
from .track import Track
from .frame_processor import FrameProcessor

if TYPE_CHECKING:
from .room import Room


@dataclass
class AudioFrameEvent:
Expand Down Expand Up @@ -65,6 +69,7 @@ def __init__(
num_channels: int = 1,
frame_size_ms: int | None = None,
noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None,
room: Optional["Room"] = None,
**kwargs,
) -> None:
"""Initialize an `AudioStream` instance.
Expand All @@ -81,6 +86,9 @@ def __init__(
noise_cancellation (Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]], optional):
If noise cancellation is used, pass a `NoiseCancellationOptions` or `FrameProcessor[AudioFrame]` instance
created by the noise cancellation module.
room (Optional[Room], optional): The room this stream's track belongs to, used to
resolve `room_name`, `participant_identity`, and `publication_sid`. May be `None`
if the track is not (yet) associated with a room.

Example:
```python
Expand All @@ -98,6 +106,7 @@ def __init__(
```
"""
self._track: Track | None = track
self._room_ref: "Optional[weakref.ref[Room]]" = None
self._sample_rate = sample_rate
self._num_channels = num_channels
self._frame_size_ms = frame_size_ms
Expand Down Expand Up @@ -132,6 +141,11 @@ def __init__(
self._ffi_handle = FfiHandle(stream.handle.id)
self._info = stream.info

self._set_room(room)

if self._track is not None:
self._track._register_audio_stream(self)

@classmethod
def from_participant(
cls,
Expand All @@ -144,6 +158,7 @@ def from_participant(
num_channels: int = 1,
frame_size_ms: int | None = None,
noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None,
room: Optional["Room"] = None,
) -> AudioStream:
"""Create an `AudioStream` from a participant's audio track.

Expand Down Expand Up @@ -181,6 +196,7 @@ def from_participant(
num_channels=num_channels,
noise_cancellation=noise_cancellation,
frame_size_ms=frame_size_ms,
room=room,
)

@classmethod
Expand All @@ -194,6 +210,7 @@ def from_track(
num_channels: int = 1,
frame_size_ms: int | None = None,
noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None,
room: Optional["Room"] = None,
) -> AudioStream:
"""Create an `AudioStream` from an existing audio track.

Expand Down Expand Up @@ -227,8 +244,59 @@ def from_track(
num_channels=num_channels,
noise_cancellation=noise_cancellation,
frame_size_ms=frame_size_ms,
room=room,
)

def _set_room(self, room: Optional["Room"]) -> None:
old_room = self._resolve_room()
if old_room is not room:
if old_room is not None:
old_room.off("token_refreshed", self._on_room_token_refreshed)
if room is not None:
room.on("token_refreshed", self._on_room_token_refreshed)

self._room_ref = weakref.ref(room) if room is not None else None

if self._processor and room is not None:
if room._token is not None and room._server_url is not None:
self._processor._on_credentials_updated(token=room._token, url=room._server_url)

participant_identity, publication_sid = self._find_publication() or ("", "")
self._processor._on_stream_info_updated(
room_name=room.name,
participant_identity=participant_identity,
publication_sid=publication_sid,
)

def _on_room_token_refreshed(self) -> None:
if self._processor is None:
return
room = self._resolve_room()
if room is None or room._token is None or room._server_url is None:
return
self._processor._on_credentials_updated(token=room._token, url=room._server_url)

def _resolve_room(self) -> Optional["Room"]:
return self._room_ref() if self._room_ref is not None else None

def _find_publication(self) -> Optional[tuple[str, str]]:
room = self._resolve_room()
if room is None or self._track is None:
return None
track_sid = self._track.sid
if not track_sid:
return None
for participant in room.remote_participants.values():
publication = participant.track_publications.get(track_sid)
if publication is not None:
return participant.identity, publication.sid
local = room._local_participant
if local is not None:
for publication in local.track_publications.values():
if publication.sid == track_sid:
return local.identity, publication.sid
return None

def __del__(self) -> None:
FfiClient.instance.queue.unsubscribe(self._ffi_queue)

Expand Down Expand Up @@ -303,6 +371,9 @@ async def aclose(self) -> None:
This method cleans up resources associated with the audio stream and waits for
any pending operations to complete.
"""
if self._track is not None:
self._track._unregister_audio_stream(self)
self._set_room(None)
self._ffi_handle.dispose()
await self._task

Expand Down
8 changes: 8 additions & 0 deletions livekit-rtc/livekit/rtc/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -726,10 +726,14 @@ def _on_room_event(self, event: proto_room.RoomEvent):
sid = event.local_track_published.track_sid
lpublication = self.local_participant.track_publications[sid]
ltrack = lpublication.track
if ltrack is not None:
ltrack._set_room(self)
self.emit("local_track_published", lpublication, ltrack)
elif which == "local_track_unpublished":
sid = event.local_track_unpublished.publication_sid
lpublication = self.local_participant.track_publications[sid]
if lpublication.track is not None:
lpublication.track._set_room(None)
self.emit("local_track_unpublished", lpublication)
elif which == "local_track_republished":
# The SDK auto-republished a local track during a full
Expand Down Expand Up @@ -770,17 +774,21 @@ def _on_room_event(self, event: proto_room.RoomEvent):
rpublication._subscribed = True
if track_info.kind == TrackKind.KIND_VIDEO:
remote_video_track = RemoteVideoTrack(owned_track_info)
remote_video_track._set_room(self)
rpublication._track = remote_video_track
self.emit("track_subscribed", remote_video_track, rpublication, rparticipant)
elif track_info.kind == TrackKind.KIND_AUDIO:
remote_audio_track = RemoteAudioTrack(owned_track_info)
remote_audio_track._set_room(self)
rpublication._track = remote_audio_track
self.emit("track_subscribed", remote_audio_track, rpublication, rparticipant)
elif which == "track_unsubscribed":
identity = event.track_unsubscribed.participant_identity
rparticipant = self._remote_participants[identity]
rpublication = rparticipant.track_publications[event.track_unsubscribed.track_sid]
rtrack = rpublication.track
if rtrack is not None:
rtrack._set_room(None)
rpublication._track = None
rpublication._subscribed = False
self.emit("track_unsubscribed", rtrack, rpublication, rparticipant)
Expand Down
24 changes: 23 additions & 1 deletion livekit-rtc/livekit/rtc/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,43 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TYPE_CHECKING, List, Union
import weakref
from typing import TYPE_CHECKING, List, Optional, Union
from ._ffi_client import FfiHandle, FfiClient
from ._proto import ffi_pb2 as proto_ffi
from ._proto import track_pb2 as proto_track
from ._proto import stats_pb2 as proto_stats

if TYPE_CHECKING:
from .audio_source import AudioSource
from .audio_stream import AudioStream
from .room import Room
from .video_source import VideoSource


class Track:
def __init__(self, owned_info: proto_track.OwnedTrack):
self._info = owned_info.info
self._ffi_handle = FfiHandle(owned_info.handle.id)
self._room_ref: "Optional[weakref.ref[Room]]" = None
self._audio_streams: "weakref.WeakSet[AudioStream]" = weakref.WeakSet()

def _resolve_room(self) -> Optional["Room"]:
return self._room_ref() if self._room_ref is not None else None

def _set_room(self, room: Optional["Room"]) -> None:
self._room_ref = weakref.ref(room) if room is not None else None
for stream in self._audio_streams:
stream._set_room(room)

def _register_audio_stream(self, stream: "AudioStream") -> None:
self._audio_streams.add(stream)
room = self._resolve_room()
if room is not None:
stream._set_room(room)

def _unregister_audio_stream(self, stream: "AudioStream") -> None:
self._audio_streams.discard(stream)

@property
def sid(self) -> str:
Expand Down
Loading