A topic-based publish-subscribe system for asyncio applications, inspired by Phoenix's PubSub
Project description
phoenix-pubsub
A topic-based publish‑subscribe system for asyncio applications, inspired by the Phoenix PubSub library from the Elixir Phoenix framework.
Features
- Subscribe to one or more topics
- Broadcast messages to all subscribers of a topic
- Broadcast messages while excluding the publisher itself
- Graceful handling of slow consumers (messages are dropped when a subscriber’s queue is full)
Installation
pip install phoenix-pubsub
uv add phoenix-pubsub
Examples
Basic: Subscribe, Broadcast, Unsubscribe
import asyncio
from phoenix_pubsub import PubSub
async def main():
pubsub = PubSub()
queue = asyncio.Queue()
# Subscribe to a topic
await pubsub.subscribe(queue, "alerts")
# Broadcast a message
await pubsub.broadcast("System alert!", "alerts")
# Receive the message
topic, msg = await queue.get()
print(topic, msg) # alerts System alert!
# Unsubscribe from the topic
await pubsub.unsubscribe(queue, "alerts")
# Broadcast another message – this one should not be received
await pubsub.broadcast("Another alert", "alerts")
try:
await asyncio.wait_for(queue.get(), timeout=0.1)
print("Unexpected message received after unsubscribe")
except asyncio.TimeoutError:
print("No message received after unsubscribe (as expected)")
asyncio.run(main())
Subscribing to multiple topics
import asyncio
from phoenix_pubsub import PubSub
async def main():
pubsub = PubSub()
queue = asyncio.Queue()
await pubsub.subscribe(queue, "alerts", "news", "sports")
await pubsub.broadcast("Earthquake!", "alerts")
await pubsub.broadcast("Score update", "sports")
topic, msg = await queue.get()
print(topic, msg) # alerts Earthquake!
topic, msg = await queue.get()
print(topic, msg) # sports Score update
asyncio.run(main())
Broadcasting to multiple topics
import asyncio
from phoenix_pubsub import PubSub
async def main():
pubsub = PubSub()
queue_news = asyncio.Queue()
queue_sports = asyncio.Queue()
await pubsub.subscribe(queue_news, "news")
await pubsub.subscribe(queue_sports, "sports")
await pubsub.broadcast("Breaking news!", "news", "sports")
topic, msg = await queue_news.get()
print(topic, msg) # news Breaking news!
topic, msg = await queue_sports.get()
print(topic, msg) # sports Breaking news!
asyncio.run(main())
Excluding the publisher from broadcast
import asyncio
from phoenix_pubsub import PubSub
async def main():
pubsub = PubSub()
publisher = asyncio.Queue()
other = asyncio.Queue()
await pubsub.subscribe(publisher, "chat")
await pubsub.subscribe(other, "chat")
await pubsub.broadcast_from(publisher, "Hello everyone!", "chat")
topic, msg = await other.get()
print(topic, msg) # chat Hello everyone!
try:
await asyncio.wait_for(publisher.get(), timeout=0.1)
print("Unexpected message")
except asyncio.TimeoutError:
print("Publisher received nothing (as expected)")
asyncio.run(main())
Slow consumer - Messages are dropped
import asyncio
from phoenix_pubsub import PubSub
async def main():
pubsub = PubSub()
slow = asyncio.Queue(maxsize=1) # can hold only one message
fast = asyncio.Queue()
await pubsub.subscribe(slow, "alerts")
await pubsub.subscribe(fast, "alerts")
await pubsub.broadcast("Alert 1", "alerts")
await pubsub.broadcast("Alert 2", "alerts")
await pubsub.broadcast("Alert 3", "alerts")
# Fast consumer receives all three
topic, msg = await fast.get()
print(topic, msg) # alerts Alert 1
topic, msg = await fast.get()
print(topic, msg) # alerts Alert 2
topic, msg = await fast.get()
print(topic, msg) # alerts Alert 3
# Slow consumer receives only the first (others are dropped)
topic, msg = await slow.get()
print(topic, msg) # alerts Alert 1
try:
await asyncio.wait_for(slow.get(), timeout=0.1)
print("Unexpected second message")
except asyncio.TimeoutError:
print("Slow queue received no further messages (as expected)")
asyncio.run(main())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
phoenix_pubsub-0.4.3.tar.gz
(3.8 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file phoenix_pubsub-0.4.3.tar.gz.
File metadata
- Download URL: phoenix_pubsub-0.4.3.tar.gz
- Upload date:
- Size: 3.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.22 {"installer":{"name":"uv","version":"0.9.22","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"22.04","id":"jammy","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
e03112c9aac3c051964eb647f9efdce2c740bf2c033a90f8a1d5293c6ba3e25f
|
|
| MD5 |
043ab171e33718cd1bd52a42752c3587
|
|
| BLAKE2b-256 |
2e47aace2dc68de056f9a770c6533d8f3d05a029ab9df4cc7f1d0dc065ba7ea2
|
File details
Details for the file phoenix_pubsub-0.4.3-py3-none-any.whl.
File metadata
- Download URL: phoenix_pubsub-0.4.3-py3-none-any.whl
- Upload date:
- Size: 5.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.22 {"installer":{"name":"uv","version":"0.9.22","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"22.04","id":"jammy","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a0a730adcbfecb1136d55797536b1b14382547d804553e3865f2a435e9a0b4db
|
|
| MD5 |
2c544e91e043307f085b2d31fb0777e5
|
|
| BLAKE2b-256 |
e4d6970caa6545b3991cb8fdd793a2f1ef7b288963ca6752790f1b9d238ad396
|