feat(core): implement consensus qbft transport and sniffer#448

Open
iamquang95 wants to merge 19 commits into main from iamquang95/consensus-qbft-transport

@iamquang95
Collaborator

Part of: #157

This PR implements the transport state and sniffer, part of the QBFT consensus work. It is currently stacked on top of #442, which hasn't been merged yet.

@iamquang95
Collaborator Author

@claude review

@claude

claude Bot commented May 28, 2026

Claude finished @iamquang95's task in 3m 54s


Code Review — feat(core): implement consensus qbft transport and sniffer

Reviewed against the Charon reference at charon/core/consensus/qbft/{transport,sniffer}.go. Overall the port is clean and the Charon parity is high. Tests cover the main code paths and a few error cases. A few things worth flagging:


1. Channel-type mismatch between Transport.recv_tx and the QBFT core

Transport::new accepts a tokio::sync::mpsc::Sender<qbft::Msg<ConsensusQbftTypes>> (crates/core/src/consensus/qbft/transport.rs:81), but crate::qbft::Transport::receive is a crossbeam::channel::Receiver<Msg<T>> (crates/core/src/qbft/mod.rs:111, alias mpmc). The two halves are not on the same channel and cannot be wired directly — the consensus runner will need a bridging task (or this transport will need to switch to crossbeam).

In Go this is trivially the same chan on both sides. The current TODO (// Remove once the consensus runner wires this transport.) covers it, but the asymmetry deserves a comment so the next person picking this up doesn't expect a one-line wiring.

2. set_values / get_value mutex partition diverges from Charon

Charon uses a single valueMu covering both the valueCh drain and the values map. The Rust port splits these into two mutexes (value_rx and values, transport.rs:83–84), so the channel drain in get_value and the values lookup are no longer atomic w.r.t. set_values.

For the current usage pattern (sequential drain → lookup) this is functionally equivalent, but it does open a subtle race: two concurrent get_value calls can interleave such that one consumes the next item from value_rx, inserts it into values, and the other reads values before the insert lands — both would still see the correct entry eventually, but the second call would have to wait for the first to release the values lock. Acceptable, but I'd either (a) collapse into a single mutex to match Charon, or (b) leave a comment explaining the intentional divergence and the invariants that make it safe.

3. to_consensus_msg() invoked twice per broadcast

In broadcast we build consensus_msg = msg.to_consensus_msg() (transport.rs:183) and later call sniffed_msg.to_consensus_msg() again in the spawned task (transport.rs:197). Each call clones pb_msg, justification_protos, and re-collects values into a fresh Vec. Charon has the same double-call shape, so this is parity-preserving — but a small optimization (consensus_msg.clone() for the sniffer side) would avoid the second rebuild. Not blocking.

4. transport.rs:561: OtherMsg as_any is mis-shaped for downcast tests

The trait-object downcast in production code is .downcast_ref::<msg::Msg>(). The test type OtherMsg is used to drive Error::InvalidJustification/Error::InvalidJustificationMessage, and as_any returns &self (i.e. &OtherMsg), so the downcast to msg::Msg correctly fails. Working, but worth a one-line comment in the test fixture explaining the trick (OtherMsg exists specifically to fail the downcast) since OtherMsg's as_any body looks at first glance like a normal pass-through.

5. PoisonError::into_inner repetition

unwrap_or_else(PoisonError::into_inner) appears 6× across the two new files (transport.rs:110, 122, 126; sniffer.rs:42, 58). Not a bug, just noisy — a small inline helper (fn lock_or_recover<T>(m: &Mutex<T>) -> MutexGuard<'_, T>) would tighten the call sites if you want to introduce one. Up to you; keeping the explicit form is also fine.
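
For illustration, the helper suggested in item 5 could look roughly like the sketch below. It uses only the standard library; the name lock_or_recover comes from the suggestion above, and where it would live in the crate is left open.

```rust
use std::sync::{Mutex, MutexGuard, PoisonError};

/// Locks `m` and returns the guard even if a previous holder panicked
/// while holding the lock (the poisoned state is deliberately ignored).
fn lock_or_recover<T>(m: &Mutex<T>) -> MutexGuard<'_, T> {
    m.lock().unwrap_or_else(PoisonError::into_inner)
}
```

Call sites would then read lock_or_recover(&self.values) instead of repeating the unwrap_or_else chain. Note that recovering from poisoning is only appropriate when the protected data cannot be left half-updated by a panic.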

6. Minor / nits

  • transport.rs:14-23: the super::{msg, sniffer::Sniffer} import sits below the use crate::… block. Pluto seems to mix import-grouping styles, but most files I've read in crates/core put super:: imports together with crate::. Cosmetic.
  • transport.rs:36-46 (BroadcastRequest) has nine fields — fine, but peer_idx/round/prepared_round as i64 follows the protobuf shape rather than the conventional u32/u64 we'd otherwise pick. This matches Charon and is consistent with qbft::SomeMsg::source/round — no change requested, just confirming the choice is intentional.
  • transport.rs:201 — the detached self-send tokio::spawn is fine and mirrors Charon's go func(). The existing comment is good; might be worth also noting that the spawned task will silently drop the message on recv_tx send failure (channel closed) — Charon's variant has the same behavior, so parity holds.
  • sniffer.rs:60: QBFT_V2_PROTOCOL_ID.to_string() allocates on every instance() call. Cheap, but if you ever expect this to be called per-message, consider keeping protocol_id as an owned String in SniffedConsensusInstance only when emitting. (Today this is once per instance teardown — fine.)

7. Test observations

  • broadcast_self_enqueues_message (transport.rs:475) relies on the current-thread tokio runtime running the spawned self-send synchronously before the test's recv_rx.recv() resumes. Under #[tokio::test] (current-thread, buffered channel) this is deterministic. If the test file ever switches to flavor = "multi_thread", the sniffer_instance().msgs.len() == 1 check becomes flaky relative to the recv await. Worth a comment, or simply await on a sniffer condition rather than asserting length immediately.
  • process_receives_caches_values_and_forwards (transport.rs:520) — good; verifies caching, forwarding, and sniffer integration. The graceful ct.cancel(); runner.await.unwrap().unwrap(); pattern is nice.
  • I'd like to see one more test that exercises the ReceiveBufferClosed error in process_receives — drop the recv_rx half and send a message through outer_rx. Currently no test path covers result.map_err(|_| Error::ReceiveBufferClosed)? at transport.rs:228.

Summary

LGTM with minor follow-ups. The transport/sniffer ports are faithful to Charon and the test suite is reasonable. Main thing to confirm before merging: the channel-type mismatch (item 1) is intentional and the consensus runner will bridge tokio mpsc ↔ crossbeam mpmc. The mutex-split divergence from Charon (item 2) is the only sub-correctness-level concern; happy either way as long as it's documented.

  • Read PR diff and changed files
  • Compare against Charon Go reference
  • Analyze transport.rs for correctness, style, and tests
  • Analyze sniffer.rs
  • Compose review feedback
    — branch: iamquang95/consensus-qbft-transport

@iamquang95
Collaborator Author

@varex83agent /review-pr

Collaborator

@varex83agent varex83agent left a comment

Solid, well-tested port of transport.go + sniffer.go with good module-level documentation in msg.rs. Sniffer is a clean 1:1 mapping. Transport faithfully bridges the tokio mpsc admission buffer with the crossbeam channel that the QBFT core consumes, and the comments at lines 81-83 and 193-194 capture the non-obvious design choices.

No bugs or hard blockers. Findings are predominantly minor style/idiom or contract-documentation gaps, plus a few nit cleanups:

  • Hash-binding contract is implicit: hash_proto_bytes hashes raw Any.value bytes rather than re-marshalling deterministically like Go's hashProto does (transport.rs:131, msg.rs:281). This works today because prost emits deterministic bytes for the proto types that flow through QBFT (BTreeMap for map fields), but the invariant is undocumented and a future caller could easily reach for hash_proto_bytes on an inbound-validation path where the input bytes are NOT guaranteed deterministic. Worth tightening the docstring.
  • A couple of intentional behavioral divergences from Go (Ok(()) on outer-rx close vs Go's zero-value-loop, Err(ReceiveBufferClosed) on inner-send failure vs Go's panic) — both are improvements, but the docstring on process_receives should call them out.
  • Broadcaster is typed Box<dyn Fn(...) -> BoxFuture<'static, Result<()>>> where Result is transport::Result — real implementations have no error variant to return wire/marshal failures through. Worth either adding Error::Broadcast(...) or relaxing the broadcaster signature before the runner wires this up.
  • Self-send is detached as tokio::spawn (transport.rs:195) and relies on the caller to cancel ct at instance teardown; if the contract isn't held, the task can outlive the transport. The contract is implicit — worth a sentence in the doc.
  • Several smaller style/clarity nits: shadowed hash in get_value, inconsistent Arc::clone style, field-name collision Transport.values/ValueStore.values, and one duplicate test.

Verdict: comment-only review — none of the findings block merge, but the contract-documentation items are worth addressing before the consumers of these helpers land.

pub(crate) fn get_value(&self, hash: [u8; 32]) -> Result<Any> {
let mut store = self.values.lock().unwrap_or_else(PoisonError::into_inner);
if let Ok(local) = store.value_rx.try_recv() {
let hash = msg::hash_proto_bytes(&local.value)?;

Hash-determinism contract is implicit and undocumented. Here we compute the consensus hash from the raw bytes of Any.value (whatever the producer happened to put in there). Go's hashProto (charon/core/consensus/qbft/msg.go:138-166) instead unmarshals to a typed proto and re-marshals with proto.MarshalOptions{Deterministic: true} before hashing — so Go is byte-canonical regardless of how the value was originally encoded.

This works today because:

  • The local producer of value_rx uses prost_types::Any::from_msg(&typed_msg), and prost's encode_to_vec is byte-deterministic in field-tag order.
  • The proto messages that flow through consensus use BTreeMap for map fields (see hash_proto_uses_btree_map_for_deterministic_encoding in msg.rs:413) — without this, prost map-field encoding is non-deterministic.

The invariant "Any.value bytes ≡ canonical deterministic proto encoding of the inner message" is currently held by the build-proto configuration plus the producer's use of Any::from_msg. It's not enforced by the type system or documented at this call site. A future contributor implementing inbound wire-validation (analogous to Go's valuesByHash) could naturally reach for hash_proto_bytes and get silent consensus desync if any new proto message ships with a HashMap field.

Recommend a one- or two-line comment here that captures the contract, and consider linking it from the hash_proto_bytes docstring.
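
To make the map-ordering point concrete, here is a small std-only sketch. It uses a toy length-prefixed encoder, which is an assumption for illustration and is NOT prost or the protobuf wire format; the function name encode_map is hypothetical. It only shows why an ordered map yields the same bytes regardless of insertion order.

```rust
use std::collections::BTreeMap;

/// Toy encoder (not protobuf): writes each entry as
/// len(key) | key | len(value) | value, in the map's iteration order.
fn encode_map(map: &BTreeMap<String, Vec<u8>>) -> Vec<u8> {
    let mut out = Vec::new();
    // BTreeMap iterates in sorted key order, so the output does not
    // depend on the order in which entries were inserted.
    for (key, value) in map {
        out.extend_from_slice(&(key.len() as u32).to_be_bytes());
        out.extend_from_slice(key.as_bytes());
        out.extend_from_slice(&(value.len() as u32).to_be_bytes());
        out.extend_from_slice(value);
    }
    out
}
```

With a HashMap the iteration order is unspecified, so the same logical value could encode to different bytes and therefore hash differently, which is the failure mode described above.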

}

/// Returns the consensus hash for deterministic protobuf bytes.
pub(crate) fn hash_proto_bytes(encoded: &[u8]) -> Result<[u8; 32]> {

hash_proto (line 266) explicitly rejects Any (test hash_proto_rejects_any). hash_proto_bytes has no such guard: it hashes whatever bytes it is handed. The doc currently reads only "Returns the consensus hash for deterministic protobuf bytes."

Given the helper is pub(crate) and is the natural shortcut a future inbound-validation layer would reach for, the docstring should spell out the invariant explicitly: bytes MUST be the deterministic prost encoding of a non-Any concrete message, equivalent to prost::Message::encode_to_vec over a typed proto whose map fields are BTreeMap. Anything else risks silent hash mismatch against Go peers, which re-marshal deterministically before hashing.


/// Errors returned by the QBFT consensus transport.
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {

Broadcaster is Box<dyn Fn(...) -> BoxFuture<'static, Result<()>>> where Result<()> is the local transport::Result<()>. The Error enum (lines 49-75) only has variants for the transport's own failures (UnknownValue, InvalidJustification*, InvalidDuty, ReceiveBufferClosed, Msg(msg::Error)) — there is no variant for broadcaster-supplied failures (libp2p stream errors, peer unreachable, serialization issues from a real implementation).

Go's broadcaster.Broadcast returns a plain error (transport.go:22), so any implementation can return its own typed error. Here a real broadcaster will need either to invent a synthetic mapping or to stuff its errors into Error::Msg(msg::Error), which is semantically wrong.

Consider adding Error::Broadcast(#[source] Box<dyn std::error::Error + Send + Sync>) and wrapping at the call site (line 206), or changing the broadcaster signature to return Result<(), Box<dyn Error + Send + Sync>> and wrapping into that new variant. Cheaper to fix now than after the runner is wired.
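
A minimal sketch of the first option, written against the standard library only. The real enum derives thiserror::Error; here Display and source are implemented by hand so the example stands alone. The BoxError alias, the wrap_broadcast helper, and the message strings are hypothetical names introduced for illustration.

```rust
use std::error::Error as StdError;
use std::fmt;

/// Hypothetical alias for the error a broadcaster implementation returns.
type BoxError = Box<dyn StdError + Send + Sync>;

#[derive(Debug)]
enum Error {
    /// Stand-in for the existing transport variants.
    ReceiveBufferClosed,
    /// Proposed variant carrying a broadcaster-supplied failure.
    Broadcast(BoxError),
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Error::ReceiveBufferClosed => write!(f, "receive buffer closed"),
            Error::Broadcast(_) => write!(f, "broadcast failed"),
        }
    }
}

impl StdError for Error {
    fn source(&self) -> Option<&(dyn StdError + 'static)> {
        match self {
            Error::ReceiveBufferClosed => None,
            Error::Broadcast(err) => {
                // Drop the Send + Sync bounds to match the trait signature.
                let source: &(dyn StdError + 'static) = &**err;
                Some(source)
            }
        }
    }
}

/// Wraps a broadcaster result at the call site.
fn wrap_broadcast(result: Result<(), BoxError>) -> Result<(), Error> {
    result.map_err(Error::Broadcast)
}
```

The original error stays reachable through source(), so callers can still log or inspect the underlying failure without the transport enum needing to know its concrete type.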

let sniffed_msg = consensus_msg.clone();
// Self-send is intentionally detached: the inner receive buffer can
// block, but network broadcast must still proceed.
tokio::spawn(async move {

The self-send is correctly detached (the inline comment captures why), but the spawn captures task_ct, recv_tx.clone(), Arc<Sniffer>, Arc<msg::Msg>, and a QbftConsensusMsg. It only completes when either task_ct is cancelled or recv_tx.send succeeds.

If the inner consumer stalls and the caller never cancels ct (e.g., Transport is dropped while its ct is still live elsewhere), each broadcast call piles up another task that lives until process exit. The recv_tx clone in particular keeps the channel's sender half alive even after Transport is dropped.

In Go this is mitigated by goroutines being cheap and the per-instance ctx being virtually guaranteed to cancel on instance teardown. In Rust, the contract "caller MUST cancel ct at instance teardown" is implicit. Either:

  • document it on Transport::new / broadcast, or
  • have Transport own an internal CancellationToken that's cancelled in Drop, so spawns can't outlive the transport even if the caller's token is leaked.
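
The second option can be sketched with the standard library alone. CancelFlag below is a hypothetical stand-in for a real cancellation token (an assumption made so the example has no external dependencies), and this Transport is a toy struct, not the one in the PR.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Minimal stand-in for a cancellation token (assumption: the real code
/// would use an async-aware token rather than a polled flag).
#[derive(Clone, Default)]
struct CancelFlag(Arc<AtomicBool>);

impl CancelFlag {
    fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }

    fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// Toy transport that owns its own cancellation flag.
struct Transport {
    internal_ct: CancelFlag,
}

impl Transport {
    fn new() -> Self {
        Self { internal_ct: CancelFlag::default() }
    }

    /// Handle that a detached task would capture and observe.
    fn task_token(&self) -> CancelFlag {
        self.internal_ct.clone()
    }
}

impl Drop for Transport {
    fn drop(&mut self) {
        // Detached tasks holding a clone observe this and can exit.
        self.internal_ct.cancel();
    }
}
```

In this shape a detached self-send would select on both the caller's token and the internal one, so dropping the transport bounds the lifetime of its tasks even if the caller's token is leaked.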

ct: CancellationToken,
mut outer_rx: mpsc::Receiver<msg::Msg>,
) -> Result<()> {
loop {

process_receives has two intentional behavioral divergences from Go (transport.go:148-164) that aren't called out in the docstring:

  1. Outer channel close → Ok(()) (lines 218-221). Go does case msg := <-outerBuffer without checking ok, so on channel close Go silently loops receiving zero-value Msg{} until ctx cancels. Rust's behavior is strictly better, but it's a divergence worth documenting.
  2. Inner-send failure → Err(ReceiveBufferClosed) (line 231). Go's recvBuffer <- msg panics on closed channel; Rust returns an error variant, which is again better but diverges.

The docstring "Processes admitted outer messages until cancellation or channel close." doesn't distinguish these two terminations. Recommend expanding it to state that the function returns Ok(()) on cancellation or outer-channel close, and Err(ReceiveBufferClosed) if the inner buffer's receiver has been dropped, so future readers don't assume Go-style behavior.
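
The two termination paths can be shown with a synchronous, std-only analogue. This is a sketch under stated assumptions: it uses std::sync::mpsc instead of tokio, omits the cancellation arm entirely, and the function name forward_until_closed is hypothetical.

```rust
use std::sync::mpsc::{Receiver, SyncSender};

#[derive(Debug, PartialEq)]
enum Error {
    ReceiveBufferClosed,
}

/// Forwards messages from `outer_rx` into `inner_tx`.
///
/// Returns `Ok(())` once every outer sender has been dropped, and
/// `Err(Error::ReceiveBufferClosed)` if the inner receiver is gone.
fn forward_until_closed<T>(
    outer_rx: Receiver<T>,
    inner_tx: SyncSender<T>,
) -> Result<(), Error> {
    // Iterating a Receiver ends when the channel is closed and drained.
    for msg in outer_rx {
        inner_tx.send(msg).map_err(|_| Error::ReceiveBufferClosed)?;
    }
    Ok(())
}
```

A docstring in this style makes the contrast with Go explicit: a closed outer channel is a clean exit rather than a zero-value loop, and a closed inner channel is an error rather than a panic.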

// this tokio channel into the crossbeam receiver used by core::qbft::run.
recv_tx: mpsc::Sender<qbft::Msg<ConsensusQbftTypes>>,
sniffer: sync::Arc<Sniffer>,
values: Mutex<ValueStore>,

nit: field-name collision — Transport.values: Mutex<ValueStore> wraps a ValueStore whose own field is also called values: ValueMap. A reader of self.values.lock()...values has to remember that the outer one is a mutex around a struct that itself has a values field. Renaming one would help:

value_store: Mutex<ValueStore>,
// ...
struct ValueStore {
    value_rx: mpsc::Receiver<Any>,
    values: ValueMap,
}

This also matches the existing value_rx field's naming.

})?;
let msg = sync::Arc::new(msg);
let consensus_msg = msg.to_consensus_msg();
let inner_msg: qbft::Msg<ConsensusQbftTypes> = msg.clone();

nit: this clones an Arc via method syntax (msg.clone() where msg: Arc<msg::Msg>), while the surrounding block (line 191, and elsewhere in the file at 532/635/649) consistently uses the explicit sync::Arc::clone(&...) form. For consistency:

let inner_msg: qbft::Msg<ConsensusQbftTypes> = sync::Arc::clone(&msg);

Makes it obvious at the call site that this is a refcount bump, not a deep clone. Could also enable clippy::clone_on_ref_ptr if the project wants this enforced.

loop {
let msg = tokio::select! {
() = ct.cancelled() => return Ok(()),
msg = outer_rx.recv() => match msg {

nit: the match arm inside the select! is exactly what let-else was designed for, and the codebase already uses that idiom elsewhere:

let msg = tokio::select! {
    () = ct.cancelled() => return Ok(()),
    msg = outer_rx.recv() => msg,
};
let Some(msg) = msg else { return Ok(()); };

Subjective — feel free to ignore.

}

#[test]
fn get_value_returns_cached_value() {

nit: set_values_caches_message_values (lines 316-324) and get_value_returns_cached_value (lines 327-334) are functionally identical — both build a Msg, call set_values, then assert get_value returns the cached Any. They differ only in name. One should either be removed or re-purposed to exercise a distinct path (e.g., overwriting an existing key, or set_values with multiple hashes that have to round-trip through get_value).

}

#[tokio::test]
async fn process_receives_stops_on_cancel() {

nit: missing test coverage for two documented invariants:

  1. Detached self-send doesn't block external broadcast (lines 193-194 promise this). No test fills recv_tx to capacity before calling broadcast and asserts the broadcaster still completes. A regression that accidentally awaited the self-send in-line wouldn't be caught.
  2. Cancel-during-inner-send branch of process_receives (the inner tokio::select! at lines 228-234 has a ct.cancelled() arm). This test cancels BEFORE entering the loop, exercising only the outer select. A test that fills the inner buffer, sends one outer message, waits until set_values has recorded the value, then cancels, would pin this branch.

Base automatically changed from iamquang95/consensus-qbft-msg to main May 28, 2026 09:34
…qbft-transport

# Conflicts:
#	crates/core/src/consensus/qbft/mod.rs
#	crates/core/src/consensus/qbft/msg.rs
#	crates/core/src/qbft/mod.rs