Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions js/net/src/compression.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
import { expect, test } from "bun:test";
import { Compression, compress, compressionFromCode, decompress } from "./compression.ts";

test("compression: none is a no-op", async () => {
const data = new TextEncoder().encode("the quick brown fox");
expect(await compress(Compression.None, data)).toEqual(data);
expect(await decompress(Compression.None, data)).toEqual(data);
});

test("compression: deflate round-trips and shrinks repetitive data", async () => {
const data = new Uint8Array(4096).fill(0x61); // "aaaa..." — highly compressible
const packed = await compress(Compression.Deflate, data);
Expand All @@ -30,7 +24,7 @@ test("compression: decompress enforces the max size", async () => {
});

test("compression: wire-code round-trip", () => {
expect(compressionFromCode(0)).toBe(Compression.None);
expect(compressionFromCode(0)).toBeUndefined();
expect(compressionFromCode(1)).toBe(Compression.Deflate);
expect(() => compressionFromCode(99)).toThrow();
});
32 changes: 18 additions & 14 deletions js/net/src/compression.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
/**
* Per-frame payload compression for moq-lite-05.
*
* A publisher marks a {@link Track} with `compress = true` when its frames are
* worth compressing (e.g. a JSON catalog). The concrete codec is negotiated in
* SUBSCRIBE_OK, and every frame is compressed independently so the codec never
* carries state across the lossy, out-of-order group boundary.
* A publisher marks a {@link Track} with `compress = true` to hint its frames are
* worth compressing (e.g. a JSON catalog). The algorithm is negotiated per hop (the
* SETUP `Compression` parameter) and named per frame, so a frame can opt out when
* compression wouldn't shrink it; each frame is compressed independently so the codec
* never carries state across the lossy, out-of-order group boundary.
*/

// Mirrors the Rust MAX_FRAME_SIZE cap on the receive path: reject anything that
// inflates past this so a malicious peer can't zip-bomb the receiver.
const MAX_FRAME_SIZE = 16 * 1024 * 1024;

/** The codec used to (de)compress frame payloads, negotiated per subscription. */
/**
* A frame-payload compression codec. "No compression" (verbatim) is the absence of
* a codec, modeled as `undefined` rather than a member here, so a negotiated
* algorithm list can't list it and "compress with nothing" can't be expressed.
*/
export const Compression = {
/** Frames are sent verbatim. */
None: 0,
/**
* Raw DEFLATE (RFC 1951), no zlib/gzip header. Matches the browser's
* "deflate-raw" format and the Rust side's `flate2` raw deflate. QUIC already
Expand All @@ -25,11 +28,11 @@ export const Compression = {

export type Compression = (typeof Compression)[keyof typeof Compression];

/** Parse a wire varint code, throwing on an unknown codec. */
export function compressionFromCode(code: number): Compression {
/** Parse a wire varint code into a codec, or `undefined` for verbatim (code `0`). Throws on an unknown codec. */
export function compressionFromCode(code: number): Compression | undefined {
switch (code) {
case Compression.None:
return Compression.None;
case 0:
return undefined;
case Compression.Deflate:
return Compression.Deflate;
default:
Expand Down Expand Up @@ -95,9 +98,11 @@ async function pump(transform: CompressionStream | DecompressionStream, data: Ui
return concat(chunks, total);
}

/** Compress a whole frame payload. {@link Compression.None} returns the input unchanged. */
/**
* Compress a whole frame payload with this codec. Verbatim transfer is the absence
* of a codec (`undefined`), handled by the caller, not here.
*/
export async function compress(codec: Compression, data: Uint8Array): Promise<Uint8Array> {
if (codec === Compression.None) return data;
return pump(new CompressionStream(format(codec)), data, Number.POSITIVE_INFINITY);
}

Expand All @@ -106,6 +111,5 @@ export async function compress(codec: Compression, data: Uint8Array): Promise<Ui
* `maxSize` (default {@link MAX_FRAME_SIZE}).
*/
export async function decompress(codec: Compression, data: Uint8Array, maxSize = MAX_FRAME_SIZE): Promise<Uint8Array> {
if (codec === Compression.None) return data;
return pump(new DecompressionStream(format(codec)), data, maxSize);
}
6 changes: 4 additions & 2 deletions js/net/src/lite/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Signal } from "@moq/signals";
import type { Announced } from "../announced.ts";
import { type Bandwidth, createBandwidth } from "../bandwidth.ts";
import type { Broadcast } from "../broadcast.ts";
import { Compression } from "../compression.ts";
import type { Established } from "../connection/established.ts";
import * as Path from "../path.ts";
import { type Reader, Readers, Stream, Writer } from "../stream.ts";
Expand Down Expand Up @@ -97,7 +98,7 @@ export class Connection implements Established {
this.rtt = new Signal<Time.Milli | undefined>(undefined);

this.origin = randomOrigin();
this.#publisher = new Publisher(this.#quic, this.#version, this.origin);
this.#publisher = new Publisher(this.#quic, this.#version, this.origin, this.#peerSetup);
this.#subscriber = new Subscriber(
this.#quic,
this.#version,
Expand Down Expand Up @@ -184,7 +185,8 @@ export class Connection implements Established {
const writer = await Writer.open(this.#quic);
try {
await writer.u8(DataType.Setup);
await new Setup(ProbeLevel.Report).encode(writer, this.#version);
// We can inflate DEFLATE, so a peer may compress what it sends us.
await new Setup(ProbeLevel.Report, undefined, [Compression.Deflate]).encode(writer, this.#version);
writer.close();
} catch (err: unknown) {
writer.reset(err);
Expand Down
77 changes: 60 additions & 17 deletions js/net/src/lite/publisher.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { type Dispose, Signal } from "@moq/signals";
import type { Broadcast } from "../broadcast.ts";
import { Compression, compress } from "../compression.ts";
import { Compression, compress as compressPayload } from "../compression.ts";
import type { Group } from "../group.ts";
import * as Path from "../path.ts";
import { type Stream, Writer } from "../stream.ts";
Expand All @@ -10,6 +10,7 @@ import { AnnounceBroadcast, AnnounceInit, AnnounceOk, type AnnounceRequest, epoc
import { Group as GroupMessage } from "./group.ts";
import type { Origin } from "./origin.ts";
import { Probe } from "./probe.ts";
import type { Setup } from "./setup.ts";
import {
encodeSubscribeResponse,
type Subscribe,
Expand Down Expand Up @@ -73,18 +74,37 @@ export class Publisher {
// `broadcast\0track`. A rejected lookup is evicted so a retry can re-probe.
#trackInfo = new Map<string, Promise<TrackInfoMessage>>();

// The peer's SETUP, recorded by the connection once its Setup stream is read.
// Consulted before compressing a track's egress: we may only use an algorithm
// the peer advertised it can decompress. `undefined` until it arrives.
#peerSetup?: Signal<Setup | undefined>;

/**
* Creates a new Publisher instance.
* @param quic - The WebTransport session to use
* @param version - Negotiated protocol version
* @param origin - Origin id shared with the Subscriber
* @param peerSetup - Slot for the peer's SETUP, for compression negotiation (lite-05+)
*
* @internal
*/
constructor(quic: WebTransport, version: Version, origin: Origin) {
constructor(quic: WebTransport, version: Version, origin: Origin, peerSetup?: Signal<Setup | undefined>) {
this.#quic = quic;
this.version = version;
this.origin = origin;
this.#peerSetup = peerSetup;
}

// Await the algorithms the peer can decompress, blocking until its SETUP arrives.
// We MUST NOT compress with an algorithm the peer didn't advertise. An empty list
// (no slot, or no Compression parameter) means everything must be sent verbatim.
async #peerCompression(): Promise<Compression[]> {
if (!this.#peerSetup) return [];
let setup = this.#peerSetup.peek();
while (setup === undefined) {
setup = await this.#peerSetup.next();
}
return setup.compression;
}

/**
Expand Down Expand Up @@ -233,20 +253,24 @@ export class Publisher {
const track = broadcast.subscribe(msg.track, msg.priority);

try {
let compression: Compression = Compression.None;
let compress = false;
let peerDeflate = false;

if (supportsTrackStream(this.version)) {
// Lite-05+ accepts implicitly: no SUBSCRIBE_OK (the immutable
// properties live in TRACK_INFO), and the resolved range arrives as
// SUBSCRIBE_START / SUBSCRIBE_END emitted from #runTrack.
//
// The frame codec is one of those immutable properties, so serving
// MUST use exactly what TRACK_INFO advertised. Both come from the
// producer's accept(), so they always agree. Awaiting info() also
// surfaces a rejected track (accept never called, track closed) as an
// error here, which resets the stream.
// The compress hint is one of those immutable properties; it gates the
// per-frame Compression field. Awaiting info() also surfaces a rejected
// track (accept never called, track closed) as an error here, which
// resets the stream. Whether we may actually use DEFLATE is the per-hop
// SETUP negotiation; only wait on the peer's SETUP for a hinted track.
const info = await track.info();
compression = info.compress ? Compression.Deflate : Compression.None;
compress = info.compress;
if (compress) {
peerDeflate = (await this.#peerCompression()).includes(Compression.Deflate);
}
} else {
// Older drafts acknowledge with SUBSCRIBE_OK and stream frames verbatim.
const ok = new SubscribeOk({ priority: msg.priority });
Expand All @@ -255,7 +279,7 @@ export class Publisher {

console.debug(`publish ok: broadcast=${msg.broadcast} track=${track.name}`);

const serving = this.#runTrack(msg.id, msg.broadcast, track, stream.writer, compression);
const serving = this.#runTrack(msg.id, msg.broadcast, track, stream.writer, compress, peerDeflate);

for (;;) {
const decode = SubscribeUpdate.decodeMaybe(stream.reader, this.version);
Expand Down Expand Up @@ -296,7 +320,8 @@ export class Publisher {
broadcast: Path.Valid,
track: TrackSubscriber,
stream: Writer,
compression: Compression,
compress: boolean,
peerDeflate: boolean,
) {
// Lite-05+ resolves the range on the subscribe stream: SUBSCRIBE_START once the
// first group is known, SUBSCRIBE_END when the track finishes.
Expand All @@ -319,7 +344,7 @@ export class Publisher {
}
lastSequence = group.sequence;

void this.#runGroup(sub, group, compression);
void this.#runGroup(sub, group, compress, peerDeflate);
}

if (emitRange) {
Expand Down Expand Up @@ -376,7 +401,7 @@ export class Publisher {
ordered: info.ordered,
// This implementation doesn't produce per-frame timestamps yet.
timescale: 0,
compression: info.compress ? Compression.Deflate : Compression.None,
compress: info.compress,
});
})();

Expand All @@ -393,7 +418,7 @@ export class Publisher {
*
* @internal
*/
async #runGroup(sub: bigint, group: Group, compression: Compression) {
async #runGroup(sub: bigint, group: Group, compress: boolean, peerDeflate: boolean) {
const msg = new GroupMessage(sub, group.sequence);
try {
const stream = await Writer.open(this.#quic);
Expand All @@ -405,9 +430,27 @@ export class Publisher {
const frame = await Promise.race([group.readFrame(), stream.closed]);
if (!frame) break;

// On a compressed track the wire size is the compressed length;
// the subscriber inflates it back from the SUBSCRIBE_OK codec.
const payload = await compress(compression, frame);
if (!compress) {
// No per-frame Compression field on a non-hinted track.
await stream.u53(frame.byteLength);
await stream.write(frame);
continue;
}

// Compress-hinted track: every frame carries a Compression field naming
// the codec used (`undefined` / wire code 0 = verbatim). Use DEFLATE only
// if the peer can inflate it and it actually shrinks the (non-empty)
// payload; otherwise send verbatim.
let codec: Compression | undefined;
let payload = frame;
if (peerDeflate && frame.byteLength > 0) {
const deflated = await compressPayload(Compression.Deflate, frame);
if (deflated.byteLength < frame.byteLength) {
codec = Compression.Deflate;
payload = deflated;
}
}
await stream.u53(codec ?? 0);
await stream.u53(payload.byteLength);
await stream.write(payload);
}
Expand Down
24 changes: 24 additions & 0 deletions js/net/src/lite/setup.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { expect, test } from "bun:test";
import { Compression } from "../compression.ts";
import { Reader, Writer } from "../stream.ts";
import * as Varint from "../varint.ts";
import { ProbeLevel, Setup } from "./setup.ts";
Expand Down Expand Up @@ -53,6 +54,29 @@ test("SETUP with path round-trips on draft-05", async () => {
expect(got.path).toBe("/room/123");
});

test("SETUP compression parameter round-trips on draft-05", async () => {
const got = await roundTrip(new Setup(ProbeLevel.None, undefined, [Compression.Deflate]));
expect(got.compression).toEqual([Compression.Deflate]);
});

test("SETUP compression decode skips none and unknown", async () => {
// Hand-frame a Compression parameter (id 0x3) listing none(0), deflate(1), unknown(99);
// only deflate survives the decode.
const value = concat([Varint.encode(0), Varint.encode(1), Varint.encode(99)]);
const body = await bytes(async (w) => {
await w.u53(1); // parameter count
await w.u62(0x3n); // PARAM_COMPRESSION
await w.u53(value.byteLength);
await w.write(value);
});
const framed = await bytes(async (w) => {
await w.u53(body.byteLength);
await w.write(body);
});
const got = await Setup.decode(new Reader(undefined, framed), Version.DRAFT_05_WIP);
expect(got.compression).toEqual([Compression.Deflate]);
});

test("unknown probe level saturates to Increase", async () => {
// Hand-frame a SETUP body carrying an unknown probe level (99): a 1-parameter bag
// (PROBE id 0x1) whose value is the varint 99, prefixed with the Message size.
Expand Down
Loading