feat(moq-net,moq-srt): meter payload usage in the model; bill non-MoQ gateway traffic#1873
feat(moq-net,moq-srt): meter payload usage in the model; bill non-MoQ gateway traffic#1873kixelated wants to merge 4 commits into
Conversation
cbb5bf1 to
e30a9d5
Compare
Move per-track payload counting (bytes/frames/groups) out of the moq-lite and IETF wire handlers and into the transport-agnostic model layer, so any module driving the model types (moq-srt, moq-rtc, moq-edge, ...) records usage uniformly instead of re-instrumenting its own data path. A shared `Usage` struct (three atomics) lives in the stats `Counters` behind an `Arc`; the model carries a cheap `Meter` handle (`Option<Arc<Usage>>`) on TrackProducer -> GroupProducer (ingress) and TrackSubscriber -> GroupConsumer (egress) and bumps it as groups/frames/bytes flow. The stats layer reads the same `Usage` when it snapshots. No trait and no dynamic dispatch: a bump is a direct fetch_add on the shared atomic. Egress is per-consumer, so N viewers of a track count N times; ingress is single-writer and counts once. The per-track `subscriptions` lifecycle and the session gauges (sessions, broadcasts) stay session-side, where they belong. Behaviour is unchanged for moq-lite/IETF: the wire handlers attach a meter once per track instead of bumping inline, and fetch keeps its own counting (fetched groups use random-access reads that aren't model-metered). One nuance: the model counts raw (decompressed) bytes where the handlers counted post-compression wire bytes; in practice only the hang catalog track compresses, so the difference is noise, and a follow-up will cache compressed bytes in the model. Tests: moq-net 391 lib + moq-native 62 e2e pass; two new model tests cover ingress-counted-once and egress-counted-per-viewer. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
e30a9d5 to
c7fb951
Compare
| // groups/frames/bytes as they're served to this subscriber (egress is | ||
| // per-viewer). `track_stats` (the PublisherTrack guard, subscriptions | ||
| // lifecycle) stays owned by this function for the subscription's duration. | ||
| track.set_meter(track_stats.meter()); |
There was a problem hiding this comment.
It needs to be possible to do this before subscribing.
|
|
||
| stream.encode(&msg).await?; | ||
| track_stats.group(); | ||
| // Groups/frames/bytes are counted by the model via the subscriber's meter |
| /// handle to the *same* shared `Usage`, so N subscribers reading the same | ||
| /// cached frame each bump it (matching per-viewer egress). | ||
| #[derive(Clone, Default)] | ||
| pub struct Meter(Option<Arc<Usage>>); |
There was a problem hiding this comment.
Why is this Option? Can we just make it always set; it's only a few atomic operations even if nobody is reading it. IDK just merge Meter/Usage
The per-track meter from the previous commit can't reach the non-MoQ gateways: they hand a broadcast to a moq-mux importer/exporter, which creates tracks and opens subscriptions internally, so there's no TrackProducer/TrackSubscriber the gateway can call set_meter on. Propagate the meter one level up instead. BroadcastProducer::set_meter flows into every create_track/unique_track (ingress); BroadcastConsumer:: set_meter rides track() -> TrackConsumer -> subscribe() -> TrackSubscriber (egress). A gateway attaches one meter where it knows the path + tier, and every track underneath inherits it, including the ones born deep inside a muxer. PublisherStats and SubscriberStats gain meter() so a single broadcast guard does keep-alive, lifecycle, and payload metering together. Wire moq-srt as the proof: Server::with_stats(StatsHandle) (mirroring moq_native's Server) rides each Request; Publish/Subscribe::accept open the ingress/egress guard, thread its meter to the broadcast seam in ts.rs, and hold it for the connection. So an SRT publish/request now records ingress/egress usage even though the TS importer/exporter owns the tracks. The unauthenticated run() stays no-op; stats-enabled embedders use with_stats. moq-rtc/moq-rtmp follow the same pattern as follow-ups. Tests: moq-net 402 lib (3 new model propagation + 1 stats-layer gateway path) plus moq-srt 7 lib; clippy and fmt clean. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…r-track Collapse the six per-track set_meter calls scattered through the lite/IETF wire handlers onto the single broadcast-level mechanism, so there's one way to attach a meter instead of two. The meter now propagates through every track-creation path, not just create_track: - BroadcastProducer -> BroadcastDynamic -> TrackRequest -> accept (live tracks served on demand, the relay's ingress shape). - TrackRequest -> TrackDynamic -> GroupRequest -> accept (the fetch responder). - reserve_track stamps it directly. With that the handlers attach the meter once, where they already have the broadcast and the path/tier: - ingress: BroadcastProducer::set_meter in start_announce / restart_announce (lite) and start_announce (IETF). IETF start_publish now creates its track through the broadcast instead of TrackProducer::new + insert_track. - egress: BroadcastConsumer::set_meter on the per-subscribe consumer in lite/IETF publisher.rs. TrackProducer::set_meter is now pub(crate) (internal propagation only); TrackSubscriber::set_meter and the obsolete GroupProducer::set_meter are removed. Egress still counts per-viewer (each consumer carries its own handle over the shared per-(broadcast, tier, role) Usage); ingress counts once. Tests: moq-net 405 lib (new: dynamic-track, fetch, reserve_track meter inheritance) and moq-native 62 e2e (lite 01-03, IETF 14-18, fetch, compression, negotiation) all pass. clippy + fmt clean. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
| // local to this subscription (per-viewer: N viewers count N times) and the | ||
| // meter rides into the track subscription below. `track_stats` (the | ||
| // PublisherTrack guard, subscriptions lifecycle) stays owned by this function. | ||
| broadcast.set_meter(track_stats.meter()); |
There was a problem hiding this comment.
Is there any way we could call with_meter() before the await?
| hops, | ||
| ..Default::default() | ||
| } | ||
| .produce(); |
There was a problem hiding this comment.
I think .with_meter(self) is better, make it work like a builder.
Should we put the meter in BroadcastInfo maybe??? IDK if that would work, but it's worth thinking about.
| // Bumped once per group here and once per frame/byte in the group; a no-op | ||
| // until a stats sink is attached via [`Self::set_meter`]. Clones share it | ||
| // (parallel producers of one track feed the same sink). | ||
| meter: Meter, |
There was a problem hiding this comment.
I think this needs to be a Vec. So if the track is inserted into multiple broadcasts, it updates all of them?
| pub fn meter(&self) -> Meter { | ||
| match &self.entry { | ||
| Some(entry) => entry.subscriber[self.tier.idx()].meter(), | ||
| None => Meter::default(), |
There was a problem hiding this comment.
looks like an unwrap_or_default to mee
…sts own tracks Addresses review feedback on the metering API: - Merge `Meter` into `Usage` and drop the `Option`. The model now carries a plain `Arc<Usage>` (renamed module meter.rs -> usage.rs) and bumps it unconditionally; an unattached type gets a fresh default `Arc<Usage>` nobody reads, so a bump is a few cheap atomics with no branch. No wrapper type. - Builder `BroadcastProducer::with_meter` / `BroadcastConsumer::with_meter` replace the `set_meter` setters, so the relay/gateways attach in the produce/await chain (e.g. `broadcast?.await?.with_meter(..)`), before any await that could flow data. The model-internal propagation keeps `pub(crate)` setters. - Drop `BroadcastProducer::insert_track`: a track is now created by exactly one broadcast (`create_track` / dynamic `accept` / `reserve_track`), so it has one owner and one meter. This sidesteps the double-counting a `Vec<Arc<Usage>>` would cause for a track shared across broadcasts, mirroring the recent group-ownership tightening on dev. No production code reused tracks across broadcasts; only `assert_insert_track` and the `insert` test did (the latter is now `create_before_consume`). `TrackProducer::new` stays public for now; privatizing it (the ~80-test rewrite) is a separate ownership pass. - The stats guard `meter()` accessors return `Arc<Usage>` via `unwrap_or_default` (the reviewer's suggestion), and two stale "counted by the model" comments are gone. Tests: moq-net 405 lib + moq-native 62 e2e (lite 01-03, IETF 14-18, fetch, compression, negotiation) + moq-srt 7 lib all pass. clippy + fmt clean. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
| } | ||
| .produce(); | ||
| .produce() | ||
| .with_meter(self.stats.broadcast(&abs).subscriber_meter()); |
There was a problem hiding this comment.
is it a problem that we're not using the subscriber handle? This is a little concerning because it might be masking a bug that we're not counting subscribers?
What
Records ingress/egress usage (
bytes/frames/groups) for the non-MoQ gateways (moq-srt, moq-rtc, moq-rtmp), which were blind spots: usage was only counted at the moq-lite / IETFpublisher.rs/subscriber.rswire boundary, so anything a gateway drove through the model directly went uncounted.It does this by making the broadcast the single place a meter is attached, and having the model propagate it down to every track:
create_track, the dynamic handler, fetch,reserve_track). Oneset_meterper broadcast bills the whole thing.set_meterscattered through the data path.1. Payload counting in the model
model/meter.rs: a sharedUsagestruct (three atomics) plus a cheap, cloneableMeter(Option<Arc<Usage>>)handle. No trait, no dynamic dispatch — a bump is a directfetch_add.Countersholds its payload counters as anArc<Usage>and reads them back when it snapshots.create_group/append_group(groups),append_frame(frames + bytes), and the egress duals on the consumer side.2. Broadcast-level propagation
The per-track
set_metercan't reach a gateway's tracks: a gateway hands a broadcast to amoq-muximporter/exporter that creates tracks and opens subscriptions internally (dozens ofbroadcast.track(name).subscribe()/broadcast.unique_track(...)sites). The same is true of the relay, which serves tracks on demand via the dynamic handler. So the meter rides the broadcast and flows down:BroadcastProducer::set_meter->create_track/unique_track, and the dynamic pathBroadcastDynamic -> TrackRequest -> accept, and the fetch pathTrackDynamic -> GroupRequest -> accept, andreserve_track(ingress).BroadcastConsumer::set_meter->track()->TrackConsumer->subscribe()->TrackSubscriber(egress).PublisherStats::meter()/SubscriberStats::meter()/BroadcastStats::subscriber_meter()expose the sink to attach; the broadcast-lifetime guard keeps the counters alive for the snapshot task.TrackProducer::set_meteris nowpub(crate)(internal propagation only);TrackSubscriber::set_meterand an obsoleteGroupProducer::set_meterare removed. The single public way to meter is at the broadcast.3. Both the relay and moq-srt attach at the broadcast
set_metercalls collapse to one broadcast-level attach each: ingress instart_announce/restart_announce, egress on the per-subscribeBroadcastConsumer. IETFstart_publishnow creates its track through the broadcast. The per-trackSubscriberTrack/PublisherTrackguards stay for thesubscriptionslifecycle counter only.Server::with_stats(StatsHandle)(mirrorsmoq_native) rides eachRequest;Publish/Subscribe::acceptopen the ingress/egress guard and thread its meter to the broadcast seam ints.rs. So an SRTm=publish/m=requestrecords usage even thoughts::Import/ts::Exportown the tracks. The unauthenticatedrun()stays no-op; embedders (e.g. moq-pro) usewith_stats. moq-rtc/moq-rtmp follow the same pattern.Semantics
BroadcastConsumerclone carries its own handle over the sameUsage). Ingress is single-writer: counts once. For the relay, ingress (from upstream) and egress (to downstream) are independent roles, so a middle node counts both directions without double-counting either.groups.compress, so the difference is noise; a follow-up will cache compressed bytes in the model so the count is the wire size again.Testing
moq-net: 405 lib tests pass, including new ones for ingress-once, egress-per-viewer, broadcast→track propagation, the no-op default, the stats-layer gateway path, and dynamic-track / fetch / reserve_track meter inheritance.moq-native: 62 e2e tests pass (moq-lite 01-03 + IETF transport 14-18, fetch, compression, negotiation), plus 62 more unit/integration tests across the crate.moq-srt: builds + 7 lib tests pass.fmt --checkclean.Cross-package sync
js/net: not mirrored — the stats aggregator / meter is relay-side Rust only; the browser client has no equivalent. (Nodoc/conceptwire change: this is API, not wire.)Follow-ups
Server::with_stats, attach the broadcast meter at their create/consume seam). moq-edge lives in moq-pro.sessions(presence) /broadcasts(per-viewer) lifecycle counters for the gateways; this PR wiresannounced+ payload.🤖 Generated with Claude Code
(Written by Claude)