Python bindings for LaminarDB streaming SQL database
laminardb
Python bindings for the LaminarDB streaming SQL database. Built with PyO3 and Apache Arrow for zero-copy performance.
import laminardb
conn = laminardb.open(":memory:")
conn.execute("CREATE SOURCE sensors (ts BIGINT, device VARCHAR, value DOUBLE)")
conn.insert("sensors", [
{"ts": 1, "device": "sensor_a", "value": 42.0},
{"ts": 2, "device": "sensor_b", "value": 43.5},
{"ts": 3, "device": "sensor_a", "value": 44.1},
])
conn.sql("SELECT * FROM sensors WHERE value > 43.0").df()
# ts device value
# 0 2 sensor_b 43.5
# 1 3 sensor_a 44.1
Why laminardb?
- DuckDB-style API: .sql(), .df(), .arrow(), .fetchall(), .show() feel like DuckDB
- Streaming-first: sources, streams, sinks, watermarks, continuous subscriptions
- 10 input formats: dicts, pandas, polars, pyarrow, JSON, CSV, Arrow PyCapsule
- 6 output formats: pandas, polars, pyarrow, dicts, auto-detect, PyCapsule (zero-copy)
- Full type safety: PEP 561 stubs with py.typed, passes mypy --strict
- Thread-safe: GIL release on all blocking ops, Send + Sync for free-threaded Python 3.13t+
- Pipeline observability: topology DAG, per-component metrics, watermark tracking
Installation
pip install laminardb
With optional DataFrame libraries:
pip install laminardb[pandas] # pandas + pyarrow
pip install laminardb[polars] # polars
pip install laminardb[pyarrow] # pyarrow only
pip install laminardb[dev] # all dev dependencies
Requires Python 3.11+ and a supported platform (Linux x86_64/aarch64, macOS x86_64/aarch64, Windows x86_64).
Table of Contents
- Quick Start
- Core Concepts: Sources, Streams, Sinks
- Connections
- Sources
- Data Ingestion
- Streams (Materialized Views)
- Sinks
- Watermarks & Event Time
- Querying
- Subscriptions
- Streaming Examples
- Pipeline Control
- Catalog Introspection
- Pipeline Metrics & Observability
- Error Handling
- Configuration
- Python Types
- Async Support
- Performance
- API Aliases (DuckDB Compatibility)
- Contributing
Quick Start
Connect and query
import laminardb
conn = laminardb.open(":memory:")
conn.execute("CREATE SOURCE sensors (ts BIGINT, device VARCHAR, value DOUBLE)")
conn.insert("sensors", [
{"ts": 1, "device": "sensor_a", "value": 42.0},
{"ts": 2, "device": "sensor_b", "value": 43.5},
{"ts": 3, "device": "sensor_a", "value": 44.1},
])
result = conn.sql("SELECT * FROM sensors WHERE value > 43.0")
result.show() # print to terminal
df = result.df() # -> pandas DataFrame
pl_df = result.pl() # -> polars DataFrame
table = result.arrow() # -> pyarrow Table
rows = result.fetchall() # -> list of tuples
conn.close()
Real-time streaming in 10 lines
import laminardb
conn = laminardb.open(":memory:")
conn.execute("CREATE SOURCE readings (ts BIGINT, sensor VARCHAR, temp DOUBLE)")
conn.execute("CREATE STREAM alerts AS SELECT * FROM readings WHERE temp > 100.0")
conn.start()
# Subscribe to the alerts stream
sub = conn.subscribe_stream("alerts")
conn.insert("readings", {"ts": 1, "sensor": "boiler_1", "temp": 105.3})
batch = sub.next_timeout(2000) # wait up to 2s
if batch:
    batch.show() # shows the alert row
sub.cancel()
conn.close()
Module-level queries
import laminardb
# Uses a thread-safe default in-memory connection
laminardb.sql("SELECT 1 + 1 AS answer").show()
Context manager
with laminardb.open(":memory:") as conn:
    conn.execute("CREATE SOURCE events (id BIGINT, name VARCHAR)")
    conn.insert("events", [{"id": 1, "name": "click"}, {"id": 2, "name": "view"}])
    conn.sql("SELECT * FROM events").show()
# auto-closes on exit
Core Concepts: Sources, Streams, Sinks
LaminarDB is a streaming SQL database. Data flows through a pipeline of three primitives:
Sources   -->   Streams    -->   Sinks
(ingest)       (transform)      (output)
| Concept | What it is | SQL | Python API |
|---|---|---|---|
| Source | Entry point for data. Defines a schema and accepts inserts. | CREATE SOURCE | conn.create_table(), conn.insert() |
| Stream | A continuously maintained SQL query over sources or other streams. Like a materialized view that updates in real time. | CREATE STREAM ... AS SELECT ... | conn.execute(), conn.subscribe_stream() |
| Sink | An output destination that receives data from streams. | CREATE SINK | conn.execute() |
How it works
- Create sources: define the schema for incoming data
- Create streams: write SQL transformations that run continuously
- Start the pipeline: conn.start() begins processing
- Insert data: push rows into sources via insert() or a Writer
- Subscribe: receive real-time results from streams as data flows through
- Query: run ad-hoc SQL at any time with conn.sql()
conn = laminardb.open(":memory:")
# 1. Define sources
conn.execute("CREATE SOURCE page_views (ts BIGINT, user_id VARCHAR, page VARCHAR)")
conn.execute("CREATE SOURCE clicks (ts BIGINT, user_id VARCHAR, button VARCHAR)")
# 2. Define streams (continuous SQL transformations)
conn.execute("""
CREATE STREAM active_users AS
SELECT user_id, COUNT(*) AS view_count
FROM page_views
GROUP BY user_id
""")
conn.execute("""
CREATE STREAM click_through AS
SELECT p.user_id, p.page, c.button
FROM page_views p
JOIN clicks c ON p.user_id = c.user_id
""")
# 3. Start the pipeline
conn.start()
# 4. Insert data into sources
conn.insert("page_views", {"ts": 1, "user_id": "alice", "page": "/home"})
conn.insert("clicks", {"ts": 2, "user_id": "alice", "button": "signup"})
# 5. Subscribe to stream results
sub = conn.subscribe_stream("active_users")
batch = sub.next_timeout(2000)
if batch:
    batch.show()
sub.cancel()
# 6. Ad-hoc query
conn.sql("SELECT * FROM page_views").show()
conn.close()
Connections
# In-memory database
conn = laminardb.open(":memory:")
# Path-based (reserved for future persistence)
conn = laminardb.open("mydb")
# URI-based (reserved for future remote connections)
conn = laminardb.connect("laminar://localhost:5432/mydb")
# With configuration
config = laminardb.LaminarConfig(
buffer_size=131072,
checkpoint_interval_ms=5000,
table_spill_threshold=2_000_000,
)
conn = laminardb.open(":memory:", config=config)
Connection properties
conn.is_closed # bool
conn.pipeline_state # "running", "stopped", etc.
conn.pipeline_watermark # global watermark (int64)
conn.total_events_processed # total events across all sources
conn.source_count # number of sources
conn.sink_count # number of sinks
conn.active_query_count # number of active queries
conn.is_checkpoint_enabled # whether checkpointing is enabled
Sources
Sources are the entry points for data into the pipeline. Each source has a fixed schema and an internal buffer for incoming rows.
Creating sources
# Via SQL
conn.execute("CREATE SOURCE sensors (ts BIGINT, device VARCHAR, value DOUBLE)")
# Via Python dict schema (column_name -> Arrow type string)
conn.create_table("metrics", {
"ts": "int64",
"name": "string",
"value": "float64",
})
# Via PyArrow schema
import pyarrow as pa
conn.create_table("logs", pa.schema([
("ts", pa.int64()),
("level", pa.string()),
("message", pa.string()),
]))
Discovering sources
conn.list_tables() # -> ["sensors", "metrics", "logs"]
conn.tables() # DuckDB-style alias
# Detailed source info
for source in conn.sources():
    print(source.name) # "sensors"
    print(source.schema) # pyarrow.Schema
    print(source.watermark_column) # watermark column name, or None
# Get a specific source's schema
schema = conn.schema("sensors") # -> pyarrow.Schema
Source metrics
sm = conn.source_metrics("sensors")
if sm:
    print(sm.name) # "sensors"
    print(sm.total_events) # total rows ingested
    print(sm.pending) # rows buffered, not yet processed
    print(sm.capacity) # buffer capacity
    print(sm.is_backpressured) # True if buffer is full
    print(sm.watermark) # current source watermark
    print(sm.utilization) # 0.0 - 1.0 buffer usage
# All sources at once
for sm in conn.all_source_metrics():
    print(f"{sm.name}: {sm.total_events} events, {sm.utilization:.0%} buffer used")
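The buffer fields above are related in a simple way. As a minimal sketch, assuming utilization means pending / capacity and that a source reports backpressure once its buffer is full (neither formula is spelled out in this README), a bounded buffer could track them as follows. SourceBuffer is a hypothetical class for illustration, not part of LaminarDB.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class SourceBuffer:
    """Toy bounded source buffer (hypothetical, not LaminarDB's implementation)."""

    capacity: int
    total_events: int = 0
    _rows: deque = field(default_factory=deque)

    @property
    def pending(self) -> int:
        # Rows accepted but not yet drained by the pipeline
        return len(self._rows)

    @property
    def utilization(self) -> float:
        # Assumed definition: fraction of the buffer currently in use
        return self.pending / self.capacity

    @property
    def is_backpressured(self) -> bool:
        # Assumed rule: backpressure starts once the buffer is full
        return self.pending >= self.capacity

    def push(self, row: dict) -> bool:
        """Accept a row unless the buffer is full; return whether it was accepted."""
        if self.is_backpressured:
            return False
        self._rows.append(row)
        self.total_events += 1
        return True

    def drain(self, n: int) -> list:
        """Simulate the pipeline consuming up to n rows."""
        return [self._rows.popleft() for _ in range(min(n, len(self._rows)))]


buf = SourceBuffer(capacity=4)
accepted = [buf.push({"ts": i}) for i in range(6)]
print(accepted)                               # [True, True, True, True, False, False]
print(buf.utilization, buf.is_backpressured)  # 1.0 True
buf.drain(3)
print(f"{buf.utilization:.0%}")               # 25%
```

Rejecting the push (rather than blocking) keeps the sketch single-threaded; a real ingestion path might instead block the producer until space frees up.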
Data Ingestion
LaminarDB accepts 10 input formats. All insert() calls return the number of rows inserted.
import pandas as pd
import polars as pl
import pyarrow as pa
# Single dict (one row)
conn.insert("sensors", {"ts": 1, "device": "a", "value": 42.0})
# List of dicts (row-oriented)
conn.insert("sensors", [
{"ts": 1, "device": "a", "value": 42.0},
{"ts": 2, "device": "b", "value": 43.5},
])
# Columnar dict
conn.insert("sensors", {
"ts": [1, 2, 3],
"device": ["a", "b", "c"],
"value": [42.0, 43.5, 44.1],
})
# pandas DataFrame
conn.insert("sensors", pd.DataFrame({"ts": [1, 2], "device": ["a", "b"], "value": [1.0, 2.0]}))
# polars DataFrame
conn.insert("sensors", pl.DataFrame({"ts": [1, 2], "device": ["a", "b"], "value": [1.0, 2.0]}))
# pyarrow RecordBatch or Table
conn.insert("sensors", pa.record_batch({"ts": [1], "device": ["a"], "value": [1.0]}))
conn.insert("sensors", pa.table({"ts": [1], "device": ["a"], "value": [1.0]}))
# JSON string
conn.insert_json("sensors", '[{"ts": 1, "device": "a", "value": 1.0}]')
# CSV string
conn.insert_csv("sensors", "ts,device,value\n1,a,1.0\n2,b,2.0\n")
# Any object with __arrow_c_stream__ (zero-copy)
conn.insert("sensors", arrow_capsule_object)
Streaming writer
For high-throughput ingestion with explicit watermark control:
with conn.writer("sensors") as w:
    for i in range(1000):
        w.insert({"ts": i, "device": f"d{i % 10}", "value": float(i)})
        # Emit watermark every 100 rows to signal time progress
        if i % 100 == 99:
            w.watermark(i)
    print(w.name) # "sensors"
    print(w.schema) # pyarrow.Schema
    print(w.current_watermark) # 999
# auto-flush on context exit
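The writer pattern above combines buffering with watermarks. The minimal sketch below models that combination. It assumes the writer batches rows, flushes any buffered rows before advancing the watermark, flushes again on context exit, and never lets its watermark move backwards. ToyWriter and its sink callback are hypothetical. The actual writer lives in the Rust layer (writer.rs), and its flushing policy is not documented here.

```python
from typing import Callable


class ToyWriter:
    """Hypothetical batching writer with a monotonic watermark (not LaminarDB's Writer)."""

    def __init__(self, sink: Callable[[list[dict]], None], batch_size: int = 256) -> None:
        self._sink = sink
        self._batch_size = batch_size
        self._buffer: list[dict] = []
        self.current_watermark: int | None = None

    def insert(self, row: dict) -> None:
        self._buffer.append(row)
        if len(self._buffer) >= self._batch_size:
            self.flush()  # size-based flush

    def watermark(self, ts: int) -> None:
        # Deliver buffered rows first so they precede the watermark,
        # then advance only if ts moves time forward (monotonic).
        self.flush()
        if self.current_watermark is None or ts > self.current_watermark:
            self.current_watermark = ts

    def flush(self) -> None:
        if self._buffer:
            self._sink(self._buffer)
            self._buffer = []

    def __enter__(self) -> "ToyWriter":
        return self

    def __exit__(self, *exc: object) -> None:
        self.flush()  # mirrors the auto-flush on context exit


batches: list[list[dict]] = []
with ToyWriter(batches.append, batch_size=64) as w:
    for i in range(1000):
        w.insert({"ts": i})
        if i % 100 == 99:
            w.watermark(i)
    w.watermark(50)  # a stale watermark is ignored
print(w.current_watermark, sum(len(b) for b in batches))  # 999 1000
```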
Streams (Materialized Views)
Streams are continuously-updated SQL queries. When new data arrives at a source, all dependent streams are automatically recomputed.
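A common way for a streaming engine to keep an aggregation stream current without rescanning the source is incremental maintenance. The engine keeps small per-group state and folds each new row into it. The sketch below does this for a GROUP BY device stream with COUNT(*) and AVG(value). It illustrates the general technique only and is not a description of LaminarDB's executor. RunningStats is a hypothetical name.

```python
from collections import defaultdict


class RunningStats:
    """Hypothetical incremental GROUP BY device with COUNT(*) and AVG(value)."""

    def __init__(self) -> None:
        self._count: dict[str, int] = defaultdict(int)
        self._sum: dict[str, float] = defaultdict(float)

    def apply(self, row: dict) -> tuple[str, int, float]:
        """Fold one row into the state (O(1)) and return the group's updated output row."""
        key = row["device"]
        self._count[key] += 1
        self._sum[key] += row["value"]
        return key, self._count[key], self._sum[key] / self._count[key]


stats = RunningStats()
changes = [stats.apply(r) for r in [
    {"device": "a", "value": 42.0},
    {"device": "b", "value": 43.5},
    {"device": "a", "value": 44.0},
]]
print(changes[-1])  # ('a', 2, 43.0)
```

AVG is maintained as a running sum and count rather than as the average itself, because an average alone cannot be updated exactly when a new row arrives.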
Creating streams
# Filter stream
conn.execute("""
CREATE STREAM high_temp AS
SELECT * FROM sensors WHERE value > 100.0
""")
# Aggregation stream
conn.execute("""
CREATE STREAM sensor_stats AS
SELECT device, COUNT(*) AS cnt, AVG(value) AS avg_value
FROM sensors
GROUP BY device
""")
# Join stream (across multiple sources)
conn.execute("""
CREATE STREAM enriched_readings AS
SELECT s.ts, s.value, d.location, d.owner
FROM sensors s
JOIN device_registry d ON s.device = d.device_id
""")
# Chained streams (stream of a stream)
conn.execute("""
CREATE STREAM critical_alerts AS
SELECT * FROM high_temp WHERE value > 200.0
""")
Windowed aggregations
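LaminarDB supports time-based window functions in streaming SQL. Use them in GROUP BY clauses to aggregate data over fixed (tumbling), overlapping (hopping), or activity-based (session) time intervals. Window sizes, hops, and gaps are given in milliseconds.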
# Tumbling window — fixed, non-overlapping 5-second intervals
conn.execute("""
CREATE STREAM avg_temp_5s AS
SELECT device, AVG(value) AS avg_value, COUNT(*) AS cnt
FROM sensors
GROUP BY device, TUMBLE(ts, 5000)
""")
# Hopping window — 5-second windows that advance every 2.5 seconds (overlapping)
conn.execute("""
CREATE STREAM avg_temp_hop AS
SELECT device, AVG(value) AS avg_value, COUNT(*) AS cnt
FROM sensors
GROUP BY device, HOPPING(ts, 5000, 2500)
""")
# Session window: a device's window closes after 10 seconds with no events
conn.execute("""
CREATE STREAM session_stats AS
SELECT device, AVG(value) AS avg_value, COUNT(*) AS cnt
FROM sensors
GROUP BY device, SESSION(ts, 10000)
""")
| Window Type | Syntax | Behavior |
|---|---|---|
| TUMBLE | TUMBLE(ts_col, size_ms) | Fixed, non-overlapping windows |
| HOPPING | HOPPING(ts_col, size_ms, hop_ms) | Fixed windows that advance by the hop interval (windows overlap when hop < size) |
| SESSION | SESSION(ts_col, gap_ms) | Dynamic windows that close after an inactivity gap |
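To make the table concrete, the following sketch computes which window or windows a millisecond timestamp falls into under each type. It assumes windows are aligned to epoch 0 and are half-open [start, end). This is a common convention, but this README does not specify it. tumble, hopping, and sessions are hypothetical helpers, not LaminarDB functions.

```python
def tumble(ts: int, size: int) -> tuple[int, int]:
    """The single [start, end) tumbling window containing ts."""
    start = ts - ts % size
    return start, start + size


def hopping(ts: int, size: int, hop: int) -> list[tuple[int, int]]:
    """Every [start, end) window of length size, starting each hop ms, that contains ts."""
    windows = []
    start = ts - ts % hop  # latest window start at or before ts
    while start > ts - size:
        windows.append((start, start + size))
        start -= hop
    return sorted(windows)


def sessions(timestamps: list[int], gap: int) -> list[tuple[int, int]]:
    """Group timestamps into (first, last) sessions that close after gap ms without events."""
    result: list[tuple[int, int]] = []
    for ts in sorted(timestamps):
        if result and ts - result[-1][1] < gap:
            result[-1] = (result[-1][0], ts)  # still active: extend the session
        else:
            result.append((ts, ts))  # gap exceeded: start a new session
    return result


print(tumble(7200, 5000))                              # (5000, 10000)
print(hopping(6000, 5000, 2500))                       # [(2500, 7500), (5000, 10000)]
print(sessions([0, 3000, 9000, 25000, 30000], 10000))  # [(0, 9000), (25000, 30000)]
```

With size 5000 and hop 2500, each event lands in exactly size / hop = 2 windows, which is why hopping aggregations cost more than tumbling ones.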
Discovering streams
conn.list_streams() # -> ["high_temp", "sensor_stats", ...]
conn.materialized_views() # DuckDB-style alias
for stream in conn.streams():
    print(stream.name) # "high_temp"
    print(stream.sql) # "SELECT * FROM sensors WHERE value > 100.0"
Stream metrics
stm = conn.stream_metrics("high_temp")
if stm:
    print(stm.name) # "high_temp"
    print(stm.total_events) # events emitted by this stream
    print(stm.pending) # events waiting to be processed
    print(stm.capacity) # output buffer capacity
    print(stm.is_backpressured) # downstream can't keep up
    print(stm.watermark) # stream watermark position
    print(stm.sql) # SQL definition
for stm in conn.all_stream_metrics():
    print(f"{stm.name}: {stm.total_events} emitted, watermark={stm.watermark}")
MaterializedView wrapper
For a higher-level API, wrap a stream in a MaterializedView:
from laminardb import MaterializedView, ChangeEvent
mv = laminardb.mv(conn, "high_temp", "SELECT * FROM sensors WHERE value > 100.0")
# Query current state
result = mv.query()
result.show()
# Query with filter
result = mv.query(where="device = 'boiler_1'")
# Get schema
print(mv.schema()) # Schema wrapper
# Subscribe with callback
def on_alert(event: ChangeEvent):
    for row in event:
        print(f"[{row.op}] {row['device']}: {row['value']}")
thread = mv.subscribe(handler=on_alert) # background daemon thread
Sinks
Sinks are output destinations that receive data from streams. They define where processed data goes after transformation.
# Create a sink via SQL
conn.execute("CREATE SINK output_sink FROM high_temp")
# Discover sinks
conn.list_sinks() # -> ["output_sink"]
for sink in conn.sinks():
    print(sink.name) # "output_sink"
print(conn.sink_count) # 1
Watermarks & Event Time
Watermarks are the foundation of event-time processing in LaminarDB. A watermark is a timestamp that declares: "all events with timestamps up to this point have been seen."
Why watermarks matter
In streaming systems, data can arrive out of order. Watermarks let the engine know when it's safe to:
- Close time-based windows (e.g., "all data for the 10:00-10:05 window has arrived")
- Emit aggregation results
- Detect late-arriving data
- Advance the pipeline's logical clock
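The four uses listed above can be shown with a small example. The toy operator below counts events in 5-second tumbling windows. It emits a window only after the watermark covers the window's last millisecond, and it flags any event whose timestamp is already at or behind the watermark as late. This is a hypothetical illustration of event-time semantics, not LaminarDB's implementation. In particular, how LaminarDB treats late events (dropping them, counting them, or something else) is not specified here.

```python
from collections import defaultdict


class TumblingCounter:
    """Hypothetical event-time operator: COUNT(*) per tumbling window."""

    def __init__(self, size_ms: int) -> None:
        self.size = size_ms
        self.watermark = -1  # logical clock: nothing is complete yet
        self.open: dict[int, int] = defaultdict(int)  # window start -> count
        self.late: list[int] = []

    def on_event(self, ts: int) -> None:
        # The watermark promised no more events with ts <= watermark
        if ts <= self.watermark:
            self.late.append(ts)
            return
        self.open[ts - ts % self.size] += 1

    def on_watermark(self, wm: int) -> list[tuple[int, int]]:
        """Advance the clock and emit every window whose last millisecond is covered."""
        self.watermark = max(self.watermark, wm)
        ready = sorted(s for s in self.open if s + self.size - 1 <= self.watermark)
        return [(s, self.open.pop(s)) for s in ready]


op = TumblingCounter(size_ms=5000)
for ts in [100, 1200, 4999, 5100, 7300]:
    op.on_event(ts)
print(op.on_watermark(4999))  # [(0, 3)]  window [0, 5000) is complete
op.on_event(3000)             # behind the watermark, so it is late
print(op.late)                # [3000]
print(op.on_watermark(9999))  # [(5000, 2)]
```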
Emitting watermarks
Use the streaming Writer to emit watermarks alongside data:
import laminardb
conn = laminardb.open(":memory:")
conn.execute("CREATE SOURCE readings (ts BIGINT, sensor VARCHAR, temp DOUBLE)")
conn.start()
with conn.writer("readings") as w:
    # Simulate a timeseries: readings every second
    for t in range(100):
        w.insert({
            "ts": t * 1000, # millisecond timestamps
            "sensor": f"sensor_{t % 5}",
            "temp": 20.0 + (t % 30) * 0.5,
        })
        # Advance the watermark every 10 events
        # This tells the engine: "no more events with ts <= this value"
        if t % 10 == 9:
            w.watermark(t * 1000)
            print(f"Watermark advanced to {w.current_watermark}")
Reading watermarks
Watermarks are visible at every level of the pipeline:
# Writer-level: current watermark for a specific source
with conn.writer("readings") as w:
    w.watermark(5000)
    print(w.current_watermark) # 5000
# Source-level: via source metrics
sm = conn.source_metrics("readings")
if sm:
    print(sm.watermark) # source watermark position
# Stream-level: via stream metrics
stm = conn.stream_metrics("alerts")
if stm:
    print(stm.watermark) # how far this stream has processed
# Pipeline-level: global watermark (min across all sources)
print(conn.pipeline_watermark)
# Via PipelineMetrics
m = conn.metrics()
print(m.pipeline_watermark)
Watermark Python type
For application-level watermark tracking:
from laminardb import Watermark
wm = Watermark(current=5000, lag_ms=200)
print(wm.current) # 5000
print(wm.lag_ms) # 200 (how far behind real-time)
Querying
SQL queries
# Standard query — collects all results
result = conn.query("SELECT * FROM sensors WHERE value > 43.0")
# DuckDB-style alias
result = conn.sql("SELECT * FROM sensors WHERE value > 43.0")
# DDL / DML execution
exec_result = conn.execute("CREATE SOURCE metrics (ts BIGINT, value DOUBLE)")
print(exec_result.result_type) # "ddl"
print(exec_result.ddl_type) # "CREATE SOURCE"
print(exec_result.ddl_object) # "metrics"
print(int(exec_result)) # rows affected
QueryResult
Every query returns a QueryResult with multiple export options:
result = conn.sql("SELECT * FROM sensors")
# Properties
result.num_rows # total row count
result.num_columns # column count
result.num_batches # Arrow batch count
result.columns # list of column names
result.schema # pyarrow.Schema
len(result) # same as num_rows
Export to DataFrames
result.to_pandas() # -> pandas.DataFrame
result.to_polars() # -> polars.DataFrame
result.to_arrow() # -> pyarrow.Table
result.to_dicts() # -> dict[str, list] (columnar)
result.to_df() # -> auto-detect best library (polars > pandas > pyarrow)
DuckDB-style methods
result.df() # -> pandas.DataFrame
result.pl() # -> polars.DataFrame
result.pl(lazy=True) # -> polars.LazyFrame
result.arrow() # -> pyarrow.Table
result.fetchall() # -> list[tuple]
result.fetchone() # -> tuple | None
result.fetchmany(10) # -> list[tuple] (up to 10 rows)
result.show() # print preview (default 20 rows)
result.show(max_rows=50) # print more rows
result._repr_html_() # Jupyter HTML rendering
Iteration
# Iterate over rows as tuples
for row in result:
    ts, device, value = row
    print(f"{device}: {value}")
# Zero-copy Arrow PyCapsule export
capsule = result.__arrow_c_stream__()
Streaming large results
For result sets that don't fit in memory:
for batch in conn.stream("SELECT * FROM sensors"):
    print(batch.num_rows) # each batch is a QueryResult
    df = batch.to_pandas()
Explain queries
plan = conn.explain("SELECT * FROM sensors WHERE value > 43.0")
print(plan)
Subscriptions
Subscribe to continuous queries for real-time streaming.
Synchronous
sub = conn.subscribe("SELECT * FROM sensors")
# Iterator protocol
for result in sub:
    df = result.to_pandas()
    process(df)
# Or non-blocking poll
batch = sub.try_next() # returns QueryResult or None
sub.cancel() # safe to call multiple times
print(sub.is_active) # False
Asynchronous
sub = await conn.subscribe_async("SELECT * FROM sensors")
async for result in sub:
    df = result.to_pandas()
    await process(df)
sub.cancel()
Named stream subscriptions
Subscribe directly to a named stream with schema access and timeout support:
sub = conn.subscribe_stream("alerts")
print(sub.schema) # pyarrow.Schema of the stream
print(sub.is_active) # True
# Blocking with timeout (recommended for most use cases)
batch = sub.next_timeout(1000) # wait up to 1000ms, returns None on timeout
# Non-blocking poll
batch = sub.try_next() # returns immediately, None if no data
# Blocking (waits indefinitely)
batch = sub.next()
# Iterator protocol
for batch in sub:
    batch.show()
sub.cancel()
Async stream subscriptions
sub = await conn.subscribe_stream_async("alerts")
print(sub.schema) # pyarrow.Schema
async for batch in sub:
    df = batch.df()
    await send_to_dashboard(df)
sub.cancel()
Streaming Examples
IoT sensor monitoring
A complete pipeline that ingests sensor readings, detects anomalies, and tracks per-device statistics:
import time
import threading
import laminardb
from laminardb import ChangeEvent
conn = laminardb.open(":memory:")
# ── Define sources ──
conn.execute("""
CREATE SOURCE sensors (
ts BIGINT,
device VARCHAR,
temp DOUBLE,
humidity DOUBLE
)
""")
# ── Define streams ──
# Real-time anomaly detection
conn.execute("""
CREATE STREAM temp_alerts AS
SELECT ts, device, temp
FROM sensors
WHERE temp > 80.0 OR temp < -20.0
""")
# Per-device running statistics
conn.execute("""
CREATE STREAM device_stats AS
SELECT
device,
COUNT(*) AS reading_count,
AVG(temp) AS avg_temp,
MAX(temp) AS max_temp,
MIN(temp) AS min_temp
FROM sensors
GROUP BY device
""")
# ── Start the pipeline ──
conn.start()
# ── Subscribe to alerts ──
alert_count = 0
def on_alert(event: ChangeEvent):
    global alert_count
    for row in event:
        alert_count += 1
        print(f" ALERT [{row.op}] {row['device']}: temp={row['temp']}")
mv = laminardb.mv(conn, "temp_alerts")
alert_thread = mv.subscribe(handler=on_alert)
# ── Ingest sensor data with watermarks ──
with conn.writer("sensors") as w:
    for t in range(200):
        w.insert({
            "ts": t * 1000,
            "device": f"device_{t % 5}",
            "temp": 20.0 + (t % 50) * 1.5, # some will exceed 80.0
            "humidity": 40.0 + (t % 20) * 1.0,
        })
        if t % 50 == 49:
            w.watermark(t * 1000)
time.sleep(0.5) # let the pipeline process
# ── Query current state ──
print("\n=== Device Statistics ===")
conn.sql("SELECT * FROM device_stats").show()
print(f"\nTotal alerts: {alert_count}")
# ── Observe pipeline health ──
m = conn.metrics()
print(f"Pipeline: {m.state}, {m.total_events_ingested} events ingested")
print(f"Uptime: {m.uptime_secs:.1f}s, watermark: {m.pipeline_watermark}")
conn.close()
Timeseries analytics
Compute running per-symbol aggregates and flag high-volume trades over time-ordered data:
import laminardb
conn = laminardb.open(":memory:")
# ── Source: stock trades ──
conn.execute("""
CREATE SOURCE trades (
ts BIGINT,
symbol VARCHAR,
price DOUBLE,
volume BIGINT
)
""")
# ── Stream: per-symbol price summary ──
conn.execute("""
CREATE STREAM price_summary AS
SELECT
symbol,
COUNT(*) AS trade_count,
AVG(price) AS avg_price,
MAX(price) AS high,
MIN(price) AS low,
SUM(volume) AS total_volume
FROM trades
GROUP BY symbol
""")
# ── Stream: high-volume trades ──
conn.execute("""
CREATE STREAM whale_trades AS
SELECT ts, symbol, price, volume
FROM trades
WHERE volume > 10000
""")
conn.start()
# ── Simulate a trading day ──
import random
symbols = ["AAPL", "GOOGL", "MSFT", "AMZN"]
with conn.writer("trades") as w:
    for t in range(1000):
        sym = random.choice(symbols)
        w.insert({
            "ts": 1700000000 + t,
            "symbol": sym,
            "price": round(100 + random.gauss(0, 5), 2),
            "volume": random.randint(100, 50000),
        })
        if t % 100 == 99:
            w.watermark(1700000000 + t)
# ── Query results ──
print("=== Price Summary ===")
conn.sql("SELECT * FROM price_summary").show()
print("\n=== Whale Trades ===")
conn.sql("SELECT * FROM whale_trades").show(max_rows=10)
# ── Export to pandas for further analysis ──
df = conn.sql("SELECT * FROM price_summary").df()
print(f"\nHighest avg price: {df.loc[df['avg_price'].idxmax(), 'symbol']}")
conn.close()
Multi-source event correlation
Join events from different sources in real time:
import laminardb
conn = laminardb.open(":memory:")
# ── Two event sources ──
conn.execute("CREATE SOURCE logins (ts BIGINT, user_id VARCHAR, ip VARCHAR)")
conn.execute("CREATE SOURCE purchases (ts BIGINT, user_id VARCHAR, amount DOUBLE)")
# ── Stream: correlate logins with purchases ──
conn.execute("""
CREATE STREAM user_activity AS
SELECT
l.user_id,
l.ip,
p.amount,
p.ts AS purchase_ts
FROM logins l
JOIN purchases p ON l.user_id = p.user_id
""")
# ── Stream: large purchases (amount > 500) ──
conn.execute("""
CREATE STREAM large_purchases AS
SELECT user_id, amount, purchase_ts
FROM user_activity
WHERE amount > 500.0
""")
conn.start()
# ── Ingest correlated events ──
conn.insert("logins", [
{"ts": 1, "user_id": "alice", "ip": "1.2.3.4"},
{"ts": 2, "user_id": "bob", "ip": "5.6.7.8"},
])
conn.insert("purchases", [
{"ts": 3, "user_id": "alice", "amount": 29.99},
{"ts": 4, "user_id": "bob", "amount": 999.00},
{"ts": 5, "user_id": "alice", "amount": 750.00},
])
import time
time.sleep(0.3) # give the pipeline a moment to process
# ── Check results ──
print("=== User Activity ===")
conn.sql("SELECT * FROM user_activity").show()
print("\n=== Large Purchases ===")
conn.sql("SELECT * FROM large_purchases").show()
conn.close()
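The user_activity stream has to produce a match regardless of which side of the join arrives first. A common technique for joining two unbounded inputs is the symmetric hash join. The engine stores each arriving row in a hash table for its side, then probes the other side's table for matches. The sketch below assumes an inner equi-join on user_id with unbounded state. It illustrates the technique only and makes no claim about how LaminarDB evaluates JOIN or how long it retains join state. SymmetricHashJoin is a hypothetical name.

```python
from collections import defaultdict


class SymmetricHashJoin:
    """Hypothetical inner equi-join of two unbounded inputs on one key."""

    def __init__(self, key: str) -> None:
        self.key = key
        self.state: dict[str, dict[object, list[dict]]] = {
            "left": defaultdict(list),
            "right": defaultdict(list),
        }

    def push(self, side: str, row: dict) -> list[tuple[dict, dict]]:
        """Store the row, then probe the opposite side's table for matches."""
        other = "right" if side == "left" else "left"
        k = row[self.key]
        self.state[side][k].append(row)
        matches = self.state[other][k]
        # Always return (left_row, right_row), whatever the arrival order
        return [(row, m) if side == "left" else (m, row) for m in matches]


join = SymmetricHashJoin(key="user_id")
out = []
out += join.push("left", {"user_id": "alice", "ip": "1.2.3.4"})
out += join.push("right", {"user_id": "alice", "amount": 29.99})
out += join.push("right", {"user_id": "bob", "amount": 999.00})  # no login yet
out += join.push("left", {"user_id": "bob", "ip": "5.6.7.8"})    # matches now
for login, purchase in out:
    print(login["user_id"], login["ip"], purchase["amount"])
```

Because both tables only grow, a production engine typically bounds this state, for example with a time condition in the join plus watermarks, so that old rows can be evicted.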
Real-time dashboard with async subscriptions
import asyncio
import laminardb
async def dashboard():
    async with laminardb.open(":memory:") as conn:
        conn.execute("CREATE SOURCE metrics (ts BIGINT, service VARCHAR, latency_ms DOUBLE)")
        conn.execute("""
            CREATE STREAM slow_requests AS
            SELECT * FROM metrics WHERE latency_ms > 500.0
        """)
        conn.start()
        # Subscribe to slow requests asynchronously
        sub = await conn.subscribe_stream_async("slow_requests")
        # Simulate ingestion in a background task
        async def ingest():
            import random
            for t in range(100):
                conn.insert("metrics", {
                    "ts": t,
                    "service": random.choice(["api", "web", "worker"]),
                    "latency_ms": random.expovariate(1 / 300), # mean 300 ms; some will be > 500 ms
                })
                await asyncio.sleep(0.01)
        # Keep a reference so the task is not garbage-collected mid-run
        ingest_task = asyncio.create_task(ingest())
        # Process alerts as they arrive
        count = 0
        async for batch in sub:
            for row in batch.fetchall():
                ts, service, latency = row
                print(f" Slow request: {service} @ {latency:.0f}ms")
                count += 1
            if count >= 5:
                break # leave the outer async-for, not just the row loop
        sub.cancel()
        await ingest_task # finish ingestion before the connection closes
        print(f"\nDetected {count} slow requests")
asyncio.run(dashboard())
Producer-consumer with background threads
import threading
import time
import laminardb
conn = laminardb.open(":memory:")
conn.execute("CREATE SOURCE events (ts BIGINT, type VARCHAR, payload DOUBLE)")
conn.start()
# ── Producer thread ──
def produce():
    with conn.writer("events") as w:
        for t in range(500):
            w.insert({"ts": t, "type": f"evt_{t % 3}", "payload": float(t)})
            if t % 50 == 49:
                w.watermark(t)
        print(f"Producer done: {w.current_watermark} final watermark")
# ── Consumer thread ──
consumed = 0
def consume():
    global consumed
    sub = conn.subscribe("SELECT * FROM events")
    deadline = time.monotonic() + 5.0
    while time.monotonic() < deadline:
        batch = sub.try_next()
        if batch is not None:
            consumed += batch.num_rows
        else:
            time.sleep(0.01) # avoid busy-spinning while no data is ready
    sub.cancel()
producer = threading.Thread(target=produce)
consumer = threading.Thread(target=consume)
consumer.start()
producer.start()
producer.join()
time.sleep(0.5)
consumer.join()
print(f"Consumed {consumed} events")
# Check pipeline metrics
m = conn.metrics()
print(f"Pipeline: ingested={m.total_events_ingested}, emitted={m.total_events_emitted}")
conn.close()
Pipeline Control
# Start the streaming pipeline (required before subscriptions receive data)
conn.start()
# Graceful shutdown (drains in-flight events)
conn.shutdown()
# Trigger a checkpoint (returns checkpoint ID, or None if disabled)
checkpoint_id = conn.checkpoint()
# Cancel a specific running query (query IDs are listed by conn.queries())
conn.cancel_query(query_id)
# Check pipeline state
print(conn.pipeline_state) # "running", "stopped", etc.
print(conn.is_checkpoint_enabled) # True/False
Catalog Introspection
# List names
conn.list_tables() # or conn.tables() -> source names
conn.list_streams() # or conn.materialized_views() -> stream names
conn.list_sinks() # -> sink names
# Get a source's schema
schema = conn.schema("sensors") # -> pyarrow.Schema
# Detailed catalog info with typed objects
for source in conn.sources():
    print(source.name, source.schema, source.watermark_column)
for stream in conn.streams():
    print(stream.name, stream.sql)
for sink in conn.sinks():
    print(sink.name)
for query in conn.queries():
    print(query.id, query.sql, query.active)
Table statistics
stats = conn.stats("sensors")
print(stats["name"]) # "sensors"
print(stats["total_events"]) # total ingested events
print(stats["pending"]) # events in buffer
print(stats["capacity"]) # buffer capacity
print(stats["is_backpressured"]) # True if buffer full
print(stats["watermark"]) # current watermark
print(stats["utilization"]) # 0.0 - 1.0
Pipeline Metrics & Observability
Full visibility into every component of the streaming pipeline:
# ── Pipeline-wide metrics ──
m = conn.metrics()
print(m.total_events_ingested) # total rows ingested across all sources
print(m.total_events_emitted) # total rows emitted by streams
print(m.total_events_dropped) # dropped events (backpressure, errors)
print(m.total_cycles) # pipeline processing cycles
print(m.total_batches) # batches processed
print(m.uptime_secs) # pipeline uptime in seconds
print(m.state) # "running", "stopped", etc.
print(m.source_count) # registered sources
print(m.stream_count) # registered streams
print(m.sink_count) # registered sinks
print(m.pipeline_watermark) # global watermark (min across sources)
# ── Per-source metrics ──
for sm in conn.all_source_metrics():
    print(f"  {sm.name}: {sm.total_events} events, "
          f"{sm.utilization:.0%} buffer, wm={sm.watermark}")
# ── Per-stream metrics ──
for stm in conn.all_stream_metrics():
    print(f"  {stm.name}: {stm.total_events} emitted, "
          f"wm={stm.watermark}, sql={stm.sql}")
# ── Pipeline topology (DAG visualization) ──
topo = conn.topology()
for node in topo.nodes:
    print(f"  [{node.node_type}] {node.name}")
    if node.sql:
        print(f"    SQL: {node.sql}")
for edge in topo.edges:
    print(f"  {edge.from_node} --> {edge.to_node}")
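Because the topology is a DAG, its nodes can be listed in dependency order, with sources before the streams and sinks that read from them. Kahn's algorithm does this. The sketch below works on plain node names and (from, to) pairs, so it assumes you first copy edge.from_node and edge.to_node into tuples. topo_order is a hypothetical helper, and the node names in the example are illustrative.

```python
from collections import deque


def topo_order(nodes: list[str], edges: list[tuple[str, str]]) -> list[str]:
    """Kahn's algorithm: repeatedly emit a node once all of its inputs are emitted."""
    indegree = {n: 0 for n in nodes}
    children: dict[str, list[str]] = {n: [] for n in nodes}
    for src, dst in edges:
        children[src].append(dst)
        indegree[dst] += 1
    ready = deque(n for n in nodes if indegree[n] == 0)  # nodes with no inputs
    order = []
    while ready:
        n = ready.popleft()
        order.append(n)
        for child in children[n]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    if len(order) != len(nodes):
        raise ValueError("topology contains a cycle")
    return order


nodes = ["output_sink", "critical_alerts", "high_temp", "sensors"]
edges = [
    ("sensors", "high_temp"),
    ("high_temp", "critical_alerts"),
    ("high_temp", "output_sink"),
]
print(topo_order(nodes, edges))
# ['sensors', 'high_temp', 'critical_alerts', 'output_sink']
```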
Metrics Python wrapper
from laminardb import Metrics
metrics = Metrics(conn.metrics())
print(metrics.events_per_second) # calculated throughput
print(metrics.uptime_secs) # pipeline uptime
print(metrics.state) # pipeline state string
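This README does not define events_per_second. One plausible reading, shown here as an assumption, is total ingested events divided by uptime, with a guard for a pipeline that has just started. A rate taken between two samples usually tracks current load better than the lifetime average. Snapshot is a hypothetical stand-in for the two fields read from conn.metrics().

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Snapshot:
    """Hypothetical subset of the fields exposed by conn.metrics()."""

    total_events_ingested: int
    uptime_secs: float


def lifetime_rate(s: Snapshot) -> float:
    # Average throughput since start; 0.0 before any time has elapsed
    return s.total_events_ingested / s.uptime_secs if s.uptime_secs > 0 else 0.0


def interval_rate(before: Snapshot, after: Snapshot) -> float:
    # Throughput between two samples, which reacts faster to load changes
    dt = after.uptime_secs - before.uptime_secs
    if dt <= 0:
        return 0.0
    return (after.total_events_ingested - before.total_events_ingested) / dt


a = Snapshot(total_events_ingested=10_000, uptime_secs=20.0)
b = Snapshot(total_events_ingested=16_000, uptime_secs=22.0)
print(round(lifetime_rate(b), 1))  # 727.3
print(interval_rate(a, b))         # 3000.0
```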
Error Handling
Structured exception hierarchy with numeric error codes:
import laminardb
try:
    conn.query("INVALID SQL")
except laminardb.QueryError as e:
    print(f"Query failed: {e}")
    print(f"Error code: {e.code}")
except laminardb.LaminarError as e:
    print(f"LaminarDB error: {e}")
Exception hierarchy
| Exception | Raised when |
|---|---|
| LaminarError | Base class for all LaminarDB errors |
| ConnectionError | Connection cannot be established or is lost |
| QueryError | SQL query fails (syntax error, missing table, etc.) |
| IngestionError | Data insertion fails (type mismatch, invalid format) |
| SchemaError | Schema operation fails (table exists, invalid schema) |
| SubscriptionError | Subscription operation fails |
| StreamError | Stream / materialized view operation fails |
| CheckpointError | Checkpoint operation fails |
| ConnectorError | Connector operation fails |
Error codes
Every exception carries a numeric .code attribute. Constants are available via laminardb.codes:
from laminardb import codes
codes.CONNECTION_FAILED # 100
codes.CONNECTION_CLOSED # 101
codes.TABLE_NOT_FOUND # 200
codes.TABLE_EXISTS # 201
codes.SCHEMA_MISMATCH # 202
codes.INGESTION_FAILED # 300
codes.WRITER_CLOSED # 301
codes.QUERY_FAILED # 400
codes.SQL_PARSE_ERROR # 401
codes.SUBSCRIPTION_FAILED # 500
codes.INTERNAL_ERROR # 900
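The constants listed above appear to be grouped by hundreds: 1xx connection, 2xx table/schema, 3xx ingestion, 4xx query, 5xx subscription, and 9xx internal. The sketch below assumes this grouping also holds for codes not listed here, so that a handler can branch on the family. code_family is a hypothetical helper. The integers come from the list above rather than from laminardb.codes, so the sketch runs without LaminarDB installed.

```python
# Hypothetical mapping; assumes codes are grouped by their hundreds digit,
# as the constants listed above suggest.
CODE_FAMILIES = {
    1: "connection",
    2: "table/schema",
    3: "ingestion",
    4: "query",
    5: "subscription",
    9: "internal",
}


def code_family(code: int) -> str:
    """Return the assumed family of a LaminarDB error code, or 'unknown'."""
    return CODE_FAMILIES.get(code // 100, "unknown")


for code in (101, 202, 401, 900, 777):
    print(code, code_family(code))
```

Inside an except block this would be called as code_family(e.code), using the .code attribute described above.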
Configuration
config = laminardb.LaminarConfig(
buffer_size=65536, # source buffer size (default: 65536)
storage_dir="/tmp/laminar", # storage directory (optional)
checkpoint_interval_ms=10000, # checkpoint interval in ms (optional)
table_spill_threshold=1_000_000, # spill threshold (default: 1M)
)
conn = laminardb.open(":memory:", config=config)
laminardb.Config is an alias for laminardb.LaminarConfig.
Python Types
High-level Python wrappers in laminardb.types:
from laminardb import (
Schema, Column, TableStats, Watermark,
CheckpointStatus, Metrics, ChangeRow, ChangeEvent, MaterializedView,
)
| Type | Description |
|---|---|
| Column(name, type, nullable) | Frozen dataclass for column metadata |
| Schema | Wraps pyarrow.Schema with columns, names, indexing by name/position |
| TableStats(row_count, size_bytes) | Frozen dataclass for table statistics |
| Watermark(current, lag_ms) | Frozen dataclass for watermark state |
| CheckpointStatus(checkpoint_id, enabled) | Frozen dataclass for checkpoint status |
| Metrics | Wrapper with events_per_second, uptime_secs, state |
| ChangeRow | Dict-like row with .op, ["col"], .keys(), .to_dict() |
| ChangeEvent | Iterable batch of ChangeRows with .df(), .arrow(), .pl() |
| MaterializedView | Stream wrapper with .query(), .subscribe(handler=), .schema() |
Async Support
import asyncio
import laminardb
async def main():
async with laminardb.open(":memory:") as conn:
conn.execute("CREATE SOURCE events (id BIGINT, name VARCHAR)")
conn.insert("events", [{"id": 1, "name": "click"}])
sub = await conn.subscribe_async("SELECT * FROM events")
async for batch in sub:
print(batch.to_dicts())
break
sub.cancel()
asyncio.run(main())
Type Safety
LaminarDB ships with PEP 561 type stubs (py.typed marker). All public APIs are fully typed and pass mypy --strict:
mypy python/laminardb --strict
IDEs like VS Code and PyCharm provide full autocompletion out of the box.
Performance
| Feature | Details |
|---|---|
| Zero-copy export | Arrow PyCapsule interface (__arrow_c_stream__) avoids serialization |
| GIL release | Every blocking Rust call uses py.allow_threads() |
| Free-threaded Python | All types are Send + Sync, ready for Python 3.13t / 3.14t |
| Batch buffering | Streaming writer for efficient bulk inserts |
| LTO release builds | Link-time optimization + symbol stripping |
Benchmarks
python benchmarks/bench_ingestion.py # ingestion throughput by format
python benchmarks/bench_query.py # query + conversion throughput
python benchmarks/bench_streaming.py # subscription iteration throughput
API Aliases (DuckDB Compatibility)
For users coming from DuckDB:
| DuckDB style | Equivalent |
|---|---|
| conn.sql(query) | conn.query(query) |
| conn.tables() | conn.list_tables() |
| conn.materialized_views() | conn.list_streams() |
| result.df() | result.to_pandas() |
| result.pl() | result.to_polars() |
| result.arrow() | result.to_arrow() |
| result.fetchall() | rows as list[tuple] |
| result.fetchone() | first row as tuple or None |
| result.fetchmany(n) | up to n rows as list[tuple] |
| laminardb.Config | laminardb.LaminarConfig |
| laminardb.BatchWriter | laminardb.Writer |
Contributing
Development setup
# Clone with the laminardb monorepo dependency
git clone https://github.com/laminardb/laminardb-python.git
git clone https://github.com/laminardb/laminardb.git # sibling directory
# Build and install in development mode
cd laminardb-python
python -m venv .venv
source .venv/bin/activate # or .venv\Scripts\activate on Windows
pip install maturin
maturin develop --extras dev
Running checks
# Tests
pytest tests/ -v
# Rust checks
cargo fmt --check
cargo clippy -- -D warnings
# Type checking
mypy python/laminardb --strict
Project structure
python/laminardb/             # Python package
    __init__.py               # Public API, module-level functions, aliases
    types.py                  # High-level Python types (Schema, MaterializedView, etc.)
    _laminardb.pyi            # PEP 561 type stubs
    py.typed                  # Type marker
src/                          # Rust source (PyO3 bindings)
    lib.rs                    # Module entry point, open()/connect()
    connection.rs             # Connection class (40+ methods)
    query.rs                  # QueryResult class
    conversion.rs             # Python <-> Arrow type conversion
    writer.rs                 # Streaming writer with watermark support
    subscription.rs           # Sync subscription iterator
    async_support.rs          # Tokio runtime + async subscription
    stream_subscription.rs    # Named stream subscriptions (sync + async)
    execute.rs                # ExecuteResult for DDL/DML introspection
    error.rs                  # Exception hierarchy with error codes
    config.rs                 # LaminarConfig
    catalog.rs                # Catalog info (SourceInfo, SinkInfo, StreamInfo, QueryInfo)
    metrics.rs                # Pipeline topology and metrics
tests/                        # pytest test suite (15 files, 233+ tests)
benchmarks/                   # Performance benchmarks
examples/                     # Example scripts
docs/                         # Architecture and feature specs
License
MIT — see LICENSE for details.