diff --git a/ant-cli/src/commands/data/file.rs b/ant-cli/src/commands/data/file.rs index d6464f6..a62409c 100644 --- a/ant-cli/src/commands/data/file.rs +++ b/ant-cli/src/commands/data/file.rs @@ -1,3 +1,4 @@ +use std::num::NonZeroUsize; use std::path::{Path, PathBuf}; use std::time::{Duration, Instant}; @@ -68,6 +69,9 @@ pub enum FileAction { /// written to the current directory). #[arg(short, long)] output: Option, + /// Number of closest peers to try for each chunk fetch. + #[arg(long, alias = "peer-count", value_name = "COUNT")] + peers: Option, }, /// Estimate the cost of uploading a file without uploading. /// @@ -153,6 +157,7 @@ impl FileAction { address, datamap, output, + peers, } => { let resolved_output = resolve_download_output(output, datamap.as_deref())?; handle_file_download( @@ -161,6 +166,7 @@ impl FileAction { datamap.as_deref(), resolved_output, json, + peers, ) .await } @@ -445,22 +451,34 @@ async fn handle_file_download( datamap_path: Option<&Path>, output: PathBuf, json_output: bool, + peer_count: Option, ) -> anyhow::Result<()> { let output_path = output; let start = Instant::now(); let data_map = if let Some(addr_hex) = address { info!("Downloading public file from address {addr_hex}"); + let address = parse_address(addr_hex)?; if !json_output { let spinner = progress::new_spinner("Fetching data map..."); - let result = client.data_map_fetch(&parse_address(addr_hex)?).await; + let result = if let Some(peer_count) = peer_count { + client + .data_map_fetch_from_closest_peers(&address, peer_count) + .await + } else { + client.data_map_fetch(&address).await + }; spinner.finish_and_clear(); result.map_err(|e| anyhow::anyhow!("Failed to fetch public DataMap: {e}"))? } else { - client - .data_map_fetch(&parse_address(addr_hex)?) - .await - .map_err(|e| anyhow::anyhow!("Failed to fetch public DataMap: {e}"))? + if let Some(peer_count) = peer_count { + client + .data_map_fetch_from_closest_peers(&address, peer_count) + .await + } else { + client.data_map_fetch(&address).await + } + .map_err(|e| anyhow::anyhow!("Failed to fetch public DataMap: {e}"))? } } else { let dm_path = datamap_path @@ -470,10 +488,15 @@ async fn handle_file_download( }; if json_output { - client - .file_download(&data_map, &output_path) - .await - .map_err(|e| anyhow::anyhow!("Download failed: {e}"))?; + let download_result = if let Some(peer_count) = peer_count { + client + .file_download_from_closest_peers(&data_map, &output_path, peer_count) + .await + } else { + client.file_download(&data_map, &output_path).await + }; + + download_result.map_err(|e| anyhow::anyhow!("Download failed: {e}"))?; } else { let (tx, mut rx) = mpsc::channel(64); @@ -512,9 +535,20 @@ async fn handle_file_download( pb.finish_and_clear(); }); - let download_result = client - .file_download_with_progress(&data_map, &output_path, Some(tx)) - .await; + let download_result = if let Some(peer_count) = peer_count { + client + .file_download_with_progress_from_closest_peers( + &data_map, + &output_path, + Some(tx), + peer_count, + ) + .await + } else { + client + .file_download_with_progress(&data_map, &output_path, Some(tx)) + .await + }; // Wait for progress bar cleanup (sender dropped → receiver exits) let _ = progress_handle.await; @@ -697,6 +731,21 @@ fn format_cost(storage_cost_atto: &str, gas_cost_wei: u128) -> String { #[cfg(test)] mod tests { use super::*; + use clap::Parser; + + #[derive(Debug, Parser)] + struct TestFileCli { + #[command(subcommand)] + action: FileAction, + } + + const TEST_ADDRESS_BYTE_LEN: usize = 32; + const PUBLIC_DOWNLOAD_PEERS: usize = 12; + const PRIVATE_DOWNLOAD_PEERS: usize = 9; + + fn test_address() -> String { + "00".repeat(TEST_ADDRESS_BYTE_LEN) + } #[test] fn resolve_download_output_returns_explicit_output_unchanged() { @@ -754,4 +803,71 @@ mod tests { let err = resolve_download_output(None, Some(datamap.as_path())).unwrap_err(); assert!(err.to_string().contains("Cannot derive")); } + + #[test] + fn download_peers_is_accepted_for_public_download() { + let address = test_address(); + let peer_count = PUBLIC_DOWNLOAD_PEERS.to_string(); + let cli = TestFileCli::try_parse_from([ + "test", + "download", + address.as_str(), + "--peers", + peer_count.as_str(), + "--output", + "out.bin", + ]) + .expect("--peers must parse for address downloads"); + + match cli.action { + FileAction::Download { peers, address, .. } => { + assert!(address.is_some()); + assert_eq!(peers.map(NonZeroUsize::get), Some(PUBLIC_DOWNLOAD_PEERS)); + } + FileAction::Upload { .. } | FileAction::Cost { .. } => { + panic!("expected file download action") + } + } + } + + #[test] + fn download_peers_is_accepted_for_private_download() { + let peer_count = PRIVATE_DOWNLOAD_PEERS.to_string(); + let cli = TestFileCli::try_parse_from([ + "test", + "download", + "--datamap", + "photo.jpg.datamap", + "--peers", + peer_count.as_str(), + ]) + .expect("--peers must parse for datamap downloads"); + + match cli.action { + FileAction::Download { peers, datamap, .. } => { + assert!(datamap.is_some()); + assert_eq!(peers.map(NonZeroUsize::get), Some(PRIVATE_DOWNLOAD_PEERS)); + } + FileAction::Upload { .. } | FileAction::Cost { .. } => { + panic!("expected file download action") + } + } + } + + #[test] + fn download_peers_rejects_zero() { + let address = test_address(); + let err = TestFileCli::try_parse_from([ + "test", + "download", + address.as_str(), + "--peers", + "0", + "--output", + "out.bin", + ]) + .expect_err("--peers=0 must fail"); + + assert_eq!(err.kind(), clap::error::ErrorKind::ValueValidation); + } } diff --git a/ant-core/src/data/client/chunk.rs b/ant-core/src/data/client/chunk.rs index 434c107..085b620 100644 --- a/ant-core/src/data/client/chunk.rs +++ b/ant-core/src/data/client/chunk.rs @@ -178,8 +178,17 @@ impl Client { /// sustained run of close-group exhaustions correctly drives the /// cap down rather than silently inflating it. pub(crate) async fn chunk_get_observed(&self, address: &XorName) -> Result> { + self.chunk_get_observed_from_closest_peers(address, self.config().close_group_size) + .await + } + + pub(crate) async fn chunk_get_observed_from_closest_peers( + &self, + address: &XorName, + peer_count: usize, + ) -> Result> { let started = Instant::now(); - let result = self.chunk_get(address).await; + let result = self.chunk_get_from_closest_peers(address, peer_count).await; let latency = started.elapsed(); let bytes = result .as_ref() diff --git a/ant-core/src/data/client/data.rs b/ant-core/src/data/client/data.rs index fc951fa..2446f6f 100644 --- a/ant-core/src/data/client/data.rs +++ b/ant-core/src/data/client/data.rs @@ -17,6 +17,7 @@ use ant_protocol::{compute_address, DATA_TYPE_CHUNK}; use bytes::Bytes; use futures::stream::StreamExt; use self_encryption::{decrypt, encrypt, DataMap, EncryptedChunk}; +use std::num::NonZeroUsize; use tracing::{debug, info}; /// Result of an in-memory data upload: the `DataMap` needed to retrieve the data. @@ -401,8 +402,31 @@ impl Client { )) })?; - rmp_serde::from_slice(&chunk.content) - .map_err(|e| Error::Serialization(format!("Failed to deserialize DataMap: {e}"))) + decode_data_map_chunk(&chunk.content) + } + + /// Fetch a `DataMap` from the network by trying the requested number + /// of closest peers for the DataMap chunk. + /// + /// # Errors + /// + /// Returns an error if the chunk is not found or deserialization fails. + pub async fn data_map_fetch_from_closest_peers( + &self, + address: &[u8; 32], + peer_count: NonZeroUsize, + ) -> Result { + let chunk = self + .chunk_get_from_closest_peers(address, peer_count.get()) + .await? + .ok_or_else(|| { + Error::InvalidData(format!( + "DataMap chunk not found at {}", + hex::encode(address) + )) + })?; + + decode_data_map_chunk(&chunk.content) } /// Download and decrypt data from the network using its `DataMap`. @@ -469,6 +493,11 @@ impl Client { } } +fn decode_data_map_chunk(content: &[u8]) -> Result { + rmp_serde::from_slice(content) + .map_err(|e| Error::Serialization(format!("Failed to deserialize DataMap: {e}"))) +} + /// Compile-time assertions that Client method futures are Send. /// /// These methods are called from axum handlers and tokio::spawn contexts diff --git a/ant-core/src/data/client/file.rs b/ant-core/src/data/client/file.rs index 4d190fa..be85e8d 100644 --- a/ant-core/src/data/client/file.rs +++ b/ant-core/src/data/client/file.rs @@ -34,6 +34,7 @@ use self_encryption::{ }; use std::collections::{HashMap, HashSet}; use std::io::Write; +use std::num::NonZeroUsize; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use tokio::runtime::Handle; @@ -2195,6 +2196,26 @@ impl Client { .await } + /// Download and decrypt a file, trying the requested number of + /// closest peers for every chunk fetch. + /// + /// Returns the number of bytes written. + /// + /// # Errors + /// + /// Returns an error if any chunk cannot be retrieved, decryption fails, + /// or the file cannot be written. + #[allow(clippy::unused_async)] + pub async fn file_download_from_closest_peers( + &self, + data_map: &DataMap, + output: &Path, + peer_count: NonZeroUsize, + ) -> Result { + self.file_download_with_progress_using_peer_count(data_map, output, None, peer_count.get()) + .await + } + /// Download and decrypt a file with progress events. /// /// Same as [`Client::file_download`] but sends [`DownloadEvent`]s for UI feedback. @@ -2210,6 +2231,50 @@ impl Client { data_map: &DataMap, output: &Path, progress: Option>, + ) -> Result { + self.file_download_with_progress_using_peer_count( + data_map, + output, + progress, + self.config().close_group_size, + ) + .await + } + + /// Download and decrypt a file with progress events, trying the + /// requested number of closest peers for every chunk fetch. + /// + /// Same as [`Client::file_download_from_closest_peers`] but sends + /// [`DownloadEvent`]s for UI feedback. + /// + /// # Errors + /// + /// Returns an error if any chunk cannot be retrieved, decryption fails, + /// or the file cannot be written. + #[allow(clippy::unused_async)] + pub async fn file_download_with_progress_from_closest_peers( + &self, + data_map: &DataMap, + output: &Path, + progress: Option>, + peer_count: NonZeroUsize, + ) -> Result { + self.file_download_with_progress_using_peer_count( + data_map, + output, + progress, + peer_count.get(), + ) + .await + } + + #[allow(clippy::unused_async)] + async fn file_download_with_progress_using_peer_count( + &self, + data_map: &DataMap, + output: &Path, + progress: Option>, + peer_count: usize, ) -> Result { debug!("Downloading file to {}", output.display()); @@ -2260,7 +2325,7 @@ impl Client { // load-shedding signal for // sustained close-group exhaustion). let chunk = self - .chunk_get_observed(&addr) + .chunk_get_observed_from_closest_peers(&addr, peer_count) .await .map_err(|e| { self_encryption::Error::Generic(format!( @@ -2372,7 +2437,10 @@ impl Client { async move { let addr = hash.0; let addr_hex = hex::encode(addr); - match self.chunk_get_observed(&addr).await { + match self + .chunk_get_observed_from_closest_peers(&addr, peer_count) + .await + { Ok(Some(chunk)) => { let fetched = fetched_ref.fetch_add( 1, @@ -2484,7 +2552,12 @@ impl Client { // next round rather than // aborting; only the final // round's leftovers are fatal. - match self.chunk_get_observed(&addr).await { + match self + .chunk_get_observed_from_closest_peers( + &addr, peer_count, + ) + .await + { Ok(Some(chunk)) => { let fetched = fetched_ref.fetch_add( 1,