An async python Background task system using Azure Service Bus Queues
Project description
Boilermaker
This project is an extremely-lightweight task-runner exclusively for async Python and Azure Service Bus Queues. If you need a fully fledged task-runner, you should consider one of these other projects instead:
To Install
pip install "boilermaker-servicebus"
How to Use
The Boilermaker application object requires some state (which will get sent to all handlers as the first argument upon invocation) and an authenticated ServiceBus client.
A task handler for Boilermaker is any async function which takes as its first argument application state and which has been registered:
# This is a background task that we'll register
async def background_task1(state, somearg, somekwarg=True):
"""`state` must be first argument."""
await state.data.get("key")
print(state)
print(somearg)
print(somekwarg)
# Our task must be registered to be invocable.
boilermaker_app.register_async(background_task1, policy=...A retry policy goes here...)
Note: Boilermaker does not currently have a way to store or use the results of tasks, and all arguments must be JSON-serializable.
Complete Example
For a fuller example, in our application, we may have some code like the following:
from azure.identity.aio import DefaultAzureCredential
from boilermaker.app import Boilermaker
from boilermaker.config import Config
# The boilermaker ServiceBus wrapper is for convenience/example
from boilermaker.service_bus import AzureServiceBus
from boilermaker import retries
# This represents our "App" object which will be passed as `state` to our handlers
class App:
def __init__(self, data):
self.data = data
# This is a background task that we'll register
async def background_task1(state, somearg, somekwarg=True):
"""`state` must be first argument."""
await state.data.get("key")
print(state)
print(somearg)
print(somekwarg)
service_bus_namespace_url = os.environ["SERVICE_BUS_NAMESPACE_URL"]
service_bus_queue_name = os.environ["SERVICE_BUS_QUEUE_NAME"]
conf = Config(
service_bus_namespace_url=service_bus_namespace_url,
service_bus_queue_name=service_bus_queue_name,
)
# We create a service bus Sender client
service_bus_client = AzureServiceBus.from_config(conf)
# Next we'll create a worker and register our task
worker = Boilermaker(App({"key": "value"}), service_bus_client=service_bus_client)
worker.register_async(background_task1, policy=retries.RetryPolicy.default())
# Finally, this will schedule the task
async def publish_task():
await worker.apply_async(
background_task1, "first-arg", somekwarg=False
)
if __name__ == "__main__":
asyncio.run(publish_task())
In another process, we'll need to run a background task worker. That looks like this:
async def run_worker():
await worker.run()
If we look in the logs for our other process, we should be able to see this task registered and run:
{"event": "Registered background function fn=background_task1", "level": "info", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:46:54"}
{"event": "[background_task1] Begin Task sequence_number=19657", "level": "info", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:52:23"}
...prints-go-here...
{"event": "[background_task1] Completed Task sequence_number=19657 in 0.00021130498498678207s", "level": "info", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:52:23"}
Callbacks and Retries
It is possible to register callbacks for tasks, which can run on success or failure. To schedule a task with callbacks, we have to create a task object and then set a success and/or failure callback. Finally, instead of apply_async we have to call publish_task:
...see worker-instantation example above...
from boilermaker.failure import TaskFailureResult
async def a_background_task(state, param: str):
if param == "success":
print("Things seem to be going really well")
return None
elif param == "fail":
return TaskFailureResult
raise ValueError("Exceptions are a bummer!")
async def happy_path(state):
print("Everything is great!")
async def sad_path(state):
print("Everything is sad")
# We need to be sure our tasks are registered
worker.register_async(a_background_task, policy=retries.NoRetry)
worker.register_async(happy_path, policy=retries.NoRetry())
worker.register_async(sad_path, policy=retries.NoRetry())
# Now we can create a happy task and add callbacks
happy_task = worker.create_task(a_background_task, "success")
# This callback should get scheduled
happy_task.on_success = worker.create_task(happy_path)
# This callback will not
happy_task.on_failure = worker.create_task(sad_path)
# For good measure, we'll create a sad task too
sad_task = worker.create_task(a_background_task, "uh oh!")
# This callback should not get scheduled
sad_task.on_success = worker.create_task(happy_path)
# This callback should get scheduled
sad_task.on_failure = worker.create_task(sad_path)
# Finally, this will schedule the tasks
async def publish_task():
await worker.publish_task(happy_task)
# If we wait a bit we can see more clearly one task before the other
await asyncio.sleep(4)
await worker.publish_task(sad_task)
Here are examples of log out from running the above:
// Logs from registering the functions
{"event": "Registered background function fn=callback_task_test", "level": "info", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:46:54"}
{"event": "Registered background function fn=happy_path", "level": "info", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:46:54"}
{"event": "Registered background function fn=sad_path", "level": "info", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:46:54"}
// Happy path test ->
{"event": "[callback_task_test] Begin Task sequence_number=19657", "version": "pr72-0.25.1", "level": "info", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:52:23"}
{"event": "[callback_task_test] Completed Task sequence_number=19657 in 0.00021130498498678207s", "level": "info", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:52:23"}
{"event": "[happy_path] Begin Task sequence_number=19659", "level": "info", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:52:24"}
{"event": "[happy_path] Completed Task sequence_number=19659 in 0.0001390039687976241s", "level": "info", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:52:24"}
// Sad path test ->
{"event": "[callback_task_test] Begin Task sequence_number=19661", "level": "info", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:52:27"}
{"event": "Failed processing task sequence_number=19661 Traceback (most recent call last):\n File \"/app/.venv/lib/python3.12/site-packages/boilermaker/app.py\", line 238, in message_handler\n result = await self.task_handler(task, sequence_number)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File \"/app/.venv/lib/python3.12/site-packages/boilermaker/app.py\", line 306, in task_handler\n result = await function(\n ^^^^^^^^^^^^^^^\n File \"/app/boilermaker_example.py\", line 210, in callback_task_test\n raise ValueError(\"Negative numbers are a bummer\")\nValueError: Negative numbers are a bummer\n", "level": "error", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:52:27"}
{"event": "[sad_path] Begin Task sequence_number=19663", "level": "info", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:52:28"}
{"event": "[sad_path] Completed Task sequence_number=19663 in 0.00020030408632010221s", "level": "info", "logger": "boilermaker.app", "timestamp": "2024/12/10 15:52:28"}
FAQ
Where does the name come from?
From here
Are You Going to Add Other Queue Services?
No, not planning to. Consider one of the other background-task libraries.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file boilermaker_servicebus-0.5.1a1.tar.gz.
File metadata
- Download URL: boilermaker_servicebus-0.5.1a1.tar.gz
- Upload date:
- Size: 26.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.0.1 CPython/3.12.8
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
e6923092d54f7b98cf651f5bfd03586dc8c7aff6e1aef320ecb1b611c1cf8b77
|
|
| MD5 |
d5e626454aaa0fe739e24943501cd2a2
|
|
| BLAKE2b-256 |
54f645c69a11880bba432943c555325fd32af5186f22f9a05763743618799876
|
Provenance
The following attestation bundles were made for boilermaker_servicebus-0.5.1a1.tar.gz:
Publisher:
release.yaml on MulliganFunding/boilermaker-servicebus
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
boilermaker_servicebus-0.5.1a1.tar.gz -
Subject digest:
e6923092d54f7b98cf651f5bfd03586dc8c7aff6e1aef320ecb1b611c1cf8b77 - Sigstore transparency entry: 154848910
- Sigstore integration time:
-
Permalink:
MulliganFunding/boilermaker-servicebus@27c1b3a7c9cb9fa56696ca359edc325cc909cc6b -
Branch / Tag:
refs/tags/v0.5.1a1 - Owner: https://github.com/MulliganFunding
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yaml@27c1b3a7c9cb9fa56696ca359edc325cc909cc6b -
Trigger Event:
release
-
Statement type:
File details
Details for the file boilermaker_servicebus-0.5.1a1-py3-none-any.whl.
File metadata
- Download URL: boilermaker_servicebus-0.5.1a1-py3-none-any.whl
- Upload date:
- Size: 28.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.0.1 CPython/3.12.8
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
501a1fd0c4ef061dc46ab76b5a5adae996e1894df0c9e79fe06a7b97faa27beb
|
|
| MD5 |
f9bdf3dbf4adddf2576ac8156d488b04
|
|
| BLAKE2b-256 |
2adfaa2e5ee6427821e0b40de04e4f8f855bd3197b486189e936e715829d6076
|
Provenance
The following attestation bundles were made for boilermaker_servicebus-0.5.1a1-py3-none-any.whl:
Publisher:
release.yaml on MulliganFunding/boilermaker-servicebus
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
boilermaker_servicebus-0.5.1a1-py3-none-any.whl -
Subject digest:
501a1fd0c4ef061dc46ab76b5a5adae996e1894df0c9e79fe06a7b97faa27beb - Sigstore transparency entry: 154848912
- Sigstore integration time:
-
Permalink:
MulliganFunding/boilermaker-servicebus@27c1b3a7c9cb9fa56696ca359edc325cc909cc6b -
Branch / Tag:
refs/tags/v0.5.1a1 - Owner: https://github.com/MulliganFunding
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yaml@27c1b3a7c9cb9fa56696ca359edc325cc909cc6b -
Trigger Event:
release
-
Statement type: