diff --git a/rs/moq-net/src/ietf/publisher.rs b/rs/moq-net/src/ietf/publisher.rs index 6ef79aa83..fd7ab5746 100644 --- a/rs/moq-net/src/ietf/publisher.rs +++ b/rs/moq-net/src/ietf/publisher.rs @@ -158,7 +158,7 @@ impl Publisher { // Attach the publisher-side payload meter so the model counts // groups/frames/bytes as they're served to this subscriber (egress is - // per-viewer). `track_stats` (the PublisherTrack guard, subscriptions + // per-viewer). `track_stats` (the track guard, subscriptions // lifecycle) stays owned by this function for the subscription's duration. track.set_meter(track_stats.meter()); @@ -483,7 +483,7 @@ impl Publisher { // Each accepted namespace holds a `publisher()` announce guard (bumps // `announced` / `announced_closed`) alongside its stream, so dropping the // tuple on unannounce or cleanup records the close. - let mut namespace_streams: HashMap, crate::PublisherStats)> = + let mut namespace_streams: HashMap, crate::BroadcastGuard)> = HashMap::new(); let mut announced = self.origin.announced(); @@ -528,7 +528,7 @@ impl Publisher { async fn announce_namespace( &self, suffix: crate::PathOwned, - namespace_streams: &mut HashMap, crate::PublisherStats)>, + namespace_streams: &mut HashMap, crate::BroadcastGuard)>, ) -> Result<(), Error> { let absolute = self.origin.absolute(&suffix).to_owned(); tracing::debug!(broadcast = %absolute, "announce"); @@ -582,7 +582,7 @@ impl Publisher { async fn unannounce_namespace( &self, suffix: &crate::PathOwned, - namespace_streams: &mut HashMap, crate::PublisherStats)>, + namespace_streams: &mut HashMap, crate::BroadcastGuard)>, ) { tracing::debug!(broadcast = %self.origin.absolute(suffix), "unannounce"); // Dropping `_stats` on removal records the announce close. diff --git a/rs/moq-net/src/ietf/subscriber.rs b/rs/moq-net/src/ietf/subscriber.rs index c046d7013..c81062061 100644 --- a/rs/moq-net/src/ietf/subscriber.rs +++ b/rs/moq-net/src/ietf/subscriber.rs @@ -3,8 +3,8 @@ use std::collections::{HashMap, hash_map::Entry}; use std::sync::Arc; use crate::{ - BroadcastDynamic, BroadcastInfo, Error, Frame, FrameProducer, Group, GroupProducer, MAX_FRAME_SIZE, OriginProducer, - OriginPublish, Path, PathOwned, StatsHandle, SubscriberStats, SubscriberTrack, TrackProducer, TrackRequest, + BroadcastDynamic, BroadcastGuard, BroadcastInfo, Error, Frame, FrameProducer, Group, GroupProducer, MAX_FRAME_SIZE, + OriginProducer, OriginPublish, Path, PathOwned, StatsHandle, TrackGuard, TrackProducer, TrackRequest, coding::{Reader, Stream}, ietf::{self, Control, FilterType, GroupOrder, RequestId}, model::BroadcastProducer, @@ -34,9 +34,9 @@ struct TrackState { /// created), so the model records groups/frames/bytes as data is produced into it. producer: TrackProducer, alias: Option, - /// The `SubscriberTrack` guard (subscriptions lifecycle), held for the entry's - /// life; records `subscriptions_closed` when the entry is dropped. - _stats: Arc, + /// The track guard (subscriptions lifecycle), held for the entry's life; + /// records `subscriptions_closed` when the entry is dropped. + _stats: Arc, } struct BroadcastState { @@ -52,7 +52,7 @@ struct BroadcastState { /// Subscriber-side announce guard (bumps `announced` / `announced_closed`), /// held for as long as the broadcast is announced into our origin. - _stats: SubscriberStats, + _stats: BroadcastGuard, } #[derive(Clone)] diff --git a/rs/moq-net/src/lite/publisher.rs b/rs/moq-net/src/lite/publisher.rs index f4dac096d..048aaea5f 100644 --- a/rs/moq-net/src/lite/publisher.rs +++ b/rs/moq-net/src/lite/publisher.rs @@ -208,7 +208,7 @@ impl Publisher { // Per-path stats guards: dropping the guard records `broadcasts_closed`. // The origin contract guarantees announce/unannounce toggles per path, so a // new active announcement must always be for a path with no live guard. - let mut stats_guards: std::collections::HashMap = + let mut stats_guards: std::collections::HashMap = std::collections::HashMap::new(); match version { @@ -515,7 +515,7 @@ impl Publisher { // The track guard (bumps `subscriptions`), the per-session broadcast // tracker, and the broadcast path. The `broadcasts` sentinel is taken // below, after the subscription is validated, and held for its lifetime. - stats: (crate::PublisherTrack, crate::SessionBroadcasts, crate::PathOwned), + stats: (crate::TrackGuard, crate::SessionBroadcasts, crate::PathOwned), version: Version, ) -> Result<(), Error> { let (track_stats, broadcasts, absolute) = stats; @@ -535,7 +535,7 @@ impl Publisher { // Attach the publisher-side payload meter so the model counts // groups/frames/bytes as they're served to this subscriber (egress is // per-subscriber, so N viewers of one track count N times). `track_stats` - // (the PublisherTrack guard, subscriptions lifecycle) stays owned by this + // (the track guard, subscriptions lifecycle) stays owned by this // function for the subscription's duration. track.set_meter(track_stats.meter()); @@ -633,9 +633,14 @@ impl Publisher { // `request_broadcast` resolves it immediately, or falls back to an `OriginDynamic` // handler (as in recv_subscribe). let broadcast = self.origin.request_broadcast(&fetch.broadcast); - let track_stats = self.stats.broadcast(&absolute).publisher_track(&track); - - if let Err(err) = Self::run_fetch(&mut stream, &fetch, broadcast, track_stats, self.version).await { + // The track guard records the fetch as a subscription for its lifetime; the + // Meter over the same shared counters carries the payload counts. Fetch reads + // random-access frames (`get_frame`), which the model doesn't auto-meter, so + // run_fetch bumps the Meter itself. `_fetch_guard` is held until run_fetch returns. + let _fetch_guard = self.stats.broadcast(&absolute).publisher_track(&track); + let meter = _fetch_guard.meter(); + + if let Err(err) = Self::run_fetch(&mut stream, &fetch, broadcast, meter, self.version).await { match &err { Error::Cancel | Error::Transport(_) => { tracing::info!(broadcast = %absolute, %track, %group, "fetch cancelled") @@ -654,7 +659,7 @@ impl Publisher { stream: &mut Stream, fetch: &lite::Fetch<'_>, broadcast: Result, Error>, - track_stats: crate::PublisherTrack, + meter: crate::Meter, version: Version, ) -> Result<(), Error> { let broadcast = broadcast?.await?; @@ -688,7 +693,7 @@ impl Publisher { // Lite05+ FETCH responds with bare FRAME messages; the subscriber already has // the codec/timescale from TRACK_INFO and the group sequence from its request. - track_stats.group(); + meter.group(); // Honor frame_start: skip earlier frames, then stream the rest in order. The // delta-timestamp baseline resets to 0, so the first served frame's delta is @@ -702,7 +707,7 @@ impl Publisher { compression, timescale, &mut prev_ts, - &track_stats, + &meter, ) .await?; index += 1; @@ -768,18 +773,18 @@ async fn write_fetch_frame( compression: Compression, timescale: Option, prev_ts: &mut u64, - track_stats: &crate::PublisherTrack, + meter: &crate::Meter, ) -> Result<(), Error> { encode_frame_timing(writer, frame, timescale, prev_ts).await?; match compression { Compression::None => { writer.encode(&frame.size).await?; - track_stats.frame(); + meter.frame(); while let Some(mut chunk) = frame.read_chunk().await? { let n = chunk.len() as u64; writer.write_all(&mut chunk).await?; - track_stats.bytes(n); + meter.bytes(n); } } compression => { @@ -787,9 +792,9 @@ async fn write_fetch_frame( let mut chunk = bytes::Bytes::from(compression.compress(&payload)); let n = chunk.len() as u64; writer.encode(&n).await?; - track_stats.frame(); + meter.frame(); writer.write_all(&mut chunk).await?; - track_stats.bytes(n); + meter.bytes(n); } } diff --git a/rs/moq-net/src/lite/subscriber.rs b/rs/moq-net/src/lite/subscriber.rs index b50c53521..eb23816f9 100644 --- a/rs/moq-net/src/lite/subscriber.rs +++ b/rs/moq-net/src/lite/subscriber.rs @@ -11,9 +11,9 @@ use futures::{StreamExt, stream::FuturesUnordered}; use crate::util::{MaybeBoxedExt, MaybeSendBox}; use crate::{ - AsPath, BandwidthProducer, BroadcastDynamic, BroadcastInfo, Compression, Error, Frame, FrameProducer, Group, - GroupProducer, GroupRequest, MAX_FRAME_SIZE, OriginProducer, OriginPublish, Path, PathOwned, StatsHandle, - SubscriberStats, SubscriberTrack, Subscription, Timescale, Timestamp, TrackInfo, TrackProducer, TrackRequest, + AsPath, BandwidthProducer, BroadcastDynamic, BroadcastGuard, BroadcastInfo, Compression, Error, Frame, + FrameProducer, Group, GroupProducer, GroupRequest, MAX_FRAME_SIZE, OriginProducer, OriginPublish, Path, PathOwned, + StatsHandle, Subscription, Timescale, Timestamp, TrackGuard, TrackInfo, TrackProducer, TrackRequest, coding::{Reader, Stream}, lite, }; @@ -197,7 +197,7 @@ impl Subscriber { // `subscriber.broadcasts_closed`. We only insert a guard when start_announce // actually accepted the announcement (it may drop reflected loops), so the // guard set tracks `producers` exactly. - let mut stats_guards: HashMap = HashMap::new(); + let mut stats_guards: HashMap = HashMap::new(); // Stats keys are absolute paths (matching the publisher side) so the // fanned-out level keys line up with the absolute broadcast paths a @@ -752,7 +752,7 @@ struct TrackServe { subscriber: Subscriber, path: PathOwned, broadcast: BroadcastDynamic, - track_stats: Arc, + track_stats: Arc, name: String, } @@ -1102,7 +1102,7 @@ impl TrackServe { // Attach the subscriber-side payload meter so the model counts // groups/frames/bytes as they're produced into this track, regardless of - // transport. The SubscriberTrack guard (subscriptions lifecycle) stays held + // transport. The track guard (subscriptions lifecycle) stays held // by this serve task in `self.track_stats`. producer.set_meter(self.track_stats.meter()); diff --git a/rs/moq-net/src/stats.rs b/rs/moq-net/src/stats.rs index 151488386..5f05bb7d5 100644 --- a/rs/moq-net/src/stats.rs +++ b/rs/moq-net/src/stats.rs @@ -140,39 +140,90 @@ use crate::{AsPath, BroadcastInfo, Meter, OriginProducer, Path, PathOwned, Track /// broadcast bumps `broadcasts`, the last to close bumps `broadcasts_closed`), /// so summed across sessions `broadcasts - broadcasts_closed` is the count of /// distinct sessions currently subscribed. +/// An open/closed counter pair: `open - closed` is the number currently live. +/// Bump with the RAII [`Gauge::open`], or the raw helpers for the refcounting +/// `broadcasts` sentinel. One shared `Arc` is held both by the entry (to +/// snapshot) and by each guard that bumps it. #[derive(Default, Debug)] -#[non_exhaustive] -pub struct Counters { - pub announced: AtomicU64, - pub announced_closed: AtomicU64, - pub subscriptions: AtomicU64, - pub subscriptions_closed: AtomicU64, - pub broadcasts: AtomicU64, - pub broadcasts_closed: AtomicU64, - /// Payload counters (groups/frames/bytes). Shared by `Arc` with the model - /// producer/consumer that bumps them as data flows (see [`Counters::meter`] - /// and [`crate::Meter`]); read here when snapshotting. +struct Gauge { + open: AtomicU64, + closed: AtomicU64, +} + +impl Gauge { + /// Record one open and return a guard that records the matching close on drop. + fn open(self: &Arc) -> Guard { + self.open.fetch_add(1, Ordering::Relaxed); + Guard(Some(self.clone())) + } + + /// Record one open without an RAII guard, for [`SessionBroadcasts`], which + /// refcounts and closes manually on the last release. + fn open_raw(&self) { + self.open.fetch_add(1, Ordering::Relaxed); + } + + /// Record one close. `Release` pairs with the snapshot reader's `Acquire` + /// load of `closed`. + fn close_raw(&self) { + self.closed.fetch_add(1, Ordering::Release); + } + + /// Read `(open, closed)`. `closed` is loaded `Acquire` before `open` is loaded + /// `Relaxed`, so the readout never shows `closed > open`: the Acquire + /// synchronizes-with the matching `Release` on the close bump, which + /// transitively makes the earlier open bump visible here (see the module + /// "Snapshot atomicity" note). This is the ONE place that ordering lives. + fn snapshot(&self) -> (u64, u64) { + let closed = self.closed.load(Ordering::Acquire); + let open = self.open.load(Ordering::Relaxed); + (open, closed) + } +} + +/// RAII close for a [`Gauge::open`]: records one close on drop, a no-op when +/// detached (disabled aggregator). The single lifecycle guard every stats guard +/// embeds, instead of each hand-rolling a `Drop` with its own ordering. +#[must_use = "drop the guard to record the close"] +struct Guard(Option>); + +impl Guard { + /// A detached guard whose drop does nothing. + fn noop() -> Self { + Self(None) + } +} + +impl Drop for Guard { + fn drop(&mut self) { + if let Some(gauge) = &self.0 { + gauge.close_raw(); + } + } +} + +/// Per-broadcast counters for one `(tier, side)` slot: three lifecycle [`Gauge`]s +/// plus the shared payload [`Usage`] the model bumps. Each is its own `Arc` so a +/// guard (or the model) holds exactly the piece it bumps. +#[derive(Default, Debug)] +struct Counters { + /// Broadcast announce/unannounce. + announced: Arc, + /// Per-track subscription open/close. + subscriptions: Arc, + /// Per-(broadcast, session) viewer sentinel, refcounted by [`SessionBroadcasts`]. + broadcasts: Arc, + /// Payload counters (groups/frames/bytes). Shared with the model + /// producer/consumer that bumps them (see [`Counters::meter`], [`crate::Meter`]). usage: Arc, } impl Counters { - /// Read all atomics into a `RawCounts`. Closed counters are read with - /// `Acquire` ordering before their open counterparts so the snapshot - /// always satisfies `open >= closed`; see the module-level "Snapshot - /// atomicity" note. Open / payload counters stay `Relaxed`: the - /// Acquire on close synchronizes-with the matching Release on the - /// close bump, which transitively makes all earlier writes (including - /// the prior open bump) visible to this thread. + /// Read every counter into a `RawCounts` for snapshotting. fn snapshot(&self) -> RawCounts { - let announced_closed = self.announced_closed.load(Ordering::Acquire); - let subscriptions_closed = self.subscriptions_closed.load(Ordering::Acquire); - let broadcasts_closed = self.broadcasts_closed.load(Ordering::Acquire); - let announced = self.announced.load(Ordering::Relaxed); - let subscriptions = self.subscriptions.load(Ordering::Relaxed); - let broadcasts = self.broadcasts.load(Ordering::Relaxed); - let bytes = self.usage.bytes(); - let frames = self.usage.frames(); - let groups = self.usage.groups(); + let (announced, announced_closed) = self.announced.snapshot(); + let (subscriptions, subscriptions_closed) = self.subscriptions.snapshot(); + let (broadcasts, broadcasts_closed) = self.broadcasts.snapshot(); RawCounts { announced, announced_closed, @@ -180,9 +231,9 @@ impl Counters { broadcasts_closed, subscriptions, subscriptions_closed, - bytes, - frames, - groups, + bytes: self.usage.bytes(), + frames: self.usage.frames(), + groups: self.usage.groups(), } } @@ -193,25 +244,10 @@ impl Counters { } } -/// Per-(tier, root) session gauge. One of these is shared (via `Arc`) by every -/// [`SessionStats`] guard for the same auth root on the same tier: `sessions` -/// bumps on connect, `sessions_closed` on disconnect. -#[derive(Default, Debug)] -struct SessionCounters { - sessions: AtomicU64, - sessions_closed: AtomicU64, -} - -impl SessionCounters { - /// Read `(sessions, sessions_closed)`. Closed is loaded with `Acquire` - /// before open with `Relaxed`, the same pairing as [`Counters::snapshot`], - /// so the readout never shows `closed > open`. - fn snapshot(&self) -> (u64, u64) { - let closed = self.sessions_closed.load(Ordering::Acquire); - let open = self.sessions.load(Ordering::Relaxed); - (open, closed) - } -} +/// Per-(tier, root) connected-session gauge, shared by every [`SessionStats`] +/// guard for the same auth root and tier. It's just a [`Gauge`] (`open` = +/// sessions, `closed` = sessions_closed). +type SessionCounters = Gauge; /// Raw counter readout. Intermediate type that doesn't escape this module. #[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] @@ -607,66 +643,49 @@ impl BroadcastStats { /// Bumps `announced` on construction and `announced_closed` on drop. /// (The `broadcasts` sentinel is driven separately by /// [`SessionBroadcasts`]; see the module docs.) - pub fn publisher(&self) -> PublisherStats { - if let Some(entry) = &self.entry { - entry.publisher[self.tier.idx()] - .announced - .fetch_add(1, Ordering::Relaxed); - } - PublisherStats { - entry: self.entry.clone(), - tier: self.tier, - } + pub fn publisher(&self) -> BroadcastGuard { + self.role(Side::Publisher) } /// Open a broadcast-lifetime guard for the subscriber (ingress) role. /// Bumps `announced` on construction and `announced_closed` on drop. /// (The `broadcasts` sentinel is driven separately by /// [`SessionBroadcasts`]; see the module docs.) - pub fn subscriber(&self) -> SubscriberStats { - if let Some(entry) = &self.entry { - entry.subscriber[self.tier.idx()] - .announced - .fetch_add(1, Ordering::Relaxed); - } - SubscriberStats { + pub fn subscriber(&self) -> BroadcastGuard { + self.role(Side::Subscriber) + } + + fn role(&self, side: Side) -> BroadcastGuard { + let announced = match self.entry.as_deref() { + Some(entry) => side.counters(entry, self.tier).announced.open(), + None => Guard::noop(), + }; + BroadcastGuard { entry: self.entry.clone(), tier: self.tier, + side, + _announced: announced, } } /// Open a publisher-track guard. /// - /// `_name` is unused; counters are per-broadcast only. The track name + /// `name` is unused; counters are per-broadcast only. The track name /// parameter is kept for symmetry with the rest of moq-net so callers /// don't have to thread an `Option<&str>` through subscribe sites. - pub fn publisher_track(&self, _name: &str) -> PublisherTrack { - if let Some(entry) = &self.entry { - entry.publisher[self.tier.idx()] - .subscriptions - .fetch_add(1, Ordering::Relaxed); - } - PublisherTrack { - entry: self.entry.clone(), - tier: self.tier, - } + pub fn publisher_track(&self, name: &str) -> TrackGuard { + track_guard(self.entry.as_deref(), self.tier, Side::Publisher, name) } /// Subscriber-side counterpart to [`Self::publisher_track`]. - pub fn subscriber_track(&self, _name: &str) -> SubscriberTrack { - if let Some(entry) = &self.entry { - entry.subscriber[self.tier.idx()] - .subscriptions - .fetch_add(1, Ordering::Relaxed); - } - SubscriberTrack { - entry: self.entry.clone(), - tier: self.tier, - } + pub fn subscriber_track(&self, name: &str) -> TrackGuard { + track_guard(self.entry.as_deref(), self.tier, Side::Subscriber, name) } } -/// Which side of a [`BroadcastEntry`] a [`SessionBroadcasts`] bumps. +/// Which side (publisher/egress vs subscriber/ingress) of a [`BroadcastEntry`] a +/// guard bumps. Selects the `Counters` slot for the broadcast/track guards and +/// the [`SessionBroadcasts`] sentinel. #[derive(Copy, Clone)] enum Side { Publisher, @@ -730,10 +749,7 @@ impl SessionBroadcasts { }; if first { if let Some(entry) = &entry { - self.side - .counters(entry, self.tier) - .broadcasts - .fetch_add(1, Ordering::Relaxed); + self.side.counters(entry, self.tier).broadcasts.open_raw(); } } BroadcastSubscription { @@ -776,12 +792,7 @@ impl Drop for BroadcastSubscription { }; if last { if let Some(entry) = &self.entry { - // Release pairs with the snapshot reader's Acquire load of - // `broadcasts_closed`; see `PublisherStats::drop`. - self.side - .counters(entry, self.tier) - .broadcasts_closed - .fetch_add(1, Ordering::Release); + self.side.counters(entry, self.tier).broadcasts.close_raw(); } } } @@ -792,189 +803,78 @@ impl Drop for BroadcastSubscription { /// [`StatsHandle::session`]. #[must_use = "drop the guard to record the session as closed"] pub struct SessionStats { - /// `None` for a no-op aggregator; bumps are then dropped. - counters: Option>, + /// Records `sessions_closed` on drop; detached (no-op) for a disabled aggregator. + _guard: Guard, } impl SessionStats { fn new(counters: Option>) -> Self { - if let Some(counters) = &counters { - counters.sessions.fetch_add(1, Ordering::Relaxed); - } - Self { counters } - } -} - -impl Drop for SessionStats { - fn drop(&mut self) { - if let Some(counters) = &self.counters { - // Release pairs with the snapshot reader's Acquire load of - // `sessions_closed`; see `PublisherStats::drop`. - counters.sessions_closed.fetch_add(1, Ordering::Release); - } - } -} - -/// RAII broadcast guard for the publisher role. See [`BroadcastStats::publisher`]. -#[must_use = "drop the guard to record the broadcast as closed"] -pub struct PublisherStats { - entry: Option>, - tier: Tier, -} - -impl PublisherStats { - /// Open a track-subscription guard. Bumps `subscriptions` on construction - /// and `subscriptions_closed` on drop. - pub fn track(&self, name: &str) -> PublisherTrack { - BroadcastStats { - entry: self.entry.clone(), - tier: self.tier, - } - .publisher_track(name) - } -} - -impl Drop for PublisherStats { - fn drop(&mut self) { - if let Some(entry) = &self.entry { - // Release pairs with the snapshot reader's Acquire load of - // `announced_closed`, propagating the open-bump from this - // guard's construction to whichever thread observes the close. - entry.publisher[self.tier.idx()] - .announced_closed - .fetch_add(1, Ordering::Release); + Self { + _guard: counters.map_or_else(Guard::noop, |gauge| gauge.open()), } } } -/// RAII broadcast guard for the subscriber role. See [`BroadcastStats::subscriber`]. +/// A broadcast-lifetime guard for one `(path, tier, side)`. Records `announced` +/// on construction and `announced_closed` on drop (via its embedded [`Guard`]). +/// Open a track guard under it with [`Self::track`], or skip straight to one via +/// [`BroadcastStats::publisher_track`] / [`subscriber_track`]. +/// +/// Returned by [`BroadcastStats::publisher`] / [`subscriber`]; the side is fixed +/// at construction. #[must_use = "drop the guard to record the broadcast as closed"] -pub struct SubscriberStats { +pub struct BroadcastGuard { entry: Option>, tier: Tier, + side: Side, + _announced: Guard, } -impl SubscriberStats { - /// Open a track-subscription guard. Mirrors [`PublisherStats::track`]. - pub fn track(&self, name: &str) -> SubscriberTrack { - BroadcastStats { - entry: self.entry.clone(), - tier: self.tier, - } - .subscriber_track(name) - } -} - -impl Drop for SubscriberStats { - fn drop(&mut self) { - if let Some(entry) = &self.entry { - // See `PublisherStats::drop` for why this is Release. - entry.subscriber[self.tier.idx()] - .announced_closed - .fetch_add(1, Ordering::Release); - } - } -} - -/// RAII subscription guard for the publisher role. -#[must_use = "drop the guard to record the subscription as closed"] -pub struct PublisherTrack { - entry: Option>, - tier: Tier, -} - -impl PublisherTrack { - /// A [`Meter`] over this track's shared egress payload counters, to attach to - /// the model `TrackSubscriber` (via `set_meter`) so groups/frames/bytes are - /// counted as media is served. No-op for a disabled aggregator. - pub fn meter(&self) -> Meter { - match &self.entry { - Some(entry) => entry.publisher[self.tier.idx()].meter(), - None => Meter::default(), - } - } - - /// Bumps `frames` once. - pub fn frame(&self) { - if let Some(entry) = &self.entry { - entry.publisher[self.tier.idx()].usage.add_frame(); - } - } - - /// Bumps `bytes` by `n`. - pub fn bytes(&self, n: u64) { - if let Some(entry) = &self.entry { - entry.publisher[self.tier.idx()].usage.add_bytes(n); - } - } - - /// Bumps `groups` once. - pub fn group(&self) { - if let Some(entry) = &self.entry { - entry.publisher[self.tier.idx()].usage.add_group(); - } - } -} - -impl Drop for PublisherTrack { - fn drop(&mut self) { - if let Some(entry) = &self.entry { - // See `PublisherStats::drop` for why this is Release. - entry.publisher[self.tier.idx()] - .subscriptions_closed - .fetch_add(1, Ordering::Release); - } +impl BroadcastGuard { + /// Open a track-subscription guard on the same `(path, tier, side)` slot. + pub fn track(&self, name: &str) -> TrackGuard { + track_guard(self.entry.as_deref(), self.tier, self.side, name) } } -/// RAII subscription guard for the subscriber role. +/// A track-subscription guard for one `(path, tier, side)`. Records +/// `subscriptions` on construction and `subscriptions_closed` on drop (via its +/// embedded [`Guard`]), and exposes a [`Meter`] over the slot's shared payload +/// counters for the model to bump (attach it with `TrackProducer::set_meter` / +/// `TrackSubscriber::set_meter`). +/// +/// Returned by [`BroadcastStats::publisher_track`] / [`subscriber_track`] and +/// [`BroadcastGuard::track`]. #[must_use = "drop the guard to record the subscription as closed"] -pub struct SubscriberTrack { - entry: Option>, - tier: Tier, +pub struct TrackGuard { + meter: Meter, + _subscription: Guard, } -impl SubscriberTrack { - /// A [`Meter`] over this track's shared ingress payload counters, to attach to - /// the model `TrackProducer` (via `set_meter`) so groups/frames/bytes are - /// counted as media is produced. No-op for a disabled aggregator. +impl TrackGuard { + /// A [`Meter`] over this track's shared payload counters, for the model to + /// bump as media flows. No-op for a disabled aggregator. pub fn meter(&self) -> Meter { - match &self.entry { - Some(entry) => entry.subscriber[self.tier.idx()].meter(), - None => Meter::default(), - } - } - - /// Bumps `frames` once. - pub fn frame(&self) { - if let Some(entry) = &self.entry { - entry.subscriber[self.tier.idx()].usage.add_frame(); - } - } - - /// Bumps `bytes` by `n`. - pub fn bytes(&self, n: u64) { - if let Some(entry) = &self.entry { - entry.subscriber[self.tier.idx()].usage.add_bytes(n); - } - } - - /// Bumps `groups` once. - pub fn group(&self) { - if let Some(entry) = &self.entry { - entry.subscriber[self.tier.idx()].usage.add_group(); - } + self.meter.clone() } } -impl Drop for SubscriberTrack { - fn drop(&mut self) { - if let Some(entry) = &self.entry { - // See `PublisherStats::drop` for why this is Release. - entry.subscriber[self.tier.idx()] - .subscriptions_closed - .fetch_add(1, Ordering::Release); +/// Build a [`TrackGuard`] for `(entry, tier, side)`: bump `subscriptions` and +/// snapshot the slot's payload [`Meter`]. `name` is unused (counters are +/// per-broadcast); it's kept for call-site symmetry across moq-net. +fn track_guard(entry: Option<&BroadcastEntry>, tier: Tier, side: Side, _name: &str) -> TrackGuard { + match entry { + Some(entry) => { + let counters = side.counters(entry, tier); + TrackGuard { + meter: counters.meter(), + _subscription: counters.subscriptions.open(), + } } + None => TrackGuard { + meter: Meter::default(), + _subscription: Guard::noop(), + }, } } @@ -1304,9 +1204,9 @@ mod tests { let bs1 = stats.tier(Tier::External).broadcast("demo/bbb"); let bs2 = stats.tier(Tier::External).broadcast("demo/ccc"); let g1 = bs1.publisher().track("video"); - g1.bytes(100); + g1.meter().bytes(100); let g2 = bs2.publisher().track("video"); - g2.bytes(7); + g2.meter().bytes(7); let entries = stats.shared().entries.lock(); let e1 = entries.get(&PathOwned::from("demo/bbb")).expect("entry"); @@ -1322,9 +1222,9 @@ mod tests { let int = stats.tier(Tier::Internal); let ext_track = ext.broadcast("demo/bbb").publisher().track("video"); - ext_track.bytes(100); + ext_track.meter().bytes(100); let int_track = int.broadcast("demo/bbb").subscriber().track("audio"); - int_track.bytes(7); + int_track.meter().bytes(7); let entries = stats.shared().entries.lock(); let entry = entries.get(&PathOwned::from("demo/bbb")).expect("entry"); @@ -1343,7 +1243,7 @@ mod tests { assert!(bs.is_empty()); let p = bs.publisher(); let track = p.track("video"); - track.bytes(100); + track.meter().bytes(100); drop(track); drop(p); assert!(stats.shared().entries.lock().is_empty()); @@ -1359,7 +1259,7 @@ mod tests { assert!(bs.is_empty()); let p = bs.publisher(); let track = p.track("video"); - track.bytes(100); + track.meter().bytes(100); drop(track); drop(p); } @@ -1470,8 +1370,8 @@ mod tests { let mut consumer = origin.consume().announced(); let bs = stats.tier(Tier::External).broadcast("foo/bar"); let track = bs.publisher().track("video"); - track.bytes(42); - track.frame(); + track.meter().bytes(42); + track.meter().frame(); let sessions = stats.tier(Tier::External).publisher_broadcasts(); let _sub = sessions.subscribe("foo/bar"); @@ -1535,8 +1435,8 @@ mod tests { let sessions = stats.tier(Tier::External).publisher_broadcasts(); { let track = bs.publisher().track("video"); - track.bytes(123); - track.frame(); + track.meter().bytes(123); + track.meter().frame(); let _sub = sessions.subscribe("foo/bar"); // track + sub dropped here, all within tick 1 } @@ -1734,7 +1634,7 @@ mod tests { let mut consumer = origin.consume().announced(); let bs = stats.tier(Tier::External).broadcast("foo/bar"); let track = bs.publisher().track("video"); - track.frame(); + track.meter().frame(); drive_ticks(2).await; @@ -1773,58 +1673,30 @@ mod tests { } #[test] - fn snapshot_reads_closed_before_open() { - // Reading closed counters before their open counterparts is the - // guarantee that the emitted Snapshot never shows close > open - // under concurrent bumps. This unit-test pins the ordering at the - // source level so a future refactor that re-orders the loads - // trips the test. + fn gauge_snapshot_reads_closed_before_open() { + // Every lifecycle counter (announced / subscriptions / broadcasts / + // sessions) now reads through exactly one place: `Gauge::snapshot`. Pin + // its `closed`-before-`open` ordering at the source level so a reordering + // refactor can't let `closed > open` leak into an emitted frame (which + // would surface downstream as a negative live count). `closed` must be + // loaded with Acquire before `open` is loaded Relaxed. let src = include_str!("stats.rs"); - // Find the body of `impl Counters { fn snapshot(...) ... }` and - // check the line order. let body_start = src - .find("fn snapshot(&self) -> RawCounts") - .expect("snapshot fn present"); + .find("fn snapshot(&self) -> (u64, u64)") + .expect("Gauge::snapshot present"); let body = &src[body_start..]; - let closed_pos = body.find("self.announced_closed.load").expect("announced_closed load"); - let open_pos = body.find("self.announced.load(").expect("announced load"); + let closed_pos = body + .find("self.closed.load(Ordering::Acquire)") + .expect("closed Acquire load"); + let open_pos = body + .find("self.open.load(Ordering::Relaxed)") + .expect("open Relaxed load"); assert!( closed_pos < open_pos, - "announced_closed must be loaded before announced; reversing breaks the open>=closed invariant", - ); - let subs_closed_pos = body - .find("self.subscriptions_closed.load") - .expect("subscriptions_closed load"); - let subs_pos = body.find("self.subscriptions.load").expect("subscriptions load"); - assert!( - subs_closed_pos < subs_pos, - "subscriptions_closed must be loaded before subscriptions", - ); - let bcast_closed_pos = body - .find("self.broadcasts_closed.load") - .expect("broadcasts_closed load"); - let bcast_pos = body.find("self.broadcasts.load").expect("broadcasts load"); - assert!( - bcast_closed_pos < bcast_pos, - "broadcasts_closed must be loaded before broadcasts", + "Gauge::snapshot must load `closed` (Acquire) before `open` (Relaxed); reversing breaks the open>=closed invariant", ); } - #[test] - fn session_snapshot_reads_closed_before_open() { - // Same `closed`-before-`open` invariant as `Counters::snapshot`, pinned - // at the source level so a reordering refactor can't let - // `sessions_closed > sessions` leak into an emitted session frame. - let src = include_str!("stats.rs"); - let body_start = src - .find("fn snapshot(&self) -> (u64, u64)") - .expect("SessionCounters::snapshot fn present"); - let body = &src[body_start..]; - let closed_pos = body.find("self.sessions_closed.load").expect("sessions_closed load"); - let open_pos = body.find("self.sessions.load").expect("sessions load"); - assert!(closed_pos < open_pos, "sessions_closed must be loaded before sessions",); - } - async fn read_frame(mut track: crate::TrackSubscriber) -> BTreeMap { let bytes = track.read_frame().await.expect("ok").expect("frame"); serde_json::from_slice(&bytes).expect("json parse")