
TaskIQ integration for HawkAPI — modern async-native task queue, DI, scheduling, JSON-only formatter


hawkapi-taskiq

TaskIQ integration for HawkAPI. Modern async-native task queue — a lighter, async-first alternative to Celery.

Install

pip install hawkapi-taskiq
pip install 'hawkapi-taskiq[redis]'    # + taskiq-redis
pip install 'hawkapi-taskiq[nats]'     # + taskiq-nats
pip install 'hawkapi-taskiq[cron]'     # + croniter for schedule validation

Quickstart

from hawkapi import Depends, HawkAPI
from hawkapi_taskiq import TaskIQConfig, get_broker, init_taskiq, task

app = HawkAPI()
broker = init_taskiq(app, config=TaskIQConfig(broker_url="redis://localhost:6379/0"))


@task(broker, name="emails.send")
async def send_email(to: str, subject: str) -> None:
    ...


@app.post("/notify")
async def notify(email: str, b = Depends(get_broker)):
    await send_email.kiq(email, "Hello")
    return {"ok": True}

Broker selection

Choose by URL scheme — all others are rejected:

URL                    Broker
memory://              InMemoryBroker (tests, single-process)
redis://host:6379/0    ListQueueBroker (taskiq-redis)
rediss://...           same, with TLS
nats://server:4222     NatsBroker (taskiq-nats)

Any other scheme raises ValueError at create_broker() — this is a security feature, not a limitation. The allowlist prevents accidentally enabling brokers that use unsafe deserialization formats.
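The allowlist idea can be sketched with the standard library alone. This is an illustration, not the package's actual create_broker(): the names ALLOWED_SCHEMES and broker_name_for are hypothetical, and the function only returns the broker class name from the table above instead of constructing a broker.

```python
from urllib.parse import urlsplit

# Hypothetical mapping that mirrors the table above; not the package's internals.
ALLOWED_SCHEMES = {
    "memory": "InMemoryBroker",
    "redis": "ListQueueBroker",
    "rediss": "ListQueueBroker",
    "nats": "NatsBroker",
}


def broker_name_for(url: str) -> str:
    """Return the broker class name for an allowlisted URL, else raise ValueError."""
    scheme = urlsplit(url).scheme.lower()
    if scheme not in ALLOWED_SCHEMES:
        # Unknown (or missing) schemes fail closed rather than falling back to a default.
        raise ValueError(f"unsupported broker URL scheme: {scheme!r}")
    return ALLOWED_SCHEMES[scheme]
```

Failing closed on unknown schemes means a typo or a copied URL for an unsupported backend is caught at startup instead of silently selecting a broker.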

Scheduling

v0.1.0 ships a Scheduled value type with cron-syntax validation. Wire it to TaskIQ's native scheduler yourself — we deliberately avoid a "magic" registration helper that doesn't compose cleanly with the upstream TaskiqScheduler:

from hawkapi_taskiq import Scheduled
from taskiq import TaskiqScheduler
from taskiq.schedule_sources import LabelScheduleSource


@task(broker, name="myapp.cleanup")
async def cleanup() -> None:
    ...


# 1. Validate the schedule (cron syntax) up front.
schedule = Scheduled(cron="0 * * * *")    # raises ValueError if malformed

# 2. Apply it as a label LabelScheduleSource reads.
cleanup.labels["schedule"] = [{"cron": schedule.cron, "args": [], "kwargs": {}}]


# 3. Run a scheduler process alongside the worker.
scheduler = TaskiqScheduler(broker=broker, sources=[LabelScheduleSource(broker)])

Scheduled(cron="...") validates the expression via croniter (install the [cron] extra). Exactly one of cron and interval_seconds must be set; setting both, or neither, is rejected.
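The "exactly one of cron and interval_seconds" rule can be sketched as a small dataclass. ScheduledSketch is a hypothetical stand-in, not the real Scheduled class: its cron check only counts five whitespace-separated fields, whereas the package is described as validating with croniter, and the positive-interval check is an added assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ScheduledSketch:
    """Hypothetical stand-in for Scheduled; field names follow the text above."""

    cron: Optional[str] = None
    interval_seconds: Optional[float] = None

    def __post_init__(self) -> None:
        # Reject "both set" and "neither set": exactly one field must be given.
        if (self.cron is None) == (self.interval_seconds is None):
            raise ValueError("set exactly one of cron or interval_seconds")
        # Crude stand-in for croniter: only checks for five fields.
        if self.cron is not None and len(self.cron.split()) != 5:
            raise ValueError(f"malformed cron expression: {self.cron!r}")
        # Assumption: a non-positive interval is treated as invalid.
        if self.interval_seconds is not None and self.interval_seconds <= 0:
            raise ValueError("interval_seconds must be positive")
```

Validating in the constructor means a bad schedule fails where it is declared, before any label is attached to a task.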

Health

from hawkapi_taskiq import check_broker

report = await check_broker(broker)
# HealthReport(broker_ok=True, broker_type="ListQueueBroker", error="")
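One way to use such a report is to map it onto an HTTP status and body for a health endpoint. The sketch below is an assumption-laden illustration: HealthReportSketch only copies the three fields shown in the comment above, and to_http is a hypothetical helper, not part of hawkapi-taskiq.

```python
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass(frozen=True)
class HealthReportSketch:
    """Hypothetical mirror of the fields shown in the example report."""

    broker_ok: bool
    broker_type: str
    error: str = ""


def to_http(report: HealthReportSketch) -> Tuple[int, Dict[str, object]]:
    """Map a report to (status code, JSON-serializable body)."""
    # 503 signals to load balancers that the broker is unreachable.
    status = 200 if report.broker_ok else 503
    body: Dict[str, object] = {"ok": report.broker_ok, "broker": report.broker_type}
    if report.error:
        body["error"] = report.error
    return status, body
```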

Testing

from hawkapi_taskiq import in_memory_broker, task


async def test_my_task():
    async with in_memory_broker() as broker:
        @task(broker, name="t.work")
        async def work(x: int) -> int:
            return x * 2

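        handle = await work.kiq(21)
        # Assumption: in_memory_broker() yields TaskIQ's InMemoryBroker,
        # which runs the task in-process so the result is available here.
        result = await handle.wait_result(timeout=2)
        assert result.return_value == 42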

Security

  • JSON-only serialization: TaskIQ's default serializer is kept, and any other serializer passed via TaskIQConfig.serializer is explicitly rejected, to prevent unsafe deserialization at consume time (CWE-502).
  • Broker URL scheme allowlist: only memory://, redis://, rediss://, nats://.
  • Task name registry: a duplicate @task(name=...) raises at registration. TaskIQ silently overrides the earlier task; this integration disallows it.
  • Cron expression validation at registration time: malformed expressions fail fast.
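The duplicate-name rule from the list above can be sketched as a registry that refuses to overwrite an existing entry. TaskNameRegistry is a hypothetical illustration of the idea, not the package's implementation.

```python
from typing import Callable, Dict


class TaskNameRegistry:
    """Hypothetical registry that rejects duplicate task names."""

    def __init__(self) -> None:
        self._tasks: Dict[str, Callable[..., object]] = {}

    def register(self, name: str, func: Callable[..., object]) -> Callable[..., object]:
        # Fail at registration time instead of silently replacing the earlier task.
        if name in self._tasks:
            raise ValueError(f"task name already registered: {name!r}")
        self._tasks[name] = func
        return func

    def get(self, name: str) -> Callable[..., object]:
        return self._tasks[name]
```

Raising on a duplicate avoids the case where two modules register the same name and a worker runs whichever was imported last.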

Development

git clone https://github.com/ashimov/hawkapi-taskiq.git
cd hawkapi-taskiq
uv sync --extra dev
uv run pytest -q
uv run ruff check . && uv run ruff format --check .
uv run pyright src/

License

MIT.
