diff --git a/Cargo.lock b/Cargo.lock index d98c9caa9..fc5392b60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3849,6 +3849,7 @@ name = "moq-json" version = "0.0.4" dependencies = [ "bytes", + "flate2", "json-patch", "kio", "moq-net", diff --git a/Cargo.toml b/Cargo.toml index 8eb47380f..be0c4d56f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ resolver = "2" rust-version = "1.85" [workspace.dependencies] +flate2 = "1" hang = { version = "0.19", path = "rs/hang" } kio = { version = "0.4", path = "rs/kio" } moq-audio = { version = "0.0.5", path = "rs/moq-audio" } diff --git a/bun.lock b/bun.lock index 2d0fbca7f..07ddccb22 100644 --- a/bun.lock +++ b/bun.lock @@ -106,9 +106,12 @@ "dependencies": { "@moq/net": "workspace:^", "@moq/signals": "workspace:^", + "pako": "^2.2.0", }, "devDependencies": { "@types/bun": "^1.3.14", + "@types/pako": "^2.0.4", + "fflate": "^0.8.2", "rimraf": "^6.1.3", "typescript": "^6.0.3", }, @@ -795,6 +798,8 @@ "@types/node": ["@types/node@25.9.2", "", { "dependencies": { "undici-types": ">=7.24.0 <7.24.7" } }, "sha512-G05zqtJhcDLb8uslf5EjCxXg9G1KQxiV8OS0R26IC//Eoyitzqe8z37I7cqvnZlrlSfgocQRfSn/AHBZJJFyGw=="], + "@types/pako": ["@types/pako@2.0.4", "", {}, "sha512-VWDCbrLeVXJM9fihYodcLiIv0ku+AlOa/TQ1SvYOaBuyrSKgEcro95LJyIsJ4vSo6BXIxOKxiJAat04CmST9Fw=="], + "@types/react": ["@types/react@19.2.17", "", { "dependencies": { "csstype": "^3.2.2" } }, "sha512-MXfmqaVPEVgkBT/aY0aGCkRWWtByiYQXo3xdQ8r5RzuFrPiRn8Gar2tQdXSUQ2GKV3bkXckek89V8wQBY2Q/Aw=="], "@types/supports-color": ["@types/supports-color@8.1.3", "", {}, "sha512-Hy6UMpxhE3j1tLpl27exp1XqHD7n8chAiNPzWfz16LPZoMMoSc4dzLl6w9qijkEb/r5O1ozdu1CWGA2L83ZeZg=="], @@ -1041,6 +1046,8 @@ "fdir": ["fdir@6.5.0", "", { "peerDependencies": { "picomatch": "^3 || ^4" }, "optionalPeers": ["picomatch"] }, "sha512-tIbYtZbucOs0BRGqPJkshJUYdL+SDH7dVM8gjy+ERp3WAUjLEFJE+02kanyHtwjWOnwrKYBiwAmM0p4kLJAnXg=="], + "fflate": ["fflate@0.8.3", "", {}, 
"sha512-tbZNuJrLwGUp3zshBtdy4W+ORxZuIh8a5ilyIEQDC5rY1f3U20JMry0Ll3WBzU58EZKsEuJFXhb5gwv8CsPvgA=="], + "file-uri-to-path": ["file-uri-to-path@1.0.0", "", {}, "sha512-0Zt+s3L7Vf1biwWZ29aARiVYLx7iMGnEUl9x33fbB/j3jR81u/O2LbqK+Bm1CDSNDKVtJ/YjwY7TUd5SkeLQLw=="], "filelist": ["filelist@1.0.4", "", { "dependencies": { "minimatch": "^5.0.1" } }, "sha512-w1cEuf3S+DrLCQL7ET6kz+gmlJdbq9J7yXCSjK/OZCPA+qEN1WyF4ZAf0YYJa4/shHJra2t/d/r8SV4Ji+x+8Q=="], @@ -1337,6 +1344,8 @@ "package-manager-detector": ["package-manager-detector@1.6.0", "", {}, "sha512-61A5ThoTiDG/C8s8UMZwSorAGwMJ0ERVGj2OjoW5pAalsNOg15+iQiPzrLJ4jhZ1HJzmC2PIHT2oEiH3R5fzNA=="], + "pako": ["pako@2.2.0", "", {}, "sha512-zJq6RP/5q+TO2OpFV3FHzlPnFjmkb7Nc99a5SNjJE+uu/PkpChs+NIZSSzbBoD+6kjiISXjfYdwj1ZRQ81dz/w=="], + "param-case": ["param-case@3.0.4", "", { "dependencies": { "dot-case": "^3.0.4", "tslib": "^2.0.3" } }, "sha512-RXlj7zCYokReqWpOPH9oYivUzLYZ5vAPIfEmCTNViosC78F8F0H9y7T7gG2M39ymgutxF5gcFEsyZQSph9Bp3A=="], "parse-entities": ["parse-entities@4.0.2", "", { "dependencies": { "@types/unist": "^2.0.0", "character-entities-legacy": "^3.0.0", "character-reference-invalid": "^2.0.0", "decode-named-character-reference": "^1.0.0", "is-alphanumerical": "^2.0.0", "is-decimal": "^2.0.0", "is-hexadecimal": "^2.0.0" } }, "sha512-GG2AQYWoLgL877gQIKeRPGO1xF9+eG1ujIb5soS5gPvLQ1y2o8FL90w2QWNdf9I361Mpp7726c+lj3U0qK1uGw=="], diff --git a/js/json/package.json b/js/json/package.json index 7526da354..5a104a402 100644 --- a/js/json/package.json +++ b/js/json/package.json @@ -17,13 +17,16 @@ }, "dependencies": { "@moq/net": "workspace:^", - "@moq/signals": "workspace:^" + "@moq/signals": "workspace:^", + "pako": "^2.2.0" }, "peerDependencies": { "zod": "^4.0.0" }, "devDependencies": { "@types/bun": "^1.3.14", + "@types/pako": "^2.0.4", + "fflate": "^0.8.2", "rimraf": "^6.1.3", "typescript": "^6.0.3" } diff --git a/js/json/src/compression.test.ts b/js/json/src/compression.test.ts new file mode 100644 index 000000000..f9257cb36 --- /dev/null 
+++ b/js/json/src/compression.test.ts @@ -0,0 +1,247 @@ +import { expect, test } from "bun:test"; +import { Track } from "@moq/net"; +import { Deflate, Inflate } from "fflate"; +import { Decoder, Encoder } from "./compression.ts"; +import { Consumer } from "./consumer.ts"; +import { Producer } from "./producer.ts"; + +type Value = Record; + +const enc = new TextEncoder(); +const dec = new TextDecoder(); + +function concatBytes(chunks: Uint8Array[]): Uint8Array { + const out = new Uint8Array(chunks.reduce((n, c) => n + c.length, 0)); + let offset = 0; + for (const chunk of chunks) { + out.set(chunk, offset); + offset += chunk.length; + } + return out; +} + +// Round-trip frames through fflate's streaming `Deflate.flush(true)` + `Inflate`, the same +// shared-window scheme our pako codec uses. Returns true only if every frame survives unchanged. +function fflateRoundTrips(frames: Uint8Array[]): boolean { + try { + let captured: Uint8Array[] = []; + const deflate = new Deflate({ level: 6 }); + deflate.ondata = (chunk) => captured.push(chunk); + const slices = frames.map((frame) => { + captured = []; + deflate.push(frame, false); + deflate.flush(true); // sync flush: byte-align and retain the window + return concatBytes(captured); + }); + + let inflated: Uint8Array[] = []; + const inflate = new Inflate(); + inflate.ondata = (chunk) => inflated.push(chunk); + return slices.every((slice, i) => { + inflated = []; + inflate.push(slice, false); + const got = concatBytes(inflated); + return got.length === frames[i].length && got.every((b, j) => b === frames[i][j]); + }); + } catch { + return false; + } +} + +// Reconstruct every value a compressed consumer yields, in order. +async function drainCompressed(track: Track): Promise { + const out: Value[] = []; + for await (const value of new Consumer(track, { compression: true })) out.push(value); + return out; +} + +// The raw (stored) bytes of a track's first frame, without reconstructing JSON. 
+async function firstFrame(track: Track): Promise { + const group = await track.nextGroupOrdered(); + if (!group) throw new Error("expected a group"); + const frame = await group.readFrame(); + if (!frame) throw new Error("expected a frame"); + return frame; +} + +test("codec round-trips a group of frames in order", async () => { + const frames = ["the quick brown fox", "the quick brown dog", "the lazy fox"]; + const encoder = new Encoder(); + const slices = frames.map((f) => encoder.frame(enc.encode(f))); + + const decoder = new Decoder(); + expect(slices.map((s) => dec.decode(decoder.frame(s)))).toEqual(frames); +}); + +test("codec round-trips an empty frame", async () => { + const encoder = new Encoder(); + const decoder = new Decoder(); + expect(encoder.frame(new Uint8Array()).length).toBe(0); + expect(decoder.frame(new Uint8Array()).length).toBe(0); +}); + +test("codec rejects garbage", async () => { + const decoder = new Decoder(); + expect(() => decoder.frame(new Uint8Array(64).fill(0xff))).toThrow(); +}); + +test("codec rejects frames that inflate past the cap", async () => { + // A tiny slice can inflate enormously, so the decoder bounds the output as it is produced. + const encoder = new Encoder(); + const decoder = new Decoder(); + const slice = encoder.frame(enc.encode("a".repeat(64 * 1024 * 1024 + 1))); + expect(() => decoder.frame(slice)).toThrow(/exceeded/); +}); + +test("a frame larger than pako's chunk size round-trips", () => { + // High-entropy data barely compresses, so the slice spans multiple pako chunks (>16 KB), which + // exercises the encoder's multi-chunk assembly and the decoder's multi-chunk concat. 
+ let state = 0x9e3779b9 >>> 0; + const payload = new Uint8Array(64 * 1024); + for (let i = 0; i < payload.length; i++) { + state ^= state << 13; + state ^= state >>> 17; + state ^= state << 5; + state >>>= 0; + payload[i] = state & 0xff; + } + + const slice = new Encoder().frame(payload); + expect(slice.length).toBeGreaterThan(16 * 1024); // pako's default chunkSize + expect(new Decoder().frame(slice)).toEqual(payload); +}); + +test("cross-frame context shrinks a repeated frame", async () => { + // A later frame identical to an earlier one compresses far smaller once the window holds it. + const encoder = new Encoder(); + const payload = enc.encode("Media over QUIC delivers real-time latency at massive scale.".repeat(6)); + const first = encoder.frame(payload); + const second = encoder.frame(payload); + expect(second.length).toBeLessThan(first.length); +}); + +test("compressed snapshot per group round-trips", async () => { + const track = new Track("test"); + const producer = new Producer(track, { deltaRatio: 0, compression: true }); + producer.update({ a: 1 }); + producer.update({ a: 2 }); + producer.finish(); + + // Deltas off: one compressed snapshot per group, reconstructed in order. + expect(await drainCompressed(track)).toEqual([{ a: 1 }, { a: 2 }]); +}); + +test("compressed live consumer sees each update in order", async () => { + // A live consumer reconstructs each update in order from the shared per-group stream. 
+ const track = new Track("test"); + const producer = new Producer(track, { deltaRatio: 100, compression: true }); + const consumer = new Consumer(track, { compression: true }); + + for (let n = 1; n <= 5; n++) { + producer.update({ a: n }); + expect(await consumer.next()).toEqual({ a: n }); + } +}); + +test("compressed deltas share one group and reconstruct", async () => { + const track = new Track("test"); + const producer = new Producer(track, { deltaRatio: 100, compression: true }); + producer.update({ a: 1, b: 1 }); + producer.update({ a: 1, b: 2 }); + producer.update({ a: 5, b: 2 }); + producer.finish(); + + expect((await drainCompressed(track)).at(-1)).toEqual({ a: 5, b: 2 }); +}); + +test("compressed late joiner reconstructs from snapshot + deltas", async () => { + const track = new Track("test"); + const producer = new Producer(track, { deltaRatio: 100, compression: true }); + producer.update({ a: 1, b: 1 }); + producer.update({ a: 1, b: 2 }); + producer.update({ a: 5, b: 2 }); + producer.finish(); + + // A consumer created only now still rebuilds the final value from the snapshot + deltas. + expect((await drainCompressed(track)).at(-1)).toEqual({ a: 5, b: 2 }); +}); + +test("a group's snapshot decodes from a fresh decoder", async () => { + // Frame 0 opens a cold window, so a brand-new decoder reconstructs it, which is what lets a late + // joiner (or the Rust consumer) start mid-stream at any group boundary. + const track = new Track("test"); + const producer = new Producer(track, { deltaRatio: 0, compression: true }); + producer.update({ hello: "world" }); + producer.finish(); + + const decoder = new Decoder(); + expect(JSON.parse(dec.decode(decoder.frame(await firstFrame(track))))).toEqual({ hello: "world" }); +}); + +test("compressed deltas reuse the window", async () => { + // The shared per-group window is the point: a delta restating snapshot content shrinks sharply. 
+ const track = new Track("test"); + const producer = new Producer(track, { deltaRatio: 100, compression: true }); + const phrase = "Media over QUIC delivers real-time latency at massive scale"; + producer.update({ note: phrase }); + producer.update({ note: phrase, echo: phrase }); + producer.finish(); + + const group = await track.nextGroupOrdered(); + if (!group) throw new Error("expected a group"); + await group.readFrame(); // snapshot + const delta = await group.readFrame(); + if (!delta) throw new Error("expected a delta"); + + const rawDelta = enc.encode(JSON.stringify({ echo: phrase })); + expect(delta.length).toBeLessThan(rawDelta.length / 2); +}); + +test("pako round-trips a group that fflate's flush corrupts", async () => { + // A catalog snapshot + 3 deltas that fflate's streaming flush mis-encodes: even fflate's own + // Inflate can't round-trip its output here. This pins why @moq/json depends on pako, not the + // smaller fflate. If this ever fails (fflateRoundTrips returns true), fflate may have fixed its + // sync-flush encoder, and dropping the pako dependency could be reconsidered. + const group: Value[] = [ + { + video: { + renditions: { + v0: { codec: "avc1.64001f", bitrate: 6000000 }, + v1: { codec: "avc1.640015", bitrate: 3000000 }, + }, + }, + audio: { renditions: { a0: { codec: "opus", bitrate: 128000 } } }, + }, + { video: { renditions: { v0: { bitrate: 6200000 } } } }, + { video: { renditions: { v0: { bitrate: 5800000 } } } }, + { audio: { renditions: { a0: { bitrate: 96000 } } } }, + ]; + const frames = group.map((value) => enc.encode(JSON.stringify(value))); + + // Our pako codec round-trips every frame of the group exactly. 
+ const encoder = new Encoder(); + const decoder = new Decoder(); + for (const frame of frames) { + expect(decoder.frame(encoder.frame(frame))).toEqual(frame); + } + + // Positive control: fflate's flush works on simpler frames, so the helper is sound and fflate is + // only selectively broken, not failing for some unrelated reason. + expect(fflateRoundTrips(["the quick brown fox", "the quick brown dog"].map((s) => enc.encode(s)))).toBe(true); + + // fflate's streaming flush does not round-trip the same group our pako codec handles. + expect(fflateRoundTrips(frames)).toBe(false); +}); + +test("compression shrinks a repetitive frame", async () => { + const value = { renditions: Array(3).fill("video".repeat(50)) }; + + const plain = new Track("plain"); + new Producer(plain, { deltaRatio: 0 }).update(value); + const compressed = new Track("compressed"); + new Producer(compressed, { deltaRatio: 0, compression: true }).update(value); + + const plainLen = (await firstFrame(plain)).length; + const compressedLen = (await firstFrame(compressed)).length; + expect(compressedLen).toBeLessThan(plainLen); +}); diff --git a/js/json/src/compression.ts b/js/json/src/compression.ts new file mode 100644 index 000000000..db874ff3e --- /dev/null +++ b/js/json/src/compression.ts @@ -0,0 +1,138 @@ +/** + * Group-scoped DEFLATE compression for the JSON frame stream, using + * {@link https://github.com/nodeca/pako | pako}'s streaming deflate/inflate. + * + * Within a group the frame payloads form a single raw DEFLATE + * ([RFC 1951](https://www.rfc-editor.org/rfc/rfc1951.html)) stream, sync-flushed at each frame + * boundary so every frame is self-delimited while later frames reuse the earlier ones as context + * (a snapshot followed by deltas compresses far better than each frame alone). This matches the + * Rust `moq-json` producer, so the two interoperate on the wire. + * + * A sync flush always ends in the fixed 4-byte marker `00 00 ff ff`. 
{@link Encoder.frame} drops + * it and {@link Decoder.frame} re-appends it, saving 4 bytes per frame, the same trick + * [RFC 7692](https://www.rfc-editor.org/rfc/rfc7692.html#section-7.2.1) (permessage-deflate) uses. + * moq-net frames each slice, so there's no length prefix; {@link Decoder.frame} instead caps the + * inflated output as it is produced. + * + * pako is synchronous, so the whole codec is synchronous; it is a normal dependency. + * + * @module + */ + +import * as pako from "pako"; + +// Maximum decompressed size of a single frame. A malicious publisher could otherwise send a tiny +// slice that inflates hugely, so {@link Decoder} stops retaining output past this and rejects the +// frame. Mirrors the Rust `MAX_DECOMPRESSED_FRAME`. +const MAX_DECOMPRESSED_FRAME = 64 * 1024 * 1024; + +// The trailing bytes of a DEFLATE sync flush, stripped on the wire and re-appended to decode. +const SYNC_FLUSH_TAIL = new Uint8Array([0x00, 0x00, 0xff, 0xff]); + +// Concatenate chunks into one tight buffer (a single chunk passes through untouched). Safe only for +// output that is consumed before the next push, since a single chunk is pako's own reused buffer. +function concat(chunks: Uint8Array[], total: number): Uint8Array { + if (chunks.length === 1) return chunks[0]; + const out = new Uint8Array(total); + let offset = 0; + for (const chunk of chunks) { + out.set(chunk, offset); + offset += chunk.length; + } + return out; +} + +/** + * Encodes a group's frame payloads into one shared DEFLATE stream, one self-delimited slice per + * frame. Hold one per group; create a new one at each group boundary. + * + * @public + */ +export class Encoder { + #deflate = new pako.Deflate({ raw: true }); + #chunks: Uint8Array[] = []; + #total = 0; + + /** Start a fresh per-group encoder with a cold window. 
*/ + constructor() { + this.#deflate.onData = (chunk) => { + const bytes = chunk as Uint8Array; + this.#chunks.push(bytes); + this.#total += bytes.length; + }; + } + + /** + * Compress the next frame's `payload`, returning its slice of the group stream: the DEFLATE bytes + * minus the fixed sync-flush marker. Empty in yields empty out. Slices must be produced in frame + * order. + */ + frame(payload: Uint8Array): Uint8Array { + if (payload.length === 0) return payload; + this.#chunks = []; + this.#total = 0; + this.#deflate.push(payload, pako.constants.Z_SYNC_FLUSH); + + // Copy into one tight owned buffer, dropping the trailing sync-flush marker. We can't return + // pako's chunk views: `writeFrame` retains the reference and pako backs each chunk with a + // ~16 KB buffer, so a view would pin far more memory than the frame. + const out = new Uint8Array(this.#total - SYNC_FLUSH_TAIL.length); + let offset = 0; + for (const chunk of this.#chunks) { + if (offset >= out.length) break; + const take = Math.min(chunk.length, out.length - offset); + out.set(chunk.subarray(0, take), offset); + offset += take; + } + return out; + } +} + +/** + * Decodes a group's frame slices back into the original payloads. Hold one per group; feed slices + * in frame order (each frame builds on the earlier ones). + * + * @public + */ +export class Decoder { + #inflate = new pako.Inflate({ raw: true }); + #chunks: Uint8Array[] = []; + #total = 0; + #tooLarge = false; + + /** Start a fresh per-group decoder with a cold window. */ + constructor() { + this.#inflate.onData = (chunk) => { + const bytes = chunk as Uint8Array; + this.#total += bytes.length; + // Bound the inflated output as it is produced; a tiny slice can expand enormously. Stop + // retaining past the cap, then reject once the push returns. 
+ if (this.#total > MAX_DECOMPRESSED_FRAME) { + this.#tooLarge = true; + return; + } + this.#chunks.push(bytes); + }; + } + + /** + * Decompress the next frame's `slice` back into its payload. Empty in yields empty out. Throws if + * the input is malformed or inflates past the per-frame size limit. + */ + frame(slice: Uint8Array): Uint8Array { + if (slice.length === 0) return slice; + + this.#chunks = []; + this.#total = 0; + this.#tooLarge = false; + + // Feed the slice then the re-appended sync-flush marker as two pushes, so no combined buffer is + // allocated. The marker delimits the frame and flushes its last bytes out of the inflate buffer. + this.#inflate.push(slice, false); + this.#inflate.push(SYNC_FLUSH_TAIL, pako.constants.Z_SYNC_FLUSH); + if (this.#inflate.err) throw new Error(`decompression failed: ${this.#inflate.msg}`); + if (this.#tooLarge) throw new Error(`decompressed frame exceeded ${MAX_DECOMPRESSED_FRAME} bytes`); + + return concat(this.#chunks, this.#total); + } +} diff --git a/js/json/src/consumer.ts b/js/json/src/consumer.ts index 5db5a4dd5..3d2e36b3e 100644 --- a/js/json/src/consumer.ts +++ b/js/json/src/consumer.ts @@ -1,5 +1,6 @@ import type * as Moq from "@moq/net"; import type * as z from "zod/mini"; +import { Decoder } from "./compression.ts"; import { merge } from "./diff.ts"; import type { Config } from "./producer.ts"; @@ -12,14 +13,19 @@ import type { Config } from "./producer.ts"; export class Consumer { #track: Moq.Track; #schema?: z.ZodMiniType; + // Whether frames are `deflate-raw` compressed. Must match the producer's {@link Config.compression}. + #decompress: boolean; #group?: Moq.Group; + // Per-group DEFLATE decoder, built lazily on the first frame of a group and reset at each boundary. + #decoder?: Decoder; #current?: unknown; #framesRead = 0; constructor(track: Moq.Track, config: Config = {}) { this.#track = track; this.#schema = config.schema; + this.#decompress = config.compression ?? 
false; } /** Get the next reconstructed value, or `undefined` once the track ends. */ @@ -31,6 +37,8 @@ export class Consumer { if (!this.#group) return undefined; this.#current = undefined; this.#framesRead = 0; + // Each group is its own compressed stream, so start a fresh decoder. + this.#decoder = undefined; } const frame = await this.#group.readFrame(); @@ -52,9 +60,15 @@ export class Consumer { } } - // Frame 0 of a group is a snapshot, the rest are merge patches. + // Frame 0 of a group is a snapshot, the rest are merge patches. When compressed, frames share one + // per-group DEFLATE stream, so they decode in order through a decoder built on the group's first frame. #apply(frame: Uint8Array): T { - const parsed = JSON.parse(new TextDecoder().decode(frame)); + let payload = frame; + if (this.#decompress) { + this.#decoder ??= new Decoder(); + payload = this.#decoder.frame(frame); + } + const parsed = JSON.parse(new TextDecoder().decode(payload)); if (this.#framesRead === 0) { this.#current = parsed; } else { diff --git a/js/json/src/producer.ts b/js/json/src/producer.ts index eecbe7846..d45625348 100644 --- a/js/json/src/producer.ts +++ b/js/json/src/producer.ts @@ -2,6 +2,7 @@ import * as Moq from "@moq/net"; import type { Effect } from "@moq/signals"; import type * as z from "zod/mini"; +import { Encoder } from "./compression.ts"; import { deepEqual, diff } from "./diff.ts"; // Maximum frames (snapshot + deltas) in a single group before a new snapshot is forced. Kept @@ -30,6 +31,12 @@ export interface Config { // Starting value for {@link Producer.mutate} before anything has been published. Required to // mutate a producer that hasn't published yet (e.g. a fresh catalog); ignored once a value exists. initial?: T; + + // Compress each group as one sync-flushed `deflate-raw` (RFC 1951) stream, so deltas reuse the + // snapshot as context and shrink sharply. Interoperable with the Rust `moq-json` producer. 
+ // `false`/unset (the default) writes plaintext JSON frames. A {@link Consumer} reading the track + // must set the same flag. + compression?: boolean; } /** @@ -49,10 +56,18 @@ export class Producer { #track?: Moq.Track; #group?: Moq.Group; #last?: unknown; - // Bytes of deltas accumulated in the current group, excluding the snapshot frame. + // Bytes of deltas accumulated in the current group, excluding the snapshot frame. Always raw + // (uncompressed) sizes, even when compressing: the delta-vs-snapshot decision measures raw bytes, + // so a compressed producer rolls groups on raw sizes (still valid on the wire, just a touch sooner + // than the Rust producer, which measures compressed sizes). #deltaBytes = 0; #groupFrames = 0; + // Group-scoped `deflate-raw` compression. `#encoder` is the current group's stream, swapped for a + // fresh one (cold window) at each snapshot, so a snapshot and its deltas share one DEFLATE stream. + #compress = false; + #encoder?: Encoder; + // Fan-out mode: retains the value and serves a child (leaf) Producer per subscriber. #outputs?: Set>; #value?: T; @@ -70,6 +85,7 @@ export class Producer { this.#outputs = new Set(); this.#value = this.#config.initial; } + this.#compress = this.#config.compression ?? false; } /** The current value, or `undefined` if nothing has been published yet. */ @@ -110,7 +126,7 @@ export class Producer { const snapshot = new TextEncoder().encode(text); const delta = this.#delta(json, snapshot.length); if (delta && this.#group) { - this.#group.writeFrame(delta); + this.#writeDelta(this.#group, delta); this.#deltaBytes += delta.length; this.#groupFrames += 1; } else { @@ -211,7 +227,7 @@ export class Producer { this.#group?.close(); const group = track.appendGroup(); - group.writeFrame(snapshot); + this.#writeSnapshot(group, snapshot); this.#deltaBytes = 0; this.#groupFrames = 1; @@ -224,4 +240,25 @@ export class Producer { this.#group = undefined; } } + + // Write a group's snapshot (frame 0). 
On the compressed path this opens a fresh per-group encoder + // (cold window), so the snapshot and its deltas share one DEFLATE stream. + #writeSnapshot(group: Moq.Group, frame: Uint8Array): void { + if (!this.#compress) { + group.writeFrame(frame); + return; + } + this.#encoder = new Encoder(); + group.writeFrame(this.#encoder.frame(frame)); + } + + // Write a delta frame, compressed against the current group's encoder when compressing. + #writeDelta(group: Moq.Group, frame: Uint8Array): void { + if (!this.#compress) { + group.writeFrame(frame); + return; + } + if (!this.#encoder) throw new Error("compressed delta requires an open group"); + group.writeFrame(this.#encoder.frame(frame)); + } } diff --git a/rs/moq-boy/src/input.rs b/rs/moq-boy/src/input.rs index 8c253a9ea..1ea5067c7 100644 --- a/rs/moq-boy/src/input.rs +++ b/rs/moq-boy/src/input.rs @@ -105,7 +105,7 @@ async fn handle_viewer_commands( }; let track = broadcast.subscribe_track(&command_track)?; - let mut commands = moq_json::Consumer::::new(track); + let mut commands = moq_json::Consumer::::new(track, moq_json::ConsumerConfig::default()); loop { let command = match commands.next().await { diff --git a/rs/moq-boy/src/status.rs b/rs/moq-boy/src/status.rs index 3329ba4bc..774bce69e 100644 --- a/rs/moq-boy/src/status.rs +++ b/rs/moq-boy/src/status.rs @@ -47,7 +47,7 @@ impl StatusPublisher { let producer = broadcast.create_track(track)?; Ok(Self { - producer: moq_json::Producer::new(producer, moq_json::Config::default()), + producer: moq_json::Producer::new(producer, moq_json::ProducerConfig::default()), }) } diff --git a/rs/moq-json/Cargo.toml b/rs/moq-json/Cargo.toml index d8ca7398d..8cbeac345 100644 --- a/rs/moq-json/Cargo.toml +++ b/rs/moq-json/Cargo.toml @@ -17,6 +17,7 @@ doctest = false [dependencies] bytes = "1" +flate2 = { workspace = true } json-patch = "4" kio = { workspace = true } moq-net = { workspace = true } diff --git a/rs/moq-json/src/compression.rs b/rs/moq-json/src/compression.rs new 
file mode 100644 index 000000000..5bc6a9303 --- /dev/null +++ b/rs/moq-json/src/compression.rs @@ -0,0 +1,224 @@ +//! Group-scoped DEFLATE compression for the JSON frame stream. +//! +//! Within a group the frame payloads form a single raw DEFLATE ([RFC 1951]) stream, sync-flushed +//! at each frame boundary so every frame carries its own self-delimited slice while later frames +//! reuse the earlier ones as context (a snapshot followed by deltas compresses far better than +//! each frame alone). The [`Encoder`]/[`Decoder`] hold that per-group state; both are recreated at +//! every group boundary. +//! +//! This is plain raw DEFLATE with a `Z_SYNC_FLUSH` after each frame, so a browser (`@moq/json`) +//! peer interoperates on the wire using the same primitive (zlib's sync flush). moq-net already +//! frames each slice, so there's no length prefix. A small slice can still inflate to far more than +//! its own size, so [`Decoder::frame`] bounds each frame's output at [`MAX_DECOMPRESSED_FRAME`]. +//! +//! A sync flush always ends in the 4-byte empty-block marker `00 00 ff ff`. That marker is fixed, +//! so [`Encoder::frame`] drops it from each slice and [`Decoder::frame`] re-appends it before +//! inflating, saving 4 bytes per frame. This is the same trick [RFC 7692] (permessage-deflate) +//! uses for WebSocket messages. +//! +//! [RFC 1951]: https://www.rfc-editor.org/rfc/rfc1951.html +//! [RFC 7692]: https://www.rfc-editor.org/rfc/rfc7692.html#section-7.2.1 + +use bytes::Bytes; +use flate2::{Compress, Decompress, FlushCompress, FlushDecompress, Status}; + +use crate::{Error, Result}; + +/// DEFLATE level for the frame stream: zlib's own default, a good size/speed balance for the small, +/// repetitive payloads this targets. +const LEVEL: u32 = 6; + +/// The trailing bytes of a DEFLATE sync flush, stripped on the wire and re-appended to decode. +const SYNC_FLUSH_TAIL: [u8; 4] = [0x00, 0x00, 0xff, 0xff]; + +/// Maximum decompressed size of a single frame. 
+/// +/// A malicious publisher could otherwise send a tiny slice that inflates hugely, so +/// [`Decoder::frame`] stops and returns [`Error::TooLarge`] rather than allocating without limit. +const MAX_DECOMPRESSED_FRAME: u64 = 64 * 1024 * 1024; + +/// Scratch buffer size for the streaming (de)compress loops. +const CHUNK: usize = 8 * 1024; + +/// Encodes a group's frame payloads into one shared DEFLATE stream, one self-delimited slice per +/// frame. Hold one per group; the stream is recreated at each group boundary. +pub(crate) struct Encoder(Compress); + +impl Encoder { + /// Start a fresh per-group encoder with a cold window. + pub(crate) fn new() -> Self { + // `false`: raw DEFLATE, no zlib header/trailer, matching `deflate-raw` on the browser side. + Self(Compress::new(flate2::Compression::new(LEVEL), false)) + } + + /// Compress the next frame's `payload`, returning its slice of the group stream: the DEFLATE + /// bytes minus the fixed sync-flush marker. Empty in yields empty out. Later frames reuse earlier + /// ones as context, so slices must be produced (and later decoded) in frame order. + pub(crate) fn frame(&mut self, payload: &[u8]) -> Bytes { + if payload.is_empty() { + return Bytes::new(); + } + + let mut out = Vec::with_capacity(payload.len() / 2 + 16); + let mut tmp = [0u8; CHUNK]; + let mut input = payload; + + // Drive the stream with a sync flush so this frame's slice is self-delimited (byte-aligned, + // window retained). The classic zlib loop: keep going while the output buffer fills up. 
+ loop { + let before_in = self.0.total_in(); + let before_out = self.0.total_out(); + self.0.compress(input, &mut tmp, FlushCompress::Sync).expect("deflate"); + let consumed = (self.0.total_in() - before_in) as usize; + let produced = (self.0.total_out() - before_out) as usize; + out.extend_from_slice(&tmp[..produced]); + input = &input[consumed..]; + if produced < tmp.len() { + break; + } + } + + // Drop the fixed sync-flush marker; the decoder re-appends it (see the module docs). + debug_assert!( + out.ends_with(&SYNC_FLUSH_TAIL), + "a sync flush must end in the deflate marker" + ); + out.truncate(out.len() - SYNC_FLUSH_TAIL.len()); + Bytes::from(out) + } +} + +/// Decodes a group's frame slices back into the original payloads. Hold one per group; feed slices +/// in frame order (each frame builds on the earlier ones). +pub(crate) struct Decoder(Decompress); + +impl Decoder { + /// Start a fresh per-group decoder with a cold window. + pub(crate) fn new() -> Self { + // `false`: raw DEFLATE, matching the encoder. + Self(Decompress::new(false)) + } + + /// Decompress the next frame's `slice` back into its payload. + /// + /// An empty slice yields an empty payload. Returns [`Error::TooLarge`] if the frame inflates past + /// the per-frame bound (checked as output is produced, not from any declared size), and + /// [`Error::Decompress`] on malformed input. + pub(crate) fn frame(&mut self, slice: &[u8]) -> Result { + if slice.is_empty() { + return Ok(Bytes::new()); + } + + let mut out = Vec::new(); + let mut tmp = [0u8; CHUNK]; + + // Feed the wire slice followed by the re-appended sync-flush marker, which delimits the frame + // and flushes its last bytes out of the inflate buffer. 
+ for segment in [slice, &SYNC_FLUSH_TAIL] { + let mut input = segment; + loop { + let before_in = self.0.total_in(); + let before_out = self.0.total_out(); + let status = self + .0 + .decompress(input, &mut tmp, FlushDecompress::Sync) + .map_err(|_| Error::Decompress)?; + let consumed = (self.0.total_in() - before_in) as usize; + let produced = (self.0.total_out() - before_out) as usize; + // Bound the inflated output as it is produced; a tiny slice can expand enormously. + if out.len() as u64 + produced as u64 > MAX_DECOMPRESSED_FRAME { + return Err(Error::TooLarge(MAX_DECOMPRESSED_FRAME)); + } + out.extend_from_slice(&tmp[..produced]); + input = &input[consumed..]; + + // Move to the next segment once this one is drained and the buffer wasn't saturated. The + // no-progress guard avoids spinning when the marker needs no further output. + if matches!(status, Status::StreamEnd) || (input.is_empty() && produced < tmp.len()) { + break; + } + if consumed == 0 && produced == 0 { + break; + } + } + } + + Ok(Bytes::from(out)) + } +} + +#[cfg(test)] +mod test { + use super::*; + + /// Round-trip a sequence of frames through a group encoder/decoder pair. 
+ fn roundtrip(frames: &[&[u8]]) -> Vec> { + let mut enc = Encoder::new(); + let slices: Vec = frames.iter().map(|f| enc.frame(f)).collect(); + + let mut dec = Decoder::new(); + slices.iter().map(|s| dec.frame(s).unwrap().to_vec()).collect() + } + + #[test] + fn group_roundtrip() { + let frames: &[&[u8]] = &[b"the quick brown fox", b"the quick brown dog", b"the lazy fox"]; + let got = roundtrip(frames); + for (a, b) in frames.iter().zip(&got) { + assert_eq!(*a, b.as_slice()); + } + } + + #[test] + fn empty_frames_roundtrip() { + assert!(Encoder::new().frame(b"").is_empty()); + assert!(Decoder::new().frame(b"").unwrap().is_empty()); + } + + #[test] + fn cross_frame_context_shrinks() { + // A later frame identical to an earlier one compresses to far fewer bytes once the window + // holds the earlier copy: this is the whole point of a shared per-group stream. + let payload = b"Media over QUIC delivers real-time latency at massive scale.".repeat(6); + let mut enc = Encoder::new(); + let first = enc.frame(&payload); + let second = enc.frame(&payload); + assert!( + second.len() < first.len(), + "repeat frame {} should be smaller than first {}", + second.len(), + first.len() + ); + } + + #[test] + fn frame_larger_than_chunk_roundtrips() { + // High-entropy data barely compresses, so its slice exceeds the streaming `CHUNK` scratch + // buffer and the (de)compress loops must iterate. Verify it still round-trips byte for byte. 
+ let mut state: u64 = 0x9E37_79B9_7F4A_7C15; + let payload: Vec = (0..64 * 1024) + .map(|_| { + state ^= state << 13; + state ^= state >> 7; + state ^= state << 17; + (state >> 56) as u8 + }) + .collect(); + + let mut enc = Encoder::new(); + let slice = enc.frame(&payload); + assert!(slice.len() > CHUNK, "slice {} should exceed CHUNK {CHUNK}", slice.len()); + + let mut dec = Decoder::new(); + assert_eq!(dec.frame(&slice).unwrap(), Bytes::from(payload)); + } + + #[test] + fn decompress_rejects_garbage() { + let mut dec = Decoder::new(); + assert!(matches!( + dec.frame(b"not a deflate stream at all"), + Err(Error::Decompress) + )); + } +} diff --git a/rs/moq-json/src/lib.rs b/rs/moq-json/src/lib.rs index d26b8ad5c..5a1f2dd2b 100644 --- a/rs/moq-json/src/lib.rs +++ b/rs/moq-json/src/lib.rs @@ -6,9 +6,10 @@ //! order. A consumer jumps to the newest group, reads the snapshot, and applies the deltas, so //! a late joiner never needs older groups. //! -//! Deltas are controlled by [`Config::delta_ratio`]. A ratio of `0` disables them, so every +//! Deltas are controlled by [`ProducerConfig::delta_ratio`]. A ratio of `0` disables them, so every //! change is a fresh snapshot group, matching a plain "one JSON blob per group" track. +mod compression; mod diff; use std::marker::PhantomData; @@ -16,10 +17,12 @@ use std::ops::{Deref, DerefMut}; use std::sync::{Arc, Mutex, MutexGuard}; use std::task::Poll; +use bytes::Bytes; use serde::Serialize; use serde::de::DeserializeOwned; use serde_json::Value; +use crate::compression::{Decoder, Encoder}; use crate::diff::diff; /// Maximum frames (snapshot + deltas) in a single group before a new snapshot is forced. @@ -41,6 +44,14 @@ pub enum Error { /// Stored as a string since [`serde_json::Error`] is not [`Clone`]. #[error("json: {0}")] Json(String), + + /// A compressed frame could not be decoded (malformed or truncated stream). 
+ #[error("decompression failed")] + Decompress, + + /// A frame's decompressed size exceeded the limit (zip-bomb guard). + #[error("decompressed frame exceeded {0} bytes")] + TooLarge(u64), } impl From for Error { @@ -53,27 +64,56 @@ impl From for Error { pub type Result = std::result::Result; /// Configuration for a [`Producer`]. +/// +/// Build from [`Default`] and override fields (the struct is `#[non_exhaustive]`, so new +/// options stay additive): `let mut config = ProducerConfig::default(); config.delta_ratio = 0;`. #[derive(Debug, Clone)] -pub struct Config { +#[non_exhaustive] +pub struct ProducerConfig { /// Controls how aggressively the producer emits deltas (merge patches) instead of full snapshots. /// /// A ratio of `0` disables deltas: every change is published as a new snapshot group. /// /// A positive ratio enables deltas. A delta is appended to the current group as long as the /// accumulated deltas (excluding the snapshot frame) stay within `ratio` times the size of a - /// fresh snapshot; otherwise a new snapshot group is started. So `1` allows deltas totalling up + /// snapshot; otherwise a new snapshot group is started. So `1` allows deltas totalling up /// to one snapshot before rolling, and a larger ratio tolerates more deltas per snapshot. /// + /// When [`compression`](Self::compression) is on, both sides of the comparison are measured on + /// the *compressed* frame sizes (the real wire cost). + /// /// Defaults to `8`. pub delta_ratio: u32, + + /// Compress each group as one sync-flushed DEFLATE stream, so deltas reuse the snapshot as + /// context and shrink sharply. + /// + /// `false` (the default) writes plaintext JSON frames, identical on the wire to an uncompressed + /// track. A [`Consumer`] reading the track must set [`ConsumerConfig::compression`] to match. 
+ pub compression: bool, } -impl Default for Config { +impl Default for ProducerConfig { fn default() -> Self { - Self { delta_ratio: 8 } + Self { + delta_ratio: 8, + compression: false, + } } } +/// Configuration for a [`Consumer`]. +/// +/// Build from [`Default`] and override fields (the struct is `#[non_exhaustive]`, so new options +/// stay additive). +#[derive(Debug, Clone, Default)] +#[non_exhaustive] +pub struct ConsumerConfig { + /// Whether the track's frames are DEFLATE-compressed. Must match the producer's + /// [`ProducerConfig::compression`]. Defaults to `false`. + pub compression: bool, +} + /// Publishes a JSON value over a track, choosing snapshots and deltas automatically. /// /// Cheaply clonable: clones share one underlying track and publishing state, like other MoQ @@ -101,13 +141,15 @@ impl Producer { impl Producer { /// Create a producer that publishes to the given track. - pub fn new(track: moq_net::TrackProducer, config: Config) -> Self { + pub fn new(track: moq_net::TrackProducer, config: ProducerConfig) -> Self { Self { inner: Arc::new(Mutex::new(Inner { track, group: None, + encoder: None, last: None, delta_bytes: 0, + snapshot_len: 0, group_frames: 0, config, })), @@ -207,11 +249,17 @@ impl Drop for Guard<'_, T> { struct Inner { track: moq_net::TrackProducer, group: Option, + // Per-group DEFLATE encoder, `Some` while a compressed group is open (recreated per group). + encoder: Option, last: Option, - // Bytes of deltas accumulated in the current group, excluding the snapshot frame. + // Bytes of deltas accumulated in the current group, excluding the snapshot frame. Compressed + // slice sizes when compressing, raw patch sizes otherwise. delta_bytes: u64, + // Reference size the delta budget is measured against: the current group's snapshot frame. + // Its compressed slice size when compressing, raw otherwise. 
+ snapshot_len: u64, group_frames: usize, - config: Config, + config: ProducerConfig, } impl Inner { @@ -221,10 +269,10 @@ impl Inner { } match self.delta(&json, snapshot.len())? { - Some(delta) => { + Some(slice) => { let group = self.group.as_mut().expect("delta requires an open group"); - let len = delta.len() as u64; - group.write_frame(delta)?; + let len = slice.len() as u64; + group.write_frame(slice)?; self.delta_bytes += len; self.group_frames += 1; } @@ -235,34 +283,51 @@ impl Inner { Ok(()) } - /// Serialize a delta if deltas are enabled and appending one keeps the group within budget; - /// otherwise `None`, signalling that a fresh snapshot should be published instead. - fn delta(&self, value: &Value, snapshot_len: usize) -> Result>> { + /// Serialize (and, when compressing, compress) a delta if deltas are enabled and appending one + /// keeps the group within budget; otherwise `None`, signalling that a fresh snapshot should be + /// published instead. Returns the frame slice ready to write. + fn delta(&mut self, value: &Value, snapshot_len: usize) -> Result> { let ratio = self.config.delta_ratio; if ratio == 0 { return Ok(None); } - let Some(last) = &self.last else { - return Ok(None); - }; if self.group.is_none() || self.group_frames >= MAX_DELTA_FRAMES { return Ok(None); } - let diff = diff(last, value); - if diff.forced_snapshot { - return Ok(None); - } - - let delta = serde_json::to_vec(&diff.patch)?; + let patch = { + let Some(last) = &self.last else { + return Ok(None); + }; + let diff = diff(last, value); + if diff.forced_snapshot { + return Ok(None); + } + serde_json::to_vec(&diff.patch)? + }; - // Roll a snapshot once the deltas would outgrow the budget (snapshot frame excluded). 
- let projected = self.delta_bytes + delta.len() as u64; - if projected > ratio as u64 * snapshot_len as u64 { - return Ok(None); + match self.encoder.as_mut() { + // Compressed: measure the delta's *compressed* slice size (the real wire cost) against the + // group's anchoring snapshot, also compressed. Encoding advances the per-group window; if + // the delta doesn't fit we roll a new group with a fresh encoder, discarding this slice + // (the abandoned window has no effect on the new group). + Some(encoder) => { + let slice = encoder.frame(&patch); + let projected = self.delta_bytes + slice.len() as u64; + if projected > ratio as u64 * self.snapshot_len { + return Ok(None); + } + Ok(Some(slice)) + } + // Uncompressed: raw delta bytes against a fresh snapshot of the current value. + None => { + let projected = self.delta_bytes + patch.len() as u64; + if projected > ratio as u64 * snapshot_len as u64 { + return Ok(None); + } + Ok(Some(Bytes::from(patch))) + } } - - Ok(Some(delta)) } /// Start a new group with a full snapshot as its first frame. @@ -273,15 +338,28 @@ impl Inner { } let mut group = self.track.append_group()?; - group.write_frame(snapshot)?; + + // Open a fresh per-group encoder (cold window) and compress the snapshot as frame 0, recording + // its wire size as the delta anchor. + let (slice, encoder) = if self.config.compression { + let mut encoder = Encoder::new(); + let slice = encoder.frame(&snapshot); + (slice, Some(encoder)) + } else { + (Bytes::from(snapshot), None) + }; + self.snapshot_len = slice.len() as u64; + group.write_frame(slice)?; self.delta_bytes = 0; self.group_frames = 1; + self.encoder = encoder; if self.config.delta_ratio != 0 { - // Keep the group open so future deltas can be appended. + // Keep the group (and its encoder) open so future deltas can be appended. self.group = Some(group); } else { // Deltas disabled: one frame per group, identical to a plain JSON track. 
+ self.encoder = None; group.finish()?; } @@ -301,6 +379,13 @@ impl Inner { pub struct Consumer { track: moq_net::TrackConsumer, group: Option, + // Whether frames are DEFLATE-compressed, matching the producer's [`ProducerConfig::compression`]. + compressed: bool, + // Per-group DEFLATE decoder, built lazily on the first compressed frame of a group. + decoder: Option, + // Compressed slices read so far in the current group, in order. Lets a cloned consumer rebuild + // the (non-cloneable) decoder window by replaying them. Empty when uncompressed. + group_slices: Vec, current: Option, frames_read: usize, _marker: PhantomData T>, @@ -313,6 +398,11 @@ impl Clone for Consumer { Self { track: self.track.clone(), group: self.group.clone(), + compressed: self.compressed, + // A DEFLATE decoder can't be cloned (per-group window state), so the clone starts without + // one and rebuilds it from `group_slices` on its next compressed read. + decoder: None, + group_slices: self.group_slices.clone(), current: self.current.clone(), frames_read: self.frames_read, _marker: PhantomData, @@ -322,10 +412,16 @@ impl Clone for Consumer { impl Consumer { /// Create a consumer reading from the given track subscriber. - pub fn new(track: moq_net::TrackConsumer) -> Self { + /// + /// Set [`ConsumerConfig::compression`] to read a track written by a producer with + /// [`ProducerConfig::compression`] on. + pub fn new(track: moq_net::TrackConsumer, config: ConsumerConfig) -> Self { Self { track, group: None, + compressed: config.compression, + decoder: None, + group_slices: Vec::new(), current: None, frames_read: 0, _marker: PhantomData, @@ -352,6 +448,9 @@ impl Consumer { self.group = Some(group); self.current = None; self.frames_read = 0; + // Each group is its own compressed stream, so reset the decoder state. 
+ self.decoder = None; + self.group_slices.clear(); } Poll::Ready(None) => break true, Poll::Pending => break false, @@ -374,8 +473,32 @@ impl Consumer { } } + /// Decompress a frame slice, or pass it through when the track is uncompressed. + /// + /// The per-group decoder is built lazily on the first compressed frame. A cloned consumer starts + /// without a decoder, so the first call replays the group's already-read slices to rebuild the + /// (non-cloneable) DEFLATE window before decoding the new frame. + fn decode(&mut self, slice: Bytes) -> Result { + if !self.compressed { + return Ok(slice); + } + + if self.decoder.is_none() { + let mut decoder = Decoder::new(); + for prev in &self.group_slices { + decoder.frame(prev)?; + } + self.decoder = Some(decoder); + } + + let plain = self.decoder.as_mut().unwrap().frame(&slice)?; + self.group_slices.push(slice); + Ok(plain) + } + /// Apply one frame: frame 0 of a group is a snapshot, the rest are merge patches. - fn apply(&mut self, frame: bytes::Bytes) -> Result { + fn apply(&mut self, frame: Bytes) -> Result { + let frame = self.decode(frame)?; if self.frames_read == 0 { self.current = Some(serde_json::from_slice(&frame)?); } else { @@ -398,15 +521,40 @@ mod test { use super::*; use serde_json::json; - fn producer(config: Config) -> (Producer, moq_net::TrackConsumer) { + /// An uncompressed config with the given delta ratio. + fn cfg(delta_ratio: u32) -> ProducerConfig { + ProducerConfig { + delta_ratio, + ..Default::default() + } + } + + /// A DEFLATE-compressed config with the given delta ratio. + fn cfg_deflate(delta_ratio: u32) -> ProducerConfig { + ProducerConfig { + delta_ratio, + compression: true, + } + } + + /// A consumer reading compressed frames. 
+ fn deflate_consumer(track: moq_net::TrackConsumer) -> Consumer { + Consumer::new(track, ConsumerConfig { compression: true }) + } + + fn producer(config: ProducerConfig) -> (Producer, moq_net::TrackConsumer) { let track = moq_net::Track::new("test").produce(); let consumer = track.consume(); (Producer::new(track, config), consumer) } - /// Drain every value currently available from a consumer without blocking. + /// Drain every value currently available from a plaintext consumer without blocking. fn drain(track: moq_net::TrackConsumer) -> Vec { - let mut consumer = Consumer::::new(track); + drain_with(Consumer::::new(track, ConsumerConfig::default())) + } + + /// Drain every value currently available from an already-built consumer without blocking. + fn drain_with(mut consumer: Consumer) -> Vec { let waiter = kio::Waiter::noop(); let mut out = Vec::new(); while let Poll::Ready(Ok(Some(value))) = consumer.poll_next(&waiter) { @@ -417,7 +565,7 @@ mod test { #[test] fn deltas_off_snapshot_per_group() { - let (mut producer, track) = producer(Config { delta_ratio: 0 }); + let (mut producer, track) = producer(cfg(0)); producer.update(&json!({ "a": 1 })).unwrap(); producer.update(&json!({ "a": 2 })).unwrap(); producer.finish().unwrap(); @@ -430,8 +578,8 @@ mod test { #[test] fn live_consumer_sees_each_update() { - let (mut producer, track) = producer(Config::default()); - let mut consumer = Consumer::::new(track); + let (mut producer, track) = producer(ProducerConfig::default()); + let mut consumer = Consumer::::new(track, ConsumerConfig::default()); let waiter = kio::Waiter::noop(); for n in 1..=3 { @@ -445,7 +593,7 @@ mod test { #[test] fn unchanged_value_writes_nothing() { - let (mut producer, track) = producer(Config::default()); + let (mut producer, track) = producer(ProducerConfig::default()); producer.update(&json!({ "a": 1 })).unwrap(); producer.update(&json!({ "a": 1 })).unwrap(); producer.finish().unwrap(); @@ -456,7 +604,7 @@ mod test { #[test] fn 
deltas_share_one_group() { - let config = Config { delta_ratio: 100 }; + let config = cfg(100); let (mut producer, track) = producer(config); producer.update(&json!({ "a": 1, "b": 1 })).unwrap(); producer.update(&json!({ "a": 1, "b": 2 })).unwrap(); @@ -474,7 +622,7 @@ mod test { // A ratio of 1 admits deltas only up to the snapshot size: with equal 7-byte frames that is a // single delta per group, so it rolls every other update. (Still distinct from 0, which would // disable deltas entirely and never produce the group-0 delta below.) - let config = Config { delta_ratio: 1 }; + let config = cfg(1); let (mut producer, track) = producer(config); producer.update(&json!({ "a": 1 })).unwrap(); // snapshot, group 0 producer.update(&json!({ "a": 2 })).unwrap(); // delta, group 0 @@ -491,7 +639,7 @@ mod test { // snapshot size. Single-digit values keep every frame at a constant 7 bytes (`{"n":N}`), so a // `ratio = 8` admits 8 deltas (8x the snapshot, the inclusive limit) on top of the snapshot // before the 9th delta rolls. - let config = Config { delta_ratio: 8 }; + let config = cfg(8); let (mut producer, track) = producer(config); for n in 0..=9 { producer.update(&json!({ "n": n })).unwrap(); @@ -505,7 +653,7 @@ mod test { #[test] fn array_change_is_delta() { - let config = Config { delta_ratio: 100 }; + let config = cfg(100); let (mut producer, track) = producer(config); producer.update(&json!({ "list": [1, 2] })).unwrap(); producer.update(&json!({ "list": [1, 2, 3] })).unwrap(); @@ -518,7 +666,7 @@ mod test { #[test] fn frame_cap_rolls_snapshot() { - let config = Config { delta_ratio: 1_000_000 }; + let config = cfg(1_000_000); let (mut producer, track) = producer(config); // First update is the snapshot (frame 0); then MAX_DELTA_FRAMES - 1 deltas fill the group. 
for i in 0..=MAX_DELTA_FRAMES { @@ -533,7 +681,7 @@ mod test { #[test] fn late_joiner_reconstructs_from_deltas() { - let config = Config { delta_ratio: 100 }; + let config = cfg(100); let (mut producer, track) = producer(config); producer.update(&json!({ "a": 1, "b": 1 })).unwrap(); producer.update(&json!({ "a": 1, "b": 2 })).unwrap(); @@ -557,7 +705,7 @@ mod test { let track = moq_net::Track::new("test").produce(); let consumer = track.consume(); - let mut producer = Producer::::new(track, Config::default()); + let mut producer = Producer::::new(track, ProducerConfig::default()); // First owner sets its field. producer.lock().video = Some("v1".to_string()); @@ -570,7 +718,7 @@ mod test { producer.finish().unwrap(); - let mut consumer = Consumer::::new(consumer); + let mut consumer = Consumer::::new(consumer, ConsumerConfig::default()); let waiter = kio::Waiter::noop(); let mut last = None; while let Poll::Ready(Ok(Some(value))) = consumer.poll_next(&waiter) { @@ -588,10 +736,10 @@ mod test { #[test] fn newer_group_supersedes_in_progress_reconstruction() { // A tight ratio lets one delta fit, then forces the next update into a new snapshot group. - let config = Config { delta_ratio: 1 }; + let config = cfg(1); let (mut producer, track) = producer(config); let observer = producer.consume(); - let mut consumer = Consumer::::new(track); + let mut consumer = Consumer::::new(track, ConsumerConfig::default()); let waiter = kio::Waiter::noop(); producer.update(&json!({ "a": 1 })).unwrap(); // snapshot, group 0 @@ -616,9 +764,9 @@ mod test { #[test] fn cloned_consumer_reconstructs_independently() { // Deltas share one group, so a clone taken mid-group carries in-progress reconstruction state. 
- let config = Config { delta_ratio: 100 }; + let config = cfg(100); let (mut producer, track) = producer(config); - let mut consumer = Consumer::::new(track); + let mut consumer = Consumer::::new(track, ConsumerConfig::default()); let waiter = kio::Waiter::noop(); producer.update(&json!({ "a": 1, "b": 1 })).unwrap(); // snapshot, group 0 @@ -642,4 +790,132 @@ mod test { } } } + + #[test] + fn compressed_snapshot_per_group_roundtrips() { + let (mut producer, track) = producer(cfg_deflate(0)); + producer.update(&json!({ "a": 1 })).unwrap(); + producer.update(&json!({ "a": 2 })).unwrap(); + producer.finish().unwrap(); + + // Deltas disabled: one compressed snapshot per group, latest reconstructs identically. + assert_eq!(track.latest(), Some(1)); + let values = drain_with(deflate_consumer(track)); + assert_eq!(values, vec![json!({ "a": 2 })]); + } + + #[test] + fn compressed_deltas_share_one_group() { + let (mut producer, track) = producer(cfg_deflate(100)); + producer.update(&json!({ "a": 1, "b": 1 })).unwrap(); + producer.update(&json!({ "a": 1, "b": 2 })).unwrap(); + producer.update(&json!({ "a": 1, "b": 3 })).unwrap(); + producer.finish().unwrap(); + + // Snapshot + deltas in one group, each frame decoded in order through the group's shared window. + assert_eq!(track.latest(), Some(0)); + let values = drain_with(deflate_consumer(track)); + assert_eq!(values.last().unwrap(), &json!({ "a": 1, "b": 3 })); + } + + #[test] + fn compressed_late_joiner_reconstructs_from_deltas() { + let (mut producer, track) = producer(cfg_deflate(100)); + producer.update(&json!({ "a": 1, "b": 1 })).unwrap(); + producer.update(&json!({ "a": 1, "b": 2 })).unwrap(); + producer.update(&json!({ "a": 5, "b": 2 })).unwrap(); + producer.finish().unwrap(); + + // A consumer created only now rebuilds the final value from the compressed snapshot + deltas. 
+ let values = drain_with(deflate_consumer(track)); + assert_eq!(values.last().unwrap(), &json!({ "a": 5, "b": 2 })); + } + + #[test] + fn compressed_cloned_consumer_reconstructs_mid_group() { + // A clone taken mid-group has no decoder window; it must rebuild from the retained slices. + let (mut producer, track) = producer(cfg_deflate(100)); + let mut consumer = deflate_consumer(track); + let waiter = kio::Waiter::noop(); + + producer.update(&json!({ "a": 1, "b": 1 })).unwrap(); // compressed snapshot, group 0 + match consumer.poll_next(&waiter) { + Poll::Ready(Ok(Some(value))) => assert_eq!(value, json!({ "a": 1, "b": 1 })), + other => panic!("expected snapshot, got {other:?}"), + } + + let mut clone = consumer.clone(); + + producer.update(&json!({ "a": 1, "b": 2 })).unwrap(); // compressed delta, group 0 + producer.finish().unwrap(); + + let expected = json!({ "a": 1, "b": 2 }); + for consumer in [&mut consumer, &mut clone] { + match consumer.poll_next(&waiter) { + Poll::Ready(Ok(Some(value))) => assert_eq!(value, expected), + other => panic!("expected delta, got {other:?}"), + } + } + } + + #[test] + fn compression_shrinks_wire_frames() { + // A repetitive payload should serialize to fewer wire bytes compressed than plaintext. + let value = json!({ "renditions": ["video".repeat(50), "video".repeat(50), "video".repeat(50)] }); + + let plaintext_bytes = wire_frame_len(cfg(0), &value); + let compressed_bytes = wire_frame_len(cfg_deflate(0), &value); + assert!( + compressed_bytes < plaintext_bytes, + "compressed frame {compressed_bytes} should be smaller than plaintext {plaintext_bytes}" + ); + } + + #[test] + fn compressed_deltas_reuse_window() { + // The shared per-group window is the whole point: a delta that restates content already in + // the snapshot compresses to far fewer bytes than the raw patch. 
+ let (mut producer, mut track) = producer(cfg_deflate(100)); + let phrase = "Media over QUIC delivers real-time latency at massive scale"; + producer.update(&json!({ "note": phrase })).unwrap(); + producer.update(&json!({ "note": phrase, "echo": phrase })).unwrap(); + producer.finish().unwrap(); + + // Both frames land in group 0; read the delta (frame 1) verbatim. + let waiter = kio::Waiter::noop(); + let Poll::Ready(Ok(Some(mut group))) = track.poll_next_group(&waiter) else { + panic!("expected a group"); + }; + let mut frames = Vec::new(); + while let Poll::Ready(Ok(Some(frame))) = group.poll_read_frame(&waiter) { + frames.push(frame); + } + assert_eq!(frames.len(), 2, "snapshot + one delta in a single group"); + + // The raw patch repeats the whole phrase; compressed against the window it's a fraction. + let raw_delta = serde_json::to_vec(&json!({ "echo": phrase })).unwrap(); + assert!( + frames[1].len() < raw_delta.len() / 2, + "windowed delta {} should be far below the raw patch {}", + frames[1].len(), + raw_delta.len() + ); + } + + /// Publish a single value and return the byte length of the resulting (frame 0) wire frame. + fn wire_frame_len(config: ProducerConfig, value: &Value) -> usize { + let (mut producer, mut track) = producer(config); + producer.update(value).unwrap(); + producer.finish().unwrap(); + + let waiter = kio::Waiter::noop(); + let Poll::Ready(Ok(Some(mut group))) = track.poll_next_group(&waiter) else { + panic!("expected a group"); + }; + // Read the stored (possibly compressed) frame bytes verbatim, without reconstructing JSON. 
+ let Poll::Ready(Ok(Some(frame))) = group.poll_read_frame(&waiter) else { + panic!("expected a frame"); + }; + frame.len() + } } diff --git a/rs/moq-mux/src/catalog/hang/consumer.rs b/rs/moq-mux/src/catalog/hang/consumer.rs index 68951178d..38ca736ad 100644 --- a/rs/moq-mux/src/catalog/hang/consumer.rs +++ b/rs/moq-mux/src/catalog/hang/consumer.rs @@ -27,7 +27,7 @@ impl Consumer { /// Create a new catalog consumer from a MoQ track consumer. pub fn new(track: moq_net::TrackConsumer) -> Self { Self { - inner: moq_json::Consumer::new(track), + inner: moq_json::Consumer::new(track, moq_json::ConsumerConfig::default()), } } diff --git a/rs/moq-mux/src/catalog/producer.rs b/rs/moq-mux/src/catalog/producer.rs index 9ee7e69c8..ba47ef859 100644 --- a/rs/moq-mux/src/catalog/producer.rs +++ b/rs/moq-mux/src/catalog/producer.rs @@ -75,7 +75,9 @@ impl Producer { let msf_track = broadcast.create_track(moq_net::Track::new(moq_msf::DEFAULT_NAME))?; // Disable deltas for now to stay byte-compatible with consumers that only read snapshots. - let hang = moq_json::Producer::new(hang_track, moq_json::Config { delta_ratio: 0 }); + let mut json_config = moq_json::ProducerConfig::default(); + json_config.delta_ratio = 0; + let hang = moq_json::Producer::new(hang_track, json_config); Ok(Self { hang,