Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions rs/moq-net/src/ietf/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl<S: web_transport_trait::Session> Publisher<S> {

// Attach the publisher-side payload meter so the model counts
// groups/frames/bytes as they're served to this subscriber (egress is
// per-viewer). `track_stats` (the PublisherTrack guard, subscriptions
// per-viewer). `track_stats` (the track guard, subscriptions
// lifecycle) stays owned by this function for the subscription's duration.
track.set_meter(track_stats.meter());

Expand Down Expand Up @@ -483,7 +483,7 @@ impl<S: web_transport_trait::Session> Publisher<S> {
// Each accepted namespace holds a `publisher()` announce guard (bumps
// `announced` / `announced_closed`) alongside its stream, so dropping the
// tuple on unannounce or cleanup records the close.
let mut namespace_streams: HashMap<crate::PathOwned, (RequestId, Stream<S, Version>, crate::PublisherStats)> =
let mut namespace_streams: HashMap<crate::PathOwned, (RequestId, Stream<S, Version>, crate::BroadcastGuard)> =
HashMap::new();
let mut announced = self.origin.announced();

Expand Down Expand Up @@ -528,7 +528,7 @@ impl<S: web_transport_trait::Session> Publisher<S> {
async fn announce_namespace(
&self,
suffix: crate::PathOwned,
namespace_streams: &mut HashMap<crate::PathOwned, (RequestId, Stream<S, Version>, crate::PublisherStats)>,
namespace_streams: &mut HashMap<crate::PathOwned, (RequestId, Stream<S, Version>, crate::BroadcastGuard)>,
) -> Result<(), Error> {
let absolute = self.origin.absolute(&suffix).to_owned();
tracing::debug!(broadcast = %absolute, "announce");
Expand Down Expand Up @@ -582,7 +582,7 @@ impl<S: web_transport_trait::Session> Publisher<S> {
async fn unannounce_namespace(
&self,
suffix: &crate::PathOwned,
namespace_streams: &mut HashMap<crate::PathOwned, (RequestId, Stream<S, Version>, crate::PublisherStats)>,
namespace_streams: &mut HashMap<crate::PathOwned, (RequestId, Stream<S, Version>, crate::BroadcastGuard)>,
) {
tracing::debug!(broadcast = %self.origin.absolute(suffix), "unannounce");
// Dropping `_stats` on removal records the announce close.
Expand Down
12 changes: 6 additions & 6 deletions rs/moq-net/src/ietf/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::collections::{HashMap, hash_map::Entry};
use std::sync::Arc;

use crate::{
BroadcastDynamic, BroadcastInfo, Error, Frame, FrameProducer, Group, GroupProducer, MAX_FRAME_SIZE, OriginProducer,
OriginPublish, Path, PathOwned, StatsHandle, SubscriberStats, SubscriberTrack, TrackProducer, TrackRequest,
BroadcastDynamic, BroadcastGuard, BroadcastInfo, Error, Frame, FrameProducer, Group, GroupProducer, MAX_FRAME_SIZE,
OriginProducer, OriginPublish, Path, PathOwned, StatsHandle, TrackGuard, TrackProducer, TrackRequest,
coding::{Reader, Stream},
ietf::{self, Control, FilterType, GroupOrder, RequestId},
model::BroadcastProducer,
Expand Down Expand Up @@ -34,9 +34,9 @@ struct TrackState {
/// created), so the model records groups/frames/bytes as data is produced into it.
producer: TrackProducer,
alias: Option<u64>,
/// The `SubscriberTrack` guard (subscriptions lifecycle), held for the entry's
/// life; records `subscriptions_closed` when the entry is dropped.
_stats: Arc<SubscriberTrack>,
/// The track guard (subscriptions lifecycle), held for the entry's life;
/// records `subscriptions_closed` when the entry is dropped.
_stats: Arc<TrackGuard>,
}

struct BroadcastState {
Expand All @@ -52,7 +52,7 @@ struct BroadcastState {

/// Subscriber-side announce guard (bumps `announced` / `announced_closed`),
/// held for as long as the broadcast is announced into our origin.
_stats: SubscriberStats,
_stats: BroadcastGuard,
}

#[derive(Clone)]
Expand Down
33 changes: 19 additions & 14 deletions rs/moq-net/src/lite/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ impl<S: web_transport_trait::Session> Publisher<S> {
// Per-path stats guards: dropping the guard records `broadcasts_closed`.
// The origin contract guarantees announce/unannounce toggles per path, so a
// new active announcement must always be for a path with no live guard.
let mut stats_guards: std::collections::HashMap<crate::PathOwned, crate::PublisherStats> =
let mut stats_guards: std::collections::HashMap<crate::PathOwned, crate::BroadcastGuard> =
std::collections::HashMap::new();

match version {
Expand Down Expand Up @@ -515,7 +515,7 @@ impl<S: web_transport_trait::Session> Publisher<S> {
// The track guard (bumps `subscriptions`), the per-session broadcast
// tracker, and the broadcast path. The `broadcasts` sentinel is taken
// below, after the subscription is validated, and held for its lifetime.
stats: (crate::PublisherTrack, crate::SessionBroadcasts, crate::PathOwned),
stats: (crate::TrackGuard, crate::SessionBroadcasts, crate::PathOwned),
version: Version,
) -> Result<(), Error> {
let (track_stats, broadcasts, absolute) = stats;
Expand All @@ -535,7 +535,7 @@ impl<S: web_transport_trait::Session> Publisher<S> {
// Attach the publisher-side payload meter so the model counts
// groups/frames/bytes as they're served to this subscriber (egress is
// per-subscriber, so N viewers of one track count N times). `track_stats`
// (the PublisherTrack guard, subscriptions lifecycle) stays owned by this
// (the track guard, subscriptions lifecycle) stays owned by this
// function for the subscription's duration.
track.set_meter(track_stats.meter());

Expand Down Expand Up @@ -633,9 +633,14 @@ impl<S: web_transport_trait::Session> Publisher<S> {
// `request_broadcast` resolves it immediately, or falls back to an `OriginDynamic`
// handler (as in recv_subscribe).
let broadcast = self.origin.request_broadcast(&fetch.broadcast);
let track_stats = self.stats.broadcast(&absolute).publisher_track(&track);

if let Err(err) = Self::run_fetch(&mut stream, &fetch, broadcast, track_stats, self.version).await {
// The track guard records the fetch as a subscription for its lifetime; the
// Meter over the same shared counters carries the payload counts. Fetch reads
// random-access frames (`get_frame`), which the model doesn't auto-meter, so
// run_fetch bumps the Meter itself. `_fetch_guard` is held until run_fetch returns.
let _fetch_guard = self.stats.broadcast(&absolute).publisher_track(&track);
let meter = _fetch_guard.meter();

if let Err(err) = Self::run_fetch(&mut stream, &fetch, broadcast, meter, self.version).await {
match &err {
Error::Cancel | Error::Transport(_) => {
tracing::info!(broadcast = %absolute, %track, %group, "fetch cancelled")
Expand All @@ -654,7 +659,7 @@ impl<S: web_transport_trait::Session> Publisher<S> {
stream: &mut Stream<S, Version>,
fetch: &lite::Fetch<'_>,
broadcast: Result<kio::Pending<crate::BroadcastRequested>, Error>,
track_stats: crate::PublisherTrack,
meter: crate::Meter,
version: Version,
) -> Result<(), Error> {
let broadcast = broadcast?.await?;
Expand Down Expand Up @@ -688,7 +693,7 @@ impl<S: web_transport_trait::Session> Publisher<S> {

// Lite05+ FETCH responds with bare FRAME messages; the subscriber already has
// the codec/timescale from TRACK_INFO and the group sequence from its request.
track_stats.group();
meter.group();

// Honor frame_start: skip earlier frames, then stream the rest in order. The
// delta-timestamp baseline resets to 0, so the first served frame's delta is
Expand All @@ -702,7 +707,7 @@ impl<S: web_transport_trait::Session> Publisher<S> {
compression,
timescale,
&mut prev_ts,
&track_stats,
&meter,
)
.await?;
index += 1;
Expand Down Expand Up @@ -768,28 +773,28 @@ async fn write_fetch_frame<W: web_transport_trait::SendStream>(
compression: Compression,
timescale: Option<crate::Timescale>,
prev_ts: &mut u64,
track_stats: &crate::PublisherTrack,
meter: &crate::Meter,
) -> Result<(), Error> {
encode_frame_timing(writer, frame, timescale, prev_ts).await?;

match compression {
Compression::None => {
writer.encode(&frame.size).await?;
track_stats.frame();
meter.frame();
while let Some(mut chunk) = frame.read_chunk().await? {
let n = chunk.len() as u64;
writer.write_all(&mut chunk).await?;
track_stats.bytes(n);
meter.bytes(n);
}
}
compression => {
let payload = frame.read_all().await?;
let mut chunk = bytes::Bytes::from(compression.compress(&payload));
let n = chunk.len() as u64;
writer.encode(&n).await?;
track_stats.frame();
meter.frame();
writer.write_all(&mut chunk).await?;
track_stats.bytes(n);
meter.bytes(n);
}
}

Expand Down
12 changes: 6 additions & 6 deletions rs/moq-net/src/lite/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use futures::{StreamExt, stream::FuturesUnordered};
use crate::util::{MaybeBoxedExt, MaybeSendBox};

use crate::{
AsPath, BandwidthProducer, BroadcastDynamic, BroadcastInfo, Compression, Error, Frame, FrameProducer, Group,
GroupProducer, GroupRequest, MAX_FRAME_SIZE, OriginProducer, OriginPublish, Path, PathOwned, StatsHandle,
SubscriberStats, SubscriberTrack, Subscription, Timescale, Timestamp, TrackInfo, TrackProducer, TrackRequest,
AsPath, BandwidthProducer, BroadcastDynamic, BroadcastGuard, BroadcastInfo, Compression, Error, Frame,
FrameProducer, Group, GroupProducer, GroupRequest, MAX_FRAME_SIZE, OriginProducer, OriginPublish, Path, PathOwned,
StatsHandle, Subscription, Timescale, Timestamp, TrackGuard, TrackInfo, TrackProducer, TrackRequest,
coding::{Reader, Stream},
lite,
};
Expand Down Expand Up @@ -197,7 +197,7 @@ impl<S: web_transport_trait::Session> Subscriber<S> {
// `subscriber.broadcasts_closed`. We only insert a guard when start_announce
// actually accepted the announcement (it may drop reflected loops), so the
// guard set tracks `producers` exactly.
let mut stats_guards: HashMap<PathOwned, SubscriberStats> = HashMap::new();
let mut stats_guards: HashMap<PathOwned, BroadcastGuard> = HashMap::new();

// Stats keys are absolute paths (matching the publisher side) so the
// fanned-out level keys line up with the absolute broadcast paths a
Expand Down Expand Up @@ -752,7 +752,7 @@ struct TrackServe<S: web_transport_trait::Session> {
subscriber: Subscriber<S>,
path: PathOwned,
broadcast: BroadcastDynamic,
track_stats: Arc<SubscriberTrack>,
track_stats: Arc<TrackGuard>,
name: String,
}

Expand Down Expand Up @@ -1102,7 +1102,7 @@ impl<S: web_transport_trait::Session> TrackServe<S> {

// Attach the subscriber-side payload meter so the model counts
// groups/frames/bytes as they're produced into this track, regardless of
// transport. The SubscriberTrack guard (subscriptions lifecycle) stays held
// transport. The track guard (subscriptions lifecycle) stays held
// by this serve task in `self.track_stats`.
producer.set_meter(self.track_stats.meter());

Expand Down
Loading
Loading