feat(data): stream decrypted file download to a channel sink #111
dirvine
left a comment
Thanks Nic — good shape overall. The extraction into a private sink-parameterised core is the right call: no duplication of the ~340-line batched-fetch / deferred-retry logic, the disk path keeps its current atomic-temp-file semantics, and the new streaming variant is a thin layer on top. CI is green (format, clippy, unit + e2e on linux + macos, security audit). I've requested changes for the items below, none of which are blockers — the design is sound and I'd be happy to approve after these are addressed.
Substantive
1. Temp file no longer cleaned up on download_decrypted_chunks failure in the disk path.
Before, the closure-based write_result was matched in a single arm and the temp file was removed on both Err(e) and the rename_err branches — every error path that left a partial .ant_download_*.tmp behind cleaned it up. After the refactor, the download_decrypted_chunks(...).await call can return an error (e.g. Error::Network from the sink, or a chunk_result.map_err decryption failure) after the closure has already written some bytes to the temp file. That ? returns out of the let write_result = self.download_decrypted_chunks(...) expression and skips both the match write_result cleanup arms. Net effect: any failure mid-stream leaves a stale .ant_download_<pid>_<rand>.tmp on disk — exactly what the existing warn!("Failed to remove temp download file ...") block was there to prevent.
The minimal fix is to move the temp-file cleanup out of the match write_result block and into a Drop/scope-guard, or just convert the early-return path to feed back through the same cleanup arms. I'd lean toward a small TempDownload { path: PathBuf } that removes the file in Drop unless commit() is called — that also makes the rename-failure path self-evident.
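A minimal sketch of the guard suggested above, assuming a struct with a `path` field plus a `committed` flag. The flag, the `new`/`path` helpers, and the `commit(dest)` signature are illustrative assumptions, not the crate's actual code:

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Hypothetical guard: removes the staging file on drop unless `commit` succeeded.
struct TempDownload {
    path: PathBuf,
    committed: bool,
}

impl TempDownload {
    fn new(path: PathBuf) -> Self {
        Self { path, committed: false }
    }

    fn path(&self) -> &Path {
        &self.path
    }

    /// Rename the staging file to its destination. The guard is defused only
    /// after the rename succeeds, so a failed rename still triggers cleanup.
    fn commit(mut self, dest: &Path) -> io::Result<()> {
        fs::rename(&self.path, dest)?;
        self.committed = true;
        Ok(())
    }
}

impl Drop for TempDownload {
    fn drop(&mut self) {
        if !self.committed {
            // Best-effort cleanup; runs on early return and on panic unwind.
            let _ = fs::remove_file(&self.path);
        }
    }
}

fn main() -> io::Result<()> {
    let dir = std::env::temp_dir();
    let tmp = dir.join(format!("guard_demo_{}.tmp", std::process::id()));
    let dest = dir.join(format!("guard_demo_{}.bin", std::process::id()));

    // Failure path: the guard goes out of scope without commit().
    {
        let guard = TempDownload::new(tmp.clone());
        fs::write(guard.path(), b"partial")?;
    }
    assert!(!tmp.exists());

    // Success path: commit() renames and defuses the guard.
    let guard = TempDownload::new(tmp.clone());
    fs::write(guard.path(), b"complete")?;
    guard.commit(&dest)?;
    assert!(dest.exists() && !tmp.exists());
    fs::remove_file(&dest)?;
    Ok(())
}
```

With this shape, a `?` anywhere between creating the guard and calling `commit()` cleans up without a dedicated match arm.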
2. Cloning the sink per chunk is unnecessary work.
|bytes| {
    let sink = sink.clone();
    async move { sink.send(Ok(bytes)).await... }
}
The mpsc::Sender is already Clone and cheap, but the capture-by-move pattern is fine; what isn't fine is the implied per-chunk allocation of the async state machine for what is effectively a single await on a non-blocking send. For a multi-batch file this is a measurable amount of allocation traffic. Two options:
- Push the async-ness down: have the core take a Sink<Result<Bytes>>-style Pin<Box<dyn ...>>, or return the chunks via a Stream that the caller drains, so the hot loop is just poll_ready / start_send. This is a bigger change.
- Or, if you want to keep the closure shape: at minimum, drop the clone() and capture &sink, but that requires the closure to be reborrowed each call, which the FnMut bound allows.
Honestly the cleanest answer is to have download_decrypted_chunks take a Stream/async iterator of the chunks as an output, and let the caller (disk path, sender path) consume it. But the current shape is correct, so this is a "consider" not a "must".
3. Error::Network for a dropped receiver is a misclassification.
The "receiver dropped" case is a peer/state-machine error, not a network transport error. Two ways to look at it:
- It's a caller-side bug (they dropped the receiver), so a new Error::OperationCancelled (or whatever the equivalent variant is in this crate; worth checking the Error enum) is more honest.
- Or, if there's no such variant, the existing pattern in the codebase (search for mpsc::error::SendError handling) probably indicates the established way to signal this.
At minimum, please make the variant deliberate rather than picking the nearest neighbour.
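To make the distinction concrete, here is a rough sketch of a dedicated cancellation variant and a classifier that keeps it out of the retry path. The enum shapes, the `ErrorClass` type, and `forward_chunk` are assumptions for illustration, and `std::sync::mpsc` stands in for the tokio channel:

```rust
use std::sync::mpsc;

// Assumed, simplified error type; the crate's real enum has more variants.
#[derive(Debug, PartialEq)]
enum Error {
    Network(String),
    Cancelled(String),
}

// Hypothetical classification used to decide whether a failure is retried.
#[derive(Debug, PartialEq)]
enum ErrorClass {
    NetworkError,     // transport fault: eligible for retry
    ApplicationError, // caller-side outcome: never retried
}

fn classify_error(err: &Error) -> ErrorClass {
    match err {
        Error::Network(_) => ErrorClass::NetworkError,
        Error::Cancelled(_) => ErrorClass::ApplicationError,
    }
}

/// Forward one chunk; a dropped receiver maps to `Cancelled`, not `Network`.
fn forward_chunk(tx: &mpsc::SyncSender<Vec<u8>>, chunk: Vec<u8>) -> Result<(), Error> {
    tx.send(chunk)
        .map_err(|_| Error::Cancelled("receiver dropped".to_string()))
}

fn main() {
    let (tx, rx) = mpsc::sync_channel::<Vec<u8>>(1);
    drop(rx); // simulate the consumer disconnecting

    let err = forward_chunk(&tx, vec![1, 2, 3]).unwrap_err();
    assert_eq!(classify_error(&err), ErrorClass::ApplicationError);

    let net = Error::Network("timeout".to_string());
    assert_eq!(classify_error(&net), ErrorClass::NetworkError);
}
```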
4. Item type on the channel is Result<Bytes, Error>, not Result<Bytes, thiserror::Error>.
Make sure the public doc-comment example shows the full type as the caller would write it. At the moment the doc-comment says "the bounded sink applies backpressure" but doesn't tell the reader the exact channel type. A 1-line snippet:
let (tx, rx) = tokio::sync::mpsc::channel::<Result<Bytes, Error>>(8);
saves a trip to the source.
Minor / nits
5. The PR description says "no way to get the decrypted bytes back as a stream, so a consumer… has to either buffer the whole file in memory (data_download -> Bytes) or stage it on disk first." This is true and a good summary, but data_download does in fact also go through streaming_decrypt internally and then concatenates into Bytes — worth one line in the PR description acknowledging that the constant-memory path was a single-arg-sink shaped one (&Path), which is the actual gap being filled. Helps reviewers see the symmetry.
6. The new e2e test is a good shape. Two small things:
- The test would pass if the sender forgot to call send and just dropped the tx after writing to disk: a recv().await returning None is asserted, but that signal comes from dropping tx on the producer side regardless of whether chunks were sent. Consider adding a per-chunk assert!(chunk.len() > 0) or a chunk-count lower bound so a buggy "send one then drop" producer is caught clearly. Easy to miss without that.
- The test name test_file_download_to_sender_streaming is fine, but consider test_file_download_to_sender_multibatch_round_trip, which describes the property under test, not the API.
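The hardening suggested for the e2e test could look roughly like the helper below. It is only a sketch: `drain_and_check` is a hypothetical name, `String` stands in for the crate's `Error`, and a std bounded channel replaces the tokio one so the example runs without dependencies:

```rust
use std::sync::mpsc;
use std::thread;

/// Drain the stream, asserting the properties a streaming download should have.
/// Returns the total number of bytes received.
fn drain_and_check(rx: mpsc::Receiver<Result<Vec<u8>, String>>, source: &[u8]) -> usize {
    let mut out = Vec::new();
    let mut segments = 0usize;

    // Iteration ends when every sender has been dropped.
    for item in rx {
        let chunk = item.expect("stream yielded an error");
        assert!(!chunk.is_empty(), "received an empty chunk");
        segments += 1;
        out.extend_from_slice(&chunk);
    }

    // A lower bound on segments catches a producer that buffers everything
    // into one message, or that drops the sender without sending.
    assert!(segments >= 2, "expected a multi-segment stream, got {segments}");
    assert_eq!(out.as_slice(), source, "reassembled bytes differ from source");
    out.len()
}

fn main() {
    let source: Vec<u8> = (0..10_000u32).map(|i| (i % 251) as u8).collect();
    let (tx, rx) = mpsc::sync_channel(2);

    let data = source.clone();
    let producer = thread::spawn(move || {
        for part in data.chunks(4096) {
            tx.send(Ok(part.to_vec())).unwrap();
        }
        // tx is dropped here, which closes the stream.
    });

    let received = drain_and_check(rx, &source);
    producer.join().unwrap();
    assert_eq!(received, source.len());
}
```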
7. #[allow(clippy::unused_async)] is gone from file_download_with_progress. Good — the new signature is genuinely async. Worth checking the diff for any other #[allow(clippy::unused_async)] that could now be removed on Client::file_download too, since it still forwards. (Looks like it's still there — probably fine, since the public method stays async, but worth a once-over.)
8. The DownloadEvent progress channel is forwarded through the new core unchanged — confirm that the file_download_with_progress caller's existing behaviour (progress events emitted during the DataMap-resolution phase) is preserved. The doc-comment is updated to mention progress; a quick read of download_decrypted_chunks suggests yes it's preserved (Phase 1 is in the core, Phase 2 uses the same stream.fetch_chunks machinery). Looks good.
Non-blocking questions for the author
- Is there a use case for a Sink<Result<Bytes>>-shaped trait so callers can plug in hyper::Body / axum::body::Body / tonic Streaming<Bytes> directly without going through an mpsc? That would be a follow-up PR.
- Is the multi-threaded-runtime requirement (block_in_place in the decrypt iterator) something the team wants to document at the Client level, or just on a per-method basis? Currently it's per-method, which is fine but easy to miss.
TL;DR
Approve-concept. Please address #1 (the temp-file leak on early return) before merge; the rest are quality improvements that I'd be happy to see in this PR or as follow-ups. The CI matrix is solid and the e2e coverage is appropriate for a single feature-flag-style addition.
- TempDownload RAII guard: removes the staging file on every disk-path error AND on a panic unwind out of the block_in_place decrypt loop, replacing three duplicated cleanup arms (#1). drop(file) before rename for Windows.
- New Error::Cancelled variant for a dropped receiver; was misclassified as Error::Network (#3). Routed to ApplicationError in classify_error so caller-initiated cancellation is not retried as a transport failure.
- Doc the exact channel item type Result<Bytes, Error> on file_download_to_sender (#4).
- Drop now-stale #[allow(clippy::unused_async)] on file_download (#7).
- Harden e2e test: assert each streamed chunk is non-empty and >=2 segments arrive (multi-batch property), rename to test_file_download_to_sender_multibatch_round_trip (#6).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
#1: temp-file cleanup. Added a […] The one case that genuinely wasn't covered is a panic unwinding out of the […]
#3: dropped-receiver error. Agreed it was a misclassification. Added a deliberate […]
#4: doc. Added the explicit […]
#6: test. Renamed to […]
#7: stale allow. Removed […]
#8: progress events. Confirmed preserved: Phase-1 DataMap-resolution events and per-chunk […]
#2 / Q1: per-chunk clone + Sink/Stream-shaped API. Leaving these as-is intentionally. The […]
CI re-running.
dirvine
left a comment
Thanks Nic — re-reviewed the updated branch at 38ac1e15b39d0a8799ad277f76eeb747706cb00d and the requested changes are addressed.
Verified:
- TempDownload guard now owns the staging file and defuses only after successful rename, covering early-return and panic-unwind cleanup paths.
- Dropped receiver is now a deliberate Error::Cancelled and is classified as ApplicationError, so caller disconnects are not retried as network failures.
- Public docs include the concrete mpsc::channel::<Result<Bytes, Error>>(8) item type and runtime/backpressure notes.
- The e2e now asserts non-empty chunks, a multi-segment stream, byte-for-byte round trip, and returned byte count.
- Stale unused_async allow on file_download was removed and progress reporting remains in the shared core.
Checks:
- GitHub CI for current head is green across format, clippy, unit/e2e/merkle on linux + macOS, docs, and security audit.
- I also ran cargo check -p ant-core locally successfully.
LGTM.
Add `Client::file_download_to_sender`, which downloads + decrypts a file and streams the plaintext to a caller-provided `mpsc::Sender<Result<Bytes>>` instead of writing to disk.

Constant memory (one decrypt batch resident at a time, same as `file_download`), and the caller receives bytes progressively as each batch decrypts, which makes it suitable for forwarding to an HTTP chunked body or a gRPC response stream. The bounded sink applies backpressure; a dropped receiver (client disconnect) ends the download early.

Implemented by extracting the existing batched-fetch + streaming-decrypt loop out of `file_download_with_progress` into a private sink-parameterized core, `download_decrypted_chunks(.., on_chunk)`. `file_download_with_progress` is now a thin wrapper whose sink writes to the temp file + atomic-renames (behavior unchanged); the new method's sink forwards to the channel. No duplication of the fetch/retry logic, and `&self` is preserved (the caller spawns + owns the Receiver), so no `Client: Clone`/`'static` bound is required.

Adds an e2e round-trip test that streams a multi-batch (~1 MiB) file through the channel and asserts the reassembled bytes equal the source.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Drop redundant .into() on already-Bytes decrypt result (clippy useless_conversion) and apply rustfmt reflows in file.rs + e2e_file.rs. No behavior change. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
It was an intra-doc link from the public file_download_with_progress to the private TempDownload struct, tripping -D rustdoc::private_intra_doc_links. A plain code span conveys the same thing without the link. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
38ac1e1 to ba1135e
What
Adds Client::file_download_to_sender, which downloads + decrypts a file and streams the plaintext to a caller-provided mpsc::Sender<Result<Bytes, Error>> instead of writing it to disk.
Today the only constant-memory download path, file_download[_with_progress], sinks decrypted plaintext to a &Path (and only DownloadEvent progress comes back over a channel). data_download -> Bytes also decrypts through streaming_decrypt internally, but then concatenates the whole result into one in-memory Bytes. So the actual gap isn't "streaming decrypt"; it's that the only constant-memory path was shaped around a single &Path sink. A consumer that wants to forward a download to an HTTP chunked body or a gRPC response stream therefore had to either buffer the whole file in memory or stage it on disk first. This adds the missing byte-sink variant.
How
The existing batched-fetch + streaming-decrypt loop is extracted out of file_download_with_progress into a private, sink-parameterized core:
- file_download_with_progress becomes a thin wrapper whose on_chunk does the temp-file write; a TempDownload RAII guard owns the staging file and removes it on any error path or panic, then commit() renames atomically. Behavior is unchanged for callers.
- file_download_to_sender is a thin wrapper whose on_chunk does sink.send(Ok(bytes)).await.
No duplication of the ~340-line batched-fetch / deferred-retry logic.
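The shape described above can be sketched in synchronous form. This is not the crate's code: the real core is async and takes more parameters, `String` stands in for `Error`, `Vec<u8>` for `Bytes`, and the two wrapper names are simplified stand-ins:

```rust
use std::sync::mpsc;
use std::thread;

type Error = String; // placeholder for the crate's error type

/// Stand-in core: one loop over decrypted chunks, each handed to `on_chunk`.
/// Returns the number of bytes delivered.
fn download_decrypted_chunks<I, F>(chunks: I, mut on_chunk: F) -> Result<u64, Error>
where
    I: IntoIterator<Item = Result<Vec<u8>, Error>>,
    F: FnMut(Vec<u8>) -> Result<(), Error>,
{
    let mut total = 0u64;
    for chunk in chunks {
        let bytes = chunk?; // fetch/decrypt failure aborts the loop
        total += bytes.len() as u64;
        on_chunk(bytes)?; // sink failure aborts the loop too
    }
    Ok(total)
}

/// Wrapper 1: the sink appends to a buffer (standing in for the temp-file write).
fn download_to_buffer(chunks: Vec<Result<Vec<u8>, Error>>) -> Result<Vec<u8>, Error> {
    let mut out = Vec::new();
    download_decrypted_chunks(chunks, |bytes| {
        out.extend_from_slice(&bytes);
        Ok(())
    })?;
    Ok(out)
}

/// Wrapper 2: the sink forwards each chunk into a bounded channel.
fn download_to_sender(
    chunks: Vec<Result<Vec<u8>, Error>>,
    sink: mpsc::SyncSender<Result<Vec<u8>, Error>>,
) -> Result<u64, Error> {
    download_decrypted_chunks(chunks, |bytes| {
        sink.send(Ok(bytes))
            .map_err(|_| "receiver dropped".to_string())
    })
}

fn sample_chunks() -> Vec<Result<Vec<u8>, Error>> {
    vec![Ok(b"hello ".to_vec()), Ok(b"world".to_vec())]
}

fn main() {
    assert_eq!(
        download_to_buffer(sample_chunks()).unwrap(),
        b"hello world".to_vec()
    );

    // Capacity 1: the producer blocks until the consumer catches up.
    let (tx, rx) = mpsc::sync_channel(1);
    let producer = thread::spawn(move || download_to_sender(sample_chunks(), tx));
    let received: Vec<u8> = rx.into_iter().flat_map(|r| r.unwrap()).collect();
    assert_eq!(producer.join().unwrap().unwrap(), 11);
    assert_eq!(received, b"hello world".to_vec());
}
```

Both wrappers share the single loop, so retry or batching changes in the core apply to every sink.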
Properties
- Constant memory: one decrypt batch resident at a time, same as file_download.
- Backpressure: the bounded sink naturally throttles the producer.
- Cancellation: a dropped receiver ends the download early with Error::Cancelled (a deliberate caller-cancellation signal, not a transport Error::Network; classify_error routes it to ApplicationError so it is never retried as a network fault).
- No Clone / 'static bound on Client: it stays &self; the caller tokio::spawns the call and owns the matching Receiver. (Requires a multi-threaded Tokio runtime, like file_download, because the decrypt iterator uses block_in_place.)
Testing
- cargo check -p ant-core, cargo clippy --all-targets, cargo fmt --check: clean.
- test_file_download_to_sender_multibatch_round_trip (ant-core/tests/e2e_file.rs): uploads a ~1 MiB multi-batch file to a live MiniTestnet, downloads it via file_download_to_sender, asserts each streamed chunk is non-empty and that >=2 segments arrive (proving it streams rather than buffering), reassembles the stream, and asserts it equals the source. Passes (~5 min against the in-process testnet).
🤖 Generated with Claude Code