Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
d2bf71f
feat(waterdata): Add async parallel chunker over httpx.AsyncClient
thodson-usgs May 25, 2026
1d0acb7
refactor(waterdata): maintainer polish for chunk retries
thodson-usgs May 27, 2026
e740a64
fix(waterdata): finalize resumed chunked results; add reference-table…
thodson-usgs May 27, 2026
f094d42
refactor(waterdata): consolidate trivial chunker helpers; NumPy docst…
thodson-usgs May 27, 2026
7deedd7
refactor(waterdata): collapse sync/async chunker into one async core
thodson-usgs May 27, 2026
fd6f67f
refactor(waterdata): drop _async name suffixes; prune async/sync doc …
thodson-usgs May 27, 2026
9fe3a5a
test(waterdata): tidy the test suite
thodson-usgs May 27, 2026
c507d62
docs(waterdata): trim over-documentation in chunking.py
thodson-usgs May 27, 2026
79a9017
docs(waterdata): editorial pass on chunking.py docs
thodson-usgs May 27, 2026
bbabf50
docs(waterdata): address PR 285 review comments on chunking.py docs
thodson-usgs May 27, 2026
ead1c09
docs(waterdata): final editorial pass on chunking.py
thodson-usgs May 28, 2026
01c734b
docs(waterdata): correct interruption claims to include transport errors
thodson-usgs May 28, 2026
ad1208e
docs(waterdata): editorial pass for readability on chunking.py
thodson-usgs May 28, 2026
c63da1b
refactor(waterdata): address PR 285 review (compact retry doc, drop _…
thodson-usgs May 28, 2026
d958005
docs+tests(waterdata): apply /simplify sweep — close stale refs and c…
thodson-usgs May 28, 2026
6da534d
feat(waterdata): layered config with WaterDataConfig + RetryPolicy + …
thodson-usgs May 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
447 changes: 447 additions & 0 deletions dataretrieval/waterdata/_config.py

Large diffs are not rendered by default.

53 changes: 45 additions & 8 deletions dataretrieval/waterdata/_progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from __future__ import annotations

import contextvars
import os
import sys
from collections.abc import Iterator
from contextlib import contextmanager
Expand Down Expand Up @@ -80,13 +79,17 @@ def _in_jupyter_kernel() -> bool:
def _enabled_default(stream: TextIO) -> bool:
    """Whether to draw the line by default.

    An explicit on/off from the layered config wins (set via
    ``$API_USGS_PROGRESS``, a config file's ``[default] progress``, or a
    Python override — see :mod:`._config`). Otherwise show it for
    interactive use — a TTY or a Jupyter/IPython kernel — and stay quiet
    for redirected output, logs, and CI.

    Parameters
    ----------
    stream : TextIO
        The stream the progress line would be written to. Only its
        ``isatty()`` method is consulted, and only if it has one.

    Returns
    -------
    bool
        The explicit config value when it is not ``None``; otherwise
        ``True`` inside a Jupyter kernel or when ``stream`` is a TTY.
    """
    from . import _config  # local import to avoid an import cycle at module load

    # NOTE(review): the environment variable is no longer parsed here; per the
    # docstring it is resolved by the config layer — confirm against ``_config``.
    explicit = _config.current().progress
    if explicit is not None:
        return explicit
    if _in_jupyter_kernel():
        return True
    # Streams without ``isatty`` (e.g. simple file-like stand-ins) count as
    # non-interactive.
    return hasattr(stream, "isatty") and stream.isatty()
Expand Down Expand Up @@ -121,6 +124,9 @@ def __init__(
# The hourly request quota (``x-ratelimit-limit``), shown as the
# denominator when the server reports it.
self.rate_limit: str | None = None
# Transient note shown while a sub-request backs off before a
# retry; cleared by the next page/chunk so it doesn't linger.
self.retry_note: str | None = None
self._last_len = 0
# Whether anything was actually written to the stream — drives whether
# close() needs a terminating newline. (``current_chunk`` is a poor
Expand All @@ -140,13 +146,33 @@ def start_chunk(self, index: int) -> None:
avoids a premature "0 pages" frame before the first page arrives.
"""
self.current_chunk = index
self.retry_note = None
if self.total_chunks > 1:
self._render()

def add_page(self, rows: int = 0) -> None:
    """Record one fetched page carrying ``rows`` rows and redraw.

    Parameters
    ----------
    rows : int, optional
        Number of rows the page contained. Coerced with ``int()`` before
        being added to the running total. Defaults to 0.
    """
    self.pages += 1
    self.rows += int(rows)
    # A page arriving means any pending backoff has resolved, so drop the
    # transient retry note before redrawing.
    self.retry_note = None
    self._render()

def note_retry(self, *, attempt: int, wait: float) -> None:
    """Show that a sub-request is backing off before retry ``attempt``.

    Cleared by the next :meth:`add_page` / :meth:`start_chunk` (or by
    :meth:`close`) so the line returns to normal once the retry resolves.

    Parameters
    ----------
    attempt : int
        Number of the upcoming retry, shown verbatim in the note.
    wait : float
        Backoff duration in seconds. Rounded to one decimal place for
        display.
    """
    # ``float()`` to support Python 3.9-3.11: ``round(int, 1)`` returns an
    # int and ``int.is_integer()`` (used below) only exists on 3.12+.
    wait_1dp = round(float(wait), 1)
    # Keep sub-second waits explicit (avoid misleading ``0s``) while
    # rendering whole-second waits without unnecessary ``.0`` noise.
    whole_seconds = wait_1dp >= 1 and wait_1dp.is_integer()
    precision = 0 if whole_seconds else 1
    secs = f"{wait_1dp:.{precision}f}s"
    self.retry_note = f"retrying (attempt {attempt}, waiting {secs})"
    self._render()

def set_rate_remaining(
Expand Down Expand Up @@ -179,6 +205,8 @@ def _format(self) -> str:
else:
segment = f"{remaining} requests remaining"
parts.append(segment)
if self.retry_note is not None:
parts.append(self.retry_note)
if self.service:
return f"Retrieving: {self.service} · " + " · ".join(parts)
return "Progress: " + " · ".join(parts)
Expand Down Expand Up @@ -209,6 +237,13 @@ def close(self) -> None:
"""
if self._closed:
return
# A retry note set during the final backoff would otherwise freeze as
# the persisted last line of a call that has since completed or given
# up; clear it and redraw (while still un-closed, so ``_render`` runs)
# so the final state isn't a stale "retrying".
if self.enabled and self._rendered and self.retry_note is not None:
self.retry_note = None
self._render()
self._closed = True
if not (self.enabled and self._rendered):
return
Expand All @@ -220,8 +255,10 @@ def close(self) -> None:
self.enabled = False

def _maybe_hint_api_key(self) -> None:
from . import _config # local import to avoid an import cycle at module load

global _api_key_hint_shown
if _api_key_hint_shown or os.getenv("API_USGS_PAT"):
if _api_key_hint_shown or _config.current().api_token:
return
# Set the once-per-process latch only after a successful write, so a
# failed write (broken pipe) doesn't silently burn the hint for every
Expand Down
11 changes: 10 additions & 1 deletion dataretrieval/waterdata/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2022,6 +2022,7 @@ def get_reference_table(
collection: str,
limit: int | None = None,
query: dict | None = None,
max_rows: int | None = None,
) -> tuple[pd.DataFrame, BaseMetadata]:
"""Get metadata reference tables for the USGS Water Data API.

Expand All @@ -2046,6 +2047,12 @@ def get_reference_table(
query: dictionary, optional
The optional args parameter can be used to pass a dictionary of
query parameters to the collection API call.
max_rows : int, optional
Cap the total number of rows returned, stopping pagination early
instead of downloading the whole table. Useful for cheaply
previewing large tables (e.g. ``hydrologic-unit-codes`` has ~125k
rows). Unlike ``limit`` (the per-page size), this bounds the total
result. The default (None) downloads every page.

Returns
-------
Expand Down Expand Up @@ -2092,7 +2099,9 @@ def get_reference_table(
query_args = dict(query) if query else {}
if limit is not None:
query_args["limit"] = limit
return get_ogc_data(args=query_args, output_id=output_id, service=collection)
return get_ogc_data(
args=query_args, output_id=output_id, service=collection, max_rows=max_rows
)


def get_codes(code_service: CODE_SERVICES) -> pd.DataFrame:
Expand Down
Loading