Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ chrono.workspace = true
crossbeam.workspace = true
dyn-clone.workspace = true
dyn-eq.workspace = true
futures.workspace = true
hex.workspace = true
k256.workspace = true
libp2p.workspace = true
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/consensus/qbft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@
/// QBFT protobuf message wrapper.
pub mod msg;

pub(crate) mod sniffer;
pub(crate) mod transport;
61 changes: 24 additions & 37 deletions crates/core/src/consensus/qbft/msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,6 @@ type Result<T> = std::result::Result<T, Error>;
/// Errors returned by QBFT message wrapping.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Nil QBFT protobuf message.
#[error("nil qbft message")]
NilQbftMessage,

/// Value hash did not exist in the values map.
#[error("value hash not found in values")]
ValueHashNotFound,
Expand Down Expand Up @@ -150,12 +146,10 @@ impl Msg {
/// Justifications are raw protobuf messages from the same consensus
/// envelope. They are recursively wrapped with the same shared value map.
pub(crate) fn new(
msg: Option<pbconsensus::QbftMsg>,
msg: pbconsensus::QbftMsg,
justification: Vec<pbconsensus::QbftMsg>,
values: sync::Arc<ValueMap>,
) -> Result<Self> {
let msg = msg.ok_or(Error::NilQbftMessage)?;

let value_hash = match to_hash32(&msg.value_hash) {
Some(hash) if values.contains_key(&hash) => hash,
Some(_) => return Err(Error::ValueHashNotFound),
Expand All @@ -171,7 +165,7 @@ impl Msg {
Vec::with_capacity(justification.len());

for justification_msg in &justification {
let impl_msg = Self::new(Some(justification_msg.clone()), vec![], values.clone())?;
let impl_msg = Self::new(justification_msg.clone(), vec![], values.clone())?;
justification_impls.push(sync::Arc::new(impl_msg));
}

Expand Down Expand Up @@ -274,9 +268,14 @@ where
let mut encoded = Vec::with_capacity(msg.encoded_len());
msg.encode(&mut encoded).map_err(Error::MarshalProto)?;

hash_proto_bytes(&encoded)
}

/// Returns the consensus hash for deterministic protobuf bytes.
pub(crate) fn hash_proto_bytes(encoded: &[u8]) -> Result<[u8; 32]> {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hash_proto (line 266) explicitly rejects Any (test hash_proto_rejects_any). hash_proto_bytes has no such guard — it hashes whatever bytes it is handed. Doc currently reads only Returns the consensus hash for deterministic protobuf bytes.

Given the helper is pub(crate) and is the natural shortcut a future inbound-validation layer would reach for, the docstring should spell out the invariant explicitly: bytes MUST be the deterministic prost encoding of a non-Any concrete message, equivalent to prost::Message::encode_to_vec over a typed proto whose map fields are BTreeMap. Anything else risks silent hash mismatch against Go peers, which re-marshal deterministically before hashing.

let mut hasher = Hasher::default();
let index = hasher.index();
hasher.put_bytes(&encoded).map_err(Error::HashProto)?;
hasher.put_bytes(encoded).map_err(Error::HashProto)?;
hasher.merkleize(index).map_err(Error::HashProto)?;
hasher.hash_root().map_err(Error::HashProto)
}
Expand Down Expand Up @@ -436,20 +435,13 @@ mod tests {
);
}

#[test]
fn new_rejects_nil_message() {
let err = Msg::new(None, vec![], sync::Arc::default()).unwrap_err();

assert_eq!(err.to_string(), "nil qbft message");
}

#[test]
fn debug_unknown_message_type() {
let msg = Msg::new(
Some(pbconsensus::QbftMsg {
pbconsensus::QbftMsg {
r#type: 99,
..Default::default()
}),
},
vec![],
sync::Arc::default(),
)
Expand All @@ -470,7 +462,7 @@ mod tests {
]));

let msg = Msg::new(
Some(pbconsensus::QbftMsg {
pbconsensus::QbftMsg {
r#type: 1,
duty: Some(pbcore::Duty {
slot: 42,
Expand All @@ -482,7 +474,7 @@ mod tests {
value_hash: value_hash.to_vec().into(),
prepared_value_hash: prepared_hash.to_vec().into(),
..Default::default()
}),
},
vec![],
values,
)
Expand All @@ -506,10 +498,10 @@ mod tests {
#[test_case(vec![0; 32] ; "zero_hash")]
fn new_treats_invalid_value_hash_as_nil(hash: Vec<u8>) {
let msg = Msg::new(
Some(pbconsensus::QbftMsg {
pbconsensus::QbftMsg {
value_hash: hash.into(),
..Default::default()
}),
},
vec![],
sync::Arc::default(),
)
Expand All @@ -522,10 +514,10 @@ mod tests {
#[test_case(vec![0; 32] ; "zero_hash")]
fn new_treats_invalid_prepared_value_hash_as_nil(hash: Vec<u8>) {
let msg = Msg::new(
Some(pbconsensus::QbftMsg {
pbconsensus::QbftMsg {
prepared_value_hash: hash.into(),
..Default::default()
}),
},
vec![],
sync::Arc::default(),
)
Expand All @@ -537,10 +529,10 @@ mod tests {
#[test]
fn new_errors_on_missing_value_hash() {
let err = Msg::new(
Some(pbconsensus::QbftMsg {
pbconsensus::QbftMsg {
value_hash: [1u8; 32].to_vec().into(),
..Default::default()
}),
},
vec![],
sync::Arc::default(),
)
Expand All @@ -552,10 +544,10 @@ mod tests {
#[test]
fn new_errors_on_missing_prepared_value_hash() {
let err = Msg::new(
Some(pbconsensus::QbftMsg {
pbconsensus::QbftMsg {
prepared_value_hash: [2u8; 32].to_vec().into(),
..Default::default()
}),
},
vec![],
sync::Arc::default(),
)
Expand All @@ -567,7 +559,7 @@ mod tests {
#[test]
fn new_errors_on_nested_justification_missing_value() {
let err = Msg::new(
Some(pbconsensus::QbftMsg::default()),
pbconsensus::QbftMsg::default(),
vec![pbconsensus::QbftMsg {
value_hash: [3u8; 32].to_vec().into(),
..Default::default()
Expand All @@ -582,7 +574,7 @@ mod tests {
#[test]
fn value_source_errors_when_value_missing() {
let msg = Msg::new(
Some(pbconsensus::QbftMsg::default()),
pbconsensus::QbftMsg::default(),
vec![],
sync::Arc::default(),
)
Expand All @@ -599,7 +591,7 @@ mod tests {
let values = sync::Arc::new(value_map(vec![(value_hash, any_timestamp(1))]));

let msg = Msg::new(
Some(pbconsensus::QbftMsg::default()),
pbconsensus::QbftMsg::default(),
vec![pbconsensus::QbftMsg {
r#type: 2,
value_hash: value_hash.to_vec().into(),
Expand Down Expand Up @@ -637,12 +629,7 @@ mod tests {
..Default::default()
};

let msg = Msg::new(
Some(raw_msg.clone()),
vec![raw_justification.clone()],
values,
)
.unwrap();
let msg = Msg::new(raw_msg.clone(), vec![raw_justification.clone()], values).unwrap();
let consensus_msg = msg.to_consensus_msg();

assert_eq!(msg.msg(), &raw_msg);
Expand Down
105 changes: 105 additions & 0 deletions crates/core/src/consensus/qbft/sniffer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
//! QBFT consensus message sniffer.

// TODO: Remove once the consensus component exports sniffer lifecycle hooks.
#![allow(dead_code)]

use std::{
sync::{Mutex, PoisonError},
time::SystemTime,
};

use prost_types::Timestamp;

use crate::{
consensus::protocols::QBFT_V2_PROTOCOL_ID,
corepb::v1::consensus::{QbftConsensusMsg, SniffedConsensusInstance, SniffedConsensusMsg},
};

/// Buffers consensus messages for the debug API.
#[derive(Debug)]
pub(crate) struct Sniffer {
nodes: i64,
peer_idx: i64,
started_at: SystemTime,
msgs: Mutex<Vec<SniffedConsensusMsg>>,
}

impl Sniffer {
/// Returns a new QBFT consensus sniffer.
pub(crate) fn new(nodes: i64, peer_idx: i64) -> Self {
Self {
nodes,
peer_idx,
started_at: SystemTime::now(),
msgs: Mutex::default(),
}
}

/// Adds a message to the sniffer buffer.
pub(crate) fn add(&self, msg: QbftConsensusMsg) {
self.msgs
.lock()
.unwrap_or_else(PoisonError::into_inner)
.push(SniffedConsensusMsg {
timestamp: Some(Timestamp::from(SystemTime::now())),
msg: Some(msg),
});
}

/// Returns the buffered messages as a sniffed consensus instance.
pub(crate) fn instance(&self) -> SniffedConsensusInstance {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sniffer::instance locks the mutex and clones the entire Vec<SniffedConsensusMsg> (each SniffedConsensusMsg carries a full QbftConsensusMsg with its values: Vec<Any>). Go's Instance (sniffer.go:46-57) aliases the slice (Msgs: c.msgs), which is racier but O(1).

If this is only called at instance end for the debug API, the deep clone is fine. The test helper wait_for_sniffer_len (transport.rs:657-665) busy-polls it, which is acceptable for tests but would be a perf cliff if anything in production polls similarly. Worth pinning the calling convention in the docstring (or, if frequent polling is expected, returning an Arc<[...]> or splitting instance() from len()).

SniffedConsensusInstance {
nodes: self.nodes,
peer_idx: self.peer_idx,
started_at: Some(Timestamp::from(self.started_at)),
msgs: self
.msgs
.lock()
.unwrap_or_else(PoisonError::into_inner)
.clone(),
protocol_id: QBFT_V2_PROTOCOL_ID.to_string(),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::corepb::v1::consensus::QbftMsg;

#[test]
fn sniffer_add_records_messages() {
let sniffer = Sniffer::new(4, 2);
let msg = consensus_msg(7);

sniffer.add(msg.clone());

let instance = sniffer.instance();
assert_eq!(instance.msgs.len(), 1);
assert_eq!(instance.msgs[0].msg, Some(msg));
assert!(instance.msgs[0].timestamp.is_some());
}

#[test]
fn sniffer_instance_maps_fields() {
let sniffer = Sniffer::new(4, 3);

let instance = sniffer.instance();

assert_eq!(instance.nodes, 4);
assert_eq!(instance.peer_idx, 3);
assert!(instance.started_at.is_some());
assert!(instance.msgs.is_empty());
assert_eq!(instance.protocol_id, QBFT_V2_PROTOCOL_ID);
}

fn consensus_msg(round: i64) -> QbftConsensusMsg {
QbftConsensusMsg {
msg: Some(QbftMsg {
round,
..Default::default()
}),
..Default::default()
}
}
}
Loading
Loading