From 07ae7ac711916134909bbb9fe0e717d0bfc73e5e Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 24 Jun 2026 01:01:13 +0000 Subject: [PATCH 1/7] feat(moq-json): optional group-scoped zstd compression Add an optional `Config.compression` to moq-json that compresses each group as a single zstd stream, flushed at every frame so a snapshot followed by deltas shares one warm window (later frames reuse the earlier ones as context). Frames are magicless with no per-frame checksum, since moq-net's framing already delimits each slice. An optional shared dictionary primes the window so even a group's first frame compresses well. When compression is enabled the delta-vs-snapshot rolling budget is measured on the real (compressed) slice sizes rather than the raw JSON, so the warm window's progressively smaller deltas pack more updates into a group. The plaintext path is unchanged, and compression defaults off, so existing tracks (including the hang catalog) are byte-identical on the wire. A cumulative per-group decompressed-size cap plus zstd's windowLogMax guard against decompression bombs. `Config` becomes `#[non_exhaustive]`; a `Consumer::with_compression` constructor takes the matching settings. A cloned consumer rebuilds its (non-cloneable) decoder window by replaying the group's already-read slices. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01B3xwgHid4UYjeugewkxUyj --- Cargo.lock | 1 + Cargo.toml | 1 + rs/moq-json/Cargo.toml | 1 + rs/moq-json/src/compression.rs | 265 ++++++++++++++++++++++ rs/moq-json/src/lib.rs | 344 +++++++++++++++++++++++++---- rs/moq-mux/src/catalog/producer.rs | 4 +- 6 files changed, 577 insertions(+), 39 deletions(-) create mode 100644 rs/moq-json/src/compression.rs diff --git a/Cargo.lock b/Cargo.lock index d98c9caa9..54faf01b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3855,6 +3855,7 @@ dependencies = [ "serde", "serde_json", "thiserror 2.0.18", + "zstd", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 8eb47380f..fcc6ff419 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,6 +69,7 @@ web-transport-proto = "0.6" web-transport-quiche = "0.4" web-transport-quinn = "0.11" web-transport-trait = "0.3.4" +zstd = "0.13" [profile.dev] panic = "abort" diff --git a/rs/moq-json/Cargo.toml b/rs/moq-json/Cargo.toml index d8ca7398d..1da452ac9 100644 --- a/rs/moq-json/Cargo.toml +++ b/rs/moq-json/Cargo.toml @@ -23,3 +23,4 @@ moq-net = { workspace = true } serde = { workspace = true } serde_json = "1" thiserror = "2" +zstd = { workspace = true, features = ["experimental"] } diff --git a/rs/moq-json/src/compression.rs b/rs/moq-json/src/compression.rs new file mode 100644 index 000000000..2b806ca10 --- /dev/null +++ b/rs/moq-json/src/compression.rs @@ -0,0 +1,265 @@ +//! Group-scoped zstd compression for the JSON frame stream. +//! +//! Within a group the frame payloads form a single zstd stream, flushed at each frame +//! boundary so every frame carries its own slice while later frames reuse the earlier ones as +//! context (a snapshot followed by deltas compresses far better than each frame alone). The +//! [`Encoder`]/[`Decoder`] hold that per-group state; both are recreated at every group +//! boundary. +//! +//! Frames use magicless zstd frames with no content checksum: moq-net's framing already +//! delimits each slice, so the per-frame magic number and checksum would be redundant bytes. +//! An optional shared [dictionary](Compression::dictionary) primes the window so even a +//! group's first frame compresses well. + +use bytes::Bytes; +use zstd::stream::raw::{CParameter, DParameter, InBuffer, Operation, OutBuffer}; +use zstd::zstd_safe::FrameFormat; + +use crate::{Error, Result}; + +/// Default zstd level: the library default, a good size/speed balance for the small, +/// repetitive payloads this targets. +const DEFAULT_LEVEL: i32 = 3; + +/// Maximum cumulative *decompressed* size of a single group, across all its frame payloads. +/// +/// Compression is group-scoped, so a malicious publisher could otherwise send tiny slices +/// that each inflate hugely. zstd has no built-in total-output limit for streaming magicless +/// frames, so the [`Decoder`] enforces this bound across the group and returns +/// [`Error::TooLarge`] when exceeded, stopping rather than allocating without limit. +const MAX_DECOMPRESSED_GROUP: u64 = 64 * 1024 * 1024; + +/// Upper bound on the zstd decode window (log2 bytes); caps per-frame memory amplification so +/// a tiny input can't force a huge window allocation. 27 (128 MiB) is zstd's normal ceiling. +const WINDOW_LOG_MAX: u32 = 27; + +/// Scratch buffer size for the zstd streaming loops. +const CHUNK: usize = 8 * 1024; + +/// zstd compression settings for a JSON track. +/// +/// Construct from [`Default`] and override fields (the struct is `#[non_exhaustive]`, so new +/// options stay additive). Both ends of a track must agree on the [`dictionary`](Self::dictionary); +/// the level is a sender-only choice and need not match. +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct Compression { + /// zstd compression level. Higher is smaller but slower. Defaults to `3`. + pub level: i32, + + /// An optional shared dictionary that primes the window, so even a group's first frame + /// compresses against known content. Must be identical on the producer and consumer; how a + /// consumer obtains it is out of band. + pub dictionary: Option, +} + +impl Default for Compression { + fn default() -> Self { + Self { + level: DEFAULT_LEVEL, + dictionary: None, + } + } +} + +impl Compression { + /// Start a fresh per-group encoder. + pub(crate) fn encoder(&self) -> Encoder { + let mut e = match &self.dictionary { + Some(dict) => zstd::stream::raw::Encoder::with_dictionary(self.level, dict).expect("zstd encoder"), + None => zstd::stream::raw::Encoder::new(self.level).expect("zstd encoder"), + }; + // Magicless + no content checksum/size: the slices are delimited by moq-net, so those + // bytes would only be redundant. + e.set_parameter(CParameter::Format(FrameFormat::Magicless)) + .expect("zstd format"); + e.set_parameter(CParameter::ChecksumFlag(false)).expect("zstd checksum"); + e.set_parameter(CParameter::ContentSizeFlag(false)) + .expect("zstd content size"); + Encoder(e) + } + + /// Start a fresh per-group decoder. + pub(crate) fn decoder(&self) -> Decoder { + let mut d = match &self.dictionary { + Some(dict) => zstd::stream::raw::Decoder::with_dictionary(dict).expect("zstd decoder"), + None => zstd::stream::raw::Decoder::new().expect("zstd decoder"), + }; + d.set_parameter(DParameter::Format(FrameFormat::Magicless)) + .expect("zstd format"); + d.set_parameter(DParameter::WindowLogMax(WINDOW_LOG_MAX)) + .expect("zstd window"); + Decoder { inner: d, produced: 0 } + } +} + +/// Encodes a group's frame payloads into one shared zstd stream, one slice per frame. Hold one +/// per group; the stream is recreated at each group boundary. +pub(crate) struct Encoder(zstd::stream::raw::Encoder<'static>); + +impl Encoder { + /// Compress the next frame's `payload`, returning its slice of the group stream. + /// + /// An empty payload contributes nothing and yields an empty slice. Later frames reuse + /// earlier ones as context, so slices must be produced (and later decoded) in frame order. + pub(crate) fn frame(&mut self, payload: &[u8]) -> Bytes { + if payload.is_empty() { + return Bytes::new(); + } + + let mut out = Vec::with_capacity(payload.len() / 2 + 32); + let mut tmp = [0u8; CHUNK]; + let mut input = InBuffer::around(payload); + + loop { + let n = { + let mut output = OutBuffer::around(&mut tmp); + self.0.run(&mut input, &mut output).expect("zstd run"); + output.pos() + }; + out.extend_from_slice(&tmp[..n]); + if input.pos() == payload.len() { + break; + } + } + + // Flush (retaining the window) so this frame's slice is self-delimited while later + // frames in the group keep reusing the context. + loop { + let (remaining, n) = { + let mut output = OutBuffer::around(&mut tmp); + let remaining = self.0.flush(&mut output).expect("zstd flush"); + (remaining, output.pos()) + }; + out.extend_from_slice(&tmp[..n]); + if remaining == 0 { + break; + } + } + + Bytes::from(out) + } +} + +/// Decodes a group's frame slices back into the original payloads. Hold one per group; feed +/// slices in frame order (each frame builds on the earlier ones). +pub(crate) struct Decoder { + inner: zstd::stream::raw::Decoder<'static>, + // Cumulative decompressed bytes this group, for the zip-bomb bound. + produced: u64, +} + +impl Decoder { + /// Decompress the next frame's `slice` back into its payload. + /// + /// An empty slice yields an empty payload. Returns [`Error::TooLarge`] if the group's + /// cumulative decompressed size would exceed the bound, and [`Error::Decompress`] on + /// malformed input. + pub(crate) fn frame(&mut self, slice: &[u8]) -> Result { + if slice.is_empty() { + return Ok(Bytes::new()); + } + + let mut out = Vec::with_capacity(slice.len() * 2 + 16); + let mut tmp = [0u8; CHUNK]; + let mut input = InBuffer::around(slice); + + loop { + let n = { + let mut output = OutBuffer::around(&mut tmp); + self.inner.run(&mut input, &mut output).map_err(|_| Error::Decompress)?; + output.pos() + }; + out.extend_from_slice(&tmp[..n]); + if self.produced + out.len() as u64 > MAX_DECOMPRESSED_GROUP { + return Err(Error::TooLarge(MAX_DECOMPRESSED_GROUP)); + } + if input.pos() == slice.len() && n == 0 { + break; + } + } + + self.produced += out.len() as u64; + Ok(Bytes::from(out)) + } +} + +#[cfg(test)] +mod test { + use super::*; + + /// Round-trip a sequence of frames through a group encoder/decoder pair. + fn roundtrip(config: &Compression, frames: &[&[u8]]) -> Vec> { + let mut enc = config.encoder(); + let slices: Vec = frames.iter().map(|f| enc.frame(f)).collect(); + + let mut dec = config.decoder(); + slices.iter().map(|s| dec.frame(s).unwrap().to_vec()).collect() + } + + #[test] + fn group_roundtrip() { + let frames: &[&[u8]] = &[b"the quick brown fox", b"the quick brown dog", b"the lazy fox"]; + let got = roundtrip(&Compression::default(), frames); + for (a, b) in frames.iter().zip(&got) { + assert_eq!(*a, b.as_slice()); + } + } + + #[test] + fn empty_frames_roundtrip() { + let frames: &[&[u8]] = &[b"", b"hello", b"", b"world"]; + let got = roundtrip(&Compression::default(), frames); + assert_eq!( + got, + vec![b"".to_vec(), b"hello".to_vec(), b"".to_vec(), b"world".to_vec()] + ); + } + + #[test] + fn cross_frame_redundancy_shrinks() { + // A later frame identical to an earlier one compresses to far fewer bytes once the + // window holds the earlier copy. + let config = Compression::default(); + let payload = b"Media over QUIC delivers real-time latency at massive scale.".repeat(6); + let mut enc = config.encoder(); + let first = enc.frame(&payload); + let second = enc.frame(&payload); + assert!( + second.len() < first.len(), + "repeat frame {} should be smaller than first {}", + second.len(), + first.len() + ); + } + + #[test] + fn dictionary_shrinks_first_frame() { + // A dictionary primes the window, so even the group's first frame compresses against it. + let payload = br#"{"video":{"renditions":{"video0":{"codec":"avc1.64001f"}}}}"#; + let plain = Compression::default(); + let primed = Compression { + dictionary: Some(Bytes::from_static(payload)), + ..Default::default() + }; + + let first_plain = plain.encoder().frame(payload); + let first_primed = primed.encoder().frame(payload); + assert!( + first_primed.len() < first_plain.len(), + "dictionary frame {} should beat undictionaried {}", + first_primed.len(), + first_plain.len() + ); + + // And it still round-trips with the same dictionary on the decode side. + let mut dec = primed.decoder(); + assert_eq!(dec.frame(&first_primed).unwrap(), Bytes::from_static(payload)); + } + + #[test] + fn decompress_rejects_garbage() { + let mut dec = Compression::default().decoder(); + assert!(matches!(dec.frame(b"not a zstd stream at all"), Err(Error::Decompress))); + } +} diff --git a/rs/moq-json/src/lib.rs b/rs/moq-json/src/lib.rs index d26b8ad5c..b805a02c9 100644 --- a/rs/moq-json/src/lib.rs +++ b/rs/moq-json/src/lib.rs @@ -9,6 +9,7 @@ //! Deltas are controlled by [`Config::delta_ratio`]. A ratio of `0` disables them, so every //! change is a fresh snapshot group, matching a plain "one JSON blob per group" track. +mod compression; mod diff; use std::marker::PhantomData; @@ -16,10 +17,14 @@ use std::ops::{Deref, DerefMut}; use std::sync::{Arc, Mutex, MutexGuard}; use std::task::Poll; +use bytes::Bytes; use serde::Serialize; use serde::de::DeserializeOwned; use serde_json::Value; +pub use compression::Compression; + +use crate::compression::{Decoder, Encoder}; use crate::diff::diff; /// Maximum frames (snapshot + deltas) in a single group before a new snapshot is forced. @@ -41,6 +46,14 @@ pub enum Error { /// Stored as a string since [`serde_json::Error`] is not [`Clone`]. #[error("json: {0}")] Json(String), + + /// A compressed frame could not be decoded (malformed or truncated stream). + #[error("decompression failed")] + Decompress, + + /// A group's cumulative decompressed size exceeded the limit (zip-bomb guard). + #[error("decompressed group exceeded {0} bytes")] + TooLarge(u64), } impl From for Error { @@ -53,7 +66,11 @@ impl From for Error { pub type Result = std::result::Result; /// Configuration for a [`Producer`]. +/// +/// Build from [`Default`] and override fields (the struct is `#[non_exhaustive]`, so new +/// options stay additive): `let mut config = Config::default(); config.delta_ratio = 0;`. #[derive(Debug, Clone)] +#[non_exhaustive] pub struct Config { /// Controls how aggressively the producer emits deltas (merge patches) instead of full snapshots. /// @@ -61,16 +78,31 @@ pub struct Config { /// /// A positive ratio enables deltas. A delta is appended to the current group as long as the /// accumulated deltas (excluding the snapshot frame) stay within `ratio` times the size of a - /// fresh snapshot; otherwise a new snapshot group is started. So `1` allows deltas totalling up + /// snapshot; otherwise a new snapshot group is started. So `1` allows deltas totalling up /// to one snapshot before rolling, and a larger ratio tolerates more deltas per snapshot. /// + /// When [`compression`](Self::compression) is set, both sides of the comparison are measured + /// on *compressed* bytes (a warm per-group window shrinks each successive delta), so more + /// deltas pack into a group than the raw sizes would suggest. + /// /// Defaults to `8`. pub delta_ratio: u32, + + /// Optional zstd compression for the frame stream. + /// + /// `None` (the default) writes plaintext JSON frames, identical on the wire to a track with + /// no compression. `Some(..)` compresses each group as one zstd stream (see [`Compression`]); + /// a [`Consumer`] reading the track must be created with the matching settings via + /// [`Consumer::with_compression`]. + pub compression: Option, } impl Default for Config { fn default() -> Self { - Self { delta_ratio: 8 } + Self { + delta_ratio: 8, + compression: None, + } } } @@ -106,8 +138,10 @@ impl Producer { inner: Arc::new(Mutex::new(Inner { track, group: None, + encoder: None, last: None, delta_bytes: 0, + snapshot_len: 0, group_frames: 0, config, })), @@ -207,9 +241,15 @@ impl Drop for Guard<'_, T> { struct Inner { track: moq_net::TrackProducer, group: Option, + // Per-group zstd encoder, `Some` while a compressed group is open (recreated per group). + encoder: Option, last: Option, // Bytes of deltas accumulated in the current group, excluding the snapshot frame. + // Compressed slice sizes when compressing, raw patch sizes otherwise. delta_bytes: u64, + // Reference size the delta budget is measured against: the current group's snapshot frame, + // as its compressed slice size. Only consulted on the compressed path. + snapshot_len: u64, group_frames: usize, config: Config, } @@ -221,10 +261,10 @@ impl Inner { } match self.delta(&json, snapshot.len())? { - Some(delta) => { + Some(slice) => { let group = self.group.as_mut().expect("delta requires an open group"); - let len = delta.len() as u64; - group.write_frame(delta)?; + let len = slice.len() as u64; + group.write_frame(slice)?; self.delta_bytes += len; self.group_frames += 1; } @@ -235,34 +275,51 @@ impl Inner { Ok(()) } - /// Serialize a delta if deltas are enabled and appending one keeps the group within budget; - /// otherwise `None`, signalling that a fresh snapshot should be published instead. - fn delta(&self, value: &Value, snapshot_len: usize) -> Result>> { + /// Serialize (and, when compressing, encode) a delta if deltas are enabled and appending one + /// keeps the group within budget; otherwise `None`, signalling that a fresh snapshot should be + /// published instead. Returns the frame slice ready to write. + fn delta(&mut self, value: &Value, snapshot_len: usize) -> Result> { let ratio = self.config.delta_ratio; if ratio == 0 { return Ok(None); } - let Some(last) = &self.last else { - return Ok(None); - }; if self.group.is_none() || self.group_frames >= MAX_DELTA_FRAMES { return Ok(None); } - let diff = diff(last, value); - if diff.forced_snapshot { - return Ok(None); - } - - let delta = serde_json::to_vec(&diff.patch)?; + let patch = { + let Some(last) = &self.last else { + return Ok(None); + }; + let diff = diff(last, value); + if diff.forced_snapshot { + return Ok(None); + } + serde_json::to_vec(&diff.patch)? + }; - // Roll a snapshot once the deltas would outgrow the budget (snapshot frame excluded). - let projected = self.delta_bytes + delta.len() as u64; - if projected > ratio as u64 * snapshot_len as u64 { - return Ok(None); + match self.encoder.as_mut() { + // Compressed: measure against the group's anchoring snapshot's compressed size and the + // delta's *compressed* slice size (the real wire cost). Encoding advances the per-group + // window; if the delta doesn't fit we roll a new group with a fresh encoder, discarding + // this slice (the abandoned window has no effect on the new group). + Some(encoder) => { + let slice = encoder.frame(&patch); + let projected = self.delta_bytes + slice.len() as u64; + if projected > ratio as u64 * self.snapshot_len { + return Ok(None); + } + Ok(Some(slice)) + } + // Uncompressed: raw delta bytes against a fresh snapshot of the current value. + None => { + let projected = self.delta_bytes + patch.len() as u64; + if projected > ratio as u64 * snapshot_len as u64 { + return Ok(None); + } + Ok(Some(Bytes::from(patch))) + } } - - Ok(Some(delta)) } /// Start a new group with a full snapshot as its first frame. @@ -273,15 +330,28 @@ impl Inner { } let mut group = self.track.append_group()?; - group.write_frame(snapshot)?; + + // Open a fresh per-group encoder (cold window) and compress the snapshot as frame 0. + let (slice, encoder) = match &self.config.compression { + Some(config) => { + let mut encoder = config.encoder(); + let slice = encoder.frame(&snapshot); + (slice, Some(encoder)) + } + None => (Bytes::from(snapshot), None), + }; + self.snapshot_len = slice.len() as u64; + group.write_frame(slice)?; self.delta_bytes = 0; self.group_frames = 1; + self.encoder = encoder; if self.config.delta_ratio != 0 { - // Keep the group open so future deltas can be appended. + // Keep the group (and its encoder) open so future deltas can be appended. self.group = Some(group); } else { // Deltas disabled: one frame per group, identical to a plain JSON track. + self.encoder = None; group.finish()?; } @@ -301,6 +371,13 @@ impl Inner { pub struct Consumer { track: moq_net::TrackConsumer, group: Option, + // Compression settings, matching the producer's; `None` reads plaintext frames. + compression: Option, + // Per-group zstd decoder, built lazily on the first compressed frame of a group. + decoder: Option, + // Compressed slices read so far in the current group, in order. Lets a cloned consumer + // rebuild the (non-cloneable) decoder window by replaying them. Empty when uncompressed. + group_slices: Vec, current: Option, frames_read: usize, _marker: PhantomData T>, @@ -313,6 +390,11 @@ impl Clone for Consumer { Self { track: self.track.clone(), group: self.group.clone(), + compression: self.compression.clone(), + // A zstd decoder can't be cloned (per-group window state), so the clone starts without + // one and rebuilds it from `group_slices` on its next compressed read. + decoder: None, + group_slices: self.group_slices.clone(), current: self.current.clone(), frames_read: self.frames_read, _marker: PhantomData, @@ -321,11 +403,26 @@ impl Clone for Consumer { } impl Consumer { - /// Create a consumer reading from the given track subscriber. + /// Create a consumer reading plaintext frames from the given track subscriber. pub fn new(track: moq_net::TrackConsumer) -> Self { + Self::build(track, None) + } + + /// Create a consumer that decompresses frames with the given [`Compression`] settings. + /// + /// These must match the producer's [`Config::compression`], including the + /// [`dictionary`](Compression::dictionary). + pub fn with_compression(track: moq_net::TrackConsumer, compression: Compression) -> Self { + Self::build(track, Some(compression)) + } + + fn build(track: moq_net::TrackConsumer, compression: Option) -> Self { Self { track, group: None, + compression, + decoder: None, + group_slices: Vec::new(), current: None, frames_read: 0, _marker: PhantomData, @@ -352,6 +449,9 @@ impl Consumer { self.group = Some(group); self.current = None; self.frames_read = 0; + // Each group is its own compressed stream, so reset the decoder state. + self.decoder = None; + self.group_slices.clear(); } Poll::Ready(None) => break true, Poll::Pending => break false, @@ -374,8 +474,32 @@ impl Consumer { } } + /// Decompress a frame slice, or pass it through when the track is uncompressed. + /// + /// The per-group decoder is built lazily on the first compressed frame. A cloned consumer + /// starts without a decoder, so the first call replays the group's already-read slices to + /// rebuild the (non-cloneable) zstd window before decoding the new frame. + fn decode(&mut self, slice: Bytes) -> Result { + let Some(compression) = &self.compression else { + return Ok(slice); + }; + + if self.decoder.is_none() { + let mut decoder = compression.decoder(); + for prev in &self.group_slices { + decoder.frame(prev)?; + } + self.decoder = Some(decoder); + } + + let plain = self.decoder.as_mut().unwrap().frame(&slice)?; + self.group_slices.push(slice); + Ok(plain) + } + /// Apply one frame: frame 0 of a group is a snapshot, the rest are merge patches. - fn apply(&mut self, frame: bytes::Bytes) -> Result { + fn apply(&mut self, frame: Bytes) -> Result { + let frame = self.decode(frame)?; if self.frames_read == 0 { self.current = Some(serde_json::from_slice(&frame)?); } else { @@ -398,15 +522,35 @@ mod test { use super::*; use serde_json::json; + /// An uncompressed config with the given delta ratio. + fn cfg(delta_ratio: u32) -> Config { + Config { + delta_ratio, + ..Default::default() + } + } + + /// A zstd-compressed config with the given delta ratio. + fn cfg_zstd(delta_ratio: u32) -> Config { + Config { + delta_ratio, + compression: Some(Compression::default()), + } + } + fn producer(config: Config) -> (Producer, moq_net::TrackConsumer) { let track = moq_net::Track::new("test").produce(); let consumer = track.consume(); (Producer::new(track, config), consumer) } - /// Drain every value currently available from a consumer without blocking. + /// Drain every value currently available from a plaintext consumer without blocking. fn drain(track: moq_net::TrackConsumer) -> Vec { - let mut consumer = Consumer::::new(track); + drain_with(Consumer::::new(track)) + } + + /// Drain every value currently available from an already-built consumer without blocking. + fn drain_with(mut consumer: Consumer) -> Vec { let waiter = kio::Waiter::noop(); let mut out = Vec::new(); while let Poll::Ready(Ok(Some(value))) = consumer.poll_next(&waiter) { @@ -417,7 +561,7 @@ mod test { #[test] fn deltas_off_snapshot_per_group() { - let (mut producer, track) = producer(Config { delta_ratio: 0 }); + let (mut producer, track) = producer(cfg(0)); producer.update(&json!({ "a": 1 })).unwrap(); producer.update(&json!({ "a": 2 })).unwrap(); producer.finish().unwrap(); @@ -456,7 +600,7 @@ mod test { #[test] fn deltas_share_one_group() { - let config = Config { delta_ratio: 100 }; + let config = cfg(100); let (mut producer, track) = producer(config); producer.update(&json!({ "a": 1, "b": 1 })).unwrap(); producer.update(&json!({ "a": 1, "b": 2 })).unwrap(); @@ -474,7 +618,7 @@ mod test { // A ratio of 1 admits deltas only up to the snapshot size: with equal 7-byte frames that is a // single delta per group, so it rolls every other update. (Still distinct from 0, which would // disable deltas entirely and never produce the group-0 delta below.) - let config = Config { delta_ratio: 1 }; + let config = cfg(1); let (mut producer, track) = producer(config); producer.update(&json!({ "a": 1 })).unwrap(); // snapshot, group 0 producer.update(&json!({ "a": 2 })).unwrap(); // delta, group 0 @@ -491,7 +635,7 @@ mod test { // snapshot size. Single-digit values keep every frame at a constant 7 bytes (`{"n":N}`), so a // `ratio = 8` admits 8 deltas (8x the snapshot, the inclusive limit) on top of the snapshot // before the 9th delta rolls. - let config = Config { delta_ratio: 8 }; + let config = cfg(8); let (mut producer, track) = producer(config); for n in 0..=9 { producer.update(&json!({ "n": n })).unwrap(); @@ -505,7 +649,7 @@ mod test { #[test] fn array_change_is_delta() { - let config = Config { delta_ratio: 100 }; + let config = cfg(100); let (mut producer, track) = producer(config); producer.update(&json!({ "list": [1, 2] })).unwrap(); producer.update(&json!({ "list": [1, 2, 3] })).unwrap(); @@ -518,7 +662,7 @@ mod test { #[test] fn frame_cap_rolls_snapshot() { - let config = Config { delta_ratio: 1_000_000 }; + let config = cfg(1_000_000); let (mut producer, track) = producer(config); // First update is the snapshot (frame 0); then MAX_DELTA_FRAMES - 1 deltas fill the group. for i in 0..=MAX_DELTA_FRAMES { @@ -533,7 +677,7 @@ mod test { #[test] fn late_joiner_reconstructs_from_deltas() { - let config = Config { delta_ratio: 100 }; + let config = cfg(100); let (mut producer, track) = producer(config); producer.update(&json!({ "a": 1, "b": 1 })).unwrap(); producer.update(&json!({ "a": 1, "b": 2 })).unwrap(); @@ -588,7 +732,7 @@ mod test { #[test] fn newer_group_supersedes_in_progress_reconstruction() { // A tight ratio lets one delta fit, then forces the next update into a new snapshot group. - let config = Config { delta_ratio: 1 }; + let config = cfg(1); let (mut producer, track) = producer(config); let observer = producer.consume(); let mut consumer = Consumer::::new(track); @@ -616,7 +760,7 @@ mod test { #[test] fn cloned_consumer_reconstructs_independently() { // Deltas share one group, so a clone taken mid-group carries in-progress reconstruction state. - let config = Config { delta_ratio: 100 }; + let config = cfg(100); let (mut producer, track) = producer(config); let mut consumer = Consumer::::new(track); let waiter = kio::Waiter::noop(); @@ -642,4 +786,128 @@ mod test { } } } + + #[test] + fn compressed_snapshot_per_group_roundtrips() { + let (mut producer, track) = producer(cfg_zstd(0)); + producer.update(&json!({ "a": 1 })).unwrap(); + producer.update(&json!({ "a": 2 })).unwrap(); + producer.finish().unwrap(); + + // Deltas disabled: one compressed snapshot per group, latest reconstructs identically. + assert_eq!(track.latest(), Some(1)); + let values = drain_with(Consumer::with_compression(track, Compression::default())); + assert_eq!(values, vec![json!({ "a": 2 })]); + } + + #[test] + fn compressed_deltas_share_one_group() { + let (mut producer, track) = producer(cfg_zstd(100)); + producer.update(&json!({ "a": 1, "b": 1 })).unwrap(); + producer.update(&json!({ "a": 1, "b": 2 })).unwrap(); + producer.update(&json!({ "a": 1, "b": 3 })).unwrap(); + producer.finish().unwrap(); + + // Snapshot + deltas in one group, reconstructed through the per-group decoder. + assert_eq!(track.latest(), Some(0)); + let values = drain_with(Consumer::with_compression(track, Compression::default())); + assert_eq!(values.last().unwrap(), &json!({ "a": 1, "b": 3 })); + } + + #[test] + fn compressed_late_joiner_reconstructs_from_deltas() { + let (mut producer, track) = producer(cfg_zstd(100)); + producer.update(&json!({ "a": 1, "b": 1 })).unwrap(); + producer.update(&json!({ "a": 1, "b": 2 })).unwrap(); + producer.update(&json!({ "a": 5, "b": 2 })).unwrap(); + producer.finish().unwrap(); + + // A consumer created only now rebuilds the final value from the compressed snapshot + deltas. + let values = drain_with(Consumer::with_compression(track, Compression::default())); + assert_eq!(values.last().unwrap(), &json!({ "a": 5, "b": 2 })); + } + + #[test] + fn compressed_cloned_consumer_reconstructs_mid_group() { + // A clone taken mid-group has no decoder window; it must rebuild from the retained slices. + let (mut producer, track) = producer(cfg_zstd(100)); + let mut consumer = Consumer::::with_compression(track, Compression::default()); + let waiter = kio::Waiter::noop(); + + producer.update(&json!({ "a": 1, "b": 1 })).unwrap(); // compressed snapshot, group 0 + match consumer.poll_next(&waiter) { + Poll::Ready(Ok(Some(value))) => assert_eq!(value, json!({ "a": 1, "b": 1 })), + other => panic!("expected snapshot, got {other:?}"), + } + + let mut clone = consumer.clone(); + + producer.update(&json!({ "a": 1, "b": 2 })).unwrap(); // compressed delta, group 0 + producer.finish().unwrap(); + + let expected = json!({ "a": 1, "b": 2 }); + for consumer in [&mut consumer, &mut clone] { + match consumer.poll_next(&waiter) { + Poll::Ready(Ok(Some(value))) => assert_eq!(value, expected), + other => panic!("expected delta, got {other:?}"), + } + } + } + + #[test] + fn dictionary_roundtrips() { + // A shared dictionary on both ends primes the window; the value still reconstructs exactly. + let dict = bytes::Bytes::from_static(br#"{"video":{"renditions":{}},"audio":{"renditions":{}}}"#); + let compression = Compression { + dictionary: Some(dict), + ..Default::default() + }; + + let track = moq_net::Track::new("test").produce(); + let consumer = track.consume(); + let mut producer = Producer::::new( + track, + Config { + delta_ratio: 8, + compression: Some(compression.clone()), + }, + ); + + let value = json!({ "video": { "renditions": { "v0": { "codec": "avc1.64001f" } } } }); + producer.update(&value).unwrap(); + producer.finish().unwrap(); + + let values = drain_with(Consumer::with_compression(consumer, compression)); + assert_eq!(values.last().unwrap(), &value); + } + + #[test] + fn compression_shrinks_wire_frames() { + // A repetitive payload should serialize to fewer wire bytes compressed than plaintext. + let value = json!({ "renditions": ["video".repeat(50), "video".repeat(50), "video".repeat(50)] }); + + let plaintext_bytes = wire_frame_len(cfg(0), &value); + let compressed_bytes = wire_frame_len(cfg_zstd(0), &value); + assert!( + compressed_bytes < plaintext_bytes, + "compressed frame {compressed_bytes} should be smaller than plaintext {plaintext_bytes}" + ); + } + + /// Publish a single value and return the byte length of the resulting (frame 0) wire frame. + fn wire_frame_len(config: Config, value: &Value) -> usize { + let (mut producer, mut track) = producer(config); + producer.update(value).unwrap(); + producer.finish().unwrap(); + + let waiter = kio::Waiter::noop(); + let Poll::Ready(Ok(Some(mut group))) = track.poll_next_group(&waiter) else { + panic!("expected a group"); + }; + // Read the stored (possibly compressed) frame bytes verbatim, without reconstructing JSON. + let Poll::Ready(Ok(Some(frame))) = group.poll_read_frame(&waiter) else { + panic!("expected a frame"); + }; + frame.len() + } } diff --git a/rs/moq-mux/src/catalog/producer.rs b/rs/moq-mux/src/catalog/producer.rs index 9ee7e69c8..d85c0c623 100644 --- a/rs/moq-mux/src/catalog/producer.rs +++ b/rs/moq-mux/src/catalog/producer.rs @@ -75,7 +75,9 @@ impl Producer { let msf_track = broadcast.create_track(moq_net::Track::new(moq_msf::DEFAULT_NAME))?; // Disable deltas for now to stay byte-compatible with consumers that only read snapshots. - let hang = moq_json::Producer::new(hang_track, moq_json::Config { delta_ratio: 0 }); + let mut json_config = moq_json::Config::default(); + json_config.delta_ratio = 0; + let hang = moq_json::Producer::new(hang_track, json_config); Ok(Self { hang, From 479a851ea85056790070299886fa513151d26664 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 24 Jun 2026 04:31:35 +0000 Subject: [PATCH 2/7] feat(json): per-frame deflate compression with JS support Replace the zstd group-stream compression with per-frame DEFLATE (RFC 1951 raw deflate) and mirror it in @moq/json so a Rust producer and a browser consumer interoperate. The browser exposes no zstd, and its Compression Streams API can't flush mid-stream (verified: a long-lived CompressionStream emits nothing until close), so cross-frame window sharing isn't reachable in JS. Each frame is therefore compressed independently as a standalone deflate-raw blob, which the platform CompressionStream and Rust flate2 both speak. Snapshots and large frames still shrink well; tiny deltas barely benefit. Rust: - Swap the zstd dependency for flate2; compression.rs is now stateless per-frame compress/decompress with a per-frame zip-bomb cap. - Compression keeps only a `level`; the dictionary and window-sharing are gone (unreachable in the browser). Consumer::with_compression no longer takes settings, since deflate decode is self-describing. JS: - New deflate-raw codec helper on the Compression Streams API. - Producer gains `compression?: boolean`. Compression is async, so the compressed path serializes track writes through an ordered FIFO chain to preserve frame/group order while update()/mutate() stay synchronous. - Consumer decompresses each frame; the delta-vs-snapshot budget is decided synchronously on raw sizes (vs Rust's compressed sizes), which doesn't affect the wire format. Co-Authored-By: Claude Opus 4.8 --- Cargo.lock | 2 +- Cargo.toml | 2 +- js/json/src/compression.test.ts | 112 +++++++++++++ js/json/src/compression.ts | 76 +++++++++ js/json/src/consumer.ts | 14 +- js/json/src/producer.ts | 74 ++++++++- rs/moq-json/Cargo.toml | 2 +- rs/moq-json/src/compression.rs | 273 +++++++++----------------------- rs/moq-json/src/lib.rs | 174 +++++++------------- 9 files changed, 399 insertions(+), 330 deletions(-) create mode 100644 js/json/src/compression.test.ts create mode 100644 js/json/src/compression.ts diff --git a/Cargo.lock b/Cargo.lock index 54faf01b4..fc5392b60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3849,13 +3849,13 @@ name = "moq-json" version = "0.0.4" dependencies = [ "bytes", + "flate2", "json-patch", "kio", "moq-net", "serde", "serde_json", "thiserror 2.0.18", - "zstd", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index fcc6ff419..be0c4d56f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ resolver = "2" rust-version = "1.85" [workspace.dependencies] +flate2 = "1" hang = { version = "0.19", path = "rs/hang" } kio = { version = "0.4", path = "rs/kio" } moq-audio = { version = "0.0.5", path = "rs/moq-audio" } @@ -69,7 +70,6 @@ web-transport-proto = "0.6" web-transport-quiche = "0.4" web-transport-quinn = "0.11" web-transport-trait = "0.3.4" -zstd = "0.13" [profile.dev] panic = "abort" diff --git a/js/json/src/compression.test.ts b/js/json/src/compression.test.ts new file mode 100644 index 000000000..a26b7045c --- /dev/null +++ b/js/json/src/compression.test.ts @@ -0,0 +1,112 @@ +import { expect, test } from "bun:test"; +import { Track } from "@moq/net"; +import { deflate, inflate } from "./compression.ts"; +import { Consumer } from "./consumer.ts"; +import { Producer } from "./producer.ts"; + +type Value = Record; + +const enc = new TextEncoder(); +const dec = new TextDecoder(); + +// Reconstruct every value a compressed consumer yields, in order. +async function drainCompressed(track: Track): Promise { + const out: Value[] = []; + for await (const value of new Consumer(track, { compression: true })) out.push(value); + return out; +} + +// The raw (stored) bytes of a track's first frame, without reconstructing JSON. +async function firstFrame(track: Track): Promise { + const group = await track.nextGroupOrdered(); + if (!group) throw new Error("expected a group"); + const frame = await group.readFrame(); + if (!frame) throw new Error("expected a frame"); + return frame; +} + +test("codec round-trips a frame", async () => { + const payload = enc.encode("the quick brown fox"); + expect(dec.decode(await inflate(await deflate(payload)))).toBe("the quick brown fox"); +}); + +test("codec round-trips an empty frame", async () => { + expect((await deflate(new Uint8Array())).length).toBe(0); + expect((await inflate(new Uint8Array())).length).toBe(0); +}); + +test("codec rejects garbage", async () => { + await expect(inflate(new Uint8Array(64).fill(0xff))).rejects.toThrow(); +}); + +test("compressed snapshot per group round-trips", async () => { + const track = new Track("test"); + const producer = new Producer(track, { deltaRatio: 0, compression: true }); + producer.update({ a: 1 }); + producer.update({ a: 2 }); + producer.finish(); + + // Deltas off: one compressed snapshot per group, reconstructed in order. + expect(await drainCompressed(track)).toEqual([{ a: 1 }, { a: 2 }]); +}); + +test("compressed live consumer sees each update in order", async () => { + // Compression makes writes async, so this exercises that the per-frame deflate pipeline still + // delivers frames (and groups) strictly in order. + const track = new Track("test"); + const producer = new Producer(track, { deltaRatio: 100, compression: true }); + const consumer = new Consumer(track, { compression: true }); + + for (let n = 1; n <= 5; n++) { + producer.update({ a: n }); + expect(await consumer.next()).toEqual({ a: n }); + } +}); + +test("compressed deltas share one group and reconstruct", async () => { + const track = new Track("test"); + const producer = new Producer(track, { deltaRatio: 100, compression: true }); + producer.update({ a: 1, b: 1 }); + producer.update({ a: 1, b: 2 }); + producer.update({ a: 5, b: 2 }); + producer.finish(); + + expect((await drainCompressed(track)).at(-1)).toEqual({ a: 5, b: 2 }); +}); + +test("compressed late joiner reconstructs from snapshot + deltas", async () => { + const track = new Track("test"); + const producer = new Producer(track, { deltaRatio: 100, compression: true }); + producer.update({ a: 1, b: 1 }); + producer.update({ a: 1, b: 2 }); + producer.update({ a: 5, b: 2 }); + producer.finish(); + + // A consumer created only now still rebuilds the final value. + expect((await drainCompressed(track)).at(-1)).toEqual({ a: 5, b: 2 }); +}); + +test("each compressed frame is valid standalone deflate-raw", async () => { + // The frame the producer stored should decode on its own back to the original snapshot, which + // is what keeps it interoperable with the Rust producer's per-frame format. + const track = new Track("test"); + const producer = new Producer(track, { deltaRatio: 0, compression: true }); + producer.update({ hello: "world" }); + producer.finish(); + + const frame = await firstFrame(track); + expect(JSON.parse(dec.decode(await inflate(frame)))).toEqual({ hello: "world" }); +}); + +test("compression shrinks a repetitive frame", async () => { + const value = { renditions: Array(3).fill("video".repeat(50)) }; + + const plain = new Track("plain"); + new Producer(plain, { deltaRatio: 0 }).update(value); + const compressed = new Track("compressed"); + new Producer(compressed, { deltaRatio: 0, compression: true }).update(value); + + const plainLen = (await firstFrame(plain)).length; + const compressedLen = (await firstFrame(compressed)).length; + expect(compressedLen).toBeLessThan(plainLen); +}); diff --git a/js/json/src/compression.ts b/js/json/src/compression.ts new file mode 100644 index 000000000..4e4fdf9b5 --- /dev/null +++ b/js/json/src/compression.ts @@ -0,0 +1,76 @@ +/** + * Per-frame DEFLATE compression for the JSON frame stream, built on the platform + * {@link https://developer.mozilla.org/en-US/docs/Web/API/Compression_Streams_API | Compression Streams API}. + * + * Each frame is compressed on its own as a raw DEFLATE ([RFC 1951](https://www.rfc-editor.org/rfc/rfc1951.html)) + * blob (`deflate-raw`), the same format the Rust `moq-json` producer writes, so the two + * interoperate on the wire. There is no cross-frame context, so snapshots and large frames shrink + * well while tiny deltas barely benefit. The browser API exposes no level or dictionary knobs, so + * compression is a plain on/off toggle. + * + * @module + */ + +// Maximum decompressed size of a single frame. A malicious publisher could otherwise send a tiny +// slice that inflates hugely, so {@link inflate} stops rather than allocating without limit. +// Mirrors the Rust `MAX_DECOMPRESSED_FRAME`. +const MAX_DECOMPRESSED_FRAME = 64 * 1024 * 1024; + +/** Compress one frame payload into a standalone `deflate-raw` blob. Empty in yields empty out. */ +export async function deflate(payload: Uint8Array): Promise { + if (payload.length === 0) return payload; + const cs = new CompressionStream("deflate-raw"); + return pump(cs, payload); +} + +/** + * Decompress one `deflate-raw` frame back into its payload. Empty in yields empty out. + * + * Throws if the input is malformed or inflates past the per-frame size limit. + */ +export async function inflate(slice: Uint8Array): Promise { + if (slice.length === 0) return slice; + const ds = new DecompressionStream("deflate-raw"); + return pump(ds, slice, MAX_DECOMPRESSED_FRAME); +} + +// Drive a (de)compression stream end-to-end: feed it `input`, read every output chunk, and +// concatenate. Reads concurrently with writing to avoid the transform's backpressure deadlock. +async function pump( + transform: CompressionStream | DecompressionStream, + input: Uint8Array, + limit = Number.POSITIVE_INFINITY, +): Promise { + const writer = transform.writable.getWriter(); + // The same error surfaces from the reader below, so swallow the writer's copy to avoid an + // unhandled rejection on malformed input. The cast narrows `ArrayBufferLike` to `ArrayBuffer`: + // our inputs are never SharedArrayBuffer-backed, which is all the DOM `BufferSource` type wants. + const written = (async () => { + await writer.write(input as Uint8Array); + await writer.close(); + })().catch(() => {}); + + const reader = transform.readable.getReader(); + const chunks: Uint8Array[] = []; + let total = 0; + for (;;) { + const { value, done } = await reader.read(); + if (done) break; + total += value.length; + if (total > limit) { + await reader.cancel(); + throw new Error(`decompressed frame exceeded ${limit} bytes`); + } + chunks.push(value); + } + await written; + + if (chunks.length === 1) return chunks[0]; + const out = new Uint8Array(total); + let offset = 0; + for (const chunk of chunks) { + out.set(chunk, offset); + offset += chunk.length; + } + return out; +} diff --git a/js/json/src/consumer.ts b/js/json/src/consumer.ts index 5db5a4dd5..9cd77c18a 100644 --- a/js/json/src/consumer.ts +++ b/js/json/src/consumer.ts @@ -1,5 +1,6 @@ import type * as Moq from "@moq/net"; import type * as z from "zod/mini"; +import { inflate } from "./compression.ts"; import { merge } from "./diff.ts"; import type { Config } from "./producer.ts"; @@ -12,6 +13,8 @@ import type { Config } from "./producer.ts"; export class Consumer { #track: Moq.Track; #schema?: z.ZodMiniType; + // Whether frames are `deflate-raw` compressed. Must match the producer's {@link Config.compression}. + #decompress: boolean; #group?: Moq.Group; #current?: unknown; @@ -20,6 +23,7 @@ export class Consumer { constructor(track: Moq.Track, config: Config = {}) { this.#track = track; this.#schema = config.schema; + this.#decompress = config.compression ?? false; } /** Get the next reconstructed value, or `undefined` once the track ends. */ @@ -40,7 +44,7 @@ export class Consumer { continue; } - return this.#apply(frame); + return await this.#apply(frame); } } @@ -52,9 +56,11 @@ export class Consumer { } } - // Frame 0 of a group is a snapshot, the rest are merge patches. - #apply(frame: Uint8Array): T { - const parsed = JSON.parse(new TextDecoder().decode(frame)); + // Frame 0 of a group is a snapshot, the rest are merge patches. Each frame is its own DEFLATE + // blob when compressed, so decoding needs no per-group state. + async #apply(frame: Uint8Array): Promise { + const payload = this.#decompress ? await inflate(frame) : frame; + const parsed = JSON.parse(new TextDecoder().decode(payload)); if (this.#framesRead === 0) { this.#current = parsed; } else { diff --git a/js/json/src/producer.ts b/js/json/src/producer.ts index eecbe7846..8ab857e1e 100644 --- a/js/json/src/producer.ts +++ b/js/json/src/producer.ts @@ -2,6 +2,7 @@ import * as Moq from "@moq/net"; import type { Effect } from "@moq/signals"; import type * as z from "zod/mini"; +import { deflate } from "./compression.ts"; import { deepEqual, diff } from "./diff.ts"; // Maximum frames (snapshot + deltas) in a single group before a new snapshot is forced. Kept @@ -30,6 +31,12 @@ export interface Config { // Starting value for {@link Producer.mutate} before anything has been published. Required to // mutate a producer that hasn't published yet (e.g. a fresh catalog); ignored once a value exists. initial?: T; + + // Compress each frame independently with `deflate-raw` (RFC 1951), interoperable with the Rust + // `moq-json` producer. `false`/unset (the default) writes plaintext JSON frames. A + // {@link Consumer} reading the track must set the same flag. The browser deflate exposes no + // level or dictionary knobs, so this is a plain on/off toggle. + compression?: boolean; } /** @@ -49,10 +56,19 @@ export class Producer { #track?: Moq.Track; #group?: Moq.Group; #last?: unknown; - // Bytes of deltas accumulated in the current group, excluding the snapshot frame. + // Bytes of deltas accumulated in the current group, excluding the snapshot frame. Always raw + // (uncompressed) sizes, even when compressing: the delta-vs-snapshot decision is made + // synchronously in `update()`, before the async compression runs. #deltaBytes = 0; #groupFrames = 0; + // Per-frame `deflate-raw` compression. Compression is async (the platform CompressionStream), + // so on the compressed path every track write is serialized through `#chain` to preserve frame + // and group order while `update()` stays synchronous. Decisions still run synchronously above. + #compress = false; + #chain: Promise = Promise.resolve(); + #failed = false; + // Fan-out mode: retains the value and serves a child (leaf) Producer per subscriber. #outputs?: Set>; #value?: T; @@ -70,6 +86,7 @@ export class Producer { this.#outputs = new Set(); this.#value = this.#config.initial; } + this.#compress = this.#config.compression ?? false; } /** The current value, or `undefined` if nothing has been published yet. */ @@ -110,7 +127,7 @@ export class Producer { const snapshot = new TextEncoder().encode(text); const delta = this.#delta(json, snapshot.length); if (delta && this.#group) { - this.#group.writeFrame(delta); + this.#writeFrame(this.#group, delta); this.#deltaBytes += delta.length; this.#groupFrames += 1; } else { @@ -179,9 +196,9 @@ export class Producer { return; } - this.#group?.close(); + if (this.#group) this.#closeGroup(this.#group); this.#group = undefined; - this.#track.close(); + this.#closeTrack(this.#track); } // Resolved delta ratio: the configured value, or the default when unset. `0` disables deltas. @@ -208,10 +225,10 @@ export class Producer { #snapshot(track: Moq.Track, snapshot: Uint8Array): void { // The previous group is complete; no more frames will be appended to it. - this.#group?.close(); + if (this.#group) this.#closeGroup(this.#group); const group = track.appendGroup(); - group.writeFrame(snapshot); + this.#writeFrame(group, snapshot); this.#deltaBytes = 0; this.#groupFrames = 1; @@ -220,8 +237,51 @@ export class Producer { this.#group = group; } else { // Deltas disabled: one frame per group, identical to a plain JSON track. - group.close(); + this.#closeGroup(group); this.#group = undefined; } } + + // Write a frame to `group`. Synchronous when uncompressed; on the compressed path the frame is + // deflated and written through `#chain` so frames and groups still land in order. + #writeFrame(group: Moq.Group, frame: Uint8Array): void { + if (!this.#compress) { + group.writeFrame(frame); + return; + } + this.#enqueue(async () => group.writeFrame(await deflate(frame))); + } + + // Close `group`, ordered after its pending compressed writes when compressing. + #closeGroup(group: Moq.Group): void { + if (!this.#compress) { + group.close(); + return; + } + this.#enqueue(() => group.close()); + } + + // Close the track, ordered after every pending compressed write when compressing. + #closeTrack(track: Moq.Track): void { + if (!this.#compress) { + track.close(); + return; + } + this.#enqueue(() => track.close()); + } + + // Append an ordered step to the compressed-write pipeline. The first failure tears the track + // down and turns later steps into no-ops, mirroring the synchronous path's fail-fast behavior. + #enqueue(step: () => Promise | void): void { + this.#chain = this.#chain + .then(() => { + if (!this.#failed) return step(); + }) + .catch((err) => { + if (this.#failed) return; + this.#failed = true; + console.warn("dropping json producer after a compressed write failed", err); + this.#track?.close(); + }); + } } diff --git a/rs/moq-json/Cargo.toml b/rs/moq-json/Cargo.toml index 1da452ac9..8cbeac345 100644 --- a/rs/moq-json/Cargo.toml +++ b/rs/moq-json/Cargo.toml @@ -17,10 +17,10 @@ doctest = false [dependencies] bytes = "1" +flate2 = { workspace = true } json-patch = "4" kio = { workspace = true } moq-net = { workspace = true } serde = { workspace = true } serde_json = "1" thiserror = "2" -zstd = { workspace = true, features = ["experimental"] } diff --git a/rs/moq-json/src/compression.rs b/rs/moq-json/src/compression.rs index 2b806ca10..8e4389cb6 100644 --- a/rs/moq-json/src/compression.rs +++ b/rs/moq-json/src/compression.rs @@ -1,204 +1,113 @@ -//! Group-scoped zstd compression for the JSON frame stream. +//! Per-frame DEFLATE compression for the JSON frame stream. //! -//! Within a group the frame payloads form a single zstd stream, flushed at each frame -//! boundary so every frame carries its own slice while later frames reuse the earlier ones as -//! context (a snapshot followed by deltas compresses far better than each frame alone). The -//! [`Encoder`]/[`Decoder`] hold that per-group state; both are recreated at every group -//! boundary. +//! Each frame payload is compressed on its own as a raw DEFLATE ([RFC 1951]) blob, the same +//! format the browser's `CompressionStream("deflate-raw")` produces and consumes. That keeps a +//! Rust producer and a browser (`@moq/json`) consumer interoperable on the wire, at the cost of +//! no cross-frame context: each frame compresses in isolation, so snapshots and large frames +//! shrink well while tiny deltas barely benefit. //! -//! Frames use magicless zstd frames with no content checksum: moq-net's framing already -//! delimits each slice, so the per-frame magic number and checksum would be redundant bytes. -//! An optional shared [dictionary](Compression::dictionary) primes the window so even a -//! group's first frame compresses well. +//! [RFC 1951]: https://www.rfc-editor.org/rfc/rfc1951.html + +use std::io::{Read, Write}; use bytes::Bytes; -use zstd::stream::raw::{CParameter, DParameter, InBuffer, Operation, OutBuffer}; -use zstd::zstd_safe::FrameFormat; +use flate2::Compression as Level; +use flate2::read::DeflateDecoder; +use flate2::write::DeflateEncoder; use crate::{Error, Result}; -/// Default zstd level: the library default, a good size/speed balance for the small, +/// Default DEFLATE level: zlib's own default, a good size/speed balance for the small, /// repetitive payloads this targets. -const DEFAULT_LEVEL: i32 = 3; +const DEFAULT_LEVEL: u32 = 6; -/// Maximum cumulative *decompressed* size of a single group, across all its frame payloads. +/// Maximum decompressed size of a single frame. /// -/// Compression is group-scoped, so a malicious publisher could otherwise send tiny slices -/// that each inflate hugely. zstd has no built-in total-output limit for streaming magicless -/// frames, so the [`Decoder`] enforces this bound across the group and returns -/// [`Error::TooLarge`] when exceeded, stopping rather than allocating without limit. -const MAX_DECOMPRESSED_GROUP: u64 = 64 * 1024 * 1024; - -/// Upper bound on the zstd decode window (log2 bytes); caps per-frame memory amplification so -/// a tiny input can't force a huge window allocation. 27 (128 MiB) is zstd's normal ceiling. -const WINDOW_LOG_MAX: u32 = 27; +/// A malicious publisher could otherwise send a tiny slice that inflates hugely, so +/// [`decompress`] stops and returns [`Error::TooLarge`] rather than allocating without limit. +const MAX_DECOMPRESSED_FRAME: u64 = 64 * 1024 * 1024; -/// Scratch buffer size for the zstd streaming loops. +/// Scratch buffer size for the streaming decompress loop. const CHUNK: usize = 8 * 1024; -/// zstd compression settings for a JSON track. +/// DEFLATE compression settings for a JSON track. /// -/// Construct from [`Default`] and override fields (the struct is `#[non_exhaustive]`, so new -/// options stay additive). Both ends of a track must agree on the [`dictionary`](Self::dictionary); -/// the level is a sender-only choice and need not match. +/// Build from [`Default`] and override fields (the struct is `#[non_exhaustive]`, so new options +/// stay additive). Only the producer needs these; decompression is self-describing, so a +/// [`Consumer`](crate::Consumer) is told via [`Consumer::with_compression`](crate::Consumer::with_compression) +/// only *that* a track is compressed, not how. #[derive(Debug, Clone)] #[non_exhaustive] pub struct Compression { - /// zstd compression level. Higher is smaller but slower. Defaults to `3`. - pub level: i32, - - /// An optional shared dictionary that primes the window, so even a group's first frame - /// compresses against known content. Must be identical on the producer and consumer; how a - /// consumer obtains it is out of band. - pub dictionary: Option, + /// DEFLATE level, `0..=9`. Higher is smaller but slower. Defaults to `6`. + /// + /// This is a sender-only choice and need not match the consumer (the wire format is the same + /// at any level). Browser producers can't set it; the platform deflate picks its own level. + pub level: u32, } impl Default for Compression { fn default() -> Self { - Self { - level: DEFAULT_LEVEL, - dictionary: None, - } + Self { level: DEFAULT_LEVEL } } } impl Compression { - /// Start a fresh per-group encoder. - pub(crate) fn encoder(&self) -> Encoder { - let mut e = match &self.dictionary { - Some(dict) => zstd::stream::raw::Encoder::with_dictionary(self.level, dict).expect("zstd encoder"), - None => zstd::stream::raw::Encoder::new(self.level).expect("zstd encoder"), - }; - // Magicless + no content checksum/size: the slices are delimited by moq-net, so those - // bytes would only be redundant. - e.set_parameter(CParameter::Format(FrameFormat::Magicless)) - .expect("zstd format"); - e.set_parameter(CParameter::ChecksumFlag(false)).expect("zstd checksum"); - e.set_parameter(CParameter::ContentSizeFlag(false)) - .expect("zstd content size"); - Encoder(e) - } - - /// Start a fresh per-group decoder. - pub(crate) fn decoder(&self) -> Decoder { - let mut d = match &self.dictionary { - Some(dict) => zstd::stream::raw::Decoder::with_dictionary(dict).expect("zstd decoder"), - None => zstd::stream::raw::Decoder::new().expect("zstd decoder"), - }; - d.set_parameter(DParameter::Format(FrameFormat::Magicless)) - .expect("zstd format"); - d.set_parameter(DParameter::WindowLogMax(WINDOW_LOG_MAX)) - .expect("zstd window"); - Decoder { inner: d, produced: 0 } - } -} - -/// Encodes a group's frame payloads into one shared zstd stream, one slice per frame. Hold one -/// per group; the stream is recreated at each group boundary. -pub(crate) struct Encoder(zstd::stream::raw::Encoder<'static>); - -impl Encoder { - /// Compress the next frame's `payload`, returning its slice of the group stream. + /// Compress one frame `payload` into a standalone raw DEFLATE blob. /// - /// An empty payload contributes nothing and yields an empty slice. Later frames reuse - /// earlier ones as context, so slices must be produced (and later decoded) in frame order. - pub(crate) fn frame(&mut self, payload: &[u8]) -> Bytes { + /// An empty payload yields an empty slice (compressing nothing). + pub(crate) fn compress(&self, payload: &[u8]) -> Bytes { if payload.is_empty() { return Bytes::new(); } - let mut out = Vec::with_capacity(payload.len() / 2 + 32); - let mut tmp = [0u8; CHUNK]; - let mut input = InBuffer::around(payload); - - loop { - let n = { - let mut output = OutBuffer::around(&mut tmp); - self.0.run(&mut input, &mut output).expect("zstd run"); - output.pos() - }; - out.extend_from_slice(&tmp[..n]); - if input.pos() == payload.len() { - break; - } - } - - // Flush (retaining the window) so this frame's slice is self-delimited while later - // frames in the group keep reusing the context. - loop { - let (remaining, n) = { - let mut output = OutBuffer::around(&mut tmp); - let remaining = self.0.flush(&mut output).expect("zstd flush"); - (remaining, output.pos()) - }; - out.extend_from_slice(&tmp[..n]); - if remaining == 0 { - break; - } - } - - Bytes::from(out) + let mut encoder = DeflateEncoder::new(Vec::with_capacity(payload.len() / 2 + 16), Level::new(self.level)); + encoder.write_all(payload).expect("deflate write"); + Bytes::from(encoder.finish().expect("deflate finish")) } } -/// Decodes a group's frame slices back into the original payloads. Hold one per group; feed -/// slices in frame order (each frame builds on the earlier ones). -pub(crate) struct Decoder { - inner: zstd::stream::raw::Decoder<'static>, - // Cumulative decompressed bytes this group, for the zip-bomb bound. - produced: u64, -} - -impl Decoder { - /// Decompress the next frame's `slice` back into its payload. - /// - /// An empty slice yields an empty payload. Returns [`Error::TooLarge`] if the group's - /// cumulative decompressed size would exceed the bound, and [`Error::Decompress`] on - /// malformed input. - pub(crate) fn frame(&mut self, slice: &[u8]) -> Result { - if slice.is_empty() { - return Ok(Bytes::new()); - } +/// Decompress one frame `slice` back into its raw DEFLATE payload. +/// +/// An empty slice yields an empty payload. Returns [`Error::TooLarge`] if the frame inflates past +/// the per-frame bound, and [`Error::Decompress`] on malformed or truncated input. +pub(crate) fn decompress(slice: &[u8]) -> Result { + if slice.is_empty() { + return Ok(Bytes::new()); + } - let mut out = Vec::with_capacity(slice.len() * 2 + 16); - let mut tmp = [0u8; CHUNK]; - let mut input = InBuffer::around(slice); + let mut decoder = DeflateDecoder::new(slice); + let mut out = Vec::with_capacity(slice.len() * 2 + 16); + let mut tmp = [0u8; CHUNK]; - loop { - let n = { - let mut output = OutBuffer::around(&mut tmp); - self.inner.run(&mut input, &mut output).map_err(|_| Error::Decompress)?; - output.pos() - }; - out.extend_from_slice(&tmp[..n]); - if self.produced + out.len() as u64 > MAX_DECOMPRESSED_GROUP { - return Err(Error::TooLarge(MAX_DECOMPRESSED_GROUP)); - } - if input.pos() == slice.len() && n == 0 { - break; - } + loop { + let n = decoder.read(&mut tmp).map_err(|_| Error::Decompress)?; + if n == 0 { + break; } - - self.produced += out.len() as u64; - Ok(Bytes::from(out)) + if out.len() as u64 + n as u64 > MAX_DECOMPRESSED_FRAME { + return Err(Error::TooLarge(MAX_DECOMPRESSED_FRAME)); + } + out.extend_from_slice(&tmp[..n]); } + + Ok(Bytes::from(out)) } #[cfg(test)] mod test { use super::*; - /// Round-trip a sequence of frames through a group encoder/decoder pair. + /// Round-trip a sequence of frames, each compressed and decompressed independently. fn roundtrip(config: &Compression, frames: &[&[u8]]) -> Vec> { - let mut enc = config.encoder(); - let slices: Vec = frames.iter().map(|f| enc.frame(f)).collect(); - - let mut dec = config.decoder(); - slices.iter().map(|s| dec.frame(s).unwrap().to_vec()).collect() + frames + .iter() + .map(|f| decompress(&config.compress(f)).unwrap().to_vec()) + .collect() } #[test] - fn group_roundtrip() { + fn frame_roundtrip() { let frames: &[&[u8]] = &[b"the quick brown fox", b"the quick brown dog", b"the lazy fox"]; let got = roundtrip(&Compression::default(), frames); for (a, b) in frames.iter().zip(&got) { @@ -207,59 +116,29 @@ mod test { } #[test] - fn empty_frames_roundtrip() { - let frames: &[&[u8]] = &[b"", b"hello", b"", b"world"]; - let got = roundtrip(&Compression::default(), frames); - assert_eq!( - got, - vec![b"".to_vec(), b"hello".to_vec(), b"".to_vec(), b"world".to_vec()] - ); + fn empty_frame_roundtrips() { + assert!(Compression::default().compress(b"").is_empty()); + assert!(decompress(b"").unwrap().is_empty()); } #[test] - fn cross_frame_redundancy_shrinks() { - // A later frame identical to an earlier one compresses to far fewer bytes once the - // window holds the earlier copy. + fn repetitive_payload_shrinks() { + // A payload with lots of internal redundancy compresses well even on its own. let config = Compression::default(); let payload = b"Media over QUIC delivers real-time latency at massive scale.".repeat(6); - let mut enc = config.encoder(); - let first = enc.frame(&payload); - let second = enc.frame(&payload); - assert!( - second.len() < first.len(), - "repeat frame {} should be smaller than first {}", - second.len(), - first.len() - ); - } - - #[test] - fn dictionary_shrinks_first_frame() { - // A dictionary primes the window, so even the group's first frame compresses against it. - let payload = br#"{"video":{"renditions":{"video0":{"codec":"avc1.64001f"}}}}"#; - let plain = Compression::default(); - let primed = Compression { - dictionary: Some(Bytes::from_static(payload)), - ..Default::default() - }; - - let first_plain = plain.encoder().frame(payload); - let first_primed = primed.encoder().frame(payload); + let compressed = config.compress(&payload); assert!( - first_primed.len() < first_plain.len(), - "dictionary frame {} should beat undictionaried {}", - first_primed.len(), - first_plain.len() + compressed.len() < payload.len(), + "compressed {} should beat raw {}", + compressed.len(), + payload.len() ); - - // And it still round-trips with the same dictionary on the decode side. - let mut dec = primed.decoder(); - assert_eq!(dec.frame(&first_primed).unwrap(), Bytes::from_static(payload)); + assert_eq!(decompress(&compressed).unwrap(), Bytes::from(payload)); } #[test] fn decompress_rejects_garbage() { - let mut dec = Compression::default().decoder(); - assert!(matches!(dec.frame(b"not a zstd stream at all"), Err(Error::Decompress))); + // Random bytes that don't form a valid DEFLATE stream are rejected, not silently truncated. + assert!(matches!(decompress(&[0xff; 64]), Err(Error::Decompress))); } } diff --git a/rs/moq-json/src/lib.rs b/rs/moq-json/src/lib.rs index b805a02c9..8ed02a4ca 100644 --- a/rs/moq-json/src/lib.rs +++ b/rs/moq-json/src/lib.rs @@ -24,7 +24,6 @@ use serde_json::Value; pub use compression::Compression; -use crate::compression::{Decoder, Encoder}; use crate::diff::diff; /// Maximum frames (snapshot + deltas) in a single group before a new snapshot is forced. @@ -51,8 +50,8 @@ pub enum Error { #[error("decompression failed")] Decompress, - /// A group's cumulative decompressed size exceeded the limit (zip-bomb guard). - #[error("decompressed group exceeded {0} bytes")] + /// A frame's decompressed size exceeded the limit (zip-bomb guard). + #[error("decompressed frame exceeded {0} bytes")] TooLarge(u64), } @@ -82,18 +81,16 @@ pub struct Config { /// to one snapshot before rolling, and a larger ratio tolerates more deltas per snapshot. /// /// When [`compression`](Self::compression) is set, both sides of the comparison are measured - /// on *compressed* bytes (a warm per-group window shrinks each successive delta), so more - /// deltas pack into a group than the raw sizes would suggest. + /// on the *compressed* frame sizes (the real wire cost). /// /// Defaults to `8`. pub delta_ratio: u32, - /// Optional zstd compression for the frame stream. + /// Optional per-frame DEFLATE compression for the frame stream. /// - /// `None` (the default) writes plaintext JSON frames, identical on the wire to a track with - /// no compression. `Some(..)` compresses each group as one zstd stream (see [`Compression`]); - /// a [`Consumer`] reading the track must be created with the matching settings via - /// [`Consumer::with_compression`]. + /// `None` (the default) writes plaintext JSON frames, identical on the wire to a track with no + /// compression. `Some(..)` compresses each frame independently (see [`Compression`]); a + /// [`Consumer`] reading the track must be created via [`Consumer::with_compression`]. pub compression: Option, } @@ -138,7 +135,6 @@ impl Producer { inner: Arc::new(Mutex::new(Inner { track, group: None, - encoder: None, last: None, delta_bytes: 0, snapshot_len: 0, @@ -241,14 +237,12 @@ impl Drop for Guard<'_, T> { struct Inner { track: moq_net::TrackProducer, group: Option, - // Per-group zstd encoder, `Some` while a compressed group is open (recreated per group). - encoder: Option, last: Option, - // Bytes of deltas accumulated in the current group, excluding the snapshot frame. - // Compressed slice sizes when compressing, raw patch sizes otherwise. + // Bytes of deltas accumulated in the current group, excluding the snapshot frame. Compressed + // frame sizes when compressing, raw patch sizes otherwise. delta_bytes: u64, - // Reference size the delta budget is measured against: the current group's snapshot frame, - // as its compressed slice size. Only consulted on the compressed path. + // Reference size the delta budget is measured against: the current group's snapshot frame. + // Compressed when compressing, raw otherwise. snapshot_len: u64, group_frames: usize, config: Config, @@ -275,7 +269,7 @@ impl Inner { Ok(()) } - /// Serialize (and, when compressing, encode) a delta if deltas are enabled and appending one + /// Serialize (and, when compressing, compress) a delta if deltas are enabled and appending one /// keeps the group within budget; otherwise `None`, signalling that a fresh snapshot should be /// published instead. Returns the frame slice ready to write. fn delta(&mut self, value: &Value, snapshot_len: usize) -> Result> { @@ -298,13 +292,12 @@ impl Inner { serde_json::to_vec(&diff.patch)? }; - match self.encoder.as_mut() { - // Compressed: measure against the group's anchoring snapshot's compressed size and the - // delta's *compressed* slice size (the real wire cost). Encoding advances the per-group - // window; if the delta doesn't fit we roll a new group with a fresh encoder, discarding - // this slice (the abandoned window has no effect on the new group). - Some(encoder) => { - let slice = encoder.frame(&patch); + match &self.config.compression { + // Compressed: measure the delta's *compressed* size (the real wire cost) against the + // group's anchoring snapshot, also compressed. If it doesn't fit we roll a new group and + // discard this slice (frames compress independently, so nothing carries over). + Some(compression) => { + let slice = compression.compress(&patch); let projected = self.delta_bytes + slice.len() as u64; if projected > ratio as u64 * self.snapshot_len { return Ok(None); @@ -331,27 +324,21 @@ impl Inner { let mut group = self.track.append_group()?; - // Open a fresh per-group encoder (cold window) and compress the snapshot as frame 0. - let (slice, encoder) = match &self.config.compression { - Some(config) => { - let mut encoder = config.encoder(); - let slice = encoder.frame(&snapshot); - (slice, Some(encoder)) - } - None => (Bytes::from(snapshot), None), + // Compress the snapshot as frame 0 if enabled, recording its wire size as the delta anchor. + let slice = match &self.config.compression { + Some(compression) => compression.compress(&snapshot), + None => Bytes::from(snapshot), }; self.snapshot_len = slice.len() as u64; group.write_frame(slice)?; self.delta_bytes = 0; self.group_frames = 1; - self.encoder = encoder; if self.config.delta_ratio != 0 { - // Keep the group (and its encoder) open so future deltas can be appended. + // Keep the group open so future deltas can be appended. self.group = Some(group); } else { // Deltas disabled: one frame per group, identical to a plain JSON track. - self.encoder = None; group.finish()?; } @@ -371,13 +358,8 @@ impl Inner { pub struct Consumer { track: moq_net::TrackConsumer, group: Option, - // Compression settings, matching the producer's; `None` reads plaintext frames. - compression: Option, - // Per-group zstd decoder, built lazily on the first compressed frame of a group. - decoder: Option, - // Compressed slices read so far in the current group, in order. Lets a cloned consumer - // rebuild the (non-cloneable) decoder window by replaying them. Empty when uncompressed. - group_slices: Vec, + // Whether frames are DEFLATE-compressed, matching the producer's [`Config::compression`]. + compressed: bool, current: Option, frames_read: usize, _marker: PhantomData T>, @@ -390,11 +372,7 @@ impl Clone for Consumer { Self { track: self.track.clone(), group: self.group.clone(), - compression: self.compression.clone(), - // A zstd decoder can't be cloned (per-group window state), so the clone starts without - // one and rebuilds it from `group_slices` on its next compressed read. - decoder: None, - group_slices: self.group_slices.clone(), + compressed: self.compressed, current: self.current.clone(), frames_read: self.frames_read, _marker: PhantomData, @@ -405,24 +383,23 @@ impl Clone for Consumer { impl Consumer { /// Create a consumer reading plaintext frames from the given track subscriber. pub fn new(track: moq_net::TrackConsumer) -> Self { - Self::build(track, None) + Self::build(track, false) } - /// Create a consumer that decompresses frames with the given [`Compression`] settings. + /// Create a consumer that decompresses DEFLATE frames written by a producer with + /// [`Config::compression`] set. /// - /// These must match the producer's [`Config::compression`], including the - /// [`dictionary`](Compression::dictionary). - pub fn with_compression(track: moq_net::TrackConsumer, compression: Compression) -> Self { - Self::build(track, Some(compression)) + /// Decompression is self-describing, so no settings are needed beyond knowing the track is + /// compressed (the producer's level does not have to be matched). + pub fn with_compression(track: moq_net::TrackConsumer) -> Self { + Self::build(track, true) } - fn build(track: moq_net::TrackConsumer, compression: Option) -> Self { + fn build(track: moq_net::TrackConsumer, compressed: bool) -> Self { Self { track, group: None, - compression, - decoder: None, - group_slices: Vec::new(), + compressed, current: None, frames_read: 0, _marker: PhantomData, @@ -449,9 +426,6 @@ impl Consumer { self.group = Some(group); self.current = None; self.frames_read = 0; - // Each group is its own compressed stream, so reset the decoder state. - self.decoder = None; - self.group_slices.clear(); } Poll::Ready(None) => break true, Poll::Pending => break false, @@ -476,25 +450,13 @@ impl Consumer { /// Decompress a frame slice, or pass it through when the track is uncompressed. /// - /// The per-group decoder is built lazily on the first compressed frame. A cloned consumer - /// starts without a decoder, so the first call replays the group's already-read slices to - /// rebuild the (non-cloneable) zstd window before decoding the new frame. - fn decode(&mut self, slice: Bytes) -> Result { - let Some(compression) = &self.compression else { - return Ok(slice); - }; - - if self.decoder.is_none() { - let mut decoder = compression.decoder(); - for prev in &self.group_slices { - decoder.frame(prev)?; - } - self.decoder = Some(decoder); + /// Each frame is its own DEFLATE blob, so this needs no per-group state. + fn decode(&self, slice: Bytes) -> Result { + if self.compressed { + compression::decompress(&slice) + } else { + Ok(slice) } - - let plain = self.decoder.as_mut().unwrap().frame(&slice)?; - self.group_slices.push(slice); - Ok(plain) } /// Apply one frame: frame 0 of a group is a snapshot, the rest are merge patches. @@ -530,8 +492,8 @@ mod test { } } - /// A zstd-compressed config with the given delta ratio. - fn cfg_zstd(delta_ratio: u32) -> Config { + /// A DEFLATE-compressed config with the given delta ratio. + fn cfg_deflate(delta_ratio: u32) -> Config { Config { delta_ratio, compression: Some(Compression::default()), @@ -789,49 +751,50 @@ mod test { #[test] fn compressed_snapshot_per_group_roundtrips() { - let (mut producer, track) = producer(cfg_zstd(0)); + let (mut producer, track) = producer(cfg_deflate(0)); producer.update(&json!({ "a": 1 })).unwrap(); producer.update(&json!({ "a": 2 })).unwrap(); producer.finish().unwrap(); // Deltas disabled: one compressed snapshot per group, latest reconstructs identically. assert_eq!(track.latest(), Some(1)); - let values = drain_with(Consumer::with_compression(track, Compression::default())); + let values = drain_with(Consumer::with_compression(track)); assert_eq!(values, vec![json!({ "a": 2 })]); } #[test] fn compressed_deltas_share_one_group() { - let (mut producer, track) = producer(cfg_zstd(100)); + let (mut producer, track) = producer(cfg_deflate(100)); producer.update(&json!({ "a": 1, "b": 1 })).unwrap(); producer.update(&json!({ "a": 1, "b": 2 })).unwrap(); producer.update(&json!({ "a": 1, "b": 3 })).unwrap(); producer.finish().unwrap(); - // Snapshot + deltas in one group, reconstructed through the per-group decoder. + // Snapshot + deltas in one group, each frame decompressed independently. assert_eq!(track.latest(), Some(0)); - let values = drain_with(Consumer::with_compression(track, Compression::default())); + let values = drain_with(Consumer::with_compression(track)); assert_eq!(values.last().unwrap(), &json!({ "a": 1, "b": 3 })); } #[test] fn compressed_late_joiner_reconstructs_from_deltas() { - let (mut producer, track) = producer(cfg_zstd(100)); + let (mut producer, track) = producer(cfg_deflate(100)); producer.update(&json!({ "a": 1, "b": 1 })).unwrap(); producer.update(&json!({ "a": 1, "b": 2 })).unwrap(); producer.update(&json!({ "a": 5, "b": 2 })).unwrap(); producer.finish().unwrap(); // A consumer created only now rebuilds the final value from the compressed snapshot + deltas. - let values = drain_with(Consumer::with_compression(track, Compression::default())); + let values = drain_with(Consumer::with_compression(track)); assert_eq!(values.last().unwrap(), &json!({ "a": 5, "b": 2 })); } #[test] fn compressed_cloned_consumer_reconstructs_mid_group() { - // A clone taken mid-group has no decoder window; it must rebuild from the retained slices. - let (mut producer, track) = producer(cfg_zstd(100)); - let mut consumer = Consumer::::with_compression(track, Compression::default()); + // Frames compress independently, so a clone taken mid-group just inherits the reconstruction + // state and keeps decoding each frame on its own. + let (mut producer, track) = producer(cfg_deflate(100)); + let mut consumer = Consumer::::with_compression(track); let waiter = kio::Waiter::noop(); producer.update(&json!({ "a": 1, "b": 1 })).unwrap(); // compressed snapshot, group 0 @@ -854,40 +817,13 @@ mod test { } } - #[test] - fn dictionary_roundtrips() { - // A shared dictionary on both ends primes the window; the value still reconstructs exactly. - let dict = bytes::Bytes::from_static(br#"{"video":{"renditions":{}},"audio":{"renditions":{}}}"#); - let compression = Compression { - dictionary: Some(dict), - ..Default::default() - }; - - let track = moq_net::Track::new("test").produce(); - let consumer = track.consume(); - let mut producer = Producer::::new( - track, - Config { - delta_ratio: 8, - compression: Some(compression.clone()), - }, - ); - - let value = json!({ "video": { "renditions": { "v0": { "codec": "avc1.64001f" } } } }); - producer.update(&value).unwrap(); - producer.finish().unwrap(); - - let values = drain_with(Consumer::with_compression(consumer, compression)); - assert_eq!(values.last().unwrap(), &value); - } - #[test] fn compression_shrinks_wire_frames() { // A repetitive payload should serialize to fewer wire bytes compressed than plaintext. let value = json!({ "renditions": ["video".repeat(50), "video".repeat(50), "video".repeat(50)] }); let plaintext_bytes = wire_frame_len(cfg(0), &value); - let compressed_bytes = wire_frame_len(cfg_zstd(0), &value); + let compressed_bytes = wire_frame_len(cfg_deflate(0), &value); assert!( compressed_bytes < plaintext_bytes, "compressed frame {compressed_bytes} should be smaller than plaintext {plaintext_bytes}" From 2f5a91c9a108044862a563f4afa5fa626c751900 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 24 Jun 2026 04:44:07 +0000 Subject: [PATCH 3/7] fix(moq-json): bound inflate allocation and guard dropped producer Cap the initial inflate buffer at the per-frame ceiling so a publisher- controlled compressed frame can't force a large allocation before the streaming size guard runs. Add a #failed guard to the JS producer's update() so a torn-down compressed producer no-ops instead of throwing from appendGroup() on a closed track. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01B3xwgHid4UYjeugewkxUyj --- js/json/src/producer.ts | 4 ++++ rs/moq-json/src/compression.rs | 9 ++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/js/json/src/producer.ts b/js/json/src/producer.ts index 8ab857e1e..22ef6e459 100644 --- a/js/json/src/producer.ts +++ b/js/json/src/producer.ts @@ -116,6 +116,10 @@ export class Producer { return; } + // A failed compressed write has already torn the track down; stay quiet rather than throw + // from the synchronous path (e.g. `#snapshot()` calling `appendGroup()` on a closed track). + if (this.#failed) return; + const valid = this.#config.schema ? this.#config.schema.parse(value) : value; // Serialize once; parse it back to a normalized JSON value for diffing and comparison diff --git a/rs/moq-json/src/compression.rs b/rs/moq-json/src/compression.rs index 8e4389cb6..62c9a95f3 100644 --- a/rs/moq-json/src/compression.rs +++ b/rs/moq-json/src/compression.rs @@ -77,7 +77,14 @@ pub(crate) fn decompress(slice: &[u8]) -> Result { } let mut decoder = DeflateDecoder::new(slice); - let mut out = Vec::with_capacity(slice.len() * 2 + 16); + // `slice.len()` is publisher-controlled, so cap the initial guess at the per-frame ceiling: a + // huge compressed frame can't force a huge allocation before the streaming guard below runs. + let initial_capacity = slice + .len() + .saturating_mul(2) + .saturating_add(16) + .min(MAX_DECOMPRESSED_FRAME as usize); + let mut out = Vec::with_capacity(initial_capacity); let mut tmp = [0u8; CHUNK]; loop { From 8a05365753ad83228d478b6e69ec90fa05b7cf3b Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 24 Jun 2026 04:54:02 +0000 Subject: [PATCH 4/7] refactor(moq-json): make compression level a validated newtype Replace the public `level: u32` on Compression with a `Level` newtype whose constructor clamps to the valid 0..=9 range, so an out-of-range level can't be constructed and reach flate2 (where it would produce backend-dependent output). Compression::default() is now derived. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01B3xwgHid4UYjeugewkxUyj --- rs/moq-json/src/compression.rs | 51 ++++++++++++++++++++++++++-------- rs/moq-json/src/lib.rs | 2 +- 2 files changed, 41 insertions(+), 12 deletions(-) diff --git a/rs/moq-json/src/compression.rs b/rs/moq-json/src/compression.rs index 62c9a95f3..8fcc7dbf3 100644 --- a/rs/moq-json/src/compression.rs +++ b/rs/moq-json/src/compression.rs @@ -11,7 +11,6 @@ use std::io::{Read, Write}; use bytes::Bytes; -use flate2::Compression as Level; use flate2::read::DeflateDecoder; use flate2::write::DeflateEncoder; @@ -21,6 +20,33 @@ use crate::{Error, Result}; /// repetitive payloads this targets. const DEFAULT_LEVEL: u32 = 6; +/// A DEFLATE compression level in the valid `0..=9` range. +/// +/// `0` stores without compressing, `9` is smallest but slowest. Construct via [`Level::new`], +/// which clamps out-of-range values, so an invalid level (e.g. `99`) is unrepresentable rather +/// than producing backend-dependent output. The level is a sender-only choice and need not match +/// the consumer. +#[derive(Debug, Clone, Copy)] +pub struct Level(u32); + +impl Level { + /// Wrap a raw level, clamping to the valid `0..=9` range. + pub fn new(level: u32) -> Self { + Self(level.min(9)) + } + + /// The raw level, guaranteed to be in `0..=9`. + pub fn get(self) -> u32 { + self.0 + } +} + +impl Default for Level { + fn default() -> Self { + Self(DEFAULT_LEVEL) + } +} + /// Maximum decompressed size of a single frame. /// /// A malicious publisher could otherwise send a tiny slice that inflates hugely, so @@ -36,20 +62,14 @@ const CHUNK: usize = 8 * 1024; /// stay additive). Only the producer needs these; decompression is self-describing, so a /// [`Consumer`](crate::Consumer) is told via [`Consumer::with_compression`](crate::Consumer::with_compression) /// only *that* a track is compressed, not how. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] #[non_exhaustive] pub struct Compression { - /// DEFLATE level, `0..=9`. Higher is smaller but slower. Defaults to `6`. + /// DEFLATE [`Level`]. Higher is smaller but slower. Defaults to `6`. /// /// This is a sender-only choice and need not match the consumer (the wire format is the same /// at any level). Browser producers can't set it; the platform deflate picks its own level. - pub level: u32, -} - -impl Default for Compression { - fn default() -> Self { - Self { level: DEFAULT_LEVEL } - } + pub level: Level, } impl Compression { @@ -61,7 +81,8 @@ impl Compression { return Bytes::new(); } - let mut encoder = DeflateEncoder::new(Vec::with_capacity(payload.len() / 2 + 16), Level::new(self.level)); + let level = flate2::Compression::new(self.level.get()); + let mut encoder = DeflateEncoder::new(Vec::with_capacity(payload.len() / 2 + 16), level); encoder.write_all(payload).expect("deflate write"); Bytes::from(encoder.finish().expect("deflate finish")) } @@ -143,6 +164,14 @@ mod test { assert_eq!(decompress(&compressed).unwrap(), Bytes::from(payload)); } + #[test] + fn level_clamps_out_of_range() { + // An out-of-range level is clamped, not stored verbatim, so it can't reach the backend. + assert_eq!(Level::new(99).get(), 9); + assert_eq!(Level::new(6).get(), 6); + assert_eq!(Level::default().get(), DEFAULT_LEVEL); + } + #[test] fn decompress_rejects_garbage() { // Random bytes that don't form a valid DEFLATE stream are rejected, not silently truncated. diff --git a/rs/moq-json/src/lib.rs b/rs/moq-json/src/lib.rs index 8ed02a4ca..666c4d194 100644 --- a/rs/moq-json/src/lib.rs +++ b/rs/moq-json/src/lib.rs @@ -22,7 +22,7 @@ use serde::Serialize; use serde::de::DeserializeOwned; use serde_json::Value; -pub use compression::Compression; +pub use compression::{Compression, Level}; use crate::diff::diff; From cf76ed788d3a7a8bfbff22f83b42c092c6dde7b2 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 24 Jun 2026 06:16:47 +0000 Subject: [PATCH 5/7] feat(moq-json): group-scoped DEFLATE with shared window Replace per-frame-independent compression with one sync-flushed DEFLATE stream per group, so deltas reuse the snapshot (and earlier deltas) as context and shrink ~3x. Each frame is a self-delimited slice; the fixed 00 00 ff ff sync-flush marker is stripped on the wire and re-appended to decode (RFC 7692 style), saving 4 bytes per frame. Rust uses flate2 streaming (Compress/Decompress with Z_SYNC_FLUSH). The consumer keeps a per-group decoder and replays a group's slices to rebuild a cloned consumer's window. JS uses pako for both encode and decode (the platform CompressionStream can't flush mid-stream, and fflate's flush is broken on some inputs). pako is an optional peer dependency loaded via dynamic import, so consumers that never compress a track don't bundle it. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01B3xwgHid4UYjeugewkxUyj --- bun.lock | 10 ++ js/json/package.json | 8 ++ js/json/src/compression.test.ts | 65 +++++++-- js/json/src/compression.ts | 181 +++++++++++++++++------- js/json/src/consumer.ts | 16 ++- js/json/src/producer.ts | 50 +++++-- rs/moq-json/src/compression.rs | 240 ++++++++++++++++++++++---------- rs/moq-json/src/lib.rs | 117 ++++++++++++---- 8 files changed, 501 insertions(+), 186 deletions(-) diff --git a/bun.lock b/bun.lock index 2d0fbca7f..898fbcb98 100644 --- a/bun.lock +++ b/bun.lock @@ -109,12 +109,18 @@ }, "devDependencies": { "@types/bun": "^1.3.14", + "@types/pako": "^2.0.4", + "pako": "^2.2.0", "rimraf": "^6.1.3", "typescript": "^6.0.3", }, "peerDependencies": { + "pako": "^2.2.0", "zod": "^4.0.0", }, + "optionalPeers": [ + "pako", + ], }, "js/loc": { "name": "@moq/loc", @@ -795,6 +801,8 @@ "@types/node": ["@types/node@25.9.2", "", { "dependencies": { "undici-types": ">=7.24.0 <7.24.7" } }, "sha512-G05zqtJhcDLb8uslf5EjCxXg9G1KQxiV8OS0R26IC//Eoyitzqe8z37I7cqvnZlrlSfgocQRfSn/AHBZJJFyGw=="], + "@types/pako": ["@types/pako@2.0.4", "", {}, "sha512-VWDCbrLeVXJM9fihYodcLiIv0ku+AlOa/TQ1SvYOaBuyrSKgEcro95LJyIsJ4vSo6BXIxOKxiJAat04CmST9Fw=="], + "@types/react": ["@types/react@19.2.17", "", { "dependencies": { "csstype": "^3.2.2" } }, "sha512-MXfmqaVPEVgkBT/aY0aGCkRWWtByiYQXo3xdQ8r5RzuFrPiRn8Gar2tQdXSUQ2GKV3bkXckek89V8wQBY2Q/Aw=="], "@types/supports-color": ["@types/supports-color@8.1.3", "", {}, "sha512-Hy6UMpxhE3j1tLpl27exp1XqHD7n8chAiNPzWfz16LPZoMMoSc4dzLl6w9qijkEb/r5O1ozdu1CWGA2L83ZeZg=="], @@ -1337,6 +1345,8 @@ "package-manager-detector": ["package-manager-detector@1.6.0", "", {}, "sha512-61A5ThoTiDG/C8s8UMZwSorAGwMJ0ERVGj2OjoW5pAalsNOg15+iQiPzrLJ4jhZ1HJzmC2PIHT2oEiH3R5fzNA=="], + "pako": ["pako@2.2.0", "", {}, "sha512-zJq6RP/5q+TO2OpFV3FHzlPnFjmkb7Nc99a5SNjJE+uu/PkpChs+NIZSSzbBoD+6kjiISXjfYdwj1ZRQ81dz/w=="], + "param-case": ["param-case@3.0.4", "", { "dependencies": { "dot-case": "^3.0.4", "tslib": "^2.0.3" } }, "sha512-RXlj7zCYokReqWpOPH9oYivUzLYZ5vAPIfEmCTNViosC78F8F0H9y7T7gG2M39ymgutxF5gcFEsyZQSph9Bp3A=="], "parse-entities": ["parse-entities@4.0.2", "", { "dependencies": { "@types/unist": "^2.0.0", "character-entities-legacy": "^3.0.0", "character-reference-invalid": "^2.0.0", "decode-named-character-reference": "^1.0.0", "is-alphanumerical": "^2.0.0", "is-decimal": "^2.0.0", "is-hexadecimal": "^2.0.0" } }, "sha512-GG2AQYWoLgL877gQIKeRPGO1xF9+eG1ujIb5soS5gPvLQ1y2o8FL90w2QWNdf9I361Mpp7726c+lj3U0qK1uGw=="], diff --git a/js/json/package.json b/js/json/package.json index 7526da354..c97f5e1e1 100644 --- a/js/json/package.json +++ b/js/json/package.json @@ -20,10 +20,18 @@ "@moq/signals": "workspace:^" }, "peerDependencies": { + "pako": "^2.2.0", "zod": "^4.0.0" }, + "peerDependenciesMeta": { + "pako": { + "optional": true + } + }, "devDependencies": { "@types/bun": "^1.3.14", + "@types/pako": "^2.0.4", + "pako": "^2.2.0", "rimraf": "^6.1.3", "typescript": "^6.0.3" } diff --git a/js/json/src/compression.test.ts b/js/json/src/compression.test.ts index a26b7045c..86fc213d5 100644 --- a/js/json/src/compression.test.ts +++ b/js/json/src/compression.test.ts @@ -1,6 +1,6 @@ import { expect, test } from "bun:test"; import { Track } from "@moq/net"; -import { deflate, inflate } from "./compression.ts"; +import { Decoder, Encoder } from "./compression.ts"; import { Consumer } from "./consumer.ts"; import { Producer } from "./producer.ts"; @@ -25,18 +25,34 @@ async function firstFrame(track: Track): Promise { return frame; } -test("codec round-trips a frame", async () => { - const payload = enc.encode("the quick brown fox"); - expect(dec.decode(await inflate(await deflate(payload)))).toBe("the quick brown fox"); +test("codec round-trips a group of frames in order", async () => { + const frames = ["the quick brown fox", "the quick brown dog", "the lazy fox"]; + const encoder = await Encoder.create(); + const slices = frames.map((f) => encoder.frame(enc.encode(f))); + + const decoder = await Decoder.create(); + expect(slices.map((s) => dec.decode(decoder.frame(s)))).toEqual(frames); }); test("codec round-trips an empty frame", async () => { - expect((await deflate(new Uint8Array())).length).toBe(0); - expect((await inflate(new Uint8Array())).length).toBe(0); + const encoder = await Encoder.create(); + const decoder = await Decoder.create(); + expect(encoder.frame(new Uint8Array()).length).toBe(0); + expect(decoder.frame(new Uint8Array()).length).toBe(0); }); test("codec rejects garbage", async () => { - await expect(inflate(new Uint8Array(64).fill(0xff))).rejects.toThrow(); + const decoder = await Decoder.create(); + expect(() => decoder.frame(new Uint8Array(64).fill(0xff))).toThrow(); +}); + +test("cross-frame context shrinks a repeated frame", async () => { + // A later frame identical to an earlier one compresses far smaller once the window holds it. + const encoder = await Encoder.create(); + const payload = enc.encode("Media over QUIC delivers real-time latency at massive scale.".repeat(6)); + const first = encoder.frame(payload); + const second = encoder.frame(payload); + expect(second.length).toBeLessThan(first.length); }); test("compressed snapshot per group round-trips", async () => { @@ -51,8 +67,8 @@ test("compressed snapshot per group round-trips", async () => { }); test("compressed live consumer sees each update in order", async () => { - // Compression makes writes async, so this exercises that the per-frame deflate pipeline still - // delivers frames (and groups) strictly in order. + // Compression makes writes async, so this exercises that the streaming pipeline still delivers + // frames (and groups) strictly in order. const track = new Track("test"); const producer = new Producer(track, { deltaRatio: 100, compression: true }); const consumer = new Consumer(track, { compression: true }); @@ -82,20 +98,39 @@ test("compressed late joiner reconstructs from snapshot + deltas", async () => { producer.update({ a: 5, b: 2 }); producer.finish(); - // A consumer created only now still rebuilds the final value. + // A consumer created only now still rebuilds the final value from the snapshot + deltas. expect((await drainCompressed(track)).at(-1)).toEqual({ a: 5, b: 2 }); }); -test("each compressed frame is valid standalone deflate-raw", async () => { - // The frame the producer stored should decode on its own back to the original snapshot, which - // is what keeps it interoperable with the Rust producer's per-frame format. +test("a group's snapshot decodes from a fresh decoder", async () => { + // Frame 0 opens a cold window, so a brand-new decoder reconstructs it, which is what lets a late + // joiner (or the Rust consumer) start mid-stream at any group boundary. const track = new Track("test"); const producer = new Producer(track, { deltaRatio: 0, compression: true }); producer.update({ hello: "world" }); producer.finish(); - const frame = await firstFrame(track); - expect(JSON.parse(dec.decode(await inflate(frame)))).toEqual({ hello: "world" }); + const decoder = await Decoder.create(); + expect(JSON.parse(dec.decode(decoder.frame(await firstFrame(track))))).toEqual({ hello: "world" }); +}); + +test("compressed deltas reuse the window", async () => { + // The shared per-group window is the point: a delta restating snapshot content shrinks sharply. + const track = new Track("test"); + const producer = new Producer(track, { deltaRatio: 100, compression: true }); + const phrase = "Media over QUIC delivers real-time latency at massive scale"; + producer.update({ note: phrase }); + producer.update({ note: phrase, echo: phrase }); + producer.finish(); + + const group = await track.nextGroupOrdered(); + if (!group) throw new Error("expected a group"); + await group.readFrame(); // snapshot + const delta = await group.readFrame(); + if (!delta) throw new Error("expected a delta"); + + const rawDelta = enc.encode(JSON.stringify({ echo: phrase })); + expect(delta.length).toBeLessThan(rawDelta.length / 2); }); test("compression shrinks a repetitive frame", async () => { diff --git a/js/json/src/compression.ts b/js/json/src/compression.ts index 4e4fdf9b5..52d2aa0d8 100644 --- a/js/json/src/compression.ts +++ b/js/json/src/compression.ts @@ -1,70 +1,50 @@ /** - * Per-frame DEFLATE compression for the JSON frame stream, built on the platform - * {@link https://developer.mozilla.org/en-US/docs/Web/API/Compression_Streams_API | Compression Streams API}. + * Group-scoped DEFLATE compression for the JSON frame stream, using + * {@link https://github.com/nodeca/pako | pako}'s streaming deflate/inflate. * - * Each frame is compressed on its own as a raw DEFLATE ([RFC 1951](https://www.rfc-editor.org/rfc/rfc1951.html)) - * blob (`deflate-raw`), the same format the Rust `moq-json` producer writes, so the two - * interoperate on the wire. There is no cross-frame context, so snapshots and large frames shrink - * well while tiny deltas barely benefit. The browser API exposes no level or dictionary knobs, so - * compression is a plain on/off toggle. + * Within a group the frame payloads form a single raw DEFLATE + * ([RFC 1951](https://www.rfc-editor.org/rfc/rfc1951.html)) stream, sync-flushed at each frame + * boundary so every frame is self-delimited while later frames reuse the earlier ones as context + * (a snapshot followed by deltas compresses far better than each frame alone). This matches the + * Rust `moq-json` producer, so the two interoperate on the wire. + * + * A sync flush always ends in the fixed 4-byte marker `00 00 ff ff`. {@link Encoder.frame} drops + * it and {@link Decoder.frame} re-appends it, saving 4 bytes per frame, the same trick + * [RFC 7692](https://www.rfc-editor.org/rfc/rfc7692.html#section-7.2.1) (permessage-deflate) uses. + * + * `pako` is an optional peer dependency loaded on demand, so consumers that never compress a track + * never bundle it. * * @module */ // Maximum decompressed size of a single frame. A malicious publisher could otherwise send a tiny -// slice that inflates hugely, so {@link inflate} stops rather than allocating without limit. +// slice that inflates hugely, so {@link Decoder.frame} stops rather than allocating without limit. // Mirrors the Rust `MAX_DECOMPRESSED_FRAME`. const MAX_DECOMPRESSED_FRAME = 64 * 1024 * 1024; -/** Compress one frame payload into a standalone `deflate-raw` blob. Empty in yields empty out. */ -export async function deflate(payload: Uint8Array): Promise { - if (payload.length === 0) return payload; - const cs = new CompressionStream("deflate-raw"); - return pump(cs, payload); -} - -/** - * Decompress one `deflate-raw` frame back into its payload. Empty in yields empty out. - * - * Throws if the input is malformed or inflates past the per-frame size limit. - */ -export async function inflate(slice: Uint8Array): Promise { - if (slice.length === 0) return slice; - const ds = new DecompressionStream("deflate-raw"); - return pump(ds, slice, MAX_DECOMPRESSED_FRAME); -} +// The trailing bytes of a DEFLATE sync flush, stripped on the wire and re-appended to decode. +const SYNC_FLUSH_TAIL = new Uint8Array([0x00, 0x00, 0xff, 0xff]); -// Drive a (de)compression stream end-to-end: feed it `input`, read every output chunk, and -// concatenate. Reads concurrently with writing to avoid the transform's backpressure deadlock. -async function pump( - transform: CompressionStream | DecompressionStream, - input: Uint8Array, - limit = Number.POSITIVE_INFINITY, -): Promise { - const writer = transform.writable.getWriter(); - // The same error surfaces from the reader below, so swallow the writer's copy to avoid an - // unhandled rejection on malformed input. The cast narrows `ArrayBufferLike` to `ArrayBuffer`: - // our inputs are never SharedArrayBuffer-backed, which is all the DOM `BufferSource` type wants. - const written = (async () => { - await writer.write(input as Uint8Array); - await writer.close(); - })().catch(() => {}); +type Pako = typeof import("pako"); +let pako: Promise | undefined; - const reader = transform.readable.getReader(); - const chunks: Uint8Array[] = []; - let total = 0; - for (;;) { - const { value, done } = await reader.read(); - if (done) break; - total += value.length; - if (total > limit) { - await reader.cancel(); - throw new Error(`decompressed frame exceeded ${limit} bytes`); - } - chunks.push(value); - } - await written; +// Load pako once, on demand. The optional peer dependency keeps it out of bundles that never +// enable compression; a clear error points at the missing install if it's reached without it. +function loadPako(): Promise { + pako ??= import("pako") + .then((m) => (m as { default?: Pako }).default ?? (m as Pako)) + .catch((err) => { + pako = undefined; + throw new Error("@moq/json compression requires the optional peer dependency `pako` to be installed", { + cause: err, + }); + }); + return pako; +} +// Concatenate chunks into one buffer (a single chunk passes through untouched). +function concat(chunks: Uint8Array[], total: number): Uint8Array { if (chunks.length === 1) return chunks[0]; const out = new Uint8Array(total); let offset = 0; @@ -74,3 +54,96 @@ async function pump( } return out; } + +/** + * Encodes a group's frame payloads into one shared DEFLATE stream, one self-delimited slice per + * frame. Hold one per group; create a new one at each group boundary. Build with {@link create}. + */ +export class Encoder { + #deflate: import("pako").Deflate; + #flush: number; + #chunks: Uint8Array[] = []; + #total = 0; + + private constructor(lib: Pako) { + this.#deflate = new lib.Deflate({ raw: true }); + this.#flush = lib.constants.Z_SYNC_FLUSH; + this.#deflate.onData = (chunk) => { + const bytes = chunk as Uint8Array; + this.#chunks.push(bytes); + this.#total += bytes.length; + }; + } + + /** Start a fresh per-group encoder with a cold window. */ + static async create(): Promise { + return new Encoder(await loadPako()); + } + + /** + * Compress the next frame's `payload`, returning its slice of the group stream (minus the fixed + * sync-flush marker). Empty in yields empty out. Slices must be produced in frame order. + */ + frame(payload: Uint8Array): Uint8Array { + if (payload.length === 0) return payload; + this.#chunks = []; + this.#total = 0; + this.#deflate.push(payload, this.#flush); + const full = concat(this.#chunks, this.#total); + // Drop the trailing sync-flush marker; the decoder re-appends it. + return full.subarray(0, full.length - SYNC_FLUSH_TAIL.length); + } +} + +/** + * Decodes a group's frame slices back into the original payloads. Hold one per group; feed slices + * in frame order (each frame builds on the earlier ones). Build with {@link create}. + */ +export class Decoder { + #inflate: import("pako").Inflate; + #flush: number; + #chunks: Uint8Array[] = []; + #total = 0; + #tooLarge = false; + + private constructor(lib: Pako) { + this.#inflate = new lib.Inflate({ raw: true }); + this.#flush = lib.constants.Z_SYNC_FLUSH; + this.#inflate.onData = (chunk) => { + const bytes = chunk as Uint8Array; + this.#total += bytes.length; + // Bound memory: stop retaining output past the cap, then reject after the push returns. + if (this.#total > MAX_DECOMPRESSED_FRAME) { + this.#tooLarge = true; + return; + } + this.#chunks.push(bytes); + }; + } + + /** Start a fresh per-group decoder with a cold window. */ + static async create(): Promise { + return new Decoder(await loadPako()); + } + + /** + * Decompress the next frame's `slice` back into its payload. Empty in yields empty out. Throws + * if the input is malformed or inflates past the per-frame size limit. + */ + frame(slice: Uint8Array): Uint8Array { + if (slice.length === 0) return slice; + this.#chunks = []; + this.#total = 0; + this.#tooLarge = false; + + // Re-append the stripped sync-flush marker, which delimits the frame and flushes its bytes out. + const input = new Uint8Array(slice.length + SYNC_FLUSH_TAIL.length); + input.set(slice); + input.set(SYNC_FLUSH_TAIL, slice.length); + + this.#inflate.push(input, this.#flush); + if (this.#inflate.err) throw new Error(`decompression failed: ${this.#inflate.msg}`); + if (this.#tooLarge) throw new Error(`decompressed frame exceeded ${MAX_DECOMPRESSED_FRAME} bytes`); + return concat(this.#chunks, this.#total); + } +} diff --git a/js/json/src/consumer.ts b/js/json/src/consumer.ts index 9cd77c18a..6f4a603b1 100644 --- a/js/json/src/consumer.ts +++ b/js/json/src/consumer.ts @@ -1,6 +1,6 @@ import type * as Moq from "@moq/net"; import type * as z from "zod/mini"; -import { inflate } from "./compression.ts"; +import { Decoder } from "./compression.ts"; import { merge } from "./diff.ts"; import type { Config } from "./producer.ts"; @@ -17,6 +17,8 @@ export class Consumer { #decompress: boolean; #group?: Moq.Group; + // Per-group DEFLATE decoder, built lazily on the first frame of a group and reset at each boundary. + #decoder?: Decoder; #current?: unknown; #framesRead = 0; @@ -35,6 +37,8 @@ export class Consumer { if (!this.#group) return undefined; this.#current = undefined; this.#framesRead = 0; + // Each group is its own compressed stream, so start a fresh decoder. + this.#decoder = undefined; } const frame = await this.#group.readFrame(); @@ -56,10 +60,14 @@ export class Consumer { } } - // Frame 0 of a group is a snapshot, the rest are merge patches. Each frame is its own DEFLATE - // blob when compressed, so decoding needs no per-group state. + // Frame 0 of a group is a snapshot, the rest are merge patches. When compressed, frames share one + // per-group DEFLATE stream, so they decode in order through a decoder built on the group's first frame. async #apply(frame: Uint8Array): Promise { - const payload = this.#decompress ? await inflate(frame) : frame; + let payload = frame; + if (this.#decompress) { + this.#decoder ??= await Decoder.create(); + payload = this.#decoder.frame(frame); + } const parsed = JSON.parse(new TextDecoder().decode(payload)); if (this.#framesRead === 0) { this.#current = parsed; diff --git a/js/json/src/producer.ts b/js/json/src/producer.ts index 22ef6e459..fde990257 100644 --- a/js/json/src/producer.ts +++ b/js/json/src/producer.ts @@ -2,7 +2,7 @@ import * as Moq from "@moq/net"; import type { Effect } from "@moq/signals"; import type * as z from "zod/mini"; -import { deflate } from "./compression.ts"; +import { Encoder } from "./compression.ts"; import { deepEqual, diff } from "./diff.ts"; // Maximum frames (snapshot + deltas) in a single group before a new snapshot is forced. Kept @@ -32,10 +32,10 @@ export interface Config { // mutate a producer that hasn't published yet (e.g. a fresh catalog); ignored once a value exists. initial?: T; - // Compress each frame independently with `deflate-raw` (RFC 1951), interoperable with the Rust - // `moq-json` producer. `false`/unset (the default) writes plaintext JSON frames. A - // {@link Consumer} reading the track must set the same flag. The browser deflate exposes no - // level or dictionary knobs, so this is a plain on/off toggle. + // Compress each group as one sync-flushed `deflate-raw` (RFC 1951) stream, so deltas reuse the + // snapshot as context and shrink sharply. Interoperable with the Rust `moq-json` producer. + // `false`/unset (the default) writes plaintext JSON frames. A {@link Consumer} reading the track + // must set the same flag. Enabling this loads the optional `pako` peer dependency on demand. compression?: boolean; } @@ -58,14 +58,17 @@ export class Producer { #last?: unknown; // Bytes of deltas accumulated in the current group, excluding the snapshot frame. Always raw // (uncompressed) sizes, even when compressing: the delta-vs-snapshot decision is made - // synchronously in `update()`, before the async compression runs. + // synchronously in `update()`, before the async compression runs, so a compressed producer rolls + // groups on raw sizes (still valid on the wire, just a touch sooner than the Rust producer). #deltaBytes = 0; #groupFrames = 0; - // Per-frame `deflate-raw` compression. Compression is async (the platform CompressionStream), - // so on the compressed path every track write is serialized through `#chain` to preserve frame - // and group order while `update()` stays synchronous. Decisions still run synchronously above. + // Group-scoped `deflate-raw` compression. Loading pako and (de)compressing is async, so on the + // compressed path every track write is serialized through `#chain` to preserve frame and group + // order while `update()` stays synchronous. The `#encoder` is the current group's stream, swapped + // for a fresh one at each snapshot inside the chain. Decisions still run synchronously above. #compress = false; + #encoder?: Encoder; #chain: Promise = Promise.resolve(); #failed = false; @@ -131,7 +134,7 @@ export class Producer { const snapshot = new TextEncoder().encode(text); const delta = this.#delta(json, snapshot.length); if (delta && this.#group) { - this.#writeFrame(this.#group, delta); + this.#writeDelta(this.#group, delta); this.#deltaBytes += delta.length; this.#groupFrames += 1; } else { @@ -232,7 +235,7 @@ export class Producer { if (this.#group) this.#closeGroup(this.#group); const group = track.appendGroup(); - this.#writeFrame(group, snapshot); + this.#writeSnapshot(group, snapshot); this.#deltaBytes = 0; this.#groupFrames = 1; @@ -246,14 +249,31 @@ export class Producer { } } - // Write a frame to `group`. Synchronous when uncompressed; on the compressed path the frame is - // deflated and written through `#chain` so frames and groups still land in order. - #writeFrame(group: Moq.Group, frame: Uint8Array): void { + // Write a group's snapshot (frame 0). On the compressed path this opens a fresh per-group encoder + // (cold window) inside the chain, so the snapshot and its deltas share one DEFLATE stream. + #writeSnapshot(group: Moq.Group, frame: Uint8Array): void { if (!this.#compress) { group.writeFrame(frame); return; } - this.#enqueue(async () => group.writeFrame(await deflate(frame))); + this.#enqueue(async () => { + this.#encoder = await Encoder.create(); + group.writeFrame(this.#encoder.frame(frame)); + }); + } + + // Write a delta frame, compressed against the current group's encoder when compressing. The + // snapshot step that opened the encoder is ordered before this one in `#chain`. + #writeDelta(group: Moq.Group, frame: Uint8Array): void { + if (!this.#compress) { + group.writeFrame(frame); + return; + } + this.#enqueue(() => { + const encoder = this.#encoder; + if (!encoder) throw new Error("compressed delta requires an open group"); + group.writeFrame(encoder.frame(frame)); + }); } // Close `group`, ordered after its pending compressed writes when compressing. diff --git a/rs/moq-json/src/compression.rs b/rs/moq-json/src/compression.rs index 8fcc7dbf3..188c0a8a8 100644 --- a/rs/moq-json/src/compression.rs +++ b/rs/moq-json/src/compression.rs @@ -1,25 +1,45 @@ -//! Per-frame DEFLATE compression for the JSON frame stream. +//! Group-scoped DEFLATE compression for the JSON frame stream. //! -//! Each frame payload is compressed on its own as a raw DEFLATE ([RFC 1951]) blob, the same -//! format the browser's `CompressionStream("deflate-raw")` produces and consumes. That keeps a -//! Rust producer and a browser (`@moq/json`) consumer interoperable on the wire, at the cost of -//! no cross-frame context: each frame compresses in isolation, so snapshots and large frames -//! shrink well while tiny deltas barely benefit. +//! Within a group the frame payloads form a single raw DEFLATE ([RFC 1951]) stream, sync-flushed +//! at each frame boundary so every frame carries its own self-delimited slice while later frames +//! reuse the earlier ones as context (a snapshot followed by deltas compresses far better than +//! each frame alone). The [`Encoder`]/[`Decoder`] hold that per-group state; both are recreated at +//! every group boundary. +//! +//! This is plain raw DEFLATE with a `Z_SYNC_FLUSH` after each frame, so a browser (`@moq/json`) +//! peer interoperates on the wire using the same primitive (zlib's sync flush). The window is at +//! most 32 KiB, so a single frame can't inflate without bound, and [`Decoder::frame`] additionally +//! caps each frame's output. +//! +//! A sync flush always ends in the 4-byte empty-block marker `00 00 ff ff`. That marker is fixed, +//! so [`Encoder::frame`] drops it from each slice and [`Decoder::frame`] re-appends it before +//! inflating, saving 4 bytes per frame. This is the same trick [RFC 7692] (permessage-deflate) +//! uses for WebSocket messages. //! //! [RFC 1951]: https://www.rfc-editor.org/rfc/rfc1951.html - -use std::io::{Read, Write}; +//! [RFC 7692]: https://www.rfc-editor.org/rfc/rfc7692.html#section-7.2.1 use bytes::Bytes; -use flate2::read::DeflateDecoder; -use flate2::write::DeflateEncoder; +use flate2::{Compress, Decompress, FlushCompress, FlushDecompress, Status}; use crate::{Error, Result}; -/// Default DEFLATE level: zlib's own default, a good size/speed balance for the small, -/// repetitive payloads this targets. +/// Default DEFLATE level: zlib's own default, a good size/speed balance for the small, repetitive +/// payloads this targets. const DEFAULT_LEVEL: u32 = 6; +/// The trailing bytes of a DEFLATE sync flush, stripped on the wire and re-appended to decode. +const SYNC_FLUSH_TAIL: [u8; 4] = [0x00, 0x00, 0xff, 0xff]; + +/// Maximum decompressed size of a single frame. +/// +/// A malicious publisher could otherwise send a tiny slice that inflates hugely, so +/// [`Decoder::frame`] stops and returns [`Error::TooLarge`] rather than allocating without limit. +const MAX_DECOMPRESSED_FRAME: u64 = 64 * 1024 * 1024; + +/// Scratch buffer size for the streaming (de)compress loops. +const CHUNK: usize = 8 * 1024; + /// A DEFLATE compression level in the valid `0..=9` range. /// /// `0` stores without compressing, `9` is smallest but slowest. Construct via [`Level::new`], @@ -47,15 +67,6 @@ impl Default for Level { } } -/// Maximum decompressed size of a single frame. -/// -/// A malicious publisher could otherwise send a tiny slice that inflates hugely, so -/// [`decompress`] stops and returns [`Error::TooLarge`] rather than allocating without limit. -const MAX_DECOMPRESSED_FRAME: u64 = 64 * 1024 * 1024; - -/// Scratch buffer size for the streaming decompress loop. -const CHUNK: usize = 8 * 1024; - /// DEFLATE compression settings for a JSON track. /// /// Build from [`Default`] and override fields (the struct is `#[non_exhaustive]`, so new options @@ -73,69 +84,133 @@ pub struct Compression { } impl Compression { - /// Compress one frame `payload` into a standalone raw DEFLATE blob. + /// Start a fresh per-group encoder with a cold window. + pub(crate) fn encoder(&self) -> Encoder { + // `false`: raw DEFLATE, no zlib header/trailer, matching `deflate-raw` on the browser side. + Encoder(Compress::new(flate2::Compression::new(self.level.get()), false)) + } +} + +/// Encodes a group's frame payloads into one shared DEFLATE stream, one self-delimited slice per +/// frame. Hold one per group; the stream is recreated at each group boundary. +pub(crate) struct Encoder(Compress); + +impl Encoder { + /// Compress the next frame's `payload`, returning its slice of the group stream. /// - /// An empty payload yields an empty slice (compressing nothing). - pub(crate) fn compress(&self, payload: &[u8]) -> Bytes { + /// An empty payload contributes nothing and yields an empty slice. Later frames reuse earlier + /// ones as context, so slices must be produced (and later decoded) in frame order. + pub(crate) fn frame(&mut self, payload: &[u8]) -> Bytes { if payload.is_empty() { return Bytes::new(); } - let level = flate2::Compression::new(self.level.get()); - let mut encoder = DeflateEncoder::new(Vec::with_capacity(payload.len() / 2 + 16), level); - encoder.write_all(payload).expect("deflate write"); - Bytes::from(encoder.finish().expect("deflate finish")) + let mut out = Vec::with_capacity(payload.len() / 2 + 16); + let mut tmp = [0u8; CHUNK]; + let mut input = payload; + + // Drive the stream with a sync flush so this frame's slice is self-delimited (byte-aligned, + // window retained). The classic zlib loop: keep going while the output buffer fills up. + loop { + let before_in = self.0.total_in(); + let before_out = self.0.total_out(); + self.0.compress(input, &mut tmp, FlushCompress::Sync).expect("deflate"); + let consumed = (self.0.total_in() - before_in) as usize; + let produced = (self.0.total_out() - before_out) as usize; + out.extend_from_slice(&tmp[..produced]); + input = &input[consumed..]; + if produced < tmp.len() { + break; + } + } + + // Drop the fixed sync-flush marker; the decoder re-appends it (see the module docs). + debug_assert!( + out.ends_with(&SYNC_FLUSH_TAIL), + "a sync flush must end in the deflate marker" + ); + out.truncate(out.len() - SYNC_FLUSH_TAIL.len()); + Bytes::from(out) } } -/// Decompress one frame `slice` back into its raw DEFLATE payload. -/// -/// An empty slice yields an empty payload. Returns [`Error::TooLarge`] if the frame inflates past -/// the per-frame bound, and [`Error::Decompress`] on malformed or truncated input. -pub(crate) fn decompress(slice: &[u8]) -> Result { - if slice.is_empty() { - return Ok(Bytes::new()); +/// Decodes a group's frame slices back into the original payloads. Hold one per group; feed slices +/// in frame order (each frame builds on the earlier ones). +pub(crate) struct Decoder(Decompress); + +impl Decoder { + /// Start a fresh per-group decoder with a cold window. + pub(crate) fn new() -> Self { + // `false`: raw DEFLATE, matching the encoder. + Self(Decompress::new(false)) } - let mut decoder = DeflateDecoder::new(slice); - // `slice.len()` is publisher-controlled, so cap the initial guess at the per-frame ceiling: a - // huge compressed frame can't force a huge allocation before the streaming guard below runs. - let initial_capacity = slice - .len() - .saturating_mul(2) - .saturating_add(16) - .min(MAX_DECOMPRESSED_FRAME as usize); - let mut out = Vec::with_capacity(initial_capacity); - let mut tmp = [0u8; CHUNK]; - - loop { - let n = decoder.read(&mut tmp).map_err(|_| Error::Decompress)?; - if n == 0 { - break; + /// Decompress the next frame's `slice` back into its payload. + /// + /// An empty slice yields an empty payload. Returns [`Error::TooLarge`] if the frame inflates + /// past the per-frame bound, and [`Error::Decompress`] on malformed input. + pub(crate) fn frame(&mut self, slice: &[u8]) -> Result { + if slice.is_empty() { + return Ok(Bytes::new()); } - if out.len() as u64 + n as u64 > MAX_DECOMPRESSED_FRAME { - return Err(Error::TooLarge(MAX_DECOMPRESSED_FRAME)); + + let initial = slice + .len() + .saturating_mul(2) + .saturating_add(16) + .min(MAX_DECOMPRESSED_FRAME as usize); + let mut out = Vec::with_capacity(initial); + let mut tmp = [0u8; CHUNK]; + + // Feed the wire slice followed by the re-appended sync-flush marker, which delimits the frame + // and flushes its last bytes out of the inflate buffer. + for segment in [slice, &SYNC_FLUSH_TAIL] { + let mut input = segment; + loop { + let before_in = self.0.total_in(); + let before_out = self.0.total_out(); + let status = self + .0 + .decompress(input, &mut tmp, FlushDecompress::Sync) + .map_err(|_| Error::Decompress)?; + let consumed = (self.0.total_in() - before_in) as usize; + let produced = (self.0.total_out() - before_out) as usize; + if out.len() as u64 + produced as u64 > MAX_DECOMPRESSED_FRAME { + return Err(Error::TooLarge(MAX_DECOMPRESSED_FRAME)); + } + out.extend_from_slice(&tmp[..produced]); + input = &input[consumed..]; + + // Move to the next segment once this one is drained and the buffer wasn't saturated. The + // no-progress guard avoids spinning when the marker needs no further output. + if matches!(status, Status::StreamEnd) || (input.is_empty() && produced < tmp.len()) { + break; + } + if consumed == 0 && produced == 0 { + break; + } + } } - out.extend_from_slice(&tmp[..n]); - } - Ok(Bytes::from(out)) + Ok(Bytes::from(out)) + } } #[cfg(test)] mod test { use super::*; - /// Round-trip a sequence of frames, each compressed and decompressed independently. + /// Round-trip a sequence of frames through a group encoder/decoder pair. fn roundtrip(config: &Compression, frames: &[&[u8]]) -> Vec> { - frames - .iter() - .map(|f| decompress(&config.compress(f)).unwrap().to_vec()) - .collect() + let mut enc = config.encoder(); + let slices: Vec = frames.iter().map(|f| enc.frame(f)).collect(); + + let mut dec = Decoder::new(); + slices.iter().map(|s| dec.frame(s).unwrap().to_vec()).collect() } #[test] - fn frame_roundtrip() { + fn group_roundtrip() { let frames: &[&[u8]] = &[b"the quick brown fox", b"the quick brown dog", b"the lazy fox"]; let got = roundtrip(&Compression::default(), frames); for (a, b) in frames.iter().zip(&got) { @@ -144,24 +219,38 @@ mod test { } #[test] - fn empty_frame_roundtrips() { - assert!(Compression::default().compress(b"").is_empty()); - assert!(decompress(b"").unwrap().is_empty()); + fn empty_frames_roundtrip() { + let frames: &[&[u8]] = &[b"", b"hello", b"", b"world"]; + let got = roundtrip(&Compression::default(), frames); + assert_eq!( + got, + vec![b"".to_vec(), b"hello".to_vec(), b"".to_vec(), b"world".to_vec()] + ); + } + + #[test] + fn large_frame_roundtrips() { + // A frame larger than the scratch buffer exercises the multi-iteration (de)compress loops. + let payload = b"abcdefghij".repeat(4096); // 40 KiB, > CHUNK + let got = roundtrip(&Compression::default(), &[&payload]); + assert_eq!(got[0], payload); } #[test] - fn repetitive_payload_shrinks() { - // A payload with lots of internal redundancy compresses well even on its own. + fn cross_frame_context_shrinks() { + // A later frame identical to an earlier one compresses to far fewer bytes once the window + // holds the earlier copy: this is the whole point of a shared per-group stream. let config = Compression::default(); let payload = b"Media over QUIC delivers real-time latency at massive scale.".repeat(6); - let compressed = config.compress(&payload); + let mut enc = config.encoder(); + let first = enc.frame(&payload); + let second = enc.frame(&payload); assert!( - compressed.len() < payload.len(), - "compressed {} should beat raw {}", - compressed.len(), - payload.len() + second.len() < first.len(), + "repeat frame {} should be smaller than first {}", + second.len(), + first.len() ); - assert_eq!(decompress(&compressed).unwrap(), Bytes::from(payload)); } #[test] @@ -174,7 +263,10 @@ mod test { #[test] fn decompress_rejects_garbage() { - // Random bytes that don't form a valid DEFLATE stream are rejected, not silently truncated. - assert!(matches!(decompress(&[0xff; 64]), Err(Error::Decompress))); + let mut dec = Decoder::new(); + assert!(matches!( + dec.frame(b"not a deflate stream at all"), + Err(Error::Decompress) + )); } } diff --git a/rs/moq-json/src/lib.rs b/rs/moq-json/src/lib.rs index 666c4d194..bbbf4f611 100644 --- a/rs/moq-json/src/lib.rs +++ b/rs/moq-json/src/lib.rs @@ -24,6 +24,7 @@ use serde_json::Value; pub use compression::{Compression, Level}; +use crate::compression::{Decoder, Encoder}; use crate::diff::diff; /// Maximum frames (snapshot + deltas) in a single group before a new snapshot is forced. @@ -86,11 +87,12 @@ pub struct Config { /// Defaults to `8`. pub delta_ratio: u32, - /// Optional per-frame DEFLATE compression for the frame stream. + /// Optional group-scoped DEFLATE compression for the frame stream. /// /// `None` (the default) writes plaintext JSON frames, identical on the wire to a track with no - /// compression. `Some(..)` compresses each frame independently (see [`Compression`]); a - /// [`Consumer`] reading the track must be created via [`Consumer::with_compression`]. + /// compression. `Some(..)` compresses each group as one sync-flushed DEFLATE stream (see + /// [`Compression`]), so deltas reuse the snapshot as context and shrink sharply; a [`Consumer`] + /// reading the track must be created via [`Consumer::with_compression`]. pub compression: Option, } @@ -135,6 +137,7 @@ impl Producer { inner: Arc::new(Mutex::new(Inner { track, group: None, + encoder: None, last: None, delta_bytes: 0, snapshot_len: 0, @@ -237,12 +240,14 @@ impl Drop for Guard<'_, T> { struct Inner { track: moq_net::TrackProducer, group: Option, + // Per-group DEFLATE encoder, `Some` while a compressed group is open (recreated per group). + encoder: Option, last: Option, // Bytes of deltas accumulated in the current group, excluding the snapshot frame. Compressed - // frame sizes when compressing, raw patch sizes otherwise. + // slice sizes when compressing, raw patch sizes otherwise. delta_bytes: u64, // Reference size the delta budget is measured against: the current group's snapshot frame. - // Compressed when compressing, raw otherwise. + // Its compressed slice size when compressing, raw otherwise. snapshot_len: u64, group_frames: usize, config: Config, @@ -292,12 +297,13 @@ impl Inner { serde_json::to_vec(&diff.patch)? }; - match &self.config.compression { - // Compressed: measure the delta's *compressed* size (the real wire cost) against the - // group's anchoring snapshot, also compressed. If it doesn't fit we roll a new group and - // discard this slice (frames compress independently, so nothing carries over). - Some(compression) => { - let slice = compression.compress(&patch); + match self.encoder.as_mut() { + // Compressed: measure the delta's *compressed* slice size (the real wire cost) against the + // group's anchoring snapshot, also compressed. Encoding advances the per-group window; if + // the delta doesn't fit we roll a new group with a fresh encoder, discarding this slice + // (the abandoned window has no effect on the new group). + Some(encoder) => { + let slice = encoder.frame(&patch); let projected = self.delta_bytes + slice.len() as u64; if projected > ratio as u64 * self.snapshot_len { return Ok(None); @@ -324,21 +330,28 @@ impl Inner { let mut group = self.track.append_group()?; - // Compress the snapshot as frame 0 if enabled, recording its wire size as the delta anchor. - let slice = match &self.config.compression { - Some(compression) => compression.compress(&snapshot), - None => Bytes::from(snapshot), + // Open a fresh per-group encoder (cold window) and compress the snapshot as frame 0, recording + // its wire size as the delta anchor. + let (slice, encoder) = match &self.config.compression { + Some(compression) => { + let mut encoder = compression.encoder(); + let slice = encoder.frame(&snapshot); + (slice, Some(encoder)) + } + None => (Bytes::from(snapshot), None), }; self.snapshot_len = slice.len() as u64; group.write_frame(slice)?; self.delta_bytes = 0; self.group_frames = 1; + self.encoder = encoder; if self.config.delta_ratio != 0 { - // Keep the group open so future deltas can be appended. + // Keep the group (and its encoder) open so future deltas can be appended. self.group = Some(group); } else { // Deltas disabled: one frame per group, identical to a plain JSON track. + self.encoder = None; group.finish()?; } @@ -360,6 +373,11 @@ pub struct Consumer { group: Option, // Whether frames are DEFLATE-compressed, matching the producer's [`Config::compression`]. compressed: bool, + // Per-group DEFLATE decoder, built lazily on the first compressed frame of a group. + decoder: Option, + // Compressed slices read so far in the current group, in order. Lets a cloned consumer rebuild + // the (non-cloneable) decoder window by replaying them. Empty when uncompressed. + group_slices: Vec, current: Option, frames_read: usize, _marker: PhantomData T>, @@ -373,6 +391,10 @@ impl Clone for Consumer { track: self.track.clone(), group: self.group.clone(), compressed: self.compressed, + // A DEFLATE decoder can't be cloned (per-group window state), so the clone starts without + // one and rebuilds it from `group_slices` on its next compressed read. + decoder: None, + group_slices: self.group_slices.clone(), current: self.current.clone(), frames_read: self.frames_read, _marker: PhantomData, @@ -400,6 +422,8 @@ impl Consumer { track, group: None, compressed, + decoder: None, + group_slices: Vec::new(), current: None, frames_read: 0, _marker: PhantomData, @@ -426,6 +450,9 @@ impl Consumer { self.group = Some(group); self.current = None; self.frames_read = 0; + // Each group is its own compressed stream, so reset the decoder state. + self.decoder = None; + self.group_slices.clear(); } Poll::Ready(None) => break true, Poll::Pending => break false, @@ -450,13 +477,25 @@ impl Consumer { /// Decompress a frame slice, or pass it through when the track is uncompressed. /// - /// Each frame is its own DEFLATE blob, so this needs no per-group state. - fn decode(&self, slice: Bytes) -> Result { - if self.compressed { - compression::decompress(&slice) - } else { - Ok(slice) + /// The per-group decoder is built lazily on the first compressed frame. A cloned consumer starts + /// without a decoder, so the first call replays the group's already-read slices to rebuild the + /// (non-cloneable) DEFLATE window before decoding the new frame. + fn decode(&mut self, slice: Bytes) -> Result { + if !self.compressed { + return Ok(slice); + } + + if self.decoder.is_none() { + let mut decoder = Decoder::new(); + for prev in &self.group_slices { + decoder.frame(prev)?; + } + self.decoder = Some(decoder); } + + let plain = self.decoder.as_mut().unwrap().frame(&slice)?; + self.group_slices.push(slice); + Ok(plain) } /// Apply one frame: frame 0 of a group is a snapshot, the rest are merge patches. @@ -791,8 +830,7 @@ mod test { #[test] fn compressed_cloned_consumer_reconstructs_mid_group() { - // Frames compress independently, so a clone taken mid-group just inherits the reconstruction - // state and keeps decoding each frame on its own. + // A clone taken mid-group has no decoder window; it must rebuild from the retained slices. let (mut producer, track) = producer(cfg_deflate(100)); let mut consumer = Consumer::::with_compression(track); let waiter = kio::Waiter::noop(); @@ -830,6 +868,37 @@ mod test { ); } + #[test] + fn compressed_deltas_reuse_window() { + // The shared per-group window is the whole point: a delta that restates content already in + // the snapshot compresses to far fewer bytes than the raw patch. + let (mut producer, mut track) = producer(cfg_deflate(100)); + let phrase = "Media over QUIC delivers real-time latency at massive scale"; + producer.update(&json!({ "note": phrase })).unwrap(); + producer.update(&json!({ "note": phrase, "echo": phrase })).unwrap(); + producer.finish().unwrap(); + + // Both frames land in group 0; read the delta (frame 1) verbatim. + let waiter = kio::Waiter::noop(); + let Poll::Ready(Ok(Some(mut group))) = track.poll_next_group(&waiter) else { + panic!("expected a group"); + }; + let mut frames = Vec::new(); + while let Poll::Ready(Ok(Some(frame))) = group.poll_read_frame(&waiter) { + frames.push(frame); + } + assert_eq!(frames.len(), 2, "snapshot + one delta in a single group"); + + // The raw patch repeats the whole phrase; compressed against the window it's a fraction. + let raw_delta = serde_json::to_vec(&json!({ "echo": phrase })).unwrap(); + assert!( + frames[1].len() < raw_delta.len() / 2, + "windowed delta {} should be far below the raw patch {}", + frames[1].len(), + raw_delta.len() + ); + } + /// Publish a single value and return the byte length of the resulting (frame 0) wire frame. fn wire_frame_len(config: Config, value: &Value) -> usize { let (mut producer, mut track) = producer(config); From 078e84277203b60bbf6d9769ea87cfea701b80fc Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 24 Jun 2026 06:27:06 +0000 Subject: [PATCH 6/7] feat(moq-json): prefix compressed frames with decompressed length Each compressed frame now carries a QUIC-varint prefix of its decompressed length (matching @moq/net's Varint). The decoder sizes its output buffer and rejects an over-cap frame before inflating, and verifies the inflated length matches the prefix. This also lets a future browser decoder delimit DecompressionStream output, which carries no frame boundary of its own. Also address review feedback: reword the module doc so the size bound is MAX_DECOMPRESSED_FRAME rather than the 32 KiB window, mark the JS Encoder/Decoder @public, and add cap/mismatch regression tests on both sides. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01B3xwgHid4UYjeugewkxUyj --- js/json/src/compression.test.ts | 21 +++++- js/json/src/compression.ts | 47 ++++++++++--- rs/moq-json/src/compression.rs | 118 +++++++++++++++++++++++++++----- 3 files changed, 159 insertions(+), 27 deletions(-) diff --git a/js/json/src/compression.test.ts b/js/json/src/compression.test.ts index 86fc213d5..27b548b07 100644 --- a/js/json/src/compression.test.ts +++ b/js/json/src/compression.test.ts @@ -1,5 +1,5 @@ import { expect, test } from "bun:test"; -import { Track } from "@moq/net"; +import { Track, Varint } from "@moq/net"; import { Decoder, Encoder } from "./compression.ts"; import { Consumer } from "./consumer.ts"; import { Producer } from "./producer.ts"; @@ -46,6 +46,25 @@ test("codec rejects garbage", async () => { expect(() => decoder.frame(new Uint8Array(64).fill(0xff))).toThrow(); }); +test("codec rejects frames that declare more than the cap", async () => { + // The length prefix bounds the frame before inflating, so a payload past the 64 MiB cap is + // rejected on the declared length without materializing it. + const encoder = await Encoder.create(); + const decoder = await Decoder.create(); + const slice = encoder.frame(enc.encode("a".repeat(64 * 1024 * 1024 + 1))); + expect(() => decoder.frame(slice)).toThrow(/exceeded/); +}); + +test("codec rejects a length-prefix mismatch", async () => { + // A prefix that disagrees with the inflated output is rejected as corrupt. + const encoder = await Encoder.create(); + const decoder = await Decoder.create(); + const slice = encoder.frame(enc.encode("hello world")); + const [, deflate] = Varint.decode(slice); + const tampered = new Uint8Array([...Varint.encode(4), ...deflate]); // payload is 11 bytes + expect(() => decoder.frame(tampered)).toThrow(/mismatch/); +}); + test("cross-frame context shrinks a repeated frame", async () => { // A later frame identical to an earlier one compresses far smaller once the window holds it. const encoder = await Encoder.create(); diff --git a/js/json/src/compression.ts b/js/json/src/compression.ts index 52d2aa0d8..e3f2a457f 100644 --- a/js/json/src/compression.ts +++ b/js/json/src/compression.ts @@ -12,12 +12,17 @@ * it and {@link Decoder.frame} re-appends it, saving 4 bytes per frame, the same trick * [RFC 7692](https://www.rfc-editor.org/rfc/rfc7692.html#section-7.2.1) (permessage-deflate) uses. * + * Each slice is prefixed with its decompressed length as a QUIC varint (via `@moq/net`'s `Varint`), + * so the decoder bounds the frame before inflating and matches the Rust producer on the wire. + * * `pako` is an optional peer dependency loaded on demand, so consumers that never compress a track * never bundle it. * * @module */ +import { Varint } from "@moq/net"; + // Maximum decompressed size of a single frame. A malicious publisher could otherwise send a tiny // slice that inflates hugely, so {@link Decoder.frame} stops rather than allocating without limit. // Mirrors the Rust `MAX_DECOMPRESSED_FRAME`. @@ -58,6 +63,8 @@ function concat(chunks: Uint8Array[], total: number): Uint8Array { /** * Encodes a group's frame payloads into one shared DEFLATE stream, one self-delimited slice per * frame. Hold one per group; create a new one at each group boundary. Build with {@link create}. + * + * @public */ export class Encoder { #deflate: import("pako").Deflate; @@ -81,8 +88,9 @@ export class Encoder { } /** - * Compress the next frame's `payload`, returning its slice of the group stream (minus the fixed - * sync-flush marker). Empty in yields empty out. Slices must be produced in frame order. + * Compress the next frame's `payload`, returning its slice of the group stream: a decompressed- + * length varint prefix, then the DEFLATE bytes minus the fixed sync-flush marker. Empty in yields + * empty out. Slices must be produced in frame order. */ frame(payload: Uint8Array): Uint8Array { if (payload.length === 0) return payload; @@ -90,14 +98,21 @@ export class Encoder { this.#total = 0; this.#deflate.push(payload, this.#flush); const full = concat(this.#chunks, this.#total); - // Drop the trailing sync-flush marker; the decoder re-appends it. - return full.subarray(0, full.length - SYNC_FLUSH_TAIL.length); + // Drop the trailing sync-flush marker (the decoder re-appends it) and prefix the length. + const deflate = full.subarray(0, full.length - SYNC_FLUSH_TAIL.length); + const prefix = Varint.encode(payload.length); + const out = new Uint8Array(prefix.length + deflate.length); + out.set(prefix); + out.set(deflate, prefix.length); + return out; } } /** * Decodes a group's frame slices back into the original payloads. Hold one per group; feed slices * in frame order (each frame builds on the earlier ones). Build with {@link create}. + * + * @public */ export class Decoder { #inflate: import("pako").Inflate; @@ -128,22 +143,36 @@ export class Decoder { /** * Decompress the next frame's `slice` back into its payload. Empty in yields empty out. Throws - * if the input is malformed or inflates past the per-frame size limit. + * if the input is malformed, declares more than the per-frame size limit, or inflates to a length + * that doesn't match its prefix. */ frame(slice: Uint8Array): Uint8Array { if (slice.length === 0) return slice; + + // The decompressed-length prefix bounds the frame before any inflation. + const [declared, deflate] = Varint.decode(slice); + if (declared > MAX_DECOMPRESSED_FRAME) { + throw new Error(`decompressed frame exceeded ${MAX_DECOMPRESSED_FRAME} bytes`); + } + this.#chunks = []; this.#total = 0; this.#tooLarge = false; // Re-append the stripped sync-flush marker, which delimits the frame and flushes its bytes out. - const input = new Uint8Array(slice.length + SYNC_FLUSH_TAIL.length); - input.set(slice); - input.set(SYNC_FLUSH_TAIL, slice.length); + const input = new Uint8Array(deflate.length + SYNC_FLUSH_TAIL.length); + input.set(deflate); + input.set(SYNC_FLUSH_TAIL, deflate.length); this.#inflate.push(input, this.#flush); if (this.#inflate.err) throw new Error(`decompression failed: ${this.#inflate.msg}`); if (this.#tooLarge) throw new Error(`decompressed frame exceeded ${MAX_DECOMPRESSED_FRAME} bytes`); - return concat(this.#chunks, this.#total); + + const out = concat(this.#chunks, this.#total); + // A mismatch with the declared length means a corrupt or lying frame. + if (out.length !== declared) { + throw new Error(`decompressed length mismatch: expected ${declared}, got ${out.length}`); + } + return out; } } diff --git a/rs/moq-json/src/compression.rs b/rs/moq-json/src/compression.rs index 188c0a8a8..9cfc93b40 100644 --- a/rs/moq-json/src/compression.rs +++ b/rs/moq-json/src/compression.rs @@ -7,17 +7,23 @@ //! every group boundary. //! //! This is plain raw DEFLATE with a `Z_SYNC_FLUSH` after each frame, so a browser (`@moq/json`) -//! peer interoperates on the wire using the same primitive (zlib's sync flush). The window is at -//! most 32 KiB, so a single frame can't inflate without bound, and [`Decoder::frame`] additionally -//! caps each frame's output. +//! peer interoperates on the wire using the same primitive (zlib's sync flush). A small slice can +//! still inflate to far more than its own size, so [`Decoder::frame`] bounds each frame's output by +//! its declared length, capped at [`MAX_DECOMPRESSED_FRAME`]. //! //! A sync flush always ends in the 4-byte empty-block marker `00 00 ff ff`. That marker is fixed, //! so [`Encoder::frame`] drops it from each slice and [`Decoder::frame`] re-appends it before //! inflating, saving 4 bytes per frame. This is the same trick [RFC 7692] (permessage-deflate) //! uses for WebSocket messages. //! +//! Each slice is prefixed with its decompressed length as a [QUIC varint][RFC 9000] (matching +//! `@moq/net`'s `Varint`). The decoder sizes its output buffer up front and rejects an oversized +//! frame before inflating; a future browser decoder can use it to delimit `DecompressionStream` +//! output, which carries no frame boundary of its own. +//! //! [RFC 1951]: https://www.rfc-editor.org/rfc/rfc1951.html //! [RFC 7692]: https://www.rfc-editor.org/rfc/rfc7692.html#section-7.2.1 +//! [RFC 9000]: https://www.rfc-editor.org/rfc/rfc9000.html#section-16 use bytes::Bytes; use flate2::{Compress, Decompress, FlushCompress, FlushDecompress, Status}; @@ -40,6 +46,38 @@ const MAX_DECOMPRESSED_FRAME: u64 = 64 * 1024 * 1024; /// Scratch buffer size for the streaming (de)compress loops. const CHUNK: usize = 8 * 1024; +/// Append `v` as a QUIC varint (RFC 9000 ยง16). `v` must fit in 62 bits, which a frame length always +/// does. Matches `@moq/net`'s `Varint` so the two ends agree on the wire. +fn put_varint(out: &mut Vec, v: u64) { + if v <= 0x3f { + out.push(v as u8); + } else if v <= 0x3fff { + out.extend_from_slice(&((v as u16) | 0x4000).to_be_bytes()); + } else if v <= 0x3fff_ffff { + out.extend_from_slice(&((v as u32) | 0x8000_0000).to_be_bytes()); + } else { + out.extend_from_slice(&(v | 0xc000_0000_0000_0000).to_be_bytes()); + } +} + +/// Read a QUIC varint from the front of `buf`, returning the value and the rest. Errors if `buf` is +/// empty or shorter than the encoded length. +fn get_varint(buf: &[u8]) -> Result<(u64, &[u8])> { + let first = *buf.first().ok_or(Error::Decompress)?; + let len = 1usize << (first >> 6); + if buf.len() < len { + return Err(Error::Decompress); + } + let (head, rest) = buf.split_at(len); + let value = match len { + 1 => (head[0] & 0x3f) as u64, + 2 => (u16::from_be_bytes([head[0], head[1]]) & 0x3fff) as u64, + 4 => (u32::from_be_bytes([head[0], head[1], head[2], head[3]]) & 0x3fff_ffff) as u64, + _ => u64::from_be_bytes(head.try_into().expect("8 bytes")) & 0x3fff_ffff_ffff_ffff, + }; + Ok((value, rest)) +} + /// A DEFLATE compression level in the valid `0..=9` range. /// /// `0` stores without compressing, `9` is smallest but slowest. Construct via [`Level::new`], @@ -96,7 +134,8 @@ impl Compression { pub(crate) struct Encoder(Compress); impl Encoder { - /// Compress the next frame's `payload`, returning its slice of the group stream. + /// Compress the next frame's `payload`, returning its slice of the group stream: a decompressed- + /// length varint prefix, then the DEFLATE bytes minus the fixed sync-flush marker. /// /// An empty payload contributes nothing and yields an empty slice. Later frames reuse earlier /// ones as context, so slices must be produced (and later decoded) in frame order. @@ -106,6 +145,9 @@ impl Encoder { } let mut out = Vec::with_capacity(payload.len() / 2 + 16); + // Decompressed-length prefix, so the decoder sizes its buffer and bounds the frame up front. + put_varint(&mut out, payload.len() as u64); + let prefix = out.len(); let mut tmp = [0u8; CHUNK]; let mut input = payload; @@ -124,9 +166,10 @@ impl Encoder { } } - // Drop the fixed sync-flush marker; the decoder re-appends it (see the module docs). + // Drop the fixed sync-flush marker; the decoder re-appends it (see the module docs). It sits + // after the varint prefix, so there's always a full marker to strip. debug_assert!( - out.ends_with(&SYNC_FLUSH_TAIL), + out.len() >= prefix + SYNC_FLUSH_TAIL.len() && out.ends_with(&SYNC_FLUSH_TAIL), "a sync flush must end in the deflate marker" ); out.truncate(out.len() - SYNC_FLUSH_TAIL.len()); @@ -147,24 +190,25 @@ impl Decoder { /// Decompress the next frame's `slice` back into its payload. /// - /// An empty slice yields an empty payload. Returns [`Error::TooLarge`] if the frame inflates - /// past the per-frame bound, and [`Error::Decompress`] on malformed input. + /// An empty slice yields an empty payload. Returns [`Error::TooLarge`] if the declared length + /// exceeds the per-frame bound, and [`Error::Decompress`] on malformed input or a length that + /// doesn't match the inflated output. pub(crate) fn frame(&mut self, slice: &[u8]) -> Result { if slice.is_empty() { return Ok(Bytes::new()); } - let initial = slice - .len() - .saturating_mul(2) - .saturating_add(16) - .min(MAX_DECOMPRESSED_FRAME as usize); - let mut out = Vec::with_capacity(initial); + // The decompressed-length prefix bounds and sizes the output before any inflation. + let (declared, deflate) = get_varint(slice)?; + if declared > MAX_DECOMPRESSED_FRAME { + return Err(Error::TooLarge(MAX_DECOMPRESSED_FRAME)); + } + let mut out = Vec::with_capacity(declared as usize); let mut tmp = [0u8; CHUNK]; - // Feed the wire slice followed by the re-appended sync-flush marker, which delimits the frame - // and flushes its last bytes out of the inflate buffer. - for segment in [slice, &SYNC_FLUSH_TAIL] { + // Feed the DEFLATE bytes followed by the re-appended sync-flush marker, which delimits the + // frame and flushes its last bytes out of the inflate buffer. + for segment in [deflate, &SYNC_FLUSH_TAIL] { let mut input = segment; loop { let before_in = self.0.total_in(); @@ -192,6 +236,10 @@ impl Decoder { } } + // The inflated output must match the declared length; a mismatch means a corrupt or lying frame. + if out.len() as u64 != declared { + return Err(Error::Decompress); + } Ok(Bytes::from(out)) } } @@ -269,4 +317,40 @@ mod test { Err(Error::Decompress) )); } + + #[test] + fn rejects_oversized_declared_length() { + // A forged prefix claiming more than the cap is rejected before any inflation, so the guard + // holds without materializing a huge buffer. + let mut forged = Vec::new(); + put_varint(&mut forged, MAX_DECOMPRESSED_FRAME + 1); + forged.push(0); + let mut dec = Decoder::new(); + assert!(matches!(dec.frame(&forged), Err(Error::TooLarge(_)))); + } + + #[test] + fn rejects_length_mismatch() { + // A prefix that disagrees with the inflated output (here understated) is rejected as corrupt. + let mut enc = Compression::default().encoder(); + let slice = enc.frame(b"hello world"); + let (_, deflate) = get_varint(&slice).unwrap(); + let mut tampered = Vec::new(); + put_varint(&mut tampered, 4); // the payload is 11 bytes + tampered.extend_from_slice(deflate); + let mut dec = Decoder::new(); + assert!(matches!(dec.frame(&tampered), Err(Error::Decompress))); + } + + #[test] + fn varint_round_trips() { + // Spot-check the QUIC varint boundaries the prefix relies on. + for v in [0u64, 0x3f, 0x40, 0x3fff, 0x4000, 0x3fff_ffff, MAX_DECOMPRESSED_FRAME] { + let mut buf = Vec::new(); + put_varint(&mut buf, v); + let (got, rest) = get_varint(&buf).unwrap(); + assert_eq!(got, v); + assert!(rest.is_empty()); + } + } } From 369106bb73f2c9fcbc788e54186f52fca1d292df Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 24 Jun 2026 06:38:02 +0000 Subject: [PATCH 7/7] test(json): pin pako over fflate for streaming flush fflate's streaming Deflate.flush() mis-encodes some inputs: a catalog snapshot + 3 deltas can't round-trip even through fflate's own Inflate, while the pako codec handles it. Add a regression test contrasting the two (with a positive control showing fflate works on simpler frames) so a future swap to the smaller fflate is caught. Adds fflate as a devDependency. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01B3xwgHid4UYjeugewkxUyj --- bun.lock | 3 ++ js/json/package.json | 1 + js/json/src/compression.test.ts | 75 +++++++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+) diff --git a/bun.lock b/bun.lock index 898fbcb98..ce7afe29a 100644 --- a/bun.lock +++ b/bun.lock @@ -110,6 +110,7 @@ "devDependencies": { "@types/bun": "^1.3.14", "@types/pako": "^2.0.4", + "fflate": "^0.8.2", "pako": "^2.2.0", "rimraf": "^6.1.3", "typescript": "^6.0.3", @@ -1049,6 +1050,8 @@ "fdir": ["fdir@6.5.0", "", { "peerDependencies": { "picomatch": "^3 || ^4" }, "optionalPeers": ["picomatch"] }, "sha512-tIbYtZbucOs0BRGqPJkshJUYdL+SDH7dVM8gjy+ERp3WAUjLEFJE+02kanyHtwjWOnwrKYBiwAmM0p4kLJAnXg=="], + "fflate": ["fflate@0.8.3", "", {}, "sha512-tbZNuJrLwGUp3zshBtdy4W+ORxZuIh8a5ilyIEQDC5rY1f3U20JMry0Ll3WBzU58EZKsEuJFXhb5gwv8CsPvgA=="], + "file-uri-to-path": ["file-uri-to-path@1.0.0", "", {}, "sha512-0Zt+s3L7Vf1biwWZ29aARiVYLx7iMGnEUl9x33fbB/j3jR81u/O2LbqK+Bm1CDSNDKVtJ/YjwY7TUd5SkeLQLw=="], "filelist": ["filelist@1.0.4", "", { "dependencies": { "minimatch": "^5.0.1" } }, "sha512-w1cEuf3S+DrLCQL7ET6kz+gmlJdbq9J7yXCSjK/OZCPA+qEN1WyF4ZAf0YYJa4/shHJra2t/d/r8SV4Ji+x+8Q=="], diff --git a/js/json/package.json b/js/json/package.json index c97f5e1e1..27624fb28 100644 --- a/js/json/package.json +++ b/js/json/package.json @@ -31,6 +31,7 @@ "devDependencies": { "@types/bun": "^1.3.14", "@types/pako": "^2.0.4", + "fflate": "^0.8.2", "pako": "^2.2.0", "rimraf": "^6.1.3", "typescript": "^6.0.3" diff --git a/js/json/src/compression.test.ts b/js/json/src/compression.test.ts index 27b548b07..1b7c94470 100644 --- a/js/json/src/compression.test.ts +++ b/js/json/src/compression.test.ts @@ -1,5 +1,6 @@ import { expect, test } from "bun:test"; import { Track, Varint } from "@moq/net"; +import { Deflate, Inflate } from "fflate"; import { Decoder, Encoder } from "./compression.ts"; import { Consumer } from "./consumer.ts"; import { Producer } from "./producer.ts"; @@ -9,6 +10,44 @@ type Value = Record; const enc = new TextEncoder(); const dec = new TextDecoder(); +function concatBytes(chunks: Uint8Array[]): Uint8Array { + const out = new Uint8Array(chunks.reduce((n, c) => n + c.length, 0)); + let offset = 0; + for (const chunk of chunks) { + out.set(chunk, offset); + offset += chunk.length; + } + return out; +} + +// Round-trip frames through fflate's streaming `Deflate.flush(true)` + `Inflate`, the same +// shared-window scheme our pako codec uses. Returns true only if every frame survives unchanged. +function fflateRoundTrips(frames: Uint8Array[]): boolean { + try { + let captured: Uint8Array[] = []; + const deflate = new Deflate({ level: 6 }); + deflate.ondata = (chunk) => captured.push(chunk); + const slices = frames.map((frame) => { + captured = []; + deflate.push(frame, false); + deflate.flush(true); // sync flush: byte-align and retain the window + return concatBytes(captured); + }); + + let inflated: Uint8Array[] = []; + const inflate = new Inflate(); + inflate.ondata = (chunk) => inflated.push(chunk); + return slices.every((slice, i) => { + inflated = []; + inflate.push(slice, false); + const got = concatBytes(inflated); + return got.length === frames[i].length && got.every((b, j) => b === frames[i][j]); + }); + } catch { + return false; + } +} + // Reconstruct every value a compressed consumer yields, in order. async function drainCompressed(track: Track): Promise { const out: Value[] = []; @@ -152,6 +191,42 @@ test("compressed deltas reuse the window", async () => { expect(delta.length).toBeLessThan(rawDelta.length / 2); }); +test("pako round-trips a group that fflate's flush corrupts", async () => { + // A catalog snapshot + 3 deltas that fflate's streaming flush mis-encodes: even fflate's own + // Inflate can't round-trip its output here. This pins why @moq/json depends on pako, not the + // smaller fflate. If this ever fails (fflateRoundTrips returns true), fflate may have fixed its + // sync-flush encoder, and dropping the pako dependency could be reconsidered. + const group: Value[] = [ + { + video: { + renditions: { + v0: { codec: "avc1.64001f", bitrate: 6000000 }, + v1: { codec: "avc1.640015", bitrate: 3000000 }, + }, + }, + audio: { renditions: { a0: { codec: "opus", bitrate: 128000 } } }, + }, + { video: { renditions: { v0: { bitrate: 6200000 } } } }, + { video: { renditions: { v0: { bitrate: 5800000 } } } }, + { audio: { renditions: { a0: { bitrate: 96000 } } } }, + ]; + const frames = group.map((value) => enc.encode(JSON.stringify(value))); + + // Our pako codec round-trips every frame of the group exactly. + const encoder = await Encoder.create(); + const decoder = await Decoder.create(); + for (const frame of frames) { + expect(decoder.frame(encoder.frame(frame))).toEqual(frame); + } + + // Positive control: fflate's flush works on simpler frames, so the helper is sound and fflate is + // only selectively broken, not failing for some unrelated reason. + expect(fflateRoundTrips(["the quick brown fox", "the quick brown dog"].map((s) => enc.encode(s)))).toBe(true); + + // fflate's streaming flush does not round-trip the same group our pako codec handles. + expect(fflateRoundTrips(frames)).toBe(false); +}); + test("compression shrinks a repetitive frame", async () => { const value = { renditions: Array(3).fill("video".repeat(50)) };