feat(moq-net): rework payload compression, kept compressed in RAM #1889

Open
kixelated wants to merge 2 commits into dev from claude/charming-herschel-994993

@kixelated

Collaborator

Implements moq-dev/drafts#39 (moq-lite payload compression) on lite-05, with one wire refinement folded in: the algorithm is named explicitly on the wire rather than being flagless, and the draft is being updated to match.

What changed

Codec (model/compression.rs)

  • Compression { Deflate, Zstd } — deflate is the mandatory baseline, zstd optional (new zstd dep, experimental feature for magicless frames).
  • Group-scoped streaming Encoder/Decoder: one stream per group, sliced per frame. deflate uses raw DEFLATE with the RFC 7692 sync-flush trim; zstd uses magicless, checksum-less frames. The decoder enforces a cumulative per-group decompressed bound (decompression-bomb guard).
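The cumulative per-group bound can be pictured as a running counter shared by every frame decoded within one group. Below is a minimal sketch under stated assumptions: `GroupBudget` and `DecodeError` are hypothetical names, not types from this PR, and the `produced: &mut u64` parameter on the unframe helpers presumably plays the counter's role in the real code.

```rust
/// Hypothetical error type for this sketch (not the crate's real error).
#[derive(Debug, PartialEq, Eq)]
pub enum DecodeError {
    /// The group's cumulative decompressed size would exceed the limit.
    TooLarge { limit: u64 },
}

/// Tracks how many decompressed bytes one group has produced so far.
pub struct GroupBudget {
    produced: u64,
    limit: u64,
}

impl GroupBudget {
    pub fn new(limit: u64) -> Self {
        Self { produced: 0, limit }
    }

    /// Bytes that may still be produced before the bound is hit.
    /// `produced` never exceeds `limit`, so this cannot underflow.
    pub fn remaining(&self) -> u64 {
        self.limit - self.produced
    }

    /// Account for `n` newly decompressed bytes. Fails, leaving the counter
    /// untouched, if the cumulative total would pass the limit.
    pub fn charge(&mut self, n: usize) -> Result<(), DecodeError> {
        let total = self
            .produced
            .checked_add(n as u64)
            .filter(|total| *total <= self.limit)
            .ok_or(DecodeError::TooLarge { limit: self.limit })?;
        self.produced = total;
        Ok(())
    }
}
```

Because the counter is per group rather than per frame, many small frames that each inflate modestly still trip the guard once their sum crosses the limit.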

Negotiation + wire

  • New SETUP Compression parameter (0x3) advertises the algorithms an endpoint can decode, default [Zstd, Deflate]. Negotiated per hop.
  • TRACK_INFO carries compression: Option<Compression> — the algorithm actually used, which must be one the receiver advertised (else a protocol violation).
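The two negotiation rules above can be sketched as a pair of small functions. This is an illustration only: the PR does add a `select()` method, but its signature is not shown here, so the free functions `select` and `validate` and the `ProtocolViolation` enum below are assumptions.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Compression {
    Deflate,
    Zstd,
}

/// Hypothetical error for this sketch.
#[derive(Debug, PartialEq, Eq)]
pub enum ProtocolViolation {
    UnadvertisedCompression(Compression),
}

/// Sender side: pick the first algorithm we prefer that the peer advertised
/// in its SETUP Compression parameter. `None` means send uncompressed.
pub fn select(preferred: &[Compression], peer_decodes: &[Compression]) -> Option<Compression> {
    preferred
        .iter()
        .copied()
        .find(|alg| peer_decodes.contains(alg))
}

/// Receiver side: TRACK_INFO may only name an algorithm we advertised.
/// An absent algorithm (plaintext) is always acceptable.
pub fn validate(
    track_info: Option<Compression>,
    advertised: &[Compression],
) -> Result<(), ProtocolViolation> {
    match track_info {
        Some(alg) if !advertised.contains(&alg) => {
            Err(ProtocolViolation::UnadvertisedCompression(alg))
        }
        _ => Ok(()),
    }
}
```

Since negotiation is per hop, each connection would run this independently with its own advertised list.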

Model — payloads stay compressed in RAM

  • Cooked API compresses on write / decompresses on read, so origins and relays cache compressed bytes:
    • GroupProducer::write_frame(payload) / write_frame_at(ts, payload) compress whole-frame into the per-group stream.
    • GroupConsumer::read_frame / next_frame decompress (per-consumer decoder).
  • Raw API is the size-upfront streaming path a relay uses to forward compressed frames without inflating:
    • FrameProducer / FrameConsumer + GroupProducer::create_frame(size) / append_frame store/stream bytes verbatim.
    • GroupConsumer::read_frame_raw / next_frame_raw pass the stored slice through untouched.
  • The egress (lite/publisher.rs) passes through to a hop that speaks the cached algorithm (zero CPU) and transcodes/decompresses otherwise.
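The egress decision described above might look roughly like the following. The `Egress` enum and `plan_egress` function are hypothetical, and two behaviors are assumptions rather than facts from this PR: plaintext cache entries are forwarded as-is, and a transcode targets the first algorithm in the hop's advertised list.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Compression {
    Deflate,
    Zstd,
}

/// What the egress does with a cached frame for one downstream hop.
#[derive(Debug, PartialEq, Eq)]
pub enum Egress {
    /// Forward the cached bytes verbatim (zero CPU).
    PassThrough,
    /// Decompress the cached bytes, then recompress with this algorithm.
    Transcode(Compression),
    /// Decompress the cached bytes and send plaintext.
    Decompress,
}

/// `cached` is the algorithm the payload is stored with (`None` = plaintext);
/// `hop_decodes` is the list the hop advertised in SETUP.
pub fn plan_egress(cached: Option<Compression>, hop_decodes: &[Compression]) -> Egress {
    match cached {
        // Assumption: plaintext is forwarded untouched.
        None => Egress::PassThrough,
        // The hop speaks the cached algorithm: no work needed.
        Some(alg) if hop_decodes.contains(&alg) => Egress::PassThrough,
        // Otherwise fall back to something the hop can decode, if anything.
        Some(_) => match hop_decodes.first() {
            Some(&alg) => Egress::Transcode(alg),
            None => Egress::Decompress,
        },
    }
}
```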

Public API (all on dev, breaking OK)

  • Compression: now { Deflate, Zstd } (no None variant — absence is Option<Compression>); adds encoder()/decoder()/select() + Encoder/Decoder; removes the old whole-payload compress/decompress. Breaking.
  • lite::Setup gains compression: Vec<Compression>; lite::TrackInfo.compression is now Option<Compression> (was an always-present algorithm) with validation. Breaking (wire + type).
  • model TrackInfo: compress: bool → compression: Option<Compression> (with_compress(bool) kept; adds with_compression). Breaking (field), ergonomic builder preserved.
  • GroupProducer::write_frame_at; GroupConsumer::compression() / read_frame_raw / next_frame_raw. Additive (default behavior on uncompressed tracks unchanged).
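To illustrate how the `bool` builder can survive the field change, here is a reduced sketch. The real `TrackInfo` has other fields, and the exact signature of `with_compression` is not shown in this PR page, so taking an `Option<Compression>` is an assumption; the default of `Deflate` follows from it being described as the mandatory baseline.

```rust
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum Compression {
    /// The mandatory baseline, hence the default.
    #[default]
    Deflate,
    Zstd,
}

/// Reduced stand-in for the model `TrackInfo`; only the relevant field.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct TrackInfo {
    pub compression: Option<Compression>,
}

impl TrackInfo {
    /// `true` picks the default algorithm, `false` clears compression.
    pub fn with_compress(self, compress: bool) -> Self {
        self.with_compression(compress.then(Compression::default))
    }

    /// Name the algorithm explicitly (assumed signature).
    pub fn with_compression(mut self, compression: Option<Compression>) -> Self {
        self.compression = compression;
        self
    }
}
```

With this shape, `with_compress` is a thin convenience over `with_compression`, so removing it later would not change the stored representation.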

Cross-package sync

Per the table, rs/moq-net wire/API changes pair with js/net and doc/concept. Those mirrors are a deliberate follow-up — lite-05 is still WIP, so the JS side can lag briefly. No other rows apply.

Test plan

  • cargo test -p moq-net — 406 tests (codec round-trips for both algorithms incl. cross-frame redundancy + decompression-bomb, SETUP decoder-list + TRACK_INFO Option wire round-trips, model cooked/raw split)
  • cargo test -p moq-native --test broadcast — 64 tests, incl. new deflate + zstd subscribe e2e and the existing deflate timed fetch e2e
  • cargo test --workspace — all green (38 binaries)
  • cargo fmt --all --check, cargo clippy --workspace --all-targets — clean
  • just check via the pinned nix toolchain — not run here (the dev environment had no pty for nix develop); please confirm in a real terminal before merge

(Written by Claude)

kixelated and others added 2 commits June 23, 2026 09:59
Implements drafts PR #39 (moq-lite payload compression) with an
explicit-algorithm wire refinement, on lite-05.

- `Compression { Deflate, Zstd }` with a group-scoped streaming codec: raw
  DEFLATE + RFC 7692 sync-flush trim; magicless, checksum-less zstd. A
  cumulative per-group bound guards against decompression bombs.
- New SETUP `Compression` parameter (0x3) advertises the algorithms an
  endpoint can decode (default `[Zstd, Deflate]`), negotiated per hop.
- TRACK_INFO carries `Option<Compression>`: the algorithm actually used,
  which must be one the receiver advertised (else a protocol violation).
- Payloads stay compressed in RAM. The cooked API (`write_frame` /
  `write_frame_at`, `read_frame` / `next_frame`) compresses on write and
  decompresses on read, so origins and relays both cache compressed bytes.
  `FrameProducer` / `FrameConsumer` + `create_frame` are the raw,
  size-upfront API a relay uses to stream compressed frames without
  inflating; `*_raw` reads pass them through. The egress passes through to a
  hop that speaks the cached algorithm and transcodes/decompresses otherwise.

The js/net and doc/concept mirrors are a follow-up (lite-05 is still WIP).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…el-994993

# Conflicts:
#	rs/moq-net/src/lite/publisher.rs
/// property: a publisher sets it to request compression (the model compresses on
/// write to honor it), and a subscriber sets it from the algorithm named in
/// TRACK_INFO. It is local and per-hop, so it is kept out of the catalog.
#[cfg_attr(feature = "serde", serde(skip))]

Collaborator Author

I guess we should support it in serde; make it a string.

/// Request compression for this track's frame payloads, returning `self` for
/// chaining. `true` picks the default algorithm ([`Compression::default`], the
/// universal baseline); use [`Self::with_compression`] to name one explicitly.
pub fn with_compress(self, compress: bool) -> Self {

Collaborator Author

delete

let mut frame = self.create_frame(data.len())?;
frame.write(data)?;
/// The parent track's in-RAM compression algorithm, or `None` for plaintext.
pub fn compression(&self) -> Option<Compression> {

Collaborator Author

Why is this public?

/// Create a frame with an upfront size.
/// Compress one whole frame through the per-group encoder, under the state lock
/// so the single stream stays in frame order.
fn encode(&mut self, payload: &[u8]) -> Result<Bytes> {

Collaborator Author

I think we grab the lock twice, once in encode and once in create_frame? Can we avoid this?
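One way to avoid the double acquisition is to put both steps on the locked state, so the public method takes the guard once and calls helpers that need `&mut State`. The sketch below uses `std::sync::Mutex` and placeholder types; the actual locking primitive and the shape of the state in moq-net are not shown here and are assumptions.

```rust
use std::sync::Mutex;

#[derive(Default)]
struct State {
    /// Stand-in for the per-group encoder: bytes fed to it so far.
    encoded: usize,
    frames: Vec<Vec<u8>>,
}

impl State {
    // Both helpers take `&mut self`, so they can only be called by code
    // that already holds the lock; neither locks on its own.
    fn encode(&mut self, payload: &[u8]) -> Vec<u8> {
        self.encoded += payload.len();
        payload.to_vec() // placeholder: a real encoder would compress here
    }

    fn push_frame(&mut self, bytes: Vec<u8>) -> usize {
        self.frames.push(bytes);
        self.frames.len() - 1
    }
}

#[derive(Default)]
pub struct GroupProducer {
    state: Mutex<State>,
}

impl GroupProducer {
    /// Lock once, then encode and append under the same guard, so the
    /// shared stream and the frame list stay in the same order.
    pub fn write_frame(&self, payload: &[u8]) -> usize {
        let mut state = self.state.lock().unwrap();
        let bytes = state.encode(payload);
        state.push_frame(bytes)
    }

    pub fn frame_count(&self) -> usize {
        self.state.lock().unwrap().frames.len()
    }

    pub fn encoded_bytes(&self) -> usize {
        self.state.lock().unwrap().encoded
    }
}
```

Holding one guard across both steps also removes the window in which another writer could append between the encode and the store.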

.write(bytes::Bytes::from(payload))
let timestamp = Timestamp::new(us, Timescale::MICRO).unwrap();
group
.write_frame_at(timestamp, bytes::Bytes::from(payload))

Collaborator Author

Can you switch the order of the arguments? Feels more natural.

Collaborator Author

Also it's a little weird that the timestamp is optional. It feels very easy to misuse, as we'll get a runtime error if we forget to write one.

Maybe make the timestamp/timescale required for moq-lite in general? For older transports that don't have a timestamp/timescale, we can use milliseconds and Instant::now() or something IDK.
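A rough sketch of what a required timestamp could look like, with the payload first and a wall-clock fallback for transports that carry none. All names here (`Timestamp::since`, `Timestamp::MILLI`, this reduced `Group`) are hypothetical and only illustrate the idea of moving the check from runtime to the type signature.

```rust
use std::time::Instant;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Timestamp {
    pub value: u64,
    /// Units per second, e.g. 1_000 for milliseconds.
    pub timescale: u64,
}

impl Timestamp {
    pub const MILLI: u64 = 1_000;

    /// Fallback for transports without timestamps: milliseconds elapsed
    /// since a caller-chosen `epoch` (for example, session start).
    pub fn since(epoch: Instant) -> Self {
        Self {
            value: epoch.elapsed().as_millis() as u64,
            timescale: Self::MILLI,
        }
    }
}

#[derive(Default)]
pub struct Group {
    frames: Vec<(Timestamp, Vec<u8>)>,
}

impl Group {
    /// Payload first, timestamp second and mandatory: forgetting it is a
    /// compile error instead of a runtime error.
    pub fn write_frame(&mut self, payload: &[u8], timestamp: Timestamp) {
        self.frames.push((timestamp, payload.to_vec()));
    }

    pub fn len(&self) -> usize {
        self.frames.len()
    }

    pub fn timestamp(&self, index: usize) -> Option<Timestamp> {
        self.frames.get(index).map(|(ts, _)| *ts)
    }
}
```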


/// Encodes a group's frame payloads into one shared compressed stream, one slice
/// per frame. Hold one per group (the stream is reset at each group boundary).
pub struct Encoder(EncoderInner);

Collaborator Author

This feels like a trait, or at least an enum directly instead of EncoderInner.

I think a Zstd struct and a Deflate struct are the right call regardless of whether it's an enum or a trait. I don't really like the floating functions.

/// trailing `00 00 FF FF` the sync flush emits (RFC 7692); the decoder re-inserts
/// it. The window carries over to the next frame.
fn deflate_frame(c: &mut Compress, payload: &[u8]) -> Vec<u8> {
let mut out = Vec::with_capacity(payload.len() + 16);

Collaborator Author

Should we additionally send the decompressed size over the wire? So we know the buffer size to allocate?


/// Inverse of [`zstd_frame`].
fn zstd_unframe(d: &mut zstd::stream::raw::Decoder<'static>, slice: &[u8], produced: &mut u64) -> Result<Vec<u8>> {
let mut out = Vec::with_capacity(slice.len() * 2 + 16);

Collaborator Author

Would it help if we knew the output size in advance?

}

/// Inverse of [`zstd_frame`].
fn zstd_unframe(d: &mut zstd::stream::raw::Decoder<'static>, slice: &[u8], produced: &mut u64) -> Result<Vec<u8>> {

Collaborator Author

What is `produced`?


/// Inverse of [`deflate_frame`]: re-append the `00 00 FF FF` trailer and inflate.
fn deflate_unframe(d: &mut Decompress, slice: &[u8], produced: &mut u64) -> Result<Vec<u8>> {
let mut input = Vec::with_capacity(slice.len() + 4);

Collaborator Author

Please don't allocate just to add 4 bytes to the end, it's disgusting.

Can't we call decompress_vec on a const MAGIC_BYTES at the end instead?

Make a constant for 0, 0, 0xff, 0xff.
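For illustration, the copy-free shape could be a shared constant plus a second feed call. This sketch is library-agnostic: `feed_inflater` takes a closure standing in for whatever inflate call the decoder makes, and both function names and `SYNC_FLUSH_TRAILER` are hypothetical.

```rust
/// Trailer a DEFLATE sync flush emits; RFC 7692 strips it from each message
/// on the way out and re-appends it on the way in.
pub const SYNC_FLUSH_TRAILER: &[u8] = &[0x00, 0x00, 0xff, 0xff];

/// Encoder side: drop the trailer from a sync-flushed block. If the input
/// does not end with the trailer, it is returned unchanged.
pub fn trim_trailer(flushed: &[u8]) -> &[u8] {
    flushed.strip_suffix(SYNC_FLUSH_TRAILER).unwrap_or(flushed)
}

/// Decoder side: hand the stored slice to the inflater, then the constant
/// trailer, as two separate inputs. No temporary buffer is allocated.
pub fn feed_inflater<E>(
    slice: &[u8],
    mut feed: impl FnMut(&[u8]) -> Result<(), E>,
) -> Result<(), E> {
    feed(slice)?;
    feed(SYNC_FLUSH_TRAILER)
}
```

A streaming inflater keeps its state between calls, so feeding the trailer separately should be equivalent to concatenating it onto the slice first.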
