A topic-based publish-subscribe system for asyncio applications, inspired by Phoenix's PubSub
Project description
phoenix-pubsub
A topic-based publish‑subscribe system for asyncio applications, inspired by the Phoenix PubSub library from the Elixir Phoenix framework.
Features
- Subscribe to one or more topics
- Broadcast messages to all subscribers of a topic
- Broadcast messages while excluding the publisher itself
- Subscriber metadata & custom dispatchers – attach metadata to subscriptions and implement your own delivery logic (filtering, fast‑laning, etc.) by passing a dispatcher function to broadcast/broadcast_from
Installation
pip install phoenix-pubsub
uv add phoenix-pubsub
Examples
Basic: Subscribe, Broadcast, Unsubscribe
import asyncio
from phoenix_pubsub import PubSub
async def main():
    """Run one subscribe -> broadcast -> unsubscribe cycle on the "alerts" topic."""
    pubsub = PubSub()
    queue = asyncio.Queue()

    # Register the queue as a subscriber of the topic.
    await pubsub.subscribe(queue, "alerts")

    # Publish a message, then read the (topic, message) pair off the queue.
    await pubsub.broadcast("System alert!", "alerts")
    topic, msg = await queue.get()
    print(topic, msg)  # alerts System alert!

    # Remove the subscription; the next broadcast should not be received.
    await pubsub.unsubscribe(queue, "alerts")
    await pubsub.broadcast("Another alert", "alerts")

    try:
        # Wait briefly: a timeout here means nothing was delivered.
        await asyncio.wait_for(queue.get(), timeout=0.1)
    except asyncio.TimeoutError:
        print("No message received after unsubscribe (as expected)")
    else:
        print("Unexpected message received after unsubscribe")
asyncio.run(main())
Subscribing to multiple topics
import asyncio
from phoenix_pubsub import PubSub
async def main():
    """Subscribe a single queue to several topics and print what arrives."""
    pubsub = PubSub()
    queue = asyncio.Queue()

    # One subscribe call registers the queue on all three topics.
    await pubsub.subscribe(queue, "alerts", "news", "sports")

    await pubsub.broadcast("Earthquake!", "alerts")
    await pubsub.broadcast("Score update", "sports")

    # Expected output, in order:
    #   alerts Earthquake!
    #   sports Score update
    for _ in range(2):
        topic, msg = await queue.get()
        print(topic, msg)
asyncio.run(main())
Broadcasting to multiple topics
import asyncio
from phoenix_pubsub import PubSub
async def main():
    """Send one message to two topics, each watched by its own queue."""
    pubsub = PubSub()
    queue_news = asyncio.Queue()
    queue_sports = asyncio.Queue()
    await pubsub.subscribe(queue_news, "news")
    await pubsub.subscribe(queue_sports, "sports")

    # A single broadcast call targets both topics.
    await pubsub.broadcast("Breaking news!", "news", "sports")

    # Expected output:
    #   news Breaking news!
    #   sports Breaking news!
    for inbox in (queue_news, queue_sports):
        topic, msg = await inbox.get()
        print(topic, msg)
asyncio.run(main())
Excluding the publisher from broadcast
import asyncio
from phoenix_pubsub import PubSub
async def main():
    """Show that broadcast_from delivers to other subscribers but not the publisher."""
    pubsub = PubSub()
    publisher = asyncio.Queue()
    other = asyncio.Queue()

    # Both queues subscribe to the same topic (publisher first, as before).
    for member in (publisher, other):
        await pubsub.subscribe(member, "chat")

    await pubsub.broadcast_from(publisher, "Hello everyone!", "chat")

    topic, msg = await other.get()
    print(topic, msg)  # chat Hello everyone!

    try:
        # A timeout here means the publisher's own queue stayed empty.
        await asyncio.wait_for(publisher.get(), timeout=0.1)
    except asyncio.TimeoutError:
        print("Publisher received nothing (as expected)")
    else:
        print("Unexpected message")
asyncio.run(main())
Slow consumer - Messages are dropped
import asyncio
from phoenix_pubsub import PubSub
async def main():
    """Contrast a bounded (slow) queue with an unbounded (fast) one under three broadcasts."""
    pubsub = PubSub()
    slow = asyncio.Queue(maxsize=1)  # can hold only one message
    fast = asyncio.Queue()

    for subscriber in (slow, fast):
        await pubsub.subscribe(subscriber, "alerts")

    for alert in ("Alert 1", "Alert 2", "Alert 3"):
        await pubsub.broadcast(alert, "alerts")

    # Fast consumer receives all three. Expected output:
    #   alerts Alert 1
    #   alerts Alert 2
    #   alerts Alert 3
    for _ in range(3):
        topic, msg = await fast.get()
        print(topic, msg)

    # Slow consumer receives only the first (others are dropped).
    topic, msg = await slow.get()
    print(topic, msg)  # alerts Alert 1

    try:
        await asyncio.wait_for(slow.get(), timeout=0.1)
    except asyncio.TimeoutError:
        print("Slow queue received no further messages (as expected)")
    else:
        print("Unexpected second message")
asyncio.run(main())
Custom dispatcher
import asyncio
from phoenix_pubsub import PubSub, Topic, Message, Subscribers, Peer
from typing import Optional
async def main():
    """Filter delivery per subscriber by passing a custom dispatcher to broadcast.

    Three queues subscribe to "news" with different 'interests' metadata;
    two messages with different categories are broadcast, and each queue's
    received messages are printed.
    """
    pubsub = PubSub()

    def category_filter_dispatcher(
        topic: Topic,
        message: Message,
        subscribers: Subscribers,
        publisher: Optional[Peer] = None,
    ) -> None:
        """
        Deliver message only to subscribers whose 'interests' metadata list
        contains the message's 'category' field.

        Messages that are not dicts, or that have no 'category' key, are
        not delivered to anyone. When a publisher is given (broadcast_from),
        that peer is skipped.
        """
        if not isinstance(message, dict) or "category" not in message:
            return
        category = message["category"]

        # asyncio.QueueShutDown only exists on Python 3.13+. Referencing it
        # directly in the except clause would raise AttributeError on older
        # interpreters as soon as a QueueFull is actually raised.
        undeliverable = (
            asyncio.QueueFull,
            getattr(asyncio, "QueueShutDown", asyncio.QueueFull),
        )

        for peer, metadata in subscribers.items():
            if category not in metadata.get("interests", []):
                continue
            # Test against None explicitly: a publisher object that happens
            # to be falsy must still be excluded from its own broadcast.
            if publisher is None or peer != publisher:
                try:
                    peer.put_nowait((topic, message))
                except undeliverable:
                    # Best-effort delivery: drop the message for this peer.
                    pass

    queue1 = asyncio.Queue()
    await pubsub.subscribe(
        queue1, "news", metadata={"interests": ["sports", "politics"]}
    )
    queue2 = asyncio.Queue()
    await pubsub.subscribe(queue2, "news", metadata={"interests": ["sports"]})
    queue3 = asyncio.Queue()
    await pubsub.subscribe(queue3, "news", metadata={"interests": ["technology"]})

    sports_msg = {"category": "sports", "content": "Game result 3-2"}
    await pubsub.broadcast(sports_msg, "news", dispatcher=category_filter_dispatcher)
    politics_msg = {"category": "politics", "content": "Election update"}
    await pubsub.broadcast(politics_msg, "news", dispatcher=category_filter_dispatcher)

    # Drain each queue and show what it got.
    for i, q in enumerate([queue1, queue2, queue3], 1):
        received = []
        while not q.empty():
            received.append(await q.get())
        print(f"Subscriber {i} received: {received}")
    # Subscriber 1 received: [('news', {'category': 'sports', 'content': 'Game result 3-2'}), ('news', {'category': 'politics', 'content': 'Election update'})]
    # Subscriber 2 received: [('news', {'category': 'sports', 'content': 'Game result 3-2'})]
    # Subscriber 3 received: []
asyncio.run(main())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
phoenix_pubsub-0.5.3.tar.gz
(5.0 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file phoenix_pubsub-0.5.3.tar.gz.
File metadata
- Download URL: phoenix_pubsub-0.5.3.tar.gz
- Upload date:
- Size: 5.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.7 {"installer":{"name":"uv","version":"0.10.7","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a1dc6ba10e7ecb68a83b15c55b4f4985808a2e993c3b658fa8e3d2506f17a5a7
|
|
| MD5 |
a5740e2d4d04bfbcae64da800ed628df
|
|
| BLAKE2b-256 |
4799ce985766f63c2384d91f24f9afe49a95d63a8dcabad0de458d7c4cd05985
|
File details
Details for the file phoenix_pubsub-0.5.3-py3-none-any.whl.
File metadata
- Download URL: phoenix_pubsub-0.5.3-py3-none-any.whl
- Upload date:
- Size: 7.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.7 {"installer":{"name":"uv","version":"0.10.7","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a68225d3404da5019a47b6fd26377dc51d7a13c3a9d35e2b4c0afcd48fef657b
|
|
| MD5 |
7d4159ad29a3c400b7e4c7242d4b6c78
|
|
| BLAKE2b-256 |
5faf175236159f813ed2988ce9653325280d0afb38defac9cdded55f400daace
|