Skip to main content

An async python Background task system using Azure Service Bus Queues

Project description

Boilermaker

This project is an extremely-lightweight task-runner exclusively for async Python and Azure Service Bus Queues. If you need a fully fledged task-runner which allows connecting to different backends, you should consider Celery instead.

To Install

pip install "boilermaker-servicebus"

How to Use

The Boilermaker application object requires some state (which will get sent to all handlers as the first argument upon invocation) and an authenticated ServiceBus client.

A task handler for Boilermaker is any async function which takes as its first argument application state and which has been registered:

# This is a background task that we'll register
async def background_task1(state, somearg, somekwarg=True):
    """`state` must be first argument."""
    await state.data.get("key")
    print(state)
    print(somearg)
    print(somekwarg)

# Our task must be registered to be invocable.
boilermaker_app.register_async(background_task1, policy=...A retry policy goes here...)

Note: Boilermaker does not currently have a way to store or use the results of tasks, and all arguments must be JSON-serializable.

Complete Example

For a fuller example, in our application, we may have some code like the following:

from azure.identity.aio import DefaultAzureCredential
from boilermaker.app import Boilermaker
from boilermaker.config import Config
# The boilermaker ServiceBus wrapper is for convenience/example
from boilermaker.service_bus import AzureServiceBus
from boilermaker import retries


# This represents our "App" object which will be passed as `state` to our handlers
class App:
    def __init__(self, data):
        self.data = data


# This is a background task that we'll register
async def background_task1(state: App, somearg, somekwarg=True):
    """`state` must be first argument."""
    await state.data.get("key")
    print(state)
    print(somearg)
    print(somekwarg)


service_bus_namespace_url = os.environ["SERVICE_BUS_NAMESPACE_URL"]
service_bus_queue_name = os.environ["SERVICE_BUS_QUEUE_NAME"]
conf = Config(
    service_bus_namespace_url=service_bus_namespace_url,
    service_bus_queue_name=service_bus_queue_name,
)
# We create a service bus Sender client
service_bus_client = AzureServiceBus.from_config(conf)

# Next we'll create a worker and register our task
worker = Boilermaker(App({"key": "value"}), service_bus_client=service_bus_client)
worker.register_async(background_task1, policy=retries.RetryPolicy.default())


# Finally, this will schedule the task
async def publish_task():
    await worker.apply_async(
        background_task1, "first-arg", somekwarg=False
    )

if __name__ == "__main__":
    asyncio.run(publish_task())

In another process, we'll need to run a background task worker. That looks like this:

async def run_worker():
    await worker.run()

If we look in the logs for our other process, we should be able to see this task registered and run:

{"event": "Registered background function fn=background_task1", "level": "info", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:46:54"}
{"event": "[background_task1] Begin Task sequence_number=19657", "level": "info", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:52:23"}
...prints-go-here...
{"event": "[background_task1] Completed Task sequence_number=19657 in 0.00021130498498678207s", "level": "info", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:52:23"}

Callbacks and Chains

It is possible to register callbacks for tasks, which can run on success or failure. To schedule a task with callbacks, we have to create a task object and then set a success and/or failure callback. Finally, instead of apply_async we have to call publish_task:

...see worker-instantation example above...

from boilermaker.failure import TaskFailureResult

async def a_background_task(state, param: str):
    if param == "success":
        print("Things seem to be going really well")
        return None
    elif param == "fail":
        return TaskFailureResult
    raise ValueError("Exceptions are a bummer!")


async def happy_path(state):
    print("Everything is great!")


async def sad_path(state):
    print("Everything is sad")


# We need to be sure our tasks are registered
worker.register_async(a_background_task, policy=retries.NoRetry())
worker.register_async(happy_path, policy=retries.NoRetry())
worker.register_async(sad_path, policy=retries.NoRetry())

# Now we can create a happy task and add callbacks
happy_task = worker.create_task(a_background_task, "success")

# For good measure, we'll create a sad task too
sad_task = worker.create_task(a_background_task, "uh oh!")

# This callback should get scheduled on success
happy_task >> worker.create_task(happy_path)

# It's also possible to assign tasks directly to `on_success` and `on_failure`.
# This callback should not get scheduled
sad_task.on_success = worker.create_task(happy_path)
# This callback should get scheduled
sad_task.on_failure = worker.create_task(sad_path)

# Finally, this will schedule the tasks
async def publish_task():
    await worker.publish_task(happy_task)
    # If we wait a bit we can see more clearly one task before the other
    await asyncio.sleep(4)
    await worker.publish_task(sad_task)

Chains

It's also possible to chain tasks together, like this:

async def publish_chain():
    fail = Task.si(sad_path)
    workflow = app.chain(
            Task.si(a_background_task, "success"),
            Task.si(happy_path),
            Task.si(a_background_task, "fail"),
            on_failure=fail,
        )
    await app.publish_task(workflow)

Here are examples of log out from running the above:

// Logs from registering the functions
{"event": "Registered background function fn=callback_task_test", "level": "info", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:46:54"}
{"event": "Registered background function fn=happy_path", "level": "info", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:46:54"}
{"event": "Registered background function fn=sad_path", "level": "info", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:46:54"}
// Happy path test ->
{"event": "[callback_task_test] Begin Task sequence_number=19657", "version": "pr72-0.25.1", "level": "info", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:52:23"}
{"event": "[callback_task_test] Completed Task sequence_number=19657 in 0.00021130498498678207s", "level": "info", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:52:23"}
{"event": "[happy_path] Begin Task sequence_number=19659", "level": "info", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:52:24"}
{"event": "[happy_path] Completed Task sequence_number=19659 in 0.0001390039687976241s", "level": "info", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:52:24"}
// Sad path test ->
{"event": "[callback_task_test] Begin Task sequence_number=19661", "level": "info", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:52:27"}
{"event": "Failed processing task sequence_number=19661  Traceback (most recent call last):\n  File \"/app/.venv/lib/python3.12/site-packages/boilermaker/app.py\", line 238, in message_handler\n    result = await self.task_handler(task, sequence_number)\n             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/app/.venv/lib/python3.12/site-packages/boilermaker/app.py\", line 306, in task_handler\n    result = await function(\n             ^^^^^^^^^^^^^^^\n  File \"/app/boilermaker_example.py\", line 210, in callback_task_test\n    raise ValueError(\"Negative numbers are a bummer\")\nValueError: Negative numbers are a bummer\n", "level": "error", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:52:27"}
{"event": "[sad_path] Begin Task sequence_number=19663", "level": "info", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:52:28"}
{"event": "[sad_path] Completed Task sequence_number=19663 in 0.00020030408632010221s", "level": "info", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:52:28"}

Retries

Retries are configured in Boilermaker using a RetryPolicy:

class RetryPolicy(BaseModel):
    # Capped at this value
    max_tries: int = MAX_REASONABLE_RETRIES
    # Seconds
    delay: int = 0
    # Max seconds
    delay_max: int = MAX_BACKOFF_SECONDS
    # Mode for retrying
    retry_mode: RetryMode = RetryMode.Fixed

Retries are scheduled only by raising a RetryException:

def sometask(state):
    raise RetryException("Please try again!")

Note: all tasks must be registered with a RetryPolicy, but Boilermaker will only retry tasks that have raised a RetryException.

If you do not want your tasks to be retried:

  • Do not throw a RetryException from inside of your task.
  • Register them with a NoRetry policy: worker.register_async(..., policy=retries.NoRetry())

NoRetry() is simply a special policy where max_tries=1.

Note: on_failure callbacks will only be run after all retries have been exhausted.

Scheduling Retries

There are three places where retries for a task can be configured:

  • When initially registering a task (the task default).
  • When scheduling a task (overrides the default for that invocation).
  • When raising a RetryException with a custom policy (overrides default if different).

Examples of each are included below.

Register Task with a Default Retry Policy

When a task has been registered, we can set a retry policy for any invocation of it:

from boilermaker import retries

async def a_background_task(state, param: str):
    if param == "throwing":
        # This will retry using the *default policy* set when the task was registered
        raise retries.RetryException("We are thrown")
    return "OK"

worker.register_async(a_background_task, policy=retries.RetryPolicy.default())

This example works like a template for running the task. If a_background_task gets retried, Boilermaker will use the default policy to calculate its next delay and max attempts allowed.

Scheduling a Task with a Different Retry Policy

Sometimes we'd like to publish our task so that its retries follow a different plan than the task was originally registered with.

We can publish (schedule) a particular task with a different retry policy like this:

from boilermaker import retries

async def a_background_task(state, param: str):
    if param == "throwing":
        # This will retry using the RetryPolicy set
        # when *scheduling* the task (see below)
        raise retries.RetryException("We are thrown")
    return "OK"

# First we register the task
worker.register_async(a_background_task, policy=retries.RetryPolicy.default())

async def schedule_task_with_different_policy():
    new_policy=retries.RetryPolicy(
        max_tries=100, delay=2, delay_max=1000, retry_mode=retries.RetryMode.Exponential,
    )
    await worker.apply_async(a_background_task, "throwing", policy=new_policy)

In the above example, this invocation of the task has a new retry policy, but all other invocations of this task elsewhere in our codebase will continue to use the default.

Raising a RetryException with a Different Policy

Lastly, it's also possible to request a different RetryPolicy from inside a task:

from boilermaker import retries

async def a_background_task(state, param: str):
    if param == "throwing":
        # This will retry using *THIS* RetryPolicy
        raise retries.RetryExceptionDefaultExponential("We are thrown", max_tries=42)
    return "OK"

# First we register the task
worker.register_async(a_background_task, policy=retries.RetryPolicy.default())

async def schedule_task_with_different_policy():
    new_policy=retries.RetryPolicy(
        max_tries=10, delay=20, delay_max=100, retry_mode=retries.RetryMode.Linear,
    )
    # Because the task raises a different policy, this one will get dropped in face of `RetryExceptionDefaultExponential`
    await worker.apply_async(a_background_task, "throwing", policy=new_policy)

FAQ

Where does the name come from?

From here

Are You Going to Add Other Queue Services?

No, not planning to. Consider one of the other background-task libraries.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

boilermaker_servicebus-0.7.0.tar.gz (35.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

boilermaker_servicebus-0.7.0-py3-none-any.whl (35.9 kB view details)

Uploaded Python 3

File details

Details for the file boilermaker_servicebus-0.7.0.tar.gz.

File metadata

  • Download URL: boilermaker_servicebus-0.7.0.tar.gz
  • Upload date:
  • Size: 35.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for boilermaker_servicebus-0.7.0.tar.gz
Algorithm Hash digest
SHA256 f1aece486675f2fdf5e6f001e105d895c65a5248d907f179693bf0fa63863a58
MD5 d8a9ecb7133f7c6d1cbb9676e88bc3c1
BLAKE2b-256 ad5f2f5026199a976a24a959c095024a4e9bc2bbaa895642f3ad59180b05a920

See more details on using hashes here.

Provenance

The following attestation bundles were made for boilermaker_servicebus-0.7.0.tar.gz:

Publisher: release.yaml on MulliganFunding/boilermaker-servicebus

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file boilermaker_servicebus-0.7.0-py3-none-any.whl.

File metadata

File hashes

Hashes for boilermaker_servicebus-0.7.0-py3-none-any.whl
Algorithm Hash digest
SHA256 b231f932627c2d386b6695db3c2cf19ac7759afbbc8dceb2da93df1878367eb9
MD5 bfc7c9cf23c08bd6134fef1bdb58159f
BLAKE2b-256 65aba36e2e163c0f76ec40df667d4b7db22fac8aa576ef092e1b9b6941aeea21

See more details on using hashes here.

Provenance

The following attestation bundles were made for boilermaker_servicebus-0.7.0-py3-none-any.whl:

Publisher: release.yaml on MulliganFunding/boilermaker-servicebus

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page