A tiny agent orchestration engine: task DAG + scheduler + events in ~400 lines

AgentKube-Mini

A tiny agent orchestration engine. Implements a task DAG, dependency-aware parallel scheduler, and event system for multi-agent pipelines — all in about 400 lines of Python with zero dependencies. The idea is to show how agent orchestration actually works under the hood.

Installation

pip install agentkube-mini

Quick start

Define agents as simple functions, wire them into a DAG, and run:

from agentkube_mini import Agent, TaskGraph, Runtime

# define agents — each is just a name + function
research = Agent("research", lambda topic: f"data about {topic}")
analysis = Agent("analysis", lambda topic, deps: f"analysis of {deps['research']}")
writer   = Agent("writer",   lambda topic, deps: f"article based on {deps['analysis']}")
critic   = Agent("critic",   lambda topic, deps: f"score=9 for {deps['writer']}")

# wire the DAG
graph = TaskGraph()
graph.add(research)
graph.add(analysis, depends=["research"])
graph.add(writer,   depends=["analysis"])
graph.add(critic,   depends=["writer"])

# run it
result = Runtime(graph).run("AI agents")
print(result.outputs)

Output:

research → data about AI agents
analysis → analysis of data about AI agents
writer   → article based on analysis of data about AI agents
critic   → score=9 for article based on analysis of data about AI agents

The scheduler automatically figures out which agents can run in parallel (independent nodes run concurrently via ThreadPoolExecutor) and which must wait for dependencies. Events are emitted at each step (task_started, task_completed, task_failed) so you get observability for free.
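The scheduling loop can be sketched in a few lines (a simplified illustration, not the library's actual code): on each pass, every task whose dependencies have all finished is submitted to a thread pool, so independent tasks run concurrently while dependent ones wait.

```python
# Minimal dependency-aware parallel scheduler sketch.
# Assumed shapes (hypothetical, for illustration only):
#   tasks: name -> fn(deps_dict)   deps: name -> list of dependency names
from concurrent.futures import ThreadPoolExecutor

def run_dag(tasks, deps):
    done = {}
    with ThreadPoolExecutor() as pool:
        while len(done) < len(tasks):
            # A task is ready when all of its dependencies have results.
            ready = [n for n in tasks
                     if n not in done and all(d in done for d in deps.get(n, []))]
            if not ready:
                raise ValueError("cycle or missing dependency in graph")
            # Submit the whole ready wave at once: independent tasks run in parallel.
            futures = {n: pool.submit(tasks[n], {d: done[d] for d in deps.get(n, [])})
                       for n in ready}
            for n, f in futures.items():
                done[n] = f.result()
    return done

results = run_dag(
    {"a": lambda deps: 1, "b": lambda deps: 2, "c": lambda deps: deps["a"] + deps["b"]},
    {"c": ["a", "b"]},
)
print(results["c"])  # → 3
```

A production scheduler would submit newly ready tasks as soon as each future completes rather than waiting for the whole wave, but the wave-by-wave loop shows the core idea.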

Visualization

The task graph can print itself as text or as a Mermaid diagram:

print(graph.visualize())
# research -> analysis
# analysis -> writer
# writer -> critic

print(graph.to_mermaid())
# graph TD
#     research
#     research --> analysis
#     analysis --> writer
#     writer --> critic
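A renderer like this is a straightforward walk over an adjacency map. A rough standalone sketch (the `deps` dict below is a hypothetical representation, not the library's internal storage):

```python
# Sketch of a Mermaid renderer: emit the header, then root nodes, then edges.
def to_mermaid_sketch(deps):
    lines = ["graph TD"]
    # Nodes with no dependencies are declared on their own line.
    lines += [f"    {n}" for n, ds in deps.items() if not ds]
    # Each dependency becomes a directed edge: dep --> node.
    for node, ds in deps.items():
        for d in ds:
            lines.append(f"    {d} --> {node}")
    return "\n".join(lines)

print(to_mermaid_sketch({"research": [], "analysis": ["research"], "writer": ["analysis"]}))
# graph TD
#     research
#     research --> analysis
#     analysis --> writer
```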

Events and shared memory

Every run produces an event log and a shared memory dict that all agents write into:

from agentkube_mini import Runtime, EventBus

bus = EventBus()
bus.subscribe("task_failed", lambda e: print(f"ALERT: {e.task} failed"))

result = Runtime(graph, event_bus=bus).run("AI agents")

# event log
for ev in result.events:
    print(ev.type, ev.task)

# shared memory — every agent's output is accessible
result.memory["research"]  # → "data about AI agents"

Using with existing code

You don't have to rewrite your services. Wrap them with auto_agent, which auto-detects the function signature:

from agentkube_mini import auto_agent

# your existing function, unchanged
def my_research_service(topic: str) -> dict:
    return {"topic": topic, "facts": ["f1", "f2"]}

research = auto_agent("research", my_research_service)
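One plausible way to implement that signature detection with only the standard library (a sketch; `auto_agent_sketch` is hypothetical, not the package's actual code):

```python
# Sketch of signature auto-detection: inspect the wrapped function once,
# then dispatch with or without the deps dict accordingly.
import inspect

def auto_agent_sketch(name, fn):
    # Does the wrapped function declare a `deps` parameter?
    wants_deps = "deps" in inspect.signature(fn).parameters

    def call(topic, deps):
        return fn(topic, deps) if wants_deps else fn(topic)

    return name, call

name, call = auto_agent_sketch("research", lambda topic: f"data about {topic}")
print(call("AI agents", {}))  # → data about AI agents
```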

See integration_example.py for a full working example with legacy service classes.

How it works

The core abstraction is simple: agents are nodes, dependencies are edges, and the scheduler walks the DAG. That's the whole idea. If you understand this, you understand the center of every real multi-agent runtime; the rest (distributed workers, retries, message queues, state stores) is built on top.

Agent graph  →  Scheduler  →  Parallel execution + Events

File structure

agent.py               — the Agent dataclass (name + callable)
task_graph.py          — DAG: add nodes, validate, visualize
scheduler.py           — dependency-aware parallel scheduler
runtime.py             — thin wrapper around scheduler
events.py              — event bus with subscribe/emit/history
example.py             — the demo shown above
compat.py              — auto_agent adapter for existing code
smoke_test.py          — tiny correctness test
integration_example.py — legacy service adapter demo

Running tests

python3 smoke_test.py

License

MIT

