
Polls project management APIs for ready tickets and spawns AI agents to work on them


artificer-dispatcher

Polls task queues, dispatches agent subprocesses, and exposes an HTTP API so agents can interact with tasks without knowing which backend is in use.

How it works

The router polls configured queues for ready tasks. When it finds one, it moves the task to an in-progress queue, spawns a subprocess (any command), and passes task details to the agent. The subprocess uses a local HTTP API to read task details, post comments, update fields, and move the task when done. The agent never talks to the backend directly.

Key concepts

  • Backend adapters — Protocol-based (TaskAdapter, 11 methods). Ships with a JSON file adapter. A Planka adapter is included as a user-land example (planka_backend.py). Implement the protocol for anything else (Jira, Trello, Linear, SQLite, SQS, etc.).
  • Agent adapters — Protocol-based (AgentAdapter). Ships with a Claude adapter (session tracking, resume hints) and a default pass-through for any command.
  • Routes — Flask-style @dispatcher.route() decorators map queues to prompt-generating functions.
  • HTTP API — Agents hit localhost. No credentials, no backend coupling.

Quick start

Requires Python 3.13+.

uv pip install -e .          # preferred (pip install -e . also works)
uv pip show artificer-dispatcher  # verify the install succeeded

Create a Python script (e.g. run.py):

from artificer import AgentDispatcher, JsonFileAdapter

dispatcher = AgentDispatcher(
    command="claude",
    poll_interval=30,
    agent_timeout=600,
    max_concurrent_agents=3,
    queue_backend=JsonFileAdapter("/tmp/board.json"),
)

@dispatcher.route(
    args=["--agent", "engineer", "-p"],
    queue_name="Todo",
    in_progress_queue="In Progress",
)
def engineer_agent(task_id: str, task_name: str) -> str:
    return f"Work on task {task_id}: {task_name}."

if __name__ == "__main__":
    dispatcher.run(debug=True)  # enable DEBUG logging (default: False)

For Planka users, see planka_backend.py at the repo root for a ready-made backend.

Then run the script:

python run.py

This starts two things:

  1. Router — polls configured queues, picks up tasks, moves them to in-progress, and spawns agent subprocesses.
  2. HTTP API — listens on http://{api_host}:{api_port} so spawned agents can interact with tasks.

Configuration reference

All configuration is done via the AgentDispatcher constructor.

Constructor arguments

Argument                 Type                            Default      Description
command                  str                             (required)   Base command to run for all routes (e.g. "claude")
poll_interval            int                             30           Seconds between polls
agent_timeout            int | None                      None         Default timeout in seconds for all agents
max_concurrent_agents    int                             3            Max agent processes at once
api_host                 str                             "127.0.0.1"  HTTP API bind address
api_port                 int                             8000         HTTP API port
queue_backend            TaskAdapter | None              None         Task backend (required before calling run()). Any object with a create_adapter() method also works.
agent_adapters           dict[str, AgentAdapter] | None  None         Custom agent adapters by command name
enable_queue_management  bool                            False        Enable queue CRUD HTTP endpoints

Route decorator

The @dispatcher.route() decorator registers a queue-to-command mapping. The decorated function receives (task_id, task_name) and returns a prompt string appended to the command arguments.

@dispatcher.route(
    queue_name="My Project.My Board.Todo",          # required: queue to poll
    in_progress_queue="My Project.My Board.WIP",    # default: "In Progress"
    args=["--agent", "engineer", "-p"],              # extra args before prompt
    timeout=1800,                                    # route-specific timeout (optional)
    poll_interval=10,                                # route-specific poll interval (optional)
    priority=1,                                      # dispatch priority (optional)
)
def my_agent(task_id: str, task_name: str) -> str:
    return f"Work on task {task_id}: {task_name}."
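
The registration step can be pictured with a toy registry. This is only a sketch of the idea, not the library's implementation: ToyDispatcher, Route, and build_argv are hypothetical names, and the final argument order (command, then args, then the prompt) is an assumption drawn from the description above.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Callable

PromptFn = Callable[[str, str], str]


@dataclass
class Route:
    queue_name: str
    in_progress_queue: str
    args: list[str]
    prompt_fn: PromptFn


@dataclass
class ToyDispatcher:
    """Hypothetical stand-in for AgentDispatcher; models route registration only."""

    command: str
    routes: list[Route] = field(default_factory=list)

    def route(
        self,
        queue_name: str,
        in_progress_queue: str = "In Progress",
        args: list[str] | None = None,
    ) -> Callable[[PromptFn], PromptFn]:
        def decorator(fn: PromptFn) -> PromptFn:
            # Record the mapping and hand the function back unchanged.
            self.routes.append(Route(queue_name, in_progress_queue, list(args or []), fn))
            return fn

        return decorator

    def build_argv(self, route: Route, task_id: str, task_name: str) -> list[str]:
        # Assumed layout: base command, route args, then the generated prompt.
        return [self.command, *route.args, route.prompt_fn(task_id, task_name)]


toy = ToyDispatcher(command="claude")


@toy.route(queue_name="Todo", args=["--agent", "engineer", "-p"])
def engineer(task_id: str, task_name: str) -> str:
    return f"Work on task {task_id}: {task_name}."


argv = toy.build_argv(toy.routes[0], "42", "Fix crash")
print(argv)
```

Because the decorator returns the function unchanged, the prompt function stays directly callable, which makes it easy to unit-test prompts without starting a dispatcher.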

Agent timeouts

You can configure timeouts to automatically terminate agent processes that run too long:

  • agent_timeout (constructor): Sets a global timeout in seconds for all agents. If not specified, agents run indefinitely.
  • timeout (per-route): Sets a route-specific timeout in seconds. Overrides agent_timeout for that route.

When an agent times out:

  1. The process receives a TERM signal and has 5 seconds to exit gracefully
  2. If it doesn't exit, it receives a KILL signal
  3. A comment is added to the task noting the timeout

dispatcher = AgentDispatcher(
    command="my-agent",
    agent_timeout=3600,  # 1 hour default for all agents
    queue_backend=my_backend,
)

@dispatcher.route(
    queue_name="Quick Tasks",
    timeout=300,  # 5 minutes for quick tasks (overrides default)
)
def quick(task_id, task_name):
    return f"Handle {task_id}"

@dispatcher.route(queue_name="Long Tasks")
# No timeout — uses default of 3600 seconds
def long_running(task_id, task_name):
    return f"Handle {task_id}"
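
The override rule and the TERM-then-KILL sequence described above can be sketched with the standard library alone. This is an illustration, not the dispatcher's own code: effective_timeout and run_with_timeout are hypothetical helpers, and the 5-second grace period is taken from the list above.

```python
from __future__ import annotations

import subprocess
import sys


def effective_timeout(route_timeout: int | None, agent_timeout: int | None) -> int | None:
    """A per-route timeout wins; otherwise use the global one (None = no limit)."""
    return route_timeout if route_timeout is not None else agent_timeout


def run_with_timeout(
    argv: list[str], timeout: float | None, grace: float = 5.0
) -> tuple[int, bool]:
    """Run argv and return (return code, timed_out)."""
    proc = subprocess.Popen(argv)
    try:
        return proc.wait(timeout=timeout), False
    except subprocess.TimeoutExpired:
        proc.terminate()  # TERM signal on POSIX
        try:
            return proc.wait(timeout=grace), True
        except subprocess.TimeoutExpired:
            proc.kill()  # KILL signal if the process ignored TERM
            return proc.wait(), True


# A stand-in "agent" that would otherwise run for 30 seconds.
sleeper = [sys.executable, "-c", "import time; time.sleep(30)"]
code, timed_out = run_with_timeout(sleeper, timeout=0.5, grace=1.0)
print(code, timed_out)
```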

Route priority

When max_concurrent_agents is limited, routes with lower priority values are dispatched first. This lets you ensure downstream queues (closer to completion) are serviced before upstream ones, so a task flows all the way through a pipeline before new work begins.

Routes without an explicit priority use their registration order as a tiebreaker.

@dispatcher.route(queue_name="QA",          priority=1)  # serviced first
def qa(task_id, task_name):
    return f"Review {task_id}"

@dispatcher.route(queue_name="Engineering", priority=2)
def eng(task_id, task_name):
    return f"Implement {task_id}"

@dispatcher.route(queue_name="Todo",        priority=3)  # serviced last
def todo(task_id, task_name):
    return f"Handle {task_id}"
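
One way to picture the ordering is a sort key of (priority, registration order). The sketch below is not the router's code. RouteInfo and dispatch_order are hypothetical names, and placing routes without a priority after all prioritized routes is an assumption the text above does not confirm.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class RouteInfo:
    queue_name: str
    priority: int | None
    order: int  # position in which the route was registered


def dispatch_order(routes: list[RouteInfo]) -> list[str]:
    def key(route: RouteInfo) -> tuple[bool, int, int]:
        # Assumption: unprioritized routes sort after prioritized ones,
        # and registration order breaks any remaining ties.
        has_no_priority = route.priority is None
        return (has_no_priority, 0 if has_no_priority else route.priority, route.order)

    return [route.queue_name for route in sorted(routes, key=key)]


routes = [
    RouteInfo("Todo", priority=3, order=0),
    RouteInfo("QA", priority=1, order=1),
    RouteInfo("Engineering", priority=2, order=2),
    RouteInfo("Misc", priority=None, order=3),
    RouteInfo("Docs", priority=None, order=4),
]
ordered = dispatch_order(routes)
print(ordered)
```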

Per-queue poll intervals

By default, all queues are polled at the global poll_interval rate. You can override this per-route to poll high-priority queues more frequently or low-priority queues less often.

The router's internal tick rate automatically adjusts to the shortest configured interval, so no queue is ever starved.

dispatcher = AgentDispatcher(
    command="my-agent",
    poll_interval=60,  # default for all queues
    queue_backend=my_backend,
)

@dispatcher.route(queue_name="High Priority", poll_interval=10)
def urgent(task_id, task_name):
    return f"Handle {task_id}"

@dispatcher.route(queue_name="Background", poll_interval=1800)
def background(task_id, task_name):
    return f"Handle {task_id}"

@dispatcher.route(queue_name="Normal")
# No poll_interval — uses global default of 60s
def normal(task_id, task_name):
    return f"Handle {task_id}"
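
The tick-rate rule amounts to taking the minimum of the effective intervals and then checking, on each tick, which queues are due. A minimal sketch of that arithmetic follows; tick_interval and due_queues are hypothetical helpers, and the router's real bookkeeping may differ.

```python
from __future__ import annotations


def tick_interval(global_interval: int, route_intervals: list[int | None]) -> int:
    """Shortest effective interval; routes without an override use the global one."""
    effective = [i if i is not None else global_interval for i in route_intervals]
    return min(effective, default=global_interval)


def due_queues(
    now: float, last_polled: dict[str, float], intervals: dict[str, int]
) -> list[str]:
    """Queues whose interval has elapsed since their last poll."""
    return [
        queue
        for queue, interval in intervals.items()
        if now - last_polled.get(queue, float("-inf")) >= interval
    ]


# Intervals from the example above: 10s, 1800s, and the 60s global default.
intervals = {"High Priority": 10, "Background": 1800, "Normal": 60}
tick = tick_interval(60, [10, 1800, None])
last_polled = {"High Priority": 0.0, "Background": 0.0, "Normal": 0.0}
due = due_queues(60.0, last_polled, intervals)
print(tick, due)
```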

HTTP API

Method  Endpoint                   Description
GET     /tasks/{task_id}           Full task info: description, labels, assignees, comments
POST    /tasks/{task_id}/comments  Post a comment on a task ({"comment": "text"})
POST    /tasks/{task_id}/move      Move a task to a different queue ({"target_queue": "name"})
PATCH   /tasks/{task_id}           Update task fields ({"name": "...", "description": "...", "labels": [...], "assignees": [...]})
POST    /tasks                     Create a new task ({"queue_name": "...", "name": "...", "description": "..."})
GET     /queues                    List all queues with task counts
GET     /queues/{queue_name}       Get details for a specific queue
POST    /queues                    Create a new queue ({"name": "..."})
PATCH   /queues/{queue_name}       Update/rename a queue ({"name": "..."})
DELETE  /queues/{queue_name}       Delete an empty queue
GET     /status                    Router status: active agents, available slots
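
From the agent's side, these endpoints can be called with nothing more than the standard library. The sketch below is a hypothetical client, not part of the package: build_request and call are made-up helpers, the base URL uses the default api_host and api_port, and JSON request bodies with a Content-Type header and JSON responses are assumptions.

```python
from __future__ import annotations

import json
import urllib.request

BASE_URL = "http://127.0.0.1:8000"  # default api_host and api_port


def build_request(method: str, path: str, payload: dict | None = None) -> urllib.request.Request:
    """Prepare a request for the dispatcher API without sending it."""
    data = json.dumps(payload).encode("utf-8") if payload is not None else None
    headers = {"Content-Type": "application/json"} if data is not None else {}
    return urllib.request.Request(BASE_URL + path, data=data, headers=headers, method=method)


def call(req: urllib.request.Request):
    """Send a prepared request; assumes the API answers with JSON (or an empty body)."""
    with urllib.request.urlopen(req, timeout=10) as resp:
        body = resp.read()
        return json.loads(body) if body else None


# Requests an agent might issue for task "42" (built here, not sent).
comment_req = build_request("POST", "/tasks/42/comments", {"comment": "Tests pass."})
move_req = build_request("POST", "/tasks/42/move", {"target_queue": "Done"})
status_req = build_request("GET", "/status")
print(comment_req.get_method(), comment_req.full_url)
```

With a dispatcher running, call(comment_req) followed by call(move_req) would cover the last two steps of a typical agent run.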

Task lifecycle

  1. Task sits in a watched queue (e.g. Todo)
  2. Router picks it up, moves it to the in-progress queue, and assigns the authenticated user
  3. Router spawns the configured command as a subprocess
  4. The agent uses the HTTP API to read task details, add comments, etc.
  5. When finished, the agent calls the move endpoint to move the task to a done queue

Backends

Planka

The Planka backend is provided as a user-land file (planka_backend.py at the repo root), not as part of the library. It requires plankapy>=2.3.0 to be installed separately:

uv pip install "plankapy>=2.3.0"   # quote the spec so the shell does not treat > as a redirect

Uses dot-notation for queue naming: Project.Board.List.

from artificer import AgentDispatcher
from planka_backend import PlankaBackend

dispatcher = AgentDispatcher(
    command="my-agent",
    queue_backend=PlankaBackend(url="http://localhost:1337"),
)

@dispatcher.route(
    queue_name="My Project.My Board.Todo",
    in_progress_queue="My Project.My Board.In Progress",
    args=["-p"],
)
def handle(task_id: str, task_name: str) -> str:
    return f"Work on task {task_id}: {task_name}"

Planka authentication

Credentials can be passed directly as kwargs or resolved from environment variables:

# Option 1: API token (kwarg)
PlankaBackend(url="http://localhost:3000", token="your-token-here")

# Option 2: Username + password (kwargs)
PlankaBackend(url="http://localhost:3000", username="admin", password="secret")

# Option 3: Environment variables (default when no kwargs are given)
# PLANKA_TOKEN=your-token-here
# — or —
# PLANKA_USER=admin  +  PLANKA_PASSWORD=secret
PlankaBackend(url="http://localhost:3000")

Credentials are resolved at dispatcher.run() time, not at import time. If you use .env files, call dotenv.load_dotenv() in your script before dispatcher.run().
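
The resolution order can be sketched as a small function that is only called at startup, which is why loading a .env file before dispatcher.run() is enough. This is not PlankaBackend's code: resolve_credentials is a hypothetical helper, and the precedence shown (kwargs before environment, token before username and password) is an assumption.

```python
from __future__ import annotations

import os
from collections.abc import Mapping


def resolve_credentials(
    token: str | None = None,
    username: str | None = None,
    password: str | None = None,
    env: Mapping[str, str] | None = None,
) -> dict[str, str]:
    """Pick credentials lazily: explicit kwargs first, then environment variables."""
    env = os.environ if env is None else env
    if token:
        return {"token": token}
    if username and password:
        return {"username": username, "password": password}
    if env.get("PLANKA_TOKEN"):
        return {"token": env["PLANKA_TOKEN"]}
    if env.get("PLANKA_USER") and env.get("PLANKA_PASSWORD"):
        return {"username": env["PLANKA_USER"], "password": env["PLANKA_PASSWORD"]}
    raise ValueError("no Planka credentials found in kwargs or environment")


from_kwargs = resolve_credentials(token="your-token-here", env={})
from_env = resolve_credentials(env={"PLANKA_USER": "admin", "PLANKA_PASSWORD": "secret"})
print(from_kwargs, from_env)
```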

JSON file

For development/testing or lightweight use without external services.

from artificer import AgentDispatcher, JsonFileAdapter

dispatcher = AgentDispatcher(
    command="my-agent",
    queue_backend=JsonFileAdapter("/tmp/board.json"),
)

The JSON file structure:

{
  "queues": {
    "Todo": [
      {"id": "1", "name": "Fix crash", "description": "...", "labels": [], "assignees": [], "comments": [], "tasks": []}
    ],
    "In Progress": [],
    "Done": []
  }
}
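
A board file in this shape can be seeded with a few lines of standard-library Python before the dispatcher starts. seed_board and add_task are hypothetical helpers written for this example. They edit the file directly, so the sketch assumes nothing else is writing to it at the same time.

```python
from __future__ import annotations

import json
import tempfile
from pathlib import Path


def seed_board(path: Path, queues: tuple[str, ...] = ("Todo", "In Progress", "Done")) -> dict:
    """Write an empty board with the given queues."""
    board = {"queues": {name: [] for name in queues}}
    path.write_text(json.dumps(board, indent=2), encoding="utf-8")
    return board


def add_task(path: Path, queue_name: str, task_id: str, name: str, description: str = "") -> dict:
    """Append a task using the same fields as the structure shown above."""
    board = json.loads(path.read_text(encoding="utf-8"))
    task = {
        "id": task_id,
        "name": name,
        "description": description,
        "labels": [],
        "assignees": [],
        "comments": [],
        "tasks": [],
    }
    board["queues"][queue_name].append(task)
    path.write_text(json.dumps(board, indent=2), encoding="utf-8")
    return task


# Demonstrated against a temporary directory so nothing is left behind.
with tempfile.TemporaryDirectory() as tmp:
    board_path = Path(tmp) / "board.json"
    seed_board(board_path)
    add_task(board_path, "Todo", "1", "Fix crash")
    loaded = json.loads(board_path.read_text(encoding="utf-8"))

print(list(loaded["queues"]), loaded["queues"]["Todo"][0]["name"])
```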

Custom

Implement the TaskAdapter protocol (11 methods), which is defined in artificer/adapters/base.py:

  • get_ready_tasks(queue_names) — Return tasks from the given queues
  • get_task(task_id) — Return a single task by ID
  • move_task(task_id, target_queue) — Move a task between queues
  • add_comment(task_id, text) — Add a comment to a task
  • update_task(task_id, *, assignees, name, description, labels) — Update task fields
  • create_task(queue_name, name, description) — Create a new task
  • list_queues() — List all queues with task counts
  • get_queue(queue_name) — Get a single queue's info
  • create_queue(queue_name) — Create a new empty queue
  • update_queue(queue_name, *, new_name) — Rename a queue
  • delete_queue(queue_name) — Delete an empty queue

Pass your custom adapter directly to the constructor:

dispatcher = AgentDispatcher(
    command="my-agent",
    queue_backend=MyCustomAdapter(),
)
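
As a rough starting point, here is an in-memory class with the eleven method names listed above. It does not import the real protocol. InMemoryAdapter is a hypothetical name, and the return shapes (plain dicts and lists) and error types are assumptions, so check artificer/adapters/base.py for the actual signatures before relying on it.

```python
from __future__ import annotations

import itertools


class InMemoryAdapter:
    """Hypothetical adapter; return shapes and exceptions are assumptions."""

    def __init__(self, queue_names: list[str]) -> None:
        self._queues: dict[str, list[dict]] = {name: [] for name in queue_names}
        self._ids = itertools.count(1)

    def _find(self, task_id: str) -> tuple[str, dict]:
        for queue_name, tasks in self._queues.items():
            for task in tasks:
                if task["id"] == task_id:
                    return queue_name, task
        raise KeyError(task_id)

    def get_ready_tasks(self, queue_names: list[str]) -> list[dict]:
        return [t for q in queue_names for t in self._queues.get(q, [])]

    def get_task(self, task_id: str) -> dict:
        return self._find(task_id)[1]

    def move_task(self, task_id: str, target_queue: str) -> None:
        source, task = self._find(task_id)
        if target_queue not in self._queues:
            raise KeyError(target_queue)
        self._queues[source].remove(task)
        self._queues[target_queue].append(task)

    def add_comment(self, task_id: str, text: str) -> None:
        self.get_task(task_id)["comments"].append(text)

    def update_task(self, task_id, *, assignees=None, name=None, description=None, labels=None) -> None:
        task = self.get_task(task_id)
        updates = {"assignees": assignees, "name": name, "description": description, "labels": labels}
        # Only overwrite the fields the caller actually supplied.
        task.update({key: value for key, value in updates.items() if value is not None})

    def create_task(self, queue_name: str, name: str, description: str) -> dict:
        task = {"id": str(next(self._ids)), "name": name, "description": description,
                "labels": [], "assignees": [], "comments": []}
        self._queues[queue_name].append(task)
        return task

    def list_queues(self) -> list[dict]:
        return [{"name": n, "task_count": len(t)} for n, t in self._queues.items()]

    def get_queue(self, queue_name: str) -> dict:
        return {"name": queue_name, "tasks": list(self._queues[queue_name])}

    def create_queue(self, queue_name: str) -> None:
        if queue_name in self._queues:
            raise ValueError(f"queue already exists: {queue_name}")
        self._queues[queue_name] = []

    def update_queue(self, queue_name: str, *, new_name: str) -> None:
        if queue_name not in self._queues:
            raise KeyError(queue_name)
        # Rebuild the dict so the renamed queue keeps its position.
        self._queues = {(new_name if n == queue_name else n): t for n, t in self._queues.items()}

    def delete_queue(self, queue_name: str) -> None:
        if self._queues[queue_name]:
            raise ValueError(f"queue is not empty: {queue_name}")
        del self._queues[queue_name]


adapter = InMemoryAdapter(["Todo", "In Progress", "Done"])
task = adapter.create_task("Todo", "Fix crash", "")
adapter.move_task(task["id"], "In Progress")
adapter.add_comment(task["id"], "Started.")
print(adapter.list_queues())
```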

Development

uv pip install -e ".[dev]"   # pip install -e ".[dev]" also works
pytest
