Add your description here
Project description
nats-common
Small shared building blocks for NATS-based Python services:
Envelope[T]: a thin, typed message wrapper (metadata + payload)NatsCommonSettings: shared settings layout usingpydantic-settings(supports.env)- Jobs: Long-running, exactly-once tasks with heartbeat, cancellation, and progress reporting (
NatsJobService/NatsJobClient) - RPCs: Lightweight request/reply pattern with load balancing (
NatsRPCService/NatsRPCClient)
Envelope (message wrapper)
Envelope[T] wraps your actual payload (data) with common metadata (id, ts, correlation_id, …).
The type field is auto-filled from data.__msg_type__ (if present), otherwise from the payload class path.
Create / serialize / validate
from pydantic import BaseModel
from nats_common.envelope import Envelope
class AudioChunk(BaseModel):
__msg_type__ = "audio.chunk"
samples: list[float]
env_out: Envelope[AudioChunk] = Envelope.from_data(AudioChunk(samples=[0.1, 0.2]), id=True)
### The following is usually not needed, as FastStream handles this automatically
# Serialize for transport (JSON-compatible dict)
payload: dict[str, object] = env_out.model_dump(mode="json")
# Validate/parse back (e.g. on the subscriber side)
env_in: Envelope[AudioChunk] = Envelope[AudioChunk].model_validate(payload)
assert env_in.data is not None
print(env_in.type, env_in.data.samples)
FastStream example (publish + subscribe with Envelope)
This mirrors the pattern used in nats-microphone: define publishers/subscribers with Envelope[T], and publish an Envelope instance.
from pydantic import BaseModel
from faststream import FastStream
from faststream.nats import NatsBroker
from nats_common.envelope import Envelope
from nats_common.settings import NatsCommonSettings
class Ping(BaseModel):
__msg_type__ = "ping"
text: str
settings = NatsCommonSettings()
broker = NatsBroker(settings.url)
app = FastStream(broker)
@broker.subscriber("demo.in.ping")
async def on_ping(env: Envelope[Ping]) -> None:
assert env.data is not None
print(f"got ping: {env.data.text} (type={env.type}, id={env.id})")
@app.after_startup
async def publish_once() -> None:
await broker.publish(
Envelope.from_data(Ping(text="hello"), id=True),
subject="demo.in.ping",
)
Settings (NatsCommonSettings)
NatsCommonSettings reads configuration from environment variables and .env files.
- Prefix:
NATS_ .envfile:./.env(UTF-8)- Nested delimiter:
__(double underscore)
Basic usage
from nats_common.settings import NatsCommonSettings
settings = NatsCommonSettings()
print(settings.url)
print(settings.subject_prefix)
print(settings.device)
Example .env:
NATS_URL=nats://localhost:4222
NATS_SUBJECT_PREFIX=test.foo
NATS_DEVICE=device1
Nested settings + .env (principle)
Because NatsCommonSettings uses env_nested_delimiter="__", you can override nested configuration via env vars like:
NATS_<FIELDNAME>__<SUBFIELD>=...
Example:
from pydantic import BaseModel, Field
from nats_common.settings import NatsCommonSettings
class MicrophoneSettings(BaseModel):
sample_rate: int = 44100
class Settings(NatsCommonSettings):
microphone: MicrophoneSettings = Field(default_factory=MicrophoneSettings)
settings = Settings()
print(settings.microphone.sample_rate)
Example .env to override the nested value:
NATS_URL=nats://localhost:4222
# Override the nested value, with the nested delimiter "__" (two underscores)
NATS_MICROPHONE__SAMPLE_RATE=16000
Jobs (Long-running tasks)
NatsJobService and NatsJobClient provide a complete solution for long-running, exactly-once jobs with heartbeat, cancellation, and progress reporting.
Service-side (Job handler)
import asyncio
import nats
from pydantic import BaseModel
from nats_common.services import JobStatus, NatsJob, NatsJobService
# Define your models
class ProcessRequest(BaseModel):
file_url: str
class ProcessChunk(BaseModel):
text: str
offset: int
class ProcessResult(BaseModel):
full_text: str
# Implement your job handler
class ProcessJob(NatsJob[ProcessRequest, ProcessResult, ProcessChunk]):
async def handle(self) -> None:
req: ProcessRequest | None = self.request.data # type: ignore[assignment]
if req is None:
return
# Check for cancellation
if self.cancel_requested:
return
# Update progress
self.set_local_status(JobStatus.RUNNING, progress=0.5, message="Processing...")
# Publish intermediate results
await self.publish_intermediate_result(
ProcessChunk(text="chunk1", offset=0),
)
# Publish final result
await self.publish_final_result(
ProcessResult(full_text="complete"),
)
# Start the service
async def main():
nats_client = await nats.connect("nats://localhost:4222")
service = NatsJobService(
nats_client=nats_client,
subject="myapp.process",
job_cls=ProcessJob,
)
await service.start()
# ... keep running ...
await service.stop()
Client-side (Job submitter)
import asyncio
import nats
from pydantic import BaseModel
from nats_common.services import JobStatusUpdate, NatsJobClient
class ProcessRequest(BaseModel):
file_url: str
class ProcessChunk(BaseModel):
text: str
offset: int
class ProcessResult(BaseModel):
full_text: str
async def main():
nats_client = await nats.connect("nats://localhost:4222")
client = NatsJobClient[ProcessRequest, ProcessChunk, ProcessResult](
nats_client=nats_client,
subject="myapp.process",
)
# Submit a job
await client.submit(
job_id="job-123",
data=ProcessRequest(file_url="https://example.com/file.txt"),
)
# Option 1: Wait for completion (high-level)
completion = await client.wait_for_completion(
"job-123",
timeout_seconds=60,
on_status=lambda update: print(f"Status: {update.status.value}"),
on_intermediate=lambda chunk: print(f"Chunk: {chunk.text}"),
)
if completion.succeeded:
print(f"Result: {completion.final_result}")
# Option 2: Manual subscriptions (low-level)
async def on_status(update: JobStatusUpdate) -> None:
print(f"Status: {update.status.value} - {update.message}")
sub = await client.subscribe_status("job-123", on_status)
# ... later: await sub.unsubscribe()
RPCs (Request/Reply)
NatsRPCService and NatsRPCClient provide a lightweight request/reply pattern with load balancing across service replicas.
Service-side (RPC handler)
import asyncio
import nats
from pydantic import BaseModel
from nats_common.services import NatsRPC, NatsRPCService
# Define your models
class EchoRequest(BaseModel):
message: str
class EchoResponse(BaseModel):
echoed: str
# Implement your RPC handler
class EchoRpc(NatsRPC[EchoRequest, EchoResponse]):
async def handle(self) -> EchoResponse:
req: EchoRequest | None = self.request.data # type: ignore[assignment]
return EchoResponse(echoed=req.message if req else "")
# Start the service
async def main():
nats_client = await nats.connect("nats://localhost:4222")
service = NatsRPCService(
nats_client=nats_client,
subject="myapp.echo",
rpc_cls=EchoRpc,
request_model=EchoRequest,
response_model=EchoResponse,
)
await service.start()
# ... keep running ...
await service.stop()
Client-side (RPC caller)
import asyncio
import nats
from pydantic import BaseModel
from nats_common.services import NatsRPCClient
class EchoRequest(BaseModel):
message: str
class EchoResponse(BaseModel):
echoed: str
async def main():
nats_client = await nats.connect("nats://localhost:4222")
client = NatsRPCClient(
nats_client=nats_client,
subject="myapp.echo",
response_model=EchoResponse,
)
# Make an RPC call
resp = await client.call(data=EchoRequest(message="hello"))
if resp.error:
print(f"Error: {resp.error}")
else:
print(f"Response: {resp.data}") # EchoResponse(echoed="hello")
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file nats_common-0.1.0.tar.gz.
File metadata
- Download URL: nats_common-0.1.0.tar.gz
- Upload date:
- Size: 28.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.0 {"installer":{"name":"uv","version":"0.10.0","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
91160c0b2974826e5e6607895c2d80968c7afc6ff302ddda8c96f0af80f13330
|
|
| MD5 |
fc3a5c405ef712f4f61e3866df9f6fbf
|
|
| BLAKE2b-256 |
3971f46151d4f007432b20de1b48a8766dc9538e8ba93989da0016f362ef398f
|
File details
Details for the file nats_common-0.1.0-py3-none-any.whl.
File metadata
- Download URL: nats_common-0.1.0-py3-none-any.whl
- Upload date:
- Size: 24.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.0 {"installer":{"name":"uv","version":"0.10.0","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c835fa32f7ffb6ab02e301d5a6240cfa7587c8e3211163274a8ce311c38ef956
|
|
| MD5 |
1908114be1d63aa2e6acb8856115151b
|
|
| BLAKE2b-256 |
745e6603e8a87973ef288346f2db3bc69ab89111b029d3bb1929d493ae6ae2a5
|