Simple pubsub pattern for asyncio applications
Project description
Simple publish-subscribe pattern for asyncio applications.
Why
When building big applications, separation of concerns is a great way to keep things manageable. In messaging systems, the publish-subscribe pattern is often used to decouple data producers and data consumers. We went a step further and designed even the internals of our applications around this pattern.
We explain our thinking and the workings of aiopubsub in detail in our article Design your app using the pub-sub pattern with aiopubsub. We recommend reading it before using aiopubsub in your project.
Installation
aiopubsub is only compatible with Python 3.8 and higher. There are no plans to support older versions.
aiopubsub is available on PyPI and you can install it with:
pip install aiopubsub
or
poetry add aiopubsub
How to use it
The following comprehensive example is explained step-by-step in our article “Design your app using the pub-sub pattern with aiopubsub”.
import asyncio
import dataclasses
import decimal

import aiopubsub


@dataclasses.dataclass
class Trade:
    timestamp: float
    quantity: int
    price: decimal.Decimal


async def on_trade(key: aiopubsub.Key, trade: Trade) -> None:
    print(f'Processing trade = {trade} with key = {key}.')


async def on_nyse_trade(key: aiopubsub.Key, trade: Trade) -> None:
    print(f'Processing trade = {trade} with key = {key} that happened in NYSE')


async def main():
    # create an aiopubsub hub
    hub = aiopubsub.Hub()

    # create a sample of data to send
    trade = Trade(timestamp = 123.5, quantity = 56, price = decimal.Decimal('1639.43'))

    # subscriber listens on every trade and calls the `on_trade` function
    subscriber = aiopubsub.Subscriber(hub, 'trades')
    subscribe_key = aiopubsub.Key('*', 'trade', '*')
    subscriber.add_async_listener(subscribe_key, on_trade)

    # publisher has a NASDAQ prefix and sends the trade that happened on Google stock
    publisher = aiopubsub.Publisher(hub, prefix = aiopubsub.Key('NASDAQ'))
    publish_key = aiopubsub.Key('trade', 'GOOGL')
    publisher.publish(publish_key, trade)

    # sleep so the event loop can process the action
    await asyncio.sleep(0.001)

    # expected output:
    # Processing trade = Trade(timestamp=123.5, quantity=56, price=Decimal('1639.43')) with key = ('NASDAQ', 'trade', 'GOOGL').

    # sample from another stock exchange
    trade_nyse = Trade(timestamp = 127.45, quantity = 67, price = decimal.Decimal('1639.44'))

    # subscribe only for the NYSE exchange
    subscribe_key_nyse = aiopubsub.Key('NYSE', 'trade', '*')
    subscriber.add_async_listener(subscribe_key_nyse, on_nyse_trade)

    # publish NYSE trade
    publisher_nyse = aiopubsub.Publisher(hub, prefix = aiopubsub.Key('NYSE'))
    publisher_nyse.publish(aiopubsub.Key('trade', 'GOOGL'), trade_nyse)

    # sleep so the event loop can process the action
    await asyncio.sleep(0.001)

    # expected output:
    # Processing trade = Trade(timestamp=127.45, quantity=67, price=Decimal('1639.44')) with key = ('NYSE', 'trade', 'GOOGL').
    # Processing trade = Trade(timestamp=127.45, quantity=67, price=Decimal('1639.44')) with key = ('NYSE', 'trade', 'GOOGL') that happened in NYSE

    # clean the subscriber before the end of the program
    await subscriber.remove_all_listeners()


if __name__ == '__main__':
    asyncio.run(main())
aiopubsub uses logwood if it is installed; otherwise it falls back to the standard logging module. Note that logwood is required to run the tests.
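The fallback described above follows a common optional-dependency pattern. The sketch below is not taken from aiopubsub's source; `pick_logging_backend` is a hypothetical helper that only illustrates how an application could check which of the two backends is available before configuring logging.

```python
import importlib.util
import logging


def pick_logging_backend() -> str:
    """Return 'logwood' when that package is importable, else 'logging'."""
    # find_spec returns None for a top-level package that is not installed
    if importlib.util.find_spec('logwood') is not None:
        return 'logwood'
    return 'logging'


backend = pick_logging_backend()
if backend == 'logging':
    # Standard-library setup; with logwood installed you would configure logwood instead.
    logging.basicConfig(level=logging.INFO)
print(f'Logging backend available: {backend}')
```

Whichever backend is picked, the application remains responsible for configuring it.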
Architecture
The Hub accepts messages from Publishers and routes them to Subscribers. Each message is routed by its Key, an iterable of strings forming a hierarchical namespace. Subscribers may subscribe to wildcard keys, where any part of the key may be replaced with a * (star).
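To make the wildcard rule concrete, here is a minimal sketch of how a subscription key could be compared with a published key. It is plain Python and does not call aiopubsub; `full_key` and `key_matches` are hypothetical helpers, and the sketch assumes that keys are compared part by part and that keys of different lengths never match. That assumption fits the usage example above, but it is not confirmed here as the library's exact rule.

```python
from typing import Tuple

KeyParts = Tuple[str, ...]


def full_key(prefix: KeyParts, key: KeyParts) -> KeyParts:
    """Join a publisher prefix and a publish key into the routed key."""
    return prefix + key


def key_matches(pattern: KeyParts, key: KeyParts) -> bool:
    """Return True if each part of `pattern` is '*' or equals the same part of `key`.

    Assumption: keys of different lengths never match.
    """
    if len(pattern) != len(key):
        return False
    return all(p == '*' or p == k for p, k in zip(pattern, key))


nasdaq_trade = full_key(('NASDAQ',), ('trade', 'GOOGL'))
nyse_trade = full_key(('NYSE',), ('trade', 'GOOGL'))

print(key_matches(('*', 'trade', '*'), nasdaq_trade))     # True
print(key_matches(('NYSE', 'trade', '*'), nasdaq_trade))  # False
print(key_matches(('NYSE', 'trade', '*'), nyse_trade))    # True
```

Under these assumptions the NYSE trade matches both subscription keys, which is why the usage example prints two lines for it.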
addedSubscriber and removedSubscriber messages
When a new subscriber is added, the Hub sends this message:
{
    "key": ("key", "of", "added", "subscriber"),
    "currentSubscriberCount": 2
}
under the key ('Hub', 'addedSubscriber', 'key', 'of', 'added', 'subscriber') (the part after addedSubscriber is made of the subscribed key). Note the currentSubscriberCount field indicating how many subscribers are currently subscribed.
When a subscriber is removed, a message in the same format is sent, but this time under the key ('Hub', 'removedSubscriber', 'key', 'of', 'added', 'subscriber').
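The layout of these control messages can be sketched in plain Python. The helpers `hub_event_key` and `hub_event_message` below are hypothetical and are not part of aiopubsub; they only reproduce the key and payload shapes described above.

```python
from typing import Any, Dict, Tuple

KeyParts = Tuple[str, ...]


def hub_event_key(event: str, subscribed_key: KeyParts) -> KeyParts:
    """Build the key of a Hub control message: ('Hub', <event>) followed by the subscribed key."""
    return ('Hub', event) + tuple(subscribed_key)


def hub_event_message(subscribed_key: KeyParts, subscriber_count: int) -> Dict[str, Any]:
    """Build the payload: the subscribed key and the current number of subscribers."""
    return {'key': tuple(subscribed_key), 'currentSubscriberCount': subscriber_count}


subscribed = ('*', 'trade', '*')

print(hub_event_key('addedSubscriber', subscribed))
# ('Hub', 'addedSubscriber', '*', 'trade', '*')
print(hub_event_key('removedSubscriber', subscribed))
# ('Hub', 'removedSubscriber', '*', 'trade', '*')
print(hub_event_message(subscribed, 2))
# {'key': ('*', 'trade', '*'), 'currentSubscriberCount': 2}
```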
Contributing
Pull requests are welcome! In particular, we are aware that the documentation could be improved. If anything about aiopubsub is unclear, please feel free to simply open an issue and we will do our best to advise and explain 🙂
aiopubsub was made by Quantlane, a systematic trading firm. We design, build and run our own stock trading platform.