Temporal Activities for AgenTrux event operations
Project description
temporal-agentrux
Temporal Activities for AgenTrux event operations.
Status: Beta
Install
pip install -e plugins/temporal
Environment Variables
| Variable | Required | Description |
|---|---|---|
AGENTRUX_BASE_URL |
Yes | AgenTrux server URL |
AGENTRUX_SCRIPT_ID |
Yes | Script ID for authentication |
AGENTRUX_CLIENT_SECRET |
Yes | Client Secret for authentication |
AGENTRUX_TIMEOUT_S |
No | HTTP timeout in seconds (default: 30) |
Secrets are read from environment variables at worker startup. They are never passed as activity arguments.
Worker Setup
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from temporal_agentrux.worker import init_client, shutdown_client, get_activities
async def main():
# Initialize the shared AgenTrux client (reads env vars, obtains JWT)
await init_client()
# Connect to Temporal
temporal_client = await Client.connect("localhost:7233")
# Start the worker with AgenTrux activities registered
worker = Worker(
temporal_client,
task_queue="agentrux-queue",
activities=get_activities(),
)
try:
await worker.run()
finally:
await shutdown_client()
asyncio.run(main())
Activities
publish_event
Publish an event to an AgenTrux topic.
- Input:
PublishInput(topic_id, event_type, payload) - Output:
PublishResult(event_id)
list_events_activity
List events from a topic with optional filtering and pagination.
- Input:
ListEventsInput(topic_id, limit=50, cursor=None, event_type=None) - Output:
ListEventsResult(events, next_cursor)
get_event_activity
Get a single event by ID.
- Input:
GetEventInput(topic_id, event_id) - Output:
dictwith event fields
wait_for_event
Subscribe via SSE and wait for a matching event. Sends periodic heartbeats to Temporal to prevent activity timeout.
- Input:
WaitInput(topic_id, event_type=None, timeout_seconds=300, heartbeat_interval_seconds=10) - Output:
WaitResult(found, event)
Example Workflow
from datetime import timedelta
from temporalio import workflow
with workflow.unsafe.imports_passed_through():
from temporal_agentrux import (
PublishInput, PublishResult,
WaitInput, WaitResult,
)
@workflow.defn
class EventPipeline:
@workflow.run
async def run(self, topic_id: str) -> str:
# Publish an event
result = await workflow.execute_activity(
"publish_event",
PublishInput(topic_id=topic_id, event_type="pipeline.started", payload={"step": 1}),
start_to_close_timeout=timedelta(seconds=30),
)
# Wait for a response event
wait_result = await workflow.execute_activity(
"wait_for_event",
WaitInput(topic_id=topic_id, event_type="pipeline.completed", timeout_seconds=120),
start_to_close_timeout=timedelta(seconds=150),
heartbeat_timeout=timedelta(seconds=30),
)
if wait_result.found:
return f"Completed: {wait_result.event.event_id}"
return "Timed out waiting for completion"
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file temporal_agentrux-0.1.0b1.tar.gz.
File metadata
- Download URL: temporal_agentrux-0.1.0b1.tar.gz
- Upload date:
- Size: 5.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
fa30ebcb93eb203969e317a3da0f34aadd1d8898a7e96e48cfc087dd7fa70be9
|
|
| MD5 |
79d6d314b071e9282a8cb01cf2253021
|
|
| BLAKE2b-256 |
79a464626a15109032d5a9ef1389c7a1d727995fcc7754db03d1823911497dac
|
File details
Details for the file temporal_agentrux-0.1.0b1-py3-none-any.whl.
File metadata
- Download URL: temporal_agentrux-0.1.0b1-py3-none-any.whl
- Upload date:
- Size: 6.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
5ff6eef68606765625324d4330b83e6f038d3cfa663cb17aab74c852eeacb247
|
|
| MD5 |
f92f991e39c2c27659cb3370dbcd3876
|
|
| BLAKE2b-256 |
c0f28521cb3d9e0e84711cbc1a8943f75ce2f572e8a13f2d498da426aa96ce3f
|