
A pure-Python implementation of DBSP, the database stream-processing paradigm built on signal-processing theory


PyDBSP

Introduction - (a subset of) Differential Dataflow for the masses

This library provides an implementation of the DBSP language for incremental streaming computations. It started as a research-first reference implementation — see it as the PyTorch of streaming — and has since grown into something usable: a schedule-driven evaluator with explicit per-operator dependencies, antichain-guided garbage collection, and per-layer parallelism that scales on free-threaded Python (3.14t).

It has zero dependencies and is written in pure Python.

Here you can find a single-notebook implementation of almost everything in the DBSP paper. It mirrors what is in this library in an accessible way, and with more examples.

What is DBSP?

DBSP is Differential Dataflow's less expressive successor: a theory and framework that competes with other stream processing systems such as Flink and Spark.

Its value is easiest to see in its ability to transform "batch", possibly-iterative relational queries into streaming incremental ones. That, however, conveys only a fraction of the theory's power.
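The "batch to incremental" transformation rests on a simple algebraic fact: join is bilinear over Z-sets, so the delta of a join can be computed from deltas plus the old state, without re-joining everything. A minimal self-contained sketch with plain weight dicts (conceptual only, not the pydbsp API):

```python
# Z-sets as plain dicts mapping tuples to integer weights. Illustrates the
# bilinear delta rule: delta(A join B) = dA join B + A join dB + dA join dB.

def join(a, b):
    """Join two Z-sets of (key, value) pairs on the key, multiplying weights."""
    out = {}
    for (k1, v1), w1 in a.items():
        for (k2, v2), w2 in b.items():
            if k1 == k2:
                t = (k1, v1, v2)
                out[t] = out.get(t, 0) + w1 * w2
    return {t: w for t, w in out.items() if w != 0}

def add(x, y):
    """Z-set addition: sum weights, drop zeros."""
    out = dict(x)
    for t, w in y.items():
        out[t] = out.get(t, 0) + w
    return {t: w for t, w in out.items() if w != 0}

# Current state and incoming deltas.
A = {(1, "a"): 1}
B = {(1, "x"): 1}
dA = {(2, "b"): 1}
dB = {(2, "y"): 1, (1, "x"): -1}   # weight -1 retracts (1, "x")

# Incremental delta of the join, touching only new data plus old state.
delta = add(add(join(dA, B), join(A, dB)), join(dA, dB))
full = join(add(A, dA), add(B, dB))
assert add(join(A, B), delta) == full  # same result as full recomputation
```

The full recomputation on the last line is there only to check the identity; an incremental system never performs it.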

As an extreme example, you can find an incremental interpreter for Datalog under pydbsp.algorithm. Datalog is a query language similar to SQL, with a focus on efficiently supporting recursion. By implementing Datalog interpretation with DBSP, we get an interpreter whose queries can both change at runtime and respond to new data being streamed in.

Examples

Python API

The default user-facing path owns outer ticks and materialization for you:

from pydbsp import Datalog, V, atom, fact, facts, rule, rules

X, Y, Z = V("X"), V("Y"), V("Z")
program = rules(
    rule(atom("tc", X, Y), atom("e", X, Y)),
    rule(atom("tc", X, Z), atom("tc", X, Y), atom("e", Y, Z)),
)

db = Datalog(indexed=True, parallelism=1)
db.step(
    facts=facts(fact("e", 0, 1), fact("e", 1, 2), fact("e", 2, 3)),
    program=program,
)

print(db.relation("tc").inner)
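Under the hood, the recursive rule above amounts to a semi-naive fixpoint: each round joins only the newly derived facts against the base relation. A self-contained sketch with plain sets (illustrative, not the library's Z-set machinery):

```python
# Semi-naive evaluation of the transitive-closure program above,
# written with plain sets to show the fixpoint the circuit computes.

e = {(0, 1), (1, 2), (2, 3)}

tc = set(e)      # tc(X, Y) :- e(X, Y).
delta = set(e)   # frontier: facts derived in the previous round
while delta:
    # tc(X, Z) :- tc(X, Y), e(Y, Z). -- join only the new facts
    new = {(x, z) for (x, y) in delta for (y2, z) in e if y == y2}
    delta = new - tc
    tc |= delta

print(sorted(tc))
# [(0, 1), (0, 2), (0, 3), (1, 2), (1, 3), (2, 3)]
```

Feeding more e facts later only re-runs the joins against the new frontier, which is exactly the work db.step saves you.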

For programs with stratified negation, use StratifiedDatalog — it runs on the 3-D (outer, stratum, inner) time lattice so that each stratum's fixpoint is delayed into its successor. Note that the 3-D path is currently unrefined: it is roughly an order of magnitude slower than 2-D Datalog on equivalent workloads, with parallel scaling capped near 1.4×. It is nonetheless the only path to full multi-stratum negation today; Datalog itself handles only the semipositive case:

from pydbsp import StratifiedDatalog, V, atom, fact, facts, not_, rule, rules

X = V("X")
program = rules(
    rule(atom("alive", X), atom("person", X), not_(atom("dead", X))),
)
db = StratifiedDatalog()
db.step(
    facts=facts(fact("person", "alice"), fact("person", "bob"), fact("dead", "bob")),
    program=program,
)
print(db.relation("alive").inner)  # {('alice',): 1}
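What stratification buys is that negation is only ever applied to a relation whose fixpoint is already complete. A hand-rolled two-stratum evaluation of the same program, using plain sets rather than the library API:

```python
# Stratum 0: person and dead are base facts; their fixpoint is trivial.
person = {("alice",), ("bob",)}
dead = {("bob",)}

# Stratum 1: alive(X) :- person(X), not dead(X).
# dead is frozen by the time this stratum runs, so "not dead(X)"
# is a well-defined set difference rather than a moving target.
alive = person - dead

print(alive)  # {('alice',)}
```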

Reachability and RDFS follow the same shape, all accepting parallelism=N — the schedule-driven evaluator dispatches each P-antichain layer over a thread pool, which scales on free-threaded Python (3.14t / PYTHON_GIL=0).
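The layer-wise dispatch can be sketched with the standard library alone: topologically group operators into antichain layers (no dependencies within a layer), then evaluate each layer over a thread pool. The operator names and the stub evaluation below are illustrative, not pydbsp internals:

```python
from concurrent.futures import ThreadPoolExecutor
from graphlib import TopologicalSorter

# operator -> set of operators it depends on (a toy circuit)
deps = {"join": {"src_a", "src_b"}, "distinct": {"join"}, "src_a": set(), "src_b": set()}

ts = TopologicalSorter(deps)
ts.prepare()
results = {}
with ThreadPoolExecutor(max_workers=4) as pool:
    while ts.is_active():
        layer = list(ts.get_ready())  # one antichain: mutually independent operators
        # evaluate every operator in the layer concurrently (stub "evaluation")
        for op, out in zip(layer, pool.map(lambda op: f"eval({op})", layer)):
            results[op] = out
        ts.done(*layer)

print(sorted(results))  # ['distinct', 'join', 'src_a', 'src_b']
```

On free-threaded Python the operators in a layer genuinely run in parallel; under the GIL they merely interleave, which is why the parallelism=N knob pays off on 3.14t.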

The lower-level circuit API remains available under pydbsp.algorithms.* for experiments that need direct access to inputs, feedback state, evaluators, and saturation drivers.

For non-iterative streaming relational queries, use the typed 2-D DBSP interface. It keeps DBSP operator names visible while hiding lattice and evaluator boilerplate:

from typing import NamedTuple

from pydbsp import (
    DeltaLiftedDeltaLiftedSortMergeJoin,
    DeltaLiftedDistinct,
    Program2D,
    Source,
)


class Order(NamedTuple):
    id: int
    customer: int
    total: float


class Customer(NamedTuple):
    id: int
    country: str


class EEOrder(NamedTuple):
    order_id: int
    total: float


p = Program2D(gc=True)
orders: Source[Order] = p.source("orders")
customers: Source[Customer] = p.source("customers")

ee_orders = DeltaLiftedDeltaLiftedSortMergeJoin(
    orders,
    customers,
    left_key=lambda o: o.customer,
    right_key=lambda c: c.id,
    projection=lambda o, _c: EEOrder(o.id, o.total),
)

view = p.view("ee_orders", DeltaLiftedDistinct(ee_orders))
p.step({orders: [Order(1, 7, 30.0)], customers: [Customer(7, "EE")]})

print(view.delta().inner)
print(view.materialized().inner)

Typed queries are compiled as logical plans when a view is registered. DeltaLiftedDistinct is normalized as a final result constraint: nested or intermediate distincts are removed and at most one physical DBSP distinct is emitted at the view root. This matches the DBSP distinct deduplication laws and avoids redundant distinct circuits.
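The normalization is sound because distinct over Z-sets is idempotent: applying it twice yields the same relation as applying it once. A sketch with plain weight dicts (not the library's ZSet type):

```python
def distinct(z):
    """Map every positively-weighted element to weight 1; drop the rest."""
    return {t: 1 for t, w in z.items() if w > 0}

z = {"a": 3, "b": 1, "c": -2}
once = distinct(z)
twice = distinct(distinct(z))

assert once == twice == {"a": 1, "b": 1}  # idempotence: one distinct suffices
```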

Aggregates use Z-set semantics directly. LiftedLiftedAggregate receives the cumulative ZSet for its input and returns a ZSet delta relation; LiftedLiftedGroupBySum and LiftedLiftedGroupByMax are convenience wrappers built on that primitive. If an aggregate wants set-input semantics, it should interpret/clamp weights inside the aggregate function explicitly.
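The weight-aware semantics can be sketched with plain dicts: a sum aggregate scales each value by its weight, so a retraction (negative weight) subtracts from the running total, while clamping weights first recovers set semantics. This is a conceptual sketch, not the library's LiftedLiftedGroupBySum:

```python
def group_by_sum(z, key, value):
    """Group a Z-set (tuple -> weight) and sum value * weight per key."""
    out = {}
    for t, w in z.items():
        k = key(t)
        out[k] = out.get(k, 0) + value(t) * w
    return {k: v for k, v in out.items() if v != 0}

# (customer, total) rows with weights; the -1 retracts an earlier order
orders = {(7, 30.0): 1, (7, 10.0): -1, (8, 5.0): 1}

sums = group_by_sum(orders, key=lambda t: t[0], value=lambda t: t[1])
print(sums)  # {7: 20.0, 8: 5.0}

# Set-input semantics: clamp weights to {0, 1} before aggregating
clamped = {t: min(max(w, 0), 1) for t, w in orders.items()}
print(group_by_sum(clamped, key=lambda t: t[0], value=lambda t: t[1]))  # {7: 30.0, 8: 5.0}
```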

Paper walkthroughs

Blogposts

Notebooks

Tests

There are many examples in the test/test_*.py files.
