Skip to content

Commit 33e928d

Browse files
authored
Add waterdata.get_nearest_continuous helper (#239)
For each target timestamp, fetch the nearest continuous observation in a single round trip. Builds a CQL OR-chain of per-target bracketed windows, pipes it through ``get_continuous`` (which already auto-chunks long filters across multiple sub-requests), then selects the single observation closest to each target client-side. Exists because the Water Data API matches a single-instant ``time=`` parameter exactly (10:30:31 returns zero rows on a 15-minute gauge), does not implement ``sortby`` for arbitrary queryables, and does not expose a ``T_NEAREST`` CQL function. The narrow-window + client-side reduction is the one pattern that works today for multi-target nearest lookups in one API call. Tie handling is configurable via ``on_tie``: - "first" (default): keep the earlier observation - "last": keep the later observation - "mean": average numeric columns; set ``time`` to the target
1 parent 04d94fd commit 33e928d

4 files changed

Lines changed: 505 additions & 1 deletion

File tree

NEWS.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
**04/23/2026:** Added `waterdata.get_nearest_continuous(targets, ...)` — for each of N target timestamps, fetches the single continuous observation closest to that timestamp in one HTTP round-trip (auto-chunked when the resulting CQL filter is long, via the facility added in #238). The helper is designed for workflows that pair many discrete-measurement timestamps with surrounding instantaneous data, which the OGC `time` parameter can't express since it only accepts one instant or one interval per request. Ties at window midpoints are resolved per a configurable `on_tie` ∈ {`"first"`, `"last"`, `"mean"`}; the default `window="PT7M30S"` matches a 15-minute continuous gauge.
2+
13
**04/22/2026:** Highlights since the `v1.1.0` release (2025-11-26), which shipped the `waterdata` module:
24

35
- Added `get_channel` for channel-measurement data (#218) and `get_stats_por` / `get_stats_date_range` for period-of-record and daily statistics (#207).

dataretrieval/waterdata/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
get_latest_continuous,
2020
get_latest_daily,
2121
get_monitoring_locations,
22+
get_nearest_continuous,
2223
get_reference_table,
2324
get_samples,
2425
get_stats_date_range,
@@ -47,6 +48,7 @@
4748
"get_latest_continuous",
4849
"get_latest_daily",
4950
"get_monitoring_locations",
51+
"get_nearest_continuous",
5052
"get_reference_table",
5153
"get_samples",
5254
"get_stats_date_range",

dataretrieval/waterdata/api.py

Lines changed: 234 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import json
1010
import logging
1111
from io import StringIO
12-
from typing import get_args
12+
from typing import Literal, get_args
1313

1414
import pandas as pd
1515
import requests
@@ -440,6 +440,239 @@ def get_continuous(
440440
return get_ogc_data(args, output_id, service)
441441

442442

443+
def get_nearest_continuous(
444+
targets,
445+
monitoring_location_id: str | list[str] | None = None,
446+
parameter_code: str | list[str] | None = None,
447+
*,
448+
window: str | pd.Timedelta = "PT7M30S",
449+
on_tie: Literal["first", "last", "mean"] = "first",
450+
**kwargs,
451+
) -> tuple[pd.DataFrame, BaseMetadata]:
452+
"""For each target timestamp, return the nearest continuous observation.
453+
454+
Builds one bracketed ``(time >= t-window AND time <= t+window)`` clause
455+
per target, joins them as a top-level CQL ``OR`` filter, and lets
456+
``get_continuous`` (with its auto-chunking) fetch every observation
457+
that falls in any window. Then, per ``(monitoring_location_id, target)``
458+
pair, picks the single observation with the smallest ``|time - target|``.
459+
460+
The USGS continuous endpoint matches ``time`` parameters exactly rather
461+
than fuzzily, and it does not implement ``sortby`` for arbitrary fields;
462+
this function is the single-round-trip way to ask "what reading is
463+
nearest this timestamp?" for many timestamps at once.
464+
465+
Parameters
466+
----------
467+
targets : list-like of datetime-convertible
468+
Target timestamps. Naive datetimes are treated as UTC. Accepts a
469+
list, ``pandas.Series``, ``pandas.DatetimeIndex``, ``numpy.ndarray``,
470+
or anything ``pandas.to_datetime`` consumes.
471+
monitoring_location_id : string or list of strings, optional
472+
Forwarded to ``get_continuous``.
473+
parameter_code : string or list of strings, optional
474+
Forwarded to ``get_continuous``.
475+
window : string or ``pandas.Timedelta``, default ``"PT7M30S"``
476+
Half-window around each target, as an ISO 8601 duration
477+
(``"PT7M30S"``, ``"PT15M"``, ``"PT1H"``, etc.). Also accepts
478+
any other form ``pandas.Timedelta`` parses — ``HH:MM:SS``
479+
(``"00:07:30"``), pandas shorthand (``"7min30s"``,
480+
``"450s"``), or a ``pd.Timedelta`` directly. See the
481+
`pandas.Timedelta docs
482+
<https://pandas.pydata.org/docs/reference/api/pandas.Timedelta.html>`_
483+
for the full grammar.
484+
485+
Must be small enough that every target's window captures
486+
roughly one observation at the service cadence. The default
487+
matches a 15-minute continuous gauge; widen (e.g.
488+
``"PT15M"``) for irregular cadences or resilience to data
489+
gaps.
490+
on_tie : {"first", "last", "mean"}, default ``"first"``
491+
How to resolve ties when two observations are exactly equidistant
492+
from a target (which happens when the target falls at the midpoint
493+
between grid points — e.g. target ``10:22:30`` for a 15-minute
494+
gauge).
495+
496+
- ``"first"``: keep the earlier observation.
497+
- ``"last"``: keep the later observation.
498+
- ``"mean"``: average numeric columns; set the ``time`` column to
499+
the target, since no real observation exists at the midpoint.
500+
501+
**kwargs
502+
Additional keyword arguments forwarded to ``get_continuous``
503+
(e.g. ``statistic_id``, ``approval_status``, ``properties``).
504+
Passing ``time``, ``filter``, or ``filter_lang`` raises
505+
``TypeError`` — this function builds those itself.
506+
507+
Returns
508+
-------
509+
df : ``pandas.DataFrame``
510+
One row per ``(target, monitoring_location_id)`` combination that
511+
had at least one observation in its window. Rows are augmented
512+
with a ``target_time`` column indicating which target they
513+
correspond to. Targets with no observations in their window are
514+
silently dropped.
515+
md : :class:`~dataretrieval.utils.BaseMetadata`
516+
Metadata from the underlying ``get_continuous`` call.
517+
518+
Notes
519+
-----
520+
*Window sizing and ties.* When ``window`` is exactly half the service
521+
cadence, most targets' windows contain a single observation and
522+
``on_tie`` is moot. Ties arise only when a target sits exactly at the
523+
window edge — rare in practice but possible. Setting ``window`` to a
524+
full cadence (or larger) guarantees at least one observation per
525+
target in steady state at the cost of more bytes per response.
526+
527+
*Why windowed CQL rather than sort+limit.* The API's advertised
528+
``sortby`` parameter would make this a one-liner per target (``filter``
529+
by ``time <= t`` and ``limit 1``), but it is per-query — you would need
530+
one HTTP round-trip per target. The CQL ``OR``-chain approach folds
531+
all N targets into one request (auto-chunked when the URL is long).
532+
533+
Examples
534+
--------
535+
.. code::
536+
537+
>>> import pandas as pd
538+
>>> from dataretrieval import waterdata
539+
540+
>>> # Pair three off-grid timestamps with nearby observations
541+
>>> targets = pd.to_datetime(
542+
... [
543+
... "2023-06-15T10:30:31Z",
544+
... "2023-06-15T14:07:12Z",
545+
... "2023-06-16T03:45:19Z",
546+
... ]
547+
... )
548+
>>> df, md = waterdata.get_nearest_continuous(
549+
... targets,
550+
... monitoring_location_id="USGS-02238500",
551+
... parameter_code="00060",
552+
... )
553+
554+
>>> # Widen the window for an irregular-cadence gauge
555+
>>> df, md = waterdata.get_nearest_continuous(
556+
... targets,
557+
... monitoring_location_id="USGS-02238500",
558+
... parameter_code="00060",
559+
... window="PT30M",
560+
... on_tie="mean",
561+
... )
562+
"""
563+
_check_nearest_kwargs(kwargs, on_tie)
564+
targets = pd.DatetimeIndex(pd.to_datetime(targets, utc=True))
565+
window_td = pd.Timedelta(window)
566+
567+
if len(targets) == 0:
568+
raise ValueError("targets must contain at least one timestamp")
569+
570+
filter_expr = _build_window_or_filter(targets, window_td)
571+
df, md = get_continuous(
572+
monitoring_location_id=monitoring_location_id,
573+
parameter_code=parameter_code,
574+
filter=filter_expr,
575+
filter_lang="cql-text",
576+
**kwargs,
577+
)
578+
if df.empty:
579+
return _empty_nearest_result(df), md
580+
581+
df = df.assign(time=pd.to_datetime(df["time"], utc=True))
582+
site_groups = (
583+
df.groupby("monitoring_location_id", sort=False)
584+
if "monitoring_location_id" in df.columns
585+
else [(None, df)]
586+
)
587+
588+
selected = [
589+
row
590+
for _, site_df in site_groups
591+
for target in targets
592+
if (row := _pick_nearest_row(site_df, target, window_td, on_tie)) is not None
593+
]
594+
if not selected:
595+
return _empty_nearest_result(df), md
596+
return pd.DataFrame(selected).reset_index(drop=True), md
597+
598+
599+
_VALID_ON_TIE = ("first", "last", "mean")
600+
601+
602+
def _check_nearest_kwargs(kwargs: dict, on_tie: str) -> None:
603+
"""Reject kwargs the helper owns; validate ``on_tie``."""
604+
for forbidden in ("time", "filter", "filter_lang"):
605+
if forbidden in kwargs:
606+
raise TypeError(
607+
f"get_nearest_continuous constructs its own {forbidden!r}; "
608+
"do not pass it directly"
609+
)
610+
if on_tie not in _VALID_ON_TIE:
611+
raise ValueError(f"on_tie must be one of {_VALID_ON_TIE}; got {on_tie!r}")
612+
613+
614+
def _build_window_or_filter(targets: pd.DatetimeIndex, window_td: pd.Timedelta) -> str:
615+
"""Build the CQL OR-chain of ``time >= ... AND time <= ...`` windows.
616+
617+
``get_continuous`` auto-chunks the result if the full URL would
618+
exceed the server's length limit, so this is always safe to build
619+
as one string even for many targets.
620+
"""
621+
return " OR ".join(
622+
f"(time >= '{(t - window_td).strftime('%Y-%m-%dT%H:%M:%SZ')}' "
623+
f"AND time <= '{(t + window_td).strftime('%Y-%m-%dT%H:%M:%SZ')}')"
624+
for t in targets
625+
)
626+
627+
628+
def _pick_nearest_row(
629+
site_df: pd.DataFrame,
630+
target: pd.Timestamp,
631+
window_td: pd.Timedelta,
632+
on_tie: str,
633+
) -> pd.Series | None:
634+
"""Return the single row within ``window_td`` of ``target``, or ``None``.
635+
636+
Resolves ties (two rows equidistant from ``target``) per ``on_tie``.
637+
The returned row carries a ``target_time`` column identifying which
638+
target it was selected for.
639+
"""
640+
in_window = site_df[
641+
(site_df["time"] >= target - window_td)
642+
& (site_df["time"] <= target + window_td)
643+
]
644+
if in_window.empty:
645+
return None
646+
deltas = (in_window["time"] - target).abs()
647+
candidates = in_window[deltas == deltas.min()].sort_values("time")
648+
649+
if len(candidates) == 1 or on_tie == "first":
650+
row = candidates.iloc[0].copy()
651+
elif on_tie == "last":
652+
row = candidates.iloc[-1].copy()
653+
else: # "mean" — synthesize a row whose numeric cols are averaged and
654+
# whose ``time`` is the target (no real observation sits at the midpoint).
655+
row = candidates.iloc[0].copy()
656+
for col in candidates.select_dtypes("number").columns:
657+
row[col] = candidates[col].mean()
658+
row["time"] = target
659+
660+
row["target_time"] = target
661+
return row
662+
663+
664+
def _empty_nearest_result(template: pd.DataFrame | None = None) -> pd.DataFrame:
665+
"""Empty frame with a ``target_time`` column, for no-match cases.
666+
667+
When ``template`` is provided, preserve its columns/dtypes so the
668+
returned frame matches the shape of a real ``get_continuous``
669+
response.
670+
"""
671+
base = pd.DataFrame() if template is None else template.iloc[0:0].copy()
672+
base["target_time"] = pd.Series(dtype="datetime64[ns, UTC]")
673+
return base
674+
675+
443676
def get_monitoring_locations(
444677
monitoring_location_id: list[str] | None = None,
445678
agency_code: list[str] | None = None,

0 commit comments

Comments
 (0)