
moq-gst: moqsink on a bare Element with direct (no-channel) writes#1893

Open: kixelated wants to merge 21 commits into main from claude/moqsink-direct

Conversation

@kixelated kixelated commented Jun 23, 2026

Collaborator

Summary

A third moqsink rewrite for comparison: a bare gst::Element with direct, no-channel writes (option B from the discussion). The direct-write core was coded independently, but it builds on the same media foundation @arielmol established in #1771 (timeline/running-time policy, codec/caps factory, B-frames, FLUSH/EOS, per-pad error isolation). Ariel's #1771 commits are merged into this branch so his authorship is credited on merge, and he is listed in the element's author metadata.

Each request pad has its own chain function that writes buffers straight into the moq producers from the streaming thread. There is no intermediate channel and no worker task: moq_net's producer writes are synchronous (an in-memory append bounded by group eviction), so the streaming thread never blocks on the network. A thin async task owns only connect and the session lifetime.

Why bare Element instead of GstAggregator (#1870)

GstAggregator synchronizes pads: it will not call aggregate() until every non-EOS pad has a buffer, so a sparse pad batches its siblings into bursts (the ~2s stalls measured on #1870). moqsink wants the opposite: independent per-pad passthrough, where each pad is its own moq track and is delivered independently. A bare Element gives that for free: each pad's chain runs on its own thread and never waits on another pad. It also stays on the GStreamer 1.14 baseline (no force_live / 1.22).

Why no channel (vs #1771)

#1771 keeps the bare Element but hands buffers to a worker over an unbounded channel. Since GroupProducer::write_frame is synchronous and non-blocking, that handoff is not load-bearing, so this PR drops it: the chain writes the producer directly. The result is fewer moving parts (no DataMsg, no worker loop, no per-pad generations) while keeping pad independence.
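To make the direct-write shape concrete, here is a minimal std-only sketch. It assumes a stand-in `Producer` (an in-memory queue bounded by eviction) in place of the real moq-net producers, and the `Sink`, `State`, and `Flow` types and the `NotConnected`/`UnknownPad` outcomes are hypothetical, not the element's actual API. The chain copies the buffer without the lock, then holds the single `Mutex<Option<State>>` only for the append.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::Mutex;

/// Stand-in for a moq track producer (hypothetical): an in-memory append
/// bounded by eviction, so a write never waits on the network.
struct Producer {
    frames: VecDeque<Vec<u8>>,
    max_frames: usize,
}

impl Producer {
    fn new(max_frames: usize) -> Self {
        Self { frames: VecDeque::new(), max_frames }
    }

    /// Synchronous and non-blocking: append, then evict the oldest entries.
    fn write_frame(&mut self, frame: Vec<u8>) {
        self.frames.push_back(frame);
        while self.frames.len() > self.max_frames {
            self.frames.pop_front();
        }
    }
}

/// Session state; `None` until the async connect task has installed it.
struct State {
    producers: HashMap<String, Producer>,
}

struct Sink {
    state: Mutex<Option<State>>,
}

/// Hypothetical flow result, standing in for gst::FlowSuccess / gst::FlowError.
#[derive(Debug, PartialEq)]
enum Flow {
    Ok,
    NotConnected,
    UnknownPad,
}

impl Sink {
    /// Per-pad chain function: no channel, no worker, just a direct write.
    fn chain(&self, pad: &str, buffer: &[u8]) -> Flow {
        // The copy needs no shared state, so it happens before the lock.
        let frame = buffer.to_vec();

        // The lock is held only for the lookup and the in-memory append.
        let mut guard = self.state.lock().unwrap();
        let Some(state) = guard.as_mut() else {
            return Flow::NotConnected;
        };
        match state.producers.get_mut(pad) {
            Some(producer) => {
                producer.write_frame(frame);
                Flow::Ok
            }
            None => Flow::UnknownPad,
        }
    }
}
```

Because each pad's streaming thread calls `chain` on its own, pads contend only for the brief critical section and never wait on each other's data.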

Structure

  • sink/imp.rs: bare Element. Request pads with chain + event functions; one Mutex<Option<State>> holds the producers + per-pad media, held only briefly per buffer. EOS aggregated across pads. Oversized frames are left to moq-net, whose create_frame rejects them (FrameTooLarge) before allocating the group slot, invalidating just that pad.
  • sink/pad.rs: per-pad media (caps -> producer, SEGMENT/running-time policy, frame import) + unit tests.
  • sink/session.rs: connect + transport lifetime + observable Status, no media.
  • sink/timeline.rs: pure SEGMENT/running-time policy.
  • Three read-only diagnostics: connected, moq-version, estimated-send-bitrate.

Public surface / branch targeting

No public library API or wire change. Same element name (moqsink), sink_%u pads, codec caps, and writable properties. No gstreamer-base dependency (it is a bare Element). Internal to the rs/moq-gst plugin, so this targets main.

Testing

  • cargo test -p moq-gst: 34 unit + 3 element tests; cargo clippy --all-targets and cargo fmt --check clean.
  • Live relay: publishing videotestsrc ! vtenc_h264 ! moqsink.sink_0 plus audiotestsrc ! opusenc ! moqsink.sink_1 to a local moq-relay logged: session connected -> two independent pads -> finalized on EOS: ["sink_0", "sink_1", "catalog"] -> EOS.

Relationship to the other PRs

(Written by Claude)

arielmol and others added 17 commits June 15, 2026 14:10
A parallel sink element (moqsinkspike, Rank::NONE) reimplementing the moqsink core with isolated, testable seams: pure timeline policy, per-pad state, a bounded data channel with cancellation independent of the data path, single finalization (catalog last), and observable connection status. The production moqsink is left untouched.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
moq-gst is outside default-members (needs GStreamer), so 'just rs test' skips it. Add 'just rs test-moq-gst' and chain it into 'just rs ci' so the spike's tests gate. The CI workflow only needs gstreamer, which the nix dev shell already provides; that step is applied at PR time.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…tate)

Judge SEGMENT continuity in running time, not media origin: classify_segment keys on the running-time base, so a moved start stays continuous while base does not rewind (the old start-equality rule wrongly rejected this). Map timestamps with to_running_time_full (signed) so a buffer before the segment is dropped, never clamped to zero. Add an explicit per-pad state (NoSegment/Active/Invalid): a discontinuity invalidates the pad and the next valid SEGMENT re-anchors it.

Also applies rustfmt to the spike baseline, which was committed before a fmt pass.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
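The segment policy in the commit above can be sketched in a few lines. This is an illustration under stated assumptions, not the code in timeline.rs: the rate is fixed at 1.0, segment offset and stop are ignored, and the field layout and signatures of `SegmentInfo`, `classify_segment`, `frame_micros`, and `on_segment` are hypothetical (only the names `classify_segment`, `frame_micros`, and the NoSegment/Active/Invalid states come from the PR).

```rust
/// Simplified segment: media start and running-time base, in nanoseconds.
#[derive(Debug, Clone, Copy, PartialEq)]
struct SegmentInfo {
    start_nanos: u64,
    base_nanos: u64,
}

#[derive(Debug, PartialEq)]
enum Continuity {
    First,
    Continuous,
    Discontinuous,
}

/// Continuity is judged on the running-time base only: a moved `start`
/// is fine as long as `base` does not rewind.
fn classify_segment(prev: Option<&SegmentInfo>, next: &SegmentInfo) -> Continuity {
    match prev {
        None => Continuity::First,
        Some(p) if next.base_nanos >= p.base_nanos => Continuity::Continuous,
        Some(_) => Continuity::Discontinuous,
    }
}

/// Signed running time (base + pts - start); a negative result is dropped
/// (`None`) rather than clamped to zero.
fn frame_micros(seg: &SegmentInfo, pts_nanos: u64) -> Option<u64> {
    let running = seg.base_nanos as i128 + pts_nanos as i128 - seg.start_nanos as i128;
    if running < 0 {
        None
    } else {
        Some((running / 1_000) as u64)
    }
}

#[derive(Debug, PartialEq)]
enum PadState {
    NoSegment,
    Active(SegmentInfo),
    Invalid,
}

/// A discontinuity invalidates the pad; the next SEGMENT re-anchors it.
fn on_segment(state: PadState, next: SegmentInfo) -> PadState {
    match state {
        PadState::NoSegment | PadState::Invalid => PadState::Active(next),
        PadState::Active(prev) => match classify_segment(Some(&prev), &next) {
            Continuity::Discontinuous => PadState::Invalid,
            _ => PadState::Active(next),
        },
    }
}
```

The signed arithmetic is the point of the second half of the commit: an unsigned mapping would have to clamp a pre-segment buffer to zero, which would stamp it with a timestamp it never had.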
GStreamer only registers plugins whose license is in its recognised set. The plugin_define! license was 'Apache 2.0', which gst rejects ('invalid license, not loading'), so the moq plugin (moqsink/moqsrc) never loads via the registry. The crate is MIT OR Apache-2.0, so declare the MIT side ('MIT/X11').

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
PadSet inferred membership lazily from CAPS, so EOS aggregation (eos.len() == pads.len()) was wrong when EOS arrived before CAPS. Make membership authoritative: an 'active' set populated by AddPad (sent from request_new_pad, and seeded at session start for pads requested earlier), with EOS counted against it. Every data message carries its pad's generation, captured per incarnation, so a pad recreated with the same name discards the previous one's in-flight messages. Covers EOS-before-CAPS, duplicate EOS, late members, buffers after EOS, and a release that completes the element.

Adds a hermetic element test (request/release) plus gstreamer as a dev-dependency. Session-dependent flows (multipad EOS, per-pad errors) stay in the relay-backed harness.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
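A rough sketch of the membership idea in the commit above, assuming a hypothetical `PadSet` with a name-to-generation map (the real type's fields and method names may differ). EOS is counted against the authoritative `active` set, and a message carrying a stale generation is ignored.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical pad membership: authoritative, not inferred from CAPS.
#[derive(Default)]
struct PadSet {
    /// Pad name -> generation of its live incarnation.
    active: HashMap<String, u64>,
    /// Pads of the live incarnation that have seen EOS.
    ended: HashSet<String>,
    next_generation: u64,
}

impl PadSet {
    /// Register a pad (or a new incarnation of a same-named pad) and
    /// return the generation its messages must carry.
    fn add_pad(&mut self, name: &str) -> u64 {
        self.next_generation += 1;
        self.active.insert(name.to_string(), self.next_generation);
        self.ended.remove(name); // a recreated pad starts un-ended
        self.next_generation
    }

    /// True only for messages from the pad's live incarnation.
    fn is_current(&self, name: &str, generation: u64) -> bool {
        self.active.get(name) == Some(&generation)
    }

    /// Record EOS; returns true exactly when this EOS completes the set.
    /// Stale and duplicate EOS messages return false.
    fn eos(&mut self, name: &str, generation: u64) -> bool {
        if !self.is_current(name, generation) {
            return false;
        }
        let newly_ended = self.ended.insert(name.to_string());
        newly_ended && self.all_ended()
    }

    fn all_ended(&self) -> bool {
        !self.active.is_empty() && self.active.keys().all(|n| self.ended.contains(n))
    }
}
```

Counting against `active` rather than against pads that have sent CAPS is what makes EOS-before-CAPS work: a pad is a member from the moment it is requested.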
A bad caps or bitstream now invalidates only the offending pad instead of tearing down the session: the worker drops that pad's producer and marks it failed in the shared Status, and the chain returns a FlowError on the pad's next buffer (one buffer later, since the worker is async) rather than silently dropping. Session-level failures (a closed catalog) still propagate to element_error! on the bus. Unsupported caps are also rejected synchronously in the event handler (NotNegotiated) before reaching the worker.

Hermetic tests: worker-side pad-vs-session error classification and the caps validator; element-level invalid-settings StateChangeError and connect-failure to the bus. The per-pad FlowError observation and the live error flows need a session, so they stay in the relay-backed harness.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Post-review fixes to the parallel spike core, all hermetically tested:

- Post the element EOS only after the catalog finalizes cleanly; a
  finalize failure now surfaces as element_error instead of a silent log
  (run_loop returns an ExitReason, finalize_all returns a Result).
- Enforce per-pad frame-timestamp monotonicity: a frame that would
  regress below the last emitted one is dropped, not emitted out of order.
- Make a failed pad terminal: its track is finalized and it stops
  blocking EOS aggregation, so the element completes on the good pads.
- Key the per-pad failure set by generation so a pad recreated with the
  same name never inherits a previous incarnation's failure.
- Require byte-stream/au in caps_supported (what FramedFormat::Avc3
  actually consumes), not just the media type.
- Reject a buffer past the MoQ frame limit (16 MiB) before copying, so
  hostile input cannot drive an unbounded allocation.
- Test honesty: rename tests that over-promised, add the buffer/bitstream
  decode-error path, and assert connected=false on a failed connect.

CONTEXT

Rejected alternatives:
- Escalating a pad failure to a session/element error: per-pad isolation
  is the contract, so a bad pad is finalized and counts as terminal,
  never fatal to the session.
- A transport test seam to make session-level flows hermetic: that is
  architecture-for-tests; multipad EOS and the per-pad FlowError
  observation are validated against a real relay instead.
- A loopback connect endpoint (127.0.0.1:1) for the connect-failure test:
  measured to ride the QUIC idle timeout (no fast localhost-UDP reject),
  so the fast RFC-invalid DNS host was kept.
- A spike-side cap for the importer's non-VCL NAL accumulation: the spike
  has no "frame not emitted" signal, so that cap belongs in the moq-mux
  importer.

Key decisions:
- EOS must not claim success the broadcast did not reach: it waits for
  catalog.finish() to succeed, and a failure becomes element_error.
- Segment-base continuity alone does not stop a frame from regressing:
  monotonicity is enforced at the frame level (last_emitted), not only at
  the segment boundary.
- The frame cap is aligned to moq-net's MAX_FRAME_SIZE: a larger frame
  could not be consumed anyway.
…ough)

Close the codec gap in the parallel spike so it accepts the same media as
moqsink, without regressing the rewrite's seams (shared running time,
per-pad membership, per-pad error isolation, EOS-after-finalize):

- Expand the pad template and replace the hardcoded Avc3 path with a
  caps -> Framed factory over seven media types: H.264/H.265
  (byte-stream/au), AV1, VP8, VP9, AAC (mpegversion=4, raw), Opus. Every
  codec converges on one Framed; only the construction from caps differs.
- Keep caps_supported media-type only. The factory enforces per-codec
  specifics (byte-stream/au, AAC mpegversion/raw plus codec_data) and
  derives Opus channels/rate with defaults, failing just that pad on a
  bad or missing detail, never the session.
- finalize only finishes an initialized producer. A lazy importer
  (H.265/AV1/VP8/VP9) given CAPS then EOS with no frame never created its
  track, so finish() would error "not initialized"; it now no-ops while a
  real finish error on a live track still surfaces.
- Tests: strong frame-through for H.264 and Opus that observe a real
  emitted frame on the rendition track (not just the catalog rendition);
  creation tests for H.265/AV1/VP8/VP9/AAC; negatives for length-prefixed
  H.264, byte-stream-less H.265, AAC without codec_data, and AAC with
  wrong mpegversion; a clean CAPS-then-EOS test for the lazy codecs;
  caps_supported retargeted to the seven media types.

CONTEXT

Rejected alternatives:
- Per-codec field checks inside caps_supported: it stays thin and
  media-type only, so the factory is the one enforcement point and a bad
  detail fails the pad, not the session.
- A new MediaPad/MediaProducer abstraction: Pad already holds an
  Option<Framed>, so a factory function is enough.
- Frame-through fixtures for all seven codecs: only H.264 and Opus get
  strong frame-through; the rest get creation tests, with real
  transmission left to the relay harness.
- A moq-mux change to make a lazy finish() tolerant: the no-track no-op
  is kept spike-local in Pad::finalize via the existing Framed::track()
  probe, so moq-mux behavior is unchanged and real finish errors on live
  tracks still propagate.
- Reintroducing the production sink's per-pad reference_pts anchoring: the
  spike keeps its shared running time.

Key decisions:
- caps_supported is media-type only; per-codec specifics live in the
  factory, which fails the pad (not the session) on a bad or missing
  detail.
- A frame-through test must observe a real emitted frame: it subscribes
  to the rendition track and asserts latest().is_some(), because the
  catalog rendition is published from the SPS/config and would pass with
  no frame at all.
- CAPS then EOS before the first frame must be clean: a lazy importer
  never created its track, so finalize no-ops instead of turning "not
  initialized" into a session error.
…ecycle races)

Production-hardening pass over the parallel spike, all hermetically tested
(62 unit + 3 element, clippy -D warnings, fmt). FLUSH plus the fixes from an
external broadcast review:

- FLUSH: handle FLUSH_START/STOP as a discontinuity plus a cancellation. The
  worker re-anchors the pad's timeline (NoSegment) so a post-seek rewinding
  SEGMENT is accepted instead of rejected, and a per-pad watch cuts a chain
  blocked on the bounded channel without tearing the session down. run_loop is
  biased so the out-of-band flush re-anchors before the post-flush SEGMENT that
  rides the data FIFO.
- B-frames: drop the per-frame PTS monotonicity filter. The moq-mux container
  carries frames in decode order and documents that B-frames have non-monotonic
  presentation timestamps, so the filter silently dropped every B-frame (30-60%
  of frames on High-profile content). The wire order is already decode order.
- Pad lifecycle: populate the membership maps before add_pad, roll back only the
  entry this attempt still owns on failure, and announce AddPad only after
  add_pad succeeds, so a duplicate or failed add never orphans the live pad nor
  seeds a phantom that blocks EOS aggregation forever.
- Status races on restart: a session generation token. All observable state
  (connected, version, send_bitrate, failed) lives under one Mutex so "check the
  live generation, then write" is indivisible; a stale session's late writes and
  exit-reset become no-ops. failed is keyed by (session generation, pad
  generation), so a restart reusing the same pad never inherits a stale failure.
- Worker panic: run the session worker on an inner task and surface a JoinError
  (panic or cancel) as element_error, instead of a silent death only observed
  when stop() reaps the JoinHandle.
- Refactor: bundle the worker's inbound channels into one Inbound struct.
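The B-frame item is easy to see with numbers. The function below is a reconstruction of the kind of filter that was removed (keep a frame only if its PTS does not regress below the last emitted one); its name and shape are assumptions for illustration, not the removed code.

```rust
/// Hypothetical reconstruction of the removed filter: keep a frame only if
/// its PTS is not below the last emitted PTS.
fn pts_monotonic_filter(pts_in_decode_order: &[u64]) -> Vec<u64> {
    let mut last: Option<u64> = None;
    let mut kept = Vec::new();
    for &pts in pts_in_decode_order {
        if last.map_or(true, |l| pts >= l) {
            kept.push(pts);
            last = Some(pts);
        }
    }
    kept
}
```

For a stream delivered in decode order as I(0) P(3) B(1) B(2) P(6) B(4) B(5), the filter keeps only 0, 3, and 6: every B-frame arrives after the later-presented frame it references, so it always looks like a regression.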

CONTEXT

Rejected alternatives:
- Enforcing frame monotonicity on the DTS instead of removing the filter:
  best-practice in the abstract, but it needs DTS threaded through every layer
  and only guards against out-of-decode-order delivery, which GStreamer does not
  produce. Removing the filter to match the container contract is simpler and
  correct.
- Gating only the exit-reset by generation: a stale session could still write
  connected/version/bitrate. Every observable write is gated.
- Generation-gating with separate atomics: a load-then-store is not an atomic
  check-and-write, so a stale write can still interleave between the check and
  the store. One Mutex over the observable state makes it indivisible.
- Serializing request_new_pad under a lock held across add_pad: add_pad emits
  pad-added synchronously, so a reentrant handler would deadlock on that lock, a
  worse failure than the narrow same-name-concurrent window it would close. That
  window is left documented, not closed.
- A moq-mux Framed::discontinuity() for the FLUSH partial-AU reset: kept this
  change moq-gst-only. The partial-AU edge is documented and low-risk with
  alignment=au (one complete access unit per buffer).
- catch_unwind via the futures crate for the worker panic: futures is not a
  direct dependency, so an inner task plus JoinError is used instead.

Key decisions:
- FLUSH cancellation uses a watch<bool> per pad: no new dependency, and per-pad
  so flushing one input never cancels another input's blocked send. send_or_flush
  is biased toward the flush arm and re-checks the watch after winning a channel
  permit, which narrows (does not atomically close) the capacity-vs-FLUSH_START
  tie; the residual one-buffer leak is absorbed by the re-anchor.
- The failure set carries both generations; is_failed matches the live session,
  so neither a recreated pad nor a recreated session inherits a stale failure.
- Honest test coverage: decode-order B-frame emit, generation scoping, and
  blocked-send cancellation have discriminating tests. The panic-to-bus path and
  the load-vs-store / same-name-concurrent races are covered by construction
  (a lock) or documented, not by a deterministic test, since they need a real
  session or an injected race.
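The "one Mutex over the observable state" decision can be sketched as follows. The `Status`, `Inner`, and `Observable` layouts and the `begin_session`/`update`/`snapshot` names are hypothetical; the point is only that the generation check and the write share one critical section, which separate atomics cannot provide.

```rust
use std::sync::Mutex;

/// Observable diagnostics (hypothetical layout).
#[derive(Default, Debug, Clone, PartialEq)]
struct Observable {
    connected: bool,
    version: Option<String>,
    send_bitrate: u64,
}

#[derive(Default)]
struct Inner {
    /// Generation of the live session.
    live: u64,
    observable: Observable,
}

#[derive(Default)]
struct Status {
    inner: Mutex<Inner>,
}

impl Status {
    /// Start a session: bump the generation, reset state, return the token.
    fn begin_session(&self) -> u64 {
        let mut inner = self.inner.lock().unwrap();
        inner.live += 1;
        inner.observable = Observable::default();
        inner.live
    }

    /// Check-then-write under one lock: a stale session's write is a no-op.
    fn update(&self, generation: u64, write: impl FnOnce(&mut Observable)) -> bool {
        let mut inner = self.inner.lock().unwrap();
        if inner.live != generation {
            return false;
        }
        write(&mut inner.observable);
        true
    }

    fn snapshot(&self) -> Observable {
        self.inner.lock().unwrap().observable.clone()
    }
}
```

With atomics, a stale session could pass the generation check, lose the CPU to a restart, and then store `connected = true` into the new session's state; here the restart cannot interleave between the check and the write.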
The parallel moqsinkspike element, built across the preceding commits, replaces
the original moqsink. Remove the old implementation, move the new core into
src/sink/, and register it as "moqsink" with the GType name kept as "MoqSink"
for registration continuity.

Behavior is a compatible superset of the old element: the same request sink
pads, the same seven codec caps, and the same three writable properties (url,
broadcast, tls-disable-verify), plus three read-only diagnostics (connected,
moq-version, estimated-send-bitrate).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…solation)

Post-review fixes to the rewritten moqsink, all internal to rs/moq-gst:

- The FLUSH watch is keyed to the pad's lifetime, not the session's. A
  FLUSH_START whose FLUSH_STOP never arrived before a PAUSED->READY->PAUSED
  restart left that pad muted across the new session. Clear every watch at
  session start (reset_pad_flushes), before the new session is visible.
- A pad release is reconfiguration, not end-of-stream: DropPad no longer posts
  element EOS. The genuine "all pads ended" completion still fires from the EOS
  handler, so drop_pad now returns ().
- A failed finish() at EOS is isolated to that pad, like a per-pad data error,
  instead of escalating to a session error that kills the whole broadcast. This
  splits fail_pad into mark_pad_failed_terminal (mark failed + terminal, no
  producer touch) so the EOS path never double-finalizes, and Pad::finalize
  takes the producer up front (let-else) to make its attempt-once contract
  explicit.
- handle_data_msg returns Result<bool> instead of Result<Option<ExitReason>>;
  the only variant it could produce was Ended, so run_loop now constructs every
  ExitReason in one place.
- Fix a comment on the worker's biased select: FLUSH_START arrives out of band
  (a different thread), so the bias only wins a readiness tie, not a
  happens-before against the bounded data channel.

CONTEXT
Rejected alternatives:
- FLUSH restart: a "check session first" guard in the FlushStart handler. It
  narrows a window that reset-on-start already closes fully and reinforces a
  wrong race-condition reading; dropped for the single reset.
- EOS finalize error: calling fail_pad directly. It would re-finalize the
  producer; the mark_pad_failed_terminal split avoids the double-finalize.
Key decisions:
- Isolating finalize errors per pad matches how mid-stream caps/buffer errors
  are already handled; genuinely fatal failures still surface via the session,
  catalog, and transport paths.
- The send_or_flush post-permit window (one pre-flush buffer can still enqueue)
  is accepted and documented in place: the worker re-anchor absorbs it.

Verified: 65 unit + 3 element tests, cargo clippy -D warnings, cargo fmt.

Memory-leak soak (moqsink does not leak): a 35 min x264->moqsink run held RSS
flat with 0 leaked gst objects; a ~12 h msdk->moqsink vs msdk->fakesink
bisection showed identical flat RSS, so moqsink adds no leak over the encoder.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The new [dev-dependencies] section landed before [build-dependencies], but cargo sort expects dependencies -> build-dependencies -> dev-dependencies, failing `just ci`. Swap the two so the lint passes.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The session handoff used a bounded mpsc channel for backpressure, so a send
blocked on a full channel could not be cut by an out-of-band FLUSH_START and
could wedge a seek/teardown. But moq-net's producer is a synchronous RAM cache
(it appends and evicts, never blocking on the network), so the sink never needs
to apply backpressure at the GStreamer boundary.

Make the handoff unbounded: chain accepts each buffer toward the worker and
returns without blocking. With no blocked send the flush-cancellation apparatus
is unnecessary and removed: the bounded channel, send_or_flush/SendOutcome, the
per-pad flush watches that cut a blocked send, and the AddPad/DropPad blocking
sends. FlushSignal and the per-pad generations stay; FlushSignal now carries only
the timeline re-anchor (FLUSH re-anchors a pad's timeline, kept intact). Net -369
lines.

Caveat: the handoff is unbounded, so buffers accumulate in it transiently before
connect completes and whenever the worker lags: an unbounded queue sits ahead of
the cache. Building the producers up front and writing directly would remove it;
left as a follow-up.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
# Conflicts:
#	rs/justfile
#	rs/moq-gst/src/lib.rs
#	rs/moq-gst/src/sink/imp.rs
A rewrite of moqsink that keeps the bare gst::Element base (per-pad request pads
with chain functions) but writes each buffer straight into the moq producers from
the streaming thread. There is no intermediate channel and no worker task:
moq_net's producer writes are synchronous (an in-memory append bounded by group
eviction), so the streaming thread never blocks on the network. A thin async task
owns only connect and the session lifetime.

Why a bare Element rather than GstAggregator: Aggregator synchronizes pads (it
will not call aggregate() until every non-EOS pad has a buffer), which batches a
sparse pad's siblings into bursts. moqsink wants the opposite, independent per-pad
passthrough (each pad is its own moq track), which a bare Element gives naturally:
each pad's chain runs on its own thread and never waits on another pad. It also
stays on the GStreamer 1.14 baseline (no force_live / 1.22).

Structure:
- sink/imp.rs: bare Element. request pads with chain + event functions; a single
  Mutex<Option<State>> holds the producers + per-pad media, held only briefly per
  buffer. Direct writes; EOS aggregated across pads; the frame-size cap stays
  sink-side (moq-net keeps MAX_FRAME_SIZE pub(crate)).
- sink/pad.rs: per-pad media (caps -> producer, SEGMENT/running-time policy, frame
  import), with unit tests.
- sink/session.rs: connect + transport lifetime + observable Status, no media.
- sink/timeline.rs: pure SEGMENT/running-time policy.
- Three read-only diagnostics (connected, moq-version, estimated-send-bitrate).

Same element name, sink_%u pads, caps, and writable properties. Verified: 34 unit
+ 3 element tests, clippy, fmt, and a live publish to a local relay (connect ->
two independent pads -> EOS).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 23, 2026

Contributor

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: abcdded5-649f-47f5-a8a2-3940fb2397a4

📥 Commits

Reviewing files that changed from the base of the PR and between 8a61d46 and df0d25c.

📒 Files selected for processing (1)
  • rs/moq-gst/src/sink/imp.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • rs/moq-gst/src/sink/imp.rs

Walkthrough

The MoqSink GStreamer element is refactored from an async Tokio task/mpsc-channel design to a synchronous per-pad producer model. Three new submodules are introduced: timeline.rs defines SegmentInfo, classify_segment, and frame_micros for segment continuity and timestamp conversion; session.rs owns transport connection lifecycle, observable Status (connected/version/bitrate), and async task management; pad.rs implements per-pad caps negotiation, PadState transitions, buffer forwarding, and finalization. imp.rs replaces SessionHandle with a synchronous State struct and adds three read-only GObject properties (connected, moq-version, estimated-send-bitrate). Integration tests and documentation are added.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title accurately captures the main architectural change: rewriting moqsink as a bare Element with direct writes instead of channel-based workers. |
| Description check | ✅ Passed | The description comprehensively explains the rationale, design decisions, and implementation structure, clearly relating to all major changes in the changeset. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |


@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@rs/moq-gst/src/sink/imp.rs`:
- Around line 264-276: Extract the EOS aggregation check and posting logic from
the handle_eos method (specifically the all_ended condition check and EOS
message posting) into a new helper method called maybe_post_eos, then call this
helper from both handle_eos and at the end of release_pad (after
state.ended.remove and self.obj().remove_pad calls) to ensure that when a pad is
released, the aggregation condition is re-evaluated and any pending EOS message
is posted if all remaining pads have ended.

In `@rs/moq-gst/src/sink/pad.rs`:
- Around line 44-55: Add a doc comment above the public method `Pad::new()` in
the Pad impl block. Use Rust's triple-slash doc comment syntax to document what
this constructor method does, creating a new Pad instance with default values.
The doc comment should clearly explain the purpose of this initialization method
and appear immediately before the function signature.

In `@rs/moq-gst/src/sink/timeline.rs`:
- Around line 5-11: Add a doc comment for the `rate` field in the `SegmentInfo`
struct. Locate the `rate: f64,` field declaration and add a documentation
comment above it using the same format as the existing doc comments for
`time_format` and `base_nanos` fields. The comment should briefly describe the
purpose or meaning of the rate field to maintain consistency with the public API
documentation standards.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 1e86c1b3-cc2f-4461-b9b9-08ee581718a2

📥 Commits

Reviewing files that changed from the base of the PR and between 8469563 and 21c253a.

📒 Files selected for processing (7)
  • doc/bin/gstreamer.md
  • rs/moq-gst/src/sink/imp.rs
  • rs/moq-gst/src/sink/mod.rs
  • rs/moq-gst/src/sink/pad.rs
  • rs/moq-gst/src/sink/session.rs
  • rs/moq-gst/src/sink/timeline.rs
  • rs/moq-gst/tests/element.rs

Comment thread rs/moq-gst/src/sink/imp.rs
Comment thread rs/moq-gst/src/sink/pad.rs
Comment thread rs/moq-gst/src/sink/timeline.rs
Comment thread rs/moq-gst/src/sink/imp.rs Outdated

/// Reject a frame past the MoQ frame limit (moq-net's wire MAX_FRAME_SIZE, 32 MiB): it could not be
/// sent anyway, and copying it would let hostile input drive an unbounded allocation. moq-net keeps the
/// constant `pub(crate)`; a follow-up to make its frame constructor fallible would let this check go.

Collaborator Author

This should be merged now, so you can remove MAX_FRAME_BYTES

Collaborator Author

Done in 8c0e920. Removed the constant and the pre-check. moq-net's create_frame now rejects oversized frames with FrameTooLarge before it preallocates the group slot (group.rs:184), so an oversized buffer just invalidates the pad like any other bad bitstream, which is what pad.rs already documented ("an oversized frame, rejected by moq-net"). (Written by Claude)

Comment thread rs/moq-gst/src/sink/imp.rs Outdated
/// finalize error is logged and surfaced as the returned `Err`; a catalog error takes precedence.
fn finalize_all(&mut self) -> Result<Vec<String>> {
let mut order = Vec::new();
let mut failure = None;

Collaborator Author

IDK if it would be cleaner to have this be Ok(Vec::new) to start with and we only push when ok_mut() (is that even a function...?)

Collaborator Author

Done. finalize_all now starts as Ok(Vec::new()) and pushes names while it's still Ok (via result.as_mut()), switching to the first error and leaving the partial order behind. Dropped the separate failure accumulator. One behavioral nuance: it's now first-error-wins rather than catalog-takes-precedence; both are teardown finalize errors so the distinction is cosmetic, and I updated the doc comment to match. (Written by Claude)
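The accumulator shape described in this reply looks roughly like the sketch below. Only `finalize_all` and the use of `result.as_mut()` come from the thread; the input here (a list of pad names paired with already-computed finalize outcomes) is a stand-in for the real producers.

```rust
/// Sketch of the single-accumulator shape: push names while still `Ok`,
/// switch to the first error, and ignore later errors.
fn finalize_all<E>(outcomes: Vec<(String, Result<(), E>)>) -> Result<Vec<String>, E> {
    let mut result: Result<Vec<String>, E> = Ok(Vec::new());
    for (name, outcome) in outcomes {
        match outcome {
            Ok(()) => {
                // `as_mut` yields `Ok(&mut Vec)` only while no error has occurred.
                if let Ok(order) = result.as_mut() {
                    order.push(name);
                }
            }
            Err(err) => {
                // First error wins; the partial order is dropped with the old value.
                if result.is_ok() {
                    result = Err(err);
                }
            }
        }
    }
    result
}
```

Every entry is still visited after an error, which matters for teardown: a failed pad must not stop the remaining producers from being finalized.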

Comment thread rs/moq-gst/src/sink/imp.rs Outdated
return Ok(gst::FlowSuccess::Ok); // drop quietly; the pad already reported its failure
}

let map = buffer.map_readable().map_err(|_| {

Collaborator Author

Can we do this before grabbing the lock?

Collaborator Author

Done. The map_readable + copy_from_slice now happen before the lock is taken (neither needs shared state), so the per-pad lock is held only for the producer write. Trade-off: a buffer arriving on an already-failed pad now incurs the copy before we discover it's failed, but that's an error path and checking is_failed needs the lock anyway. (Written by Claude)

Comment thread rs/moq-gst/src/sink/imp.rs Outdated
return Err(gst::FlowError::Error);
}

let media = state.pads.entry(pad.name().to_string()).or_insert_with(Pad::new);

Collaborator Author

This will be occupied 99% of the time right? Maybe don't use entry() to avoid allocating a string every iteration.

Collaborator Author

Done. The hot path now does contains_key + get_mut (no allocation) and only builds the owned String key on the rare first-buffer insert. (Written by Claude)
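For reference, the allocation-free lookup has this general shape. `Pad` is reduced to a counter here and `pad_for` is a hypothetical helper name; the real code works on the element's per-pad media state.

```rust
use std::collections::HashMap;

/// Stand-in for the per-pad media state.
#[derive(Default)]
struct Pad {
    frames: u64,
}

/// Look a pad up by borrowed name; build the owned `String` key only on
/// the rare first insert. `entry(name.to_string())` would allocate every call.
fn pad_for<'a>(pads: &'a mut HashMap<String, Pad>, name: &str) -> &'a mut Pad {
    if !pads.contains_key(name) {
        pads.insert(name.to_owned(), Pad::default());
    }
    pads.get_mut(name).expect("pad inserted above")
}
```

The cost is a second hash lookup on the hot path in exchange for skipping a heap allocation per buffer, which is the usual trade when the key is almost always present.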

Review feedback from kixelated and CodeRabbit on #1893:

- Drop the sink-side MAX_FRAME_BYTES pre-check: moq-net's create_frame now
  rejects oversized frames (FrameTooLarge) before allocating the group slot,
  so the pad is invalidated the same way a bad bitstream is. This matches what
  pad.rs already documented.
- forward_buffer: map and copy the buffer before taking the state lock (the
  copy needs no shared state), and look the pad up with contains_key/get_mut so
  the hot path no longer allocates an owned name on every buffer.
- finalize_all: accumulate into a single Result<Vec<String>>, pushing names
  while Ok and switching to the first error.
- release_pad: re-check EOS aggregation via a shared maybe_post_eos helper, so
  releasing the last still-active pad no longer strands the element EOS for
  pads that already ended.
- Add the missing doc comments (Pad::new, SegmentInfo::rate).
- Fix stale "aggregate thread" comments copied from the GstAggregator design
  (#1870): this core is a bare Element with per-pad streaming threads.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
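
The release_pad item in the commit message above is easier to picture with a small model. The sketch below is hypothetical throughout: the `Element` struct, the bool-per-pad map, and the rule that an element with no pads posts nothing are assumptions for illustration, not the PR's implementation.

```rust
use std::collections::HashMap;

/// Hypothetical element state: pad name -> whether that pad reached EOS.
#[derive(Default)]
struct Element {
    pads: HashMap<String, bool>,
    eos_posted: bool,
}

impl Element {
    /// Returns true when element EOS is posted by this call: at least one
    /// pad remains, every remaining pad has ended, and EOS was not posted yet.
    fn maybe_post_eos(&mut self) -> bool {
        let all_ended = !self.pads.is_empty() && self.pads.values().all(|ended| *ended);
        if all_ended && !self.eos_posted {
            self.eos_posted = true;
            return true;
        }
        false
    }

    /// Marks one pad as ended, then re-checks the aggregate.
    fn pad_eos(&mut self, name: &str) -> bool {
        if let Some(ended) = self.pads.get_mut(name) {
            *ended = true;
        }
        self.maybe_post_eos()
    }

    /// Removes a pad, then re-checks: the remaining pads may all be ended.
    fn release_pad(&mut self, name: &str) -> bool {
        self.pads.remove(name);
        self.maybe_post_eos()
    }
}
```

Sharing one helper between the EOS path and the release path is what prevents the stranded case: if the only still-active pad is released, the check runs again over the pads that already ended.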

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@rs/moq-gst/src/sink/imp.rs`:
- Around line 341-349: The buffer copy at line 349 with Bytes::copy_from_slice
happens before moq-net validates the frame size, causing unnecessary allocations
for oversized frames. Expose MAX_FRAME_SIZE from moq-net as public or create a
public preflight validation helper function that moq-gst can import. Add a size
check before the Bytes::copy_from_slice call at line 349 to validate the frame
size against the limit and return an error if it exceeds the maximum allowed
size. Additionally, update the misleading comment at lines 342-343 to accurately
reflect that the allocation happens first and then moq-net rejects it, rather
than claiming rejection happens before allocation.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: c130c41f-9756-47f9-884b-848d39a7e095

📥 Commits

Reviewing files that changed from the base of the PR and between 21c253a and 8c0e920.

📒 Files selected for processing (4)
  • rs/moq-gst/src/sink/imp.rs
  • rs/moq-gst/src/sink/pad.rs
  • rs/moq-gst/src/sink/session.rs
  • rs/moq-gst/src/sink/timeline.rs
🚧 Files skipped from review as they are similar to previous changes (3)
  • rs/moq-gst/src/sink/timeline.rs
  • rs/moq-gst/src/sink/session.rs
  • rs/moq-gst/src/sink/pad.rs

Comment on lines +341 to 349
// Map and copy outside the lock: neither needs shared state, so the per-pad lock is held only for
// the producer write. An oversized frame is left to moq-net, which rejects it (FrameTooLarge) before
// allocating the group slot and so invalidates just this pad.
let pts = buffer.pts();
let map = buffer.map_readable().map_err(|_| {
    gst::error!(CAT, "failed to map buffer on pad {}", pad.name());
    gst::FlowError::Error
})?;
let data = Bytes::copy_from_slice(map.as_slice());

🩺 Stability & Availability | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify where frame-size validation is exposed and confirm the copy-before-validation path.
set -euo pipefail

# Expect to find the moq-net FrameTooLarge path and whether a reusable public limit/preflight exists.
rg -n -C3 'FrameTooLarge|MAX_FRAME|frame.*too.*large|create_frame|fn create_frame' rs -g '*.rs'

# Expect this to show Bytes::copy_from_slice before Pad::push_buffer.
sed -n '337,376p' rs/moq-gst/src/sink/imp.rs

Repository: moq-dev/moq

Length of output: 14821


🏁 Script executed:

# Check if MAX_FRAME_SIZE is publicly exported from moq-net root or if it's crate-private
rg -n 'pub.*MAX_FRAME_SIZE|pub mod|pub use.*MAX_FRAME_SIZE' rs/moq-net/src --type rust | head -20

# Check moq-net's public API surface (lib.rs or mod.rs)
cat rs/moq-net/src/lib.rs 2>/dev/null | head -100

Repository: moq-dev/moq

Length of output: 3693


🏁 Script executed:

# Check what's exported from the model module
rg -n 'pub (const|fn|struct|enum)' rs/moq-net/src/model --type rust | grep -i 'frame\|size'

# Check moq-net's model module structure
ast-grep outline rs/moq-net/src/model/mod.rs --view expanded | head -50

# See what moq-gst imports from moq-net
rg -n 'use.*moq_net|use moq_net' rs/moq-gst/src -A2 --type rust | head -30

Repository: moq-dev/moq

Length of output: 3276


🏁 Script executed:

# Check moq-net's model/mod.rs to see what's re-exported
cat rs/moq-net/src/model/mod.rs

# Also check if MAX_FRAME_SIZE is used anywhere in moq-gst
rg -n 'MAX_FRAME_SIZE' rs/moq-gst -A2 -B2

Repository: moq-dev/moq

Length of output: 369


🏁 Script executed:

# Check if MAX_FRAME_SIZE is exported via pub use in frame.rs
cat rs/moq-net/src/model/frame.rs | head -50

# Check the comments around MAX_FRAME_SIZE to understand the design intent
sed -n '15,35p' rs/moq-net/src/model/frame.rs

# See if moq-gst accesses any GroupProducer methods
rg -n 'GroupProducer\|create_frame\|write_frame' rs/moq-gst/src -B2 -A3

Repository: moq-dev/moq

Length of output: 3197


🏁 Script executed:

# Check how moq-gst accesses the frame/group writing
rg -n 'push_buffer|Pad::push|write|producer' rs/moq-gst/src/sink/pad.rs -B2 -A5 | head -60

# See the full pad.rs implementation to understand the frame path
sed -n '1,100p' rs/moq-gst/src/sink/pad.rs

Repository: moq-dev/moq

Length of output: 6474


🏁 Script executed:

# Check the Framed implementation in moq_mux to see if it validates frame size
rg -n 'push_buffer|write_frame|create_frame' rs/moq-mux/src/import --type rust -B3 -A3 | head -80

# Also check the Framed implementation signature
ast-grep outline rs/moq-mux/src/import.rs --view expanded 2>/dev/null || echo "File not found, checking directory structure"

# Find the import module
find rs/moq-mux/src -name "import*" -o -name "*framed*" | head -5

Repository: moq-dev/moq

Length of output: 2982


Make frame size validation preflight-accessible from moq-net.

The buffer is copied unconditionally to Bytes at line 349 before reaching moq-net's FrameTooLarge validation in GroupProducer::create_frame, so an oversized gst::Buffer allocates on the streaming thread even though moq-net will reject it. MAX_FRAME_SIZE is pub(crate) in moq-net and cannot be imported here for a preflight check. Expose either MAX_FRAME_SIZE as public or a small preflight helper from moq-net so moq-gst can validate before copying.

Additionally, the comment at lines 342–343 is misleading: it claims oversized frames are rejected "before allocating," but the Bytes::copy_from_slice allocation always happens first.
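
A rough sketch of the preflight check this finding asks for, written against plain slices so it stands alone. The `PushError` enum, the `copy_checked` name, and passing the limit as a parameter are assumptions; the finding notes that moq-net's actual MAX_FRAME_SIZE is `pub(crate)`, and its value is not shown in this thread.

```rust
/// Hypothetical error type mirroring the FrameTooLarge rejection by name only.
#[derive(Debug, PartialEq)]
enum PushError {
    FrameTooLarge { size: usize, max: usize },
}

/// Rejects an oversized payload before copying it into an owned buffer.
/// `max` is a caller-supplied limit, since the real constant is not public.
fn copy_checked(payload: &[u8], max: usize) -> Result<Vec<u8>, PushError> {
    if payload.len() > max {
        // No allocation has happened yet on this path.
        return Err(PushError::FrameTooLarge {
            size: payload.len(),
            max,
        });
    }
    Ok(payload.to_vec())
}
```

Checking the length first costs one comparison per buffer and keeps the oversized case from allocating on the streaming thread.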


kixelated and others added 3 commits June 23, 2026 15:18
This direct-write core grew out of Ariel's rewritten, unit-tested moqsink
foundation in #1771 (the timeline/running-time policy, codec/caps factory,
B-frame handling, FLUSH/EOS and per-pad error isolation). Restore him to the
element's author metadata.

Co-Authored-By: Ariel Molina <ariel@edis.mx>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

#1893 is the bare-Element, direct-write variant of the moqsink rewrite. It was
coded independently of #1771, but it builds on the same media foundation Ariel
established there (timeline continuity, codec/caps parity, B-frames, FLUSH/EOS,
per-pad error isolation). Merge his #1771 branch so those commits are part of
this PR and his authorship is credited on merge.

The tree is taken entirely from this branch (`-s ours`): the direct-write core
is the shipping implementation; this merge exists for attribution and history.

Co-Authored-By: Ariel Molina <ariel@edis.mx>

The hot path (pad already exists, since caps precede buffers) now does a single
hash lookup instead of contains_key + get_mut. The borrow checker accepts the
get-or-insert match on the current toolchain, so the None arm falls back to
entry() only on the rare first-buffer insert.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
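
The single-lookup form described in the last commit message could look something like this sketch. `Pad` and `touch_pad` are hypothetical names. The match result is used locally rather than returned from the function, which is the shape the borrow checker is expected to accept.

```rust
use std::collections::HashMap;

/// Hypothetical per-pad state, standing in for the sink's `Pad`.
#[derive(Default)]
struct Pad {
    buffers: u64,
}

/// Counts a buffer for `name` with one hash lookup on the hot path.
fn touch_pad(pads: &mut HashMap<String, Pad>, name: &str) -> u64 {
    let pad = match pads.get_mut(name) {
        // Hot path: the pad exists, so this is the only lookup.
        Some(pad) => pad,
        // Rare path: first buffer on this pad, so build the owned key.
        None => pads.entry(name.to_owned()).or_default(),
    };
    pad.buffers += 1;
    pad.buffers
}
```

Compared with contains_key followed by get_mut, this hashes the name once when the pad is already present and only pays for `entry()` on the first buffer.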