NATS JetStream connectivity for Bluesky
Project description
bluesky-nats
bluesky-nats connects Bluesky document streams to NATS JetStream.
It supports two primary workflows:
- publish Bluesky documents from a
RunEngineto JetStream subjects - consume JetStream subjects and dispatch documents to Bluesky callbacks
Who this is for
Use this package if you want to move Bluesky documents across processes or hosts using NATS JetStream while preserving document semantics.
Installation
Install from PyPI:
uv add bluesky-nats
Install with optional dependencies used by the examples:
uv add bluesky-nats --extra examples
Python support: >=3.11,<3.15
Prerequisites
You need a NATS server with JetStream enabled and a stream configured for your
subject pattern (for example events.>).
By default, this package uses stream name bluesky.
For local setup, use the provided Docker assets:
docker/compose.yamldocker/Readme.adoc(includes stream creation commands)
Quick start (local)
-
Start NATS JetStream (from this repository root):
cd docker docker compose up -d
-
Ensure a JetStream named
blueskyexists and is bound toevents.>. Seedocker/Readme.adocfor exact commands. -
In a separate shell from repository root, run examples:
uv run python examples/print_subscriber.py uv run python examples/publisher.py
-
You should see published documents printed by the subscriber.
Minimal publisher example
[!WARNING] Breaking change:
CoroutineExecutorno longer accepts aloopargument. It now always owns a dedicated background thread and event loop for NATS coroutines. Update old code fromCoroutineExecutor(RE.loop)toCoroutineExecutor().
Attach NATSPublisher to a RunEngine and publish documents:
from bluesky.run_engine import RunEngine
from bluesky_nats.nats_client import NATSClientConfig
from bluesky_nats.nats_publisher import CoroutineExecutor, NATSPublisher
if __name__ == "__main__":
RE = RunEngine({})
config = NATSClientConfig(servers=["nats://localhost:4222"])
executor = CoroutineExecutor()
nats_publisher = NATSPublisher(
client_config=config,
executor=executor,
subject_factory="events.nats-bluesky",
strict_publish=True,
)
if not nats_publisher.ensure_connection(timeout=10):
raise RuntimeError("NATS connection is required before starting plans")
RE.subscribe(nats_publisher)
# Optional: expose current publisher status snapshot
print(nats_publisher.health)
# Optional: register lifecycle cleanup (useful for interactive sessions)
import atexit
atexit.register(
nats_publisher.shutdown_callback(timeout=10, shutdown_executor=True)
)
Minimal dispatcher/consumer example
Consume subjects and forward documents to a callback:
import asyncio
from bluesky_nats.nats_dispatcher import NATSDispatcher
async def main() -> None:
async with NATSDispatcher(subject="events.>") as dispatcher:
dispatcher.subscribe(print)
await asyncio.sleep(60)
if __name__ == "__main__":
asyncio.run(main())
Configuration
- Client connectivity is configured through
NATSClientConfig. - Configuration can also be built from JSON/YAML/TOML via
NATSClientConfigBuilder.from_file(...). - Publisher subjects are derived as
<subject_factory>.<document_name>. - Publisher does not pick a stream explicitly; the server maps subjects to streams, while JetStream publish acknowledgements confirm server receipt.
strict_publish=Trueenables fail-fast behavior: async publish/connect failures are latched and raised on subsequent callback calls.publisher.healthreturns aPublisherHealthsnapshot (connectivity, pending publishes, last error/ack, and last subject).publisher.shutdown_callback(...)returns a zero-arg callable suitable foratexit.register(...).
Strict mode
Use strict mode when message delivery is mandatory for your architecture.
- Set
strict_publish=TrueonNATSPublisher. - Call
ensure_connection(...)before running plans to gate execution. - If async publish/connect fails later, strict mode raises
RuntimeErrorfrom the callback path so the RunEngine can fail fast.
See the examples/ directory for complete runnable scripts.
Troubleshooting
NoStreamResponseError: the target stream does not exist or does not match your subject pattern.- Subscriber receives nothing: verify publisher and dispatcher subjects are
compatible (for example
events.nats-blueskyvsevents.>). - Connection failures: check server URL/port and whether JetStream is enabled.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file bluesky_nats-1.0.0.tar.gz.
File metadata
- Download URL: bluesky_nats-1.0.0.tar.gz
- Upload date:
- Size: 150.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
47377196e23826356b3416cc30d96c279953bc6a1a6e1418bc17f1ae40b92fcd
|
|
| MD5 |
b5c551ac300b98ca7c93bd1ce7ba00c3
|
|
| BLAKE2b-256 |
4681ebfd4ec4592037d0be476f8c9db0f2957c546f09662d1b026bcc4ee4a957
|
File details
Details for the file bluesky_nats-1.0.0-py3-none-any.whl.
File metadata
- Download URL: bluesky_nats-1.0.0-py3-none-any.whl
- Upload date:
- Size: 37.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ebe81f1f4a8911499d5f7f25fd2ad3e57cdce4f710be6eaf61c2202fb0df5528
|
|
| MD5 |
9c3a7961198a6065210e36f4a82bd0bf
|
|
| BLAKE2b-256 |
1800736d85da175812fd4bf362acacda3aaade312e7f75029864aeecc9e9fe6c
|