atqo
Async task queue orchestrator with resource-aware scheduling.
Each actor class declares what it consumes (requirements); the scheduler holds the pool (resources) and decides how many actors of each class to run. Optional per-task rate limits gate dispatch against time-windowed budgets.
Install
uv add atqo
Usage
from atqo import ActorBase, Scheduler, SchedulerTask, SingleCPUActor

# fetch, process, urls, and jobs are placeholders for your own code.

class Scraper(SingleCPUActor):
    def consume(self, url):
        return fetch(url)

class HeavyActor(ActorBase):
    # What one running HeavyActor consumes from the resource pool.
    requirements = {"cpu": 2, "mem": 4}

    def consume(self, arg):
        return process(arg)

scheduler = Scheduler(
    actors=[Scraper, HeavyActor],
    resources={"cpu": 8, "mem": 16},
)
scheduler.refill_task_queue(
    [SchedulerTask(u, actor=Scraper) for u in urls]
    + [SchedulerTask(j, actor=HeavyActor) for j in jobs]
)
results = scheduler.join()
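To make the relationship between requirements and resources concrete, here is a rough sketch of the arithmetic that bounds how many actors of one class fit in the pool. It is not atqo's scheduling code: `max_concurrent` is a hypothetical helper, the `{"cpu": 1}` requirement for a single-CPU actor is an assumption based on the class name, and the real scheduler may split the pool between classes in ways this ignores.

```python
def max_concurrent(requirements, resources):
    """Upper bound on simultaneously running actors of one class.

    Hypothetical helper: assumes each actor needs its full requirements
    from the pool and that no other actor class is competing for it.
    """
    limits = [
        resources.get(name, 0) // need
        for name, need in requirements.items()
        if need > 0
    ]
    if not limits:
        raise ValueError("an actor without requirements is not bounded by the pool")
    # The scarcest resource decides how many actors fit.
    return min(limits)


pool = {"cpu": 8, "mem": 16}
heavy = {"cpu": 2, "mem": 4}
single_cpu = {"cpu": 1}  # assumed requirement of a single-CPU actor

print(max_concurrent(heavy, pool))       # 4
print(max_concurrent(single_cpu, pool))  # 8
```

With the pool from the usage example, four HeavyActor instances would exhaust both cpu and mem, so any mix of the two classes has to trade one against the other.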
Rate limits
Per-task budgets recover over time (token bucket), independent of the static resource pool:
from atqo import RateLimit

scheduler = Scheduler(
    actors=[Scraper],
    resources={"cpu": 4},
    rate_limits={"site_a": RateLimit(10, per_seconds=60)},
)

SchedulerTask(url, actor=Scraper, rate_costs={"site_a": 1})
A task whose rate_costs entry exceeds the capacity of the corresponding bucket raises ImpossibleRateCost at ingress, because it could never run.
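For readers unfamiliar with token buckets, the sketch below shows the general technique in plain Python. It is an illustration only, not atqo's implementation: `TokenBucket`, `try_acquire`, and the injectable `clock` are hypothetical names, continuous refill is an assumption, and a plain `ValueError` stands in for ImpossibleRateCost.

```python
import time


class TokenBucket:
    """Budget of `capacity` tokens that refills evenly over `per_seconds`."""

    def __init__(self, capacity, per_seconds, clock=time.monotonic):
        self.capacity = capacity
        self.per_seconds = per_seconds
        self.clock = clock  # injectable so the example can control time
        self.tokens = float(capacity)
        self.updated = clock()

    def try_acquire(self, cost=1):
        if cost > self.capacity:
            # No amount of waiting can ever satisfy this cost.
            raise ValueError("cost exceeds bucket capacity")
        now = self.clock()
        recovered = (now - self.updated) * self.capacity / self.per_seconds
        self.tokens = min(self.capacity, self.tokens + recovered)
        self.updated = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False


# Fake clock so the example is deterministic.
fake_now = [0.0]
bucket = TokenBucket(10, per_seconds=60, clock=lambda: fake_now[0])

granted = sum(bucket.try_acquire() for _ in range(12))
print(granted)  # 10: the budget is spent, the last two requests are refused

fake_now[0] = 6.0  # six seconds later one token has recovered
after_wait = bucket.try_acquire()
again = bucket.try_acquire()
print(after_wait, again)  # True False
```

The capacity check runs before any waiting logic, which mirrors the reason given above for rejecting an oversized cost at ingress.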
Simple parallel API
from atqo import parallel_map, parallel_consume
results = parallel_map(expensive_fn, items, workers=4)
parallel_consume(MyActor, items, workers=4)
Patterns
Stateful actors (logged-in browser, warm cache, etc.)
Register a separate actor class. Its __init__ performs the setup; the scheduler routes tasks needing that state via actor=.
class Browser(ActorBase):
    requirements = {"browser_slot": 1}

    def __init__(self):
        self.driver = open_browser()

    def consume(self, url):
        return self.driver.fetch(url)

class LoggedInBrowser(Browser):
    def __init__(self):
        super().__init__()
        self.driver.login(USER, PW)

scheduler = Scheduler(
    actors=[Browser, LoggedInBrowser],
    resources={"browser_slot": 4},
)

SchedulerTask(url, actor=LoggedInBrowser)
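The point of the pattern is that expensive setup runs once per actor and is then reused by every task routed to that class. The toy below imitates that idea without atqo so it can run anywhere. `FakeDriver` and `ToyRouter` are hypothetical stand-ins, and the real scheduler may start several instances per class or manage them differently.

```python
class FakeDriver:
    """Hypothetical stand-in for a real browser driver."""

    def __init__(self):
        self.logged_in = False

    def login(self, user, password):
        self.logged_in = True

    def fetch(self, url):
        return (url, self.logged_in)


class Browser:
    def __init__(self):
        self.driver = FakeDriver()  # setup happens once, at construction

    def consume(self, url):
        return self.driver.fetch(url)


class LoggedInBrowser(Browser):
    def __init__(self):
        super().__init__()
        self.driver.login("user", "secret")


class ToyRouter:
    """Keeps one live instance per actor class and routes tasks by class."""

    def __init__(self):
        self._instances = {}
        self.setups = 0

    def dispatch(self, actor_cls, arg):
        if actor_cls not in self._instances:
            self._instances[actor_cls] = actor_cls()
            self.setups += 1
        return self._instances[actor_cls].consume(arg)


router = ToyRouter()
anonymous = router.dispatch(Browser, "https://example.com/a")
private_1 = router.dispatch(LoggedInBrowser, "https://example.com/b")
private_2 = router.dispatch(LoggedInBrowser, "https://example.com/c")

print(anonymous)      # ('https://example.com/a', False)
print(private_2)      # ('https://example.com/c', True)
print(router.setups)  # 2: each class was set up once, not once per task
```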
Hang protection
Every blocking wait in the scheduler is bounded. Optional knobs on Scheduler(...):
- task_timeout: per-task wall-clock cap. Attempts that exceed it fail like any exception (and count against allowed_fail_count).
- stall_timeout: if no progress is made for this long, raise SchedulerStalled.
- poison_timeout: how long the graceful actor drain waits before force-cancelling (default 5s).
cleanup() and join() always terminate; they cancel listeners and stop the event loop unconditionally.
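A per-task wall-clock cap of this kind can be sketched with the standard library's `asyncio.wait_for`. This is not atqo's code: `run_with_timeout` is a hypothetical helper, and the retry semantics (a task may fail up to `allowed_fail_count` times before the error propagates) are an assumption about what "count against allowed_fail_count" means.

```python
import asyncio


async def run_with_timeout(make_attempt, task_timeout, allowed_fail_count):
    """Retry an attempt, treating a timeout like any other failure."""
    failures = 0
    while True:
        try:
            # wait_for cancels the attempt and raises TimeoutError at the cap.
            return await asyncio.wait_for(make_attempt(), timeout=task_timeout)
        except Exception:
            failures += 1
            if failures > allowed_fail_count:
                raise


attempts = []


async def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        await asyncio.sleep(10)  # simulates a hang well past the cap
    return "done"


result = asyncio.run(
    run_with_timeout(flaky, task_timeout=0.05, allowed_fail_count=2)
)
print(result, len(attempts))  # done 3
```

The first two attempts hang and are cancelled at the cap; the third completes, so the failure budget of two is used up but not exceeded.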