Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ resolver = "2"
rust-version = "1.85"

[workspace.dependencies]
flate2 = "1"
hang = { version = "0.19", path = "rs/hang" }
kio = { version = "0.4", path = "rs/kio" }
moq-audio = { version = "0.0.5", path = "rs/moq-audio" }
Expand Down
112 changes: 112 additions & 0 deletions js/json/src/compression.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import { expect, test } from "bun:test";
import { Track } from "@moq/net";
import { deflate, inflate } from "./compression.ts";
import { Consumer } from "./consumer.ts";
import { Producer } from "./producer.ts";

type Value = Record<string, unknown>;

const enc = new TextEncoder();
const dec = new TextDecoder();

// Reconstruct every value a compressed consumer yields, in order.
async function drainCompressed(track: Track): Promise<Value[]> {
const out: Value[] = [];
for await (const value of new Consumer<Value>(track, { compression: true })) out.push(value);
return out;
}

// The raw (stored) bytes of a track's first frame, without reconstructing JSON.
async function firstFrame(track: Track): Promise<Uint8Array> {
const group = await track.nextGroupOrdered();
if (!group) throw new Error("expected a group");
const frame = await group.readFrame();
if (!frame) throw new Error("expected a frame");
return frame;
}

test("codec round-trips a frame", async () => {
const payload = enc.encode("the quick brown fox");
expect(dec.decode(await inflate(await deflate(payload)))).toBe("the quick brown fox");
});

test("codec round-trips an empty frame", async () => {
expect((await deflate(new Uint8Array())).length).toBe(0);
expect((await inflate(new Uint8Array())).length).toBe(0);
});

test("codec rejects garbage", async () => {
await expect(inflate(new Uint8Array(64).fill(0xff))).rejects.toThrow();
});

test("compressed snapshot per group round-trips", async () => {
const track = new Track("test");
const producer = new Producer<Value>(track, { deltaRatio: 0, compression: true });
producer.update({ a: 1 });
producer.update({ a: 2 });
producer.finish();

// Deltas off: one compressed snapshot per group, reconstructed in order.
expect(await drainCompressed(track)).toEqual([{ a: 1 }, { a: 2 }]);
});

test("compressed live consumer sees each update in order", async () => {
// Compression makes writes async, so this exercises that the per-frame deflate pipeline still
// delivers frames (and groups) strictly in order.
const track = new Track("test");
const producer = new Producer<Value>(track, { deltaRatio: 100, compression: true });
const consumer = new Consumer<Value>(track, { compression: true });

for (let n = 1; n <= 5; n++) {
producer.update({ a: n });
expect(await consumer.next()).toEqual({ a: n });
}
});

test("compressed deltas share one group and reconstruct", async () => {
const track = new Track("test");
const producer = new Producer<Value>(track, { deltaRatio: 100, compression: true });
producer.update({ a: 1, b: 1 });
producer.update({ a: 1, b: 2 });
producer.update({ a: 5, b: 2 });
producer.finish();

expect((await drainCompressed(track)).at(-1)).toEqual({ a: 5, b: 2 });
});

test("compressed late joiner reconstructs from snapshot + deltas", async () => {
const track = new Track("test");
const producer = new Producer<Value>(track, { deltaRatio: 100, compression: true });
producer.update({ a: 1, b: 1 });
producer.update({ a: 1, b: 2 });
producer.update({ a: 5, b: 2 });
producer.finish();

// A consumer created only now still rebuilds the final value.
expect((await drainCompressed(track)).at(-1)).toEqual({ a: 5, b: 2 });
});

test("each compressed frame is valid standalone deflate-raw", async () => {
// The frame the producer stored should decode on its own back to the original snapshot, which
// is what keeps it interoperable with the Rust producer's per-frame format.
const track = new Track("test");
const producer = new Producer<Value>(track, { deltaRatio: 0, compression: true });
producer.update({ hello: "world" });
producer.finish();

const frame = await firstFrame(track);
expect(JSON.parse(dec.decode(await inflate(frame)))).toEqual({ hello: "world" });
});

test("compression shrinks a repetitive frame", async () => {
const value = { renditions: Array(3).fill("video".repeat(50)) };

const plain = new Track("plain");
new Producer<Value>(plain, { deltaRatio: 0 }).update(value);
const compressed = new Track("compressed");
new Producer<Value>(compressed, { deltaRatio: 0, compression: true }).update(value);

const plainLen = (await firstFrame(plain)).length;
const compressedLen = (await firstFrame(compressed)).length;
expect(compressedLen).toBeLessThan(plainLen);
});
76 changes: 76 additions & 0 deletions js/json/src/compression.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Per-frame DEFLATE compression for the JSON frame stream, built on the platform
* {@link https://developer.mozilla.org/en-US/docs/Web/API/Compression_Streams_API | Compression Streams API}.
*
* Each frame is compressed on its own as a raw DEFLATE ([RFC 1951](https://www.rfc-editor.org/rfc/rfc1951.html))
* blob (`deflate-raw`), the same format the Rust `moq-json` producer writes, so the two
* interoperate on the wire. There is no cross-frame context, so snapshots and large frames shrink
* well while tiny deltas barely benefit. The browser API exposes no level or dictionary knobs, so
* compression is a plain on/off toggle.
*
* @module
*/

// Maximum decompressed size of a single frame. A malicious publisher could otherwise send a tiny
// slice that inflates hugely, so {@link inflate} stops rather than allocating without limit.
// Mirrors the Rust `MAX_DECOMPRESSED_FRAME`.
const MAX_DECOMPRESSED_FRAME = 64 * 1024 * 1024;

/** Compress one frame payload into a standalone `deflate-raw` blob. Empty in yields empty out. */
export async function deflate(payload: Uint8Array): Promise<Uint8Array> {
if (payload.length === 0) return payload;
const cs = new CompressionStream("deflate-raw");
return pump(cs, payload);
}

/**
* Decompress one `deflate-raw` frame back into its payload. Empty in yields empty out.
*
* Throws if the input is malformed or inflates past the per-frame size limit.
*/
export async function inflate(slice: Uint8Array): Promise<Uint8Array> {
if (slice.length === 0) return slice;
const ds = new DecompressionStream("deflate-raw");
return pump(ds, slice, MAX_DECOMPRESSED_FRAME);
}

// Drive a (de)compression stream end-to-end: feed it `input`, read every output chunk, and
// concatenate. Reads concurrently with writing to avoid the transform's backpressure deadlock.
async function pump(
transform: CompressionStream | DecompressionStream,
input: Uint8Array,
limit = Number.POSITIVE_INFINITY,
): Promise<Uint8Array> {
const writer = transform.writable.getWriter();
// The same error surfaces from the reader below, so swallow the writer's copy to avoid an
// unhandled rejection on malformed input. The cast narrows `ArrayBufferLike` to `ArrayBuffer`:
// our inputs are never SharedArrayBuffer-backed, which is all the DOM `BufferSource` type wants.
const written = (async () => {
await writer.write(input as Uint8Array<ArrayBuffer>);
await writer.close();
})().catch(() => {});

const reader = transform.readable.getReader();
const chunks: Uint8Array[] = [];
let total = 0;
for (;;) {
const { value, done } = await reader.read();
if (done) break;
total += value.length;
if (total > limit) {
await reader.cancel();
throw new Error(`decompressed frame exceeded ${limit} bytes`);
}
chunks.push(value);
}
await written;

if (chunks.length === 1) return chunks[0];
const out = new Uint8Array(total);
let offset = 0;
for (const chunk of chunks) {
out.set(chunk, offset);
offset += chunk.length;
}
return out;
}
14 changes: 10 additions & 4 deletions js/json/src/consumer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type * as Moq from "@moq/net";
import type * as z from "zod/mini";
import { inflate } from "./compression.ts";
import { merge } from "./diff.ts";
import type { Config } from "./producer.ts";

Expand All @@ -12,6 +13,8 @@ import type { Config } from "./producer.ts";
export class Consumer<T> {
#track: Moq.Track;
#schema?: z.ZodMiniType<T>;
// Whether frames are `deflate-raw` compressed. Must match the producer's {@link Config.compression}.
#decompress: boolean;

#group?: Moq.Group;
#current?: unknown;
Expand All @@ -20,6 +23,7 @@ export class Consumer<T> {
constructor(track: Moq.Track, config: Config<T> = {}) {
this.#track = track;
this.#schema = config.schema;
this.#decompress = config.compression ?? false;
}

/** Get the next reconstructed value, or `undefined` once the track ends. */
Expand All @@ -40,7 +44,7 @@ export class Consumer<T> {
continue;
}

return this.#apply(frame);
return await this.#apply(frame);
}
}

Expand All @@ -52,9 +56,11 @@ export class Consumer<T> {
}
}

// Frame 0 of a group is a snapshot, the rest are merge patches.
#apply(frame: Uint8Array): T {
const parsed = JSON.parse(new TextDecoder().decode(frame));
// Frame 0 of a group is a snapshot, the rest are merge patches. Each frame is its own DEFLATE
// blob when compressed, so decoding needs no per-group state.
async #apply(frame: Uint8Array): Promise<T> {
const payload = this.#decompress ? await inflate(frame) : frame;
const parsed = JSON.parse(new TextDecoder().decode(payload));
if (this.#framesRead === 0) {
this.#current = parsed;
} else {
Expand Down
Loading