Python SDK for AxonPush — real-time event infrastructure for AI agent systems

axonpush

Publish, subscribe, trace, and deliver agent events with sub-100ms latency. Drop-in integrations for LangChain, OpenAI Agents SDK, Claude/Anthropic, CrewAI, and the Python observability stack (stdlib logging, Loguru, structlog, OpenTelemetry).

Install

pip install axonpush

With framework integrations:

pip install axonpush[langchain]       # LangChain/LangGraph
pip install axonpush[openai-agents]   # OpenAI Agents SDK
pip install axonpush[anthropic]       # Claude/Anthropic
pip install axonpush[crewai]          # CrewAI
pip install axonpush[deepagents]      # LangChain Deep Agents

With observability integrations:

pip install axonpush                  # stdlib logging — no extra deps
pip install axonpush[loguru]          # Loguru sink
pip install axonpush[structlog]       # structlog processor
pip install axonpush[otel]            # OpenTelemetry SpanExporter
pip install axonpush[all]             # Everything

Quick Start

from axonpush import AxonPush, EventType

with AxonPush(api_key="ak_...", tenant_id="1") as client:
    # Publish an event
    event = client.events.publish(
        "web_search",
        {"query": "AI agent frameworks"},
        channel_id=1,
        agent_id="researcher",
        trace_id="tr_run_42",
        event_type=EventType.AGENT_TOOL_CALL_START,
    )

    # List events
    events = client.events.list(channel_id=1)

    # Get a trace summary
    summary = client.traces.get_summary("tr_run_42")

Async

import asyncio

from axonpush import AsyncAxonPush


async def main():
    async with AsyncAxonPush(api_key="ak_...", tenant_id="1") as client:
        event = await client.events.publish(
            "web_search",
            {"query": "AI agents"},
            channel_id=1,
            agent_id="researcher",
            event_type="agent.tool_call.start",  # event types may also be passed as strings
        )


asyncio.run(main())
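
The two snippets above pass the event type once as an enum member (EventType.AGENT_TOOL_CALL_START) and once as a plain string ("agent.tool_call.start"). Assuming both name the same event type, one common way to make the two spellings interchangeable is a str-based Enum. The sketch below is illustrative only: SketchEventType and normalize_event_type are hypothetical names, and the SDK's own EventType may be implemented differently.

```python
from enum import Enum


class SketchEventType(str, Enum):
    """Hypothetical stand-in for an event-type enum (not axonpush's own class)."""

    AGENT_TOOL_CALL_START = "agent.tool_call.start"


def normalize_event_type(value):
    """Accept an enum member or its string value and return the wire string."""
    # Enum lookup by value accepts both forms and raises ValueError for unknown strings.
    return SketchEventType(value).value


print(normalize_event_type(SketchEventType.AGENT_TOOL_CALL_START))
print(normalize_event_type("agent.tool_call.start"))
```

With this pattern a typo in a string event type fails fast with ValueError instead of being published under an unknown type.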

Framework Integrations

LangChain / LangGraph

from axonpush import AxonPush
from axonpush.integrations.langchain import AxonPushCallbackHandler

client = AxonPush(api_key="ak_...", tenant_id="1")
handler = AxonPushCallbackHandler(client, channel_id=1, agent_id="my-agent")

# All chain/tool/LLM events are published automatically
chain.invoke({"input": "..."}, config={"callbacks": [handler]})

OpenAI Agents SDK

from agents import Runner  # OpenAI Agents SDK
from axonpush import AsyncAxonPush
from axonpush.integrations.openai_agents import AxonPushRunHooks

client = AsyncAxonPush(api_key="ak_...", tenant_id="1")
hooks = AxonPushRunHooks(client, channel_id=1)

# Run inside an async function; `agent` is your existing agents.Agent instance
result = await Runner.run(agent, input="...", hooks=hooks)

Claude / Anthropic

import anthropic

from axonpush import AxonPush
from axonpush.integrations.anthropic import AxonPushAnthropicTracer

anthropic_client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the environment
client = AxonPush(api_key="ak_...", tenant_id="1")
tracer = AxonPushAnthropicTracer(client, channel_id=1)

# Wraps messages.create() and auto-emits events for tool_use, text, turns
response = tracer.create_message(
    anthropic_client,
    model="claude-sonnet-4-20250514",
    messages=[{"role": "user", "content": "Hello"}],
)

CrewAI

from crewai import Crew

from axonpush import AxonPush
from axonpush.integrations.crewai import AxonPushCrewCallbacks

client = AxonPush(api_key="ak_...", tenant_id="1")
callbacks = AxonPushCrewCallbacks(client, channel_id=1)

# Crew start/end are signalled manually; steps and tasks go through Crew's callbacks
callbacks.on_crew_start()
result = Crew(
    agents=[...],
    tasks=[...],
    step_callback=callbacks.on_step,
    task_callback=callbacks.on_task_complete,
).kickoff()
callbacks.on_crew_end(result)

Logging & Observability

Ship logs and traces from your existing Python observability stack to AxonPush. All four integrations emit OpenTelemetry-shaped payloads, so the events line up with anything else you're already sending to an OTel-compatible backend.

The stdlib AxonPushLoggingHandler always installs a self-recursion filter that drops records from httpx, httpcore, and the SDK's own axonpush logger. Without it, each publish would trigger an httpx INFO log ("HTTP Request: POST /event 201 Created") that the handler would ship as a new event, which would in turn trigger another log, creating an infinite loop. The filter cannot be disabled, but you can exclude additional logger prefixes via exclude_loggers=[...].
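
To make the recursion guard concrete, here is a minimal stdlib-only sketch of a prefix-based logging filter. It is not the SDK's implementation: PrefixExcludeFilter, DEFAULT_EXCLUDED, and make_record are hypothetical names, and matching on dotted logger hierarchy (rather than raw string prefix) is an assumption.

```python
import logging

# Logger names mentioned in the text above; the tuple itself belongs to this sketch.
DEFAULT_EXCLUDED = ("httpx", "httpcore", "axonpush")


class PrefixExcludeFilter(logging.Filter):
    """Drop records whose logger name equals or sits under an excluded prefix."""

    def __init__(self, extra_prefixes=()):
        super().__init__()
        self.prefixes = DEFAULT_EXCLUDED + tuple(extra_prefixes)

    def filter(self, record):
        name = record.name
        # Matches "httpx" and "httpx.client" but not an unrelated "httpxtra".
        return not any(name == p or name.startswith(p + ".") for p in self.prefixes)


def make_record(logger_name):
    """Build a bare INFO record for demonstration purposes."""
    return logging.LogRecord(logger_name, logging.INFO, "sketch.py", 0, "msg", None, None)


f = PrefixExcludeFilter(extra_prefixes=["django.db.backends"])
print(f.filter(make_record("httpx")))         # False: dropped
print(f.filter(make_record("myapp.orders")))  # True: kept
```

A filter like this attaches to a handler with handler.addFilter(...), so only that handler skips the excluded loggers while other handlers still see them.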

Stdlib logging (FastAPI, Flask, Django, …)

import logging
from axonpush import AxonPush
from axonpush.integrations.logging_handler import AxonPushLoggingHandler

client = AxonPush(api_key="ak_...", tenant_id="1")
handler = AxonPushLoggingHandler(client=client, channel_id=1, service_name="my-api")

logging.getLogger().addHandler(handler)
logging.info("order created", extra={"order_id": 1234})

Django uses LOGGING dictConfig, which can't pass a pre-built client — so the handler also accepts credential kwargs (or reads AXONPUSH_API_KEY / AXONPUSH_TENANT_ID from the environment):

# settings.py
LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {
        "axonpush": {
            "class": "axonpush.integrations.logging_handler.AxonPushLoggingHandler",
            "channel_id": 1,
            "service_name": "my-django-app",
            "exclude_loggers": ["django.db.backends"],  # optional
        },
    },
    "root": {"handlers": ["axonpush"], "level": "INFO"},
}
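
The fallback from credential kwargs to environment variables described above can be sketched as follows. This is only an illustration: resolve_credentials is a hypothetical helper, and the precedence shown (explicit kwargs first, then AXONPUSH_API_KEY / AXONPUSH_TENANT_ID) is an assumption about how the handler behaves.

```python
import os


def resolve_credentials(api_key=None, tenant_id=None, environ=None):
    """Return (api_key, tenant_id), preferring explicit kwargs over the environment."""
    env = os.environ if environ is None else environ
    api_key = api_key or env.get("AXONPUSH_API_KEY")
    tenant_id = tenant_id or env.get("AXONPUSH_TENANT_ID")
    missing = [n for n, v in (("api_key", api_key), ("tenant_id", tenant_id)) if not v]
    if missing:
        raise ValueError(f"missing AxonPush credentials: {', '.join(missing)}")
    return api_key, tenant_id


# Explicit kwargs win; anything left unset falls back to the environment mapping.
fake_env = {"AXONPUSH_API_KEY": "ak_from_env", "AXONPUSH_TENANT_ID": "1"}
print(resolve_credentials(environ=fake_env))
print(resolve_credentials(api_key="ak_explicit", environ=fake_env))
```

Keeping credentials in the environment means settings.py never has to contain the API key itself.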

FastAPI / Flask — construct the handler with a pre-built client= in your app startup and attach it to logging.getLogger() (or app.logger for Flask).

Loguru

from loguru import logger
from axonpush import AxonPush
from axonpush.integrations.loguru import create_axonpush_loguru_sink

client = AxonPush(api_key="ak_...", tenant_id="1")
sink = create_axonpush_loguru_sink(client=client, channel_id=1, service_name="my-api")
logger.add(sink, serialize=True)  # serialize=True is required

logger.error("connection refused", user_id=42)
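
With serialize=True, Loguru hands the sink a JSON string (with "text" and "record" keys) instead of a formatted line, which is presumably why the AxonPush sink requires it. The sketch below shows how a sink could pull structured fields out of such a message. parse_serialized_message is a hypothetical helper, and the sample is hand-written and trimmed to the fields used here.

```python
import json


def parse_serialized_message(message):
    """Extract level, message, and extra fields from a serialize=True message."""
    record = json.loads(str(message))["record"]
    return {
        "level": record["level"]["name"],
        "message": record["message"],
        # Keyword arguments such as user_id=42 end up under "extra".
        "attributes": dict(record.get("extra", {})),
    }


# Hand-written sample; real Loguru records carry many more keys (time, file, ...).
sample = json.dumps(
    {
        "text": "connection refused\n",
        "record": {
            "level": {"name": "ERROR", "no": 40},
            "message": "connection refused",
            "extra": {"user_id": 42},
        },
    }
)
print(parse_serialized_message(sample))
```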

structlog

import structlog
from axonpush import AxonPush
from axonpush.integrations.structlog import axonpush_structlog_processor

client = AxonPush(api_key="ak_...", tenant_id="1")
forwarder = axonpush_structlog_processor(client=client, channel_id=1, service_name="my-api")

structlog.configure(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        forwarder,  # non-destructive — composes with other processors
        structlog.processors.JSONRenderer(),
    ],
)
structlog.get_logger().error("downstream timeout", endpoint="/search")
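
A structlog processor is a callable that takes (logger, method_name, event_dict) and returns the event dict for the next processor. The sketch below illustrates what "non-destructive" means in that chain: a copy is forwarded and the original dict is returned untouched. make_forwarding_processor is a hypothetical name, and this is not the SDK's processor.

```python
def make_forwarding_processor(forward):
    """Build a structlog-style processor that forwards a copy of each event dict."""

    def processor(logger, method_name, event_dict):
        # Forward a copy; a "level" key already in the dict overrides method_name.
        forward({"level": method_name, **event_dict})
        # Return the same dict unchanged so later processors (e.g. a renderer) still run.
        return event_dict

    return processor


captured = []
forwarder = make_forwarding_processor(captured.append)
event = {"event": "downstream timeout", "endpoint": "/search"}
result = forwarder(None, "error", event)
print(result is event, captured)
```

Because the processor returns the event dict rather than a rendered string, it can sit anywhere before the final renderer in the processor list.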

Print capture (stdout/stderr → AxonPush)

Intended for AI agents that emit free-form output via print(). It patches sys.stdout and sys.stderr with a tee stream that still writes to the original console.

from axonpush import AxonPush
from axonpush.integrations.print_capture import setup_print_capture

client = AxonPush(api_key="ak_...", tenant_id="1")
handle = setup_print_capture(client, channel_id=1, agent_id="demo-agent")

print("agent starting")  # forwarded to AxonPush as an agent.log event
handle.unpatch()
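
The tee idea behind print capture can be sketched with the standard library alone. TeeStream is a hypothetical class, not the SDK's; here a plain list stands in for the publish call, and only stdout is patched.

```python
import sys


class TeeStream:
    """Pass all text to the original stream and hand complete lines to a callback."""

    def __init__(self, original, on_line):
        self.original = original
        self.on_line = on_line
        self._buffer = ""

    def write(self, text):
        self.original.write(text)  # the console still receives everything
        self._buffer += text
        # print() may send the text and the newline separately, so buffer until "\n".
        while "\n" in self._buffer:
            line, self._buffer = self._buffer.split("\n", 1)
            if line:
                self.on_line(line)
        return len(text)

    def flush(self):
        self.original.flush()


captured = []
original_stdout = sys.stdout
sys.stdout = TeeStream(original_stdout, captured.append)
try:
    print("agent starting")
finally:
    sys.stdout = original_stdout  # always restore, mirroring handle.unpatch()

print(captured)
```

Restoring the original stream in a finally block matters: a patched sys.stdout that is never restored keeps capturing output long after the agent run has finished.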

OpenTelemetry

If your service is already instrumented with the OTel SDK, add AxonPushSpanExporter to your tracer provider and every span ships to AxonPush alongside whatever other backends you export to.

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from axonpush import AxonPush
from axonpush.integrations.otel import AxonPushSpanExporter

client = AxonPush(api_key="ak_...", tenant_id="1")
provider = TracerProvider()
provider.add_span_processor(
    BatchSpanProcessor(
        AxonPushSpanExporter(client=client, channel_id=1, service_name="my-api")
    )
)
trace.set_tracer_provider(provider)

Real-Time Subscriptions

axonpush supports two real-time subscription mechanisms: SSE (Server-Sent Events) and WebSocket (Socket.IO).

SSE (Server-Sent Events)

SSE is the simplest way to consume events in real time — no extra dependencies required.
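
For readers unfamiliar with the wire format, an SSE stream is plain text: "field: value" lines, with a blank line ending each event. The sketch below parses that format with the standard library. iter_sse_events is a hypothetical helper, and the sample frame (event name and JSON shape) is an assumption for illustration, not AxonPush's documented payload.

```python
import json


def iter_sse_events(lines):
    """Yield (event_name, data) pairs from an iterable of SSE text lines."""
    event_name, data_lines = "message", []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event.
            if data_lines:
                yield event_name, "\n".join(data_lines)
            event_name, data_lines = "message", []
            continue
        if line.startswith(":"):
            continue  # comment or keep-alive line
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # the format strips one leading space
        if field == "event":
            event_name = value
        elif field == "data":
            data_lines.append(value)


# Hypothetical frame, for illustration only.
stream = [
    ": keep-alive\n",
    "event: web_search\n",
    'data: {"agent_id": "researcher", "payload": {"query": "AI agents"}}\n',
    "\n",
]
for name, data in iter_sse_events(stream):
    print(name, json.loads(data)["agent_id"])
```

The SDK's subscribe methods hide this parsing and yield event objects directly, so a parser like this is only needed when consuming the stream without the SDK.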

Subscribe to all events on a channel

from axonpush import AxonPush

with AxonPush(api_key="ak_...", tenant_id="1") as client:
    with client.channels.subscribe_sse(channel_id=1) as sub:
        for event in sub:
            print(event.agent_id, event.identifier, event.payload)

Subscribe to a specific event identifier

with client.channels.subscribe_event_sse(channel_id=1, event_identifier="web_search") as sub:
    for event in sub:
        print(event.payload)

Filter by agent, event type, or trace

All SSE methods accept optional filters to narrow the event stream:

from axonpush import EventType

with client.channels.subscribe_sse(
    channel_id=1,
    agent_id="researcher",
    event_type=EventType.AGENT_ERROR,
    trace_id="tr_run_42",
) as sub:
    for event in sub:
        print(f"[{event.agent_id}] {event.identifier}: {event.payload}")

WebSocket (Socket.IO)

WebSocket subscriptions are callback-based and support bidirectional communication (subscribe, publish, unsubscribe).

pip install axonpush[websocket]

Sync

ws = client.connect_websocket()
ws.on_event(lambda e: print(e.agent_id, e.payload))
ws.subscribe(channel_id=1, event_type="agent.tool_call.start")
ws.wait()  # blocks until disconnected

Async

ws = await async_client.connect_websocket()
ws.on_event(lambda e: print(e.agent_id, e.payload))
await ws.subscribe(channel_id=1, event_type="agent.tool_call.start")
await ws.wait()

Publish and unsubscribe via WebSocket

ws.publish(channel_id=1, identifier="status", payload={"step": "done"}, agent_id="worker")
ws.unsubscribe(channel_id=1)
ws.disconnect()

Resources

The client exposes Stripe-style resource objects:

Resource          Methods
client.events     publish(), list()
client.channels   create(), get(), update(), delete(), subscribe_sse()
client.apps       create(), get(), list(), update(), delete()
client.webhooks   create_endpoint(), list_endpoints(), delete_endpoint(), get_deliveries()
client.traces     list(), get_events(), get_summary()
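
"Stripe-style" here means that related calls are grouped under attributes of the client (client.events, client.traces, ...) that share one underlying connection. The sketch below shows the pattern in isolation. All class names and routes are hypothetical, and the transport only records requests instead of sending them.

```python
class Transport:
    """Records requests instead of sending them; a stand-in for an HTTP client."""

    def __init__(self):
        self.calls = []

    def request(self, method, path, **params):
        self.calls.append((method, path, params))
        return {"method": method, "path": path, "params": params}


class EventsResource:
    def __init__(self, transport):
        self._transport = transport

    def list(self, channel_id):
        # Hypothetical route, chosen only for this sketch.
        return self._transport.request("GET", "/events", channel_id=channel_id)


class TracesResource:
    def __init__(self, transport):
        self._transport = transport

    def get_summary(self, trace_id):
        # Hypothetical route, chosen only for this sketch.
        return self._transport.request("GET", f"/traces/{trace_id}/summary")


class SketchClient:
    """Groups related calls under attributes that share one transport."""

    def __init__(self):
        self._transport = Transport()
        self.events = EventsResource(self._transport)
        self.traces = TracesResource(self._transport)


client = SketchClient()
client.events.list(channel_id=1)
client.traces.get_summary("tr_run_42")
print(client._transport.calls)
```

Sharing a single transport across resources keeps authentication and connection handling in one place while the public surface stays discoverable through attribute access.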

License

MIT
