otel-instrumentation-nats
OpenTelemetry auto-instrumentation for nats-py.
Automatically creates spans for NATS publish, subscribe, request-reply, and JetStream operations with distributed trace context propagation through message headers.
Installation
pip install otel-instrumentation-nats
Or with uv:
uv add otel-instrumentation-nats
The instrumented library (nats-py) is an optional dependency. Install it separately if you haven't already:
pip install nats-py
Quick Start
import asyncio
import nats
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
from otel_instrumentation_nats import NatsInstrumentor
async def main():
    # Set up OpenTelemetry
    provider = TracerProvider()
    provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
    trace.set_tracer_provider(provider)
    # Instrument nats-py -- call this before using the NATS client
    NatsInstrumentor().instrument()
    nc = await nats.connect("nats://localhost:4222")
    # This publish creates a PRODUCER span and injects
    # trace context into the message headers automatically
    await nc.publish("orders.new", b'{"item": "widget"}')
    await nc.close()
asyncio.run(main())
What Gets Instrumented
| Method | Span Kind | Span Name | Description |
|---|---|---|---|
| Client.publish() | PRODUCER | {subject} send | Fire-and-forget publish |
| Client.subscribe() | CONSUMER | {subject} receive | Per-message spans for both callback and next_msg() patterns |
| Client.request() | CLIENT | {subject} request | Request-reply (waits for response) |
| JetStreamContext.publish() | PRODUCER | {subject} send | JetStream publish with stream attributes |
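The naming scheme in the table can be sketched as a small lookup. This is an illustration only: `SPAN_RULES` and `span_name_and_kind` are hypothetical names invented for this example and are not part of the package's API.

```python
# Hypothetical illustration of the naming scheme in the table above;
# none of these names come from otel-instrumentation-nats itself.
SPAN_RULES = {
    "Client.publish": ("PRODUCER", "send"),
    "Client.subscribe": ("CONSUMER", "receive"),
    "Client.request": ("CLIENT", "request"),
    "JetStreamContext.publish": ("PRODUCER", "send"),
}


def span_name_and_kind(method: str, subject: str) -> tuple[str, str]:
    """Return (span name, span kind) for an instrumented method and subject."""
    kind, suffix = SPAN_RULES[method]
    return f"{subject} {suffix}", kind


print(span_name_and_kind("Client.publish", "orders.new"))
# ('orders.new send', 'PRODUCER')
```

Because the subject is part of the span name, publishing to many distinct subjects produces many distinct span names.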
Span Attributes
All spans include standard OpenTelemetry messaging semantic conventions:
| Attribute | Value |
|---|---|
| messaging.system | nats |
| messaging.destination.name | NATS subject (e.g. orders.new) |
| messaging.operation | publish, receive, or request |
| server.address | NATS server hostname |
| server.port | NATS server port |
JetStream publish spans additionally include:
| Attribute | Value |
|---|---|
| messaging.destination.kind | stream |
| messaging.destination.stream | Stream name (when specified) |
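As a rough picture of what ends up on a span, the attributes listed above could be assembled as follows. The helper `messaging_attributes` and its parameters are hypothetical and exist only for this sketch; the attribute keys and values are the ones from the tables.

```python
from urllib.parse import urlparse


def messaging_attributes(subject, operation, server_url, jetstream=False, stream=None):
    """Build an attribute dict using the keys listed in the tables above.

    Hypothetical helper for illustration; not the package's own code.
    """
    parsed = urlparse(server_url)
    attrs = {
        "messaging.system": "nats",
        "messaging.destination.name": subject,
        "messaging.operation": operation,
        "server.address": parsed.hostname,
        "server.port": parsed.port,
    }
    if jetstream:
        attrs["messaging.destination.kind"] = "stream"
        if stream is not None:
            # Only present when a stream name was specified
            attrs["messaging.destination.stream"] = stream
    return attrs


attrs = messaging_attributes(
    "orders.new", "publish", "nats://localhost:4222", jetstream=True, stream="ORDERS"
)
print(attrs["server.address"], attrs["server.port"])  # localhost 4222
```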
Trace Context Propagation
Trace context is automatically propagated through NATS message headers using the W3C TraceContext format. This means:
- Publishers inject traceparent and tracestate into message headers
- Subscribers extract trace context from incoming message headers and create child spans
This links producer and consumer spans into a single distributed trace, even across service boundaries.
Service A                         NATS             Service B
─────────                         ────             ─────────
publish("orders.new") ──────► message with ──────► subscribe callback
    │                      traceparent header          │
    ▼                                                  ▼
[PRODUCER span]                                    [CONSUMER span]
trace_id: abc123                                   trace_id: abc123
span_id: def456                                    parent_id: def456
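To make the header format concrete, here is a standard-library sketch of how a W3C traceparent value links a consumer span to its producer. It does not use this package or the OpenTelemetry propagator API; `make_traceparent` and `parse_traceparent` are hypothetical helpers, and a plain dict stands in for the NATS message headers.

```python
import re
import secrets

# W3C Trace Context header: version-traceid-parentid-flags, lowercase hex.
TRACEPARENT_RE = re.compile(
    r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$"
)


def make_traceparent(trace_id: str, span_id: str, sampled: bool = True) -> str:
    """Format a version-00 traceparent value."""
    flags = "01" if sampled else "00"
    return f"00-{trace_id}-{span_id}-{flags}"


def parse_traceparent(value: str) -> dict:
    """Split a traceparent value into its fields, rejecting malformed input."""
    match = TRACEPARENT_RE.match(value)
    if match is None:
        raise ValueError(f"malformed traceparent: {value!r}")
    _version, trace_id, parent_id, flags = match.groups()
    return {
        "trace_id": trace_id,
        "parent_id": parent_id,
        "sampled": bool(int(flags, 16) & 1),
    }


# Producer side: the active span's IDs go into the message headers.
trace_id = secrets.token_hex(16)          # 32 hex characters
producer_span_id = secrets.token_hex(8)   # 16 hex characters
headers = {"traceparent": make_traceparent(trace_id, producer_span_id)}

# Consumer side: the new span keeps the trace ID and records the
# producer's span ID as its parent.
ctx = parse_traceparent(headers["traceparent"])
consumer_span = {
    "trace_id": ctx["trace_id"],
    "parent_id": ctx["parent_id"],
    "span_id": secrets.token_hex(8),
}
```

The consumer span shares the producer's trace_id and carries the producer's span_id as its parent_id, which is the relationship shown in the diagram.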
Usage Patterns
Publish/Subscribe with Callback
NatsInstrumentor().instrument()
nc = await nats.connect()
async def on_message(msg):
    # A CONSUMER span is active here, linked to the producer trace
    print(f"Got {msg.data} on {msg.subject}")
await nc.subscribe("events.>", cb=on_message)
await nc.publish("events.click", b"data") # Creates a PRODUCER span
Subscribe with next_msg() (Pull Pattern)
sub = await nc.subscribe("tasks")
await nc.publish("tasks", b"do-something")
msg = await sub.next_msg(timeout=5.0) # Creates a CONSUMER span
Request-Reply
async def handler(msg):
    await msg.respond(b"pong")
await nc.subscribe("ping", cb=handler)
# Creates a CLIENT span that encompasses the full request-reply cycle
response = await nc.request("ping", b"data", timeout=2.0)
JetStream
js = nc.jetstream()
await js.add_stream(name="ORDERS", subjects=["orders.>"])
# Creates a PRODUCER span with stream attributes
await js.publish("orders.new", b"order-data", stream="ORDERS")
# Subscribe callback receives CONSUMER spans linked to the producer
async def process_order(msg):
    await msg.ack()
await js.subscribe("orders.new", cb=process_order, manual_ack=True)
Custom Tracer Provider
from opentelemetry.sdk.trace import TracerProvider
provider = TracerProvider()
# ... configure your provider with exporters, processors, etc.
NatsInstrumentor().instrument(tracer_provider=provider)
Uninstrumenting
instrumentor = NatsInstrumentor()
instrumentor.instrument()
# ... later, to restore original behavior:
instrumentor.uninstrument()
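The instrument/uninstrument pair follows the usual patch-and-restore pattern. The sketch below shows that pattern on a stand-in class using only the standard library; `FakeClient` and `SketchInstrumentor` are hypothetical and are not how NatsInstrumentor is necessarily implemented.

```python
import asyncio
import functools


class FakeClient:
    """Stand-in for a messaging client; not the nats-py Client."""

    async def publish(self, subject, payload):
        return f"sent {len(payload)} bytes to {subject}"


class SketchInstrumentor:
    """Hypothetical patch/restore helper, for illustration only."""

    def __init__(self):
        self._original = None
        self.calls = []

    def instrument(self):
        if self._original is not None:
            return  # already instrumented; avoid wrapping twice
        original = FakeClient.publish
        self._original = original
        calls = self.calls

        @functools.wraps(original)
        async def wrapper(client, subject, *args, **kwargs):
            calls.append(subject)  # a real instrumentor would start a span here
            return await original(client, subject, *args, **kwargs)

        FakeClient.publish = wrapper

    def uninstrument(self):
        if self._original is not None:
            FakeClient.publish = self._original  # restore original behavior
            self._original = None


async def demo():
    inst = SketchInstrumentor()
    client = FakeClient()
    inst.instrument()
    await client.publish("orders.new", b"x")   # recorded by the wrapper
    inst.uninstrument()
    await client.publish("orders.new", b"y")   # original method, not recorded
    return inst.calls


recorded = asyncio.run(demo())
print(recorded)  # ['orders.new']
```

Keeping a reference to the original method is what makes the restore step possible, so the same instrumentor instance that patched the client should be the one that unpatches it.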
Development
Prerequisites
- Python 3.13+
- uv
- Docker (for integration tests)
Setup
git clone <repo-url>
cd otel-instrumentation-nats
uv sync --extra instruments --extra test
Running Tests
Start a NATS server, for example with the included Docker Compose file:
docker compose up -d
Run the full test suite:
uv run pytest -v
Run only unit tests (no NATS server required):
uv run pytest tests/test_instrumentor.py tests/test_context_propagation.py -v
Run only integration tests:
uv run pytest -m integration -v
Project Structure
src/otel_instrumentation_nats/
├── __init__.py # Public API
├── instrumentor.py # NatsInstrumentor (patches/restores methods)
├── publish_wrapper.py # Client.publish() -> PRODUCER spans
├── subscribe_wrapper.py # Client.subscribe() -> CONSUMER spans
├── request_wrapper.py # Client.request() -> CLIENT spans
├── jetstream_wrapper.py # JetStreamContext.publish() -> PRODUCER spans
├── context_propagation.py # Trace context inject/extract via NATS headers
├── utils.py # Shared span building, nested suppression
├── version.py # Package version
└── package.py # Instrumented library version constraints