feat(waterdata): layered config (WaterDataConfig + RetryPolicy + ConcurrencyPolicy)#3
Draft
thodson-usgs wants to merge 16 commits into
Draft
feat(waterdata): layered config (WaterDataConfig + RetryPolicy + ConcurrencyPolicy)#3thodson-usgs wants to merge 16 commits into
thodson-usgs wants to merge 16 commits into
Conversation
Adds a parallel fan-out path to `multi_value_chunked`. When
`API_USGS_CONCURRENT` resolves to >1 (default: 16), the decorator
runs the sub-requests of an over-budget plan concurrently under
one shared `httpx.AsyncClient`, instead of issuing them serially.
Falls back to the serial sync path (with a one-time UserWarning)
when no async fetch sibling is wired or when an asyncio event
loop is already running (Jupyter, IPython, async apps —
`asyncio.run` would otherwise raise).
Architecture (`dataretrieval/waterdata/chunking.py`):
* `_fan_out_async(plan, fetch_once, fetch_async, *, max_concurrent)`
is the orchestrator: it dispatches every sub-request concurrently
via `asyncio.gather(return_exceptions=True)`. Completed pairs
survive a sibling's transient failure, so partial state stays
recoverable through `ChunkedCall.resume()` on the sync path.
* Failure precedence in the gather:
1. Cancellation/interrupt signals (CancelledError,
KeyboardInterrupt, SystemExit) propagate unmodified — never
wrapped as transients. Cancellation is asyncio's abort
signal; rewriting it as ChunkInterrupted would silently
consume the user's stop request.
2. Recognized transients (RateLimited, ServiceUnavailable, bare
httpx.HTTPError) wrap as ChunkInterrupted so the user gets
a resumable handle even when a non-transient bug landed
earlier in submission order.
3. Otherwise raise the first failure in submission order,
preserving its type.
* `_execute_in_parallel` owns the sync→async bridge: `asyncio.run`
dispatch with the `fetch_async is None` and running-event-loop
fallbacks (each a one-time UserWarning, then serial).
* `_publish_async_client` / `get_active_async_client` /
`_chunked_async_client` ContextVar let async paginated-loop
helpers (`_walk_pages_async`, `_paginate_async`) reuse one
`AsyncClient` connection pool across every concurrent
sub-request.
Wiring (`dataretrieval/waterdata/utils.py`):
* `_walk_pages_async`, `_paginate_async`, `_client_for_async`,
`_fetch_once_async` — async siblings of the sync paginate path,
sharing the same `parse_response` / `follow_up` callbacks and
the `_ogc_parse_response` parser.
* The `@chunking.multi_value_chunked(fetch_async=_fetch_once_async)`
decorator on `_fetch_once` wires the async sibling so the
parallel path is available to every Water Data OGC getter.
* `ChunkedCall.record()` encapsulates the completion write so the
serial loop and the parallel fan-out share it; `_chunks` is a
sparse index map so a parallel partial-failure resumes correctly
via the sync path.
Concurrency cap (`API_USGS_CONCURRENT`):
* Integer N >= 1: bounded fan-out (semaphore-gated, N=1 forces
serial sync). Default 16 — the server-friendly sweet spot.
* `unbounded`: no per-call cap (`Semaphore(sys.maxsize)`).
* Unset: default 16.
Retries (`API_USGS_RETRIES`, default 4; `0` disables): each
sub-request is retried on a transient failure with exponential
backoff + full jitter, so a large fan-out completes through the
AWS API Gateway's burst throttling and the occasional backend
straggler instead of aborting on the first 429/5xx/timeout.
* `RetryPolicy` — a frozen value object owning the timing decisions
(`from_env`, `should_retry`, `backoff`). Full jitter
(`random.uniform(0, ceiling)`) de-correlates the concurrent
retries so they don't re-burst in lockstep. A server `Retry-After`
overrides the computed backoff, unless it exceeds `retry_after_cap`
(60s) — a multi-minute quota-window reset escalates to the
resumable interruption instead of blocking the call inline.
* `_retryable` — chain-walking predicate, narrower than
`_classify_chunk_error`: retries `RateLimited` / `ServiceUnavailable`
/ `httpx.TransportError` but NOT `httpx.InvalidURL`.
* `_retry_sync` / `_retry_async` drivers wrap the per-sub-request
fetch at both seams (`ChunkedCall._issue`, `_fan_out_async.track`);
the async retry runs inside the semaphore, so a backing-off chunk
holds its slot and effective concurrency shrinks under throttling.
On exhaustion the last exception re-raises into the existing
`wrap_failure` path, so `.resume()` stays the escape hatch.
* `ProgressReporter.note_retry` surfaces the backoff on the status
line ("retrying (attempt N, waiting Ns)"), cleared by the next page.
Test scaffolding: `tests/conftest.py` extends the `_serial_chunker`
autouse fixture to pin `API_USGS_CONCURRENT=1` and `API_USGS_RETRIES=0`
so the existing mocked suite stays on the deterministic serial path
with transients surfacing immediately; async and retry tests opt back
in by re-setting the env vars inside their body.
Tests: async-path coverage in `tests/waterdata_chunking_test.py`
(one-call-per-sub-request, mid-fan-out transient yields resumable
ChunkInterrupted, fallback-to-serial parametrized over
running-loop and missing-fetch_async, cancellation-wins-over-
transient-sibling regression), plus retry coverage (policy
math/jitter bounds, `_retryable` taxonomy, sync+async
transient-then-success, exhausted-still-resumable, long-`Retry-After`
escalation). `tests/waterdata_progress_test.py` adds progress
integration for `_fan_out_async` / `_paginate_async` and the
`note_retry` render/clear. `tests/waterdata_utils_test.py` adds a
`_walk_pages_async` initial-parse-error test.
Test suite: 435 passing, 2 skipped (mocked); ruff clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… row cap Post-review fixes layered on the async parallel chunker: - Funnel OGC post-processing through the chunker via an injected `finalize` hook so `ChunkInterrupted.call.resume()` returns the same type-coerced `(df, BaseMetadata)` as an un-interrupted call instead of the raw `(frame, httpx.Response)`. `partial_frame`/`partial_response` stay raw, so building the exception never triggers finalize's side effects (a schema network GET on an empty frame would otherwise fire inside the ctor). - Add `max_rows` to `get_reference_table`/`get_ogc_data` to preview large reference tables without downloading every page; enforced as the exact total in `_finalize_ogc` (after dedup) and validated as a positive integer (accepts numpy ints via `numbers.Integral`). - Co-locate the parallel fan-out into `ChunkedCall.resume_async`, sharing a `_pending()` generator with the serial `resume()` so the two execution paths can't drift. - Harden `ProgressReporter.note_retry` for Python 3.9-3.11 (int `wait` and `int.is_integer()`). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…rings No behavior change — tightening and doc consistency on chunking.py: - Inline single-use helpers into their sole callers: `_note_retry` into `_retry_delay`, and `_ordered_chunks`/`_responses_by_completion` into `_combine_raw` (the two distinct orderings — frames by index, responses by completion — are now documented inline). - Drop the redundant `_completion_order` shadow list: `record` is the only writer of `_chunks` and `dict` preserves insertion order, so completion order is just iteration order. - Use the `completed_chunks` accessor consistently (was `len(self._chunks)` in `wrap_failure`). - Un-quote the `_Finalize` alias (`tuple[...]` is a valid runtime expression on the >=3.9 floor). - Reformat 15 prose/undocumented PR functions to NumPy docstring style (Parameters/Returns/Raises/Yields), matching the rest of the package and fixing sibling inconsistencies (e.g. `get_active_async_client`, `combined`, `_pending`). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The chunker carried full sync/async twins. Collapse to a single async implementation behind a synchronous facade (the public getters and `resume()` stay sync, same signatures/returns, by driving the async core through the anyio blocking portal). Removed twins: sync `_paginate`, `_walk_pages`, `_retry_sync`, `_client_for`, `get_active_client`, the `_chunked_client` ContextVar, and the `_fan_out_async`/`_execute_in_parallel`/`resume_async` split (folded into `ChunkedCall.resume()` -> `_run`). `multi_value_chunked` now decorates an `async def` fetcher (drops the `fetch_async=` param). `get_stats_data` drives `_paginate_async` through the portal. Concurrency is now bounded purely by the httpx connection pool (`httpx.Limits(max_connections=N)`) — the explicit `asyncio.Semaphore` is gone; `gather` dispatches every pending sub-request and the pool throttles (N=1 is a single-connection gather, total<=1 a one-element gather). Behavior note: because execution is now `gather(..., return_exceptions=True)` over all pending sub-requests, an interruption completes every non-failing sub-request before surfacing (even at concurrency=1) rather than stopping at the first failure; `resume()` then re-issues only the still-failed chunks. The public API, `resume()` contract, ChunkInterrupted/partials, finalize hook, max_rows cap, retries, and progress reporting are unchanged. Net ~-216 lines. Offline suite (265) + live getter suite (63) green. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…framing The module is async-only now, so the docs and names shouldn't imply a removed sync alternative. Docs: removed 'async-only', 'synchronous facade', 'serial sync path', 'no separate serial path', 'no silent serial degradation', and redundant 'async' qualifiers from the module docstring, the concurrency-env docs, the ContextVar/accessor comments, ChunkedCall, resume(), multi_value_chunked, and get_stats_data. resume() is documented by what it does (drive the call to completion) plus the one useful non-obvious property — it works inside a running event loop because it runs in a worker thread — not as a sync-vs-async contrast. Names: since the sync twins are gone, the async versions reclaim the bare names — _paginate_async -> _paginate, _walk_pages_async -> _walk_pages, _client_for_async -> _client_for, _retry_async -> _retry, get_active_async_client -> get_active_client, _chunked_async_client -> _chunked_client. The utils/progress test shim that drives the async _walk_pages synchronously is renamed _run_walk_pages to avoid the name clash, and a redundant initial-page-parse-error test (the former 'async sibling', now identical) is dropped. Docstrings/comments + mechanical rename only — no behavior change. Offline suite (264) + live getter suite (63) green. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Cleanup only — no test logic, assertions, or coverage changed: - Hoist ~40 in-body imports to module level across the waterdata test files (dedup against existing top-level imports; consolidate stray aliases — _httpx -> httpx, _dt -> datetime, _mock -> mock). The only imports left in-body are the importorskip-guarded geopandas/shapely ones. - Drop stale sync/async framing from docstrings/comments (the chunker is async now): no more 'serial path', 'sync sibling', 'async-only', 'requests-mock', or 'falls back to serial' archaeology. - Rename the autouse fixture _serial_chunker -> _pin_chunker_env (it pins API_USGS_CONCURRENT=1 / API_USGS_RETRIES=0; 'serial' implied a removed path) and tighten its docstring. Offline suite (264) + live getter suite (63) green; ruff clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Cut accreted redundancy (no behavior change): the concurrency-env constants comment now points to the module docstring / _read_concurrency_env instead of triplicating the knob semantics; the _NEVER_CHUNK exclusion taxonomy is compressed 16->7 lines (reasons kept); completed_chunks loses a Returns block that restated its one-line summary; the ChunkInterrupted snapshot comment drops the .copy() archaeology; and multi_value_chunked's two overlapping paragraphs collapse to one, deferring the concurrency model to the module docstring. Net -30 lines. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Readability + accuracy: - Module docstring: 'ChunkedCall iterates the joint cartesian product so every sub-request URL fits' attributed the fit guarantee to ChunkedCall, but that's ChunkPlan's job — reworded so ChunkPlan keeps each URL under budget and ChunkedCall fetches the resulting product. - Dropped two duplicated explanations: the sparse-completion [0,2,5] example (kept on the class docstring, trimmed from __init__) and the 'no semaphore' note (kept in _run's docstring, trimmed from its inline comment). Verified the docs carry no stale references after the async-only refactor + renames: every :meth:/:func:/:class:/:attr: cross-ref resolves, the retry defaults (4 / 0.5s / 30s / 60s) match the constants, and the only 'semaphore' mentions are correct negations (pool throttles, not a semaphore). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Drop the 'not a semaphore' clarification (module docstring + _run). - Omit the 'All four are ... power users' sentence from the retry-defaults comment. - Remove the '(is this worth retrying at all?)' note-to-self in RetryPolicy. - Copy-edit two dense passages for readability (the _Finalize comment and the _retryable docstring). - Drop the 'The async execution core' lead-in from _run's docstring. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Retry-defaults comment: drop the hardcoded numbers (4 / 0.5s / 30s / 60s) — they live in the constants below and were drifting from them. - _publish docstring: drop the 'set/reset token dance' mechanism leak; state the contract. - get_active_client docstring: drop the 'public accessor / private ContextVar' justification paragraph; keep the one-liner + the paginated-loop usage cue. - combined(): drop the 'terminal success result' paragraph that duplicated the Returns section; move the finalize return-shape detail into Returns where it belongs. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Module docstring, ChunkedCall.resume() Raises, and ChunkedCall._run all listed only 429/5xx as the failures that raise ChunkInterrupted, but _classify_chunk_error also wraps bare httpx.HTTPError (ConnectError, TimeoutException, RemoteProtocolError, ...) and httpx.InvalidURL as ServiceInterrupted (chunking.py:1098). So callers who only caught the 429/5xx case per the docs could miss the transport-error path. Fix: list transport errors alongside 429/5xx in all three docstrings, and name QuotaExhausted vs ServiceInterrupted by which case maps where. Surfaced by a docs-vs-code audit; no functional change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Tighten seven prose-heavy passages where the wording made the reader
back up to re-parse. No semantic changes.
- RetryPolicy.__post_init__ comment: lead with intent ('catch invalid
knobs at construction'); keep the why-not (asyncio.sleep silent vs
time.sleep loud) as a clarifying parenthetical.
- RetryPolicy.from_env docstring: split the comma-and-semicolon chain
into one cleaner sentence; lead with the verb ('Reads...').
- _chunked_client comment: drop the 'across every gathered sub-request
of the call' tail and the 'in that case' coda.
- _set_response_url docstring: lead with the control flow (try direct
first; on real responses, swap the bound request) rather than the
read-only-vs-writable mechanism.
- _retry_delay docstring: drop the stale 'sync and async drivers share
it' line left over from before the async-only collapse; format the
three None cases as an em-dash list.
- ChunkedCall class docstring: split the long opener at the natural
sentence boundary instead of trailing it with 'and ... and ... — used
both for...'.
- _pending docstring: replace the awkward 'The single source of the
"walk ..., skip ..." rule' construction with a direct two-sentence
description.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ASLEEP, inline trivial methods) - Module docstring (L29-31): apply suggested wording — drop the "isn't slept off inline" / "doesn't block the call" rationale; the one-line escalation statement is enough. - Drop the ``_ASLEEP = asyncio.sleep`` module-level test hook in favor of a direct ``await asyncio.sleep(delay)``; tests now patch ``chunking.asyncio.sleep`` (still scoped to the chunking module's asyncio binding, no extra indirection in production). - Inline ``ChunkedCall.record(index, pair)`` into the one call site in ``_run.track``; the "single writer of ``_chunks``" invariant moves to a comment on ``self._chunks`` initialization. - Inline ``ChunkedCall.combined()`` into ``_run``'s return; the ``partial_*`` bypass note moves to a comment at the return site, where it's more useful than buried in a removed helper's docstring. No behavior change; 296 offline tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…onsolidate sleep helpers Doc-vs-code drift left by the recent inlining + the 01c734b transport-error fix: - multi_value_chunked Raises (chunking.py:1716): now lists transport errors alongside 429/5xx — the surface the 01c734b sweep missed. - _combine_raw docstring (chunking.py:1444): "record" → "the track closure in _run" since record() was inlined in c63da1b. - track closure docstring (chunking.py:1627): "+ record +" → "+ result-store +" for the same reason. - _aiozero test helper docstring: tightened to "asyncio.sleep (via the chunking module's binding)" — chunking.asyncio IS the asyncio module. - Test section banner: "drivers" (plural) → "driver" (only one remaining after the async-only collapse). Simplifications: - Drop the redundant 3-line comment above the inlined `return self.finalize(*self._combine_raw())` — partial_frame's docstring and the class Attributes already say the same thing twice. - Test sleep-faker variants consolidated: replaced 3 inline `async def _noslept(_d): return None` blocks with the existing module-level `_aiozero`; replaced 2 inline `_record` closures with a new module-level `_recording_sleep(slept)` factory. Net −8 lines. 296 offline tests pass; ruff clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ConcurrencyPolicy
Centralizes the four runtime knobs the Water Data getters consult (retry
budget, concurrency cap, API token, progress mode) behind one config
object with the conventional precedence rule used by git / npm / pip /
cargo — closer to the action wins:
1. Defaults (dataclass field defaults)
2. User config file (~/.config/dataretrieval/config.cfg)
3. Local config file (./dataretrieval.cfg)
4. Environment variables (API_USGS_*)
5. Python override (ContextVar-scoped `override()` context manager)
New `dataretrieval/waterdata/_config.py`:
- `RetryPolicy` (moved from chunking.py).
- `ConcurrencyPolicy` (new — mirrors RetryPolicy for the
concurrency cap; replaces `_read_concurrency_env`).
- `WaterDataConfig` composes them + api_token + progress.
- `WaterDataConfig.load()` runs the precedence chain.
- `current()` returns the active override (if any) or freshly loads.
- `override(WaterDataConfig)` / `set_config(...)` for Python-side
runtime overrides.
- Stdlib `configparser` for the INI files — no new deps. (If TOML +
pyproject.toml integration is later wanted, `tomli` is a small
conditional dep follow-up.)
Wired through:
- chunking.py: imports `RetryPolicy` / `ConcurrencyPolicy` from
`_config` (re-export keeps the `from chunking import RetryPolicy`
path working). `_read_concurrency_env` and `_read_retries_env`
removed.
- utils._default_headers reads `api_token` via `_config.current()`.
- _progress reads progress mode + api-key hint via `_config.current()`.
Backward compatibility: every existing env var keeps working unchanged
(env layer reads them at call time). Existing tests pass with no
changes except (a) the brittle `monkeypatch.setattr(_chunking,
"_RETRY_BASE_BACKOFF", 0.0)` test replaced with one that uses the new
`override()` mechanism, (b) one `_chunking._RETRIES_DEFAULT` reference
re-pointed at `_config._RETRIES_DEFAULT`.
15 new tests covering each precedence layer, the override context
manager (including nesting), parsing edge cases (case-insensitive
"unbounded", invalid values rejected, progress parser truthy/falsy/auto),
and unknown-key tolerance. Total offline suite: 311 passed (+15).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
f81d71a to
97cba05
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Stacked on DOI-USGS#285 (base:
httpx-migration). Pure follow-up; not blocking the merge of DOI-USGS#285.What
Centralizes the four runtime knobs the Water Data getters consult (retry budget, concurrency cap, API token, progress mode) behind one config object with the conventional precedence rule used by
git/npm/pip/cargo— closer to the action wins:_RETRY_*/_CONCURRENCY_DEFAULTconstants)~/.config/dataretrieval/config.cfg(honors$XDG_CONFIG_HOME;%APPDATA%\dataretrieval\config.cfgon Windows)./dataretrieval.cfgAPI_USGS_RETRIES,API_USGS_CONCURRENT,API_USGS_PAT,API_USGS_PROGRESSoverride(WaterDataConfig(...))context manager (ContextVar-scoped; thread- and async-safe)Schema (stdlib INI, no new deps)
Why stdlib
configparserand not TOMLTOML would let us read
[tool.dataretrieval]frompyproject.tomldirectly (the Python ecosystem convention — ruff, mypy, hatch all do this). But it requires a conditionaltomlidep on Python <3.11. Going with stdlibconfigparserkeeps the dep tree at justhttpx+pandas. If pyproject.toml integration shows demand later, addingtomliis a small mechanical follow-up.Module layout
dataretrieval/waterdata/_config.py—RetryPolicy,ConcurrencyPolicy,WaterDataConfig,load(),current(),override(),set_config(), file readers, env reader, precedence merger.RetryPolicylives in_config.pynow;chunking.pyre-exports it (sofrom chunking import RetryPolicystill works for legacy callers and tests).chunking._read_concurrency_env/chunking._read_retries_env(no longer needed —WaterDataConfig.load()handles env reading).chunking.py:_run's concurrency cap comes fromConcurrencyPolicy.from_env().max_connections.utils._default_headers: API token comes from_config.current().api_token._progress: progress mode + api-key-hint logic come from_config.current().Backward compatibility
Every existing env var keeps working identically (env layer reads them at call time, with the same parsing). Two test sites needed updates:
monkeypatch.setattr(_chunking, "_RETRY_BASE_BACKOFF", 0.0)test replaced with one that uses the newoverride()mechanism — which exercises the actual layered loader path and is the pattern the recent reviewer thread (discussion_r3318252717) was pointing toward._chunking._RETRIES_DEFAULTreference re-pointed at_config._RETRIES_DEFAULT.Tests
311 passed (+15 new) covering:
max_retriespreserves file-set sibling keys (base_backoff,max_backoff).override()blocks pop correctly."unbounded", invalid concurrency rejected, negative retries rejected.Diff
Net +557 lines including the new module + tests. The chunking.py shrinkage (−201) is mostly the moved-out
RetryPolicy+_read_*_envhelpers; ~250 lines moved (with cleanup) into_config.py.🤖 Generated with Claude Code