Skip to main content

Internal application publisher/subscriber bus using asyncio queues.

Project description

Asyncio Signal Bus

Asyncio Signal Bus is a simple utility for sending publisher subscriber signals from one python function to another, thus allowing you to simplify asynchronous applications using event based architecture.

Installation

Install with pip.

pip install asyncio-signal-bus

Usage

Simply decorate your publishers and subscribers to pass any data type asynchronously whenever the publisher returns a value.

import asyncio

from asyncio_signal_bus import SignalBus
import json
from typing import Dict

BUS = SignalBus()

@BUS.publisher(topic_name="foo")
async def foo_publisher(arg: str):
    signal = {"message": arg}
    print(f"publishing {json.dumps(signal)}")
    await asyncio.sleep(1)
    return signal

@BUS.subscriber(topic_name="foo")
async def foo_subscriber_0(signal: Dict):
    print(f"foo subscriber 0 received {json.dumps(signal)}")

@BUS.subscriber(topic_name="foo")
async def foo_subscriber_1(signal: Dict):
    print(f"foo subscriber 1 received {json.dumps(signal)}")

In order to start the subscribers listening, you use the bus context. When the bus leaves context, all subscribers stop listening.

async def main():
    inputs = [f"message:{i}" for i in range(10)]
    async with BUS:
        await asyncio.gather(*[foo_publisher(x) for x in inputs])

That is it! You can string together publishers and subscribers in whatever way you want. You can also annotate functions and methods used for other purposes as publishers in order to facilitate post processing and reporting hooks without having to pass them to the function.

Dependency Injection

Subscribers often need additional data beyond the signal, such as URL's, secrets, usernames and connection pools. The inject decorator can be used to inject additional data to other arguments as kwargs defaults. Injection only occors when the bus or it's injector is within context.

import asyncio

from asyncio_signal_bus import SignalBus
BUS = SignalBus()

@BUS.publisher(topic_name="greeting")
async def generate_greeting(arg: str):
    return arg

async def name_factory():
     return "Frank"

@BUS.subscriber(topic_name="greeting")
@BUS.inject("name", name_factory)
async def print_greeting(greeting: str, name: str):
    print(f"{greeting} from {name}")
async def main():
    async with BUS:
        await generate_greeting("hello")
asyncio.run(main())

Periodic Tasks

Sometimes we need to run tasks periodically such as polling an external database or an internal queue. We also want to make sure that when our application shuts down, it gracefully tells the periodic tasks to stop starting new tasks but it waits for the existing task to complete.

import asyncio
from asyncio_signal_bus import SignalBus

BUS = SignalBus()
@BUS.periodic_task(period_seconds=0.5)
async def print_foos():
     print("foo")
     
async def main():
     async with BUS:
         await asyncio.sleep(2)
asyncio.run(main())

Period Tasks As Publishers

Given that the periodic task is the furthest upstream in a process, we may use it to drive downstream subscribers. We can combine the periodic task with the injectors and the publisher to periodically fetch data using configuration information and send it to subscribers for processing.

import asyncio
from typing import List, Dict
from asyncio_signal_bus import SignalBus
import os

BUS = SignalBus()

async def get_url():
    return os.environ.get("URL")

async def get_secret():
    return os.environ.get("SECRET")

@BUS.periodic_task(period_seconds=1)
@BUS.publisher(topic_name="new-data")
@BUS.inject("url", get_url)
@BUS.inject("secret", get_secret)
async def get_data(url: str, secret: str):
    # Perform some sort of IO here to get your data.
    return [
        {"id": 0, "value": "cats"},
        {"id": 1, "value": "dogs"}
    ]

@BUS.subscriber(topic_name="new-data")
async def process_values(data: List[Dict]):
    for row in data:
        print(row["value"].upper())

@BUS.subscriber(topic_name="new-data")
async def process_ids(data: List[Dict]):
    for row in data:
        print(row["id"] + 1)

async def main():
    async with BUS:
        await asyncio.sleep(5)
asyncio.run(main())

Buses Across Multiple Modules

Most practical use of asyncio-signal-bus involve publishers and subscribers across multiple modules or files. In order to connect buses across modules we make a bus in each file.

# my_app/module_1.py
from asyncio_signal_bus import SignalBus

BUS_1 = SignalBus()

@BUS_1.publisher(topic_name="foo")
async def foo_publisher(arg: str):
    signal = {"message": arg}
    return signal
# my_app/module_2.py
from asyncio_signal_bus import SignalBus
import json

BUS_2 = SignalBus()

@BUS_2.subscriber(topic_name="foo")
async def foo_subscriber_0(signal: dict):
    print(f"foo subscriber 0 received {json.dumps(signal)}")

Now we connect all buses in a parent bus. All buses will now function as a single bus.

# my_app/__main__.py
import asyncio
from asyncio_signal_bus import SignalBus
from my_app.module_1 import BUS_1, foo_publisher
from my_app.module_2 import BUS_2

PARENT_BUS = SignalBus()
PARENT_BUS.connect(BUS_1, BUS_2)

async def main():
    inputs = [f"message:{i}" for i in range(10)]
    async with PARENT_BUS:
        await asyncio.gather(*[foo_publisher(x) for x in inputs])

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

asyncio_signal_bus-1.6.0.tar.gz (10.0 kB view details)

Uploaded Source

Built Distribution

asyncio_signal_bus-1.6.0-py3-none-any.whl (12.6 kB view details)

Uploaded Python 3

File details

Details for the file asyncio_signal_bus-1.6.0.tar.gz.

File metadata

  • Download URL: asyncio_signal_bus-1.6.0.tar.gz
  • Upload date:
  • Size: 10.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for asyncio_signal_bus-1.6.0.tar.gz
Algorithm Hash digest
SHA256 bb099c65460667448e2970eaaeb037f63c05a2b671f435ee7a4aa7728d3b1e42
MD5 459e047ae0c00aabab9d98e66f7073b6
BLAKE2b-256 ad4184f8d2c15aadd9623ac828e97733e44b9069c6f6cdb357cd335e82b7d7ef

See more details on using hashes here.

Provenance

The following attestation bundles were made for asyncio_signal_bus-1.6.0.tar.gz:

Publisher: workflow.yml on OpenBuildRight/asyncio-signal-bus

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file asyncio_signal_bus-1.6.0-py3-none-any.whl.

File metadata

File hashes

Hashes for asyncio_signal_bus-1.6.0-py3-none-any.whl
Algorithm Hash digest
SHA256 2fd771ad5e9f65c8abd259b31262b01fada891717196792021e07b7a9e65316b
MD5 1897baaad5299b92d3dcfa690cbe6d8a
BLAKE2b-256 e835a5a99675de69eef203bd8a8f501d15afb96280f1990243dd54f2aabb0186

See more details on using hashes here.

Provenance

The following attestation bundles were made for asyncio_signal_bus-1.6.0-py3-none-any.whl:

Publisher: workflow.yml on OpenBuildRight/asyncio-signal-bus

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page