asyncio async/await NSQ support
asyncnsq
Async NSQ client built on asyncio.
If you don't like the way pynsq (which uses Tornado) interacts with NSQ, this library may suit you better.
You can use it with ordinary async/await code, the same way you write the rest of your asyncio application.
User Documentation
Install
This project uses uv for environment and dependency management. The uv index
is configured in uv.toml to use Tencent's PyPI mirror.
uv sync
Usage examples
Consumer:
import asyncio

from asyncnsq import create_reader


async def main():
    reader = await create_reader(
        nsqd_tcp_addresses=["127.0.0.1:4150"],
        max_in_flight=200,
    )
    await reader.subscribe("test_async_nsq", "nsq")
    try:
        async for message in reader.messages():
            print(message.body)
            await message.fin()
    finally:
        await reader.graceful_close()


asyncio.run(main())
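The consumer above asks for max_in_flight=200 in total. When a reader is connected to several nsqd instances, that budget has to be split into per-connection RDY counts, and no single RDY may exceed nsqd's max_rdy_count. The sketch below shows one simple way to do the split, plus a low-water check for when to send a fresh RDY. split_rdy, needs_rdy_update and the 25% threshold are hypothetical illustrations, not asyncnsq's API or its actual redistribution policy.

```python
def split_rdy(max_in_flight: int, connections: int, max_rdy_count: int) -> list[int]:
    """Split a reader-wide in-flight budget into per-connection RDY counts."""
    if connections <= 0:
        return []
    base, extra = divmod(max_in_flight, connections)
    # The first `extra` connections get one more so the total equals max_in_flight;
    # every value is then capped at nsqd's max_rdy_count.
    return [min(base + (1 if i < extra else 0), max_rdy_count) for i in range(connections)]


def needs_rdy_update(remaining: int, last_rdy: int, low_water: float = 0.25) -> bool:
    """Hypothetical low-water rule: refresh RDY once credit falls to 25% of the last RDY."""
    return remaining <= last_rdy * low_water


print(split_rdy(200, 3, 2500))   # [67, 67, 66]
print(needs_rdy_update(10, 67))  # True
```

Refreshing at a low-water mark rather than after every message keeps RDY traffic small while the connection still has credit left.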
Producer:
import asyncio

from asyncnsq import create_writer


async def main():
    writer = await create_writer(
        host="127.0.0.1",
        port=4150,
        heartbeat_interval=30000,
        feature_negotiation=True,
        tls_v1=True,
        snappy=False,
        deflate=False,
        deflate_level=0,
    )
    for i in range(100):
        await writer.pub("test_async_nsq", f"test_async_nsq:{i}")
        await writer.dpub("test_async_nsq", i * 1000, f"delay:{i}")
    writer.close()


asyncio.run(main())
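The producer's pub and dpub calls correspond to NSQ's PUB and DPUB TCP commands; DPUB's defer time is in milliseconds, so i * 1000 above delays message i by roughly i seconds. The sketch below encodes those frames, plus the MPUB batch frame used by the benchmarks, following the NSQ TCP protocol specification. It is an illustration of the wire format only, not asyncnsq's internal encoder.

```python
import struct


def encode_pub(topic: str, body: bytes) -> bytes:
    # PUB <topic>\n [4-byte big-endian body size][body]
    return f"PUB {topic}\n".encode() + struct.pack(">I", len(body)) + body


def encode_dpub(topic: str, defer_ms: int, body: bytes) -> bytes:
    # DPUB <topic> <defer_time in ms>\n [4-byte body size][body]
    return f"DPUB {topic} {defer_ms}\n".encode() + struct.pack(">I", len(body)) + body


def encode_mpub(topic: str, bodies: list[bytes]) -> bytes:
    # MPUB <topic>\n [4-byte total size][4-byte message count]
    # then [4-byte size][data] per message; the total size covers everything after itself.
    parts = [struct.pack(">I", len(bodies))]
    for body in bodies:
        parts.append(struct.pack(">I", len(body)) + body)
    payload = b"".join(parts)
    return f"MPUB {topic}\n".encode() + struct.pack(">I", len(payload)) + payload


print(encode_dpub("test_async_nsq", 3 * 1000, b"delay:3"))
# b'DPUB test_async_nsq 3000\n\x00\x00\x00\x07delay:3'
```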
Requirements
- Python 3.12+
- uv
- NSQ, tested with nsqio/nsq:v1.3.0
- Docker, for the local integration test cluster
Release 2.0.0
2.0.0 is the Python 3.12+ modernization release. It intentionally raises the
baseline and cleans up old client behavior instead of preserving every Python
3.6-era compatibility detail.
Highlights:
- Python 3.12+ is now required. Production code and tests use modern asyncio APIs such as asyncio.run(), asyncio.timeout(), and task groups where they fit.
- The project uses uv for virtual environment and dependency management, with Tencent's PyPI mirror configured in uv.toml.
- HTTP support moved from aiohttp to httpx, including the lookupd/nsqd HTTP helpers and the development auth server.
- test_service/ provides one-command Docker lifecycle scripts for a local three-node NSQ cluster: start.sh, stop.sh, and the benchmark and go-nsq comparison scripts.
- Reader shutdown is graceful by default through reader.graceful_close(): it sends RDY 0, requeues queued and unfinished in-flight messages, and then closes connections.
- MessageTracker now owns the in-flight message ledger. The queue remains the default handler path, while the tracker separately records messages that nsqd has delivered but that have not yet been FINed or REQed.
- set_message_handler() is queue-backed by default. The TCP reader task keeps reading and enqueuing messages, and handler workers process them. The previous low-overhead behavior is still available with direct=True.
- RDY redistribution and low-water updates were tuned for high-throughput consumers while respecting nsqd's max_rdy_count.
- TCP hot paths were trimmed for FIN, REQ, TOUCH, RDY, and PUB; parser buffer handling avoids unnecessary copies.
- Benchmarks now cover PUB ACK throughput, MPUB throughput, end-to-end pub-to-FIN latency, graceful close requeue recovery, Python multi-process scale-out, and a comparison with the official go-nsq client.
- The test suite was refreshed with unit and integration coverage for protocol, HTTP, graceful shutdown, RDY, benchmark helpers, and edge cases.
Release tag: 2.0.0
Running Tests
uv sync
./test_service/start.sh
uv run python -m pytest --cov=asyncnsq --cov-report=term-missing
./test_service/stop.sh
Without the Docker cluster, integration tests that need NSQ ports are skipped and the unit test suite still runs.
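Skipping integration tests without the cluster usually comes down to checking whether the NSQ TCP port accepts connections. The helper below is a hypothetical, standard-library-only sketch of such a check; the project's own test fixtures may decide this differently.

```python
import socket


def nsqd_reachable(host: str = "127.0.0.1", port: int = 4150, timeout: float = 0.5) -> bool:
    """Return True if something accepts TCP connections on host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, timed out, unreachable, ...
        return False


print(nsqd_reachable())  # False unless an nsqd (or anything else) listens on 4150
```

In a pytest suite, a result like this could feed pytest.mark.skipif(not nsqd_reachable(), reason="NSQ cluster not running").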
For consumers, prefer await reader.graceful_close() during shutdown. It sends
RDY 0, requeues queued and in-flight unfinished messages with REQ, then
closes the TCP connections. reader.close() remains an immediate close.
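The order of those three steps matters: RDY 0 goes out first so nsqd stops pushing new messages while the reader drains, and the connection closes only after every held message has been REQed. The sketch below records that sequence on a fake connection; FakeConnection and graceful_close_sketch are hypothetical names, the message IDs are placeholders, and the newline terminators NSQ commands carry on the wire are omitted.

```python
import asyncio


class FakeConnection:
    """Hypothetical connection that records NSQ commands instead of writing to a socket."""

    def __init__(self) -> None:
        self.sent: list[str] = []
        self.closed = False

    async def send(self, command: str) -> None:
        self.sent.append(command)  # a real client would write f"{command}\n"

    async def close(self) -> None:
        self.closed = True


async def graceful_close_sketch(conn: FakeConnection, queued: list[str], in_flight: list[str]) -> None:
    # 1. RDY 0 first, so nsqd stops pushing new messages while we drain.
    await conn.send("RDY 0")
    # 2. REQ every message we hold but have not finished; a timeout of 0
    #    asks nsqd to make it available again immediately.
    for msg_id in [*queued, *in_flight]:
        await conn.send(f"REQ {msg_id} 0")
    # 3. Only then close the connection.
    await conn.close()


conn = FakeConnection()
asyncio.run(graceful_close_sketch(conn, queued=["m1"], in_flight=["m2"]))
print(conn.sent)  # ['RDY 0', 'REQ m1 0', 'REQ m2 0']
```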
set_message_handler() uses the internal asyncio.Queue by default. The TCP
reader task only receives and enqueues messages, while a handler worker drains
the queue and FINs or REQs each message:
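def handle(message):
    process(message.body)  # process() stands for your own application code

# inside an async function, with reader created by create_reader()
reader.set_message_handler(handle, auto_fin=True)
await reader.subscribe("topic", "channel")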
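The queue-backed pattern described above can be modeled with plain asyncio: one task plays the TCP reader and only enqueues, while a worker drains the queue, runs the handler, and then finishes each message. This is a hypothetical model, not asyncnsq's code; FakeMessage, reader_task and handler_worker are illustrative names, and requeueing on a handler exception is an assumption about what auto-finishing might do, not documented asyncnsq behavior.

```python
import asyncio
import contextlib


class FakeMessage:
    """Hypothetical stand-in for an NSQ message; records whether it was FINed or REQed."""

    def __init__(self, body: bytes) -> None:
        self.body = body
        self.state = "in-flight"

    async def fin(self) -> None:
        self.state = "fin"

    async def req(self) -> None:
        self.state = "req"


async def reader_task(incoming, queue: asyncio.Queue) -> None:
    # Plays the TCP reader: it only receives and enqueues, it never runs user code.
    for message in incoming:
        await queue.put(message)


async def handler_worker(queue: asyncio.Queue, handle) -> None:
    # Drains the queue, runs the user handler, then FINs on success or REQs on error.
    while True:
        message = await queue.get()
        try:
            handle(message)
        except Exception:
            await message.req()
        else:
            await message.fin()
        finally:
            queue.task_done()


async def main() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    messages = [FakeMessage(b"ok"), FakeMessage(b"bad")]

    def handle(message: FakeMessage) -> None:
        if message.body == b"bad":
            raise ValueError("cannot process this body")

    worker = asyncio.create_task(handler_worker(queue, handle))
    await reader_task(messages, queue)
    await queue.join()  # wait until the worker has FINed or REQed everything
    worker.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await worker
    return [m.state for m in messages]


print(asyncio.run(main()))  # ['fin', 'req']
```

Because the reader task never waits on the handler, a slow handler fills the queue instead of stalling socket reads.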
CPU-bound synchronous handlers still occupy the Python event loop while they run; use multiple consumer processes when you need real multi-core processing.
For very small, fast handlers where the queue overhead matters, opt into direct
handler mode. It bypasses the queue and can FIN messages on the TCP fast path:
reader.set_message_handler(handle, auto_fin=True, direct=True)
Benchmark
The benchmark suite is designed for pull requests: it is deterministic enough for repeated local runs, fast by default, and strict enough to fail when the client loses messages, duplicates messages, times out, or fails graceful shutdown recovery.
It benchmarks the surfaces that matter for an NSQ client under high throughput:
- TCP PUB ack throughput: many concurrent PUB commands over persistent TCP connections, including per-message publish ACK latency.
- TCP MPUB batch throughput: high-volume batch publishing, including batch ACK latency and MiB/s.
- End-to-end pub -> FIN latency: messages are published, consumed through the reader, parsed, and FINed; the report includes p50/p95/p99 latency.
- Graceful close requeue recovery: messages are intentionally consumed but not finished, reader.graceful_close() sends RDY 0 and REQ, and a fresh reader must recover and finish all of them.
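The end-to-end scenario fails on any missing or duplicated message and reports latency percentiles. The sketch below shows one way to derive those numbers from published IDs, received IDs and latency samples. Both helpers are hypothetical and not part of asyncnsq, percentile() uses the nearest-rank definition (an assumption; the benchmark may interpolate), and the latency values are made up for illustration.

```python
import math
from collections import Counter


def percentile(samples: list[float], pct: float) -> float:
    """Nearest-rank percentile; one common definition, assumed here."""
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def check_delivery(published: set[int], received: list[int]) -> dict[str, int]:
    """Count published IDs that never arrived and extra deliveries of the same ID."""
    counts = Counter(received)
    return {
        "missing": len(published - set(counts)),
        "duplicates": sum(n - 1 for n in counts.values() if n > 1),
    }


# Made-up latency samples in milliseconds, for illustration only.
latencies_ms = [2.0, 3.5, 4.1, 4.9, 5.2, 6.0, 7.3, 9.8, 10.6, 12.0]
print({p: percentile(latencies_ms, p) for p in (50, 95, 99)})  # {50: 5.2, 95: 12.0, 99: 12.0}
print(check_delivery({1, 2, 3}, [1, 3, 3]))  # {'missing': 1, 'duplicates': 1}
```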
One-command local PR benchmark:
uv sync
./test_service/benchmark.sh --profile pr --markdown benchmark.md
Run against an already running cluster:
uv run asyncnsq-benchmark --profile pr --markdown benchmark.md
Compare against the official Go client on the same local NSQ cluster:
PROFILE=pr ./test_service/benchmark_compare_go.sh
For single-core fairness, limit the Go scheduler to one OS thread:
GO_MAX_PROCS=1 PROFILE=pr ./test_service/benchmark_compare_go.sh
This writes:
- benchmark-results/asyncnsq-benchmark.md
- benchmark-results/go-nsq-benchmark.md
The Go baseline uses github.com/nsqio/go-nsq v1.1.0 and runs the comparable
PUB, MPUB, and end-to-end consume/finish scenarios. The asyncnsq benchmark
also includes the Python-specific graceful shutdown requeue proof. Go runs as
one process, but goroutines can execute on multiple cores unless GOMAXPROCS
is limited; the Go report records the effective GOMAXPROCS value.
To measure Python scale-out across CPU cores, run the asyncnsq benchmark with multiple consumer processes. Each process owns its own asyncio loop and joins the same NSQ topic/channel, so NSQ distributes messages across them:
PROFILE=pr ./test_service/benchmark_compare_go.sh --consumer-processes 4
Latest local results
These numbers were measured for the 2.0.0 release on 2026-05-15 with Python
3.12.4, NSQ nsqio/nsq:v1.3.0, a local Docker three-node nsqd cluster, 10,000
messages, 512 B payloads, concurrency 256, max_in_flight=1024, and
output_buffer_timeout=25ms. Treat them as a same-machine reference, not a
portable guarantee.
Current asyncnsq PR benchmark:
| Scenario | Messages | msg/s | MiB/s | p50 ms | p95 ms | p99 ms | Errors | Notes |
|---|---|---|---|---|---|---|---|---|
| TCP PUB ack | 10,000 | 82,918.92 | 40.49 | 2.859 | 3.633 | 11.704 | 0 | per-message publish ACK latency |
| TCP MPUB batch ack | 10,000 | 774,507.49 | 378.18 | 6.451 | 9.219 | 9.582 | 0 | ACK latency measured per MPUB batch |
| end-to-end pub->fin | 10,000 | 36,381.65 | 17.76 | 4.912 | 10.683 | 11.912 | 0 | missing=0, duplicates=0, fin_errors=0 |
| graceful close requeue | 512 | 13,096.60 | 6.39 | 14.580 | 16.847 | 16.947 | 0 | requeued=512, recovered=512 |
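The MiB/s column in this table is consistent with payload-only throughput: msg/s multiplied by the 512 B payload and divided by 2^20. The snippet below reproduces that arithmetic from the msg/s values; mib_per_s is an illustrative helper, not the benchmark's own code.

```python
PAYLOAD_BYTES = 512
MIB = 1024 * 1024


def mib_per_s(msg_per_s: float, payload_bytes: int = PAYLOAD_BYTES) -> float:
    # Payload bytes only; NSQ framing overhead is not counted by this formula.
    return msg_per_s * payload_bytes / MIB


rows = {
    "TCP PUB ack": 82_918.92,
    "TCP MPUB batch ack": 774_507.49,
    "end-to-end pub->fin": 36_381.65,
    "graceful close requeue": 13_096.60,
}
for scenario, rate in rows.items():
    print(f"{scenario}: {mib_per_s(rate):.2f} MiB/s")
# TCP PUB ack: 40.49 MiB/s
# TCP MPUB batch ack: 378.18 MiB/s
# end-to-end pub->fin: 17.76 MiB/s
# graceful close requeue: 6.39 MiB/s
```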
Handler dispatch comparison after making queue-backed handlers the default:
| Consumer mode | Messages | msg/s | p95 ms | Errors | Notes |
|---|---|---|---|---|---|
| direct=True handler | 10,000 | 36,469.93 | 10.000 | 0 | TCP reader calls the handler directly |
| default queue-backed handler | 10,000 | 36,310.22 | 12.778 | 0 | TCP reader enqueues; handler worker drains the queue |
The queue-backed path kept throughput within about 0.5% of the direct handler
path in this run. It is now the default because it keeps socket reading and user
handler execution decoupled while MessageTracker separately tracks unfinished
in-flight messages for safe FIN/REQ and graceful shutdown.
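The "about 0.5%" figure follows directly from the two msg/s values in the handler dispatch table:

```python
direct = 36_469.93  # msg/s, direct=True handler
queued = 36_310.22  # msg/s, default queue-backed handler

slowdown_pct = (direct - queued) / direct * 100
print(f"{slowdown_pct:.2f}%")  # 0.44%
```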
asyncnsq vs official go-nsq, default Go scheduler:
| Scenario | asyncnsq msg/s | asyncnsq p95 ms | go-nsq msg/s | go-nsq p95 ms | Errors |
|---|---|---|---|---|---|
| TCP PUB ack | 82,793.09 | 3.373 | 191,773.18 | 1.914 | 0 / 0 |
| TCP MPUB batch ack | 601,022.00 | 15.343 | 645,822.53 | 12.322 | 0 / 0 |
| end-to-end pub->fin | 36,329.07 | 10.256 | 134,190.70 | 3.258 | 0 / 0 |
In that run, Go reported GOMAXPROCS=16. This is the normal production shape
for the Go client, but it is not a single-core comparison against one Python
asyncio process.
asyncnsq vs official go-nsq with Go limited to one OS thread:
| Scenario | asyncnsq msg/s | asyncnsq p95 ms | go-nsq msg/s | go-nsq p95 ms | Errors |
|---|---|---|---|---|---|
| TCP PUB ack | 79,446.79 | 3.946 | 90,948.43 | 3.731 | 0 / 0 |
| TCP MPUB batch ack | 589,566.88 | 15.812 | 531,962.03 | 15.317 | 0 / 0 |
| end-to-end pub->fin | 36,390.14 | 9.908 | 50,251.00 | 89.039 | 0 / 0 |
The single-thread Go run narrows the throughput gap substantially. In this
sample, go-nsq still had higher end-to-end throughput, while asyncnsq had much
lower p95 end-to-end latency. For multi-core Python consumption, use
--consumer-processes so each process has its own event loop and NSQ can
distribute messages across processes.
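To put the size of the gap in numbers, the asyncnsq/go-nsq throughput ratio can be computed straight from the two comparison tables. The values below are copied from those tables; the code is only arithmetic.

```python
# (asyncnsq msg/s, go-nsq msg/s), copied from the two comparison tables
RUNS = {
    "Go, default scheduler": {
        "TCP PUB ack": (82_793.09, 191_773.18),
        "TCP MPUB batch ack": (601_022.00, 645_822.53),
        "end-to-end pub->fin": (36_329.07, 134_190.70),
    },
    "Go, one OS thread": {
        "TCP PUB ack": (79_446.79, 90_948.43),
        "TCP MPUB batch ack": (589_566.88, 531_962.03),
        "end-to-end pub->fin": (36_390.14, 50_251.00),
    },
}

# A ratio above 1 means asyncnsq moved more messages per second than go-nsq.
ratios = {
    (run, scenario): py / go
    for run, scenarios in RUNS.items()
    for scenario, (py, go) in scenarios.items()
}

for (run, scenario), ratio in ratios.items():
    print(f"{run:22} {scenario:20} asyncnsq/go-nsq = {ratio:.2f}")
```

With the default Go scheduler the ratios come out at roughly 0.43 (PUB), 0.93 (MPUB) and 0.27 (end-to-end); with Go on one OS thread they become roughly 0.87, 1.11 and 0.72.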
Profiles:
| Profile | Use case | Messages | Payload | Concurrency | Batch |
|---|---|---|---|---|---|
| quick | fast smoke benchmark | 5,000 | 256 B | 64 | 100 |
| pr | recommended PR evidence | 50,000 | 512 B | 256 | 250 |
| stress | local high-throughput soak | 250,000 | 1,024 B | 512 | 500 |
Useful overrides:
uv run asyncnsq-benchmark \
--profile pr \
--messages 100000 \
--payload-size 1024 \
--concurrency 512 \
--batch-size 500 \
--markdown benchmark.md \
--json benchmark.json
The generated markdown is PR-ready:
| Scenario | Messages | Payload | Batch | Concurrency | Duration | msg/s | MiB/s | p50 ms | p95 ms | p99 ms | Errors | Notes |
| --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | --- |
| TCP PUB ack | ... | ... | n/a | ... | ... | ... | ... | ... | ... | ... | 0 | per-message publish ACK latency |
| TCP MPUB batch ack | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | 0 | ACK latency is measured per MPUB batch |
| end-to-end pub->fin | ... | ... | n/a | ... | ... | ... | ... | ... | ... | ... | 0 | missing=0, duplicates=0, fin_errors=0 |
| graceful close requeue | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | 0 | requeued=..., recovered=..., missing=0, unexpected=0 |
For a PR, attach benchmark.md and keep every row at Errors = 0. Compare
throughput and latency only between runs on the same machine and Docker setup.
For Go comparisons, attach both files from benchmark-results/ and compare
only the matching PUB, MPUB, and end-to-end rows.
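Keeping every row at Errors = 0 is easy to check mechanically. The hypothetical helper below scans a generated markdown report and returns the scenarios whose Errors cell is anything other than 0. It assumes the pipe-table layout shown above and is not part of asyncnsq.

```python
def error_rows(markdown: str) -> list[str]:
    """Return the Scenario names of table rows whose Errors cell is not 0."""
    header: list[str] | None = None
    bad: list[str] = []
    for raw in markdown.splitlines():
        line = raw.strip()
        if not line.startswith("|"):
            header = None  # a non-table line ends the current table
            continue
        cells = [cell.strip() for cell in line.strip("|").split("|")]
        if header is None:
            header = cells  # first pipe row of a table is its header
            continue
        if set("".join(cells)) <= set("-: "):
            continue  # the |---| alignment row
        row = dict(zip(header, cells))
        if "Errors" in row and row["Errors"] != "0":
            bad.append(row.get("Scenario", "?"))
    return bad


report = """| Scenario | Messages | Errors | Notes |
| --- | ---: | ---: | --- |
| TCP PUB ack | 10,000 | 0 | per-message publish ACK latency |
| end-to-end pub->fin | 10,000 | 3 | missing=3, duplicates=0, fin_errors=0 |
"""
print(error_rows(report))  # ['end-to-end pub->fin']
```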
License
asyncnsq is offered under the MIT license.
Donation
If you like this repo, buy me a coffee.
ETH wallet
Or you can contribute to this project.
Download files
Source Distribution
Built Distribution
File details
Details for the file asyncnsq-2.0.0.tar.gz.
File metadata
- Download URL: asyncnsq-2.0.0.tar.gz
- Upload date:
- Size: 54.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.6.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 8b928c632c8108b22978eac850c917e6bf2efae1ea84d734897fd2efabde587f |
| MD5 | fcd4e79e6370abecd9acdcf94d15f574 |
| BLAKE2b-256 | 893ff6822181dacbfef1815dfb970f7c49f2850266c794dc4399f43c41881027 |
File details
Details for the file asyncnsq-2.0.0-py3-none-any.whl.
File metadata
- Download URL: asyncnsq-2.0.0-py3-none-any.whl
- Upload date:
- Size: 39.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.6.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 7135b39545971ba34e8a7a9b81efdcd243c9e07d592168abb1917566ed9efaf2 |
| MD5 | d2eeaad79a3235461c0e3b345ad58551 |
| BLAKE2b-256 | 205e09a547435452b3890716febf5beb169500e0afb92a242ec0c6533c49ff30 |