Python bindings for LaminarDB streaming SQL database

laminardb

PyPI CI Python 3.11+ License: MIT

Python bindings for the LaminarDB streaming SQL database. Built with PyO3 and Apache Arrow for zero-copy performance.

import laminardb

conn = laminardb.open(":memory:")
conn.execute("CREATE SOURCE sensors (ts BIGINT, device VARCHAR, value DOUBLE)")
conn.insert("sensors", [
    {"ts": 1, "device": "sensor_a", "value": 42.0},
    {"ts": 2, "device": "sensor_b", "value": 43.5},
    {"ts": 3, "device": "sensor_a", "value": 44.1},
])

conn.sql("SELECT * FROM sensors WHERE value > 43.0").df()
#    ts    device  value
# 0   2  sensor_b   43.5
# 1   3  sensor_a   44.1

Why laminardb?

  • DuckDB-style API: .sql(), .df(), .arrow(), .fetchall(), .show() (feels like DuckDB)
  • Streaming-first — sources, streams, sinks, watermarks, continuous subscriptions
  • 10 input formats — dicts, pandas, polars, pyarrow, JSON, CSV, Arrow PyCapsule
  • 6 output formats — pandas, polars, pyarrow, dicts, auto-detect, PyCapsule (zero-copy)
  • Full type safety — PEP 561 stubs with py.typed, passes mypy --strict
  • Thread-safe — GIL release on all blocking ops, Send + Sync for free-threaded Python 3.13t+
  • Pipeline observability — topology DAG, per-component metrics, watermark tracking

Installation

pip install laminardb

With optional DataFrame libraries:

pip install laminardb[pandas]    # pandas + pyarrow
pip install laminardb[polars]    # polars
pip install laminardb[pyarrow]   # pyarrow only
pip install laminardb[dev]       # all dev dependencies

Requires Python 3.11+ and a supported platform (Linux x86_64/aarch64, macOS x86_64/aarch64, Windows x86_64).


Quick Start

Connect and query

import laminardb

conn = laminardb.open(":memory:")
conn.execute("CREATE SOURCE sensors (ts BIGINT, device VARCHAR, value DOUBLE)")
conn.insert("sensors", [
    {"ts": 1, "device": "sensor_a", "value": 42.0},
    {"ts": 2, "device": "sensor_b", "value": 43.5},
    {"ts": 3, "device": "sensor_a", "value": 44.1},
])

result = conn.sql("SELECT * FROM sensors WHERE value > 43.0")
result.show()          # print to terminal
df = result.df()       # -> pandas DataFrame
pl_df = result.pl()    # -> polars DataFrame
table = result.arrow() # -> pyarrow Table
rows = result.fetchall()  # -> list of tuples
conn.close()

Real-time streaming in 10 lines

import laminardb

conn = laminardb.open(":memory:")
conn.execute("CREATE SOURCE readings (ts BIGINT, sensor VARCHAR, temp DOUBLE)")
conn.execute("CREATE STREAM alerts AS SELECT * FROM readings WHERE temp > 100.0")
conn.start()

# Subscribe to the alerts stream
sub = conn.subscribe_stream("alerts")
conn.insert("readings", {"ts": 1, "sensor": "boiler_1", "temp": 105.3})

batch = sub.next_timeout(2000)  # wait up to 2s
if batch:
    batch.show()  # shows the alert row
sub.cancel()
conn.close()

Module-level queries

import laminardb

# Uses a thread-safe default in-memory connection
laminardb.sql("SELECT 1 + 1 AS answer").show()

Context manager

with laminardb.open(":memory:") as conn:
    conn.execute("CREATE SOURCE events (id BIGINT, name VARCHAR)")
    conn.insert("events", [{"id": 1, "name": "click"}, {"id": 2, "name": "view"}])
    conn.sql("SELECT * FROM events").show()
# auto-closes on exit

Core Concepts: Sources, Streams, Sinks

LaminarDB is a streaming SQL database. Data flows through a pipeline of three primitives:

Sources  -->  Streams  -->  Sinks
(ingest)     (transform)    (output)

| Concept | What it is | SQL | Python API |
|---------|------------|-----|------------|
| Source | Entry point for data. Defines a schema and accepts inserts. | CREATE SOURCE | conn.create_table(), conn.insert() |
| Stream | A continuously-maintained SQL query over sources or other streams. Like a materialized view that updates in real time. | CREATE STREAM ... AS SELECT ... | conn.execute(), conn.subscribe_stream() |
| Sink | An output destination that receives data from streams. | CREATE SINK | conn.execute() |
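
To make the "materialized view that updates in real time" idea concrete, here is a minimal pure-Python sketch. It is not LaminarDB code: the class RunningCount and its methods are hypothetical, and it only models how the result of a GROUP BY ... COUNT(*) query can be kept current as rows arrive instead of being recomputed on every read.

```python
from collections import Counter


class RunningCount:
    """Toy model of a stream defined as
    SELECT key, COUNT(*) FROM source GROUP BY key (hypothetical, illustration only)."""

    def __init__(self, key):
        self.key = key
        self.counts = Counter()

    def on_insert(self, row):
        # Update the maintained result for just the affected group.
        self.counts[row[self.key]] += 1

    def snapshot(self):
        # What an ad-hoc query against the stream would see right now.
        return dict(self.counts)


view = RunningCount(key="user_id")
for row in [
    {"ts": 1, "user_id": "alice", "page": "/home"},
    {"ts": 2, "user_id": "bob", "page": "/docs"},
    {"ts": 3, "user_id": "alice", "page": "/pricing"},
]:
    view.on_insert(row)

print(view.snapshot())  # {'alice': 2, 'bob': 1}
```

In the real engine the equivalent bookkeeping happens inside the pipeline; the sketch only shows why reads against a stream can stay cheap while inserts keep arriving.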

How it works

  1. Create sources — define the schema for incoming data
  2. Create streams — write SQL transformations that run continuously
  3. Start the pipeline: conn.start() begins processing
  4. Insert data — push rows into sources via insert() or a Writer
  5. Subscribe — receive real-time results from streams as data flows through
  6. Query — run ad-hoc SQL at any time with conn.sql()

conn = laminardb.open(":memory:")

# 1. Define sources
conn.execute("CREATE SOURCE page_views (ts BIGINT, user_id VARCHAR, page VARCHAR)")
conn.execute("CREATE SOURCE clicks     (ts BIGINT, user_id VARCHAR, button VARCHAR)")

# 2. Define streams (continuous SQL transformations)
conn.execute("""
    CREATE STREAM active_users AS
    SELECT user_id, COUNT(*) AS view_count
    FROM page_views
    GROUP BY user_id
""")

conn.execute("""
    CREATE STREAM click_through AS
    SELECT p.user_id, p.page, c.button
    FROM page_views p
    JOIN clicks c ON p.user_id = c.user_id
""")

# 3. Start the pipeline
conn.start()

# 4. Insert data into sources
conn.insert("page_views", {"ts": 1, "user_id": "alice", "page": "/home"})
conn.insert("clicks", {"ts": 2, "user_id": "alice", "button": "signup"})

# 5. Subscribe to stream results
sub = conn.subscribe_stream("active_users")
batch = sub.next_timeout(2000)
if batch:
    batch.show()
sub.cancel()

# 6. Ad-hoc query
conn.sql("SELECT * FROM page_views").show()

conn.close()

Connections

# In-memory database
conn = laminardb.open(":memory:")

# Path-based (reserved for future persistence)
conn = laminardb.open("mydb")

# URI-based (reserved for future remote connections)
conn = laminardb.connect("laminar://localhost:5432/mydb")

# With configuration
config = laminardb.LaminarConfig(
    buffer_size=131072,
    checkpoint_interval_ms=5000,
)
conn = laminardb.open(":memory:", config=config)

Connection properties

conn.is_closed              # bool
conn.pipeline_state         # "running", "stopped", etc.
conn.pipeline_watermark     # global watermark (int64)
conn.total_events_processed # total events across all sources
conn.source_count           # number of sources
conn.sink_count             # number of sinks
conn.active_query_count     # number of active queries
conn.is_checkpoint_enabled  # whether checkpointing is enabled

Sources

Sources are the entry points for data into the pipeline. Each source has a fixed schema and an internal buffer for incoming rows.

Creating sources

# Via SQL
conn.execute("CREATE SOURCE sensors (ts BIGINT, device VARCHAR, value DOUBLE)")

# Via Python dict schema (column_name -> Arrow type string)
conn.create_table("metrics", {
    "ts": "int64",
    "name": "string",
    "value": "float64",
})

# Via PyArrow schema
import pyarrow as pa
conn.create_table("logs", pa.schema([
    ("ts", pa.int64()),
    ("level", pa.string()),
    ("message", pa.string()),
]))

Discovering sources

conn.list_tables()    # -> ["sensors", "metrics", "logs"]
conn.tables()         # DuckDB-style alias

# Detailed source info
for source in conn.sources():
    print(source.name)              # "sensors"
    print(source.schema)            # pyarrow.Schema
    print(source.watermark_column)  # watermark column name, or None

# Get a specific source's schema
schema = conn.schema("sensors")  # -> pyarrow.Schema

Source metrics

sm = conn.source_metrics("sensors")
if sm:
    print(sm.name)              # "sensors"
    print(sm.total_events)      # total rows ingested
    print(sm.pending)           # rows buffered, not yet processed
    print(sm.capacity)          # buffer capacity
    print(sm.is_backpressured)  # True if buffer is full
    print(sm.watermark)         # current source watermark
    print(sm.utilization)       # 0.0 - 1.0 buffer usage

# All sources at once
for sm in conn.all_source_metrics():
    print(f"{sm.name}: {sm.total_events} events, {sm.utilization:.0%} buffer used")
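
The buffer fields above can drive a simple producer-side throttling policy. The sketch below is an assumption-heavy illustration: BufferStats is a hypothetical stand-in, and the definitions of utilization (pending divided by capacity) and is_backpressured (no free slots) are guesses at what the real metrics mean, not documented behaviour.

```python
from dataclasses import dataclass


@dataclass
class BufferStats:
    """Hypothetical stand-in for the buffer fields shown above."""

    pending: int   # rows buffered, not yet processed
    capacity: int  # maximum rows the buffer can hold

    @property
    def utilization(self) -> float:
        # Assumed definition: fraction of the buffer currently in use.
        return self.pending / self.capacity if self.capacity else 0.0

    @property
    def is_backpressured(self) -> bool:
        # Assumed definition: the buffer has no free slots left.
        return self.pending >= self.capacity


def should_throttle(stats: BufferStats, threshold: float = 0.8) -> bool:
    """Producer-side policy: slow down before the buffer is completely full."""
    return stats.is_backpressured or stats.utilization >= threshold


print(should_throttle(BufferStats(pending=100, capacity=1000)))   # False
print(should_throttle(BufferStats(pending=900, capacity=1000)))   # True
print(should_throttle(BufferStats(pending=1000, capacity=1000)))  # True
```

With a real connection, the same check could be fed from the fields on the object returned by conn.source_metrics(); the 0.8 threshold is an arbitrary example value.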

Data Ingestion

LaminarDB accepts 10 input formats. All insert() calls return the number of rows inserted.

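# Optional libraries used by the DataFrame and Arrow examples below
import pandas as pd
import polars as pl
import pyarrow as pa

# Single dict (one row)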
conn.insert("sensors", {"ts": 1, "device": "a", "value": 42.0})

# List of dicts (row-oriented)
conn.insert("sensors", [
    {"ts": 1, "device": "a", "value": 42.0},
    {"ts": 2, "device": "b", "value": 43.5},
])

# Columnar dict
conn.insert("sensors", {
    "ts": [1, 2, 3],
    "device": ["a", "b", "c"],
    "value": [42.0, 43.5, 44.1],
})

# pandas DataFrame
conn.insert("sensors", pd.DataFrame({"ts": [1, 2], "device": ["a", "b"], "value": [1.0, 2.0]}))

# polars DataFrame
conn.insert("sensors", pl.DataFrame({"ts": [1, 2], "device": ["a", "b"], "value": [1.0, 2.0]}))

# pyarrow RecordBatch or Table
conn.insert("sensors", pa.record_batch({"ts": [1], "device": ["a"], "value": [1.0]}))
conn.insert("sensors", pa.table({"ts": [1], "device": ["a"], "value": [1.0]}))

# JSON string
conn.insert_json("sensors", '[{"ts": 1, "device": "a", "value": 1.0}]')

# CSV string
conn.insert_csv("sensors", "ts,device,value\n1,a,1.0\n2,b,2.0\n")

# Any object with __arrow_c_stream__ (zero-copy)
conn.insert("sensors", arrow_capsule_object)
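
The row-oriented and columnar dict forms carry the same data in different layouts. The following plain-Python sketch shows the relationship; the helper names rows_to_columns and columns_to_rows are hypothetical and not part of the laminardb API.

```python
def rows_to_columns(rows):
    """Convert a list of row dicts into a columnar dict of lists."""
    if not rows:
        return {}
    columns = {name: [] for name in rows[0]}
    for row in rows:
        if row.keys() != columns.keys():
            raise ValueError(f"row has unexpected columns: {sorted(row)}")
        for name, value in row.items():
            columns[name].append(value)
    return columns


def columns_to_rows(columns):
    """Convert a columnar dict of equal-length lists into a list of row dicts."""
    names = list(columns)
    return [dict(zip(names, values)) for values in zip(*columns.values())]


rows = [
    {"ts": 1, "device": "a", "value": 42.0},
    {"ts": 2, "device": "b", "value": 43.5},
]
cols = rows_to_columns(rows)
print(cols)  # {'ts': [1, 2], 'device': ['a', 'b'], 'value': [42.0, 43.5]}
print(columns_to_rows(cols) == rows)  # True
```

The columnar layout is closer to how Arrow stores data, which is one reason a library built on Arrow might prefer it for larger inserts; that preference is an inference, not something the text above states.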

Streaming writer

For high-throughput ingestion with explicit watermark control:

with conn.writer("sensors") as w:
    for i in range(1000):
        w.insert({"ts": i, "device": f"d{i % 10}", "value": float(i)})

        # Emit watermark every 100 rows to signal time progress
        if i % 100 == 99:
            w.watermark(i)

    print(w.name)              # "sensors"
    print(w.schema)            # pyarrow.Schema
    print(w.current_watermark) # 999
# auto-flush on context exit

Streams (Materialized Views)

Streams are continuously-updated SQL queries. When new data arrives at a source, all dependent streams are automatically recomputed.

Creating streams

# Filter stream
conn.execute("""
    CREATE STREAM high_temp AS
    SELECT * FROM sensors WHERE value > 100.0
""")

# Aggregation stream
conn.execute("""
    CREATE STREAM sensor_stats AS
    SELECT device, COUNT(*) AS cnt, AVG(value) AS avg_value
    FROM sensors
    GROUP BY device
""")

# Join stream (across multiple sources)
conn.execute("""
    CREATE STREAM enriched_readings AS
    SELECT s.ts, s.value, d.location, d.owner
    FROM sensors s
    JOIN device_registry d ON s.device = d.device_id
""")

# Chained streams (stream of a stream)
conn.execute("""
    CREATE STREAM critical_alerts AS
    SELECT * FROM high_temp WHERE value > 200.0
""")

Windowed aggregations

LaminarDB supports time-based window functions in streaming SQL. Use them in GROUP BY clauses to aggregate data over sliding or fixed time intervals.

# Tumbling window — fixed, non-overlapping 5-second intervals
conn.execute("""
    CREATE STREAM avg_temp_5s AS
    SELECT device, AVG(value) AS avg_value, COUNT(*) AS cnt
    FROM sensors
    GROUP BY device, TUMBLE(ts, 5000)
""")

# Hopping window — 5-second windows that advance every 2.5 seconds (overlapping)
conn.execute("""
    CREATE STREAM avg_temp_hop AS
    SELECT device, AVG(value) AS avg_value, COUNT(*) AS cnt
    FROM sensors
    GROUP BY device, HOPPING(ts, 5000, 2500)
""")

# Session window — groups events within 10-second inactivity gaps
conn.execute("""
    CREATE STREAM session_stats AS
    SELECT device, AVG(value) AS avg_value, COUNT(*) AS cnt
    FROM sensors
    GROUP BY device, SESSION(ts, 10000)
""")

| Window Type | Syntax | Behavior |
|-------------|--------|----------|
| TUMBLE | TUMBLE(ts_col, size_ms) | Fixed, non-overlapping windows |
| HOPPING | HOPPING(ts_col, size_ms, hop_ms) | Fixed windows that advance by hop interval (windows overlap when hop < size) |
| SESSION | SESSION(ts_col, gap_ms) | Dynamic windows that close after an inactivity gap |

Note: Both integer millisecond syntax (TUMBLE(ts, 5000)) and SQL INTERVAL syntax (tumble(ts, INTERVAL '5' SECOND)) are supported. The Rust engine normalizes both forms.
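
The three window types differ in how a timestamp is mapped to one or more windows. The sketch below models that mapping in plain Python. It assumes half-open [start, end) windows aligned to timestamp 0, and sessions that extend while the gap between consecutive events is at most gap_ms; the engine's actual alignment and boundary rules are not stated here, and all function names are hypothetical.

```python
def tumble_window(ts, size_ms):
    """Return the [start, end) bounds of the tumbling window containing ts."""
    start = (ts // size_ms) * size_ms
    return (start, start + size_ms)


def hopping_windows(ts, size_ms, hop_ms):
    """Return every [start, end) hopping window that contains ts."""
    windows = []
    # Latest window start that is <= ts, aligned to the hop interval.
    start = (ts // hop_ms) * hop_ms
    # A window contains ts while start > ts - size_ms.
    while start > ts - size_ms:
        windows.append((start, start + size_ms))
        start -= hop_ms
    return sorted(windows)


def session_windows(timestamps, gap_ms):
    """Group timestamps into sessions separated by more than gap_ms of inactivity."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= gap_ms:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])
    return sessions


print(tumble_window(7200, 5000))           # (5000, 10000)
print(hopping_windows(6000, 5000, 2500))   # [(2500, 7500), (5000, 10000)]
print(session_windows([0, 4000, 9000, 25000, 30000], 10000))
# [[0, 4000, 9000], [25000, 30000]]
```

Note how a single event lands in exactly one tumbling window but in two hopping windows when the hop is half the window size.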

Discovering streams

conn.list_streams()            # -> ["high_temp", "sensor_stats", ...]
conn.materialized_views()      # DuckDB-style alias

for stream in conn.streams():
    print(stream.name)  # "high_temp"
    print(stream.sql)   # "SELECT * FROM sensors WHERE value > 100.0"

Stream metrics

stm = conn.stream_metrics("high_temp")
if stm:
    print(stm.name)              # "high_temp"
    print(stm.total_events)      # events emitted by this stream
    print(stm.pending)           # events waiting to be processed
    print(stm.capacity)          # output buffer capacity
    print(stm.is_backpressured)  # downstream can't keep up
    print(stm.watermark)         # stream watermark position
    print(stm.sql)               # SQL definition

for stm in conn.all_stream_metrics():
    print(f"{stm.name}: {stm.total_events} emitted, watermark={stm.watermark}")

MaterializedView wrapper

For a higher-level API, wrap a stream in a MaterializedView:

import laminardb
from laminardb import MaterializedView, ChangeEvent

mv = laminardb.mv(conn, "high_temp", "SELECT * FROM sensors WHERE value > 100.0")

# Query current state
result = mv.query()
result.show()

# Query with filter
result = mv.query(where="device = 'boiler_1'")

# Get schema
print(mv.schema())  # Schema wrapper

# Subscribe with callback
def on_alert(event: ChangeEvent):
    for row in event:
        print(f"[{row.op}] {row['device']}: {row['value']}")

thread = mv.subscribe(handler=on_alert)  # background daemon thread

Sinks

Sinks are output destinations that receive data from streams. They define where processed data goes after transformation.

# Create a sink via SQL
conn.execute("CREATE SINK output_sink FROM high_temp")

# Discover sinks
conn.list_sinks()  # -> ["output_sink"]

for sink in conn.sinks():
    print(sink.name)  # "output_sink"

print(conn.sink_count)  # 1

External Connectors

LaminarDB supports external source and sink connectors for integrating with external systems. Connectors are created via SQL CREATE SOURCE and CREATE SINK statements.

Available Connectors

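| Connector | Source | Sink | Feature Flag |
|-----------|--------|------|--------------|
| Kafka | yes | yes | kafka |
| WebSocket | yes | yes | websocket |
| PostgreSQL CDC | yes | -- | postgres-cdc |
| PostgreSQL | -- | yes | postgres-sink |
| MySQL CDC | yes | -- | mysql-cdc |
| Delta Lake | yes | yes | delta-lake |
| Apache Iceberg | -- | yes | iceberg |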

Kafka

# Kafka source — consume from a topic
conn.execute("""
    CREATE SOURCE trades (symbol VARCHAR, price DOUBLE, volume BIGINT, ts TIMESTAMP)
    WITH (
        connector = 'kafka',
        'bootstrap.servers' = 'localhost:9092',
        'topic' = 'market-trades',
        'group.id' = 'laminardb-consumer',
        'auto.offset.reset' = 'earliest',
        'format' = 'json'
    )
""")

# Kafka sink — publish query results to a topic
conn.execute("""
    CREATE SINK alerts AS
        SELECT symbol, price FROM trades WHERE price > 1000
    WITH (
        connector = 'kafka',
        'bootstrap.servers' = 'localhost:9092',
        'topic' = 'price-alerts',
        'format' = 'json'
    )
""")

WebSocket

# WebSocket source (client mode) — connect to a market data feed
conn.execute("""
    CREATE SOURCE prices (symbol VARCHAR, bid DOUBLE, ask DOUBLE, ts TIMESTAMP)
    WITH (
        connector = 'websocket',
        'url' = 'wss://feed.example.com/prices',
        'format' = 'json',
        'reconnect.enabled' = 'true',
        'reconnect.max.delay.ms' = '30000'
    )
""")

# WebSocket sink (server mode) — fan out results to dashboard clients
conn.execute("""
    CREATE SINK dashboard AS
        SELECT symbol, AVG(bid) as avg_bid FROM prices
        GROUP BY symbol, TUMBLE(ts, INTERVAL '5 seconds')
    WITH (
        connector = 'websocket',
        'mode' = 'server',
        'bind.address' = '0.0.0.0:8080',
        'format' = 'json',
        'slow.client.policy' = 'drop_oldest'
    )
""")

PostgreSQL CDC

# Capture changes from a PostgreSQL table via logical replication
conn.execute("""
    CREATE SOURCE users (id BIGINT, name VARCHAR, email VARCHAR, updated_at TIMESTAMP)
    WITH (
        connector = 'postgres-cdc',
        'connection' = 'postgresql://user:pass@localhost:5432/mydb',
        'table' = 'public.users',
        'slot.name' = 'laminardb_slot'
    )
""")

MySQL CDC

# Capture changes from a MySQL table via binary log replication
conn.execute("""
    CREATE SOURCE orders (id BIGINT, product VARCHAR, quantity INT, created_at TIMESTAMP)
    WITH (
        connector = 'mysql-cdc',
        'connection' = 'mysql://user:pass@localhost:3306/mydb',
        'table' = 'orders',
        'server.id' = '12345'
    )
""")

Delta Lake

# Delta Lake source — poll for new versions
conn.execute("""
    CREATE SOURCE inventory (id BIGINT, sku VARCHAR, quantity INT)
    WITH (
        connector = 'delta-lake',
        'table_uri' = 's3://my-bucket/warehouse/inventory',
        'poll_interval_ms' = '5000'
    )
""")

# Delta Lake sink — write query results as Delta table
conn.execute("""
    CREATE SINK archive AS
        SELECT * FROM trades
    WITH (
        connector = 'delta-lake',
        'table_uri' = 's3://my-bucket/warehouse/trades',
        'write_mode' = 'append'
    )
""")

Apache Iceberg

# Iceberg sink — write to an Iceberg table
conn.execute("""
    CREATE SINK warehouse AS
        SELECT symbol, price, ts FROM trades
    WITH (
        connector = 'iceberg',
        'catalog_uri' = 'https://catalog.example.com',
        'catalog_type' = 'rest',
        'table' = 'analytics.trades',
        'write_mode' = 'append'
    )
""")

Watermarks & Event Time

Watermarks are the foundation of event-time processing in LaminarDB. A watermark is a timestamp that declares: "all events with timestamps up to this point have been seen."

Why watermarks matter

In streaming systems, data can arrive out of order. Watermarks let the engine know when it's safe to:

  • Close time-based windows (e.g., "all data for the 10:00-10:05 window has arrived")
  • Emit aggregation results
  • Detect late-arriving data
  • Advance the pipeline's logical clock
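
The points above can be illustrated with a toy event-time counter. This is a conceptual sketch in plain Python, not the engine's implementation: TumblingCounter is a hypothetical class, windows are assumed to be half-open and aligned to 0, and any event at or below the current watermark is treated as late.

```python
from collections import defaultdict


class TumblingCounter:
    """Toy event-time counter over tumbling windows (illustration only)."""

    def __init__(self, size_ms):
        self.size_ms = size_ms
        self.watermark = -1
        self.open_windows = defaultdict(int)  # window start -> event count
        self.late_events = []

    def insert(self, ts):
        if ts <= self.watermark:
            # The watermark already promised no more events at or before this time.
            self.late_events.append(ts)
            return
        start = (ts // self.size_ms) * self.size_ms
        self.open_windows[start] += 1

    def advance_watermark(self, watermark):
        """Move the logical clock forward and return the windows that can be closed."""
        self.watermark = max(self.watermark, watermark)
        closed = []
        for start in sorted(self.open_windows):
            # A window [start, start + size) is complete once the watermark
            # covers its last possible timestamp.
            if start + self.size_ms - 1 <= self.watermark:
                closed.append((start, self.open_windows.pop(start)))
        return closed


counter = TumblingCounter(size_ms=5000)
for ts in [1000, 6000, 3000, 4500]:  # events arrive out of order
    counter.insert(ts)

print(counter.advance_watermark(4999))  # [(0, 3)]
counter.insert(2000)                    # arrives after its window was closed
print(counter.late_events)              # [2000]
print(counter.advance_watermark(9999))  # [(5000, 1)]
```

Out-of-order events at 3000 and 4500 are still counted because they arrive before the watermark passes them; the event at 2000 is flagged as late because it does not.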

Emitting watermarks

Use the streaming Writer to emit watermarks alongside data:

import time

conn = laminardb.open(":memory:")
conn.execute("CREATE SOURCE readings (ts BIGINT, sensor VARCHAR, temp DOUBLE)")
conn.start()

with conn.writer("readings") as w:
    # Simulate a timeseries: readings every second
    for t in range(100):
        w.insert({
            "ts": t * 1000,  # millisecond timestamps
            "sensor": f"sensor_{t % 5}",
            "temp": 20.0 + (t % 30) * 0.5,
        })

        # Advance the watermark every 10 events
        # This tells the engine: "no more events with ts <= this value"
        if t % 10 == 9:
            w.watermark(t * 1000)
            print(f"Watermark advanced to {w.current_watermark}")

Reading watermarks

Watermarks are visible at every level of the pipeline:

# Writer-level: current watermark for a specific source
with conn.writer("readings") as w:
    w.watermark(5000)
    print(w.current_watermark)  # 5000

# Source-level: via source metrics
sm = conn.source_metrics("readings")
if sm:
    print(sm.watermark)  # source watermark position

# Stream-level: via stream metrics
stm = conn.stream_metrics("alerts")
if stm:
    print(stm.watermark)  # how far this stream has processed

# Pipeline-level: global watermark (min across all sources)
print(conn.pipeline_watermark)

# Via PipelineMetrics
m = conn.metrics()
print(m.pipeline_watermark)
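
The "min across all sources" rule follows from what a watermark promises: the pipeline as a whole can only claim completeness up to the point that its slowest source has reached. A minimal sketch, using a hypothetical helper and made-up watermark values:

```python
def pipeline_watermark(source_watermarks):
    """Global watermark: the slowest source bounds the whole pipeline."""
    if not source_watermarks:
        return None  # no sources registered yet
    return min(source_watermarks.values())


watermarks = {"page_views": 9000, "clicks": 4000}
print(pipeline_watermark(watermarks))  # 4000

# Once the lagging source catches up, the global watermark advances too.
watermarks["clicks"] = 12000
print(pipeline_watermark(watermarks))  # 9000
```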

Watermark Python type

For application-level watermark tracking:

from laminardb import Watermark

wm = Watermark(current=5000, lag_ms=200)
print(wm.current)  # 5000
print(wm.lag_ms)   # 200 (how far behind real-time)

Querying

SQL queries

# Standard query: collects all results
result = conn.query("SELECT * FROM sensors WHERE value > 43.0")

# DuckDB-style alias
result = conn.sql("SELECT * FROM sensors WHERE value > 43.0")

# DDL / DML execution
exec_result = conn.execute("CREATE SOURCE metrics (ts BIGINT, value DOUBLE)")
print(exec_result.result_type)  # "ddl"
print(exec_result.ddl_type)     # "CREATE SOURCE"
print(exec_result.ddl_object)   # "metrics"
print(int(exec_result))         # rows affected

QueryResult

Every query returns a QueryResult with multiple export options:

result = conn.sql("SELECT * FROM sensors")

# Properties
result.num_rows       # total row count
result.num_columns    # column count
result.num_batches    # Arrow batch count
result.columns        # list of column names
result.schema         # pyarrow.Schema
len(result)           # same as num_rows

Export to DataFrames

result.to_pandas()    # -> pandas.DataFrame
result.to_polars()    # -> polars.DataFrame
result.to_arrow()     # -> pyarrow.Table
result.to_dicts()     # -> dict[str, list]  (columnar)
result.to_df()        # -> auto-detect best library (polars > pandas > pyarrow)

DuckDB-style methods

result.df()                # -> pandas.DataFrame
result.pl()                # -> polars.DataFrame
result.pl(lazy=True)       # -> polars.LazyFrame
result.arrow()             # -> pyarrow.Table
result.fetchall()          # -> list[tuple]
result.fetchone()          # -> tuple | None
result.fetchmany(10)       # -> list[tuple] (up to 10 rows)
result.show()              # print preview (default 20 rows)
result.show(max_rows=50)   # print more rows
result._repr_html_()       # Jupyter HTML rendering

Iteration

# Iterate over rows as tuples
for row in result:
    ts, device, value = row
    print(f"{device}: {value}")

# Zero-copy Arrow PyCapsule export
capsule = result.__arrow_c_stream__()
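
Rows come back as plain tuples, so pairing them with the column names is a common next step. The helper below is a hypothetical convenience written in plain Python; the sample columns and rows are hard-coded stand-ins for result.columns and result.fetchall().

```python
def rows_as_dicts(columns, rows):
    """Pair each row tuple with the column names to get one dict per row."""
    return [dict(zip(columns, row)) for row in rows]


# Stand-ins for result.columns and result.fetchall()
columns = ["ts", "device", "value"]
rows = [(2, "sensor_b", 43.5), (3, "sensor_a", 44.1)]

for record in rows_as_dicts(columns, rows):
    print(record["device"], record["value"])
# sensor_b 43.5
# sensor_a 44.1
```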

Streaming large results

For result sets that don't fit in memory:

for batch in conn.stream("SELECT * FROM sensors"):
    print(batch.num_rows)   # each batch is a QueryResult
    df = batch.to_pandas()
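
Batch-wise iteration pays off when each batch is reduced to a small running state instead of being collected. The sketch below shows the pattern in plain Python; fake_batches is a hypothetical stand-in for the batches a streaming query would yield, with each batch represented as a simple list of values.

```python
def running_mean(batches):
    """Compute a mean over an iterable of batches while holding one batch at a time."""
    total = 0.0
    count = 0
    for batch in batches:
        for value in batch:
            total += value
            count += 1
    return total / count if count else None


def fake_batches():
    # Hypothetical stand-in for batches yielded one at a time by a streaming query.
    yield [42.0, 43.5]
    yield [44.1]
    yield []  # empty batches are harmless


mean = running_mean(fake_batches())
print(round(mean, 2))  # 43.2
```

Only the two accumulators survive between batches, so memory use stays flat no matter how many batches arrive.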

Explain queries

plan = conn.explain("SELECT * FROM sensors WHERE value > 43.0")
print(plan)

Subscriptions

Subscribe to continuous queries for real-time streaming.

Synchronous

sub = conn.subscribe("SELECT * FROM sensors")

# Iterator protocol
for result in sub:
    df = result.to_pandas()
    process(df)

# Or non-blocking poll
batch = sub.try_next()  # returns QueryResult or None

sub.cancel()  # safe to call multiple times
print(sub.is_active)  # False

Asynchronous

sub = await conn.subscribe_async("SELECT * FROM sensors")

async for result in sub:
    df = result.to_pandas()
    await process(df)

sub.cancel()

Named stream subscriptions

Subscribe directly to a named stream with schema access and timeout support:

sub = conn.subscribe_stream("alerts")
print(sub.schema)       # pyarrow.Schema of the stream
print(sub.is_active)    # True

# Blocking with timeout (recommended for most use cases)
batch = sub.next_timeout(1000)  # wait up to 1000ms, returns None on timeout

# Non-blocking poll
batch = sub.try_next()  # returns immediately, None if no data

# Blocking (waits indefinitely)
batch = sub.next()

# Iterator protocol
for batch in sub:
    batch.show()

sub.cancel()
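
A common pattern on top of next_timeout() is to collect a bounded number of batches with an overall deadline. The sketch below assumes only the behaviour described above (next_timeout returns None on timeout). collect_batches and FakeSubscription are hypothetical names, and the fake exists solely so the example runs without a database.

```python
import time


def collect_batches(sub, max_batches, total_timeout_s, poll_ms=100):
    """Collect up to max_batches from sub, giving up after total_timeout_s seconds."""
    deadline = time.monotonic() + total_timeout_s
    batches = []
    while len(batches) < max_batches and time.monotonic() < deadline:
        batch = sub.next_timeout(poll_ms)  # None when nothing arrived in time
        if batch is not None:
            batches.append(batch)
    return batches


class FakeSubscription:
    """Hypothetical stand-in so the sketch runs without a database."""

    def __init__(self, items):
        self._items = list(items)

    def next_timeout(self, timeout_ms):
        if self._items:
            return self._items.pop(0)
        time.sleep(timeout_ms / 1000)  # simulate waiting for data that never comes
        return None


sub = FakeSubscription(["batch-1", "batch-2", "batch-3"])
print(collect_batches(sub, max_batches=2, total_timeout_s=1.0))
# ['batch-1', 'batch-2']
```

Using time.monotonic() for the deadline keeps the loop correct even if the system clock is adjusted while it runs.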

Async stream subscriptions

sub = await conn.subscribe_stream_async("alerts")
print(sub.schema)  # pyarrow.Schema

async for batch in sub:
    df = batch.df()
    await send_to_dashboard(df)

sub.cancel()
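
An async for loop over a subscription runs until the stream ends, so consumers often want an idle timeout. The sketch below shows one way to do that with asyncio.wait_for from the standard library. The names consume_with_idle_timeout and fake_stream are hypothetical, and whether cancelling a pending read is safe for a real LaminarDB subscription is not something this document states, so treat this as a pattern to verify rather than a recommendation.

```python
import asyncio


async def consume_with_idle_timeout(source, idle_timeout_s):
    """Collect items from an async iterator until it ends or goes quiet."""
    items = []
    iterator = source.__aiter__()
    while True:
        try:
            item = await asyncio.wait_for(iterator.__anext__(), idle_timeout_s)
        except asyncio.TimeoutError:
            break  # no item arrived within the idle window
        except StopAsyncIteration:
            break  # the source finished on its own
        items.append(item)
    return items


async def fake_stream():
    # Hypothetical stand-in for an async subscription.
    for batch in ["batch-1", "batch-2"]:
        await asyncio.sleep(0.01)
        yield batch
    await asyncio.sleep(10)  # simulate a stream that goes quiet


result = asyncio.run(consume_with_idle_timeout(fake_stream(), idle_timeout_s=0.2))
print(result)  # ['batch-1', 'batch-2']
```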

Streaming Examples

IoT sensor monitoring

A complete pipeline that ingests sensor readings, detects anomalies, and tracks per-device statistics:

import time
import laminardb
from laminardb import ChangeEvent

conn = laminardb.open(":memory:")

# ── Define sources ──
conn.execute("""
    CREATE SOURCE sensors (
        ts       BIGINT,
        device   VARCHAR,
        temp     DOUBLE,
        humidity DOUBLE
    )
""")

# ── Define streams ──
# Real-time anomaly detection
conn.execute("""
    CREATE STREAM temp_alerts AS
    SELECT ts, device, temp
    FROM sensors
    WHERE temp > 80.0 OR temp < -20.0
""")

# Per-device running statistics
conn.execute("""
    CREATE STREAM device_stats AS
    SELECT
        device,
        COUNT(*)   AS reading_count,
        AVG(temp)  AS avg_temp,
        MAX(temp)  AS max_temp,
        MIN(temp)  AS min_temp
    FROM sensors
    GROUP BY device
""")

# ── Start the pipeline ──
conn.start()

# ── Subscribe to alerts ──
alert_count = 0

def on_alert(event: ChangeEvent):
    global alert_count
    for row in event:
        alert_count += 1
        print(f"  ALERT [{row.op}] {row['device']}: temp={row['temp']}")

mv = laminardb.mv(conn, "temp_alerts")
alert_thread = mv.subscribe(handler=on_alert)

# ── Ingest sensor data with watermarks ──
with conn.writer("sensors") as w:
    for t in range(200):
        w.insert({
            "ts": t * 1000,
            "device": f"device_{t % 5}",
            "temp": 20.0 + (t % 50) * 1.5,   # some will exceed 80.0
            "humidity": 40.0 + (t % 20) * 1.0,
        })

        if t % 50 == 49:
            w.watermark(t * 1000)

time.sleep(0.5)  # let the pipeline process

# ── Query current state ──
print("\n=== Device Statistics ===")
conn.sql("SELECT * FROM device_stats").show()

print(f"\nTotal alerts: {alert_count}")

# ── Observe pipeline health ──
m = conn.metrics()
print(f"Pipeline: {m.state}, {m.total_events_ingested} events ingested")
print(f"Uptime: {m.uptime_secs:.1f}s, watermark: {m.pipeline_watermark}")

conn.close()

Timeseries analytics

Compute rolling metrics over time-ordered data:

import laminardb

conn = laminardb.open(":memory:")

# ── Source: stock trades ──
conn.execute("""
    CREATE SOURCE trades (
        ts      BIGINT,
        symbol  VARCHAR,
        price   DOUBLE,
        volume  BIGINT
    )
""")

# ── Stream: per-symbol price summary ──
conn.execute("""
    CREATE STREAM price_summary AS
    SELECT
        symbol,
        COUNT(*)    AS trade_count,
        AVG(price)  AS avg_price,
        MAX(price)  AS high,
        MIN(price)  AS low,
        SUM(volume) AS total_volume
    FROM trades
    GROUP BY symbol
""")

# ── Stream: high-volume trades ──
conn.execute("""
    CREATE STREAM whale_trades AS
    SELECT ts, symbol, price, volume
    FROM trades
    WHERE volume > 10000
""")

conn.start()

# ── Simulate a trading day ──
import random
symbols = ["AAPL", "GOOGL", "MSFT", "AMZN"]

with conn.writer("trades") as w:
    for t in range(1000):
        sym = random.choice(symbols)
        w.insert({
            "ts": 1700000000 + t,
            "symbol": sym,
            "price": round(100 + random.gauss(0, 5), 2),
            "volume": random.randint(100, 50000),
        })
        if t % 100 == 99:
            w.watermark(1700000000 + t)

# ── Query results ──
print("=== Price Summary ===")
conn.sql("SELECT * FROM price_summary").show()

print("\n=== Whale Trades ===")
conn.sql("SELECT * FROM whale_trades").show(max_rows=10)

# ── Export to pandas for further analysis ──
df = conn.sql("SELECT * FROM price_summary").df()
print(f"\nHighest avg price: {df.loc[df['avg_price'].idxmax(), 'symbol']}")

conn.close()

Multi-source event correlation

Join events from different sources in real time:

import laminardb

conn = laminardb.open(":memory:")

# ── Two event sources ──
conn.execute("CREATE SOURCE logins    (ts BIGINT, user_id VARCHAR, ip VARCHAR)")
conn.execute("CREATE SOURCE purchases (ts BIGINT, user_id VARCHAR, amount DOUBLE)")

# ── Stream: correlate logins with purchases ──
conn.execute("""
    CREATE STREAM user_activity AS
    SELECT
        l.user_id,
        l.ip,
        p.amount,
        p.ts AS purchase_ts
    FROM logins l
    JOIN purchases p ON l.user_id = p.user_id
""")

# ── Stream: large purchases (amount over 500) ──
conn.execute("""
    CREATE STREAM large_purchases AS
    SELECT user_id, amount, purchase_ts
    FROM user_activity
    WHERE amount > 500.0
""")

conn.start()

# ── Ingest correlated events ──
conn.insert("logins", [
    {"ts": 1, "user_id": "alice", "ip": "1.2.3.4"},
    {"ts": 2, "user_id": "bob",   "ip": "5.6.7.8"},
])
conn.insert("purchases", [
    {"ts": 3, "user_id": "alice", "amount": 29.99},
    {"ts": 4, "user_id": "bob",   "amount": 999.00},
    {"ts": 5, "user_id": "alice", "amount": 750.00},
])

import time

time.sleep(0.3)  # let the pipeline process

# ── Check results ──
print("=== User Activity ===")
conn.sql("SELECT * FROM user_activity").show()

print("\n=== Large Purchases ===")
conn.sql("SELECT * FROM large_purchases").show()

conn.close()

Real-time dashboard with async subscriptions

import asyncio
import laminardb

async def dashboard():
    async with laminardb.open(":memory:") as conn:
        conn.execute("CREATE SOURCE metrics (ts BIGINT, service VARCHAR, latency_ms DOUBLE)")
        conn.execute("""
            CREATE STREAM slow_requests AS
            SELECT * FROM metrics WHERE latency_ms > 500.0
        """)
        conn.start()

        # Subscribe to slow requests asynchronously
        sub = await conn.subscribe_stream_async("slow_requests")

        # Simulate ingestion in a background task
        async def ingest():
            import random
            for t in range(100):
                conn.insert("metrics", {
                    "ts": t,
                    "service": random.choice(["api", "web", "worker"]),
                    "latency_ms": random.expovariate(1 / 300),  # mean 300ms, so some will be > 500ms
                })
                await asyncio.sleep(0.01)

        # Keep a reference so the task is not garbage-collected mid-run
        ingest_task = asyncio.create_task(ingest())

        # Process alerts as they arrive
        count = 0
        async for batch in sub:
            for row in batch.fetchall():
                ts, service, latency = row
                print(f"  Slow request: {service} @ {latency:.0f}ms")
                count += 1
            if count >= 5:
                break

        sub.cancel()
        print(f"\nDetected {count} slow requests")

asyncio.run(dashboard())

Producer-consumer with background threads

import threading
import time
import laminardb

conn = laminardb.open(":memory:")
conn.execute("CREATE SOURCE events (ts BIGINT, type VARCHAR, payload DOUBLE)")
conn.start()

# ── Producer thread ──
def produce():
    with conn.writer("events") as w:
        for t in range(500):
            w.insert({"ts": t, "type": f"evt_{t % 3}", "payload": float(t)})
            if t % 50 == 49:
                w.watermark(t)
        print(f"Producer done: {w.current_watermark} final watermark")

# ── Consumer thread ──
consumed = 0

def consume():
    global consumed
    sub = conn.subscribe("SELECT * FROM events")
    deadline = time.time() + 5.0
    while time.time() < deadline:
        batch = sub.try_next()
        if batch:
            consumed += batch.num_rows
        else:
            time.sleep(0.01)  # avoid a busy loop while no data is ready
    sub.cancel()

producer = threading.Thread(target=produce)
consumer = threading.Thread(target=consume)

consumer.start()
producer.start()
producer.join()
time.sleep(0.5)
consumer.join()

print(f"Consumed {consumed} events")

# Check pipeline metrics
m = conn.metrics()
print(f"Pipeline: ingested={m.total_events_ingested}, emitted={m.total_events_emitted}")
conn.close()

Pipeline Control

# Start the streaming pipeline (required before subscriptions receive data)
conn.start()

# Graceful shutdown (drains in-flight events)
conn.shutdown()

# Trigger a checkpoint (returns checkpoint ID, or None if disabled)
checkpoint_id = conn.checkpoint()

# Cancel a specific running query (IDs are available from conn.queries())
conn.cancel_query(query_id)

# Check pipeline state
print(conn.pipeline_state)         # "running", "stopped", etc.
print(conn.is_checkpoint_enabled)  # True/False

Catalog Introspection

# List names
conn.list_tables()   # or conn.tables()     -> source names
conn.list_streams()  # or conn.materialized_views() -> stream names
conn.list_sinks()                            # -> sink names

# Get a source's schema
schema = conn.schema("sensors")  # -> pyarrow.Schema

# Detailed catalog info with typed objects
for source in conn.sources():
    print(source.name, source.schema, source.watermark_column)

for stream in conn.streams():
    print(stream.name, stream.sql)

for sink in conn.sinks():
    print(sink.name)

for query in conn.queries():
    print(query.id, query.sql, query.active)

Table statistics

stats = conn.stats("sensors")
print(stats["name"])             # "sensors"
print(stats["total_events"])     # total ingested events
print(stats["pending"])          # events in buffer
print(stats["capacity"])         # buffer capacity
print(stats["is_backpressured"]) # True if buffer full
print(stats["watermark"])        # current watermark
print(stats["utilization"])      # 0.0 - 1.0

Pipeline Metrics & Observability

Full visibility into every component of the streaming pipeline:

# ── Pipeline-wide metrics ──
m = conn.metrics()
print(m.total_events_ingested)   # total rows ingested across all sources
print(m.total_events_emitted)    # total rows emitted by streams
print(m.total_events_dropped)    # dropped events (backpressure, errors)
print(m.total_cycles)            # pipeline processing cycles
print(m.total_batches)           # batches processed
print(m.uptime_secs)             # pipeline uptime in seconds
print(m.state)                   # "running", "stopped", etc.
print(m.source_count)            # registered sources
print(m.stream_count)            # registered streams
print(m.sink_count)              # registered sinks
print(m.pipeline_watermark)      # global watermark (min across sources)

# ── Per-source metrics ──
for sm in conn.all_source_metrics():
    print(f"  {sm.name}: {sm.total_events} events, "
          f"{sm.utilization:.0%} buffer, wm={sm.watermark}")

# ── Per-stream metrics ──
for stm in conn.all_stream_metrics():
    print(f"  {stm.name}: {stm.total_events} emitted, "
          f"wm={stm.watermark}, sql={stm.sql}")

# ── Pipeline topology (DAG visualization) ──
topo = conn.topology()
for node in topo.nodes:
    print(f"  [{node.node_type}] {node.name}")
    if node.sql:
        print(f"    SQL: {node.sql}")
for edge in topo.edges:
    print(f"  {edge.from_node} --> {edge.to_node}")
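
The node and edge attributes above are enough to export the DAG for a graph viewer. The following sketch turns them into Graphviz DOT text. topology_to_dot is a hypothetical helper, the sample topology is built from SimpleNamespace stand-ins, and the node_type values "source" and "stream" are illustrative guesses.

```python
from types import SimpleNamespace


def topology_to_dot(topo):
    """Render nodes and edges as a Graphviz DOT digraph string."""
    lines = ["digraph pipeline {"]
    for node in topo.nodes:
        # "\\n" emits a literal backslash-n, which DOT treats as a line break.
        lines.append(f'  "{node.name}" [label="{node.name}\\n({node.node_type})"];')
    for edge in topo.edges:
        lines.append(f'  "{edge.from_node}" -> "{edge.to_node}";')
    lines.append("}")
    return "\n".join(lines)


# Stand-in topology; the real object comes from conn.topology().
topo = SimpleNamespace(
    nodes=[
        SimpleNamespace(name="sensors", node_type="source"),
        SimpleNamespace(name="avg_by_device", node_type="stream"),
    ],
    edges=[SimpleNamespace(from_node="sensors", to_node="avg_by_device")],
)
dot = topology_to_dot(topo)
print(dot)
```

The sketch does not escape double quotes inside names, so names containing them would need extra handling.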

Metrics Python wrapper

from laminardb import Metrics

metrics = Metrics(conn.metrics())
print(metrics.events_per_second)   # calculated throughput
print(metrics.uptime_secs)         # pipeline uptime
print(metrics.state)               # pipeline state string
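
How the wrapper calculates events_per_second is not shown here. As an illustration only, one common way to derive a rate is to compare two metric snapshots taken some time apart. Snapshot and ingest_rate below are hypothetical; the two field names are borrowed from the pipeline-wide metrics listed earlier.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Snapshot:
    """Hypothetical stand-in holding two of the pipeline-wide metric fields."""

    total_events_ingested: int
    uptime_secs: float


def ingest_rate(earlier, later):
    """Events per second between two snapshots; 0.0 if no time has passed."""
    elapsed = later.uptime_secs - earlier.uptime_secs
    if elapsed <= 0:
        return 0.0
    return (later.total_events_ingested - earlier.total_events_ingested) / elapsed


a = Snapshot(total_events_ingested=1_000, uptime_secs=10.0)
b = Snapshot(total_events_ingested=6_000, uptime_secs=12.5)
print(ingest_rate(a, b))  # 2000.0
```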

Error Handling

Structured exception hierarchy with numeric error codes:

import laminardb

conn = laminardb.open(":memory:")

try:
    conn.query("INVALID SQL")
except laminardb.QueryError as e:    # most specific handler first
    print(f"Query failed: {e}")
    print(f"Error code: {e.code}")
except laminardb.LaminarError as e:  # base class catches everything else
    print(f"LaminarDB error: {e}")

Exception hierarchy

| Exception | Raised when |
| --- | --- |
| LaminarError | Base class for all LaminarDB errors |
| ConnectionError | Connection cannot be established or is lost |
| QueryError | SQL query fails (syntax error, missing table, etc.) |
| IngestionError | Data insertion fails (type mismatch, invalid format) |
| SchemaError | Schema operation fails (table exists, invalid schema) |
| SubscriptionError | Subscription operation fails |
| StreamError | Stream / materialized view operation fails |
| CheckpointError | Checkpoint operation fails |
| ConnectorError | Connector operation fails |

Error codes

Every exception carries a numeric .code attribute. Constants are available via laminardb.codes:

from laminardb import codes

codes.CONNECTION_FAILED     # 100
codes.CONNECTION_CLOSED     # 101
codes.CONNECTION_IN_USE     # 102
codes.TABLE_NOT_FOUND       # 200
codes.TABLE_EXISTS          # 201
codes.SCHEMA_MISMATCH       # 202
codes.INVALID_SCHEMA        # 203
codes.INGESTION_FAILED      # 300
codes.WRITER_CLOSED         # 301
codes.BATCH_SCHEMA_MISMATCH # 302
codes.QUERY_FAILED          # 400
codes.SQL_PARSE_ERROR       # 401
codes.QUERY_CANCELLED       # 402
codes.SUBSCRIPTION_FAILED   # 500
codes.SUBSCRIPTION_CLOSED   # 501
codes.SUBSCRIPTION_TIMEOUT  # 502
codes.INTERNAL_ERROR        # 900
codes.SHUTDOWN              # 901
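
The values listed above appear to be grouped by hundreds (1xx connection, 2xx table/schema, and so on). That grouping is an inference from the list, not a documented guarantee. Under that assumption, a small helper can map any .code to a coarse family for logging or alerting. code_family is a hypothetical name.

```python
# Families inferred from the listed constants; not an official mapping.
_FAMILIES = {
    1: "connection",
    2: "schema",
    3: "ingestion",
    4: "query",
    5: "subscription",
    9: "internal",
}


def code_family(code):
    """Return a coarse family name for a numeric error code."""
    return _FAMILIES.get(code // 100, "unknown")


for code in (101, 202, 401, 502, 901, 700):
    print(code, code_family(code))
```

In an except block this could be called as code_family(e.code).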

Configuration

config = laminardb.LaminarConfig(
    buffer_size=65536,             # source buffer size (default: 65536)
    storage_dir="/tmp/laminar",    # storage directory (optional)
    checkpoint_interval_ms=10000,  # checkpoint interval in ms (optional)
)

conn = laminardb.open(":memory:", config=config)

laminardb.Config is an alias for laminardb.LaminarConfig.
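
Deployments often read such settings from the environment. The sketch below builds the keyword arguments shown above from a mapping such as os.environ. The LAMINAR_* variable names and config_kwargs_from_env are hypothetical; the library does not define them.

```python
def config_kwargs_from_env(env):
    """Build keyword arguments for LaminarConfig from a mapping of env vars."""
    # 65536 mirrors the default buffer size stated in the example above.
    kwargs = {"buffer_size": int(env.get("LAMINAR_BUFFER_SIZE", "65536"))}
    if "LAMINAR_STORAGE_DIR" in env:
        kwargs["storage_dir"] = env["LAMINAR_STORAGE_DIR"]
    if "LAMINAR_CHECKPOINT_INTERVAL_MS" in env:
        kwargs["checkpoint_interval_ms"] = int(env["LAMINAR_CHECKPOINT_INTERVAL_MS"])
    return kwargs


kwargs = config_kwargs_from_env({"LAMINAR_STORAGE_DIR": "/tmp/laminar"})
print(kwargs)
```

With the package installed, the result could be passed on as laminardb.LaminarConfig(**kwargs).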


Python Types

High-level Python wrappers in laminardb.types:

from laminardb import (
    Schema, Column, TableStats, Watermark,
    CheckpointStatus, Metrics, ChangeRow, ChangeEvent, MaterializedView,
)

| Type | Description |
| --- | --- |
| Column(name, type, nullable) | Frozen dataclass for column metadata |
| Schema | Wraps pyarrow.Schema with columns, names, indexing by name/position |
| TableStats(row_count, size_bytes) | Frozen dataclass |
| Watermark(current, lag_ms) | Frozen dataclass for watermark state |
| CheckpointStatus(checkpoint_id, enabled) | Frozen dataclass |
| Metrics | Wrapper with events_per_second, uptime_secs, state |
| ChangeRow | Dict-like row with .op, ["col"], .keys(), .to_dict() |
| ChangeEvent | Iterable batch of ChangeRows with .df(), .arrow(), .pl() |
| MaterializedView | Stream wrapper with .query(), .subscribe(handler=), .schema() |

Async Support

import asyncio
import laminardb

async def main():
    async with laminardb.open(":memory:") as conn:
        conn.execute("CREATE SOURCE events (id BIGINT, name VARCHAR)")
        conn.insert("events", [{"id": 1, "name": "click"}])

        sub = await conn.subscribe_async("SELECT * FROM events")
        async for batch in sub:
            print(batch.to_dicts())
            break
        sub.cancel()

asyncio.run(main())
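
An async for loop over a subscription waits indefinitely if no batch arrives. One way to bound that wait is asyncio.wait_for around a single step of the iterator. The sketch below uses an async generator as a stand-in for a subscription; first_batch and fake_subscription are hypothetical names.

```python
import asyncio


async def first_batch(batches, timeout):
    """Return the first item from an async iterator, or None on timeout."""
    iterator = batches.__aiter__()
    try:
        return await asyncio.wait_for(iterator.__anext__(), timeout)
    except (asyncio.TimeoutError, StopAsyncIteration):
        return None


async def fake_subscription(delay, items):
    """Hypothetical stand-in for an async subscription."""
    for item in items:
        await asyncio.sleep(delay)
        yield item


async def demo():
    fast = await first_batch(fake_subscription(0.01, ["batch-1"]), timeout=1.0)
    slow = await first_batch(fake_subscription(1.0, ["batch-1"]), timeout=0.05)
    return fast, slow


result = asyncio.run(demo())
print(result)
```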

Type Safety

LaminarDB ships with PEP 561 type stubs (py.typed marker). All public APIs are fully typed and pass mypy --strict:

mypy python/laminardb --strict

IDEs like VS Code and PyCharm provide full autocompletion out of the box.


Performance

| Feature | Details |
| --- | --- |
| Zero-copy export | Arrow PyCapsule interface (__arrow_c_stream__) avoids serialization |
| GIL release | Every blocking Rust call uses py.allow_threads() |
| Free-threaded Python | All types are Send + Sync, ready for Python 3.13t / 3.14t |
| Batch buffering | Streaming writer for efficient bulk inserts |
| LTO release builds | Link-time optimization + symbol stripping |

Benchmarks

python benchmarks/bench_ingestion.py   # ingestion throughput by format
python benchmarks/bench_query.py       # query + conversion throughput
python benchmarks/bench_streaming.py   # subscription iteration throughput

API Aliases (DuckDB Compatibility)

For users coming from DuckDB:

| DuckDB style | Equivalent |
| --- | --- |
| conn.sql(query) | conn.query(query) |
| conn.tables() | conn.list_tables() |
| conn.materialized_views() | conn.list_streams() |
| result.df() | result.to_pandas() |
| result.pl() | result.to_polars() |
| result.arrow() | result.to_arrow() |
| result.fetchall() | rows as list[tuple] |
| result.fetchone() | first row as tuple or None |
| result.fetchmany(n) | up to n rows as list[tuple] |
| laminardb.Config | laminardb.LaminarConfig |
| laminardb.BatchWriter | laminardb.Writer |

Contributing

Development setup

# Clone with the laminardb monorepo dependency
git clone https://github.com/laminardb/laminardb-python.git
git clone https://github.com/laminardb/laminardb.git  # sibling directory

# Build and install in development mode
cd laminardb-python
python -m venv .venv
source .venv/bin/activate   # or .venv\Scripts\activate on Windows
pip install maturin
maturin develop --extras dev

Running checks

# Tests
pytest tests/ -v

# Rust checks
cargo fmt --check
cargo clippy -- -D warnings

# Type checking
mypy python/laminardb --strict

Project structure

python/laminardb/          # Python package
  __init__.py              # Public API, module-level functions, aliases
  types.py                 # High-level Python types (Schema, MaterializedView, etc.)
  _laminardb.pyi           # PEP 561 type stubs
  py.typed                 # Type marker
src/                       # Rust source (PyO3 bindings)
  lib.rs                   # Module entry point, open()/connect()
  connection.rs            # Connection class (40+ methods)
  query.rs                 # QueryResult class
  conversion.rs            # Python <-> Arrow type conversion
  writer.rs                # Streaming writer with watermark support
  subscription.rs          # Sync subscription iterator
  async_support.rs         # Tokio runtime + async subscription
  stream_subscription.rs   # Named stream subscriptions (sync + async)
  execute.rs               # ExecuteResult for DDL/DML introspection
  error.rs                 # Exception hierarchy with error codes
  config.rs                # LaminarConfig
  catalog.rs               # Catalog info (SourceInfo, SinkInfo, StreamInfo, QueryInfo)
  metrics.rs               # Pipeline topology and metrics
tests/                     # pytest test suite (15 files, 233+ tests)
benchmarks/                # Performance benchmarks
examples/                  # Example scripts
docs/                      # Architecture and feature specs

License

MIT — see LICENSE for details.
