Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions doc/bin/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,29 @@ TS export carries H.264 / H.265 as Annex-B and AAC as ADTS. Both in-band
sources work: the parameter sets are read from the bitstream or the catalog
`description` and re-injected as Annex-B on each keyframe.

## Usage Stats

Pass `--stats` to `publish`, `subscribe`, `serve`, or `accept` to print a
live, self-refreshing table of per-track upload/download usage to stderr.
It tracks each track being produced or consumed and rewrites the rates and
totals in place every second (`--stats-interval` to change the cadence):

```bash
moq-cli subscribe --url https://relay.example.com --broadcast my-stream --format ts --stats > /dev/null
```

```
track rate total frames groups
my-stream 21.7 KB/s 156.2 KB 211 8
video 21.5 KB/s 156.0 KB 210 7
catalog.json 0 B/s 162 B 1 1
```

Each broadcast row is the rollup (its total upload when publishing, or download
when subscribing); the indented rows are its individual tracks. The table goes
to stderr, so stdout stays clean for piped media. Lower the log level (e.g.
`RUST_LOG=error`) to keep logs from interleaving with the table.

## Authentication

Pass a JWT token via the URL:
Expand Down
19 changes: 16 additions & 3 deletions doc/bin/relay/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,13 +220,20 @@ counterpart no traffic can flow, so the entry is dropped:
"announced": 1, "announced_closed": 0,
"broadcasts": 1, "broadcasts_closed": 0,
"subscriptions": 5, "subscriptions_closed": 2,
"bytes": 12345, "frames": 678, "groups": 9
"bytes": 12345, "frames": 678, "groups": 9,
"tracks": {
"video": { "bytes": 12000, "frames": 600, "groups": 8 },
"audio": { "bytes": 345, "frames": 78, "groups": 1 }
}
},
"anon/foo": {
"announced": 1, "announced_closed": 0,
"broadcasts": 1, "broadcasts_closed": 0,
"subscriptions": 2, "subscriptions_closed": 0,
"bytes": 234, "frames": 12, "groups": 1
"bytes": 234, "frames": 12, "groups": 1,
"tracks": {
"catalog.json": { "bytes": 234, "frames": 12, "groups": 1 }
}
}
}
```
Expand All @@ -246,7 +253,13 @@ Field semantics:
- `subscriptions` / `subscriptions_closed`: cumulative count of
track-level subscription guards opened and dropped.
- `bytes` / `frames` / `groups`: cumulative payload counters from the
session loops (both the `moq-lite` and IETF `moq-transport` paths).
session loops (both the `moq-lite` and IETF `moq-transport` paths). These
are the broadcast-level rollup: the sum across every track, maintained
incrementally so reading a total never iterates the per-track map.
- `tracks`: per-track breakdown of the `bytes` / `frames` / `groups` rollup,
keyed by track name. A track appears while it has an open subscription and
is dropped once it stops flowing; its payload stays counted in the rollup
totals above. Use this to see which tracks within a broadcast are active.

The session tracks (`sessions.json`, `internal/sessions.json`) instead map
each auth root to a `{ sessions, sessions_closed }` snapshot. `sessions`
Expand Down
17 changes: 15 additions & 2 deletions rs/moq-cli/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,28 @@
use crate::Publish;
use crate::{Publish, StatsArgs, run_stats};

use anyhow::Context;
use hang::moq_net;
use url::Url;

pub async fn run_client(client: moq_native::Client, url: Url, name: String, publish: Publish) -> anyhow::Result<()> {
pub async fn run_client(
client: moq_native::Client,
url: Url,
name: String,
publish: Publish,
stats: StatsArgs,
) -> anyhow::Result<()> {
// Create an origin producer to publish to the broadcast.
let origin = moq_net::Origin::random().produce();
let _publish = origin
.publish_broadcast(&name, publish.consume())
.context("failed to publish broadcast")?;

let stats_agg = stats.build();
let client = match &stats_agg {
Some(agg) => client.with_stats(agg.tier(moq_net::Tier::External)),
None => client,
};

tracing::info!(%url, %name, "connecting");

let reconnect = client.with_publisher(origin.clone()).reconnect(url);
Expand All @@ -22,6 +34,7 @@ pub async fn run_client(client: moq_native::Client, url: Url, name: String, publ
tokio::select! {
res = publish.run() => res,
res = reconnect.closed() => Ok(res?),
res = run_stats(stats_agg, stats.interval) => res,
_ = tokio::signal::ctrl_c() => Ok(()),
}
}
44 changes: 42 additions & 2 deletions rs/moq-cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
mod client;
mod publish;
mod server;
mod stats;
mod subscribe;
mod web;

use client::*;
use hang::moq_net;
use publish::*;
use server::*;
use stats::*;
use subscribe::*;
use web::*;

Expand Down Expand Up @@ -45,6 +47,9 @@ pub enum Command {
#[arg(long)]
dir: Option<PathBuf>,

#[command(flatten)]
stats: StatsArgs,

/// The format of the input media.
#[command(subcommand)]
format: PublishFormat,
Expand All @@ -62,6 +67,9 @@ pub enum Command {
#[arg(long)]
dir: Option<PathBuf>,

#[command(flatten)]
stats: StatsArgs,

#[command(flatten)]
args: SubscribeArgs,
},
Expand All @@ -88,6 +96,9 @@ pub enum Command {
#[arg(long, alias = "name")]
broadcast: String,

#[command(flatten)]
stats: StatsArgs,

/// The format of the input media.
#[command(subcommand)]
format: PublishFormat,
Expand All @@ -106,6 +117,9 @@ pub enum Command {
#[arg(long, alias = "name")]
broadcast: String,

#[command(flatten)]
stats: StatsArgs,

#[command(flatten)]
args: SubscribeArgs,
},
Expand All @@ -130,6 +144,7 @@ async fn main() -> anyhow::Result<()> {
config,
dir,
broadcast,
stats,
format,
} => {
warn_if_missing_format(&broadcast);
Expand All @@ -140,18 +155,26 @@ async fn main() -> anyhow::Result<()> {
#[cfg(feature = "iroh")]
let server = server.with_iroh(iroh);

let stats_agg = stats.build();
let server = match &stats_agg {
Some(agg) => server.with_stats(agg.tier(moq_net::Tier::External)),
None => server,
};

let web_tls = server.tls_info();

tokio::select! {
res = run_server(server, broadcast, publish.consume()) => res,
res = run_web(&web_bind, web_tls, dir) => res,
res = publish.run() => res,
res = run_stats(stats_agg, stats.interval) => res,
}
}
Command::Accept {
config,
broadcast,
dir,
stats,
args,
} => {
let web_bind = config.bind.clone().unwrap_or_else(|| "[::]:443".to_string());
Expand All @@ -160,6 +183,12 @@ async fn main() -> anyhow::Result<()> {
#[cfg(feature = "iroh")]
let server = server.with_iroh(iroh);

let stats_agg = stats.build();
let server = match &stats_agg {
Some(agg) => server.with_stats(agg.tier(moq_net::Tier::External)),
None => server,
};

let web_tls = server.tls_info();

let origin = moq_net::Origin::random().produce();
Expand All @@ -169,13 +198,15 @@ async fn main() -> anyhow::Result<()> {
res = run_accept(server, origin) => res,
res = run_web(&web_bind, web_tls, dir) => res,
res = run_announced_subscribe(consumer, broadcast, args) => res,
res = run_stats(stats_agg, stats.interval) => res,
_ = tokio::signal::ctrl_c() => Ok(()),
}
}
Command::Publish {
config,
url,
broadcast,
stats,
format,
} => {
warn_if_missing_format(&broadcast);
Expand All @@ -185,20 +216,21 @@ async fn main() -> anyhow::Result<()> {
#[cfg(feature = "iroh")]
let client = client.with_iroh(iroh);

run_client(client, url, broadcast, publish).await
run_client(client, url, broadcast, publish, stats).await
}
Command::Subscribe {
config,
url,
broadcast,
stats,
args,
} => {
let client = config.init()?;

#[cfg(feature = "iroh")]
let client = client.with_iroh(iroh);

run_subscribe(client, url, broadcast, args).await
run_subscribe(client, url, broadcast, args, stats).await
}
}
}
Expand All @@ -217,10 +249,17 @@ async fn run_subscribe(
url: Url,
broadcast: String,
args: SubscribeArgs,
stats: StatsArgs,
) -> anyhow::Result<()> {
let origin = moq_net::Origin::random().produce();
let consumer = origin.consume();

let stats_agg = stats.build();
let client = match &stats_agg {
Some(agg) => client.with_stats(agg.tier(moq_net::Tier::External)),
None => client,
};

tracing::info!(%url, %broadcast, "connecting");

let reconnect = client.with_consumer(origin).reconnect(url);
Expand All @@ -231,6 +270,7 @@ async fn run_subscribe(
tokio::select! {
res = run_announced_subscribe(consumer, broadcast, args) => res,
res = reconnect.closed() => Ok(res?),
res = run_stats(stats_agg, stats.interval) => res,
_ = tokio::signal::ctrl_c() => Ok(()),
}
}
Expand Down
Loading
Loading