Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion ant-cli/src/commands/data/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ async fn handle_file_upload(
stored_count,
failed_count,
total_chunks,
spend,
reason,
..
}) => {
Expand All @@ -242,12 +243,18 @@ async fn handle_file_upload(
total_chunks,
chunks_stored: stored_count,
chunks_failed: failed_count,
storage_cost_atto: spend.storage_cost_atto.clone(),
gas_cost_wei: spend.gas_cost_wei.to_string(),
reason: &reason,
};
println!("{}", serde_json::to_string(&out)?);
}
// The partial upload still spent money on-chain for the chunks it
// paid for; report it so the user knows what the failed attempt cost.
let cost_display = format_cost(&spend.storage_cost_atto, spend.gas_cost_wei);
anyhow::bail!(
"Upload failed: {stored_count}/{total_chunks} stored, {failed_count} failed: {reason}"
"Upload failed: {stored_count}/{total_chunks} stored, {failed_count} failed \
(spent {cost_display}): {reason}"
);
}
Err(e) => anyhow::bail!("File upload failed: {e}"),
Expand Down Expand Up @@ -280,6 +287,8 @@ async fn handle_file_upload(
total_chunks: result.chunks_stored + 1,
chunks_stored: result.chunks_stored,
chunks_failed: 1,
storage_cost_atto: result.storage_cost_atto.clone(),
gas_cost_wei: result.gas_cost_wei.to_string(),
reason: &reason,
};
println!("{}", serde_json::to_string(&out)?);
Expand Down Expand Up @@ -659,6 +668,11 @@ struct UploadFailureJson<'a> {
total_chunks: usize,
chunks_stored: usize,
chunks_failed: usize,
/// Storage cost paid on-chain so far, in atto-tokens. A partial upload
/// still spends money for the chunks it paid for.
storage_cost_atto: String,
/// Gas cost paid on-chain so far, in wei.
gas_cost_wei: String,
reason: &'a str,
}

Expand Down
4 changes: 4 additions & 0 deletions ant-core/src/data/client/adaptive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2865,6 +2865,10 @@ mod tests {
failed: vec![],
failed_count: 0,
total_chunks: 0,
spend: Box::new(crate::data::error::PartialUploadSpend {
storage_cost_atto: "0".to_string(),
gas_cost_wei: 0,
}),
reason: "r".to_string(),
}),
),
Expand Down
81 changes: 54 additions & 27 deletions ant-core/src/data/client/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::data::client::classify_error;
use crate::data::client::file::UploadEvent;
use crate::data::client::payment::peer_id_to_encoded;
use crate::data::client::Client;
use crate::data::error::{Error, Result};
use crate::data::error::{Error, PartialUploadSpend, Result};
use ant_protocol::evm::{
Amount, EncodedPeerId, PayForQuotesError, PaymentQuote, ProofOfPayment, QuoteHash,
RewardsAddress, TxHash,
Expand All @@ -29,6 +29,13 @@ use tracing::{debug, info, warn};
/// How many chunks are grouped into a single payment wave.
const PAYMENT_WAVE_SIZE: usize = 64;

/// Soft upper bound, in bytes, on the total body size of the chunks that are
/// stored concurrently within one wave (64 MiB).
///
/// Store concurrency for large chunks is capped by this budget so that the
/// send path's per-peer body buffers cannot pin multiple GB at once (see
/// V2-461). With ~4 MB chunks the budget allows roughly 16 stores in flight;
/// small chunks run into the chunk-count / adaptive limits first, so this
/// ceiling does not affect them.
const STORE_INFLIGHT_BYTE_BUDGET: usize = 64 << 20;

/// Chunk quoted but not yet paid. Produced by [`Client::prepare_chunk_payment`].
#[derive(Debug)]
pub struct PreparedChunk {
Expand Down Expand Up @@ -413,35 +420,28 @@ impl Client {
// is decision-pure: we never hand a doomed proof to a storer,
// and the cache is updated under our own lock with no remote
// text involved.
// `cached_cost` carries the cumulative cost from waves paid in
// a previous run so the returned tally reflects total spend on
// this file, not just freshly-paid chunks. Without this the
// user's "this upload cost X" message under-reports by the
// resumed waves' cost.
let (cached_proofs, cached_storage, cached_gas): (HashMap<XorName, Vec<u8>>, Amount, u128) =
match resume_key {
Some(key) => match crate::data::client::cached_single::try_load_for_file(key) {
Some((_, receipt)) => {
let prior_storage = receipt
.storage_cost_atto
.parse::<Amount>()
.unwrap_or(Amount::ZERO);
let prior_gas = receipt.gas_cost_wei;
let kept = prune_locally_expired_proofs(key, receipt.proofs);
(kept, prior_storage, prior_gas)
}
None => (HashMap::new(), Amount::ZERO, 0u128),
},
None => (HashMap::new(), Amount::ZERO, 0u128),
};
// Load only the cached PROOFS (for reuse). The cost this function
// returns is a per-call DELTA — what was freshly paid in THIS call —
// not the cache's cumulative total. The single-node wave driver
// (`upload_spill_addresses_single`) calls this once per wave and SUMS
// the per-call costs, so seeding the returned cost with the cache's
// cumulative total (which grows as each wave appends to it) would
// double-count: A + (A+B) + (A+B+C) instead of A+B+C.
let cached_proofs: HashMap<XorName, Vec<u8>> = match resume_key {
Some(key) => match crate::data::client::cached_single::try_load_for_file(key) {
Some((_, receipt)) => prune_locally_expired_proofs(key, receipt.proofs),
None => HashMap::new(),
},
None => HashMap::new(),
};

let mut all_addresses = Vec::with_capacity(total_chunks);
let mut seen_addresses: HashSet<XorName> = HashSet::new();

// Accumulate costs across waves, seeded with cumulative from
// any cached receipt loaded above.
let mut total_storage = cached_storage;
let mut total_gas: u128 = cached_gas;
// Accumulate only THIS call's freshly-paid cost (per-call delta; see
// the proof-load comment above for why this must not include the cache).
let mut total_storage = Amount::ZERO;
let mut total_gas: u128 = 0;
let mut agg_stats = WaveAggregateStats::default();

// Deduplicate chunks by content address.
Expand Down Expand Up @@ -520,6 +520,10 @@ impl Client {
failed: wave_result.failed,
failed_count,
total_chunks: file_total,
spend: Box::new(PartialUploadSpend {
storage_cost_atto: total_storage.to_string(),
gas_cost_wei: total_gas,
}),
reason: "wave store failed after retries".into(),
});
}
Expand Down Expand Up @@ -618,6 +622,10 @@ impl Client {
failed: wave_result.failed,
failed_count,
total_chunks: file_total,
spend: Box::new(PartialUploadSpend {
storage_cost_atto: total_storage.to_string(),
gas_cost_wei: total_gas,
}),
reason: "final wave store failed after retries".into(),
});
}
Expand Down Expand Up @@ -735,6 +743,22 @@ impl Client {
first_seen.entry(chunk.address).or_insert_with(Instant::now);
}

// Bound concurrency by IN-FLIGHT BYTES, not just chunk count. Each
// concurrently-stored chunk is held in memory while it is sent to its
// close group, and the send path re-serializes the body once per peer,
// so a wave of large (~4 MB) chunks at full store concurrency can pin
// multiple GB and OOM a small host. Cap how many chunks store at once
// so their combined body size stays under the budget; small chunks are
// unaffected (the byte bound exceeds the chunk-count bound). The budget
// is deliberately conservative for the current per-peer send
// amplification and can be raised once that is reduced upstream.
let max_chunk_bytes = to_retry.iter().map(|c| c.content.len()).max().unwrap_or(0);
// `checked_div` yields `None` only when `max_chunk_bytes == 0` (an
// empty/zero-length wave), in which case there is no byte limit.
let byte_bound = STORE_INFLIGHT_BYTE_BUDGET
.checked_div(max_chunk_bytes)
.map_or(usize::MAX, |n| n.max(1));

let mut chunk_attempts_total: usize = 0;
let mut store_durations_ms: Vec<u64> = Vec::new();
let mut retries_per_chunk: Vec<u32> = Vec::new();
Expand All @@ -753,7 +777,10 @@ impl Client {
chunk_attempts_total = chunk_attempts_total.saturating_add(to_retry.len());

let store_limiter = self.controller().store.clone();
let store_concurrency = store_limiter.current().min(to_retry.len().max(1));
let store_concurrency = store_limiter
.current()
.min(to_retry.len().max(1))
.min(byte_bound);
let mut upload_stream = stream::iter(to_retry)
.map(|chunk| {
let chunk_clone = chunk.clone();
Expand Down
Loading
Loading