From bf145af019a8ccd2043589c99df011d92dfdcab0 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Tue, 9 Jun 2026 14:36:25 -0700 Subject: [PATCH 1/2] feat(moq-net,moq-cli): per-track stats + `moq-cli --stats` usage display Make moq-net stats per-track and surface them as a live usage table in moq-cli. A broadcast total stays O(1) to read: the existing broadcast-level `Counters` rollup is kept in sync on every bump, so reading a total never iterates the per-track map. moq-net: - Add `TrackCounters` and per-(side,tier) track maps on `BroadcastEntry`. The track guards bump both the broadcast rollup and the track's own atomics. - Add `Stats::snapshot() -> StatsSnapshot` (+ public `BroadcastSnapshot` / `SideSnapshot` / `TrackCounts` / `SessionCounts`) so an in-process consumer reads counters directly instead of subscribing to the `.stats` broadcast. - The published per-broadcast JSON gains an additive `tracks` object; the wire frame now reuses `SideSnapshot` instead of a separate struct. `Tier::idx` is public since the snapshot arrays are tier-indexed. - Dead tracks are GC'd in the snapshot task; their bytes remain in the rollup. moq-cli: - Add `--stats` / `--stats-interval` to publish/subscribe/serve/accept. Builds an enabled aggregator (throwaway origin), attaches the tier handle via `with_stats`, and repaints a per-track table on stderr each interval, with rates derived from successive snapshots. stdout stays clean for piped media. docs: document the `tracks` field in doc/bin/relay/config.md and the `--stats` flag in doc/bin/cli.md. Co-Authored-By: Claude Opus 4.8 (1M context) --- doc/bin/cli.md | 23 ++ doc/bin/relay/config.md | 19 +- rs/moq-cli/src/client.rs | 17 +- rs/moq-cli/src/main.rs | 44 +++- rs/moq-cli/src/stats.rs | 189 +++++++++++++++++ rs/moq-net/src/stats.rs | 444 +++++++++++++++++++++++++++++++++------ 6 files changed, 660 insertions(+), 76 deletions(-) create mode 100644 rs/moq-cli/src/stats.rs diff --git a/doc/bin/cli.md b/doc/bin/cli.md index 22bb42473..beb3af2af 100644 --- a/doc/bin/cli.md +++ b/doc/bin/cli.md @@ -147,6 +147,29 @@ TS export carries H.264 / H.265 as Annex-B and AAC as ADTS. Both in-band sources work: the parameter sets are read from the bitstream or the catalog `description` and re-injected as Annex-B on each keyframe. +## Usage Stats + +Pass `--stats` to `publish`, `subscribe`, `serve`, or `accept` to print a +live, self-refreshing table of per-track upload/download usage to stderr. +It tracks each track being produced or consumed and rewrites the rates and +totals in place every second (`--stats-interval` to change the cadence): + +```bash +moq-cli subscribe --url https://relay.example.com --broadcast my-stream --format ts --stats > /dev/null +``` + +``` +track rate total frames groups +my-stream 21.7 KB/s 156.2 KB 211 8 + video 21.5 KB/s 156.0 KB 210 7 + catalog.json 0 B/s 162 B 1 1 +``` + +Each broadcast row is the rollup (its total upload when publishing, or download +when subscribing); the indented rows are its individual tracks. The table goes +to stderr, so stdout stays clean for piped media. Lower the log level (e.g. +`RUST_LOG=error`) to keep logs from interleaving with the table. + ## Authentication Pass a JWT token via the URL: diff --git a/doc/bin/relay/config.md b/doc/bin/relay/config.md index fa5116a02..6d299ca61 100644 --- a/doc/bin/relay/config.md +++ b/doc/bin/relay/config.md @@ -220,13 +220,20 @@ counterpart no traffic can flow, so the entry is dropped: "announced": 1, "announced_closed": 0, "broadcasts": 1, "broadcasts_closed": 0, "subscriptions": 5, "subscriptions_closed": 2, - "bytes": 12345, "frames": 678, "groups": 9 + "bytes": 12345, "frames": 678, "groups": 9, + "tracks": { + "video": { "bytes": 12000, "frames": 600, "groups": 8 }, + "audio": { "bytes": 345, "frames": 78, "groups": 1 } + } }, "anon/foo": { "announced": 1, "announced_closed": 0, "broadcasts": 1, "broadcasts_closed": 0, "subscriptions": 2, "subscriptions_closed": 0, - "bytes": 234, "frames": 12, "groups": 1 + "bytes": 234, "frames": 12, "groups": 1, + "tracks": { + "catalog.json": { "bytes": 234, "frames": 12, "groups": 1 } + } } } ``` @@ -246,7 +253,13 @@ Field semantics: - `subscriptions` / `subscriptions_closed`: cumulative count of track-level subscription guards opened and dropped. - `bytes` / `frames` / `groups`: cumulative payload counters from the - session loops (both the `moq-lite` and IETF `moq-transport` paths). + session loops (both the `moq-lite` and IETF `moq-transport` paths). These + are the broadcast-level rollup: the sum across every track, maintained + incrementally so reading a total never iterates the per-track map. +- `tracks`: per-track breakdown of the `bytes` / `frames` / `groups` rollup, + keyed by track name. A track appears while it has an open subscription and + is dropped once it stops flowing; its payload stays counted in the rollup + totals above. Use this to see which tracks within a broadcast are active. The session tracks (`sessions.json`, `internal/sessions.json`) instead map each auth root to a `{ sessions, sessions_closed }` snapshot. `sessions` diff --git a/rs/moq-cli/src/client.rs b/rs/moq-cli/src/client.rs index 1ecaf1923..15ed3aa73 100644 --- a/rs/moq-cli/src/client.rs +++ b/rs/moq-cli/src/client.rs @@ -1,16 +1,28 @@ -use crate::Publish; +use crate::{Publish, StatsArgs, run_stats}; use anyhow::Context; use hang::moq_net; use url::Url; -pub async fn run_client(client: moq_native::Client, url: Url, name: String, publish: Publish) -> anyhow::Result<()> { +pub async fn run_client( + client: moq_native::Client, + url: Url, + name: String, + publish: Publish, + stats: StatsArgs, +) -> anyhow::Result<()> { // Create an origin producer to publish to the broadcast. let origin = moq_net::Origin::random().produce(); let _publish = origin .publish_broadcast(&name, publish.consume()) .context("failed to publish broadcast")?; + let stats_agg = stats.build(); + let client = match &stats_agg { + Some(agg) => client.with_stats(agg.tier(moq_net::Tier::External)), + None => client, + }; + tracing::info!(%url, %name, "connecting"); let reconnect = client.with_publisher(origin.clone()).reconnect(url); @@ -22,6 +34,7 @@ pub async fn run_client(client: moq_native::Client, url: Url, name: String, publ tokio::select! { res = publish.run() => res, res = reconnect.closed() => res, + res = run_stats(stats_agg, stats.interval) => res, _ = tokio::signal::ctrl_c() => Ok(()), } } diff --git a/rs/moq-cli/src/main.rs b/rs/moq-cli/src/main.rs index 7871b9be6..8671f926c 100644 --- a/rs/moq-cli/src/main.rs +++ b/rs/moq-cli/src/main.rs @@ -1,6 +1,7 @@ mod client; mod publish; mod server; +mod stats; mod subscribe; mod web; @@ -8,6 +9,7 @@ use client::*; use hang::moq_net; use publish::*; use server::*; +use stats::*; use subscribe::*; use web::*; @@ -45,6 +47,9 @@ pub enum Command { #[arg(long)] dir: Option, + #[command(flatten)] + stats: StatsArgs, + /// The format of the input media. #[command(subcommand)] format: PublishFormat, @@ -62,6 +67,9 @@ pub enum Command { #[arg(long)] dir: Option, + #[command(flatten)] + stats: StatsArgs, + #[command(flatten)] args: SubscribeArgs, }, @@ -88,6 +96,9 @@ pub enum Command { #[arg(long, alias = "name")] broadcast: String, + #[command(flatten)] + stats: StatsArgs, + /// The format of the input media. #[command(subcommand)] format: PublishFormat, @@ -106,6 +117,9 @@ pub enum Command { #[arg(long, alias = "name")] broadcast: String, + #[command(flatten)] + stats: StatsArgs, + #[command(flatten)] args: SubscribeArgs, }, @@ -130,6 +144,7 @@ async fn main() -> anyhow::Result<()> { config, dir, broadcast, + stats, format, } => { warn_if_missing_format(&broadcast); @@ -140,18 +155,26 @@ async fn main() -> anyhow::Result<()> { #[cfg(feature = "iroh")] let server = server.with_iroh(iroh); + let stats_agg = stats.build(); + let server = match &stats_agg { + Some(agg) => server.with_stats(agg.tier(moq_net::Tier::External)), + None => server, + }; + let web_tls = server.tls_info(); tokio::select! { res = run_server(server, broadcast, publish.consume()) => res, res = run_web(&web_bind, web_tls, dir) => res, res = publish.run() => res, + res = run_stats(stats_agg, stats.interval) => res, } } Command::Accept { config, broadcast, dir, + stats, args, } => { let web_bind = config.bind.clone().unwrap_or_else(|| "[::]:443".to_string()); @@ -160,6 +183,12 @@ async fn main() -> anyhow::Result<()> { #[cfg(feature = "iroh")] let server = server.with_iroh(iroh); + let stats_agg = stats.build(); + let server = match &stats_agg { + Some(agg) => server.with_stats(agg.tier(moq_net::Tier::External)), + None => server, + }; + let web_tls = server.tls_info(); let origin = moq_net::Origin::random().produce(); @@ -169,6 +198,7 @@ async fn main() -> anyhow::Result<()> { res = run_accept(server, origin) => res, res = run_web(&web_bind, web_tls, dir) => res, res = run_announced_subscribe(consumer, broadcast, args) => res, + res = run_stats(stats_agg, stats.interval) => res, _ = tokio::signal::ctrl_c() => Ok(()), } } @@ -176,6 +206,7 @@ async fn main() -> anyhow::Result<()> { config, url, broadcast, + stats, format, } => { warn_if_missing_format(&broadcast); @@ -185,12 +216,13 @@ async fn main() -> anyhow::Result<()> { #[cfg(feature = "iroh")] let client = client.with_iroh(iroh); - run_client(client, url, broadcast, publish).await + run_client(client, url, broadcast, publish, stats).await } Command::Subscribe { config, url, broadcast, + stats, args, } => { let client = config.init()?; @@ -198,7 +230,7 @@ async fn main() -> anyhow::Result<()> { #[cfg(feature = "iroh")] let client = client.with_iroh(iroh); - run_subscribe(client, url, broadcast, args).await + run_subscribe(client, url, broadcast, args, stats).await } } } @@ -217,10 +249,17 @@ async fn run_subscribe( url: Url, broadcast: String, args: SubscribeArgs, + stats: StatsArgs, ) -> anyhow::Result<()> { let origin = moq_net::Origin::random().produce(); let consumer = origin.consume(); + let stats_agg = stats.build(); + let client = match &stats_agg { + Some(agg) => client.with_stats(agg.tier(moq_net::Tier::External)), + None => client, + }; + tracing::info!(%url, %broadcast, "connecting"); let reconnect = client.with_consumer(origin).reconnect(url); @@ -231,6 +270,7 @@ async fn run_subscribe( tokio::select! { res = run_announced_subscribe(consumer, broadcast, args) => res, res = reconnect.closed() => res, + res = run_stats(stats_agg, stats.interval) => res, _ = tokio::signal::ctrl_c() => Ok(()), } } diff --git a/rs/moq-cli/src/stats.rs b/rs/moq-cli/src/stats.rs new file mode 100644 index 000000000..07781b6cd --- /dev/null +++ b/rs/moq-cli/src/stats.rs @@ -0,0 +1,189 @@ +use std::{fmt::Write as _, io::Write as _, time::Duration}; + +use hang::moq_net; + +/// CLI flags for the live usage display, flattened into each subcommand. +#[derive(clap::Args, Clone, Debug)] +pub struct StatsArgs { + /// Print a live, self-refreshing table of per-track upload/download usage + /// to stderr. + #[arg(long = "stats")] + pub enabled: bool, + + /// How often to refresh the usage table. + #[arg(long = "stats-interval", default_value = "1s", value_parser = humantime::parse_duration)] + pub interval: Duration, +} + +impl Default for StatsArgs { + fn default() -> Self { + Self { + enabled: false, + interval: Duration::from_secs(1), + } + } +} + +impl StatsArgs { + /// Build an enabled aggregator when `--stats` is set, otherwise `None`. + /// + /// We read counters in-process via [`moq_net::Stats::snapshot`], but the + /// aggregator still needs an origin to actually collect, so we hand it a + /// throwaway one. Its published `.stats` broadcast simply goes unconsumed. + pub fn build(&self) -> Option { + if !self.enabled { + return None; + } + let origin = moq_net::Origin::random().produce(); + Some(moq_net::Stats::new( + moq_net::StatsConfig::new() + .with_origin(origin) + .with_interval(self.interval), + )) + } +} + +/// Drive the usage display for the lifetime of a transfer. +/// +/// With `--stats` off (`stats` is `None`) this stays pending forever, so it's +/// inert as a `tokio::select!` branch. With it on, it repaints the table every +/// `interval` until the surrounding `select!` resolves on another branch. +pub async fn run_stats(stats: Option, interval: Duration) -> anyhow::Result<()> { + match stats { + Some(stats) => run(stats, interval).await, + None => std::future::pending().await, + } +} + +/// Render the usage table on an interval until cancelled, repainting in place. +/// +/// Never returns `Ok`; it loops forever so callers can drop it into a +/// `tokio::select!` alongside the actual transfer. +async fn run(stats: moq_net::Stats, interval: Duration) -> anyhow::Result<()> { + let mut prev = stats.snapshot(); + let mut prev_lines = 0usize; + + let mut ticker = tokio::time::interval(interval); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + ticker.tick().await; // first tick is immediate + + loop { + ticker.tick().await; + let now = stats.snapshot(); + let table = render(&prev, &now, interval); + + let mut out = String::new(); + // Move the cursor back up over the previous table and clear from there + // to the end of the screen, so the table repaints in place. + if prev_lines > 0 { + let _ = write!(out, "\x1b[{prev_lines}A"); + } + out.push_str("\x1b[0J"); + out.push_str(&table); + + let mut stderr = std::io::stderr().lock(); + let _ = stderr.write_all(out.as_bytes()); + let _ = stderr.flush(); + + prev_lines = table.lines().count(); + prev = now; + } +} + +/// One row of the table: a track (or a broadcast total), with its cumulative +/// bytes and the rate since the previous snapshot. +struct Row { + label: String, + bytes: u64, + rate: f64, + frames: u64, + groups: u64, +} + +fn render(prev: &moq_net::StatsSnapshot, now: &moq_net::StatsSnapshot, interval: Duration) -> String { + let secs = interval.as_secs_f64().max(f64::MIN_POSITIVE); + let tier = moq_net::Tier::External.idx(); + let mut rows: Vec = Vec::new(); + + for (path, bc) in &now.broadcasts { + let prev_bc = prev.broadcasts.get(path); + + // A given moq-cli process only ever uploads (publisher) or downloads + // (subscriber), so at most one of these sides is populated. + for (side, prev_side) in [ + (&bc.publisher[tier], prev_bc.map(|b| &b.publisher[tier])), + (&bc.subscriber[tier], prev_bc.map(|b| &b.subscriber[tier])), + ] { + if side.bytes == 0 && side.tracks.is_empty() { + continue; + } + + rows.push(Row { + label: path.to_string(), + bytes: side.bytes, + rate: rate(side.bytes, prev_side.map(|s| s.bytes), secs), + frames: side.frames, + groups: side.groups, + }); + + for (name, track) in &side.tracks { + let prev_track = prev_side.and_then(|s| s.tracks.get(name)); + rows.push(Row { + label: format!(" {name}"), + bytes: track.bytes, + rate: rate(track.bytes, prev_track.map(|t| t.bytes), secs), + frames: track.frames, + groups: track.groups, + }); + } + } + } + + let mut out = String::new(); + if rows.is_empty() { + out.push_str("track: waiting for traffic...\n"); + return out; + } + + let label_w = rows.iter().map(|r| r.label.len()).max().unwrap_or(0).max(5); + let _ = writeln!( + out, + "{:11} {:>11} {:>8} {:>8}", + "track", "rate", "total", "frames", "groups", + ); + for r in &rows { + let _ = writeln!( + out, + "{:9}/s {:>11} {:>8} {:>8}", + r.label, + human_bytes(r.rate), + human_bytes(r.bytes as f64), + r.frames, + r.groups, + ); + } + out +} + +/// Per-second rate between two cumulative readings. A missing or decreased +/// previous value (a counter reset, e.g. a reconnect) counts from zero rather +/// than going negative. +fn rate(now: u64, prev: Option, secs: f64) -> f64 { + let delta = now.saturating_sub(prev.unwrap_or(0)); + delta as f64 / secs +} + +fn human_bytes(n: f64) -> String { + const UNITS: [&str; 5] = ["B", "KB", "MB", "GB", "TB"]; + let mut v = n; + let mut unit = 0; + while v >= 1000.0 && unit < UNITS.len() - 1 { + v /= 1000.0; + unit += 1; + } + if unit == 0 { + format!("{v:.0} {}", UNITS[unit]) + } else { + format!("{v:.1} {}", UNITS[unit]) + } +} diff --git a/rs/moq-net/src/stats.rs b/rs/moq-net/src/stats.rs index 11db29cd2..85bc06e04 100644 --- a/rs/moq-net/src/stats.rs +++ b/rs/moq-net/src/stats.rs @@ -51,7 +51,14 @@ //! * `subscriptions` / `subscriptions_closed`: cumulative count of //! track-level subscription guards opened/dropped. //! * `bytes` / `frames` / `groups`: cumulative payload counters bumped from -//! the session loops (both lite and IETF). +//! the session loops (both lite and IETF). These are the broadcast-level +//! rollup: the sum across every track, kept in sync on each bump so a +//! broadcast total is a single load, never an iteration over tracks. +//! * `tracks`: per-track breakdown of the payload rollup, keyed by track name. +//! Each bump lands in both the rollup above and the track's own counters; a +//! track is forgotten once its last subscription closes (its bytes stay in +//! the rollup). [`Stats::snapshot`] exposes the same data in-process for a +//! local consumer (e.g. a CLI usage display) without subscribing here. //! * `sessions` / `sessions_closed` (session tracks only): cumulative count //! of sessions connected/disconnected under an auth root on this tier. //! Driven by [`StatsHandle::session`]. @@ -186,6 +193,31 @@ impl Counters { } } +/// Per-track payload counters within one `(side, tier)` slot of a broadcast. +/// +/// Bumped from the track guard *alongside* the broadcast-level rollup in +/// [`Counters`], so reading a broadcast total stays a single atomic load (no +/// iteration over tracks) while the per-track breakdown is available for +/// callers that want it (e.g. a CLI usage display). Shared via `Arc` between +/// the guard and the entry's track map; the map drops its entry once the last +/// guard releases (see the GC pass in `run_publisher`). +#[derive(Default, Debug)] +struct TrackCounters { + bytes: AtomicU64, + frames: AtomicU64, + groups: AtomicU64, +} + +impl TrackCounters { + fn snapshot(&self) -> TrackCounts { + TrackCounts { + bytes: self.bytes.load(Ordering::Relaxed), + frames: self.frames.load(Ordering::Relaxed), + groups: self.groups.load(Ordering::Relaxed), + } + } +} + /// Per-(tier, root) session gauge. One of these is shared (via `Arc`) by every /// [`SessionStats`] guard for the same auth root on the same tier: `sessions` /// bumps on connect, `sessions_closed` on disconnect. @@ -231,7 +263,9 @@ pub enum Tier { } impl Tier { - fn idx(self) -> usize { + /// Index of this tier in the per-tier arrays exposed on [`StatsSnapshot`] + /// and [`BroadcastSnapshot`] (external first, internal second). + pub fn idx(self) -> usize { match self { Tier::External => 0, Tier::Internal => 1, @@ -239,6 +273,77 @@ impl Tier { } } +/// An immutable readout of every tracked counter at one instant, returned by +/// [`Stats::snapshot`]. +/// +/// This is the in-process counterpart to the published stats broadcast: a +/// caller (a CLI usage display, a test) gets the numbers directly instead of +/// subscribing to `/node/...` and parsing JSON frames. Counters are +/// cumulative; compute rates by diffing successive snapshots. See the +/// module-level docs for the meaning of each field. +#[derive(Debug, Default, Clone, PartialEq, Eq)] +#[non_exhaustive] +pub struct StatsSnapshot { + /// Per-broadcast counters, keyed by broadcast path. + pub broadcasts: BTreeMap, + /// Live session gauges keyed by auth root, indexed by [`Tier::idx`] + /// (external first, internal second). + pub sessions: [BTreeMap; 2], +} + +/// One broadcast's counters, split by side then tier (indexed by +/// [`Tier::idx`]). +#[derive(Debug, Default, Clone, PartialEq, Eq)] +#[non_exhaustive] +pub struct BroadcastSnapshot { + /// Egress (this node serving the broadcast to a peer), per tier. + pub publisher: [SideSnapshot; 2], + /// Ingress (this node receiving the broadcast from a peer), per tier. + pub subscriber: [SideSnapshot; 2], +} + +/// Counters for one `(broadcast, side, tier)` slot: the broadcast-level rollup +/// scalars plus a per-track payload breakdown. `bytes` / `frames` / `groups` +/// are the rollup totals (the sum across every track, maintained incrementally +/// so reading them never iterates `tracks`). +#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize)] +#[cfg_attr(test, derive(serde::Deserialize))] +#[non_exhaustive] +pub struct SideSnapshot { + pub announced: u64, + pub announced_closed: u64, + pub broadcasts: u64, + pub broadcasts_closed: u64, + pub subscriptions: u64, + pub subscriptions_closed: u64, + pub bytes: u64, + pub frames: u64, + pub groups: u64, + /// Per-track payload, keyed by track name. A track appears while it has an + /// open subscription and is dropped once it stops flowing; its bytes remain + /// counted in the rollup totals above. + pub tracks: BTreeMap, +} + +/// Cumulative payload counters for a single track. +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize)] +#[cfg_attr(test, derive(serde::Deserialize))] +#[non_exhaustive] +pub struct TrackCounts { + pub bytes: u64, + pub frames: u64, + pub groups: u64, +} + +/// Live session gauge for one auth root: `sessions - sessions_closed` is the +/// current connected count. +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] +#[non_exhaustive] +pub struct SessionCounts { + pub sessions: u64, + pub sessions_closed: u64, +} + /// Settings for a [`Stats`] aggregator. Construct with [`StatsConfig::new`] /// and chain the `with_*` setters (e.g. /// `StatsConfig::new().with_origin(origin).with_prefix(".foo")`), then hand it @@ -346,6 +451,13 @@ struct StatsShared { struct BroadcastEntry { publisher: [Counters; 2], subscriber: [Counters; 2], + /// Per-track payload breakdown for each `(side, tier)` slot, indexed by + /// tier the same way as `publisher` / `subscriber`. The rollup totals in + /// `Counters` are kept in sync on every bump, so these maps are only read + /// when a caller wants the per-track view; a broadcast total never iterates + /// them. + publisher_tracks: [Lock>>; 2], + subscriber_tracks: [Lock>>; 2], } impl BroadcastEntry { @@ -353,6 +465,58 @@ impl BroadcastEntry { Self { publisher: Default::default(), subscriber: Default::default(), + publisher_tracks: Default::default(), + subscriber_tracks: Default::default(), + } + } + + /// Get-or-create the shared [`TrackCounters`] for `name` on one slot. + fn track(&self, side: Side, tier: Tier, name: &str) -> Arc { + let map = match side { + Side::Publisher => &self.publisher_tracks[tier.idx()], + Side::Subscriber => &self.subscriber_tracks[tier.idx()], + }; + map.lock().entry(name.to_owned()).or_default().clone() + } + + /// Drop track entries whose last guard has released (`strong_count == 1` is + /// just the map's own `Arc`). Their payload already rolled up into + /// `Counters`, so the broadcast total is unaffected; only the per-track + /// view forgets a track once it stops flowing. Called from the snapshot + /// task after each tick's readout so a just-closed track surfaces once more + /// before disappearing. + fn gc_tracks(&self) { + for map in [&self.publisher_tracks, &self.subscriber_tracks] { + for slot in map { + slot.lock().retain(|_, t| Arc::strong_count(t) > 1); + } + } + } + + /// Read one `(side, tier)` slot into a [`SideSnapshot`]: the rollup scalars + /// plus the per-track payload breakdown. + fn side_snapshot(&self, side: Side, tier: Tier) -> SideSnapshot { + let (counters, map) = match side { + Side::Publisher => (&self.publisher[tier.idx()], &self.publisher_tracks[tier.idx()]), + Side::Subscriber => (&self.subscriber[tier.idx()], &self.subscriber_tracks[tier.idx()]), + }; + let raw = counters.snapshot(); + let tracks = map + .lock() + .iter() + .map(|(name, t)| (name.clone(), t.snapshot())) + .collect(); + SideSnapshot { + announced: raw.announced, + announced_closed: raw.announced_closed, + broadcasts: raw.broadcasts, + broadcasts_closed: raw.broadcasts_closed, + subscriptions: raw.subscriptions, + subscriptions_closed: raw.subscriptions_closed, + bytes: raw.bytes, + frames: raw.frames, + groups: raw.groups, + tracks, } } } @@ -363,9 +527,9 @@ impl BroadcastEntry { /// [`BroadcastEntry`]. #[derive(Default)] struct SlotState { - /// Last `Snapshot` we wrote to the frame for this slot, used to detect + /// Last snapshot we wrote to the frame for this slot, used to detect /// changes that warrant re-emission. - prev_emitted: Option, + prev_emitted: Option, } /// Snapshot-task-local mirror of [`BroadcastEntry`]: per-side, per-tier @@ -378,26 +542,25 @@ struct EntrySnapState { } impl EntrySnapState { - /// Iterate the four `(track_name, counters, slot_state)` slots in the - /// fixed order matching `TRACK_ORDER`. - fn zip_slots<'a>(&'a mut self, entry: &'a BroadcastEntry) -> [(&'static str, &'a Counters, &'a mut SlotState); 4] { + /// Iterate the four `(track_name, side, tier, slot_state)` slots in the + /// fixed order matching `TRACK_ORDER`. The caller reads the counters off + /// `entry` via [`BroadcastEntry::side_snapshot`]. + fn zip_slots(&mut self) -> [(&'static str, Side, Tier, &mut SlotState); 4] { let [pub_ext_state, pub_int_state] = &mut self.publisher; let [sub_ext_state, sub_int_state] = &mut self.subscriber; [ - ("publisher.json", &entry.publisher[Tier::External.idx()], pub_ext_state), - ( - "subscriber.json", - &entry.subscriber[Tier::External.idx()], - sub_ext_state, - ), + ("publisher.json", Side::Publisher, Tier::External, pub_ext_state), + ("subscriber.json", Side::Subscriber, Tier::External, sub_ext_state), ( "internal/publisher.json", - &entry.publisher[Tier::Internal.idx()], + Side::Publisher, + Tier::Internal, pub_int_state, ), ( "internal/subscriber.json", - &entry.subscriber[Tier::Internal.idx()], + Side::Subscriber, + Tier::Internal, sub_int_state, ), ] @@ -477,6 +640,68 @@ impl Stats { } } + /// Read every tracked counter into a [`StatsSnapshot`] right now. + /// + /// This is the in-process alternative to subscribing to the published stats + /// broadcast: a local consumer (e.g. a CLI usage display) calls this on an + /// interval and diffs successive snapshots to derive rates. Counters are + /// cumulative and the read is lock-per-map, not a global stop-the-world, so + /// a snapshot can straddle concurrent bumps (the same `open >= closed` + /// guarantee as the published path applies). + /// + /// Returns an empty snapshot for a no-op aggregator (one built without an + /// origin). To collect stats from a tool that doesn't otherwise publish + /// them, build a [`Stats`] with a throwaway origin: the published broadcast + /// simply goes unconsumed while `snapshot` reads the same counters directly. + pub fn snapshot(&self) -> StatsSnapshot { + let mut out = StatsSnapshot::default(); + let Some(shared) = &self.shared else { + return out; + }; + + let entries: Vec<(PathOwned, Arc)> = shared + .entries + .lock() + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + for (path, entry) in entries { + out.broadcasts.insert( + path, + BroadcastSnapshot { + publisher: [ + entry.side_snapshot(Side::Publisher, Tier::External), + entry.side_snapshot(Side::Publisher, Tier::Internal), + ], + subscriber: [ + entry.side_snapshot(Side::Subscriber, Tier::External), + entry.side_snapshot(Side::Subscriber, Tier::Internal), + ], + }, + ); + } + + for tier_idx in 0..2 { + let roots: Vec<(PathOwned, Arc)> = shared.sessions[tier_idx] + .lock() + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + for (root, counters) in roots { + let (sessions, sessions_closed) = counters.snapshot(); + out.sessions[tier_idx].insert( + root, + SessionCounts { + sessions, + sessions_closed, + }, + ); + } + } + + out + } + fn entry(&self, path: impl AsPath) -> Option> { // No-op aggregator (no origin) never allocates state. let shared = self.shared.as_ref()?; @@ -628,32 +853,37 @@ impl BroadcastStats { } } - /// Open a publisher-track guard. + /// Open a publisher-track guard for `name`. /// - /// `_name` is unused; counters are per-broadcast only. The track name - /// parameter is kept for symmetry with the rest of moq-net so callers - /// don't have to thread an `Option<&str>` through subscribe sites. - pub fn publisher_track(&self, _name: &str) -> PublisherTrack { - if let Some(entry) = &self.entry { + /// Bumps the broadcast-level `subscriptions` rollup and resolves (or + /// creates) the per-track payload counters so the guard's `bytes` / `frame` + /// / `group` bumps land in both the broadcast rollup and this track's own + /// breakdown. + pub fn publisher_track(&self, name: &str) -> PublisherTrack { + let track = self.entry.as_ref().map(|entry| { entry.publisher[self.tier.idx()] .subscriptions .fetch_add(1, Ordering::Relaxed); - } + entry.track(Side::Publisher, self.tier, name) + }); PublisherTrack { entry: self.entry.clone(), + track, tier: self.tier, } } /// Subscriber-side counterpart to [`Self::publisher_track`]. - pub fn subscriber_track(&self, _name: &str) -> SubscriberTrack { - if let Some(entry) = &self.entry { + pub fn subscriber_track(&self, name: &str) -> SubscriberTrack { + let track = self.entry.as_ref().map(|entry| { entry.subscriber[self.tier.idx()] .subscriptions .fetch_add(1, Ordering::Relaxed); - } + entry.track(Side::Subscriber, self.tier, name) + }); SubscriberTrack { entry: self.entry.clone(), + track, tier: self.tier, } } @@ -870,9 +1100,14 @@ impl Drop for SubscriberStats { } /// RAII subscription guard for the publisher role. +/// +/// Each bump lands in two places: the broadcast-level rollup in [`Counters`] +/// (so a broadcast total is a single load) and `track`'s own payload counters +/// (the per-track breakdown). Both are `None` together for a no-op aggregator. #[must_use = "drop the guard to record the subscription as closed"] pub struct PublisherTrack { entry: Option>, + track: Option>, tier: Tier, } @@ -882,6 +1117,9 @@ impl PublisherTrack { if let Some(entry) = &self.entry { entry.publisher[self.tier.idx()].frames.fetch_add(1, Ordering::Relaxed); } + if let Some(track) = &self.track { + track.frames.fetch_add(1, Ordering::Relaxed); + } } /// Bumps `bytes` by `n`. @@ -889,6 +1127,9 @@ impl PublisherTrack { if let Some(entry) = &self.entry { entry.publisher[self.tier.idx()].bytes.fetch_add(n, Ordering::Relaxed); } + if let Some(track) = &self.track { + track.bytes.fetch_add(n, Ordering::Relaxed); + } } /// Bumps `groups` once. @@ -896,6 +1137,9 @@ impl PublisherTrack { if let Some(entry) = &self.entry { entry.publisher[self.tier.idx()].groups.fetch_add(1, Ordering::Relaxed); } + if let Some(track) = &self.track { + track.groups.fetch_add(1, Ordering::Relaxed); + } } } @@ -910,10 +1154,12 @@ impl Drop for PublisherTrack { } } -/// RAII subscription guard for the subscriber role. +/// RAII subscription guard for the subscriber role. See [`PublisherTrack`] for +/// the dual rollup/per-track bump behavior. #[must_use = "drop the guard to record the subscription as closed"] pub struct SubscriberTrack { entry: Option>, + track: Option>, tier: Tier, } @@ -923,6 +1169,9 @@ impl SubscriberTrack { if let Some(entry) = &self.entry { entry.subscriber[self.tier.idx()].frames.fetch_add(1, Ordering::Relaxed); } + if let Some(track) = &self.track { + track.frames.fetch_add(1, Ordering::Relaxed); + } } /// Bumps `bytes` by `n`. @@ -930,6 +1179,9 @@ impl SubscriberTrack { if let Some(entry) = &self.entry { entry.subscriber[self.tier.idx()].bytes.fetch_add(n, Ordering::Relaxed); } + if let Some(track) = &self.track { + track.bytes.fetch_add(n, Ordering::Relaxed); + } } /// Bumps `groups` once. @@ -937,6 +1189,9 @@ impl SubscriberTrack { if let Some(entry) = &self.entry { entry.subscriber[self.tier.idx()].groups.fetch_add(1, Ordering::Relaxed); } + if let Some(track) = &self.track { + track.groups.fetch_add(1, Ordering::Relaxed); + } } } @@ -951,23 +1206,18 @@ impl Drop for SubscriberTrack { } } -/// Per-tick work for a single `(side, tier)` slot: build the emitted -/// `Snapshot` from the raw counters, update the slot's `prev_emitted`, and -/// hand the snap to `emit` iff the slot is live or changed this tick. -fn process_slot(counters: &Counters, slot_state: &mut SlotState, mut emit: impl FnMut(Snapshot)) { - let raw = counters.snapshot(); - - let snap = Snapshot { - announced: raw.announced, - announced_closed: raw.announced_closed, - broadcasts: raw.broadcasts, - broadcasts_closed: raw.broadcasts_closed, - subscriptions: raw.subscriptions, - subscriptions_closed: raw.subscriptions_closed, - bytes: raw.bytes, - frames: raw.frames, - groups: raw.groups, - }; +/// Per-tick work for a single `(side, tier)` slot: read the slot into a +/// [`SideSnapshot`] (rollup scalars plus the per-track breakdown), update the +/// slot's `prev_emitted`, and hand the snap to `emit` iff the slot is live or +/// changed this tick. +fn process_slot( + entry: &BroadcastEntry, + side: Side, + tier: Tier, + slot_state: &mut SlotState, + mut emit: impl FnMut(SideSnapshot), +) { + let snap = entry.side_snapshot(side, tier); // A slot is live while any open counter still exceeds its `*_closed` // counterpart: a guard is held, so a subscription could begin at any @@ -984,15 +1234,18 @@ fn process_slot(counters: &Counters, slot_state: &mut SlotState, mut emit: impl // (incl. sub-tick flickers) and emits the final close snapshot on the // tick a slot transitions to fully closed. // - // `None` (slot never emitted) is treated as the default Snapshot so a + // `None` (slot never emitted) is treated as the default snapshot so a // first-tick all-zeros snap on an unused tier-side slot doesn't count // as a "change". Without this, every entry would surface in all four // tracks with zeros on the tick after creation even if only one slot // is actually in use. - let prev_snap = slot_state.prev_emitted.unwrap_or_default(); - let changed = snap != prev_snap; + let changed = slot_state + .prev_emitted + .as_ref() + .map(|p| p != &snap) + .unwrap_or(snap != SideSnapshot::default()); if changed { - slot_state.prev_emitted = Some(snap); + slot_state.prev_emitted = Some(snap.clone()); } if live || changed { emit(snap); @@ -1120,14 +1373,18 @@ async fn run_publisher(weak: Weak, advertised: PathOwned, interval: map.iter().map(|(k, v)| (k.clone(), v.clone())).collect() }; - let mut frames: [BTreeMap; NUM_SLOTS] = Default::default(); + let mut frames: [BTreeMap; NUM_SLOTS] = Default::default(); for (path, entry) in &entries { let snap_state = local.entry(path.clone()).or_default(); - for (i, (_track_name, counters, slot_state)) in snap_state.zip_slots(entry).into_iter().enumerate() { - process_slot(counters, slot_state, |snap| { + for (i, (_track_name, side, tier, slot_state)) in snap_state.zip_slots().into_iter().enumerate() { + process_slot(entry, side, tier, slot_state, |snap| { frames[i].insert(path.as_str().to_string(), snap); }); } + // Drop track entries whose last guard released this tick. We just + // read them above, so a closed track still appears in this frame + // before the per-track view forgets it. + entry.gc_tracks(); } drop(entries); @@ -1181,24 +1438,6 @@ async fn run_publisher(weak: Weak, advertised: PathOwned, interval: } } -/// What we emit for one entry on one tier-role track. Every field comes -/// straight from [`RawCounts`]; `broadcasts` / `broadcasts_closed` are the -/// per-(broadcast, session) subscription sentinel maintained by -/// [`SessionBroadcasts`]. -#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize)] -#[cfg_attr(test, derive(serde::Deserialize))] -struct Snapshot { - announced: u64, - announced_closed: u64, - broadcasts: u64, - broadcasts_closed: u64, - subscriptions: u64, - subscriptions_closed: u64, - bytes: u64, - frames: u64, - groups: u64, -} - /// What we emit for one root on a session track. `sessions - sessions_closed` /// is the live session count for the root. #[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize)] @@ -1288,6 +1527,73 @@ mod tests { assert_eq!(e2.publisher[Tier::External.idx()].bytes.load(Relaxed), 7); } + #[tokio::test(start_paused = true)] + async fn per_track_breakdown_rolls_up_to_broadcast() { + // Two tracks on the same broadcast: the rollup is the sum (read with a + // single load, no iteration), and `snapshot` exposes each track. + let (stats, _origin) = test_stats(Some("sjc")); + let pubr = stats.tier(Tier::External).broadcast("demo/bbb").publisher(); + let video = pubr.track("video"); + let audio = pubr.track("audio"); + video.bytes(100); + video.frame(); + audio.bytes(7); + + // Broadcast rollup is O(1): one atomic, already the sum across tracks. + { + let entries = stats.shared().entries.lock(); + let entry = entries.get(&PathOwned::from("demo/bbb")).expect("entry"); + assert_eq!(entry.publisher[Tier::External.idx()].bytes.load(Relaxed), 107); + } + + // Per-track breakdown via the public snapshot API. + let snap = stats.snapshot(); + let bc = snap.broadcasts.get(&PathOwned::from("demo/bbb")).expect("broadcast"); + let side = &bc.publisher[Tier::External.idx()]; + assert_eq!(side.bytes, 107, "rollup total"); + assert_eq!(side.tracks.get("video").expect("video").bytes, 100); + assert_eq!(side.tracks.get("video").expect("video").frames, 1); + assert_eq!(side.tracks.get("audio").expect("audio").bytes, 7); + } + + #[tokio::test(start_paused = true)] + async fn snapshot_drops_closed_track_but_keeps_rollup() { + // A track disappears from the per-track view once its guard drops, but + // its bytes stay counted in the broadcast rollup. + let (stats, _origin) = test_stats(Some("sjc")); + let pubr = stats.tier(Tier::External).broadcast("demo/bbb").publisher(); + { + let video = pubr.track("video"); + video.bytes(100); + } // guard dropped + + // Let the snapshot task run a tick so its GC pass prunes the dead track. + tokio::time::advance(Duration::from_secs(2)).await; + tokio::task::yield_now().await; + + let snap = stats.snapshot(); + let bc = snap.broadcasts.get(&PathOwned::from("demo/bbb")).expect("broadcast"); + let side = &bc.publisher[Tier::External.idx()]; + assert_eq!(side.bytes, 100, "rollup retains closed-track bytes"); + assert!( + !side.tracks.contains_key("video"), + "closed track pruned from per-track view" + ); + } + + #[tokio::test(start_paused = true)] + async fn snapshot_empty_for_noop_aggregator() { + let stats = Stats::default(); + let track = stats + .tier(Tier::External) + .broadcast("demo/bbb") + .publisher() + .track("video"); + track.bytes(100); + let snap = stats.snapshot(); + assert!(snap.broadcasts.is_empty()); + } + #[tokio::test(start_paused = true)] async fn external_and_internal_tiers_are_independent() { let (stats, _origin) = test_stats(Some("sjc")); @@ -1798,7 +2104,7 @@ mod tests { assert!(closed_pos < open_pos, "sessions_closed must be loaded before sessions",); } - async fn read_frame(mut track: crate::TrackSubscriber) -> BTreeMap { + async fn read_frame(mut track: crate::TrackSubscriber) -> BTreeMap { let bytes = track.read_frame().await.expect("ok").expect("frame"); serde_json::from_slice(&bytes).expect("json parse") } From 76f43d012b81b05f7be4e2f9ab9ddb50c9cc89e5 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Fri, 12 Jun 2026 15:44:52 -0700 Subject: [PATCH 2/2] fix(moq-ffi): await async subscribe_track in dynamic track tests The dynamic_track_request tests called the now-async subscribe_track without awaiting it, leaving the lib test build broken on dev. Add the missing .await, matching the existing call site below. Co-Authored-By: Claude Opus 4.8 (1M context) --- rs/moq-ffi/src/test.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rs/moq-ffi/src/test.rs b/rs/moq-ffi/src/test.rs index b19aeb906..a964dd6af 100644 --- a/rs/moq-ffi/src/test.rs +++ b/rs/moq-ffi/src/test.rs @@ -76,7 +76,7 @@ async fn dynamic_track_request() { let broadcast = MoqBroadcastProducer::new().unwrap(); let dynamic = broadcast.dynamic().unwrap(); let consumer = broadcast.consume().unwrap(); - let track_consumer = consumer.subscribe_track("events".into()).unwrap(); + let track_consumer = consumer.subscribe_track("events".into()).await.unwrap(); let track = tokio::time::timeout(TIMEOUT, dynamic.requested_track()) .await @@ -103,7 +103,7 @@ async fn dynamic_track_request_can_abort() { let broadcast = MoqBroadcastProducer::new().unwrap(); let dynamic = broadcast.dynamic().unwrap(); let consumer = broadcast.consume().unwrap(); - let _track_consumer = consumer.subscribe_track("unknown".into()).unwrap(); + let _track_consumer = consumer.subscribe_track("unknown".into()).await.unwrap(); let track = tokio::time::timeout(TIMEOUT, dynamic.requested_track()) .await