A topic-based publish-subscribe system for asyncio applications, inspired by Phoenix's PubSub
Project description
phoenix-pubsub
A topic-based publish‑subscribe system for asyncio applications, inspired by the Phoenix PubSub library from the Elixir Phoenix framework.
Features
- Subscribe to one or more topics
- Broadcast messages to all subscribers of a topic
- Broadcast messages while excluding the publisher itself
- Graceful handling of slow consumers (messages are dropped when a subscriber’s queue is full)
Installation
pip install phoenix-pubsub
uv add phoenix-pubsub
Examples
Basic: Subscribe, Broadcast, Unsubscribe
import asyncio
from phoenix_pubsub import PubSub
async def main():
pubsub = PubSub()
queue = asyncio.Queue()
# Subscribe to a topic
await pubsub.subscribe(queue, "alerts")
# Broadcast a message
await pubsub.broadcast("System alert!", "alerts")
# Receive the message
topic, msg = await queue.get()
print(topic, msg) # alerts System alert!
# Unsubscribe from the topic
await pubsub.unsubscribe(queue, "alerts")
# Broadcast another message – this one should not be received
await pubsub.broadcast("Another alert", "alerts")
try:
await asyncio.wait_for(queue.get(), timeout=0.1)
print("Unexpected message received after unsubscribe")
except asyncio.TimeoutError:
print("No message received after unsubscribe (as expected)")
asyncio.run(main())
Subscribing to multiple topics
import asyncio
from phoenix_pubsub import PubSub
async def main():
pubsub = PubSub()
queue = asyncio.Queue()
await pubsub.subscribe(queue, "alerts", "news", "sports")
await pubsub.broadcast("Earthquake!", "alerts")
await pubsub.broadcast("Score update", "sports")
# Order of reception may vary
topic, msg = await queue.get()
print(topic, msg) # alerts Earthquake! (or sports Score update)
topic, msg = await queue.get()
print(topic, msg) # sports Score update (or alerts Earthquake!)
asyncio.run(main())
Broadcasting to multiple topics
import asyncio
from phoenix_pubsub import PubSub
async def main():
pubsub = PubSub()
queue_news = asyncio.Queue()
queue_sports = asyncio.Queue()
await pubsub.subscribe(queue_news, "news")
await pubsub.subscribe(queue_sports, "sports")
await pubsub.broadcast("Breaking news!", "news", "sports")
topic, msg = await queue_news.get()
print(topic, msg) # news Breaking news!
topic, msg = await queue_sports.get()
print(topic, msg) # sports Breaking news!
asyncio.run(main())
Excluding the publisher from broadcast
import asyncio
from phoenix_pubsub import PubSub
async def main():
pubsub = PubSub()
publisher = asyncio.Queue()
other = asyncio.Queue()
await pubsub.subscribe(publisher, "chat")
await pubsub.subscribe(other, "chat")
await pubsub.broadcast_from(publisher, "Hello everyone!", "chat")
topic, msg = await other.get()
print(topic, msg) # chat Hello everyone!
try:
await asyncio.wait_for(publisher.get(), timeout=0.1)
print("Unexpected message")
except asyncio.TimeoutError:
print("Publisher received nothing (as expected)")
asyncio.run(main())
Slow consumer - Messages are dropped
import asyncio
from phoenix_pubsub import PubSub
async def main():
pubsub = PubSub()
slow = asyncio.Queue(maxsize=1) # can hold only one message
fast = asyncio.Queue()
await pubsub.subscribe(slow, "alerts")
await pubsub.subscribe(fast, "alerts")
await pubsub.broadcast("Alert 1", "alerts")
await pubsub.broadcast("Alert 2", "alerts")
await pubsub.broadcast("Alert 3", "alerts")
# Fast consumer receives all three
topic, msg = await fast.get()
print(topic, msg) # alerts Alert 1
topic, msg = await fast.get()
print(topic, msg) # alerts Alert 2
topic, msg = await fast.get()
print(topic, msg) # alerts Alert 3
# Slow consumer receives only the first (others are dropped)
topic, msg = await slow.get()
print(topic, msg) # alerts Alert 1
try:
await asyncio.wait_for(slow.get(), timeout=0.1)
print("Unexpected second message")
except asyncio.TimeoutError:
print("Slow queue received no further messages (as expected)")
asyncio.run(main())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
phoenix_pubsub-0.4.1.tar.gz
(3.8 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file phoenix_pubsub-0.4.1.tar.gz.
File metadata
- Download URL: phoenix_pubsub-0.4.1.tar.gz
- Upload date:
- Size: 3.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.22 {"installer":{"name":"uv","version":"0.9.22","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"22.04","id":"jammy","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a77b83b1034861aee27b8b0114deec906f544739bdc6cd738b5229df2c2799a7
|
|
| MD5 |
0acb7eef2e000afe05ae75055a2e9a48
|
|
| BLAKE2b-256 |
f6d089aca207144d436973a5e7c298155f7258feeb7a51d9b2b152e909346c6c
|
File details
Details for the file phoenix_pubsub-0.4.1-py3-none-any.whl.
File metadata
- Download URL: phoenix_pubsub-0.4.1-py3-none-any.whl
- Upload date:
- Size: 5.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.22 {"installer":{"name":"uv","version":"0.9.22","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"22.04","id":"jammy","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
2ce228fe43f88cafc40963e54119a6c13d0d2e643083b2ceabdf21a11012c252
|
|
| MD5 |
5db2cf43cea55b019f99beb0bfb2946c
|
|
| BLAKE2b-256 |
7845742041e2ce9009d991143f091a8bae3c428d4b68c3d49364dd428af111e3
|