TaskIQ integration for HawkAPI — modern async-native task queue, DI, scheduling, JSON-only formatter

hawkapi-taskiq

TaskIQ integration for HawkAPI. Modern async-native task queue — a lighter, async-first alternative to Celery.

Install

pip install hawkapi-taskiq
pip install 'hawkapi-taskiq[redis]'    # + taskiq-redis
pip install 'hawkapi-taskiq[nats]'     # + taskiq-nats
pip install 'hawkapi-taskiq[cron]'     # + croniter for schedule validation

Quickstart

from hawkapi import Depends, HawkAPI
from hawkapi_taskiq import TaskIQConfig, get_broker, init_taskiq, task

app = HawkAPI()
broker = init_taskiq(app, config=TaskIQConfig(broker_url="redis://localhost:6379/0"))


@task(broker, name="emails.send")
async def send_email(to: str, subject: str) -> None:
    ...


@app.post("/notify")
async def notify(email: str, b = Depends(get_broker)):
    await send_email.kiq(email, "Hello")
    return {"ok": True}

Broker selection

Choose by URL scheme — all others are rejected:

URL                    Broker
memory://              InMemoryBroker (tests, single-process)
redis://host:6379/0    ListQueueBroker (taskiq-redis)
rediss://...           same, with TLS
nats://server:4222     NatsBroker (taskiq-nats)

Any other scheme raises ValueError at create_broker() — this is a security feature, not a limitation. The allowlist prevents accidentally enabling brokers that use unsafe deserialization formats.
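
The allowlist idea can be sketched with the standard library alone. This is an illustration, not the package's implementation: the mapping and the broker_name_for helper are hypothetical names, and the only behaviour taken from the text above is that an unlisted scheme raises ValueError.

```python
from urllib.parse import urlsplit

# Hypothetical mapping that mirrors the table above; not the package's own code.
ALLOWED_SCHEMES = {
    "memory": "InMemoryBroker",
    "redis": "ListQueueBroker",
    "rediss": "ListQueueBroker",  # same broker, TLS connection
    "nats": "NatsBroker",
}


def broker_name_for(url: str) -> str:
    """Return the broker class name for an allowlisted URL scheme."""
    scheme = urlsplit(url).scheme.lower()
    if scheme not in ALLOWED_SCHEMES:
        # Reject by default: anything not explicitly listed is an error.
        raise ValueError(f"unsupported broker URL scheme: {scheme!r}")
    return ALLOWED_SCHEMES[scheme]
```

Rejecting by default means a typo or an unexpected scheme fails at startup instead of silently selecting a broker nobody reviewed.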

Scheduling

v0.1.0 ships a Scheduled value type with cron-syntax validation. Wire it to TaskIQ's native scheduler yourself — we deliberately avoid a "magic" registration helper that doesn't compose cleanly with the upstream TaskiqScheduler:

from hawkapi_taskiq import Scheduled
from taskiq import TaskiqScheduler
from taskiq.schedule_sources import LabelScheduleSource


@task(broker, name="myapp.cleanup")
async def cleanup() -> None:
    ...


# 1. Validate the schedule (cron syntax) up front.
schedule = Scheduled(cron="0 * * * *")    # raises ValueError if malformed

# 2. Apply it as a label LabelScheduleSource reads.
cleanup.labels["schedule"] = [{"cron": schedule.cron, "args": [], "kwargs": {}}]


# 3. Run a scheduler process alongside the worker.
scheduler = TaskiqScheduler(broker=broker, sources=[LabelScheduleSource(broker)])

Scheduled(cron="...") validates via croniter (install the [cron] extra). Exactly one of cron and interval_seconds is required; setting both is rejected.
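
The "exactly one of" rule can be sketched as a small frozen dataclass. ScheduleSpec and cron_label are hypothetical stand-ins, not the package's Scheduled type, and the five-field check below only looks at the shape of the expression; it is not a substitute for croniter's validation.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ScheduleSpec:
    """Hypothetical stand-in for Scheduled; exactly one field must be set."""

    cron: Optional[str] = None
    interval_seconds: Optional[int] = None

    def __post_init__(self) -> None:
        # True when both are set or both are missing; either case is invalid.
        if (self.cron is None) == (self.interval_seconds is None):
            raise ValueError("set exactly one of cron or interval_seconds")
        if self.cron is not None and len(self.cron.split()) != 5:
            # Shape check only; a real validator would parse each field.
            raise ValueError(f"expected 5 cron fields: {self.cron!r}")


def cron_label(spec: ScheduleSpec) -> list:
    """Build the label value in the shape used in the example above."""
    if spec.cron is None:
        raise ValueError("cron_label needs a cron-based schedule")
    return [{"cron": spec.cron, "args": [], "kwargs": {}}]
```

Validating in __post_init__ means an invalid schedule cannot exist as an object, so later code can use spec.cron without re-checking it.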

Health

from hawkapi_taskiq import check_broker

report = await check_broker(broker)
# HealthReport(broker_ok=True, broker_type="ListQueueBroker", error="")
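
One way to use such a report is to map it to a readiness response. The sketch below assumes only the three fields shown in the comment above; the HealthReport class here is a local stand-in and readiness is a hypothetical helper, not part of the package.

```python
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass(frozen=True)
class HealthReport:
    """Local stand-in with the three fields shown in the comment above."""

    broker_ok: bool
    broker_type: str
    error: str = ""


def readiness(report: HealthReport) -> Tuple[int, Dict[str, str]]:
    """Map a report to an HTTP status code and a JSON-ready body."""
    if report.broker_ok:
        return 200, {"status": "ok", "broker": report.broker_type}
    # 503 tells load balancers and orchestrators to stop routing traffic here.
    return 503, {
        "status": "unavailable",
        "broker": report.broker_type,
        "error": report.error,
    }
```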

Testing

from hawkapi_taskiq import in_memory_broker, task


async def test_my_task():
    async with in_memory_broker() as broker:
        @task(broker, name="t.work")
        async def work(x: int) -> int:
            return x * 2

        await work.kiq(21)
        # Execute pending tasks via TaskIQ's normal flow.

Security

  • JSON-only serialization: TaskIQ's defaults are fine; any other serializer passed via TaskIQConfig.serializer is explicitly rejected to prevent arbitrary deserialization at consume time (CWE-502).
  • Broker URL scheme allowlist: only memory://, redis://, rediss://, nats://.
  • Task name registry: a duplicate @task(name=...) raises at registration. TaskIQ itself silently overrides the earlier task; this package disallows it.
  • Cron expression validation at registration time: malformed expressions fail fast.
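
The duplicate-name rule can be illustrated with a minimal registry. TaskNameRegistry is a hypothetical class written for this example, and ValueError is an assumption: the text above says only that registration raises, not which exception type.

```python
from typing import Callable, Dict, List, TypeVar

F = TypeVar("F", bound=Callable[..., object])


class TaskNameRegistry:
    """Hypothetical registry that refuses to overwrite a task name."""

    def __init__(self) -> None:
        self._tasks: Dict[str, Callable[..., object]] = {}

    def register(self, name: str) -> Callable[[F], F]:
        def decorator(func: F) -> F:
            if name in self._tasks:
                # Fail loudly instead of replacing the earlier function.
                raise ValueError(f"task name already registered: {name!r}")
            self._tasks[name] = func
            return func

        return decorator

    def names(self) -> List[str]:
        """Return the registered task names in sorted order."""
        return sorted(self._tasks)
```

Raising at decoration time surfaces the clash at import, before any message is routed to the wrong function.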

Development

git clone https://github.com/Hawk-API/hawkapi-taskiq.git
cd hawkapi-taskiq
uv sync --extra dev
uv run pytest -q
uv run ruff check . && uv run ruff format --check .
uv run pyright src/

License

MIT.
