zerodb-temporal

ZeroDB event connector for Temporal workflows.

Bridge ZeroDB database events to Temporal's durable execution engine.

Why use this?

| Feature | Description |
|---|---|
| Event-driven | ZeroDB events automatically trigger Temporal workflows |
| Durable | Temporal persists workflow state, so workflows resume after worker or process failures |
| Activity helpers | Store results, memories, files, and vectors back to ZeroDB |
| Auto-provision | No signup needed; a ZeroDB project is created on first use |
| Webhook + polling | Two modes: poll the event stream or receive webhooks |
| Connector pattern | Bridges ZeroDB and Temporal; doesn't replace either |

Installation

pip install zerodb-temporal

Quick Start

Basic Trigger

from zerodb_temporal import ZeroDBTrigger, ZeroDBActivity

# Listen for file upload events
trigger = ZeroDBTrigger(event_type='zerodb.file.uploaded')

@trigger.on_event
async def process_file(event):
    print(f"File uploaded: {event.data}")

    # Store result back to ZeroDB
    activity = ZeroDBActivity()
    activity.store_result({
        'file_id': event.data.get('file_id'),
        'status': 'processed',
    })
    return {'processed': True}

# Start polling for events
trigger.start()

Connector Pattern (Multiple Event Types)

import asyncio

from zerodb_temporal import ZeroDBTemporalConnector

connector = ZeroDBTemporalConnector(
    temporal_host='localhost:7233',
    task_queue='zerodb-events',
)

@connector.workflow('zerodb.file.uploaded')
async def handle_upload(event):
    result = connector.activity.store_result({
        'event': event.event_id,
        'status': 'processed',
    })
    return result

@connector.workflow('zerodb.table.insert')
async def handle_new_row(event):
    connector.activity.store_memory(
        f"New row inserted in {event.data.get('table')}",
        tags=['db-event'],
    )
    return {'acknowledged': True}

# Start all triggers and block until stopped
asyncio.run(connector.run())

Webhook Mode

from flask import Flask, request
from zerodb_temporal import ZeroDBTemporalConnector

app = Flask(__name__)
connector = ZeroDBTemporalConnector()

@connector.workflow('zerodb.file.uploaded')
def handle_upload(event):
    return {'processed': True}

@app.post('/webhook/zerodb')
def webhook():
    results = connector.process_webhook(request.json)
    return {'results': results}

API Reference

ZeroDBTrigger(event_type, **kwargs)

Subscribe to a specific ZeroDB event type.

| Param | Default | Description |
|---|---|---|
| event_type | required | Event type to listen for |
| api_key | auto | ZeroDB API key |
| project_id | auto | ZeroDB project ID |
| poll_interval | 5 | Seconds between polls |
| batch_size | 100 | Max events per poll |

Methods:

| Method | Description |
|---|---|
| @trigger.on_event | Decorator to register handler |
| trigger.add_handler(func) | Register handler (non-decorator) |
| trigger.start() | Start polling in background thread |
| trigger.stop() | Stop polling |
| trigger.process_webhook(payload) | Process a webhook payload |
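
To make the poll_interval and batch_size parameters concrete, here is a minimal sketch of the kind of loop they suggest. It is not the library's implementation: `poll_loop` and `fetch_events` are hypothetical names, and the real trigger may fetch, order, or acknowledge events differently.

```python
import threading
from typing import Callable, Iterable


def poll_loop(
    fetch_events: Callable[[int], Iterable[dict]],  # hypothetical: returns up to N events
    handler: Callable[[dict], None],
    stop: threading.Event,
    poll_interval: float = 5.0,
    batch_size: int = 100,
) -> None:
    """Fetch up to batch_size events, pass each to handler, then wait."""
    while not stop.is_set():
        for event in fetch_events(batch_size):
            handler(event)
        # wait() returns as soon as stop is set, so shutdown does not
        # have to sit out a full poll interval.
        stop.wait(poll_interval)
```

Running such a loop in a `threading.Thread` and calling `stop.set()` to end it would match the "start in background thread" and "stop polling" behavior described in the table.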

ZeroDBActivity(**kwargs)

Temporal activity helpers for ZeroDB operations.

| Method | Description |
|---|---|
| store_result(data, table='workflow_results') | Store workflow result in table |
| query_table(table, filters=None) | Query rows from table |
| store_memory(content, namespace='workflow') | Store memory via ZeroMemory |
| recall_memory(query, namespace='workflow') | Search memories |
| upload_file(path, file_data) | Upload file to storage |
| download_file(path) | Download file from storage |
| create_event(event_type, data) | Emit a new ZeroDB event |
| store_vector(id, values, metadata) | Store vector embedding |
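
When unit-testing handlers, it can help to swap the activity helper for an in-memory stand-in so that no network calls are made. The sketch below assumes only the two method signatures listed above. `FakeActivity`, its return values, and the `handle_upload` handler are illustrative and not part of zerodb-temporal.

```python
class FakeActivity:
    """In-memory stand-in that records calls instead of contacting ZeroDB."""

    def __init__(self):
        self.results = []   # (table, data) pairs passed to store_result
        self.memories = []  # (namespace, content) pairs passed to store_memory

    def store_result(self, data, table="workflow_results"):
        self.results.append((table, data))
        return {"table": table, "stored": True}  # return shape is made up

    def store_memory(self, content, namespace="workflow"):
        self.memories.append((namespace, content))
        return {"namespace": namespace, "stored": True}  # return shape is made up


def handle_upload(event_data, activity):
    """Example handler that receives its activity helper as an argument."""
    activity.store_result(
        {"file_id": event_data.get("file_id"), "status": "processed"}
    )
    return {"processed": True}
```

Passing the helper in as an argument, rather than constructing it inside the handler, is what makes the substitution possible.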

ZeroDBTemporalConnector(**kwargs)

High-level connector managing multiple triggers.

| Param | Default | Description |
|---|---|---|
| temporal_host | localhost:7233 | Temporal server address |
| task_queue | zerodb-events | Temporal task queue |
| namespace | default | Temporal namespace |
Methods:

| Method | Description |
|---|---|
| @connector.workflow(event_type) | Register workflow for event type |
| connector.start() | Start all triggers |
| connector.stop() | Stop all triggers |
| await connector.run() | Run blocking (async) |
| connector.process_webhook(payload) | Route webhook to correct workflow |
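
The routing idea behind `@connector.workflow` and `process_webhook` can be illustrated with a toy registry. This is a sketch of the pattern only: `MiniRouter` is a hypothetical class, and the assumption that a webhook payload carries its type under an `event_type` key is not confirmed by the package documentation.

```python
from typing import Callable, Dict, List


class MiniRouter:
    """Toy registry that maps event-type strings to handler functions."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Callable[[dict], object]]] = {}

    def workflow(self, event_type: str):
        """Decorator that registers a handler for one event type."""
        def register(func):
            self._handlers.setdefault(event_type, []).append(func)
            return func
        return register

    def process_webhook(self, payload: dict) -> list:
        # Assumes the payload names its type under "event_type".
        handlers = self._handlers.get(payload.get("event_type"), [])
        return [handler(payload) for handler in handlers]
```

Payloads with an unregistered event type produce an empty result list in this sketch; the real connector may log or raise instead.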

ZeroDBEvent

Event data wrapper.

| Field | Type | Description |
|---|---|---|
| event_type | str | Event type string |
| event_id | str | Unique event ID |
| data | dict | Event payload |
| timestamp | str | ISO 8601 timestamp |
| project_id | str | ZeroDB project ID |
| metadata | dict | Additional metadata |
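
For readers who want to build events by hand (for example in tests), the fields above map naturally onto a dataclass. The sketch below is a stand-in with the same field names, not the package's own class. `EventSketch`, its defaults, and the `from_payload` helper are assumptions made for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class EventSketch:
    """Illustrative stand-in with the same field names as ZeroDBEvent."""

    event_type: str
    event_id: str
    data: dict = field(default_factory=dict)
    timestamp: str = ""   # ISO 8601 string, kept unparsed here
    project_id: str = ""
    metadata: dict = field(default_factory=dict)

    @classmethod
    def from_payload(cls, payload: dict) -> "EventSketch":
        # Hypothetical helper: keeps known fields, ignores unknown keys.
        known = {k: payload[k] for k in cls.__dataclass_fields__ if k in payload}
        return cls(**known)
```

A handler written against `event.data.get(...)` and `event.event_id` would then work with either this stand-in or the real event object, as long as the field names match.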

EventType Constants

| Constant | Value |
|---|---|
| TABLE_INSERT | zerodb.table.insert |
| TABLE_UPDATE | zerodb.table.update |
| TABLE_DELETE | zerodb.table.delete |
| FILE_UPLOADED | zerodb.file.uploaded |
| FILE_DELETED | zerodb.file.deleted |
| VECTOR_UPSERT | zerodb.vector.upsert |
| MEMORY_REMEMBER | zerodb.memory.remember |
| HOOK_INVOKED | zerodb.hook.invoked |
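
The values above all follow a `zerodb.<resource>.<action>` shape, which makes it easy to group events by resource. The sketch below mirrors the documented constants locally rather than importing them, since the import path is not shown here. `EventTypeSketch` and `split_event_type` are hypothetical names.

```python
class EventTypeSketch:
    """Local mirror of the documented constants (not imported from the package)."""

    TABLE_INSERT = "zerodb.table.insert"
    TABLE_UPDATE = "zerodb.table.update"
    TABLE_DELETE = "zerodb.table.delete"
    FILE_UPLOADED = "zerodb.file.uploaded"
    FILE_DELETED = "zerodb.file.deleted"
    VECTOR_UPSERT = "zerodb.vector.upsert"
    MEMORY_REMEMBER = "zerodb.memory.remember"
    HOOK_INVOKED = "zerodb.hook.invoked"


def split_event_type(event_type: str) -> tuple:
    """Split 'zerodb.<resource>.<action>' into (resource, action)."""
    prefix, resource, action = event_type.split(".", 2)
    if prefix != "zerodb":
        raise ValueError(f"not a ZeroDB event type: {event_type!r}")
    return resource, action
```

Using named constants instead of string literals in handler registrations helps catch typos in event-type strings early.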

Configuration

Environment Variables

export ZERODB_API_KEY="your-api-key"
export ZERODB_PROJECT_ID="your-project-id"
# Optional
export ZERODB_BASE_URL="https://api.ainative.studio"

Auto-Provisioning

If no credentials are found, zerodb-temporal automatically creates a free ZeroDB project on first use. Credentials are saved to ~/.zerodb/config.json.
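
The lookup order described here can be sketched as follows. This is an illustration under stated assumptions, not the library's code: `resolve_credentials` is a hypothetical function, the precedence of environment variables over the config file is assumed, and the key names inside config.json (`api_key`, `project_id`) are guesses.

```python
import json
import os
from pathlib import Path
from typing import Optional


def resolve_credentials(config_path: Optional[Path] = None) -> Optional[dict]:
    """Return credentials from the environment, else the config file, else None."""
    api_key = os.environ.get("ZERODB_API_KEY")
    project_id = os.environ.get("ZERODB_PROJECT_ID")
    if api_key and project_id:
        return {"api_key": api_key, "project_id": project_id}

    path = config_path or Path.home() / ".zerodb" / "config.json"
    if path.is_file():
        # Key names inside the file are assumed, not documented.
        saved = json.loads(path.read_text())
        if saved.get("api_key") and saved.get("project_id"):
            return {"api_key": saved["api_key"], "project_id": saved["project_id"]}

    # Nothing found: this is the point where auto-provisioning would start.
    return None
```

A check like this can be handy in deployment scripts to confirm which credentials a process will pick up before any project is provisioned on its behalf.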

Use Cases

ETL Pipeline

@connector.workflow('zerodb.file.uploaded')
async def etl_pipeline(event):
    # Download uploaded file
    raw = connector.activity.download_file(event.data['path'])

    # Transform (transform() is your own function; it is not part of zerodb-temporal)
    processed = transform(raw)

    # Store result
    connector.activity.store_result({
        'source': event.data['path'],
        'rows_processed': len(processed),
    })
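
The ETL snippet calls a `transform` function that the reader has to supply. As one possible shape for it, the sketch below assumes that `download_file` returns UTF-8 encoded CSV bytes; both the return type and the CSV format are assumptions made for illustration.

```python
import csv
import io


def transform(raw: bytes) -> list:
    """Parse CSV bytes into a list of row dicts, dropping fully empty rows."""
    text = raw.decode("utf-8")
    reader = csv.DictReader(io.StringIO(text))
    return [
        row
        for row in reader
        # Keep a row only if at least one cell has non-whitespace text.
        if any(isinstance(value, str) and value.strip() for value in row.values())
    ]
```

Because the result is a list, `len(processed)` in the pipeline reports the number of non-empty rows.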

AI Agent Memory

@connector.workflow('zerodb.table.insert')
async def enrich_memory(event):
    row = event.data.get('row', {})
    connector.activity.store_memory(
        f"User {row.get('name')} registered from {row.get('source')}",
        entity_id=row.get('user_id'),
        tags=['registration', 'user-event'],
    )

Event Chain

@connector.workflow('zerodb.hook.completed')
async def chain_next(event):
    # Trigger next step in pipeline
    connector.activity.create_event(
        'zerodb.custom',
        {'step': 'next', 'previous_result': event.data},
    )

Built by AINative Studio

Free database for AI agents. Auto-provisions in 200ms.


License

MIT
