Skip to content

moq-gst: replace moqsink with a GstAggregator-based core#1870

Open
kixelated wants to merge 23 commits into
mainfrom
claude/quirky-ramanujan-71c191
Open

moq-gst: replace moqsink with a GstAggregator-based core#1870
kixelated wants to merge 23 commits into
mainfrom
claude/quirky-ramanujan-71c191

Conversation

@kixelated

Copy link
Copy Markdown
Collaborator

Summary

Rebuilds the moqsink element on GstAggregator instead of a bare gst::Element with a hand-rolled queue. This is an alternative core to the bounded-channel design in #1771: it keeps that PR's media foundation and replaces the transport seam.

It builds directly on @arielmol's commits from #1771 (the rewritten timeline, codec/caps handling, B-frame support, frame-size cap, EOS aggregation, and tests), so this PR is diffed against main rather than stacked on that branch. The final commit swaps the session core for Aggregator; the 14 commits beneath it are ArielM's and carry his authorship.

Why

The #1771 core put a bounded mpsc channel between the GStreamer streaming threads and the async session, for backpressure. A blocking send on that channel cannot be interrupted by FLUSH_START (which arrives out of band on another thread), so it would wedge a seek/flush/state-change. The fix there was a flush-cancellation apparatus: per-pad watch<bool> gates, an out-of-band FlushSignal, biased cancellable sends (send_or_flush), and per-pad generations to discard stale in-flight messages.

GstAggregator already owns per-pad input queues, FLUSH/EOS/SEGMENT handling, and a single aggregate thread, so none of that is needed. Funneling serialized events and buffers onto one thread also removes the generation bookkeeping and the cross-thread failure map.

A second simplification falls out: GroupProducer::write_frame is synchronous (it appends and evicts in memory, never blocking on the network). So the aggregate thread writes frames straight into the moq producers under a RUNTIME.enter() guard. There is no data channel at all — the async task only connects, holds the session, and reports status. Because the producers are created up front, frames buffered before the connection completes are sent once it does (a "future work" item in #1771, free here).

What changed

  • sink/imp.rs — now a GstAggregator subclass. aggregate() drains each pad with pop_buffer() and writes; sink_event rejects unsupported caps; flush() re-anchors timelines; start/stop own the session and producers.
  • sink/session.rs — slimmed from ~2035 to ~240 lines: just Status and a connect/lifecycle task.
  • sink/pad.rs (new) — per-pad media state (caps -> producer, SEGMENT policy, frame import) extracted from the old Pad/PadSet, plus 19 hermetic unit tests.
  • sink/timeline.rs — unchanged.
  • sink/mod.rs@extends gst_base::Aggregator.
  • Cargo.toml — adds gstreamer-base (no version-feature bump; stays at the existing 1.14 baseline).

Deleted relative to the #1771 core: the bounded channel + DATA_CHANNEL_BOUND, send_or_flush/SendOutcome, FlushSignal + flush watches + toggle_pad_flush/reset_pad_flushes, per-pad generations, the Status failure map, PadSet, the manual chain/event pad functions, and the request-pad rollback dance.

Public surface / branch targeting

No public library API or wire change. Same element name (moqsink), sink_%u request pads, the seven codec caps, the three writable properties, and the three read-only diagnostics (connected, moq-version, estimated-send-bitrate). Internal to the rs/moq-gst plugin, so this targets main (same as #1771). One cosmetic change: the element now carries an unused src pad (it is SINK-flagged and never pushes).

Testing

  • cargo test -p moq-gst: 31 unit + 3 element tests pass, including frame_through_h264_emits_a_frame (a real IDR access unit flows caps -> producer -> published track, hermetically).
  • cargo clippy -p moq-gst --all-targets clean; cargo fmt --check clean.
  • gst-inspect-1.0 moqsink: registers Sink/Network/MoQ, hierarchy +----GstAggregator, both pad templates GstAggregatorPad, all properties.
  • Live relay: videotestsrc ! vtenc_h264 ! h264parse ! moqsink to a local moq-relay logged session connected -> finalized on EOS: ["sink_0", "catalog"] -> posting EOS -> clean session terminated.

Not yet validated (same boundary #1771 drew)

  • A subscribe-back byte round-trip against a relay.
  • A genuinely stalled sparse pad. Aggregator fires aggregate() when every pad has a buffer; for continuous A/V that is fine, but a stalled non-EOS pad could hold up others. If sparse data tracks matter, set_force_live(true) is the knob (raises the floor to GStreamer 1.22).
  • FLUSH/seek on a live pipeline.

(Written by Claude)

arielmol and others added 16 commits June 15, 2026 14:10
A parallel sink element (moqsinkspike, Rank::NONE) reimplementing the moqsink core with isolated, testable seams: pure timeline policy, per-pad state, a bounded data channel with cancellation independent of the data path, single finalization (catalog last), and observable connection status. The production moqsink is left untouched.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
moq-gst is outside default-members (needs GStreamer), so 'just rs test' skips it. Add 'just rs test-moq-gst' and chain it into 'just rs ci' so the spike's tests gate. The CI workflow only needs gstreamer, which the nix dev shell already provides; that step is applied at PR time.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…tate)

Judge SEGMENT continuity in running time, not media origin: classify_segment keys on the running-time base, so a moved start stays continuous while base does not rewind (the old start-equality rule wrongly rejected this). Map timestamps with to_running_time_full (signed) so a buffer before the segment is dropped, never clamped to zero. Add an explicit per-pad state (NoSegment/Active/Invalid): a discontinuity invalidates the pad and the next valid SEGMENT re-anchors it.

Also applies rustfmt to the spike baseline, which was committed before a fmt pass.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
GStreamer only registers plugins whose license is in its recognised set. The plugin_define! license was 'Apache 2.0', which gst rejects ('invalid license, not loading'), so the moq plugin (moqsink/moqsrc) never loads via the registry. The crate is MIT OR Apache-2.0, so declare the MIT side ('MIT/X11').

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
PadSet inferred membership lazily from CAPS, so EOS aggregation (eos.len() == pads.len()) was wrong when EOS arrived before CAPS. Make membership authoritative: an 'active' set populated by AddPad (sent from request_new_pad, and seeded at session start for pads requested earlier), with EOS counted against it. Every data message carries its pad's generation, captured per incarnation, so a pad recreated with the same name discards the previous one's in-flight messages. Covers EOS-before-CAPS, duplicate EOS, late members, buffers after EOS, and a release that completes the element.

Adds a hermetic element test (request/release) plus gstreamer as a dev-dependency. Session-dependent flows (multipad EOS, per-pad errors) stay in the relay-backed harness.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
A bad caps or bitstream now invalidates only the offending pad instead of tearing down the session: the worker drops that pad's producer and marks it failed in the shared Status, and the chain returns a FlowError on the pad's next buffer (one buffer later, since the worker is async) rather than silently dropping. Session-level failures (a closed catalog) still propagate to element_error! on the bus. Unsupported caps are also rejected synchronously in the event handler (NotNegotiated) before reaching the worker.

Hermetic tests: worker-side pad-vs-session error classification and the caps validator; element-level invalid-settings StateChangeError and connect-failure to the bus. The per-pad FlowError observation and the live error flows need a session, so they stay in the relay-backed harness.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Post-review fixes to the parallel spike core, all hermetically tested:

- Post the element EOS only after the catalog finalizes cleanly; a
  finalize failure now surfaces as element_error instead of a silent log
  (run_loop returns an ExitReason, finalize_all returns a Result).
- Enforce per-pad frame-timestamp monotonicity: a frame that would
  regress below the last emitted one is dropped, not emitted out of order.
- Make a failed pad terminal: its track is finalized and it stops
  blocking EOS aggregation, so the element completes on the good pads.
- Key the per-pad failure set by generation so a pad recreated with the
  same name never inherits a previous incarnation's failure.
- Require byte-stream/au in caps_supported (what FramedFormat::Avc3
  actually consumes), not just the media type.
- Reject a buffer past the MoQ frame limit (16 MiB) before copying, so
  hostile input cannot drive an unbounded allocation.
- Test honesty: rename tests that over-promised, add the buffer/bitstream
  decode-error path, and assert connected=false on a failed connect.

CONTEXT

Rejected alternatives:
- Escalating a pad failure to a session/element error: per-pad isolation
  is the contract, so a bad pad is finalized and counts as terminal,
  never fatal to the session.
- A transport test seam to make session-level flows hermetic: that is
  architecture-for-tests; multipad EOS and the per-pad FlowError
  observation are validated against a real relay instead.
- A loopback connect endpoint (127.0.0.1:1) for the connect-failure test:
  measured to ride the QUIC idle timeout (no fast localhost-UDP reject),
  so the fast RFC-invalid DNS host was kept.
- A spike-side cap for the importer's non-VCL NAL accumulation: the spike
  has no "frame not emitted" signal, so that cap belongs in the moq-mux
  importer.

Key decisions:
- EOS must not claim success the broadcast did not reach: it waits for
  catalog.finish() to succeed, and a failure becomes element_error.
- Segment-base continuity alone does not stop a frame from regressing:
  monotonicity is enforced at the frame level (last_emitted), not only at
  the segment boundary.
- The frame cap is aligned to moq-net's MAX_FRAME_SIZE: a larger frame
  could not be consumed anyway.
…ough)

Close the codec gap in the parallel spike so it accepts the same media as
moqsink, without regressing the rewrite's seams (shared running time,
per-pad membership, per-pad error isolation, EOS-after-finalize):

- Expand the pad template and replace the hardcoded Avc3 path with a
  caps -> Framed factory over seven media types: H.264/H.265
  (byte-stream/au), AV1, VP8, VP9, AAC (mpegversion=4, raw), Opus. Every
  codec converges on one Framed; only the construction from caps differs.
- Keep caps_supported media-type only. The factory enforces per-codec
  specifics (byte-stream/au, AAC mpegversion/raw plus codec_data) and
  derives Opus channels/rate with defaults, failing just that pad on a
  bad or missing detail, never the session.
- finalize only finishes an initialized producer. A lazy importer
  (H.265/AV1/VP8/VP9) given CAPS then EOS with no frame never created its
  track, so finish() would error "not initialized"; it now no-ops while a
  real finish error on a live track still surfaces.
- Tests: strong frame-through for H.264 and Opus that observe a real
  emitted frame on the rendition track (not just the catalog rendition);
  creation tests for H.265/AV1/VP8/VP9/AAC; negatives for length-prefixed
  H.264, byte-stream-less H.265, AAC without codec_data, and AAC with
  wrong mpegversion; a clean CAPS-then-EOS test for the lazy codecs;
  caps_supported retargeted to the seven media types.

CONTEXT

Rejected alternatives:
- Per-codec field checks inside caps_supported: it stays thin and
  media-type only, so the factory is the one enforcement point and a bad
  detail fails the pad, not the session.
- A new MediaPad/MediaProducer abstraction: Pad already holds an
  Option<Framed>, so a factory function is enough.
- Frame-through fixtures for all seven codecs: only H.264 and Opus get
  strong frame-through; the rest get creation tests, with real
  transmission left to the relay harness.
- A moq-mux change to make a lazy finish() tolerant: the no-track no-op
  is kept spike-local in Pad::finalize via the existing Framed::track()
  probe, so moq-mux behavior is unchanged and real finish errors on live
  tracks still propagate.
- Reintroducing the production sink's per-pad reference_pts anchoring: the
  spike keeps its shared running time.

Key decisions:
- caps_supported is media-type only; per-codec specifics live in the
  factory, which fails the pad (not the session) on a bad or missing
  detail.
- A frame-through test must observe a real emitted frame: it subscribes
  to the rendition track and asserts latest().is_some(), because the
  catalog rendition is published from the SPS/config and would pass with
  no frame at all.
- CAPS then EOS before the first frame must be clean: a lazy importer
  never created its track, so finalize no-ops instead of turning "not
  initialized" into a session error.
…ecycle races)

Production-hardening pass over the parallel spike, all hermetically tested
(62 unit + 3 element, clippy -D warnings, fmt). FLUSH plus the fixes from an
external broadcast review:

- FLUSH: handle FLUSH_START/STOP as a discontinuity plus a cancellation. The
  worker re-anchors the pad's timeline (NoSegment) so a post-seek rewinding
  SEGMENT is accepted instead of rejected, and a per-pad watch cuts a chain
  blocked on the bounded channel without tearing the session down. run_loop is
  biased so the out-of-band flush re-anchors before the post-flush SEGMENT that
  rides the data FIFO.
- B-frames: drop the per-frame PTS monotonicity filter. The moq-mux container
  carries frames in decode order and documents that B-frames have non-monotonic
  presentation timestamps, so the filter silently dropped every B-frame (30-60%
  of frames on High-profile content). The wire order is already decode order.
- Pad lifecycle: populate the membership maps before add_pad, roll back only the
  entry this attempt still owns on failure, and announce AddPad only after
  add_pad succeeds, so a duplicate or failed add never orphans the live pad nor
  seeds a phantom that blocks EOS aggregation forever.
- Status races on restart: a session generation token. All observable state
  (connected, version, send_bitrate, failed) lives under one Mutex so "check the
  live generation, then write" is indivisible; a stale session's late writes and
  exit-reset become no-ops. failed is keyed by (session generation, pad
  generation), so a restart reusing the same pad never inherits a stale failure.
- Worker panic: run the session worker on an inner task and surface a JoinError
  (panic or cancel) as element_error, instead of a silent death only observed
  when stop() reaps the JoinHandle.
- Refactor: bundle the worker's inbound channels into one Inbound struct.

CONTEXT

Rejected alternatives:
- Enforcing frame monotonicity on the DTS instead of removing the filter:
  best-practice in the abstract, but it needs DTS threaded through every layer
  and only guards against out-of-decode-order delivery, which GStreamer does not
  produce. Removing the filter to match the container contract is simpler and
  correct.
- Gating only the exit-reset by generation: a stale session could still write
  connected/version/bitrate. Every observable write is gated.
- Generation-gating with separate atomics: a load-then-store is not an atomic
  check-and-write, so a stale write can still interleave between the check and
  the store. One Mutex over the observable state makes it indivisible.
- Serializing request_new_pad under a lock held across add_pad: add_pad emits
  pad-added synchronously, so a reentrant handler would deadlock on that lock, a
  worse failure than the narrow same-name-concurrent window it would close. That
  window is left documented, not closed.
- A moq-mux Framed::discontinuity() for the FLUSH partial-AU reset: kept this
  change moq-gst-only. The partial-AU edge is documented and low-risk with
  alignment=au (one complete access unit per buffer).
- catch_unwind via the futures crate for the worker panic: futures is not a
  direct dependency, so an inner task plus JoinError is used instead.

Key decisions:
- FLUSH cancellation uses a watch<bool> per pad: no new dependency, and per-pad
  so flushing one input never cancels another input's blocked send. send_or_flush
  is biased toward the flush arm and re-checks the watch after winning a channel
  permit, which narrows (does not atomically close) the capacity-vs-FLUSH_START
  tie; the residual one-buffer leak is absorbed by the re-anchor.
- The failure set carries both generations; is_failed matches the live session,
  so neither a recreated pad nor a recreated session inherits a stale failure.
- Honest test coverage: decode-order B-frame emit, generation scoping, and
  blocked-send cancellation have discriminating tests. The panic-to-bus path and
  the load-vs-store / same-name-concurrent races are covered by construction
  (a lock) or documented, not by a deterministic test, since they need a real
  session or an injected race.
The parallel moqsinkspike element, built across the preceding commits, replaces
the original moqsink. Remove the old implementation, move the new core into
src/sink/, and register it as "moqsink" with the GType name kept as "MoqSink"
for registration continuity.

Behavior is a compatible superset of the old element: the same request sink
pads, the same seven codec caps, and the same three writable properties (url,
broadcast, tls-disable-verify), plus three read-only diagnostics (connected,
moq-version, estimated-send-bitrate).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…solation)

Post-review fixes to the rewritten moqsink, all internal to rs/moq-gst:

- The FLUSH watch is keyed to the pad's lifetime, not the session's. A
  FLUSH_START whose FLUSH_STOP never arrived before a PAUSED->READY->PAUSED
  restart left that pad muted across the new session. Clear every watch at
  session start (reset_pad_flushes), before the new session is visible.
- A pad release is reconfiguration, not end-of-stream: DropPad no longer posts
  element EOS. The genuine "all pads ended" completion still fires from the EOS
  handler, so drop_pad now returns ().
- A failed finish() at EOS is isolated to that pad, like a per-pad data error,
  instead of escalating to a session error that kills the whole broadcast. This
  splits fail_pad into mark_pad_failed_terminal (mark failed + terminal, no
  producer touch) so the EOS path never double-finalizes, and Pad::finalize
  takes the producer up front (let-else) to make its attempt-once contract
  explicit.
- handle_data_msg returns Result<bool> instead of Result<Option<ExitReason>>;
  the only variant it could produce was Ended, so run_loop now constructs every
  ExitReason in one place.
- Fix a comment on the worker's biased select: FLUSH_START arrives out of band
  (a different thread), so the bias only wins a readiness tie, not a
  happens-before against the bounded data channel.

CONTEXT
Rejected alternatives:
- FLUSH restart: a "check session first" guard in the FlushStart handler. It
  narrows a window that reset-on-start already closes fully and reinforces a
  wrong race-condition reading; dropped for the single reset.
- EOS finalize error: calling fail_pad directly. It would re-finalize the
  producer; the mark_pad_failed_terminal split avoids the double-finalize.
Key decisions:
- Isolating finalize errors per pad matches how mid-stream caps/buffer errors
  are already handled; genuinely fatal failures still surface via the session,
  catalog, and transport paths.
- The send_or_flush post-permit window (one pre-flush buffer can still enqueue)
  is accepted and documented in place: the worker re-anchor absorbs it.

Verified: 65 unit + 3 element tests, cargo clippy -D warnings, cargo fmt.

Memory-leak soak (moqsink does not leak): a 35 min x264->moqsink run held RSS
flat with 0 leaked gst objects; a ~12 h msdk->moqsink vs msdk->fakesink
bisection showed identical flat RSS, so moqsink adds no leak over the encoder.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The new [dev-dependencies] section landed before [build-dependencies], but cargo sort expects dependencies -> build-dependencies -> dev-dependencies, failing `just ci`. Swap the two so the lint passes.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Replaces the moqsink session core (built in the preceding commits on this
branch) with one built on GstAggregator, removing the hand-rolled queue that
sat between the GStreamer streaming threads and the async session.

The previous core used a bounded mpsc channel for backpressure, which a
blocking send could not interrupt on FLUSH, so it added per-pad flush watches,
an out-of-band FlushSignal, biased cancellable sends, and per-pad generations
to discard stale in-flight messages. GstAggregator already owns per-pad input
queues, FLUSH/EOS/SEGMENT handling, and a single aggregate thread, so all of
that machinery is unnecessary.

GroupProducer::write_frame is synchronous (it appends and evicts in memory),
so the aggregate thread writes frames straight into the moq producers under a
RUNTIME.enter() guard. There is no data channel: the async task only connects,
holds the session, and reports status. The producers are created up front, so
frames buffered before connect are sent once it completes.

Deleted: the bounded channel, send_or_flush/SendOutcome, FlushSignal and the
flush watches, per-pad generations, the Status failure map, PadSet, and the
manual chain/event pad functions. Per-pad media state moves to sink/pad.rs;
timeline.rs is unchanged.

Same element name, sink_%u pads, caps, and properties. Verified: unit + element
tests, clippy, fmt, gst-inspect, and a live publish to a local relay
(connect, frame flow, EOS).

Co-Authored-By: Ariel Molina <ariel@edis.mx>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Resolutions:
- sink/imp.rs: kept the GstAggregator rewrite. main's #1808 (broadcast-aligned
  timestamps) targeted the old sink and is superseded by the shared running-time
  timeline this rewrite already uses.
- rs/justfile: took main's cargo-based CI. Its `cargo nextest run --workspace`
  covers moq-gst, so ArielM's separate `test-moq-gst` recipe is redundant.
- lib.rs: kept main's plugin-license comment wording (code identical).

Adopts main's edition 2024 bump for moq-gst (reformatted accordingly).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 22, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

The moqsink GStreamer element is rewritten from a tokio-channel/async-task architecture to a GstAggregator-based implementation. Three new internal modules are introduced: timeline.rs for pure timeline policy functions (classify_segment, frame_micros) using Result-based decision semantics; session.rs for async MoQ connection lifecycle with observable Status (connected, version, send_bitrate); and pad.rs for per-sink-pad media producer state machines (PadState, codec-specific Framed producer construction, flush/finalize). The aggregator core in imp.rs is rewritten to implement AggregatorImpl, draining buffers synchronously in aggregate(), adding per-buffer size enforcement via write_buffer(), and posting EOS once all sink pads have reached end-of-stream. Three read-only GObject properties (connected, moq-version, estimated-send-bitrate) are added and backed by Session::status(). Frame size enforcement is added to GroupProducer::append_frame in moq-net to reject oversized frames at the producer boundary. Integration tests and updated documentation validate the new behavior.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and specifically describes the main architectural change: replacing moqsink with a GstAggregator-based implementation.
Description check ✅ Passed The description thoroughly explains the rationale, design changes, and implementation details of replacing the bounded-channel design with a GstAggregator-based approach.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
✨ Simplify code
  • Create PR with simplified code
  • Commit simplified code in branch claude/quirky-ramanujan-71c191

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Nitpick comments (1)
rs/moq-gst/tests/element.rs (1)

19-68: 🧹 Nitpick | 🔵 Trivial | 🏗️ Heavy lift

Move these tests inline next to the sink implementation per repo test policy.

These are currently integration tests under tests/, but this repo’s Rust guidance requires colocated inline tests (#[cfg(test)] mod tests) in source files. Please move them near the corresponding sink module code.

As per coding guidelines, “Rust tests are integrated within source files” and “Inline Rust tests in source files as #[cfg(test)] mod tests within the same file.”

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-gst/tests/element.rs` around lines 19 - 68, Move the three test
functions request_and_release_sink_pads, missing_url_fails_state_change, and
connect_failure_posts_error_to_bus from the tests/element.rs file to the source
file containing the moqsink implementation. Create a cfg(test) mod tests module
within the source file and place all three test functions there with their
#[test] attributes intact. Remove the now-empty tests/element.rs file after
moving the tests.

Source: Coding guidelines

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@rs/moq-gst/Cargo.toml`:
- Line 23: The gst-base dependency specification on line 23 uses an explicit
version string instead of workspace-managed dependency specs. Replace the
explicit version specification with `{ workspace = true }` for the gst-base
package entry. Apply the same fix to the dependency entry on line 37 that is
also mentioned in the review comment. This ensures all dependencies in the
Cargo.toml file use workspace-pinned versions consistently.

In `@rs/moq-gst/src/sink/mod.rs`:
- Around line 9-10: Add a public doc comment above the glib::wrapper! macro that
defines the pub struct MoqSink. The doc comment should be a doc string (starting
with ///) that provides a clear description of what the MoqSink struct is and
its purpose in the module. This will satisfy the coding guideline requirement
that all exported public symbols must have documentation comments.

In `@rs/moq-gst/src/sink/pad.rs`:
- Around line 149-171: The issue is that when observe_segment rejects a segment
due to a discontinuity in the Active state (line 168 transition to Invalid), the
segment_info is not updated. On the next buffer with the same sticky segment,
the state check at line 156 sets prev to None (since state is Invalid), causing
classify_segment to accept the same rejected segment again. To fix this, update
self.segment_info to the rejected segment info in the SegmentDecision::Reject
branch before transitioning to Invalid state, so that the early return check at
lines 151-153 will catch and reject the same segment on subsequent calls.
Additionally, add a regression test that verifies after an Active to Invalid
transition from a rewinding segment, re-observing that exact same segment keeps
the pad Invalid and continues dropping frames.

In `@rs/moq-gst/src/sink/timeline.rs`:
- Around line 3-23: Add documentation comments to the public types SegmentInfo,
SegmentDecision, and FrameDecision, and to all their fields and enum variants,
since every exported public symbol must have a doc comment per coding
guidelines. Additionally, mark both SegmentDecision and FrameDecision enums with
the #[non_exhaustive] attribute to prevent downstream breakage when new variants
are added in the future, as required by public API guidelines for Rust code.

---

Nitpick comments:
In `@rs/moq-gst/tests/element.rs`:
- Around line 19-68: Move the three test functions
request_and_release_sink_pads, missing_url_fails_state_change, and
connect_failure_posts_error_to_bus from the tests/element.rs file to the source
file containing the moqsink implementation. Create a cfg(test) mod tests module
within the source file and place all three test functions there with their
#[test] attributes intact. Remove the now-empty tests/element.rs file after
moving the tests.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: a5207109-a4f7-4fd6-8c13-b028d850611c

📥 Commits

Reviewing files that changed from the base of the PR and between 4c67357 and 77e71da.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (10)
  • doc/bin/gstreamer.md
  • rs/justfile
  • rs/moq-gst/Cargo.toml
  • rs/moq-gst/src/lib.rs
  • rs/moq-gst/src/sink/imp.rs
  • rs/moq-gst/src/sink/mod.rs
  • rs/moq-gst/src/sink/pad.rs
  • rs/moq-gst/src/sink/session.rs
  • rs/moq-gst/src/sink/timeline.rs
  • rs/moq-gst/tests/element.rs

Comment thread rs/moq-gst/Cargo.toml
Comment thread rs/moq-gst/src/sink/mod.rs
Comment thread rs/moq-gst/src/sink/pad.rs Outdated
Comment thread rs/moq-gst/src/sink/timeline.rs Outdated
Comment thread rs/moq-gst/src/sink/imp.rs Outdated
Comment thread rs/moq-gst/src/sink/imp.rs Outdated
Comment thread rs/moq-gst/src/sink/imp.rs
Comment thread rs/moq-gst/src/sink/imp.rs Outdated
Comment thread rs/moq-gst/src/sink/imp.rs Outdated
Comment thread rs/moq-gst/src/sink/imp.rs Outdated
Comment thread rs/moq-gst/src/sink/pad.rs Outdated
Comment thread rs/moq-gst/src/sink/pad.rs Outdated
Comment thread rs/moq-gst/src/sink/timeline.rs Outdated
Comment thread rs/moq-gst/src/sink/timeline.rs Outdated
kixelated and others added 3 commits June 22, 2026 11:29
Addresses review feedback:
- pad.rs: observe_segment runs per buffer (the segment is sticky), so an
  invalidated pad re-anchored on the next buffer (Invalid -> prev=None ->
  classify accepts) and silently recovered on the same rewound segment. Gate the
  no-op on the last-classified segment regardless of state, and record it even on
  reject, so a re-sent rewound segment keeps dropping until a genuinely new
  SEGMENT arrives. Adds a regression test.
- imp.rs: restore Luke and Steve to the element authors (the rewrite had dropped
  them), keeping Ariel.
- mod.rs: doc comment on the exported MoqSink wrapper.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…ation, simpler timeline)

- moq-net: enforce MAX_FRAME_SIZE in GroupProducer::append_frame, the chokepoint
  every producer write routes through, returning the existing Error::FrameTooLarge.
  Previously only the receive path checked, so a local writer could cache a frame
  that could never be sent. Resolves the long-standing TODO in frame.rs (append_frame
  is already fallible, so no constructor API break). Adds a guard test.
- sink: drop the duplicated (and wrong: 16 MiB vs 32) MAX_FRAME_BYTES constant and
  the per-buffer size check; an oversized frame now surfaces moq-net's error and
  fails just that pad.
- sink: collapse the separate session/broadcast/catalog/pads/eos Mutexes into one
  Mutex<Option<State>> (one lock per buffer) and document why it is a bare Mutex,
  not Arc<Mutex> (glib owns and shares the subclass instance across threads).
- sink: a buffer that cannot be mapped, and a finalize error, now propagate instead
  of only warning.
- sink: drop the runtime byte-stream/au and AAC mpegversion/stream-format checks; the
  pad template pins those fields, so negotiation already enforces them.
- timeline: replace the SegmentDecision/FrameDecision enums with Result, keeping the
  reason string for logs.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The `mod pad` module is private, so it is the real visibility gate; `pub(super)` on
every item just added noise. No reachability change.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
rs/moq-gst/src/sink/pad.rs (2)

106-117: ⚠️ Potential issue | 🟠 Major

Reject under-specified Opus caps instead of silently defaulting to stereo/48 kHz.

Lines 108–109 use unwrap_or(2) and unwrap_or(48_000) to mask missing Opus channels and rate caps fields. This advertises incorrect audio configuration for malformed caps. The pad template at imp.rs:256 does not constrain these fields, so require them here or pin them in the template.

🛠️ Proposed fix
-				let channels: i32 = structure.get("channels").unwrap_or(2);
-				let rate: i32 = structure.get("rate").unwrap_or(48_000);
-				let channel_count = u32::try_from(channels)
-					.with_context(|| format!("Opus caps has negative channel count {channels}"))?;
-				let sample_rate =
-					u32::try_from(rate).with_context(|| format!("Opus caps has negative sample rate {rate}"))?;
+				let channels: i32 = structure.get("channels").context("Opus caps missing channels")?;
+				let rate: i32 = structure.get("rate").context("Opus caps missing rate")?;
+				if channels <= 0 {
+					anyhow::bail!("Opus caps has non-positive channel count {channels}");
+				}
+				if rate <= 0 {
+					anyhow::bail!("Opus caps has non-positive sample rate {rate}");
+				}
+				let channel_count = channels as u32;
+				let sample_rate = rate as u32;

Add a test case (e.g., opus_caps_without_channels_fails) to prevent regression.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-gst/src/sink/pad.rs` around lines 106 - 117, The Opus audio codec
configuration in the "audio/x-opus" match arm is silently defaulting to stereo
(2 channels) and 48 kHz sample rate when the channels and rate fields are
missing from the caps, which can propagate incorrect audio configuration.
Replace the unwrap_or(2) call on the channels extraction and the
unwrap_or(48_000) call on the rate extraction with proper error handling that
rejects under-specified caps instead of accepting them with defaults.
Additionally, add a test case (for example, opus_caps_without_channels_fails) to
verify that Opus caps without required fields are properly rejected and prevent
this regression in the future.

140-155: ⚠️ Potential issue | 🟠 Major

Refresh the stored segment when only start changes.

The equality check self.segment_info == Some(info) only compares time_format, rate, and base_nanos, but omits the start field. When a new TIME segment arrives with the same base and rate but a different media start, the early return leaves self.segment pointing to the old segment object. Subsequent calls to frame_timestamp() then invoke to_running_time_full(pts) with a stale start value, mapping the PTS incorrectly.

The fix allows Active pads to refresh self.segment by skipping the early return only when the state is Active, since only Active pads have a cached segment whose fields matter for timestamp mapping.

Proposed fix and regression test
-		// Re-observing the exact segment we last classified is a no-op. observe_segment runs on every
-		// buffer (the segment is sticky), so without this an Invalidated pad would re-anchor on the next
-		// buffer (Invalid -> prev=None -> classify accepts) and silently recover on the same rewound
-		// segment. Recording `info` even on reject is what makes this hold.
-		if self.segment_info == Some(info) {
+		// Re-observing a rejected/non-started segment is a no-op. Active segments still refresh
+		// `self.segment` because `SegmentInfo` omits fields like `start` that affect timestamp mapping.
+		if self.segment_info == Some(info) && self.state != PadState::Active {
 			return;
 		}
 	#[test]
 	fn moved_start_with_advancing_base_stays_continuous() {
 		gst::init().unwrap();
 		let mut pad = Pad::new();
 		pad.observe_segment(time_segment_at(0, 0));
 		assert_eq!(pad.state, PadState::Active);
 		pad.observe_segment(time_segment_at(30_000, 5_000));
 		assert_eq!(pad.state, PadState::Active);
 	}
+
+	#[test]
+	fn moved_start_with_equal_base_refreshes_timestamp_mapping() {
+		gst::init().unwrap();
+		let mut pad = Pad::new();
+		pad.observe_segment(time_segment_at(0, 5_000));
+		pad.observe_segment(time_segment_at(3_000, 5_000));
+		assert_eq!(
+			pad.frame_timestamp(Some(gst::ClockTime::from_mseconds(6_000))),
+			Ok(8_000_000)
+		);
+	}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-gst/src/sink/pad.rs` around lines 140 - 155, The early return
condition comparing self.segment_info == Some(info) does not account for changes
in the start field of the segment info. When the state is Active and a new
segment arrives with the same base and rate but a different start value, the
early return prevents refreshing self.segment, leaving it pointing to the old
segment with an incorrect start value that breaks timestamp mapping in
frame_timestamp(). Modify the early return condition to allow Active pads to
skip the return and refresh their cached segment, while still returning early
for other pad states or when all fields including start remain unchanged.
🧹 Nitpick comments (2)
rs/moq-gst/src/sink/imp.rs (2)

120-126: 🧹 Nitpick | 🔵 Trivial | ⚡ Quick win

Add the required doc comment for MoqSink.

MoqSink is pub, so it needs a consumer-facing /// comment.

As per coding guidelines, “Every exported public symbol must have a doc comment: each pub Rust item … gets a doc comment.”

📝 Proposed doc comment
+/// Implements the `moqsink` GStreamer element's settings and live aggregator state.
 #[derive(Default)]
 pub struct MoqSink {
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-gst/src/sink/imp.rs` around lines 120 - 126, The `MoqSink` struct is a
public symbol that is missing the required doc comment. Add a consumer-facing
`///` doc comment above the `pub struct MoqSink` definition to document its
purpose and functionality. The doc comment should follow Rust documentation
conventions and provide a clear description of what this struct represents and
its role as a GStreamer sink element.

Source: Coding guidelines


63-82: 🧹 Nitpick | 🔵 Trivial | ⚡ Quick win

Skip failed pads before mapping and copying buffers.

Line 67 copies the buffer before Line 81 checks pad.is_failed(), so a pad that is already being dropped still pays the map/copy cost and can still fail the whole element on an unnecessary map error. Move the pad lookup/failure check before map_readable().

♻️ Proposed ordering fix
 	fn write_buffer(&mut self, agg_pad: &gst_base::AggregatorPad, buffer: gst::Buffer) -> Result<(), gst::FlowError> {
 		let caps = agg_pad.current_caps();
 		let segment = agg_pad.segment();
 		let pts = buffer.pts();
-		let map = buffer.map_readable().map_err(|_| {
-			gst::error!(CAT, "failed to map buffer on pad {}", agg_pad.name());
-			gst::FlowError::Error
-		})?;
-		let data = Bytes::copy_from_slice(map.as_slice());
 
 		// Disjoint field borrows: the pad entry borrows `pads` while observe_caps reads broadcast/catalog.
 		let Self {
 			broadcast,
@@
 			..
 		} = self;
-		let pad = pads.entry(agg_pad.name().to_string()).or_insert_with(Pad::new);
+		let pad_name = agg_pad.name();
+		let pad = pads.entry(pad_name.to_string()).or_insert_with(Pad::new);
 		if pad.is_failed() {
 			return Ok(());
 		}
+		let map = buffer.map_readable().map_err(|_| {
+			gst::error!(CAT, "failed to map buffer on pad {}", pad_name);
+			gst::FlowError::Error
+		})?;
+		let data = Bytes::copy_from_slice(map.as_slice());
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-gst/src/sink/imp.rs` around lines 63 - 82, The write_buffer function
maps and copies buffer data before checking if the pad has failed, which wastes
resources on pads that are already failed. Move the pad lookup using
pads.entry(agg_pad.name().to_string()).or_insert_with(Pad::new) and the
subsequent is_failed() check to occur before the buffer.map_readable() call, so
that failed pads can be skipped without incurring the cost of mapping and
copying the buffer data.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Outside diff comments:
In `@rs/moq-gst/src/sink/pad.rs`:
- Around line 106-117: The Opus audio codec configuration in the "audio/x-opus"
match arm is silently defaulting to stereo (2 channels) and 48 kHz sample rate
when the channels and rate fields are missing from the caps, which can propagate
incorrect audio configuration. Replace the unwrap_or(2) call on the channels
extraction and the unwrap_or(48_000) call on the rate extraction with proper
error handling that rejects under-specified caps instead of accepting them with
defaults. Additionally, add a test case (for example,
opus_caps_without_channels_fails) to verify that Opus caps without required
fields are properly rejected and prevent this regression in the future.
- Around line 140-155: The early return condition comparing self.segment_info ==
Some(info) does not account for changes in the start field of the segment info.
When the state is Active and a new segment arrives with the same base and rate
but a different start value, the early return prevents refreshing self.segment,
leaving it pointing to the old segment with an incorrect start value that breaks
timestamp mapping in frame_timestamp(). Modify the early return condition to
allow Active pads to skip the return and refresh their cached segment, while
still returning early for other pad states or when all fields including start
remain unchanged.

---

Nitpick comments:
In `@rs/moq-gst/src/sink/imp.rs`:
- Around line 120-126: The `MoqSink` struct is a public symbol that is missing
the required doc comment. Add a consumer-facing `///` doc comment above the `pub
struct MoqSink` definition to document its purpose and functionality. The doc
comment should follow Rust documentation conventions and provide a clear
description of what this struct represents and its role as a GStreamer sink
element.
- Around line 63-82: The write_buffer function maps and copies buffer data
before checking if the pad has failed, which wastes resources on pads that are
already failed. Move the pad lookup using
pads.entry(agg_pad.name().to_string()).or_insert_with(Pad::new) and the
subsequent is_failed() check to occur before the buffer.map_readable() call, so
that failed pads can be skipped without incurring the cost of mapping and
copying the buffer data.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 731d90f2-5e0b-488a-a480-04c9943efbcc

📥 Commits

Reviewing files that changed from the base of the PR and between 3cfbbc9 and 585b92f.

📒 Files selected for processing (5)
  • rs/moq-gst/src/sink/imp.rs
  • rs/moq-gst/src/sink/pad.rs
  • rs/moq-gst/src/sink/timeline.rs
  • rs/moq-net/src/model/frame.rs
  • rs/moq-net/src/model/group.rs
✅ Files skipped from review due to trivial changes (1)
  • rs/moq-net/src/model/frame.rs

kixelated and others added 2 commits June 22, 2026 12:50
- moq-net: move the frame-size check to GroupProducer::create_frame, BEFORE
  info.produce(). FrameProducer::new preallocates `size` bytes, and create_frame
  is append_frame's only caller, so checking only in append_frame let an oversized
  frame trigger the very multi-GB allocation the limit exists to prevent. Keep the
  append_frame check as a backstop for direct pub callers.
- session: drop the tokio::sync::watch<bool> shutdown signal (rs/CLAUDE.md flags a
  single-value watch as a code smell) in favor of aborting the task in stop(). The
  in-flight connect or idle loop is cancelled at its next await; simpler and one
  fewer tokio::sync primitive.
- session: reset estimated-send-bitrate to 0 when the congestion estimate ends,
  matching the property's documented "0 when unavailable" contract (it previously
  kept reporting the last value).
- session: doc comments on the ResolvedSettings pub fields.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
… silently

The rewrite requires a TIME segment to map timestamps; a pad without one drops
every buffer. Previously that was silent (only a per-frame debug log), so a
misconfigured source produced an empty broadcast with no visible reason.

push_buffer now returns whether a buffer was dropped for lack of a TIME segment
(once per pad), and aggregate posts a single element warning on the bus naming
the pad. Keeps the stricter timeline model (no raw-PTS fallback) while making the
failure visible. Adds a unit test for the once-per-pad reporting.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@arielmol

Copy link
Copy Markdown
Contributor

A genuinely stalled sparse pad. Aggregator fires aggregate() when every pad has a buffer... set_force_live(true) is the knob (raises the floor to GStreamer 1.22)

I'd gently unbundle from "same boundary." it isn't an unvalidated edge in #1771 it's structurally absent. the Fanout in Element has no aggregate() barrier, each pad's chain runs independently to its own track, so a stalled non-EOS pad never holds up the others. there's nothing to force_live around (no need to upgrade gst to 1.22+).

subscribe-back byte round-trip against a relay & FLUSH/seek on a live pipeline

Indeed on both, what I do have is live end2end from about 15+ of the fleet (x86/aarch64) so transport is exercised end to end. I just haven't asserted byte equality, but doable, ... will do! 😄

@arielmol

Copy link
Copy Markdown
Contributor

Update on the byte round-trip for #1771 done. I ran a VP8 payload round-trip: videotestsrc(ball) -> vp8enc -> moqsink -> relay -> moqsrc, capturing encoded buffers before moqsink and after moqsrc. The harness writes one file per GStreamer buffer and checks received hashes against a contiguous slice of the published hashes.

150 buffers published, all 150 distinct; 149 received; every received buffer matched the published VP8 payload byte-for-byte and in order. In this run the missing buffer was the tail not draining before teardown, not a payload mismatch.

The create_frame guard rejects oversized frames before produce() allocates, but a
direct FrameProducer::new still allocates ahead of the append_frame backstop. The
comment claimed full coverage; correct it and point at the constructor-level fix
(a fallible produce(), a breaking change) as a separate dev follow-up.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…ailed-pad order)

- pad.rs observe_segment: an Active pad now always re-runs instead of early-returning
  on a matching SegmentInfo. SegmentInfo omits `start`, so a SEGMENT with the same
  base/rate but a moved start previously left self.segment stale and mapped PTS to the
  wrong running time. The early return is kept for non-Active pads (the Invalid-flap
  guard). Adds a regression test.
- pad.rs build (Opus): require channels/rate from caps instead of defaulting to
  stereo/48k, which could misadvertise the stream. Fail the pad on missing/invalid
  values. Adds a regression test.
- imp.rs write_buffer: check pad.is_failed() before map_readable()/copy, so a failed
  pad pays no copy cost and an unmappable buffer on an already-failed pad can't fail
  the whole element.
- imp.rs: doc comment on the MoqSink impl struct.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
kixelated added a commit that referenced this pull request Jun 23, 2026
Review feedback from kixelated and CodeRabbit on #1893:

- Drop the sink-side MAX_FRAME_BYTES pre-check: moq-net's create_frame now
  rejects oversized frames (FrameTooLarge) before allocating the group slot,
  so the pad is invalidated the same way a bad bitstream is. This matches what
  pad.rs already documented.
- forward_buffer: map and copy the buffer before taking the state lock (the
  copy needs no shared state), and look the pad up with contains_key/get_mut so
  the hot path no longer allocates an owned name on every buffer.
- finalize_all: accumulate into a single Result<Vec<String>>, pushing names
  while Ok and switching to the first error.
- release_pad: re-check EOS aggregation via a shared maybe_post_eos helper, so
  releasing the last still-active pad no longer strands the element EOS for
  pads that already ended.
- Add the missing doc comments (Pad::new, SegmentInfo::rate).
- Fix stale "aggregate thread" comments copied from the GstAggregator design
  (#1870): this core is a bare Element with per-pad streaming threads.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants