Skip to main content

Temporal Activities for AgenTrux event operations

Project description

temporal-agentrux

Temporal Activities for AgenTrux event operations.

Status: Beta

Install

pip install -e plugins/temporal

Environment Variables

Variable Required Description
AGENTRUX_BASE_URL Yes AgenTrux server URL
AGENTRUX_SCRIPT_ID Yes Script ID for authentication
AGENTRUX_CLIENT_SECRET Yes Client Secret for authentication
AGENTRUX_TIMEOUT_S No HTTP timeout in seconds (default: 30)

Secrets are read from environment variables at worker startup. They are never passed as activity arguments.

Worker Setup

import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from temporal_agentrux.worker import init_client, shutdown_client, get_activities

async def main():
    # Initialize the shared AgenTrux client (reads env vars, obtains JWT)
    await init_client()

    # Connect to Temporal
    temporal_client = await Client.connect("localhost:7233")

    # Start the worker with AgenTrux activities registered
    worker = Worker(
        temporal_client,
        task_queue="agentrux-queue",
        activities=get_activities(),
    )
    try:
        await worker.run()
    finally:
        await shutdown_client()

asyncio.run(main())

Activities

publish_event

Publish an event to an AgenTrux topic.

  • Input: PublishInput(topic_id, event_type, payload)
  • Output: PublishResult(event_id)

list_events_activity

List events from a topic with optional filtering and pagination.

  • Input: ListEventsInput(topic_id, limit=50, cursor=None, event_type=None)
  • Output: ListEventsResult(events, next_cursor)

get_event_activity

Get a single event by ID.

  • Input: GetEventInput(topic_id, event_id)
  • Output: dict with event fields

wait_for_event

Subscribe via SSE and wait for a matching event. Sends periodic heartbeats to Temporal to prevent activity timeout.

  • Input: WaitInput(topic_id, event_type=None, timeout_seconds=300, heartbeat_interval_seconds=10)
  • Output: WaitResult(found, event)

Example Workflow

from datetime import timedelta
from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from temporal_agentrux import (
        PublishInput, PublishResult,
        WaitInput, WaitResult,
    )

@workflow.defn
class EventPipeline:
    @workflow.run
    async def run(self, topic_id: str) -> str:
        # Publish an event
        result = await workflow.execute_activity(
            "publish_event",
            PublishInput(topic_id=topic_id, event_type="pipeline.started", payload={"step": 1}),
            start_to_close_timeout=timedelta(seconds=30),
        )

        # Wait for a response event
        wait_result = await workflow.execute_activity(
            "wait_for_event",
            WaitInput(topic_id=topic_id, event_type="pipeline.completed", timeout_seconds=120),
            start_to_close_timeout=timedelta(seconds=150),
            heartbeat_timeout=timedelta(seconds=30),
        )

        if wait_result.found:
            return f"Completed: {wait_result.event.event_id}"
        return "Timed out waiting for completion"

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

temporal_agentrux-0.1.0b2.tar.gz (5.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

temporal_agentrux-0.1.0b2-py3-none-any.whl (6.5 kB view details)

Uploaded Python 3

File details

Details for the file temporal_agentrux-0.1.0b2.tar.gz.

File metadata

  • Download URL: temporal_agentrux-0.1.0b2.tar.gz
  • Upload date:
  • Size: 5.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.13

File hashes

Hashes for temporal_agentrux-0.1.0b2.tar.gz
Algorithm Hash digest
SHA256 3d8608aa9a1cdb60481b567470ba0c015891932d8e2783133c39ca3002ac200a
MD5 f0c1e597e2b31443f51c62fec1c99091
BLAKE2b-256 4fc912db8c6a65b216e6b21d584ee3d1b1da5ba30334bfc896d33fef9cd136bf

See more details on using hashes here.

File details

Details for the file temporal_agentrux-0.1.0b2-py3-none-any.whl.

File metadata

File hashes

Hashes for temporal_agentrux-0.1.0b2-py3-none-any.whl
Algorithm Hash digest
SHA256 1f40da61217eace04a4648478052f90eeaf478bbaf016e57923ad0c56ab0d5c5
MD5 9f86df0101e7c405ef5bd31278e6d044
BLAKE2b-256 fc053209c8bf89ff4afb3fe5412987a100aaa647b25b103904076dd5a5a560ad

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page