From 372bb918c1fcd6131fc042b02f7d83668b259339 Mon Sep 17 00:00:00 2001 From: Quang Le Date: Tue, 26 May 2026 15:47:03 +0700 Subject: [PATCH 01/19] fix: add helper wire MessageType --- crates/core/src/qbft/mod.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/crates/core/src/qbft/mod.rs b/crates/core/src/qbft/mod.rs index 554fcfb6..be45377f 100644 --- a/crates/core/src/qbft/mod.rs +++ b/crates/core/src/qbft/mod.rs @@ -75,6 +75,10 @@ pub enum QbftError { #[error("Zero input value not supported")] ZeroInputValue, + /// Message value source was missing. + #[error("value not found")] + ValueNotFound, + /// Node count must be positive. #[error("invalid node count: must be greater than zero, got {nodes}")] InvalidNodes { @@ -200,6 +204,18 @@ pub const MSG_DECIDED: MessageType = MessageType(5); const MSG_SENTINEL: MessageType = MessageType(6); // intentionally not public impl MessageType { + /// Converts a stable wire integer into a message type. + pub fn from_wire(value: i64) -> Self { + match value { + 1 => MSG_PRE_PREPARE, + 2 => MSG_PREPARE, + 3 => MSG_COMMIT, + 4 => MSG_ROUND_CHANGE, + 5 => MSG_DECIDED, + _ => MSG_UNKNOWN, + } + } + /// Returns true when the message type is one of the known QBFT wire types. pub fn valid(&self) -> bool { self.0 > MSG_UNKNOWN.0 && self.0 < MSG_SENTINEL.0 From d17b023dc54846540c4e8cc31d1242f166192a89 Mon Sep 17 00:00:00 2001 From: Quang Le Date: Tue, 26 May 2026 16:51:07 +0700 Subject: [PATCH 02/19] feat(core): implement consensus/qbft/msg --- Cargo.lock | 1 + crates/core/Cargo.toml | 3 +- crates/core/src/consensus/mod.rs | 3 + crates/core/src/consensus/qbft/mod.rs | 4 + crates/core/src/consensus/qbft/msg.rs | 696 ++++++++++++++++++++++++++ 5 files changed, 706 insertions(+), 1 deletion(-) create mode 100644 crates/core/src/consensus/qbft/mod.rs create mode 100644 crates/core/src/consensus/qbft/msg.rs diff --git a/Cargo.lock b/Cargo.lock index 229760be..2b5ab642 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5608,6 +5608,7 @@ dependencies = [ "pluto-eth2api", "pluto-eth2util", "pluto-featureset", + "pluto-k1util", "pluto-p2p", "pluto-ssz", "pluto-testutil", diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index e4133302..9833b8d7 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -14,10 +14,12 @@ crossbeam.workspace = true dyn-clone.workspace = true dyn-eq.workspace = true hex.workspace = true +k256.workspace = true libp2p.workspace = true vise.workspace = true pluto-crypto.workspace = true pluto-eth2api.workspace = true +pluto-k1util.workspace = true prost.workspace = true prost-types.workspace = true regex.workspace = true @@ -40,7 +42,6 @@ alloy.workspace = true clap.workspace = true rand.workspace = true libp2p.workspace = true -k256.workspace = true prost.workspace = true prost-types.workspace = true hex.workspace = true diff --git a/crates/core/src/consensus/mod.rs b/crates/core/src/consensus/mod.rs index c2712567..d52bc3de 100644 --- a/crates/core/src/consensus/mod.rs +++ b/crates/core/src/consensus/mod.rs @@ -7,5 +7,8 @@ /// Consensus protocols. pub mod protocols; +/// QBFT consensus wrapper. +pub mod qbft; + /// Consensus round timers. pub mod timer; diff --git a/crates/core/src/consensus/qbft/mod.rs b/crates/core/src/consensus/qbft/mod.rs new file mode 100644 index 00000000..a18db6be --- /dev/null +++ b/crates/core/src/consensus/qbft/mod.rs @@ -0,0 +1,4 @@ +//! QBFT consensus wrapper. + +/// QBFT protobuf message wrapper. +pub mod msg; diff --git a/crates/core/src/consensus/qbft/msg.rs b/crates/core/src/consensus/qbft/msg.rs new file mode 100644 index 00000000..4685c7a9 --- /dev/null +++ b/crates/core/src/consensus/qbft/msg.rs @@ -0,0 +1,696 @@ +//! QBFT protobuf message adapter. +//! +//! This module bridges the domain-specific consensus protobuf messages with +//! the generic [`crate::qbft`] state machine. +//! +//! [`QbftMsg`](pbconsensus::QbftMsg) carries only consensus metadata and value +//! hashes. The concrete proposal values are transported beside it in +//! [`QbftConsensusMsg`](pbconsensus::QbftConsensusMsg) as protobuf `Any` +//! payloads. [`Msg`] ties those two pieces back together by: +//! +//! - converting `value_hash` and `prepared_value_hash` into fixed `[u8; 32]` +//! values for the generic QBFT core; +//! - checking that every non-zero hash referenced by the message exists in the +//! supplied [`ValueMap`]; +//! - recursively wrapping raw justification messages so the core can validate +//! PRE-PREPARE and ROUND-CHANGE justifications; +//! - preserving the raw protobufs so the transport layer can rebuild the +//! original consensus message with [`Msg::to_consensus_msg`]. +//! +//! Do not hash `Any` directly. The consensus hash is over the deterministic +//! protobuf bytes of the inner message. + +use std::{any, collections::HashMap, fmt, sync}; + +use k256::{PublicKey, SecretKey}; +use pluto_ssz::{HashWalker, Hasher, HasherError}; +use prost::Name; +use prost_types::Any; + +use crate::{ + corepb::v1::{consensus as pbconsensus, core as pbcore}, + qbft::{self, MessageType, SomeMsg}, + types::{Duty, DutyType, SlotNumber}, +}; + +/// Type mapping used by the consensus adapter when invoking generic QBFT. +/// +/// - Instance: [`Duty`] +/// - Value: `[u8; 32]` hash of the concrete proposal value +/// - Compare: original `Any` payload passed to the application compare callback +pub struct ConsensusQbftTypes; + +impl qbft::QbftTypes for ConsensusQbftTypes { + type Compare = Any; + type Instance = Duty; + type Value = [u8; 32]; +} + +/// Concrete values carried beside QBFT hash messages. +/// +/// The key is the [`hash_proto`] result of the decoded inner protobuf message. +/// The value remains the original `Any` envelope so later layers can forward or +/// compare the same payload without losing type-url information. +pub type ValueMap = HashMap<[u8; 32], Any>; + +type Result = std::result::Result; + +/// Errors returned by QBFT message wrapping. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Nil QBFT protobuf message. + #[error("nil qbft message")] + NilQbftMessage, + + /// Value hash did not exist in the values map. + #[error("value hash not found in values")] + ValueHashNotFound, + + /// Prepared value hash did not exist in the values map. + #[error("prepared value hash not found in values")] + PreparedValueHashNotFound, + + /// Value did not exist in the values map. + #[error("value not found")] + ValueNotFound, + + /// Callers must hash the concrete inner message, not `Any`. + #[error("cannot hash any proto, must hash inner value")] + CannotHashAnyProto, + + /// Protobuf marshal failed. + #[error("marshal proto: {0}")] + MarshalProto(prost::EncodeError), + + /// SSZ hash failed. + #[error("hash proto: {0}")] + HashProto(HasherError), + + /// QBFT message signature was empty. + #[error("empty signature")] + EmptySignature, + + /// Public key recovery failed. + #[error("recover pubkey: {0}")] + RecoverPubkey(pluto_k1util::K1UtilError), + + /// Signing failed. + #[error("sign: {0}")] + Sign(pluto_k1util::K1UtilError), +} + +/// Wrapped consensus message consumed by the generic QBFT core. +/// +/// The raw protobuf remains available for re-broadcasting. The hash fields are +/// cached as `[u8; 32]` because the core treats consensus values as comparable +/// hashes, not full protobuf payloads. +#[derive(Clone)] +pub struct Msg { + msg: pbconsensus::QbftMsg, + value_hash: [u8; 32], + prepared_value_hash: [u8; 32], + values: sync::Arc, + justification_protos: Vec, + justification: Vec>, +} + +impl fmt::Debug for Msg { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Msg") + .field("type", &self.msg.r#type) + .field( + "duty", + &self.msg.duty.as_ref().map(|duty| (duty.slot, duty.r#type)), + ) + .field("peer_idx", &self.msg.peer_idx) + .field("round", &self.msg.round) + .field("prepared_round", &self.msg.prepared_round) + .field("value_hash", &self.value_hash) + .field("prepared_value_hash", &self.prepared_value_hash) + .field("values_len", &self.values.len()) + .field("justification_len", &self.justification.len()) + .finish() + } +} + +impl Msg { + /// Wraps a raw QBFT protobuf message for the generic core. + /// + /// Non-zero `value_hash` and `prepared_value_hash` fields must both exist + /// in `values`. Invalid hash encodings, including zero hashes, are + /// treated as the nil value and do not require a map entry. + /// + /// Justifications are raw protobuf messages from the same consensus + /// envelope. They are recursively wrapped with the same shared value map. + pub fn new( + msg: Option, + justification: Vec, + values: sync::Arc, + ) -> Result { + let msg = msg.ok_or(Error::NilQbftMessage)?; + + let value_hash = match to_hash32(&msg.value_hash) { + Some(hash) if values.contains_key(&hash) => hash, + Some(_) => return Err(Error::ValueHashNotFound), + None => [0u8; 32], + }; + let prepared_value_hash = match to_hash32(&msg.prepared_value_hash) { + Some(hash) if values.contains_key(&hash) => hash, + Some(_) => return Err(Error::PreparedValueHashNotFound), + None => [0u8; 32], + }; + + let mut justification_impls: Vec> = + Vec::with_capacity(justification.len()); + + for justification_msg in &justification { + let impl_msg = Self::new(Some(justification_msg.clone()), vec![], values.clone())?; + justification_impls.push(sync::Arc::new(impl_msg)); + } + + Ok(Self { + msg, + value_hash, + prepared_value_hash, + values, + justification_protos: justification, + justification: justification_impls, + }) + } + + /// Returns the raw protobuf message. + pub fn msg(&self) -> &pbconsensus::QbftMsg { + &self.msg + } + + /// Returns the values map shared by this message and nested justifications. + pub fn values(&self) -> &ValueMap { + &self.values + } + + /// Returns the `Any` payload for this message's `value_hash`. + pub fn value_source(&self) -> Result { + self.values + .get(&self.value_hash) + .cloned() + .ok_or(Error::ValueNotFound) + } + + /// Rebuilds the protobuf consensus envelope for transport. + pub fn to_consensus_msg(&self) -> pbconsensus::QbftConsensusMsg { + pbconsensus::QbftConsensusMsg { + msg: Some(self.msg.clone()), + justification: self.justification_protos.clone(), + values: self.values.values().cloned().collect(), + } + } +} + +impl SomeMsg for Msg { + fn type_(&self) -> MessageType { + MessageType::from_wire(self.msg.r#type) + } + + fn instance(&self) -> Duty { + duty_from_proto_unchecked(self.msg.duty.as_ref()) + } + + fn source(&self) -> i64 { + self.msg.peer_idx + } + + fn round(&self) -> i64 { + self.msg.round + } + + fn value(&self) -> [u8; 32] { + self.value_hash + } + + fn value_source(&self) -> std::result::Result { + Msg::value_source(self).map_err(|_| qbft::QbftError::ValueNotFound) + } + + fn prepared_round(&self) -> i64 { + self.msg.prepared_round + } + + fn prepared_value(&self) -> [u8; 32] { + self.prepared_value_hash + } + + fn justification(&self) -> Vec> { + self.justification.clone() + } + + fn as_any(&self) -> &dyn any::Any { + self + } +} + +/// Returns a deterministic SSZ hash root of a concrete protobuf message. +/// +/// The hash input is deterministic protobuf encoding, then SSZ `PutBytes` +/// merkleization. `Any` is rejected because the consensus value hash must bind +/// to the inner message bytes, not the transport envelope. +pub fn hash_proto(msg: &M) -> Result<[u8; 32]> +where + M: prost::Message + prost::Name, +{ + if M::full_name() == Any::full_name() { + return Err(Error::CannotHashAnyProto); + } + + let mut encoded = Vec::with_capacity(msg.encoded_len()); + msg.encode(&mut encoded).map_err(Error::MarshalProto)?; + + let mut hasher = Hasher::default(); + let index = hasher.index(); + hasher.put_bytes(&encoded).map_err(Error::HashProto)?; + hasher.merkleize(index).map_err(Error::HashProto)?; + hasher.hash_root().map_err(Error::HashProto) +} + +/// Returns a signed copy of a QBFT protobuf message. +/// +/// The signature field is cleared before hashing, so callers may pass either an +/// unsigned message or an already-signed message to re-sign. +pub fn sign_msg(msg: &pbconsensus::QbftMsg, privkey: &SecretKey) -> Result { + let mut clone = msg.clone(); + clone.signature.clear(); + + let hash = hash_proto(&clone)?; + let signature = pluto_k1util::sign(privkey, &hash).map_err(Error::Sign)?; + clone.signature = signature.to_vec().into(); + + Ok(clone) +} + +/// Verifies that a QBFT protobuf message was signed by `pubkey`. +/// +/// The signature is recoverable secp256k1 over [`hash_proto`] of the message +/// with its signature field cleared. +pub fn verify_msg_sig(msg: &pbconsensus::QbftMsg, pubkey: &PublicKey) -> Result { + // Protobuf `bytes` fields decode both absent and explicit-empty values as + // empty bytes in prost. + if msg.signature.is_empty() { + return Err(Error::EmptySignature); + } + + let mut clone = msg.clone(); + clone.signature.clear(); + + let hash = hash_proto(&clone)?; + let recovered = pluto_k1util::recover(&hash, &msg.signature).map_err(Error::RecoverPubkey)?; + + Ok(recovered == *pubkey) +} + +fn to_hash32(value: &[u8]) -> Option<[u8; 32]> { + let value: [u8; 32] = value.try_into().ok()?; + if value == [0u8; 32] { + return None; + } + + Some(value) +} + +fn duty_from_proto_unchecked(duty: Option<&pbcore::Duty>) -> Duty { + let Some(duty) = duty else { + return Duty::new(SlotNumber::new(0), DutyType::Unknown); + }; + + // Message receive validation rejects invalid duty types before this adapter + // is used by the consensus runner. This mirrors the unchecked protobuf + // projection used by the source implementation. + let duty_type = DutyType::try_from(duty.r#type).unwrap_or(DutyType::Unknown); + Duty::new(SlotNumber::new(duty.slot), duty_type) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::qbft::{MSG_PRE_PREPARE, MSG_PREPARE}; + use prost::bytes::Bytes; + use prost_types::Timestamp; + use test_case::test_case; + + const UNSIGNED_DATASET_HASH: &str = + "d8f9bc3de8b0cb0e3eb1f773c14a96d58f7acaf0f09192ce6562d84ea315e67b"; + const UNSIGNED_DATASET_KEY: &str = "0xe5301bb68d031b01ef7f35613a77f05f6134983fedd8b0107ec2e45c9bb480eb52accb3174a9a936f255f96410d2eb03"; + const UNSIGNED_DATASET_VALUE_HEX: &str = "0800000088000000394651850fd4010078892ee285ec0100511455780875d64ee2d3d0d0de6bf8f9b44ce85ff044c6b1f83b8e883bbf857ac354f3ede2d61e0067cfe242cf3ccc4ea3ae5e88526a9f4a578bcb9ef2d4a65314768d6d299761ea045c3f000f8a1900ddcdd01d756bce6c512c3801aacaeedfad5b506664e8c0e4a771ece0b8b7c196a5512e043e9b9aa687907adf5dba61350991daef80dd5c470c90650aaf7b5fd90022215ae7966bb600191b1825f88d4273c86e4ff95f160062a5eee82abd14004a2d0b75fb180d0000010000000000000001000000000000e000000000000000"; + const TIMESTAMP_HASH: &str = "0880e2cfaa0610959aef3a000000000000000000000000000000000000000000"; + const QBFT_MSG_HASH: &str = "9423898db5f4fc224e07cd775a03d7dc89dafe6aedfda9f75cccb1f17c3ba803"; + const SIGNING_PRIVKEY: &str = + "41d3ff12045b73c870529fe44f70dca2745bafbe1698ffc3c8759eef3cfbaee1"; + const WRONG_PRIVKEY: &str = "42d3ff12045b73c870529fe44f70dca2745bafbe1698ffc3c8759eef3cfbaee1"; + const QBFT_MSG_SIGNATURE: &str = "8a3d48258325037ce680c0bfd40ebc95ff53865b9a7ea391308f27dd1be324791647d3814dc40e9c1edbf6b50e62b99dbc7401724c975ffc0673d034fb9bb0df01"; + + #[test_case(vec![] ; "empty")] + #[test_case(vec![1; 31] ; "short")] + #[test_case(vec![1; 33] ; "long")] + #[test_case(vec![0; 32] ; "zero_hash")] + fn to_hash32_rejects_invalid_hashes(value: Vec) { + assert_eq!(to_hash32(&value), None); + } + + #[test] + fn to_hash32_accepts_nonzero_32_bytes() { + assert_eq!(to_hash32(&[1u8; 32]), Some([1u8; 32])); + } + + #[test] + fn hash_proto_matches_seeded_unsigned_dataset() { + let mut set = std::collections::BTreeMap::new(); + set.insert( + UNSIGNED_DATASET_KEY.to_string(), + Bytes::from(hex::decode(UNSIGNED_DATASET_VALUE_HEX).unwrap()), + ); + + let hash = hash_proto(&pbcore::UnsignedDataSet { set }).unwrap(); + + assert_eq!(hex::encode(hash), UNSIGNED_DATASET_HASH); + } + + #[test] + fn hash_proto_matches_timestamp() { + let hash = hash_proto(&Timestamp { + seconds: 1_700_000_000, + nanos: 123_456_789, + }) + .unwrap(); + + assert_eq!(hex::encode(hash), TIMESTAMP_HASH); + } + + #[test] + fn hash_proto_matches_qbft_msg() { + let hash = hash_proto(&fixed_qbft_msg()).unwrap(); + + assert_eq!(hex::encode(hash), QBFT_MSG_HASH); + } + + #[test] + fn hash_proto_map_encoding_is_order_independent() { + let mut forward = std::collections::BTreeMap::new(); + forward.insert("a".to_string(), Bytes::from_static(b"first")); + forward.insert("b".to_string(), Bytes::from_static(b"second")); + + let mut reverse = std::collections::BTreeMap::new(); + reverse.insert("b".to_string(), Bytes::from_static(b"second")); + reverse.insert("a".to_string(), Bytes::from_static(b"first")); + + assert_eq!( + hash_proto(&pbcore::UnsignedDataSet { set: forward }).unwrap(), + hash_proto(&pbcore::UnsignedDataSet { set: reverse }).unwrap() + ); + } + + #[test] + fn hash_proto_rejects_any() { + let any = Any::from_msg(&Timestamp { + seconds: 1, + nanos: 2, + }) + .unwrap(); + + let err = hash_proto(&any).unwrap_err(); + + assert_eq!( + err.to_string(), + "cannot hash any proto, must hash inner value" + ); + } + + #[test] + fn new_rejects_nil_message() { + let err = Msg::new(None, vec![], sync::Arc::default()).unwrap_err(); + + assert_eq!(err.to_string(), "nil qbft message"); + } + + #[test] + fn new_maps_valid_value_and_prepared_hashes() { + let value_hash = hash_proto(×tamp(1)).unwrap(); + let prepared_hash = hash_proto(×tamp(2)).unwrap(); + let values = sync::Arc::new(value_map(vec![ + (value_hash, any_timestamp(1)), + (prepared_hash, any_timestamp(2)), + ])); + + let msg = Msg::new( + Some(pbconsensus::QbftMsg { + r#type: 1, + duty: Some(pbcore::Duty { + slot: 42, + r#type: 2, + }), + peer_idx: 7, + round: 3, + prepared_round: 2, + value_hash: value_hash.to_vec().into(), + prepared_value_hash: prepared_hash.to_vec().into(), + ..Default::default() + }), + vec![], + values, + ) + .unwrap(); + + assert_eq!(msg.type_(), MSG_PRE_PREPARE); + assert_eq!( + msg.instance(), + Duty::new(SlotNumber::new(42), DutyType::Attester) + ); + assert_eq!(msg.source(), 7); + assert_eq!(msg.round(), 3); + assert_eq!(msg.value(), value_hash); + assert_eq!(msg.prepared_round(), 2); + assert_eq!(msg.prepared_value(), prepared_hash); + assert_eq!(msg.value_source().unwrap(), any_timestamp(1)); + assert_eq!(msg.values().len(), 2); + } + + #[test_case(vec![1; 31] ; "invalid_length")] + #[test_case(vec![0; 32] ; "zero_hash")] + fn new_treats_invalid_value_hash_as_nil(hash: Vec) { + let msg = Msg::new( + Some(pbconsensus::QbftMsg { + value_hash: hash.into(), + ..Default::default() + }), + vec![], + sync::Arc::default(), + ) + .unwrap(); + + assert_eq!(msg.value(), [0u8; 32]); + } + + #[test_case(vec![1; 31] ; "invalid_length")] + #[test_case(vec![0; 32] ; "zero_hash")] + fn new_treats_invalid_prepared_value_hash_as_nil(hash: Vec) { + let msg = Msg::new( + Some(pbconsensus::QbftMsg { + prepared_value_hash: hash.into(), + ..Default::default() + }), + vec![], + sync::Arc::default(), + ) + .unwrap(); + + assert_eq!(msg.prepared_value(), [0u8; 32]); + } + + #[test] + fn new_errors_on_missing_value_hash() { + let err = Msg::new( + Some(pbconsensus::QbftMsg { + value_hash: [1u8; 32].to_vec().into(), + ..Default::default() + }), + vec![], + sync::Arc::default(), + ) + .unwrap_err(); + + assert_eq!(err.to_string(), "value hash not found in values"); + } + + #[test] + fn new_errors_on_missing_prepared_value_hash() { + let err = Msg::new( + Some(pbconsensus::QbftMsg { + prepared_value_hash: [2u8; 32].to_vec().into(), + ..Default::default() + }), + vec![], + sync::Arc::default(), + ) + .unwrap_err(); + + assert_eq!(err.to_string(), "prepared value hash not found in values"); + } + + #[test] + fn new_errors_on_nested_justification_missing_value() { + let err = Msg::new( + Some(pbconsensus::QbftMsg::default()), + vec![pbconsensus::QbftMsg { + value_hash: [3u8; 32].to_vec().into(), + ..Default::default() + }], + sync::Arc::default(), + ) + .unwrap_err(); + + assert_eq!(err.to_string(), "value hash not found in values"); + } + + #[test] + fn value_source_errors_when_value_missing() { + let msg = Msg::new( + Some(pbconsensus::QbftMsg::default()), + vec![], + sync::Arc::default(), + ) + .unwrap(); + + let err = msg.value_source().unwrap_err(); + + assert_eq!(err.to_string(), "value not found"); + } + + #[test] + fn new_maps_justification() { + let value_hash = hash_proto(×tamp(1)).unwrap(); + let values = sync::Arc::new(value_map(vec![(value_hash, any_timestamp(1))])); + + let msg = Msg::new( + Some(pbconsensus::QbftMsg::default()), + vec![pbconsensus::QbftMsg { + r#type: 2, + value_hash: value_hash.to_vec().into(), + ..Default::default() + }], + values, + ) + .unwrap(); + + let justification = msg.justification(); + + assert_eq!(justification.len(), 1); + assert_eq!(justification[0].type_(), MSG_PREPARE); + assert_eq!(justification[0].value(), value_hash); + } + + #[test] + fn to_consensus_msg_preserves_raw_message_justification_and_values() { + let value_hash = hash_proto(×tamp(1)).unwrap(); + let prepared_hash = hash_proto(×tamp(2)).unwrap(); + let value_1 = any_timestamp(1); + let value_2 = any_timestamp(2); + let values = sync::Arc::new(value_map(vec![ + (value_hash, value_1.clone()), + (prepared_hash, value_2.clone()), + ])); + let raw_msg = pbconsensus::QbftMsg { + r#type: 1, + value_hash: value_hash.to_vec().into(), + ..Default::default() + }; + let raw_justification = pbconsensus::QbftMsg { + r#type: 2, + prepared_value_hash: prepared_hash.to_vec().into(), + ..Default::default() + }; + + let msg = Msg::new( + Some(raw_msg.clone()), + vec![raw_justification.clone()], + values, + ) + .unwrap(); + let consensus_msg = msg.to_consensus_msg(); + + assert_eq!(msg.msg(), &raw_msg); + assert_eq!(consensus_msg.msg, Some(raw_msg)); + assert_eq!(consensus_msg.justification, vec![raw_justification]); + assert_eq!(consensus_msg.values.len(), 2); + assert_eq!( + sorted_any(consensus_msg.values), + sorted_any(vec![value_1, value_2]) + ); + } + + #[test] + fn sign_msg_matches_expected_signature_and_verifies() { + let key = secret_key(SIGNING_PRIVKEY); + + let signed = sign_msg(&fixed_qbft_msg(), &key).unwrap(); + + assert_eq!(hex::encode(&signed.signature), QBFT_MSG_SIGNATURE); + assert!(verify_msg_sig(&signed, &key.public_key()).unwrap()); + } + + #[test] + fn verify_msg_sig_wrong_key_returns_false() { + let key = secret_key(SIGNING_PRIVKEY); + let wrong_key = secret_key(WRONG_PRIVKEY); + let signed = sign_msg(&fixed_qbft_msg(), &key).unwrap(); + + let ok = verify_msg_sig(&signed, &wrong_key.public_key()).unwrap(); + + assert!(!ok); + } + + #[test] + fn verify_msg_sig_errors_on_empty_signature() { + let err = verify_msg_sig(&fixed_qbft_msg(), &secret_key(SIGNING_PRIVKEY).public_key()) + .unwrap_err(); + + assert_eq!(err.to_string(), "empty signature"); + } + + fn timestamp(seconds: i64) -> Timestamp { + Timestamp { seconds, nanos: 0 } + } + + fn any_timestamp(seconds: i64) -> Any { + Any::from_msg(×tamp(seconds)).unwrap() + } + + fn value_map(values: Vec<([u8; 32], Any)>) -> ValueMap { + values.into_iter().collect() + } + + fn sorted_any(values: Vec) -> Vec<(String, Vec)> { + let mut values = values + .into_iter() + .map(|value| (value.type_url, value.value.to_vec())) + .collect::>(); + values.sort(); + values + } + + fn secret_key(hex_key: &str) -> SecretKey { + SecretKey::from_slice(&hex::decode(hex_key).unwrap()).unwrap() + } + + fn fixed_qbft_msg() -> pbconsensus::QbftMsg { + pbconsensus::QbftMsg { + r#type: 1, + duty: Some(pbcore::Duty { + slot: 42, + r#type: 2, + }), + peer_idx: 7, + round: 3, + prepared_round: 2, + value_hash: [0x11u8; 32].to_vec().into(), + prepared_value_hash: [0x22u8; 32].to_vec().into(), + ..Default::default() + } + } +} From 1b1b03b62c80d24902472d1f62047afdc25c0073 Mon Sep 17 00:00:00 2001 From: Quang Le Date: Tue, 26 May 2026 17:42:12 +0700 Subject: [PATCH 03/19] fix: wire function --- crates/core/src/qbft/mod.rs | 34 +++++++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/crates/core/src/qbft/mod.rs b/crates/core/src/qbft/mod.rs index be45377f..55fdef8c 100644 --- a/crates/core/src/qbft/mod.rs +++ b/crates/core/src/qbft/mod.rs @@ -204,16 +204,9 @@ pub const MSG_DECIDED: MessageType = MessageType(5); const MSG_SENTINEL: MessageType = MessageType(6); // intentionally not public impl MessageType { - /// Converts a stable wire integer into a message type. + /// Converts a stable wire integer into a message type without clamping. pub fn from_wire(value: i64) -> Self { - match value { - 1 => MSG_PRE_PREPARE, - 2 => MSG_PREPARE, - 3 => MSG_COMMIT, - 4 => MSG_ROUND_CHANGE, - 5 => MSG_DECIDED, - _ => MSG_UNKNOWN, - } + Self(value) } /// Returns true when the message type is one of the known QBFT wire types. @@ -1393,6 +1386,29 @@ fn uniq_source() -> impl FnMut(&Msg) -> bool { move |msg: &Msg| sources.insert(msg.source()) } +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn message_type_from_wire_preserves_known_types() { + assert_eq!(MessageType::from_wire(0), MSG_UNKNOWN); + assert_eq!(MessageType::from_wire(1), MSG_PRE_PREPARE); + assert_eq!(MessageType::from_wire(2), MSG_PREPARE); + assert_eq!(MessageType::from_wire(3), MSG_COMMIT); + assert_eq!(MessageType::from_wire(4), MSG_ROUND_CHANGE); + assert_eq!(MessageType::from_wire(5), MSG_DECIDED); + } + + #[test] + fn message_type_from_wire_preserves_unknown_wire_value() { + let message_type = MessageType::from_wire(99); + + assert_eq!(message_type, MessageType(99)); + assert!(!message_type.valid()); + } +} + #[cfg(test)] mod fake_clock; #[cfg(test)] From 2d7b362d9978c8f5147f64340c9e28a8a3e9525b Mon Sep 17 00:00:00 2001 From: Quang Le Date: Tue, 26 May 2026 17:49:58 +0700 Subject: [PATCH 04/19] fix: rename function --- crates/core/src/consensus/qbft/msg.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/crates/core/src/consensus/qbft/msg.rs b/crates/core/src/consensus/qbft/msg.rs index 4685c7a9..66aa8272 100644 --- a/crates/core/src/consensus/qbft/msg.rs +++ b/crates/core/src/consensus/qbft/msg.rs @@ -212,7 +212,7 @@ impl SomeMsg for Msg { } fn instance(&self) -> Duty { - duty_from_proto_unchecked(self.msg.duty.as_ref()) + duty_from_proto(self.msg.duty.as_ref()) } fn source(&self) -> i64 { @@ -315,15 +315,16 @@ fn to_hash32(value: &[u8]) -> Option<[u8; 32]> { Some(value) } -fn duty_from_proto_unchecked(duty: Option<&pbcore::Duty>) -> Duty { +fn duty_from_proto(duty: Option<&pbcore::Duty>) -> Duty { let Some(duty) = duty else { return Duty::new(SlotNumber::new(0), DutyType::Unknown); }; // Message receive validation rejects invalid duty types before this adapter - // is used by the consensus runner. This mirrors the unchecked protobuf - // projection used by the source implementation. - let duty_type = DutyType::try_from(duty.r#type).unwrap_or(DutyType::Unknown); + // is used by the consensus runner. If an invalid value reaches this local + // projection, Rust's closed enum maps it to Unknown instead of preserving + // the raw wire value. + let duty_type: DutyType = DutyType::try_from(duty.r#type).unwrap_or(DutyType::Unknown); Duty::new(SlotNumber::new(duty.slot), duty_type) } From eeacd5e482b269a5ff843828391d00f180ae349f Mon Sep 17 00:00:00 2001 From: Quang Le Date: Tue, 26 May 2026 17:58:33 +0700 Subject: [PATCH 05/19] fix: address comments --- crates/core/src/consensus/qbft/msg.rs | 28 +++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/crates/core/src/consensus/qbft/msg.rs b/crates/core/src/consensus/qbft/msg.rs index 66aa8272..a454596c 100644 --- a/crates/core/src/consensus/qbft/msg.rs +++ b/crates/core/src/consensus/qbft/msg.rs @@ -24,7 +24,6 @@ use std::{any, collections::HashMap, fmt, sync}; use k256::{PublicKey, SecretKey}; use pluto_ssz::{HashWalker, Hasher, HasherError}; -use prost::Name; use prost_types::Any; use crate::{ @@ -117,7 +116,7 @@ pub struct Msg { impl fmt::Debug for Msg { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Msg") - .field("type", &self.msg.r#type) + .field("type", &MessageType::from_wire(self.msg.r#type).to_string()) .field( "duty", &self.msg.duty.as_ref().map(|duty| (duty.slot, duty.r#type)), @@ -257,7 +256,7 @@ pub fn hash_proto(msg: &M) -> Result<[u8; 32]> where M: prost::Message + prost::Name, { - if M::full_name() == Any::full_name() { + if M::PACKAGE == "google.protobuf" && M::NAME == "Any" { return Err(Error::CannotHashAnyProto); } @@ -392,7 +391,7 @@ mod tests { } #[test] - fn hash_proto_map_encoding_is_order_independent() { + fn hash_proto_uses_btree_map_for_deterministic_encoding() { let mut forward = std::collections::BTreeMap::new(); forward.insert("a".to_string(), Bytes::from_static(b"first")); forward.insert("b".to_string(), Bytes::from_static(b"second")); @@ -635,6 +634,16 @@ mod tests { assert!(verify_msg_sig(&signed, &key.public_key()).unwrap()); } + #[test] + fn sign_msg_resigns_already_signed_message() { + let key = secret_key(SIGNING_PRIVKEY); + let signed = sign_msg(&fixed_qbft_msg(), &key).unwrap(); + + let resigned = sign_msg(&signed, &key).unwrap(); + + assert_eq!(resigned, signed); + } + #[test] fn verify_msg_sig_wrong_key_returns_false() { let key = secret_key(SIGNING_PRIVKEY); @@ -646,6 +655,17 @@ mod tests { assert!(!ok); } + #[test] + fn verify_msg_sig_tampered_message_returns_false() { + let key = secret_key(SIGNING_PRIVKEY); + let mut signed = sign_msg(&fixed_qbft_msg(), &key).unwrap(); + signed.round += 1; + + let ok = verify_msg_sig(&signed, &key.public_key()).unwrap(); + + assert!(!ok); + } + #[test] fn verify_msg_sig_errors_on_empty_signature() { let err = verify_msg_sig(&fixed_qbft_msg(), &secret_key(SIGNING_PRIVKEY).public_key()) From 4c5be6a085a20bdf97622a70568401a52e87e7e8 Mon Sep 17 00:00:00 2001 From: Quang Le Date: Wed, 27 May 2026 09:47:28 +0700 Subject: [PATCH 06/19] fix: display for unknown tpe --- crates/core/src/consensus/qbft/msg.rs | 17 +++++++++++++++++ crates/core/src/qbft/mod.rs | 14 ++++++++++---- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/crates/core/src/consensus/qbft/msg.rs b/crates/core/src/consensus/qbft/msg.rs index a454596c..03d7db21 100644 --- a/crates/core/src/consensus/qbft/msg.rs +++ b/crates/core/src/consensus/qbft/msg.rs @@ -429,6 +429,23 @@ mod tests { assert_eq!(err.to_string(), "nil qbft message"); } + #[test] + fn debug_unknown_message_type() { + let msg = Msg::new( + Some(pbconsensus::QbftMsg { + r#type: 99, + ..Default::default() + }), + vec![], + sync::Arc::default(), + ) + .unwrap(); + + let debug = format!("{msg:?}"); + + assert!(debug.contains("type: \"\"")); + } + #[test] fn new_maps_valid_value_and_prepared_hashes() { let value_hash = hash_proto(×tamp(1)).unwrap(); diff --git a/crates/core/src/qbft/mod.rs b/crates/core/src/qbft/mod.rs index 55fdef8c..00b876ad 100644 --- a/crates/core/src/qbft/mod.rs +++ b/crates/core/src/qbft/mod.rs @@ -225,9 +225,9 @@ impl Display for MessageType { 3 => "commit", 4 => "round_change", 5 => "decided", - _ => panic!("bug: invalid message type"), + _ => "", }; - write!(f, "{}", s) + write!(f, "{s}") } } @@ -295,9 +295,9 @@ impl Display for UponRule { 6 => "quorum_round_changes", 7 => "justified_decided", 8 => "round_timeout", - _ => panic!("bug: invalid upon rule"), + _ => "", }; - write!(f, "{}", s) + write!(f, "{s}") } } @@ -1406,6 +1406,12 @@ mod tests { assert_eq!(message_type, MessageType(99)); assert!(!message_type.valid()); + assert_eq!(message_type.to_string(), ""); + } + + #[test] + fn upon_rule_display_unknown_value_does_not_panic() { + assert_eq!(UponRule(99).to_string(), ""); } } From 5bf0e111175b23b46737b1297b948fdc343b4859 Mon Sep 17 00:00:00 2001 From: Quang Le Date: Wed, 27 May 2026 10:25:14 +0700 Subject: [PATCH 07/19] fix: address comments --- crates/core/src/consensus/qbft/msg.rs | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/crates/core/src/consensus/qbft/msg.rs b/crates/core/src/consensus/qbft/msg.rs index 03d7db21..42181158 100644 --- a/crates/core/src/consensus/qbft/msg.rs +++ b/crates/core/src/consensus/qbft/msg.rs @@ -19,6 +19,10 @@ //! //! Do not hash `Any` directly. The consensus hash is over the deterministic //! protobuf bytes of the inner message. +//! +//! Inbound callers validate message type, duty type, peer membership, rounds, +//! and signatures before constructing [`Msg`]. This adapter preserves raw +//! message types, while invalid duty wire values project to [`DutyType::Unknown`]. use std::{any, collections::HashMap, fmt, sync}; @@ -79,11 +83,11 @@ pub enum Error { /// Protobuf marshal failed. #[error("marshal proto: {0}")] - MarshalProto(prost::EncodeError), + MarshalProto(#[source] prost::EncodeError), /// SSZ hash failed. #[error("hash proto: {0}")] - HashProto(HasherError), + HashProto(#[source] HasherError), /// QBFT message signature was empty. #[error("empty signature")] @@ -91,11 +95,11 @@ pub enum Error { /// Public key recovery failed. #[error("recover pubkey: {0}")] - RecoverPubkey(pluto_k1util::K1UtilError), + RecoverPubkey(#[source] pluto_k1util::K1UtilError), /// Signing failed. #[error("sign: {0}")] - Sign(pluto_k1util::K1UtilError), + Sign(#[source] pluto_k1util::K1UtilError), } /// Wrapped consensus message consumed by the generic QBFT core. @@ -691,6 +695,18 @@ mod tests { assert_eq!(err.to_string(), "empty signature"); } + #[test] + fn verify_msg_sig_errors_on_malformed_signature() { + let key = secret_key(SIGNING_PRIVKEY); + let mut msg = fixed_qbft_msg(); + msg.signature = vec![0x42u8; 64].into(); + + let err = verify_msg_sig(&msg, &key.public_key()).unwrap_err(); + + assert!(matches!(err, Error::RecoverPubkey(_))); + assert!(std::error::Error::source(&err).is_some()); + } + fn timestamp(seconds: i64) -> Timestamp { Timestamp { seconds, nanos: 0 } } From cf52760b2b741983a19c7a9c6a443f7fb8d01de8 Mon Sep 17 00:00:00 2001 From: Quang Le Date: Wed, 27 May 2026 13:48:57 +0700 Subject: [PATCH 08/19] fix: machete --- Cargo.lock | 1 - crates/core/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 79ca1255..fe0c9cfd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5589,7 +5589,6 @@ version = "1.7.1" dependencies = [ "alloy", "anyhow", - "async-trait", "base64", "built", "cancellation", diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 9833b8d7..2aa595a4 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -7,7 +7,6 @@ license.workspace = true publish.workspace = true [dependencies] -async-trait.workspace = true cancellation.workspace = true chrono.workspace = true crossbeam.workspace = true From d6b7fe6f7a12527fa8d5bbf98fc824f548e79362 Mon Sep 17 00:00:00 2001 From: Quang Le Date: Wed, 27 May 2026 13:53:02 +0700 Subject: [PATCH 09/19] fix: lint --- crates/core/src/consensus/mod.rs | 4 ++-- crates/core/src/consensus/qbft/msg.rs | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/crates/core/src/consensus/mod.rs b/crates/core/src/consensus/mod.rs index da501fb1..75b5a06e 100644 --- a/crates/core/src/consensus/mod.rs +++ b/crates/core/src/consensus/mod.rs @@ -7,10 +7,10 @@ /// Consensus protocols. pub mod protocols; -/// QBFT consensus wrapper. -pub mod qbft; /// Consensus instance I/O channels. pub mod instance; +/// QBFT consensus wrapper. +pub mod qbft; /// Consensus round timers. pub mod timer; diff --git a/crates/core/src/consensus/qbft/msg.rs b/crates/core/src/consensus/qbft/msg.rs index 42181158..7b168276 100644 --- a/crates/core/src/consensus/qbft/msg.rs +++ b/crates/core/src/consensus/qbft/msg.rs @@ -22,7 +22,8 @@ //! //! Inbound callers validate message type, duty type, peer membership, rounds, //! and signatures before constructing [`Msg`]. This adapter preserves raw -//! message types, while invalid duty wire values project to [`DutyType::Unknown`]. +//! message types, while invalid duty wire values project to +//! [`DutyType::Unknown`]. use std::{any, collections::HashMap, fmt, sync}; From 099bb28bb4458179b7858cf3a49a7ef9c4d4a7e5 Mon Sep 17 00:00:00 2001 From: Quang Le Date: Wed, 27 May 2026 14:23:53 +0700 Subject: [PATCH 10/19] fix: comments --- crates/core/src/consensus/qbft/msg.rs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/crates/core/src/consensus/qbft/msg.rs b/crates/core/src/consensus/qbft/msg.rs index 7b168276..f2339a9a 100644 --- a/crates/core/src/consensus/qbft/msg.rs +++ b/crates/core/src/consensus/qbft/msg.rs @@ -25,6 +25,9 @@ //! message types, while invalid duty wire values project to //! [`DutyType::Unknown`]. +// TODO: Remove once component/transport wiring uses the crate-visible helpers. +#![allow(dead_code)] + use std::{any, collections::HashMap, fmt, sync}; use k256::{PublicKey, SecretKey}; @@ -146,7 +149,7 @@ impl Msg { /// /// Justifications are raw protobuf messages from the same consensus /// envelope. They are recursively wrapped with the same shared value map. - pub fn new( + pub(crate) fn new( msg: Option, justification: Vec, values: sync::Arc, @@ -232,7 +235,10 @@ impl SomeMsg for Msg { } fn value_source(&self) -> std::result::Result { - Msg::value_source(self).map_err(|_| qbft::QbftError::ValueNotFound) + self.values + .get(&self.value_hash) + .cloned() + .ok_or(qbft::QbftError::ValueNotFound) } fn prepared_round(&self) -> i64 { @@ -257,7 +263,7 @@ impl SomeMsg for Msg { /// The hash input is deterministic protobuf encoding, then SSZ `PutBytes` /// merkleization. `Any` is rejected because the consensus value hash must bind /// to the inner message bytes, not the transport envelope. -pub fn hash_proto(msg: &M) -> Result<[u8; 32]> +pub(crate) fn hash_proto(msg: &M) -> Result<[u8; 32]> where M: prost::Message + prost::Name, { @@ -279,7 +285,10 @@ where /// /// The signature field is cleared before hashing, so callers may pass either an /// unsigned message or an already-signed message to re-sign. -pub fn sign_msg(msg: &pbconsensus::QbftMsg, privkey: &SecretKey) -> Result { +pub(crate) fn sign_msg( + msg: &pbconsensus::QbftMsg, + privkey: &SecretKey, +) -> Result { let mut clone = msg.clone(); clone.signature.clear(); @@ -294,7 +303,7 @@ pub fn sign_msg(msg: &pbconsensus::QbftMsg, privkey: &SecretKey) -> Result Result { +pub(crate) fn verify_msg_sig(msg: &pbconsensus::QbftMsg, pubkey: &PublicKey) -> Result { // Protobuf `bytes` fields decode both absent and explicit-empty values as // empty bytes in prost. if msg.signature.is_empty() { From 7da6acd18cc84e53b4f4ce7b93c80ecd9f0e79c9 Mon Sep 17 00:00:00 2001 From: Quang Le Date: Thu, 28 May 2026 12:16:46 +0700 Subject: [PATCH 11/19] feat(core): implement consensus/qbft/sniffer --- crates/core/src/consensus/qbft/sniffer.rs | 105 ++++++++++++++++++++++ 1 file changed, 105 insertions(+) create mode 100644 crates/core/src/consensus/qbft/sniffer.rs diff --git a/crates/core/src/consensus/qbft/sniffer.rs b/crates/core/src/consensus/qbft/sniffer.rs new file mode 100644 index 00000000..d3b6f13e --- /dev/null +++ b/crates/core/src/consensus/qbft/sniffer.rs @@ -0,0 +1,105 @@ +//! QBFT consensus message sniffer. + +// TODO: Remove once the consensus component exports sniffer lifecycle hooks. +#![allow(dead_code)] + +use std::{ + sync::{Mutex, PoisonError}, + time::SystemTime, +}; + +use prost_types::Timestamp; + +use crate::{ + consensus::protocols::QBFT_V2_PROTOCOL_ID, + corepb::v1::consensus::{QbftConsensusMsg, SniffedConsensusInstance, SniffedConsensusMsg}, +}; + +/// Buffers consensus messages for the debug API. +#[derive(Debug)] +pub(crate) struct Sniffer { + nodes: i64, + peer_idx: i64, + started_at: SystemTime, + msgs: Mutex>, +} + +impl Sniffer { + /// Returns a new QBFT consensus sniffer. + pub(crate) fn new(nodes: i64, peer_idx: i64) -> Self { + Self { + nodes, + peer_idx, + started_at: SystemTime::now(), + msgs: Mutex::default(), + } + } + + /// Adds a message to the sniffer buffer. + pub(crate) fn add(&self, msg: QbftConsensusMsg) { + self.msgs + .lock() + .unwrap_or_else(PoisonError::into_inner) + .push(SniffedConsensusMsg { + timestamp: Some(Timestamp::from(SystemTime::now())), + msg: Some(msg), + }); + } + + /// Returns the buffered messages as a sniffed consensus instance. + pub(crate) fn instance(&self) -> SniffedConsensusInstance { + SniffedConsensusInstance { + nodes: self.nodes, + peer_idx: self.peer_idx, + started_at: Some(Timestamp::from(self.started_at)), + msgs: self + .msgs + .lock() + .unwrap_or_else(PoisonError::into_inner) + .clone(), + protocol_id: QBFT_V2_PROTOCOL_ID.to_string(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::corepb::v1::consensus::QbftMsg; + + #[test] + fn sniffer_add_records_messages() { + let sniffer = Sniffer::new(4, 2); + let msg = consensus_msg(7); + + sniffer.add(msg.clone()); + + let instance = sniffer.instance(); + assert_eq!(instance.msgs.len(), 1); + assert_eq!(instance.msgs[0].msg, Some(msg)); + assert!(instance.msgs[0].timestamp.is_some()); + } + + #[test] + fn sniffer_instance_maps_fields() { + let sniffer = Sniffer::new(4, 3); + + let instance = sniffer.instance(); + + assert_eq!(instance.nodes, 4); + assert_eq!(instance.peer_idx, 3); + assert!(instance.started_at.is_some()); + assert!(instance.msgs.is_empty()); + assert_eq!(instance.protocol_id, QBFT_V2_PROTOCOL_ID); + } + + fn consensus_msg(round: i64) -> QbftConsensusMsg { + QbftConsensusMsg { + msg: Some(QbftMsg { + round, + ..Default::default() + }), + ..Default::default() + } + } +} From 237814336135e620aa518d79edb2c6c8cac2d0d1 Mon Sep 17 00:00:00 2001 From: Quang Le Date: Thu, 28 May 2026 14:06:23 +0700 Subject: [PATCH 12/19] feat(core): implement consensus/qbft/transport --- Cargo.lock | 1 + crates/core/Cargo.toml | 1 + crates/core/src/consensus/qbft/mod.rs | 3 + crates/core/src/consensus/qbft/msg.rs | 7 +- crates/core/src/consensus/qbft/transport.rs | 731 ++++++++++++++++++++ crates/core/src/qbft/mod.rs | 5 + 6 files changed, 747 insertions(+), 1 deletion(-) create mode 100644 crates/core/src/consensus/qbft/transport.rs diff --git a/Cargo.lock b/Cargo.lock index fe0c9cfd..b79189f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5598,6 +5598,7 @@ dependencies = [ "dyn-clone", "dyn-eq", "ethereum_ssz", + "futures", "hex", "k256", "libp2p", diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 2aa595a4..5c1fbf7d 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -12,6 +12,7 @@ chrono.workspace = true crossbeam.workspace = true dyn-clone.workspace = true dyn-eq.workspace = true +futures.workspace = true hex.workspace = true k256.workspace = true libp2p.workspace = true diff --git a/crates/core/src/consensus/qbft/mod.rs b/crates/core/src/consensus/qbft/mod.rs index a18db6be..723197f8 100644 --- a/crates/core/src/consensus/qbft/mod.rs +++ b/crates/core/src/consensus/qbft/mod.rs @@ -2,3 +2,6 @@ /// QBFT protobuf message wrapper. pub mod msg; + +pub(crate) mod sniffer; +pub(crate) mod transport; diff --git a/crates/core/src/consensus/qbft/msg.rs b/crates/core/src/consensus/qbft/msg.rs index f2339a9a..7a510726 100644 --- a/crates/core/src/consensus/qbft/msg.rs +++ b/crates/core/src/consensus/qbft/msg.rs @@ -274,9 +274,14 @@ where let mut encoded = Vec::with_capacity(msg.encoded_len()); msg.encode(&mut encoded).map_err(Error::MarshalProto)?; + hash_proto_bytes(&encoded) +} + +/// Returns the consensus hash for deterministic protobuf bytes. +pub(crate) fn hash_proto_bytes(encoded: &[u8]) -> Result<[u8; 32]> { let mut hasher = Hasher::default(); let index = hasher.index(); - hasher.put_bytes(&encoded).map_err(Error::HashProto)?; + hasher.put_bytes(encoded).map_err(Error::HashProto)?; hasher.merkleize(index).map_err(Error::HashProto)?; hasher.hash_root().map_err(Error::HashProto) } diff --git a/crates/core/src/consensus/qbft/transport.rs b/crates/core/src/consensus/qbft/transport.rs new file mode 100644 index 00000000..5ee4fdb8 --- /dev/null +++ b/crates/core/src/consensus/qbft/transport.rs @@ -0,0 +1,731 @@ +//! QBFT consensus transport adapter. + +// TODO: Remove once the consensus runner wires this transport. +#![allow(dead_code)] + +use std::sync::{self, Mutex, PoisonError}; + +use futures::future::BoxFuture; +use k256::SecretKey; +use prost_types::Any; +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; + +use crate::{ + corepb::v1::{consensus as pbconsensus, core as pbcore}, + qbft::{self, SomeMsg}, + types::{Duty, DutyTypeError}, +}; + +use super::{ + msg::{self, ConsensusQbftTypes, ValueMap}, + sniffer::Sniffer, +}; + +/// Transport result. +pub(crate) type Result = std::result::Result; + +/// External consensus-message broadcaster seam. +pub(crate) type Broadcaster = Box< + dyn Fn(CancellationToken, pbconsensus::QbftConsensusMsg) -> BoxFuture<'static, Result<()>> + + Send + + Sync, +>; + +/// Parameters for broadcasting a QBFT message. +pub(crate) struct BroadcastRequest { + pub(crate) ct: CancellationToken, + pub(crate) type_: qbft::MessageType, + pub(crate) duty: Duty, + pub(crate) peer_idx: i64, + pub(crate) round: i64, + pub(crate) value_hash: [u8; 32], + pub(crate) prepared_round: i64, + pub(crate) prepared_value_hash: [u8; 32], + pub(crate) justification: Vec>, +} + +/// Errors returned by the QBFT consensus transport. +#[derive(Debug, thiserror::Error)] +pub(crate) enum Error { + /// Hash was not available in the value cache. + #[error("unknown value")] + UnknownValue, + + /// Broadcast justification was not the consensus QBFT message wrapper. + #[error("invalid justification message")] + InvalidJustificationMessage, + + /// Message creation justification was not the consensus QBFT message + /// wrapper. + #[error("invalid justification")] + InvalidJustification, + + /// Duty conversion failed. + #[error("invalid duty")] + InvalidDuty(#[source] DutyTypeError), + + /// Inner receive buffer was closed. + #[error("receive buffer closed")] + ReceiveBufferClosed, + + /// Consensus message wrapping/signing failed. + #[error("{0}")] + Msg(#[from] msg::Error), +} + +/// Transport adapter for one QBFT consensus instance. +pub(crate) struct Transport { + broadcaster: Broadcaster, + privkey: SecretKey, + recv_tx: mpsc::Sender>, + sniffer: sync::Arc, + value_rx: Mutex>, + values: Mutex, +} + +impl Transport { + /// Creates a new QBFT consensus transport. + pub(crate) fn new( + broadcaster: Broadcaster, + privkey: SecretKey, + value_rx: mpsc::Receiver, + recv_tx: mpsc::Sender>, + sniffer: Sniffer, + ) -> Self { + Self { + broadcaster, + privkey, + recv_tx, + sniffer: sync::Arc::new(sniffer), + value_rx: Mutex::new(value_rx), + values: Mutex::default(), + } + } + + /// Caches values carried by a consensus message. + pub(crate) fn set_values(&self, msg: &msg::Msg) { + self.values + .lock() + .unwrap_or_else(PoisonError::into_inner) + .extend( + msg.values() + .iter() + .map(|(hash, value)| (*hash, value.clone())), + ); + } + + /// Returns a cached value by hash, after draining at most one local value. + pub(crate) fn get_value(&self, hash: [u8; 32]) -> Result { + let local_value = self + .value_rx + .lock() + .unwrap_or_else(PoisonError::into_inner) + .try_recv(); + + let mut values = self.values.lock().unwrap_or_else(PoisonError::into_inner); + if let Ok(local) = local_value { + let hash = msg::hash_proto_bytes(&local.value)?; + values.insert(hash, local); + } + + values.get(&hash).cloned().ok_or(Error::UnknownValue) + } + + /// Creates, self-enqueues, sniffs, and externally broadcasts a QBFT + /// message. + pub(crate) async fn broadcast(&self, request: BroadcastRequest) -> Result<()> { + let BroadcastRequest { + ct, + type_, + duty, + peer_idx, + round, + value_hash, + prepared_round, + prepared_value_hash, + justification, + } = request; + + let mut hashes = vec![value_hash, prepared_value_hash]; + + for justification_msg in &justification { + let msg = justification_msg + .as_any() + .downcast_ref::() + .ok_or(Error::InvalidJustificationMessage)?; + hashes.push(msg.value()); + hashes.push(msg.prepared_value()); + } + + let mut values = ValueMap::new(); + for hash in hashes { + if hash == [0u8; 32] || values.contains_key(&hash) { + continue; + } + + values.insert(hash, self.get_value(hash)?); + } + + let msg = create_msg(CreateMsgRequest { + type_, + duty: &duty, + peer_idx, + round, + value_hash, + prepared_round, + prepared_value_hash, + values, + justification: &justification, + privkey: &self.privkey, + })?; + let msg = sync::Arc::new(msg); + let consensus_msg = msg.to_consensus_msg(); + let inner_msg: qbft::Msg = msg.clone(); + + let task_ct = ct.clone(); + let recv_tx = self.recv_tx.clone(); + let sniffer = sync::Arc::clone(&self.sniffer); + let sniffed_msg = sync::Arc::clone(&msg); + // Self-send is intentionally detached: the inner receive buffer can + // block, but network broadcast must still proceed. + tokio::spawn(async move { + tokio::select! { + () = task_ct.cancelled() => {} + result = recv_tx.send(inner_msg) => { + if result.is_ok() { + sniffer.add(sniffed_msg.to_consensus_msg()); + } + } + } + }); + + (self.broadcaster)(ct, consensus_msg).await + } + + /// Processes admitted outer messages until cancellation or channel close. + pub(crate) async fn process_receives( + &self, + ct: CancellationToken, + mut outer_rx: mpsc::Receiver, + ) -> Result<()> { + loop { + let msg = tokio::select! { + () = ct.cancelled() => return Ok(()), + msg = outer_rx.recv() => match msg { + Some(msg) => msg, + None => return Ok(()), + }, + }; + + self.set_values(&msg); + let consensus_msg = msg.to_consensus_msg(); + let inner_msg: qbft::Msg = sync::Arc::new(msg); + + tokio::select! { + () = ct.cancelled() => return Ok(()), + result = self.recv_tx.send(inner_msg) => { + result.map_err(|_| Error::ReceiveBufferClosed)?; + self.sniffer.add(consensus_msg); + } + } + } + } + + /// Returns the current sniffed consensus instance. + pub(crate) fn sniffer_instance(&self) -> pbconsensus::SniffedConsensusInstance { + self.sniffer.instance() + } +} + +struct CreateMsgRequest<'a> { + type_: qbft::MessageType, + duty: &'a Duty, + peer_idx: i64, + round: i64, + value_hash: [u8; 32], + prepared_round: i64, + prepared_value_hash: [u8; 32], + values: ValueMap, + justification: &'a [qbft::Msg], + privkey: &'a SecretKey, +} + +/// Creates a signed consensus QBFT message wrapper. +fn create_msg(request: CreateMsgRequest<'_>) -> Result { + let CreateMsgRequest { + type_, + duty, + peer_idx, + round, + value_hash, + prepared_round, + prepared_value_hash, + values, + justification, + privkey, + } = request; + + let pb_msg = pbconsensus::QbftMsg { + r#type: type_.as_i64(), + duty: Some(pbcore::Duty::try_from(duty).map_err(Error::InvalidDuty)?), + peer_idx, + round, + value_hash: value_hash.to_vec().into(), + prepared_round, + prepared_value_hash: prepared_value_hash.to_vec().into(), + ..Default::default() + }; + let pb_msg = msg::sign_msg(&pb_msg, privkey)?; + + let mut justifications = Vec::with_capacity(justification.len()); + for msg in justification { + let msg = msg + .as_any() + .downcast_ref::() + .ok_or(Error::InvalidJustification)?; + justifications.push(msg.msg().clone()); + } + + Ok(msg::Msg::new( + Some(pb_msg), + justifications, + sync::Arc::new(values), + )?) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + consensus::qbft::{msg::hash_proto, sniffer::Sniffer}, + corepb::v1::consensus::QbftMsg, + qbft::SomeMsg, + types::{DutyType, SlotNumber}, + }; + use prost_types::Timestamp; + use tokio::time::{Duration, timeout}; + + const SIGNING_PRIVKEY: &str = + "41d3ff12045b73c870529fe44f70dca2745bafbe1698ffc3c8759eef3cfbaee1"; + + #[test] + fn set_values_caches_message_values() { + let transport = test_transport(empty_value_rx()).0; + let value_hash = value_hash(1); + let msg = wrapped_msg(qbft::MSG_PRE_PREPARE, value_hash, [0u8; 32], vec![]); + + transport.set_values(&msg); + + assert_eq!(transport.get_value(value_hash).unwrap(), any_timestamp(1)); + } + + #[test] + fn get_value_returns_cached_value() { + let transport = test_transport(empty_value_rx()).0; + let value_hash = value_hash(1); + let msg = wrapped_msg(qbft::MSG_PRE_PREPARE, value_hash, [0u8; 32], vec![]); + transport.set_values(&msg); + + assert_eq!(transport.get_value(value_hash).unwrap(), any_timestamp(1)); + } + + #[test] + fn get_value_drains_local_value() { + let (value_tx, value_rx) = mpsc::channel(1); + let value_hash = value_hash(1); + value_tx.try_send(any_timestamp(1)).unwrap(); + let transport = test_transport(value_rx).0; + + assert_eq!(transport.get_value(value_hash).unwrap(), any_timestamp(1)); + } + + #[test] + fn get_value_unknown_value_errors() { + let transport = test_transport(empty_value_rx()).0; + + let err = transport.get_value([9u8; 32]).unwrap_err(); + + assert_eq!(err.to_string(), "unknown value"); + } + + #[test] + fn create_msg_signs_and_wraps() { + let key = secret_key(); + let duty = duty(); + let value_hash = value_hash(1); + let mut request = create_msg_request(&duty, &key); + request.peer_idx = 2; + request.round = 3; + request.value_hash = value_hash; + request.values = value_map(vec![(value_hash, any_timestamp(1))]); + + let msg = create_msg(request).unwrap(); + + assert_eq!(msg.msg().r#type, 1); + assert_eq!(msg.msg().peer_idx, 2); + assert_eq!(msg.msg().round, 3); + assert_eq!(msg.value(), value_hash); + assert!(!msg.msg().signature.is_empty()); + assert!(msg::verify_msg_sig(msg.msg(), &key.public_key()).unwrap()); + } + + #[test] + fn create_msg_preserves_unknown_message_type() { + let key = secret_key(); + let duty = duty(); + let mut request = create_msg_request(&duty, &key); + request.type_ = qbft::MessageType::from_wire(99); + request.peer_idx = 2; + request.round = 3; + + let msg = create_msg(request).unwrap(); + + assert_eq!(msg.msg().r#type, 99); + } + + #[test] + fn create_msg_uses_raw_justifications_only() { + let key = secret_key(); + let duty = duty(); + let nested = QbftMsg { + r#type: 3, + round: 9, + ..Default::default() + }; + let raw_justification = QbftMsg { + r#type: 2, + round: 4, + ..Default::default() + }; + let justification = msg::Msg::new( + Some(raw_justification.clone()), + vec![nested], + sync::Arc::default(), + ) + .unwrap(); + let justification: qbft::Msg = sync::Arc::new(justification); + let justifications = [justification]; + let mut request = create_msg_request(&duty, &key); + request.type_ = qbft::MSG_ROUND_CHANGE; + request.peer_idx = 2; + request.round = 5; + request.justification = &justifications; + + let msg = create_msg(request).unwrap(); + + assert_eq!( + msg.to_consensus_msg().justification, + vec![raw_justification] + ); + } + + #[test] + fn create_msg_rejects_invalid_justification_type() { + let key = secret_key(); + let duty = duty(); + let justification: qbft::Msg = sync::Arc::new(OtherMsg); + let justifications = [justification]; + let mut request = create_msg_request(&duty, &key); + request.justification = &justifications; + + let err = create_msg(request).unwrap_err(); + + assert_eq!(err.to_string(), "invalid justification"); + } + + #[tokio::test] + async fn broadcast_resolves_hashes_and_calls_broadcaster() { + let value_hash = value_hash(1); + let (transport, _recv_rx, sent) = + test_transport(local_value_rx(value_hash, any_timestamp(1))); + + transport + .broadcast(broadcast_request(value_hash)) + .await + .unwrap(); + + let sent = sent.lock().unwrap(); + assert_eq!(sent.len(), 1); + assert_eq!( + sent[0].msg.as_ref().unwrap().value_hash, + value_hash.to_vec() + ); + assert_eq!(sent[0].values, vec![any_timestamp(1)]); + } + + #[tokio::test] + async fn broadcast_skips_zero_and_duplicate_hashes() { + let value_hash = value_hash(1); + let justification = wrapped_msg(qbft::MSG_PREPARE, value_hash, [0u8; 32], vec![]); + let justification: qbft::Msg = sync::Arc::new(justification); + let (transport, _recv_rx, sent) = + test_transport(local_value_rx(value_hash, any_timestamp(1))); + let mut request = broadcast_request(value_hash); + request.type_ = qbft::MSG_ROUND_CHANGE; + request.justification = vec![justification]; + + transport.broadcast(request).await.unwrap(); + + let sent = sent.lock().unwrap(); + assert_eq!(sent[0].values.len(), 1); + assert_eq!(sent[0].values[0], any_timestamp(1)); + } + + #[tokio::test] + async fn broadcast_self_enqueues_message() { + let value_hash = value_hash(1); + let (transport, mut recv_rx, _sent) = + test_transport(local_value_rx(value_hash, any_timestamp(1))); + + transport + .broadcast(broadcast_request(value_hash)) + .await + .unwrap(); + + let received = timeout(Duration::from_secs(1), recv_rx.recv()) + .await + .unwrap() + .unwrap(); + assert_eq!(received.value(), value_hash); + assert_eq!(transport.sniffer_instance().msgs.len(), 1); + } + + #[tokio::test] + async fn broadcast_unknown_value_errors() { + let (transport, _recv_rx, sent) = test_transport(empty_value_rx()); + + let err = transport + .broadcast(broadcast_request([8u8; 32])) + .await + .unwrap_err(); + + assert_eq!(err.to_string(), "unknown value"); + assert!(sent.lock().unwrap().is_empty()); + } + + #[tokio::test] + async fn broadcast_rejects_invalid_justification_message() { + let justification: qbft::Msg = sync::Arc::new(OtherMsg); + let (transport, _recv_rx, _sent) = test_transport(empty_value_rx()); + let mut request = broadcast_request([0u8; 32]); + request.type_ = qbft::MSG_ROUND_CHANGE; + request.justification = vec![justification]; + + let err = transport.broadcast(request).await.unwrap_err(); + + assert_eq!(err.to_string(), "invalid justification message"); + } + + #[tokio::test] + async fn process_receives_caches_values_and_forwards() { + let value_hash = value_hash(1); + let msg = wrapped_msg(qbft::MSG_PRE_PREPARE, value_hash, [0u8; 32], vec![]); + let (transport, mut recv_rx, _sent) = test_transport(empty_value_rx()); + let transport = sync::Arc::new(transport); + let (outer_tx, outer_rx) = mpsc::channel(1); + let ct = CancellationToken::new(); + let runner = { + let transport = sync::Arc::clone(&transport); + let ct = ct.clone(); + tokio::spawn(async move { transport.process_receives(ct, outer_rx).await }) + }; + + outer_tx.send(msg).await.unwrap(); + + let received = timeout(Duration::from_secs(1), recv_rx.recv()) + .await + .unwrap() + .unwrap(); + assert_eq!(received.value(), value_hash); + assert_eq!(transport.get_value(value_hash).unwrap(), any_timestamp(1)); + assert_eq!(transport.sniffer_instance().msgs.len(), 1); + + ct.cancel(); + runner.await.unwrap().unwrap(); + } + + #[tokio::test] + async fn process_receives_stops_on_cancel() { + let (transport, _recv_rx, _sent) = test_transport(empty_value_rx()); + let (_outer_tx, outer_rx) = mpsc::channel(1); + let ct = CancellationToken::new(); + ct.cancel(); + + transport.process_receives(ct, outer_rx).await.unwrap(); + } + + #[derive(Debug)] + struct OtherMsg; + + impl SomeMsg for OtherMsg { + fn type_(&self) -> qbft::MessageType { + qbft::MSG_PRE_PREPARE + } + + fn instance(&self) -> Duty { + duty() + } + + fn source(&self) -> i64 { + 1 + } + + fn round(&self) -> i64 { + 1 + } + + fn value(&self) -> [u8; 32] { + [0u8; 32] + } + + fn value_source(&self) -> std::result::Result { + Ok(Any::default()) + } + + fn prepared_round(&self) -> i64 { + 0 + } + + fn prepared_value(&self) -> [u8; 32] { + [0u8; 32] + } + + fn justification(&self) -> Vec> { + vec![] + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + } + + type SentMessages = sync::Arc>>; + + fn test_transport( + value_rx: mpsc::Receiver, + ) -> ( + Transport, + mpsc::Receiver>, + SentMessages, + ) { + let (recv_tx, recv_rx) = mpsc::channel(8); + let sent = SentMessages::default(); + let broadcaster = recording_broadcaster(sync::Arc::clone(&sent)); + let transport = Transport::new( + broadcaster, + secret_key(), + value_rx, + recv_tx, + Sniffer::new(4, 1), + ); + + (transport, recv_rx, sent) + } + + fn recording_broadcaster(sent: SentMessages) -> Broadcaster { + Box::new(move |_ct, msg| { + let sent = sync::Arc::clone(&sent); + Box::pin(async move { + sent.lock().unwrap().push(msg); + Ok(()) + }) + }) + } + + fn create_msg_request<'a>(duty: &'a Duty, privkey: &'a SecretKey) -> CreateMsgRequest<'a> { + CreateMsgRequest { + type_: qbft::MSG_PRE_PREPARE, + duty, + peer_idx: 1, + round: 1, + value_hash: [0u8; 32], + prepared_round: 0, + prepared_value_hash: [0u8; 32], + values: ValueMap::new(), + justification: &[], + privkey, + } + } + + fn broadcast_request(value_hash: [u8; 32]) -> BroadcastRequest { + BroadcastRequest { + ct: CancellationToken::new(), + type_: qbft::MSG_PRE_PREPARE, + duty: duty(), + peer_idx: 1, + round: 2, + value_hash, + prepared_round: 0, + prepared_value_hash: [0u8; 32], + justification: vec![], + } + } + + fn empty_value_rx() -> mpsc::Receiver { + let (_tx, rx) = mpsc::channel(1); + rx + } + + fn local_value_rx(hash: [u8; 32], value: Any) -> mpsc::Receiver { + let (tx, rx) = mpsc::channel(1); + assert_eq!(msg::hash_proto_bytes(&value.value).unwrap(), hash); + tx.try_send(value).unwrap(); + rx + } + + fn wrapped_msg( + type_: qbft::MessageType, + value_hash: [u8; 32], + prepared_value_hash: [u8; 32], + justification: Vec, + ) -> msg::Msg { + let mut values = ValueMap::new(); + if value_hash != [0u8; 32] { + values.insert(value_hash, any_timestamp(1)); + } + if prepared_value_hash != [0u8; 32] { + values.insert(prepared_value_hash, any_timestamp(2)); + } + + msg::Msg::new( + Some(QbftMsg { + r#type: type_.as_i64(), + duty: Some(pbcore::Duty::try_from(&duty()).unwrap()), + peer_idx: 1, + round: 2, + value_hash: value_hash.to_vec().into(), + prepared_round: 0, + prepared_value_hash: prepared_value_hash.to_vec().into(), + ..Default::default() + }), + justification, + sync::Arc::new(values), + ) + .unwrap() + } + + fn value_map(values: Vec<([u8; 32], Any)>) -> ValueMap { + values.into_iter().collect() + } + + fn value_hash(seconds: i64) -> [u8; 32] { + hash_proto(×tamp(seconds)).unwrap() + } + + fn timestamp(seconds: i64) -> Timestamp { + Timestamp { seconds, nanos: 0 } + } + + fn any_timestamp(seconds: i64) -> Any { + Any::from_msg(×tamp(seconds)).unwrap() + } + + fn duty() -> Duty { + Duty::new(SlotNumber::new(42), DutyType::Attester) + } + + fn secret_key() -> SecretKey { + SecretKey::from_slice(&hex::decode(SIGNING_PRIVKEY).unwrap()).unwrap() + } +} diff --git a/crates/core/src/qbft/mod.rs b/crates/core/src/qbft/mod.rs index 00b876ad..a3bfa517 100644 --- a/crates/core/src/qbft/mod.rs +++ b/crates/core/src/qbft/mod.rs @@ -209,6 +209,11 @@ impl MessageType { Self(value) } + /// Returns the stable wire integer. + pub(crate) fn as_i64(self) -> i64 { + self.0 + } + /// Returns true when the message type is one of the known QBFT wire types. pub fn valid(&self) -> bool { self.0 > MSG_UNKNOWN.0 && self.0 < MSG_SENTINEL.0 From 68ea84631b37fc169378e550646a69ea835af365 Mon Sep 17 00:00:00 2001 From: Quang Le Date: Thu, 28 May 2026 15:28:59 +0700 Subject: [PATCH 13/19] chore: add comment on bridging mpsc and crossbeam --- crates/core/src/consensus/qbft/transport.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/core/src/consensus/qbft/transport.rs b/crates/core/src/consensus/qbft/transport.rs index 5ee4fdb8..98ca134e 100644 --- a/crates/core/src/consensus/qbft/transport.rs +++ b/crates/core/src/consensus/qbft/transport.rs @@ -78,6 +78,8 @@ pub(crate) enum Error { pub(crate) struct Transport { broadcaster: Broadcaster, privkey: SecretKey, + // Async admission buffer for wrapped QBFT messages. Runner wiring bridges + // this tokio channel into the crossbeam receiver used by core::qbft::run. recv_tx: mpsc::Sender>, sniffer: sync::Arc, value_rx: Mutex>, From 8515fd997459dac4996518a1040b176fdadbc671 Mon Sep 17 00:00:00 2001 From: Quang Le Date: Thu, 28 May 2026 15:32:48 +0700 Subject: [PATCH 14/19] fix: single mutex for transport state --- crates/core/src/consensus/qbft/transport.rs | 29 +++++++++++---------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/crates/core/src/consensus/qbft/transport.rs b/crates/core/src/consensus/qbft/transport.rs index 98ca134e..9ed8fea3 100644 --- a/crates/core/src/consensus/qbft/transport.rs +++ b/crates/core/src/consensus/qbft/transport.rs @@ -82,8 +82,12 @@ pub(crate) struct Transport { // this tokio channel into the crossbeam receiver used by core::qbft::run. recv_tx: mpsc::Sender>, sniffer: sync::Arc, - value_rx: Mutex>, - values: Mutex, + values: Mutex, +} + +struct ValueStore { + value_rx: mpsc::Receiver, + values: ValueMap, } impl Transport { @@ -100,8 +104,10 @@ impl Transport { privkey, recv_tx, sniffer: sync::Arc::new(sniffer), - value_rx: Mutex::new(value_rx), - values: Mutex::default(), + values: Mutex::new(ValueStore { + value_rx, + values: ValueMap::new(), + }), } } @@ -110,6 +116,7 @@ impl Transport { self.values .lock() .unwrap_or_else(PoisonError::into_inner) + .values .extend( msg.values() .iter() @@ -119,19 +126,13 @@ impl Transport { /// Returns a cached value by hash, after draining at most one local value. pub(crate) fn get_value(&self, hash: [u8; 32]) -> Result { - let local_value = self - .value_rx - .lock() - .unwrap_or_else(PoisonError::into_inner) - .try_recv(); - - let mut values = self.values.lock().unwrap_or_else(PoisonError::into_inner); - if let Ok(local) = local_value { + let mut store = self.values.lock().unwrap_or_else(PoisonError::into_inner); + if let Ok(local) = store.value_rx.try_recv() { let hash = msg::hash_proto_bytes(&local.value)?; - values.insert(hash, local); + store.values.insert(hash, local); } - values.get(&hash).cloned().ok_or(Error::UnknownValue) + store.values.get(&hash).cloned().ok_or(Error::UnknownValue) } /// Creates, self-enqueues, sniffs, and externally broadcasts a QBFT From 08b0edf2718744522e6094bf858bcb1c024181a9 Mon Sep 17 00:00:00 2001 From: Quang Le Date: Thu, 28 May 2026 15:34:46 +0700 Subject: [PATCH 15/19] fix: double clone consensus msg --- crates/core/src/consensus/qbft/transport.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/core/src/consensus/qbft/transport.rs b/crates/core/src/consensus/qbft/transport.rs index 9ed8fea3..8f326c37 100644 --- a/crates/core/src/consensus/qbft/transport.rs +++ b/crates/core/src/consensus/qbft/transport.rs @@ -189,7 +189,7 @@ impl Transport { let task_ct = ct.clone(); let recv_tx = self.recv_tx.clone(); let sniffer = sync::Arc::clone(&self.sniffer); - let sniffed_msg = sync::Arc::clone(&msg); + let sniffed_msg = consensus_msg.clone(); // Self-send is intentionally detached: the inner receive buffer can // block, but network broadcast must still proceed. tokio::spawn(async move { @@ -197,7 +197,7 @@ impl Transport { () = task_ct.cancelled() => {} result = recv_tx.send(inner_msg) => { if result.is_ok() { - sniffer.add(sniffed_msg.to_consensus_msg()); + sniffer.add(sniffed_msg); } } } From ff3f933594fba2aabe288349069254a98528129c Mon Sep 17 00:00:00 2001 From: Quang Le Date: Thu, 28 May 2026 15:39:36 +0700 Subject: [PATCH 16/19] test: improve tests wait --- crates/core/src/consensus/qbft/transport.rs | 30 ++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/crates/core/src/consensus/qbft/transport.rs b/crates/core/src/consensus/qbft/transport.rs index 8f326c37..c1fbddc8 100644 --- a/crates/core/src/consensus/qbft/transport.rs +++ b/crates/core/src/consensus/qbft/transport.rs @@ -491,7 +491,7 @@ mod tests { .unwrap() .unwrap(); assert_eq!(received.value(), value_hash); - assert_eq!(transport.sniffer_instance().msgs.len(), 1); + wait_for_sniffer_len(&transport, 1).await; } #[tokio::test] @@ -558,6 +558,24 @@ mod tests { transport.process_receives(ct, outer_rx).await.unwrap(); } + #[tokio::test] + async fn process_receives_errors_when_receive_buffer_closed() { + let value_hash = value_hash(1); + let msg = wrapped_msg(qbft::MSG_PRE_PREPARE, value_hash, [0u8; 32], vec![]); + let (transport, recv_rx, _sent) = test_transport(empty_value_rx()); + drop(recv_rx); + + let (outer_tx, outer_rx) = mpsc::channel(1); + outer_tx.send(msg).await.unwrap(); + + let err = transport + .process_receives(CancellationToken::new(), outer_rx) + .await + .unwrap_err(); + + assert_eq!(err.to_string(), "receive buffer closed"); + } + #[derive(Debug)] struct OtherMsg; @@ -636,6 +654,16 @@ mod tests { }) } + async fn wait_for_sniffer_len(transport: &Transport, expected: usize) { + timeout(Duration::from_secs(1), async { + while transport.sniffer_instance().msgs.len() != expected { + tokio::task::yield_now().await; + } + }) + .await + .unwrap(); + } + fn create_msg_request<'a>(duty: &'a Duty, privkey: &'a SecretKey) -> CreateMsgRequest<'a> { CreateMsgRequest { type_: qbft::MSG_PRE_PREPARE, From c76522eb0904dd02d491f3517ad76af762221163 Mon Sep 17 00:00:00 2001 From: Quang Le Date: Thu, 28 May 2026 16:46:37 +0700 Subject: [PATCH 17/19] fix: new msg take Msg not Option --- crates/core/src/consensus/qbft/msg.rs | 54 +++++++-------------- crates/core/src/consensus/qbft/transport.rs | 8 +-- 2 files changed, 22 insertions(+), 40 deletions(-) diff --git a/crates/core/src/consensus/qbft/msg.rs b/crates/core/src/consensus/qbft/msg.rs index 7a510726..b2fd23e3 100644 --- a/crates/core/src/consensus/qbft/msg.rs +++ b/crates/core/src/consensus/qbft/msg.rs @@ -65,10 +65,6 @@ type Result = std::result::Result; /// Errors returned by QBFT message wrapping. #[derive(Debug, thiserror::Error)] pub enum Error { - /// Nil QBFT protobuf message. - #[error("nil qbft message")] - NilQbftMessage, - /// Value hash did not exist in the values map. #[error("value hash not found in values")] ValueHashNotFound, @@ -150,12 +146,10 @@ impl Msg { /// Justifications are raw protobuf messages from the same consensus /// envelope. They are recursively wrapped with the same shared value map. pub(crate) fn new( - msg: Option, + msg: pbconsensus::QbftMsg, justification: Vec, values: sync::Arc, ) -> Result { - let msg = msg.ok_or(Error::NilQbftMessage)?; - let value_hash = match to_hash32(&msg.value_hash) { Some(hash) if values.contains_key(&hash) => hash, Some(_) => return Err(Error::ValueHashNotFound), @@ -171,7 +165,7 @@ impl Msg { Vec::with_capacity(justification.len()); for justification_msg in &justification { - let impl_msg = Self::new(Some(justification_msg.clone()), vec![], values.clone())?; + let impl_msg = Self::new(justification_msg.clone(), vec![], values.clone())?; justification_impls.push(sync::Arc::new(impl_msg)); } @@ -441,20 +435,13 @@ mod tests { ); } - #[test] - fn new_rejects_nil_message() { - let err = Msg::new(None, vec![], sync::Arc::default()).unwrap_err(); - - assert_eq!(err.to_string(), "nil qbft message"); - } - #[test] fn debug_unknown_message_type() { let msg = Msg::new( - Some(pbconsensus::QbftMsg { + pbconsensus::QbftMsg { r#type: 99, ..Default::default() - }), + }, vec![], sync::Arc::default(), ) @@ -475,7 +462,7 @@ mod tests { ])); let msg = Msg::new( - Some(pbconsensus::QbftMsg { + pbconsensus::QbftMsg { r#type: 1, duty: Some(pbcore::Duty { slot: 42, @@ -487,7 +474,7 @@ mod tests { value_hash: value_hash.to_vec().into(), prepared_value_hash: prepared_hash.to_vec().into(), ..Default::default() - }), + }, vec![], values, ) @@ -511,10 +498,10 @@ mod tests { #[test_case(vec![0; 32] ; "zero_hash")] fn new_treats_invalid_value_hash_as_nil(hash: Vec) { let msg = Msg::new( - Some(pbconsensus::QbftMsg { + pbconsensus::QbftMsg { value_hash: hash.into(), ..Default::default() - }), + }, vec![], sync::Arc::default(), ) @@ -527,10 +514,10 @@ mod tests { #[test_case(vec![0; 32] ; "zero_hash")] fn new_treats_invalid_prepared_value_hash_as_nil(hash: Vec) { let msg = Msg::new( - Some(pbconsensus::QbftMsg { + pbconsensus::QbftMsg { prepared_value_hash: hash.into(), ..Default::default() - }), + }, vec![], sync::Arc::default(), ) @@ -542,10 +529,10 @@ mod tests { #[test] fn new_errors_on_missing_value_hash() { let err = Msg::new( - Some(pbconsensus::QbftMsg { + pbconsensus::QbftMsg { value_hash: [1u8; 32].to_vec().into(), ..Default::default() - }), + }, vec![], sync::Arc::default(), ) @@ -557,10 +544,10 @@ mod tests { #[test] fn new_errors_on_missing_prepared_value_hash() { let err = Msg::new( - Some(pbconsensus::QbftMsg { + pbconsensus::QbftMsg { prepared_value_hash: [2u8; 32].to_vec().into(), ..Default::default() - }), + }, vec![], sync::Arc::default(), ) @@ -572,7 +559,7 @@ mod tests { #[test] fn new_errors_on_nested_justification_missing_value() { let err = Msg::new( - Some(pbconsensus::QbftMsg::default()), + pbconsensus::QbftMsg::default(), vec![pbconsensus::QbftMsg { value_hash: [3u8; 32].to_vec().into(), ..Default::default() @@ -587,7 +574,7 @@ mod tests { #[test] fn value_source_errors_when_value_missing() { let msg = Msg::new( - Some(pbconsensus::QbftMsg::default()), + pbconsensus::QbftMsg::default(), vec![], sync::Arc::default(), ) @@ -604,7 +591,7 @@ mod tests { let values = sync::Arc::new(value_map(vec![(value_hash, any_timestamp(1))])); let msg = Msg::new( - Some(pbconsensus::QbftMsg::default()), + pbconsensus::QbftMsg::default(), vec![pbconsensus::QbftMsg { r#type: 2, value_hash: value_hash.to_vec().into(), @@ -642,12 +629,7 @@ mod tests { ..Default::default() }; - let msg = Msg::new( - Some(raw_msg.clone()), - vec![raw_justification.clone()], - values, - ) - .unwrap(); + let msg = Msg::new(raw_msg.clone(), vec![raw_justification.clone()], values).unwrap(); let consensus_msg = msg.to_consensus_msg(); assert_eq!(msg.msg(), &raw_msg); diff --git a/crates/core/src/consensus/qbft/transport.rs b/crates/core/src/consensus/qbft/transport.rs index c1fbddc8..72a0c9c1 100644 --- a/crates/core/src/consensus/qbft/transport.rs +++ b/crates/core/src/consensus/qbft/transport.rs @@ -291,7 +291,7 @@ fn create_msg(request: CreateMsgRequest<'_>) -> Result { } Ok(msg::Msg::new( - Some(pb_msg), + pb_msg, justifications, sync::Arc::new(values), )?) @@ -402,7 +402,7 @@ mod tests { ..Default::default() }; let justification = msg::Msg::new( - Some(raw_justification.clone()), + raw_justification.clone(), vec![nested], sync::Arc::default(), ) @@ -720,7 +720,7 @@ mod tests { } msg::Msg::new( - Some(QbftMsg { + QbftMsg { r#type: type_.as_i64(), duty: Some(pbcore::Duty::try_from(&duty()).unwrap()), peer_idx: 1, @@ -729,7 +729,7 @@ mod tests { prepared_round: 0, prepared_value_hash: prepared_value_hash.to_vec().into(), ..Default::default() - }), + }, justification, sync::Arc::new(values), ) From 6ed418e5a9cabb594415862c33882995c6113751 Mon Sep 17 00:00:00 2001 From: Quang Le Date: Fri, 29 May 2026 09:52:16 +0700 Subject: [PATCH 18/19] fix: implement From for MessageType --- crates/core/src/consensus/qbft/transport.rs | 4 ++-- crates/core/src/qbft/mod.rs | 12 +++++++----- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/crates/core/src/consensus/qbft/transport.rs b/crates/core/src/consensus/qbft/transport.rs index 72a0c9c1..2826fb29 100644 --- a/crates/core/src/consensus/qbft/transport.rs +++ b/crates/core/src/consensus/qbft/transport.rs @@ -270,7 +270,7 @@ fn create_msg(request: CreateMsgRequest<'_>) -> Result { } = request; let pb_msg = pbconsensus::QbftMsg { - r#type: type_.as_i64(), + r#type: i64::from(type_), duty: Some(pbcore::Duty::try_from(duty).map_err(Error::InvalidDuty)?), peer_idx, round, @@ -721,7 +721,7 @@ mod tests { msg::Msg::new( QbftMsg { - r#type: type_.as_i64(), + r#type: i64::from(type_), duty: Some(pbcore::Duty::try_from(&duty()).unwrap()), peer_idx: 1, round: 2, diff --git a/crates/core/src/qbft/mod.rs b/crates/core/src/qbft/mod.rs index a3bfa517..d1d244d4 100644 --- a/crates/core/src/qbft/mod.rs +++ b/crates/core/src/qbft/mod.rs @@ -209,17 +209,18 @@ impl MessageType { Self(value) } - /// Returns the stable wire integer. - pub(crate) fn as_i64(self) -> i64 { - self.0 - } - /// Returns true when the message type is one of the known QBFT wire types. pub fn valid(&self) -> bool { self.0 > MSG_UNKNOWN.0 && self.0 < MSG_SENTINEL.0 } } +impl From for i64 { + fn from(value: MessageType) -> Self { + value.0 + } +} + impl Display for MessageType { /// Formats the message type using the stable wire/debug label. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -1410,6 +1411,7 @@ mod tests { let message_type = MessageType::from_wire(99); assert_eq!(message_type, MessageType(99)); + assert_eq!(i64::from(message_type), 99); assert!(!message_type.valid()); assert_eq!(message_type.to_string(), ""); } From 20f272430e84a7a439cb461fc0b13e948ad176d5 Mon Sep 17 00:00:00 2001 From: Quang Le Date: Fri, 29 May 2026 09:58:40 +0700 Subject: [PATCH 19/19] fix: update docs --- crates/core/src/consensus/qbft/msg.rs | 6 +++++- crates/core/src/consensus/qbft/transport.rs | 12 ++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/crates/core/src/consensus/qbft/msg.rs b/crates/core/src/consensus/qbft/msg.rs index b2fd23e3..8085e234 100644 --- a/crates/core/src/consensus/qbft/msg.rs +++ b/crates/core/src/consensus/qbft/msg.rs @@ -271,7 +271,11 @@ where hash_proto_bytes(&encoded) } -/// Returns the consensus hash for deterministic protobuf bytes. +/// Returns the consensus hash for deterministic inner-protobuf bytes. +/// +/// This helper hashes the bytes exactly as provided; it does not decode or +/// canonicalize a protobuf envelope. Callers must pass bytes produced from the +/// concrete inner message with deterministic field/map ordering. pub(crate) fn hash_proto_bytes(encoded: &[u8]) -> Result<[u8; 32]> { let mut hasher = Hasher::default(); let index = hasher.index(); diff --git a/crates/core/src/consensus/qbft/transport.rs b/crates/core/src/consensus/qbft/transport.rs index 2826fb29..4f40a1ed 100644 --- a/crates/core/src/consensus/qbft/transport.rs +++ b/crates/core/src/consensus/qbft/transport.rs @@ -92,6 +92,10 @@ struct ValueStore { impl Transport { /// Creates a new QBFT consensus transport. + /// + /// Callers must cancel the tokens passed to [`Transport::broadcast`] when + /// the consensus instance ends. Detached self-send tasks use those tokens + /// to stop if the inner receive buffer stays full. pub(crate) fn new( broadcaster: Broadcaster, privkey: SecretKey, @@ -128,6 +132,9 @@ impl Transport { pub(crate) fn get_value(&self, hash: [u8; 32]) -> Result { let mut store = self.values.lock().unwrap_or_else(PoisonError::into_inner); if let Ok(local) = store.value_rx.try_recv() { + // Any::value is hashable here because the local producer must pack + // canonical deterministic bytes for the concrete inner protobuf. + // Inbound values must be decoded and canonicalized before caching. let hash = msg::hash_proto_bytes(&local.value)?; store.values.insert(hash, local); } @@ -137,6 +144,11 @@ impl Transport { /// Creates, self-enqueues, sniffs, and externally broadcasts a QBFT /// message. + /// + /// The self-send task exits when the message is accepted by the inner + /// receive buffer or when `request.ct` is cancelled. Instance teardown must + /// cancel that token so blocked self-send tasks cannot outlive the + /// transport. pub(crate) async fn broadcast(&self, request: BroadcastRequest) -> Result<()> { let BroadcastRequest { ct,