GlassFlow Python SDK: Create GlassFlow pipelines between Kafka and ClickHouse
GlassFlow Python SDK
A Python SDK for creating and managing data pipelines between Kafka and ClickHouse.
Features
- Create and manage data pipelines between Kafka and ClickHouse
- Ingest from Kafka sources or OTLP signals (logs, metrics, traces)
- Unified transforms pipeline: dedup, filter, and stateless transformations
- Temporal joins between sources based on a common key with a given time window
- Per-source Schema Registry integration
- Pipeline configuration via YAML or JSON
- Schema validation and configuration management
- Fine-grained resource control per pipeline component
Installation
pip install glassflow
Quick Start
Initialize client
from glassflow.etl import Client
client = Client(host="your-glassflow-etl-url")
Create a pipeline
The example below uses pipeline version v3. See Migrating from V2 to V3 if you have existing v2 configurations.
pipeline_config = {
    "version": "v3",
    "pipeline_id": "my-pipeline-id",
    "sources": [
        {
            "type": "kafka",
            "source_id": "users",
            "connection_params": {
                "brokers": ["my.kafka.broker:9093"],
                "protocol": "PLAINTEXT",
            },
            "topic": "users",
            "consumer_group_initial_offset": "latest",
            "schema_fields": [
                {"name": "event_id", "type": "string"},
                {"name": "user_id", "type": "string"},
                {"name": "created_at", "type": "string"},
                {"name": "name", "type": "string"},
                {"name": "email", "type": "string"},
            ],
        }
    ],
    "transforms": [
        {
            "type": "dedup",
            "source_id": "users",
            "config": {
                "key": "event_id",
                "time_window": "1h",
            },
        }
    ],
    "sink": {
        "type": "clickhouse",
        "connection_params": {
            "host": "my.clickhouse.server",
            "port": "9000",
            "database": "default",
            "username": "default",
            "password": "mysecret",
            "secure": False,
        },
        "table": "users",
        "mapping": [
            {"name": "event_id", "column_name": "event_id", "column_type": "UUID"},
            {"name": "user_id", "column_name": "user_id", "column_type": "UUID"},
            {"name": "created_at", "column_name": "created_at", "column_type": "DateTime"},
            {"name": "name", "column_name": "name", "column_type": "String"},
            {"name": "email", "column_name": "email", "column_type": "String"},
        ],
    },
}
pipeline = client.create_pipeline(pipeline_config)
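Before calling create_pipeline, it can help to catch simple wiring mistakes locally. The sketch below is a hypothetical helper (check_pipeline_config is not part of the glassflow SDK); it assumes the v3 layout shown above and checks only that transforms reference declared sources, that a dedup key is a field of its source, and that every sink mapping name comes from some source's schema_fields. Joins and other keys are ignored, and the server may still reject a config that passes.

```python
from __future__ import annotations


def check_pipeline_config(config: dict) -> list[str]:
    """Return human-readable problems found in a v3 pipeline config dict.

    Hypothetical local pre-flight check, not part of the glassflow SDK.
    Only sources, transforms, and sink.mapping are inspected.
    """
    problems: list[str] = []
    if config.get("version") != "v3":
        problems.append(f"expected version 'v3', got {config.get('version')!r}")

    # Map each declared source_id to the field names in its schema_fields.
    fields_by_source: dict[str, set[str]] = {}
    for source in config.get("sources", []):
        names = {field["name"] for field in source.get("schema_fields", [])}
        fields_by_source[source.get("source_id")] = names

    # Each transform must reference a declared source; a dedup key must be
    # one of that source's fields.
    for transform in config.get("transforms", []):
        source_id = transform.get("source_id")
        if source_id not in fields_by_source:
            problems.append(
                f"transform {transform.get('type')!r} references unknown source {source_id!r}"
            )
        elif transform.get("type") == "dedup":
            key = transform.get("config", {}).get("key")
            if key not in fields_by_source[source_id]:
                problems.append(f"dedup key {key!r} is not a field of source {source_id!r}")

    # Each sink mapping entry should name a field declared by some source.
    all_fields = set().union(*fields_by_source.values())
    for entry in config.get("sink", {}).get("mapping", []):
        if entry.get("name") not in all_fields:
            problems.append(f"sink mapping uses unknown field {entry.get('name')!r}")
    return problems
```

Called on the pipeline_config dict from the example above, check_pipeline_config returns an empty list, so create_pipeline can proceed.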
You can also load configurations from YAML or JSON files:
pipeline = client.create_pipeline(
pipeline_config_yaml_path="pipeline.yaml"
)
# or
pipeline = client.create_pipeline(
pipeline_config_json_path="pipeline.json"
)
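If you build the config in Python but want to keep it in version control as a file, one option is to write it out as JSON and pass that path to create_pipeline. This is a minimal sketch; save_pipeline_config is a hypothetical helper, not an SDK function.

```python
from __future__ import annotations

import json
from pathlib import Path


def save_pipeline_config(config: dict, path: str | Path) -> Path:
    """Write a pipeline config dict to a JSON file and return its Path.

    Hypothetical helper, not part of the glassflow SDK.
    """
    path = Path(path)
    # json.dumps converts Python True/False/None to JSON true/false/null,
    # so values such as "secure": False serialize correctly.
    path.write_text(json.dumps(config, indent=2) + "\n", encoding="utf-8")
    return path
```

For example, save_pipeline_config(pipeline_config, "pipeline.json") followed by client.create_pipeline(pipeline_config_json_path="pipeline.json"). For YAML, PyYAML's yaml.safe_dump can produce the equivalent file.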
For the full configuration reference, including Schema Registry, joins, OTLP sources, and resource controls, see the GlassFlow docs.
Get pipeline
pipeline = client.get_pipeline("my-pipeline-id")
List pipelines
pipelines = client.list_pipelines()
for pipeline in pipelines:
    print(f"Pipeline ID: {pipeline['pipeline_id']}, State: {pipeline['state']}")
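Because each listed pipeline is shown as a dict with 'pipeline_id' and 'state' keys, grouping them by state is a short loop. This is a sketch under that assumption; pipelines_by_state is a hypothetical helper, and the state strings in any sample data are illustrative rather than the server's actual values.

```python
from __future__ import annotations

from collections import defaultdict


def pipelines_by_state(pipelines: list[dict]) -> dict[str, list[str]]:
    """Group pipeline IDs by their 'state' value (hypothetical helper)."""
    groups: defaultdict[str, list[str]] = defaultdict(list)
    for p in pipelines:
        # Keys follow the listing example: 'pipeline_id' and 'state'.
        groups[p["state"]].append(p["pipeline_id"])
    return dict(groups)
```

Usage: pipelines_by_state(client.list_pipelines()).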
Stop / Terminate / Resume pipeline
pipeline = client.get_pipeline("my-pipeline-id")
pipeline.stop() # graceful stop → STOPPING
client.stop_pipeline("my-pipeline-id", terminate=True) # ungraceful → TERMINATING
pipeline.resume() # restart → RESUMING
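The comments above indicate that stop, terminate, and resume move a pipeline into a transitional state (STOPPING, TERMINATING, RESUMING). If a script must wait for the pipeline to settle, for example before deleting it, you can poll client.list_pipelines(). This sketch is hypothetical: wait_for_state is not an SDK function, and the names of the settled states (such as a "Stopped" state after STOPPING) are assumptions. Print the states your server reports before relying on specific strings.

```python
from __future__ import annotations

import time


def wait_for_state(client, pipeline_id: str, target_states: set[str],
                   timeout: float = 120.0, interval: float = 2.0) -> str:
    """Poll client.list_pipelines() until pipeline_id reaches a target state.

    Hypothetical helper. Returns the state reached, or raises TimeoutError.
    """
    deadline = time.monotonic() + timeout
    while True:
        # list_pipelines() entries expose 'pipeline_id' and 'state'.
        states = {p["pipeline_id"]: p["state"] for p in client.list_pipelines()}
        state = states.get(pipeline_id)
        if state in target_states:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{pipeline_id} still in state {state!r} after {timeout}s")
        time.sleep(interval)
```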
Delete pipeline
Only stopped or terminated pipelines can be deleted.
client.delete_pipeline("my-pipeline-id")
# or
pipeline.delete()
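Since only stopped or terminated pipelines can be deleted, a script can check the state first instead of handling a server error. A minimal sketch, assuming the state strings in DELETABLE_STATES (hypothetical spellings; confirm against what list_pipelines() actually returns) and using only client.list_pipelines() and client.delete_pipeline() from this README.

```python
# Hypothetical spellings of the stopped/terminated states; verify them first.
DELETABLE_STATES = {"Stopped", "Terminated"}


def delete_if_inactive(client, pipeline_id: str) -> bool:
    """Delete pipeline_id only if it is in a deletable state (hypothetical helper).

    Returns True if delete_pipeline was called, False otherwise.
    """
    states = {p["pipeline_id"]: p["state"] for p in client.list_pipelines()}
    if states.get(pipeline_id) not in DELETABLE_STATES:
        return False
    client.delete_pipeline(pipeline_id)
    return True
```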
Migrating from V2 to V3
Pipeline version v2 has been removed. Use Client.migrate_pipeline_v2_to_v3() to convert an existing configuration automatically:
from glassflow.etl import Client
client = Client(host="your-glassflow-etl-url")
v2_config = ... # your existing v2 pipeline config dict
v3_config = client.migrate_pipeline_v2_to_v3(v2_config)
pipeline = client.create_pipeline(v3_config)
If you prefer to migrate manually, the key changes are:
| Area | V2 | V3 |
|---|---|---|
| Version | "v2" | "v3" |
| Sources | source: {type, connection_params, topics: [...]} | sources: [{type, source_id, connection_params, topic, ...}] flat list |
| Schema | top-level schema.fields block | sources[].schema_fields per source |
| Deduplication | per-topic deduplication: {enabled, id_field, ...} | transforms: [{type: "dedup", source_id, config: {key, time_window}}] |
| Filter | top-level filter: {enabled, expression} | transforms: [{type: "filter", source_id, config: {expression}}] |
| Transformation | top-level stateless_transformation | transforms: [{type: "stateless", source_id, config: {transforms: [...]}}] |
| Join | join.sources: [{source_id, key, orientation}] | join: {left_source: {...}, right_source: {...}, output_fields: [...]} |
| Sink connection | flat fields (host, port, ...) at top level | nested sink.connection_params object |
| Sink field mapping | top-level schema.fields with source_id | sink.mapping list of {name, column_name, column_type} |
| Resources | pipeline_resources: {ingestor, transform, ...} | resources: {sources: [...], transform: [...], ...} |
| Sink password | base64-encoded | plain text |
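Client.migrate_pipeline_v2_to_v3() remains the recommended path. If you migrate by hand, two of the rows in the table translate directly into code. The sketch below is illustrative, not SDK code: v2_filter_to_v3 and v2_password_to_v3 are hypothetical helpers. It assumes the v2 filter block has exactly the {enabled, expression} keys listed in the table, that the caller chooses which v3 source_id the former top-level filter applies to, and that the expression string can be carried over unchanged.

```python
import base64


def v2_filter_to_v3(v2_filter: dict, source_id: str) -> list:
    """Turn a v2 top-level filter block into v3 transforms entries.

    Hypothetical helper. A disabled v2 filter yields no transform.
    """
    if not v2_filter.get("enabled"):
        return []
    return [{
        "type": "filter",
        "source_id": source_id,  # v2 filter was top-level; caller picks the source
        "config": {"expression": v2_filter["expression"]},
    }]


def v2_password_to_v3(encoded: str) -> str:
    """V2 stored the sink password base64-encoded; V3 expects plain text."""
    return base64.b64decode(encoded).decode("utf-8")
```

For example, v2_password_to_v3("bXlzZWNyZXQ=") returns "mysecret", which can then go into sink.connection_params.password.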
Tracking
The SDK includes anonymous usage stats collection to help improve the product. It collects non-identifying information such as SDK version, Python version, and feature flags (e.g., whether joins or deduplication are enabled). No personally identifiable information is collected.
Usage stats collection is enabled by default. To disable it, either set the environment variable:
export GF_USAGESTATS_ENABLED=false
or call this on your client instance:
client.disable_usagestats()
Development
Setup
- Clone the repository
- Create a virtual environment
- Install dependencies:
uv venv
source .venv/bin/activate
uv pip install -e ".[dev]"
Testing
pytest
Download files
File details
Details for the file glassflow-4.0.0.tar.gz.
File metadata
- Download URL: glassflow-4.0.0.tar.gz
- Upload date:
- Size: 109.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.11.7 (CI, Ubuntu 24.04)
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 2d269b3e4119c58c87712c1ce4295b76d7eb7685645784473fc548d7ded237b2 |
| MD5 | bc164337b982e9306cf0b2729bb91158 |
| BLAKE2b-256 | 0a46215720a9f70cc379e3bda60dd4a6c22eee2c4caa22db9fed0d75aff8d145 |
File details
Details for the file glassflow-4.0.0-py3-none-any.whl.
File metadata
- Download URL: glassflow-4.0.0-py3-none-any.whl
- Upload date:
- Size: 28.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.11.7 (CI, Ubuntu 24.04)
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 61af971340090a2a2834d476edf2d56af43b33ff03119b924dd72f3f7e3c0c30 |
| MD5 | a073a2f752a3fd5fe672ef1337c8d151 |
| BLAKE2b-256 | 8feed76013474de30d68162f190f41fb9da300bb921a892096700d75f4188691 |