
Python SDK for Postgres CDC: watch tables via logical replication and sync changes to vector stores (pgvector, Qdrant).


pgstream

License: MIT

A Python SDK that watches Postgres tables via logical replication (CDC) and syncs row changes to vector stores in real time.

pip install pgstream

What it does

pgstream connects to your Postgres database using the logical replication protocol (the same protocol Debezium and pglogical use). Whenever you INSERT, UPDATE, or DELETE a row in a watched table, pgstream decodes the WAL change into a structured ChangeEvent Python object and calls your async handler with it.

Your handler is where the application logic lives — typically:

  • Embed the new row content with your model of choice
  • Upsert the resulting vector into a vector store (pgvector, Qdrant, etc.)
  • Delete vectors for deleted rows

pgstream handles all the low-level plumbing: slot management, keepalive ACKs, at-least-once delivery, graceful shutdown.


Quick start

import asyncio
from pgstream import PGStream, ChangeEvent
from pgstream.sinks import QdrantSink

stream = PGStream(dsn="postgresql://user:pass@localhost/db")
stream.watch("documents")
stream.sink(QdrantSink(url="http://localhost:6333", collection_name="docs"))

@stream.on_change
async def handle(event: ChangeEvent, sink):
    if event.operation in ("insert", "update"):
        vector = await my_embed_function(event.row["content"])
        await sink.upsert(
            id=event.row["id"],
            vector=vector,
            payload={"content": event.row["content"]},
        )
    elif event.operation == "delete":
        await sink.delete(event.row["id"])

async def main():
    await stream.setup()   # idempotent — creates slot + publication once
    await stream.start()   # blocks; press Ctrl+C to stop

asyncio.run(main())
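
The quick start calls my_embed_function without defining it. For trying the pipeline locally before wiring in a real model, a deterministic stand-in may be enough. The sketch below is an assumption and not part of pgstream: it hashes the text into a fixed-length unit vector, which carries no semantic meaning but lets the insert, update, and delete paths run end to end. EMBED_DIM is a hypothetical value and would need to match the vector size of your collection.

```python
import hashlib
import math

EMBED_DIM = 8  # hypothetical; must match your collection's vector size


async def my_embed_function(text: str) -> list[float]:
    """Deterministic placeholder embedding (NOT semantically meaningful)."""
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    # Map the first EMBED_DIM bytes to floats in [-1, 1].
    raw = [(b - 127.5) / 127.5 for b in digest[:EMBED_DIM]]
    # Normalise to unit length so cosine and dot-product distances agree.
    norm = math.sqrt(sum(x * x for x in raw))
    return [x / norm for x in raw]
```

Replace it with a call to a real embedding model before relying on search quality.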

Installation

# Core only (replication + event decoding)
pip install pgstream

# With pgvector sink
pip install "pgstream[pgvector]"

# With Qdrant sink
pip install "pgstream[qdrant]"

# Both
pip install "pgstream[all]"

Requirements:

  • Python 3.13+
  • Postgres 10+ with wal_level = logical
  • The user in the DSN must have the REPLICATION privilege

Architecture

┌────────────────────────────────────────────────────────────────────┐
│  Postgres                                                          │
│                                                                    │
│  WAL  ──►  pgoutput plugin  ──►  replication slot  ──►  client    │
└────────────────────────────────────────────────────────────────────┘
                                                          │
                                        psycopg2 replication protocol
                                                          │
┌─────────────────────────────────────────────────────────┼──────────┐
│  pgstream                                               │          │
│                                                         ▼          │
│  ┌──────────────────────────┐    ┌────────────────────────────┐   │
│  │  ReplicationStream       │    │  PgOutputDecoder           │   │
│  │  (background thread)     │───►│  (binary wire format       │   │
│  │                          │    │   parser, pure Python)     │   │
│  │  · psycopg2 blocking     │    └────────────┬───────────────┘   │
│  │    replication loop      │                 │ ChangeEvent        │
│  │  · read_message()        │                 ▼                    │
│  │  · send_feedback (ACK)   │    ┌────────────────────────────┐   │
│  │  · keepalive every 10s   │    │  on_change handler         │   │
│  └──────────────────────────┘    │  (user's async function)   │   │
│            │                     │                            │   │
│  asyncio.run_coroutine_          │  runs in main event loop   │   │
│  threadsafe()  ─────────────────►│  via run_coroutine_        │   │
│                                  │  threadsafe()              │   │
│                                  └────────────┬───────────────┘   │
│                                               │                    │
│                                               ▼                    │
│                                  ┌────────────────────────────┐   │
│                                  │  Sink                      │   │
│                                  │  (PgVectorSink /           │   │
│                                  │   QdrantSink / custom)     │   │
└──────────────────────────────────────────────────────────────────┘

Two connections, two purposes

Connection               Library             Purpose
Replication connection   psycopg2            Streams WAL bytes using the logical replication protocol
Normal query connection  asyncpg (in sinks)  Regular SQL: CREATE PUBLICATION, slot management, vector writes

Why two libraries? asyncpg does not implement the logical replication protocol — it is a query client only. psycopg2 is the only Python library with full replication support (LogicalReplicationConnection, start_replication(), read_message(), send_feedback()).

Threading model

The psycopg2 replication loop is blocking. Running it directly in the asyncio event loop would stall all other coroutines. pgstream runs it in a background daemon thread. When an event is decoded, asyncio.run_coroutine_threadsafe() submits the user's async handler to the main event loop and blocks the thread until it completes before ACKing the LSN. This preserves at-least-once delivery.

[background thread]             [main event loop]
        │                               │
  decode WAL bytes                      │
        │                               │
  run_coroutine_threadsafe ────────────►│  await handler(event, sink)
        │                               │        │
  future.result()  ◄───────────────────┤  return │
  (blocks thread)                       │
        │
  send_feedback (ACK)
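
The bridge in the diagram above can be reproduced with the standard library alone. The following is a minimal sketch, not pgstream's actual code: replication_thread stands in for the blocking psycopg2 loop, the handler is a trivial coroutine, and appending to acked marks the point where the real loop would call send_feedback(). All function names here are illustrative.

```python
import asyncio
import threading


async def handler(event: str, seen: list[str]) -> None:
    # Stand-in for the user's async on_change handler.
    await asyncio.sleep(0)
    seen.append(event)


def replication_thread(loop, events, seen, acked) -> None:
    # Stand-in for the blocking replication loop running off the event loop.
    for event in events:
        future = asyncio.run_coroutine_threadsafe(handler(event, seen), loop)
        future.result()      # block this thread until the handler finishes
        acked.append(event)  # only now would the real loop ACK the LSN


async def run_bridge(events: list[str]) -> tuple[list[str], list[str]]:
    seen: list[str] = []
    acked: list[str] = []
    loop = asyncio.get_running_loop()
    worker = threading.Thread(
        target=replication_thread, args=(loop, events, seen, acked), daemon=True
    )
    worker.start()
    # Wait for the worker without blocking the event loop it depends on.
    await asyncio.to_thread(worker.join)
    return seen, acked
```

If the handler raises, future.result() re-raises in the worker thread, so the line that records the ACK is never reached for that event.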

Module breakdown

File               Responsibility
events.py          ChangeEvent dataclass: the single object flowing through the pipeline
decoder.py         PgOutputDecoder: pure Python binary parser for the pgoutput protocol
replication.py     SlotManager (setup/teardown) + ReplicationStream (streaming loop)
stream.py          PGStream: top-level user API, threading bridge, lifecycle management
sinks/base.py      Sink abstract base class
sinks/pgvector.py  PgVectorSink: asyncpg-based pgvector reference implementation
sinks/qdrant.py    QdrantSink: qdrant-client-based Qdrant reference implementation

The ChangeEvent object

@dataclass
class ChangeEvent:
    operation:   Literal["insert", "update", "delete", "truncate"]
    schema:      str                          # e.g. "public"
    table:       str                          # e.g. "documents"
    row:         dict[str, str | None]        # new row (text-encoded values)
    old_row:     dict[str, str | None] | None # old row (only with REPLICA IDENTITY FULL)
    lsn:         str                          # WAL position, e.g. "0/1A3F28"
    commit_time: datetime                     # UTC datetime of the transaction
    xid:         int                          # Postgres transaction ID

Column values are always strings (or None for SQL NULL). pgoutput sends all column data text-encoded, and pgstream does not coerce types: event.row["price"] is "9.99", not 9.99. Cast in your handler.


Delivery guarantee

pgstream provides at-least-once delivery:

  • Each LSN is ACKed (via send_feedback()) only after your handler returns successfully.
  • If your handler raises, the LSN is not ACKed. On the next restart, Postgres replays from the last confirmed position.
  • This means your handler may be called more than once for the same event if it fails, or if the process crashes, before the ACK is sent. Design your sink writes to be idempotent (both PgVectorSink and QdrantSink use upsert semantics by default).

Running the example

# Postgres must have wal_level = logical
export PGSTREAM_DSN=postgresql://user:pass@localhost:5432/db

uv run python examples/basic_watch.py

In a second terminal:

INSERT INTO documents (content) VALUES ('Hello, pgstream!');
UPDATE documents SET content = 'Updated' WHERE id = 1;
DELETE FROM documents WHERE id = 1;

Running tests

# Unit tests only (no DB required)
uv run pytest tests/test_decoder.py -v

# Integration tests (requires Postgres with wal_level = logical)
export PGSTREAM_DSN=postgresql://user:pass@localhost:5432/db
uv run pytest tests/ -v

# Skip integration tests
uv run pytest tests/ -v -m "not integration"

Implementing a custom Sink

from pgstream.sinks import Sink

class MyPineconeSink(Sink):
    async def upsert(self, id: str, vector: list[float], payload: dict | None = None) -> None:
        # write to Pinecone, Weaviate, Milvus, etc.
        ...

    async def delete(self, id: str) -> None:
        ...

    async def close(self) -> None:
        # optional: release HTTP sessions, pools, etc.
        ...

Postgres setup

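-- 1. Enable logical replication: set wal_level = logical in postgresql.conf,
--    or run the statement below as a superuser. Either way, restart Postgres afterwards.
ALTER SYSTEM SET wal_level = logical;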

-- 2. Grant replication privilege to your user
ALTER USER myuser REPLICATION;

-- 3. pgstream handles the rest (CREATE PUBLICATION, slot creation) via setup()

Key decisions and what I learned

Why psycopg2 for replication?

asyncpg is widely used for async Postgres in Python, but it only implements the regular query protocol. The logical replication protocol requires a separate connection type (LogicalReplicationConnection) and special commands (start_replication, read_message, send_feedback). Only psycopg2 exposes these in Python. psycopg3 does not yet have a stable public API for replication. This was a surprising discovery that shaped the entire architecture.

Why a background thread?

The psycopg2 replication loop is fundamentally blocking — select() and read_message() block the calling thread. Running this in the asyncio event loop would freeze all coroutines. The solution is a daemon thread for the replication loop, with asyncio.run_coroutine_threadsafe() to bridge back to the event loop for the user's handler. The thread blocks on future.result() until the handler completes, preserving the at-least-once delivery guarantee.

Why text encoding?

pgoutput protocol v1 sends column values as text strings (e.g. "42" for an integer column). pgstream deliberately does not coerce types — doing so would require knowing the Postgres OID → Python type mapping, which is complex (timezone-aware datetimes, Decimals, custom enum types, arrays, JSONB...). Keeping values as strings keeps the library simple and lets the user decide on casting.

The Relation message cache

Before any INSERT/UPDATE/DELETE, Postgres sends a Relation (R) message with the table's OID and column schema. This must be cached — DML messages only contain the OID, not column names. If the cache is missing an OID (edge case: consumer joined mid-stream), we emit a warning and skip the event rather than crashing. The cache is per-connection (OIDs are session-stable).

ACK timing

send_feedback(flush_lsn=...) tells Postgres "I have processed everything up to this LSN, you can discard WAL". We ACK after the handler returns, never before. This is the crucial point for at-least-once delivery. We also send keepalive feedback every 10 seconds even when idle — Postgres will kill the replication connection after wal_sender_timeout (default 60s) if it hears nothing.
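
The ordering can be made concrete with a single step of a replication loop. The sketch below is an assumption about structure, not pgstream's implementation: cur is any object with psycopg2-style read_message() and send_feedback(flush_lsn=...), state is a plain dict invented for this example, and the clock is injectable so the timing can be tested. Whether psycopg2 transmits the feedback immediately may depend on the library version.

```python
import time


def replication_step(cur, handle, state, keepalive_interval=10.0, clock=time.monotonic):
    """Process at most one message; return True if a message was handled.

    `state` holds 'last_lsn' (last confirmed position) and 'last_feedback'
    (clock value at the last feedback call).
    """
    msg = cur.read_message()  # returns None when nothing is pending
    now = clock()
    if msg is not None:
        handle(msg.payload)  # may raise: in that case no ACK is sent below
        state["last_lsn"] = msg.data_start
        cur.send_feedback(flush_lsn=state["last_lsn"])
        state["last_feedback"] = now
        return True
    if now - state["last_feedback"] >= keepalive_interval:
        # Idle keepalive: repeat the last confirmed position, never a newer one.
        cur.send_feedback(flush_lsn=state["last_lsn"])
        state["last_feedback"] = now
    return False
```

Because the keepalive only repeats state["last_lsn"], it keeps the connection alive without confirming anything the handler has not finished.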

Idempotent setup

setup() checks pg_replication_slots and pg_publication before creating anything. It is safe to call on every startup. This simplifies deployment — you don't need a migration step to create the slot; just call setup() at boot.
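
A check-then-create routine along these lines could look like the sketch below. It is not the body of setup(): the function name and return value are hypothetical, and conn is assumed to be an asyncpg-style connection offering fetchval() and execute(). The publication and table names are interpolated into the DDL because identifiers cannot be bound as parameters, so they must come from trusted configuration.

```python
async def ensure_slot_and_publication(conn, slot: str, publication: str, table: str) -> list[str]:
    """Create the slot and publication only if missing; return what was created."""
    created: list[str] = []

    has_slot = await conn.fetchval(
        "SELECT 1 FROM pg_replication_slots WHERE slot_name = $1", slot
    )
    if not has_slot:
        # pgoutput is the built-in logical decoding plugin named in this README.
        await conn.execute(
            "SELECT pg_create_logical_replication_slot($1, 'pgoutput')", slot
        )
        created.append("slot")

    has_pub = await conn.fetchval(
        "SELECT 1 FROM pg_publication WHERE pubname = $1", publication
    )
    if not has_pub:
        # Trusted identifiers only: these are interpolated, not bound.
        await conn.execute(f'CREATE PUBLICATION "{publication}" FOR TABLE "{table}"')
        created.append("publication")

    return created
```

Calling it twice in a row creates both objects the first time and nothing the second time, which is the property that makes it safe to run at every boot.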
