feat(core): implement consensus qbft transport and sniffer#448
iamquang95 wants to merge 19 commits
@claude review
Claude finished @iamquang95's task in 3m 54s.
@varex83agent /review-pr
varex83agent left a comment
Solid, well-tested port of transport.go + sniffer.go with good module-level documentation in msg.rs. Sniffer is a clean 1:1 mapping. Transport faithfully bridges the tokio mpsc admission buffer with the crossbeam channel that the QBFT core consumes, and the comments at lines 81-83 and 193-194 capture the non-obvious design choices.
No bugs or hard blockers. Findings are predominantly minor style/idiom or contract-documentation gaps, plus a few nit cleanups:
- Hash-binding contract is implicit: hash_proto_bytes hashes raw Any.value bytes rather than re-marshalling deterministically like Go's hashProto does (transport.rs:131, msg.rs:281). This works today because prost emits deterministic bytes for the proto types that flow through QBFT (BTreeMap for map fields), but the invariant is undocumented and a future caller could easily reach for hash_proto_bytes on an inbound-validation path where the input bytes are NOT guaranteed deterministic. Worth tightening the docstring.
- A couple of intentional behavioral divergences from Go (Ok(()) on outer-rx close vs Go's zero-value-loop, Err(ReceiveBufferClosed) on inner-send failure vs Go's panic): both are improvements, but the docstring on process_receives should call them out.
- Broadcaster is typed Box<dyn Fn(...) -> BoxFuture<'static, Result<()>>> where Result is transport::Result, so real implementations have no error variant to return wire/marshal failures through. Worth either adding Error::Broadcast(...) or relaxing the broadcaster signature before the runner wires this up.
- Self-send is detached as tokio::spawn (transport.rs:195) and relies on the caller to cancel ct at instance teardown; if the contract isn't held, the task can outlive the transport. The contract is implicit; worth a sentence in the doc.
- Several smaller style/clarity nits: shadowed hash in get_value, inconsistent Arc::clone style, field-name collision Transport.values/ValueStore.values, and one duplicate test.
Verdict: comment-only review — none of the findings block merge, but the contract-documentation items are worth addressing before the consumers of these helpers land.
pub(crate) fn get_value(&self, hash: [u8; 32]) -> Result<Any> {
    let mut store = self.values.lock().unwrap_or_else(PoisonError::into_inner);
    if let Ok(local) = store.value_rx.try_recv() {
        let hash = msg::hash_proto_bytes(&local.value)?;
Hash-determinism contract is implicit and undocumented. Here we compute the consensus hash from the raw bytes of Any.value (whatever the producer happened to put in there). Go's hashProto (charon/core/consensus/qbft/msg.go:138-166) instead unmarshals to a typed proto and re-marshals with proto.MarshalOptions{Deterministic: true} before hashing — so Go is byte-canonical regardless of how the value was originally encoded.
This works today because:
- The local producer of value_rx uses prost_types::Any::from_msg(&typed_msg), and prost's encode_to_vec is byte-deterministic in field-tag order.
- The proto messages that flow through consensus use BTreeMap for map fields (see hash_proto_uses_btree_map_for_deterministic_encoding in msg.rs:413); without this, prost map-field encoding is non-deterministic.
The invariant Any.value bytes ≡ canonical deterministic proto encoding of the inner message is currently held by the build-proto configuration plus the producer's use of Any::from_msg. It's not enforced by the type system or documented at this call site. A future contributor implementing inbound wire-validation (analogous to Go's valuesByHash) could naturally reach for hash_proto_bytes and get silent consensus desync if any new proto message ships with a HashMap field.
Recommend a one- or two-line comment here that captures the contract, and consider linking it from the hash_proto_bytes docstring.
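To make the determinism argument concrete, here is a minimal std-only sketch. It assumes a toy length-prefixed encoder (encode_sorted and encode_canonical are hypothetical names, not prost or PR APIs) and shows why a sorted map yields the same bytes regardless of insertion order, and how re-encoding through a sorted map canonicalises an unordered one, roughly in the spirit of Go's deterministic re-marshal.

```rust
use std::collections::{BTreeMap, HashMap};

/// Toy length-prefixed encoder standing in for a protobuf map field.
/// A BTreeMap iterates in key order, so the output does not depend on
/// the order in which entries were inserted.
pub fn encode_sorted(map: &BTreeMap<String, u32>) -> Vec<u8> {
    let mut out = Vec::new();
    for (key, value) in map {
        // Toy format: assumes keys shorter than 256 bytes.
        out.push(key.len() as u8);
        out.extend_from_slice(key.as_bytes());
        out.extend_from_slice(&value.to_be_bytes());
    }
    out
}

/// Canonicalises an unordered map by sorting its entries before encoding.
/// A HashMap iterates in an unspecified order, so encoding it directly
/// would not be byte-stable.
pub fn encode_canonical(map: &HashMap<String, u32>) -> Vec<u8> {
    let sorted: BTreeMap<String, u32> =
        map.iter().map(|(k, v)| (k.clone(), *v)).collect();
    encode_sorted(&sorted)
}
```

Hashing the output of encode_canonical is byte-stable across producers; hashing bytes of unknown provenance is only safe when the producer is known to have encoded them this way.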
}

/// Returns the consensus hash for deterministic protobuf bytes.
pub(crate) fn hash_proto_bytes(encoded: &[u8]) -> Result<[u8; 32]> {
hash_proto (line 266) explicitly rejects Any (test hash_proto_rejects_any). hash_proto_bytes has no such guard: it hashes whatever bytes it is handed. The doc currently reads only "Returns the consensus hash for deterministic protobuf bytes."
Given the helper is pub(crate) and is the natural shortcut a future inbound-validation layer would reach for, the docstring should spell out the invariant explicitly: bytes MUST be the deterministic prost encoding of a non-Any concrete message, equivalent to prost::Message::encode_to_vec over a typed proto whose map fields are BTreeMap. Anything else risks silent hash mismatch against Go peers, which re-marshal deterministically before hashing.
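Beyond the docstring, one possible way to make the invariant harder to violate is a newtype whose only constructor goes through a typed encoder. The sketch below is illustrative only: CanonicalBytes, CanonicalEncode, Height, and hash_canonical are hypothetical names, and the digest is a stand-in FNV-1a, not the consensus hash used in the PR.

```rust
/// Hypothetical wrapper: bytes known to come from a canonical typed encoding.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CanonicalBytes(Vec<u8>);

/// Hypothetical trait standing in for "typed message with a deterministic encoder".
pub trait CanonicalEncode {
    fn encode_canonical(&self) -> Vec<u8>;
}

impl CanonicalBytes {
    /// The only constructor takes a typed message, so raw wire bytes
    /// cannot be wrapped by accident.
    pub fn from_typed<M: CanonicalEncode>(msg: &M) -> Self {
        Self(msg.encode_canonical())
    }

    pub fn as_slice(&self) -> &[u8] {
        &self.0
    }
}

/// Toy typed message used only for illustration.
pub struct Height(pub u64);

impl CanonicalEncode for Height {
    fn encode_canonical(&self) -> Vec<u8> {
        self.0.to_be_bytes().to_vec()
    }
}

/// Stand-in digest (64-bit FNV-1a). It accepts only CanonicalBytes, so the
/// "deterministic input" requirement is part of the signature.
pub fn hash_canonical(bytes: &CanonicalBytes) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
    for byte in bytes.as_slice() {
        hash ^= u64::from(*byte);
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3);
    }
    hash
}
```

Whether this is worth the extra type is a judgment call; a precise docstring may be enough while hash_proto_bytes has a single trusted caller.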
/// Errors returned by the QBFT consensus transport.
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
Broadcaster is Box<dyn Fn(...) -> BoxFuture<'static, Result<()>>> where Result<()> is the local transport::Result<()>. The Error enum (lines 49-75) only has variants for the transport's own failures (UnknownValue, InvalidJustification*, InvalidDuty, ReceiveBufferClosed, Msg(msg::Error)) — there is no variant for broadcaster-supplied failures (libp2p stream errors, peer unreachable, serialization issues from a real implementation).
Go's broadcaster.Broadcast returns a plain error (transport.go:22), so any implementation can return its own typed error. Here a real broadcaster will need either to invent a synthetic mapping or to stuff its errors into Error::Msg(msg::Error), which is semantically wrong.
Consider adding Error::Broadcast(#[source] Box<dyn std::error::Error + Send + Sync>) and wrapping at the call site (line 206), or changing the broadcaster signature to return Result<(), Box<dyn Error + Send + Sync>> and wrapping into that new variant. Cheaper to fix now than after the runner is wired.
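A rough sketch of the first option, written against std only so it stands alone (the PR itself derives thiserror::Error, which would generate the Display and source impls shown by hand here). The variant set is trimmed to two, and wrap_broadcast is a hypothetical helper for the call site.

```rust
use std::error::Error as StdError;
use std::fmt;

/// Boxed error type a real broadcaster could return.
pub type BoxError = Box<dyn StdError + Send + Sync>;

#[derive(Debug)]
pub enum Error {
    ReceiveBufferClosed,
    /// Failure reported by the broadcaster implementation.
    Broadcast(BoxError),
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Error::ReceiveBufferClosed => write!(f, "receive buffer closed"),
            Error::Broadcast(err) => write!(f, "broadcast failed: {err}"),
        }
    }
}

impl StdError for Error {
    fn source(&self) -> Option<&(dyn StdError + 'static)> {
        match self {
            Error::ReceiveBufferClosed => None,
            Error::Broadcast(err) => {
                // Drop the Send + Sync bounds to match the trait signature.
                let source: &(dyn StdError + 'static) = &**err;
                Some(source)
            }
        }
    }
}

/// Hypothetical call-site helper: maps a broadcaster failure into the
/// transport error while keeping the original error as the source.
pub fn wrap_broadcast<T>(result: Result<T, BoxError>) -> Result<T, Error> {
    result.map_err(Error::Broadcast)
}
```

Keeping the original error reachable through source() means callers can still log or downcast the underlying failure.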
let sniffed_msg = consensus_msg.clone();
// Self-send is intentionally detached: the inner receive buffer can
// block, but network broadcast must still proceed.
tokio::spawn(async move {
The self-send is correctly detached (the inline comment captures why), but the spawn captures task_ct, recv_tx.clone(), Arc<Sniffer>, Arc<msg::Msg>, and a QbftConsensusMsg. It only completes when either task_ct is cancelled or recv_tx.send succeeds.
If the inner consumer stalls and the caller never cancels ct (e.g., Transport is dropped while its ct is still live elsewhere), each broadcast call piles up another task that lives until process exit. The recv_tx clone in particular keeps the channel's sender half alive even after Transport is dropped.
In Go this is mitigated by goroutines being cheap and the per-instance ctx being virtually guaranteed to cancel on instance teardown. In Rust, the contract "caller MUST cancel ct at instance teardown" is implicit. Either:
- document it on Transport::new/broadcast, or
- have Transport own an internal CancellationToken that's cancelled in Drop, so spawns can't outlive the transport even if the caller's token is leaked.
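The second option can be illustrated with a std-only analogue. This is a sketch under stated assumptions, not the PR's code: an AtomicBool stands in for the CancellationToken, an OS thread stands in for the tokio task, and the Transport type and spawn_self_send method here are hypothetical.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Toy transport that owns its own cancellation flag.
pub struct Transport {
    cancelled: Arc<AtomicBool>,
}

impl Transport {
    pub fn new() -> Self {
        Self { cancelled: Arc::new(AtomicBool::new(false)) }
    }

    /// Spawns a detached worker modelling a self-send stuck on a full
    /// buffer: it can only finish once the transport is cancelled.
    /// The worker returns true when it exits because of cancellation.
    pub fn spawn_self_send(&self) -> JoinHandle<bool> {
        let cancelled = Arc::clone(&self.cancelled);
        thread::spawn(move || {
            while !cancelled.load(Ordering::Acquire) {
                thread::sleep(Duration::from_millis(1));
            }
            true
        })
    }
}

impl Drop for Transport {
    /// Dropping the transport cancels every worker it spawned, even if
    /// the caller never cancels its own token.
    fn drop(&mut self) {
        self.cancelled.store(true, Ordering::Release);
    }
}
```

With this shape, dropping the Transport is enough to let a stuck worker exit, so the teardown contract no longer depends on the caller.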
    ct: CancellationToken,
    mut outer_rx: mpsc::Receiver<msg::Msg>,
) -> Result<()> {
    loop {
process_receives has two intentional behavioral divergences from Go (transport.go:148-164) that aren't called out in the docstring:
- Outer channel close → Ok(()) (lines 218-221). Go does case msg := <-outerBuffer without checking ok, so on channel close Go silently loops receiving zero-value Msg{} until ctx cancels. Rust's behavior is strictly better, but it's a divergence worth documenting.
- Inner-send failure → Err(ReceiveBufferClosed) (line 231). Go's recvBuffer <- msg panics on closed channel; Rust returns an error variant, which is again better but diverges.
The docstring "Processes admitted outer messages until cancellation or channel close." doesn't distinguish these two terminations. Recommend expanding to: returns Ok(()) on cancellation or outer-channel close, and Err(ReceiveBufferClosed) if the inner buffer's receiver has been dropped, so future readers don't assume Go-style behavior.
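The three terminations can be modelled with a std-only analogue. This sketch assumes std::sync::mpsc channels in place of the tokio and crossbeam channels, an AtomicBool in place of the CancellationToken, and u32 in place of the message type; it mirrors the documented outcomes, not the PR's actual implementation.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
use std::time::Duration;

#[derive(Debug, PartialEq, Eq)]
pub enum Error {
    ReceiveBufferClosed,
}

/// Forwards messages from the outer channel to the inner buffer.
///
/// Returns Ok(()) on cancellation or when the outer channel is closed,
/// and Err(Error::ReceiveBufferClosed) if the inner receiver was dropped.
pub fn process_receives(
    cancelled: &AtomicBool,
    outer_rx: Receiver<u32>,
    inner_tx: Sender<u32>,
) -> Result<(), Error> {
    loop {
        if cancelled.load(Ordering::Acquire) {
            return Ok(());
        }
        // Poll with a short timeout so the cancellation flag is rechecked.
        match outer_rx.recv_timeout(Duration::from_millis(5)) {
            Ok(msg) => inner_tx.send(msg).map_err(|_| Error::ReceiveBufferClosed)?,
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => return Ok(()),
        }
    }
}
```

A doc comment in this style states each return path explicitly, which is the distinction the current one-line docstring leaves out.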
// this tokio channel into the crossbeam receiver used by core::qbft::run.
recv_tx: mpsc::Sender<qbft::Msg<ConsensusQbftTypes>>,
sniffer: sync::Arc<Sniffer>,
values: Mutex<ValueStore>,
nit: field-name collision — Transport.values: Mutex<ValueStore> wraps a ValueStore whose own field is also called values: ValueMap. A reader of self.values.lock()...values has to remember that the outer one is a mutex around a struct that itself has a values field. Renaming one would help:
value_store: Mutex<ValueStore>,
// ...
struct ValueStore {
value_rx: mpsc::Receiver<Any>,
values: ValueMap,
}
This also matches the existing value_rx field's naming.
})?;
let msg = sync::Arc::new(msg);
let consensus_msg = msg.to_consensus_msg();
let inner_msg: qbft::Msg<ConsensusQbftTypes> = msg.clone();
nit: this clones an Arc via method syntax (msg.clone() where msg: Arc<msg::Msg>), while the surrounding block (lines 191, and elsewhere in the file at 532/635/649) consistently uses the explicit sync::Arc::clone(&...) form. For consistency:
let inner_msg: qbft::Msg<ConsensusQbftTypes> = sync::Arc::clone(&msg);
Makes it obvious at the call site that this is a refcount bump, not a deep clone. Could also enable clippy::clone_on_ref_ptr if the project wants this enforced.
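For readers less familiar with the distinction, a tiny std-only illustration (clone_handle is a hypothetical helper, and Vec<u8> stands in for the message type): the explicit form returns a second handle to the same allocation and only increments the strong count.

```rust
use std::sync::Arc;

/// Returns a second handle to the same allocation.
/// Arc::clone bumps the reference count; the Vec itself is not copied.
pub fn clone_handle(msg: &Arc<Vec<u8>>) -> Arc<Vec<u8>> {
    Arc::clone(msg)
}
```

Arc::strong_count and Arc::ptr_eq can be used in a test to confirm that both handles share one allocation.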
loop {
    let msg = tokio::select! {
        () = ct.cancelled() => return Ok(()),
        msg = outer_rx.recv() => match msg {
nit: the match arm inside the select! is exactly what let-else was designed for, and the codebase already uses that idiom elsewhere:
let msg = tokio::select! {
() = ct.cancelled() => return Ok(()),
msg = outer_rx.recv() => msg,
};
let Some(msg) = msg else { return Ok(()); };
Subjective; feel free to ignore.
}

#[test]
fn get_value_returns_cached_value() {
nit: set_values_caches_message_values (lines 316-324) and get_value_returns_cached_value (lines 327-334) are functionally identical — both build a Msg, call set_values, then assert get_value returns the cached Any. They differ only in name. One should either be removed or re-purposed to exercise a distinct path (e.g., overwriting an existing key, or set_values with multiple hashes that have to round-trip through get_value).
}

#[tokio::test]
async fn process_receives_stops_on_cancel() {
nit: missing test coverage for two documented invariants:
- Detached self-send doesn't block external broadcast (lines 193-194 promise this). No test fills recv_tx to capacity before calling broadcast and asserts the broadcaster still completes. A regression that accidentally awaited the self-send in-line wouldn't be caught.
- Cancel-during-inner-send branch of process_receives (the inner tokio::select! at lines 228-234 has a ct.cancelled() arm). This test cancels BEFORE entering the loop, exercising only the outer select. A test that fills the inner buffer, sends one outer message, waits until set_values has recorded the value, then cancels, would pin this branch.
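The shape of the first missing test can be sketched with a std-only analogue. The assumptions are loud here: a bounded std::sync::mpsc::sync_channel stands in for recv_tx, an OS thread stands in for the detached tokio task, a Vec stands in for the network broadcaster, and this broadcast function is a toy, not the PR's API.

```rust
use std::sync::mpsc::SyncSender;
use std::thread::{self, JoinHandle};

/// Toy broadcast: the self-send runs on a detached thread, so a full
/// inner buffer cannot delay the (stand-in) network broadcast.
/// The returned handle reports whether the self-send was delivered.
pub fn broadcast(
    recv_tx: &SyncSender<u32>,
    msg: u32,
    network: &mut Vec<u32>,
) -> JoinHandle<bool> {
    let tx = recv_tx.clone();
    // Detached self-send: blocks on its own thread while the buffer is full.
    let handle = thread::spawn(move || tx.send(msg).is_ok());
    // Stand-in for awaiting the real broadcaster.
    network.push(msg);
    handle
}
```

A test would fill the channel to capacity, call broadcast, assert the network saw the message while the buffer is still full, and only then drain the receiver. If the self-send were performed in-line instead, such a test would hang rather than pass, which is exactly the regression it is meant to catch.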
…qbft-transport
# Conflicts:
#   crates/core/src/consensus/qbft/mod.rs
#   crates/core/src/consensus/qbft/msg.rs
#   crates/core/src/qbft/mod.rs
Part of: #157
This PR implements the transport state and sniffer, part of consensus QBFT. It is currently based on #442, which has not been merged yet.