A distributed background task system for Python functions

Docket is a distributed background task system for Python functions, with a focus on scheduling future work as seamlessly and efficiently as immediate work.

At a glance

from datetime import datetime, timedelta, timezone

from docket import Docket


async def greet(name: str, greeting="Hello") -> None:
    print(f"{greeting}, {name} at {datetime.now()}!")


async with Docket() as docket:
    await docket.add(greet)("Jane")

    now = datetime.now(timezone.utc)
    soon = now + timedelta(seconds=3)
    await docket.add(greet, when=soon)("John", greeting="Howdy")

from docket import Docket, Worker

async with Docket() as docket:
    async with Worker(docket) as worker:
        await worker.run_until_finished()

Hello, Jane at 2025-03-05 13:58:21.552644!
Howdy, John at 2025-03-05 13:58:24.550773!

Why docket?

⚡️ Snappy one-way background task processing without any bloat

📅 Schedule immediate or future work seamlessly with the same interface

⏭️ Skip problematic tasks or parameters without redeploying

🌊 Purpose-built for Redis streams

🧩 Fully type-complete and type-aware for your background task functions

Installing docket

Docket is available on PyPI under the package name pydocket. It targets Python 3.12 or above.

With uv:

uv pip install pydocket

or

uv add pydocket

With pip:

pip install pydocket

Docket requires a Redis server with Streams support (which was introduced in Redis 5.0.0). Docket is tested with Redis 7.

Creating a Docket

Each Docket should have a name that will be shared across your system, like the name of a topic or queue. By default this is "docket". You can support many separate dockets on a single Redis server as long as they have different names.

Docket accepts a URL to connect to the Redis server (defaulting to the local server), and you can pass any additional connection configuration you need on that connection URL.

async with Docket(name="orders", url="redis://my-redis:6379/0") as docket:
    ...

Together, the name and url identify a single docket of work shared across your whole system.
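As a rough illustration of what such a connection URL carries, the sketch below splits one into its parts with the standard library. It does not show how Docket or its Redis client parses the URL, and the socket_timeout query option is only an assumed example of extra connection configuration.

```python
from urllib.parse import parse_qs, urlsplit

# Hypothetical URL for illustration; the query option is an assumed example.
url = "redis://my-redis:6379/0?socket_timeout=5"

parts = urlsplit(url)
host = parts.hostname              # server name
port = parts.port                  # TCP port as an int
database = parts.path.lstrip("/")  # Redis database number, as text
options = parse_qs(parts.query)    # any extra options on the URL

print(host, port, database, options)
```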

Scheduling work

A Docket is the entrypoint to scheduling immediate and future work. You define work in the form of async functions that return None. These task functions can accept any parameter types, so long as they can be serialized with cloudpickle.

from datetime import datetime, timedelta, timezone

from docket import Docket

def now() -> datetime:
    return datetime.now(timezone.utc)

async def send_welcome_email(customer_id: int, name: str) -> None:
    ...

async def send_followup_email(customer_id: int, name: str) -> None:
    ...

async with Docket() as docket:
    await docket.add(send_welcome_email)(12345, "Jane Smith")

    tomorrow = now() + timedelta(days=1)
    await docket.add(send_followup_email, when=tomorrow)(12345, "Jane Smith")

docket.add schedules both immediate work (the default) and future work (with the when: datetime parameter).

All task executions are identified with a key that captures the unique essence of that piece of work. By default they are randomly assigned UUIDs, but assigning your own keys unlocks many powerful capabilities.

async with Docket() as docket:
    await docket.add(send_welcome_email)(12345, "Jane Smith")

    tomorrow = now() + timedelta(days=1)
    key = "welcome-email-for-12345"
    await docket.add(send_followup_email, when=tomorrow, key=key)(12345, "Jane Smith")

If you've given your future work a key, then only one unique instance of that execution will exist in the future:

key = "welcome-email-for-12345"
await docket.add(send_followup_email, when=tomorrow, key=key)(12345, "Jane Smith")

Calling .add a second time with the same key won't do anything, so luckily your customer won't get two emails!

However, at any time later you can replace that task execution to alter when it will happen:

key = "welcome-email-for-12345"
next_week = now() + timedelta(days=7)
await docket.replace(send_followup_email, when=next_week, key=key)(12345, "Jane Smith")

You can also change what arguments will be passed:

key = "welcome-email-for-12345"
await docket.replace(send_followup_email, when=tomorrow, key=key)(12345, "Jane Q. Smith")

Or just cancel it outright:

await docket.cancel("welcome-email-for-12345")

Tasks may also be called by name, in cases where you can't or don't want to import the module that has your tasks. This may be common in a distributed environment where the code of your task system just isn't available, or it requires heavyweight libraries that you wouldn't want to import into your web server. In this case, you will lose the type-checking for .add and .replace calls, but otherwise everything will work as it does with the actual function:

await docket.add("send_followup_email", when=tomorrow)(12345, "Jane Smith")

These primitives of .add, .replace, and .cancel are sufficient to build a large-scale and robust system of background tasks for your application.
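To make the key semantics concrete, here is a minimal in-memory model of the behavior described above: a second add with the same key does nothing, replace overwrites the scheduled execution, and cancel removes it. ToySchedule and Scheduled are hypothetical names for this sketch only; nothing here touches Redis or reflects how Docket stores work.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any


@dataclass
class Scheduled:
    task: str
    when: datetime
    args: tuple[Any, ...]


class ToySchedule:
    """In-memory stand-in for keyed scheduling; not Docket's implementation."""

    def __init__(self) -> None:
        self.by_key: dict[str, Scheduled] = {}

    def add(self, key: str, task: str, when: datetime, *args: Any) -> bool:
        # A second add with an existing key is a no-op.
        if key in self.by_key:
            return False
        self.by_key[key] = Scheduled(task, when, args)
        return True

    def replace(self, key: str, task: str, when: datetime, *args: Any) -> None:
        # Replace overwrites whatever was scheduled under the key.
        self.by_key[key] = Scheduled(task, when, args)

    def cancel(self, key: str) -> None:
        # Cancelling an unknown key is treated as harmless (an assumption).
        self.by_key.pop(key, None)


now = datetime.now(timezone.utc)
tomorrow = now + timedelta(days=1)
next_week = now + timedelta(days=7)
key = "welcome-email-for-12345"

schedule = ToySchedule()
first = schedule.add(key, "send_followup_email", tomorrow, 12345, "Jane Smith")
second = schedule.add(key, "send_followup_email", tomorrow, 12345, "Jane Smith")
schedule.replace(key, "send_followup_email", next_week, 12345, "Jane Q. Smith")
replaced = schedule.by_key[key]
schedule.cancel(key)
```

Keeping one entry per key is what makes repeated scheduling idempotent: callers can retry an add safely, and only an explicit replace changes the outcome.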

Writing tasks

Tasks are any async function that takes cloudpickle-able parameters, and returns None. Returning None is a strong signal that these are fire-and-forget tasks whose results aren't used or waited-on by your application. These are the only kinds of tasks that Docket supports.
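If you want to catch mistakes before scheduling, a small pre-flight check along these lines may help. check_task_call is a hypothetical helper, not part of Docket, and it uses the standard library's pickle as a stand-in for cloudpickle. Because pickle is stricter, it can flag values that cloudpickle would accept.

```python
import inspect
import pickle
import threading
from typing import Any, Callable


def check_task_call(task: Callable[..., Any], *args: Any, **kwargs: Any) -> list[str]:
    """Return a list of problems; an empty list means the call looks schedulable."""
    problems: list[str] = []
    if not inspect.iscoroutinefunction(task):
        problems.append(f"{task.__name__} is not an async function")
    for label, value in [*enumerate(args), *kwargs.items()]:
        try:
            pickle.dumps(value)  # stand-in for cloudpickle (an assumption)
        except Exception as error:
            problems.append(f"argument {label!r} cannot be pickled: {error}")
    return problems


async def send_welcome_email(customer_id: int, name: str) -> None:
    ...


def not_async(customer_id: int) -> None:
    ...


print(check_task_call(send_welcome_email, 12345, name="Jane Smith"))
print(check_task_call(not_async, 12345))
print(check_task_call(send_welcome_email, threading.Lock()))
```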

Docket uses a parameter-based dependency and configuration pattern, which has become common in frameworks like FastAPI, Typer, or FastMCP. As such, there is no decorator for tasks.

A very common requirement for tasks is that they have access to schedule further work on their own docket, especially for chains of self-perpetuating tasks to implement distributed polling and other periodic systems. One of the first dependencies you may look for is the CurrentDocket:

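from datetime import timedelta
from pathlib import Path

from docket import Docket, CurrentDocket

POLLING_INTERVAL = timedelta(seconds=10)

async def poll_for_changes(file: Path, docket: Docket = CurrentDocket()) -> None:
    if file.exists():
        ...  # do something interesting
        return

    # Not there yet: schedule another check on the same docket.
    # now() is the UTC helper defined under "Scheduling work".
    await docket.add(poll_for_changes, when=now() + POLLING_INTERVAL)(file)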

Here the docket argument is an instance of Docket with the same name and URL as the worker the task is running on. You can ask for the CurrentWorker and CurrentExecution as well. It is often useful to have your own task key available so that you can idempotently schedule future work:

from pathlib import Path

from docket import Docket, CurrentDocket, TaskKey

async def poll_for_changes(
    file: Path,
    key: str = TaskKey(),
    docket: Docket = CurrentDocket(),
) -> None:
    if file.exists():
        ...  # do something interesting
        return

    # Reusing this task's own key keeps a single chain of polls alive
    await docket.add(poll_for_changes, when=now() + POLLING_INTERVAL, key=key)(file)

This helps to ensure that there is one continuous "chain" of these future tasks, as they all use the same key.

Configuring the retry behavior for a task is also done with a dependency:

from datetime import timedelta
from docket import Retry

async def faily(retry: Retry = Retry(attempts=5, delay=timedelta(seconds=3))):
    if retry.attempt == 4:
        print("whew!")
        return

    raise ValueError("whoops!")

In this case, the task faily will run 4 times, failing three times and succeeding on the fourth attempt, with a delay of 3 seconds between attempts. If the fifth attempt were reached and also failed, no more would be attempted. This is a linear retry, and an ExponentialRetry is also available:

from datetime import timedelta
from docket import Retry, ExponentialRetry


async def faily(
    retry: Retry = ExponentialRetry(
        attempts=5,
        minimum_delay=timedelta(seconds=2),
        maximum_delay=timedelta(seconds=32),
    ),
):
    if retry.attempt == 4:
        print("whew!")
        return

    raise ValueError("whoops!")

This would retry after 2, 4, and then 8 seconds before that fourth attempt succeeded.
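The doubling schedule can be sketched with a few lines of arithmetic. exponential_delays is a hypothetical helper, and capping each delay at maximum_delay is an assumption based on the parameter's name rather than something stated above.

```python
from datetime import timedelta


def exponential_delays(
    retries: int, minimum_delay: timedelta, maximum_delay: timedelta
) -> list[timedelta]:
    """Delay before each retry: start at minimum_delay, double, cap at maximum_delay."""
    return [
        min(minimum_delay * 2**n, maximum_delay)  # cap is an assumption
        for n in range(retries)
    ]


delays = exponential_delays(
    retries=6,
    minimum_delay=timedelta(seconds=2),
    maximum_delay=timedelta(seconds=32),
)
print([int(d.total_seconds()) for d in delays])  # [2, 4, 8, 16, 32, 32]
```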

Running workers

You can run as many workers as you like to process the tasks on your docket. You can either run a worker programmatically in Python, or via the CLI. Clients using docket have the advantage that they are usually passing the task functions, but workers don't necessarily know which tasks they are supposed to run. Docket solves this by allowing you to explicitly register tasks.

In my_tasks.py:

async def my_first_task():
    ...

async def my_second_task():
    ...

my_task_collection = [
    my_first_task,
    my_second_task,
]

From Python:

from docket import Docket, Worker
from my_tasks import my_task_collection

async with Docket() as docket:
    for task in my_task_collection:
        docket.register(task)

    async with Worker(docket) as worker:
        await worker.run_forever()

From the CLI:

docket worker --tasks my_tasks:my_task_collection
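The --tasks value follows the familiar module:attribute convention. As a sketch of how such a string can be turned into a Python object, the hypothetical resolve_spec helper below uses importlib. Docket's CLI may resolve the value differently, and a standard-library target stands in for my_tasks:my_task_collection so the example runs anywhere.

```python
import importlib
from typing import Any


def resolve_spec(spec: str) -> Any:
    """Import the module named before ':' and return the attribute named after it."""
    module_name, _, attribute = spec.partition(":")
    if not module_name or not attribute:
        raise ValueError(f"expected 'module:attribute', got {spec!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attribute)


# A standard-library target stands in for my_tasks:my_task_collection.
print(resolve_spec("string:digits"))  # 0123456789
```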

By default, workers will process up to 10 tasks concurrently, but you can adjust this to your needs with the concurrency= keyword argument or the --concurrency CLI option.
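To see what a concurrency limit means in practice, here is a small asyncio sketch that caps the number of simultaneously running coroutines with a semaphore. It illustrates the concept only and makes no claim about how Docket's Worker enforces its limit. The task function and counters are hypothetical.

```python
import asyncio

CONCURRENCY = 10  # mirrors the default mentioned above
running = 0
peak = 0


async def task(semaphore: asyncio.Semaphore) -> None:
    global running, peak
    async with semaphore:  # at most CONCURRENCY tasks get past this point
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # simulated work
        running -= 1


async def main() -> None:
    semaphore = asyncio.Semaphore(CONCURRENCY)
    await asyncio.gather(*(task(semaphore) for _ in range(50)))


asyncio.run(main())
print(f"peak concurrency: {peak}")
```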

When a worker crashes ungracefully, any tasks it was executing are held for a period of time before being redelivered to other workers. You can control this period with redelivery_timeout= or --redelivery-timeout. Set it to a value higher than the duration of the longest task you expect to run. For queues of very fast tasks, a few seconds may be ideal; for long data-processing steps involving large amounts of data, you may need minutes.
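The reasoning behind that advice can be shown with a toy eligibility check. is_redeliverable is a hypothetical function that assumes a task becomes eligible for redelivery once it has been held for at least the timeout. It is a simplified model, not Docket's actual redelivery logic.

```python
from datetime import timedelta


def is_redeliverable(held_for: timedelta, redelivery_timeout: timedelta) -> bool:
    """A held task becomes eligible for another worker once it has waited long enough."""
    return held_for >= redelivery_timeout


redelivery_timeout = timedelta(seconds=30)

# A crashed worker's task, held for 45 seconds: another worker may take it.
print(is_redeliverable(timedelta(seconds=45), redelivery_timeout))  # True

# Under this model, a healthy task still running at the 30-second mark looks
# the same, which is why the timeout should exceed your longest task.
print(is_redeliverable(timedelta(seconds=30), redelivery_timeout))  # True

# A task held for only 10 seconds is left alone.
print(is_redeliverable(timedelta(seconds=10), redelivery_timeout))  # False
```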

Hacking on docket

We use uv for project management, so getting set up should be as simple as cloning the repo and running:

uv sync

Then, to run the test suite:

pytest

We aim to maintain 100% test coverage, which is required for all PRs to docket. We believe that docket should stay small, simple, understandable, and reliable, and that begins with testing all the dusty branches and corners. This will give us the confidence to upgrade dependencies quickly and to adapt to new versions of Redis over time.
