diff --git a/livekit-api/livekit/api/access_token.py b/livekit-api/livekit/api/access_token.py index 7b66c6a0..86766c3a 100644 --- a/livekit-api/livekit/api/access_token.py +++ b/livekit-api/livekit/api/access_token.py @@ -281,9 +281,9 @@ def verify(self, token: str, *, verify_signature: bool = True) -> Claims: return grant_claims -def camel_to_snake(t: str): +def camel_to_snake(t: str) -> str: return re.sub(r"(? str: return "".join(word.capitalize() if i else word for i, word in enumerate(t.split("_"))) diff --git a/livekit-api/livekit/api/livekit_api.py b/livekit-api/livekit/api/livekit_api.py index e57bb4e3..16d26831 100644 --- a/livekit-api/livekit/api/livekit_api.py +++ b/livekit-api/livekit/api/livekit_api.py @@ -6,7 +6,7 @@ from .sip_service import SipService from .agent_dispatch_service import AgentDispatchService from .connector_service import ConnectorService -from typing import Optional +from typing import Any, Optional class LiveKitAPI: @@ -96,21 +96,21 @@ def connector(self) -> ConnectorService: """Instance of the ConnectorService""" return self._connector - async def aclose(self): + async def aclose(self) -> None: """Close the API client Call this before your application exits or when the API client is no longer needed.""" # we do not close custom sessions, that's up to the caller - if not self._custom_session: + if not self._custom_session and self._session is not None: await self._session.close() - async def __aenter__(self): + async def __aenter__(self) -> "LiveKitAPI": """@private Support for `async with`""" return self - async def __aexit__(self, exc_type, exc_val, exc_tb): + async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: """@private Support for `async with`""" diff --git a/livekit-rtc/livekit/rtc/_ffi_client.py b/livekit-rtc/livekit/rtc/_ffi_client.py index 21a91e97..d9a12a0e 100644 --- a/livekit-rtc/livekit/rtc/_ffi_client.py +++ b/livekit-rtc/livekit/rtc/_ffi_client.py @@ -34,7 +34,7 @@ atexit.register(_resource_files.close) -def _lib_name(): +def _lib_name() -> Optional[str]: if platform.system() == "Linux": return "liblivekit_ffi.so" elif platform.system() == "Darwin": @@ -44,7 +44,7 @@ def _lib_name(): return None -def get_ffi_lib(): +def get_ffi_lib() -> ctypes.CDLL: # allow to override the lib path using an env var libpath = os.environ.get("LIVEKIT_LIB_PATH", "").strip() if libpath: @@ -73,7 +73,7 @@ def __init__(self, handle: int) -> None: self.handle = handle self._disposed = False - def __del__(self): + def __del__(self) -> None: self.dispose() @property @@ -149,7 +149,7 @@ def unsubscribe(self, queue: Queue[T]) -> None: break -@ctypes.CFUNCTYPE(None, ctypes.POINTER(ctypes.c_uint8), ctypes.c_size_t) +@ctypes.CFUNCTYPE(None, ctypes.POINTER(ctypes.c_uint8), ctypes.c_size_t) # type: ignore[untyped-decorator] def ffi_event_callback( data_ptr: ctypes.POINTER(ctypes.c_uint8), # type: ignore data_len: ctypes.c_size_t, @@ -255,7 +255,7 @@ def __init__(self) -> None: ffi_lib = self._ffi_lib @atexit.register - def _dispose_lk_ffi(): + def _dispose_lk_ffi() -> None: ffi_lib.livekit_ffi_dispose() @property diff --git a/livekit-rtc/livekit/rtc/_utils.py b/livekit-rtc/livekit/rtc/_utils.py index bbaad0e1..91175815 100644 --- a/livekit-rtc/livekit/rtc/_utils.py +++ b/livekit-rtc/livekit/rtc/_utils.py @@ -17,16 +17,16 @@ from collections import deque import ctypes import random -from typing import Callable, Generator, Generic, List, TypeVar, Union +from typing import Any, Callable, Generator, Generic, List, TypeVar, Union logger = logging.getLogger("livekit") class classproperty(object): - def __init__(self, f): - self.f = classmethod(f) + def __init__(self, f: Callable[..., Any]) -> None: + self.f: Any = classmethod(f) - def __get__(self, *a): + def __get__(self, *a: Any) -> Any: return self.f.__get__(*a)() @@ -146,7 +146,7 @@ async def join(self) -> None: _base62_characters = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" -def generate_random_base62(length=12): +def generate_random_base62(length: int = 12) -> str: """ Generate a random base62 encoded string of a specified length. diff --git a/livekit-rtc/livekit/rtc/audio_frame.py b/livekit-rtc/livekit/rtc/audio_frame.py index ab2b7ebd..0a776adc 100644 --- a/livekit-rtc/livekit/rtc/audio_frame.py +++ b/livekit-rtc/livekit/rtc/audio_frame.py @@ -193,7 +193,7 @@ def __repr__(self) -> str: ) @classmethod - def __get_pydantic_core_schema__(cls, *_: Any): + def __get_pydantic_core_schema__(cls, *_: Any) -> Any: from pydantic_core import core_schema import base64 diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index 0f949d40..875b8db2 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -65,7 +65,7 @@ def __init__( num_channels: int = 1, frame_size_ms: int | None = None, noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None, - **kwargs, + **kwargs: Any, ) -> None: """Initialize an `AudioStream` instance. @@ -273,7 +273,7 @@ def _create_owned_stream_from_participant( resp = FfiClient.instance.request(req) return resp.audio_stream_from_participant.stream - async def _run(self): + async def _run(self) -> None: while True: event = await self._ffi_queue.wait_for(self._is_event) audio_event: proto_audio_frame.AudioStreamEvent = event.audio_stream_event diff --git a/livekit-rtc/livekit/rtc/data_stream.py b/livekit-rtc/livekit/rtc/data_stream.py index beb806f4..bf62a217 100644 --- a/livekit-rtc/livekit/rtc/data_stream.py +++ b/livekit-rtc/livekit/rtc/data_stream.py @@ -66,10 +66,10 @@ def __init__( ) self._queue: asyncio.Queue[proto_DataStream.Chunk | None] = asyncio.Queue() - async def _on_chunk_update(self, chunk: proto_DataStream.Chunk): + async def _on_chunk_update(self, chunk: proto_DataStream.Chunk) -> None: await self._queue.put(chunk) - async def _on_stream_close(self, trailer: proto_DataStream.Trailer): + async def _on_stream_close(self, trailer: proto_DataStream.Trailer) -> None: self.info.attributes = self.info.attributes or {} self.info.attributes.update(trailer.attributes) await self._queue.put(None) @@ -114,10 +114,10 @@ def __init__(self, header: proto_DataStream.Header, capacity: int = 0) -> None: ) self._queue: asyncio.Queue[proto_DataStream.Chunk | None] = asyncio.Queue(capacity) - async def _on_chunk_update(self, chunk: proto_DataStream.Chunk): + async def _on_chunk_update(self, chunk: proto_DataStream.Chunk) -> None: await self._queue.put(chunk) - async def _on_stream_close(self, trailer: proto_DataStream.Trailer): + async def _on_stream_close(self, trailer: proto_DataStream.Trailer) -> None: self.info.attributes = self.info.attributes or {} self.info.attributes.update(trailer.attributes) await self._queue.put(None) @@ -166,7 +166,7 @@ def __init__( self._sender_identity = sender_identity or self._local_participant.identity self._closed = False - async def _send_header(self): + async def _send_header(self) -> None: req = proto_ffi.FfiRequest( send_stream_header=proto_room.SendStreamHeaderRequest( header=self._header, @@ -188,7 +188,7 @@ async def _send_header(self): if cb.send_stream_header.error: raise ConnectionError(cb.send_stream_header.error) - async def _send_chunk(self, chunk: proto_DataStream.Chunk): + async def _send_chunk(self, chunk: proto_DataStream.Chunk) -> None: if self._closed: raise RuntimeError(f"Cannot send chunk after stream is closed: {chunk}") req = proto_ffi.FfiRequest( @@ -212,7 +212,7 @@ async def _send_chunk(self, chunk: proto_DataStream.Chunk): if cb.send_stream_chunk.error: raise ConnectionError(cb.send_stream_chunk.error) - async def _send_trailer(self, trailer: proto_DataStream.Trailer): + async def _send_trailer(self, trailer: proto_DataStream.Trailer) -> None: req = proto_ffi.FfiRequest( send_stream_trailer=proto_room.SendStreamTrailerRequest( trailer=trailer, @@ -230,10 +230,12 @@ async def _send_trailer(self, trailer: proto_DataStream.Trailer): finally: FfiClient.instance.queue.unsubscribe(queue) - if cb.send_stream_chunk.error: + if cb.send_stream_trailer.error: raise ConnectionError(cb.send_stream_trailer.error) - async def aclose(self, *, reason: str = "", attributes: Optional[Dict[str, str]] = None): + async def aclose( + self, *, reason: str = "", attributes: Optional[Dict[str, str]] = None + ) -> None: if self._closed: raise RuntimeError("Stream already closed") self._closed = True @@ -281,7 +283,7 @@ def __init__( ) self._write_lock = asyncio.Lock() - async def write(self, text: str): + async def write(self, text: str) -> None: async with self._write_lock: for chunk in split_utf8(text, STREAM_CHUNK_SIZE): content = chunk @@ -333,7 +335,7 @@ def __init__( ) self._write_lock = asyncio.Lock() - async def write(self, data: bytes): + async def write(self, data: bytes) -> None: async with self._write_lock: chunked_data = [ data[i : i + STREAM_CHUNK_SIZE] for i in range(0, len(data), STREAM_CHUNK_SIZE) diff --git a/livekit-rtc/livekit/rtc/data_track.py b/livekit-rtc/livekit/rtc/data_track.py index 255b49aa..a56f1098 100644 --- a/livekit-rtc/livekit/rtc/data_track.py +++ b/livekit-rtc/livekit/rtc/data_track.py @@ -117,7 +117,7 @@ def is_published(self) -> bool: req.local_data_track_is_published.track_handle = self._ffi_handle.handle resp = FfiClient.instance.request(req) - return resp.local_data_track_is_published.is_published + return bool(resp.local_data_track_is_published.is_published) async def unpublish(self) -> None: """Unpublishes the track.""" @@ -181,7 +181,7 @@ def is_published(self) -> bool: req.remote_data_track_is_published.track_handle = self._ffi_handle.handle resp = FfiClient.instance.request(req) - return resp.remote_data_track_is_published.is_published + return bool(resp.remote_data_track_is_published.is_published) def __repr__(self) -> str: return ( diff --git a/livekit-rtc/livekit/rtc/e2ee.py b/livekit-rtc/livekit/rtc/e2ee.py index 3e72d830..8306658e 100644 --- a/livekit-rtc/livekit/rtc/e2ee.py +++ b/livekit-rtc/livekit/rtc/e2ee.py @@ -13,7 +13,7 @@ # limitations under the License. from dataclasses import dataclass, field -from typing import List, Optional +from typing import List, Optional, cast from ._ffi_client import FfiClient from ._proto import e2ee_pb2 as proto_e2ee @@ -89,7 +89,7 @@ def export_shared_key(self, key_index: int) -> bytes: req.e2ee.get_shared_key.key_index = key_index resp = FfiClient.instance.request(req) key = resp.e2ee.get_shared_key.key - return key + return cast(bytes, key) def ratchet_shared_key(self, key_index: int) -> bytes: """Ratchets the shared encryption key to a new key. @@ -112,7 +112,7 @@ def ratchet_shared_key(self, key_index: int) -> bytes: resp = FfiClient.instance.request(req) new_key = resp.e2ee.ratchet_shared_key.new_key - return new_key + return cast(bytes, new_key) def set_key(self, participant_identity: str, key: bytes, key_index: int) -> None: """Sets the encryption key for a specific participant. @@ -157,7 +157,7 @@ def export_key(self, participant_identity: str, key_index: int) -> bytes: req.e2ee.get_key.key_index = key_index resp = FfiClient.instance.request(req) key = resp.e2ee.get_key.key - return key + return cast(bytes, key) def ratchet_key(self, participant_identity: str, key_index: int) -> bytes: """Ratchets the encryption key for a specific participant to a new key. @@ -181,7 +181,7 @@ def ratchet_key(self, participant_identity: str, key_index: int) -> bytes: resp = FfiClient.instance.request(req) new_key = resp.e2ee.ratchet_key.new_key - return new_key + return cast(bytes, new_key) class FrameCryptor: diff --git a/livekit-rtc/livekit/rtc/event_emitter.py b/livekit-rtc/livekit/rtc/event_emitter.py index c2fabae5..bc64d1d7 100644 --- a/livekit-rtc/livekit/rtc/event_emitter.py +++ b/livekit-rtc/livekit/rtc/event_emitter.py @@ -1,6 +1,6 @@ import inspect import asyncio -from typing import Callable, Dict, Set, Optional, Generic, TypeVar +from typing import Any, Callable, Dict, Set, Optional, Generic, TypeVar from .log import logger @@ -14,7 +14,7 @@ def __init__(self) -> None: """ self._events: Dict[T_contra, Set[Callable]] = dict() - def emit(self, event: T_contra, *args) -> None: + def emit(self, event: T_contra, *args: Any) -> None: """ Trigger all callbacks associated with the given event. @@ -104,7 +104,7 @@ def greet_once(name): """ if callback is not None: - def once_callback(*args, **kwargs): + def once_callback(*args: Any, **kwargs: Any) -> None: self.off(event, once_callback) callback(*args, **kwargs) diff --git a/livekit-rtc/livekit/rtc/jupyter.py b/livekit-rtc/livekit/rtc/jupyter.py index f6a3b689..8a2a2367 100644 --- a/livekit-rtc/livekit/rtc/jupyter.py +++ b/livekit-rtc/livekit/rtc/jupyter.py @@ -50,7 +50,7 @@ def room_html(url: str, token: str, *, width: str, height: str) -> HTML: f'srcdoc="{escaped_content}">' ) - return HTML(html_text) + return HTML(html_text) # type: ignore[no-untyped-call] def display_room(url: str, token: str, *, width: str = "100%", height: str = "110px") -> None: @@ -65,4 +65,4 @@ def display_room(url: str, token: str, *, width: str = "100%", height: str = "11 The rendered HTML will include the provided `url` and `token` in plain text. Avoid using sensitive tokens in public notebooks (e.g., tokens with long expiration times). """ - display(room_html(url, token, width=width, height=height)) + display(room_html(url, token, width=width, height=height)) # type: ignore[no-untyped-call] diff --git a/livekit-rtc/livekit/rtc/media_devices.py b/livekit-rtc/livekit/rtc/media_devices.py index 4958a9de..4567e660 100644 --- a/livekit-rtc/livekit/rtc/media_devices.py +++ b/livekit-rtc/livekit/rtc/media_devices.py @@ -22,7 +22,7 @@ import threading if TYPE_CHECKING: - import sounddevice as sd # type: ignore[import-not-found, import-untyped] + import sounddevice as sd # type: ignore[import-not-found] from . import AudioSource from .audio_frame import AudioFrame @@ -163,7 +163,7 @@ def __init__( output_device: Optional[int] = None, delay_estimator: Optional[_APMDelayEstimator] = None, ) -> None: - import sounddevice as sd # type: ignore[import-not-found, import-untyped] + import sounddevice as sd self._sample_rate = sample_rate self._num_channels = num_channels @@ -303,16 +303,17 @@ async def start(self) -> None: if self._mixer is None: self._mixer = AudioMixer(sample_rate=self._sample_rate, num_channels=self._num_channels) - async def _playback_loop(): + async def _playback_loop() -> None: """Internal playback loop that consumes frames from the mixer.""" self._running = True self._stream.start() try: - async for frame in self._mixer: - if not self._running: - break - # Append raw PCM bytes for callback consumption - self._buffer.extend(frame.data.tobytes()) + if self._mixer is not None: + async for frame in self._mixer: + if not self._running: + break + # Append raw PCM bytes for callback consumption + self._buffer.extend(frame.data.tobytes()) finally: self._running = False try: @@ -402,7 +403,7 @@ def list_input_devices(self) -> list[dict[str, Any]]: Returns a list of dictionaries with the `sounddevice` metadata and an added `index` key corresponding to the device index. """ - import sounddevice as sd # type: ignore[import-not-found, import-untyped] + import sounddevice as sd devices = sd.query_devices() result: list[dict[str, Any]] = [] @@ -413,7 +414,7 @@ def list_input_devices(self) -> list[dict[str, Any]]: def list_output_devices(self) -> list[dict[str, Any]]: """List available output devices with indices.""" - import sounddevice as sd # type: ignore[import-not-found, import-untyped] + import sounddevice as sd devices = sd.query_devices() result: list[dict[str, Any]] = [] @@ -424,14 +425,14 @@ def list_output_devices(self) -> list[dict[str, Any]]: def default_input_device(self) -> Optional[int]: """Return the default input device index (or None).""" - import sounddevice as sd # type: ignore[import-not-found, import-untyped] + import sounddevice as sd dev = sd.default.device return dev[0] if isinstance(dev, (list, tuple)) else None def default_output_device(self) -> Optional[int]: """Return the default output device index (or None).""" - import sounddevice as sd # type: ignore[import-not-found, import-untyped] + import sounddevice as sd dev = sd.default.device return dev[1] if isinstance(dev, (list, tuple)) else None @@ -471,7 +472,7 @@ def open_input( Returns: InputCapture: Holder with `source`, `apm`, and `aclose()`. """ - import sounddevice as sd # type: ignore[import-not-found, import-untyped] + import sounddevice as sd loop = self._loop source = AudioSource(self._in_sr, self._channels, loop=loop) diff --git a/livekit-rtc/livekit/rtc/participant.py b/livekit-rtc/livekit/rtc/participant.py index e8a3fcf0..fd259ac1 100644 --- a/livekit-rtc/livekit/rtc/participant.py +++ b/livekit-rtc/livekit/rtc/participant.py @@ -50,6 +50,7 @@ from .rpc import RpcInvocationData from .data_stream import ( TextStreamWriter, + TextStreamInfo, ByteStreamWriter, ByteStreamInfo, STREAM_CHUNK_SIZE, @@ -192,7 +193,7 @@ def __init__( ) -> None: super().__init__(owned_info) self._room_queue = room_queue - self._track_publications: dict[str, LocalTrackPublication] = {} # type: ignore + self._track_publications: dict[str, LocalTrackPublication] = {} self._rpc_handlers: Dict[str, RpcHandler] = {} @property @@ -357,7 +358,7 @@ async def perform_rpc( if cb.perform_rpc.HasField("error"): raise RpcError._from_proto(cb.perform_rpc.error) - return cb.perform_rpc.payload + return cast(str, cb.perform_rpc.payload) def register_rpc_method( self, @@ -617,7 +618,7 @@ async def send_text( topic: str = "", attributes: Optional[Dict[str, str]] = None, reply_to_id: str | None = None, - ): + ) -> TextStreamInfo: total_size = len(text.encode()) writer = await self.stream_text( destination_identities=destination_identities, @@ -811,7 +812,7 @@ def __repr__(self) -> str: class RemoteParticipant(Participant): def __init__(self, owned_info: proto_participant.OwnedParticipant) -> None: super().__init__(owned_info) - self._track_publications: dict[str, RemoteTrackPublication] = {} # type: ignore + self._track_publications: dict[str, RemoteTrackPublication] = {} @property def track_publications(self) -> Mapping[str, RemoteTrackPublication]: diff --git a/livekit-rtc/livekit/rtc/room.py b/livekit-rtc/livekit/rtc/room.py index 7ccda2e4..d7957449 100644 --- a/livekit-rtc/livekit/rtc/room.py +++ b/livekit-rtc/livekit/rtc/room.py @@ -511,10 +511,10 @@ def on_participant_connected(participant): if options.rtc_config: req.connect.options.rtc_config.ice_transport_type = ( options.rtc_config.ice_transport_type - ) # type: ignore + ) req.connect.options.rtc_config.continual_gathering_policy = ( options.rtc_config.continual_gathering_policy - ) # type: ignore + ) req.connect.options.rtc_config.ice_servers.extend(options.rtc_config.ice_servers) # subscribe before connecting so we don't miss any events @@ -587,25 +587,25 @@ async def get_rtc_stats(self) -> RtcStats: return RtcStats(publisher_stats=publisher_stats, subscriber_stats=subscriber_stats) - def register_byte_stream_handler(self, topic: str, handler: ByteStreamHandler): + def register_byte_stream_handler(self, topic: str, handler: ByteStreamHandler) -> None: existing_handler = self._byte_stream_handlers.get(topic) if existing_handler is None: self._byte_stream_handlers[topic] = handler else: raise ValueError("byte stream handler for topic '%s' already set" % topic) - def unregister_byte_stream_handler(self, topic: str): + def unregister_byte_stream_handler(self, topic: str) -> None: if self._byte_stream_handlers.get(topic): self._byte_stream_handlers.pop(topic) - def register_text_stream_handler(self, topic: str, handler: TextStreamHandler): + def register_text_stream_handler(self, topic: str, handler: TextStreamHandler) -> None: existing_handler = self._text_stream_handlers.get(topic) if existing_handler is None: self._text_stream_handlers[topic] = handler else: raise ValueError("text stream handler for topic '%s' already set" % topic) - def unregister_text_stream_handler(self, topic: str): + def unregister_text_stream_handler(self, topic: str) -> None: if self._text_stream_handlers.get(topic): self._text_stream_handlers.pop(topic) @@ -650,7 +650,7 @@ async def disconnect( req = proto_ffi.FfiRequest() req.disconnect.room_handle = self._ffi_handle.handle # type: ignore - req.disconnect.reason = reason # type: ignore + req.disconnect.reason = reason queue = FfiClient.instance.queue.subscribe() try: resp = FfiClient.instance.request(req) @@ -697,7 +697,7 @@ async def _listen_task(self) -> None: await self._drain_rpc_invocation_tasks() await self._drain_data_stream_tasks() - def _on_rpc_method_invocation(self, rpc_invocation: RpcMethodInvocationEvent): + def _on_rpc_method_invocation(self, rpc_invocation: RpcMethodInvocationEvent) -> None: if self._local_participant is None: return @@ -715,7 +715,7 @@ def _on_rpc_method_invocation(self, rpc_invocation: RpcMethodInvocationEvent): self._rpc_invocation_tasks.add(task) task.add_done_callback(self._rpc_invocation_tasks.discard) - def _on_room_event(self, event: proto_room.RoomEvent): + def _on_room_event(self, event: proto_room.RoomEvent) -> None: which = event.WhichOneof("message") if which == "participant_connected": rparticipant = self._create_remote_participant(event.participant_connected.info) @@ -886,7 +886,7 @@ def _on_room_event(self, event: proto_room.RoomEvent): participant, event.participant_encryption_status_changed.is_encrypted, ) - elif which == "participant_permissions_changed": + elif which == "participant_permission_changed": identity = event.participant_permission_changed.participant_identity participant = self._retrieve_participant(identity) assert isinstance(participant, Participant) @@ -1021,7 +1021,7 @@ def _on_room_event(self, event: proto_room.RoomEvent): def _handle_stream_header( self, header: proto_room.DataStream.Header, participant_identity: str - ): + ) -> None: stream_type = header.WhichOneof("content_header") if stream_type == "text_header": text_stream_handler = self._text_stream_handlers.get(header.topic) @@ -1051,7 +1051,7 @@ def _handle_stream_header( logging.warning("received unknown header type, %s", stream_type) pass - async def _handle_stream_chunk(self, chunk: proto_room.DataStream.Chunk): + async def _handle_stream_chunk(self, chunk: proto_room.DataStream.Chunk) -> None: text_reader = self._text_stream_readers.get(chunk.stream_id) file_reader = self._byte_stream_readers.get(chunk.stream_id) @@ -1060,7 +1060,7 @@ async def _handle_stream_chunk(self, chunk: proto_room.DataStream.Chunk): elif file_reader: await file_reader._on_chunk_update(chunk) - async def _handle_stream_trailer(self, trailer: proto_room.DataStream.Trailer): + async def _handle_stream_trailer(self, trailer: proto_room.DataStream.Trailer) -> None: text_reader = self._text_stream_readers.get(trailer.stream_id) file_reader = self._byte_stream_readers.get(trailer.stream_id) diff --git a/livekit-rtc/livekit/rtc/synchronizer.py b/livekit-rtc/livekit/rtc/synchronizer.py index 8c5f6347..813a3641 100644 --- a/livekit-rtc/livekit/rtc/synchronizer.py +++ b/livekit-rtc/livekit/rtc/synchronizer.py @@ -2,7 +2,7 @@ import logging import time from collections import deque -from typing import Optional, Union +from typing import Any, Optional, Union from .video_frame import VideoFrame from .audio_frame import AudioFrame @@ -149,7 +149,7 @@ def __init__(self, *, expected_fps: float, max_delay_tolerance_ms: float = 300) async def __aenter__(self) -> None: await self.wait_next_process() - async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: + async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: self.after_process() def reset(self) -> None: diff --git a/livekit-rtc/livekit/rtc/track.py b/livekit-rtc/livekit/rtc/track.py index 8a6fe692..0eeb3465 100644 --- a/livekit-rtc/livekit/rtc/track.py +++ b/livekit-rtc/livekit/rtc/track.py @@ -80,14 +80,14 @@ def create_audio_track(name: str, source: "AudioSource") -> "LocalAudioTrack": resp = FfiClient.instance.request(req) return LocalAudioTrack(resp.create_audio_track.track) - def mute(self): + def mute(self) -> None: req = proto_ffi.FfiRequest() req.local_track_mute.track_handle = self._ffi_handle.handle req.local_track_mute.mute = True FfiClient.instance.request(req) self._info.muted = True - def unmute(self): + def unmute(self) -> None: req = proto_ffi.FfiRequest() req.local_track_mute.track_handle = self._ffi_handle.handle req.local_track_mute.mute = False @@ -111,14 +111,14 @@ def create_video_track(name: str, source: "VideoSource") -> "LocalVideoTrack": resp = FfiClient.instance.request(req) return LocalVideoTrack(resp.create_video_track.track) - def mute(self): + def mute(self) -> None: req = proto_ffi.FfiRequest() req.local_track_mute.track_handle = self._ffi_handle.handle req.local_track_mute.mute = True FfiClient.instance.request(req) self._info.muted = True - def unmute(self): + def unmute(self) -> None: req = proto_ffi.FfiRequest() req.local_track_mute.track_handle = self._ffi_handle.handle req.local_track_mute.mute = False diff --git a/livekit-rtc/livekit/rtc/track_publication.py b/livekit-rtc/livekit/rtc/track_publication.py index 84282263..fbe4951b 100644 --- a/livekit-rtc/livekit/rtc/track_publication.py +++ b/livekit-rtc/livekit/rtc/track_publication.py @@ -110,7 +110,7 @@ def track(self) -> Optional[RemoteTrack]: def subscribed(self) -> bool: return self._subscribed - def set_subscribed(self, subscribed: bool): + def set_subscribed(self, subscribed: bool) -> None: req = proto_ffi.FfiRequest() req.set_subscribed.subscribe = subscribed req.set_subscribed.publication_handle = self._ffi_handle.handle diff --git a/livekit-rtc/livekit/rtc/video_frame.py b/livekit-rtc/livekit/rtc/video_frame.py index ff0035e8..30255323 100644 --- a/livekit-rtc/livekit/rtc/video_frame.py +++ b/livekit-rtc/livekit/rtc/video_frame.py @@ -204,7 +204,7 @@ def __repr__(self) -> str: return f"rtc.VideoFrame(width={self.width}, height={self.height}, type={proto_video.VideoBufferType.Name(self.type)})" @classmethod - def __get_pydantic_core_schema__(cls, *_: Any): + def __get_pydantic_core_schema__(cls, *_: Any) -> Any: from pydantic_core import core_schema import base64 diff --git a/livekit-rtc/livekit/rtc/video_stream.py b/livekit-rtc/livekit/rtc/video_stream.py index 799de195..eeced550 100644 --- a/livekit-rtc/livekit/rtc/video_stream.py +++ b/livekit-rtc/livekit/rtc/video_stream.py @@ -45,7 +45,7 @@ def __init__( loop: Optional[asyncio.AbstractEventLoop] = None, capacity: int = 0, format: Optional[proto_video_frame.VideoBufferType.ValueType] = None, - **kwargs, + **kwargs: Any, ) -> None: self._loop = loop or asyncio.get_event_loop() # Only subscribe to video_stream_event to avoid unnecessary memory allocations diff --git a/livekit-rtc/tests/test_e2ee.py b/livekit-rtc/tests/test_e2ee.py index cf931c43..47a192d7 100644 --- a/livekit-rtc/tests/test_e2ee.py +++ b/livekit-rtc/tests/test_e2ee.py @@ -18,7 +18,7 @@ class TestKeyProviderOptions: """Tests for KeyProviderOptions dataclass.""" - def test_default_values(self): + def test_default_values(self) -> None: """Test that KeyProviderOptions has correct default values.""" from livekit.rtc.e2ee import ( KeyProviderOptions, @@ -38,7 +38,7 @@ def test_default_values(self): assert options.key_ring_size == DEFAULT_KEY_RING_SIZE assert options.key_derivation_function == proto_e2ee.KeyDerivationFunction.PBKDF2 - def test_custom_values(self): + def test_custom_values(self) -> None: """Test KeyProviderOptions with custom values.""" from livekit.rtc.e2ee import KeyProviderOptions from livekit.rtc._proto import e2ee_pb2 as proto_e2ee @@ -59,7 +59,7 @@ def test_custom_values(self): assert options.key_ring_size == 8 assert options.key_derivation_function == proto_e2ee.KeyDerivationFunction.HKDF - def test_various_key_lengths(self): + def test_various_key_lengths(self) -> None: """Test that shared_key accepts various lengths.""" from livekit.rtc.e2ee import KeyProviderOptions @@ -85,7 +85,7 @@ def test_various_key_lengths(self): class TestE2EEOptions: """Tests for E2EEOptions dataclass.""" - def test_default_values(self): + def test_default_values(self) -> None: """Test E2EEOptions default values.""" from livekit.rtc.e2ee import E2EEOptions, KeyProviderOptions from livekit.rtc._proto import e2ee_pb2 as proto_e2ee @@ -95,7 +95,7 @@ def test_default_values(self): assert isinstance(options.key_provider_options, KeyProviderOptions) assert options.encryption_type == proto_e2ee.EncryptionType.GCM - def test_with_shared_key(self): + def test_with_shared_key(self) -> None: """Test E2EEOptions with a shared key.""" from livekit.rtc.e2ee import E2EEOptions, KeyProviderOptions @@ -108,7 +108,7 @@ def test_with_shared_key(self): class TestProtoMessageBuilding: """Tests for proto message building with E2EE options.""" - def test_proto_key_provider_options_fields(self): + def test_proto_key_provider_options_fields(self) -> None: """Test that proto KeyProviderOptions has all required fields.""" from livekit.rtc._proto import e2ee_pb2 as proto_e2ee @@ -130,7 +130,7 @@ def test_proto_key_provider_options_fields(self): assert proto_options.key_ring_size == 16 assert proto_options.key_derivation_function == proto_e2ee.KeyDerivationFunction.PBKDF2 - def test_proto_serialization(self): + def test_proto_serialization(self) -> None: """Test that proto message can be serialized without errors.""" from livekit.rtc._proto import e2ee_pb2 as proto_e2ee @@ -151,7 +151,7 @@ def test_proto_serialization(self): assert parsed.key_ring_size == 16 assert parsed.key_derivation_function == proto_e2ee.KeyDerivationFunction.PBKDF2 - def test_e2ee_options_proto_serialization(self): + def test_e2ee_options_proto_serialization(self) -> None: """Test full E2eeOptions proto serialization.""" from livekit.rtc._proto import e2ee_pb2 as proto_e2ee @@ -174,7 +174,7 @@ def test_e2ee_options_proto_serialization(self): class TestPublicExports: """Tests for public API exports.""" - def test_key_derivation_function_exported(self): + def test_key_derivation_function_exported(self) -> None: """Test that KeyDerivationFunction is exported from the package.""" from livekit.rtc import KeyDerivationFunction @@ -182,7 +182,7 @@ def test_key_derivation_function_exported(self): assert KeyDerivationFunction.PBKDF2 == 0 assert KeyDerivationFunction.HKDF == 1 - def test_encryption_type_exported(self): + def test_encryption_type_exported(self) -> None: """Test that EncryptionType is exported from the package.""" from livekit.rtc import EncryptionType @@ -190,7 +190,7 @@ def test_encryption_type_exported(self): assert EncryptionType.GCM == 1 assert EncryptionType.CUSTOM == 2 - def test_e2ee_classes_exported(self): + def test_e2ee_classes_exported(self) -> None: """Test that E2EE classes are exported from the package.""" from livekit.rtc import E2EEOptions, KeyProviderOptions diff --git a/livekit-rtc/tests/test_e2ee_per_participant.py b/livekit-rtc/tests/test_e2ee_per_participant.py index 1c0ad288..6c90e9bb 100644 --- a/livekit-rtc/tests/test_e2ee_per_participant.py +++ b/livekit-rtc/tests/test_e2ee_per_participant.py @@ -19,7 +19,7 @@ import asyncio import os import uuid -from typing import Callable, TypeVar +from typing import Any, Callable, TypeVar import pytest @@ -40,7 +40,7 @@ T = TypeVar("T") -def skip_if_no_credentials(): +def skip_if_no_credentials() -> Any: required_vars = ["LIVEKIT_URL", "LIVEKIT_API_KEY", "LIVEKIT_API_SECRET"] missing = [var for var in required_vars if not os.getenv(var)] return pytest.mark.skipif( @@ -126,8 +126,8 @@ async def publish_dummy_video(source: rtc.VideoSource, stop_event: asyncio.Event @pytest.mark.asyncio -@skip_if_no_credentials() -async def test_e2ee_per_participant(): +@skip_if_no_credentials() # type: ignore[untyped-decorator] +async def test_e2ee_per_participant() -> None: """E2E test for per-participant E2EE keys. E2EE per-participant E2E test: @@ -146,6 +146,7 @@ async def test_e2ee_per_participant(): """ room_name = unique_room_name("test-e2ee-per-participant") url = os.getenv("LIVEKIT_URL") + assert url is not None publisher_room = rtc.Room() receiver1_room = rtc.Room() @@ -197,7 +198,7 @@ def _on_state(participant: rtc.Participant, state: int) -> None: wire_receiver(receiver2_room, "receiver2") publish_stop = asyncio.Event() - publish_task: asyncio.Task | None = None + publish_task: asyncio.Task[None] | None = None try: # 1) connect all three rooms with per-participant E2EE options @@ -340,8 +341,12 @@ def all_remote_video_pubs_gcm(room: rtc.Room) -> bool: set_key_index_on_all_cryptors(publisher_room, 1) key1_bytes, _ = PUBLISHER_KEYS[1] - receiver1_room.e2ee_manager.key_provider.set_key(PUBLISHER_IDENTITY, key1_bytes, 1) - receiver2_room.e2ee_manager.key_provider.set_key(PUBLISHER_IDENTITY, key1_bytes, 1) + receiver1_key_provider = receiver1_room.e2ee_manager.key_provider + receiver2_key_provider = receiver2_room.e2ee_manager.key_provider + assert receiver1_key_provider is not None + assert receiver2_key_provider is not None + receiver1_key_provider.set_key(PUBLISHER_IDENTITY, key1_bytes, 1) + receiver2_key_provider.set_key(PUBLISHER_IDENTITY, key1_bytes, 1) await assert_eventually( lambda: rtc.EncryptionState.OK in seen_e2ee_states["receiver1"], diff --git a/livekit-rtc/tests/test_e2ee_shared_key.py b/livekit-rtc/tests/test_e2ee_shared_key.py index 17db1e15..a14c86bc 100644 --- a/livekit-rtc/tests/test_e2ee_shared_key.py +++ b/livekit-rtc/tests/test_e2ee_shared_key.py @@ -19,7 +19,7 @@ import asyncio import os import uuid -from typing import Callable, TypeVar +from typing import Any, Callable, TypeVar import pytest @@ -34,7 +34,7 @@ T = TypeVar("T") -def skip_if_no_credentials(): +def skip_if_no_credentials() -> Any: required_vars = ["LIVEKIT_URL", "LIVEKIT_API_KEY", "LIVEKIT_API_SECRET"] missing = [var for var in required_vars if not os.getenv(var)] return pytest.mark.skipif( @@ -105,8 +105,8 @@ async def publish_dummy_video(source: rtc.VideoSource, stop_event: asyncio.Event @pytest.mark.asyncio -@skip_if_no_credentials() -async def test_e2ee_shared_key(): +@skip_if_no_credentials() # type: ignore[untyped-decorator] +async def test_e2ee_shared_key() -> None: """E2E test for shared-key E2EE. E2EE shared key E2E test: @@ -124,6 +124,7 @@ async def test_e2ee_shared_key(): """ room_name = unique_room_name("test-e2ee-shared-key") url = os.getenv("LIVEKIT_URL") + assert url is not None publisher_room = rtc.Room() receiver1_room = rtc.Room() @@ -179,7 +180,7 @@ def _on_state(participant: rtc.Participant, state: int) -> None: wire_receiver(receiver2_room, "receiver2") publish_stop = asyncio.Event() - publish_task: asyncio.Task | None = None + publish_task: asyncio.Task[None] | None = None try: connect_options = rtc.RoomOptions(auto_subscribe=True, encryption=make_e2ee_options()) @@ -318,8 +319,12 @@ def all_remote_video_pubs_gcm(room: rtc.Room) -> bool: await asyncio.sleep(1.0) key_provider.set_shared_key(SHARED_KEY, key_index=0) - receiver1_room.e2ee_manager.key_provider.set_shared_key(SHARED_KEY, key_index=0) - receiver2_room.e2ee_manager.key_provider.set_shared_key(SHARED_KEY, key_index=0) + receiver1_key_provider = receiver1_room.e2ee_manager.key_provider + receiver2_key_provider = receiver2_room.e2ee_manager.key_provider + assert receiver1_key_provider is not None + assert receiver2_key_provider is not None + receiver1_key_provider.set_shared_key(SHARED_KEY, key_index=0) + receiver2_key_provider.set_shared_key(SHARED_KEY, key_index=0) await assert_eventually( lambda: rtc.EncryptionState.OK in seen_e2ee_states["receiver1"], diff --git a/makefile b/makefile index f58b7f88..5b9472c6 100644 --- a/makefile +++ b/makefile @@ -70,11 +70,7 @@ lint-fix: ## Run ruff linter and fix issues automatically type-check: ## Run mypy type checker @echo "$(BOLD)$(CYAN)Running type checker...$(RESET)" - @uv pip install pip 2>/dev/null || true - @if uv run mypy --install-types --non-interactive \ - -p livekit.rtc \ - -p livekit.api \ - -p livekit.protocol; then \ + @if uv run mypy livekit-protocol livekit-api livekit-rtc; then \ echo "$(BOLD)$(GREEN)✓ Type checking passed$(RESET)"; \ else \ echo "$(BOLD)$(RED)✗ Type checking failed$(RESET)"; \ diff --git a/pyproject.toml b/pyproject.toml index 467a9a7e..e3dcfc97 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -72,6 +72,6 @@ exclude = "(build|setup\\.py|livekit-rtc/rust-sdks.*)" namespace_packages = true explicit_package_bases = true mypy_path = "livekit-protocol:livekit-api:livekit-rtc" -strict = false # TODO re-enable strict checking +strict = true disallow_any_generics = false plugins = ["pydantic.mypy"] diff --git a/tests/api/test_access_token.py b/tests/api/test_access_token.py index 48835ea1..85aeb6ce 100644 --- a/tests/api/test_access_token.py +++ b/tests/api/test_access_token.py @@ -1,6 +1,6 @@ import datetime -import pytest # type: ignore +import pytest from livekit.api import AccessToken, TokenVerifier, VideoGrants, SIPGrants from livekit.protocol.room import RoomConfiguration from livekit.protocol.agent_dispatch import RoomAgentDispatch @@ -9,7 +9,7 @@ TEST_API_SECRET = "thiskeyistotallyunsafe" -def test_verify_token(): +def test_verify_token() -> None: grants = VideoGrants(room_join=True, room="test_room") sip = SIPGrants(admin=True) @@ -30,11 +30,12 @@ def test_verify_token(): assert claims.metadata == "test_metadata" assert claims.video == grants assert claims.sip == sip + assert claims.attributes is not None assert claims.attributes["key1"] == "value1" assert claims.attributes["key2"] == "value2" -def test_agent_config(): +def test_agent_config() -> None: token = ( AccessToken(TEST_API_KEY, TEST_API_SECRET) .with_identity("test_identity") @@ -50,6 +51,7 @@ def test_agent_config(): token_verifier = TokenVerifier(TEST_API_KEY, TEST_API_SECRET) claims = token_verifier.verify(token) # Verify the decoded claims match + assert claims.room_config is not None assert claims.room_config.agents[0].agent_name == "test-agent" # Split token into header.payload.signature @@ -74,7 +76,7 @@ def test_agent_config(): assert payload_json["roomConfig"]["agents"][0]["agentName"] == "test-agent" -def test_verify_token_invalid(): +def test_verify_token_invalid() -> None: token = AccessToken(TEST_API_KEY, TEST_API_SECRET).with_identity("test_identity").to_jwt() token_verifier = TokenVerifier(TEST_API_KEY, "invalid_secret") @@ -86,7 +88,7 @@ def test_verify_token_invalid(): token_verifier.verify(token) -def test_verify_token_expired(): +def test_verify_token_expired() -> None: token = ( AccessToken(TEST_API_KEY, TEST_API_SECRET) .with_identity("test_identity") diff --git a/tests/api/test_webhook.py b/tests/api/test_webhook.py index fc8828e5..3937cdba 100644 --- a/tests/api/test_webhook.py +++ b/tests/api/test_webhook.py @@ -1,7 +1,7 @@ import base64 import hashlib -import pytest # type: ignore +import pytest from livekit.api import AccessToken, TokenVerifier, WebhookReceiver TEST_API_KEY = "myapikey" @@ -45,7 +45,7 @@ """ -def test_webhook_receiver(): +def test_webhook_receiver() -> None: token_verifier = TokenVerifier(TEST_API_KEY, TEST_API_SECRET) receiver = WebhookReceiver(token_verifier) @@ -56,7 +56,7 @@ def test_webhook_receiver(): receiver.receive(TEST_EVENT, jwt) -def test_bad_hash(): +def test_bad_hash() -> None: token_verifier = TokenVerifier(TEST_API_KEY, TEST_API_SECRET) receiver = WebhookReceiver(token_verifier) @@ -68,7 +68,7 @@ def test_bad_hash(): receiver.receive(TEST_EVENT, jwt) -def test_invalid_body(): +def test_invalid_body() -> None: token_verifier = TokenVerifier(TEST_API_KEY, TEST_API_SECRET) receiver = WebhookReceiver(token_verifier) diff --git a/tests/rtc/test_agc.py b/tests/rtc/test_agc.py index fb2f5df1..15a57f58 100644 --- a/tests/rtc/test_agc.py +++ b/tests/rtc/test_agc.py @@ -7,7 +7,7 @@ FIXTURES_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "fixtures") -def test_agc_modifies_audio(): +def test_agc_modifies_audio() -> None: num_channels = 1 input_wav = os.path.join(FIXTURES_DIR, "test_audio.wav") diff --git a/tests/rtc/test_apm.py b/tests/rtc/test_apm.py index 94795e3c..df5cd41d 100644 --- a/tests/rtc/test_apm.py +++ b/tests/rtc/test_apm.py @@ -8,7 +8,7 @@ FIXTURES_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "fixtures") -def test_audio_processing(): +def test_audio_processing() -> None: sample_rate = 48000 num_channels = 1 frames_per_chunk = sample_rate // 100 diff --git a/tests/rtc/test_e2e.py b/tests/rtc/test_e2e.py index e61bb59b..3ce2ad93 100644 --- a/tests/rtc/test_e2e.py +++ b/tests/rtc/test_e2e.py @@ -22,7 +22,7 @@ import os import time import uuid -from typing import Callable, TypeVar +from typing import Any, Callable, Dict, List, TypeVar import numpy as np import pytest @@ -56,7 +56,7 @@ async def assert_eventually( raise AssertionError(f"{message} (last result: {last_result})") -def skip_if_no_credentials(): +def skip_if_no_credentials() -> Any: required_vars = ["LIVEKIT_URL", "LIVEKIT_API_KEY", "LIVEKIT_API_SECRET"] missing = [var for var in required_vars if not os.getenv(var)] return pytest.mark.skipif( @@ -84,11 +84,12 @@ def unique_room_name(base: str) -> str: @pytest.mark.asyncio -@skip_if_no_credentials() -async def test_publish_track(): +@skip_if_no_credentials() # type: ignore[untyped-decorator] +async def test_publish_track() -> None: """Test that a published track can be subscribed by another participant""" room_name = unique_room_name("test-publish-track") url = os.getenv("LIVEKIT_URL") + assert url is not None publisher_room = rtc.Room() subscriber_room = rtc.Room() @@ -103,7 +104,7 @@ async def test_publish_track(): @subscriber_room.on("track_published") def on_track_published( publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant - ): + ) -> None: track_published_event.set() @subscriber_room.on("track_subscribed") @@ -111,7 +112,7 @@ def on_track_subscribed( track: rtc.Track, publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant, - ): + ) -> None: nonlocal subscribed_track if track.kind == rtc.TrackKind.KIND_AUDIO: subscribed_track = track @@ -142,11 +143,12 @@ def on_track_subscribed( @pytest.mark.asyncio -@skip_if_no_credentials() -async def test_video_packet_trailer_metadata(): +@skip_if_no_credentials() # type: ignore[untyped-decorator] +async def test_video_packet_trailer_metadata() -> None: """Test that packet trailer metadata can be sent and received on video frames.""" room_name = unique_room_name("test-video-packet-trailer") url = os.getenv("LIVEKIT_URL") + assert url is not None publisher_room = rtc.Room() subscriber_room = rtc.Room() @@ -165,7 +167,7 @@ def on_track_subscribed( track: rtc.Track, publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant, - ): + ) -> None: nonlocal subscribed_track, subscribed_publication if track.kind == rtc.TrackKind.KIND_VIDEO: subscribed_track = track @@ -207,7 +209,7 @@ def on_track_subscribed( ) metadata = rtc.FrameMetadata(user_timestamp=123456789, frame_id=77) - async def publish_frames(): + async def publish_frames() -> None: for _ in range(10): source.capture_frame(frame, metadata=metadata) await asyncio.sleep(0.2) @@ -232,11 +234,12 @@ async def publish_frames(): @pytest.mark.asyncio -@skip_if_no_credentials() -async def test_full_reconnect_preserves_local_publication_object(): +@skip_if_no_credentials() # type: ignore[untyped-decorator] +async def test_full_reconnect_preserves_local_publication_object() -> None: """Test that FFI local_track_republished updates the existing publication object.""" room_name = unique_room_name("test-local-republish") url = os.getenv("LIVEKIT_URL") + assert url is not None room = rtc.Room() token = create_token("publisher", room_name) @@ -244,7 +247,7 @@ async def test_full_reconnect_preserves_local_publication_object(): source = None @room.on("reconnected") - def on_reconnected(): + def on_reconnected() -> None: reconnected_event.set() try: @@ -288,11 +291,12 @@ def on_reconnected(): @pytest.mark.asyncio -@skip_if_no_credentials() -async def test_audio_stream_subscribe(): +@skip_if_no_credentials() # type: ignore[untyped-decorator] +async def test_audio_stream_subscribe() -> None: """Test that published audio can be consumed and has similar energy levels""" room_name = unique_room_name("test-audio-stream") url = os.getenv("LIVEKIT_URL") + assert url is not None publisher_room = rtc.Room() subscriber_room = rtc.Room() @@ -308,7 +312,7 @@ def on_track_subscribed( track: rtc.Track, publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant, - ): + ) -> None: nonlocal subscribed_track if track.kind == rtc.TrackKind.KIND_AUDIO: subscribed_track = track @@ -325,9 +329,9 @@ def on_track_subscribed( await publisher_room.local_participant.publish_track(track, options) target_duration = 5.0 - published_energy = [] + published_energy: List[Any] = [] - async def publish_audio(): + async def publish_audio() -> None: async for frame in sine_wave_generator(440, target_duration, SAMPLE_RATE): data = np.frombuffer(frame.data.tobytes(), dtype=np.int16) energy = np.mean(np.abs(data.astype(np.float32))) @@ -383,11 +387,12 @@ async def publish_audio(): @pytest.mark.asyncio -@skip_if_no_credentials() -async def test_room_lifecycle_events(): +@skip_if_no_credentials() # type: ignore[untyped-decorator] +async def test_room_lifecycle_events() -> None: """Test that room lifecycle and track events are fired properly""" room_name = unique_room_name("test-lifecycle-events") url = os.getenv("LIVEKIT_URL") + assert url is not None room1 = rtc.Room() room2 = rtc.Room() @@ -395,7 +400,7 @@ async def test_room_lifecycle_events(): token1 = create_token("participant-1", room_name) token2 = create_token("participant-2", room_name) - events = { + events: Dict[str, List[str]] = { "disconnected": [], "participant_connected": [], "participant_disconnected": [], @@ -410,37 +415,37 @@ async def test_room_lifecycle_events(): } @room1.on("disconnected") - def on_room1_disconnected(reason): + def on_room1_disconnected(reason: Any) -> None: events["disconnected"].append("room1") @room1.on("participant_connected") - def on_room1_participant_connected(participant: rtc.RemoteParticipant): + def on_room1_participant_connected(participant: rtc.RemoteParticipant) -> None: events["participant_connected"].append(f"room1-{participant.identity}") @room1.on("participant_disconnected") - def on_room1_participant_disconnected(participant: rtc.RemoteParticipant): + def on_room1_participant_disconnected(participant: rtc.RemoteParticipant) -> None: events["participant_disconnected"].append(f"room1-{participant.identity}") @room1.on("local_track_published") - def on_room1_local_track_published(publication: rtc.LocalTrackPublication, track): + def on_room1_local_track_published(publication: rtc.LocalTrackPublication, track: Any) -> None: events["local_track_published"].append(f"room1-{publication.sid}") @room1.on("local_track_unpublished") - def on_room1_local_track_unpublished(publication: rtc.LocalTrackPublication): + def on_room1_local_track_unpublished(publication: rtc.LocalTrackPublication) -> None: events["local_track_unpublished"].append(f"room1-{publication.sid}") @room1.on("room_updated") - def on_room1_room_updated(): + def on_room1_room_updated() -> None: events["room_updated"].append("room1") @room1.on("connection_state_changed") - def on_room1_connection_state_changed(state: rtc.ConnectionState): + def on_room1_connection_state_changed(state: rtc.ConnectionState) -> None: events["connection_state_changed"].append(f"room1-{state}") @room2.on("track_published") def on_room2_track_published( publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant - ): + ) -> None: events["track_published"].append(f"room2-{publication.sid}") @room2.on("track_subscribed") @@ -448,13 +453,13 @@ def on_room2_track_subscribed( track: rtc.Track, publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant, - ): + ) -> None: events["track_subscribed"].append(f"room2-{publication.sid}") @room2.on("track_unpublished") def on_room2_track_unpublished( publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant - ): + ) -> None: events["track_unpublished"].append(f"room2-{publication.sid}") try: @@ -542,19 +547,20 @@ def on_room2_track_unpublished( @pytest.mark.asyncio -@skip_if_no_credentials() -async def test_connection_state_transitions(): +@skip_if_no_credentials() # type: ignore[untyped-decorator] +async def test_connection_state_transitions() -> None: """Test that connection state transitions work correctly""" room_name = unique_room_name("test-connection-state") url = os.getenv("LIVEKIT_URL") + assert url is not None room = rtc.Room() token = create_token("state-test", room_name) - states = [] + states: List[rtc.ConnectionState] = [] @room.on("connection_state_changed") - def on_state_changed(state: rtc.ConnectionState): + def on_state_changed(state: rtc.ConnectionState) -> None: states.append(state) try: @@ -567,7 +573,7 @@ def on_state_changed(state: rtc.ConnectionState): message="Room did not reach CONN_CONNECTED state", ) await assert_eventually( - lambda: rtc.ConnectionState.CONN_CONNECTED in states, + lambda: rtc.ConnectionState.CONN_CONNECTED in states, # type: ignore[comparison-overlap] message="CONN_CONNECTED state not in state change events", ) @@ -584,12 +590,12 @@ def on_state_changed(state: rtc.ConnectionState): @pytest.mark.asyncio -@skip_if_no_credentials() +@skip_if_no_credentials() # type: ignore[untyped-decorator] @pytest.mark.skipif( os.getenv("RUN_DATA_TRACK_TESTS") != "1", reason="SFU support requires data tracks support to be enabled via config; remove once this is no longer the case.", ) -async def test_data_track(): +async def test_data_track() -> None: """Test that a published data track delivers frames with correct payloads and timestamps.""" FRAME_COUNT = 5 PAYLOAD_SIZE = 64 @@ -600,6 +606,7 @@ async def test_data_track(): room_name = unique_room_name("test-data-track") url = os.getenv("LIVEKIT_URL") + assert url is not None publisher_room = rtc.Room() subscriber_room = rtc.Room() @@ -608,23 +615,24 @@ async def test_data_track(): subscriber_token = create_token(SUBSCRIBER_IDENTITY, room_name) remote_track_event = asyncio.Event() - remote_track = None + remote_track: None | rtc.RemoteDataTrack = None unpublished_event = asyncio.Event() unpublished_sid = None @subscriber_room.on("data_track_published") - def on_data_track_published(track: rtc.RemoteDataTrack): + def on_data_track_published(track: rtc.RemoteDataTrack) -> None: nonlocal remote_track remote_track = track remote_track_event.set() @subscriber_room.on("data_track_unpublished") - def on_data_track_unpublished(sid: str): + def on_data_track_unpublished(sid: str) -> None: nonlocal unpublished_sid unpublished_sid = sid unpublished_event.set() try: + assert url is not None await subscriber_room.connect(url, subscriber_token) await publisher_room.connect(url, publisher_token) @@ -641,7 +649,7 @@ def on_data_track_unpublished(sid: str): stream = remote_track.subscribe() - async def push_frames(): + async def push_frames() -> None: for i in range(FRAME_COUNT): frame = rtc.DataTrackFrame( payload=bytes([i] * PAYLOAD_SIZE), @@ -651,7 +659,7 @@ async def push_frames(): await asyncio.sleep(0.1) await local_track.unpublish() - async def publish_and_receive(): + async def publish_and_receive(stream: rtc.DataTrackStream) -> int: push_task = asyncio.create_task(push_frames()) recv_count = 0 async for frame in stream: @@ -665,7 +673,7 @@ async def publish_and_receive(): await push_task return recv_count - recv_count = await asyncio.wait_for(publish_and_receive(), timeout=10.0) + recv_count = await asyncio.wait_for(publish_and_receive(stream), timeout=10.0) assert recv_count > 0, "No frames were received" await asyncio.wait_for(unpublished_event.wait(), timeout=5.0) diff --git a/tests/rtc/test_emitter.py b/tests/rtc/test_emitter.py index 830feb3a..b5a1bf59 100644 --- a/tests/rtc/test_emitter.py +++ b/tests/rtc/test_emitter.py @@ -1,16 +1,16 @@ from livekit.rtc import EventEmitter -from typing import Literal +from typing import Any, Literal import pytest -def test_events(): +def test_events() -> None: EventTypes = Literal["connected", "reconnected", "disconnected"] emitter = EventEmitter[EventTypes]() connected_calls = [] @emitter.once("connected") - def on_connected(): + def on_connected() -> None: connected_calls.append(True) emitter.emit("connected") @@ -22,7 +22,7 @@ def on_connected(): reconnected_calls = [] @emitter.on("reconnected") - def on_reconnected(): + def on_reconnected() -> None: reconnected_calls.append(True) emitter.emit("reconnected") @@ -32,11 +32,11 @@ def on_reconnected(): disconnected_calls = [] @emitter.on("disconnected") - def on_disconnected(): + def on_disconnected() -> None: disconnected_calls.append(True) @emitter.on("disconnected") - def on_disconnected_another(): + def on_disconnected_another() -> None: disconnected_calls.append(True) emitter.emit("disconnected") @@ -46,7 +46,7 @@ def on_disconnected_another(): assert len(disconnected_calls) == 5 -def test_args(): +def test_args() -> None: EventTypes = Literal["whatever"] emitter = EventEmitter[EventTypes]() @@ -54,7 +54,7 @@ def test_args(): calls = [] @emitter.on("whatever") - def on_whatever(first, second, third): + def on_whatever(first: Any, second: Any, third: Any) -> None: calls.append((first, second, third)) emitter.emit("whatever", 1, 2, 3) @@ -66,7 +66,7 @@ def on_whatever(first, second, third): emitter.emit("whatever", 1, 2) -def test_varargs(): +def test_varargs() -> None: EventTypes = Literal["whatever"] emitter = EventEmitter[EventTypes]() @@ -74,7 +74,7 @@ def test_varargs(): calls = [] @emitter.on("whatever") - def on_whatever_varargs(*args): + def on_whatever_varargs(*args: Any) -> None: calls.append(args) emitter.emit("whatever", 1, 2, 3, 4, 5) @@ -83,7 +83,7 @@ def on_whatever_varargs(*args): assert calls == [(1, 2, 3, 4, 5), (1, 2)] -def test_throw(): +def test_throw() -> None: EventTypes = Literal["error"] emitter = EventEmitter[EventTypes]() @@ -91,12 +91,12 @@ def test_throw(): calls = [] @emitter.on("error") - def on_error(): + def on_error() -> None: calls.append(True) raise ValueError("error") @emitter.on("error") - def on_error_another(): + def on_error_another() -> None: calls.append(True) emitter.emit("error") diff --git a/tests/rtc/test_ffi_queue.py b/tests/rtc/test_ffi_queue.py index 329018db..5f76aa6e 100644 --- a/tests/rtc/test_ffi_queue.py +++ b/tests/rtc/test_ffi_queue.py @@ -19,6 +19,7 @@ """ import asyncio +from collections.abc import Generator from dataclasses import dataclass import pytest @@ -40,15 +41,17 @@ class TestFfiQueueFilterFn: """Test suite for FfiQueue filter_fn functionality.""" @pytest.fixture - def event_loop(self): + def event_loop(self) -> Generator[asyncio.AbstractEventLoop, None, None]: """Create event loop for tests.""" loop = asyncio.new_event_loop() yield loop loop.close() - def test_subscribe_without_filter_receives_all_events(self, event_loop): + def test_subscribe_without_filter_receives_all_events( + self, event_loop: asyncio.AbstractEventLoop + ) -> None: """Subscriber without filter_fn receives all events.""" - queue = FfiQueue() + queue: FfiQueue[MockFfiEvent] = FfiQueue() sub = queue.subscribe(event_loop, filter_fn=None) events = [ @@ -69,9 +72,11 @@ def test_subscribe_without_filter_receives_all_events(self, event_loop): assert len(received) == 4 - def test_subscribe_with_filter_receives_only_matching_events(self, event_loop): + def test_subscribe_with_filter_receives_only_matching_events( + self, event_loop: asyncio.AbstractEventLoop + ) -> None: """Subscriber with filter_fn only receives matching events.""" - queue = FfiQueue() + queue: FfiQueue[MockFfiEvent] = FfiQueue() sub = queue.subscribe( event_loop, filter_fn=lambda e: e.WhichOneof("message") == "audio_stream_event", @@ -97,9 +102,11 @@ def test_subscribe_with_filter_receives_only_matching_events(self, event_loop): assert len(received) == 2 assert all(e._message_type == "audio_stream_event" for e in received) - def test_multiple_subscribers_different_filters(self, event_loop): + def test_multiple_subscribers_different_filters( + self, event_loop: asyncio.AbstractEventLoop + ) -> None: """Multiple subscribers can have different filters.""" - queue = FfiQueue() + queue: FfiQueue[MockFfiEvent] = FfiQueue() sub_audio = queue.subscribe( event_loop, @@ -142,9 +149,9 @@ def test_multiple_subscribers_different_filters(self, event_loop): assert video_count == 1 assert all_count == 4 - def test_filter_with_multiple_event_types(self, event_loop): + def test_filter_with_multiple_event_types(self, event_loop: asyncio.AbstractEventLoop) -> None: """Filter can match multiple event types.""" - queue = FfiQueue() + queue: FfiQueue[MockFfiEvent] = FfiQueue() sub = queue.subscribe( event_loop, filter_fn=lambda e: ( @@ -172,9 +179,11 @@ def test_filter_with_multiple_event_types(self, event_loop): types = {e._message_type for e in received} assert types == {"audio_stream_event", "video_stream_event"} - def test_unsubscribe_works_with_filtered_subscriber(self, event_loop): + def test_unsubscribe_works_with_filtered_subscriber( + self, event_loop: asyncio.AbstractEventLoop + ) -> None: """Unsubscribe correctly removes filtered subscriber.""" - queue = FfiQueue() + queue: FfiQueue[MockFfiEvent] = FfiQueue() sub = queue.subscribe( event_loop, filter_fn=lambda e: e.WhichOneof("message") == "audio_stream_event", @@ -195,11 +204,11 @@ def test_unsubscribe_works_with_filtered_subscriber(self, event_loop): assert sub.empty() - def test_filter_error_delivers_item(self, event_loop): + def test_filter_error_delivers_item(self, event_loop: asyncio.AbstractEventLoop) -> None: """If filter_fn raises, item is still delivered.""" - queue = FfiQueue() + queue: FfiQueue[MockFfiEvent] = FfiQueue() - def bad_filter(e): + def bad_filter(e: MockFfiEvent) -> bool: raise ValueError("oops") sub = queue.subscribe(event_loop, filter_fn=bad_filter) @@ -219,14 +228,14 @@ class TestFfiQueueMemoryReduction: """Test that filtering actually reduces object creation.""" @pytest.fixture - def event_loop(self): + def event_loop(self) -> Generator[asyncio.AbstractEventLoop, None, None]: loop = asyncio.new_event_loop() yield loop loop.close() - def test_filtering_reduces_callback_calls(self, event_loop): + def test_filtering_reduces_callback_calls(self, event_loop: asyncio.AbstractEventLoop) -> None: """Verify filtering prevents call_soon_threadsafe for non-matching events.""" - queue = FfiQueue() + queue: FfiQueue[MockFfiEvent] = FfiQueue() # Create 10 subscribers, each only wants audio events subscribers = [] diff --git a/tests/rtc/test_import.py b/tests/rtc/test_import.py index f6934f19..2728f93e 100644 --- a/tests/rtc/test_import.py +++ b/tests/rtc/test_import.py @@ -1,7 +1,7 @@ """Smoke test: import the SDK and initialize the FFI library.""" -def test_import_and_ffi_initialize(): +def test_import_and_ffi_initialize() -> None: from livekit import rtc # noqa: F401 from livekit.rtc._ffi_client import FfiClient diff --git a/tests/rtc/test_packet_trailer.py b/tests/rtc/test_packet_trailer.py index 8c9feeb9..b13cc19f 100644 --- a/tests/rtc/test_packet_trailer.py +++ b/tests/rtc/test_packet_trailer.py @@ -1,17 +1,21 @@ from __future__ import annotations from types import SimpleNamespace +from typing import cast import pytest from livekit import rtc from livekit.rtc import video_source as video_source_module +from livekit.rtc._ffi_client import FfiHandle from livekit.rtc._proto import e2ee_pb2 as proto_e2ee +from livekit.rtc._proto import ffi_pb2 as proto_ffi from livekit.rtc._proto import handle_pb2 as proto_handle from livekit.rtc._proto import participant_pb2 as proto_participant from livekit.rtc._proto import room_pb2 as proto_room from livekit.rtc._proto import track_pb2 as proto_track from livekit.rtc.participant import LocalParticipant +from livekit.rtc.track import Track def _publication_info( @@ -74,10 +78,10 @@ async def test_track_publication_exposes_packet_trailer_features() -> None: def test_video_source_capture_frame_copies_metadata(monkeypatch: pytest.MonkeyPatch) -> None: - captured_requests = [] + captured_requests: list[proto_ffi.FfiRequest] = [] class FakeClient: - def request(self, req): + def request(self, req: proto_ffi.FfiRequest) -> None: captured_requests.append(req) class FakeFfiClient: @@ -86,7 +90,7 @@ class FakeFfiClient: monkeypatch.setattr(video_source_module, "FfiClient", FakeFfiClient) source = video_source_module.VideoSource.__new__(video_source_module.VideoSource) - source._ffi_handle = SimpleNamespace(handle=42) + source._ffi_handle = cast(FfiHandle, SimpleNamespace(handle=42)) frame = rtc.VideoFrame( width=1, height=1, @@ -138,7 +142,7 @@ async def test_local_track_republished_updates_existing_publication() -> None: packet_trailer_features=[proto_track.PTF_USER_TIMESTAMP], ) ) - publication._track = SimpleNamespace(_info=SimpleNamespace(sid="TR_OLD")) + publication._track = cast(Track, SimpleNamespace(_info=SimpleNamespace(sid="TR_OLD"))) local_participant._track_publications[publication.sid] = publication room._on_room_event( diff --git a/tests/rtc/test_resampler.py b/tests/rtc/test_resampler.py index 25079da8..db05c4fd 100644 --- a/tests/rtc/test_resampler.py +++ b/tests/rtc/test_resampler.py @@ -1,13 +1,14 @@ -from livekit.rtc import AudioResampler, AudioResamplerQuality +from livekit.rtc import AudioResampler, AudioResamplerQuality, AudioFrame import time import wave import os +from typing import List # Test fixture directory FIXTURES_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "fixtures") -def test_audio_resampler(): +def test_audio_resampler() -> None: wav_file_path = os.path.join(FIXTURES_DIR, "test_audio.wav") # Open the wave file @@ -29,9 +30,9 @@ def test_audio_resampler(): ] for quality in qualities: - total_time = 0 + total_time = 0.0 nb_runs = 20 - output_frames = [] + output_frames: List[AudioFrame] = [] for i in range(nb_runs): output_frames = [] resampler = AudioResampler(44100, 8000, quality=quality)