Skip to main content

Lightweight observability SDK for AI agents

Project description

HiveLoop

Lightweight observability SDK for AI agents.

HiveLoop instruments your AI agents with 3 lines of code and sends structured telemetry to HiveBoard -- giving you heartbeat monitoring, task timelines, LLM cost tracking, plan progress, queue visibility, and issue detection without changing how your agents work.

import hiveloop

hb = hiveloop.init(api_key="hb_live_...")
agent = hb.agent("lead-qualifier", type="sales")

Your agent is now visible on the HiveBoard dashboard. Green dot = alive. Red dot = something's wrong. No logs to check.

Why HiveLoop?

AI agents fail silently. A credential expires, a queue backs up, a task hangs -- and nobody notices until a customer complains. HiveLoop makes these failures visible in seconds, not hours.

  • Heartbeat monitoring -- know instantly when an agent stops responding
  • Task timelines -- see every step an agent took, which tool failed, and why
  • LLM cost tracking -- know exactly what each model call costs, per agent, per task
  • Plan progress -- track multi-step plans and see where they diverge
  • Queue visibility -- see what work is waiting and what's stuck
  • Issue detection -- agents can self-report problems with recurrence tracking
  • Human-in-the-loop -- track escalations and approval workflows

Installation

pip install hiveloop

Requires Python 3.11+. Only dependency: requests.

Quick Start

Layer 0: Presence (3 lines)

Register your agent. HiveBoard will show it as alive with heartbeat monitoring.

import hiveloop

hb = hiveloop.init(api_key="hb_live_...")
agent = hb.agent("my-agent", type="support", version="1.0.0")

# Your agent code runs here. Heartbeats are sent automatically.
# When you're done:
hiveloop.shutdown()

What you see on HiveBoard: Agent card with name, type, status badge, heartbeat indicator.

Layer 1: Task Timelines (add context managers)

Track tasks to get success rates, durations, throughput, and failure diagnostics.

with agent.task("task-001", project="sales-pipeline", type="lead_qualification") as task:
    lead = fetch_crm_data("lead-4821")
    score = score_lead(lead)
    route_lead(score)

The task auto-completes on exit. If an exception is raised, it auto-fails with the exception details. Exceptions are never swallowed.

What you see: Task table, success rates, duration averages, throughput charts.

Layer 2: Full Story (add rich events)

Add LLM call tracking, plans, queue snapshots, and issue reporting for complete observability.

with agent.task("task-001", project="sales-pipeline") as task:
    # Track LLM calls with cost data
    task.llm_call(
        "lead_scoring",
        "claude-sonnet-4-20250514",
        tokens_in=1800,
        tokens_out=250,
        cost=0.004,
        duration_ms=1200,
        prompt_preview="Score this lead based on...",
        response_preview="Score: 72. Reasoning: ...",
    )

    # Track multi-step plans
    task.plan("Process lead", ["Fetch CRM", "Score", "Enrich", "Route"])
    task.plan_step(0, "completed", "CRM data retrieved", turns=1, tokens=800)
    task.plan_step(1, "completed", "Lead scored 72", turns=2, tokens=3200)
    task.plan_step(2, "failed", "Enrichment API timeout")

    # Escalate when needed
    task.escalate("Enrichment service down", reason="service_outage")

    # Request human approval
    task.request_approval("Credit $450 for customer BlueStar?", approver="sales-lead")

What you see: Cost explorer, task timelines with LLM nodes, plan progress bars, escalation tracking.

Action Tracking

Track individual functions with the @agent.track decorator. Supports both sync and async functions, with automatic nesting.

@agent.track("fetch_crm_data")
def fetch_crm_data(lead_id):
    return crm_client.get_lead(lead_id)

@agent.track("enrich_company")
async def enrich_company(company_name):
    return await enrichment_api.lookup(company_name)

@agent.track("process_lead")
def process_lead(lead_id):
    lead = fetch_crm_data(lead_id)       # Nested action
    company = enrich_company(lead.company) # Nested action
    return score(lead, company)

with agent.task("task-001") as task:
    result = process_lead("lead-4821")

Each tracked function emits action_started and action_completed (or action_failed) events with duration. Nesting is tracked automatically -- fetch_crm_data and enrich_company show as children of process_lead in the timeline.

For inline tracking without decorators:

with agent.track_context("data_processing") as action:
    items = process_batch(data)
    action.set_payload({"items_processed": len(items)})

Agent-Level Features

Queue Monitoring

Report your agent's work queue so HiveBoard can detect backed-up or abandoned work.

# Manual snapshot
agent.queue_snapshot(
    depth=5,
    oldest_age_seconds=120,
    items=[
        {"id": "lead-100", "priority": "high", "source": "inbound"},
        {"id": "lead-101", "priority": "normal", "source": "referral"},
    ],
)

# Or use automatic reporting via callback (emits with each heartbeat)
def get_queue():
    return {"depth": len(my_queue), "oldest_age_seconds": my_queue.oldest_age()}

agent = hb.agent("my-agent", queue_provider=get_queue)

Issue Reporting

Agents can self-report problems. HiveBoard tracks recurrence.

agent.report_issue(
    "CRM API returning 403",
    severity="high",
    issue_id="crm-auth-failure",
    category="permissions",
    context={"endpoint": "/api/leads", "status_code": 403},
    occurrence_count=8,
)

# Later, when resolved:
agent.resolve_issue("CRM credentials rotated", issue_id="crm-auth-failure")

TODO Tracking

Track work items that agents create for themselves or for humans.

agent.todo(
    "todo-001",
    action="created",
    summary="Follow up on failed enrichment for lead-4821",
    priority="high",
    source="failed_action",
    due_by="2026-02-14T15:00:00Z",
)

# When completed:
agent.todo("todo-001", action="completed", summary="Enrichment retry succeeded")

Scheduled Work

Report recurring jobs so HiveBoard can show what's coming next.

agent.scheduled([
    {
        "id": "daily-sync",
        "name": "CRM Full Sync",
        "next_run": "2026-02-14T06:00:00Z",
        "interval": "24h",
        "enabled": True,
    },
    {
        "id": "hourly-check",
        "name": "Lead Queue Check",
        "next_run": "2026-02-13T15:00:00Z",
        "interval": "1h",
        "enabled": True,
    },
])

Custom Heartbeat Data

Include operational metrics in heartbeats so HiveBoard can detect behavioral drift.

def heartbeat_data():
    return {
        "crm_sync": True,
        "email_check": True,
        "queue_depth": len(my_queue),
        "last_success": last_success_time.isoformat(),
    }

agent = hb.agent("my-agent", heartbeat_payload=heartbeat_data)

Task Lifecycle

Context Manager (recommended)

with agent.task("task-001") as task:
    do_work()
    # Auto-completes on clean exit
    # Auto-fails with exception details if exception is raised

Manual Lifecycle

For long-running tasks or when you need explicit control:

task = agent.start_task("task-001", project="sales")

try:
    result = do_work()
    task.complete(status="success", payload={"result": result})
except Exception as e:
    task.fail(exception=e)

Setting Payload

Attach data to the completion event:

with agent.task("task-001") as task:
    score = compute_score()
    task.set_payload({"score": score, "threshold": 80, "decision": "nurture"})

Approval Workflows

Track human-in-the-loop decisions:

with agent.task("refund-review") as task:
    task.request_approval(
        "Refund $450 for customer BlueStar",
        approver="finance-team",
    )

    # ... wait for human decision ...

    task.approval_received(
        "Approved by @jsmith",
        approved_by="jsmith",
        decision="approved",
    )

Retry Tracking

Make retry behavior visible in task timelines:

with agent.task("task-001") as task:
    for attempt in range(3):
        try:
            result = call_external_api()
            break
        except TimeoutError:
            task.retry(
                f"API timeout, attempt {attempt + 1}",
                attempt=attempt + 1,
                backoff_seconds=2 ** attempt,
            )
            time.sleep(2 ** attempt)

Configuration

hiveloop.init() Parameters

Parameter Type Default Description
api_key str required API key (must start with hb_)
environment str "production" Environment identifier
group str "default" Organizational group
endpoint str "https://api.hiveboard.io" HiveBoard API endpoint
flush_interval float 5.0 Seconds between automatic flushes
batch_size int 100 Max events per batch (capped at 500)
max_queue_size int 10_000 Max queued events before dropping oldest
debug bool False Enable debug logging

hb.agent() Parameters

Parameter Type Default Description
agent_id str required Unique agent identifier
type str "general" Agent type (e.g., "sales", "support")
version str | None None Agent version string
framework str "custom" Framework (e.g., "langchain", "crewai")
heartbeat_interval float 30.0 Seconds between heartbeats (0 to disable)
stuck_threshold int 300 Seconds before agent is considered stuck
heartbeat_payload callable None Callback returning dict of heartbeat data
queue_provider callable None Callback returning queue state dict

How It Works

HiveLoop runs a background daemon thread that batches events and sends them to HiveBoard via POST /v1/ingest. All SDK calls return immediately -- they never block your agent.

  • Non-blocking: Events are queued in memory and flushed in the background
  • Thread-safe: Safe to call from multiple threads. Task context is thread-local
  • Async-aware: @agent.track works with both sync and async functions
  • Resilient: Transport never raises exceptions to your code. Failed sends are retried with exponential backoff (1s, 2s, 4s, 8s, 16s) and silently dropped after 5 retries
  • Bounded: Queue has a configurable max size (default 10,000). Oldest events are dropped when full
  • Graceful: hiveloop.shutdown() flushes all remaining events before exiting. Also registered as an atexit handler

Framework Compatibility

HiveLoop is framework-agnostic. It works with:

  • Plain Python scripts
  • LangChain / LangGraph agents
  • CrewAI crews
  • AutoGen agents
  • Any Python code that runs AI agents

No framework-specific adapters needed. Just import hiveloop and instrument.

Self-Hosted

Point HiveLoop at your own HiveBoard instance:

hb = hiveloop.init(
    api_key="hb_live_...",
    endpoint="https://hiveboard.your-company.com",
)

API Reference

Module-Level Functions

hiveloop.init(api_key, ...)      # Initialize SDK singleton
hiveloop.shutdown(timeout=5.0)   # Shut down and flush
hiveloop.reset()                 # Clear singleton for re-initialization
hiveloop.flush()                 # Force immediate flush

Task Methods

task.llm_call(name, model, ...)            # Record LLM call
task.plan(goal, steps, revision=0)         # Record plan
task.plan_step(index, action, summary)     # Update plan step
task.escalate(summary, ...)                # Escalate to human
task.request_approval(summary, ...)        # Request approval
task.approval_received(summary, ...)       # Record approval
task.retry(summary, ...)                   # Record retry
task.event(event_type, payload, ...)       # Custom event
task.complete(status="success")            # Manual complete
task.fail(exception=None)                  # Manual fail
task.set_payload(payload)                  # Set completion data

Agent Methods

agent.task(task_id, ...)                   # Create task (context manager)
agent.start_task(task_id, ...)             # Start task (manual lifecycle)
agent.track(action_name)                   # Decorator for action tracking
agent.track_context(action_name)           # Context manager for actions
agent.llm_call(name, model, ...)           # Agent-level LLM call
agent.queue_snapshot(depth, ...)           # Report queue state
agent.todo(todo_id, action, summary, ...)  # TODO lifecycle
agent.scheduled(items)                     # Scheduled work items
agent.report_issue(summary, severity, ...) # Report issue
agent.resolve_issue(summary, ...)          # Resolve issue
agent.event(event_type, payload, ...)      # Custom event

Instrumentation Layers

You don't need to instrument everything on day one. Start small and add depth as needed.

Layer What You Add What You See
Layer 0: Presence hiveloop.init() + hb.agent() Agent cards, heartbeats, stuck detection
Layer 1: Timelines agent.task() + @agent.track() Task table, success rates, durations, action trees
Layer 2: Full Story task.llm_call(), task.plan(), agent.report_issue(), etc. Cost explorer, plan progress, queue visibility, issue tracking

Requirements

  • Python >= 3.11
  • requests >= 2.31.0
  • A HiveBoard instance (cloud or self-hosted)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

hiveloop-0.2.0.tar.gz (31.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

hiveloop-0.2.0-py3-none-any.whl (29.1 kB view details)

Uploaded Python 3

File details

Details for the file hiveloop-0.2.0.tar.gz.

File metadata

  • Download URL: hiveloop-0.2.0.tar.gz
  • Upload date:
  • Size: 31.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.7

File hashes

Hashes for hiveloop-0.2.0.tar.gz
Algorithm Hash digest
SHA256 3dc223b827d4b17b9ffaebb3c946b6456b5d5c635ed70980479e532ccf43894c
MD5 0adf6aae22bda5061a800a326931f368
BLAKE2b-256 a6e7fac0678fca001c4a8b436857a0c6a0891d3f2401c2dd77e570ee5d1ed2a3

See more details on using hashes here.

File details

Details for the file hiveloop-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: hiveloop-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 29.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.7

File hashes

Hashes for hiveloop-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 2250f976205a53e28e5ac2f06cc455c869ab75f37c647d47a9764101f5d631b6
MD5 41e5a2ae3632b278f91e5b41f50a2939
BLAKE2b-256 68b2b200590b80e7915e8fdbccd9c8bc1b14498a53b2977d57c0d4196c62a09f

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page