diff --git a/doc/lib/c/index.md b/doc/lib/c/index.md index 8c2b0fb1c..29ef59929 100644 --- a/doc/lib/c/index.md +++ b/doc/lib/c/index.md @@ -115,6 +115,28 @@ A server can reject the connection on auth grounds: unauthorized (HTTP 401) or f Failed calls are reported only through the return code and `moq_error()`, not logged. To surface libmoq's internal logs (moq-net / QUIC activity), call `moq_log_level("debug")` (or `"trace"`, `"info"`, etc.) to install a tracing subscriber. +## Connection statistics + +`moq_session_stats(session, &dst)` fills a `moq_connection_stats` struct with a point-in-time view of the underlying QUIC/WebTransport connection: RTT, send/receive bandwidth estimates, and byte/packet counters. Unlike the callback-based APIs, this is a plain synchronous query you can poll on a timer. + +Each metric carries a `*_valid` flag because availability depends on the transport backend (native QUIC reports every metric; the browser WebTransport reports few or none). A `false` flag is not the same as a zero value, so always check it before reading the field: + +```c +moq_connection_stats stats = {0}; +int rc = moq_session_stats(session, &stats); +if (rc < 0) { + // < 0 also covers "currently reconnecting, no live connection". + fprintf(stderr, "stats failed: %s\n", moq_error()); +} else { + if (stats.rtt_valid) { + printf("rtt: %llu us\n", (unsigned long long)stats.rtt_us); + } + if (stats.send_rate_valid) { + printf("send: %llu bps\n", (unsigned long long)stats.send_rate_bps); + } +} +``` + ## Use cases - **C/C++ applications** integrating MoQ without a Rust toolchain diff --git a/doc/lib/kt/index.md b/doc/lib/kt/index.md index a517ac50f..8616a02f9 100644 --- a/doc/lib/kt/index.md +++ b/doc/lib/kt/index.md @@ -50,6 +50,10 @@ client.setConsume(origin) val session = client.connect("https://relay.example.com") +// Poll connection stats (RTT, bandwidth estimates, byte/packet counters). +// Each field is null when the transport backend doesn't report that metric. +session.stats().rttUs?.let { println("rtt: $it us") } + origin.use { val consumer = origin.consume() val announced = consumer.announced("demos/") diff --git a/doc/lib/swift/index.md b/doc/lib/swift/index.md index 686370e9f..41f98d4c4 100644 --- a/doc/lib/swift/index.md +++ b/doc/lib/swift/index.md @@ -58,6 +58,12 @@ client.setConsume(origin: origin) let session = try await client.connect(url: "https://relay.example.com") +// Poll connection stats (RTT, bandwidth estimates, byte/packet counters). +// Each field is nil when the transport backend doesn't report that metric. +if let rtt = session.stats().rttUs { + print("rtt: \(rtt) us") +} + let consumer = origin.consume() let announced = try consumer.announced(prefix: "demos/") for try await announcement in announced.announcements { diff --git a/rs/libmoq/src/api.rs b/rs/libmoq/src/api.rs index 442760f85..bdf552af4 100644 --- a/rs/libmoq/src/api.rs +++ b/rs/libmoq/src/api.rs @@ -81,6 +81,101 @@ pub struct moq_announced { pub active: bool, } +/// A snapshot of connection statistics, filled in by [moq_session_stats]. +/// +/// Each metric has a `*_valid` flag: when `false`, the matching value is meaningless because +/// the transport backend doesn't report it (a `false` flag is NOT the same as a zero value). +/// Native QUIC reports every metric; the browser WebTransport reports few or none. Initialize +/// the struct to zero before the call; [moq_session_stats] overwrites every field. +#[repr(C)] +#[allow(non_camel_case_types)] +pub struct moq_connection_stats { + /// Smoothed round-trip time, in microseconds. + pub rtt_us: u64, + /// Whether `rtt_us` is reported by the transport backend. + pub rtt_valid: bool, + + /// Estimated send bandwidth from the congestion controller, in bits per second. + pub send_rate_bps: u64, + /// Whether `send_rate_bps` is reported by the transport backend. + pub send_rate_valid: bool, + + /// Estimated receive bandwidth from MoQ PROBE, in bits per second. + pub recv_rate_bps: u64, + /// Whether `recv_rate_bps` is reported by the transport backend. + pub recv_rate_valid: bool, + + /// Total bytes sent, including retransmissions and overhead. + pub bytes_sent: u64, + /// Whether `bytes_sent` is reported by the transport backend. + pub bytes_sent_valid: bool, + + /// Total bytes received, including duplicates and overhead. + pub bytes_received: u64, + /// Whether `bytes_received` is reported by the transport backend. + pub bytes_received_valid: bool, + + /// Total bytes lost (detected via retransmission or acknowledgement). + pub bytes_lost: u64, + /// Whether `bytes_lost` is reported by the transport backend. + pub bytes_lost_valid: bool, + + /// Total datagrams sent. + pub packets_sent: u64, + /// Whether `packets_sent` is reported by the transport backend. + pub packets_sent_valid: bool, + + /// Total datagrams received. + pub packets_received: u64, + /// Whether `packets_received` is reported by the transport backend. + pub packets_received_valid: bool, + + /// Total datagrams detected as lost. + pub packets_lost: u64, + /// Whether `packets_lost` is reported by the transport backend. + pub packets_lost_valid: bool, +} + +impl From<&moq_net::ConnectionStats> for moq_connection_stats { + fn from(stats: &moq_net::ConnectionStats) -> Self { + // An Option becomes a (value, valid) pair; absent metrics report 0/false. + fn split(value: Option) -> (u64, bool) { + (value.unwrap_or(0), value.is_some()) + } + + let (rtt_us, rtt_valid) = split(stats.rtt.and_then(|d| u64::try_from(d.as_micros()).ok())); + let (send_rate_bps, send_rate_valid) = split(stats.estimated_send_rate); + let (recv_rate_bps, recv_rate_valid) = split(stats.estimated_recv_rate); + let (bytes_sent, bytes_sent_valid) = split(stats.bytes_sent); + let (bytes_received, bytes_received_valid) = split(stats.bytes_received); + let (bytes_lost, bytes_lost_valid) = split(stats.bytes_lost); + let (packets_sent, packets_sent_valid) = split(stats.packets_sent); + let (packets_received, packets_received_valid) = split(stats.packets_received); + let (packets_lost, packets_lost_valid) = split(stats.packets_lost); + + Self { + rtt_us, + rtt_valid, + send_rate_bps, + send_rate_valid, + recv_rate_bps, + recv_rate_valid, + bytes_sent, + bytes_sent_valid, + bytes_received, + bytes_received_valid, + bytes_lost, + bytes_lost_valid, + packets_sent, + packets_sent_valid, + packets_received, + packets_received_valid, + packets_lost, + packets_lost_valid, + } + } +} + /// Initialize the library with a log level. /// /// This should be called before any other functions. @@ -195,6 +290,29 @@ pub extern "C" fn moq_session_close(session: u32) -> i32 { }) } +/// Snapshot the current connection statistics for a session. +/// +/// Fills `dst` with a point-in-time view of the underlying QUIC/WebTransport connection +/// (RTT, bandwidth estimates, byte/packet counters). Each metric carries a `*_valid` flag +/// since availability depends on the transport backend; see [moq_connection_stats]. +/// +/// Returns zero on success, or a negative code on failure: the session handle is unknown, or +/// the session is currently reconnecting and has no live connection (in which case `dst` is +/// left untouched). Safe to call repeatedly to poll stats over the life of the session. +/// +/// # Safety +/// - The caller must ensure that `dst` is a valid pointer to a [moq_connection_stats] struct. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn moq_session_stats(session: u32, dst: *mut moq_connection_stats) -> i32 { + ffi::enter(move || { + let session = ffi::parse_id(session)?; + let dst = unsafe { dst.as_mut() }.ok_or(Error::InvalidPointer)?; + let stats = State::lock().session.stats(session)?; + *dst = moq_connection_stats::from(&stats); + Ok(()) + }) +} + /// Create an origin for publishing broadcasts. /// /// Origins contain any number of broadcasts addressed by path. diff --git a/rs/libmoq/src/session.rs b/rs/libmoq/src/session.rs index 1ecad5960..2956ae369 100644 --- a/rs/libmoq/src/session.rs +++ b/rs/libmoq/src/session.rs @@ -14,6 +14,8 @@ use crate::{Error, Id, NonZeroSlab, State, ffi}; struct TaskEntry { close: Option>, callback: ffi::OnStatus, + /// Reads live connection stats, reporting `None` while reconnecting. + stats: moq_native::ConnectionStatsReader, } #[derive(Default)] @@ -30,11 +32,20 @@ impl Session { consume: Option, callback: ffi::OnStatus, ) -> Result { - let closed = oneshot::channel(); + // Build the reconnect loop up front so we can grab a stats reader for it + // before moving it into the spawned task. + let reconnect = moq_native::ClientConfig::default() + .init()? + .with_publish(publish) + .with_consume(consume) + .reconnect(url); + let stats = reconnect.stats(); + let closed = oneshot::channel(); let entry = TaskEntry { close: Some(closed.0), callback, + stats, }; let id = self.task.insert(Some(entry))?; @@ -42,7 +53,7 @@ impl Session { let res = tokio::select! { // close() requested: a clean shutdown delivers a terminal 0. _ = closed.1 => Ok(()), - res = Self::connect_run(callback, url, publish, consume) => res, + res = Self::report(callback, reconnect) => res, }; // Deliver one final terminal callback (0 = closed, < 0 = error), then @@ -57,24 +68,18 @@ impl Session { Ok(id) } - /// Connect and stay connected, reconnecting with exponential backoff if the session drops. + /// Snapshot the current connection's stats. /// - /// Reports a positive connection epoch through the status callback on every (re)connect, and a - /// negative code only when reconnection permanently gives up (the backoff timeout is exceeded), - /// which is terminal. - async fn connect_run( - callback: ffi::OnStatus, - url: Url, - publish: Option, - consume: Option, - ) -> Result<(), Error> { - let reconnect = moq_native::ClientConfig::default() - .init()? - .with_publish(publish) - .with_consume(consume) - .reconnect(url); - - Self::report(callback, reconnect).await + /// Errors with [`Error::SessionNotFound`] if the handle is unknown, or [`Error::Offline`] + /// if the session is currently between connections (reconnecting). + pub fn stats(&self, id: Id) -> Result { + self.task + .get(id) + .and_then(|entry| entry.as_ref()) + .ok_or(Error::SessionNotFound)? + .stats + .stats() + .ok_or(Error::Offline) } /// Forward connection epochs to the status callback until the reconnect loop stops. diff --git a/rs/moq-ffi/src/session.rs b/rs/moq-ffi/src/session.rs index b6ebf8605..fe2f0ee03 100644 --- a/rs/moq-ffi/src/session.rs +++ b/rs/moq-ffi/src/session.rs @@ -147,6 +147,50 @@ impl MoqClient { } } +/// A snapshot of connection statistics for a [`MoqSession`]. +/// +/// Each field is `None` when the transport backend doesn't report that metric (native QUIC +/// reports all of them; the browser WebTransport reports few or none), or when it isn't yet +/// available (e.g. `send_rate_bps` before the congestion controller has a window). A `None` is +/// not the same as a zero value. +#[derive(uniffi::Record)] +pub struct MoqConnectionStats { + /// Smoothed round-trip time, in microseconds. + pub rtt_us: Option, + /// Estimated send bandwidth from the congestion controller, in bits per second. + pub send_rate_bps: Option, + /// Estimated receive bandwidth from MoQ PROBE, in bits per second. + pub recv_rate_bps: Option, + /// Total bytes sent, including retransmissions and overhead. + pub bytes_sent: Option, + /// Total bytes received, including duplicates and overhead. + pub bytes_received: Option, + /// Total bytes lost (detected via retransmission or acknowledgement). + pub bytes_lost: Option, + /// Total datagrams sent. + pub packets_sent: Option, + /// Total datagrams received. + pub packets_received: Option, + /// Total datagrams detected as lost. + pub packets_lost: Option, +} + +impl From for MoqConnectionStats { + fn from(stats: moq_net::ConnectionStats) -> Self { + Self { + rtt_us: stats.rtt.and_then(|d| u64::try_from(d.as_micros()).ok()), + send_rate_bps: stats.estimated_send_rate, + recv_rate_bps: stats.estimated_recv_rate, + bytes_sent: stats.bytes_sent, + bytes_received: stats.bytes_received, + bytes_lost: stats.bytes_lost, + packets_sent: stats.packets_sent, + packets_received: stats.packets_received, + packets_lost: stats.packets_lost, + } + } +} + #[derive(uniffi::Object)] pub struct MoqSession { inner: Option, @@ -198,4 +242,18 @@ impl MoqSession { pub fn shutdown(&self) { self.cancel(0); } + + /// Snapshot the current connection statistics (RTT, bandwidth estimates, + /// byte/packet counters). Cheap to call; intended for periodic polling. + /// + /// Individual fields are `None` when the transport backend doesn't report + /// them; see [`MoqConnectionStats`]. + pub fn stats(&self) -> MoqConnectionStats { + let _guard = crate::ffi::RUNTIME.enter(); + self.inner + .as_ref() + .map(moq_net::Session::stats) + .unwrap_or_default() + .into() + } } diff --git a/rs/moq-native/src/reconnect.rs b/rs/moq-native/src/reconnect.rs index 5e52791a9..46f4d4f8c 100644 --- a/rs/moq-native/src/reconnect.rs +++ b/rs/moq-native/src/reconnect.rs @@ -79,6 +79,25 @@ struct State { status: Option, /// Set when the reconnect loop permanently gives up (reconnect timeout exceeded). error: Option, + /// The currently-connected session, or `None` while reconnecting. Read by + /// [`ConnectionStatsReader`] to snapshot live connection stats. + session: Option, +} + +/// A cloneable read handle for the live connection stats of a [`Reconnect`] loop. +/// +/// Obtained via [`Reconnect::stats`]. [`stats`](Self::stats) returns `None` while the loop is +/// between connections (reconnecting), and `Some` snapshot while a session is established. +#[derive(Clone)] +pub struct ConnectionStatsReader { + state: kio::Consumer, +} + +impl ConnectionStatsReader { + /// Snapshot the current connection's stats, or `None` if not currently connected. + pub fn stats(&self) -> Option { + self.state.read().session.as_ref().map(moq_net::Session::stats) + } } /// Handle to a background reconnect loop. @@ -137,11 +156,13 @@ impl Reconnect { last_error = None; if let Ok(mut state) = state.write() { state.status = Some(Status::Connected); + state.session = Some(session.clone()); } let _ = session.closed().await; tracing::warn!(%url, "session closed, reconnecting"); if let Ok(mut state) = state.write() { state.status = Some(Status::Disconnected); + state.session = None; } retry_start = tokio::time::Instant::now(); } @@ -201,6 +222,15 @@ impl Reconnect { pub async fn closed(&self) -> crate::Result<()> { kio::wait(|waiter| self.poll_closed(waiter)).await } + + /// A cloneable handle for reading the current connection's stats. + /// + /// The handle keeps working across reconnects, reporting `None` between connections. + pub fn stats(&self) -> ConnectionStatsReader { + ConnectionStatsReader { + state: self.state.clone(), + } + } } impl Drop for Reconnect { diff --git a/rs/moq-net/src/session.rs b/rs/moq-net/src/session.rs index 744325c16..8376d142f 100644 --- a/rs/moq-net/src/session.rs +++ b/rs/moq-net/src/session.rs @@ -4,6 +4,45 @@ use web_transport_trait::Stats; use crate::{BandwidthConsumer, BandwidthProducer, Error, Version}; +/// A snapshot of connection statistics for a [`Session`]. +/// +/// Every field is optional: availability depends on the transport backend (native QUIC +/// reports all of them, the browser WebTransport reports few or none) and on the +/// connection state (e.g. `estimated_send_rate` is `None` until the congestion controller +/// has a window). `None` means "not reported", not "zero". +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +#[non_exhaustive] +pub struct ConnectionStats { + /// Smoothed round-trip time estimate. + pub rtt: Option, + + /// Estimated send bandwidth from the congestion controller, in bits per second. + pub estimated_send_rate: Option, + + /// Estimated receive bandwidth from MoQ PROBE, in bits per second. + /// + /// `None` unless the negotiated version supports PROBE (moq-lite-03+). + pub estimated_recv_rate: Option, + + /// Total bytes sent over the connection, including retransmissions and overhead. + pub bytes_sent: Option, + + /// Total bytes received over the connection, including duplicates and overhead. + pub bytes_received: Option, + + /// Total bytes lost (detected via retransmission or acknowledgement). + pub bytes_lost: Option, + + /// Total datagrams sent. + pub packets_sent: Option, + + /// Total datagrams received. + pub packets_received: Option, + + /// Total datagrams detected as lost. + pub packets_lost: Option, +} + /// A MoQ transport session, wrapping a WebTransport connection. /// /// Created via: @@ -67,6 +106,16 @@ impl Session { self.recv_bandwidth.clone() } + /// Returns a snapshot of the current connection statistics. + /// + /// This is a cheap, non-blocking read of the underlying transport's counters; see + /// [`ConnectionStats`] for which metrics each backend reports. + pub fn stats(&self) -> ConnectionStats { + let mut stats = self.session.stats(); + stats.estimated_recv_rate = self.recv_bandwidth.as_ref().and_then(BandwidthConsumer::peek); + stats + } + /// Close the underlying transport session. pub fn close(&mut self, err: Error) { if self.closed { @@ -139,6 +188,7 @@ async fn run_send_bandwidth_inner(session: &S, trait SessionInner: Send + Sync { fn close(&self, code: u32, reason: &str); fn closed(&self) -> Pin + Send + '_>>; + fn stats(&self) -> ConnectionStats; } impl SessionInner for S { @@ -149,4 +199,21 @@ impl SessionInner for S { fn closed(&self) -> Pin + Send + '_>> { Box::pin(async move { S::closed(self).await.to_string() }) } + + fn stats(&self) -> ConnectionStats { + // estimated_recv_rate is filled in at the Session level (it comes from MoQ PROBE, + // not the transport), so leave it at the Default `None` here. + let stats = S::stats(self); + ConnectionStats { + rtt: stats.rtt(), + estimated_send_rate: stats.estimated_send_rate(), + bytes_sent: stats.bytes_sent(), + bytes_received: stats.bytes_received(), + bytes_lost: stats.bytes_lost(), + packets_sent: stats.packets_sent(), + packets_received: stats.packets_received(), + packets_lost: stats.packets_lost(), + ..Default::default() + } + } }