Polls project management APIs for ready tickets and spawns AI agents to work on them
Project description
athanore
Polls task queues, dispatches agent subprocesses, and exposes an HTTP API so agents can interact with tasks without knowing which backend is in use.
How it works
The router polls configured queues for ready tasks. When it finds one, it moves the task to an in-progress queue, spawns an agent (via an AgentAdapter), and passes task details to the agent. The subprocess uses a local HTTP API to read task details, post comments, update fields, and move the task when done. The agent never talks to the backend directly.
Key concepts
- Queue adapters — Protocol-based (
QueueAdapter, 11 methods). Ships with a JSON file adapter. A Planka adapter is included as a user-land example (planka_backend.py). Implement the protocol for anything else (Jira, Trello, Linear, SQLite, SQS, etc.). - Agent adapters — Protocol-based (
AgentAdapter). Owns the full agent run lifecycle (spawn, monitor, cleanup). Ships withSubprocessAgentAdapter(generic subprocess management). A Claude adapter using theclaude-code-sdkis included as a user-land example (claude_adapter.py). - Routes — Flask-style
@dispatcher.route()decorators map queues to prompt-generating functions. Each route can specify its own agent adapter. - HTTP API — Agents hit localhost. No credentials, no backend coupling.
Quick start
Requires Python 3.13+.
uv pip install -e . # preferred (pip install -e . also works)
uv pip show athanore # verify the install succeeded
Create a Python script (e.g. run.py):
from athanore import AgentDispatcher, JsonFileAdapter
dispatcher = AgentDispatcher(
command="claude",
poll_interval=30,
agent_timeout=600,
max_concurrent_agents=3,
queue_backend=JsonFileAdapter("/tmp/board.json"),
)
@dispatcher.route(
args=["--agent", "engineer", "-p"],
queue_name="Todo",
in_progress_queue="In Progress",
)
def engineer_agent(task_id: str, task_name: str) -> str:
return f"Work on task {task_id}: {task_name}."
if __name__ == "__main__":
dispatcher.run(debug=True) # enable DEBUG logging (default: False)
For Planka users, see
planka_backend.pyat the repo root for a ready-made backend.
python run.py
This starts two things:
- Router — polls configured queues, picks up tasks, moves them to in-progress, and spawns agents via agent adapters.
- HTTP API — listens on
http://{api_host}:{api_port}so spawned agents can interact with tasks.
Configuration reference
All configuration is done via the AgentDispatcher constructor.
Constructor arguments
| Argument | Type | Default | Description |
|---|---|---|---|
command |
str |
"" |
Base command to run for all routes (e.g. "claude"). Optional when using per-route agent= adapters. |
poll_interval |
int |
30 |
Seconds between polls |
agent_timeout |
int | None |
None |
Default timeout in seconds for all agents |
max_concurrent_agents |
int |
3 |
Max agent processes at once |
api_host |
str |
"127.0.0.1" |
HTTP API bind address |
api_port |
int |
8000 |
HTTP API port |
queue_backend |
QueueAdapter | None |
None |
Task backend (required before calling run()). Any object with a create_adapter() method also works. |
default_agent |
AgentAdapter | None |
None |
Default agent adapter for all routes. Defaults to SubprocessAgentAdapter() if not provided. |
max_retries |
int |
0 |
Default max retry attempts for failed or timed-out agents. 0 disables retries. |
dead_letter_queue |
str | None |
None |
Default queue name where tasks are moved after exhausting all retries. |
enable_queue_management |
bool |
False |
Enable queue CRUD HTTP endpoints |
Route decorator
The @dispatcher.route() decorator registers a queue-to-command mapping. The decorated function receives (task_id, task_name) and returns a prompt string appended to the command arguments.
@dispatcher.route(
queue_name="My Project.My Board.Todo", # required: queue to poll
in_progress_queue="My Project.My Board.WIP", # default: "In Progress"
args=["--agent", "engineer", "-p"], # extra args before prompt
timeout=1800, # route-specific timeout (optional)
poll_interval=10, # route-specific poll interval (optional)
priority=1, # dispatch priority (optional)
max_retries=3, # max retry attempts (optional)
dead_letter_queue="My Project.My Board.Failed", # DLQ for exhausted retries (optional)
agent=ClaudeAgentAdapter(subagent="eng"), # per-route agent adapter (optional)
)
def my_agent(task_id: str, task_name: str) -> str:
return f"Work on task {task_id}: {task_name}."
Agent adapters
Agent adapters own the full lifecycle of running an agent: spawning, monitoring stdout, enforcing timeouts, and cleanup.
SubprocessAgentAdapter— Generic subprocess management. Usesroute.format_command()to build the command, spawns it, reads stdout, and enforces timeouts. Subclass to customize behavior.ClaudeAgentAdapter(claude_adapter.py) — Uses theclaude-code-sdkto run Claude agents natively via the SDK'squery()API. No subprocess management or stdout parsing needed. Picks up existing.claude/configuration automatically. Supportssubagent,model,permission_mode,max_turns,max_budget_usd, andcwdconstructor params.
Install the SDK separately: uv pip install claude-code-sdk
Per-route agent assignment:
from athanore import AgentDispatcher
from claude_adapter import ClaudeAgentAdapter
dispatcher = AgentDispatcher(
poll_interval=30,
queue_backend=my_backend,
)
@dispatcher.route(
queue_name="Engineering",
agent=ClaudeAgentAdapter(subagent="software-engineer"),
)
def eng(task_id, task_name):
return f"Implement {task_id}: {task_name}"
@dispatcher.route(
queue_name="Research",
agent=ClaudeAgentAdapter(subagent="research"),
)
def research(task_id, task_name):
return f"Research {task_id}: {task_name}"
Agent timeouts
You can configure timeouts to automatically terminate agent processes that run too long:
agent_timeout(constructor): Sets a global timeout in seconds for all agents. If not specified, agents run indefinitely.timeout(per-route): Sets a route-specific timeout in seconds. Overridesagent_timeoutfor that route.
When an agent times out:
- The process receives a TERM signal and has 5 seconds to exit gracefully
- If it doesn't exit, it receives a KILL signal
- A comment is added to the task noting the timeout
dispatcher = AgentDispatcher(
command="my-agent",
agent_timeout=3600, # 1 hour default for all agents
queue_backend=my_backend,
)
@dispatcher.route(
queue_name="Quick Tasks",
timeout=300, # 5 minutes for quick tasks (overrides default)
)
def quick(task_id, task_name):
return f"Handle {task_id}"
@dispatcher.route(queue_name="Long Tasks")
# No timeout — uses default of 3600 seconds
def long_running(task_id, task_name):
return f"Handle {task_id}"
Route priority
When max_concurrent_agents is limited, routes with lower priority values are dispatched first. This lets you ensure downstream queues (closer to completion) are serviced before upstream ones, so a task flows all the way through a pipeline before new work begins.
Routes without an explicit priority use their registration order as a tiebreaker.
@dispatcher.route(queue_name="QA", priority=1) # serviced first
def qa(task_id, task_name):
return f"Review {task_id}"
@dispatcher.route(queue_name="Engineering", priority=2)
def eng(task_id, task_name):
return f"Implement {task_id}"
@dispatcher.route(queue_name="Todo", priority=3) # serviced last
def todo(task_id, task_name):
return f"Handle {task_id}"
Per-queue poll intervals
By default, all queues are polled at the global poll_interval rate. You can override this per-route to poll high-priority queues more frequently or low-priority queues less often:
The router's internal tick rate automatically adjusts to the shortest configured interval, so no queue is ever starved.
dispatcher = AgentDispatcher(
command="my-agent",
poll_interval=60, # default for all queues
queue_backend=my_backend,
)
@dispatcher.route(queue_name="High Priority", poll_interval=10)
def urgent(task_id, task_name):
return f"Handle {task_id}"
@dispatcher.route(queue_name="Background", poll_interval=1800)
def background(task_id, task_name):
return f"Handle {task_id}"
@dispatcher.route(queue_name="Normal")
# No poll_interval — uses global default of 60s
def normal(task_id, task_name):
return f"Handle {task_id}"
Retry and dead-letter queues
The dispatcher can automatically retry failed agents and optionally route exhausted tasks to a dead-letter queue (DLQ).
What triggers a retry:
- Agent failure (non-zero exit code)
- Agent timeout
- Not cancellation (e.g., router shutdown or
KeyboardInterrupt)
Retry behavior — when a retry is triggered and the task's retry count is below max_retries:
- The task's retry count is incremented.
- A comment is added to the task (e.g., "Retry 1/3: timed out. Moving back to source queue for retry.").
- The task is moved back to the source queue (the original watched queue), where it will be picked up again on the next poll cycle.
Exhaustion without DLQ — when the retry count reaches max_retries and no dead_letter_queue is configured:
- A comment is added: "Max retries (N) exhausted. No dead-letter queue configured; leaving in place."
- The task stays in the in-progress queue. Manual intervention is required.
Exhaustion with DLQ — when the retry count reaches max_retries and a dead_letter_queue is configured:
- A comment is added: "Max retries (N) exhausted. Moving to dead-letter queue."
- The task is moved to the configured dead-letter queue.
Configuration — retry and DLQ settings can be configured at two levels:
- Global defaults:
max_retriesanddead_letter_queueon theAgentDispatcherconstructor apply to all routes. - Per-route overrides:
max_retriesanddead_letter_queueon@dispatcher.route()override the global defaults for that route.
Note: Setting
dead_letter_queue=Noneon a per-route basis does not disable a global DLQ — it falls through to the global default. There is currently no way to explicitly disable a globally configured DLQ for a single route.
dispatcher = AgentDispatcher(
command="my-agent",
max_retries=2, # default: retry up to 2 times
dead_letter_queue="Failed Tasks", # default DLQ for all routes
queue_backend=my_backend,
)
@dispatcher.route(
queue_name="Critical",
max_retries=5, # override: more retries for critical tasks
dead_letter_queue="Critical.Failed", # override: separate DLQ
)
def critical(task_id, task_name):
return f"Handle {task_id}"
@dispatcher.route(
queue_name="Best Effort",
max_retries=1, # override: only one retry
# No dead_letter_queue override — uses global "Failed Tasks"
)
def best_effort(task_id, task_name):
return f"Handle {task_id}"
@dispatcher.route(queue_name="Normal")
# No overrides — uses global defaults (2 retries, DLQ = "Failed Tasks")
def normal(task_id, task_name):
return f"Handle {task_id}"
HTTP API
| Method | Endpoint | Description |
|---|---|---|
GET |
/status |
Server metadata: capacity, timing, active agent count |
GET |
/agents |
List running agents with task IDs, runtime, timeout |
GET |
/agents/{task_id} |
Single agent detail with buffered event history |
GET |
/tasks/{task_id} |
Full task info: description, labels, assignees, retry_count, comments |
GET |
/tasks/{task_id}/events |
SSE stream of agent events for a task |
POST |
/tasks/{task_id}/comments |
Post a comment on a task ({"comment": "text"}) |
POST |
/tasks/{task_id}/move |
Move a task to a different queue ({"target_queue": "name"}) |
PATCH |
/tasks/{task_id} |
Update task fields ({"name": "...", "description": "...", "labels": [...], "assignees": [...]}) |
POST |
/tasks |
Create a new task ({"queue_name": "...", "name": "...", "description": "..."}) |
GET |
/queues |
List all queues with task counts |
GET |
/queues/{queue_name} |
Get details for a specific queue |
POST |
/queues |
Create a new queue ({"name": "..."}) |
PATCH |
/queues/{queue_name} |
Update/rename a queue ({"name": "..."}) |
DELETE |
/queues/{queue_name} |
Delete an empty queue |
GET |
/routes |
List all configured routes |
GET |
/routes/{queue_name} |
Get details for a specific route |
POST |
/routes |
Create a new route ({"queue_name": "...", ...}) |
PATCH |
/routes/{queue_name} |
Update route fields |
DELETE |
/routes/{queue_name} |
Delete a route |
Queue/route write endpoints (POST, PATCH, DELETE) require enable_queue_management=True.
Task lifecycle
- Task sits in a watched queue (e.g.
Todo) - Router picks it up, moves it to the in-progress queue, and assigns the authenticated user
- Router spawns an agent via the route's
AgentAdapter - The agent uses the HTTP API to read task details, add comments, etc.
- When finished, the agent calls the move endpoint to move the task to a done queue
Backends
Planka
The Planka backend is provided as a user-land file (planka_backend.py at the repo root), not as part of the library. It requires plankapy>=2.3.0 to be installed separately:
uv pip install plankapy>=2.3.0
Uses dot-notation for queue naming: Project.Board.List.
from athanore import AgentDispatcher
from planka_backend import PlankaBackend
dispatcher = AgentDispatcher(
command="my-agent",
queue_backend=PlankaBackend(url="http://localhost:1337"),
)
@dispatcher.route(
queue_name="My Project.My Board.Todo",
in_progress_queue="My Project.My Board.In Progress",
args=["-p"],
)
def handle(task_id: str, task_name: str) -> str:
return f"Work on task {task_id}: {task_name}"
Planka authentication
Credentials can be passed directly as kwargs or resolved from environment variables:
# Option 1: API token (kwarg)
PlankaBackend(url="http://localhost:3000", token="your-token-here")
# Option 2: Username + password (kwargs)
PlankaBackend(url="http://localhost:3000", username="admin", password="secret")
# Option 3: Environment variables (default when no kwargs are given)
# PLANKA_TOKEN=your-token-here
# — or —
# PLANKA_USER=admin + PLANKA_PASSWORD=secret
PlankaBackend(url="http://localhost:3000")
Credentials are resolved at dispatcher.run() time, not at import time. If you use .env files, call dotenv.load_dotenv() in your script before dispatcher.run().
JSON file
For development/testing or lightweight use without external services.
from athanore import AgentDispatcher, JsonFileAdapter
dispatcher = AgentDispatcher(
command="my-agent",
queue_backend=JsonFileAdapter("/tmp/board.json"),
)
The JSON file structure:
{
"queues": {
"Todo": [
{"id": "1", "name": "Fix crash", "description": "...", "labels": [], "assignees": [], "comments": [], "tasks": []}
],
"In Progress": [],
"Done": []
}
}
Custom
Implement the QueueAdapter protocol (11 methods) in athanore/queue_adapters/base.py:
get_ready_tasks(queue_names)— Return tasks from the given queuesget_task(task_id)— Return a single task by IDmove_task(task_id, target_queue)— Move a task between queuesadd_comment(task_id, text)— Add a comment to a taskupdate_task(task_id, *, assignees, name, description, labels)— Update task fieldscreate_task(queue_name, name, description)— Create a new tasklist_queues()— List all queues with task countsget_queue(queue_name)— Get a single queue's infocreate_queue(queue_name)— Create a new empty queueupdate_queue(queue_name, *, new_name)— Rename a queuedelete_queue(queue_name)— Delete an empty queue
Pass your custom adapter directly to the constructor:
dispatcher = AgentDispatcher(
command="my-agent",
queue_backend=MyCustomAdapter(),
)
Development
uv pip install -e ".[dev]" # pip install -e ".[dev]" also works
pytest
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file athanore-0.0.2.tar.gz.
File metadata
- Download URL: athanore-0.0.2.tar.gz
- Upload date:
- Size: 6.6 MB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.0 {"installer":{"name":"uv","version":"0.10.0","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
4ea6cef82bc0ff6f85ee52c7c9d10a5854f8c88894a2bd5174a4d6db7cc98815
|
|
| MD5 |
2a049bcd260e1217177ef63be94fab3f
|
|
| BLAKE2b-256 |
6093416a281e3c06524a25762c6e179a7ed44222bbda6f1271563a5723612538
|
File details
Details for the file athanore-0.0.2-py3-none-any.whl.
File metadata
- Download URL: athanore-0.0.2-py3-none-any.whl
- Upload date:
- Size: 46.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.0 {"installer":{"name":"uv","version":"0.10.0","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c21ea14c3a89d53f6cfeb691c6bc19c22b0fa96195ccb3ae8fd7cfb83a8684d3
|
|
| MD5 |
05d6092832c2d824d5ecab22f61084f0
|
|
| BLAKE2b-256 |
700c9a8c3d4d3ccc6bed1d6c75f378b52ad6a7e78e9e5a9b71662f6cb97349ef
|