feat(moq-net): rework payload compression, kept compressed in RAM#1889
feat(moq-net): rework payload compression, kept compressed in RAM#1889kixelated wants to merge 2 commits into
Conversation
Implements drafts PR #39 (moq-lite payload compression) with an explicit-algorithm wire refinement, on lite-05. - `Compression { Deflate, Zstd }` with a group-scoped streaming codec: raw DEFLATE + RFC 7692 sync-flush trim; magicless, checksum-less zstd. A cumulative per-group bound guards against decompression bombs. - New SETUP `Compression` parameter (0x3) advertises the algorithms an endpoint can decode (default `[Zstd, Deflate]`), negotiated per hop. - TRACK_INFO carries `Option<Compression>`: the algorithm actually used, which must be one the receiver advertised (else a protocol violation). - Payloads stay compressed in RAM. The cooked API (`write_frame` / `write_frame_at`, `read_frame` / `next_frame`) compresses on write and decompresses on read, so origins and relays both cache compressed bytes. `FrameProducer` / `FrameConsumer` + `create_frame` are the raw, size-upfront API a relay uses to stream compressed frames without inflating; `*_raw` reads pass them through. The egress passes through to a hop that speaks the cached algorithm and transcodes/decompresses otherwise. The js/net and doc/concept mirrors are a follow-up (lite-05 is still WIP). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…el-994993 # Conflicts: # rs/moq-net/src/lite/publisher.rs
| /// property: a publisher sets it to request compression (the model compresses on | ||
| /// write to honor it), and a subscriber sets it from the algorithm named in | ||
| /// TRACK_INFO. It is local and per-hop, so it is kept out of the catalog. | ||
| #[cfg_attr(feature = "serde", serde(skip))] |
There was a problem hiding this comment.
I guess support it in serde, make it a string
| /// Request compression for this track's frame payloads, returning `self` for | ||
| /// chaining. `true` picks the default algorithm ([`Compression::default`], the | ||
| /// universal baseline); use [`Self::with_compression`] to name one explicitly. | ||
| pub fn with_compress(self, compress: bool) -> Self { |
| let mut frame = self.create_frame(data.len())?; | ||
| frame.write(data)?; | ||
| /// The parent track's in-RAM compression algorithm, or `None` for plaintext. | ||
| pub fn compression(&self) -> Option<Compression> { |
There was a problem hiding this comment.
Why is this public?
| /// Create a frame with an upfront size. | ||
| /// Compress one whole frame through the per-group encoder, under the state lock | ||
| /// so the single stream stays in frame order. | ||
| fn encode(&mut self, payload: &[u8]) -> Result<Bytes> { |
There was a problem hiding this comment.
I think we grab the lock twice, once in encode and once in create_frame? Can we avoid this?
| .write(bytes::Bytes::from(payload)) | ||
| let timestamp = Timestamp::new(us, Timescale::MICRO).unwrap(); | ||
| group | ||
| .write_frame_at(timestamp, bytes::Bytes::from(payload)) |
There was a problem hiding this comment.
Can you switch the order of the arguments? Feels more natural.
There was a problem hiding this comment.
Also it's a little weird that the timestamp is optional. It feels very easy to misuse, as we'll get a runtime error if we forget to write one.
Maybe make the timestamp/timescale required for moq-lite in general? For older transports that don't have a timestamp/timescale, we can use milliseconds and Instant::now() or something IDK.
|
|
||
| /// Encodes a group's frame payloads into one shared compressed stream, one slice | ||
| /// per frame. Hold one per group (the stream is reset at each group boundary). | ||
| pub struct Encoder(EncoderInner); |
There was a problem hiding this comment.
This feels like a trait. Or at least enum directly, instead of EncodeInner.
I think a Zstd struct and Deflate struct is the write call regardless of an enum or trait. I don't really like the floating functions.
| /// trailing `00 00 FF FF` the sync flush emits (RFC 7692); the decoder re-inserts | ||
| /// it. The window carries over to the next frame. | ||
| fn deflate_frame(c: &mut Compress, payload: &[u8]) -> Vec<u8> { | ||
| let mut out = Vec::with_capacity(payload.len() + 16); |
There was a problem hiding this comment.
Should we additionally send the decompressed size over the wire? So we know the buffer size to allocate?
|
|
||
| /// Inverse of [`zstd_frame`]. | ||
| fn zstd_unframe(d: &mut zstd::stream::raw::Decoder<'static>, slice: &[u8], produced: &mut u64) -> Result<Vec<u8>> { | ||
| let mut out = Vec::with_capacity(slice.len() * 2 + 16); |
There was a problem hiding this comment.
Would it help if we knew the output size in advance?
| } | ||
|
|
||
| /// Inverse of [`zstd_frame`]. | ||
| fn zstd_unframe(d: &mut zstd::stream::raw::Decoder<'static>, slice: &[u8], produced: &mut u64) -> Result<Vec<u8>> { |
There was a problem hiding this comment.
what is produced?
|
|
||
| /// Inverse of [`deflate_frame`]: re-append the `00 00 FF FF` trailer and inflate. | ||
| fn deflate_unframe(d: &mut Decompress, slice: &[u8], produced: &mut u64) -> Result<Vec<u8>> { | ||
| let mut input = Vec::with_capacity(slice.len() + 4); |
There was a problem hiding this comment.
Please don't allocate just to add 4 bytes to the end, it's disgusting.
Can't we call decompress_vec on a const MAGIC_BYTES at the end insead?
Make a constnat for 0, 0, 0xff, 0xff
Implements moq-dev/drafts#39 (moq-lite payload compression) on lite-05, with one wire refinement folded in (algorithm explicit on the wire rather than flagless — the draft is being updated to match).
What changed
Codec (
model/compression.rs)Compression { Deflate, Zstd }— deflate is the mandatory baseline, zstd optional (newzstddep,experimentalfeature for magicless frames).Encoder/Decoder: one stream per group, sliced per frame. deflate uses raw DEFLATE with the RFC 7692 sync-flush trim; zstd uses magicless, checksum-less frames. The decoder enforces a cumulative per-group decompressed bound (decompression-bomb guard).Negotiation + wire
Compressionparameter (0x3) advertises the algorithms an endpoint can decode, default[Zstd, Deflate]. Negotiated per hop.TRACK_INFOcarriescompression: Option<Compression>— the algorithm actually used, which must be one the receiver advertised (else a protocol violation).Model — payloads stay compressed in RAM
GroupProducer::write_frame(payload)/write_frame_at(ts, payload)compress whole-frame into the per-group stream.GroupConsumer::read_frame/next_framedecompress (per-consumer decoder).FrameProducer/FrameConsumer+GroupProducer::create_frame(size)/append_framestore/stream bytes verbatim.GroupConsumer::read_frame_raw/next_frame_rawpass the stored slice through untouched.lite/publisher.rs) passes through to a hop that speaks the cached algorithm (zero CPU) and transcodes/decompresses otherwise.Public API (all on
dev, breaking OK)Compression: now{ Deflate, Zstd }(noNonevariant — absence isOption<Compression>); addsencoder()/decoder()/select()+Encoder/Decoder; removes the old whole-payloadcompress/decompress. Breaking.lite::Setupgainscompression: Vec<Compression>;lite::TrackInfo.compressionis nowOption<Compression>(was an always-present algorithm) with validation. Breaking (wire + type).TrackInfo:compress: bool→compression: Option<Compression>(with_compress(bool)kept; addswith_compression). Breaking (field), ergonomic builder preserved.GroupProducer::write_frame_at;GroupConsumer::compression()/read_frame_raw/next_frame_raw. Additive (default behavior on uncompressed tracks unchanged).Cross-package sync
Per the table,
rs/moq-netwire/API changes pair withjs/netanddoc/concept. Those mirrors are a deliberate follow-up — lite-05 is still WIP, so the JS side can lag briefly. No other rows apply.Test plan
cargo test -p moq-net— 406 tests (codec round-trips for both algorithms incl. cross-frame redundancy + decompression-bomb, SETUP decoder-list + TRACK_INFOOptionwire round-trips, model cooked/raw split)cargo test -p moq-native --test broadcast— 64 tests, incl. new deflate + zstd subscribe e2e and the existing deflate timed fetch e2ecargo test --workspace— all green (38 binaries)cargo fmt --all --check,cargo clippy --workspace --all-targets— cleanjust checkvia the pinned nix toolchain — not run here (the dev environment had no pty fornix develop); please confirm in a real terminal before merge(Written by Claude)