A topic-based publish-subscribe system for asyncio applications, inspired by Phoenix's PubSub
Project description
phoenix-pubsub
A topic-based publish‑subscribe system for asyncio applications, inspired by the Phoenix PubSub library from the Elixir Phoenix framework.
Features
- Subscribe to one or more topics
- Broadcast messages to all subscribers of a topic
- Broadcast messages while excluding the publisher itself
- Subscribers metadata & Custom dispatchers – attach metadata to subscriptions and implement your own delivery logic by passing a dispatcher function
broadcast/broadcast_from
Installation
pip install phoenix-pubsub
uv add phoenix-pubsub
Examples
Basic: Subscribe, Broadcast, Unsubscribe
import asyncio
from phoenix_pubsub import PubSub
async def main():
pubsub = PubSub()
queue = asyncio.Queue()
# Subscribe to a topic
await pubsub.subscribe(queue, "alerts")
# Broadcast a message
await pubsub.broadcast("System alert!", "alerts")
# Receive the message
topic, msg = await queue.get()
print(topic, msg) # alerts System alert!
# Unsubscribe from the topic
await pubsub.unsubscribe(queue, "alerts")
# Broadcast another message – this one should not be received
await pubsub.broadcast("Another alert", "alerts")
try:
await asyncio.wait_for(queue.get(), timeout=0.1)
print("Unexpected message received after unsubscribe")
except asyncio.TimeoutError:
print("No message received after unsubscribe (as expected)")
asyncio.run(main())
Subscribing to multiple topics
import asyncio
from phoenix_pubsub import PubSub
async def main():
pubsub = PubSub()
queue = asyncio.Queue()
await pubsub.subscribe(queue, "alerts", "news", "sports")
await pubsub.broadcast("Earthquake!", "alerts")
await pubsub.broadcast("Score update", "sports")
topic, msg = await queue.get()
print(topic, msg) # alerts Earthquake!
topic, msg = await queue.get()
print(topic, msg) # sports Score update
asyncio.run(main())
Broadcasting to multiple topics
import asyncio
from phoenix_pubsub import PubSub
async def main():
pubsub = PubSub()
queue_news = asyncio.Queue()
queue_sports = asyncio.Queue()
await pubsub.subscribe(queue_news, "news")
await pubsub.subscribe(queue_sports, "sports")
await pubsub.broadcast("Breaking news!", "news", "sports")
topic, msg = await queue_news.get()
print(topic, msg) # news Breaking news!
topic, msg = await queue_sports.get()
print(topic, msg) # sports Breaking news!
asyncio.run(main())
Excluding the publisher from broadcast
import asyncio
from phoenix_pubsub import PubSub
async def main():
pubsub = PubSub()
publisher = asyncio.Queue()
other = asyncio.Queue()
await pubsub.subscribe(publisher, "chat")
await pubsub.subscribe(other, "chat")
await pubsub.broadcast_from(publisher, "Hello everyone!", "chat")
topic, msg = await other.get()
print(topic, msg) # chat Hello everyone!
try:
await asyncio.wait_for(publisher.get(), timeout=0.1)
print("Unexpected message")
except asyncio.TimeoutError:
print("Publisher received nothing (as expected)")
asyncio.run(main())
Slow consumer - Messages are dropped
import asyncio
from phoenix_pubsub import PubSub
async def main():
pubsub = PubSub()
slow = asyncio.Queue(maxsize=1) # can hold only one message
fast = asyncio.Queue()
await pubsub.subscribe(slow, "alerts")
await pubsub.subscribe(fast, "alerts")
await pubsub.broadcast("Alert 1", "alerts")
await pubsub.broadcast("Alert 2", "alerts")
await pubsub.broadcast("Alert 3", "alerts")
# Fast consumer receives all three
topic, msg = await fast.get()
print(topic, msg) # alerts Alert 1
topic, msg = await fast.get()
print(topic, msg) # alerts Alert 2
topic, msg = await fast.get()
print(topic, msg) # alerts Alert 3
# Slow consumer receives only the first (others are dropped)
topic, msg = await slow.get()
print(topic, msg) # alerts Alert 1
try:
await asyncio.wait_for(slow.get(), timeout=0.1)
print("Unexpected second message")
except asyncio.TimeoutError:
print("Slow queue received no further messages (as expected)")
asyncio.run(main())
Metadata & Custom dispatcher
The library provides two dispatchers: synchronous_dispatcher, concurrent_dispatcher. The default one is synchronous.
You can create and pass your own dispatcher like that:
import asyncio
from phoenix_pubsub import PubSub, Topic, Message, Subscribers, Peer
from typing import Optional
async def main():
pubsub = PubSub()
def category_filter_dispatcher(
topic: Topic,
message: Message,
subscribers: Subscribers,
publisher: Optional[Peer] = None,
) -> None:
"""
Deliver message only to subscribers whose 'interests' metadata list
contains the message's 'category' field.
"""
def try_put_message(peer: asyncio.Queue, topic: str, message: Message):
try:
peer.put_nowait((topic, message))
except (asyncio.QueueFull, asyncio.QueueShutDown):
pass
if not isinstance(message, dict) or "category" not in message:
return
category = message["category"]
peers = []
for peer, metadata in subscribers.items():
interests = metadata.get("interests", [])
if category in interests:
peers.append(peer)
if publisher: # broadcast_from
for peer in peers:
if peer != publisher:
try_put_message(peer, topic, message)
else: # broadcast
for peer in peers:
try_put_message(peer, topic, message)
queue1 = asyncio.Queue()
await pubsub.subscribe(
queue1, "news", metadata={"interests": ["sports", "politics"]}
)
queue2 = asyncio.Queue()
await pubsub.subscribe(queue2, "news", metadata={"interests": ["sports"]})
queue3 = asyncio.Queue()
await pubsub.subscribe(queue3, "news", metadata={"interests": ["technology"]})
sports_msg = {"category": "sports", "content": "Game result 3-2"}
await pubsub.broadcast(sports_msg, "news", dispatcher=category_filter_dispatcher)
politics_msg = {"category": "politics", "content": "Election update"}
await pubsub.broadcast(politics_msg, "news", dispatcher=category_filter_dispatcher)
for i, q in enumerate([queue1, queue2, queue3], 1):
received = []
while not q.empty():
received.append(await q.get())
print(f"Subscriber {i} received: {received}")
# Subscriber 1 received: [('news', {'category': 'sports', 'content': 'Game result 3-2'}), ('news', {'category': 'politics', 'content': 'Election update'})]
# Subscriber 2 received: [('news', {'category': 'sports', 'content': 'Game result 3-2'})]
# Subscriber 3 received: []
asyncio.run(main())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file phoenix_pubsub-0.6.0.tar.gz.
File metadata
- Download URL: phoenix_pubsub-0.6.0.tar.gz
- Upload date:
- Size: 5.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.22 {"installer":{"name":"uv","version":"0.9.22","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"22.04","id":"jammy","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
cf015b07df99f2950a3e0d29d3f858e00e61e1ae5802dbb6713c83eb72f491cb
|
|
| MD5 |
441930d0a80e45a1ed3b3dbb9ff848a3
|
|
| BLAKE2b-256 |
d32fee3a625df900c3078c4206b61e8fd8e221f6fad1900b94bda89a78a8da4e
|
File details
Details for the file phoenix_pubsub-0.6.0-py3-none-any.whl.
File metadata
- Download URL: phoenix_pubsub-0.6.0-py3-none-any.whl
- Upload date:
- Size: 7.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.22 {"installer":{"name":"uv","version":"0.9.22","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"22.04","id":"jammy","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
721d9514e8e5a7ad677fc62fa674046c6079a432c3d12f9f12d4822f0ab0cb3f
|
|
| MD5 |
6572bcbec9d4a5eda7c096e2825dd3e8
|
|
| BLAKE2b-256 |
f97e929b46525e08403a8f1c877379cae139344c14486ab8982da41a465a4c38
|