Skip to content

feat(moq-net,moq-srt): meter payload usage in the model; bill non-MoQ gateway traffic#1873

Open
kixelated wants to merge 4 commits into
devfrom
stats-in-model
Open

feat(moq-net,moq-srt): meter payload usage in the model; bill non-MoQ gateway traffic#1873
kixelated wants to merge 4 commits into
devfrom
stats-in-model

Conversation

@kixelated

@kixelated kixelated commented Jun 22, 2026

Copy link
Copy Markdown
Collaborator

What

Records ingress/egress usage (bytes / frames / groups) for the non-MoQ gateways (moq-srt, moq-rtc, moq-rtmp), which were blind spots: usage was only counted at the moq-lite / IETF publisher.rs / subscriber.rs wire boundary, so anything a gateway drove through the model directly went uncounted.

It does this by making the broadcast the single place a meter is attached, and having the model propagate it down to every track:

  1. Move per-track payload counting into the transport-agnostic model.
  2. Attach the meter at the broadcast level and propagate it through every track-creation path (create_track, the dynamic handler, fetch, reserve_track). One set_meter per broadcast bills the whole thing.
  3. Use that one mechanism everywhere — the lite/IETF relay handlers and the moq-srt gateway both attach at the broadcast, so there's no per-track set_meter scattered through the data path.

1. Payload counting in the model

  • New model/meter.rs: a shared Usage struct (three atomics) plus a cheap, cloneable Meter(Option<Arc<Usage>>) handle. No trait, no dynamic dispatch — a bump is a direct fetch_add.
  • The stats Counters holds its payload counters as an Arc<Usage> and reads them back when it snapshots.
  • The model bumps the meter at the real data-flow points: create_group/append_group (groups), append_frame (frames + bytes), and the egress duals on the consumer side.

2. Broadcast-level propagation

The per-track set_meter can't reach a gateway's tracks: a gateway hands a broadcast to a moq-mux importer/exporter that creates tracks and opens subscriptions internally (dozens of broadcast.track(name).subscribe() / broadcast.unique_track(...) sites). The same is true of the relay, which serves tracks on demand via the dynamic handler. So the meter rides the broadcast and flows down:

  • BroadcastProducer::set_meter -> create_track / unique_track, and the dynamic path BroadcastDynamic -> TrackRequest -> accept, and the fetch path TrackDynamic -> GroupRequest -> accept, and reserve_track (ingress).
  • BroadcastConsumer::set_meter -> track() -> TrackConsumer -> subscribe() -> TrackSubscriber (egress).
  • PublisherStats::meter() / SubscriberStats::meter() / BroadcastStats::subscriber_meter() expose the sink to attach; the broadcast-lifetime guard keeps the counters alive for the snapshot task.

TrackProducer::set_meter is now pub(crate) (internal propagation only); TrackSubscriber::set_meter and an obsolete GroupProducer::set_meter are removed. The single public way to meter is at the broadcast.

3. Both the relay and moq-srt attach at the broadcast

  • lite/IETF relay: the six former per-track set_meter calls collapse to one broadcast-level attach each: ingress in start_announce / restart_announce, egress on the per-subscribe BroadcastConsumer. IETF start_publish now creates its track through the broadcast. The per-track SubscriberTrack/PublisherTrack guards stay for the subscriptions lifecycle counter only.
  • moq-srt (the proof for the gateways): Server::with_stats(StatsHandle) (mirrors moq_native) rides each Request; Publish/Subscribe::accept open the ingress/egress guard and thread its meter to the broadcast seam in ts.rs. So an SRT m=publish / m=request records usage even though ts::Import / ts::Export own the tracks. The unauthenticated run() stays no-op; embedders (e.g. moq-pro) use with_stats. moq-rtc/moq-rtmp follow the same pattern.

Semantics

  • Egress is per-consumer: N viewers of a track count N times (each BroadcastConsumer clone carries its own handle over the same Usage). Ingress is single-writer: counts once. For the relay, ingress (from upstream) and egress (to downstream) are independent roles, so a middle node counts both directions without double-counting either.
  • The lite fetch path is now metered through the model (the fetched group's frames/bytes), where before it counted separately. A fetch re-serves an existing group, so it bumps frames/bytes but not groups.
  • The model counts raw (decompressed) bytes where the handlers counted post-compression wire bytes. In practice only the hang catalog track sets compress, so the difference is noise; a follow-up will cache compressed bytes in the model so the count is the wire size again.

Testing

  • moq-net: 405 lib tests pass, including new ones for ingress-once, egress-per-viewer, broadcast→track propagation, the no-op default, the stats-layer gateway path, and dynamic-track / fetch / reserve_track meter inheritance.
  • moq-native: 62 e2e tests pass (moq-lite 01-03 + IETF transport 14-18, fetch, compression, negotiation), plus 62 more unit/integration tests across the crate.
  • moq-srt: builds + 7 lib tests pass.
  • clippy + fmt --check clean.

Note: validated with the repo's rustc 1.96 (the earlier "rustc 1.91" note was a stale toolchain); moq-srt fmt/clippy ran via system cargo after a disk-pressure event broke nix's pty mid-session. CI's nix run is the source of truth.

Cross-package sync

  • js/net: not mirrored — the stats aggregator / meter is relay-side Rust only; the browser client has no equivalent. (No doc/concept wire change: this is API, not wire.)

Follow-ups

  • Wire moq-rtc (WHIP/WHEP) and moq-rtmp the same way (add Server::with_stats, attach the broadcast meter at their create/consume seam). moq-edge lives in moq-pro.
  • Optionally thread sessions (presence) / broadcasts (per-viewer) lifecycle counters for the gateways; this PR wires announced + payload.
  • Cache compressed bytes in the model so the byte count is the wire size for compressed tracks.

🤖 Generated with Claude Code

(Written by Claude)

Move per-track payload counting (bytes/frames/groups) out of the moq-lite
and IETF wire handlers and into the transport-agnostic model layer, so any
module driving the model types (moq-srt, moq-rtc, moq-edge, ...) records
usage uniformly instead of re-instrumenting its own data path.

A shared `Usage` struct (three atomics) lives in the stats `Counters` behind
an `Arc`; the model carries a cheap `Meter` handle (`Option<Arc<Usage>>`) on
TrackProducer -> GroupProducer (ingress) and TrackSubscriber -> GroupConsumer
(egress) and bumps it as groups/frames/bytes flow. The stats layer reads the
same `Usage` when it snapshots. No trait and no dynamic dispatch: a bump is a
direct fetch_add on the shared atomic.

Egress is per-consumer, so N viewers of a track count N times; ingress is
single-writer and counts once. The per-track `subscriptions` lifecycle and the
session gauges (sessions, broadcasts) stay session-side, where they belong.

Behaviour is unchanged for moq-lite/IETF: the wire handlers attach a meter once
per track instead of bumping inline, and fetch keeps its own counting (fetched
groups use random-access reads that aren't model-metered). One nuance: the model
counts raw (decompressed) bytes where the handlers counted post-compression wire
bytes; in practice only the hang catalog track compresses, so the difference is
noise, and a follow-up will cache compressed bytes in the model.

Tests: moq-net 391 lib + moq-native 62 e2e pass; two new model tests cover
ingress-counted-once and egress-counted-per-viewer.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Comment thread rs/moq-net/src/ietf/publisher.rs Outdated
// groups/frames/bytes as they're served to this subscriber (egress is
// per-viewer). `track_stats` (the PublisherTrack guard, subscriptions
// lifecycle) stays owned by this function for the subscription's duration.
track.set_meter(track_stats.meter());

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It needs to be possible to do this before subscribing.

Comment thread rs/moq-net/src/ietf/publisher.rs Outdated

stream.encode(&msg).await?;
track_stats.group();
// Groups/frames/bytes are counted by the model via the subscriber's meter

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

useless comment

Comment thread rs/moq-net/src/model/meter.rs Outdated
/// handle to the *same* shared `Usage`, so N subscribers reading the same
/// cached frame each bump it (matching per-viewer egress).
#[derive(Clone, Default)]
pub struct Meter(Option<Arc<Usage>>);

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this Option? Can we just make it always set; it's only a few atomic operations even if nobody is reading it. IDK just merge Meter/Usage

The per-track meter from the previous commit can't reach the non-MoQ
gateways: they hand a broadcast to a moq-mux importer/exporter, which
creates tracks and opens subscriptions internally, so there's no
TrackProducer/TrackSubscriber the gateway can call set_meter on.

Propagate the meter one level up instead. BroadcastProducer::set_meter
flows into every create_track/unique_track (ingress); BroadcastConsumer::
set_meter rides track() -> TrackConsumer -> subscribe() -> TrackSubscriber
(egress). A gateway attaches one meter where it knows the path + tier, and
every track underneath inherits it, including the ones born deep inside a
muxer. PublisherStats and SubscriberStats gain meter() so a single broadcast
guard does keep-alive, lifecycle, and payload metering together.

Wire moq-srt as the proof: Server::with_stats(StatsHandle) (mirroring
moq_native's Server) rides each Request; Publish/Subscribe::accept open the
ingress/egress guard, thread its meter to the broadcast seam in ts.rs, and
hold it for the connection. So an SRT publish/request now records
ingress/egress usage even though the TS importer/exporter owns the tracks.
The unauthenticated run() stays no-op; stats-enabled embedders use
with_stats. moq-rtc/moq-rtmp follow the same pattern as follow-ups.

Tests: moq-net 402 lib (3 new model propagation + 1 stats-layer gateway
path) plus moq-srt 7 lib; clippy and fmt clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@kixelated kixelated changed the title feat(moq-net): meter payload usage in the model Producer/Consumer feat(moq-net,moq-srt): meter payload usage in the model; bill non-MoQ gateway traffic Jun 23, 2026
…r-track

Collapse the six per-track set_meter calls scattered through the lite/IETF wire
handlers onto the single broadcast-level mechanism, so there's one way to attach
a meter instead of two.

The meter now propagates through every track-creation path, not just
create_track:
- BroadcastProducer -> BroadcastDynamic -> TrackRequest -> accept (live tracks
  served on demand, the relay's ingress shape).
- TrackRequest -> TrackDynamic -> GroupRequest -> accept (the fetch responder).
- reserve_track stamps it directly.

With that the handlers attach the meter once, where they already have the
broadcast and the path/tier:
- ingress: BroadcastProducer::set_meter in start_announce / restart_announce
  (lite) and start_announce (IETF). IETF start_publish now creates its track
  through the broadcast instead of TrackProducer::new + insert_track.
- egress: BroadcastConsumer::set_meter on the per-subscribe consumer in
  lite/IETF publisher.rs.

TrackProducer::set_meter is now pub(crate) (internal propagation only);
TrackSubscriber::set_meter and the obsolete GroupProducer::set_meter are
removed. Egress still counts per-viewer (each consumer carries its own handle
over the shared per-(broadcast, tier, role) Usage); ingress counts once.

Tests: moq-net 405 lib (new: dynamic-track, fetch, reserve_track meter
inheritance) and moq-native 62 e2e (lite 01-03, IETF 14-18, fetch, compression,
negotiation) all pass. clippy + fmt clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Comment thread rs/moq-net/src/ietf/publisher.rs Outdated
// local to this subscription (per-viewer: N viewers count N times) and the
// meter rides into the track subscription below. `track_stats` (the
// PublisherTrack guard, subscriptions lifecycle) stays owned by this function.
broadcast.set_meter(track_stats.meter());

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any way we could call with_meter() before the await?

Comment thread rs/moq-net/src/ietf/subscriber.rs Outdated
hops,
..Default::default()
}
.produce();

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think .with_meter(self) is better, make it work like a builder.

Should we put the meter in BroadcastInfo maybe??? IDK if that would work, but it's worth thinking about.

Comment thread rs/moq-net/src/model/track.rs Outdated
// Bumped once per group here and once per frame/byte in the group; a no-op
// until a stats sink is attached via [`Self::set_meter`]. Clones share it
// (parallel producers of one track feed the same sink).
meter: Meter,

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs to be a Vec. So if the track is inserted into multiple broadcasts, it updates all of them?

Comment thread rs/moq-net/src/stats.rs Outdated
pub fn meter(&self) -> Meter {
match &self.entry {
Some(entry) => entry.subscriber[self.tier.idx()].meter(),
None => Meter::default(),

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like an unwrap_or_default to mee

…sts own tracks

Addresses review feedback on the metering API:

- Merge `Meter` into `Usage` and drop the `Option`. The model now carries a
  plain `Arc<Usage>` (renamed module meter.rs -> usage.rs) and bumps it
  unconditionally; an unattached type gets a fresh default `Arc<Usage>` nobody
  reads, so a bump is a few cheap atomics with no branch. No wrapper type.

- Builder `BroadcastProducer::with_meter` / `BroadcastConsumer::with_meter`
  replace the `set_meter` setters, so the relay/gateways attach in the
  produce/await chain (e.g. `broadcast?.await?.with_meter(..)`), before any
  await that could flow data. The model-internal propagation keeps `pub(crate)`
  setters.

- Drop `BroadcastProducer::insert_track`: a track is now created by exactly one
  broadcast (`create_track` / dynamic `accept` / `reserve_track`), so it has one
  owner and one meter. This sidesteps the double-counting a `Vec<Arc<Usage>>`
  would cause for a track shared across broadcasts, mirroring the recent
  group-ownership tightening on dev. No production code reused tracks across
  broadcasts; only `assert_insert_track` and the `insert` test did (the latter
  is now `create_before_consume`). `TrackProducer::new` stays public for now;
  privatizing it (the ~80-test rewrite) is a separate ownership pass.

- The stats guard `meter()` accessors return `Arc<Usage>` via `unwrap_or_default`
  (the reviewer's suggestion), and two stale "counted by the model" comments are
  gone.

Tests: moq-net 405 lib + moq-native 62 e2e (lite 01-03, IETF 14-18, fetch,
compression, negotiation) + moq-srt 7 lib all pass. clippy + fmt clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
}
.produce();
.produce()
.with_meter(self.stats.broadcast(&abs).subscriber_meter());

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it a problem that we're not using the subscriber handle? This is a little concerning because it might be masking a bug that we're not counting subscribers?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant