diff --git a/js/net/src/compression.test.ts b/js/net/src/compression.test.ts index 4e4415e02..964f24c37 100644 --- a/js/net/src/compression.test.ts +++ b/js/net/src/compression.test.ts @@ -1,12 +1,6 @@ import { expect, test } from "bun:test"; import { Compression, compress, compressionFromCode, decompress } from "./compression.ts"; -test("compression: none is a no-op", async () => { - const data = new TextEncoder().encode("the quick brown fox"); - expect(await compress(Compression.None, data)).toEqual(data); - expect(await decompress(Compression.None, data)).toEqual(data); -}); - test("compression: deflate round-trips and shrinks repetitive data", async () => { const data = new Uint8Array(4096).fill(0x61); // "aaaa..." — highly compressible const packed = await compress(Compression.Deflate, data); @@ -30,7 +24,7 @@ test("compression: decompress enforces the max size", async () => { }); test("compression: wire-code round-trip", () => { - expect(compressionFromCode(0)).toBe(Compression.None); + expect(compressionFromCode(0)).toBeUndefined(); expect(compressionFromCode(1)).toBe(Compression.Deflate); expect(() => compressionFromCode(99)).toThrow(); }); diff --git a/js/net/src/compression.ts b/js/net/src/compression.ts index fc90d3b97..49715bbfe 100644 --- a/js/net/src/compression.ts +++ b/js/net/src/compression.ts @@ -1,20 +1,23 @@ /** * Per-frame payload compression for moq-lite-05. * - * A publisher marks a {@link Track} with `compress = true` when its frames are - * worth compressing (e.g. a JSON catalog). The concrete codec is negotiated in - * SUBSCRIBE_OK, and every frame is compressed independently so the codec never - * carries state across the lossy, out-of-order group boundary. + * A publisher marks a {@link Track} with `compress = true` to hint its frames are + * worth compressing (e.g. a JSON catalog). The algorithm is negotiated per hop (the + * SETUP `Compression` parameter) and named per frame, so a frame can opt out when + * compression wouldn't shrink it; each frame is compressed independently so the codec + * never carries state across the lossy, out-of-order group boundary. */ // Mirrors the Rust MAX_FRAME_SIZE cap on the receive path: reject anything that // inflates past this so a malicious peer can't zip-bomb the receiver. const MAX_FRAME_SIZE = 16 * 1024 * 1024; -/** The codec used to (de)compress frame payloads, negotiated per subscription. */ +/** + * A frame-payload compression codec. "No compression" (verbatim) is the absence of + * a codec, modeled as `undefined` rather than a member here, so a negotiated + * algorithm list can't list it and "compress with nothing" can't be expressed. + */ export const Compression = { - /** Frames are sent verbatim. */ - None: 0, /** * Raw DEFLATE (RFC 1951), no zlib/gzip header. Matches the browser's * "deflate-raw" format and the Rust side's `flate2` raw deflate. QUIC already @@ -25,11 +28,11 @@ export const Compression = { export type Compression = (typeof Compression)[keyof typeof Compression]; -/** Parse a wire varint code, throwing on an unknown codec. */ -export function compressionFromCode(code: number): Compression { +/** Parse a wire varint code into a codec, or `undefined` for verbatim (code `0`). Throws on an unknown codec. */ +export function compressionFromCode(code: number): Compression | undefined { switch (code) { - case Compression.None: - return Compression.None; + case 0: + return undefined; case Compression.Deflate: return Compression.Deflate; default: @@ -95,9 +98,11 @@ async function pump(transform: CompressionStream | DecompressionStream, data: Ui return concat(chunks, total); } -/** Compress a whole frame payload. {@link Compression.None} returns the input unchanged. */ +/** + * Compress a whole frame payload with this codec. Verbatim transfer is the absence + * of a codec (`undefined`), handled by the caller, not here. + */ export async function compress(codec: Compression, data: Uint8Array): Promise { - if (codec === Compression.None) return data; return pump(new CompressionStream(format(codec)), data, Number.POSITIVE_INFINITY); } @@ -106,6 +111,5 @@ export async function compress(codec: Compression, data: Uint8Array): Promise { - if (codec === Compression.None) return data; return pump(new DecompressionStream(format(codec)), data, maxSize); } diff --git a/js/net/src/lite/connection.ts b/js/net/src/lite/connection.ts index d50acd8d2..fdeaeae16 100644 --- a/js/net/src/lite/connection.ts +++ b/js/net/src/lite/connection.ts @@ -2,6 +2,7 @@ import { Signal } from "@moq/signals"; import type { Announced } from "../announced.ts"; import { type Bandwidth, createBandwidth } from "../bandwidth.ts"; import type { Broadcast } from "../broadcast.ts"; +import { Compression } from "../compression.ts"; import type { Established } from "../connection/established.ts"; import * as Path from "../path.ts"; import { type Reader, Readers, Stream, Writer } from "../stream.ts"; @@ -97,7 +98,7 @@ export class Connection implements Established { this.rtt = new Signal(undefined); this.origin = randomOrigin(); - this.#publisher = new Publisher(this.#quic, this.#version, this.origin); + this.#publisher = new Publisher(this.#quic, this.#version, this.origin, this.#peerSetup); this.#subscriber = new Subscriber( this.#quic, this.#version, @@ -184,7 +185,8 @@ export class Connection implements Established { const writer = await Writer.open(this.#quic); try { await writer.u8(DataType.Setup); - await new Setup(ProbeLevel.Report).encode(writer, this.#version); + // We can inflate DEFLATE, so a peer may compress what it sends us. + await new Setup(ProbeLevel.Report, undefined, [Compression.Deflate]).encode(writer, this.#version); writer.close(); } catch (err: unknown) { writer.reset(err); diff --git a/js/net/src/lite/publisher.ts b/js/net/src/lite/publisher.ts index f7c4690c8..c4c509b15 100644 --- a/js/net/src/lite/publisher.ts +++ b/js/net/src/lite/publisher.ts @@ -1,6 +1,6 @@ import { type Dispose, Signal } from "@moq/signals"; import type { Broadcast } from "../broadcast.ts"; -import { Compression, compress } from "../compression.ts"; +import { Compression, compress as compressPayload } from "../compression.ts"; import type { Group } from "../group.ts"; import * as Path from "../path.ts"; import { type Stream, Writer } from "../stream.ts"; @@ -10,6 +10,7 @@ import { AnnounceBroadcast, AnnounceInit, AnnounceOk, type AnnounceRequest, epoc import { Group as GroupMessage } from "./group.ts"; import type { Origin } from "./origin.ts"; import { Probe } from "./probe.ts"; +import type { Setup } from "./setup.ts"; import { encodeSubscribeResponse, type Subscribe, @@ -73,18 +74,37 @@ export class Publisher { // `broadcast\0track`. A rejected lookup is evicted so a retry can re-probe. #trackInfo = new Map>(); + // The peer's SETUP, recorded by the connection once its Setup stream is read. + // Consulted before compressing a track's egress: we may only use an algorithm + // the peer advertised it can decompress. `undefined` until it arrives. + #peerSetup?: Signal; + /** * Creates a new Publisher instance. * @param quic - The WebTransport session to use * @param version - Negotiated protocol version * @param origin - Origin id shared with the Subscriber + * @param peerSetup - Slot for the peer's SETUP, for compression negotiation (lite-05+) * * @internal */ - constructor(quic: WebTransport, version: Version, origin: Origin) { + constructor(quic: WebTransport, version: Version, origin: Origin, peerSetup?: Signal) { this.#quic = quic; this.version = version; this.origin = origin; + this.#peerSetup = peerSetup; + } + + // Await the algorithms the peer can decompress, blocking until its SETUP arrives. + // We MUST NOT compress with an algorithm the peer didn't advertise. An empty list + // (no slot, or no Compression parameter) means everything must be sent verbatim. + async #peerCompression(): Promise { + if (!this.#peerSetup) return []; + let setup = this.#peerSetup.peek(); + while (setup === undefined) { + setup = await this.#peerSetup.next(); + } + return setup.compression; } /** @@ -233,20 +253,24 @@ export class Publisher { const track = broadcast.subscribe(msg.track, msg.priority); try { - let compression: Compression = Compression.None; + let compress = false; + let peerDeflate = false; if (supportsTrackStream(this.version)) { // Lite-05+ accepts implicitly: no SUBSCRIBE_OK (the immutable // properties live in TRACK_INFO), and the resolved range arrives as // SUBSCRIBE_START / SUBSCRIBE_END emitted from #runTrack. // - // The frame codec is one of those immutable properties, so serving - // MUST use exactly what TRACK_INFO advertised. Both come from the - // producer's accept(), so they always agree. Awaiting info() also - // surfaces a rejected track (accept never called, track closed) as an - // error here, which resets the stream. + // The compress hint is one of those immutable properties; it gates the + // per-frame Compression field. Awaiting info() also surfaces a rejected + // track (accept never called, track closed) as an error here, which + // resets the stream. Whether we may actually use DEFLATE is the per-hop + // SETUP negotiation; only wait on the peer's SETUP for a hinted track. const info = await track.info(); - compression = info.compress ? Compression.Deflate : Compression.None; + compress = info.compress; + if (compress) { + peerDeflate = (await this.#peerCompression()).includes(Compression.Deflate); + } } else { // Older drafts acknowledge with SUBSCRIBE_OK and stream frames verbatim. const ok = new SubscribeOk({ priority: msg.priority }); @@ -255,7 +279,7 @@ export class Publisher { console.debug(`publish ok: broadcast=${msg.broadcast} track=${track.name}`); - const serving = this.#runTrack(msg.id, msg.broadcast, track, stream.writer, compression); + const serving = this.#runTrack(msg.id, msg.broadcast, track, stream.writer, compress, peerDeflate); for (;;) { const decode = SubscribeUpdate.decodeMaybe(stream.reader, this.version); @@ -296,7 +320,8 @@ export class Publisher { broadcast: Path.Valid, track: TrackSubscriber, stream: Writer, - compression: Compression, + compress: boolean, + peerDeflate: boolean, ) { // Lite-05+ resolves the range on the subscribe stream: SUBSCRIBE_START once the // first group is known, SUBSCRIBE_END when the track finishes. @@ -319,7 +344,7 @@ export class Publisher { } lastSequence = group.sequence; - void this.#runGroup(sub, group, compression); + void this.#runGroup(sub, group, compress, peerDeflate); } if (emitRange) { @@ -376,7 +401,7 @@ export class Publisher { ordered: info.ordered, // This implementation doesn't produce per-frame timestamps yet. timescale: 0, - compression: info.compress ? Compression.Deflate : Compression.None, + compress: info.compress, }); })(); @@ -393,7 +418,7 @@ export class Publisher { * * @internal */ - async #runGroup(sub: bigint, group: Group, compression: Compression) { + async #runGroup(sub: bigint, group: Group, compress: boolean, peerDeflate: boolean) { const msg = new GroupMessage(sub, group.sequence); try { const stream = await Writer.open(this.#quic); @@ -405,9 +430,27 @@ export class Publisher { const frame = await Promise.race([group.readFrame(), stream.closed]); if (!frame) break; - // On a compressed track the wire size is the compressed length; - // the subscriber inflates it back from the SUBSCRIBE_OK codec. - const payload = await compress(compression, frame); + if (!compress) { + // No per-frame Compression field on a non-hinted track. + await stream.u53(frame.byteLength); + await stream.write(frame); + continue; + } + + // Compress-hinted track: every frame carries a Compression field naming + // the codec used (`undefined` / wire code 0 = verbatim). Use DEFLATE only + // if the peer can inflate it and it actually shrinks the (non-empty) + // payload; otherwise send verbatim. + let codec: Compression | undefined; + let payload = frame; + if (peerDeflate && frame.byteLength > 0) { + const deflated = await compressPayload(Compression.Deflate, frame); + if (deflated.byteLength < frame.byteLength) { + codec = Compression.Deflate; + payload = deflated; + } + } + await stream.u53(codec ?? 0); await stream.u53(payload.byteLength); await stream.write(payload); } diff --git a/js/net/src/lite/setup.test.ts b/js/net/src/lite/setup.test.ts index e0f763920..ed022f5b0 100644 --- a/js/net/src/lite/setup.test.ts +++ b/js/net/src/lite/setup.test.ts @@ -1,4 +1,5 @@ import { expect, test } from "bun:test"; +import { Compression } from "../compression.ts"; import { Reader, Writer } from "../stream.ts"; import * as Varint from "../varint.ts"; import { ProbeLevel, Setup } from "./setup.ts"; @@ -53,6 +54,29 @@ test("SETUP with path round-trips on draft-05", async () => { expect(got.path).toBe("/room/123"); }); +test("SETUP compression parameter round-trips on draft-05", async () => { + const got = await roundTrip(new Setup(ProbeLevel.None, undefined, [Compression.Deflate])); + expect(got.compression).toEqual([Compression.Deflate]); +}); + +test("SETUP compression decode skips none and unknown", async () => { + // Hand-frame a Compression parameter (id 0x3) listing none(0), deflate(1), unknown(99); + // only deflate survives the decode. + const value = concat([Varint.encode(0), Varint.encode(1), Varint.encode(99)]); + const body = await bytes(async (w) => { + await w.u53(1); // parameter count + await w.u62(0x3n); // PARAM_COMPRESSION + await w.u53(value.byteLength); + await w.write(value); + }); + const framed = await bytes(async (w) => { + await w.u53(body.byteLength); + await w.write(body); + }); + const got = await Setup.decode(new Reader(undefined, framed), Version.DRAFT_05_WIP); + expect(got.compression).toEqual([Compression.Deflate]); +}); + test("unknown probe level saturates to Increase", async () => { // Hand-frame a SETUP body carrying an unknown probe level (99): a 1-parameter bag // (PROBE id 0x1) whose value is the varint 99, prefixed with the Message size. diff --git a/js/net/src/lite/setup.ts b/js/net/src/lite/setup.ts index 92dc8d4b0..179870bc7 100644 --- a/js/net/src/lite/setup.ts +++ b/js/net/src/lite/setup.ts @@ -5,6 +5,7 @@ * @module */ +import { type Compression, compressionFromCode } from "../compression.ts"; import type { Reader, Writer } from "../stream.ts"; import * as Varint from "../varint.ts"; import * as Message from "./message.ts"; @@ -14,6 +15,8 @@ import { hasSetupStream, type Version } from "./version.ts"; const PARAM_PROBE = 0x1n; /** Setup Parameter id for the request Path (client-only, URI-less transports). */ const PARAM_PATH = 0x2n; +/** Setup Parameter id for the compression algorithms this endpoint can decompress. */ +const PARAM_COMPRESSION = 0x3n; /** Cap on the number of parameters in a bag, matching the Rust decoder. */ const MAX_PARAMS = 64; @@ -135,9 +138,19 @@ export class Setup { */ path?: string; - constructor(probe: ProbeLevel = ProbeLevel.None, path?: string) { + /** + * Compression algorithms this endpoint can *decompress*, in preference order + * (most-preferred first). Governs only what a peer may compress when sending + * *to* us; the sender names the algorithm actually used per frame. Verbatim + * transfer needs no negotiation, so it's never listed; empty (the default) means + * "send me everything verbatim". + */ + compression: Compression[]; + + constructor(probe: ProbeLevel = ProbeLevel.None, path?: string, compression: Compression[] = []) { this.probe = probe; this.path = path; + this.compression = compression; } static #guard(version: Version) { @@ -155,6 +168,22 @@ export class Setup { if (this.path !== undefined) { params.setBytes(PARAM_PATH, new TextEncoder().encode(this.path)); } + // Pack the advertised algorithms back-to-back as varints. The type can't hold + // a verbatim "no codec" entry, so there's nothing to omit. + const algos: Uint8Array[] = []; + for (const algo of this.compression) { + algos.push(Varint.encode(algo)); + } + if (algos.length > 0) { + const total = algos.reduce((n, a) => n + a.byteLength, 0); + const packed = new Uint8Array(total); + let offset = 0; + for (const a of algos) { + packed.set(a, offset); + offset += a.byteLength; + } + params.setBytes(PARAM_COMPRESSION, packed); + } await params.encode(w); } @@ -173,7 +202,25 @@ export class Setup { } } - return new Setup(probe, path); + // A back-to-back sequence of algorithm varints. Skip `none` (0, which decodes + // to `undefined`) and any identifier we don't understand (it throws): we can + // neither produce nor consume it. + const compression: Compression[] = []; + let algoBytes = params.getBytes(PARAM_COMPRESSION); + while (algoBytes !== undefined && algoBytes.byteLength > 0) { + const [code, remain] = Varint.decode(algoBytes); + algoBytes = remain; + try { + const algo = compressionFromCode(code); + if (algo !== undefined && !compression.includes(algo)) { + compression.push(algo); + } + } catch { + // Unknown algorithm; ignore it. + } + } + + return new Setup(probe, path, compression); } /** Encode the SETUP message with its size prefix. Throws on pre-lite-05 versions. */ diff --git a/js/net/src/lite/subscriber.ts b/js/net/src/lite/subscriber.ts index 008bf2f77..965266bac 100644 --- a/js/net/src/lite/subscriber.ts +++ b/js/net/src/lite/subscriber.ts @@ -2,7 +2,7 @@ import { Signal } from "@moq/signals"; import { Announced } from "../announced.ts"; import type { Bandwidth } from "../bandwidth.ts"; import { Broadcast, type TrackRequest } from "../broadcast.ts"; -import { Compression, decompress } from "../compression.ts"; +import { compressionFromCode, decompress } from "../compression.ts"; import { Group } from "../group.ts"; import * as Path from "../path.ts"; import { type Reader, Stream } from "../stream.ts"; @@ -57,9 +57,10 @@ interface SubscribeEntry { // The write side: incoming GROUP streams are routed here. The application reads // the matching TrackSubscriber it got from Broadcast.subscribe. track: TrackProducer; - // undefined until the negotiated codec is known (from TRACK_INFO on lite-05+, - // or SUBSCRIBE_OK on older drafts). - compression: Signal; + // The track's compress hint, undefined until known (from TRACK_INFO on lite-05+, + // or SUBSCRIBE_OK on older drafts). When true, each frame on the group stream + // carries a per-frame Compression field that runGroup reads and decodes against. + compress: Signal; // Per-frame timestamp scale (0 = none). undefined until it's known. A non-zero // value means each frame on the group stream is prefixed with a zigzag-delta // timestamp varint that runGroup must consume to stay in sync. @@ -228,7 +229,7 @@ export class Subscriber { } const info = await this.#trackInfo(path, name); return { - compress: info.compression !== Compression.None, + compress: info.compress, // The wire no longer carries a cache hint (retention is best-effort), // so the local retention window falls back to the model default. cache: DEFAULT_CACHE_MS, @@ -251,9 +252,9 @@ export class Subscriber { async #runSubscribe(broadcast: Path.Valid, request: TrackRequest) { const id = this.#subscribeNext++; - // `compression` stays undefined until TRACK_INFO (or, on older drafts, + // The `compress` hint stays undefined until TRACK_INFO (or, on older drafts, // implicit defaults) resolves it; runGroup blocks on it before decoding. - const compression = new Signal(undefined); + const compress = new Signal(undefined); const timescale = new Signal(undefined); console.debug(`subscribe start: id=${id} broadcast=${broadcast} track=${request.name}`); @@ -263,7 +264,7 @@ export class Subscriber { // Open the stream under a timeout. The stream handle flows back via `state` // so the timeout path can abort it if it finishes opening after the deadline. const state: { stream?: Stream } = {}; - const setup = this.#openSubscribe(state, msg, request, id, compression, timescale); + const setup = this.#openSubscribe(state, msg, request, id, compress, timescale); let opened: { stream: Stream; producer: TrackProducer }; try { @@ -337,7 +338,7 @@ export class Subscriber { msg: Subscribe, request: TrackRequest, id: bigint, - compression: Signal, + compress: Signal, timescale: Signal, ): Promise<{ stream: Stream; producer: TrackProducer }> { let producer: TrackProducer; @@ -347,25 +348,25 @@ export class Subscriber { // Fetch the immutable properties once via the TRACK stream. const info = await this.#trackInfo(msg.broadcast, msg.track); producer = request.accept({ - compress: info.compression !== Compression.None, + compress: info.compress, // The wire no longer carries a cache hint (retention is best-effort), // so the local retention window falls back to the model default. cache: DEFAULT_CACHE_MS, priority: info.priority, ordered: info.ordered, }); - compression.set(info.compression); + compress.set(info.compress); timescale.set(info.timescale); } else { // Older drafts negotiate nothing per-track: verbatim frames, no timescale. producer = request.accept(); - compression.set(Compression.None); + compress.set(false); timescale.set(0); drainOk = true; } // Register before opening SUBSCRIBE so a racing GROUP stream finds the entry. - this.#subscribes.set(id, { track: producer, compression, timescale }); + this.#subscribes.set(id, { track: producer, compress, timescale }); state.stream = await Stream.open(this.#quic); await state.stream.writer.u53(StreamId.Subscribe); @@ -475,30 +476,29 @@ export class Subscriber { return; } - const { track, compression, timescale } = entry; + const { track, compress, timescale } = entry; const producer = new Group(group.sequence); track.writeGroup(producer); try { - // Block until the codec is known; the group's stream can arrive before - // TRACK_INFO (or SUBSCRIBE_OK) resolves it on the subscribe stream. - let codec = compression.peek(); - while (codec === undefined) { + // Block until the compress hint is known; the group's stream can arrive + // before TRACK_INFO (or SUBSCRIBE_OK) resolves it on the subscribe stream. + let hint = compress.peek(); + while (hint === undefined) { if (track.state.closed.peek()) { - // Subscription ended before the codec resolved; nothing to decode. + // Subscription ended before the hint resolved; nothing to decode. producer.close(); stream.stop(new Error("cancel")); return; } - await Signal.race(compression, track.state.closed); - codec = compression.peek(); + await Signal.race(compress, track.state.closed); + hint = compress.peek(); } - // timescale resolves together with compression (from TRACK_INFO). A - // non-zero scale means every frame is prefixed with a zigzag-delta - // timestamp (see the lite-05 FRAME format). We don't surface it to the - // application yet, but we must still read the varint to keep the frame - // framing in sync with the publisher. + // timescale resolves together with the hint (from TRACK_INFO). A non-zero + // scale means every frame is prefixed with a zigzag-delta timestamp (see the + // lite-05 FRAME format). We don't surface it to the application yet, but we + // must still read the varint to keep the frame framing in sync. const scale = timescale.peek() ?? 0; for (;;) { @@ -510,13 +510,16 @@ export class Subscriber { await stream.u62(); } + // Per-frame Compression field, present iff the track is compress-hinted; + // it names the codec this frame's payload uses (`undefined` = verbatim). + const codec = hint ? compressionFromCode(await stream.u53()) : undefined; + const size = await stream.u53(); const payload = await stream.read(size); if (!payload) break; - // On a compressed track the wire size is the compressed length; - // inflate it back to the original frame the consumer sees. - producer.writeFrame(codec === Compression.None ? payload : await decompress(codec, payload)); + // Inflate per frame; an opted-out (verbatim) frame is already the logical payload. + producer.writeFrame(codec === undefined ? payload : await decompress(codec, payload)); } producer.close(); diff --git a/js/net/src/lite/track.test.ts b/js/net/src/lite/track.test.ts index 14e4deeb1..8aae7f512 100644 --- a/js/net/src/lite/track.test.ts +++ b/js/net/src/lite/track.test.ts @@ -1,5 +1,4 @@ import { expect, test } from "bun:test"; -import { Compression } from "../compression.ts"; import * as Path from "../path.ts"; import { Reader, Writer } from "../stream.ts"; import { Track, TrackInfo } from "./track.ts"; @@ -32,14 +31,31 @@ test("TrackInfo round-trips on draft-05", async () => { priority: 7, ordered: false, timescale: 90000, - compression: Compression.Deflate, + compress: true, }); const reader = new Reader(undefined, await bytes((w) => info.encode(w, Version.DRAFT_05_WIP))); const got = await TrackInfo.decode(reader, Version.DRAFT_05_WIP); expect(got.priority).toBe(7); expect(got.ordered).toBe(false); expect(got.timescale).toBe(90000); - expect(got.compression).toBe(Compression.Deflate); + expect(got.compress).toBe(true); +}); + +test("TrackInfo compress hint is additive", async () => { + // Reserved wire values (>1) decode as the boolean hint `true`. Hand-frame the + // body and prefix it with the Message size, the way the encoder does. + const body = await bytes(async (w) => { + await w.u8(7); + await w.bool(false); + await w.u53(90000); + await w.u53(9); // reserved compress value + }); + const framed = await bytes(async (w) => { + await w.u53(body.byteLength); + await w.write(body); + }); + const got = await TrackInfo.decode(new Reader(undefined, framed), Version.DRAFT_05_WIP); + expect(got.compress).toBe(true); }); test("Track request round-trips on draft-05", async () => { @@ -51,6 +67,6 @@ test("Track request round-trips on draft-05", async () => { }); test("TRACK_INFO is rejected before draft-05", async () => { - const info = new TrackInfo({ compression: Compression.None }); + const info = new TrackInfo({ compress: false }); await expect(bytes((w) => info.encode(w, Version.DRAFT_04))).rejects.toThrow(); }); diff --git a/js/net/src/lite/track.ts b/js/net/src/lite/track.ts index 69be5c7d1..0a2730457 100644 --- a/js/net/src/lite/track.ts +++ b/js/net/src/lite/track.ts @@ -1,4 +1,3 @@ -import { Compression, compressionFromCode } from "../compression.ts"; import * as Path from "../path.ts"; import type { Reader, Writer } from "../stream.ts"; import * as Message from "./message.ts"; @@ -65,39 +64,45 @@ export class TrackInfo { * per-frame timestamps on the wire. */ timescale: number; - /** Codec applied to every frame payload on this track. */ - compression: Compression; + /** + * Boolean hint that this track's payloads are worth compressing. It names no + * algorithm: that's negotiated per hop (SETUP) and named per frame. When set, + * every FRAME on the track carries a per-frame `Compression` field. Wire values + * `>1` are reserved and decode as `true`, so the hint stays additive. + */ + compress: boolean; constructor({ priority = 0, ordered = true, timescale = 0, - compression = Compression.None, + compress = false, }: { priority?: number; ordered?: boolean; timescale?: number; - compression?: Compression; + compress?: boolean; }) { this.priority = priority; this.ordered = ordered; this.timescale = timescale; - this.compression = compression; + this.compress = compress; } async #encode(w: Writer) { await w.u8(this.priority); await w.bool(this.ordered); await w.u53(this.timescale); - await w.u53(this.compression); + await w.u53(this.compress ? 1 : 0); } static async #decode(r: Reader): Promise { const priority = await r.u8(); const ordered = await r.bool(); const timescale = await r.u53(); - const compression = compressionFromCode(await r.u53()); - return new TrackInfo({ priority, ordered, timescale, compression }); + // Any non-zero value (including reserved `>1`) is the "worth compressing" hint. + const compress = (await r.u53()) !== 0; + return new TrackInfo({ priority, ordered, timescale, compress }); } async encode(w: Writer, version: Version): Promise { diff --git a/rs/hang/src/container/frame.rs b/rs/hang/src/container/frame.rs index dd246f945..bd9a642e8 100644 --- a/rs/hang/src/container/frame.rs +++ b/rs/hang/src/container/frame.rs @@ -62,6 +62,7 @@ impl Frame { let net_frame = moq_net::Frame { size, timestamp: net_timestamp, + compression: None, }; let mut chunked = group.create_frame(net_frame)?; chunked.write(header.freeze())?; diff --git a/rs/moq-mux/src/container/fmp4/import.rs b/rs/moq-mux/src/container/fmp4/import.rs index 790f9bf11..5650d2678 100644 --- a/rs/moq-mux/src/container/fmp4/import.rs +++ b/rs/moq-mux/src/container/fmp4/import.rs @@ -613,6 +613,7 @@ impl Import { let mut frame = g.create_frame(moq_net::Frame { size: fragment_bytes.len() as u64, timestamp: Some(timestamp), + compression: None, })?; frame.write(fragment_bytes)?; frame.finish()?; diff --git a/rs/moq-native/tests/broadcast.rs b/rs/moq-native/tests/broadcast.rs index 35406518a..4bb57d6d5 100644 --- a/rs/moq-native/tests/broadcast.rs +++ b/rs/moq-native/tests/broadcast.rs @@ -140,6 +140,7 @@ async fn lite05_timestamp_roundtrip(scheme: &str) { let frame = moq_native::moq_net::Frame { size: payload.len() as u64, timestamp: Some(Timestamp::new(us, Timescale::MICRO).unwrap()), + compression: None, }; let mut writer = group.create_frame(frame).expect("failed to create frame"); writer @@ -257,6 +258,7 @@ async fn lite05_fetch_roundtrip(scheme: &str) { let frame = moq_native::moq_net::Frame { size: payload.len() as u64, timestamp: Some(Timestamp::new(us, Timescale::MICRO).unwrap()), + compression: None, }; let mut writer = group.create_frame(frame).expect("failed to create frame"); writer @@ -353,6 +355,147 @@ async fn broadcast_moq_lite_05_fetch_webtransport() { lite05_fetch_roundtrip("https").await; } +/// A modern (lite-05) subscriber caches a `compress` track's frames in the codec +/// each frame names on the wire: a compressible frame is DEFLATE (so the cached +/// byte count matches the wire/compressed size — the relay-billing invariant), a +/// tiny frame DEFLATE would only enlarge opts out and is cached verbatim, and both +/// inflate back to the original only when read (the decode-at-delivery contract). +async fn lite05_compress_caches_compressed(scheme: &str) { + use moq_net::{Compression, Timescale, Timestamp}; + + let pub_origin = Origin::random().produce(); + let mut broadcast = pub_origin.create_broadcast("test").expect("failed to create broadcast"); + let mut track = broadcast + .create_track( + "meta", + moq_net::TrackInfo::default() + .with_timescale(Timescale::MICRO) + .with_compress(true), + ) + .expect("failed to create track"); + + // Highly compressible payload so the cached (compressed) length is + // unmistakably smaller than the decoded payload. Written verbatim, the way an + // origin produces frames (the publisher compresses on egress). + let payload = bytes::Bytes::from(vec![b'a'; 4096]); + // A second, tiny payload DEFLATE would only enlarge: the publisher must send it + // verbatim (per-frame opt-out) even though the track is compress-hinted. + let tiny = bytes::Bytes::from_static(b"hi"); + let mut group = track.append_group().expect("failed to append group"); + let mut writer = group + .create_frame(moq_net::Frame { + size: payload.len() as u64, + timestamp: Some(Timestamp::new(1_000, Timescale::MICRO).unwrap()), + compression: None, + }) + .expect("failed to create frame"); + writer.write(payload.clone()).expect("failed to write frame"); + writer.finish().expect("failed to finish frame"); + let mut writer = group + .create_frame(moq_net::Frame { + size: tiny.len() as u64, + timestamp: Some(Timestamp::new(2_000, Timescale::MICRO).unwrap()), + compression: None, + }) + .expect("failed to create tiny frame"); + writer.write(tiny.clone()).expect("failed to write tiny frame"); + writer.finish().expect("failed to finish tiny frame"); + group.finish().expect("failed to finish group"); + + let mut server_config = moq_native::ServerConfig::default(); + server_config.bind = Some("[::]:0".to_string()); + server_config.tls.generate = vec!["localhost".into()]; + server_config.version = vec!["moq-lite-05-wip".parse().unwrap()]; + let mut server = server_config.init().expect("failed to init server"); + let addr = server.local_addr().expect("failed to get local addr"); + + let sub_origin = Origin::random().produce(); + let mut announcements = sub_origin.consume().announced(); + + let mut client_config = moq_native::ClientConfig::default(); + client_config.tls.disable_verify = Some(true); + client_config.version = vec!["moq-lite-05-wip".parse().unwrap()]; + let client = client_config.init().expect("failed to init client"); + let url: url::Url = format!("{scheme}://localhost:{}", addr.port()).parse().unwrap(); + + let server_handle = tokio::spawn(async move { + let request = server.accept().await.expect("no incoming connection"); + let session = request.with_publisher(&pub_origin).ok().await?; + let _broadcast = broadcast; + let _track = track; + let _ = session.closed().await; + Ok::<_, anyhow::Error>(()) + }); + + let client = client.with_subscriber(sub_origin); + let session = tokio::time::timeout(TIMEOUT, client.connect(url)) + .await + .expect("client connect timed out") + .expect("client connect failed"); + + let (path, bc) = tokio::time::timeout(TIMEOUT, announcements.next()) + .await + .expect("announce timed out") + .expect("origin closed"); + assert_eq!(path.as_str(), "test"); + let bc = bc.broadcast().expect("expected announce, got unannounce"); + + let mut track_sub = tokio::time::timeout(TIMEOUT, async { + bc.track("meta").unwrap().subscribe(None).unwrap().await + }) + .await + .expect("subscribe timed out") + .expect("subscribe failed"); + assert!(track_sub.info().compress, "track should be compress-hinted"); + + let mut group_sub = tokio::time::timeout(TIMEOUT, track_sub.next_group()) + .await + .expect("next_group timed out") + .expect("next_group failed") + .expect("expected a group"); + + let mut frame_sub = tokio::time::timeout(TIMEOUT, group_sub.next_frame()) + .await + .expect("next_frame timed out") + .expect("next_frame failed") + .expect("expected a frame"); + + // The cache holds the Deflate-compressed bytes, not the plaintext: the codec + // is recorded and the stored size is the compressed length. + assert_eq!(frame_sub.compression, Some(Compression::Deflate)); + assert!( + (frame_sub.size as usize) < payload.len(), + "cached size {} should be the compressed length, smaller than {}", + frame_sub.size, + payload.len() + ); + + // Reading inflates the cached bytes back to the original payload. + let decoded = frame_sub.read_all().await.expect("failed to read frame"); + assert_eq!(decoded, payload); + + // The tiny frame opted out: cached verbatim (no per-frame DEFLATE), reads back unchanged. + let mut tiny_sub = tokio::time::timeout(TIMEOUT, group_sub.next_frame()) + .await + .expect("next_frame timed out") + .expect("next_frame failed") + .expect("expected a second frame"); + assert_eq!(tiny_sub.compression, None); + assert_eq!(tiny_sub.read_all().await.expect("failed to read tiny frame"), tiny); + + drop(session); + server_handle + .await + .expect("server task panicked") + .expect("server task failed"); +} + +#[tracing_test::traced_test] +#[tokio::test] +async fn broadcast_moq_lite_05_compress_caches_compressed_webtransport() { + lite05_compress_caches_compressed("https").await; +} + /// A fetch must be served while a live subscription is active on the same track. /// The relay subscribes starting at the latest group, so an older group isn't /// cached and the fetch has to issue a wire FETCH concurrently with the @@ -365,6 +508,7 @@ async fn lite05_fetch_during_subscribe(scheme: &str) { moq_net::Frame { size: payload.len() as u64, timestamp: Some(Timestamp::new(us, Timescale::MICRO).unwrap()), + compression: None, } } diff --git a/rs/moq-net/src/client.rs b/rs/moq-net/src/client.rs index 00e6e2396..9adf30829 100644 --- a/rs/moq-net/src/client.rs +++ b/rs/moq-net/src/client.rs @@ -161,6 +161,8 @@ impl Client { let our_setup = lite::Setup { probe: lite::ProbeLevel::Report, path: self.setup_path.clone(), + // We can inflate DEFLATE, so a peer may compress what it sends us. + compression: vec![crate::Compression::Deflate], }; let (recv_bw, connecting) = lite::start( diff --git a/rs/moq-net/src/ietf/publisher.rs b/rs/moq-net/src/ietf/publisher.rs index 3daf58829..ac2484483 100644 --- a/rs/moq-net/src/ietf/publisher.rs +++ b/rs/moq-net/src/ietf/publisher.rs @@ -331,6 +331,27 @@ impl Publisher { stream.encode(&0u64).await?; } + // IETF carries no per-frame compression, so a cached compressed frame + // (e.g. relayed from a lite-05 origin) must be inflated for this peer. + if frame.compression.is_some() { + let mut payload = tokio::select! { + biased; + _ = stream.closed() => return Err(Error::Cancel), + payload = frame.read_all() => payload?, + }; + let n = payload.len() as u64; + stream.encode(&n).await?; + track_stats.frame(); + if n == 0 { + // Have to write the object status too. + stream.encode(&0u8).await?; + } else { + stream.write_all(&mut payload).await?; + track_stats.bytes(n); + } + continue; + } + // Write the size of the frame. stream.encode(&frame.size).await?; track_stats.frame(); diff --git a/rs/moq-net/src/ietf/subscriber.rs b/rs/moq-net/src/ietf/subscriber.rs index c97aa4399..f4449ee24 100644 --- a/rs/moq-net/src/ietf/subscriber.rs +++ b/rs/moq-net/src/ietf/subscriber.rs @@ -817,6 +817,7 @@ impl Subscriber { let mut frame = producer.create_frame(Frame { size: 0, timestamp: None, + compression: None, })?; track_stats.frame(); frame.finish()?; @@ -829,7 +830,11 @@ impl Subscriber { if size > MAX_FRAME_SIZE { return Err(Error::FrameTooLarge); } - let mut frame = producer.create_frame(Frame { size, timestamp: None })?; + let mut frame = producer.create_frame(Frame { + size, + timestamp: None, + compression: None, + })?; track_stats.frame(); if let Err(err) = self.run_frame(stream, frame.clone(), &track_stats).await { diff --git a/rs/moq-net/src/lite/publisher.rs b/rs/moq-net/src/lite/publisher.rs index b2660045a..84f570057 100644 --- a/rs/moq-net/src/lite/publisher.rs +++ b/rs/moq-net/src/lite/publisher.rs @@ -15,7 +15,7 @@ use crate::{ util::{MaybeBoxedExt, MaybeSendBox}, }; -use super::Version; +use super::{PeerSetup, Version}; pub(super) struct PublisherConfig { pub session: S, @@ -25,6 +25,9 @@ pub(super) struct PublisherConfig { /// to opt out. pub stats: MoqStats, pub version: Version, + /// The peer's SETUP, recorded by the subscriber loop. Read to learn which + /// compression algorithms the peer can decompress before compressing egress. + pub peer_setup: PeerSetup, } pub(super) struct Publisher { @@ -38,6 +41,7 @@ pub(super) struct Publisher { self_origin: Origin, priority: PriorityQueue, version: Version, + peer_setup: PeerSetup, } impl Publisher { @@ -55,6 +59,7 @@ impl Publisher { self_origin, priority: Default::default(), version: config.version, + peer_setup: config.peer_setup, } } @@ -451,14 +456,9 @@ impl Publisher { let broadcast = self.origin.request_broadcast(&request.broadcast)?.await?; let info = broadcast.track(&request.track)?.info().await?; - // Same negotiation as a subscription, just answered once: codec only when - // both the producer asks for it and the draft can carry it; timescale only - // when the draft carries per-frame timestamps. - let compression = if info.compress { - Compression::Deflate - } else { - Compression::None - }; + // The compress hint travels end to end; the algorithm is negotiated per hop + // (SETUP) and named per frame. Timescale only when the draft carries per-frame + // timestamps. let timescale = if self.version.has_timestamps() { info.timescale } else { @@ -471,7 +471,7 @@ impl Publisher { priority: info.priority, ordered: info.ordered, timescale, - compression, + compress: info.compress, }) .await?; @@ -508,6 +508,7 @@ impl Publisher { self.priority.clone(), (track_stats, self.broadcasts.clone(), absolute.clone()), self.version, + self.peer_setup.clone(), ) .await { @@ -528,6 +529,7 @@ impl Publisher { Ok(()) } + #[allow(clippy::too_many_arguments)] async fn run_subscribe( session: S, stream: &mut Stream, @@ -539,6 +541,7 @@ impl Publisher { // below, after the subscription is validated, and held for its lifetime. stats: (crate::PublisherTrack, crate::SessionBroadcasts, crate::PathOwned), version: Version, + peer_setup: PeerSetup, ) -> Result<(), Error> { let (track_stats, broadcasts, absolute) = stats; let subscription = crate::Subscription { @@ -554,16 +557,14 @@ impl Publisher { let broadcast = broadcast?.await?; let track = broadcast.track(&subscribe.track)?.subscribe(subscription)?.await?; - // Compress only when the producer marked the track worth it and the - // negotiated draft can carry a codec. Older drafts (lite-04 and below) get - // None and the frames stream verbatim. On Lite05+ this matches the codec the - // subscriber already learned from TRACK_INFO. - let supports_compression = version.has_timestamps(); - let compression = if track.info().compress && supports_compression { - Compression::Deflate - } else { - Compression::None - }; + // The publisher's `compress` hint (carried in TRACK_INFO) says whether this + // track is worth compressing; it gates the per-frame `Compression` field. + // Older drafts (lite-04 and below) have no per-frame field, so they never + // compress. Whether we may actually use DEFLATE is the per-hop SETUP + // negotiation: only if the peer advertised it can decompress. We only wait on + // the peer's SETUP for a compress-hinted track (media tracks skip it). + let compress = version.has_timestamps() && track.info().compress; + let peer_deflate = compress && peer_setup.compression().await.contains(&Compression::Deflate); // Per-frame timestamps require both a publisher-advertised timescale and // a wire format that carries it. Older drafts ignore `track.timescale` @@ -608,7 +609,8 @@ impl Publisher { priority, track_priority: track_priority_rx, version, - compression, + compress, + peer_deflate, timescale, }; @@ -651,7 +653,16 @@ impl Publisher { let broadcast = self.origin.request_broadcast(&fetch.broadcast); let track_stats = self.stats.broadcast(&absolute).publisher_track(&track); - if let Err(err) = Self::run_fetch(&mut stream, &fetch, broadcast, track_stats, self.version).await { + if let Err(err) = Self::run_fetch( + &mut stream, + &fetch, + broadcast, + track_stats, + self.version, + self.peer_setup.clone(), + ) + .await + { match &err { Error::Cancel | Error::Transport(_) => { tracing::info!(broadcast = %absolute, %track, %group, "fetch cancelled") @@ -672,6 +683,7 @@ impl Publisher { broadcast: Result, Error>, track_stats: crate::PublisherTrack, version: Version, + peer_setup: PeerSetup, ) -> Result<(), Error> { let broadcast = broadcast?.await?; let track = broadcast.track(&fetch.track)?; @@ -693,14 +705,11 @@ impl Publisher { None }; - // Compression is an immutable per-track property (reported in TRACK_INFO), so - // fetched frames use the same codec as live ones. The group resolved above, so - // the track's info is set and this resolves immediately. - let compression = if track.info().await?.compress && version.has_timestamps() { - Compression::Deflate - } else { - Compression::None - }; + // The compress hint is an immutable per-track property (TRACK_INFO), so fetched + // frames carry the per-frame Compression field exactly as live ones do. The + // group resolved above, so the track info is set and this resolves immediately. + let compress = track.info().await?.compress && version.has_timestamps(); + let peer_deflate = compress && peer_setup.compression().await.contains(&Compression::Deflate); // Lite05+ FETCH responds with bare FRAME messages; the subscriber already has // the codec/timescale from TRACK_INFO and the group sequence from its request. @@ -715,7 +724,8 @@ impl Publisher { write_fetch_frame( &mut stream.writer, &mut frame, - compression, + compress, + peer_deflate, timescale, &mut prev_ts, &track_stats, @@ -773,42 +783,71 @@ async fn encode_zigzag_delta( Ok(()) } +/// Pick the wire codec for one frame's logical payload: DEFLATE when the peer +/// advertised it and it actually shrinks the bytes, else verbatim. An empty payload +/// always stays verbatim (it must use `none`). Shared by the live and fetch paths. +fn choose_compression(peer_deflate: bool, payload: bytes::Bytes) -> (Option, bytes::Bytes) { + if peer_deflate && !payload.is_empty() { + let deflated = Compression::Deflate.compress(&payload); + if deflated.len() < payload.len() { + return (Some(Compression::Deflate), bytes::Bytes::from(deflated)); + } + } + (None, payload) +} + /// Write one frame to a fetch stream in the lite wire format: the optional timing -/// prefix (see [`encode_frame_timing`]), the size, then the payload. Mirrors the -/// per-frame encoding in [`Subscription::serve_frame`] without the priority -/// machinery, since a one-shot fetch carries a single static priority set on the -/// stream up front. +/// prefix (see [`encode_frame_timing`]), the optional per-frame `Compression` field +/// (present iff the track is `compress`-hinted), the size, then the payload. Mirrors +/// the per-frame encoding in [`Subscription::serve_frame`] without the priority +/// machinery, since a one-shot fetch carries a single static priority set up front. async fn write_fetch_frame( writer: &mut Writer, frame: &mut FrameConsumer, - compression: Compression, + compress: bool, + peer_deflate: bool, timescale: Option, prev_ts: &mut u64, track_stats: &crate::PublisherTrack, ) -> Result<(), Error> { encode_frame_timing(writer, frame, timescale, prev_ts).await?; - match compression { - Compression::None => { - writer.encode(&frame.size).await?; - track_stats.frame(); - while let Some(mut chunk) = frame.read_chunk().await? { - let n = chunk.len() as u64; - writer.write_all(&mut chunk).await?; - track_stats.bytes(n); - } + // No per-frame Compression field on a non-hinted track. + if !compress { + writer.encode(&frame.size).await?; + track_stats.frame(); + while let Some(mut chunk) = frame.read_chunk().await? { + let n = chunk.len() as u64; + writer.write_all(&mut chunk).await?; + track_stats.bytes(n); } - compression => { - let payload = frame.read_all().await?; - let mut chunk = bytes::Bytes::from(compression.compress(&payload)); + return Ok(()); + } + + // Passthrough: cached DEFLATE forwarded verbatim to a peer that can inflate it. + if peer_deflate && frame.compression == Some(Compression::Deflate) { + writer.encode(&Compression::Deflate.to_code()).await?; + writer.encode(&frame.size).await?; + track_stats.frame(); + while let Some(mut chunk) = frame.read_chunk().await? { let n = chunk.len() as u64; - writer.encode(&n).await?; - track_stats.frame(); writer.write_all(&mut chunk).await?; track_stats.bytes(n); } + return Ok(()); } + // Otherwise decode (a cached DEFLATE frame for a peer that can't inflate) and + // pick per frame whether DEFLATE shrinks it. + let payload = frame.read_all().await?; + let (codec, mut chunk) = choose_compression(peer_deflate, payload); + let n = chunk.len() as u64; + writer.encode(&codec.map_or(0, Compression::to_code)).await?; + writer.encode(&n).await?; + track_stats.frame(); + writer.write_all(&mut chunk).await?; + track_stats.bytes(n); + Ok(()) } @@ -825,9 +864,12 @@ struct Subscription { priority: PriorityQueue, track_priority: tokio::sync::watch::Receiver, version: Version, - /// Codec for this track (reported in TRACK_INFO on lite-05+); every frame on - /// this subscription is compressed with it before hitting the wire. - compression: Compression, + /// The track's `compress` hint (lite-05+, from TRACK_INFO). When set, every + /// frame carries a per-frame `Compression` field naming the codec it used. + compress: bool, + /// Whether the peer advertised DEFLATE in its SETUP, so we may compress frames + /// for it. Always `false` unless [`Self::compress`] is set. + peer_deflate: bool, /// Negotiated timestamp scale for this track. `Some(_)` iff /// [`Version::has_timestamps`] is true for `version` (gated in /// `run_subscribe`); used to validate per-frame timestamps before encoding. @@ -960,10 +1002,11 @@ impl Subscription { Ok(()) } - /// Send one frame. Uncompressed frames stream chunk-by-chunk so we never - /// buffer the whole payload; a compressed frame must buffer to feed the - /// codec, and its wire size becomes the compressed length (the subscriber - /// inflates it from the track's codec, known from TRACK_INFO on lite-05+). + /// Send one frame. A compress-hinted track carries a per-frame `Compression` + /// field naming the codec used; the relay forwards an already-DEFLATE frame + /// untouched when the peer can inflate it (no needless inflate/deflate), and + /// otherwise picks per frame whether DEFLATE actually helps. A non-compress + /// track streams verbatim with no field. async fn serve_frame( &mut self, stream: &mut Writer, @@ -973,24 +1016,37 @@ impl Subscription { ) -> Result<(), Error> { encode_frame_timing(stream, &frame, self.timescale, prev_ts).await?; - match self.compression { - Compression::None => { - stream.encode(&frame.size).await?; - self.track_stats.frame(); - - while let Some(chunk) = self.read_chunk(stream, priority, &mut frame).await? { - self.write_chunk(stream, priority, chunk).await?; - } + // No per-frame Compression field on a track the publisher didn't hint. + if !self.compress { + stream.encode(&frame.size).await?; + self.track_stats.frame(); + while let Some(chunk) = self.read_chunk(stream, priority, &mut frame).await? { + self.write_chunk(stream, priority, chunk).await?; } - compression => { - let payload = self.read_all(stream, priority, &mut frame).await?; - let chunk = bytes::Bytes::from(compression.compress(&payload)); - stream.encode(&(chunk.len() as u64)).await?; - self.track_stats.frame(); + return Ok(()); + } + + // Passthrough: the cache already holds DEFLATE bytes and the peer can inflate + // them, so forward verbatim (the relay hot path) — field, stored size, bytes. + if self.peer_deflate && frame.compression == Some(Compression::Deflate) { + stream.encode(&Compression::Deflate.to_code()).await?; + stream.encode(&frame.size).await?; + self.track_stats.frame(); + while let Some(chunk) = self.read_chunk(stream, priority, &mut frame).await? { self.write_chunk(stream, priority, chunk).await?; } + return Ok(()); } + // Otherwise read the logical payload (decoding a cached DEFLATE frame for a + // peer that can't inflate) and pick per frame whether DEFLATE shrinks it. + let payload = self.read_all(stream, priority, &mut frame).await?; + let (codec, chunk) = choose_compression(self.peer_deflate, payload); + stream.encode(&codec.map_or(0, Compression::to_code)).await?; + stream.encode(&(chunk.len() as u64)).await?; + self.track_stats.frame(); + self.write_chunk(stream, priority, chunk).await?; + Ok(()) } diff --git a/rs/moq-net/src/lite/session.rs b/rs/moq-net/src/lite/session.rs index 5d807b5f1..217e21585 100644 --- a/rs/moq-net/src/lite/session.rs +++ b/rs/moq-net/src/lite/session.rs @@ -87,6 +87,9 @@ pub fn start( origin: publish, stats: stats.clone(), version, + // The subscriber loop records the peer's SETUP here; the publisher reads it + // to learn which algorithms the peer can decompress before compressing. + peer_setup: peer_setup.clone(), }); let subscriber = Subscriber::new(SubscriberConfig { session: session.clone(), diff --git a/rs/moq-net/src/lite/setup.rs b/rs/moq-net/src/lite/setup.rs index 31a6530d0..3022fb956 100644 --- a/rs/moq-net/src/lite/setup.rs +++ b/rs/moq-net/src/lite/setup.rs @@ -1,6 +1,7 @@ //! The lite-05 SETUP message: each endpoint advertises its capabilities once, as //! the sole message on a unidirectional Setup Stream, then closes it. +use crate::Compression; use crate::coding::*; use super::{Message, Parameters, Version}; @@ -9,6 +10,8 @@ use super::{Message, Parameters, Version}; const PARAM_PROBE: u64 = 0x1; /// Setup Parameter id for the request Path (client-only, URI-less transports). const PARAM_PATH: u64 = 0x2; +/// Setup Parameter id for the compression algorithms this endpoint can decompress. +const PARAM_COMPRESSION: u64 = 0x3; /// The probe capability an endpoint advertises in SETUP. /// @@ -59,6 +62,12 @@ pub struct Setup { /// qmux over TCP/TLS). Sent only by the client; a server never sends one and a /// relay never forwards it. `None` on URI-carrying bindings. pub path: Option, + /// Compression algorithms this endpoint can *decompress*, in preference order + /// (most-preferred first). Governs only what a peer may compress when sending + /// *to* us; the sender names the algorithm actually used per frame. Verbatim + /// transfer needs no negotiation, so it's never listed; empty (the default) means + /// "send me everything verbatim". + pub compression: Vec, } impl Message for Setup { @@ -83,7 +92,27 @@ impl Message for Setup { None => None, }; - Ok(Self { probe, path }) + // A back-to-back sequence of algorithm varints. Skip `none` (0, which decodes + // to `None`) and any identifier we don't understand (an error): we can neither + // produce nor consume it. + let mut compression = Vec::new(); + if let Some(bytes) = params.get_bytes(PARAM_COMPRESSION) { + let mut slice = bytes; + while !slice.is_empty() { + let code = u64::decode(&mut slice, version)?; + if let Ok(Some(algo)) = Compression::from_code(code) + && !compression.contains(&algo) + { + compression.push(algo); + } + } + } + + Ok(Self { + probe, + path, + compression, + }) } fn encode_msg(&self, w: &mut W, version: Version) -> Result<(), EncodeError> { @@ -99,6 +128,15 @@ impl Message for Setup { if let Some(path) = &self.path { params.set_bytes(PARAM_PATH, path.as_bytes().to_vec()); } + // Pack the advertised algorithms back-to-back as varints. The type can't hold + // a verbatim "no codec" entry, so there's nothing to omit. + let mut algos = Vec::new(); + for algo in &self.compression { + algo.to_code().encode(&mut algos, version)?; + } + if !algos.is_empty() { + params.set_bytes(PARAM_COMPRESSION, algos); + } params.encode(w, version) } @@ -141,6 +179,24 @@ impl PeerSetup { } } } + + /// Await the algorithms the peer can decompress, blocking until its SETUP arrives. + /// + /// A publisher consults this before compressing: it MUST NOT use an algorithm the + /// peer did not advertise. An empty list (no parameter, or the sender dropped + /// without a SETUP) means everything must be sent verbatim. + pub async fn compression(&self) -> Vec { + let mut rx = self.0.subscribe(); + loop { + // Clone out of the borrow before awaiting so no guard crosses the await point. + if let Some(setup) = rx.borrow_and_update().clone() { + return setup.compression; + } + if rx.changed().await.is_err() { + return Vec::new(); + } + } + } } #[cfg(test)] @@ -165,7 +221,11 @@ mod tests { #[test] fn probe_levels_round_trip() { for probe in [ProbeLevel::None, ProbeLevel::Report, ProbeLevel::Increase] { - let msg = Setup { probe, path: None }; + let msg = Setup { + probe, + path: None, + compression: Vec::new(), + }; assert_eq!(round_trip(&msg), msg); } } @@ -175,10 +235,43 @@ mod tests { let msg = Setup { probe: ProbeLevel::Report, path: Some("/room/123".to_string()), + compression: Vec::new(), }; assert_eq!(round_trip(&msg), msg); } + #[test] + fn compression_round_trip() { + let msg = Setup { + probe: ProbeLevel::None, + path: None, + compression: vec![Compression::Deflate], + }; + assert_eq!(round_trip(&msg), msg); + } + + #[test] + fn compression_decode_skips_none_and_unknown() { + // Hand-frame a SETUP whose Compression parameter lists none (0), deflate (1), + // and an unknown algorithm (99); only deflate survives the decode. + let mut algos = Vec::new(); + for code in [0u64, 1, 99] { + code.encode(&mut algos, Version::Lite05Wip).unwrap(); + } + let mut params = Parameters::default(); + params.set_bytes(PARAM_COMPRESSION, algos); + let mut body = Vec::new(); + params.encode(&mut body, Version::Lite05Wip).unwrap(); + + let mut buf = bytes::BytesMut::new(); + body.len().encode(&mut buf, Version::Lite05Wip).unwrap(); + buf.extend_from_slice(&body); + + let mut slice = &buf[..]; + let got = Setup::decode(&mut slice, Version::Lite05Wip).unwrap(); + assert_eq!(got.compression, vec![Compression::Deflate]); + } + #[test] fn unknown_probe_level_saturates_to_increase() { // Frame a SETUP message carrying an unknown probe level (99) by hand: the diff --git a/rs/moq-net/src/lite/subscriber.rs b/rs/moq-net/src/lite/subscriber.rs index 5b0fff8d0..21ce737b3 100644 --- a/rs/moq-net/src/lite/subscriber.rs +++ b/rs/moq-net/src/lite/subscriber.rs @@ -77,9 +77,10 @@ pub(super) struct Subscriber { struct TrackEntry { producer: TrackProducer, stats: Arc, - /// Codec + timestamp scale from this track's TRACK_INFO, known before the - /// SUBSCRIBE is even opened, so group streams decode frames without blocking. - compression: Compression, + /// The `compress` hint + timestamp scale from this track's TRACK_INFO, known + /// before the SUBSCRIBE is even opened, so group streams decode frames without + /// blocking. The hint gates the per-frame `Compression` field on each frame. + compress: bool, timescale: Option, } @@ -565,7 +566,7 @@ impl Subscriber { pub async fn recv_group(&mut self, stream: &mut Reader) -> Result<(), Error> { let hdr: lite::Group = stream.decode().await?; - let (mut group, track, track_stats, compression, timescale) = { + let (mut group, track, track_stats, compress, timescale) = { let mut subs = self.subscribes.lock(); let entry = subs.get_mut(&hdr.subscribe).ok_or(Error::Cancel)?; @@ -575,7 +576,7 @@ impl Subscriber { group, entry.producer.clone(), entry.stats.clone(), - entry.compression, + entry.compress, entry.timescale, ) }; @@ -589,7 +590,7 @@ impl Subscriber { let res = tokio::select! { err = track.closed() => Err(err), err = group.closed() => Err(err), - res = self.run_group(stream, group.clone(), track_stats.clone(), compression, timescale) => res, + res = self.run_group(stream, group.clone(), track_stats.clone(), compress, timescale) => res, }; match res { @@ -613,7 +614,7 @@ impl Subscriber { stream: &mut Reader, mut group: GroupProducer, track_stats: Arc, - compression: Compression, + compress: bool, timescale: Option, ) -> Result<(), Error> { // Previous frame's raw timestamp value (in `timescale` units), for the @@ -622,11 +623,10 @@ impl Subscriber { let mut prev_ts: u64 = 0; loop { + // The first per-frame field present doubles as the group's end-of-frames + // sentinel: a clean stream end at a frame boundary means no more frames. + // Wire order: [Timestamp Delta? (timescale)] [Compression? (hint)] [Message Length]. let timestamp = if let Some(scale) = timescale { - // Publisher advertised a timescale, so every frame on this stream is - // prefixed with a zigzag-delta timestamp. The timestamp delta doubles - // as the per-frame sentinel: stream end here means the group has no - // more frames. let Some(zz) = stream.decode_maybe::().await? else { break; }; @@ -639,41 +639,54 @@ impl Subscriber { None }; - let Some(size) = stream.decode_maybe::().await? else { - break; + // Per-frame codec, present iff the track is compress-hinted. It's the + // sentinel when no timestamp delta precedes it. + let frame_compression = if compress { + let code = if timescale.is_some() { + stream.decode::().await? + } else { + let Some(code) = stream.decode_maybe::().await? else { + break; + }; + code + }; + Compression::from_code(code)? + } else { + None + }; + + // Message length, the sentinel only when neither field above was present. + let size = if timescale.is_some() || compress { + stream.decode::().await? + } else { + let Some(size) = stream.decode_maybe::().await? else { + break; + }; + size }; + if size > MAX_FRAME_SIZE { return Err(Error::FrameTooLarge); } - match compression { - Compression::None => { - let mut frame = group.create_frame(Frame { size, timestamp })?; - track_stats.frame(); - - if let Err(err) = self.run_frame(stream, &mut frame, &track_stats).await { - let _ = frame.abort(err.clone()); - return Err(err); - } - - frame.finish()?; - } - compression => { - // `size` is the compressed length; pull it off the wire, then - // inflate. The frame the consumer sees carries the original size. - let packed = stream.read_exact(size as usize).await?; - track_stats.frame(); - track_stats.bytes(size); - - let payload = compression.decompress(&packed)?; - let mut frame = group.create_frame(Frame { - size: payload.len() as u64, - timestamp, - })?; - frame.write(bytes::Bytes::from(payload))?; - frame.finish()?; - } + // Cache the bytes exactly as they arrive, tagged with the per-frame codec + // named on the wire. For a compressed frame the cache holds the packed + // bytes (no inflate on ingress): a relay forwards them verbatim to a peer + // that can inflate, and the model decodes only for one that can't. `size` + // is the on-wire (stored) length in either case. + let mut frame = group.create_frame(Frame { + size, + timestamp, + compression: frame_compression, + })?; + track_stats.frame(); + + if let Err(err) = self.run_frame(stream, &mut frame, &track_stats).await { + let _ = frame.abort(err.clone()); + return Err(err); } + + frame.finish()?; } Ok(()) @@ -824,11 +837,11 @@ impl TrackServe { // The codec/timescale then flow into every SUBSCRIBE and FETCH without a per- // response header. Older drafts have no TRACK stream: the request stays pending // until the first SUBSCRIBE_OK supplies the properties (see `establish`). - let (mut track, compression, timescale) = if self.subscriber.version.has_timestamps() { + let (mut track, compress, timescale) = if self.subscriber.version.has_timestamps() { match self.track_info().await { - Ok((info, compression)) => { + Ok((info, compress)) => { let timescale = info.timescale; - (Some(Track::Active(request.accept(info))), compression, timescale) + (Some(Track::Active(request.accept(info))), compress, timescale) } Err(err) => { tracing::warn!(broadcast = %self.subscriber.log_path(&self.path), track = %self.name, %err, "track info failed"); @@ -837,7 +850,7 @@ impl TrackServe { } } } else { - (Some(Track::Pending(request)), Compression::None, None) + (Some(Track::Pending(request)), false, None) }; let mut sub = Sub::None; @@ -914,7 +927,7 @@ impl TrackServe { match event { Event::Fetch(req) => { linger = None; - fetches.push(self.clone().serve_fetch(req, compression, timescale).maybe_boxed()); + fetches.push(self.clone().serve_fetch(req, compress, timescale).maybe_boxed()); } Event::Subscription(pref) => { linger = None; @@ -925,7 +938,7 @@ impl TrackServe { pref, supports_linger, completed, - compression, + compress, timescale, ) .await @@ -976,9 +989,10 @@ impl TrackServe { } /// Open a TRACK stream, read the single TRACK_INFO, and map it to the model's - /// [`crate::TrackInfo`] plus the wire [`Compression`] (needed verbatim to decode - /// frames). Lite05+ only. Bails if the broadcast dies meanwhile. - async fn track_info(&self) -> Result<(crate::TrackInfo, Compression), Error> { + /// [`crate::TrackInfo`] plus the `compress` hint (which gates the per-frame + /// `Compression` field on every group stream). Lite05+ only. Bails if the + /// broadcast dies meanwhile. + async fn track_info(&self) -> Result<(crate::TrackInfo, bool), Error> { let mut stream = Stream::open(&self.subscriber.session, self.subscriber.version).await?; stream.writer.encode(&lite::ControlType::Track).await?; stream @@ -1000,13 +1014,13 @@ impl TrackServe { // best-effort, not a guarantee), so the local retention window falls back to // the model default. let model = crate::TrackInfo { - compress: info.compression != Compression::None, + compress: info.compress, timescale: info.timescale, cache: crate::DEFAULT_CACHE, priority: info.priority, ordered: info.ordered, }; - Ok((model, info.compression)) + Ok((model, info.compress)) } /// Apply a subscription-demand change: open the upstream SUBSCRIBE on the first @@ -1019,7 +1033,7 @@ impl TrackServe { pref: Option, supports_linger: bool, completed: bool, - compression: Compression, + compress: bool, timescale: Option, ) -> Result<(), Error> { match pref { @@ -1035,7 +1049,7 @@ impl TrackServe { None => false, }; if establish { - self.establish(track, sub, subscription, compression, timescale).await?; + self.establish(track, sub, subscription, compress, timescale).await?; } } Sub::Active(active) if active.paused => { @@ -1097,7 +1111,7 @@ impl TrackServe { track: &mut Option, sub: &mut Sub, subscription: Subscription, - compression: Compression, + compress: bool, timescale: Option, ) -> Result<(), Error> { let id = self.subscriber.next_id.fetch_add(1, atomic::Ordering::Relaxed); @@ -1160,7 +1174,7 @@ impl TrackServe { TrackEntry { producer, stats: self.track_stats.clone(), - compression, + compress, timescale, }, ); @@ -1196,7 +1210,7 @@ impl TrackServe { /// come from this track's TRACK_INFO (already known), and the group sequence is /// implicit from the request. Runs to completion as an independent future in the /// serve loop's `FuturesUnordered`. - async fn serve_fetch(self, request: GroupRequest, compression: Compression, timescale: Option) { + async fn serve_fetch(self, request: GroupRequest, compress: bool, timescale: Option) { let TrackServe { mut subscriber, path, @@ -1234,9 +1248,12 @@ impl TrackServe { // Make the group available (resolving the downstream fetch) and fill it. The // TrackInfo only takes effect if the track isn't accepted yet (a fetch with no - // live subscription); otherwise the group inherits the accepted timescale. + // live subscription); otherwise the group inherits the accepted info. Carry the + // compress hint so a downstream re-serving this track still emits per-frame + // Compression fields that match the cached frames' codecs. let group_info = TrackInfo { timescale, + compress, ..Default::default() }; let mut producer = match request.accept(group_info) { @@ -1250,13 +1267,7 @@ impl TrackServe { }; let res = subscriber - .run_group( - &mut stream.reader, - producer.clone(), - track_stats, - compression, - timescale, - ) + .run_group(&mut stream.reader, producer.clone(), track_stats, compress, timescale) .await; match res { Ok(()) => { diff --git a/rs/moq-net/src/lite/track.rs b/rs/moq-net/src/lite/track.rs index 2470635e2..59bf23295 100644 --- a/rs/moq-net/src/lite/track.rs +++ b/rs/moq-net/src/lite/track.rs @@ -1,7 +1,7 @@ use std::borrow::Cow; use crate::{ - Compression, Path, Timescale, + Path, Timescale, coding::{Decode, DecodeError, Encode, EncodeError}, }; @@ -54,8 +54,11 @@ pub struct TrackInfo { /// Per-frame timestamp scale, or `None` if frames carry no timestamps. On the /// wire `None` is `0` and `Some(n)` is `n`. pub timescale: Option, - /// Codec applied to every frame payload on this track. - pub compression: Compression, + /// Boolean hint that this track's payloads are worth compressing. It names no + /// algorithm: that's negotiated per hop (SETUP) and named per frame. When set, + /// every FRAME on the track carries a per-frame `Compression` field. Wire values + /// `>1` are reserved and decode as `true`, so the hint stays additive. + pub compress: bool, } impl Message for TrackInfo { @@ -67,13 +70,14 @@ impl Message for TrackInfo { let priority = u8::decode(r, version)?; let ordered = u8::decode(r, version)? != 0; let timescale = Timescale::new(u64::decode(r, version)?).ok(); - let compression = Compression::from_code(u64::decode(r, version)?).map_err(|_| DecodeError::InvalidValue)?; + // Any non-zero value (including reserved `>1`) is the "worth compressing" hint. + let compress = u64::decode(r, version)? != 0; Ok(Self { priority, ordered, timescale, - compression, + compress, }) } @@ -85,7 +89,7 @@ impl Message for TrackInfo { self.priority.encode(w, version)?; (self.ordered as u8).encode(w, version)?; self.timescale.map(u64::from).unwrap_or(0).encode(w, version)?; - self.compression.to_code().encode(w, version)?; + (self.compress as u64).encode(w, version)?; Ok(()) } } @@ -99,7 +103,7 @@ mod test { priority: 7, ordered: false, timescale: Some(Timescale::MICRO), - compression: Compression::Deflate, + compress: true, } } @@ -116,7 +120,21 @@ mod test { assert_eq!(got.priority, 7); assert!(!got.ordered); assert_eq!(got.timescale, Some(Timescale::MICRO)); - assert_eq!(got.compression, Compression::Deflate); + assert!(got.compress); + } + + #[test] + fn track_info_compress_hint_is_additive() { + // Reserved wire values (>1) decode as the boolean hint `true`. + let mut buf = Vec::new(); + 7u8.encode(&mut buf, Version::Lite05Wip).unwrap(); + 0u8.encode(&mut buf, Version::Lite05Wip).unwrap(); + u64::from(Timescale::MICRO) + .encode(&mut buf, Version::Lite05Wip) + .unwrap(); + 9u64.encode(&mut buf, Version::Lite05Wip).unwrap(); // reserved compress value + let mut slice = buf.as_slice(); + assert!(TrackInfo::decode_msg(&mut slice, Version::Lite05Wip).unwrap().compress); } #[test] diff --git a/rs/moq-net/src/model/compression.rs b/rs/moq-net/src/model/compression.rs index 7f2b74e58..a0767efaa 100644 --- a/rs/moq-net/src/model/compression.rs +++ b/rs/moq-net/src/model/compression.rs @@ -1,8 +1,9 @@ //! Per-frame payload compression. //! -//! A publisher marks a [`crate::Track`] with `compress = true` when its frames are -//! worth compressing (e.g. a JSON catalog). The wire protocol then negotiates a -//! concrete [`Compression`] codec in SUBSCRIBE_OK, and every frame on that track is +//! A publisher marks a [`crate::Track`] with `compress = true` to hint its frames +//! are worth compressing (e.g. a JSON catalog). The wire then negotiates an +//! algorithm per hop (the SETUP `Compression` parameter) and names it per frame, so +//! a frame can opt out (`None`) when compression wouldn't shrink it. Each frame is //! compressed independently so the codec doesn't carry state across the lossy, //! out-of-order group boundary. @@ -10,26 +11,26 @@ use std::io::{Read, Write}; use crate::{Error, MAX_FRAME_SIZE, Result}; -/// The codec used to (de)compress frame payloads, negotiated per subscription. -#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +/// A frame-payload compression codec. "No compression" (verbatim) is the absence +/// of a codec, modeled as `Option::None` rather than a variant here, so the type +/// can't represent a meaningless "compress with nothing" and a negotiated algorithm +/// list can't list it. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub enum Compression { - /// Frames are sent verbatim. - #[default] - None, /// Raw DEFLATE (RFC 1951), no zlib/gzip header. QUIC already guarantees /// integrity, so the extra checksum bytes of zlib/gzip would be wasted. Deflate, } impl Compression { - /// Compress a whole frame payload. + /// Compress a whole frame payload with this codec. /// - /// [`Compression::None`] returns the input unchanged. The caller decides - /// whether the result is actually smaller; this just applies the codec. - pub fn compress(&self, data: &[u8]) -> Vec { + /// The caller decides whether the result is actually smaller; this just applies + /// the codec. Verbatim transfer is the absence of a codec, so it's handled by the + /// caller (an `Option` of `None`), not here. + pub fn compress(self, data: &[u8]) -> Vec { match self { - Self::None => data.to_vec(), Self::Deflate => { let mut encoder = flate2::write::DeflateEncoder::new(Vec::new(), flate2::Compression::default()); // Writing into a Vec is infallible. @@ -41,9 +42,8 @@ impl Compression { /// Decompress a whole frame payload, rejecting anything that inflates past /// `MAX_FRAME_SIZE` so a malicious peer can't zip-bomb the receiver. - pub fn decompress(&self, data: &[u8]) -> Result> { + pub fn decompress(self, data: &[u8]) -> Result> { match self { - Self::None => Ok(data.to_vec()), Self::Deflate => { // Read one byte past the limit so we can tell "exactly at the cap" // apart from "overflowed". @@ -58,19 +58,20 @@ impl Compression { } } - /// The varint code used on the wire. + /// This codec's wire varint code (always non-zero; verbatim is code `0`, which + /// has no codec — see [`Self::from_code`]). pub fn to_code(self) -> u64 { match self { - Self::None => 0, Self::Deflate => 1, } } - /// Parse a wire varint code, erroring on unknown codecs. - pub fn from_code(code: u64) -> Result { + /// Parse a wire varint code into an optional codec: `0` is verbatim (`None`); + /// other known codes are `Some`. Errors on an unknown codec. + pub fn from_code(code: u64) -> Result> { match code { - 0 => Ok(Self::None), - 1 => Ok(Self::Deflate), + 0 => Ok(None), + 1 => Ok(Some(Self::Deflate)), _ => Err(Error::Unsupported), } } @@ -80,15 +81,6 @@ impl Compression { mod test { use super::*; - #[test] - fn none_roundtrip() { - let data = b"the quick brown fox"; - let c = Compression::None; - let packed = c.compress(data); - assert_eq!(&packed, data); - assert_eq!(c.decompress(&packed).unwrap(), data); - } - #[test] fn deflate_roundtrip() { // Highly compressible input so we can assert the codec actually shrinks it. @@ -114,9 +106,12 @@ mod test { #[test] fn code_roundtrip() { - for c in [Compression::None, Compression::Deflate] { - assert_eq!(Compression::from_code(c.to_code()).unwrap(), c); - } + // A codec round-trips through its non-zero code; `0` is verbatim (`None`). + assert_eq!( + Compression::from_code(Compression::Deflate.to_code()).unwrap(), + Some(Compression::Deflate) + ); + assert_eq!(Compression::from_code(0).unwrap(), None); assert!(Compression::from_code(99).is_err()); } } diff --git a/rs/moq-net/src/model/frame.rs b/rs/moq-net/src/model/frame.rs index 3d83f5f0d..10c8863bd 100644 --- a/rs/moq-net/src/model/frame.rs +++ b/rs/moq-net/src/model/frame.rs @@ -5,7 +5,7 @@ use std::task::{Poll, ready}; use bytes::buf::UninitSlice; use bytes::{BufMut, Bytes}; -use crate::{Error, Result, Timestamp}; +use crate::{Compression, Error, Result, Timestamp}; /// Maximum payload size accepted for a single frame on the wire. /// @@ -29,7 +29,10 @@ pub(crate) const MAX_FRAME_SIZE: u64 = 32 * 1024 * 1024; #[derive(Clone, Debug, Default)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct Frame { - /// Total payload size in bytes. Declared up front so consumers can preallocate. + /// Number of *stored* payload bytes, declared up front so consumers can + /// preallocate. When [`Self::compression`] is `Some`, this is the compressed + /// length, which is smaller than the decoded payload that + /// [`FrameConsumer::read_all`] returns. pub size: u64, /// Presentation timestamp in the parent track's timescale. /// @@ -39,6 +42,18 @@ pub struct Frame { /// scale matches the track's [`crate::TrackInfo::timescale`]; the publisher /// surfaces a `ProtocolViolation` otherwise. pub timestamp: Option, + /// Codec the stored payload bytes are in, or `None` for verbatim bytes. + /// + /// A relay that ingests an already-compressed frame keeps the bytes packed in the + /// cache and records the codec here, so the cache holds (and bills for) the + /// compressed size. It is *not* the same as [`crate::TrackInfo::compress`]: that + /// flag asks the publisher to compress on egress, whereas this records what the + /// cached bytes already are (an origin marks a track `compress` yet writes its + /// frames verbatim, so they stay `None` here). [`FrameConsumer::read_all`] decodes + /// against this; the wire publishers read it to decide whether to pass the bytes + /// through or recode them for a peer. + #[cfg_attr(feature = "serde", serde(default, skip_serializing_if = "Option::is_none"))] + pub compression: Option, } impl Frame { @@ -57,13 +72,18 @@ impl From for Frame { Self { size: size as u64, timestamp: None, + compression: None, } } } impl From for Frame { fn from(size: u64) -> Self { - Self { size, timestamp: None } + Self { + size, + timestamp: None, + compression: None, + } } } @@ -72,6 +92,7 @@ impl From for Frame { Self { size: size as u64, timestamp: None, + compression: None, } } } @@ -81,6 +102,7 @@ impl From for Frame { Self { size: size as u64, timestamp: None, + compression: None, } } } @@ -369,10 +391,13 @@ impl FrameConsumer { } } - /// Poll for all remaining data without blocking. + /// Poll for the frame's full *decoded* payload without blocking. /// - /// Waits until the frame is finished (written == size); then returns the - /// remaining bytes from `read_idx` to the end as a single zero-copy slice. + /// Waits until the frame is finished, then returns the logical bytes: a + /// zero-copy slice for an uncompressed frame, or the inflated payload when + /// [`Frame::compression`] is set (so callers that don't speak the codec, like + /// an application or an IETF/old-lite peer, get usable bytes). Use + /// [`Self::read_chunk`] instead to stream the stored bytes verbatim. pub fn poll_read_all(&mut self, waiter: &kio::Waiter) -> Poll> { let read_idx = self.read_idx; let res = ready!(self.poll(waiter, |state| { @@ -387,17 +412,34 @@ impl FrameConsumer { match res { Ok(()) => { // Frame is finished: written == capacity. - let bytes = self - .snapshot(read_idx) - .unwrap_or_else(|| Bytes::from_owner(self.buf.clone()).slice(read_idx..read_idx)); self.read_idx = self.buf.capacity(); - Poll::Ready(Ok(bytes)) + let Some(codec) = self.info.compression else { + // Verbatim: hand back a zero-copy view of the remaining bytes. + let bytes = self + .snapshot(read_idx) + .unwrap_or_else(|| Bytes::from_owner(self.buf.clone()).slice(read_idx..read_idx)); + return Poll::Ready(Ok(bytes)); + }; + // Already drained by a prior read: nothing remains. + if read_idx >= self.buf.capacity() { + return Poll::Ready(Ok(Bytes::new())); + } + // Compressed: inflate the whole stored payload. DEFLATE needs the + // full stream from the start, so this decodes the entire buffer and + // ignores any partial `read_chunk` cursor (compressed frames are + // meant to be read whole). + match codec.decompress(self.buf.as_ref()) { + Ok(decoded) => Poll::Ready(Ok(Bytes::from(decoded))), + Err(e) => Poll::Ready(Err(e)), + } } Err(e) => Poll::Ready(Err(e)), } } - /// Return all of the remaining bytes, blocking until the frame is finished. + /// Return the frame's full decoded payload, blocking until it is finished. + /// + /// Inflates the payload when [`Frame::compression`] is set; see [`Self::poll_read_all`]. pub async fn read_all(&mut self) -> Result { kio::wait(|waiter| self.poll_read_all(waiter)).await } @@ -409,11 +451,13 @@ impl FrameConsumer { Poll::Ready(Ok(if bytes.is_empty() { Vec::new() } else { vec![bytes] })) } - /// Poll for the next chunk of bytes since the last read. + /// Poll for the next chunk of *stored* bytes since the last read. /// /// Returns whatever bytes have been written since the consumer's `read_idx` — /// could span multiple producer writes. Returns `None` once the frame is - /// finished and all bytes have been consumed. + /// finished and all bytes have been consumed. Unlike [`Self::read_all`], this + /// streams the bytes verbatim and does **not** decode [`Frame::compression`]; + /// it's the path a relay uses to forward a frame without inflating it. pub fn poll_read_chunk(&mut self, waiter: &kio::Waiter) -> Poll>> { let read_idx = self.read_idx; let res = ready!(self.poll(waiter, |state| { @@ -470,6 +514,7 @@ mod test { let mut producer = Frame { size: 5, timestamp: None, + compression: None, } .produce(); producer.write(Bytes::from_static(b"hello")).unwrap(); @@ -485,6 +530,7 @@ mod test { let mut producer = Frame { size: 10, timestamp: None, + compression: None, } .produce(); producer.write(Bytes::from_static(b"hello")).unwrap(); @@ -501,6 +547,7 @@ mod test { let mut producer = Frame { size: 10, timestamp: None, + compression: None, } .produce(); producer.write(Bytes::from_static(b"hello")).unwrap(); @@ -524,6 +571,7 @@ mod test { let mut producer = Frame { size: 10, timestamp: None, + compression: None, } .produce(); producer.write(Bytes::from_static(b"hello")).unwrap(); @@ -541,6 +589,7 @@ mod test { let mut producer = Frame { size: 5, timestamp: None, + compression: None, } .produce(); producer.write(Bytes::from_static(b"hi")).unwrap(); @@ -553,6 +602,7 @@ mod test { let mut producer = Frame { size: 3, timestamp: None, + compression: None, } .produce(); let err = producer.write(Bytes::from_static(b"toolong")).unwrap_err(); @@ -564,6 +614,7 @@ mod test { let mut producer = Frame { size: 5, timestamp: None, + compression: None, } .produce(); let mut consumer = producer.consume(); @@ -578,6 +629,7 @@ mod test { let mut producer = Frame { size: 0, timestamp: None, + compression: None, } .produce(); producer.finish().unwrap(); @@ -592,6 +644,7 @@ mod test { let mut producer = Frame { size: 5, timestamp: None, + compression: None, } .produce(); let mut consumer = producer.consume(); @@ -612,6 +665,7 @@ mod test { let mut producer = Frame { size: 12, timestamp: None, + compression: None, } .produce(); assert_eq!(producer.remaining_mut(), 12); @@ -632,6 +686,7 @@ mod test { let mut producer = Frame { size: 4, timestamp: None, + compression: None, } .produce(); // Safety violation on purpose: cnt > remaining_mut(). @@ -643,6 +698,7 @@ mod test { let mut producer = Frame { size: 6, timestamp: None, + compression: None, } .produce(); let mut consumer = producer.consume(); @@ -667,6 +723,7 @@ mod test { let mut producer = Frame { size: 10, timestamp: None, + compression: None, } .produce(); let mut c1 = producer.consume(); @@ -686,4 +743,57 @@ mod test { let chunk = c2.read_chunk().now_or_never().unwrap().unwrap(); assert_eq!(chunk, Some(Bytes::from_static(b"world"))); } + + /// A compressed frame keeps the packed bytes in the cache: `read_all` + /// inflates them to the logical payload, while `read_chunk` streams them + /// verbatim (the relay-passthrough path). `size` is the compressed length. + #[test] + fn compressed_frame_decode_vs_verbatim() { + let original = vec![b'x'; 4096]; + let packed = Compression::Deflate.compress(&original); + assert!(packed.len() < original.len(), "deflate should shrink repetitive data"); + + let mut producer = Frame { + size: packed.len() as u64, + timestamp: None, + compression: Some(Compression::Deflate), + } + .produce(); + producer.write(Bytes::from(packed.clone())).unwrap(); + producer.finish().unwrap(); + + // read_all inflates to the original payload. + let mut decoded = producer.consume(); + assert_eq!( + decoded.read_all().now_or_never().unwrap().unwrap(), + Bytes::from(original) + ); + + // read_chunk hands back the stored (compressed) bytes untouched. + let mut verbatim = producer.consume(); + let chunk = verbatim.read_chunk().now_or_never().unwrap().unwrap().unwrap(); + assert_eq!(&chunk[..], &packed[..]); + + // The header reports the stored size and codec. + assert_eq!(decoded.size, packed.len() as u64); + assert_eq!(decoded.compression, Some(Compression::Deflate)); + } + + /// `read_all` on a compressed frame surfaces a decode error for corrupt + /// stored bytes rather than handing back garbage. + #[test] + fn compressed_frame_read_all_rejects_garbage() { + let mut producer = Frame { + size: 4, + timestamp: None, + compression: Some(Compression::Deflate), + } + .produce(); + producer.write(Bytes::from_static(b"\xff\xff\xff\xff")).unwrap(); + producer.finish().unwrap(); + + let mut consumer = producer.consume(); + let err = consumer.read_all().now_or_never().unwrap().unwrap_err(); + assert!(matches!(err, Error::Decompress)); + } } diff --git a/rs/moq-net/src/model/group.rs b/rs/moq-net/src/model/group.rs index 470bca67d..d7525a2ce 100644 --- a/rs/moq-net/src/model/group.rs +++ b/rs/moq-net/src/model/group.rs @@ -698,6 +698,7 @@ mod test { let frame = Frame { size: 3, timestamp: Some(Timestamp::from_micros(42).unwrap()), + compression: None, }; assert!(matches!(producer.create_frame(frame), Err(Error::TimestampMismatch))); } @@ -711,6 +712,7 @@ mod test { let frame = Frame { size: 3, timestamp: None, + compression: None, }; assert!(matches!(producer.create_frame(frame), Err(Error::TimestampMismatch))); } @@ -724,6 +726,7 @@ mod test { let frame = Frame { size: 3, timestamp: Some(Timestamp::from_millis(1).unwrap()), // millis, not micros + compression: None, }; assert!(matches!(producer.create_frame(frame), Err(Error::TimestampMismatch))); } @@ -737,6 +740,7 @@ mod test { let frame = Frame { size: 1, timestamp: Some(Timestamp::from_micros(7).unwrap()), + compression: None, }; let mut writer = producer.create_frame(frame).unwrap(); writer.write(Bytes::from_static(b"x")).unwrap(); diff --git a/rs/moq-net/src/server.rs b/rs/moq-net/src/server.rs index 41f4c10fd..2d0f6bc2e 100644 --- a/rs/moq-net/src/server.rs +++ b/rs/moq-net/src/server.rs @@ -140,6 +140,8 @@ impl Server { let our_setup = lite::Setup { probe: lite::ProbeLevel::Report, path: None, + // We can inflate DEFLATE, so a peer may compress what it sends us. + compression: vec![crate::Compression::Deflate], }; // Server side never blocks on the initial set; discard the synced receiver.