
Community Streamflow Service — live acquisition and harmonization of global streamflow observations


CSFS — Community Streamflow Service



CSFS connects to open streamflow data providers worldwide (national hydrological agencies, regional networks, research archives, and global model products) and harmonizes their observations into one canonical station/observation schema, with discharge in m³/s and timestamps in UTC. It maintains a near-real-time DuckDB store with scheduled acquisition, health monitoring, a CLI, and a FastAPI read layer.
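To make the harmonization step concrete, here is a minimal sketch of converting one raw reading to the canonical units (m³/s, UTC). The `Observation` class, the `normalize` function, and the unit labels are illustrative assumptions, not the CSFS data model or API.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

# Exact by definition: 1 ft = 0.3048 m, so 1 ft³ = 0.3048³ m³.
CFS_TO_M3S = 0.3048 ** 3

# Hypothetical conversion table; the unit labels are illustrative only.
TO_M3S = {"m3/s": 1.0, "ft3/s": CFS_TO_M3S, "l/s": 0.001}


@dataclass(frozen=True)
class Observation:
    """Illustrative canonical record (an assumption, not the CSFS model)."""

    station_id: str
    timestamp: datetime  # timezone-aware, always UTC
    discharge_m3s: float


def normalize(station_id: str, local_time: datetime, value: float, unit: str) -> Observation:
    """Convert a provider reading to m³/s and a UTC timestamp."""
    if local_time.tzinfo is None:
        # A naive timestamp cannot be converted to UTC unambiguously.
        raise ValueError("timestamp must be timezone-aware")
    return Observation(
        station_id=station_id,
        timestamp=local_time.astimezone(timezone.utc),
        discharge_m3s=value * TO_M3S[unit],
    )


# A reading of 1000 ft³/s taken at 08:00 in a UTC-5 zone.
raw_time = datetime(2024, 5, 1, 8, 0, tzinfo=timezone(timedelta(hours=-5)))
obs = normalize("demo-001", raw_time, 1000.0, "ft3/s")
print(obs.timestamp.isoformat(), round(obs.discharge_m3s, 3))
# prints: 2024-05-01T13:00:00+00:00 28.317
```

Rejecting naive timestamps up front is a design choice in this sketch: guessing a time zone silently is a common source of off-by-hours errors when merging providers.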

Documentation: https://darriey.github.io/CSFS/

Why CSFS?

Programmatic access to river discharge data is fragmented. The community relies either on static archives (GRDC, Caravan, GSIM, EStreams, CAMELS) that are frozen at publication time, or on single-agency clients (USGS dataretrieval, hydrofunctions) that each cover one network. Getting current discharge across, say, France, Brazil, and Japan means learning three APIs, three formats, and three unit conventions. CSFS provides a single interface for live, multi-provider acquisition: one connector per agency, every observation normalized to a common schema, and re-acquisition scheduled to each provider's update cadence. The provider roster is kept honest mechanically, through CI-enforced integrity tests.
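The idea of scheduling re-acquisition to each provider's update cadence can be sketched as follows. The provider names, the cadences, and the `due_providers` helper are hypothetical and only illustrate the principle; they are not the CSFS scheduler.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical cadences; real values depend on each provider.
CADENCE = {
    "provider_a": timedelta(minutes=15),
    "provider_b": timedelta(hours=1),
    "provider_c": timedelta(days=1),
}


def due_providers(last_run: dict[str, datetime], now: datetime) -> list[str]:
    """Return providers whose cadence has elapsed since their last run."""
    due = []
    for name, cadence in CADENCE.items():
        previous = last_run.get(name)
        # A provider that has never run is always due.
        if previous is None or now - previous >= cadence:
            due.append(name)
    return due


now = datetime(2024, 5, 1, 12, 0, tzinfo=timezone.utc)
last_run = {
    "provider_a": now - timedelta(minutes=20),  # older than 15 min: due
    "provider_b": now - timedelta(minutes=30),  # newer than 1 h: not due
    # provider_c has no recorded run: due
}
print(due_providers(last_run, now))
# prints: ['provider_a', 'provider_c']
```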

Provider roster (the honest numbers)

  • 104 sources cataloged in inventory/providers.yaml, labeled by readiness: 78 implemented, 17 research, 5 fallback, 3 manual, 1 deprecated.
  • 86 connectors registered in code — the 78 implemented entries plus 8 still labeled research while their upstream data paths are validated.
  • 41 implemented providers are realtime/near-realtime; the rest are recent/archive sources, including roughly a dozen offline research archives (GRDC, Caravan, GSIM, EStreams, LamaH, CAMELS variants, ROBIN, ADHI, SIEREM).

These statuses are CI-enforced: tests/test_connector_integrity.py fails the build if a connector ships without tests or lacks a scheduler tier, or if the inventory labels a source as implemented when no matching connector exists. See the full provider catalog.
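As a rough illustration of what such an integrity guard checks, the sketch below applies the three rules above to a miniature inventory and registry. The data structures, field names (`has_tests`, `tier`), and the `integrity_errors` function are assumptions made for this example; the real checks live in tests/test_connector_integrity.py and may be organized differently. The final lines only re-derive the totals quoted in the roster above.

```python
# Hypothetical miniature inventory: provider name -> readiness label.
inventory = {
    "alpha": "implemented",
    "beta": "implemented",
    "gamma": "research",
    "delta": "manual",
}

# Hypothetical registry: connector name -> metadata (field names are assumed).
registry = {
    "alpha": {"has_tests": True, "tier": "hourly"},
    "beta": {"has_tests": True, "tier": "daily"},
    "gamma": {"has_tests": True, "tier": "daily"},
}


def integrity_errors(inventory: dict[str, str], registry: dict[str, dict]) -> list[str]:
    """Collect roster violations; an empty list means the roster is consistent."""
    errors = []
    for name, status in inventory.items():
        if status == "implemented" and name not in registry:
            errors.append(f"{name}: labeled implemented but no connector is registered")
    for name, meta in registry.items():
        if not meta.get("has_tests"):
            errors.append(f"{name}: connector has no tests")
        if not meta.get("tier"):
            errors.append(f"{name}: connector has no scheduler tier")
    return errors


print(integrity_errors(inventory, registry))  # prints: []

# Arithmetic check of the counts quoted in the roster.
readme_counts = {"implemented": 78, "research": 17, "fallback": 5, "manual": 3, "deprecated": 1}
print(sum(readme_counts.values()))  # prints: 104
print(readme_counts["implemented"] + 8)  # prints: 86
```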

Note: live-provider commands talk to real agency APIs and can hit transient upstream outages — a failed fetch is usually them, not you.

Install

pip install community-streamflow-service            # core
pip install "community-streamflow-service[api]"     # + FastAPI read layer

Requires Python 3.11+.

Quick start (CLI)

csfs providers                          # list registered providers + tiers
csfs fetch -p usgs --lookback 168 -n 50 # fetch a week of USGS data
csfs status                             # what's in the local DuckDB
csfs health                             # per-connector freshness + run health
csfs serve                              # HTTP read layer (needs the api extra)

Quick start (Python)

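import asyncio

from csfs.scheduler.runner import run_acquisition
from csfs.store.duckdb_store import DuckDBStore


async def main() -> None:
    # Open (or create) the local DuckDB file for the duration of the block.
    async with DuckDBStore("csfs.duckdb") as store:
        # Acquire the last 48 hours of USGS data, capped at 20 stations.
        await run_acquisition(store, providers=["usgs"], lookback_hours=48, max_stations=20)

        stations = await store.get_stations(provider="usgs", limit=5)
        if not stations:
            # Nothing stored, for example after a transient upstream outage.
            print("No USGS stations in the store yet.")
            return

        # Print the first 10 stored observations for the first station.
        obs = await store.get_observations(stations[0].id, limit=10)
        for row in obs:
            print(row["timestamp"], row["discharge_m3s"])


asyncio.run(main())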

The store is a plain DuckDB file — any SQL/pandas/Arrow tooling works on it directly. For direct single-provider access without a database, see the Python API guide.

API keys

Most connectors need no credentials. Exceptions: norway_nve (free NVE HydAPI key) and glofas (Copernicus CDS token in ~/.cdsapirc). Keep keys out of tracked config files.
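One common way to keep a key out of tracked files is to read it from the environment at run time. The sketch below shows that pattern only; the variable name `EXAMPLE_API_KEY` and the `load_api_key` helper are hypothetical, and how CSFS itself expects the NVE key to be supplied is described in its documentation, not here.

```python
import os
from collections.abc import Mapping


def load_api_key(env_var: str, environ: Mapping[str, str] = os.environ) -> str:
    """Return the key stored in `env_var`, or fail with a clear message."""
    key = environ.get(env_var, "").strip()
    if not key:
        raise RuntimeError(
            f"Set {env_var} in your environment; do not commit keys to the repository."
        )
    return key


# Demo with an explicit mapping so the example does not depend on the real environment.
demo_env = {"EXAMPLE_API_KEY": "example-key"}
print(load_api_key("EXAMPLE_API_KEY", demo_env))
# prints: example-key
```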

Architecture

connectors/     Provider plugins (one per data source)
core/           Canonical data models, registry, health, exceptions
store/          Persistence layer (DuckDB default)
scheduler/      Acquisition runner, cron tiers, daemon
api/            FastAPI query layer
cli/            Command-line interface
inventory/      Global provider inventory (YAML)

Details — including the roster-integrity guard system and the hermetic test policy — in the architecture docs.

Contributing

The most valuable contribution is a new provider connector. See CONTRIBUTING.md for the walkthrough and the roster-integrity requirements your PR must satisfy.

Citing

See CITATION.cff.

License

GPL-3.0-or-later. See LICENSE.
