moq-gst: replace moqsink with a rewritten, unit-tested core #1771

Open
arielmol wants to merge 16 commits into moq-dev:main from arielmol:gstsink

@arielmol arielmol commented Jun 17, 2026

Contributor

Summary

Replaces the moqsink element with a rewritten core that separates the timeline,
per-pad state, and session lifecycle into independently testable pieces. The old sink
interleaved connection, pads, timing, finalization, and errors, with no dedicated unit
tests and ~16% line coverage. The new sink module has a dedicated unit suite (timeline,
segment classification, session lifecycle, flush, per-pad generations); the moq-gst
crate's suite is 59 unit + 3 element tests.

It is a compatible superset of the old element: same element name, same sink_%u
request pads, the same seven codec caps (H.264/H.265 byte-stream/au, AV1, VP8, VP9,
AAC raw, Opus), and the same three writable properties (url, broadcast,
tls-disable-verify). It adds three read-only diagnostics: connected, moq-version,
estimated-send-bitrate.

New behavior: shared running-time continuity (no per-pad anchoring), explicit pad
membership with per-pad generations, per-pad error isolation, B-frame support
(decode-order frames, no PTS monotonicity filter), a frame-size cap, and FLUSH handling
(timeline re-anchor: drop to NoSegment, recover on the next SEGMENT).

History note: the branch builds the core as a parallel moqsinkspike element, then
promotes it to moqsink (rename + remove the old), then applies post-review polish.
Squash-merge is fine.

Session handoff (non-blocking)

The session handoff is a non-blocking unbounded channel: chain enqueues each buffer
for the session worker and returns immediately, never blocking the streaming thread. On the
connected steady path the worker writes into the MoQ producers, whose cache owns
retention and eviction, so loss under congestion is governed by MoQ cache/retention
semantics rather than by stalling the encoder. moqsink is therefore not a GStreamer
backpressure boundary; pacing a non-live/file source belongs upstream (for example
identity sync=true).
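
A minimal sketch of the handoff shape described above, using only the standard library. `std::sync::mpsc::channel` stands in for whatever unbounded channel the element really uses, and `Msg`, `chain`, `spawn_worker`, and `start` are hypothetical names for illustration, not the crate's API:

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for what the streaming thread hands to the worker.
#[derive(Debug, PartialEq)]
pub enum Msg {
	Buffer { pad: String, bytes: Vec<u8> },
	Eos,
}

/// Streaming-thread side: enqueue and return. `Sender::send` on an unbounded
/// std channel never blocks; it only fails once the receiver is gone.
pub fn chain(tx: &Sender<Msg>, pad: &str, bytes: Vec<u8>) -> Result<(), &'static str> {
	tx.send(Msg::Buffer { pad: pad.to_owned(), bytes })
		.map_err(|_| "session worker is gone")
}

/// Worker side: drain messages until EOS and return the total bytes seen.
pub fn spawn_worker(rx: Receiver<Msg>) -> JoinHandle<usize> {
	thread::spawn(move || {
		let mut total = 0;
		for msg in rx {
			match msg {
				Msg::Buffer { bytes, .. } => total += bytes.len(),
				Msg::Eos => break,
			}
		}
		total
	})
}

/// Wire both halves together.
pub fn start() -> (Sender<Msg>, JoinHandle<usize>) {
	let (tx, rx) = channel();
	(tx, spawn_worker(rx))
}
```

Because the queue is unbounded, nothing in this shape slows a fast producer down, which is why pacing has to come from upstream.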

This replaces an earlier bounded-channel design, where a send blocked on a full channel
could not be interrupted by an out-of-band FLUSH_START and so could wedge a
seek/teardown. With a non-blocking handoff there is no blocked send, so the
flush-cancellation apparatus is gone: the bounded channel, send_or_flush/SendOutcome,
and the per-pad flush watch gates that cut a blocked send (plus the AddPad/DropPad
blocking sends). FlushSignal and the per-pad generations stay; FlushSignal now carries
only the timeline re-anchor. FLUSH is now purely a timeline re-anchor for the pad (drop to
NoSegment, recover on the next SEGMENT), not an anti-deadlock cut.
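
The re-anchor can be pictured as a small per-pad state machine. The state names NoSegment/Active/Invalid come from the commit messages; the `Event` type, the `step` function, and the exact transition rules are assumptions made for this sketch:

```rust
/// Per-pad timeline state (names from the PR; layout is an assumption).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PadState {
	NoSegment,
	Active { base_nanos: u64 },
	Invalid,
}

/// Hypothetical events that move the timeline.
#[derive(Debug, Clone, Copy)]
pub enum Event {
	/// A SEGMENT; `valid` is false for e.g. a non-TIME or non-unit-rate segment.
	Segment { base_nanos: u64, valid: bool },
	/// FLUSH: forget the anchor, keep the session.
	Flush,
}

pub fn step(state: PadState, event: Event) -> PadState {
	match (state, event) {
		// FLUSH always drops to NoSegment, whatever the pad was doing.
		(_, Event::Flush) => PadState::NoSegment,
		(_, Event::Segment { valid: false, .. }) => PadState::Invalid,
		// No anchor yet (or invalidated): the next valid SEGMENT re-anchors.
		(PadState::NoSegment | PadState::Invalid, Event::Segment { base_nanos, .. }) => {
			PadState::Active { base_nanos }
		}
		// Anchored: stay continuous only while the base does not rewind.
		(PadState::Active { base_nanos: prev }, Event::Segment { base_nanos, .. }) => {
			if base_nanos >= prev {
				PadState::Active { base_nanos }
			} else {
				PadState::Invalid
			}
		}
	}
}
```

In this sketch a rewinding SEGMENT invalidates an anchored pad, while the same SEGMENT after a FLUSH is accepted, because the FLUSH already discarded the anchor it would have been compared against.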

Behavior differences vs the old moqsink (intentional, not regressions)

  • Stricter caps. AAC requires mpegversion=4 + stream-format=raw; H.264/H.265
    require stream-format=byte-stream + alignment=au; a non-unit-rate SEGMENT
    invalidates the timeline. The old sink validated none of this in code (only the pad
    template), so it accepted inputs the rewrite now rejects per-pad. The seven codec
    types are unchanged.
  • Absolute running-time origin. Timestamps use shared running time (for A/V
    alignment) instead of the old per-pad reference_pts rebasing, so a late-joining live
    pipeline emits a large first timestamp rather than ~0.
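
A small arithmetic sketch of the timestamp-origin difference. Both function names are hypothetical, and for simplicity it assumes each buffer's PTS already equals its running time:

```rust
/// Old behavior (sketch): each pad subtracts its own first PTS, so every pad
/// starts near zero regardless of when it joined.
pub fn rebased_micros(pts_nanos: u64, reference_pts_nanos: u64) -> u64 {
	pts_nanos.saturating_sub(reference_pts_nanos) / 1_000
}

/// New behavior (sketch): every pad maps onto the same running-time origin.
pub fn running_time_micros(running_time_nanos: u64) -> u64 {
	running_time_nanos / 1_000
}
```

For a pipeline that has been running for an hour, with a first video buffer at 3600.000 s and a first audio buffer at 3600.020 s, per-pad rebasing yields 0 for both and loses the 20 ms A/V offset. Shared running time yields 3,600,000,000 µs and 3,600,020,000 µs, so the first timestamp is large but the offset survives.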

Testing

  • Unit + lint: 59 unit + 3 element tests (crate-wide), cargo clippy -D warnings,
    cargo fmt.
  • Live e2e: a fleet of senders (x86_64 + aarch64) publishing to a real relay;
    subscribing back confirmed decoded frames with no errors.
  • Transport failure: connect-failure (hermetic element test); mid-stream relay death
    (SIGKILL the local relay -> session error on the bus + non-zero exit ~35 s after the
    kill, consistent with QUIC's ~30 s idle timeout for a silent peer; a graceful close is
    instant; this latency is shared with the old sink).
  • Memory-leak, local soak (dev box, 35 min): x264 -> moqsink in isolation held RSS
    flat (24 consecutive identical samples) with zero leaked GstMiniObjects at teardown.
  • Memory-leak, overnight bisection (target node, about 12-14 h): msdk -> moqsink vs
    msdk -> fakesink (differ only in the sink) showed identical flat RSS slopes
    (about +0.07 KB/min, R^2~=0.1), so moqsink adds no measurable leak over the encoder.

Notes

  • Public API: no change to any pub item in library crates; internal to rs/moq-gst
    (a plugin), plus three property rows in doc/bin/gstreamer.md.
  • Targets main (no wire or public library API change).
  • Known/accepted: moq_net::Session::closed() always returns Err, so a clean remote
    close surfaces as a bus ERROR rather than EOS.
  • Future work (out of scope): building the session producers up front and writing
    directly (removing the data channel and its connect-window residual buffer), non-VCL
    NAL accumulation in moq-mux::import, Framed::discontinuity() for an AU split across
    buffers, stats/observability, a latency benchmark harness.

(Co-Written by Claude)

arielmol and others added 13 commits June 15, 2026 14:10
A parallel sink element (moqsinkspike, Rank::NONE) reimplementing the moqsink core with isolated, testable seams: pure timeline policy, per-pad state, a bounded data channel with cancellation independent of the data path, single finalization (catalog last), and observable connection status. The production moqsink is left untouched.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
moq-gst is outside default-members (needs GStreamer), so 'just rs test' skips it. Add 'just rs test-moq-gst' and chain it into 'just rs ci' so the spike's tests gate. The CI workflow only needs gstreamer, which the nix dev shell already provides; that step is applied at PR time.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…tate)

Judge SEGMENT continuity in running time, not media origin: classify_segment keys on the running-time base, so a moved start stays continuous while base does not rewind (the old start-equality rule wrongly rejected this). Map timestamps with to_running_time_full (signed) so a buffer before the segment is dropped, never clamped to zero. Add an explicit per-pad state (NoSegment/Active/Invalid): a discontinuity invalidates the pad and the next valid SEGMENT re-anchors it.

Also applies rustfmt to the spike baseline, which was committed before a fmt pass.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
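
A hedged sketch of the segment rule this commit describes. `SegmentInfo` and `SegmentDecision` mirror the types quoted in the review of timeline.rs; the `classify_segment` signature, the reject strings, and the `frame_micros` helper are assumptions, not the crate's actual code:

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct SegmentInfo {
	pub time_format: bool,
	pub rate: f64,
	pub base_nanos: u64,
}

#[derive(Debug, PartialEq, Eq)]
pub enum SegmentDecision {
	Accept,
	Reject(&'static str),
}

/// Continuity is judged on the running-time base, not on the media `start`.
/// `prev_base_nanos` is `None` when the pad has no anchor yet.
pub fn classify_segment(prev_base_nanos: Option<u64>, seg: &SegmentInfo) -> SegmentDecision {
	if !seg.time_format {
		return SegmentDecision::Reject("not a TIME segment");
	}
	if seg.rate != 1.0 {
		return SegmentDecision::Reject("non-unit rate");
	}
	match prev_base_nanos {
		Some(prev) if seg.base_nanos < prev => SegmentDecision::Reject("running-time base rewound"),
		_ => SegmentDecision::Accept,
	}
}

/// Signed running time: a buffer before the segment maps to a negative value
/// and is dropped (`None`) rather than clamped to zero.
pub fn frame_micros(signed_running_time_nanos: i64) -> Option<u64> {
	if signed_running_time_nanos < 0 {
		None
	} else {
		Some(signed_running_time_nanos as u64 / 1_000)
	}
}
```

A segment whose `start` moved but whose base is unchanged or later is accepted here, which is the case the old start-equality rule rejected.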
GStreamer only registers plugins whose license is in its recognised set. The plugin_define! license was 'Apache 2.0', which gst rejects ('invalid license, not loading'), so the moq plugin (moqsink/moqsrc) never loads via the registry. The crate is MIT OR Apache-2.0, so declare the MIT side ('MIT/X11').

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
PadSet inferred membership lazily from CAPS, so EOS aggregation (eos.len() == pads.len()) was wrong when EOS arrived before CAPS. Make membership authoritative: an 'active' set populated by AddPad (sent from request_new_pad, and seeded at session start for pads requested earlier), with EOS counted against it. Every data message carries its pad's generation, captured per incarnation, so a pad recreated with the same name discards the previous one's in-flight messages. Covers EOS-before-CAPS, duplicate EOS, late members, buffers after EOS, and a release that completes the element.

Adds a hermetic element test (request/release) plus gstreamer as a dev-dependency. Session-dependent flows (multipad EOS, per-pad errors) stay in the relay-backed harness.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
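
The membership rule can be sketched with two collections. `PadSet` is named in the commit message above; the fields and method names below are assumptions chosen for illustration:

```rust
use std::collections::{HashMap, HashSet};

/// Sketch of authoritative membership: pads join via an explicit add, and
/// every message carries the generation of the pad incarnation that sent it.
#[derive(Default)]
pub struct PadSet {
	/// Live members: pad name -> generation of the current incarnation.
	active: HashMap<String, u64>,
	/// Live members that have reached EOS.
	eos: HashSet<String>,
}

impl PadSet {
	/// Register a pad (or a new incarnation of a same-named pad).
	pub fn add_pad(&mut self, pad: &str, generation: u64) {
		self.active.insert(pad.to_owned(), generation);
		self.eos.remove(pad);
	}

	fn is_current(&self, pad: &str, generation: u64) -> bool {
		self.active.get(pad) == Some(&generation)
	}

	/// Buffers from a stale incarnation, or after EOS, are discarded.
	pub fn accepts_buffer(&self, pad: &str, generation: u64) -> bool {
		self.is_current(pad, generation) && !self.eos.contains(pad)
	}

	/// Record an EOS; returns true once every active pad has ended.
	/// A stale or duplicate EOS changes nothing.
	pub fn on_eos(&mut self, pad: &str, generation: u64) -> bool {
		if self.is_current(pad, generation) {
			self.eos.insert(pad.to_owned());
		}
		!self.active.is_empty() && self.eos.len() == self.active.len()
	}
}
```

Because EOS is counted against the explicit `active` set rather than against pads that happened to send CAPS, an EOS arriving before CAPS is still counted correctly.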
A bad caps or bitstream now invalidates only the offending pad instead of tearing down the session: the worker drops that pad's producer and marks it failed in the shared Status, and the chain returns a FlowError on the pad's next buffer (one buffer later, since the worker is async) rather than silently dropping. Session-level failures (a closed catalog) still propagate to element_error! on the bus. Unsupported caps are also rejected synchronously in the event handler (NotNegotiated) before reaching the worker.

Hermetic tests: worker-side pad-vs-session error classification and the caps validator; element-level invalid-settings StateChangeError and connect-failure to the bus. The per-pad FlowError observation and the live error flows need a session, so they stay in the relay-backed harness.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
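
A rough sketch of the pad-versus-session split described in this commit. `Failure`, `Worker`, and both methods are hypothetical; the real worker also drops the pad's producer and records the failure in the shared Status, which is omitted here:

```rust
use std::collections::HashSet;

/// Hypothetical classification of what went wrong.
#[derive(Debug)]
pub enum Failure {
	/// Bad caps or bitstream on one pad.
	Pad { pad: String, reason: String },
	/// Something the whole broadcast depends on, e.g. a closed catalog.
	Session(String),
}

#[derive(Default)]
pub struct Worker {
	failed: HashSet<String>,
}

impl Worker {
	/// A pad failure is recorded and the session continues; a session
	/// failure is returned so the caller can post it on the bus.
	pub fn on_failure(&mut self, failure: Failure) -> Result<(), String> {
		match failure {
			Failure::Pad { pad, reason: _ } => {
				self.failed.insert(pad);
				Ok(())
			}
			Failure::Session(reason) => Err(reason),
		}
	}

	/// Streaming-thread check: a failed pad gets a flow error on its next buffer.
	pub fn chain(&self, pad: &str) -> Result<(), &'static str> {
		if self.failed.contains(pad) {
			Err("pad failed")
		} else {
			Ok(())
		}
	}
}
```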
Post-review fixes to the parallel spike core, all hermetically tested:

- Post the element EOS only after the catalog finalizes cleanly; a
  finalize failure now surfaces as element_error instead of a silent log
  (run_loop returns an ExitReason, finalize_all returns a Result).
- Enforce per-pad frame-timestamp monotonicity: a frame that would
  regress below the last emitted one is dropped, not emitted out of order.
- Make a failed pad terminal: its track is finalized and it stops
  blocking EOS aggregation, so the element completes on the good pads.
- Key the per-pad failure set by generation so a pad recreated with the
  same name never inherits a previous incarnation's failure.
- Require byte-stream/au in caps_supported (what FramedFormat::Avc3
  actually consumes), not just the media type.
- Reject a buffer past the MoQ frame limit (16 MiB) before copying, so
  hostile input cannot drive an unbounded allocation.
- Test honesty: rename tests that over-promised, add the buffer/bitstream
  decode-error path, and assert connected=false on a failed connect.
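
The frame-size item above amounts to checking the length before any copy is made. In this sketch the constant mirrors the 16 MiB figure quoted in the commit; `FrameError`, `check_frame_size`, `copy_frame`, and the choice to accept a frame of exactly the limit are assumptions:

```rust
/// 16 MiB, matching the limit quoted in the commit message.
pub const MAX_FRAME_SIZE: usize = 16 * 1024 * 1024;

#[derive(Debug, PartialEq, Eq)]
pub enum FrameError {
	TooLarge { size: usize, limit: usize },
}

/// Reject on the reported length alone, so no allocation happens first.
pub fn check_frame_size(size: usize) -> Result<(), FrameError> {
	if size > MAX_FRAME_SIZE {
		Err(FrameError::TooLarge { size, limit: MAX_FRAME_SIZE })
	} else {
		Ok(())
	}
}

/// Copy the payload only after the size check has passed.
pub fn copy_frame(input: &[u8]) -> Result<Vec<u8>, FrameError> {
	check_frame_size(input.len())?;
	Ok(input.to_vec())
}
```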

CONTEXT

Rejected alternatives:
- Escalating a pad failure to a session/element error: per-pad isolation
  is the contract, so a bad pad is finalized and counts as terminal,
  never fatal to the session.
- A transport test seam to make session-level flows hermetic: that is
  architecture-for-tests; multipad EOS and the per-pad FlowError
  observation are validated against a real relay instead.
- A loopback connect endpoint (127.0.0.1:1) for the connect-failure test:
  measured to ride the QUIC idle timeout (no fast localhost-UDP reject),
  so the fast RFC-invalid DNS host was kept.
- A spike-side cap for the importer's non-VCL NAL accumulation: the spike
  has no "frame not emitted" signal, so that cap belongs in the moq-mux
  importer.

Key decisions:
- EOS must not claim success the broadcast did not reach: it waits for
  catalog.finish() to succeed, and a failure becomes element_error.
- Segment-base continuity alone does not stop a frame from regressing:
  monotonicity is enforced at the frame level (last_emitted), not only at
  the segment boundary.
- The frame cap is aligned to moq-net's MAX_FRAME_SIZE: a larger frame
  could not be consumed anyway.
…ough)

Close the codec gap in the parallel spike so it accepts the same media as
moqsink, without regressing the rewrite's seams (shared running time,
per-pad membership, per-pad error isolation, EOS-after-finalize):

- Expand the pad template and replace the hardcoded Avc3 path with a
  caps -> Framed factory over seven media types: H.264/H.265
  (byte-stream/au), AV1, VP8, VP9, AAC (mpegversion=4, raw), Opus. Every
  codec converges on one Framed; only the construction from caps differs.
- Keep caps_supported media-type only. The factory enforces per-codec
  specifics (byte-stream/au, AAC mpegversion/raw plus codec_data) and
  derives Opus channels/rate with defaults, failing just that pad on a
  bad or missing detail, never the session.
- finalize only finishes an initialized producer. A lazy importer
  (H.265/AV1/VP8/VP9) given CAPS then EOS with no frame never created its
  track, so finish() would error "not initialized"; it now no-ops while a
  real finish error on a live track still surfaces.
- Tests: strong frame-through for H.264 and Opus that observe a real
  emitted frame on the rendition track (not just the catalog rendition);
  creation tests for H.265/AV1/VP8/VP9/AAC; negatives for length-prefixed
  H.264, byte-stream-less H.265, AAC without codec_data, and AAC with
  wrong mpegversion; a clean CAPS-then-EOS test for the lazy codecs;
  caps_supported retargeted to the seven media types.

CONTEXT

Rejected alternatives:
- Per-codec field checks inside caps_supported: it stays thin and
  media-type only, so the factory is the one enforcement point and a bad
  detail fails the pad, not the session.
- A new MediaPad/MediaProducer abstraction: Pad already holds an
  Option<Framed>, so a factory function is enough.
- Frame-through fixtures for all seven codecs: only H.264 and Opus get
  strong frame-through; the rest get creation tests, with real
  transmission left to the relay harness.
- A moq-mux change to make a lazy finish() tolerant: the no-track no-op
  is kept spike-local in Pad::finalize via the existing Framed::track()
  probe, so moq-mux behavior is unchanged and real finish errors on live
  tracks still propagate.
- Reintroducing the production sink's per-pad reference_pts anchoring: the
  spike keeps its shared running time.

Key decisions:
- caps_supported is media-type only; per-codec specifics live in the
  factory, which fails the pad (not the session) on a bad or missing
  detail.
- A frame-through test must observe a real emitted frame: it subscribes
  to the rendition track and asserts latest().is_some(), because the
  catalog rendition is published from the SPS/config and would pass with
  no frame at all.
- CAPS then EOS before the first frame must be clean: a lazy importer
  never created its track, so finalize no-ops instead of turning "not
  initialized" into a session error.
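
A sketch of the two-layer caps check described in this commit: a thin media-type gate plus per-codec enforcement in the factory. The `Caps` struct is a hypothetical stand-in for real GStreamer caps, and `validate` stands in for the factory's checks without constructing anything:

```rust
use std::collections::HashMap;

/// Hypothetical, simplified caps: a media type plus string fields.
pub struct Caps {
	pub media_type: String,
	pub fields: HashMap<String, String>,
}

impl Caps {
	pub fn new(media_type: &str, fields: &[(&str, &str)]) -> Self {
		Caps {
			media_type: media_type.to_owned(),
			fields: fields.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect(),
		}
	}
}

fn field<'a>(caps: &'a Caps, name: &str) -> Option<&'a str> {
	caps.fields.get(name).map(String::as_str)
}

/// Thin gate: media type only.
pub fn caps_supported(media_type: &str) -> bool {
	matches!(
		media_type,
		"video/x-h264" | "video/x-h265" | "video/x-av1" | "video/x-vp8" | "video/x-vp9" | "audio/mpeg" | "audio/x-opus"
	)
}

/// Per-codec specifics; an error here fails only the offending pad.
pub fn validate(caps: &Caps) -> Result<(), String> {
	if !caps_supported(&caps.media_type) {
		return Err(format!("unsupported media type {}", caps.media_type));
	}
	match caps.media_type.as_str() {
		"video/x-h264" | "video/x-h265" => {
			if field(caps, "stream-format") != Some("byte-stream") {
				return Err("requires stream-format=byte-stream".into());
			}
			if field(caps, "alignment") != Some("au") {
				return Err("requires alignment=au".into());
			}
		}
		"audio/mpeg" => {
			if field(caps, "mpegversion") != Some("4") {
				return Err("requires mpegversion=4".into());
			}
			if field(caps, "stream-format") != Some("raw") {
				return Err("requires stream-format=raw".into());
			}
			if field(caps, "codec_data").is_none() {
				return Err("requires codec_data".into());
			}
		}
		// AV1, VP8, VP9 and Opus need no extra fields in this sketch.
		_ => {}
	}
	Ok(())
}
```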
…ecycle races)

Production-hardening pass over the parallel spike, all hermetically tested
(62 unit + 3 element, clippy -D warnings, fmt). FLUSH plus the fixes from an
external broadcast review:

- FLUSH: handle FLUSH_START/STOP as a discontinuity plus a cancellation. The
  worker re-anchors the pad's timeline (NoSegment) so a post-seek rewinding
  SEGMENT is accepted instead of rejected, and a per-pad watch cuts a chain
  blocked on the bounded channel without tearing the session down. run_loop is
  biased so the out-of-band flush re-anchors before the post-flush SEGMENT that
  rides the data FIFO.
- B-frames: drop the per-frame PTS monotonicity filter. The moq-mux container
  carries frames in decode order and documents that B-frames have non-monotonic
  presentation timestamps, so the filter silently dropped every B-frame (30-60%
  of frames on High-profile content). The wire order is already decode order.
- Pad lifecycle: populate the membership maps before add_pad, roll back only the
  entry this attempt still owns on failure, and announce AddPad only after
  add_pad succeeds, so a duplicate or failed add never orphans the live pad nor
  seeds a phantom that blocks EOS aggregation forever.
- Status races on restart: a session generation token. All observable state
  (connected, version, send_bitrate, failed) lives under one Mutex so "check the
  live generation, then write" is indivisible; a stale session's late writes and
  exit-reset become no-ops. failed is keyed by (session generation, pad
  generation), so a restart reusing the same pad never inherits a stale failure.
- Worker panic: run the session worker on an inner task and surface a JoinError
  (panic or cancel) as element_error, instead of a silent death only observed
  when stop() reaps the JoinHandle.
- Refactor: bundle the worker's inbound channels into one Inbound struct.
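
The generation-token idea from the "Status races on restart" item can be sketched with one `Mutex`. `Status` and `StatusInner` are names from the review; the fields and the three methods here are assumptions, and only `connected` is modeled:

```rust
use std::sync::Mutex;

#[derive(Default)]
struct StatusInner {
	/// Generation of the live session.
	generation: u64,
	connected: bool,
}

/// One lock covers the generation and every gated field, so
/// "check the live generation, then write" cannot be interleaved.
#[derive(Default)]
pub struct Status {
	inner: Mutex<StatusInner>,
}

impl Status {
	/// Start a new session: bump the generation, reset state, return the token.
	pub fn begin_session(&self) -> u64 {
		let mut inner = self.inner.lock().unwrap();
		inner.generation += 1;
		inner.connected = false;
		inner.generation
	}

	/// Gated write: a stale session's token no longer matches, so its late
	/// write becomes a no-op. Returns whether the write was applied.
	pub fn set_connected(&self, token: u64, connected: bool) -> bool {
		let mut inner = self.inner.lock().unwrap();
		if inner.generation != token {
			return false;
		}
		inner.connected = connected;
		true
	}

	pub fn connected(&self) -> bool {
		self.inner.lock().unwrap().connected
	}
}
```

With separate atomics the comparison and the store would be two steps, and a restart could land between them; holding the lock across both is what makes the check-and-write indivisible.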

CONTEXT

Rejected alternatives:
- Enforcing frame monotonicity on the DTS instead of removing the filter:
  best-practice in the abstract, but it needs DTS threaded through every layer
  and only guards against out-of-decode-order delivery, which GStreamer does not
  produce. Removing the filter to match the container contract is simpler and
  correct.
- Gating only the exit-reset by generation: a stale session could still write
  connected/version/bitrate. Every observable write is gated.
- Generation-gating with separate atomics: a load-then-store is not an atomic
  check-and-write, so a stale write can still interleave between the check and
  the store. One Mutex over the observable state makes it indivisible.
- Serializing request_new_pad under a lock held across add_pad: add_pad emits
  pad-added synchronously, so a reentrant handler would deadlock on that lock, a
  worse failure than the narrow same-name-concurrent window it would close. That
  window is left documented, not closed.
- A moq-mux Framed::discontinuity() for the FLUSH partial-AU reset: kept this
  change moq-gst-only. The partial-AU edge is documented and low-risk with
  alignment=au (one complete access unit per buffer).
- catch_unwind via the futures crate for the worker panic: futures is not a
  direct dependency, so an inner task plus JoinError is used instead.

Key decisions:
- FLUSH cancellation uses a watch<bool> per pad: no new dependency, and per-pad
  so flushing one input never cancels another input's blocked send. send_or_flush
  is biased toward the flush arm and re-checks the watch after winning a channel
  permit, which narrows (does not atomically close) the capacity-vs-FLUSH_START
  tie; the residual one-buffer leak is absorbed by the re-anchor.
- The failure set carries both generations; is_failed matches the live session,
  so neither a recreated pad nor a recreated session inherits a stale failure.
- Honest test coverage: decode-order B-frame emit, generation scoping, and
  blocked-send cancellation have discriminating tests. The panic-to-bus path and
  the load-vs-store / same-name-concurrent races are covered by construction
  (a lock) or documented, not by a deterministic test, since they need a real
  session or an injected race.
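
To illustrate the B-frame item in this commit: frames arrive in decode order, so their presentation timestamps are not monotonic, and a PTS-monotonicity filter discards the B-frames. The `Frame` type and the filter below are a hypothetical reconstruction of that removed behavior, not the original code:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Frame {
	/// 'I', 'P' or 'B'.
	pub kind: char,
	pub pts_ms: u64,
}

/// Sketch of the removed filter: drop any frame whose PTS regresses below
/// the last emitted one.
pub fn monotonic_pts_filter(frames: &[Frame]) -> Vec<Frame> {
	let mut last: Option<u64> = None;
	let mut out = Vec::new();
	for frame in frames {
		if last.map_or(true, |l| frame.pts_ms >= l) {
			out.push(*frame);
			last = Some(frame.pts_ms);
		}
	}
	out
}

/// A short group in decode order: each P-frame precedes the B-frames that
/// are displayed before it.
pub fn decode_order_example() -> Vec<Frame> {
	vec![
		Frame { kind: 'I', pts_ms: 0 },
		Frame { kind: 'P', pts_ms: 100 },
		Frame { kind: 'B', pts_ms: 33 },
		Frame { kind: 'B', pts_ms: 67 },
		Frame { kind: 'P', pts_ms: 200 },
		Frame { kind: 'B', pts_ms: 133 },
		Frame { kind: 'B', pts_ms: 167 },
	]
}
```

On this seven-frame input the filter keeps only the I-frame and the two P-frames, which is why the filter was removed rather than tuned.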
The parallel moqsinkspike element, built across the preceding commits, replaces
the original moqsink. Remove the old implementation, move the new core into
src/sink/, and register it as "moqsink" with the GType name kept as "MoqSink"
for registration continuity.

Behavior is a compatible superset of the old element: the same request sink
pads, the same seven codec caps, and the same three writable properties (url,
broadcast, tls-disable-verify), plus three read-only diagnostics (connected,
moq-version, estimated-send-bitrate).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…solation)

Post-review fixes to the rewritten moqsink, all internal to rs/moq-gst:

- The FLUSH watch is keyed to the pad's lifetime, not the session's. A
  FLUSH_START whose FLUSH_STOP never arrived before a PAUSED->READY->PAUSED
  restart left that pad muted across the new session. Clear every watch at
  session start (reset_pad_flushes), before the new session is visible.
- A pad release is reconfiguration, not end-of-stream: DropPad no longer posts
  element EOS. The genuine "all pads ended" completion still fires from the EOS
  handler, so drop_pad now returns ().
- A failed finish() at EOS is isolated to that pad, like a per-pad data error,
  instead of escalating to a session error that kills the whole broadcast. This
  splits fail_pad into mark_pad_failed_terminal (mark failed + terminal, no
  producer touch) so the EOS path never double-finalizes, and Pad::finalize
  takes the producer up front (let-else) to make its attempt-once contract
  explicit.
- handle_data_msg returns Result<bool> instead of Result<Option<ExitReason>>;
  the only variant it could produce was Ended, so run_loop now constructs every
  ExitReason in one place.
- Fix a comment on the worker's biased select: FLUSH_START arrives out of band
  (a different thread), so the bias only wins a readiness tie, not a
  happens-before against the bounded data channel.

CONTEXT
Rejected alternatives:
- FLUSH restart: a "check session first" guard in the FlushStart handler. It
  narrows a window that reset-on-start already closes fully and reinforces a
  wrong race-condition reading; dropped for the single reset.
- EOS finalize error: calling fail_pad directly. It would re-finalize the
  producer; the mark_pad_failed_terminal split avoids the double-finalize.
Key decisions:
- Isolating finalize errors per pad matches how mid-stream caps/buffer errors
  are already handled; genuinely fatal failures still surface via the session,
  catalog, and transport paths.
- The send_or_flush post-permit window (one pre-flush buffer can still enqueue)
  is accepted and documented in place: the worker re-anchor absorbs it.

Verified: 65 unit + 3 element tests, cargo clippy -D warnings, cargo fmt.

Memory-leak soak (moqsink does not leak): a 35 min x264->moqsink run held RSS
flat with 0 leaked gst objects; a ~12 h msdk->moqsink vs msdk->fakesink
bisection showed identical flat RSS, so moqsink adds no leak over the encoder.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 17, 2026

Contributor

Review Change Stack

Walkthrough

The moq-gst sink element is refactored from a monolithic in-file async implementation into a GObject shell (imp.rs) that delegates all networking and publishing to a new async session.rs worker module. Pure segment and frame timestamp classification is extracted into a new stateless timeline.rs module. MoqSink gains per-pad generation counters for flush sequencing, a shared Status handle for observable session state, and three new read-only GObject monitoring properties: connected, moq-version, and estimated-send-bitrate. The plugin license metadata is corrected to "MIT/X11", the gst crate is moved to [dev-dependencies], CI is updated to run moq-gst tests explicitly, and hermetic element-boundary integration tests are added.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The PR title accurately and concisely describes the primary change: rewriting the moqsink element with improved unit test coverage and modular architecture.
Docstring Coverage ✅ Passed Docstring coverage is 95.52% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
Description check ✅ Passed The pull request description clearly relates to the changeset, describing a comprehensive rewrite of the moqsink GStreamer element with detailed context on objectives, behavior changes, testing, and known limitations.

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 2

🧹 Nitpick comments (6)
rs/moq-gst/src/sink/timeline.rs (2)

12-23: ⚡ Quick win

Mark public decision enums as #[non_exhaustive] for additive evolution.

These public enums are likely to grow (new reject/drop reasons or decision forms), so callers should not be forced into exhaustive matching now.

Proposed compatibility patch
 #[derive(Debug, PartialEq, Eq)]
+#[non_exhaustive]
 pub enum SegmentDecision {
 	Accept,
 	Reject(&'static str),
 }
 
 #[derive(Debug, PartialEq, Eq)]
+#[non_exhaustive]
 pub enum FrameDecision {
 	/// Emit at this MoQ timestamp (micros).
 	Emit(u64),
 	Drop(&'static str),
 }

As per coding guidelines, "Add #[non_exhaustive] to public enums that may gain variants in the future".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-gst/src/sink/timeline.rs` around lines 12 - 23, Add the
#[non_exhaustive] attribute above the SegmentDecision enum and the FrameDecision
enum definitions. This will allow these public enums to gain new variants in the
future without breaking existing code that relies on exhaustive pattern
matching. Place the attribute on the line immediately before each #[derive]
attribute for both enums.

Source: Coding guidelines


4-23: ⚡ Quick win

Add Rustdoc on exported symbols and all public fields.

SegmentInfo, SegmentDecision, and FrameDecision are public but missing item-level /// docs, and SegmentInfo::rate is undocumented.

Proposed doc-comment patch
 #[derive(Debug, Clone, Copy, PartialEq)]
+/// Normalized SEGMENT metadata used by timeline classification.
 pub struct SegmentInfo {
 	/// Only TIME segments map to a media timeline (not BYTES/DEFAULT).
 	pub time_format: bool,
+	/// Playback rate for the segment; only 1.0 is currently accepted.
 	pub rate: f64,
 	/// Running-time anchor of the segment; continuity is judged on this, not on `start`.
 	pub base_nanos: u64,
 }
 
 #[derive(Debug, PartialEq, Eq)]
+/// Decision for whether an incoming SEGMENT keeps timeline continuity.
 pub enum SegmentDecision {
 	Accept,
 	Reject(&'static str),
 }
 
 #[derive(Debug, PartialEq, Eq)]
+/// Decision for whether a frame should be emitted or dropped.
 pub enum FrameDecision {
 	/// Emit at this MoQ timestamp (micros).
 	Emit(u64),
 	Drop(&'static str),
 }

As per coding guidelines, "Document every exported Rust symbol with doc comments (///), including every pub item and module-level docs (//! block at module root)".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-gst/src/sink/timeline.rs` around lines 4 - 23, Add Rustdoc comments
(using `///`) to all public items and fields that are currently missing
documentation. Specifically, add a doc comment to the SegmentInfo struct itself,
the rate field within SegmentInfo, the SegmentDecision enum, the Accept and
Reject variants of SegmentDecision, the FrameDecision enum, and the Drop variant
of FrameDecision. Each doc comment should clearly describe the purpose and usage
of the item or field it documents, following the existing documentation style
used for other fields like time_format and base_nanos in SegmentInfo.

Source: Coding guidelines

rs/moq-gst/src/sink/session.rs (4)

25-26: 💤 Low value

Add doc comment for the public debug category.

The coding guideline requires doc comments on every exported symbol. CAT is pub static but lacks a /// doc comment.

+/// GStreamer debug category for the MoQ sink element.
 pub static CAT: LazyLock<gst::DebugCategory> =
 	LazyLock::new(|| gst::DebugCategory::new("moq-sink", gst::DebugColorFlags::empty(), Some("MoQ Sink Element")));
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-gst/src/sink/session.rs` around lines 25 - 26, Add a doc comment to
the public static variable CAT to document its purpose. Place a /// comment
directly above the CAT declaration that describes what this debug category is
used for in the MoQ Sink element, such as "The debug category for the MoQ Sink
element" or similar description that explains its role in the module.

Source: Coding guidelines


138-167: ⚡ Quick win

Add #[non_exhaustive] to the public DataMsg enum.

Per coding guidelines, public enums that may gain variants in the future should be marked #[non_exhaustive]. DataMsg could plausibly add new event types (e.g., Gap, StreamStart, CustomEvent).

+/// Messages sent from the GObject shell to the session worker over the bounded data channel.
+#[non_exhaustive]
 pub enum DataMsg {
 	AddPad {
 		pad: String,
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-gst/src/sink/session.rs` around lines 138 - 167, The public DataMsg
enum is missing the #[non_exhaustive] attribute, which should be added per
coding guidelines to indicate that new variants may be introduced in the future.
Add the #[non_exhaustive] attribute directly above the pub enum DataMsg
definition to allow the enum to be extended without breaking existing code that
performs exhaustive pattern matching.

Source: Coding guidelines


48-51: 💤 Low value

Add doc comment for the public Status type.

Status is a pub struct but lacks a /// doc comment. The internal comment on lines 31-34 explains the design well but should be converted to a doc comment for API documentation.

+/// Shared observable state, read by the element's getters without touching the worker task.
+///
+/// One `Mutex` covers the session generation plus every gated field, so "check the live
+/// generation, then write" is one indivisible step.
 #[derive(Default)]
 pub struct Status {
 	inner: Mutex<StatusInner>,
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-gst/src/sink/session.rs` around lines 48 - 51, The public struct
`Status` is missing a doc comment which should be added using the `///`
documentation syntax. Add a doc comment above the `Status` struct definition
that explains its purpose and design. The explanation should be derived from the
internal comments found in the StatusInner section (lines 31-34) and should
provide clear API documentation for users of this public type. Convert the
relevant design details from those internal comments into a proper public
documentation comment for the `Status` struct.

Source: Coding guidelines


890-894: Log timestamp overflow instead of silently ignoring it.

Line 892 uses .ok() to silently convert a potential TimeOverflow error to None, allowing frames to be emitted without timestamp information. While overflow is unlikely for typical media durations, this pattern masks errors without logging or documenting why it's safe to ignore.

Consider logging the overflow if it occurs:

 match decision {
 	FrameDecision::Emit(micros) => {
-		let ts = hang::container::Timestamp::from_micros(micros).ok();
+		let ts = match hang::container::Timestamp::from_micros(micros) {
+			Ok(ts) => Some(ts),
+			Err(e) => {
+				gst::warning!(CAT, "timestamp conversion failed for {micros} µs: {e}");
+				None
+			}
+		};
 		framed.decode_frame(&mut data, ts).map_err(|err| anyhow::anyhow!(err))
 	}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-gst/src/sink/session.rs` around lines 890 - 894, In the
FrameDecision::Emit handler, the call to Timestamp::from_micros(micros).ok()
silently converts any TimeOverflow error to None, which masks potential
timestamp conversion failures. Instead of using .ok() to suppress the error,
check the result of Timestamp::from_micros(micros) and log a warning or error
message when overflow occurs before proceeding with the frame decode. This
ensures that timestamp overflow issues are visible in logs for debugging
purposes while still allowing the frame to be processed if needed.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@rs/moq-gst/src/sink/imp.rs`:
- Around line 271-274: The blocking_send calls for DataMsg::AddPad and
DataMsg::DropPad control messages lack cancellation support, which can cause
indefinite blocking if the bounded data channel fills or the session connection
stalls. Replace the uncancellable blocking_send approach used in the
sender.blocking_send call with a cancellable alternative that matches the data
path's pattern of send_or_flush with per-pad flush cancellation. Ensure both the
AddPad message (around line 273) and the DropPad message (around line 292) are
updated to use the same cancellation mechanism to prevent these control-path
operations from blocking indefinitely under channel pressure.

In `@rs/moq-gst/tests/element.rs`:
- Around line 47-55: In the connect_failure_posts_error_to_bus test function,
replace the URL property value that uses the nonexistent.invalid hostname with a
loopback address and closed port (such as a localhost IP with port 1) to ensure
immediate connection refusal instead of relying on DNS timeout. This will make
the test deterministic by forcing a connection refused error that is not
dependent on DNS resolution timing, while still exercising the same bus error
handling path.

---

Nitpick comments:
In `@rs/moq-gst/src/sink/session.rs`:
- Around line 25-26: Add a doc comment to the public static variable CAT to
document its purpose. Place a /// comment directly above the CAT declaration
that describes what this debug category is used for in the MoQ Sink element,
such as "The debug category for the MoQ Sink element" or similar description
that explains its role in the module.
- Around line 138-167: The public DataMsg enum is missing the #[non_exhaustive]
attribute, which should be added per coding guidelines to indicate that new
variants may be introduced in the future. Add the #[non_exhaustive] macro
attribute directly above the pub enum DataMsg definition to allow the enum to be
extended without breaking existing code that performs exhaustive pattern
matching.
- Around line 48-51: The public struct `Status` is missing a doc comment which
should be added using the `///` documentation syntax. Add a doc comment above
the `Status` struct definition that explains its purpose and design. The
explanation should be derived from the internal comments found in the
StatusInner section (lines 31-34) and should provide clear API documentation for
users of this public type. Convert the relevant design details from those
internal comments into a proper public documentation comment for the `Status`
struct.
- Around line 890-894: In the FrameDecision::Emit handler, the call to
Timestamp::from_micros(micros).ok() silently converts any TimeOverflow error to
None, which masks potential timestamp conversion failures. Instead of using
.ok() to suppress the error, check the result of Timestamp::from_micros(micros)
and log a warning or error message when overflow occurs before proceeding with
the frame decode. This ensures that timestamp overflow issues are visible in
logs for debugging purposes while still allowing the frame to be processed if
needed.

In `@rs/moq-gst/src/sink/timeline.rs`:
- Around line 12-23: Add the #[non_exhaustive] attribute above the
SegmentDecision enum and the FrameDecision enum definitions. This will allow
these public enums to gain new variants in the future without breaking existing
code that relies on exhaustive pattern matching. Place the attribute on the line
immediately before each #[derive] attribute for both enums.
- Around line 4-23: Add Rustdoc comments (using `///`) to all public items and
fields that are currently missing documentation. Specifically, add a doc comment
to the SegmentInfo struct itself, the rate field within SegmentInfo, the
SegmentDecision enum, the Accept and Reject variants of SegmentDecision, the
FrameDecision enum, and the Drop variant of FrameDecision. Each doc comment
should clearly describe the purpose and usage of the item or field it documents,
following the existing documentation style used for other fields like
time_format and base_nanos in SegmentInfo.
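
The two `timeline.rs` nitpicks above can be sketched together as follows. This is only an illustration: the real definitions in `timeline.rs` are not shown in this thread, so the field types, the `Emit` payload type, the doc-comment wording, and the `classify_segment` and `describe` helpers are all assumptions.

```rust
/// Segment fields the timeline consults when classifying a SEGMENT event.
/// Field types are assumptions made for this sketch.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct SegmentInfo {
	/// Whether the segment is in TIME format.
	pub time_format: bool,
	/// Segment base, in nanoseconds.
	pub base_nanos: u64,
	/// Playback rate requested by the segment.
	pub rate: f64,
}

/// Result of classifying a SEGMENT event.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SegmentDecision {
	/// The segment can anchor the timeline.
	Accept,
	/// The segment cannot be used.
	Reject,
}

/// Result of classifying one frame against the timeline.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FrameDecision {
	/// Emit the frame with this timestamp in microseconds (payload type assumed).
	Emit(u64),
	/// Discard the frame.
	Drop,
}

/// Hypothetical rule, for illustration only: accept TIME segments with a positive rate.
pub fn classify_segment(info: &SegmentInfo) -> SegmentDecision {
	if info.time_format && info.rate > 0.0 {
		SegmentDecision::Accept
	} else {
		SegmentDecision::Reject
	}
}

/// Inside the defining crate a match may still list every variant.
/// A downstream crate would additionally need a `_ =>` arm.
pub fn describe(decision: FrameDecision) -> String {
	match decision {
		FrameDecision::Emit(micros) => format!("emit at {micros} us"),
		FrameDecision::Drop => "drop".to_string(),
	}
}
```

Note that `#[non_exhaustive]` only changes what other crates may do; code in the same crate as the enum is unaffected, which is why `describe` compiles without a wildcard arm.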

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 16ea640f-7e9d-412f-811d-8bc53d8f0e7c

📥 Commits

Reviewing files that changed from the base of the PR and between 2788d79 and a858308.

📒 Files selected for processing (9)
  • doc/bin/gstreamer.md
  • rs/justfile
  • rs/moq-gst/Cargo.toml
  • rs/moq-gst/src/lib.rs
  • rs/moq-gst/src/sink/imp.rs
  • rs/moq-gst/src/sink/mod.rs
  • rs/moq-gst/src/sink/session.rs
  • rs/moq-gst/src/sink/timeline.rs
  • rs/moq-gst/tests/element.rs

Comment thread rs/moq-gst/src/sink/imp.rs
Comment on lines +47 to +55
// A connect that cannot succeed surfaces as an ERROR on the bus (not a silent log) and leaves the
// element disconnected. The `.invalid` host fails fast at DNS resolution in this test environment.
#[test]
fn connect_failure_posts_error_to_bus() {
	init();
	let pipeline = gst::Pipeline::new();
	let sink = gst::ElementFactory::make("moqsink")
		.property("url", "https://nonexistent.invalid:443")
		.property("broadcast", "test")

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Make the connect-failure test deterministic without DNS.

This test depends on DNS failure timing for .invalid, which can be flaky in CI/network-restricted environments. Prefer a loopback closed port to force immediate connect failure while exercising the same bus-error path.

Suggested change
-		.property("url", "https://nonexistent.invalid:443")
+		.property("url", "https://127.0.0.1:1")

Also applies to: 60-63

@arielmol

arielmol commented Jun 17, 2026

To the holy rabbit:

  • The blocking_send on AddPad/DropPad is a known limitation, already called out in the comment and marked as future work.

  • For the connect-failure test I'm keeping .invalid. The 127.0.0.1:1 suggestion assumes TCP refused semantics, but this is QUIC over UDP, so a closed port won't refuse fast, it'd just sit until the handshake timeout and get flakier. .invalid is a reserved TLD that never resolves, so it fails immediately.

  • The .ok() on the timestamp overflow is fine, it only triggers on ABSURD durations and decode_frame already takes an optional timestamp. Can add a warning log if you'd rather see it.

As for the CI failures, I'll fix them ASAP.
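
The TCP-versus-UDP point in the reply above can be sketched with plain `std` sockets. This shows only OS socket behavior on loopback, not QUIC or `moqsink` itself; the helper names are hypothetical, and the ports are reserved and released at runtime so that nothing is listening on them.

```rust
use std::io;
use std::net::{TcpListener, TcpStream, UdpSocket};

/// Reserve a loopback TCP port, then close it so nothing is listening there.
fn closed_tcp_port() -> io::Result<u16> {
	let listener = TcpListener::bind("127.0.0.1:0")?;
	let port = listener.local_addr()?.port();
	drop(listener);
	Ok(port)
}

/// Reserve a loopback UDP port, then close it so nothing is bound there.
fn closed_udp_port() -> io::Result<u16> {
	let socket = UdpSocket::bind("127.0.0.1:0")?;
	let port = socket.local_addr()?.port();
	drop(socket);
	Ok(port)
}

/// TCP: connecting to a closed loopback port is rejected during connect().
fn tcp_connect_fails_fast(port: u16) -> bool {
	TcpStream::connect(("127.0.0.1", port)).is_err()
}

/// UDP: a datagram sent from an unconnected socket to a closed port is
/// accepted by the OS, so the sender sees no error at send time.
fn udp_send_is_accepted(port: u16) -> io::Result<bool> {
	let socket = UdpSocket::bind("127.0.0.1:0")?;
	Ok(socket.send_to(b"ping", ("127.0.0.1", port)).is_ok())
}
```

With TCP the refusal surfaces in the connect call. With UDP the send succeeds, so a protocol layered on top would have to rely on its own timeout to notice that nobody is answering, which is the behavior the reply describes for the QUIC handshake.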

The new [dev-dependencies] section landed before [build-dependencies], but cargo sort expects dependencies -> build-dependencies -> dev-dependencies, failing `just ci`. Swap the two so the lint passes.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
arielmol and others added 2 commits June 22, 2026 13:12
The session handoff used a bounded mpsc channel for backpressure, so a send
blocked on a full channel could not be cut by an out-of-band FLUSH_START and
could wedge a seek/teardown. But moq-net's producer is a synchronous RAM cache
(it appends and evicts, never blocking on the network), so the sink never needs
to apply backpressure at the GStreamer boundary.

Make the handoff unbounded: chain accepts each buffer toward the worker and
returns without blocking. With no blocked send the flush-cancellation apparatus
is unnecessary and removed: the bounded channel, send_or_flush/SendOutcome, the
per-pad flush watches that cut a blocked send, and the AddPad/DropPad blocking
sends. FlushSignal and the per-pad generations stay; FlushSignal now carries only
the timeline re-anchor (FLUSH re-anchors a pad's timeline, kept intact). Net -369
lines.

Caveat: because the handoff is unbounded, buffers accumulate in it transiently
before connect completes and whenever the worker lags, forming an unbounded
buffer ahead of the cache.
Building the producers up front and writing directly would remove it; left as a
follow-up.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
# Conflicts:
#	rs/justfile
#	rs/moq-gst/src/lib.rs
#	rs/moq-gst/src/sink/imp.rs
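
The trade-off described in the commit message above can be sketched with the standard library's channels. These are a stand-in chosen to keep the example self-contained: the sink's actual channel type is not shown here, and the function names are hypothetical.

```rust
use std::sync::mpsc::{channel, sync_channel, TrySendError};

/// Bounded handoff: count how many buffers are accepted before the channel
/// reports that it is full. No receiver drains it in this sketch.
fn bounded_accepts(capacity: usize, attempts: usize) -> usize {
	let (tx, _rx) = sync_channel::<Vec<u8>>(capacity);
	let mut accepted = 0;
	for i in 0..attempts {
		match tx.try_send(vec![i as u8]) {
			Ok(()) => accepted += 1,
			// A blocking send() would park the calling thread at this point.
			Err(TrySendError::Full(_)) => break,
			Err(TrySendError::Disconnected(_)) => break,
		}
	}
	accepted
}

/// Unbounded handoff: every send is queued immediately and never blocks.
/// Returns the backlog left in the channel when nothing has drained it.
fn unbounded_accepts(attempts: usize) -> usize {
	let (tx, rx) = channel::<Vec<u8>>();
	for i in 0..attempts {
		tx.send(vec![i as u8]).expect("receiver is still alive");
	}
	// The queued items are the transient accumulation named in the caveat.
	rx.try_iter().count()
}
```

The bounded variant stops accepting once `capacity` buffers are queued, which is where a blocked send could wedge a flush. The unbounded variant always returns to the caller, at the cost of a backlog that grows for as long as the consumer lags.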
@arielmol

Update: the byte round-trip for #1771 is done. I ran a VP8 payload round-trip: videotestsrc(ball) -> vp8enc -> moqsink -> relay -> moqsrc, capturing encoded buffers before moqsink and after moqsrc. The harness writes one file per GStreamer buffer and checks received hashes against a contiguous slice of the published hashes.

150 buffers published, all 150 distinct; 149 received; every received buffer matched the published VP8 payload byte-for-byte and in order. In this run the missing buffer was the tail not draining before teardown, not a payload mismatch.
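
The contiguous-slice check described above can be sketched as follows. This is not the actual harness: the hash function it uses is not stated, so `DefaultHasher` is a stand-in, and both function names are hypothetical.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hash one encoded buffer's payload (stand-in for the harness's hash).
fn payload_hash(payload: &[u8]) -> u64 {
	let mut hasher = DefaultHasher::new();
	payload.hash(&mut hasher);
	hasher.finish()
}

/// Return the offset in `published` at which `received` appears as one
/// contiguous, in-order run, or None if it does not appear that way.
fn contiguous_offset(published: &[u64], received: &[u64]) -> Option<usize> {
	if received.is_empty() {
		// windows(0) would panic; an empty run trivially matches at offset 0.
		return Some(0);
	}
	published
		.windows(received.len())
		.position(|window| window == received)
}
```

A missing tail, as in the run reported above, still matches at offset 0, while a gap or reordering in the middle yields `None`. The check is unambiguous here because all published buffers were distinct.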

kixelated added a commit that referenced this pull request Jun 23, 2026
This direct-write core grew out of Ariel's rewritten, unit-tested moqsink
foundation in #1771 (the timeline/running-time policy, codec/caps factory,
B-frame handling, FLUSH/EOS and per-pad error isolation). Restore him to the
element's author metadata.

Co-Authored-By: Ariel Molina <ariel@edis.mx>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
kixelated added a commit that referenced this pull request Jun 23, 2026
#1893 is the bare-Element, direct-write variant of the moqsink rewrite. It was
coded independently of #1771, but it builds on the same media foundation Ariel
established there (timeline continuity, codec/caps parity, B-frames, FLUSH/EOS,
per-pad error isolation). Merge his #1771 branch so those commits are part of
this PR and his authorship is credited on merge.

The tree is taken entirely from this branch (`-s ours`): the direct-write core
is the shipping implementation; this merge exists for attribution and history.

Co-Authored-By: Ariel Molina <ariel@edis.mx>