Skip to main content

OpenTelemetry auto-instrumentation for nats-py

Project description

otel-instrumentation-nats

OpenTelemetry auto-instrumentation for nats-py.

Automatically creates spans for NATS publish, subscribe, request-reply, and JetStream operations with distributed trace context propagation through message headers.

Installation

pip install otel-instrumentation-nats

Or with uv:

uv add otel-instrumentation-nats

The instrumented library (nats-py) is an optional dependency. Install it separately if you haven't already:

pip install nats-py

Quick Start

import asyncio
import nats
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

from otel_instrumentation_nats import NatsInstrumentor

async def main():
    # Set up OpenTelemetry
    provider = TracerProvider()
    provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
    trace.set_tracer_provider(provider)

    # Instrument nats-py -- call this before using the NATS client
    NatsInstrumentor().instrument()

    nc = await nats.connect("nats://localhost:4222")

    # This publish creates a PRODUCER span and injects
    # trace context into the message headers automatically
    await nc.publish("orders.new", b'{"item": "widget"}')

    await nc.close()

asyncio.run(main())

What Gets Instrumented

Method Span Kind Span Name Description
Client.publish() PRODUCER {subject} send Fire-and-forget publish
Client.subscribe() CONSUMER {subject} receive Per-message spans for both callback and next_msg() patterns
Client.request() CLIENT {subject} request Request-reply (waits for response)
JetStreamContext.publish() PRODUCER {subject} send JetStream publish with stream attributes

Span Attributes

All spans include standard OpenTelemetry messaging semantic conventions:

Attribute Value
messaging.system nats
messaging.destination.name NATS subject (e.g. orders.new)
messaging.operation publish, receive, or request
server.address NATS server hostname
server.port NATS server port

JetStream publish spans additionally include:

Attribute Value
messaging.destination.kind stream
messaging.destination.stream Stream name (when specified)

Trace Context Propagation

Trace context is automatically propagated through NATS message headers using the W3C TraceContext format. This means:

  • Publishers inject traceparent and tracestate into message headers
  • Subscribers extract trace context from incoming message headers and create child spans

This links producer and consumer spans into a single distributed trace, even across service boundaries.

Service A                          NATS                          Service B
─────────                          ────                          ─────────
publish("orders.new")  ──────►  message with  ──────►  subscribe callback
  │                             traceparent header        │
  ▼                                                       ▼
[PRODUCER span]                                    [CONSUMER span]
  trace_id: abc123                                   trace_id: abc123
  span_id:  def456                                   parent_id: def456

Usage Patterns

Publish/Subscribe with Callback

NatsInstrumentor().instrument()

nc = await nats.connect()

async def on_message(msg):
    # A CONSUMER span is active here, linked to the producer trace
    print(f"Got {msg.data} on {msg.subject}")

await nc.subscribe("events.>", cb=on_message)
await nc.publish("events.click", b"data")  # Creates a PRODUCER span

Subscribe with next_msg() (Pull Pattern)

sub = await nc.subscribe("tasks")
await nc.publish("tasks", b"do-something")

msg = await sub.next_msg(timeout=5.0)  # Creates a CONSUMER span

Request-Reply

async def handler(msg):
    await msg.respond(b"pong")

await nc.subscribe("ping", cb=handler)

# Creates a CLIENT span that encompasses the full request-reply cycle
response = await nc.request("ping", b"data", timeout=2.0)

JetStream

js = nc.jetstream()

await js.add_stream(name="ORDERS", subjects=["orders.>"])

# Creates a PRODUCER span with stream attributes
await js.publish("orders.new", b"order-data", stream="ORDERS")

# Subscribe callback receives CONSUMER spans linked to the producer
async def process_order(msg):
    await msg.ack()

await js.subscribe("orders.new", cb=process_order, manual_ack=True)

Custom Tracer Provider

from opentelemetry.sdk.trace import TracerProvider

provider = TracerProvider()
# ... configure your provider with exporters, processors, etc.

NatsInstrumentor().instrument(tracer_provider=provider)

Uninstrumenting

instrumentor = NatsInstrumentor()
instrumentor.instrument()

# ... later, to restore original behavior:
instrumentor.uninstrument()

Development

Prerequisites

  • Python 3.13+
  • uv
  • Docker (for integration tests)

Setup

git clone <repo-url>
cd otel-instrumentation-nats
uv sync --extra instruments --extra test

Running Tests

Start a NATS server (or use the included docker-compose):

docker compose up -d

Run the full test suite:

uv run pytest -v

Run only unit tests (no NATS server required):

uv run pytest tests/test_instrumentor.py tests/test_context_propagation.py -v

Run only integration tests:

uv run pytest -m integration -v

Project Structure

src/otel_instrumentation_nats/
├── __init__.py               # Public API
├── instrumentor.py           # NatsInstrumentor (patches/restores methods)
├── publish_wrapper.py        # Client.publish() -> PRODUCER spans
├── subscribe_wrapper.py      # Client.subscribe() -> CONSUMER spans
├── request_wrapper.py        # Client.request() -> CLIENT spans
├── jetstream_wrapper.py      # JetStreamContext.publish() -> PRODUCER spans
├── context_propagation.py    # Trace context inject/extract via NATS headers
├── utils.py                  # Shared span building, nested suppression
├── version.py                # Package version
└── package.py                # Instrumented library version constraints

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

otel_instrumentation_nats-0.1.2.tar.gz (39.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

otel_instrumentation_nats-0.1.2-py3-none-any.whl (12.9 kB view details)

Uploaded Python 3

File details

Details for the file otel_instrumentation_nats-0.1.2.tar.gz.

File metadata

  • Download URL: otel_instrumentation_nats-0.1.2.tar.gz
  • Upload date:
  • Size: 39.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.3 {"installer":{"name":"uv","version":"0.11.3","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for otel_instrumentation_nats-0.1.2.tar.gz
Algorithm Hash digest
SHA256 01e2f046728de9a5474709b73a7743ea03eb0b39291e4e60307f120d55177080
MD5 7cdc791e43600155b50f6b7e758b7f3c
BLAKE2b-256 a8dc868bf598a736f64728ac4039069fb791c710ca1733fe404822dc57a40dde

See more details on using hashes here.

File details

Details for the file otel_instrumentation_nats-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: otel_instrumentation_nats-0.1.2-py3-none-any.whl
  • Upload date:
  • Size: 12.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.3 {"installer":{"name":"uv","version":"0.11.3","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for otel_instrumentation_nats-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 f22e99cd67ea20112353eff3449957aecd8349fd40682f95ae9a9dec70e23c29
MD5 1f1b8d003cc3f5d9402b5315170f3be2
BLAKE2b-256 2839cbcb15cc441c5a37c3602f7825ec2d5520648c046f30d5c880647ece85a4

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page