Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 128 additions & 12 deletions ant-cli/src/commands/data/file.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::num::NonZeroUsize;
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -68,6 +69,9 @@ pub enum FileAction {
/// written to the current directory).
#[arg(short, long)]
output: Option<PathBuf>,
/// Number of closest peers to try for each chunk fetch.
#[arg(long, alias = "peer-count", value_name = "COUNT")]
peers: Option<NonZeroUsize>,
},
/// Estimate the cost of uploading a file without uploading.
///
Expand Down Expand Up @@ -153,6 +157,7 @@ impl FileAction {
address,
datamap,
output,
peers,
} => {
let resolved_output = resolve_download_output(output, datamap.as_deref())?;
handle_file_download(
Expand All @@ -161,6 +166,7 @@ impl FileAction {
datamap.as_deref(),
resolved_output,
json,
peers,
)
.await
}
Expand Down Expand Up @@ -445,22 +451,34 @@ async fn handle_file_download(
datamap_path: Option<&Path>,
output: PathBuf,
json_output: bool,
peer_count: Option<NonZeroUsize>,
) -> anyhow::Result<()> {
let output_path = output;
let start = Instant::now();

let data_map = if let Some(addr_hex) = address {
info!("Downloading public file from address {addr_hex}");
let address = parse_address(addr_hex)?;
if !json_output {
let spinner = progress::new_spinner("Fetching data map...");
let result = client.data_map_fetch(&parse_address(addr_hex)?).await;
let result = if let Some(peer_count) = peer_count {
client
.data_map_fetch_from_closest_peers(&address, peer_count)
.await
} else {
client.data_map_fetch(&address).await
};
spinner.finish_and_clear();
result.map_err(|e| anyhow::anyhow!("Failed to fetch public DataMap: {e}"))?
} else {
client
.data_map_fetch(&parse_address(addr_hex)?)
.await
.map_err(|e| anyhow::anyhow!("Failed to fetch public DataMap: {e}"))?
if let Some(peer_count) = peer_count {
client
.data_map_fetch_from_closest_peers(&address, peer_count)
.await
} else {
client.data_map_fetch(&address).await
}
.map_err(|e| anyhow::anyhow!("Failed to fetch public DataMap: {e}"))?
}
} else {
let dm_path = datamap_path
Expand All @@ -470,10 +488,15 @@ async fn handle_file_download(
};

if json_output {
client
.file_download(&data_map, &output_path)
.await
.map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
let download_result = if let Some(peer_count) = peer_count {
client
.file_download_from_closest_peers(&data_map, &output_path, peer_count)
.await
} else {
client.file_download(&data_map, &output_path).await
};

download_result.map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
} else {
let (tx, mut rx) = mpsc::channel(64);

Expand Down Expand Up @@ -512,9 +535,20 @@ async fn handle_file_download(
pb.finish_and_clear();
});

let download_result = client
.file_download_with_progress(&data_map, &output_path, Some(tx))
.await;
let download_result = if let Some(peer_count) = peer_count {
client
.file_download_with_progress_from_closest_peers(
&data_map,
&output_path,
Some(tx),
peer_count,
)
.await
} else {
client
.file_download_with_progress(&data_map, &output_path, Some(tx))
.await
};

// Wait for progress bar cleanup (sender dropped → receiver exits)
let _ = progress_handle.await;
Expand Down Expand Up @@ -697,6 +731,21 @@ fn format_cost(storage_cost_atto: &str, gas_cost_wei: u128) -> String {
#[cfg(test)]
mod tests {
use super::*;
use clap::Parser;

#[derive(Debug, Parser)]
struct TestFileCli {
#[command(subcommand)]
action: FileAction,
}

const TEST_ADDRESS_BYTE_LEN: usize = 32;
const PUBLIC_DOWNLOAD_PEERS: usize = 12;
const PRIVATE_DOWNLOAD_PEERS: usize = 9;

fn test_address() -> String {
"00".repeat(TEST_ADDRESS_BYTE_LEN)
}

#[test]
fn resolve_download_output_returns_explicit_output_unchanged() {
Expand Down Expand Up @@ -754,4 +803,71 @@ mod tests {
let err = resolve_download_output(None, Some(datamap.as_path())).unwrap_err();
assert!(err.to_string().contains("Cannot derive"));
}

#[test]
fn download_peers_is_accepted_for_public_download() {
let address = test_address();
let peer_count = PUBLIC_DOWNLOAD_PEERS.to_string();
let cli = TestFileCli::try_parse_from([
"test",
"download",
address.as_str(),
"--peers",
peer_count.as_str(),
"--output",
"out.bin",
])
.expect("--peers must parse for address downloads");

match cli.action {
FileAction::Download { peers, address, .. } => {
assert!(address.is_some());
assert_eq!(peers.map(NonZeroUsize::get), Some(PUBLIC_DOWNLOAD_PEERS));
}
FileAction::Upload { .. } | FileAction::Cost { .. } => {
panic!("expected file download action")
}
}
}

#[test]
fn download_peers_is_accepted_for_private_download() {
let peer_count = PRIVATE_DOWNLOAD_PEERS.to_string();
let cli = TestFileCli::try_parse_from([
"test",
"download",
"--datamap",
"photo.jpg.datamap",
"--peers",
peer_count.as_str(),
])
.expect("--peers must parse for datamap downloads");

match cli.action {
FileAction::Download { peers, datamap, .. } => {
assert!(datamap.is_some());
assert_eq!(peers.map(NonZeroUsize::get), Some(PRIVATE_DOWNLOAD_PEERS));
}
FileAction::Upload { .. } | FileAction::Cost { .. } => {
panic!("expected file download action")
}
}
}

#[test]
fn download_peers_rejects_zero() {
let address = test_address();
let err = TestFileCli::try_parse_from([
"test",
"download",
address.as_str(),
"--peers",
"0",
"--output",
"out.bin",
])
.expect_err("--peers=0 must fail");

assert_eq!(err.kind(), clap::error::ErrorKind::ValueValidation);
}
}
11 changes: 10 additions & 1 deletion ant-core/src/data/client/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,17 @@ impl Client {
/// sustained run of close-group exhaustions correctly drives the
/// cap down rather than silently inflating it.
pub(crate) async fn chunk_get_observed(&self, address: &XorName) -> Result<Option<DataChunk>> {
self.chunk_get_observed_from_closest_peers(address, self.config().close_group_size)
.await
}

pub(crate) async fn chunk_get_observed_from_closest_peers(
&self,
address: &XorName,
peer_count: usize,
) -> Result<Option<DataChunk>> {
let started = Instant::now();
let result = self.chunk_get(address).await;
let result = self.chunk_get_from_closest_peers(address, peer_count).await;
let latency = started.elapsed();
let bytes = result
.as_ref()
Expand Down
33 changes: 31 additions & 2 deletions ant-core/src/data/client/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
use bytes::Bytes;
use futures::stream::StreamExt;
use self_encryption::{decrypt, encrypt, DataMap, EncryptedChunk};
use std::num::NonZeroUsize;
use tracing::{debug, info};

/// Result of an in-memory data upload: the `DataMap` needed to retrieve the data.
Expand Down Expand Up @@ -401,8 +402,31 @@ impl Client {
))
})?;

rmp_serde::from_slice(&chunk.content)
.map_err(|e| Error::Serialization(format!("Failed to deserialize DataMap: {e}")))
decode_data_map_chunk(&chunk.content)
}

/// Fetch a `DataMap` from the network by trying the requested number
/// of closest peers for the DataMap chunk.
///
/// # Errors
///
/// Returns an error if the chunk is not found or deserialization fails.
pub async fn data_map_fetch_from_closest_peers(
&self,
address: &[u8; 32],
peer_count: NonZeroUsize,
) -> Result<DataMap> {
let chunk = self
.chunk_get_from_closest_peers(address, peer_count.get())
.await?
.ok_or_else(|| {
Error::InvalidData(format!(
"DataMap chunk not found at {}",
hex::encode(address)
))
})?;

decode_data_map_chunk(&chunk.content)
}

/// Download and decrypt data from the network using its `DataMap`.
Expand Down Expand Up @@ -469,6 +493,11 @@ impl Client {
}
}

fn decode_data_map_chunk(content: &[u8]) -> Result<DataMap> {
rmp_serde::from_slice(content)
.map_err(|e| Error::Serialization(format!("Failed to deserialize DataMap: {e}")))
}

/// Compile-time assertions that Client method futures are Send.
///
/// These methods are called from axum handlers and tokio::spawn contexts
Expand Down
Loading
Loading