A topic-based publish-subscribe system for asyncio applications, inspired by Phoenix's PubSub
Project description
phoenix-pubsub
A topic-based publish‑subscribe system for asyncio applications, inspired by the Phoenix PubSub library from the Elixir Phoenix framework.
Features
- Subscribe to one or more topics
- Broadcast messages to all subscribers of a topic
- Broadcast messages while excluding the publisher itself
- Subscribers metadata & Custom dispatchers – attach metadata to subscriptions and implement your own delivery logic by passing a dispatcher function into
broadcast/broadcast_from
Installation
pip install phoenix-pubsub
uv add phoenix-pubsub
Examples
Basic: Subscribe, Broadcast, Unsubscribe
import asyncio
from phoenix_pubsub import PubSub
async def main():
pubsub = PubSub()
queue = asyncio.Queue()
# Subscribe to a topic
await pubsub.subscribe(queue, "alerts")
# Broadcast a message
await pubsub.broadcast("System alert!", "alerts")
# Receive the message
topic, msg = await queue.get()
print(topic, msg) # alerts System alert!
# Unsubscribe from the topic
await pubsub.unsubscribe(queue, "alerts")
# Broadcast another message – this one should not be received
await pubsub.broadcast("Another alert", "alerts")
try:
await asyncio.wait_for(queue.get(), timeout=0.1)
print("Unexpected message received after unsubscribe")
except asyncio.TimeoutError:
print("No message received after unsubscribe (as expected)")
asyncio.run(main())
Subscribing to multiple topics
import asyncio
from phoenix_pubsub import PubSub
async def main():
pubsub = PubSub()
queue = asyncio.Queue()
await pubsub.subscribe(queue, "alerts", "news", "sports")
await pubsub.broadcast("Earthquake!", "alerts")
await pubsub.broadcast("Score update", "sports")
topic, msg = await queue.get()
print(topic, msg) # alerts Earthquake!
topic, msg = await queue.get()
print(topic, msg) # sports Score update
asyncio.run(main())
Broadcasting to multiple topics
import asyncio
from phoenix_pubsub import PubSub
async def main():
pubsub = PubSub()
queue_news = asyncio.Queue()
queue_sports = asyncio.Queue()
await pubsub.subscribe(queue_news, "news")
await pubsub.subscribe(queue_sports, "sports")
await pubsub.broadcast("Breaking news!", "news", "sports")
topic, msg = await queue_news.get()
print(topic, msg) # news Breaking news!
topic, msg = await queue_sports.get()
print(topic, msg) # sports Breaking news!
asyncio.run(main())
Excluding the publisher from broadcast
import asyncio
from phoenix_pubsub import PubSub
async def main():
pubsub = PubSub()
publisher = asyncio.Queue()
other = asyncio.Queue()
await pubsub.subscribe(publisher, "chat")
await pubsub.subscribe(other, "chat")
await pubsub.broadcast_from(publisher, "Hello everyone!", "chat")
topic, msg = await other.get()
print(topic, msg) # chat Hello everyone!
try:
await asyncio.wait_for(publisher.get(), timeout=0.1)
print("Unexpected message")
except asyncio.TimeoutError:
print("Publisher received nothing (as expected)")
asyncio.run(main())
Metadata & Custom dispatcher
The library provides two dispatchers: synchronous_dispatcher, concurrent_dispatcher. The default one is synchronous.
You can create and pass your own dispatchers like that:
Filter dispatcher
import asyncio
from phoenix_pubsub import PubSub, Topic, Message, Subscribers, Peer
from typing import Optional
async def main():
pubsub = PubSub()
def category_filter_dispatcher(
topic: Topic,
message: Message,
subscribers: Subscribers,
publisher: Optional[Peer] = None,
) -> None:
"""
Deliver message only to subscribers whose 'interests' metadata list
contains the message's 'category' field.
"""
def try_put_message(peer: asyncio.Queue, topic: str, message: Message):
try:
peer.put_nowait((topic, message))
except (asyncio.QueueFull, asyncio.QueueShutDown):
pass
if not isinstance(message, dict) or "category" not in message:
return
category = message["category"]
peers = []
for peer, metadata in subscribers.items():
interests = metadata.get("interests", [])
if category in interests:
peers.append(peer)
if publisher: # broadcast_from
for peer in peers:
if peer != publisher:
try_put_message(peer, topic, message)
else: # broadcast
for peer in peers:
try_put_message(peer, topic, message)
queue1 = asyncio.Queue()
await pubsub.subscribe(
queue1, "news", metadata={"interests": ["sports", "politics"]}
)
queue2 = asyncio.Queue()
await pubsub.subscribe(queue2, "news", metadata={"interests": ["sports"]})
queue3 = asyncio.Queue()
await pubsub.subscribe(queue3, "news", metadata={"interests": ["technology"]})
sports_msg = {"category": "sports", "content": "Game result 3-2"}
await pubsub.broadcast(sports_msg, "news", dispatcher=category_filter_dispatcher)
politics_msg = {"category": "politics", "content": "Election update"}
await pubsub.broadcast(politics_msg, "news", dispatcher=category_filter_dispatcher)
for i, q in enumerate([queue1, queue2, queue3], 1):
received = []
while not q.empty():
received.append(await q.get())
print(f"Subscriber {i} received: {received}")
# Subscriber 1 received: [('news', {'category': 'sports', 'content': 'Game result 3-2'}), ('news', {'category': 'politics', 'content': 'Election update'})]
# Subscriber 2 received: [('news', {'category': 'sports', 'content': 'Game result 3-2'})]
# Subscriber 3 received: []
asyncio.run(main())
Batched dispatcher
import asyncio
from phoenix_pubsub import PubSub, Topic, Message, Subscribers, Peer
from typing import Optional
from functools import partial
def batched_dispatcher(
topic: Topic,
message: Message,
subscribers: Subscribers,
publisher: Optional[Peer] = None,
*,
batch_size: int = 5,
) -> None:
"""
Groups subscribers into batches and spawns one background task per batch.
"""
async def process_batch(
batch_peers: list[Peer], topic: str, message: Message
) -> None:
for peer in batch_peers:
try:
peer.put_nowait((topic, message))
except (asyncio.QueueFull, asyncio.QueueShutDown):
pass
if publisher: # broadcast_from
peers = [peer for peer in subscribers.keys() if peer is not publisher]
else: # broadcast
peers = [peer for peer in subscribers.keys()]
for i in range(0, len(peers), batch_size):
batch = peers[i : i + batch_size]
asyncio.create_task(process_batch(batch, topic, message))
async def main():
pubsub = PubSub()
queues = [asyncio.Queue() for _ in range(7)]
for q in queues:
await pubsub.subscribe(q, "notifications")
await pubsub.broadcast(
"Important notification 1",
"notifications",
dispatcher=batched_dispatcher,
)
getters = [q.get() for q in queues]
await asyncio.gather(*getters)
await pubsub.broadcast(
"Important notification 2",
"notifications",
dispatcher=partial(batched_dispatcher, batch_size=10),
)
getters = [q.get() for q in queues]
await asyncio.gather(*getters)
if __name__ == "__main__":
asyncio.run(main())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file phoenix_pubsub-0.6.1.tar.gz.
File metadata
- Download URL: phoenix_pubsub-0.6.1.tar.gz
- Upload date:
- Size: 5.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.7 {"installer":{"name":"uv","version":"0.10.7","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c5fe392abb6cae262364a128cfd7454c69fa9195c30582d39d7f0c1c63fbadf4
|
|
| MD5 |
381e6ea22e8884d98240f0c09a079a21
|
|
| BLAKE2b-256 |
770907ad0def9f90c5936e743186b231b4e747d112da508d9e76ee7849592ad4
|
File details
Details for the file phoenix_pubsub-0.6.1-py3-none-any.whl.
File metadata
- Download URL: phoenix_pubsub-0.6.1-py3-none-any.whl
- Upload date:
- Size: 7.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.7 {"installer":{"name":"uv","version":"0.10.7","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7b88d5926fb95472c19c966244df4177a598224bdfe6e682cf6d6462c464e4f3
|
|
| MD5 |
dd91231dbfba0f92098b619f8dc1a6c0
|
|
| BLAKE2b-256 |
0a4adcf4c8da227fb912cd990d393a341a794f2734f815e5297a9164d6355278
|