Polls project management APIs for ready tickets and spawns AI agents to work on them

artificer-dispatcher

Polls task queues, dispatches agent subprocesses, and exposes an HTTP API so agents can interact with tasks without knowing which backend is in use.

How it works

The router polls configured queues for ready tasks. When it finds one, it moves the task to an in-progress queue, spawns an agent (via an AgentAdapter), and passes task details to the agent. The subprocess uses a local HTTP API to read task details, post comments, update fields, and move the task when done. The agent never talks to the backend directly.
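
As a rough illustration of the poll-claim-dispatch cycle described above, here is a minimal sketch of a single router tick over an in-memory board. It is not the library's implementation: `poll_once`, the `Board` alias, and the shape of `routes` are hypothetical names chosen for this example, and a real dispatcher would spawn an agent instead of returning prompts.

```python
from __future__ import annotations

from collections.abc import Callable

# Hypothetical in-memory board: queue name -> list of task dicts.
Board = dict

PromptFn = Callable[[str, str], str]


def poll_once(
    board: Board,
    routes: dict[str, tuple[str, PromptFn]],
    free_slots: int,
) -> list[tuple[str, str]]:
    """Claim ready tasks and return (task_id, prompt) pairs to hand to agents."""
    dispatched: list[tuple[str, str]] = []
    for queue_name, (in_progress_queue, make_prompt) in routes.items():
        # Iterate over a copy because claimed tasks are removed from the queue.
        for task in list(board.get(queue_name, [])):
            if len(dispatched) >= free_slots:
                return dispatched
            # Move the task to in-progress before dispatching it, so the
            # next poll does not pick it up a second time.
            board[queue_name].remove(task)
            board.setdefault(in_progress_queue, []).append(task)
            dispatched.append((task["id"], make_prompt(task["id"], task["name"])))
    return dispatched
```

With one free slot and two tasks in "Todo", only the first task is claimed and moved; the second stays in "Todo" until a later tick.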

Key concepts

  • Queue adapters — Protocol-based (QueueAdapter, 11 methods). Ships with a JSON file adapter. A Planka adapter is included as a user-land example (planka_backend.py). Implement the protocol for anything else (Jira, Trello, Linear, SQLite, SQS, etc.).
  • Agent adapters — Protocol-based (AgentAdapter). Owns the full agent run lifecycle (spawn, monitor, cleanup). Ships with SubprocessAgentAdapter (generic subprocess management). A Claude adapter using the claude-code-sdk is included as a user-land example (claude_adapter.py).
  • Routes — Flask-style @dispatcher.route() decorators map queues to prompt-generating functions. Each route can specify its own agent adapter.
  • HTTP API — Agents hit localhost. No credentials, no backend coupling.

Quick start

Requires Python 3.13+.

uv pip install -e .          # preferred (pip install -e . also works)
uv pip show artificer-dispatcher  # verify the install succeeded

Create a Python script (e.g. run.py):

from artificer import AgentDispatcher, JsonFileAdapter

dispatcher = AgentDispatcher(
    command="claude",
    poll_interval=30,
    agent_timeout=600,
    max_concurrent_agents=3,
    queue_backend=JsonFileAdapter("/tmp/board.json"),
)

@dispatcher.route(
    args=["--agent", "engineer", "-p"],
    queue_name="Todo",
    in_progress_queue="In Progress",
)
def engineer_agent(task_id: str, task_name: str) -> str:
    return f"Work on task {task_id}: {task_name}."

if __name__ == "__main__":
    dispatcher.run(debug=True)  # enable DEBUG logging (default: False)

For Planka users, see planka_backend.py at the repo root for a ready-made backend.

python run.py

This starts two things:

  1. Router — polls configured queues, picks up tasks, moves them to in-progress, and spawns agents via agent adapters.
  2. HTTP API — listens on http://{api_host}:{api_port} so spawned agents can interact with tasks.

Configuration reference

All configuration is done via the AgentDispatcher constructor.

Constructor arguments

Argument                 Type                 Default      Description
command                  str                  ""           Base command to run for all routes (e.g. "claude"). Optional when using per-route agent= adapters.
poll_interval            int                  30           Seconds between polls
agent_timeout            int | None           None         Default timeout in seconds for all agents
max_concurrent_agents    int                  3            Max agent processes at once
api_host                 str                  "127.0.0.1"  HTTP API bind address
api_port                 int                  8000         HTTP API port
queue_backend            QueueAdapter | None  None         Task backend (required before calling run()). Any object with a create_adapter() method also works.
default_agent            AgentAdapter | None  None         Default agent adapter for all routes. Defaults to SubprocessAgentAdapter() if not provided.
max_retries              int                  0            Default max retry attempts for failed or timed-out agents. 0 disables retries.
dead_letter_queue        str | None           None         Default queue name where tasks are moved after exhausting all retries.
enable_queue_management  bool                 False        Enable queue CRUD HTTP endpoints

Route decorator

The @dispatcher.route() decorator registers a queue-to-command mapping. The decorated function receives (task_id, task_name) and returns a prompt string appended to the command arguments.

from claude_adapter import ClaudeAgentAdapter  # user-land example module, needed for agent= below

@dispatcher.route(
    queue_name="My Project.My Board.Todo",          # required: queue to poll
    in_progress_queue="My Project.My Board.WIP",    # default: "In Progress"
    args=["--agent", "engineer", "-p"],              # extra args before prompt
    timeout=1800,                                    # route-specific timeout (optional)
    poll_interval=10,                                # route-specific poll interval (optional)
    priority=1,                                      # dispatch priority (optional)
    max_retries=3,                                   # max retry attempts (optional)
    dead_letter_queue="My Project.My Board.Failed",  # DLQ for exhausted retries (optional)
    agent=ClaudeAgentAdapter(subagent="eng"),         # per-route agent adapter (optional)
)
def my_agent(task_id: str, task_name: str) -> str:
    return f"Work on task {task_id}: {task_name}."
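
To make the decorator mechanics concrete, the following is a minimal sketch of how a Flask-style route decorator can register a prompt function and how the prompt might be appended to the command arguments. `MiniDispatcher` and `MiniRoute` are hypothetical stand-ins written for this example, and the `format_command` signature shown here is an assumption, not the library's actual signature.

```python
from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass, field

PromptFn = Callable[[str, str], str]


@dataclass
class MiniRoute:
    queue_name: str
    args: list[str]
    prompt_fn: PromptFn

    def format_command(self, command: str, task_id: str, task_name: str) -> list[str]:
        # Base command first, then the route's extra args, then the prompt last.
        return [command, *self.args, self.prompt_fn(task_id, task_name)]


@dataclass
class MiniDispatcher:
    command: str
    routes: dict[str, MiniRoute] = field(default_factory=dict)

    def route(self, queue_name: str, args: list[str] | None = None) -> Callable[[PromptFn], PromptFn]:
        def decorator(fn: PromptFn) -> PromptFn:
            # Register the function under its queue and return it unchanged.
            self.routes[queue_name] = MiniRoute(queue_name, list(args or []), fn)
            return fn

        return decorator


mini = MiniDispatcher(command="claude")


@mini.route(queue_name="Todo", args=["--agent", "engineer", "-p"])
def engineer_agent(task_id: str, task_name: str) -> str:
    return f"Work on task {task_id}: {task_name}."


print(mini.routes["Todo"].format_command(mini.command, "42", "Fix crash"))
# prints ['claude', '--agent', 'engineer', '-p', 'Work on task 42: Fix crash.']
```

Passing the command as a list rather than a single string avoids shell quoting problems when the prompt contains spaces or quotes.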

Agent adapters

Agent adapters own the full lifecycle of running an agent: spawning, monitoring stdout, enforcing timeouts, and cleanup.

  • SubprocessAgentAdapter — Generic subprocess management. Uses route.format_command() to build the command, spawns it, reads stdout, and enforces timeouts. Subclass to customize behavior.
  • ClaudeAgentAdapter (claude_adapter.py) — Uses the claude-code-sdk to run Claude agents natively via the SDK's query() API. No subprocess management or stdout parsing needed. Picks up existing .claude/ configuration automatically. Supports subagent, model, permission_mode, max_turns, max_budget_usd, and cwd constructor params.

Install the SDK separately: uv pip install claude-code-sdk

Per-route agent assignment:

from artificer import AgentDispatcher
from claude_adapter import ClaudeAgentAdapter

dispatcher = AgentDispatcher(
    poll_interval=30,
    queue_backend=my_backend,
)

@dispatcher.route(
    queue_name="Engineering",
    agent=ClaudeAgentAdapter(subagent="software-engineer"),
)
def eng(task_id, task_name):
    return f"Implement {task_id}: {task_name}"

@dispatcher.route(
    queue_name="Research",
    agent=ClaudeAgentAdapter(subagent="research"),
)
def research(task_id, task_name):
    return f"Research {task_id}: {task_name}"

Agent timeouts

You can configure timeouts to automatically terminate agent processes that run too long:

  • agent_timeout (constructor): Sets a global timeout in seconds for all agents. If not specified, agents run indefinitely.
  • timeout (per-route): Sets a route-specific timeout in seconds. Overrides agent_timeout for that route.

When an agent times out:

  1. The process receives a TERM signal and has 5 seconds to exit gracefully.
  2. If it doesn't exit, it receives a KILL signal.
  3. A comment is added to the task noting the timeout.

dispatcher = AgentDispatcher(
    command="my-agent",
    agent_timeout=3600,  # 1 hour default for all agents
    queue_backend=my_backend,
)

@dispatcher.route(
    queue_name="Quick Tasks",
    timeout=300,  # 5 minutes for quick tasks (overrides default)
)
def quick(task_id, task_name):
    return f"Handle {task_id}"

@dispatcher.route(queue_name="Long Tasks")
# No timeout — uses default of 3600 seconds
def long_running(task_id, task_name):
    return f"Handle {task_id}"
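
The TERM-then-KILL sequence described in this section can be sketched with the standard library alone. This is an illustrative approximation, not the code used by SubprocessAgentAdapter; `run_with_timeout` and its `grace` parameter are hypothetical names, and the step that posts a timeout comment on the task is left out.

```python
import subprocess
import sys


def run_with_timeout(cmd: list, timeout: float, grace: float = 5.0) -> tuple:
    """Run cmd and return (exit_code, timed_out)."""
    proc = subprocess.Popen(cmd)
    try:
        return proc.wait(timeout=timeout), False
    except subprocess.TimeoutExpired:
        proc.terminate()  # sends SIGTERM on POSIX
        try:
            proc.wait(timeout=grace)  # grace period for a clean exit
        except subprocess.TimeoutExpired:
            proc.kill()  # sends SIGKILL on POSIX
            proc.wait()
        return proc.returncode, True


if __name__ == "__main__":
    sleeper = [sys.executable, "-c", "import time; time.sleep(30)"]
    print(run_with_timeout(sleeper, timeout=1))
```

On POSIX systems a process ended by a signal reports a negative exit code, so a timed-out run is distinguishable from an ordinary failure even without the second return value.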

Route priority

When max_concurrent_agents is limited, routes with lower priority values are dispatched first. This lets you ensure downstream queues (closer to completion) are serviced before upstream ones, so a task flows all the way through a pipeline before new work begins.

Routes without an explicit priority use their registration order as a tiebreaker.

@dispatcher.route(queue_name="QA",          priority=1)  # serviced first
def qa(task_id, task_name):
    return f"Review {task_id}"

@dispatcher.route(queue_name="Engineering", priority=2)
def eng(task_id, task_name):
    return f"Implement {task_id}"

@dispatcher.route(queue_name="Todo",        priority=3)  # serviced last
def todo(task_id, task_name):
    return f"Handle {task_id}"
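
One way to picture the ordering rule is a stable sort on the priority value. The sketch below is an assumption about how such ordering could work, not the router's actual code; `RouteInfo` and `dispatch_order` are hypothetical names, and it only covers routes that have an explicit priority.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RouteInfo:
    queue_name: str
    priority: int


def dispatch_order(routes: list) -> list:
    # sorted() is stable, so routes with equal priority keep the order
    # in which they were registered.
    return [r.queue_name for r in sorted(routes, key=lambda r: r.priority)]


registered = [
    RouteInfo("Todo", 3),
    RouteInfo("QA", 1),
    RouteInfo("Engineering", 2),
    RouteInfo("Hotfix", 1),
]
print(dispatch_order(registered))
# prints ['QA', 'Hotfix', 'Engineering', 'Todo']
```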

Per-queue poll intervals

By default, all queues are polled at the global poll_interval rate. You can override this per route to poll high-priority queues more frequently or low-priority queues less often. The router's internal tick rate automatically adjusts to the shortest configured interval, so no queue is ever starved:

dispatcher = AgentDispatcher(
    command="my-agent",
    poll_interval=60,  # default for all queues
    queue_backend=my_backend,
)

@dispatcher.route(queue_name="High Priority", poll_interval=10)
def urgent(task_id, task_name):
    return f"Handle {task_id}"

@dispatcher.route(queue_name="Background", poll_interval=1800)
def background(task_id, task_name):
    return f"Handle {task_id}"

@dispatcher.route(queue_name="Normal")
# No poll_interval — uses global default of 60s
def normal(task_id, task_name):
    return f"Handle {task_id}"
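
The interaction between the tick rate and per-route intervals can be sketched as follows. This is a simplified model under stated assumptions, not the router's scheduling code: `tick_interval` and `due_queues` are hypothetical helpers, a route interval of None means "use the global default", and a queue that has never been polled is treated as due immediately.

```python
from __future__ import annotations


def tick_interval(global_interval: int, route_intervals: dict[str, int | None]) -> int:
    """Shortest effective interval across all routes."""
    effective = [global_interval if i is None else i for i in route_intervals.values()]
    return min(effective, default=global_interval)


def due_queues(
    now: float,
    last_polled: dict[str, float],
    global_interval: int,
    route_intervals: dict[str, int | None],
) -> list[str]:
    """Queues whose own interval has elapsed since they were last polled."""
    due = []
    for queue, interval in route_intervals.items():
        effective = global_interval if interval is None else interval
        if now - last_polled.get(queue, float("-inf")) >= effective:
            due.append(queue)
    return due


intervals = {"High Priority": 10, "Background": 1800, "Normal": None}
print(tick_interval(60, intervals))
# prints 10
```

The router wakes up every 10 seconds in this configuration, but on most ticks only "High Priority" is due; "Normal" is polled on every sixth tick and "Background" once every 30 minutes.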

Retry and dead-letter queues

The dispatcher can automatically retry failed agents and optionally route exhausted tasks to a dead-letter queue (DLQ).

What triggers a retry:

  • Agent failure (non-zero exit code)
  • Agent timeout

Cancellation (e.g., router shutdown or KeyboardInterrupt) does not trigger a retry.

Retry behavior — when a retry is triggered and the task's retry count is below max_retries:

  1. The task's retry count is incremented.
  2. A comment is added to the task (e.g., "Retry 1/3: timed out. Moving back to source queue for retry.").
  3. The task is moved back to the source queue (the original watched queue), where it will be picked up again on the next poll cycle.

Exhaustion without DLQ — when the retry count reaches max_retries and no dead_letter_queue is configured:

  • A comment is added: "Max retries (N) exhausted. No dead-letter queue configured; leaving in place."
  • The task stays in the in-progress queue. Manual intervention is required.

Exhaustion with DLQ — when the retry count reaches max_retries and a dead_letter_queue is configured:

  • A comment is added: "Max retries (N) exhausted. Moving to dead-letter queue."
  • The task is moved to the configured dead-letter queue.

Configuration — retry and DLQ settings can be configured at two levels:

  • Global defaults: max_retries and dead_letter_queue on the AgentDispatcher constructor apply to all routes.
  • Per-route overrides: max_retries and dead_letter_queue on @dispatcher.route() override the global defaults for that route.

Note: Setting dead_letter_queue=None on a per-route basis does not disable a global DLQ — it falls through to the global default. There is currently no way to explicitly disable a globally configured DLQ for a single route.

dispatcher = AgentDispatcher(
    command="my-agent",
    max_retries=2,                      # default: retry up to 2 times
    dead_letter_queue="Failed Tasks",   # default DLQ for all routes
    queue_backend=my_backend,
)

@dispatcher.route(
    queue_name="Critical",
    max_retries=5,                              # override: more retries for critical tasks
    dead_letter_queue="Critical.Failed",        # override: separate DLQ
)
def critical(task_id, task_name):
    return f"Handle {task_id}"

@dispatcher.route(
    queue_name="Best Effort",
    max_retries=1,                              # override: only one retry
    # No dead_letter_queue override — uses global "Failed Tasks"
)
def best_effort(task_id, task_name):
    return f"Handle {task_id}"

@dispatcher.route(queue_name="Normal")
# No overrides — uses global defaults (2 retries, DLQ = "Failed Tasks")
def normal(task_id, task_name):
    return f"Handle {task_id}"
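
The retry rules in this section reduce to a small decision function. The sketch below restates them under stated assumptions and is not the dispatcher's implementation: `next_step`, `resolve_dlq`, the outcome strings, and the returned action names are all hypothetical labels chosen for this example.

```python
from __future__ import annotations


def resolve_dlq(route_dlq: str | None, global_dlq: str | None) -> str | None:
    # A per-route value of None falls through to the global default,
    # so a route cannot switch off a globally configured DLQ.
    return global_dlq if route_dlq is None else route_dlq


def next_step(outcome: str, retry_count: int, max_retries: int, dead_letter_queue: str | None) -> str:
    """Return 'none', 'retry', 'dead_letter', or 'leave_in_place'."""
    if outcome not in ("failed", "timed_out"):
        return "none"  # success and cancellation never trigger a retry
    if retry_count < max_retries:
        return "retry"  # move the task back to the source queue
    if dead_letter_queue is not None:
        return "dead_letter"  # retries exhausted, DLQ configured
    return "leave_in_place"  # retries exhausted, manual intervention needed


print(next_step("timed_out", 0, 3, None))
# prints retry
```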

HTTP API

Method  Endpoint                   Description
GET     /status                    Server metadata: capacity, timing, active agent count
GET     /agents                    List running agents with task IDs, runtime, timeout
GET     /agents/{task_id}          Single agent detail with buffered event history
GET     /tasks/{task_id}           Full task info: description, labels, assignees, retry_count, comments
GET     /tasks/{task_id}/events    SSE stream of agent events for a task
POST    /tasks/{task_id}/comments  Post a comment on a task ({"comment": "text"})
POST    /tasks/{task_id}/move      Move a task to a different queue ({"target_queue": "name"})
PATCH   /tasks/{task_id}           Update task fields ({"name": "...", "description": "...", "labels": [...], "assignees": [...]})
POST    /tasks                     Create a new task ({"queue_name": "...", "name": "...", "description": "..."})
GET     /queues                    List all queues with task counts
GET     /queues/{queue_name}       Get details for a specific queue
POST    /queues                    Create a new queue ({"name": "..."})
PATCH   /queues/{queue_name}       Update/rename a queue ({"name": "..."})
DELETE  /queues/{queue_name}       Delete an empty queue
GET     /routes                    List all configured routes
GET     /routes/{queue_name}       Get details for a specific route
POST    /routes                    Create a new route ({"queue_name": "...", ...})
PATCH   /routes/{queue_name}       Update route fields
DELETE  /routes/{queue_name}       Delete a route

Queue/route write endpoints (POST, PATCH, DELETE) require enable_queue_management=True.
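
From the agent's side, calling these endpoints needs nothing beyond the standard library. The sketch below builds requests for the comment and move endpoints listed above, assuming the default api_host and api_port. The helper names are hypothetical, and the Content-Type header and the raw-bytes handling of the response are assumptions, since the response format is not documented here.

```python
from __future__ import annotations

import json
import urllib.request

BASE_URL = "http://127.0.0.1:8000"  # default api_host and api_port


def build_request(method: str, path: str, payload: dict | None = None) -> urllib.request.Request:
    data = json.dumps(payload).encode("utf-8") if payload is not None else None
    # Assumption: the API expects JSON bodies labelled as application/json.
    headers = {"Content-Type": "application/json"} if data is not None else {}
    return urllib.request.Request(BASE_URL + path, data=data, headers=headers, method=method)


def comment_request(task_id: str, text: str) -> urllib.request.Request:
    return build_request("POST", f"/tasks/{task_id}/comments", {"comment": text})


def move_request(task_id: str, target_queue: str) -> urllib.request.Request:
    return build_request("POST", f"/tasks/{task_id}/move", {"target_queue": target_queue})


def send(request: urllib.request.Request) -> bytes:
    # Performs the call; requires a running dispatcher.
    with urllib.request.urlopen(request) as response:
        return response.read()
```

An agent finishing its work would typically call send(comment_request(task_id, "Done")) followed by send(move_request(task_id, "Done")), where "Done" stands for whatever queue your board uses for completed tasks.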

Task lifecycle

  1. Task sits in a watched queue (e.g. Todo)
  2. Router picks it up, moves it to the in-progress queue, and assigns the authenticated user
  3. Router spawns an agent via the route's AgentAdapter
  4. The agent uses the HTTP API to read task details, add comments, etc.
  5. When finished, the agent calls the move endpoint to move the task to a done queue

Backends

Planka

The Planka backend is provided as a user-land file (planka_backend.py at the repo root), not as part of the library. It requires plankapy>=2.3.0 to be installed separately:

uv pip install "plankapy>=2.3.0"   # quote the specifier so the shell does not treat > as a redirect

Uses dot-notation for queue naming: Project.Board.List.

from artificer import AgentDispatcher
from planka_backend import PlankaBackend

dispatcher = AgentDispatcher(
    command="my-agent",
    queue_backend=PlankaBackend(url="http://localhost:1337"),
)

@dispatcher.route(
    queue_name="My Project.My Board.Todo",
    in_progress_queue="My Project.My Board.In Progress",
    args=["-p"],
)
def handle(task_id: str, task_name: str) -> str:
    return f"Work on task {task_id}: {task_name}"

Planka authentication

Credentials can be passed directly as kwargs or resolved from environment variables:

# Option 1: API token (kwarg)
PlankaBackend(url="http://localhost:3000", token="your-token-here")

# Option 2: Username + password (kwargs)
PlankaBackend(url="http://localhost:3000", username="admin", password="secret")

# Option 3: Environment variables (default when no kwargs are given)
# PLANKA_TOKEN=your-token-here
# — or —
# PLANKA_USER=admin  +  PLANKA_PASSWORD=secret
PlankaBackend(url="http://localhost:3000")

Credentials are resolved at dispatcher.run() time, not at import time. If you use .env files, call dotenv.load_dotenv() in your script before dispatcher.run().
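
The lookup described above could be modelled roughly as follows. This is a guess at the general shape, not the logic in planka_backend.py: `resolve_credentials` is a hypothetical helper, and the precedence shown (kwargs before environment variables, token before username and password) is an assumption made for this example.

```python
from __future__ import annotations

import os
from collections.abc import Mapping


def resolve_credentials(
    token: str | None = None,
    username: str | None = None,
    password: str | None = None,
    env: Mapping[str, str] | None = None,
) -> dict[str, str]:
    # Read the environment at call time, not at import time.
    env = os.environ if env is None else env
    # Assumption: explicit kwargs win over environment variables.
    if token:
        return {"token": token}
    if username and password:
        return {"username": username, "password": password}
    if env.get("PLANKA_TOKEN"):
        return {"token": env["PLANKA_TOKEN"]}
    if env.get("PLANKA_USER") and env.get("PLANKA_PASSWORD"):
        return {"username": env["PLANKA_USER"], "password": env["PLANKA_PASSWORD"]}
    raise ValueError("no Planka credentials found in kwargs or environment")
```

Because the environment is read inside the function rather than at module import, loading a .env file any time before the call is enough for the variables to be picked up.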

JSON file

For development/testing or lightweight use without external services.

from artificer import AgentDispatcher, JsonFileAdapter

dispatcher = AgentDispatcher(
    command="my-agent",
    queue_backend=JsonFileAdapter("/tmp/board.json"),
)

The JSON file structure:

{
  "queues": {
    "Todo": [
      {"id": "1", "name": "Fix crash", "description": "...", "labels": [], "assignees": [], "comments": [], "tasks": []}
    ],
    "In Progress": [],
    "Done": []
  }
}
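
A board file in this shape can be seeded with a few lines of standard-library Python. The helpers below are hypothetical conveniences for local testing, not part of the library. They mirror the field names from the example above, including the empty "tasks" list, without assuming what each field is used for.

```python
import json
import tempfile
from pathlib import Path


def make_task(task_id: str, name: str, description: str = "") -> dict:
    # Field names copied from the documented JSON structure.
    return {
        "id": task_id,
        "name": name,
        "description": description,
        "labels": [],
        "assignees": [],
        "comments": [],
        "tasks": [],
    }


def write_board(path, queues: dict) -> None:
    Path(path).write_text(json.dumps({"queues": queues}, indent=2), encoding="utf-8")


if __name__ == "__main__":
    board_path = Path(tempfile.mkdtemp()) / "board.json"
    write_board(board_path, {"Todo": [make_task("1", "Fix crash")], "In Progress": [], "Done": []})
    print(board_path.read_text(encoding="utf-8"))
```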

Custom

Implement the QueueAdapter protocol (11 methods) in artificer/queue_adapters/base.py:

  • get_ready_tasks(queue_names) — Return tasks from the given queues
  • get_task(task_id) — Return a single task by ID
  • move_task(task_id, target_queue) — Move a task between queues
  • add_comment(task_id, text) — Add a comment to a task
  • update_task(task_id, *, assignees, name, description, labels) — Update task fields
  • create_task(queue_name, name, description) — Create a new task
  • list_queues() — List all queues with task counts
  • get_queue(queue_name) — Get a single queue's info
  • create_queue(queue_name) — Create a new empty queue
  • update_queue(queue_name, *, new_name) — Rename a queue
  • delete_queue(queue_name) — Delete an empty queue

Pass your custom adapter directly to the constructor:

dispatcher = AgentDispatcher(
    command="my-agent",
    queue_backend=MyCustomAdapter(),
)
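
For orientation, here is a minimal in-memory sketch that provides the 11 method names listed above. The method names and parameters follow the list; everything else is an assumption for illustration, including the return types, the task dictionary shape, the error handling, and the `InMemoryQueueAdapter` name itself. Check artificer/queue_adapters/base.py for the actual protocol before relying on any of it.

```python
from __future__ import annotations

import itertools


class InMemoryQueueAdapter:
    """Hypothetical adapter that keeps queues in a dict (illustration only)."""

    def __init__(self, queue_names: list[str]) -> None:
        self._queues: dict[str, list[dict]] = {name: [] for name in queue_names}
        self._ids = itertools.count(1)

    def _locate(self, task_id: str) -> tuple[str, dict]:
        for queue_name, tasks in self._queues.items():
            for task in tasks:
                if task["id"] == task_id:
                    return queue_name, task
        raise KeyError(f"unknown task: {task_id}")

    def get_ready_tasks(self, queue_names: list[str]) -> list[dict]:
        return [t for name in queue_names for t in self._queues.get(name, [])]

    def get_task(self, task_id: str) -> dict:
        return self._locate(task_id)[1]

    def move_task(self, task_id: str, target_queue: str) -> None:
        source, task = self._locate(task_id)
        if target_queue not in self._queues:
            raise KeyError(f"unknown queue: {target_queue}")
        self._queues[source].remove(task)
        self._queues[target_queue].append(task)

    def add_comment(self, task_id: str, text: str) -> None:
        self.get_task(task_id)["comments"].append(text)

    def update_task(self, task_id: str, *, assignees=None, name=None, description=None, labels=None) -> None:
        updates = {"assignees": assignees, "name": name, "description": description, "labels": labels}
        # Only overwrite fields that were actually supplied.
        self.get_task(task_id).update({k: v for k, v in updates.items() if v is not None})

    def create_task(self, queue_name: str, name: str, description: str = "") -> dict:
        task = {"id": str(next(self._ids)), "name": name, "description": description,
                "labels": [], "assignees": [], "comments": []}
        self._queues[queue_name].append(task)
        return task

    def list_queues(self) -> list[dict]:
        return [{"name": n, "task_count": len(t)} for n, t in self._queues.items()]

    def get_queue(self, queue_name: str) -> dict:
        return {"name": queue_name, "task_count": len(self._queues[queue_name])}

    def create_queue(self, queue_name: str) -> None:
        if queue_name in self._queues:
            raise ValueError(f"queue already exists: {queue_name}")
        self._queues[queue_name] = []

    def update_queue(self, queue_name: str, *, new_name: str) -> None:
        if queue_name not in self._queues:
            raise KeyError(f"unknown queue: {queue_name}")
        # Rebuild the dict so the renamed queue keeps its position.
        self._queues = {(new_name if n == queue_name else n): t for n, t in self._queues.items()}

    def delete_queue(self, queue_name: str) -> None:
        if self._queues[queue_name]:
            raise ValueError(f"queue is not empty: {queue_name}")
        del self._queues[queue_name]
```

An adapter like this is mainly useful in tests, where it lets route and retry behaviour be exercised without touching the filesystem or a remote service.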

Development

uv pip install -e ".[dev]"   # pip install -e ".[dev]" also works
pytest
