diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index f1fa5fe4..06c39be1 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -1,6 +1,7 @@ use std::{ collections::{HashMap, HashSet}, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, + ops::Range, time::Duration, }; @@ -41,7 +42,7 @@ use crate::{ }, req_resp::{ BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, Codec, - MAX_COMPRESSED_PAYLOAD_SIZE, Request, STATUS_PROTOCOL_V1, build_status, + MAX_COMPRESSED_PAYLOAD_SIZE, MAX_REQUEST_BLOCKS, Request, STATUS_PROTOCOL_V1, build_status, fetch_block_from_peer, }, swarm_adapter::SwarmHandle, @@ -59,12 +60,81 @@ const MAX_FETCH_RETRIES: u32 = 10; const INITIAL_BACKOFF_MS: u64 = 5; const BACKOFF_MULTIPLIER: u64 = 2; const PEER_REDIAL_INTERVAL_SECS: u64 = 12; +const MAX_SYNC_RANGE: u64 = MAX_REQUEST_BLOCKS * 64; // 65,536 slots (~3 days) pub(crate) struct PendingRequest { pub(crate) attempts: u32, pub(crate) failed_peers: HashSet, } +pub(crate) enum PendingRequestKind { + Root(H256), + Range { start_slot: u64, end_slot: u64 }, +} + +pub(crate) struct RangeSyncState { + /// Remaining slots to request, with an exclusive end. + pub(crate) current_range: Range, + /// Latest advertised head slot for each peer. + pub(crate) peer_set: HashMap, + pub(crate) in_flight: bool, +} + +impl RangeSyncState { + pub(crate) fn new(current_range: Range, peer: PeerId, peer_head: u64) -> Self { + Self { + current_range, + peer_set: HashMap::from([(peer, peer_head)]), + in_flight: false, + } + } + + pub(crate) fn merge_peer(&mut self, peer: PeerId, peer_head: u64, end_exclusive: u64) { + self.peer_set.insert(peer, peer_head); + self.current_range.end = self.current_range.end.max(end_exclusive); + self.drop_stale_peers(); + } + + pub(crate) fn next_batch(&self) -> Option<(PeerId, Range)> { + if self.in_flight || self.current_range.is_empty() { + return Option::None; + } + + let (&peer, &peer_head) = self + .peer_set + .iter() + .filter(|(_, head)| **head >= self.current_range.start) + .max_by_key(|(_, head)| **head)?; + let peer_end = peer_head.saturating_add(1); + let batch_end = self + .current_range + .start + .saturating_add(MAX_REQUEST_BLOCKS) + .min(self.current_range.end) + .min(peer_end); + + (batch_end > self.current_range.start) + .then_some((peer, self.current_range.start..batch_end)) + } + + pub(crate) fn complete_batch(&mut self, end_slot: u64) { + self.in_flight = false; + self.current_range.start = self.current_range.start.max(end_slot.saturating_add(1)); + self.drop_stale_peers(); + } + + pub(crate) fn fail_peer(&mut self, peer: &PeerId) { + self.in_flight = false; + self.peer_set.remove(peer); + self.drop_stale_peers(); + } + + fn drop_stale_peers(&mut self) { + let start_slot = self.current_range.start; + self.peer_set.retain(|_, head| *head >= start_slot); + } +} + // --- Swarm construction --- /// [libp2p Behaviour](libp2p::swarm::NetworkBehaviour) combining identify, Gossipsub @@ -300,8 +370,9 @@ impl P2P { block_topic: built.block_topic, aggregation_topic: built.aggregation_topic, connected_peers: HashSet::new(), - pending_requests: HashMap::new(), - request_id_map: HashMap::new(), + pending_root_requests: HashMap::new(), + outbound_requests: HashMap::new(), + range_sync_state: None, bootnode_addrs: built.bootnode_addrs, node_names, }; @@ -336,8 +407,9 @@ pub struct P2PServer { pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic, pub(crate) connected_peers: HashSet, - pub(crate) pending_requests: HashMap, - pub(crate) request_id_map: HashMap, + pub(crate) pending_root_requests: HashMap, + pub(crate) outbound_requests: HashMap, + pub(crate) range_sync_state: Option, bootnode_addrs: HashMap, node_names: HashMap, } @@ -371,7 +443,7 @@ impl P2PServer { ) { let root = msg.root; // Check if still pending (might have succeeded during backoff) - if !self.pending_requests.contains_key(&root) { + if !self.pending_root_requests.contains_key(&root) { trace!(%root, "Block fetch completed during backoff, skipping retry"); return; } @@ -380,7 +452,7 @@ impl P2PServer { if !fetch_block_from_peer(self, root).await { tracing::error!(%root, "Failed to retry block fetch, giving up"); - self.pending_requests.remove(&root); + self.pending_root_requests.remove(&root); } } @@ -436,7 +508,7 @@ impl Handler for P2PServer { async fn handle(&mut self, msg: FetchBlock, _ctx: &Context) { let root = msg.root; // Deduplicate - if already pending, ignore - if self.pending_requests.contains_key(&root) { + if self.pending_root_requests.contains_key(&root) { trace!(%root, "Block fetch already in progress, ignoring duplicate"); return; } @@ -514,8 +586,8 @@ async fn handle_swarm_event( } => { let direction = connection_direction(&endpoint); let reason = match cause { - None => "remote_close", - Some(err) => { + Option::None => "remote_close", + Option::Some(err) => { // Categorize disconnection reasons let err_str = err.to_string().to_lowercase(); if err_str.contains("timeout") @@ -702,8 +774,8 @@ fn compute_message_id(message: &libp2p::gossipsub::Message) -> libp2p::gossipsub let decompressed = gossipsub::decompress_message(&message.data).ok(); let (domain, data) = match decompressed.as_deref() { - Some(data) => (MESSAGE_DOMAIN_VALID_SNAPPY, data), - None => (MESSAGE_DOMAIN_INVALID_SNAPPY, message.data.as_slice()), + Option::Some(data) => (MESSAGE_DOMAIN_VALID_SNAPPY, data), + Option::None => (MESSAGE_DOMAIN_INVALID_SNAPPY, message.data.as_slice()), }; let topic = message.topic.as_str().as_bytes(); let topic_len = (topic.len() as u64).to_le_bytes(); @@ -719,6 +791,54 @@ fn compute_message_id(message: &libp2p::gossipsub::Message) -> libp2p::gossipsub mod tests { use super::*; + fn random_peer() -> PeerId { + PeerId::from_public_key(&Keypair::generate_ed25519().public()) + } + + #[test] + fn range_sync_state_merges_new_peer_ranges() { + let first_peer = random_peer(); + let second_peer = random_peer(); + let mut state = RangeSyncState::new(10..101, first_peer, 100); + + state.merge_peer(second_peer, 150, 151); + + assert_eq!(state.current_range, 10..151); + assert_eq!(state.peer_set.get(&first_peer), Some(&100)); + assert_eq!(state.peer_set.get(&second_peer), Some(&150)); + } + + #[test] + fn range_sync_state_allows_only_one_batch_in_flight() { + let first_peer = random_peer(); + let second_peer = random_peer(); + let mut state = RangeSyncState::new(10..3000, first_peer, 500); + state.merge_peer(second_peer, 2000, 3000); + + let (selected_peer, batch) = state.next_batch().expect("batch available"); + assert_eq!(selected_peer, second_peer); + assert_eq!(batch, 10..(10 + MAX_REQUEST_BLOCKS)); + + state.in_flight = true; + assert!(state.next_batch().is_none()); + } + + #[test] + fn range_sync_state_advances_and_drops_stale_peers() { + let stale_peer = random_peer(); + let current_peer = random_peer(); + let mut state = RangeSyncState::new(10..3000, stale_peer, 100); + state.merge_peer(current_peer, 2999, 3000); + state.in_flight = true; + + state.complete_batch(1033); + + assert_eq!(state.current_range, 1034..3000); + assert!(!state.in_flight); + assert!(!state.peer_set.contains_key(&stale_peer)); + assert_eq!(state.peer_set.get(¤t_peer), Some(&2999)); + } + #[test] fn parse_enrs_extracts_ip_port_and_public_key() { // Values taken from a local devnet run with lean-quickstart diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index 31743316..784522d8 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -12,13 +12,14 @@ use ethlambda_types::primitives::HashTreeRoot as _; use ethlambda_types::{block::SignedBlock, primitives::H256}; use super::{ - BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest, BlocksByRootRequest, MAX_REQUEST_BLOCKS, - Request, Response, ResponsePayload, Status, + BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest, + BlocksByRootRequest, MAX_REQUEST_BLOCKS, Request, Response, ResponsePayload, Status, messages::{ResponseCode, error_message}, }; use crate::{ - BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest, - p2p_protocol, req_resp::RequestedBlockRoots, + BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, MAX_SYNC_RANGE, P2PServer, + PendingRequest, PendingRequestKind, RangeSyncState, p2p_protocol, + req_resp::RequestedBlockRoots, }; pub async fn handle_req_resp_message( @@ -62,17 +63,46 @@ pub async fn handle_req_resp_message( Response::Success { payload } => match payload { ResponsePayload::Status(status) => { info!(kind = "status_response", peer_count, "P2P message received"); - handle_status_response(status, peer).await; + handle_status_response(server, status, peer).await; } ResponsePayload::Blocks(blocks) => { info!(kind = "blocks_response", peer_count, "P2P message received"); - handle_blocks_by_root_response(server, blocks, peer, request_id, ctx) - .await; + + match server.outbound_requests.remove(&request_id) { + Some(PendingRequestKind::Range { + start_slot, + end_slot, + }) => { + handle_blocks_by_range_response( + server, blocks, peer, start_slot, end_slot, + ) + .await; + } + Some(PendingRequestKind::Root(root)) => { + handle_blocks_by_root_response( + server, blocks, peer, request_id, root, ctx, + ) + .await; + } + None => { + warn!(%peer, ?request_id, "Received blocks response for unknown request_id"); + } + } } }, Response::Error { code, message } => { let error_str = String::from_utf8_lossy(&message); warn!(%peer, ?code, %error_str, "Received error response"); + + match server.outbound_requests.remove(&request_id) { + Some(PendingRequestKind::Range { .. }) => { + fail_range_request(server, &peer); + } + Some(request @ PendingRequestKind::Root(_)) => { + server.outbound_requests.insert(request_id, request); + } + None => {} + } } } } @@ -86,8 +116,23 @@ pub async fn handle_req_resp_message( warn!(%peer, ?request_id, %error, "Outbound request failed"); // Check if this was a block fetch request - if let Some(root) = server.request_id_map.remove(&request_id) { - handle_fetch_failure(server, root, peer, ctx).await; + match server.outbound_requests.remove(&request_id) { + Some(PendingRequestKind::Root(root)) => { + handle_fetch_failure(server, root, peer, ctx).await; + } + Some(PendingRequestKind::Range { + start_slot, + end_slot, + }) => { + fail_range_request(server, &peer); + warn!( + %peer, + start_slot, + end_slot, + "BlocksByRange request failed; retry is disabled" + ); + } + None => {} } } request_response::Event::InboundFailure { @@ -118,8 +163,30 @@ async fn handle_status_request( server.swarm_handle.send_response(channel, response); } -async fn handle_status_response(status: Status, peer: PeerId) { +async fn handle_status_response(server: &mut P2PServer, status: Status, peer: PeerId) { info!(finalized_slot=%status.finalized.slot, head_slot=%status.head.slot, "Received status response from peer {peer}"); + + let our_head_slot = server.store.head_slot(); + if status.head.slot <= our_head_slot { + return; + } + let gap = status.head.slot - our_head_slot; + let start_slot = our_head_slot.saturating_add(1); + let end_exclusive = start_slot.saturating_add(gap.min(MAX_SYNC_RANGE)); + + match &mut server.range_sync_state { + Some(state) => state.merge_peer(peer, status.head.slot, end_exclusive), + None => { + server.range_sync_state = Some(RangeSyncState::new( + start_slot..end_exclusive, + peer, + status.head.slot, + )); + } + } + + request_next_range_batch(server).await; + info!(%peer, start_slot, gap, "Long-range sync: using BlocksByRange"); } async fn handle_blocks_by_root_request( @@ -226,18 +293,16 @@ async fn handle_blocks_by_root_response( blocks: Vec, peer: PeerId, request_id: request_response::OutboundRequestId, + requested_root: H256, ctx: &Context, ) { info!(%peer, count = blocks.len(), "Received BlocksByRoot response"); - // Look up which root was requested for this specific request - let Some(requested_root) = server.request_id_map.remove(&request_id) else { - warn!(%peer, ?request_id, "Received response for unknown request_id"); - return; - }; - if blocks.is_empty() { - server.request_id_map.insert(request_id, requested_root); + // Re-insert so failure handling can find it + server + .outbound_requests + .insert(request_id, PendingRequestKind::Root(requested_root)); warn!(%peer, "Received empty BlocksByRoot response"); handle_fetch_failure(server, requested_root, peer, ctx).await; return; @@ -258,7 +323,7 @@ async fn handle_blocks_by_root_response( } // Clean up tracking for this root - server.pending_requests.remove(&root); + server.pending_root_requests.remove(&root); if let Some(ref blockchain) = server.blockchain { let _ = blockchain @@ -268,6 +333,56 @@ async fn handle_blocks_by_root_response( } } +async fn handle_blocks_by_range_response( + server: &mut P2PServer, + blocks: Vec, + peer: PeerId, + start_slot: u64, + end_slot: u64, +) { + info!(%peer, count = blocks.len(), "Received BlocksByRange response"); + + if blocks.is_empty() { + fail_range_request(server, &peer); + warn!(%peer, start_slot, end_slot, "Received empty BlocksByRange response"); + return; + } + + let Some(ref blockchain) = server.blockchain else { + server.range_sync_state = None; + warn!(%peer, "No blockchain handler available"); + return; + }; + + for block in blocks { + let slot = block.message.slot; + + if slot < start_slot || slot > end_slot { + warn!(%peer, %slot, start_slot, end_slot, "Received block outside requested range"); + continue; + } + + let block_root = block.message.hash_tree_root(); + if let Err(err) = blockchain.new_block(block) { + error!( + %err, %slot, %peer, + block_root = %ethlambda_types::ShortRoot(&block_root.0), + "Failed to forward range-fetched block to blockchain" + ); + } + } + + if let Some(state) = &mut server.range_sync_state { + state.complete_batch(end_slot); + if state.current_range.is_empty() || state.peer_set.is_empty() { + server.range_sync_state = None; + return; + } + } + + request_next_range_batch(server).await; +} + /// Build a Status message from the current Store state. pub fn build_status(store: &Store) -> Status { let finalized = store.latest_finalized(); @@ -294,7 +409,10 @@ pub async fn fetch_block_from_peer(server: &mut P2PServer, root: H256) -> bool { } // Exclude peers that already returned empty responses for this root - let failed = server.pending_requests.get(&root).map(|p| &p.failed_peers); + let failed = server + .pending_root_requests + .get(&root) + .map(|p| &p.failed_peers); let pool: Vec<_> = if failed.is_none_or(|f| f.is_empty()) { server.connected_peers.iter().copied().collect() } else { @@ -312,7 +430,7 @@ pub async fn fetch_block_from_peer(server: &mut P2PServer, root: H256) -> bool { // retries start a fresh round of elimination. let pool = if pool.is_empty() { warn!(%root, "All peers failed for this block, retrying with full peer set"); - if let Some(pending) = server.pending_requests.get_mut(&root) { + if let Some(pending) = server.pending_root_requests.get_mut(&root) { pending.failed_peers.clear(); } server.connected_peers.iter().copied().collect() @@ -353,7 +471,7 @@ pub async fn fetch_block_from_peer(server: &mut P2PServer, root: H256) -> bool { // Track the request if not already tracked (new request) server - .pending_requests + .pending_root_requests .entry(root) .or_insert(PendingRequest { attempts: 1, @@ -361,18 +479,94 @@ pub async fn fetch_block_from_peer(server: &mut P2PServer, root: H256) -> bool { }); // Map request_id to root for failure handling - server.request_id_map.insert(request_id, root); + server + .outbound_requests + .insert(request_id, PendingRequestKind::Root(root)); true } +async fn request_next_range_batch(server: &mut P2PServer) -> bool { + let Some((peer, batch)) = server + .range_sync_state + .as_ref() + .and_then(RangeSyncState::next_batch) + else { + return true; + }; + + let request = BlocksByRangeRequest { + start_slot: batch.start, + count: batch.end - batch.start, + }; + let count = request.count; + + info!( + %peer, + start_slot = batch.start, + count, + total_end_slot = server + .range_sync_state + .as_ref() + .map_or(batch.end, |state| state.current_range.end) + .saturating_sub(1), + "Sending BlocksByRange request (single batch)" + ); + + let Some(request_id) = server + .swarm_handle + .send_request( + peer, + Request::BlocksByRange(request), + libp2p::StreamProtocol::new(BLOCKS_BY_RANGE_PROTOCOL_V1), + ) + .await + else { + warn!( + %peer, + start_slot = batch.start, + count, + "Failed to send BlocksByRange request" + ); + fail_range_request(server, &peer); + return false; + }; + + if let Some(state) = &mut server.range_sync_state { + state.in_flight = true; + } + + server.outbound_requests.insert( + request_id, + PendingRequestKind::Range { + start_slot: batch.start, + end_slot: batch.end - 1, + }, + ); + + true +} + +fn fail_range_request(server: &mut P2PServer, peer: &PeerId) { + let should_clear = if let Some(state) = &mut server.range_sync_state { + state.fail_peer(peer); + state.peer_set.is_empty() + } else { + false + }; + + if should_clear { + server.range_sync_state = None; + } +} + async fn handle_fetch_failure( server: &mut P2PServer, root: H256, peer: PeerId, ctx: &Context, ) { - let Some(pending) = server.pending_requests.get_mut(&root) else { + let Some(pending) = server.pending_root_requests.get_mut(&root) else { return; }; @@ -381,7 +575,7 @@ async fn handle_fetch_failure( if pending.attempts >= MAX_FETCH_RETRIES { error!(%root, %peer, attempts=%pending.attempts, "Block fetch failed after max retries, giving up"); - server.pending_requests.remove(&root); + server.pending_root_requests.remove(&root); return; }