A Python client library for interacting with the scidx POP and creating streams.

SciDX Streaming Library

SciDX Streaming wraps an authenticated ndp_ep.APIClient so you can register datasets, manage resources, discover resources, and build derived Kafka streams with filters. Everything here is runnable against a CKAN + Kafka stack.

Quickstart (Kafka-only streaming)

from ndp_ep import APIClient
from scidx_streaming import StreamingClient

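# API_URL, get_token(), and dataset_id are placeholders: supply your own
# endpoint URL, auth token, and target dataset ID before running.
client = StreamingClient(APIClient(base_url=API_URL, token=get_token()))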

# Register a Kafka resource definition (stored in CKAN; points at the real topic)
client.register_resource(dataset_id, {
    "type": "kafka",
    "name": "drone_fleet_alpha",
    "description": "Kafka telemetry for Fleet Alpha (Barcelona area)",
    "host": "localhost",
    "port": 9092,
    "topic": "demo.drone.telemetry.fleet_alpha",
})

# Discover resources
results = client.search_resources(terms=[dataset_id], types=["kafka"])

# Build and consume a derived stream with filters
filters = client.compile_filters([
    {"type": "mapping", "column": "STATE", "action": "rename", "new_name": "state"},
    {"type": "comparison", "column": "state", "op": "eq", "value": "UT"},
    {"type": "comparison", "column": "value", "op": "gt", "value": 100},
])
derived = client.create_stream(resource_ids=[results.ids()[0]], filters=filters)
handle = derived.connect(client).start(from_beginning=True)
print(handle.summary())
handle.stop()

What’s implemented in 0.1.8

  • Resource lifecycle: register/update/deactivate/delete resource definitions (stored in CKAN). Definitions describe where the real data lives (Kafka topic/URL/etc.); CKAN never stores the data itself.
  • Discovery: search the catalog for resource definitions by terms (keywords) and types.
  • Filters: mapping, comparison, and group filters that can be compiled and applied to the live data.
  • Derived streams: Kafka topics created from the resource definitions, carrying the real data with the filters applied. Local-only consumption is available for testing or for keeping streams private.
  • Consumers: Handle for Kafka topics with bounded buffers, retention controls, and optional CSV persistence.
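
To make the filter semantics concrete, here is a minimal plain-Python sketch of what the mapping and comparison filters from the Quickstart would do to individual records. It is an illustration only, not the library's implementation: apply_filters is a hypothetical helper, the sample records are made up, dropping records that lack the compared column is an assumption, and group filters are not covered.

```python
from typing import Any, Iterable, Iterator

# Only "eq" and "gt" appear in the Quickstart; other operators are left out on purpose.
_OPS = {
    "eq": lambda a, b: a == b,
    "gt": lambda a, b: a > b,
}


def apply_filters(
    records: Iterable[dict[str, Any]], filters: list[dict[str, Any]]
) -> Iterator[dict[str, Any]]:
    """Hypothetical helper: apply rename mappings and comparisons in order."""
    for record in records:
        row = dict(record)  # copy so the caller's record is not mutated
        keep = True
        for f in filters:
            if f["type"] == "mapping" and f["action"] == "rename":
                if f["column"] in row:
                    row[f["new_name"]] = row.pop(f["column"])
            elif f["type"] == "comparison":
                column = f["column"]
                # Assumption: a record without the compared column is dropped.
                if column not in row or not _OPS[f["op"]](row[column], f["value"]):
                    keep = False
                    break
        if keep:
            yield row


filters = [
    {"type": "mapping", "column": "STATE", "action": "rename", "new_name": "state"},
    {"type": "comparison", "column": "state", "op": "eq", "value": "UT"},
    {"type": "comparison", "column": "value", "op": "gt", "value": 100},
]
records = [  # made-up sample data
    {"STATE": "UT", "value": 150},
    {"STATE": "UT", "value": 50},
    {"STATE": "CO", "value": 300},
]
filtered = list(apply_filters(records, filters))
print(filtered)
```

Order matters in this sketch: the rename runs first, so the later comparisons refer to the new column name state, as in the Quickstart.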

Planned but not yet wired: derived streams from API streams, RSS, and static files (csv/json/txt/netcdf).

Producers vs. consumers (and CKAN’s role)

  • CKAN holds resource definitions only: dataset metadata + connection details (Kafka host/port/topic, URLs, etc.).
  • Producers publish the actual data to the sources referenced by those definitions (e.g., Kafka topics in the drone demo).
  • This library reads the definitions from CKAN, connects to the real sources, and builds derived Kafka topics (public by default) that others can consume or further derive.
  • Consumers use StreamHandle (or their own Kafka clients) to read derived topics; they can also create new derived streams with their own filters.
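
For readers who bring their own Kafka client, the following standard-library sketch shows one way to picture a bounded buffer with optional CSV persistence. BoundedBuffer and its methods are hypothetical names for illustration and do not describe StreamHandle's actual interface; the sample records are made up, and the Kafka polling loop is replaced by a plain for loop.

```python
import csv
import io
from collections import deque
from typing import Any, Optional, TextIO


class BoundedBuffer:
    """Hypothetical stand-in for a consumer buffer; not the library's StreamHandle."""

    def __init__(self, max_records: int, csv_file: Optional[TextIO] = None) -> None:
        # deque(maxlen=...) silently discards the oldest record once full.
        self._records: deque = deque(maxlen=max_records)
        self._csv_file = csv_file
        self._writer: Optional[csv.DictWriter] = None
        self.total_seen = 0

    def append(self, record: dict[str, Any]) -> None:
        self.total_seen += 1
        self._records.append(record)
        if self._csv_file is not None:
            if self._writer is None:
                # The header comes from the first record; later extra keys are ignored.
                self._writer = csv.DictWriter(
                    self._csv_file, fieldnames=list(record), extrasaction="ignore"
                )
                self._writer.writeheader()
            self._writer.writerow(record)

    def snapshot(self) -> list:
        """Return the records currently retained in memory, oldest first."""
        return list(self._records)


sink = io.StringIO()  # stands in for an open CSV file
buffer = BoundedBuffer(max_records=2, csv_file=sink)
for i in range(3):  # stands in for messages read from a derived topic
    buffer.append({"drone_id": "alpha-1", "value": i})

print(buffer.snapshot())
print(sink.getvalue())
```

In this sketch the in-memory buffer keeps only the newest records while the CSV sink keeps every record seen, which is one possible way to combine retention limits with persistence.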

Notebooks (GitHub only; not on PyPI)

  • notebooks/simulated_drone_demo/00_start_simulation.ipynb through 04_cleanup.ipynb
    Full resource management + Kafka-derived streams + filters with real data.
  • notebooks/test/00_overview.ipynb, 03_create_stream.ipynb, 04_consumption.ipynb
    Lightweight regression/demo set.

Keep .env local (gitignored); use .env_template as a starting point.

Documentation hub

See docs/README.md for the full guide set:

  • Architecture/overview
  • Resource lifecycle
  • Discovery
  • Filters (mapping/comparison/group)
  • Derived streams & consumption

Tests

Key checks live in tests/ (offline + live Kafka/CKAN coverage). Run pytest to validate resource lifecycle, filter compilation, stream creation, and consumer behaviour before releasing.
