temporal-agentrux

Temporal Activities for AgenTrux event operations.

Status: Beta

Install

pip install -e plugins/temporal

Environment Variables

Variable                Required  Description
AGENTRUX_BASE_URL       Yes       AgenTrux server URL
AGENTRUX_SCRIPT_ID      Yes       Script ID for authentication
AGENTRUX_CLIENT_SECRET  Yes       Client Secret for authentication
AGENTRUX_TIMEOUT_S      No        HTTP timeout in seconds (default: 30)

Secrets are read from environment variables at worker startup. They are never passed as activity arguments, so they are not recorded in Temporal workflow history.
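As a rough illustration of how these variables could be consumed, the sketch below reads them into a small settings object and fails fast when a required one is missing. `AgenTruxSettings` and `load_settings` are hypothetical names made up for this example and are not part of temporal-agentrux; only the variable names and the 30-second default come from the table above.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class AgenTruxSettings:
    """Hypothetical container for the connection settings (not the library's type)."""
    base_url: str
    script_id: str
    client_secret: str
    timeout_s: float = 30.0


def load_settings(env: Optional[Mapping[str, str]] = None) -> AgenTruxSettings:
    """Read the documented variables, raising if a required one is missing or empty."""
    env = os.environ if env is None else env
    required = ("AGENTRUX_BASE_URL", "AGENTRUX_SCRIPT_ID", "AGENTRUX_CLIENT_SECRET")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return AgenTruxSettings(
        base_url=env["AGENTRUX_BASE_URL"],
        script_id=env["AGENTRUX_SCRIPT_ID"],
        client_secret=env["AGENTRUX_CLIENT_SECRET"],
        # Optional variable; falls back to the documented default of 30 seconds.
        timeout_s=float(env.get("AGENTRUX_TIMEOUT_S", "30")),
    )
```

Passing a plain dict instead of relying on `os.environ` keeps the loader easy to exercise in tests without touching the real process environment.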

Worker Setup

import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from temporal_agentrux.worker import init_client, shutdown_client, get_activities

async def main():
    # Initialize the shared AgenTrux client (reads env vars, obtains JWT)
    await init_client()

    # Connect to Temporal
    temporal_client = await Client.connect("localhost:7233")

    # Start the worker with AgenTrux activities registered
    worker = Worker(
        temporal_client,
        task_queue="agentrux-queue",
        activities=get_activities(),
    )
    try:
        await worker.run()
    finally:
        await shutdown_client()

if __name__ == "__main__":
    asyncio.run(main())

Activities

publish_event

Publish an event to an AgenTrux topic.

  • Input: PublishInput(topic_id, event_type, payload)
  • Output: PublishResult(event_id)

list_events_activity

List events from a topic with optional filtering and pagination.

  • Input: ListEventsInput(topic_id, limit=50, cursor=None, event_type=None)
  • Output: ListEventsResult(events, next_cursor)

get_event_activity

Get a single event by ID.

  • Input: GetEventInput(topic_id, event_id)
  • Output: dict with event fields

wait_for_event

Subscribe via Server-Sent Events (SSE) and wait for a matching event. The activity sends periodic heartbeats to Temporal so that a long wait does not exceed the activity's heartbeat timeout.

  • Input: WaitInput(topic_id, event_type=None, timeout_seconds=300, heartbeat_interval_seconds=10)
  • Output: WaitResult(found, event)
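The wait-and-heartbeat pattern described above can be sketched with plain asyncio. This is not the library's implementation: `wait_with_heartbeat`, `next_event`, `matches`, and `heartbeat` are hypothetical names, and the event source is any awaitable rather than a real SSE stream. In an actual Temporal activity the `heartbeat` callable would typically be `temporalio.activity.heartbeat`.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def wait_with_heartbeat(
    next_event: Callable[[], Awaitable[Any]],
    matches: Callable[[Any], bool],
    heartbeat: Callable[[], None],
    timeout_seconds: float = 300,
    heartbeat_interval_seconds: float = 10,
) -> Optional[Any]:
    """Return the first matching event, or None once timeout_seconds has elapsed."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_seconds
    pending: Optional[asyncio.Future] = None
    try:
        while True:
            remaining = deadline - loop.time()
            if remaining <= 0:
                return None
            if pending is None:
                pending = asyncio.ensure_future(next_event())
            # Wake up at least once per heartbeat interval, even with no events.
            done, _ = await asyncio.wait(
                {pending}, timeout=min(heartbeat_interval_seconds, remaining)
            )
            heartbeat()
            if done:
                event = pending.result()
                pending = None
                if matches(event):
                    return event
    finally:
        # Do not leave a dangling read behind on timeout or cancellation.
        if pending is not None:
            pending.cancel()
```

Keeping a single pending read alive across heartbeat ticks, instead of cancelling and restarting it each interval, avoids dropping an event that arrives just as the interval expires.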

Example Workflow

from datetime import timedelta
from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from temporal_agentrux import (
        PublishInput, PublishResult,
        WaitInput, WaitResult,
    )

@workflow.defn
class EventPipeline:
    @workflow.run
    async def run(self, topic_id: str) -> str:
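        # Publish an event. Activities invoked by name carry no type hints,
        # so result_type is passed to get a typed result back (this assumes
        # PublishResult and WaitResult are dataclasses).
        result = await workflow.execute_activity(
            "publish_event",
            PublishInput(topic_id=topic_id, event_type="pipeline.started", payload={"step": 1}),
            start_to_close_timeout=timedelta(seconds=30),
            result_type=PublishResult,
        )

        # Wait for a response event. start_to_close_timeout exceeds the
        # activity's own 120 s wait, and heartbeat_timeout exceeds the
        # default 10 s heartbeat interval.
        wait_result = await workflow.execute_activity(
            "wait_for_event",
            WaitInput(topic_id=topic_id, event_type="pipeline.completed", timeout_seconds=120),
            start_to_close_timeout=timedelta(seconds=150),
            heartbeat_timeout=timedelta(seconds=30),
            result_type=WaitResult,
        )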

        if wait_result.found:
            return f"Completed: {wait_result.event.event_id}"
        return "Timed out waiting for completion"
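
The timeouts in the example follow a pattern: the Temporal `start_to_close_timeout` (150 s) is longer than the activity's own `timeout_seconds` (120 s), and the `heartbeat_timeout` (30 s) is a multiple of the default heartbeat interval (10 s). A small helper could derive both values. `wait_activity_timeouts` is a hypothetical function, and the 30-second slack and factor of 3 are assumptions chosen only because they reproduce the numbers in the example.

```python
from datetime import timedelta
from typing import Dict


def wait_activity_timeouts(
    timeout_seconds: float,
    heartbeat_interval_seconds: float = 10,
    slack_seconds: float = 30,    # assumed margin for connection setup and teardown
    heartbeat_factor: float = 3,  # assumed tolerance for a couple of late heartbeats
) -> Dict[str, timedelta]:
    """Derive Temporal timeouts that leave headroom around a wait_for_event call."""
    return {
        "start_to_close_timeout": timedelta(seconds=timeout_seconds + slack_seconds),
        "heartbeat_timeout": timedelta(
            seconds=heartbeat_interval_seconds * heartbeat_factor
        ),
    }
```

The returned dict can be unpacked into the call as `**wait_activity_timeouts(120)`, which keeps the two timeouts consistent when `timeout_seconds` changes.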
