Skip to main content

Python SDK for AxonPush — real-time event infrastructure for AI agent systems

Project description

axonpush

Python SDK for AxonPush — real-time event infrastructure for AI agent systems.

Publish, subscribe, trace, and deliver agent events with sub-100ms latency. Drop-in integrations for LangChain, OpenAI Agents SDK, Claude/Anthropic, CrewAI, and the Python observability stack (stdlib logging, Loguru, structlog, OpenTelemetry).

Install

pip install axonpush

With framework integrations:

pip install axonpush[langchain]       # LangChain/LangGraph
pip install axonpush[openai-agents]   # OpenAI Agents SDK
pip install axonpush[anthropic]       # Claude/Anthropic
pip install axonpush[crewai]          # CrewAI
pip install axonpush[deepagents]      # LangChain Deep Agents
pip install axonpush[rq]              # Redis Queue backend (python-rq)

With observability integrations:

pip install axonpush                  # stdlib logging — no extra deps
pip install axonpush[loguru]          # Loguru sink
pip install axonpush[structlog]       # structlog processor
pip install axonpush[otel]            # OpenTelemetry SpanExporter
pip install axonpush[all]             # Everything

Quick Start

from axonpush import AxonPush, EventType

with AxonPush(api_key="ak_...", tenant_id="1", environment="production") as client:
    # Publish an event
    event = client.events.publish(
        "web_search",
        {"query": "AI agent frameworks"},
        channel_id=1,
        agent_id="researcher",
        trace_id="tr_run_42",
        event_type=EventType.AGENT_TOOL_CALL_START,
    )
    # event.queued == True, event.id is None — publishes are async-ingested
    # by default. See "Response shape" below.

    # List events
    events = client.events.list(channel_id=1)

    # Get a trace summary
    summary = client.traces.get_summary("tr_run_42")

Async

from axonpush import AsyncAxonPush

async with AsyncAxonPush(api_key="ak_...", tenant_id="1", environment="production") as client:
    event = await client.events.publish(
        "web_search",
        {"query": "AI agents"},
        channel_id=1,
        agent_id="researcher",
        event_type="agent.tool_call.start",
    )

Response shape

By default, events.publish() returns as soon as the server has queued the event — typically under 1 ms. The returned Event carries identifier, queued=True, created_at, and the resolved environment_id, but not a DB-assigned id (event.id is None). Treat event.identifier and event.trace_id as the durable correlation keys. List endpoints and subscriptions return the fully-persisted shape (with id) once the event is written.

Framework Integrations

LangChain / LangGraph

# Sync (default — background thread)
from axonpush import AxonPush
from axonpush.integrations.langchain import AxonPushCallbackHandler

client = AxonPush(api_key="ak_...", tenant_id="1")
handler = AxonPushCallbackHandler(client, channel_id=1, agent_id="my-agent")
chain.invoke({"input": "..."}, config={"callbacks": [handler]})

# Async (default — fire-and-forget tasks, zero event-loop blocking)
from axonpush import AsyncAxonPush
from axonpush.integrations.langchain import get_langchain_handler

client = AsyncAxonPush(api_key="ak_...", tenant_id="1")
handler = get_langchain_handler(client, channel_id=1, agent_id="my-agent")
await chain.ainvoke({"input": "..."}, config={"callbacks": [handler]})

OpenAI Agents SDK

Events are published asynchronously via fire-and-forget tasks by default — no event-loop blocking.

from axonpush import AsyncAxonPush
from axonpush.integrations.openai_agents import AxonPushRunHooks

client = AsyncAxonPush(api_key="ak_...", tenant_id="1")
hooks = AxonPushRunHooks(client, channel_id=1)

result = await Runner.run(agent, input="...", hooks=hooks)
await hooks.flush()  # optional — drain pending publishes before exit

Claude / Anthropic

from axonpush import AxonPush
from axonpush.integrations.anthropic import AxonPushAnthropicTracer

client = AxonPush(api_key="ak_...", tenant_id="1")
tracer = AxonPushAnthropicTracer(client, channel_id=1)

# Wraps messages.create() — auto-emits events for tool_use, text, turns
response = tracer.create_message(
    anthropic_client,
    model="claude-sonnet-4-20250514",
    messages=[{"role": "user", "content": "Hello"}],
)

CrewAI

from axonpush import AxonPush
from axonpush.integrations.crewai import AxonPushCrewCallbacks

client = AxonPush(api_key="ak_...", tenant_id="1")
callbacks = AxonPushCrewCallbacks(client, channel_id=1)

callbacks.on_crew_start()
result = Crew(
    agents=[...],
    tasks=[...],
    step_callback=callbacks.on_step,
    task_callback=callbacks.on_task_complete,
).kickoff()
callbacks.on_crew_end(result)

Publishing Modes

All integrations accept a mode parameter to control how events reach AxonPush:

Mode Backend Best for
"background" (default) In-process queue (sync) or asyncio.create_task (async) Most apps — zero config
"rq" Redis Queue (python-rq) Durable delivery, serverless, high volume
"sync" Direct HTTP call Debugging, tests

Redis Queue mode

Offload event publishing to a separate worker process backed by Redis. Events survive app restarts and are retried on transient failures.

pip install axonpush[rq]
from redis import Redis
from axonpush import AxonPush
from axonpush.integrations.langchain import AxonPushCallbackHandler

client = AxonPush(api_key="ak_...", tenant_id="1")
handler = AxonPushCallbackHandler(
    client, channel_id=1,
    mode="rq",
    rq_options={"redis_conn": Redis(), "queue_name": "axonpush"},
)
chain.invoke({"input": "..."}, config={"callbacks": [handler]})

Start an rq worker to process the queue:

rq worker axonpush

Environments

Tag every event with the environment it came from ("production", "staging", "eval", or any string your team uses). AxonPush uses the tag server-side for isolation, filtering, and per-env quotas. The SDK forwards it as an X-Axonpush-Environment header on every request and threads it into the logging handler's OTel resource attributes.

Constructor

from axonpush import AxonPush

client = AxonPush(api_key="ak_...", tenant_id="1", environment="production")

If you omit environment=, the SDK auto-detects it from the first of these that's set: AXONPUSH_ENVIRONMENTSENTRY_ENVIRONMENTAPP_ENVENV. That ordering means existing Sentry/12-factor setups work out of the box, and you can override with AXONPUSH_ENVIRONMENT when you need to.

Per-call override

client.events.publish(
    "rerun_eval",
    {"dataset": "v2"},
    channel_id=1,
    environment="eval",   # this event only — doesn't change the client default
)

Temporary override with a context manager

Useful for isolating eval runs, backfills, or shadow traffic from your production event stream without constructing a second client:

with client.environment("eval"):
    for row in dataset:
        client.events.publish("row_processed", {"id": row.id}, channel_id=1)
# outside the block: environment reverts to whatever the client was constructed with

Logging & Observability

Ship logs and traces from your existing Python observability stack to AxonPush. All four integrations emit OpenTelemetry-shaped payloads, so the events line up with anything else you're already sending to an OTel-compatible backend.

Non-blocking by default (v0.0.7+). Sync integrations use a bounded in-memory queue + daemon thread. Async integrations use asyncio.create_task() fire-and-forget with backpressure (max 1000 pending tasks). For durable delivery, use mode="rq" to offload publishing to a Redis-backed worker process. Call handler.flush(timeout=) or use @flush_after_invocation(handler) at known checkpoints (end of a Lambda invocation, end of a test) to guarantee delivery. Pass mode="sync" on any integration if you need blocking publishes (one-shot scripts, deterministic tests). Fork-safe via os.register_at_fork — Gunicorn --preload / Celery --pool=prefork workers get a fresh queue + thread after fork.

Self-recursion filter. The stdlib AxonPushLoggingHandler installs a filter by default that drops records from httpx, httpcore, and the SDK's own axonpush logger. Without it, each publish would trigger an httpx INFO log ("HTTP Request: POST /event 201 Created") that would get re-shipped, creating an infinite loop. The filter is always-on and cannot be disabled; you can add more excluded prefixes via exclude_loggers=[...].

Stdlib logging (FastAPI, Flask, Django, …)

import logging
from axonpush import AxonPush
from axonpush.integrations.logging_handler import AxonPushLoggingHandler

client = AxonPush(api_key="ak_...", tenant_id="1")
handler = AxonPushLoggingHandler(client=client, channel_id=1, service_name="my-api")

logging.getLogger().addHandler(handler)
logging.info("order created", extra={"order_id": 1234})

Django uses LOGGING dictConfig, which can't pass a pre-built client — so the handler also accepts credential kwargs (or reads AXONPUSH_API_KEY / AXONPUSH_TENANT_ID from the environment):

# settings.py
LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {
        "axonpush": {
            "class": "axonpush.integrations.logging_handler.AxonPushLoggingHandler",
            "channel_id": 1,
            "service_name": "my-django-app",
            "exclude_loggers": ["django.db.backends"],  # optional
        },
    },
    "root": {"handlers": ["axonpush"], "level": "INFO"},
}

FastAPI / Flask — construct the handler with a pre-built client= in your app startup and attach it to logging.getLogger() (or app.logger for Flask).

Uvicorn propagation trap (FastAPI/Starlette): uvicorn's default LOGGING_CONFIG sets uvicorn.propagate=False, so records emitted on logging.getLogger("uvicorn.error") never reach the root logger. If you only attach the handler to root, your app's startup/request logs will be invisible to AxonPush. Also attach the handler to uvicorn.error directly:

logging.getLogger().addHandler(axonpush_handler)
logging.getLogger("uvicorn.error").addHandler(axonpush_handler)
# Optional: one event per HTTP request
# logging.getLogger("uvicorn.access").addHandler(axonpush_handler)

AWS Lambda / Google Cloud Functions / Azure Functions

Serverless containers are frozen between invocations, so the background worker thread can't drain the queue during the freeze. To guarantee delivery, call handler.flush() at the end of each invocation. The @flush_after_invocation decorator wraps your handler function and flushes in a finally: block:

import os, logging
from axonpush import AxonPush
from axonpush.integrations.logging_handler import (
    AxonPushLoggingHandler,
    flush_after_invocation,
)

client = AxonPush(
    api_key=os.environ["AXONPUSH_API_KEY"],
    tenant_id=os.environ["AXONPUSH_TENANT_ID"],
)
handler = AxonPushLoggingHandler(client=client, channel_id=1, service_name="my-lambda")
logging.getLogger().addHandler(handler)
logging.getLogger().setLevel(logging.INFO)

@flush_after_invocation(handler)
def lambda_handler(event, context):
    logging.info("processing event", extra={"event_id": event["id"]})
    return {"statusCode": 200}

Performance stays good: emit() is still O(microseconds) (just a queue enqueue), and flush() runs once per invocation at the end — not once per log call. The handler auto-detects Lambda / GCF / Azure Functions at construction time and logs a one-time reminder to use flush_after_invocation.

Pass *handlers to the decorator to flush multiple handlers in one wrap:

@flush_after_invocation(logging_handler, otel_exporter, structlog_processor)
def lambda_handler(event, context):
    ...

Loguru

from loguru import logger
from axonpush import AxonPush
from axonpush.integrations.loguru import create_axonpush_loguru_sink

client = AxonPush(api_key="ak_...", tenant_id="1")
sink = create_axonpush_loguru_sink(client=client, channel_id=1, service_name="my-api")
logger.add(sink, serialize=True)  # serialize=True is required

logger.error("connection refused", user_id=42)

structlog

import structlog
from axonpush import AxonPush
from axonpush.integrations.structlog import axonpush_structlog_processor

client = AxonPush(api_key="ak_...", tenant_id="1")
forwarder = axonpush_structlog_processor(client=client, channel_id=1, service_name="my-api")

structlog.configure(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        forwarder,  # non-destructive — composes with other processors
        structlog.processors.JSONRenderer(),
    ],
)
structlog.get_logger().error("downstream timeout", endpoint="/search")

Print capture (stdout/stderr → AxonPush)

For AI agents that emit free-form output via print(). Patches sys.stdout / sys.stderr with a tee stream that still writes to the original console.

from axonpush import AxonPush
from axonpush.integrations.print_capture import setup_print_capture

client = AxonPush(api_key="ak_...", tenant_id="1")
handle = setup_print_capture(client, channel_id=1, agent_id="demo-agent")

print("agent starting")  # forwarded to AxonPush as an agent.log event
handle.unpatch()

OpenTelemetry

If your service is already instrumented with the OTel SDK, add AxonPushSpanExporter to your tracer provider and every span ships to AxonPush alongside whatever other backends you export to.

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from axonpush import AxonPush
from axonpush.integrations.otel import AxonPushSpanExporter

client = AxonPush(api_key="ak_...", tenant_id="1")
provider = TracerProvider()
provider.add_span_processor(
    BatchSpanProcessor(
        AxonPushSpanExporter(client=client, channel_id=1, service_name="my-api")
    )
)
trace.set_tracer_provider(provider)

Sentry

If your app is already using sentry-sdk, point it at AxonPush with a one-liner. install_sentry() builds a Sentry DSN from your AxonPush credentials and calls sentry_sdk.init(**kwargs) for you — errors captured anywhere in your app (including Sentry's Flask/FastAPI/Django/Celery instrumentations) flow into your AxonPush channel instead of Sentry's cloud.

pip install sentry-sdk   # axonpush does not bundle sentry-sdk
from axonpush import install_sentry

install_sentry(
    api_key="ak_...",
    channel_id=42,
    environment="production",
    release="my-app@1.2.3",
    # Any extra kwargs are forwarded to sentry_sdk.init() unchanged:
    traces_sample_rate=0.1,
    send_default_pii=False,
)

# That's it — sentry_sdk.capture_exception / capture_message now ship to AxonPush.

api_key, channel_id, and host fall back to AXONPUSH_API_KEY, AXONPUSH_CHANNEL_ID, and AXONPUSH_HOST (default api.axonpush.xyz) if omitted. environment uses the same auto-detect precedence as the client (AXONPUSH_ENVIRONMENTSENTRY_ENVIRONMENTAPP_ENVENV). If you need a fully-formed DSN instead, pass dsn="..." and the other args are ignored.

Real-Time Subscriptions

axonpush supports two real-time subscription mechanisms: SSE (Server-Sent Events) and WebSocket (Socket.IO).

SSE (Server-Sent Events)

SSE is the simplest way to consume events in real time — no extra dependencies required.

Subscribe to all events on a channel

from axonpush import AxonPush

with AxonPush(api_key="ak_...", tenant_id="1") as client:
    with client.channels.subscribe_sse(channel_id=1) as sub:
        for event in sub:
            print(event.agent_id, event.identifier, event.payload)

Subscribe to a specific event identifier

with client.channels.subscribe_event_sse(channel_id=1, event_identifier="web_search") as sub:
    for event in sub:
        print(event.payload)

Filter by agent, event type, or trace

All SSE methods accept optional filters to narrow the event stream:

with client.channels.subscribe_sse(
    channel_id=1,
    agent_id="researcher",
    event_type=EventType.AGENT_ERROR,
    trace_id="tr_run_42",
) as sub:
    for event in sub:
        print(f"[{event.agent_id}] {event.identifier}: {event.payload}")

WebSocket (Socket.IO)

WebSocket subscriptions are callback-based and support bidirectional communication (subscribe, publish, unsubscribe).

pip install axonpush[websocket]

Sync

ws = client.connect_websocket()
ws.on_event(lambda e: print(e.agent_id, e.payload))
ws.subscribe(channel_id=1, event_type="agent.tool_call.start")
ws.wait()  # blocks until disconnected

Async

ws = await async_client.connect_websocket()
ws.on_event(lambda e: print(e.agent_id, e.payload))
await ws.subscribe(channel_id=1, event_type="agent.tool_call.start")
await ws.wait()

Publish and unsubscribe via WebSocket

ws.publish(channel_id=1, identifier="status", payload={"step": "done"}, agent_id="worker")
ws.unsubscribe(channel_id=1)
ws.disconnect()

Use Case Guides

Step-by-step guides for common scenarios:

Resources

The client exposes Stripe-style resource objects:

Resource Methods
client.events publish(), list()
client.channels create(), get(), update(), delete(), subscribe_sse()
client.apps create(), get(), list(), update(), delete()
client.webhooks create_endpoint(), list_endpoints(), delete_endpoint(), get_deliveries()
client.traces list(), get_events(), get_summary()

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

axonpush-0.0.8.tar.gz (491.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

axonpush-0.0.8-py3-none-any.whl (59.1 kB view details)

Uploaded Python 3

File details

Details for the file axonpush-0.0.8.tar.gz.

File metadata

  • Download URL: axonpush-0.0.8.tar.gz
  • Upload date:
  • Size: 491.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.7 {"installer":{"name":"uv","version":"0.11.7","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for axonpush-0.0.8.tar.gz
Algorithm Hash digest
SHA256 9b1b17882f98b2ef435dfec0461c669ce3bbe58d98f84f4d75afff38a8836fda
MD5 4449b32fab425c87e834dc2e3ca023e2
BLAKE2b-256 940cd7d432a9ff497695c35d2e0d2e8e718b5d0a967df02a8f1c09bccdd71c6b

See more details on using hashes here.

File details

Details for the file axonpush-0.0.8-py3-none-any.whl.

File metadata

  • Download URL: axonpush-0.0.8-py3-none-any.whl
  • Upload date:
  • Size: 59.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.7 {"installer":{"name":"uv","version":"0.11.7","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for axonpush-0.0.8-py3-none-any.whl
Algorithm Hash digest
SHA256 bbc9dc559f458d12f8e7922e0e797370abfbeef04c2f02f49145247de59689cf
MD5 f77b98eb48e0279559ec9dfbe4ca8b36
BLAKE2b-256 0ba9fcbd96b634f15eba2fe21154b39b884b1cec8252d9bcaf5fbdf7e84fea97

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page