Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
576b191
feat(p2p): add inbound BlocksByRange req/resp support
dicethedev May 7, 2026
cba8385
Update crates/net/p2p/src/req_resp/handlers.rs
dicethedev May 7, 2026
0130146
Merge branch 'main' into feat/blocks-by-range-inbound-support
dicethedev May 8, 2026
815814e
feat(p2p): use BlocksByRange for long-range sync
dicethedev May 8, 2026
9fc8d73
fix(clippy): use is_multiple_of for slot step check
dicethedev May 8, 2026
9028a29
Update crates/net/p2p/src/req_resp/handlers.rs
dicethedev May 8, 2026
71bc472
fix(p2p): cap BlocksByRange sync count to prevent unbounded request loop
dicethedev May 8, 2026
300a1af
fix(p2p): bound canonical_blocks_by_range traversal depth to prevent DoS
dicethedev May 8, 2026
659456e
Merge branch 'main' into feat/blocks-by-range-long-range-sync
dicethedev May 12, 2026
1e97021
Fix BlocksByRange response routing
dicethedev May 12, 2026
41bcebf
fix(p2p): address BlocksByRange review feedback
dicethedev May 15, 2026
f201a38
Merge branch 'main' into feat/blocks-by-range-long-range-sync
MegaRedHand May 19, 2026
50e85ac
fix(p2p): remove legacy step field from BlocksByRangeRequest and upda…
dicethedev May 19, 2026
3cac9a6
Merge branch 'main' into feat/blocks-by-range-long-range-sync
pablodeymo May 22, 2026
f7c9d34
Merge branch 'main' into feat/blocks-by-range-long-range-sync
pablodeymo May 27, 2026
c430e79
refactor(p2p): merge request_id_map and range_request_ids into outbou…
dicethedev May 28, 2026
641dc51
Merge branch 'main' into feat/blocks-by-range-long-range-sync
MegaRedHand Jun 3, 2026
ed82fb0
Remove broken range sync retry
dicethedev Jun 5, 2026
ce17a60
Refactor range sync request deduplication
dicethedev Jun 5, 2026
0915923
Fix Option variant resolution in p2p
dicethedev Jun 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 132 additions & 12 deletions crates/net/p2p/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
collections::{HashMap, HashSet},
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
ops::Range,
time::Duration,
};

Expand Down Expand Up @@ -41,7 +42,7 @@ use crate::{
},
req_resp::{
BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, Codec,
MAX_COMPRESSED_PAYLOAD_SIZE, Request, STATUS_PROTOCOL_V1, build_status,
MAX_COMPRESSED_PAYLOAD_SIZE, MAX_REQUEST_BLOCKS, Request, STATUS_PROTOCOL_V1, build_status,
fetch_block_from_peer,
},
swarm_adapter::SwarmHandle,
Expand All @@ -59,12 +60,81 @@ const MAX_FETCH_RETRIES: u32 = 10;
// Backoff tuning values. NOTE(review): the code that consumes these is outside
// this view; presumably the retry delay starts at INITIAL_BACKOFF_MS milliseconds
// and is scaled by BACKOFF_MULTIPLIER on each further attempt — confirm.
const INITIAL_BACKOFF_MS: u64 = 5;
const BACKOFF_MULTIPLIER: u64 = 2;
// NOTE(review): by its name, the number of seconds between peer redial attempts;
// the redial logic is not visible here — confirm.
const PEER_REDIAL_INTERVAL_SECS: u64 = 12;
// Sync range limit expressed as 64 times MAX_REQUEST_BLOCKS. The slot figure in
// the trailing comment holds only while MAX_REQUEST_BLOCKS is 1024 (1024 * 64 =
// 65,536); keep the two in step if that constant changes.
const MAX_SYNC_RANGE: u64 = MAX_REQUEST_BLOCKS * 64; // 65,536 slots (~3 days)

/// Bookkeeping kept for a request that has not completed yet.
///
/// `P2PServer::pending_root_requests` stores one of these per `H256` key.
pub(crate) struct PendingRequest {
/// Attempt counter for this request.
/// NOTE(review): presumably checked against `MAX_FETCH_RETRIES`; the retry
/// logic is outside this view — confirm.
pub(crate) attempts: u32,
/// Peers recorded against this request.
/// NOTE(review): by its name, peers that already failed to serve it and
/// should not be asked again — confirm against the peer-selection code.
pub(crate) failed_peers: HashSet<PeerId>,
}

/// Describes what an outbound request asked for.
///
/// `P2PServer::outbound_requests` maps each `OutboundRequestId` to one of these.
pub(crate) enum PendingRequestKind {
/// A request identified by a single `H256`.
/// NOTE(review): presumably the root of the requested block — confirm.
Root(H256),
/// A request covering a span of slots.
/// NOTE(review): whether `end_slot` is inclusive is not established in this
/// view — confirm against the code that constructs this variant.
Range { start_slot: u64, end_slot: u64 },
}

/// Progress tracker for a range sync: which slots are still to be requested,
/// which peers can serve them, and whether a batch is currently outstanding.
pub(crate) struct RangeSyncState {
/// Remaining slots to request, with an exclusive end.
pub(crate) current_range: Range<u64>,
/// Latest advertised head slot for each peer.
///
/// Peers whose head is below `current_range.start` are pruned whenever the
/// state is updated through `merge_peer`, `complete_batch` or `fail_peer`.
pub(crate) peer_set: HashMap<PeerId, u64>,
/// While `true`, `next_batch` yields nothing.
///
/// `new` starts it at `false`, and `complete_batch` / `fail_peer` reset it to
/// `false`. No method of this type sets it to `true`; whoever dispatches a
/// batch has to set it.
pub(crate) in_flight: bool,
}

impl RangeSyncState {
    /// Creates a tracker for `current_range` that knows about a single peer,
    /// `peer`, advertising `peer_head`. No batch is marked as outstanding.
    pub(crate) fn new(current_range: Range<u64>, peer: PeerId, peer_head: u64) -> Self {
        let mut peer_set = HashMap::new();
        peer_set.insert(peer, peer_head);

        Self {
            current_range,
            peer_set,
            in_flight: false,
        }
    }

    /// Records (or refreshes) `peer` with its advertised `peer_head` and grows
    /// the end of the tracked range to `end_exclusive` if that is further out.
    /// The range end never shrinks. Peers that have fallen behind the range
    /// start are pruned afterwards.
    pub(crate) fn merge_peer(&mut self, peer: PeerId, peer_head: u64, end_exclusive: u64) {
        if end_exclusive > self.current_range.end {
            self.current_range.end = end_exclusive;
        }
        self.peer_set.insert(peer, peer_head);
        self.drop_stale_peers();
    }

    /// Computes the next batch to request without modifying any state.
    ///
    /// Yields nothing while a batch is outstanding, when no slots remain, or
    /// when no known peer has a head at or beyond the range start. Otherwise
    /// the peer with the highest advertised head is chosen, and the batch runs
    /// from the range start up to the smallest of: start + `MAX_REQUEST_BLOCKS`,
    /// the range end, and one past that peer's head.
    pub(crate) fn next_batch(&self) -> Option<(PeerId, Range<u64>)> {
        let start = self.current_range.start;

        let idle_with_work = !self.in_flight && !self.current_range.is_empty();
        if !idle_with_work {
            return Option::None;
        }

        let (best_peer, best_head) = self
            .peer_set
            .iter()
            .map(|(peer, head)| (*peer, *head))
            .filter(|&(_, head)| head >= start)
            .max_by_key(|&(_, head)| head)?;

        // Three independent upper bounds on the exclusive batch end.
        let window_end = start.saturating_add(MAX_REQUEST_BLOCKS);
        let peer_end = best_head.saturating_add(1);
        let batch_end = window_end.min(self.current_range.end).min(peer_end);

        if batch_end > start {
            Option::Some((best_peer, start..batch_end))
        } else {
            Option::None
        }
    }

    /// Marks the outstanding batch as finished and moves the range start to
    /// the slot after `end_slot`. The start never moves backwards. Peers left
    /// behind the new start are pruned.
    pub(crate) fn complete_batch(&mut self, end_slot: u64) {
        let resume_at = end_slot.saturating_add(1);
        if resume_at > self.current_range.start {
            self.current_range.start = resume_at;
        }
        self.in_flight = false;
        self.drop_stale_peers();
    }

    /// Forgets `peer` and clears the outstanding-batch flag.
    ///
    /// NOTE(review): the flag is cleared unconditionally, even if `peer` is not
    /// the one serving the outstanding batch — confirm callers only invoke this
    /// for the peer that was handed the batch.
    pub(crate) fn fail_peer(&mut self, peer: &PeerId) {
        self.peer_set.remove(peer);
        self.in_flight = false;
        self.drop_stale_peers();
    }

    /// Removes every peer whose advertised head lies below the range start.
    fn drop_stale_peers(&mut self) {
        let floor = self.current_range.start;
        self.peer_set.retain(|_, head| floor <= *head);
    }
}

// --- Swarm construction ---

/// [libp2p Behaviour](libp2p::swarm::NetworkBehaviour) combining identify, Gossipsub
Expand Down Expand Up @@ -300,8 +370,9 @@ impl P2P {
block_topic: built.block_topic,
aggregation_topic: built.aggregation_topic,
connected_peers: HashSet::new(),
pending_requests: HashMap::new(),
request_id_map: HashMap::new(),
pending_root_requests: HashMap::new(),
outbound_requests: HashMap::new(),
range_sync_state: None,
bootnode_addrs: built.bootnode_addrs,
node_names,
};
Expand Down Expand Up @@ -336,8 +407,9 @@ pub struct P2PServer {
pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic,

pub(crate) connected_peers: HashSet<PeerId>,
pub(crate) pending_requests: HashMap<H256, PendingRequest>,
pub(crate) request_id_map: HashMap<OutboundRequestId, H256>,
pub(crate) pending_root_requests: HashMap<H256, PendingRequest>,
pub(crate) outbound_requests: HashMap<OutboundRequestId, PendingRequestKind>,
pub(crate) range_sync_state: Option<RangeSyncState>,
bootnode_addrs: HashMap<PeerId, Multiaddr>,
node_names: HashMap<PeerId, String>,
}
Expand Down Expand Up @@ -371,7 +443,7 @@ impl P2PServer {
) {
let root = msg.root;
// Check if still pending (might have succeeded during backoff)
if !self.pending_requests.contains_key(&root) {
if !self.pending_root_requests.contains_key(&root) {
trace!(%root, "Block fetch completed during backoff, skipping retry");
return;
}
Expand All @@ -380,7 +452,7 @@ impl P2PServer {

if !fetch_block_from_peer(self, root).await {
tracing::error!(%root, "Failed to retry block fetch, giving up");
self.pending_requests.remove(&root);
self.pending_root_requests.remove(&root);
}
}

Expand Down Expand Up @@ -436,7 +508,7 @@ impl Handler<FetchBlock> for P2PServer {
async fn handle(&mut self, msg: FetchBlock, _ctx: &Context<Self>) {
let root = msg.root;
// Deduplicate - if already pending, ignore
if self.pending_requests.contains_key(&root) {
if self.pending_root_requests.contains_key(&root) {
trace!(%root, "Block fetch already in progress, ignoring duplicate");
return;
}
Expand Down Expand Up @@ -514,8 +586,8 @@ async fn handle_swarm_event(
} => {
let direction = connection_direction(&endpoint);
let reason = match cause {
None => "remote_close",
Some(err) => {
Option::None => "remote_close",
Option::Some(err) => {
// Categorize disconnection reasons
let err_str = err.to_string().to_lowercase();
if err_str.contains("timeout")
Expand Down Expand Up @@ -702,8 +774,8 @@ fn compute_message_id(message: &libp2p::gossipsub::Message) -> libp2p::gossipsub
let decompressed = gossipsub::decompress_message(&message.data).ok();

let (domain, data) = match decompressed.as_deref() {
Some(data) => (MESSAGE_DOMAIN_VALID_SNAPPY, data),
None => (MESSAGE_DOMAIN_INVALID_SNAPPY, message.data.as_slice()),
Option::Some(data) => (MESSAGE_DOMAIN_VALID_SNAPPY, data),
Option::None => (MESSAGE_DOMAIN_INVALID_SNAPPY, message.data.as_slice()),
};
let topic = message.topic.as_str().as_bytes();
let topic_len = (topic.len() as u64).to_le_bytes();
Expand All @@ -719,6 +791,54 @@ fn compute_message_id(message: &libp2p::gossipsub::Message) -> libp2p::gossipsub
mod tests {
use super::*;

/// Returns a `PeerId` derived from a freshly generated ed25519 keypair, so each
/// call gives the tests a new peer identity.
fn random_peer() -> PeerId {
    let keypair = Keypair::generate_ed25519();
    let public_key = keypair.public();
    PeerId::from_public_key(&public_key)
}

/// Merging a second peer with a further head extends the range end and keeps
/// both peers' advertised heads.
#[test]
fn range_sync_state_merges_new_peer_ranges() {
    let (first_peer, second_peer) = (random_peer(), random_peer());
    let mut state = RangeSyncState::new(10..101, first_peer, 100);

    state.merge_peer(second_peer, 150, 151);

    let expected_range = 10..151;
    assert_eq!(state.current_range, expected_range);

    let first_head = state.peer_set.get(&first_peer);
    let second_head = state.peer_set.get(&second_peer);
    assert_eq!(first_head, Some(&100));
    assert_eq!(second_head, Some(&150));
}

/// The peer with the highest head is picked, the batch is capped at
/// `MAX_REQUEST_BLOCKS` slots, and nothing is offered while a batch is in flight.
#[test]
fn range_sync_state_allows_only_one_batch_in_flight() {
    let (first_peer, second_peer) = (random_peer(), random_peer());
    let mut state = RangeSyncState::new(10..3000, first_peer, 500);
    state.merge_peer(second_peer, 2000, 3000);

    let Some((selected_peer, batch)) = state.next_batch() else {
        panic!("batch available");
    };
    let expected_batch = 10..10 + MAX_REQUEST_BLOCKS;
    assert_eq!(selected_peer, second_peer);
    assert_eq!(batch, expected_batch);

    // Simulate the caller having dispatched that batch.
    state.in_flight = true;
    let blocked = state.next_batch();
    assert!(blocked.is_none());
}

/// Completing a batch moves the range start past the completed slot, clears the
/// in-flight flag, and prunes peers whose head is now behind the range start.
#[test]
fn range_sync_state_advances_and_drops_stale_peers() {
    let (stale_peer, current_peer) = (random_peer(), random_peer());
    let mut state = RangeSyncState::new(10..3000, stale_peer, 100);
    state.merge_peer(current_peer, 2999, 3000);
    state.in_flight = true;

    state.complete_batch(1033);

    let expected_range = 1034..3000;
    assert_eq!(state.current_range, expected_range);
    assert!(!state.in_flight);

    let stale_entry = state.peer_set.get(&stale_peer);
    let current_entry = state.peer_set.get(&current_peer);
    assert!(stale_entry.is_none());
    assert_eq!(current_entry, Some(&2999));
}

#[test]
fn parse_enrs_extracts_ip_port_and_public_key() {
// Values taken from a local devnet run with lean-quickstart
Expand Down
Loading