Python client for the Lance Wire Protocol (LWP) — high-performance, low-latency data streaming
Project description
lnc-client
Python client for the Lance Wire Protocol (LWP) — high-performance, low-latency data streaming.
Lance is an io_uring-based streaming server designed to saturate 100G NICs with minimal latency. This client implements the full LWP binary protocol for Python applications with API semantics matching the official Rust client.
Installation
pip install lnc-client
Requirements: Python 3.10+
Topic Management
Topics are created by name and referenced by numeric ID in all subsequent operations:
import asyncio
from lnc_client import LanceClient, ClientConfig
async def main():
async with LanceClient(ClientConfig(host="127.0.0.1")) as client:
# Create a topic — returns {"id": 1, "name": "my-events", ...}
topic = await client.create_topic("my-events")
topic_id = topic["id"]
# Create with retention policy (7-day TTL, 1 GB max)
topic = await client.create_topic_with_retention(
"logs", max_age_secs=7 * 86400, max_bytes=1024**3,
)
# List all topics
topics = await client.list_topics()
# Get topic metadata by ID
info = await client.get_topic(topic_id)
# Update retention policy
await client.set_retention(topic_id, max_age_secs=86400)
# Delete a topic
await client.delete_topic(topic_id)
# Latency measurement
rtt = await client.ping()
print(f"Round-trip: {rtt * 1000:.2f}ms")
asyncio.run(main())
Producer
import asyncio
from lnc_client import Producer, ProducerConfig
async def main():
config = (
ProducerConfig()
.with_batch_size(64 * 1024)
.with_linger_ms(10)
.with_compression(True)
)
producer = await Producer.connect("127.0.0.1:1992", config)
# Send with ACK (guaranteed delivery) — returns batch_id
batch_id = await producer.send(topic_id=1, data=b'{"price": 6942.25}')
# Send without waiting for ACK (pipelined, higher throughput)
batch_id = await producer.send_async(topic_id=1, data=b"fire and forget")
# Send multiple records as a single batch
from lnc_client import TlvRecord
records = [TlvRecord.json(b'{"a":1}'), TlvRecord.json(b'{"b":2}')]
await producer.send_batch(topic_id=1, records=records)
# Wait for all pending ACKs
await producer.flush()
await producer.close()
asyncio.run(main())
Consumer (Standalone)
For independent consumption with client-managed offsets:
import asyncio
from lnc_client import StandaloneConsumer, StandaloneConfig, SeekPosition
from pathlib import Path
async def main():
config = (
StandaloneConfig("my-consumer", topic_id=1)
.with_start_position(SeekPosition.BEGINNING)
.with_offset_dir(Path("/var/lib/lance/offsets"))
.with_max_fetch_bytes(1_048_576)
)
consumer = await StandaloneConsumer.connect("127.0.0.1:1992", config)
while True:
result = await consumer.poll()
if result is None:
await asyncio.sleep(0.05)
continue
for record in result.records:
print(f"Type={record.record_type}, Data={record.value}")
print(f"Offset: {result.end_offset}, Lag: {result.lag} bytes")
await consumer.commit()
asyncio.run(main())
Seek Operations
consumer.seek(42000) # Absolute byte offset
consumer.seek_to(SeekPosition.BEGINNING) # Start of stream
consumer.seek_to(SeekPosition.END) # Tail — only new data
consumer.seek_to(SeekPosition.offset(500)) # Specific offset
consumer.rewind() # Alias for seek(0)
Low-Level Client
LanceClient provides direct access to all protocol operations on a single
TCP connection. Producer and StandaloneConsumer are higher-level
abstractions built on top of it.
async with LanceClient(ClientConfig(host="127.0.0.1")) as client:
# Subscribe for server-side streaming
await client.subscribe(topic_id=1, start_offset=0, max_batch_bytes=65536, consumer_id=42)
await client.commit_offset(topic_id=1, consumer_id=42, offset=1024)
await client.unsubscribe(topic_id=1, consumer_id=42)
Offset Persistence
Consumers can persist offsets to disk for crash recovery:
from lnc_client import FileOffsetStore, MemoryOffsetStore
# File-based (survives restarts) — auto-created when offset_dir is set
config = StandaloneConfig("my-consumer", topic_id=1).with_offset_dir("/var/lib/lance/offsets")
# Or provide a store explicitly
store = FileOffsetStore("/var/lib/lance/offsets")
consumer = await StandaloneConsumer.connect("127.0.0.1:1992", config, offset_store=store)
# In-memory (testing)
store = MemoryOffsetStore()
Configuration
ProducerConfig
| Option | Default | Builder Method | Description |
|---|---|---|---|
batch_size |
32768 |
with_batch_size() |
Max batch size in bytes |
linger_ms |
5 |
with_linger_ms() |
Max wait before sending partial batch |
compression |
False |
with_compression() |
Enable LZ4 compression |
max_pending_acks |
64 |
with_max_pending_acks() |
Max unacknowledged batches |
connect_timeout_s |
10.0 |
with_connect_timeout() |
Connection timeout |
request_timeout_s |
30.0 |
with_request_timeout() |
Per-request timeout |
ssl_context |
None |
with_ssl() |
TLS context |
auto_reconnect |
True |
with_auto_reconnect() |
Auto-reconnect on failure |
StandaloneConfig
| Option | Default | Builder Method | Description |
|---|---|---|---|
consumer_name |
"" |
(positional) | Consumer identifier |
topic_id |
0 |
(positional) | Topic to consume from |
max_fetch_bytes |
1048576 |
with_max_fetch_bytes() |
Max bytes per fetch |
start_position |
BEGINNING |
with_start_position() |
Initial seek position |
offset_dir |
None |
with_offset_dir() |
Directory for persistent offsets |
auto_commit_interval_s |
5.0 |
with_auto_commit_interval() |
Auto-commit interval |
poll_timeout_s |
0.1 |
with_poll_timeout() |
Poll response timeout |
ssl_context |
None |
with_ssl() |
TLS context |
ClientConfig
| Option | Default | Builder Method | Description |
|---|---|---|---|
host |
"127.0.0.1" |
with_host() |
Lance server hostname |
port |
1992 |
with_port() |
Lance server port |
connect_timeout_s |
10.0 |
with_connect_timeout() |
Connection timeout |
request_timeout_s |
30.0 |
— | Per-request timeout |
ssl_context |
None |
with_ssl() |
TLS context |
TLV Record Types
Records use Type-Length-Value encoding:
| Type | Code | Description |
|---|---|---|
RawData |
0x01 |
Unstructured binary |
JSON |
0x02 |
JSON-encoded record |
MessagePack |
0x03 |
MessagePack-encoded |
KeyValue |
0x10 |
Key-value pair |
Timestamped |
0x11 |
Timestamp + data |
Null |
0xFF |
Tombstone/empty |
from lnc_client import TlvRecord
rec = TlvRecord.raw(b"binary data")
rec = TlvRecord.json(b'{"key": "value"}')
rec = TlvRecord.key_value("my-key", b"my-value")
rec = TlvRecord.timestamped(1706918400_000_000_000, b"event data")
rec = TlvRecord.null()
# Accessors for structured types
key, value = rec.as_key_value()
timestamp_ns, data = rec.as_timestamped()
Error Handling
All operations return typed exceptions with is_retryable() support:
from lnc_client import (
LanceError,
ConnectionError,
BackpressureError,
TopicNotFoundError,
NotLeaderError,
ServerCatchingUpError,
AccessDeniedError,
InvalidFrameError,
)
try:
await producer.send(topic_id=99, data=b"test")
except NotLeaderError as e:
print(f"Redirect to leader: {e.leader_addr}")
except ServerCatchingUpError as e:
print(f"Server at offset {e.server_offset}, backing off")
except TopicNotFoundError:
print("Topic doesn't exist")
except BackpressureError:
print("Server is overloaded, slow down")
except ConnectionError:
print("Connection lost")
except LanceError as e:
if e.is_retryable():
print("Transient error, retrying...")
Protocol Details
This client implements the Lance Wire Protocol (LWP) v1.0:
- 44-byte fixed header with CRC32C validation
- Hardware-accelerated checksums (SSE4.2 / ARM CRC)
- Backpressure signaling from server
- Keepalive with 30-second timeout
- Batched production with ACK tracking
- Offset-based consumption with seek/rewind
- CATCHING_UP protocol — automatic 5s backoff when server is behind
- LZ4 compression (optional, per-batch)
- Reconnection with exponential backoff (100ms base, 30s max, jitter)
Development
git clone https://github.com/nitecon/lnc-client-py.git
cd lnc-client-py
python3 -m venv .venv
source .venv/bin/activate
pip install -e ".[dev]"
pytest -v
License
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file lnc_client-0.2.9.tar.gz.
File metadata
- Download URL: lnc_client-0.2.9.tar.gz
- Upload date:
- Size: 39.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
faf8276c7dd3e2f5b5ddcc304de16ffd4f75aa4b4e8b59821bb17f15dbc0223e
|
|
| MD5 |
eb7acab0c704346c3214fb48161dfe40
|
|
| BLAKE2b-256 |
267a53315f088eaf5153c3f3347df8e44b6ed707e96382af29c07bcb34de25f8
|
Provenance
The following attestation bundles were made for lnc_client-0.2.9.tar.gz:
Publisher:
release.yml on nitecon/lnc-client-py
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
lnc_client-0.2.9.tar.gz -
Subject digest:
faf8276c7dd3e2f5b5ddcc304de16ffd4f75aa4b4e8b59821bb17f15dbc0223e - Sigstore transparency entry: 1051444884
- Sigstore integration time:
-
Permalink:
nitecon/lnc-client-py@79816fd749086d3cb6954e82329b93016f91e05a -
Branch / Tag:
refs/tags/v0.2.9 - Owner: https://github.com/nitecon
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@79816fd749086d3cb6954e82329b93016f91e05a -
Trigger Event:
push
-
Statement type:
File details
Details for the file lnc_client-0.2.9-py3-none-any.whl.
File metadata
- Download URL: lnc_client-0.2.9-py3-none-any.whl
- Upload date:
- Size: 31.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ad3a8ad6682983be15f5a1dd072e2a129d93577e6835ca921def4a355ecf9a1a
|
|
| MD5 |
fa8be80f2fd13b4419ddbe870f578ea8
|
|
| BLAKE2b-256 |
7cef7de363caefa635080191e90ff74501971fe4bf77346091d8197b1d19c784
|
Provenance
The following attestation bundles were made for lnc_client-0.2.9-py3-none-any.whl:
Publisher:
release.yml on nitecon/lnc-client-py
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
lnc_client-0.2.9-py3-none-any.whl -
Subject digest:
ad3a8ad6682983be15f5a1dd072e2a129d93577e6835ca921def4a355ecf9a1a - Sigstore transparency entry: 1051444886
- Sigstore integration time:
-
Permalink:
nitecon/lnc-client-py@79816fd749086d3cb6954e82329b93016f91e05a -
Branch / Tag:
refs/tags/v0.2.9 - Owner: https://github.com/nitecon
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@79816fd749086d3cb6954e82329b93016f91e05a -
Trigger Event:
push
-
Statement type: