A topic-based publish-subscribe system for asyncio applications, inspired by Phoenix's PubSub
phoenix-pubsub
A topic-based publish‑subscribe system for asyncio applications, inspired by the Phoenix PubSub library from the Elixir Phoenix framework.
Features
- Subscribe to one or more topics
- Broadcast messages to all subscribers of a topic
- Broadcast messages while excluding the publisher itself
- Graceful handling of slow consumers (messages are dropped when a subscriber’s queue is full)
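The drop-on-full behaviour in the last bullet can be pictured with plain asyncio. The sketch below is not the library's code: `deliver` is a hypothetical helper, and it assumes delivery relies on the non-blocking `Queue.put_nowait` and discards the message when `asyncio.QueueFull` is raised.

```python
import asyncio


def deliver(queue: asyncio.Queue, topic: str, message: object) -> bool:
    """Hypothetical helper: enqueue without blocking and report whether it fit."""
    try:
        queue.put_nowait((topic, message))
        return True
    except asyncio.QueueFull:
        # The subscriber is too slow: drop the message rather than block the publisher.
        return False


async def demo() -> list[bool]:
    slow = asyncio.Queue(maxsize=1)  # room for a single pending message
    return [deliver(slow, "alerts", f"Alert {i}") for i in (1, 2, 3)]
```

Running `asyncio.run(demo())` reports that only the first delivery fits; the other two are dropped because nothing drains the queue in between.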
Installation
pip install phoenix-pubsub
or, with uv:
uv add phoenix-pubsub
Examples
Basic: Subscribe, Broadcast, Unsubscribe
import asyncio
from phoenix_pubsub import PubSub

async def main():
    pubsub = PubSub()
    queue = asyncio.Queue()

    # Subscribe to a topic
    await pubsub.subscribe(queue, "alerts")

    # Broadcast a message
    await pubsub.broadcast("System alert!", "alerts")

    # Receive the message
    topic, msg = await queue.get()
    print(topic, msg)  # alerts System alert!

    # Unsubscribe from the topic
    await pubsub.unsubscribe(queue, "alerts")

    # Broadcast another message; this one should not be received
    await pubsub.broadcast("Another alert", "alerts")
    try:
        await asyncio.wait_for(queue.get(), timeout=0.1)
        print("Unexpected message received after unsubscribe")
    except asyncio.TimeoutError:
        print("No message received after unsubscribe (as expected)")

asyncio.run(main())
Subscribing to multiple topics
import asyncio
from phoenix_pubsub import PubSub

async def main():
    pubsub = PubSub()
    queue = asyncio.Queue()

    await pubsub.subscribe(queue, "alerts", "news", "sports")
    await pubsub.broadcast("Earthquake!", "alerts")
    await pubsub.broadcast("Score update", "sports")

    # Order of reception may vary
    topic, msg = await queue.get()
    print(topic, msg)  # alerts Earthquake! (or sports Score update)
    topic, msg = await queue.get()
    print(topic, msg)  # sports Score update (or alerts Earthquake!)

asyncio.run(main())
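Because one queue can carry several topics, a consumer usually branches on the topic of each `(topic, message)` pair. The following is a minimal sketch using only asyncio. `consume` and the `handlers` mapping are illustrative names, not part of phoenix-pubsub, and the queue is filled by hand as a stand-in for real broadcasts.

```python
import asyncio
from typing import Callable


async def consume(
    queue: asyncio.Queue,
    handlers: dict[str, Callable[[object], None]],
    count: int,
) -> None:
    """Read `count` (topic, message) pairs and route each to its topic's handler."""
    for _ in range(count):
        topic, msg = await queue.get()
        handler = handlers.get(topic)
        if handler is not None:  # topics without a handler are ignored
            handler(msg)


async def demo() -> dict[str, list]:
    seen: dict[str, list] = {"alerts": [], "sports": []}
    handlers = {topic: bucket.append for topic, bucket in seen.items()}
    queue = asyncio.Queue()
    # Stand-in for pubsub.broadcast(...): same tuple shape as in the examples above.
    queue.put_nowait(("sports", "Score update"))
    queue.put_nowait(("alerts", "Earthquake!"))
    await consume(queue, handlers, count=2)
    return seen
```

Routing by topic this way means the consumer does not depend on the order in which messages arrive.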
Broadcasting to multiple topics
import asyncio
from phoenix_pubsub import PubSub

async def main():
    pubsub = PubSub()
    queue_news = asyncio.Queue()
    queue_sports = asyncio.Queue()

    await pubsub.subscribe(queue_news, "news")
    await pubsub.subscribe(queue_sports, "sports")

    # One call delivers the same message on both topics
    await pubsub.broadcast("Breaking news!", "news", "sports")

    topic, msg = await queue_news.get()
    print(topic, msg)  # news Breaking news!
    topic, msg = await queue_sports.get()
    print(topic, msg)  # sports Breaking news!

asyncio.run(main())
Excluding the publisher from broadcast
import asyncio
from phoenix_pubsub import PubSub

async def main():
    pubsub = PubSub()
    publisher = asyncio.Queue()
    other = asyncio.Queue()

    await pubsub.subscribe(publisher, "chat")
    await pubsub.subscribe(other, "chat")

    # The publisher's own queue is skipped
    await pubsub.broadcast_from(publisher, "Hello everyone!", "chat")

    topic, msg = await other.get()
    print(topic, msg)  # chat Hello everyone!

    try:
        await asyncio.wait_for(publisher.get(), timeout=0.1)
        print("Unexpected message")
    except asyncio.TimeoutError:
        print("Publisher received nothing (as expected)")

asyncio.run(main())
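One way to picture publisher exclusion is a topic registry that skips the sender's queue by identity. The class below is an illustrative stand-in under that assumption. `TinyPubSub` and its methods are hypothetical and synchronous for brevity; they do not reproduce the phoenix-pubsub implementation or its signatures.

```python
import asyncio
from collections import defaultdict


class TinyPubSub:
    """Illustrative stand-in, not the phoenix-pubsub implementation."""

    def __init__(self) -> None:
        # topic name -> set of subscriber queues
        self._topics: dict[str, set[asyncio.Queue]] = defaultdict(set)

    def subscribe(self, queue: asyncio.Queue, topic: str) -> None:
        self._topics[topic].add(queue)

    def broadcast_from(self, sender: asyncio.Queue, message: object, topic: str) -> int:
        """Deliver to every subscriber of `topic` except `sender`; return the delivery count."""
        delivered = 0
        for queue in self._topics[topic]:
            if queue is sender:
                continue  # skip the publisher's own queue
            queue.put_nowait((topic, message))
            delivered += 1
        return delivered


async def demo() -> tuple[int, int, int]:
    bus = TinyPubSub()
    publisher, other = asyncio.Queue(), asyncio.Queue()
    bus.subscribe(publisher, "chat")
    bus.subscribe(other, "chat")
    delivered = bus.broadcast_from(publisher, "Hello everyone!", "chat")
    return delivered, publisher.qsize(), other.qsize()
```

Comparing queues with `is` keeps the check independent of queue contents: two distinct empty queues are still different subscribers.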
Slow consumer - Messages are dropped
import asyncio
from phoenix_pubsub import PubSub

async def main():
    pubsub = PubSub()
    slow = asyncio.Queue(maxsize=1)  # can hold only one message
    fast = asyncio.Queue()

    await pubsub.subscribe(slow, "alerts")
    await pubsub.subscribe(fast, "alerts")

    await pubsub.broadcast("Alert 1", "alerts")
    await pubsub.broadcast("Alert 2", "alerts")
    await pubsub.broadcast("Alert 3", "alerts")

    # Fast consumer receives all three
    topic, msg = await fast.get()
    print(topic, msg)  # alerts Alert 1
    topic, msg = await fast.get()
    print(topic, msg)  # alerts Alert 2
    topic, msg = await fast.get()
    print(topic, msg)  # alerts Alert 3

    # Slow consumer receives only the first (others are dropped)
    topic, msg = await slow.get()
    print(topic, msg)  # alerts Alert 1
    try:
        await asyncio.wait_for(slow.get(), timeout=0.1)
        print("Unexpected second message")
    except asyncio.TimeoutError:
        print("Slow queue received no further messages (as expected)")

asyncio.run(main())
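Since a full queue drops messages silently, a subscriber that needs to know about losses can have publishers embed a sequence number in each payload and look for gaps. This is an application-level convention sketched with plain asyncio, not a library feature. `find_gaps` and the `seq` field are hypothetical names, and the bounded queue is filled by hand to imitate drops.

```python
import asyncio


def find_gaps(received: list[int]) -> list[int]:
    """Return sequence numbers missing between the first and last received values."""
    if not received:
        return []
    expected = set(range(received[0], received[-1] + 1))
    return sorted(expected - set(received))


def drain(queue: asyncio.Queue, received: list[int]) -> None:
    """Move every pending payload's sequence number into `received`."""
    while not queue.empty():
        _topic, payload = queue.get_nowait()
        received.append(payload["seq"])


async def demo() -> list[int]:
    queue = asyncio.Queue(maxsize=2)
    received: list[int] = []
    for seq in range(1, 7):
        try:
            queue.put_nowait(("alerts", {"seq": seq, "text": f"Alert {seq}"}))
        except asyncio.QueueFull:
            pass  # dropped, as with a full subscriber queue
        if seq == 4:
            drain(queue, received)  # the consumer catches up only now
    drain(queue, received)
    return find_gaps(received)
```

Gap detection of this kind only reveals drops that are followed by a later successful delivery; messages lost at the very end of a stream leave no gap to observe.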
Download files
Source Distribution
phoenix_pubsub-0.2.0.tar.gz (3.7 kB)
Built Distribution
phoenix_pubsub-0.2.0-py3-none-any.whl (5.5 kB)
File details
Details for the file phoenix_pubsub-0.2.0.tar.gz.
File metadata
- Download URL: phoenix_pubsub-0.2.0.tar.gz
- Upload date:
- Size: 3.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.22 {"installer":{"name":"uv","version":"0.9.22","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"22.04","id":"jammy","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 5978c6df2a8600856092a4aacbaab272536b043ad49edefa8e0aeb70b4760ffe |
| MD5 | 9c8723c97e2ef2fa0563d8e5957c3f6d |
| BLAKE2b-256 | 90bc385f8f2199aecd34580461ca52fa54f67531091a556a531f251199c7ea65 |
File details
Details for the file phoenix_pubsub-0.2.0-py3-none-any.whl.
File metadata
- Download URL: phoenix_pubsub-0.2.0-py3-none-any.whl
- Upload date:
- Size: 5.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.22 {"installer":{"name":"uv","version":"0.9.22","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"22.04","id":"jammy","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | a912aca74e0eb2d2e4fafa60e75329986af6b19e6ef2c30c652be638e2cb38c0 |
| MD5 | 2380b63c811df12a9d48a88ece9276c1 |
| BLAKE2b-256 | c90a4339c0c968d6ebc01ec7430d112a572dd4fb9f08c511d2f45dd48ee6cc7a |