A lightweight, networked, opinionated event bus for Python 3
autobus
Autobus is a lightweight, networked, opinionated event bus for Python 3.
Autobus supports two transport backends:
- UDP multicast (default) - Zero configuration, no external dependencies
- Redis pub/sub - For distributed systems across network boundaries
It also leans on asyncio, pydantic, and schedule to do most of its heavy lifting.
Synopsis
import autobus
import asyncio

@autobus.subscribe(MyEventClass)
def handle_my_event(event):
    # do something in response
    ...

autobus.run()
and then elsewhere, maybe in another process entirely:
event = MyEventClass(...)
autobus.publish(event)
autobus.run()
Presto! The event you published in one place is transparently handled in another.
Installation
pip install autobus
For Redis transport support:
pip install autobus[redis]
If you want to build from source:
git clone https://github.com/schuyler/autobus
cd autobus
pip install -e .
Usage
Defining events
import autobus
class SomethingHappened(autobus.Event):
    id: int
    name: str
    details: dict
    ...
Now autobus.Event is just a subclass of pydantic.BaseModel, so anything
Pydantic can do, autobus.Event can do also.
Publishing an event to the bus
event = SomethingHappened(...)
autobus.publish(event)
When publish() is called, the event object is serialized to JSON and sent out
over the transport. All subscribers to that event class will receive a
copy of the event object.
Receiving an event from the bus
Receiving an event is as simple as using the @autobus.subscribe decorator:
@autobus.subscribe(MyEventClass)
def handle_my_event(event):
    # do something in response
    ...
The event handler can be either a regular Python function or an async
function. If the latter, the async function is run with
asyncio.create_task() and later awaited.
Either way, the event handler's return value is discarded.
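To make the sync/async distinction concrete, here is a minimal sketch of how a dispatcher could treat both kinds of handler. It is not autobus's actual implementation; `dispatch`, `sync_handler`, and `async_handler` are hypothetical names used only for illustration.

```python
import asyncio
import inspect

async def dispatch(handlers, event):
    """Call every handler with the event; return values are discarded."""
    tasks = []
    for handler in handlers:
        if inspect.iscoroutinefunction(handler):
            # Async handlers are scheduled as tasks and awaited afterwards.
            tasks.append(asyncio.create_task(handler(event)))
        else:
            # Regular functions run inline and hold the loop until they return.
            handler(event)
    if tasks:
        # Collect exceptions instead of raising the first one.
        await asyncio.gather(*tasks, return_exceptions=True)

seen = []

def sync_handler(event):
    seen.append(("sync", event))
    return "ignored"

async def async_handler(event):
    await asyncio.sleep(0)
    seen.append(("async", event))
    return "also ignored"

result = asyncio.run(dispatch([sync_handler, async_handler], "hello"))
```

Note that the regular function runs to completion before any async handler gets a turn, which is why blocking work in a sync handler stalls everything else on the loop.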
Running recurring tasks
Autobus uses the Schedule module to run tasks on a recurring basis.
import autobus
from autobus import every

@autobus.schedule(every(10).minutes)
def do_this_regularly():
    # this will get called ~every 10 minutes
    ...
As with event handlers, the scheduled function can be regular or async, and
the return value is discarded.
Running the event bus itself
Autobus relies on asyncio for
concurrency. You need to have the bus running in order to send and receive
events. The easiest way to run Autobus is to pass its run() function to
asyncio.run():
import asyncio

if __name__ == "__main__":
    autobus.run()
Configuration
Transport backends
Autobus supports two transport backends, selected via URL scheme:
UDP Multicast (default)
UDP multicast requires no external services and works out of the box for processes on the same network segment:
# Default - uses UDP multicast on 239.255.0.1:5000
autobus.run()
# Explicit UDP configuration
autobus.run(url="udp://239.255.0.1:5000")
Note: UDP multicast has a message size limit of approximately 64KB. For larger messages, use the Redis transport.
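If you are unsure whether an event will fit, a rough pre-flight check like the following can help. This is only a sketch: `fits_in_datagram` and its `overhead` parameter are hypothetical, and autobus's exact wire format (namespace, encryption, and so on) may add bytes beyond the plain JSON measured here.

```python
import json

# Largest payload that fits in one IPv4 UDP datagram:
# 65535 (max IP packet) - 20 (IP header) - 8 (UDP header).
MAX_UDP_PAYLOAD = 65507

def fits_in_datagram(event_dict, overhead=0):
    """Return True if the JSON-encoded event fits in a single UDP datagram.

    `overhead` is a hypothetical allowance for anything the transport adds.
    """
    payload = json.dumps(event_dict).encode("utf-8")
    return len(payload) + overhead <= MAX_UDP_PAYLOAD

small = {"type": "SomethingHappened", "id": 1, "name": "x", "details": {}}
large = {"type": "SomethingHappened", "id": 2, "name": "x",
         "details": {"blob": "a" * 70_000}}

small_ok = fits_in_datagram(small)
large_ok = fits_in_datagram(large)
```

Encrypted events are larger than their plaintext, since Fernet tokens are Base64-encoded, so leave some headroom if you use a shared key.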
Redis pub/sub
For distributed systems across network boundaries, use Redis:
autobus.run(url="redis://my.redis.host:6379")
# Redis with TLS
autobus.run(url="rediss://secure.redis.host:6379")
Autobus does not use Redis for storage in any way; it only uses the pub/sub functions.
Note: Redis transport requires the redis package. Install with
pip install autobus[redis].
Adding an application namespace
If you want to run multiple, separate buses on the same transport, you can configure autobus with a namespace:
autobus.run(namespace="distinct_namespace")
Two autobus clients with different namespaces will not be able to share messages, which is probably what you wanted.
Encrypting events over the bus
Finally, you can optionally encrypt events sent over the bus using a shared key:
autobus.run(shared_key=my_shared_key)
If a shared_key is supplied, autobus will use symmetric Fernet encryption for
all events sent and received on the bus. Any events received by autobus which
cannot be decrypted and verified using the supplied key will be discarded.
This functionality relies on Fernet
from the Python cryptography module, which
must be installed for shared_key to work.
To generate a key suitable for shared encryption, run this command from the command line, and then supply the Base64 string it prints out to your autobus application:
python -m autobus.serializer generate
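One way to hand the key to your application is through an environment variable, with a quick sanity check before starting the bus. The sketch below assumes a variable named AUTOBUS_SHARED_KEY, which is my invention and not something autobus reads on its own; the check relies only on the fact that a Fernet key is 32 bytes encoded as URL-safe Base64.

```python
import base64
import os

def check_fernet_key(key):
    """Raise ValueError unless key is URL-safe Base64 that decodes to 32 bytes."""
    try:
        raw = base64.urlsafe_b64decode(key)
    except ValueError as exc:  # binascii.Error is a ValueError subclass
        raise ValueError("shared key is not valid Base64") from exc
    if len(raw) != 32:
        raise ValueError("shared key must decode to exactly 32 bytes")

def load_shared_key(env_var="AUTOBUS_SHARED_KEY"):
    """Read the key from the environment (hypothetical variable name)."""
    key = os.environ.get(env_var)
    if key is None:
        raise KeyError(f"{env_var} is not set")
    check_fernet_key(key)
    return key

# Then, in your application:
#   autobus.run(shared_key=load_shared_key())
```

Failing fast here is a design choice: a malformed key is easier to diagnose at startup than as a stream of silently discarded events.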
Operation
Logging and exception handling
Autobus relies on Python's built-in
logging facility. You can set
the log level, et cetera, using logging.basicConfig(...) in the usual way, and
get different degrees of detail from inside autobus.
Autobus attempts to catch exceptions thrown inside handler functions. Any
exceptions will get logged along with tracebacks using logging.exception().
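The catch-and-log pattern described above looks roughly like this. It is a sketch of the general technique, not autobus's own code; `call_safely` and the logger name are hypothetical.

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("handlers")  # hypothetical logger name

def call_safely(handler, event):
    """Run handler(event); log any exception with its traceback instead of raising."""
    try:
        handler(event)
    except Exception:
        # logging.exception() logs at ERROR level and appends the traceback.
        logger.exception("handler %s failed", getattr(handler, "__name__", handler))
        return False
    return True

def fine_handler(event):
    pass

def broken_handler(event):
    raise RuntimeError("boom")

ok = call_safely(fine_handler, {})
failed = call_safely(broken_handler, {})
```

Because the exception is swallowed after logging, one misbehaving handler does not take down the rest of the bus, but you do need to watch your logs to notice it.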
Running alongside other asyncio applications
Autobus is designed to play nicely with other asyncio applications.
autobus.run() is basically a wrapper for:
try:
    await autobus.start()
    # ... wait until told to stop ...
finally:
    await autobus.stop()
Instead of calling run(), you can call start() and stop() as needed.
Currently an autobus client cannot be restarted.
Suppose you have an API server that emits events, and so you want to run autobus and uvicorn in the same process. Following this example from the uvicorn docs, you could do something like:
import asyncio
import autobus
import uvicorn

async def main():
    config = uvicorn.Config("main:app", port=5000, log_level="info")
    server = uvicorn.Server(config)
    try:
        await autobus.start()
        await server.serve()
    finally:
        await autobus.stop()

asyncio.run(main())
It is also possible to run multiple autobus clients in a single process, by
instantiating and using autobus.Client directly.
async def main():
    client = autobus.Client(...)
    try:
        await client.start()
    finally:
        await client.stop()

asyncio.run(main())
Long-running handlers
Python's asyncio provides concurrency but not parallelism. Everything runs
in a single thread by default. If you want your event handlers and scheduled
functions to be non-blocking, you should consider defining them async and
having them await as appropriate.
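When the slow part of a handler is a blocking call you can't rewrite as async, one option is to push it onto a worker thread with asyncio.to_thread() (Python 3.9+). The sketch below is plain asyncio with hypothetical names (`slow_blocking_work`, `handle_event`, `ticker`); in a real application `handle_event` would be the function you decorate with @autobus.subscribe.

```python
import asyncio
import time

def slow_blocking_work(event):
    # Stand-in for blocking work such as file I/O or a synchronous HTTP client.
    time.sleep(0.2)
    return f"processed {event}"

async def handle_event(event):
    # Offload the blocking call to a worker thread so the event loop stays free.
    return await asyncio.to_thread(slow_blocking_work, event)

async def main():
    ticks = 0

    async def ticker():
        # Counts how often the loop gets control while the handler is busy.
        nonlocal ticks
        while True:
            await asyncio.sleep(0.02)
            ticks += 1

    background = asyncio.create_task(ticker())
    result = await handle_event("evt")
    background.cancel()
    return result, ticks

result, ticks = asyncio.run(main())
```

If `slow_blocking_work` were called directly inside the handler, `ticks` would stay at zero for the whole 0.2 seconds, because nothing else could run on the loop.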
Caveats on event definition
Currently, you don't have to use autobus.Event as the base class for your
events -- in principle, any simple class will work, so long as you can call
dict(obj) and get back a dict that you can pass as keyword args to the
constructor.
Two other small caveats apply:
- Events are routed by their Python class name, which means that all of your event classes must be uniquely named in order to be routed correctly.
- Autobus adds a type key to the return value from dict(event) before serializing the result, so that the event can be identified on the other end. (You aren't using type as the name of an instance property in Python, are you?)
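Both caveats follow from how class-name routing works. Here is a minimal stdlib-only sketch of the idea, using dataclasses in place of Pydantic; `REGISTRY`, `register`, `encode`, and `decode` are hypothetical names, and rejecting a duplicate name or a `type` field is this sketch's choice, not documented autobus behavior.

```python
import json
from dataclasses import dataclass, asdict

REGISTRY = {}  # class name -> class; hypothetical stand-in for a routing table

def register(cls):
    # Two classes with the same name would be indistinguishable on the wire.
    if cls.__name__ in REGISTRY:
        raise ValueError(f"duplicate event class name: {cls.__name__}")
    REGISTRY[cls.__name__] = cls
    return cls

@register
@dataclass
class SomethingHappened:
    id: int
    name: str

def encode(event):
    data = asdict(event)
    if "type" in data:
        raise ValueError("'type' is reserved for routing")
    data["type"] = type(event).__name__  # the key used to find the class again
    return json.dumps(data)

def decode(message):
    data = json.loads(message)
    cls = REGISTRY[data.pop("type")]
    return cls(**data)  # remaining keys become constructor keyword args

wire = encode(SomethingHappened(id=1, name="demo"))
restored = decode(wire)
```

The receiving side only ever sees the class name string, so the class must be defined (and uniquely named) in every process that wants to handle it.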
Testing
Running the tests
# Run UDP multicast tests (no external dependencies)
pytest tests/test_multicast.py
# Run Redis tests (requires Redis)
REDIS_URL=redis://localhost:6379 pytest tests/test_redis.py
Redis on macOS
Just for my future reference. You can install Redis with brew install redis,
but if you have Docker running locally, this works too:
docker pull redis:alpine
docker run --name redis -d -p 6379:6379 redis:alpine
Contributing
I regard autobus as essentially feature complete, so a lack of recent releases just means that no one's reported any bugs.
That said, patches are welcome. By all means, please send pull requests!
License
MIT. See LICENSE.md for details.
References
Autobus was significantly inspired by Lightbus. Lightbus is more feature rich than Autobus, and, honestly, Adam Charnock not only has a Discord chat but he actually publishes a phone number and encourages you to call him if you want to talk about the project. You should probably use Lightbus instead of Autobus. For myself, I wanted something similar to Lightbus but simpler, and I thought it would be fun to write my own, which it was. So here we are.
Thanks to @javawizard for donating ownership
of the autobus project in PyPI.
Autobus was conceived and written to support AI chatbot experiments in collaboration with @hackerfriendly. Thanks as always for the collaboration, Rob!