feat(waterdata): Add async parallel chunker over httpx.AsyncClient#285
Draft
thodson-usgs wants to merge 3 commits into
Draft
feat(waterdata): Add async parallel chunker over httpx.AsyncClient#285thodson-usgs wants to merge 3 commits into
thodson-usgs wants to merge 3 commits into
Conversation
c618ea8 to
06d43aa
Compare
57ff5bd to
6d1d5db
Compare
4 tasks
57bf0ee to
2ec3f51
Compare
Adds a parallel fan-out path to `multi_value_chunked`. When
`API_USGS_CONCURRENT` resolves to >1 (default: 16), the decorator
runs the sub-requests of an over-budget plan concurrently under
one shared `httpx.AsyncClient`, instead of issuing them serially.
Falls back to the serial sync path (with a one-time UserWarning)
when no async fetch sibling is wired or when an asyncio event
loop is already running (Jupyter, IPython, async apps —
`asyncio.run` would otherwise raise).
Architecture (`dataretrieval/waterdata/chunking.py`):
* `_fan_out_async(plan, fetch_once, fetch_async, *, max_concurrent)`
is the orchestrator: it dispatches every sub-request concurrently
via `asyncio.gather(return_exceptions=True)`. Completed pairs
survive a sibling's transient failure, so partial state stays
recoverable through `ChunkedCall.resume()` on the sync path.
* Failure precedence in the gather:
1. Cancellation/interrupt signals (CancelledError,
KeyboardInterrupt, SystemExit) propagate unmodified — never
wrapped as transients. Cancellation is asyncio's abort
signal; rewriting it as ChunkInterrupted would silently
consume the user's stop request.
2. Recognized transients (RateLimited, ServiceUnavailable, bare
httpx.HTTPError) wrap as ChunkInterrupted so the user gets
a resumable handle even when a non-transient bug landed
earlier in submission order.
3. Otherwise raise the first failure in submission order,
preserving its type.
* `_execute_in_parallel` owns the sync→async bridge: `asyncio.run`
dispatch with the `fetch_async is None` and running-event-loop
fallbacks (each a one-time UserWarning, then serial).
* `_publish_async_client` / `get_active_async_client` /
`_chunked_async_client` ContextVar let async paginated-loop
helpers (`_walk_pages_async`, `_paginate_async`) reuse one
`AsyncClient` connection pool across every concurrent
sub-request.
Wiring (`dataretrieval/waterdata/utils.py`):
* `_walk_pages_async`, `_paginate_async`, `_client_for_async`,
`_fetch_once_async` — async siblings of the sync paginate path,
sharing the same `parse_response` / `follow_up` callbacks and
the `_ogc_parse_response` parser.
* The `@chunking.multi_value_chunked(fetch_async=_fetch_once_async)`
decorator on `_fetch_once` wires the async sibling so the
parallel path is available to every Water Data OGC getter.
* `ChunkedCall.record()` encapsulates the completion write so the
serial loop and the parallel fan-out share it; `_chunks` is a
sparse index map so a parallel partial-failure resumes correctly
via the sync path.
Concurrency cap (`API_USGS_CONCURRENT`):
* Integer N >= 1: bounded fan-out (semaphore-gated, N=1 forces
serial sync). Default 16 — the server-friendly sweet spot.
* `unbounded`: no per-call cap (`Semaphore(sys.maxsize)`).
* Unset: default 16.
Retries (`API_USGS_RETRIES`, default 4; `0` disables): each
sub-request is retried on a transient failure with exponential
backoff + full jitter, so a large fan-out completes through the
AWS API Gateway's burst throttling and the occasional backend
straggler instead of aborting on the first 429/5xx/timeout.
* `RetryPolicy` — a frozen value object owning the timing decisions
(`from_env`, `should_retry`, `backoff`). Full jitter
(`random.uniform(0, ceiling)`) de-correlates the concurrent
retries so they don't re-burst in lockstep. A server `Retry-After`
overrides the computed backoff, unless it exceeds `retry_after_cap`
(60s) — a multi-minute quota-window reset escalates to the
resumable interruption instead of blocking the call inline.
* `_retryable` — chain-walking predicate, narrower than
`_classify_chunk_error`: retries `RateLimited` / `ServiceUnavailable`
/ `httpx.TransportError` but NOT `httpx.InvalidURL`.
* `_retry_sync` / `_retry_async` drivers wrap the per-sub-request
fetch at both seams (`ChunkedCall._issue`, `_fan_out_async.track`);
the async retry runs inside the semaphore, so a backing-off chunk
holds its slot and effective concurrency shrinks under throttling.
On exhaustion the last exception re-raises into the existing
`wrap_failure` path, so `.resume()` stays the escape hatch.
* `ProgressReporter.note_retry` surfaces the backoff on the status
line ("retrying (attempt N, waiting Ns)"), cleared by the next page.
Test scaffolding: `tests/conftest.py` extends the `_serial_chunker`
autouse fixture to pin `API_USGS_CONCURRENT=1` and `API_USGS_RETRIES=0`
so the existing mocked suite stays on the deterministic serial path
with transients surfacing immediately; async and retry tests opt back
in by re-setting the env vars inside their body.
Tests: async-path coverage in `tests/waterdata_chunking_test.py`
(one-call-per-sub-request, mid-fan-out transient yields resumable
ChunkInterrupted, fallback-to-serial parametrized over
running-loop and missing-fetch_async, cancellation-wins-over-
transient-sibling regression), plus retry coverage (policy
math/jitter bounds, `_retryable` taxonomy, sync+async
transient-then-success, exhausted-still-resumable, long-`Retry-After`
escalation). `tests/waterdata_progress_test.py` adds progress
integration for `_fan_out_async` / `_paginate_async` and the
`note_retry` render/clear. `tests/waterdata_utils_test.py` adds a
`_walk_pages_async` initial-parse-error test.
Test suite: 435 passing, 2 skipped (mocked); ruff clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2ec3f51 to
d2bf71f
Compare
… row cap Post-review fixes layered on the async parallel chunker: - Funnel OGC post-processing through the chunker via an injected `finalize` hook so `ChunkInterrupted.call.resume()` returns the same type-coerced `(df, BaseMetadata)` as an un-interrupted call instead of the raw `(frame, httpx.Response)`. `partial_frame`/`partial_response` stay raw, so building the exception never triggers finalize's side effects (a schema network GET on an empty frame would otherwise fire inside the ctor). - Add `max_rows` to `get_reference_table`/`get_ogc_data` to preview large reference tables without downloading every page; enforced as the exact total in `_finalize_ogc` (after dedup) and validated as a positive integer (accepts numpy ints via `numbers.Integral`). - Co-locate the parallel fan-out into `ChunkedCall.resume_async`, sharing a `_pending()` generator with the serial `resume()` so the two execution paths can't drift. - Harden `ProgressReporter.note_retry` for Python 3.9-3.11 (int `wait` and `int.is_integer()`). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Adds a parallel fan-out path to
multi_value_chunked. WhenAPI_USGS_CONCURRENTresolves to >1 (default 16), the decorator runs the sub-requests of an over-budget plan concurrently under one sharedhttpx.AsyncClientinstead of issuing them serially. Falls back to the serial sync path (with a one-timeUserWarning) when no async fetch sibling is wired or when an asyncio event loop is already running (Jupyter / IPython kernels, async apps —asyncio.runwould otherwise raise).Note
Rebased onto
mainnow that the httpx upgrade (#289) has merged — this PR is a single commit containing only the async chunker.Benchmark: ~6.7× speedup on a 6-state
get_dailydischarge pull over 17,298 stream sites (chunked into 64 sub-requests) — parallel @API_USGS_CONCURRENT=16: 3.11s; serial @=1: 20.94s (~1,920 rows each). Distinct date windows per side so the server cache can't bias either run; both authenticated viaAPI_USGS_PAT.API_USGS_CONCURRENT_CONCURRENCY_DEFAULT)≥ 21ChunkedCall.resume()path)unbounded0, negative, malformedValueErrorat call timeConnection-pool sharing across all sub-requests of a single chunked call via the
_chunked_client(sync) /_chunked_async_client(async)ContextVars —_walk_pages/_walk_pages_asyncread them as fallbacks before opening a fresh client.Retries (
API_USGS_RETRIES)Each sub-request is retried on a transient failure with exponential backoff + full jitter, honoring a server
Retry-After, so a large fan-out completes through the AWS API Gateway's burst throttling and the occasional backend straggler instead of aborting on the first 429/5xx/timeout..resume()remains the escape hatch once retries are exhausted._RETRIES_DEFAULT)N ≥ 1Nretries per sub-request0ChunkInterruptedValueErrorat call timeRetryPolicy— a frozen value object owning the timing decisions (from_env/should_retry/backoff). Full jitter (random.uniform(0, ceiling)) de-correlates the ≤16 concurrent retries so a throttle event doesn't make them re-burst in lockstep.Retry-Aftersplit. A short hint is slept off inline; one longer thanretry_after_cap(60s) — e.g. a multi-minute quota-window reset — escalates immediately to the resumable interruption rather than blocking the call._retryableis narrower than the resumability classifier: it retriesRateLimited/ServiceUnavailable/httpx.TransportError(connect/read timeouts) but nothttpx.InvalidURL(a too-long cursor won't fix on retry).retrying (attempt N, waiting Ns)).Architecture
_fan_out_async(plan, fetch_once, fetch_async, *, max_concurrent)is the orchestrator: it dispatches every sub-request concurrently viaasyncio.gather(return_exceptions=True). Completed pairs survive a sibling's transient failure, so partial state stays recoverable throughChunkedCall.resume()on the sync path.Failure precedence in the gather
CancelledError,KeyboardInterrupt,SystemExit) propagate unmodified — never wrapped as transients. Cancellation is asyncio's abort signal; rewriting it asChunkInterruptedwould silently consume the user's stop request.RateLimited,ServiceUnavailable, barehttpx.HTTPError) wrap asChunkInterruptedso the user gets a resumable handle even when a non-transient bug landed earlier in submission order.Sync ↔ async bridge
_execute_in_parallelowns theasyncio.rundispatch with two recoverable-misconfig fallbacks (each emits a one-timeUserWarning, then runs serial):asyncio.run()raises inside an already-running loop. The bridge callsasyncio.get_running_loop()first and, when one is active, falls back to the serial path so Jupyter / IPython users don't see a confusingRuntimeError.fetch_asyncwarning. IfAPI_USGS_CONCURRENTrequests parallel but the decorator wasn't wired withfetch_async=, the wrapper warns + runs serial rather than silently no-op'ing the env var.The async paginate path (
_paginate_async,_walk_pages_async,_client_for_async,_fetch_once_async) mirrors the sync path exactly, sharing theparse_response/follow_upcallbacks and the_ogc_parse_responseparser, so there is one pagination contract for both.Test plan
ruff checkclean.tests/waterdata_chunking_test.py:ChunkInterruptedwhose.call.resume()re-issues only the unfinished indices via the sync pathrunning-loopandmissing-fetch_asyncRetryPolicymath + full-jitter bounds,_retryabletaxonomy (incl.InvalidURLnot retried), sync + async transient-then-success, exhausted-still-resumable, and long-Retry-Afterescalation — all with_SLEEP/_ASLEEPpatched to no-ops.tests/waterdata_progress_test.py(reporter calls from_paginate_asyncand_fan_out_async;note_retryrender/clear)._walk_pages_asyncinitial-parse-error test intests/waterdata_utils_test.py.Design notes / out of scope
RequestExceedsQuotabefore a burst that thex-ratelimit-remainingheader said couldn't fit; it was dropped as not worth the complexity. The reactive retry-with-backoff above is the chosen alternative — absorb transients rather than predict them — and a transient that outlives the retries still surfaces as a resumableChunkInterrupted._fan_out_asyncmaterializes all(df, response)pairs before combining. Consider streaming-combine viaasyncio.as_completedif users push concurrency very high.asyncio.TaskGroup(3.11+): would replacegatheronce the 3.9 floor is dropped, though the partial-completion contract fightsTaskGroup's cancel-on-first-failure default — the gather form may stay the right shape regardless.NEWS.mdentry — left for the merger to draft.🤖 Generated with Claude Code