Python SDK for DeltaStream, built on the DeltaStream Connector library
Project description
DeltaStream Python SDK
Modern, async-first Python SDK for managing real-time data infrastructure with DeltaStream.
Overview
The DeltaStream Python SDK provides an ergonomic, type-safe, asynchronous interface to DeltaStream's control and data planes. It simplifies creation, management, and orchestration of streaming data resources (streams, stores, compute pools, schemas, functions, entities, etc.) while offering direct SQL execution for advanced workflows.
Built on top of the official DeltaStream Connector, this SDK focuses on developer productivity, clarity, and composability.
Features
- Async/await native API (Python 3.11+)
- Full CRUD lifecycle for all DeltaStream resources
- Multiple auth/connection strategies: DSN, environment, or programmatic
- Context switching (database, schema, store) with in-memory state tracking
- Direct SQL execution and query helpers
- Rich resource managers (Streams, Stores, Entities, Functions, Compute Pools, Schemas, Registries, Changelogs, Descriptors)
- Type hints throughout for excellent IDE support
- Extensible patterns for adding new resource types
Supported Resource Domains
| Domain | Examples |
|---|---|
| Streams | Creation, status, start/stop, select-based materialization |
| Stores | Kafka, Kinesis, S3 and more |
| Databases & Schemas | Lifecycle + context switching |
| Entities | Creation, insertion, hierarchical listing |
| Functions & Sources | Function code + metadata management |
| Descriptor Sources | Protocol buffer / descriptor handling |
| Schema Registries | Registry management and integration |
| Compute Pools | Provisioning, scaling, lifecycle |
| Changelogs | Tracking and management |
Installation
Install from PyPI (recommended):
pip install deltastream-sdk
Using uv (lockfile-driven Python package manager):
uv add deltastream-sdk
Requirements
- Python 3.11+
- Network access to a DeltaStream deployment
- A valid API token or DSN credentials
Quickstart
Minimal working example demonstrating environment-based configuration:
import asyncio
from deltastream_sdk import DeltaStreamClient
async def main():
client = DeltaStreamClient.from_environment()
if await client.test_connection():
print("✅ Connected to DeltaStream")
if __name__ == "__main__":
asyncio.run(main())
Environment Variables
# Option A (DSN-based)
export DELTASTREAM_DSN="https://:your_token@api.deltastream.io/v2"
# OR explicit config (Option B)
export DELTASTREAM_SERVER_URL="https://api.deltastream.io/v2"
export DELTASTREAM_TOKEN="your_token_here"
export DELTASTREAM_ORGANIZATION_ID="your_org_id"
export DELTASTREAM_DATABASE_NAME="default_db" # optional
export DELTASTREAM_SCHEMA_NAME="public" # optional
export DELTASTREAM_STORE_NAME="default_store" # optional
Usage
Below are common usage patterns. All APIs are async.
Connecting (3 Patterns)
import os
from deltastream_sdk import DeltaStreamClient
# 1. From environment
client = DeltaStreamClient.from_environment()
# 2. Programmatic configuration
async def token_provider():
return os.getenv("DELTASTREAM_TOKEN")
client = DeltaStreamClient(
server_url="https://api.deltastream.io/v2",
token_provider=token_provider,
organization_id="my_org",
)
# 3. DSN-based
client = DeltaStreamClient(dsn="https://:your_token@api.deltastream.io/v2")
Streams
async def stream_example(client):
streams = await client.streams.list()
stream = await client.streams.create_with_schema(
name="user_events",
columns=[
{"name": "user_id", "type": "INTEGER"},
{"name": "event_type", "type": "VARCHAR"},
{"name": "timestamp", "type": "TIMESTAMP"},
],
store="kafka_store",
topic="user-events",
)
analytics_stream = await client.streams.create_from_select(
name="user_analytics",
sql_definition="SELECT user_id, COUNT(*) AS event_count FROM user_events GROUP BY user_id",
)
await client.streams.start("user_events")
status = await client.streams.get_status("user_events")
await client.streams.stop("user_events")
Stores
async def store_example(client):
# Create a Kafka store with PLAIN authentication
kafka_store = await client.stores.create_kafka_store(
name="my_kafka",
parameters={
"uris": "localhost:9092",
"kafka.sasl.hash_function": "PLAIN",
"kafka.sasl.username": "user",
"kafka.sasl.password": "pass",
"schema_registry_name": "my_schema_registry",
},
)
# Create an S3 store with IAM role
s3_store = await client.stores.create_s3_store(
name="my_s3",
parameters={
"uris": "https://mybucket.s3.amazonaws.com/",
"aws.iam_role_arn": "arn:aws:iam::123456789012:role/DeltaStreamRole",
"aws.iam_external_id": "external-id-123",
},
)
# Create a Kinesis store
kinesis_store = await client.stores.create_kinesis_store(
name="my_kinesis",
parameters={
"uris": "https://kinesis.us-east-1.amazonaws.com",
"kinesis.access_key_id": "AKIAIOSFODNN7EXAMPLE",
"kinesis.secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
},
)
# List all stores
all_stores = await client.stores.list()
Databases, Schemas & Context Switching
async def context_example(client):
await client.databases.create(name="analytics_db")
await client.schemas.create(name="customer_data")
await client.use_database("analytics_db")
await client.use_schema("customer_data")
await client.use_store("my_kafka")
current_db = await client.get_current_database()
current_schema = await client.get_current_schema()
current_store = await client.get_current_store()
print(f"Context: {current_db}.{current_schema}/{current_store}")
Entities & Data Insertion
async def entity_example(client):
# Create entity with default parameters
await client.entities.create(
name="simple_entity",
store="my_kafka",
)
# Create entity with custom Kafka configurations
await client.entities.create(
name="user_profiles",
store="my_kafka",
parameters={
"kafka.partitions": 3,
"kafka.replicas": 2,
"kafka.topic.retention.ms": "604800000", # 7 days
},
)
# Create entity with compact cleanup policy
await client.entities.create(
name="user_compact",
store="my_kafka",
parameters={
"kafka.partitions": 2,
"kafka.replicas": 1,
"kafka.topic.cleanup.policy": "compact",
},
)
# Insert data into entity
await client.entities.insert_values(
name="user_profiles",
values=[
{"user_id": 1, "name": "Alice", "email": "alice@example.com"},
{"user_id": 2, "name": "Bob", "email": "bob@example.com"},
],
store="my_kafka",
)
# List entities in a store
entities = await client.entities.list_entities(store="my_kafka")
Compute Pools
async def compute_pool_example(client):
await client.compute_pools.create(name="analytics_pool", min_units=1, max_units=10)
await client.compute_pools.start("analytics_pool")
await client.compute_pools.stop("analytics_pool")
pools = await client.compute_pools.list()
Direct SQL
async def sql_example(client):
await client.execute_sql("CREATE TEMP VIEW users AS SELECT * FROM user_stream;")
rows = await client.query_sql("SELECT COUNT(*) AS user_count FROM users;")
print(rows[0]["user_count"]) # Access by column name
Error Handling
from deltastream_sdk.exceptions import (
DeltaStreamSDKError,
ResourceNotFound,
SQLError,
ConnectionError,
)
async def robust_example(client):
try:
await client.streams.get("non_existent_stream")
except ResourceNotFound:
print("Stream not found")
except SQLError as e:
print("SQL failure", e)
except ConnectionError:
print("Connection issue")
except DeltaStreamSDKError as e:
print("General SDK error", e)
Development
- Fork the repo & create a feature branch
- Add/adjust tests for your change
- Ensure lint & type checks pass
- Open a PR with a clear description & rationale
See CONTRIBUTING.md for more details.
License
Licensed under the Apache License 2.0. See the LICENSE file for details.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file deltastream_sdk-0.2.1.tar.gz.
File metadata
- Download URL: deltastream_sdk-0.2.1.tar.gz
- Upload date:
- Size: 23.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a1e1336fc6e39defa972acc0621c2982c1cf97deff7f4bd53d12a3cc03785e82
|
|
| MD5 |
48a9afd5fa4c42eeb8a22817715337af
|
|
| BLAKE2b-256 |
d5e6bb97fd11e6330bd1dccdf129eea822f1919d2141f969478fd9e31f075b52
|
Provenance
The following attestation bundles were made for deltastream_sdk-0.2.1.tar.gz:
Publisher:
release.yml on deltastreaminc/deltastream-sdk-python
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
deltastream_sdk-0.2.1.tar.gz -
Subject digest:
a1e1336fc6e39defa972acc0621c2982c1cf97deff7f4bd53d12a3cc03785e82 - Sigstore transparency entry: 634514612
- Sigstore integration time:
-
Permalink:
deltastreaminc/deltastream-sdk-python@24f6187908827be7ae19196d9a13c3693fc76caf -
Branch / Tag:
refs/heads/main - Owner: https://github.com/deltastreaminc
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@24f6187908827be7ae19196d9a13c3693fc76caf -
Trigger Event:
workflow_dispatch
-
Statement type:
File details
Details for the file deltastream_sdk-0.2.1-py3-none-any.whl.
File metadata
- Download URL: deltastream_sdk-0.2.1-py3-none-any.whl
- Upload date:
- Size: 36.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
86d1c3d41c3d999ba0f946eae5a7b0035d26fb3980b40fd5ec71d0869dfe47f0
|
|
| MD5 |
4450ad3745b99cea2a43bdf82447a855
|
|
| BLAKE2b-256 |
8093c2d75ee8fca61bacaf621b13fcd22d85a896ac95cea599774527735dcb3e
|
Provenance
The following attestation bundles were made for deltastream_sdk-0.2.1-py3-none-any.whl:
Publisher:
release.yml on deltastreaminc/deltastream-sdk-python
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
deltastream_sdk-0.2.1-py3-none-any.whl -
Subject digest:
86d1c3d41c3d999ba0f946eae5a7b0035d26fb3980b40fd5ec71d0869dfe47f0 - Sigstore transparency entry: 634514615
- Sigstore integration time:
-
Permalink:
deltastreaminc/deltastream-sdk-python@24f6187908827be7ae19196d9a13c3693fc76caf -
Branch / Tag:
refs/heads/main - Owner: https://github.com/deltastreaminc
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@24f6187908827be7ae19196d9a13c3693fc76caf -
Trigger Event:
workflow_dispatch
-
Statement type: