zerodb-temporal
ZeroDB event connector for Temporal workflows.
Bridge ZeroDB database events to Temporal's durable execution engine.
Why use this?
| Feature | Description |
|---|---|
| Event-driven | ZeroDB events automatically trigger Temporal workflows |
| Durable | Temporal guarantees workflow completion even through failures |
| Activity helpers | Store results, memories, files, and vectors back to ZeroDB |
| Auto-provision | No signup needed -- ZeroDB project created on first use |
| Webhook + polling | Two modes: poll event stream or receive webhooks |
| Connector pattern | Bridges ZeroDB and Temporal -- doesn't replace either |
Installation
pip install zerodb-temporal
Quick Start
Basic Trigger
from zerodb_temporal import ZeroDBTrigger, ZeroDBActivity

# Listen for file upload events
trigger = ZeroDBTrigger(event_type='zerodb.file.uploaded')

@trigger.on_event
async def process_file(event):
    print(f"File uploaded: {event.data}")
    # Store result back to ZeroDB
    activity = ZeroDBActivity()
    activity.store_result({
        'file_id': event.data.get('file_id'),
        'status': 'processed',
    })
    return {'processed': True}

# Start polling for events
trigger.start()
Connector Pattern (Multiple Event Types)
import asyncio

from zerodb_temporal import ZeroDBTemporalConnector

connector = ZeroDBTemporalConnector(
    temporal_host='localhost:7233',
    task_queue='zerodb-events',
)

@connector.workflow('zerodb.file.uploaded')
async def handle_upload(event):
    result = connector.activity.store_result({
        'event': event.event_id,
        'status': 'processed',
    })
    return result

@connector.workflow('zerodb.table.insert')
async def handle_new_row(event):
    connector.activity.store_memory(
        f"New row inserted in {event.data.get('table')}",
        tags=['db-event'],
    )
    return {'acknowledged': True}

# Start all triggers
asyncio.run(connector.run())
Webhook Mode
from flask import Flask, request
from zerodb_temporal import ZeroDBTemporalConnector

app = Flask(__name__)
connector = ZeroDBTemporalConnector()

@connector.workflow('zerodb.file.uploaded')
def handle_upload(event):
    return {'processed': True}

@app.post('/webhook/zerodb')
def webhook():
    results = connector.process_webhook(request.json)
    return {'results': results}
API Reference
ZeroDBTrigger(event_type, **kwargs)
Subscribe to a specific ZeroDB event type.
| Param | Default | Description |
|---|---|---|
| event_type | required | Event type to listen for |
| api_key | auto | ZeroDB API key |
| project_id | auto | ZeroDB project ID |
| poll_interval | 5 | Seconds between polls |
| batch_size | 100 | Max events per poll |
Methods:
| Method | Description |
|---|---|
| @trigger.on_event | Decorator to register handler |
| trigger.add_handler(func) | Register handler (non-decorator) |
| trigger.start() | Start polling in background thread |
| trigger.stop() | Stop polling |
| trigger.process_webhook(payload) | Process a webhook payload |
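The poll_interval and batch_size parameters suggest a fetch, dispatch, sleep loop. The following is a minimal sketch of that pattern only, not the library's implementation: `poll_events`, `fake_fetch`, and `max_polls` are hypothetical names introduced for illustration, and the in-memory list stands in for the ZeroDB event stream.

```python
import time
from typing import Callable, Iterable, List


def poll_events(
    fetch: Callable[[int], List[dict]],
    handlers: Iterable[Callable[[dict], None]],
    poll_interval: float = 5.0,
    batch_size: int = 100,
    max_polls: int = 3,
) -> int:
    """Poll `fetch` up to `max_polls` times and pass each event to every handler."""
    handlers = list(handlers)
    handled = 0
    for attempt in range(max_polls):
        batch = fetch(batch_size)  # request at most batch_size events
        for event in batch:
            for handler in handlers:
                handler(event)
            handled += 1
        if attempt < max_polls - 1:
            time.sleep(poll_interval)  # wait before the next poll
    return handled


# Hypothetical in-memory source standing in for the real event stream.
pending = [{"event_id": str(i)} for i in range(5)]


def fake_fetch(limit: int) -> List[dict]:
    batch = pending[:limit]
    del pending[:limit]
    return batch


seen: List[str] = []
total = poll_events(
    fake_fetch,
    [lambda event: seen.append(event["event_id"])],
    poll_interval=0.0,  # no delay, so the demo finishes immediately
    batch_size=2,
    max_polls=3,
)
```

A smaller batch_size spreads a backlog over more polls, while a shorter poll_interval lowers latency at the cost of more requests.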
ZeroDBActivity(**kwargs)
Temporal activity helpers for ZeroDB operations.
| Method | Description |
|---|---|
| store_result(data, table='workflow_results') | Store workflow result in table |
| query_table(table, filters=None) | Query rows from table |
| store_memory(content, namespace='workflow') | Store memory via ZeroMemory |
| recall_memory(query, namespace='workflow') | Search memories |
| upload_file(path, file_data) | Upload file to storage |
| download_file(path) | Download file from storage |
| create_event(event_type, data) | Emit a new ZeroDB event |
| store_vector(id, values, metadata) | Store vector embedding |
ZeroDBTemporalConnector(**kwargs)
High-level connector managing multiple triggers.
| Param | Default | Description |
|---|---|---|
| temporal_host | localhost:7233 | Temporal server address |
| task_queue | zerodb-events | Temporal task queue |
| namespace | default | Temporal namespace |
Methods:
| Method | Description |
|---|---|
| @connector.workflow(event_type) | Register workflow for event type |
| connector.start() | Start all triggers |
| connector.stop() | Stop all triggers |
| await connector.run() | Run blocking (async) |
| connector.process_webhook(payload) | Route webhook to correct workflow |
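To make the register-then-route idea concrete, here is a small self-contained sketch of a decorator registry that dispatches a payload by its event type. It is an illustration under assumptions, not the connector's actual code: `WorkflowRegistry` is a hypothetical class, and the `event_type` key in the payload is assumed from the ZeroDBEvent field of the same name.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict, List


class WorkflowRegistry:
    """Hypothetical registry mapping an event type to its handlers."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Callable[[dict], Any]]] = {}

    def workflow(self, event_type: str) -> Callable:
        """Decorator that registers a handler for one event type."""
        def decorator(func: Callable) -> Callable:
            self._handlers.setdefault(event_type, []).append(func)
            return func
        return decorator

    def process_webhook(self, payload: dict) -> List[Any]:
        """Run every handler registered for the payload's event type."""
        results = []
        # Assumption: the payload carries its type under "event_type".
        for handler in self._handlers.get(payload.get("event_type"), []):
            if inspect.iscoroutinefunction(handler):
                results.append(asyncio.run(handler(payload)))  # async handler
            else:
                results.append(handler(payload))  # plain function
        return results


registry = WorkflowRegistry()


@registry.workflow("zerodb.file.uploaded")
def on_upload(event: dict) -> dict:
    return {"processed": event["data"]["file_id"]}


@registry.workflow("zerodb.table.insert")
async def on_insert(event: dict) -> dict:
    return {"acknowledged": True}


upload_results = registry.process_webhook(
    {"event_type": "zerodb.file.uploaded", "data": {"file_id": "f1"}}
)
insert_results = registry.process_webhook({"event_type": "zerodb.table.insert"})
unknown_results = registry.process_webhook({"event_type": "zerodb.unknown"})
```

The sketch accepts both sync and async handlers because the examples in this README use both; an event type with no registered handler simply yields an empty result list.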
ZeroDBEvent
Event data wrapper.
| Field | Type | Description |
|---|---|---|
| event_type | str | Event type string |
| event_id | str | Unique event ID |
| data | dict | Event payload |
| timestamp | str | ISO 8601 timestamp |
| project_id | str | ZeroDB project ID |
| metadata | dict | Additional metadata |
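For local testing it can help to have a stand-in with the same fields. The dataclass below mirrors the table above; it is a sketch, not the package's class. `EventSketch`, `from_dict`, and `occurred_at` are hypothetical names, and the defaults for optional fields are assumptions.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict


@dataclass
class EventSketch:
    """Stand-in with the same fields as the ZeroDBEvent table."""

    event_type: str
    event_id: str
    data: Dict[str, Any] = field(default_factory=dict)
    timestamp: str = ""
    project_id: str = ""
    metadata: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_dict(cls, payload: Dict[str, Any]) -> "EventSketch":
        """Build an event from a plain dict, filling gaps with empty values."""
        return cls(
            event_type=payload["event_type"],
            event_id=payload["event_id"],
            data=payload.get("data") or {},
            timestamp=payload.get("timestamp", ""),
            project_id=payload.get("project_id", ""),
            metadata=payload.get("metadata") or {},
        )

    def occurred_at(self) -> datetime:
        """Parse the ISO 8601 timestamp string into a datetime."""
        return datetime.fromisoformat(self.timestamp)


event = EventSketch.from_dict(
    {
        "event_type": "zerodb.file.uploaded",
        "event_id": "evt-1",
        "data": {"file_id": "f1"},
        # An explicit UTC offset parses on every Python 3.7+ release.
        "timestamp": "2024-01-15T10:30:00+00:00",
    }
)
```

Handlers written against `event.data.get(...)` and `event.event_id` can then be exercised without a live ZeroDB project.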
EventType Constants
| Constant | Value |
|---|---|
| TABLE_INSERT | zerodb.table.insert |
| TABLE_UPDATE | zerodb.table.update |
| TABLE_DELETE | zerodb.table.delete |
| FILE_UPLOADED | zerodb.file.uploaded |
| FILE_DELETED | zerodb.file.deleted |
| VECTOR_UPSERT | zerodb.vector.upsert |
| MEMORY_REMEMBER | zerodb.memory.remember |
| HOOK_INVOKED | zerodb.hook.invoked |
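The values above share a `zerodb.<resource>.<action>` shape, which makes them easy to group. The sketch below copies the constants into a local class for illustration; how the package itself exposes them (class, enum, or module attributes) is not stated here, and `EventTypeSketch`, `split_event_type`, and `events_for` are hypothetical names.

```python
from typing import List, Tuple


class EventTypeSketch:
    """Local copy of the constants table, for illustration only."""

    TABLE_INSERT = "zerodb.table.insert"
    TABLE_UPDATE = "zerodb.table.update"
    TABLE_DELETE = "zerodb.table.delete"
    FILE_UPLOADED = "zerodb.file.uploaded"
    FILE_DELETED = "zerodb.file.deleted"
    VECTOR_UPSERT = "zerodb.vector.upsert"
    MEMORY_REMEMBER = "zerodb.memory.remember"
    HOOK_INVOKED = "zerodb.hook.invoked"


def split_event_type(event_type: str) -> Tuple[str, str]:
    """Return (resource, action) from a 'zerodb.<resource>.<action>' string."""
    _prefix, resource, action = event_type.split(".", 2)
    return resource, action


def events_for(resource: str) -> List[str]:
    """List every event type in the sketch that belongs to one resource."""
    return [
        value
        for name, value in vars(EventTypeSketch).items()
        if name.isupper() and split_event_type(value)[0] == resource
    ]


table_events = events_for("table")
file_parts = split_event_type(EventTypeSketch.FILE_UPLOADED)
```

Grouping like this is handy when one handler should be registered for every event on a resource.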
Configuration
Environment Variables
export ZERODB_API_KEY="your-api-key"
export ZERODB_PROJECT_ID="your-project-id"
# Optional
export ZERODB_BASE_URL="https://api.ainative.studio"
Auto-Provisioning
If no credentials are found, zerodb-temporal automatically creates a free ZeroDB project on first use. Credentials are saved to ~/.zerodb/config.json.
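One plausible lookup order is environment variables first, then the saved config file, with auto-provisioning as the fallback. The sketch below illustrates that order only. The precedence, the `api_key` and `project_id` key names inside config.json, and the `resolve_credentials` helper are all assumptions rather than documented behavior.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Mapping, Optional

DEFAULT_CONFIG = Path.home() / ".zerodb" / "config.json"


def resolve_credentials(
    env: Mapping[str, str] = os.environ,
    config_path: Path = DEFAULT_CONFIG,
) -> Optional[dict]:
    """Return credentials from env vars, else from the config file, else None."""
    api_key = env.get("ZERODB_API_KEY")
    project_id = env.get("ZERODB_PROJECT_ID")
    if api_key and project_id:
        return {"api_key": api_key, "project_id": project_id, "source": "env"}

    if config_path.is_file():
        saved = json.loads(config_path.read_text())
        # Assumption: the file stores "api_key" and "project_id" keys.
        if saved.get("api_key") and saved.get("project_id"):
            return {
                "api_key": saved["api_key"],
                "project_id": saved["project_id"],
                "source": "config",
            }

    return None  # nothing found: this is where auto-provisioning would start


# Demo against a temporary directory so the real ~/.zerodb is never touched.
with tempfile.TemporaryDirectory() as tmp:
    cfg = Path(tmp) / "config.json"
    cfg.write_text(json.dumps({"api_key": "k-file", "project_id": "p-file"}))

    from_env = resolve_credentials(
        {"ZERODB_API_KEY": "k-env", "ZERODB_PROJECT_ID": "p-env"}, cfg
    )
    from_file = resolve_credentials({}, cfg)
    missing = resolve_credentials({}, Path(tmp) / "absent.json")
```

A check like this is useful in CI, where silently provisioning a fresh project is usually not what you want.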
Use Cases
ETL Pipeline
@connector.workflow('zerodb.file.uploaded')
async def etl_pipeline(event):
    # Download uploaded file
    raw = connector.activity.download_file(event.data['path'])
    # Transform (transform() is a user-supplied function, not shown here)
    processed = transform(raw)
    # Store result
    connector.activity.store_result({
        'source': event.data['path'],
        'rows_processed': len(processed),
    })
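The `transform` step in the pipeline above is left to the user. As one possible shape for it, the sketch below parses CSV bytes into cleaned rows. It assumes the downloaded file arrives as UTF-8 bytes and is CSV with a header row; neither is confirmed by this README.

```python
import csv
import io
from typing import Dict, List


def transform(raw: bytes) -> List[Dict[str, str]]:
    """Parse CSV bytes into row dicts, trimming whitespace and dropping empty rows."""
    reader = csv.DictReader(io.StringIO(raw.decode("utf-8")))
    rows: List[Dict[str, str]] = []
    for row in reader:
        cleaned = {
            key.strip(): (value or "").strip()
            for key, value in row.items()
            if key is not None  # ignore overflow cells beyond the header
        }
        if any(cleaned.values()):  # skip rows where every cell is blank
            rows.append(cleaned)
    return rows


sample = b"name, source\nAda, web\n, \nLin, api\n"
processed = transform(sample)
```

Because the result is a list, `len(processed)` gives the `rows_processed` count that the pipeline stores.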
AI Agent Memory
@connector.workflow('zerodb.table.insert')
async def enrich_memory(event):
    row = event.data.get('row', {})
    connector.activity.store_memory(
        f"User {row.get('name')} registered from {row.get('source')}",
        entity_id=row.get('user_id'),
        tags=['registration', 'user-event'],
    )
Event Chain
@connector.workflow('zerodb.hook.completed')
async def chain_next(event):
    # Trigger next step in pipeline
    connector.activity.create_event(
        'zerodb.custom',
        {'step': 'next', 'previous_result': event.data},
    )
Built by AINative Studio
Free database for AI agents. Auto-provisions in 200ms.
Get started | Documentation | GitHub
License
MIT