Community Streamflow Service — live acquisition and harmonization of global streamflow observations
CSFS — Community Streamflow Service
Live acquisition and harmonization of global streamflow observations.
CSFS connects to open streamflow data providers worldwide — national hydrological agencies, regional networks, research archives, and global model products — harmonizes their observations into one canonical station/observation schema (discharge in m³/s, timestamps in UTC), and maintains a near-real-time DuckDB store with scheduled acquisition, health monitoring, a CLI, and a FastAPI read layer.
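Here is what harmonizing into the canonical schema can look like for one raw reading. This is a minimal sketch that assumes a provider reporting discharge in cubic feet per second with an offset-aware local timestamp. `CanonicalObservation`, its field names (apart from `discharge_m3s`, the column used in the Python quick start), and `harmonize` are hypothetical illustrations, not CSFS's actual models.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

# 1 ft = 0.3048 m exactly, so 1 ft³/s = 0.3048³ m³/s (about 0.0283168)
CFS_TO_M3S = 0.3048 ** 3

# Hypothetical unit table: factor that converts each unit to m³/s.
UNIT_FACTORS = {"m3/s": 1.0, "l/s": 1e-3, "cfs": CFS_TO_M3S}


@dataclass(frozen=True)
class CanonicalObservation:
    """Hypothetical canonical record: discharge in m³/s, timestamp in UTC."""
    station_id: str
    timestamp: datetime
    discharge_m3s: float


def harmonize(station_id: str, local_time: datetime, value: float, unit: str) -> CanonicalObservation:
    """Convert one provider reading to the canonical units and time zone."""
    if local_time.tzinfo is None:
        # Guessing the zone of a naive timestamp silently corrupts series.
        raise ValueError("naive timestamp: provider time zone must be explicit")
    factor = UNIT_FACTORS.get(unit)
    if factor is None:
        raise ValueError(f"unknown discharge unit: {unit!r}")
    return CanonicalObservation(
        station_id=station_id,
        timestamp=local_time.astimezone(timezone.utc),
        discharge_m3s=value * factor,
    )


if __name__ == "__main__":
    eastern_standard = timezone(timedelta(hours=-5))
    obs = harmonize("usgs:01646500", datetime(2024, 1, 15, 7, 0, tzinfo=eastern_standard), 1000.0, "cfs")
    print(obs.timestamp.isoformat())    # 2024-01-15T12:00:00+00:00
    print(round(obs.discharge_m3s, 4))  # 28.3168
```

Rejecting naive timestamps rather than assuming a zone is a design choice of this sketch; it keeps UTC normalization honest at the cost of forcing every connector to state its provider's time zone.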
Documentation: https://darriey.github.io/CSFS/
Why CSFS?
Programmatic access to river discharge data is fragmented: the community
relies either on static archives (GRDC, Caravan, GSIM, EStreams, CAMELS)
that are frozen at publication time, or on single-agency clients (USGS
dataretrieval, hydrofunctions) that each cover one network. Getting
current discharge across, say, France, Brazil, and Japan means learning three
APIs, three formats, and three unit conventions. CSFS provides a single
interface for live, multi-provider acquisition — one connector per agency,
every observation normalized to a common schema, re-acquisition scheduled to
each provider's update cadence — and keeps its provider roster honest
mechanically, with CI-enforced integrity tests.
Provider roster (the honest numbers)
- 104 sources cataloged in inventory/providers.yaml, labeled by readiness: 78 implemented, 17 research, 5 fallback, 3 manual, 1 deprecated.
- 86 connectors registered in code: the 78 implemented entries plus 8 still labeled research while their upstream data paths are validated.
- 41 implemented providers are realtime/near-realtime; the rest are recent/archive sources, including roughly a dozen offline research archives (GRDC, Caravan, GSIM, EStreams, LamaH, CAMELS variants, ROBIN, ADHI, SIEREM).
These statuses are CI-enforced: tests/test_connector_integrity.py
fails the build if a connector ships without tests or lacks a scheduler tier,
or if the inventory marks a provider implemented when no connector for it exists.
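The three integrity rules can be pictured as set comparisons between the inventory and the code registry. This is a minimal sketch over hypothetical in-memory inputs (a readiness label per inventory entry, the set of registered connector names, the connectors that have tests, and a connector-to-tier mapping). The real checks live in tests/test_connector_integrity.py and may be structured quite differently.

```python
from collections.abc import Mapping, Set


def integrity_violations(
    inventory: Mapping[str, str],  # provider name -> readiness label (hypothetical shape)
    registered: Set[str],          # connector names registered in code
    tested: Set[str],              # connectors with at least one test
    tiers: Mapping[str, str],      # connector name -> scheduler tier
) -> list[str]:
    """Return human-readable roster problems; an empty list means the roster is consistent."""
    problems: list[str] = []
    for name in sorted(registered):
        if name not in tested:
            problems.append(f"{name}: connector ships without tests")
        if name not in tiers:
            problems.append(f"{name}: no scheduler tier")
    for name, status in sorted(inventory.items()):
        # A research-labeled entry may already have a connector; only
        # 'implemented' claims must be backed by registered code.
        if status == "implemented" and name not in registered:
            problems.append(f"{name}: inventory says implemented but no connector exists")
    return problems


if __name__ == "__main__":
    # Hypothetical roster: 'ghost' and 'lab' are invented names, and 'realtime' is a placeholder tier.
    inventory = {"usgs": "implemented", "glofas": "implemented", "ghost": "implemented", "lab": "research"}
    print(integrity_violations(inventory, {"usgs", "glofas", "lab"}, {"usgs", "glofas"},
                               {"usgs": "realtime", "glofas": "realtime"}))
```

In CI the natural form is a single `assert not integrity_violations(...)`, so one failing build lists every inconsistency at once instead of stopping at the first.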
See the full provider catalog.
Note: live-provider commands talk to real agency APIs and can hit transient upstream outages — a failed fetch is usually them, not you.
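Because transient upstream failures are expected, scripted fetches can be wrapped in a retry with exponential backoff. This is a minimal, library-agnostic sketch: `with_retries` is a hypothetical helper, not part of CSFS, and it retries on any `Exception` by default only for brevity.

```python
import random
import time
from collections.abc import Callable
from typing import TypeVar

T = TypeVar("T")


def with_retries(
    fn: Callable[[], T],
    attempts: int = 4,
    base_delay: float = 1.0,
    retry_on: tuple[type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on retry_on with exponential backoff plus jitter."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last upstream error
            # Delays of 1x, 2x, 4x ... base_delay, plus up to 10% random jitter
            # so that parallel jobs do not retry in lockstep.
            delay = base_delay * 2 ** (attempt - 1)
            sleep(delay * (1 + random.random() * 0.1))
    raise AssertionError("unreachable")
```

For example, `with_retries(lambda: csfs.fetch_observations_sync("usgs", "usgs:01646500", start=start, end=end))` retries the single-gauge fetch from the Python quick start. Which exception types CSFS raises for upstream outages is not stated here, so narrowing `retry_on` to those types is left to the caller.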
Install
pip install community-streamflow-service # core
pip install "community-streamflow-service[pandas]" # + DataFrame store queries
pip install "community-streamflow-service[api]" # + FastAPI read layer
Requires Python 3.11+.
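A quick environment check can confirm the interpreter version and which optional extras are importable. The 3.11 floor matters in practice because the quick start imports `UTC` from `datetime`, which was added in Python 3.11. This sketch assumes that the [pandas] extra provides the `pandas` package and the [api] extra provides `fastapi`; that mapping is inferred from the extra names and is not confirmed here.

```python
import importlib.util
import sys

# Assumed mapping from extra name to the top-level module it installs.
EXTRA_MODULES = {"pandas": "pandas", "api": "fastapi"}


def environment_report() -> dict[str, bool]:
    """Report whether the interpreter and optional extras look usable."""
    report = {"python>=3.11": sys.version_info >= (3, 11)}
    for extra, module in EXTRA_MODULES.items():
        # find_spec returns None when the module is not installed.
        report[f"[{extra}]"] = importlib.util.find_spec(module) is not None
    return report


if __name__ == "__main__":
    for check, ok in environment_report().items():
        print(f"{check:14} {'ok' if ok else 'missing'}")
```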
Quick start (CLI)
csfs providers # list registered providers + tiers
csfs fetch -p usgs --lookback 168 -n 50 # fetch a week of USGS data
csfs status # what's in the local DuckDB
csfs health # per-connector freshness + run health
csfs serve # HTTP read layer (needs the api extra)
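csfs health reports per-connector freshness. As an illustration only, here is a minimal sketch of one way to define freshness: compare the age of a connector's newest observation with the provider's expected update cadence. The `freshness` function, its three labels, and the default grace factor of 2 are hypothetical; `csfs health` may apply different criteria.

```python
from datetime import datetime, timedelta, timezone


def freshness(last_observation: datetime, cadence: timedelta, now: datetime, grace: float = 2.0) -> str:
    """Classify a connector by the age of its newest observation (hypothetical rule).

    fresh: newer than one cadence interval
    late:  within `grace` cadence intervals
    stale: older than that
    """
    age = now - last_observation
    if age <= cadence:
        return "fresh"
    if age <= cadence * grace:
        return "late"
    return "stale"


if __name__ == "__main__":
    now = datetime(2024, 6, 1, 12, 0, tzinfo=timezone.utc)
    hourly = timedelta(hours=1)
    print(freshness(now - timedelta(minutes=30), hourly, now))  # fresh
    print(freshness(now - timedelta(minutes=90), hourly, now))  # late
    print(freshness(now - timedelta(hours=5), hourly, now))     # stale
```

Measuring age relative to each provider's cadence, rather than against one global threshold, keeps a daily archive from being flagged stale after an hour while still catching a silent realtime feed.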
Quick start (Python)
import asyncio
import csfs

async def main() -> None:
    async with csfs.open_store("csfs.duckdb", read_only=False) as store:
        await csfs.run_acquisition(store, providers=["usgs"], lookback_hours=48, max_stations=20)
        stations = await store.get_stations(provider="usgs", limit=5)
        # pandas DataFrame indexed by timestamp (needs the [pandas] extra);
        # get_observations() / get_observations_arrow() need no extra.
        df = await store.get_observations_df(stations[0].id)
        print(df["discharge_m3s"].describe())

asyncio.run(main())
Or pull one gauge's series straight from a provider, no database involved:
from datetime import UTC, datetime, timedelta
import csfs
end = datetime.now(UTC)
chunk = csfs.fetch_observations_sync("usgs", "usgs:01646500", start=end - timedelta(days=7), end=end)
The store is a plain DuckDB file, so any SQL, pandas, or Arrow tooling can
work on it directly. The stable, supported API surface is what import csfs
re-exports; see the Python API guide.
API keys
Most connectors need no credentials. Exceptions: norway_nve (free
NVE HydAPI key) and glofas
(Copernicus CDS token in ~/.cdsapirc).
Keep keys out of tracked config files.
Architecture
connectors/ Provider plugins (one per data source)
core/ Canonical data models, registry, health, exceptions
store/ Persistence layer (DuckDB default)
scheduler/ Acquisition runner, cron tiers, daemon
api/ FastAPI query layer
cli/ Command-line interface
inventory/ Global provider inventory (YAML)
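The split between connectors/ (one plugin per source) and the registry in core/ follows a common plugin pattern. This is a minimal sketch of that pattern, assuming a hypothetical `Connector` protocol with an async `fetch` method and a decorator-based `register`. `Connector`, `register`, `get_connector`, and `DemoConnector` are invented for illustration; CSFS's real base classes and registry live in core/ and will differ.

```python
import asyncio
from datetime import datetime, timedelta
from typing import Protocol


class Connector(Protocol):
    """Hypothetical interface every provider plugin implements."""
    name: str

    async def fetch(self, station_id: str, start: datetime, end: datetime) -> list[tuple[datetime, float]]: ...


# Hypothetical registry: connector name -> connector class.
_REGISTRY: dict[str, type] = {}


def register(cls: type) -> type:
    """Class decorator that makes a connector discoverable by its `name`."""
    name = cls.name
    if name in _REGISTRY:
        raise ValueError(f"duplicate connector name: {name!r}")
    _REGISTRY[name] = cls
    return cls


def get_connector(name: str) -> Connector:
    """Instantiate a registered connector by name."""
    cls = _REGISTRY.get(name)
    if cls is None:
        raise KeyError(f"no connector registered as {name!r}")
    return cls()


@register
class DemoConnector:
    name = "demo"  # hypothetical provider that returns fixed values

    async def fetch(self, station_id: str, start: datetime, end: datetime) -> list[tuple[datetime, float]]:
        # A real connector would call the provider's API and harmonize the response here.
        return [(start, 1.0), (end, 2.0)]


if __name__ == "__main__":
    end = datetime(2024, 1, 2)
    rows = asyncio.run(get_connector("demo").fetch("demo:1", end - timedelta(days=1), end))
    print(rows)
```

Registering at import time keeps the scheduler and CLI decoupled from individual providers: they only ever ask the registry for a name.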
Details — including the roster-integrity guard system and the hermetic test policy — in the architecture docs.
Contributing
The most valuable contribution is a new provider connector. See CONTRIBUTING.md for the walkthrough and the roster-integrity requirements your PR must satisfy.
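Every connector needs tests, and hermetic tests keep the suite from touching live agency APIs. Here is a minimal sketch of the idea using `unittest.mock.patch.object`: the connector's network call is replaced by a canned payload, so only parsing and unit conversion are exercised. `ExampleConnector`, `_get_json`, and the payload shape are invented for illustration; follow CONTRIBUTING.md for the project's actual fixtures and conventions.

```python
import unittest
from unittest import mock


class ExampleConnector:
    """Hypothetical connector: fetches JSON and returns (iso_time, m³/s) pairs."""
    name = "example"

    def _get_json(self, station_id: str) -> dict:
        # Stands in for a real HTTP call; reaching it from a test is a bug.
        raise RuntimeError("network access in tests")

    def observations(self, station_id: str) -> list[tuple[str, float]]:
        payload = self._get_json(station_id)
        # This made-up provider reports litres per second; convert to m³/s.
        return [(row["t"], row["q_ls"] / 1000.0) for row in payload["values"]]


CANNED = {"values": [{"t": "2024-01-01T00:00:00Z", "q_ls": 1500.0}]}


class ExampleConnectorTest(unittest.TestCase):
    def test_parses_canned_payload_without_network(self) -> None:
        # patch.object swaps the method for a mock only inside the with-block.
        with mock.patch.object(ExampleConnector, "_get_json", return_value=CANNED) as fake:
            rows = ExampleConnector().observations("example:42")
        fake.assert_called_once_with("example:42")
        self.assertEqual(rows, [("2024-01-01T00:00:00Z", 1.5)])
```

Run it with `python -m unittest`. Because the patched method raises if it is ever reached for real, an accidental live call fails loudly instead of passing on a good network day.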
Citing
See CITATION.cff.
License
GPL-3.0-or-later. See LICENSE.
Download files
File details
Details for the file community_streamflow_service-0.2.0.tar.gz.
File metadata
- Download URL: community_streamflow_service-0.2.0.tar.gz
- Upload date:
- Size: 395.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | bf8173c60584bc3270cc6396e1d0dbfb22813970a06c5f5c6bc022341d5ce5dd |
| MD5 | 3004ae569d3e2a70686186950012730e |
| BLAKE2b-256 | fc7887f51b2b9c7f0c1a9e5deb5edf8be53fa0868457818838b72aa681605e6e |
File details
Details for the file community_streamflow_service-0.2.0-py3-none-any.whl.
File metadata
- Download URL: community_streamflow_service-0.2.0-py3-none-any.whl
- Upload date:
- Size: 315.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 2f8ce9ff0eebb45cef891958ee1874466bfe7e1365c9672c407a4493c8023128 |
| MD5 | 2d1c6b4a3b39c75a1454601a3dca631f |
| BLAKE2b-256 | 73e7e0e073b56d57f57aa2f559c2f9a0616758555e65e12f8d8b7f7a1efdb1f2 |