A Python client library for interacting with the scidx POP and creating streams.
Project description
SciDX Streaming Library
SciDX Streaming wraps an authenticated ndp_ep.APIClient so you can register
datasets, manage resources, discover resources, and build derived
Kafka streams with filters. Everything here is runnable against a CKAN +
Kafka stack.
Quickstart (Kafka-only streaming)
from ndp_ep import APIClient
from scidx_streaming import StreamingClient

# API_URL, get_token(), and dataset_id are placeholders you supply for your deployment.
client = StreamingClient(APIClient(base_url=API_URL, token=get_token()))

# Register a Kafka resource definition (stored in CKAN; points at the real topic)
client.register_resource(dataset_id, {
    "type": "kafka",
    "name": "drone_fleet_alpha",
    "description": "Kafka telemetry for Fleet Alpha (Barcelona area)",
    "host": "localhost",
    "port": 9092,
    "topic": "demo.drone.telemetry.fleet_alpha",
})

# Discover resources
results = client.search_resources(terms=[dataset_id], types=["kafka"])

# Build and consume a derived stream with filters
filters = client.compile_filters([
    {"type": "mapping", "column": "STATE", "action": "rename", "new_name": "state"},
    {"type": "comparison", "column": "state", "op": "eq", "value": "UT"},
    {"type": "comparison", "column": "value", "op": "gt", "value": 100},
])
derived = client.create_stream(resource_ids=[results.ids()[0]], filters=filters)
handle = derived.connect(client).start(from_beginning=True)
print(handle.summary())
handle.stop()
What’s implemented in 0.1.8
- Resource lifecycle: register/update/deactivate/delete resource definitions (stored in CKAN). Definitions describe where the real data lives (Kafka topic/URL/etc.); CKAN never stores the data itself.
- Discovery: search the catalog for resource definitions by terms (keywords) and types.
- Filters: Mapping, comparison, and group filters that can be compiled and applied to the live data.
- Derived streams: Kafka topics created from resource definitions, carrying the real data with the filters applied. Local-only consumption is available for testing or for keeping streams private.
- Consumers: Handle for Kafka topics with bounded buffers, retention controls, and optional CSV persistence.
Planned but not yet wired: derived streams from API streams, RSS, and static files (csv/json/txt/netcdf).
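To make the filter semantics concrete, here is a minimal pure-Python sketch of how mapping (rename) and comparison filters like the ones in the Quickstart could be applied to records in order. It is not the library's implementation: apply_filters and the OPS table are hypothetical names, and only the "eq" and "gt" operators appear in this README (the others are assumed for illustration).

```python
import operator
from typing import Any, Dict, Iterable, Iterator, List

# "eq" and "gt" appear in the Quickstart; "ne" and "lt" are assumptions for this sketch.
OPS = {
    "eq": operator.eq,
    "gt": operator.gt,
    "ne": operator.ne,
    "lt": operator.lt,
}


def apply_filters(
    records: Iterable[Dict[str, Any]], filters: List[Dict[str, Any]]
) -> Iterator[Dict[str, Any]]:
    """Apply filters in order; yield only the records that pass every comparison."""
    for record in records:
        row = dict(record)  # copy so the caller's record is not mutated
        keep = True
        for f in filters:
            if f["type"] == "mapping" and f.get("action") == "rename":
                # Rename the column if present; later filters see the new name.
                if f["column"] in row:
                    row[f["new_name"]] = row.pop(f["column"])
            elif f["type"] == "comparison":
                value = row.get(f["column"])
                # A missing column fails the comparison in this sketch.
                if value is None or not OPS[f["op"]](value, f["value"]):
                    keep = False
                    break
        if keep:
            yield row


quickstart_filters = [
    {"type": "mapping", "column": "STATE", "action": "rename", "new_name": "state"},
    {"type": "comparison", "column": "state", "op": "eq", "value": "UT"},
    {"type": "comparison", "column": "value", "op": "gt", "value": 100},
]
sample = [
    {"STATE": "UT", "value": 150},
    {"STATE": "UT", "value": 50},
    {"STATE": "CO", "value": 500},
]
kept = list(apply_filters(sample, quickstart_filters))
```

Order matters in this sketch: the rename runs first, so the comparison on "state" only matches because the mapping filter precedes it.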
Producers vs. consumers (and CKAN’s role)
- CKAN holds resource definitions only: dataset metadata + connection details (Kafka host/port/topic, URLs, etc.).
- Producers publish the actual data to the sources referenced by those definitions (e.g., Kafka topics in the drone demo).
- This library reads the definitions from CKAN, connects to the real sources, and builds derived Kafka topics (public by default) that others can consume or further derive.
- Consumers use StreamHandle (or their own Kafka clients) to read derived topics; they can also create new derived streams with their own filters.
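As a rough illustration of what a consumer handle with a bounded buffer and optional CSV persistence involves, the sketch below uses only the standard library. BoundedBuffer and its methods are hypothetical names, not the StreamHandle API, and the CSV writer assumes every record shares the same keys.

```python
import csv
from collections import deque
from pathlib import Path
from typing import Any, Dict, List, Optional


class BoundedBuffer:
    """Keep only the most recent messages; optionally append each one to a CSV file."""

    def __init__(self, max_messages: int, csv_path: Optional[str] = None) -> None:
        # deque(maxlen=...) silently drops the oldest item once the buffer is full.
        self._buffer: deque = deque(maxlen=max_messages)
        self._csv_path = Path(csv_path) if csv_path else None
        self.total_seen = 0

    def append(self, record: Dict[str, Any]) -> None:
        self._buffer.append(record)
        self.total_seen += 1
        if self._csv_path is not None:
            write_header = not self._csv_path.exists()
            with self._csv_path.open("a", newline="") as fh:
                # Assumes all records have the same keys (sorted for a stable column order).
                writer = csv.DictWriter(fh, fieldnames=sorted(record))
                if write_header:
                    writer.writeheader()
                writer.writerow(record)

    def snapshot(self) -> List[Dict[str, Any]]:
        """Return the buffered records, oldest first."""
        return list(self._buffer)

    def summary(self) -> Dict[str, int]:
        return {"buffered": len(self._buffer), "total_seen": self.total_seen}


buf = BoundedBuffer(max_messages=2)
for i in range(3):
    buf.append({"seq": i})
```

The bound keeps memory use flat on a long-running topic, while the CSV file (when enabled) retains every message that passed through.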
Notebooks (GitHub only; not on PyPI)
- notebooks/simulated_drone_demo/00_start_simulation.ipynb → 04_cleanup.ipynb: Full resource management + Kafka-derived streams + filters with real data.
- notebooks/test/00_overview.ipynb, 03_create_stream.ipynb, 04_consumption.ipynb: Lightweight regression/demo set.
Keep .env local (gitignored); use .env_template as a starting point.
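One way to feed values from a local .env file into your own code is a small standard-library loader such as the sketch below. The variable names (API_URL, TOKEN) are assumptions; check .env_template for the names the notebooks actually expect. parse_env and load_settings are hypothetical helpers, not part of this library.

```python
import os
from typing import Dict


def parse_env(text: str) -> Dict[str, str]:
    """Parse simple KEY=VALUE lines, skipping blanks and '#' comments."""
    values: Dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        # Drop surrounding quotes, if any.
        values[key.strip()] = value.strip().strip('"').strip("'")
    return values


def load_settings(path: str = ".env") -> Dict[str, str]:
    """Read the file if it exists; real environment variables take precedence."""
    file_values: Dict[str, str] = {}
    if os.path.exists(path):
        with open(path, encoding="utf-8") as fh:
            file_values = parse_env(fh.read())
    overrides = {k: os.environ[k] for k in file_values if k in os.environ}
    return {**file_values, **overrides}


# Hypothetical file contents, for illustration only.
example = parse_env('# local settings\nAPI_URL=http://localhost:8000\nTOKEN="abc"\n')
```

Letting the process environment override the file means the same code works locally and in CI without committing secrets.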
Documentation hub
See docs/README.md for the full guide set:
- Architecture/overview
- Resource lifecycle
- Discovery
- Filters (mapping/comparison/group)
- Derived streams & consumption
Tests
Key checks live in tests/ (offline + live Kafka/CKAN coverage). Run pytest
to validate resource lifecycle, filter compilation, stream creation, and
consumer behaviour before releasing.
Project details
Download files
Source Distribution
Built Distribution
File details
Details for the file scidx_streaming-0.1.9.tar.gz.
File metadata
- Download URL: scidx_streaming-0.1.9.tar.gz
- Upload date:
- Size: 96.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 52742350b75999b92e7d4cda64f462096e48d39b8ef7f08ededfc310b7b76e14 |
| MD5 | dad61c1c055df489c8672376d3616327 |
| BLAKE2b-256 | 98259562fc6b7ade405bd19d9263567a3c7397ecf20438d80f8c2f27de996a87 |
File details
Details for the file scidx_streaming-0.1.9-py3-none-any.whl.
File metadata
- Download URL: scidx_streaming-0.1.9-py3-none-any.whl
- Upload date:
- Size: 103.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | c6a4a4763b0e4080f6bf66332557cd73f163d60637cf67570169870e6a699893 |
| MD5 | a8941012c05d9f229446e47d9272eb92 |
| BLAKE2b-256 | 54463159996b4fed01c44e8b5bd712cda5b5f2852aa27b47e2f62ce2904496c9 |
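A downloaded file can be checked against the SHA256 digests listed above with a few lines of standard-library Python. This is a generic sketch: sha256_of and matches are hypothetical helper names, and the local file path is whatever you saved the download as.

```python
import hashlib


def sha256_of(path: str, chunk_size: int = 65536) -> str:
    """Return the hex SHA256 digest of a file, reading it in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches(path: str, expected_hex: str) -> bool:
    """Compare a file's digest with a published one, ignoring case and whitespace."""
    return sha256_of(path) == expected_hex.strip().lower()
```

For example, matches("scidx_streaming-0.1.9.tar.gz", "52742350b75999b92e7d4cda64f462096e48d39b8ef7f08ededfc310b7b76e14") should return True for an intact copy of the source distribution.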