
nats-common

Small shared building blocks for NATS-based Python services:

  • Envelope[T]: a thin, typed message wrapper (metadata + payload)
  • NatsCommonSettings: shared settings layout using pydantic-settings (supports .env)
  • Jobs: long-running, exactly-once tasks with heartbeat, cancellation, and progress reporting (NatsJobService / NatsJobClient)
  • RPCs: lightweight request/reply pattern with load balancing (NatsRPCService / NatsRPCClient)

Envelope (message wrapper)

Envelope[T] wraps your actual payload (data) with common metadata (id, ts, correlation_id, …).
The type field is auto-filled from data.__msg_type__ (if present), otherwise from the payload class path.
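The fallback rule can be illustrated with a small stand-alone sketch (illustrative only; `resolve_msg_type` is a hypothetical helper, not part of the library):

```python
# Illustrative sketch of the type-resolution rule described above (not the
# library's actual code): prefer an explicit __msg_type__ on the payload class,
# otherwise fall back to the module-qualified class path.
def resolve_msg_type(payload: object) -> str:
    explicit = getattr(type(payload), "__msg_type__", None)
    if explicit is not None:
        return explicit
    cls = type(payload)
    return f"{cls.__module__}.{cls.__qualname__}"


class Labeled:
    __msg_type__ = "audio.chunk"


class Unlabeled:
    pass


print(resolve_msg_type(Labeled()))    # "audio.chunk"
print(resolve_msg_type(Unlabeled()))  # e.g. "__main__.Unlabeled"
```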

Create / serialize / validate

from pydantic import BaseModel

from nats_common.envelope import Envelope


class AudioChunk(BaseModel):
    __msg_type__ = "audio.chunk"
    samples: list[float]


env_out: Envelope[AudioChunk] = Envelope.from_data(AudioChunk(samples=[0.1, 0.2]), id=True)

# The following is usually not needed, as FastStream handles this automatically.
# Serialize for transport (JSON-compatible dict)
payload: dict[str, object] = env_out.model_dump(mode="json")

# Validate/parse back (e.g. on the subscriber side)
env_in: Envelope[AudioChunk] = Envelope[AudioChunk].model_validate(payload)
assert env_in.data is not None
print(env_in.type, env_in.data.samples)

FastStream example (publish + subscribe with Envelope)

This mirrors the pattern used in nats-microphone: define publishers/subscribers with Envelope[T], and publish an Envelope instance.

from pydantic import BaseModel
from faststream import FastStream
from faststream.nats import NatsBroker

from nats_common.envelope import Envelope
from nats_common.settings import NatsCommonSettings


class Ping(BaseModel):
    __msg_type__ = "ping"
    text: str


settings = NatsCommonSettings()
broker = NatsBroker(settings.url)
app = FastStream(broker)


@broker.subscriber("demo.in.ping")
async def on_ping(env: Envelope[Ping]) -> None:
    assert env.data is not None
    print(f"got ping: {env.data.text} (type={env.type}, id={env.id})")


@app.after_startup
async def publish_once() -> None:
    await broker.publish(
        Envelope.from_data(Ping(text="hello"), id=True),
        subject="demo.in.ping",
    )

Settings (NatsCommonSettings)

NatsCommonSettings reads configuration from environment variables and .env files.

  • Prefix: NATS_
  • .env file: ./.env (UTF-8)
  • Nested delimiter: __ (double underscore)

Basic usage

from nats_common.settings import NatsCommonSettings

settings = NatsCommonSettings()
print(settings.url)
print(settings.subject_prefix)
print(settings.device)

Example .env:

NATS_URL=nats://localhost:4222
NATS_SUBJECT_PREFIX=test.foo
NATS_DEVICE=device1

Nested settings + .env (principle)

Because NatsCommonSettings uses env_nested_delimiter="__", you can override nested configuration via env vars like: NATS_<FIELDNAME>__<SUBFIELD>=...

Example:

from pydantic import BaseModel, Field

from nats_common.settings import NatsCommonSettings


class MicrophoneSettings(BaseModel):
    sample_rate: int = 44100


class Settings(NatsCommonSettings):
    microphone: MicrophoneSettings = Field(default_factory=MicrophoneSettings)


settings = Settings()
print(settings.microphone.sample_rate)

Example .env to override the nested value:

NATS_URL=nats://localhost:4222

# Override the nested value, with the nested delimiter "__" (two underscores)
NATS_MICROPHONE__SAMPLE_RATE=16000
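The name-to-field mapping works roughly like this (a stdlib sketch of the prefix/delimiter rule, not pydantic-settings' actual implementation; `env_to_path` is a hypothetical helper):

```python
# Illustrative: split an env var name into a nested field path using the
# "NATS_" prefix and the "__" nested delimiter (field names are matched
# case-insensitively, so we lowercase each segment).
def env_to_path(name: str, prefix: str = "NATS_", delimiter: str = "__") -> list[str]:
    if not name.startswith(prefix):
        raise ValueError(f"{name!r} does not start with {prefix!r}")
    return [part.lower() for part in name[len(prefix):].split(delimiter)]


print(env_to_path("NATS_MICROPHONE__SAMPLE_RATE"))  # ['microphone', 'sample_rate']
print(env_to_path("NATS_URL"))                      # ['url']
```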

Jobs (Long-running tasks)

NatsJobService and NatsJobClient implement long-running, exactly-once jobs with heartbeat, cancellation, and progress reporting.

Service-side (Job handler)

import asyncio
import nats
from pydantic import BaseModel

from nats_common.services import JobStatus, NatsJob, NatsJobService


# Define your models
class ProcessRequest(BaseModel):
    file_url: str


class ProcessChunk(BaseModel):
    text: str
    offset: int


class ProcessResult(BaseModel):
    full_text: str


# Implement your job handler
class ProcessJob(NatsJob[ProcessRequest, ProcessResult, ProcessChunk]):
    async def handle(self) -> None:
        req: ProcessRequest | None = self.request.data  # type: ignore[assignment]
        if req is None:
            return

        # Check for cancellation
        if self.cancel_requested:
            return

        # Update progress
        self.set_local_status(JobStatus.RUNNING, progress=0.5, message="Processing...")

        # Publish intermediate results
        await self.publish_intermediate_result(
            ProcessChunk(text="chunk1", offset=0),
        )

        # Publish final result
        await self.publish_final_result(
            ProcessResult(full_text="complete"),
        )


# Start the service
async def main():
    nats_client = await nats.connect("nats://localhost:4222")

    service = NatsJobService(
        nats_client=nats_client,
        subject="myapp.process",
        job_cls=ProcessJob,
    )

    await service.start()
    # ... keep running ...
    await service.stop()


asyncio.run(main())

Client-side (Job submitter)

import asyncio
import nats
from pydantic import BaseModel

from nats_common.services import JobStatusUpdate, NatsJobClient


class ProcessRequest(BaseModel):
    file_url: str


class ProcessChunk(BaseModel):
    text: str
    offset: int


class ProcessResult(BaseModel):
    full_text: str


async def main():
    nats_client = await nats.connect("nats://localhost:4222")

    client = NatsJobClient[ProcessRequest, ProcessChunk, ProcessResult](
        nats_client=nats_client,
        subject="myapp.process",
    )

    # Submit a job
    await client.submit(
        job_id="job-123",
        data=ProcessRequest(file_url="https://example.com/file.txt"),
    )

    # Option 1: Wait for completion (high-level)
    completion = await client.wait_for_completion(
        "job-123",
        timeout_seconds=60,
        on_status=lambda update: print(f"Status: {update.status.value}"),
        on_intermediate=lambda chunk: print(f"Chunk: {chunk.text}"),
    )
    if completion.succeeded:
        print(f"Result: {completion.final_result}")

    # Option 2: Manual subscriptions (low-level)
    async def on_status(update: JobStatusUpdate) -> None:
        print(f"Status: {update.status.value} - {update.message}")

    sub = await client.subscribe_status("job-123", on_status)
    # ... later: await sub.unsubscribe()


asyncio.run(main())

RPCs (Request/Reply)

NatsRPCService and NatsRPCClient provide a lightweight request/reply pattern with load balancing across service replicas.

Service-side (RPC handler)

import asyncio
import nats
from pydantic import BaseModel

from nats_common.services import NatsRPC, NatsRPCService


# Define your models
class EchoRequest(BaseModel):
    message: str


class EchoResponse(BaseModel):
    echoed: str


# Implement your RPC handler
class EchoRpc(NatsRPC[EchoRequest, EchoResponse]):
    async def handle(self) -> EchoResponse:
        req: EchoRequest | None = self.request.data  # type: ignore[assignment]
        return EchoResponse(echoed=req.message if req else "")


# Start the service
async def main():
    nats_client = await nats.connect("nats://localhost:4222")

    service = NatsRPCService(
        nats_client=nats_client,
        subject="myapp.echo",
        rpc_cls=EchoRpc,
        request_model=EchoRequest,
        response_model=EchoResponse,
    )

    await service.start()
    # ... keep running ...
    await service.stop()


asyncio.run(main())

Client-side (RPC caller)

import asyncio
import nats
from pydantic import BaseModel

from nats_common.services import NatsRPCClient


class EchoRequest(BaseModel):
    message: str


class EchoResponse(BaseModel):
    echoed: str


async def main():
    nats_client = await nats.connect("nats://localhost:4222")

    client = NatsRPCClient(
        nats_client=nats_client,
        subject="myapp.echo",
        response_model=EchoResponse,
    )

    # Make an RPC call
    resp = await client.call(data=EchoRequest(message="hello"))

    if resp.error:
        print(f"Error: {resp.error}")
    else:
        print(f"Response: {resp.data}")  # EchoResponse(echoed="hello")


asyncio.run(main())
