From 576b191a6b8f68bad42603185cc3ab68f611f383 Mon Sep 17 00:00:00 2001 From: dicethedev Date: Thu, 7 May 2026 01:17:34 +0100 Subject: [PATCH 01/14] feat(p2p): add inbound BlocksByRange req/resp support --- crates/net/p2p/src/lib.rs | 9 +- crates/net/p2p/src/req_resp/codec.rs | 36 ++++- crates/net/p2p/src/req_resp/handlers.rs | 178 +++++++++++++++++++++++- crates/net/p2p/src/req_resp/messages.rs | 13 +- crates/net/p2p/src/req_resp/mod.rs | 3 +- 5 files changed, 226 insertions(+), 13 deletions(-) diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 58b44472..515bf71e 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -40,8 +40,9 @@ use crate::{ publish_attestation, publish_block, }, req_resp::{ - BLOCKS_BY_ROOT_PROTOCOL_V1, Codec, MAX_COMPRESSED_PAYLOAD_SIZE, Request, - STATUS_PROTOCOL_V1, build_status, fetch_block_from_peer, + BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, Codec, + MAX_COMPRESSED_PAYLOAD_SIZE, Request, STATUS_PROTOCOL_V1, build_status, + fetch_block_from_peer, }, swarm_adapter::SwarmHandle, }; @@ -154,6 +155,10 @@ pub fn build_swarm( StreamProtocol::new(BLOCKS_BY_ROOT_PROTOCOL_V1), request_response::ProtocolSupport::Full, ), + ( + StreamProtocol::new(BLOCKS_BY_RANGE_PROTOCOL_V1), + request_response::ProtocolSupport::Full, + ), ], Default::default(), ); diff --git a/crates/net/p2p/src/req_resp/codec.rs b/crates/net/p2p/src/req_resp/codec.rs index e85f440a..b9aea0ec 100644 --- a/crates/net/p2p/src/req_resp/codec.rs +++ b/crates/net/p2p/src/req_resp/codec.rs @@ -7,8 +7,8 @@ use tracing::{debug, trace, warn}; use super::{ encoding::{MAX_PAYLOAD_SIZE, decode_payload, write_payload}, messages::{ - BLOCKS_BY_ROOT_PROTOCOL_V1, ErrorMessage, Request, Response, ResponseCode, ResponsePayload, - STATUS_PROTOCOL_V1, Status, + BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, ErrorMessage, Request, Response, + ResponseCode, ResponsePayload, STATUS_PROTOCOL_V1, Status, }, }; @@ -45,6 +45,12 @@ impl libp2p::request_response::Codec for Codec { })?; Ok(Request::BlocksByRoot(request)) } + BLOCKS_BY_RANGE_PROTOCOL_V1 => { + let request = SszDecode::from_ssz_bytes(&payload).map_err(|err| { + io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")) + })?; + Ok(Request::BlocksByRange(request)) + } _ => Err(io::Error::new( io::ErrorKind::InvalidData, format!("unknown protocol: {}", protocol.as_ref()), @@ -63,6 +69,7 @@ impl libp2p::request_response::Codec for Codec { match protocol.as_ref() { STATUS_PROTOCOL_V1 => decode_status_response(io).await, BLOCKS_BY_ROOT_PROTOCOL_V1 => decode_blocks_by_root_response(io).await, + BLOCKS_BY_RANGE_PROTOCOL_V1 => decode_blocks_by_range_response(io).await, _ => Err(io::Error::new( io::ErrorKind::InvalidData, format!("unknown protocol: {}", protocol.as_ref()), @@ -84,6 +91,7 @@ impl libp2p::request_response::Codec for Codec { let encoded = match req { Request::Status(status) => status.to_ssz(), Request::BlocksByRoot(request) => request.to_ssz(), + Request::BlocksByRange(request) => request.to_ssz(), }; write_payload(io, &encoded).await @@ -107,7 +115,8 @@ impl libp2p::request_response::Codec for Codec { let encoded = status.to_ssz(); write_payload(io, &encoded).await } - ResponsePayload::BlocksByRoot(blocks) => { + ResponsePayload::BlocksByRoot(blocks) + | ResponsePayload::BlocksByRange(blocks) => { // Write each block as a separate chunk. // Encode first, then check size before writing the SUCCESS // code byte. This avoids corrupting the stream if a block @@ -118,7 +127,7 @@ impl libp2p::request_response::Codec for Codec { if encoded.len() > MAX_PAYLOAD_SIZE - 1024 { warn!( size = encoded.len(), - "Skipping oversized block in BlocksByRoot response" + "Skipping oversized block in block response" ); continue; } @@ -216,6 +225,23 @@ where /// Note: Error chunks from the peer (non-SUCCESS response codes) do not cause this /// function to return `Err` - they are logged and skipped. async fn decode_blocks_by_root_response(io: &mut T) -> io::Result +where + T: AsyncRead + Unpin + Send, +{ + decode_blocks_response(io, ResponsePayload::BlocksByRoot).await +} + +async fn decode_blocks_by_range_response(io: &mut T) -> io::Result +where + T: AsyncRead + Unpin + Send, +{ + decode_blocks_response(io, ResponsePayload::BlocksByRange).await +} + +async fn decode_blocks_response( + io: &mut T, + payload: fn(Vec) -> ResponsePayload, +) -> io::Result where T: AsyncRead + Unpin + Send, { @@ -247,5 +273,5 @@ where blocks.push(block); } - Ok(Response::success(ResponsePayload::BlocksByRoot(blocks))) + Ok(Response::success(payload(blocks))) } diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index 21bbbd10..ae1390dd 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use ethlambda_storage::Store; use libp2p::{PeerId, request_response}; @@ -12,11 +12,13 @@ use ethlambda_types::primitives::HashTreeRoot as _; use ethlambda_types::{block::SignedBlock, primitives::H256}; use super::{ - BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, Request, Response, ResponsePayload, Status, + BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest, BlocksByRootRequest, MAX_REQUEST_BLOCKS, + Request, Response, ResponsePayload, Status, messages::error_message, }; use crate::{ BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest, - p2p_protocol, req_resp::RequestedBlockRoots, + p2p_protocol, + req_resp::{RequestedBlockRoots, messages::ResponseCode}, }; pub async fn handle_req_resp_message( @@ -42,6 +44,13 @@ pub async fn handle_req_resp_message( ); handle_blocks_by_root_request(server, request, channel, peer).await; } + Request::BlocksByRange(request) => { + info!( + kind = "blocks_by_range_request", + peer_count, "P2P message received" + ); + handle_blocks_by_range_request(server, request, channel, peer).await; + } } } request_response::Message::Response { @@ -63,6 +72,14 @@ pub async fn handle_req_resp_message( handle_blocks_by_root_response(server, blocks, peer, request_id, ctx) .await; } + ResponsePayload::BlocksByRange(blocks) => { + info!( + kind = "blocks_by_range_response", + peer_count, + count = blocks.len(), + "P2P message received" + ); + } }, Response::Error { code, message } => { let error_str = String::from_utf8_lossy(&message); @@ -140,6 +157,97 @@ async fn handle_blocks_by_root_request( server.swarm_handle.send_response(channel, response); } +async fn handle_blocks_by_range_request( + server: &mut P2PServer, + request: BlocksByRangeRequest, + channel: request_response::ResponseChannel, + peer: PeerId, +) { + info!( + %peer, + start_slot = request.start_slot, + count = request.count, + step = request.step, + "Received BlocksByRange request" + ); + + if request.step == 0 || request.count > MAX_REQUEST_BLOCKS { + let response = Response::error( + ResponseCode::INVALID_REQUEST, + error_message("invalid BlocksByRange request"), + ); + server.swarm_handle.send_response(channel, response); + return; + } + + let blocks = canonical_blocks_by_range( + &server.store, + request.start_slot, + request.count, + request.step, + ); + + info!( + %peer, + start_slot = request.start_slot, + count = request.count, + step = request.step, + found = blocks.len(), + "Responding to BlocksByRange request" + ); + + let response = Response::success(ResponsePayload::BlocksByRange(blocks)); + server.swarm_handle.send_response(channel, response); +} + +fn canonical_blocks_by_range( + store: &Store, + start_slot: u64, + count: u64, + step: u64, +) -> Vec { + if count == 0 { + return Vec::new(); + } + + let Some(last_offset) = count + .checked_sub(1) + .and_then(|value| value.checked_mul(step)) + else { + return Vec::new(); + }; + let Some(end_slot) = start_slot.checked_add(last_offset) else { + return Vec::new(); + }; + + let mut roots_by_slot = HashMap::new(); + let mut current_root = store.head(); + + while !current_root.is_zero() { + let Some(header) = store.get_block_header(¤t_root) else { + break; + }; + + if header.slot < start_slot { + break; + } + + if header.slot <= end_slot && (header.slot - start_slot) % step == 0 { + roots_by_slot.insert(header.slot, current_root); + } + + current_root = header.parent_root; + } + + (0..count) + .filter_map(|index| { + let slot = start_slot.checked_add(index.checked_mul(step)?)?; + let root = roots_by_slot.get(&slot)?; + store.get_signed_block(root) + }) + .collect() +} + async fn handle_blocks_by_root_response( server: &mut P2PServer, blocks: Vec, @@ -313,3 +421,67 @@ async fn handle_fetch_failure( send_after(backoff, ctx.clone(), p2p_protocol::RetryBlockFetch { root }); } + +#[cfg(test)] +mod tests { + use super::*; + use ethlambda_storage::{ForkCheckpoints, backend::InMemoryBackend}; + use ethlambda_types::{ + attestation::XmssSignature, + block::{Block, BlockBody, BlockSignatures}, + signature::SIGNATURE_SIZE, + state::State, + }; + use libssz_types::SszList; + use std::sync::Arc; + + fn signed_block(slot: u64, parent_root: H256) -> SignedBlock { + SignedBlock { + message: Block { + slot, + proposer_index: 0, + parent_root, + state_root: H256::ZERO, + body: BlockBody::default(), + }, + signature: BlockSignatures { + attestation_signatures: SszList::new(), + proposer_signature: XmssSignature::try_from(vec![0u8; SIGNATURE_SIZE]).unwrap(), + }, + } + } + + #[test] + fn blocks_by_range_returns_canonical_blocks_in_requested_order() { + let backend = Arc::new(InMemoryBackend::new()); + let mut store = Store::from_anchor_state(backend, State::from_genesis(0, vec![])); + + let block_1 = signed_block(1, store.head()); + let root_1 = block_1.message.hash_tree_root(); + store.insert_signed_block(root_1, block_1); + + let block_2 = signed_block(2, root_1); + let root_2 = block_2.message.hash_tree_root(); + store.insert_signed_block(root_2, block_2); + + let side_block_3 = signed_block(3, root_1); + let side_root_3 = side_block_3.message.hash_tree_root(); + store.insert_signed_block(side_root_3, side_block_3); + + let block_4 = signed_block(4, root_2); + let root_4 = block_4.message.hash_tree_root(); + store.insert_signed_block(root_4, block_4); + store.update_checkpoints(ForkCheckpoints::head_only(root_4)); + + let blocks = canonical_blocks_by_range(&store, 1, 4, 1); + let slots: Vec<_> = blocks.iter().map(|block| block.message.slot).collect(); + let roots: Vec<_> = blocks + .iter() + .map(|block| block.message.hash_tree_root()) + .collect(); + + assert_eq!(slots, vec![1, 2, 4]); + assert_eq!(roots, vec![root_1, root_2, root_4]); + assert!(!roots.contains(&side_root_3)); + } +} diff --git a/crates/net/p2p/src/req_resp/messages.rs b/crates/net/p2p/src/req_resp/messages.rs index d90b6c91..a1737e4b 100644 --- a/crates/net/p2p/src/req_resp/messages.rs +++ b/crates/net/p2p/src/req_resp/messages.rs @@ -4,11 +4,14 @@ use libssz_types::SszList; pub const STATUS_PROTOCOL_V1: &str = "/leanconsensus/req/status/1/ssz_snappy"; pub const BLOCKS_BY_ROOT_PROTOCOL_V1: &str = "/leanconsensus/req/blocks_by_root/1/ssz_snappy"; +pub const BLOCKS_BY_RANGE_PROTOCOL_V1: &str = "/leanconsensus/req/blocks_by_range/1/ssz_snappy"; +pub const MAX_REQUEST_BLOCKS: u64 = 1024; // Maximum number of blocks in a single request (1024). #[derive(Debug, Clone)] pub enum Request { Status(Status), BlocksByRoot(BlocksByRootRequest), + BlocksByRange(BlocksByRangeRequest), } #[derive(Debug, Clone)] @@ -89,6 +92,7 @@ impl std::fmt::Debug for ResponseCode { pub enum ResponsePayload { Status(Status), BlocksByRoot(Vec), + BlocksByRange(Vec), } #[derive(Debug, Clone, SszEncode, SszDecode)] @@ -106,8 +110,6 @@ pub type ErrorMessage = SszList; /// Helper to create an ErrorMessage from a string. /// Debug builds panic if message exceeds 256 bytes (programming error). /// Release builds truncate to 256 bytes. -#[expect(dead_code)] -// TODO: map errors to req/resp error messages pub fn error_message(msg: impl AsRef) -> ErrorMessage { let bytes = msg.as_ref().as_bytes(); debug_assert!( @@ -130,3 +132,10 @@ pub fn error_message(msg: impl AsRef) -> ErrorMessage { pub struct BlocksByRootRequest { pub roots: RequestedBlockRoots, } + +#[derive(Debug, Clone, SszEncode, SszDecode)] +pub struct BlocksByRangeRequest { + pub start_slot: u64, + pub count: u64, + pub step: u64, +} diff --git a/crates/net/p2p/src/req_resp/mod.rs b/crates/net/p2p/src/req_resp/mod.rs index c550ba37..11acb79f 100644 --- a/crates/net/p2p/src/req_resp/mod.rs +++ b/crates/net/p2p/src/req_resp/mod.rs @@ -7,6 +7,7 @@ pub use codec::Codec; pub use encoding::{MAX_COMPRESSED_PAYLOAD_SIZE, MAX_PAYLOAD_SIZE}; pub use handlers::{build_status, fetch_block_from_peer, handle_req_resp_message}; pub use messages::{ - BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, Request, RequestedBlockRoots, Response, + BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest, + BlocksByRootRequest, MAX_REQUEST_BLOCKS, Request, RequestedBlockRoots, Response, ResponsePayload, STATUS_PROTOCOL_V1, Status, }; From cba8385a23b9e99c108f8ce18ddb5358e88a846e Mon Sep 17 00:00:00 2001 From: Blessing Samuel Date: Thu, 7 May 2026 01:36:59 +0100 Subject: [PATCH 02/14] Update crates/net/p2p/src/req_resp/handlers.rs Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> --- crates/net/p2p/src/req_resp/handlers.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index ae1390dd..bc4ac17c 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -171,7 +171,7 @@ async fn handle_blocks_by_range_request( "Received BlocksByRange request" ); - if request.step == 0 || request.count > MAX_REQUEST_BLOCKS { + if request.step == 0 || request.count == 0 || request.count > MAX_REQUEST_BLOCKS { let response = Response::error( ResponseCode::INVALID_REQUEST, error_message("invalid BlocksByRange request"), From 815814e2beb2f6ef11fadbacb442c574396368f6 Mon Sep 17 00:00:00 2001 From: dicethedev Date: Fri, 8 May 2026 05:55:55 +0100 Subject: [PATCH 03/14] feat(p2p): use BlocksByRange for long-range sync --- crates/net/p2p/src/lib.rs | 1 + crates/net/p2p/src/req_resp/codec.rs | 15 ++-- crates/net/p2p/src/req_resp/handlers.rs | 113 ++++++++++++++++++++++-- 3 files changed, 117 insertions(+), 12 deletions(-) diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 515bf71e..17bab4da 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -59,6 +59,7 @@ const MAX_FETCH_RETRIES: u32 = 10; const INITIAL_BACKOFF_MS: u64 = 5; const BACKOFF_MULTIPLIER: u64 = 2; const PEER_REDIAL_INTERVAL_SECS: u64 = 12; +pub const LONG_RANGE_SYNC_THRESHOLD: u64 = 2; pub(crate) struct PendingRequest { pub(crate) attempts: u32, diff --git a/crates/net/p2p/src/req_resp/codec.rs b/crates/net/p2p/src/req_resp/codec.rs index 1672c80b..0e9d3f45 100644 --- a/crates/net/p2p/src/req_resp/codec.rs +++ b/crates/net/p2p/src/req_resp/codec.rs @@ -21,6 +21,7 @@ fn protocol_label(protocol: &str) -> &'static str { match protocol { STATUS_PROTOCOL_V1 => "status", BLOCKS_BY_ROOT_PROTOCOL_V1 => "blocks_by_root", + BLOCKS_BY_RANGE_PROTOCOL_V1 => "blocks_by_range", _ => "unknown", } } @@ -82,11 +83,9 @@ impl libp2p::request_response::Codec for Codec { { let label = protocol_label(protocol.as_ref()); match protocol.as_ref() { - STATUS_PROTOCOL_V1 => decode_status_response(io).await, - BLOCKS_BY_ROOT_PROTOCOL_V1 => decode_blocks_by_root_response(io).await, - BLOCKS_BY_RANGE_PROTOCOL_V1 => decode_blocks_by_range_response(io).await, STATUS_PROTOCOL_V1 => decode_status_response(io, label).await, BLOCKS_BY_ROOT_PROTOCOL_V1 => decode_blocks_by_root_response(io, label).await, + BLOCKS_BY_RANGE_PROTOCOL_V1 => decode_blocks_by_range_response(io, label).await, _ => Err(io::Error::new( io::ErrorKind::InvalidData, format!("unknown protocol: {}", protocol.as_ref()), @@ -268,18 +267,22 @@ async fn decode_blocks_by_root_response(io: &mut T, protocol_label: &str) -> where T: AsyncRead + Unpin + Send, { - decode_blocks_response(io, ResponsePayload::BlocksByRoot).await + decode_blocks_response(io, protocol_label, ResponsePayload::BlocksByRoot).await } -async fn decode_blocks_by_range_response(io: &mut T) -> io::Result +async fn decode_blocks_by_range_response( + io: &mut T, + protocol_label: &str, +) -> io::Result where T: AsyncRead + Unpin + Send, { - decode_blocks_response(io, ResponsePayload::BlocksByRange).await + decode_blocks_response(io, protocol_label, ResponsePayload::BlocksByRange).await } async fn decode_blocks_response( io: &mut T, + protocol_label: &str, payload: fn(Vec) -> ResponsePayload, ) -> io::Result where diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index bc4ac17c..93be6959 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -11,14 +11,15 @@ use ethlambda_types::checkpoint::Checkpoint; use ethlambda_types::primitives::HashTreeRoot as _; use ethlambda_types::{block::SignedBlock, primitives::H256}; +use super::messages::{ResponseCode, error_message}; use super::{ - BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest, BlocksByRootRequest, MAX_REQUEST_BLOCKS, - Request, Response, ResponsePayload, Status, messages::error_message, + BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest, + BlocksByRootRequest, MAX_REQUEST_BLOCKS, Request, Response, ResponsePayload, Status, }; +use crate::LONG_RANGE_SYNC_THRESHOLD; use crate::{ BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest, - p2p_protocol, - req_resp::{RequestedBlockRoots, messages::ResponseCode}, + p2p_protocol, req_resp::RequestedBlockRoots, }; pub async fn handle_req_resp_message( @@ -62,7 +63,7 @@ pub async fn handle_req_resp_message( Response::Success { payload } => match payload { ResponsePayload::Status(status) => { info!(kind = "status_response", peer_count, "P2P message received"); - handle_status_response(status, peer).await; + handle_status_response(server, status, peer).await; } ResponsePayload::BlocksByRoot(blocks) => { info!( @@ -79,6 +80,7 @@ pub async fn handle_req_resp_message( count = blocks.len(), "P2P message received" ); + handle_blocks_by_range_response(server, blocks, peer).await; } }, Response::Error { code, message } => { @@ -129,8 +131,24 @@ async fn handle_status_request( server.swarm_handle.send_response(channel, response); } -async fn handle_status_response(status: Status, peer: PeerId) { +async fn handle_status_response(server: &mut P2PServer, status: Status, peer: PeerId) { info!(finalized_slot=%status.finalized.slot, head_slot=%status.head.slot, "Received status response from peer {peer}"); + + let our_head_slot = server.store.head_slot(); + if status.head.slot <= our_head_slot { + return; + } + let gap = status.head.slot - our_head_slot; + + if gap > LONG_RANGE_SYNC_THRESHOLD { + // Long-range sync: request blocks by range to efficiently fill large gap. + let start_slot = our_head_slot.saturating_add(1); + info!(%peer, start_slot, gap, "Long-range sync: using BlocksByRange"); + request_blocks_by_range_from_peer(server, peer, start_slot, gap).await; + } else { + // Short-range sync: fetch individual blocks by root, relying on gossip to fill any small gaps. + info!(%peer, gap, "Short gap, relying on gossip / FetchBlock for missing slots"); + } } async fn handle_blocks_by_root_request( @@ -295,6 +313,35 @@ async fn handle_blocks_by_root_response( } } +async fn handle_blocks_by_range_response( + server: &mut P2PServer, + blocks: Vec, + peer: PeerId, +) { + info!(%peer, count = blocks.len(), "Received BlocksByRange response"); + + if blocks.is_empty() { + warn!(%peer, "Received empty BlocksByRange response"); + return; + } + + if let Some(ref blockchain) = server.blockchain { + for block in blocks { + let block_root = block.message.hash_tree_root(); + let slot = block.message.slot; + let _ = blockchain.new_block(block).inspect_err(|err| { + error!( + %peer, + %slot, + block_root = %ethlambda_types::ShortRoot(&block_root.0), + %err, + "Failed to forward range-fetched block to blockchain" + ) + }); + } + } +} + /// Build a Status message from the current Store state. pub fn build_status(store: &Store) -> Status { let finalized = store.latest_finalized(); @@ -393,6 +440,60 @@ pub async fn fetch_block_from_peer(server: &mut P2PServer, root: H256) -> bool { true } +async fn request_blocks_by_range_from_peer( + server: &mut P2PServer, + peer: PeerId, + start_slot: u64, + count: u64, +) -> bool { + if count == 0 { + return true; + } + + let mut remaining = count; + let mut next_slot = start_slot; + + while remaining > 0 { + let batch_count = remaining.min(MAX_REQUEST_BLOCKS); + let request = BlocksByRangeRequest { + start_slot: next_slot, + count: batch_count, + step: 1, + }; + + info!( + %peer, + start_slot = next_slot, + count = batch_count, + "Sending BlocksByRange request" + ); + + if server + .swarm_handle + .send_request( + peer, + Request::BlocksByRange(request), + libp2p::StreamProtocol::new(BLOCKS_BY_RANGE_PROTOCOL_V1), + ) + .await + .is_none() + { + warn!( + %peer, + start_slot = next_slot, + count = batch_count, + "Failed to send BlocksByRange request (swarm adapter closed)" + ); + return false; + } + + remaining -= batch_count; + next_slot = next_slot.saturating_add(batch_count); + } + + true +} + async fn handle_fetch_failure( server: &mut P2PServer, root: H256, From 9fc8d73a2364e9b6f03d0993b01e8afc3188b0cb Mon Sep 17 00:00:00 2001 From: dicethedev Date: Fri, 8 May 2026 06:05:20 +0100 Subject: [PATCH 04/14] fix(clippy): use is_multiple_of for slot step check --- crates/net/p2p/src/req_resp/handlers.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index 93be6959..b0e933da 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -250,7 +250,7 @@ fn canonical_blocks_by_range( break; } - if header.slot <= end_slot && (header.slot - start_slot) % step == 0 { + if header.slot <= end_slot && (header.slot - start_slot).is_multiple_of(step) { roots_by_slot.insert(header.slot, current_root); } From 9028a2987f3115b812a3818fbfb31325512dbc29 Mon Sep 17 00:00:00 2001 From: Blessing Samuel Date: Fri, 8 May 2026 06:10:34 +0100 Subject: [PATCH 05/14] Update crates/net/p2p/src/req_resp/handlers.rs Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> --- crates/net/p2p/src/req_resp/handlers.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index b0e933da..78e39a8e 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -329,6 +329,7 @@ async fn handle_blocks_by_range_response( for block in blocks { let block_root = block.message.hash_tree_root(); let slot = block.message.slot; + // TODO: validate block.message.slot is within the originally requested range. let _ = blockchain.new_block(block).inspect_err(|err| { error!( %peer, From 71bc4727a79d44af9e02e69c96c413f728fb1d7b Mon Sep 17 00:00:00 2001 From: dicethedev Date: Fri, 8 May 2026 06:19:34 +0100 Subject: [PATCH 06/14] fix(p2p): cap BlocksByRange sync count to prevent unbounded request loop --- crates/net/p2p/src/lib.rs | 3 ++- crates/net/p2p/src/req_resp/handlers.rs | 9 +++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 17bab4da..2c096e5e 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -41,7 +41,7 @@ use crate::{ }, req_resp::{ BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, Codec, - MAX_COMPRESSED_PAYLOAD_SIZE, Request, STATUS_PROTOCOL_V1, build_status, + MAX_COMPRESSED_PAYLOAD_SIZE, MAX_REQUEST_BLOCKS, Request, STATUS_PROTOCOL_V1, build_status, fetch_block_from_peer, }, swarm_adapter::SwarmHandle, @@ -60,6 +60,7 @@ const INITIAL_BACKOFF_MS: u64 = 5; const BACKOFF_MULTIPLIER: u64 = 2; const PEER_REDIAL_INTERVAL_SECS: u64 = 12; pub const LONG_RANGE_SYNC_THRESHOLD: u64 = 2; +const MAX_SYNC_RANGE: u64 = MAX_REQUEST_BLOCKS * 64; // 65,536 slots (~3 days) pub(crate) struct PendingRequest { pub(crate) attempts: u32, diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index 78e39a8e..194c7009 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -16,10 +16,9 @@ use super::{ BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest, BlocksByRootRequest, MAX_REQUEST_BLOCKS, Request, Response, ResponsePayload, Status, }; -use crate::LONG_RANGE_SYNC_THRESHOLD; use crate::{ - BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest, - p2p_protocol, req_resp::RequestedBlockRoots, + BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, LONG_RANGE_SYNC_THRESHOLD, MAX_FETCH_RETRIES, + MAX_SYNC_RANGE, P2PServer, PendingRequest, p2p_protocol, req_resp::RequestedBlockRoots, }; pub async fn handle_req_resp_message( @@ -143,8 +142,10 @@ async fn handle_status_response(server: &mut P2PServer, status: Status, peer: Pe if gap > LONG_RANGE_SYNC_THRESHOLD { // Long-range sync: request blocks by range to efficiently fill large gap. let start_slot = our_head_slot.saturating_add(1); + // Cap the range to avoid requesting an excessive number of blocks if the peer is very far ahead. + let count = gap.min(MAX_SYNC_RANGE); info!(%peer, start_slot, gap, "Long-range sync: using BlocksByRange"); - request_blocks_by_range_from_peer(server, peer, start_slot, gap).await; + request_blocks_by_range_from_peer(server, peer, start_slot, count).await; } else { // Short-range sync: fetch individual blocks by root, relying on gossip to fill any small gaps. info!(%peer, gap, "Short gap, relying on gossip / FetchBlock for missing slots"); From 300a1afa709433d63d9e9be539f13b0980b99531 Mon Sep 17 00:00:00 2001 From: dicethedev Date: Fri, 8 May 2026 06:24:48 +0100 Subject: [PATCH 07/14] fix(p2p): bound canonical_blocks_by_range traversal depth to prevent DoS --- crates/net/p2p/src/lib.rs | 3 ++- crates/net/p2p/src/req_resp/handlers.rs | 9 ++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 2c096e5e..89431a85 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -59,8 +59,9 @@ const MAX_FETCH_RETRIES: u32 = 10; const INITIAL_BACKOFF_MS: u64 = 5; const BACKOFF_MULTIPLIER: u64 = 2; const PEER_REDIAL_INTERVAL_SECS: u64 = 12; -pub const LONG_RANGE_SYNC_THRESHOLD: u64 = 2; +const LONG_RANGE_SYNC_THRESHOLD: u64 = 2; const MAX_SYNC_RANGE: u64 = MAX_REQUEST_BLOCKS * 64; // 65,536 slots (~3 days) +const MAX_SLOT_LOOKBACK: u64 = MAX_REQUEST_BLOCKS * 4; // 4 096 slots pub(crate) struct PendingRequest { pub(crate) attempts: u32, diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index 194c7009..51ee431f 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -18,7 +18,8 @@ use super::{ }; use crate::{ BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, LONG_RANGE_SYNC_THRESHOLD, MAX_FETCH_RETRIES, - MAX_SYNC_RANGE, P2PServer, PendingRequest, p2p_protocol, req_resp::RequestedBlockRoots, + MAX_SLOT_LOOKBACK, MAX_SYNC_RANGE, P2PServer, PendingRequest, p2p_protocol, + req_resp::RequestedBlockRoots, }; pub async fn handle_req_resp_message( @@ -239,6 +240,12 @@ fn canonical_blocks_by_range( return Vec::new(); }; + // Avoid expensive lookups if the requested range is too far in the past (beyond recent gossip history). + let head_slot = store.head_slot(); + if head_slot.saturating_sub(end_slot) > MAX_SLOT_LOOKBACK { + return Vec::new(); + } + let mut roots_by_slot = HashMap::new(); let mut current_root = store.head(); From 1e9702147b090f5d2585dd9f806a59fdbe19796f Mon Sep 17 00:00:00 2001 From: dicethedev Date: Tue, 12 May 2026 22:22:31 +0100 Subject: [PATCH 08/14] Fix BlocksByRange response routing --- crates/net/p2p/src/lib.rs | 4 ++- crates/net/p2p/src/req_resp/codec.rs | 21 --------------- crates/net/p2p/src/req_resp/handlers.rs | 35 ++++++++++++------------- 3 files changed, 20 insertions(+), 40 deletions(-) diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 2bfe8e4c..bcc74525 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -41,7 +41,7 @@ use crate::{ }, req_resp::{ BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, Codec, - MAX_COMPRESSED_PAYLOAD_SIZE, Request, STATUS_PROTOCOL_V1, build_status, + MAX_COMPRESSED_PAYLOAD_SIZE, MAX_REQUEST_BLOCKS, Request, STATUS_PROTOCOL_V1, build_status, fetch_block_from_peer, }, swarm_adapter::SwarmHandle, @@ -305,6 +305,7 @@ impl P2P { connected_peers: HashSet::new(), pending_requests: HashMap::new(), request_id_map: HashMap::new(), + range_request_ids: HashSet::new(), bootnode_addrs: built.bootnode_addrs, node_names, }; @@ -341,6 +342,7 @@ pub struct P2PServer { pub(crate) connected_peers: HashSet, pub(crate) pending_requests: HashMap, pub(crate) request_id_map: HashMap, + pub(crate) range_request_ids: HashSet, bootnode_addrs: HashMap, node_names: HashMap, } diff --git a/crates/net/p2p/src/req_resp/codec.rs b/crates/net/p2p/src/req_resp/codec.rs index 3b62e5b3..7805cf6d 100644 --- a/crates/net/p2p/src/req_resp/codec.rs +++ b/crates/net/p2p/src/req_resp/codec.rs @@ -264,27 +264,6 @@ where /// Note: Error chunks from the peer (non-SUCCESS response codes) do not cause this /// function to return `Err` - they are logged and skipped. async fn decode_blocks_response(io: &mut T, protocol_label: &str) -> io::Result -where - T: AsyncRead + Unpin + Send, -{ - decode_blocks_response(io, protocol_label, ResponsePayload::BlocksByRoot).await -} - -async fn decode_blocks_by_range_response( - io: &mut T, - protocol_label: &str, -) -> io::Result -where - T: AsyncRead + Unpin + Send, -{ - decode_blocks_response(io, protocol_label, ResponsePayload::BlocksByRange).await -} - -async fn decode_blocks_response( - io: &mut T, - protocol_label: &str, - payload: fn(Vec) -> ResponsePayload, -) -> io::Result where T: AsyncRead + Unpin + Send, { diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index 614929c0..6368fd03 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -11,10 +11,9 @@ use ethlambda_types::checkpoint::Checkpoint; use ethlambda_types::primitives::HashTreeRoot as _; use ethlambda_types::{block::SignedBlock, primitives::H256}; -use super::messages::{ResponseCode, error_message}; use super::{ - BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest, BlocksByRootRequest, MAX_REQUEST_BLOCKS, - Request, Response, ResponsePayload, Status, + BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest, + BlocksByRootRequest, MAX_REQUEST_BLOCKS, Request, Response, ResponsePayload, Status, messages::{ResponseCode, error_message}, }; use crate::{ @@ -68,17 +67,14 @@ pub async fn handle_req_resp_message( } ResponsePayload::Blocks(blocks) => { info!(kind = "blocks_response", peer_count, "P2P message received"); - handle_blocks_by_root_response(server, blocks, peer, request_id, ctx) + if server.range_request_ids.remove(&request_id) { + handle_blocks_by_range_response(server, blocks, peer).await; + } else { + handle_blocks_by_root_response( + server, blocks, peer, request_id, ctx, + ) .await; - } - ResponsePayload::BlocksByRange(blocks) => { - info!( - kind = "blocks_by_range_response", - peer_count, - count = blocks.len(), - "P2P message received" - ); - handle_blocks_by_range_response(server, blocks, peer).await; + } } }, Response::Error { code, message } => { @@ -99,6 +95,8 @@ pub async fn handle_req_resp_message( // Check if this was a block fetch request if let Some(root) = server.request_id_map.remove(&request_id) { handle_fetch_failure(server, root, peer, ctx).await; + } else if server.range_request_ids.remove(&request_id) { + warn!(%peer, ?request_id, "BlocksByRange request failed"); } } request_response::Event::InboundFailure { @@ -235,7 +233,7 @@ fn canonical_blocks_by_range( else { return Vec::new(); }; - + // Avoid expensive lookups if the requested range is too far in the past (beyond recent gossip history). let head_slot = store.head_slot(); if head_slot.saturating_sub(end_slot) > MAX_SLOT_LOOKBACK { @@ -473,7 +471,7 @@ async fn request_blocks_by_range_from_peer( "Sending BlocksByRange request" ); - if server + let Some(request_id) = server .swarm_handle .send_request( peer, @@ -481,8 +479,7 @@ async fn request_blocks_by_range_from_peer( libp2p::StreamProtocol::new(BLOCKS_BY_RANGE_PROTOCOL_V1), ) .await - .is_none() - { + else { warn!( %peer, start_slot = next_slot, @@ -490,7 +487,9 @@ async fn request_blocks_by_range_from_peer( "Failed to send BlocksByRange request (swarm adapter closed)" ); return false; - } + }; + + server.range_request_ids.insert(request_id); remaining -= batch_count; next_slot = next_slot.saturating_add(batch_count); From 41bcebf7f4620a300326e2d6a9fdc9cfe87fb142 Mon Sep 17 00:00:00 2001 From: dicethedev Date: Fri, 15 May 2026 04:19:19 +0100 Subject: [PATCH 09/14] fix(p2p): address BlocksByRange review feedback - Remove redundant MAX_SLOT_LOOKBACK guard - Always trigger long-range sync on status response - Deduplicate range requests across peers --- crates/net/p2p/src/lib.rs | 4 +-- crates/net/p2p/src/req_resp/handlers.rs | 35 ++++++++++--------------- 2 files changed, 16 insertions(+), 23 deletions(-) diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index bcc74525..d20d19a6 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -59,9 +59,7 @@ const MAX_FETCH_RETRIES: u32 = 10; const INITIAL_BACKOFF_MS: u64 = 5; const BACKOFF_MULTIPLIER: u64 = 2; const PEER_REDIAL_INTERVAL_SECS: u64 = 12; -const LONG_RANGE_SYNC_THRESHOLD: u64 = 2; const MAX_SYNC_RANGE: u64 = MAX_REQUEST_BLOCKS * 64; // 65,536 slots (~3 days) -const MAX_SLOT_LOOKBACK: u64 = MAX_REQUEST_BLOCKS * 4; // 4 096 slots pub(crate) struct PendingRequest { pub(crate) attempts: u32, @@ -306,6 +304,7 @@ impl P2P { pending_requests: HashMap::new(), request_id_map: HashMap::new(), range_request_ids: HashSet::new(), + pending_sync_ranges: HashSet::new(), bootnode_addrs: built.bootnode_addrs, node_names, }; @@ -343,6 +342,7 @@ pub struct P2PServer { pub(crate) pending_requests: HashMap, pub(crate) request_id_map: HashMap, pub(crate) range_request_ids: HashSet, + pub(crate) pending_sync_ranges: HashSet<(u64, u64)>, bootnode_addrs: HashMap, node_names: HashMap, } diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index 6368fd03..1c6d1911 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -17,9 +17,8 @@ use super::{ messages::{ResponseCode, error_message}, }; use crate::{ - BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, LONG_RANGE_SYNC_THRESHOLD, MAX_FETCH_RETRIES, - MAX_SLOT_LOOKBACK, MAX_SYNC_RANGE, P2PServer, PendingRequest, p2p_protocol, - req_resp::RequestedBlockRoots, + BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, MAX_SYNC_RANGE, P2PServer, + PendingRequest, p2p_protocol, req_resp::RequestedBlockRoots, }; pub async fn handle_req_resp_message( @@ -135,18 +134,10 @@ async fn handle_status_response(server: &mut P2PServer, status: Status, peer: Pe return; } let gap = status.head.slot - our_head_slot; - - if gap > LONG_RANGE_SYNC_THRESHOLD { - // Long-range sync: request blocks by range to efficiently fill large gap. - let start_slot = our_head_slot.saturating_add(1); - // Cap the range to avoid requesting an excessive number of blocks if the peer is very far ahead. - let count = gap.min(MAX_SYNC_RANGE); - info!(%peer, start_slot, gap, "Long-range sync: using BlocksByRange"); - request_blocks_by_range_from_peer(server, peer, start_slot, count).await; - } else { - // Short-range sync: fetch individual blocks by root, relying on gossip to fill any small gaps. - info!(%peer, gap, "Short gap, relying on gossip / FetchBlock for missing slots"); - } + let count = gap.min(MAX_SYNC_RANGE); + let start_slot = our_head_slot.saturating_add(1); + request_blocks_by_range_from_peer(server, peer, start_slot, count).await; + info!(%peer, start_slot, gap, "Long-range sync: using BlocksByRange"); } async fn handle_blocks_by_root_request( @@ -234,12 +225,6 @@ fn canonical_blocks_by_range( return Vec::new(); }; - // Avoid expensive lookups if the requested range is too far in the past (beyond recent gossip history). - let head_slot = store.head_slot(); - if head_slot.saturating_sub(end_slot) > MAX_SLOT_LOOKBACK { - return Vec::new(); - } - let mut roots_by_slot = HashMap::new(); let mut current_root = store.head(); @@ -452,6 +437,14 @@ async fn request_blocks_by_range_from_peer( if count == 0 { return true; } + let end_slot = start_slot.saturating_add(count).saturating_sub(1); + + // Deduplicate: skip if we already have this range in-flight + if server.pending_sync_ranges.contains(&(start_slot, end_slot)) { + info!(%peer, start_slot, end_slot, "BlocksByRange already in-flight, skipping duplicate"); + return true; + } + server.pending_sync_ranges.insert((start_slot, end_slot)); let mut remaining = count; let mut next_slot = start_slot; From 50e85acf7017fa25deb703f9bab903faa8eed76d Mon Sep 17 00:00:00 2001 From: dicethedev Date: Wed, 20 May 2026 00:50:09 +0100 Subject: [PATCH 10/14] fix(p2p): remove legacy step field from BlocksByRangeRequest and update request builder --- crates/net/p2p/src/req_resp/handlers.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index 00b56e4f..3331ca82 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -440,7 +440,6 @@ async fn request_blocks_by_range_from_peer( let request = BlocksByRangeRequest { start_slot: next_slot, count: batch_count, - step: 1, }; info!( From c430e79645156219347572425d0fafbc566b237b Mon Sep 17 00:00:00 2001 From: dicethedev Date: Thu, 28 May 2026 16:55:36 +0100 Subject: [PATCH 11/14] refactor(p2p): merge request_id_map and range_request_ids into outbound_requests enum --- crates/net/p2p/src/lib.rs | 63 ++++-- crates/net/p2p/src/req_resp/handlers.rs | 264 ++++++++++++++++-------- crates/net/p2p/src/req_resp/mod.rs | 4 +- 3 files changed, 231 insertions(+), 100 deletions(-) diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index d20d19a6..79da78ff 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -42,7 +42,7 @@ use crate::{ req_resp::{ BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, Codec, MAX_COMPRESSED_PAYLOAD_SIZE, MAX_REQUEST_BLOCKS, Request, STATUS_PROTOCOL_V1, build_status, - fetch_block_from_peer, + fetch_block_from_peer, request_blocks_by_range_from_peer, }, swarm_adapter::SwarmHandle, }; @@ -66,6 +66,15 @@ pub(crate) struct PendingRequest { pub(crate) failed_peers: HashSet, } +pub(crate) enum PendingRequestKind { + Root(H256), + Range { + start_slot: u64, + end_slot: u64, + total_end_slot: u64, + }, +} + // --- Swarm construction --- /// [libp2p Behaviour](libp2p::swarm::NetworkBehaviour) combining identify, Gossipsub @@ -301,10 +310,9 @@ impl P2P { block_topic: built.block_topic, aggregation_topic: built.aggregation_topic, connected_peers: HashSet::new(), - pending_requests: HashMap::new(), - request_id_map: HashMap::new(), - range_request_ids: HashSet::new(), - pending_sync_ranges: HashSet::new(), + pending_root_requests: HashMap::new(), + outbound_requests: HashMap::new(), + pending_range_requests: HashSet::new(), bootnode_addrs: built.bootnode_addrs, node_names, }; @@ -339,10 +347,9 @@ pub struct P2PServer { pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic, pub(crate) connected_peers: HashSet, - pub(crate) pending_requests: HashMap, - pub(crate) request_id_map: HashMap, - pub(crate) range_request_ids: HashSet, - pub(crate) pending_sync_ranges: HashSet<(u64, u64)>, + pub(crate) pending_root_requests: HashMap, + pub(crate) outbound_requests: HashMap, + pub(crate) pending_range_requests: HashSet<(u64, u64)>, bootnode_addrs: HashMap, node_names: HashMap, } @@ -364,6 +371,13 @@ pub(crate) trait P2PProtocol: Send + Sync { fn retry_block_fetch(&self, root: H256) -> Result<(), ActorError>; #[allow(dead_code)] // invoked via send_after, not called directly fn retry_peer_redial(&self, peer_id: PeerId) -> Result<(), ActorError>; + #[allow(dead_code)] + fn retry_range_sync( + &self, + start_slot: u64, + end_slot: u64, + peer_id: PeerId, + ) -> Result<(), ActorError>; } #[actor(protocol = P2PProtocol)] @@ -376,7 +390,7 @@ impl P2PServer { ) { let root = msg.root; // Check if still pending (might have succeeded during backoff) - if !self.pending_requests.contains_key(&root) { + if !self.pending_root_requests.contains_key(&root) { trace!(%root, "Block fetch completed during backoff, skipping retry"); return; } @@ -385,7 +399,7 @@ impl P2PServer { if !fetch_block_from_peer(self, root).await { tracing::error!(%root, "Failed to retry block fetch, giving up"); - self.pending_requests.remove(&root); + self.pending_root_requests.remove(&root); } } @@ -408,6 +422,31 @@ impl P2PServer { self.swarm_handle.dial(addr.clone()); } } + + #[send_handler] + async fn handle_retry_range_sync( + &mut self, + msg: p2p_protocol::RetryRangeSync, + _ctx: &Context, + ) { + let start_slot = msg.start_slot; + let end_slot = msg.end_slot; + let peer = msg.peer_id; + + // safety check: if already synced, skip retry + let still_needed = !self + .pending_range_requests + .contains(&(start_slot, end_slot)); + + if still_needed { + tracing::trace!(%peer, start_slot, end_slot, "Skipping retry, range already resolved"); + return; + } + + info!(%peer, start_slot, end_slot, "Retrying BlocksByRange sync"); + + request_blocks_by_range_from_peer(self, peer, start_slot, end_slot).await; + } } // --- Manual Handler impls for network-api messages --- @@ -441,7 +480,7 @@ impl Handler for P2PServer { async fn handle(&mut self, msg: FetchBlock, _ctx: &Context) { let root = msg.root; // Deduplicate - if already pending, ignore - if self.pending_requests.contains_key(&root) { + if self.pending_root_requests.contains_key(&root) { trace!(%root, "Block fetch already in progress, ignoring duplicate"); return; } diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index 3331ca82..b424fb66 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -18,7 +18,7 @@ use super::{ }; use crate::{ BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, MAX_SYNC_RANGE, P2PServer, - PendingRequest, p2p_protocol, req_resp::RequestedBlockRoots, + PendingRequest, PendingRequestKind, p2p_protocol, req_resp::RequestedBlockRoots, }; pub async fn handle_req_resp_message( @@ -66,13 +66,35 @@ pub async fn handle_req_resp_message( } ResponsePayload::Blocks(blocks) => { info!(kind = "blocks_response", peer_count, "P2P message received"); - if server.range_request_ids.remove(&request_id) { - handle_blocks_by_range_response(server, blocks, peer).await; - } else { - handle_blocks_by_root_response( - server, blocks, peer, request_id, ctx, - ) - .await; + + match server.outbound_requests.remove(&request_id) { + Some(PendingRequestKind::Range { + start_slot, + end_slot, + total_end_slot, + }) => { + server + .pending_range_requests + .remove(&(start_slot, end_slot)); + handle_blocks_by_range_response( + server, + blocks, + peer, + start_slot, + end_slot, + total_end_slot, + ) + .await; + } + Some(PendingRequestKind::Root(root)) => { + handle_blocks_by_root_response( + server, blocks, peer, request_id, root, ctx, + ) + .await; + } + None => { + warn!(%peer, ?request_id, "Received blocks response for unknown request_id"); + } } } }, @@ -92,10 +114,29 @@ pub async fn handle_req_resp_message( warn!(%peer, ?request_id, %error, "Outbound request failed"); // Check if this was a block fetch request - if let Some(root) = server.request_id_map.remove(&request_id) { - handle_fetch_failure(server, root, peer, ctx).await; - } else if server.range_request_ids.remove(&request_id) { - warn!(%peer, ?request_id, "BlocksByRange request failed"); + match server.outbound_requests.remove(&request_id) { + Some(PendingRequestKind::Root(root)) => { + handle_fetch_failure(server, root, peer, ctx).await; + } + Some(PendingRequestKind::Range { + start_slot, + end_slot, + total_end_slot, + }) => { + server + .pending_range_requests + .remove(&(start_slot, end_slot)); + send_after( + Duration::from_millis(500), + ctx.clone(), + p2p_protocol::RetryRangeSync { + peer_id: peer, + start_slot, + end_slot: total_end_slot, // retry the full remaining range + }, + ); + } + None => {} } } request_response::Event::InboundFailure { @@ -134,9 +175,11 @@ async fn handle_status_response(server: &mut P2PServer, status: Status, peer: Pe return; } let gap = status.head.slot - our_head_slot; - let count = gap.min(MAX_SYNC_RANGE); let start_slot = our_head_slot.saturating_add(1); - request_blocks_by_range_from_peer(server, peer, start_slot, count).await; + let total_end_slot = start_slot + .saturating_add(gap.min(MAX_SYNC_RANGE)) + .saturating_sub(1); + request_blocks_by_range_from_peer(server, peer, start_slot, total_end_slot).await; info!(%peer, start_slot, gap, "Long-range sync: using BlocksByRange"); } @@ -244,18 +287,16 @@ async fn handle_blocks_by_root_response( blocks: Vec, peer: PeerId, request_id: request_response::OutboundRequestId, + requested_root: H256, ctx: &Context, ) { info!(%peer, count = blocks.len(), "Received BlocksByRoot response"); - // Look up which root was requested for this specific request - let Some(requested_root) = server.request_id_map.remove(&request_id) else { - warn!(%peer, ?request_id, "Received response for unknown request_id"); - return; - }; - if blocks.is_empty() { - server.request_id_map.insert(request_id, requested_root); + // Re-insert so failure handling can find it + server + .outbound_requests + .insert(request_id, PendingRequestKind::Root(requested_root)); warn!(%peer, "Received empty BlocksByRoot response"); handle_fetch_failure(server, requested_root, peer, ctx).await; return; @@ -276,7 +317,7 @@ async fn handle_blocks_by_root_response( } // Clean up tracking for this root - server.pending_requests.remove(&root); + server.pending_root_requests.remove(&root); if let Some(ref blockchain) = server.blockchain { let _ = blockchain @@ -290,30 +331,48 @@ async fn handle_blocks_by_range_response( server: &mut P2PServer, blocks: Vec, peer: PeerId, + start_slot: u64, + end_slot: u64, + total_end_slot: u64, ) { + server + .pending_range_requests + .remove(&(start_slot, end_slot)); + info!(%peer, count = blocks.len(), "Received BlocksByRange response"); if blocks.is_empty() { - warn!(%peer, "Received empty BlocksByRange response"); + warn!(%peer, start_slot, end_slot, "Received empty BlocksByRange response"); return; } - if let Some(ref blockchain) = server.blockchain { - for block in blocks { - let block_root = block.message.hash_tree_root(); - let slot = block.message.slot; - // TODO: validate block.message.slot is within the originally requested range. - let _ = blockchain.new_block(block).inspect_err(|err| { - error!( - %peer, - %slot, - block_root = %ethlambda_types::ShortRoot(&block_root.0), - %err, - "Failed to forward range-fetched block to blockchain" - ) - }); + let Some(ref blockchain) = server.blockchain else { + warn!(%peer, "No blockchain handler available"); + return; + }; + + for block in blocks { + let slot = block.message.slot; + + if slot < start_slot || slot > end_slot { + warn!(%peer, %slot, start_slot, end_slot, "Received block outside requested range"); + continue; + } + + let block_root = block.message.hash_tree_root(); + if let Err(err) = blockchain.new_block(block) { + error!( + %err, %slot, %peer, + block_root = %ethlambda_types::ShortRoot(&block_root.0), + "Failed to forward range-fetched block to blockchain" + ); } } + + // Chain the next batch if there are more slots to fetch + if end_slot < total_end_slot { + request_blocks_by_range_from_peer(server, peer, end_slot + 1, total_end_slot).await; + } } /// Build a Status message from the current Store state. @@ -342,7 +401,10 @@ pub async fn fetch_block_from_peer(server: &mut P2PServer, root: H256) -> bool { } // Exclude peers that already returned empty responses for this root - let failed = server.pending_requests.get(&root).map(|p| &p.failed_peers); + let failed = server + .pending_root_requests + .get(&root) + .map(|p| &p.failed_peers); let pool: Vec<_> = if failed.is_none_or(|f| f.is_empty()) { server.connected_peers.iter().copied().collect() } else { @@ -360,7 +422,7 @@ pub async fn fetch_block_from_peer(server: &mut P2PServer, root: H256) -> bool { // retries start a fresh round of elimination. let pool = if pool.is_empty() { warn!(%root, "All peers failed for this block, retrying with full peer set"); - if let Some(pending) = server.pending_requests.get_mut(&root) { + if let Some(pending) = server.pending_root_requests.get_mut(&root) { pending.failed_peers.clear(); } server.connected_peers.iter().copied().collect() @@ -401,7 +463,7 @@ pub async fn fetch_block_from_peer(server: &mut P2PServer, root: H256) -> bool { // Track the request if not already tracked (new request) server - .pending_requests + .pending_root_requests .entry(root) .or_insert(PendingRequest { attempts: 1, @@ -409,69 +471,97 @@ pub async fn fetch_block_from_peer(server: &mut P2PServer, root: H256) -> bool { }); // Map request_id to root for failure handling - server.request_id_map.insert(request_id, root); + server + .outbound_requests + .insert(request_id, PendingRequestKind::Root(root)); true } -async fn request_blocks_by_range_from_peer( +pub async fn request_blocks_by_range_from_peer( server: &mut P2PServer, peer: PeerId, start_slot: u64, - count: u64, + total_end_slot: u64, ) -> bool { - if count == 0 { + if start_slot > total_end_slot { return true; } - let end_slot = start_slot.saturating_add(count).saturating_sub(1); - // Deduplicate: skip if we already have this range in-flight - if server.pending_sync_ranges.contains(&(start_slot, end_slot)) { - info!(%peer, start_slot, end_slot, "BlocksByRange already in-flight, skipping duplicate"); - return true; + // Trim effective_start forward past any in-flight coverage, handling + // non-contiguous gaps by looping until no range covers the current position. + let mut effective_start = start_slot; + loop { + let covered = server + .pending_range_requests + .iter() + .find(|&&(s, e)| s <= effective_start && effective_start <= e) + .copied(); + match covered { + Some((_, covered_end)) => effective_start = covered_end + 1, + None => break, + } + if effective_start > total_end_slot { + info!( + %peer, + start_slot, + total_end_slot, + "BlocksByRange fully covered by in-flight requests, skipping" + ); + return true; + } } - server.pending_sync_ranges.insert((start_slot, end_slot)); - let mut remaining = count; - let mut next_slot = start_slot; + // Send only one batch — response handler chains the next one + let count = (total_end_slot - effective_start + 1).min(MAX_REQUEST_BLOCKS); + let batch_end = effective_start.saturating_add(count).saturating_sub(1); - while remaining > 0 { - let batch_count = remaining.min(MAX_REQUEST_BLOCKS); - let request = BlocksByRangeRequest { - start_slot: next_slot, - count: batch_count, - }; + server + .pending_range_requests + .insert((effective_start, batch_end)); - info!( - %peer, - start_slot = next_slot, - count = batch_count, - "Sending BlocksByRange request" - ); + let request = BlocksByRangeRequest { + start_slot: effective_start, + count, + }; - let Some(request_id) = server - .swarm_handle - .send_request( - peer, - Request::BlocksByRange(request), - libp2p::StreamProtocol::new(BLOCKS_BY_RANGE_PROTOCOL_V1), - ) - .await - else { - warn!( - %peer, - start_slot = next_slot, - count = batch_count, - "Failed to send BlocksByRange request (swarm adapter closed)" - ); - return false; - }; + info!( + %peer, + start_slot = effective_start, + count, + total_end_slot, + "Sending BlocksByRange request (single batch)" + ); - server.range_request_ids.insert(request_id); + let Some(request_id) = server + .swarm_handle + .send_request( + peer, + Request::BlocksByRange(request), + libp2p::StreamProtocol::new(BLOCKS_BY_RANGE_PROTOCOL_V1), + ) + .await + else { + warn!( + %peer, + start_slot = effective_start, + count, + "Failed to send BlocksByRange request" + ); + server + .pending_range_requests + .remove(&(effective_start, batch_end)); + return false; + }; - remaining -= batch_count; - next_slot = next_slot.saturating_add(batch_count); - } + server.outbound_requests.insert( + request_id, + PendingRequestKind::Range { + start_slot: effective_start, + end_slot: batch_end, + total_end_slot, + }, + ); true } @@ -482,7 +572,7 @@ async fn handle_fetch_failure( peer: PeerId, ctx: &Context, ) { - let Some(pending) = server.pending_requests.get_mut(&root) else { + let Some(pending) = server.pending_root_requests.get_mut(&root) else { return; }; @@ -491,7 +581,7 @@ async fn handle_fetch_failure( if pending.attempts >= MAX_FETCH_RETRIES { error!(%root, %peer, attempts=%pending.attempts, "Block fetch failed after max retries, giving up"); - server.pending_requests.remove(&root); + server.pending_root_requests.remove(&root); return; } diff --git a/crates/net/p2p/src/req_resp/mod.rs b/crates/net/p2p/src/req_resp/mod.rs index 11acb79f..26240ec5 100644 --- a/crates/net/p2p/src/req_resp/mod.rs +++ b/crates/net/p2p/src/req_resp/mod.rs @@ -5,7 +5,9 @@ mod messages; pub use codec::Codec; pub use encoding::{MAX_COMPRESSED_PAYLOAD_SIZE, MAX_PAYLOAD_SIZE}; -pub use handlers::{build_status, fetch_block_from_peer, handle_req_resp_message}; +pub use handlers::{ + build_status, fetch_block_from_peer, handle_req_resp_message, request_blocks_by_range_from_peer, +}; pub use messages::{ BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest, BlocksByRootRequest, MAX_REQUEST_BLOCKS, Request, RequestedBlockRoots, Response, From ed82fb0f1dfa8bfa2ed2ce3d5f7e6b3dbe42436a Mon Sep 17 00:00:00 2001 From: dicethedev Date: Fri, 5 Jun 2026 10:41:41 +0100 Subject: [PATCH 12/14] Remove broken range sync retry --- crates/net/p2p/src/lib.rs | 34 +------------------------ crates/net/p2p/src/req_resp/handlers.rs | 15 +++++------ 2 files changed, 7 insertions(+), 42 deletions(-) diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 79da78ff..d66d5c16 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -42,7 +42,7 @@ use crate::{ req_resp::{ BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, Codec, MAX_COMPRESSED_PAYLOAD_SIZE, MAX_REQUEST_BLOCKS, Request, STATUS_PROTOCOL_V1, build_status, - fetch_block_from_peer, request_blocks_by_range_from_peer, + fetch_block_from_peer, }, swarm_adapter::SwarmHandle, }; @@ -371,13 +371,6 @@ pub(crate) trait P2PProtocol: Send + Sync { fn retry_block_fetch(&self, root: H256) -> Result<(), ActorError>; #[allow(dead_code)] // invoked via send_after, not called directly fn retry_peer_redial(&self, peer_id: PeerId) -> Result<(), ActorError>; - #[allow(dead_code)] - fn retry_range_sync( - &self, - start_slot: u64, - end_slot: u64, - peer_id: PeerId, - ) -> Result<(), ActorError>; } #[actor(protocol = P2PProtocol)] @@ -422,31 +415,6 @@ impl P2PServer { self.swarm_handle.dial(addr.clone()); } } - - #[send_handler] - async fn handle_retry_range_sync( - &mut self, - msg: p2p_protocol::RetryRangeSync, - _ctx: &Context, - ) { - let start_slot = msg.start_slot; - let end_slot = msg.end_slot; - let peer = msg.peer_id; - - // safety check: if already synced, skip retry - let still_needed = !self - .pending_range_requests - .contains(&(start_slot, end_slot)); - - if still_needed { - tracing::trace!(%peer, start_slot, end_slot, "Skipping retry, range already resolved"); - return; - } - - info!(%peer, start_slot, end_slot, "Retrying BlocksByRange sync"); - - request_blocks_by_range_from_peer(self, peer, start_slot, end_slot).await; - } } // --- Manual Handler impls for network-api messages --- diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index b424fb66..77281109 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -121,19 +121,16 @@ pub async fn handle_req_resp_message( Some(PendingRequestKind::Range { start_slot, end_slot, - total_end_slot, + .. }) => { server .pending_range_requests .remove(&(start_slot, end_slot)); - send_after( - Duration::from_millis(500), - ctx.clone(), - p2p_protocol::RetryRangeSync { - peer_id: peer, - start_slot, - end_slot: total_end_slot, // retry the full remaining range - }, + warn!( + %peer, + start_slot, + end_slot, + "BlocksByRange request failed; retry is disabled" ); } None => {} From ce17a60ff612b6c9cba97e43e9b95c9d37abcfd3 Mon Sep 17 00:00:00 2001 From: dicethedev Date: Fri, 5 Jun 2026 11:04:03 +0100 Subject: [PATCH 13/14] Refactor range sync request deduplication --- crates/net/p2p/src/lib.rs | 122 +++++++++++++++++-- crates/net/p2p/src/req_resp/handlers.rs | 153 ++++++++++++------------ crates/net/p2p/src/req_resp/mod.rs | 4 +- 3 files changed, 191 insertions(+), 88 deletions(-) diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index d66d5c16..de34e469 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -1,6 +1,7 @@ use std::{ collections::{HashMap, HashSet}, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, + ops::Range, time::Duration, }; @@ -68,11 +69,70 @@ pub(crate) struct PendingRequest { pub(crate) enum PendingRequestKind { Root(H256), - Range { - start_slot: u64, - end_slot: u64, - total_end_slot: u64, - }, + Range { start_slot: u64, end_slot: u64 }, +} + +pub(crate) struct RangeSyncState { + /// Remaining slots to request, with an exclusive end. + pub(crate) current_range: Range, + /// Latest advertised head slot for each peer. + pub(crate) peer_set: HashMap, + pub(crate) in_flight: bool, +} + +impl RangeSyncState { + pub(crate) fn new(current_range: Range, peer: PeerId, peer_head: u64) -> Self { + Self { + current_range, + peer_set: HashMap::from([(peer, peer_head)]), + in_flight: false, + } + } + + pub(crate) fn merge_peer(&mut self, peer: PeerId, peer_head: u64, end_exclusive: u64) { + self.peer_set.insert(peer, peer_head); + self.current_range.end = self.current_range.end.max(end_exclusive); + self.drop_stale_peers(); + } + + pub(crate) fn next_batch(&self) -> Option<(PeerId, Range)> { + if self.in_flight || self.current_range.is_empty() { + return None; + } + + let (&peer, &peer_head) = self + .peer_set + .iter() + .filter(|(_, head)| **head >= self.current_range.start) + .max_by_key(|(_, head)| **head)?; + let peer_end = peer_head.saturating_add(1); + let batch_end = self + .current_range + .start + .saturating_add(MAX_REQUEST_BLOCKS) + .min(self.current_range.end) + .min(peer_end); + + (batch_end > self.current_range.start) + .then_some((peer, self.current_range.start..batch_end)) + } + + pub(crate) fn complete_batch(&mut self, end_slot: u64) { + self.in_flight = false; + self.current_range.start = self.current_range.start.max(end_slot.saturating_add(1)); + self.drop_stale_peers(); + } + + pub(crate) fn fail_peer(&mut self, peer: &PeerId) { + self.in_flight = false; + self.peer_set.remove(peer); + self.drop_stale_peers(); + } + + fn drop_stale_peers(&mut self) { + let start_slot = self.current_range.start; + self.peer_set.retain(|_, head| *head >= start_slot); + } } // --- Swarm construction --- @@ -312,7 +372,7 @@ impl P2P { connected_peers: HashSet::new(), pending_root_requests: HashMap::new(), outbound_requests: HashMap::new(), - pending_range_requests: HashSet::new(), + range_sync_state: None, bootnode_addrs: built.bootnode_addrs, node_names, }; @@ -349,7 +409,7 @@ pub struct P2PServer { pub(crate) connected_peers: HashSet, pub(crate) pending_root_requests: HashMap, pub(crate) outbound_requests: HashMap, - pub(crate) pending_range_requests: HashSet<(u64, u64)>, + pub(crate) range_sync_state: Option, bootnode_addrs: HashMap, node_names: HashMap, } @@ -731,6 +791,54 @@ fn compute_message_id(message: &libp2p::gossipsub::Message) -> libp2p::gossipsub mod tests { use super::*; + fn random_peer() -> PeerId { + PeerId::from_public_key(&Keypair::generate_ed25519().public()) + } + + #[test] + fn range_sync_state_merges_new_peer_ranges() { + let first_peer = random_peer(); + let second_peer = random_peer(); + let mut state = RangeSyncState::new(10..101, first_peer, 100); + + state.merge_peer(second_peer, 150, 151); + + assert_eq!(state.current_range, 10..151); + assert_eq!(state.peer_set.get(&first_peer), Some(&100)); + assert_eq!(state.peer_set.get(&second_peer), Some(&150)); + } + + #[test] + fn range_sync_state_allows_only_one_batch_in_flight() { + let first_peer = random_peer(); + let second_peer = random_peer(); + let mut state = RangeSyncState::new(10..3000, first_peer, 500); + state.merge_peer(second_peer, 2000, 3000); + + let (selected_peer, batch) = state.next_batch().expect("batch available"); + assert_eq!(selected_peer, second_peer); + assert_eq!(batch, 10..(10 + MAX_REQUEST_BLOCKS)); + + state.in_flight = true; + assert!(state.next_batch().is_none()); + } + + #[test] + fn range_sync_state_advances_and_drops_stale_peers() { + let stale_peer = random_peer(); + let current_peer = random_peer(); + let mut state = RangeSyncState::new(10..3000, stale_peer, 100); + state.merge_peer(current_peer, 2999, 3000); + state.in_flight = true; + + state.complete_batch(1033); + + assert_eq!(state.current_range, 1034..3000); + assert!(!state.in_flight); + assert!(!state.peer_set.contains_key(&stale_peer)); + assert_eq!(state.peer_set.get(¤t_peer), Some(&2999)); + } + #[test] fn parse_enrs_extracts_ip_port_and_public_key() { // Values taken from a local devnet run with lean-quickstart diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index 77281109..784522d8 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -18,7 +18,8 @@ use super::{ }; use crate::{ BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, MAX_SYNC_RANGE, P2PServer, - PendingRequest, PendingRequestKind, p2p_protocol, req_resp::RequestedBlockRoots, + PendingRequest, PendingRequestKind, RangeSyncState, p2p_protocol, + req_resp::RequestedBlockRoots, }; pub async fn handle_req_resp_message( @@ -71,18 +72,9 @@ pub async fn handle_req_resp_message( Some(PendingRequestKind::Range { start_slot, end_slot, - total_end_slot, }) => { - server - .pending_range_requests - .remove(&(start_slot, end_slot)); handle_blocks_by_range_response( - server, - blocks, - peer, - start_slot, - end_slot, - total_end_slot, + server, blocks, peer, start_slot, end_slot, ) .await; } @@ -101,6 +93,16 @@ pub async fn handle_req_resp_message( Response::Error { code, message } => { let error_str = String::from_utf8_lossy(&message); warn!(%peer, ?code, %error_str, "Received error response"); + + match server.outbound_requests.remove(&request_id) { + Some(PendingRequestKind::Range { .. }) => { + fail_range_request(server, &peer); + } + Some(request @ PendingRequestKind::Root(_)) => { + server.outbound_requests.insert(request_id, request); + } + None => {} + } } } } @@ -121,11 +123,8 @@ pub async fn handle_req_resp_message( Some(PendingRequestKind::Range { start_slot, end_slot, - .. }) => { - server - .pending_range_requests - .remove(&(start_slot, end_slot)); + fail_range_request(server, &peer); warn!( %peer, start_slot, @@ -173,10 +172,20 @@ async fn handle_status_response(server: &mut P2PServer, status: Status, peer: Pe } let gap = status.head.slot - our_head_slot; let start_slot = our_head_slot.saturating_add(1); - let total_end_slot = start_slot - .saturating_add(gap.min(MAX_SYNC_RANGE)) - .saturating_sub(1); - request_blocks_by_range_from_peer(server, peer, start_slot, total_end_slot).await; + let end_exclusive = start_slot.saturating_add(gap.min(MAX_SYNC_RANGE)); + + match &mut server.range_sync_state { + Some(state) => state.merge_peer(peer, status.head.slot, end_exclusive), + None => { + server.range_sync_state = Some(RangeSyncState::new( + start_slot..end_exclusive, + peer, + status.head.slot, + )); + } + } + + request_next_range_batch(server).await; info!(%peer, start_slot, gap, "Long-range sync: using BlocksByRange"); } @@ -330,20 +339,17 @@ async fn handle_blocks_by_range_response( peer: PeerId, start_slot: u64, end_slot: u64, - total_end_slot: u64, ) { - server - .pending_range_requests - .remove(&(start_slot, end_slot)); - info!(%peer, count = blocks.len(), "Received BlocksByRange response"); if blocks.is_empty() { + fail_range_request(server, &peer); warn!(%peer, start_slot, end_slot, "Received empty BlocksByRange response"); return; } let Some(ref blockchain) = server.blockchain else { + server.range_sync_state = None; warn!(%peer, "No blockchain handler available"); return; }; @@ -366,10 +372,15 @@ async fn handle_blocks_by_range_response( } } - // Chain the next batch if there are more slots to fetch - if end_slot < total_end_slot { - request_blocks_by_range_from_peer(server, peer, end_slot + 1, total_end_slot).await; + if let Some(state) = &mut server.range_sync_state { + state.complete_batch(end_slot); + if state.current_range.is_empty() || state.peer_set.is_empty() { + server.range_sync_state = None; + return; + } } + + request_next_range_batch(server).await; } /// Build a Status message from the current Store state. @@ -475,58 +486,30 @@ pub async fn fetch_block_from_peer(server: &mut P2PServer, root: H256) -> bool { true } -pub async fn request_blocks_by_range_from_peer( - server: &mut P2PServer, - peer: PeerId, - start_slot: u64, - total_end_slot: u64, -) -> bool { - if start_slot > total_end_slot { +async fn request_next_range_batch(server: &mut P2PServer) -> bool { + let Some((peer, batch)) = server + .range_sync_state + .as_ref() + .and_then(RangeSyncState::next_batch) + else { return true; - } - - // Trim effective_start forward past any in-flight coverage, handling - // non-contiguous gaps by looping until no range covers the current position. - let mut effective_start = start_slot; - loop { - let covered = server - .pending_range_requests - .iter() - .find(|&&(s, e)| s <= effective_start && effective_start <= e) - .copied(); - match covered { - Some((_, covered_end)) => effective_start = covered_end + 1, - None => break, - } - if effective_start > total_end_slot { - info!( - %peer, - start_slot, - total_end_slot, - "BlocksByRange fully covered by in-flight requests, skipping" - ); - return true; - } - } - - // Send only one batch — response handler chains the next one - let count = (total_end_slot - effective_start + 1).min(MAX_REQUEST_BLOCKS); - let batch_end = effective_start.saturating_add(count).saturating_sub(1); - - server - .pending_range_requests - .insert((effective_start, batch_end)); + }; let request = BlocksByRangeRequest { - start_slot: effective_start, - count, + start_slot: batch.start, + count: batch.end - batch.start, }; + let count = request.count; info!( %peer, - start_slot = effective_start, + start_slot = batch.start, count, - total_end_slot, + total_end_slot = server + .range_sync_state + .as_ref() + .map_or(batch.end, |state| state.current_range.end) + .saturating_sub(1), "Sending BlocksByRange request (single batch)" ); @@ -541,28 +524,42 @@ pub async fn request_blocks_by_range_from_peer( else { warn!( %peer, - start_slot = effective_start, + start_slot = batch.start, count, "Failed to send BlocksByRange request" ); - server - .pending_range_requests - .remove(&(effective_start, batch_end)); + fail_range_request(server, &peer); return false; }; + if let Some(state) = &mut server.range_sync_state { + state.in_flight = true; + } + server.outbound_requests.insert( request_id, PendingRequestKind::Range { - start_slot: effective_start, - end_slot: batch_end, - total_end_slot, + start_slot: batch.start, + end_slot: batch.end - 1, }, ); true } +fn fail_range_request(server: &mut P2PServer, peer: &PeerId) { + let should_clear = if let Some(state) = &mut server.range_sync_state { + state.fail_peer(peer); + state.peer_set.is_empty() + } else { + false + }; + + if should_clear { + server.range_sync_state = None; + } +} + async fn handle_fetch_failure( server: &mut P2PServer, root: H256, diff --git a/crates/net/p2p/src/req_resp/mod.rs b/crates/net/p2p/src/req_resp/mod.rs index 26240ec5..11acb79f 100644 --- a/crates/net/p2p/src/req_resp/mod.rs +++ b/crates/net/p2p/src/req_resp/mod.rs @@ -5,9 +5,7 @@ mod messages; pub use codec::Codec; pub use encoding::{MAX_COMPRESSED_PAYLOAD_SIZE, MAX_PAYLOAD_SIZE}; -pub use handlers::{ - build_status, fetch_block_from_peer, handle_req_resp_message, request_blocks_by_range_from_peer, -}; +pub use handlers::{build_status, fetch_block_from_peer, handle_req_resp_message}; pub use messages::{ BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest, BlocksByRootRequest, MAX_REQUEST_BLOCKS, Request, RequestedBlockRoots, Response, From 09159237a96f67fc60fcb935ee21d125961955b5 Mon Sep 17 00:00:00 2001 From: dicethedev Date: Fri, 5 Jun 2026 11:09:37 +0100 Subject: [PATCH 14/14] Fix Option variant resolution in p2p --- crates/net/p2p/src/lib.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index de34e469..06c39be1 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -97,7 +97,7 @@ impl RangeSyncState { pub(crate) fn next_batch(&self) -> Option<(PeerId, Range)> { if self.in_flight || self.current_range.is_empty() { - return None; + return Option::None; } let (&peer, &peer_head) = self @@ -586,8 +586,8 @@ async fn handle_swarm_event( } => { let direction = connection_direction(&endpoint); let reason = match cause { - None => "remote_close", - Some(err) => { + Option::None => "remote_close", + Option::Some(err) => { // Categorize disconnection reasons let err_str = err.to_string().to_lowercase(); if err_str.contains("timeout") @@ -774,8 +774,8 @@ fn compute_message_id(message: &libp2p::gossipsub::Message) -> libp2p::gossipsub let decompressed = gossipsub::decompress_message(&message.data).ok(); let (domain, data) = match decompressed.as_deref() { - Some(data) => (MESSAGE_DOMAIN_VALID_SNAPPY, data), - None => (MESSAGE_DOMAIN_INVALID_SNAPPY, message.data.as_slice()), + Option::Some(data) => (MESSAGE_DOMAIN_VALID_SNAPPY, data), + Option::None => (MESSAGE_DOMAIN_INVALID_SNAPPY, message.data.as_slice()), }; let topic = message.topic.as_str().as_bytes(); let topic_len = (topic.len() as u64).to_le_bytes();