A pure-Python implementation of DBSP, the stream processing paradigm from database signal processing theory
Project description
PyDBSP
Introduction - (a subset of) Differential Dataflow for the masses
This library provides an implementation of the DBSP language for incremental streaming computations. It started as a research-first reference implementation — see it as the PyTorch of streaming — and has since grown into something usable: a schedule-driven evaluator with explicit per-operator dependencies, antichain-guided garbage collection, and per-layer parallelism that scales on free-threaded Python (3.14t).
It has zero dependencies and is written in pure Python.
Here you can find a single-notebook implementation of almost everything in the DBSP paper. It mirrors what is in this library in an accessible way, and with more examples.
What is DBSP?
DBSP is differential dataflow's less expressive successor, and a competing theory and framework to stream processing systems such as Flink and Spark.
Its value is easiest to see in its ability to transform "batch", possibly-iterative relational queries into streaming incremental ones. That, however, conveys only a fraction of the theory's power.
As an extreme example, you can find an incremental Datalog interpreter under pydbsp.algorithms. Datalog is a query language similar to SQL, with a focus on efficient support for recursion. By implementing Datalog interpretation with DBSP, we get an interpreter whose queries can both change at runtime and respond to new data being streamed in.
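DBSP computations operate on Z-sets: collections whose elements carry integer weights, where a positive weight inserts and a negative weight retracts, so a streamed delta is itself just another Z-set. The following is a minimal, library-independent sketch of that idea using plain dicts rather than pydbsp's own ZSet type:

```python
from collections import defaultdict

def zset_add(a, b):
    """Pointwise sum of two Z-sets, dropping zero-weight entries."""
    out = defaultdict(int)
    for z in (a, b):
        for elem, weight in z.items():
            out[elem] += weight
    return {e: w for e, w in out.items() if w != 0}

# A batch relation: edges currently in the database.
edges = {("a", "b"): 1, ("b", "c"): 1}
# A streamed delta: retract ("a", "b"), insert ("c", "d").
delta = {("a", "b"): -1, ("c", "d"): 1}

print(zset_add(edges, delta))  # {('b', 'c'): 1, ('c', 'd'): 1}
```

Applying a delta is pointwise weight addition, which is what makes incremental evaluation compositional: every operator consumes and produces Z-sets.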
Examples
Python API
The default user-facing path owns outer ticks and materialization for you:
from pydbsp import Datalog, V, atom, fact, facts, rule, rules

X, Y, Z = V("X"), V("Y"), V("Z")
program = rules(
    rule(atom("tc", X, Y), atom("e", X, Y)),
    rule(atom("tc", X, Z), atom("tc", X, Y), atom("e", Y, Z)),
)
db = Datalog(indexed=True, parallelism=1)
db.step(
    facts=facts(fact("e", 0, 1), fact("e", 1, 2), fact("e", 2, 3)),
    program=program,
)
print(db.relation("tc").inner)
For programs with stratified negation, use StratifiedDatalog. It runs on the 3-D (outer, stratum, inner) time lattice, so each stratum's fixpoint is delayed into its successor. Note that the 3-D path is currently unrefined: it is roughly an order of magnitude slower than 2-D Datalog on equivalent workloads, with parallel scaling capped near 1.4×. It is nonetheless the only path to full multi-stratum negation today, since Datalog handles only the semipositive case, so expect a performance cliff:
from pydbsp import StratifiedDatalog, V, atom, fact, facts, not_, rule, rules

X = V("X")
program = rules(
    rule(atom("alive", X), atom("person", X), not_(atom("dead", X))),
)
db = StratifiedDatalog()
db.step(
    facts=facts(fact("person", "alice"), fact("person", "bob"), fact("dead", "bob")),
    program=program,
)
print(db.relation("alive").inner)  # {('alice',): 1}
Reachability and RDFS follow the same shape, and all accept parallelism=N: the schedule-driven evaluator dispatches each P-antichain layer over a thread pool, which scales on free-threaded Python (3.14t / PYTHON_GIL=0).
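A rough sketch of that scheduling idea, with invented names rather than the pydbsp internals: operators within one antichain layer have no dependencies among themselves, so each layer can be mapped over a thread pool while the layers themselves run in dependency order.

```python
from concurrent.futures import ThreadPoolExecutor

def run_layers(layers, parallelism=4):
    """Run each layer's operators concurrently; layers run in order."""
    results = []
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        for layer in layers:
            # Operators in the same layer are independent of one another,
            # so they can be dispatched together; map preserves order.
            results.extend(pool.map(lambda op: op(), layer))
    return results

layers = [
    [lambda: "scan_orders", lambda: "scan_customers"],  # layer 0: sources
    [lambda: "join"],  # layer 1: depends on everything in layer 0
]
print(run_layers(layers))  # ['scan_orders', 'scan_customers', 'join']
```

On a free-threaded build the workers in a layer can run truly in parallel; under the GIL this pattern still overlaps I/O-bound operators but not CPU-bound ones.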
The lower-level circuit API remains available under pydbsp.algorithms.* for experiments that need direct access to inputs, feedback state, evaluators, and saturation drivers.
For non-iterative streaming relational queries, use the typed 2-D DBSP interface. It keeps DBSP operator names visible while hiding lattice and evaluator boilerplate:
from typing import NamedTuple

from pydbsp import (
    DeltaLiftedDeltaLiftedSortMergeJoin,
    DeltaLiftedDistinct,
    Program2D,
    Source,
)

class Order(NamedTuple):
    id: int
    customer: int
    total: float

class Customer(NamedTuple):
    id: int
    country: str

class EEOrder(NamedTuple):
    order_id: int
    total: float

p = Program2D(gc=True)
orders: Source[Order] = p.source("orders")
customers: Source[Customer] = p.source("customers")
ee_orders = DeltaLiftedDeltaLiftedSortMergeJoin(
    orders,
    customers,
    left_key=lambda o: o.customer,
    right_key=lambda c: c.id,
    projection=lambda o, _c: EEOrder(o.id, o.total),
)
view = p.view("ee_orders", DeltaLiftedDistinct(ee_orders))
p.step({orders: [Order(1, 7, 30.0)], customers: [Customer(7, "EE")]})
print(view.delta().inner)
print(view.materialized().inner)
Typed queries are compiled as logical plans when a view is registered.
DeltaLiftedDistinct is normalized as a final result constraint: nested
or intermediate distincts are removed and at most one physical DBSP
distinct is emitted at the view root. This matches the DBSP distinct
deduplication laws and avoids redundant distinct circuits.
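A minimal illustration of why that normalization is sound, using plain dicts rather than pydbsp's types: distinct on a Z-set clamps positive weights to 1 and drops the rest, which makes it idempotent, so nested distincts compute nothing new.

```python
def distinct(z):
    """Z-set distinct: keep positively-weighted elements with weight 1."""
    return {elem: 1 for elem, weight in z.items() if weight > 0}

z = {("alice",): 3, ("bob",): 1, ("carol",): -2}
once = distinct(z)
print(once)  # {('alice',): 1, ('bob',): 1}

# Idempotence: a second (nested or intermediate) distinct is redundant,
# which is why at most one physical distinct at the view root suffices.
assert distinct(once) == once
```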
Aggregates use Z-set semantics directly. LiftedLiftedAggregate receives
the cumulative ZSet for its input and returns a ZSet delta relation;
LiftedLiftedGroupBySum and LiftedLiftedGroupByMax are convenience
wrappers built on that primitive. If an aggregate wants set-input
semantics, it should interpret/clamp weights inside the aggregate
function explicitly.
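A library-independent sketch of that weight-aware aggregation, again with plain dicts standing in for pydbsp's ZSet: each tuple contributes value times weight, so a retraction (negative weight) subtracts from the running sum automatically, with no special casing in the aggregate.

```python
from collections import defaultdict

def group_by_sum(zset, key, value):
    """Sum value(row) * weight per group over a Z-set of rows."""
    sums = defaultdict(float)
    for row, weight in zset.items():
        sums[key(row)] += value(row) * weight
    return dict(sums)

# (order_id, customer, total) -> weight; order 2 is being retracted.
delta = {(1, 7, 30.0): 1, (2, 7, 12.0): -1, (3, 9, 5.0): 1}
totals = group_by_sum(delta, key=lambda r: r[1], value=lambda r: r[2])
print(totals)  # {7: 18.0, 9: 5.0}
```

An aggregate that wants set-input semantics would instead clamp each weight to at most 1 before multiplying, as the paragraph above suggests.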
Paper walkthroughs
Blogposts
Notebooks
- Quickstart — the 90% path in two cells
- Progress tracking, visually — lattice, antichain, frontier, hyperplane, GC
- Hyperplane parallelism on free-threaded Python
- Graph Reachability
- Datalog Interpretation
- Stratified Datalog Interpretation
- Not-interpreted Datalog
- Streaming Pandas
- SQL Operators in DBSP
- Pure-MLX GPU backend experiment
Tests
There are many examples in the test/test_*.py files.
File details
Details for the file pydbsp-1.0.0.tar.gz.
File metadata
- Download URL: pydbsp-1.0.0.tar.gz
- Upload date:
- Size: 356.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.5
File hashes
| Algorithm | Hash digest |
| --- | --- |
| SHA256 | 6f4bb5081288528ca22140cb3f557fa52f3d4e34564b5fa2aa69e153442a9606 |
| MD5 | 01ac95934796b65c3bc5b184bd260901 |
| BLAKE2b-256 | 41abb41fc37d3f93b235ff2304ccce84850798836126e62cdf0bc11ec35eec08 |
File details
Details for the file pydbsp-1.0.0-py3-none-any.whl.
File metadata
- Download URL: pydbsp-1.0.0-py3-none-any.whl
- Upload date:
- Size: 65.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.5
File hashes
| Algorithm | Hash digest |
| --- | --- |
| SHA256 | e3f4435bd1dc4c0d463740b53cc892050270dfff8653a522c3618eb8a44719ad |
| MD5 | 6d152eace984a5fdddf7f3aa67191a24 |
| BLAKE2b-256 | a43416882b700cde1a6eaec3719b60e51723ad298398127e85eca3aea8197b03 |