diff --git a/ant-cli/src/commands/data/file.rs b/ant-cli/src/commands/data/file.rs index a62409c..d4e1885 100644 --- a/ant-cli/src/commands/data/file.rs +++ b/ant-cli/src/commands/data/file.rs @@ -233,6 +233,7 @@ async fn handle_file_upload( stored_count, failed_count, total_chunks, + spend, reason, .. }) => { @@ -242,12 +243,18 @@ async fn handle_file_upload( total_chunks, chunks_stored: stored_count, chunks_failed: failed_count, + storage_cost_atto: spend.storage_cost_atto.clone(), + gas_cost_wei: spend.gas_cost_wei.to_string(), reason: &reason, }; println!("{}", serde_json::to_string(&out)?); } + // The partial upload still spent money on-chain for the chunks it + // paid for; report it so the user knows what the failed attempt cost. + let cost_display = format_cost(&spend.storage_cost_atto, spend.gas_cost_wei); anyhow::bail!( - "Upload failed: {stored_count}/{total_chunks} stored, {failed_count} failed: {reason}" + "Upload failed: {stored_count}/{total_chunks} stored, {failed_count} failed \ + (spent {cost_display}): {reason}" ); } Err(e) => anyhow::bail!("File upload failed: {e}"), @@ -280,6 +287,8 @@ async fn handle_file_upload( total_chunks: result.chunks_stored + 1, chunks_stored: result.chunks_stored, chunks_failed: 1, + storage_cost_atto: result.storage_cost_atto.clone(), + gas_cost_wei: result.gas_cost_wei.to_string(), reason: &reason, }; println!("{}", serde_json::to_string(&out)?); @@ -659,6 +668,11 @@ struct UploadFailureJson<'a> { total_chunks: usize, chunks_stored: usize, chunks_failed: usize, + /// Storage cost paid on-chain so far, in atto-tokens. A partial upload + /// still spends money for the chunks it paid for. + storage_cost_atto: String, + /// Gas cost paid on-chain so far, in wei. + gas_cost_wei: String, reason: &'a str, } diff --git a/ant-core/src/data/client/adaptive.rs b/ant-core/src/data/client/adaptive.rs index d7c24a8..f72af0a 100644 --- a/ant-core/src/data/client/adaptive.rs +++ b/ant-core/src/data/client/adaptive.rs @@ -2865,6 +2865,10 @@ mod tests { failed: vec![], failed_count: 0, total_chunks: 0, + spend: Box::new(crate::data::error::PartialUploadSpend { + storage_cost_atto: "0".to_string(), + gas_cost_wei: 0, + }), reason: "r".to_string(), }), ), diff --git a/ant-core/src/data/client/batch.rs b/ant-core/src/data/client/batch.rs index 6221f25..09c3b4b 100644 --- a/ant-core/src/data/client/batch.rs +++ b/ant-core/src/data/client/batch.rs @@ -9,7 +9,7 @@ use crate::data::client::classify_error; use crate::data::client::file::UploadEvent; use crate::data::client::payment::peer_id_to_encoded; use crate::data::client::Client; -use crate::data::error::{Error, Result}; +use crate::data::error::{Error, PartialUploadSpend, Result}; use ant_protocol::evm::{ Amount, EncodedPeerId, PayForQuotesError, PaymentQuote, ProofOfPayment, QuoteHash, RewardsAddress, TxHash, @@ -29,6 +29,13 @@ use tracing::{debug, info, warn}; /// Number of chunks per payment wave. const PAYMENT_WAVE_SIZE: usize = 64; +/// Soft ceiling on the combined body size of chunks stored concurrently in a +/// single wave. Caps store concurrency for large chunks so the send path's +/// per-peer body buffers can't pin multiple GB at once (see V2-461). At ~4 MB +/// chunks this permits ~16 concurrent stores; small chunks hit the chunk-count +/// / adaptive limits instead and are unaffected. +const STORE_INFLIGHT_BYTE_BUDGET: usize = 64 * 1024 * 1024; + /// Chunk quoted but not yet paid. Produced by [`Client::prepare_chunk_payment`]. #[derive(Debug)] pub struct PreparedChunk { @@ -413,35 +420,28 @@ impl Client { // is decision-pure: we never hand a doomed proof to a storer, // and the cache is updated under our own lock with no remote // text involved. - // `cached_cost` carries the cumulative cost from waves paid in - // a previous run so the returned tally reflects total spend on - // this file, not just freshly-paid chunks. Without this the - // user's "this upload cost X" message under-reports by the - // resumed waves' cost. - let (cached_proofs, cached_storage, cached_gas): (HashMap>, Amount, u128) = - match resume_key { - Some(key) => match crate::data::client::cached_single::try_load_for_file(key) { - Some((_, receipt)) => { - let prior_storage = receipt - .storage_cost_atto - .parse::() - .unwrap_or(Amount::ZERO); - let prior_gas = receipt.gas_cost_wei; - let kept = prune_locally_expired_proofs(key, receipt.proofs); - (kept, prior_storage, prior_gas) - } - None => (HashMap::new(), Amount::ZERO, 0u128), - }, - None => (HashMap::new(), Amount::ZERO, 0u128), - }; + // Load only the cached PROOFS (for reuse). The cost this function + // returns is a per-call DELTA — what was freshly paid in THIS call — + // not the cache's cumulative. The single-node wave driver + // (`upload_spill_addresses_single`) calls this once per wave and SUMS + // the per-call costs, so seeding the return with the cumulative cache + // (which grows as each wave appends to it) double-counts: + // A + (A+B) + (A+B+C) instead of A+B+C. + let cached_proofs: HashMap> = match resume_key { + Some(key) => match crate::data::client::cached_single::try_load_for_file(key) { + Some((_, receipt)) => prune_locally_expired_proofs(key, receipt.proofs), + None => HashMap::new(), + }, + None => HashMap::new(), + }; let mut all_addresses = Vec::with_capacity(total_chunks); let mut seen_addresses: HashSet = HashSet::new(); - // Accumulate costs across waves, seeded with cumulative from - // any cached receipt loaded above. - let mut total_storage = cached_storage; - let mut total_gas: u128 = cached_gas; + // Accumulate only THIS call's freshly-paid cost (per-call delta; see + // the proof-load comment above for why this must not include the cache). + let mut total_storage = Amount::ZERO; + let mut total_gas: u128 = 0; let mut agg_stats = WaveAggregateStats::default(); // Deduplicate chunks by content address. @@ -520,6 +520,10 @@ impl Client { failed: wave_result.failed, failed_count, total_chunks: file_total, + spend: Box::new(PartialUploadSpend { + storage_cost_atto: total_storage.to_string(), + gas_cost_wei: total_gas, + }), reason: "wave store failed after retries".into(), }); } @@ -618,6 +622,10 @@ impl Client { failed: wave_result.failed, failed_count, total_chunks: file_total, + spend: Box::new(PartialUploadSpend { + storage_cost_atto: total_storage.to_string(), + gas_cost_wei: total_gas, + }), reason: "final wave store failed after retries".into(), }); } @@ -735,6 +743,22 @@ impl Client { first_seen.entry(chunk.address).or_insert_with(Instant::now); } + // Bound concurrency by IN-FLIGHT BYTES, not just chunk count. Each + // concurrently-stored chunk is held in memory while it is sent to its + // close group, and the send path re-serializes the body once per peer, + // so a wave of large (~4 MB) chunks at full store concurrency can pin + // multiple GB and OOM a small host. Cap how many chunks store at once + // so their combined body size stays under the budget; small chunks are + // unaffected (the byte bound exceeds the chunk-count bound). The budget + // is deliberately conservative for the current per-peer send + // amplification and can be raised once that is reduced upstream. + let max_chunk_bytes = to_retry.iter().map(|c| c.content.len()).max().unwrap_or(0); + // `checked_div` yields `None` only when `max_chunk_bytes == 0` (an + // empty/zero-length wave), in which case there is no byte limit. + let byte_bound = STORE_INFLIGHT_BYTE_BUDGET + .checked_div(max_chunk_bytes) + .map_or(usize::MAX, |n| n.max(1)); + let mut chunk_attempts_total: usize = 0; let mut store_durations_ms: Vec = Vec::new(); let mut retries_per_chunk: Vec = Vec::new(); @@ -753,7 +777,10 @@ impl Client { chunk_attempts_total = chunk_attempts_total.saturating_add(to_retry.len()); let store_limiter = self.controller().store.clone(); - let store_concurrency = store_limiter.current().min(to_retry.len().max(1)); + let store_concurrency = store_limiter + .current() + .min(to_retry.len().max(1)) + .min(byte_bound); let mut upload_stream = stream::iter(to_retry) .map(|chunk| { let chunk_clone = chunk.clone(); diff --git a/ant-core/src/data/client/file.rs b/ant-core/src/data/client/file.rs index 4c6fdcb..f5d205c 100644 --- a/ant-core/src/data/client/file.rs +++ b/ant-core/src/data/client/file.rs @@ -21,7 +21,7 @@ use crate::data::client::merkle::{ PreparedMerkleBatch, DEFERRED_ROUND_DELAYS_SECS, }; use crate::data::client::Client; -use crate::data::error::{Error, Result}; +use crate::data::error::{Error, PartialUploadSpend, Result}; use ant_protocol::evm::{Amount, PaymentQuote, QuoteHash, TxHash, MAX_LEAVES}; use ant_protocol::transport::{MultiAddr, PeerId}; use ant_protocol::{compute_address, DATA_TYPE_CHUNK}; @@ -464,6 +464,7 @@ fn partial_upload_after_fatal( stored_count: usize, total_chunks: usize, known_failed: Vec<([u8; 32], String)>, + spend: PartialUploadSpend, reason: String, ) -> Error { let stored_set: HashSet<[u8; 32]> = stored_addresses.iter().copied().collect(); @@ -486,10 +487,65 @@ fn partial_upload_after_fatal( failed, failed_count, total_chunks, + spend: Box::new(spend), reason, } } +/// One wave's contribution to a single-node upload, distilled from its +/// `batch_upload_chunks_with_events` result. +#[derive(Debug)] +struct SingleWaveOutcome { + /// Addresses confirmed stored in this wave. + stored: Vec<[u8; 32]>, + /// Chunks that failed after retries in this wave. + failed: Vec<([u8; 32], String)>, + /// Storage cost paid on-chain for this wave, in atto-tokens. + storage_atto: Amount, + /// Gas paid on-chain for this wave, in wei. + gas_wei: u128, + /// Per-wave store/retry statistics. Empty for a quorum-short wave, whose + /// `PartialUpload` carries no stats. + stats: WaveAggregateStats, +} + +/// Fold one wave's batch-upload result for the single-node path. +/// +/// A `PartialUpload` (chunks short of quorum after retries) is **recoverable**: +/// its stored/failed chunks and on-chain spend are returned so the caller +/// records them and continues to the next wave, making the file make maximum +/// progress exactly like `upload_waves_merkle`. Every other error is **fatal** +/// (wallet/payment-infrastructure failures, missing proofs, spill reads) and is +/// returned via `Err` to abort the file. Because `UPLOAD_WAVE_SIZE == +/// PAYMENT_WAVE_SIZE`, each batch call is exactly one payment wave, so folding a +/// `PartialUpload` leaves nothing un-attempted within the wave. +fn fold_single_wave( + result: Result<(Vec<[u8; 32]>, String, u128, WaveAggregateStats)>, +) -> Result { + match result { + Ok((stored, storage, gas, stats)) => Ok(SingleWaveOutcome { + stored, + failed: Vec::new(), + storage_atto: storage.parse().unwrap_or(Amount::ZERO), + gas_wei: gas, + stats, + }), + Err(Error::PartialUpload { + stored, + failed, + spend, + .. + }) => Ok(SingleWaveOutcome { + stored, + failed, + storage_atto: spend.storage_cost_atto.parse().unwrap_or(Amount::ZERO), + gas_wei: spend.gas_cost_wei, + stats: WaveAggregateStats::default(), + }), + Err(e) => Err(e), + } +} + /// Check that the spill directory has enough free space for the spilled chunks. /// /// `file_size` is the source file's byte count. We require @@ -1380,7 +1436,7 @@ impl Client { match prepared.payment_info { ExternalPaymentInfo::WaveBatch { prepared_chunks, - payment_intent: _, + payment_intent, } => { let paid_chunks = finalize_batch_payment(prepared_chunks, tx_hash_map)?; let wave_result = self @@ -1402,6 +1458,13 @@ impl Client { failed: wave_result.failed, failed_count, total_chunks, + // Report the storage spend known from the payment intent + // the external signer was handed. Gas is paid by the + // signer out-of-band, so it stays unknown (0). + spend: Box::new(PartialUploadSpend { + storage_cost_atto: payment_intent.total_amount.to_string(), + gas_cost_wei: 0, + }), reason: "finalize_upload: chunk storage failed after retries".into(), }); } @@ -1418,7 +1481,9 @@ impl Client { chunks_failed: 0, total_chunks, payment_mode_used: PaymentMode::Single, - storage_cost_atto: "0".into(), + // Storage spend is known from the payment intent; gas is + // paid by the external signer out-of-band (unknown here). + storage_cost_atto: payment_intent.total_amount.to_string(), gas_cost_wei: 0, data_map_address, chunk_attempts_total: stats.chunk_attempts_total, @@ -1721,7 +1786,7 @@ impl Client { &spill, &merkle_plan.to_upload, progress.as_ref(), - merkle_plan.already_stored.len(), + &merkle_plan.already_stored, chunk_count, Some(&file_path_key), ) @@ -1783,7 +1848,7 @@ impl Client { &spill, &merkle_plan.to_upload, progress.as_ref(), - merkle_plan.already_stored.len(), + &merkle_plan.already_stored, chunk_count, Some(&file_path_key), ) @@ -1909,7 +1974,7 @@ impl Client { spill, &spill.addresses, progress, - 0, + &[], spill.len(), resume_key, ) @@ -1921,17 +1986,38 @@ impl Client { spill: &ChunkSpill, addresses: &[[u8; 32]], progress: Option<&mpsc::Sender>, - stored_offset: usize, + already_stored_addresses: &[[u8; 32]], total_chunks: usize, resume_key: Option<&str>, ) -> Result<(usize, String, u128, WaveAggregateStats)> { - let mut total_stored = stored_offset; + let mut total_stored = already_stored_addresses.len(); let mut total_storage = Amount::ZERO; let mut total_gas: u128 = 0; let mut agg_stats = WaveAggregateStats::default(); + // A wave whose chunks fall short of quorum after retries must not abort + // the file: its failures are accumulated here and surfaced as a single + // `PartialUpload` only after every wave has been attempted, mirroring + // `upload_waves_merkle`. Aborting on the first failed wave (the old `?`) + // discarded all later waves' progress — already self-encrypted, spilled, + // and in some cases already paid for — converting high per-chunk success + // into 0% per-file success. + // Seed with the addresses a preflight already confirmed stored (e.g. + // the merkle-fallback path passes `merkle_plan.already_stored`), so a + // returned `PartialUpload.stored` lists every stored chunk and + // `stored_count == stored.len()` holds for programmatic callers. + let mut stored_addresses: Vec<[u8; 32]> = already_stored_addresses.to_vec(); + let mut failed: Vec<([u8; 32], String)> = Vec::new(); let waves: Vec<&[[u8; 32]]> = addresses.chunks(UPLOAD_WAVE_SIZE).collect(); let wave_count = waves.len(); + // Unconditional breadcrumb: lets a clean run confirm the continue-on- + // partial single-node path is in effect (the old path aborted the file + // on the first failed wave instead of continuing across all waves). + info!( + "single-node upload: {} chunk(s) in {wave_count} wave(s) (continue-on-partial)", + addresses.len() + ); + for (wave_idx, wave_addrs) in waves.into_iter().enumerate() { let wave_num = wave_idx + 1; let wave_data: Vec = wave_addrs @@ -1952,35 +2038,50 @@ impl Client { }) .await; } - let (addresses, wave_storage, wave_gas, wave_stats) = self - .batch_upload_chunks_with_events( + // Fold this wave's result. A quorum shortfall (`PartialUpload`) is + // recoverable and its parts are returned to be recorded here; + // genuinely fatal errors propagate via `?` and abort the file, as in + // `upload_waves_merkle`. + let outcome = fold_single_wave( + self.batch_upload_chunks_with_events( wave_data, progress, total_stored, total_chunks, resume_key, ) - .await?; - total_stored += addresses.len(); - if let Ok(cost) = wave_storage.parse::() { - total_storage += cost; + .await, + )?; + + if !outcome.failed.is_empty() { + warn!( + "Wave {wave_num}/{wave_count}: {} chunk(s) failed to store after retries; \ + continuing with remaining waves", + outcome.failed.len() + ); } - total_gas = total_gas.saturating_add(wave_gas); - // Merge per-call stats (each call already aggregates across the - // waves it ran internally, so a simple sum/extend is correct). + + total_stored += outcome.stored.len(); + stored_addresses.extend(outcome.stored); + failed.extend(outcome.failed); + total_storage += outcome.storage_atto; + total_gas = total_gas.saturating_add(outcome.gas_wei); + // Merge per-wave stats (a quorum-short wave contributes none, since + // `PartialUpload` carries no stats). agg_stats.chunk_attempts_total = agg_stats .chunk_attempts_total - .saturating_add(wave_stats.chunk_attempts_total); + .saturating_add(outcome.stats.chunk_attempts_total); agg_stats .store_durations_ms - .extend(wave_stats.store_durations_ms); + .extend(outcome.stats.store_durations_ms); for (slot, count) in agg_stats .retries_histogram .iter_mut() - .zip(wave_stats.retries_histogram.iter()) + .zip(outcome.stats.retries_histogram.iter()) { *slot = slot.saturating_add(*count); } + if let Some(tx) = progress { let _ = tx .send(UploadEvent::WaveComplete { @@ -1993,6 +2094,28 @@ impl Client { } } + // Any chunk still failed after every wave was attempted means the file + // is not fully stored — surface it as `PartialUpload` (never silently + // succeed with missing chunks), carrying the real on-chain spend. + if !failed.is_empty() { + let failed_count = failed.len(); + warn!( + "single-node upload incomplete: {failed_count}/{total_chunks} chunks failed after retries" + ); + return Err(Error::PartialUpload { + stored: stored_addresses, + stored_count: total_stored, + failed, + failed_count, + total_chunks, + spend: Box::new(PartialUploadSpend { + storage_cost_atto: total_storage.to_string(), + gas_cost_wei: total_gas, + }), + reason: format!("{failed_count} chunk(s) failed to store after retries"), + }); + } + Ok(( total_stored, total_storage.to_string(), @@ -2175,6 +2298,10 @@ impl Client { total_stored, total_chunks, known_failed, + PartialUploadSpend { + storage_cost_atto: batch_result.storage_cost_atto.clone(), + gas_cost_wei: batch_result.gas_cost_wei, + }, format!("merkle chunk store aborted: {e}"), )); } @@ -2260,6 +2387,10 @@ impl Client { total_stored, total_chunks, known_failed, + PartialUploadSpend { + storage_cost_atto: batch_result.storage_cost_atto.clone(), + gas_cost_wei: batch_result.gas_cost_wei, + }, format!("merkle chunk store aborted: {reason}"), )); } @@ -2282,6 +2413,10 @@ impl Client { failed, failed_count, total_chunks, + spend: Box::new(PartialUploadSpend { + storage_cost_atto: batch_result.storage_cost_atto.clone(), + gas_cost_wei: batch_result.gas_cost_wei, + }), reason: format!( "{failed_count} chunk(s) short of quorum after {total_attempts} attempts" ), @@ -2911,6 +3046,68 @@ mod tests { assert_eq!(missing, vec![unpaid_b, unpaid_d]); } + /// A wave that returns `Ok` contributes its stored chunks, parsed cost, and + /// stats; nothing is recorded as failed. + #[test] + fn fold_single_wave_keeps_ok_wave() { + let stored = vec![[1u8; 32], [2u8; 32]]; + let stats = WaveAggregateStats { + chunk_attempts_total: 7, + ..Default::default() + }; + + let outcome = fold_single_wave(Ok((stored.clone(), "100".to_string(), 9, stats))).unwrap(); + + assert_eq!(outcome.stored, stored); + assert!(outcome.failed.is_empty()); + assert_eq!(outcome.storage_atto.to_string(), "100"); + assert_eq!(outcome.gas_wei, 9); + assert_eq!(outcome.stats.chunk_attempts_total, 7); + } + + /// The core V2-461 semantic: a wave short of quorum (`PartialUpload`) is + /// recoverable — its stored chunks, failed chunks, and on-chain spend are + /// folded so the caller can continue to the next wave rather than aborting + /// the whole file. + #[test] + fn fold_single_wave_folds_partial_upload() { + let stored = vec![[3u8; 32]]; + let failed = vec![([4u8; 32], "short of quorum".to_string())]; + let err = Error::PartialUpload { + stored: stored.clone(), + stored_count: 1, + failed: failed.clone(), + failed_count: 1, + total_chunks: 2, + spend: Box::new(PartialUploadSpend { + storage_cost_atto: "250".to_string(), + gas_cost_wei: 11, + }), + reason: "wave store failed after retries".to_string(), + }; + + let outcome = fold_single_wave(Err(err)).unwrap(); + + assert_eq!(outcome.stored, stored); + assert_eq!(outcome.failed, failed); + assert_eq!(outcome.storage_atto.to_string(), "250"); + assert_eq!(outcome.gas_wei, 11); + // `PartialUpload` carries no stats, so the failed wave contributes none. + assert_eq!(outcome.stats.chunk_attempts_total, 0); + } + + /// A non-`PartialUpload` error (wallet/payment-infrastructure failure) is + /// fatal and must abort the file, not be folded into the failed set. + #[test] + fn fold_single_wave_propagates_fatal_error() { + let result = fold_single_wave(Err(Error::Payment("wallet unavailable".to_string()))); + + assert!( + matches!(result, Err(Error::Payment(_))), + "fatal payment error must propagate, got: {result:?}" + ); + } + #[test] fn partition_addresses_by_proof_handles_all_or_nothing() { let a = [5u8; 32]; diff --git a/ant-core/src/data/client/mod.rs b/ant-core/src/data/client/mod.rs index f9ab0ed..b0deb7d 100644 --- a/ant-core/src/data/client/mod.rs +++ b/ant-core/src/data/client/mod.rs @@ -631,6 +631,10 @@ mod tests { failed: vec![], failed_count: 0, total_chunks: 0, + spend: Box::new(crate::data::error::PartialUploadSpend { + storage_cost_atto: "0".to_string(), + gas_cost_wei: 0, + }), reason: "r".to_string(), }, Outcome::NetworkError, diff --git a/ant-core/src/data/error.rs b/ant-core/src/data/error.rs index de49b5d..0b2adf1 100644 --- a/ant-core/src/data/error.rs +++ b/ant-core/src/data/error.rs @@ -124,11 +124,29 @@ pub enum Error { failed_count: usize, /// Total number of chunks the upload was attempting to store. total_chunks: usize, + /// On-chain spend incurred so far. Boxed to keep the `Error` enum small + /// (the variant is returned in `Result` across the crate; without the + /// box the two cost fields would trip `clippy::result_large_err`). + spend: Box, /// Root cause description. reason: String, }, } +/// On-chain spend recorded on a [`Error::PartialUpload`]. +/// +/// A partial upload still spends money for the chunks it paid for. In the +/// single-node path payment precedes store, so this includes a failed wave's +/// chunks; surfacing it lets the caller report real spend rather than silently +/// dropping it. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PartialUploadSpend { + /// Storage cost paid on-chain so far, in atto-tokens. + pub storage_cost_atto: String, + /// Gas cost paid on-chain so far, in wei. + pub gas_cost_wei: u128, +} + // ant-node is only linked when the `devnet` feature is on, so the // blanket `From` impl follows that gate. LocalDevnet maps node errors // to `Error::Network` via this conversion; default builds never see it.