
feat(data): stream decrypted file download to a channel sink#111

Merged
Nic-dorman merged 4 commits into rc-2026.6.2 from nic/file-download-to-sender-streaming on Jun 11, 2026

@Nic-dorman Nic-dorman commented Jun 8, 2026


What

Adds Client::file_download_to_sender, which downloads + decrypts a file and streams the plaintext to a caller-provided mpsc::Sender<Result<Bytes, Error>> instead of writing it to disk.

Today the only constant-memory download path, file_download[_with_progress], sinks decrypted plaintext to a &Path (and only DownloadEvent progress comes back over a channel). data_download -> Bytes also decrypts through streaming_decrypt internally, but then concatenates the whole result into one in-memory Bytes. So the actual gap isn't "streaming decrypt"; it's that the only constant-memory path was shaped around a single &Path sink. A consumer that wants to forward a download to an HTTP chunked body or a gRPC response stream therefore had to either buffer the whole file in memory or stage it on disk first. This adds the missing byte-sink variant.

How

The existing batched-fetch + streaming-decrypt loop is extracted out of file_download_with_progress into a private, sink-parameterized core:

async fn download_decrypted_chunks<F, Fut>(&self, data_map, progress, mut on_chunk: F) -> Result<u64>
where F: FnMut(Bytes) -> Fut, Fut: Future<Output = Result<()>>
  • file_download_with_progress becomes a thin wrapper whose on_chunk does the temp-file write; a TempDownload RAII guard owns the staging file and removes it on any error path or panic, then commit() renames atomically — behavior unchanged for callers.
  • file_download_to_sender is a thin wrapper whose on_chunk does sink.send(Ok(bytes)).await.

No duplication of the ~340-line batched-fetch / deferred-retry logic.
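
To make the shape of the extraction concrete, here is a minimal synchronous sketch of a sink-parameterized core with two thin wrappers. It is not the code from this PR: it uses only the standard library, replaces the async closure with a plain FnMut, and the names deliver_chunks, download_to_writer, download_to_sender and the Error enum are all hypothetical stand-ins.

```rust
use std::io::Write;
use std::sync::mpsc::SyncSender;

// Hypothetical stand-in for the crate's error type.
#[derive(Debug, PartialEq)]
enum Error {
    Io(String),
    Cancelled(String),
}

// Shared core: walks the (already decrypted) batches in order and hands each
// one to the caller-supplied sink. Returns the number of bytes delivered.
fn deliver_chunks<F>(batches: &[Vec<u8>], mut on_chunk: F) -> Result<u64, Error>
where
    F: FnMut(Vec<u8>) -> Result<(), Error>,
{
    let mut total = 0u64;
    for batch in batches {
        on_chunk(batch.clone())?;
        total += batch.len() as u64;
    }
    Ok(total)
}

// Wrapper 1: the sink appends to a writer (a staging file in the disk path).
fn download_to_writer<W: Write>(batches: &[Vec<u8>], out: &mut W) -> Result<u64, Error> {
    deliver_chunks(batches, |chunk| {
        out.write_all(&chunk).map_err(|e| Error::Io(e.to_string()))
    })
}

// Wrapper 2: the sink forwards each chunk to a bounded channel.
fn download_to_sender(
    batches: &[Vec<u8>],
    sink: &SyncSender<Result<Vec<u8>, Error>>,
) -> Result<u64, Error> {
    deliver_chunks(batches, |chunk| {
        sink.send(Ok(chunk))
            .map_err(|_| Error::Cancelled("receiver dropped".into()))
    })
}

fn main() {
    let batches = vec![b"hello ".to_vec(), b"world".to_vec()];

    let mut buf = Vec::new();
    let written = download_to_writer(&batches, &mut buf).unwrap();
    println!("writer sink delivered {} bytes", written);

    // Capacity 2 so both sends fit without a concurrent consumer.
    let (tx, rx) = std::sync::mpsc::sync_channel(2);
    let sent = download_to_sender(&batches, &tx).unwrap();
    drop(tx);
    let streamed: Vec<u8> = rx.iter().flat_map(|r| r.unwrap()).collect();
    println!("channel sink delivered {} bytes, same content: {}", sent, streamed == buf);
}
```

Both wrappers share the loop, so any fetch or retry logic living in the core would exist once; only the per-chunk action differs.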

Properties

  • Constant memory — one decrypt batch resident at a time, same as file_download.
  • Progressive — the caller receives bytes as each batch decrypts (low time-to-first-byte), not after the whole file lands.
  • Backpressure — the bounded sink naturally throttles the producer.
  • Disconnect-safe — a dropped receiver ends the download early with Error::Cancelled (a deliberate caller-cancellation signal, not a transport Error::Network; classify_error routes it to ApplicationError so it is never retried as a network fault).
  • No new bound on Client — stays &self; the caller tokio::spawns the call and owns the matching Receiver. (Requires a multi-threaded Tokio runtime, like file_download, because the decrypt iterator uses block_in_place.)
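
The backpressure and disconnect properties can be illustrated with a small standard-library analogue. This is a sketch under assumptions, not the PR's implementation: std::sync::mpsc::sync_channel and a thread stand in for the bounded tokio channel and the spawned task, and StreamError, produce and run are hypothetical names.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread;

// Hypothetical error type: the only failure modelled is a dropped receiver.
#[derive(Debug, PartialEq)]
enum StreamError {
    Cancelled,
}

// Producer: sends `total_chunks` small chunks. `send` blocks while the bounded
// channel is full (backpressure) and fails once the receiver is gone, which
// ends the loop early with Cancelled.
fn produce(total_chunks: usize, sink: SyncSender<Vec<u8>>) -> Result<usize, StreamError> {
    for i in 0..total_chunks {
        sink.send(vec![i as u8; 4])
            .map_err(|_| StreamError::Cancelled)?;
    }
    Ok(total_chunks)
}

// Consumer: reads `take` chunks, then drops the receiver (a client disconnect).
// Returns how many chunks it saw and how the producer finished.
fn run(total_chunks: usize, take: usize) -> (usize, Result<usize, StreamError>) {
    let (tx, rx) = sync_channel(1); // capacity 1: at most one chunk in flight
    let producer = thread::spawn(move || produce(total_chunks, tx));
    let received = rx.iter().take(take).count();
    drop(rx);
    (received, producer.join().expect("producer thread panicked"))
}

fn main() {
    println!("full read:        {:?}", run(10, 10));
    println!("early disconnect: {:?}", run(10, 3));
}
```

With a capacity of 1 the producer can never run more than one chunk ahead of the consumer, which is the same mechanism that keeps memory bounded in the streaming path.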

Testing

  • cargo check -p ant-core, cargo clippy --all-targets, cargo fmt --check — clean.
  • New e2e test_file_download_to_sender_multibatch_round_trip (ant-core/tests/e2e_file.rs): uploads a ~1 MiB multi-batch file to a live MiniTestnet, downloads it via file_download_to_sender, asserts each streamed chunk is non-empty and that >=2 segments arrive (proving it streams rather than buffering), reassembles the stream, and asserts it equals the source. Passes (~5 min against the in-process testnet).

🤖 Generated with Claude Code

@dirvine dirvine left a comment


Thanks Nic — good shape overall. The extraction into a private sink-parameterised core is the right call: no duplication of the ~340-line batched-fetch / deferred-retry logic, the disk path keeps its current atomic-temp-file semantics, and the new streaming variant is a thin layer on top. CI is green (format, clippy, unit + e2e on linux + macos, security audit). I've requested changes for the items below, none of which are blockers — the design is sound and I'd be happy to approve after these are addressed.

Substantive

1. Temp file no longer cleaned up on download_decrypted_chunks failure in the disk path.

Before, the closure-based write_result was matched in a single arm and the temp file was removed on both Err(e) and the rename_err branches — every error path that left a partial .ant_download_*.tmp behind cleaned it up. After the refactor, the download_decrypted_chunks(...).await call can return an error (e.g. Error::Network from the sink, or a chunk_result.map_err decryption failure) after the closure has already written some bytes to the temp file. That ? returns out of the let write_result = self.download_decrypted_chunks(...) expression and skips both the match write_result cleanup arms. Net effect: any failure mid-stream leaves a stale .ant_download_<pid>_<rand>.tmp on disk — exactly what the existing warn!("Failed to remove temp download file ...") block was there to prevent.

The minimal fix is to move the temp-file cleanup out of the match write_result block and into a Drop/scope-guard, or just convert the early-return path to feed back through the same cleanup arms. I'd lean toward a small TempDownload { path: PathBuf } that removes the file in Drop unless commit() is called — that also makes the rename-failure path self-evident.
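
A minimal sketch of the guard described above, assuming only what the comment states (a path field, removal in Drop, and a commit() that defuses it). The committed flag, the commit(dest) signature and the write_via_guard helper are illustrative assumptions, not the code that landed.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

// Owns a staging file and deletes it on drop unless commit() succeeded.
struct TempDownload {
    path: PathBuf,
    committed: bool,
}

impl TempDownload {
    fn new(path: PathBuf) -> Self {
        TempDownload { path, committed: false }
    }

    fn path(&self) -> &Path {
        &self.path
    }

    // Renames the staging file to `dest`. The guard is defused only after the
    // rename succeeds, so a failed rename still cleans up in Drop.
    fn commit(mut self, dest: &Path) -> io::Result<()> {
        fs::rename(&self.path, dest)?;
        self.committed = true;
        Ok(())
    }
}

impl Drop for TempDownload {
    fn drop(&mut self) {
        if !self.committed {
            // Best effort: the file may not exist yet, so the error is ignored.
            let _ = fs::remove_file(&self.path);
        }
    }
}

// Hypothetical helper: stage `data`, optionally fail mid-way, then commit.
fn write_via_guard(dest: &Path, data: &[u8], fail: bool) -> io::Result<()> {
    let tmp = TempDownload::new(dest.with_extension("tmp"));
    fs::write(tmp.path(), data)?; // fs::write closes the handle before returning
    if fail {
        // Early return: `tmp` is dropped here and removes the staging file.
        return Err(io::Error::new(io::ErrorKind::Other, "simulated mid-stream failure"));
    }
    tmp.commit(dest)
}

fn main() -> io::Result<()> {
    let name = format!("temp_download_sketch_{}.bin", std::process::id());
    let dest = std::env::temp_dir().join(name);

    let failed = write_via_guard(&dest, b"partial", true);
    println!(
        "failure path: is_err = {}, staging file left behind = {}",
        failed.is_err(),
        dest.with_extension("tmp").exists()
    );

    write_via_guard(&dest, b"complete", false)?;
    println!("success path: destination exists = {}", dest.exists());
    fs::remove_file(&dest)
}
```

Because Drop also runs during unwinding, the same guard covers a panic between staging and commit without any extra cleanup arm.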

2. Cloning the sink per chunk is unnecessary work.

|bytes| {
    let sink = sink.clone();
    async move { sink.send(Ok(bytes)).await... }
}

The mpsc::Sender is already Clone and cheap, but the capture-by-move pattern is fine; what isn't fine is the implied per-chunk allocation of the async state machine for what is effectively a single await on a non-blocking send. For a multi-batch file this is a measurable amount of allocation traffic. Two options:

  • Push the async-ness down: have the core take a Sink<Result<Bytes>>-style Pin<Box<dyn ...>>, or return the chunks via a Stream that the caller drains, so the hot loop is just poll_ready / start_send. This is a bigger change.
  • Or, if you want to keep the closure shape: at minimum, drop the clone() and capture &sink — but that requires the closure to be reborrowed each call, which the FnMut bound allows.

Honestly the cleanest answer is to have download_decrypted_chunks take a Stream/async iterator of the chunks as an output, and let the caller (disk path, sender path) consume it. But the current shape is correct, so this is a "consider" not a "must".

3. Error::Network for a dropped receiver is a misclassification.

The "receiver dropped" case is a peer/state-machine error, not a network transport error. Two ways to look at it:

  • It's a caller-side bug (they dropped the receiver), so a new Error::OperationCancelled (or whatever the equivalent variant is in this crate — worth checking the Error enum) is more honest.
  • Or, if there's no such variant, the existing pattern in the codebase (search for mpsc::error::SendError handling) probably indicates the established way to signal this.

At minimum, please make the variant deliberate rather than picking the nearest neighbour.

4. Item type on the channel is Result<Bytes, Error>, not Result<Bytes, thiserror::Error>.

Make sure the public doc-comment example shows the full type as the caller would write it — at the moment the doc-comment says "the bounded sink applies backpressure" but doesn't tell the reader the exact channel type. A 1-line snippet:

let (tx, rx) = tokio::sync::mpsc::channel::<Result<Bytes, Error>>(8);

saves a trip to the source.

Minor / nits

5. The PR description says "no way to get the decrypted bytes back as a stream, so a consumer… has to either buffer the whole file in memory (data_download -> Bytes) or stage it on disk first." This is true and a good summary, but data_download does in fact also go through streaming_decrypt internally and then concatenates into Bytes — worth one line in the PR description acknowledging that the constant-memory path was a single-arg-sink shaped one (&Path), which is the actual gap being filled. Helps reviewers see the symmetry.

6. The new e2e test is a good shape. Two small things:

  • The test would pass if the sender forgot to call send and just dropped the tx after writing to disk: a recv().await returning None is asserted, but that signal comes from dropping tx on the producer side regardless of whether chunks were sent. Consider adding a per-chunk assert!(chunk.len() > 0) or a chunk-count lower bound so a buggy "send one then drop" producer fails clearly instead of passing. Easy to miss without that.
  • The test name test_file_download_to_sender_streaming is fine, but consider test_file_download_to_sender_multibatch_round_trip — describes the property under test, not the API.
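
The hardening suggested in the first bullet could look roughly like the helper below. It is a standard-library sketch, not the e2e test itself: drain_checked and stream_of are hypothetical names, String stands in for the crate's error type, and an unbounded std channel replaces the tokio one.

```rust
use std::sync::mpsc::{channel, Receiver};

// Drains the stream until every sender is dropped. Returns the reassembled
// bytes and the segment count, or a description of the first violated property.
fn drain_checked(
    rx: Receiver<Result<Vec<u8>, String>>,
    min_segments: usize,
) -> Result<(Vec<u8>, usize), String> {
    let mut out = Vec::new();
    let mut segments = 0;
    for item in rx.iter() {
        let chunk = item?;
        if chunk.is_empty() {
            return Err("received an empty chunk".to_string());
        }
        out.extend_from_slice(&chunk);
        segments += 1;
    }
    // Channel closure alone proves nothing, so require a minimum segment count.
    if segments < min_segments {
        return Err(format!(
            "only {} segment(s) arrived, expected at least {}",
            segments, min_segments
        ));
    }
    Ok((out, segments))
}

// Hypothetical producer used for the demo: sends the given chunks, then drops tx.
fn stream_of(chunks: Vec<Vec<u8>>) -> Receiver<Result<Vec<u8>, String>> {
    let (tx, rx) = channel();
    for chunk in chunks {
        tx.send(Ok(chunk)).expect("receiver is still alive");
    }
    rx
}

fn main() {
    // A producer that drops tx without sending is rejected.
    println!("{:?}", drain_checked(stream_of(vec![]), 2));
    // So is a "send one then drop" producer.
    println!("{:?}", drain_checked(stream_of(vec![vec![1, 2, 3]]), 2));
    // A genuine multi-segment stream passes and round-trips.
    println!("{:?}", drain_checked(stream_of(vec![vec![1, 2], vec![3]]), 2));
}
```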

7. #[allow(clippy::unused_async)] is gone from file_download_with_progress. Good — the new signature is genuinely async. Worth checking the diff for any other #[allow(clippy::unused_async)] that could now be removed on Client::file_download too, since it still forwards. (Looks like it's still there — probably fine, since the public method stays async, but worth a once-over.)

8. The DownloadEvent progress channel is forwarded through the new core unchanged — confirm that the file_download_with_progress caller's existing behaviour (progress events emitted during the DataMap-resolution phase) is preserved. The doc-comment is updated to mention progress; a quick read of download_decrypted_chunks suggests yes it's preserved (Phase 1 is in the core, Phase 2 uses the same stream.fetch_chunks machinery). Looks good.

Non-blocking questions for the author

  • Is there a use case for a Sink<Result<Bytes>>-shaped trait so callers can plug in hyper::Body / axum::body::Body / tonic Streaming<Bytes> directly without going through an mpsc? That would be a follow-up PR.
  • Is the multi-threaded-runtime requirement (block_in_place in the decrypt iterator) something the team wants to document at the Client level, or just on a per-method basis? Currently it's per-method, which is fine but easy to miss.

TL;DR

Approve-concept. Please address #1 (the temp-file leak on early return) before merge; the rest are quality improvements that I'd be happy to see in this PR or as follow-ups. The CI matrix is solid and the e2e coverage is appropriate for a single feature-flag-style addition.

Nic-dorman added a commit that referenced this pull request Jun 9, 2026
- TempDownload RAII guard: removes the staging file on every disk-path
  error AND on a panic unwind out of the block_in_place decrypt loop,
  replacing three duplicated cleanup arms (#1). drop(file) before rename
  for Windows.
- New Error::Cancelled variant for a dropped receiver; was misclassified
  as Error::Network (#3). Routed to ApplicationError in classify_error so
  caller-initiated cancellation is not retried as a transport failure.
- Doc the exact channel item type Result<Bytes, Error> on
  file_download_to_sender (#4).
- Drop now-stale #[allow(clippy::unused_async)] on file_download (#7).
- Harden e2e test: assert each streamed chunk is non-empty and >=2
  segments arrive (multi-batch property), rename to
  test_file_download_to_sender_multibatch_round_trip (#6).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@Nic-dorman

Thanks for the review — pushed 0c7ba64 addressing the items.

#1 — temp-file cleanup. Added a TempDownload RAII guard that owns the staging file and removes it on Drop; commit() renames and defuses it. One clarification on the diagnosis, though: the version on the branch didn't actually leak via the path described. The disk function didn't use a bare ? on the download call. Instead it threaded the result through .await.and_then(|bw| { file.flush()?; Ok(bw) }) into write_result, and the match write_result { Err(e) => … } arm already ran remove_file on every error return (download error, flush error, and the rename-failure branch). So error returns were covered.

The one case that genuinely wasn't covered is a panic unwinding out of the block_in_place decrypt loop; that would skip the match entirely and orphan the temp file. The guard closes that window (Drop runs during unwind) and, as a bonus, collapses the three duplicated remove_file arms into one place and makes the rename-failure cleanup self-evident. So the guard is going in for panic-safety + clarity rather than to fix the early-return leak. Also added drop(file) before the rename so Windows won't balk at renaming an open handle.

#3 — dropped-receiver error. Agreed it was a misclassification. Added a deliberate Error::Cancelled(String) variant and routed it through classify_error as ApplicationError, so a caller-initiated disconnect is never retried as a transport failure.
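
As a rough illustration of why the classification matters, the sketch below shows a retry loop that consults a classifier. Only Error::Network, Error::Cancelled(String), classify_error and ApplicationError are named in this thread; the ErrorClass enum, its NetworkError variant, should_retry and run_with_retry are assumptions made for the example.

```rust
// Hypothetical subset of the crate's error enum.
#[derive(Debug)]
enum Error {
    Network(String),
    Cancelled(String),
}

// Assumed shape of the classifier's output.
#[derive(Debug, PartialEq)]
enum ErrorClass {
    NetworkError,
    ApplicationError,
}

fn classify_error(err: &Error) -> ErrorClass {
    match err {
        Error::Network(_) => ErrorClass::NetworkError,
        // Caller-initiated cancellation is not a transport fault.
        Error::Cancelled(_) => ErrorClass::ApplicationError,
    }
}

fn should_retry(err: &Error) -> bool {
    classify_error(err) == ErrorClass::NetworkError
}

// Runs `op` up to `max_attempts` times, retrying only retryable errors.
// Returns the number of attempts made together with the final result.
fn run_with_retry<F>(max_attempts: u32, mut op: F) -> (u32, Result<(), Error>)
where
    F: FnMut() -> Result<(), Error>,
{
    let mut attempts = 0;
    loop {
        attempts += 1;
        match op() {
            Ok(()) => return (attempts, Ok(())),
            Err(e) if attempts < max_attempts && should_retry(&e) => continue,
            Err(e) => return (attempts, Err(e)),
        }
    }
}

fn main() {
    let cancelled = run_with_retry(3, || Err(Error::Cancelled("receiver dropped".into())));
    println!("cancelled: {:?}", cancelled);

    let network = run_with_retry(3, || Err(Error::Network("timeout".into())));
    println!("network:   {:?}", network);
}
```

Had the dropped receiver stayed an Error::Network, a loop like this would re-run a download whose consumer has already gone away.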

#4 — doc. Added the explicit let (tx, rx) = mpsc::channel::<Result<Bytes, Error>>(8); snippet to the file_download_to_sender doc.

#6 — test. Renamed to test_file_download_to_sender_multibatch_round_trip and hardened: it now asserts each delivered chunk is non-empty and that ≥2 segments arrive, so a "send one then drop" producer fails loudly rather than sliding through on the channel-close signal.

#7 — stale allow. Removed #[allow(clippy::unused_async)] from file_download; it genuinely .awaits now.

#8 — progress events. Confirmed preserved — Phase-1 DataMap-resolution events and per-chunk ChunksFetched events both live in the shared download_decrypted_chunks core, and file_download_with_progress forwards the same Option<Sender<DownloadEvent>> through unchanged.

#2 / Q1 — per-chunk clone + Sink/Stream-shaped API. Leaving these as-is intentionally. The sink.clone() is a single atomic refcount bump (not a heap allocation), and the per-call async move {} future is created regardless of the clone (it's inherent to the FnMut -> Fut shape), so there's no measurable allocation traffic to reclaim here. The spawn + bounded-mpsc + ReceiverStream bridge is also the idiomatic way to adapt a block_in_place blocking iterator into an async body; a hand-rolled Stream would mean async_stream/manual poll_next driving block_in_place, which is fiddlier, not cleaner. Happy to revisit if a concrete need surfaces when wiring the daemon's stream endpoints, but I'd rather not redesign on spec.

CI re-running.

@dirvine dirvine left a comment


Thanks Nic — re-reviewed the updated branch at 38ac1e15b39d0a8799ad277f76eeb747706cb00d and the requested changes are addressed.

Verified:

  • TempDownload guard now owns the staging file and defuses only after successful rename, covering early-return and panic-unwind cleanup paths.
  • Dropped receiver is now a deliberate Error::Cancelled and is classified as ApplicationError, so caller disconnects are not retried as network failures.
  • Public docs include the concrete mpsc::channel::<Result<Bytes, Error>>(8) item type and runtime/backpressure notes.
  • The e2e now asserts non-empty chunks, a multi-segment stream, byte-for-byte round trip, and returned byte count.
  • Stale unused_async allow on file_download was removed and progress reporting remains in the shared core.

Checks:

  • GitHub CI for current head is green across format, clippy, unit/e2e/merkle on linux + macOS, docs, and security audit.
  • I also ran cargo check -p ant-core locally successfully.

LGTM.

Nic-dorman and others added 4 commits June 11, 2026 12:38
Add `Client::file_download_to_sender`, which downloads + decrypts a file and
streams the plaintext to a caller-provided `mpsc::Sender<Result<Bytes>>`
instead of writing to disk. Constant memory (one decrypt batch resident at a
time, same as `file_download`), and the caller receives bytes progressively as
each batch decrypts — suitable for forwarding to an HTTP chunked body or a
gRPC response stream. The bounded sink applies backpressure; a dropped receiver
(client disconnect) ends the download early.

Implemented by extracting the existing batched-fetch + streaming-decrypt loop
out of `file_download_with_progress` into a private sink-parameterized core,
`download_decrypted_chunks(.., on_chunk)`. `file_download_with_progress` is now
a thin wrapper whose sink writes to the temp file + atomic-renames (behavior
unchanged); the new method's sink forwards to the channel. No duplication of
the fetch/retry logic, and `&self` is preserved (the caller spawns + owns the
Receiver), so no `Client: Clone`/`'static` bound is required.

Adds an e2e round-trip test that streams a multi-batch (~1 MiB) file through
the channel and asserts the reassembled bytes equal the source.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Drop redundant .into() on already-Bytes decrypt result (clippy useless_conversion) and apply rustfmt reflows in file.rs + e2e_file.rs. No behavior change.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- TempDownload RAII guard: removes the staging file on every disk-path
  error AND on a panic unwind out of the block_in_place decrypt loop,
  replacing three duplicated cleanup arms (#1). drop(file) before rename
  for Windows.
- New Error::Cancelled variant for a dropped receiver; was misclassified
  as Error::Network (#3). Routed to ApplicationError in classify_error so
  caller-initiated cancellation is not retried as a transport failure.
- Doc the exact channel item type Result<Bytes, Error> on
  file_download_to_sender (#4).
- Drop now-stale #[allow(clippy::unused_async)] on file_download (#7).
- Harden e2e test: assert each streamed chunk is non-empty and >=2
  segments arrive (multi-batch property), rename to
  test_file_download_to_sender_multibatch_round_trip (#6).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
It was an intra-doc link from the public file_download_with_progress to the private TempDownload struct, tripping -D rustdoc::private_intra_doc_links. A plain code span conveys the same thing without the link.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@Nic-dorman Nic-dorman force-pushed the nic/file-download-to-sender-streaming branch from 38ac1e1 to ba1135e Compare June 11, 2026 13:18
@Nic-dorman Nic-dorman changed the base branch from main to rc-2026.6.2 June 11, 2026 13:18
@Nic-dorman Nic-dorman merged commit cd0beb4 into rc-2026.6.2 Jun 11, 2026
12 checks passed