
High-performance PostgreSQL logical replication (CDC) library with Rust backend


pgoutput-decoder


Rust-powered PostgreSQL CDC (Change Data Capture) library with Debezium-compatible output for Python 3.12+

Transform PostgreSQL changes into Debezium-format events with blazing-fast Rust performance and Python's async simplicity.



Features

⚡ Performance

  • Rust-Powered Core: Critical path implemented in Rust using tokio-postgres
  • Zero-Copy Decoding: Minimal allocations for high-throughput scenarios
  • Async/Await Native: Built on tokio and pyo3-asyncio for true async Python integration

🔄 Compatibility

  • Debezium Format: Drop-in compatible with Debezium CDC event format
  • pgoutput Plugin: Uses PostgreSQL's native logical replication protocol
  • Python 3.12+: Modern Python with full type hints

🛡️ Reliability

  • Auto-Reconnect: Exponential backoff for connection failures
  • Manual LSN Control: Optional manual acknowledgment for exactly-once processing
  • Type-Safe: Comprehensive PostgreSQL type support with proper conversions

👨‍💻 Developer Experience

  • Simple API: Pythonic async iteration over CDC events
  • Helper Functions: Ready-to-use utilities for common tasks
  • Testcontainers: Easy testing with ephemeral PostgreSQL instances
  • Comprehensive Examples: Real-world usage patterns included

Why pgoutput-decoder?

What is CDC (Change Data Capture)?

CDC captures changes (inserts, updates, deletes) from your database and streams them as events. This enables:

  • Real-time data synchronization
  • Event-driven architectures
  • Audit logging
  • Cache invalidation
  • Microservice data replication

Comparison with Alternatives

| Feature | pgoutput-decoder | psycopg2 | py-postgresql | Pure Python |
|---|---|---|---|---|
| Performance | 🟢 Native Rust | 🟡 C Extension | 🟡 C Extension | 🔴 Pure Python |
| Async Support | 🟢 Native async | 🔴 Sync only | 🟡 Limited | 🟢 asyncio |
| Debezium Format | 🟢 Built-in | 🔴 Manual | 🔴 Manual | 🔴 Manual |
| Type Safety | 🟢 Full | 🟡 Partial | 🟡 Partial | 🟡 Partial |
| Auto-reconnect | 🟢 Yes | 🔴 No | 🔴 No | 🔴 No |
| Python 3.12+ | 🟢 Optimized | 🟡 Supported | 🟡 Supported | 🟡 Supported |

When to Use pgoutput-decoder

✅ Good fit when you need:

  • Real-time change streaming from PostgreSQL
  • Debezium-compatible event format
  • High-performance async Python CDC
  • Simple, batteries-included solution
  • Python 3.12+ modern features

❌ Consider alternatives if:

  • You need Python < 3.12 support
  • You're already using Debezium/Kafka Connect
  • You only need occasional polling (triggers might be simpler)
  • Your use case doesn't require sub-second latency

Installation

From PyPI (Recommended)

# Using uv (recommended)
uv pip install pgoutput-decoder

# Or using pip
pip install pgoutput-decoder

From Source

Requires Rust 1.70+ and Python 3.12+:

git clone https://github.com/yourusername/pgoutput-decoder
cd pgoutput-decoder

# Using uv (recommended)
uv sync
uv run maturin develop

# Or using pip
pip install maturin
maturin develop

Quick Start

Prerequisites

Before running this example, ensure:

  1. PostgreSQL 12+ with wal_level = logical (see PostgreSQL Setup)
  2. A publication and replication slot created
  3. User has REPLICATION privilege

Basic Example

import asyncio
import pgoutput_decoder

async def main():
    # Create replication reader
    cdc_reader = pgoutput_decoder.LogicalReplicationReader(
        publication_name="test_pub",
        slot_name="test_slot",
        host="localhost",
        database="mydb",
        port=5432,
        user="postgres",
        password="password",
    )
    
    # Consume replication messages (Debezium-compatible format)
    async for message in cdc_reader:
        if message.op == "c":  # INSERT
            print(f"New row: {message.after}")
        elif message.op == "u":  # UPDATE
            print(f"Updated from {message.before} to {message.after}")
        elif message.op == "d":  # DELETE
            print(f"Deleted row: {message.before}")
        
        # Access source metadata
        print(f"Table: {message.source['schema']}.{message.source['table']}")
        print(f"LSN: {message.source['lsn']}")
    
    # Stop when done
    await cdc_reader.stop()

if __name__ == "__main__":
    asyncio.run(main())

Expected Output

# When you INSERT a row:
New row: {'id': 1, 'name': 'Alice', 'email': 'alice@example.com'}
Table: public.users
LSN: 0/1234ABC

# When you UPDATE a row:
Updated from {'id': 1, 'name': 'Alice'} to {'id': 1, 'name': 'Alice Smith'}
Table: public.users
LSN: 0/1234ABD

# When you DELETE a row:
Deleted row: {'id': 1, 'name': 'Alice Smith'}
Table: public.users
LSN: 0/1234ABE

More Examples

Use Cases

1. Real-Time Data Synchronization

Keep secondary databases, search indexes, or caches in sync:

async for message in cdc_reader:
    if message.op == "c" or message.op == "u":
        # Update Elasticsearch index
        await es_client.index(
            index=message.source['table'],
            id=message.after['id'],
            document=message.after
        )
    elif message.op == "d":
        # Remove from index
        await es_client.delete(
            index=message.source['table'],
            id=message.before['id']
        )

2. Event-Driven Microservices

Publish database changes to message queues:

from pgoutput_decoder import message_to_debezium_json

async for message in cdc_reader:
    # Publish to Kafka, RabbitMQ, etc.
    await kafka_producer.send(
        topic=f"db.{message.source['table']}",
        value=message_to_debezium_json(message)
    )

3. Audit Logging

Track all data changes with full history:

async for message in cdc_reader:
    audit_entry = {
        "timestamp": message.ts_ms,
        "operation": message.op,
        "table": f"{message.source['schema']}.{message.source['table']}",
        "before": message.before,
        "after": message.after,
        "lsn": message.source['lsn']
    }
    await audit_log.write(audit_entry)
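
The `ts_ms` field is an epoch timestamp in milliseconds (see Message Format), so audit entries are often easier to read once it is converted to a UTC datetime. A minimal sketch of that conversion follows; `ts_ms_to_utc` is a hypothetical helper name chosen for illustration and is not part of the library.

```python
from datetime import datetime, timezone


def ts_ms_to_utc(ts_ms: int) -> datetime:
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)


# The sample ts_ms value shown in the Message Format section
print(ts_ms_to_utc(1705315800000).isoformat())  # -> 2024-01-15T10:50:00+00:00
```

Storing the converted value alongside the raw `ts_ms` keeps the audit log both human-readable and exact.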

4. Cache Invalidation

Invalidate caches when data changes:

async for message in cdc_reader:
    # DELETE events have after=None, so fall back to the old row
    row = message.after or message.before
    cache_key = f"{message.source['table']}:{row.get('id')}"
    await redis.delete(cache_key)
    logger.info(f"Invalidated cache: {cache_key}")

PostgreSQL Setup

Step-by-Step Configuration

1. Enable Logical Replication

Edit postgresql.conf and restart PostgreSQL:

wal_level = logical
max_replication_slots = 10
max_wal_senders = 10

# On Linux
sudo systemctl restart postgresql

# On macOS (Homebrew)
brew services restart postgresql

# Verify settings
psql -c "SHOW wal_level;"  # Should output: logical

2. Set Replica Identity (Critical!)

For UPDATE/DELETE operations to include old values:

-- For specific tables (recommended)
ALTER TABLE users REPLICA IDENTITY FULL;
ALTER TABLE orders REPLICA IDENTITY FULL;

-- Or for all tables in schema (use cautiously)
DO $$
DECLARE
    r RECORD;
BEGIN
    FOR r IN SELECT tablename FROM pg_tables WHERE schemaname = 'public'
    LOOP
        EXECUTE 'ALTER TABLE ' || quote_ident(r.tablename) || ' REPLICA IDENTITY FULL';
    END LOOP;
END$$;

⚠️ Warning: REPLICA IDENTITY FULL increases WAL size. Only apply to tables where you need old values in UPDATE/DELETE events.

3. Create Publication

-- Create a publication for specific tables
CREATE PUBLICATION my_pub FOR TABLE users, orders, products;

-- Or for all tables
CREATE PUBLICATION my_pub FOR ALL TABLES;

-- Verify
SELECT * FROM pg_publication;

4. Create Replication Slot

-- Create a logical replication slot using pgoutput
SELECT pg_create_logical_replication_slot('my_slot', 'pgoutput');

-- Verify
SELECT * FROM pg_replication_slots;

5. Grant Permissions

-- Grant replication permission to your user
ALTER USER myuser WITH REPLICATION;

-- Grant SELECT on published tables
GRANT SELECT ON ALL TABLES IN SCHEMA public TO myuser;
GRANT SELECT ON ALL SEQUENCES IN SCHEMA public TO myuser;

Quick Start with Docker Compose

For local development and testing:

# docker-compose.yml
version: '3.8'

services:
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_PASSWORD: password
      POSTGRES_DB: testdb
    ports:
      - "5432:5432"
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
      - "-c"
      - "max_replication_slots=10"
      - "-c"
      - "max_wal_senders=10"
    volumes:
      - ./examples/setup_postgres.sql:/docker-entrypoint-initdb.d/init.sql

# Start PostgreSQL
docker-compose up -d

# Run setup script
docker-compose exec postgres psql -U postgres -d testdb -f /docker-entrypoint-initdb.d/init.sql

# Test connection
python examples/example_debezium.py

Verify Configuration

-- Check WAL level
SHOW wal_level;  -- Must be 'logical'

-- Check replication slots
SELECT slot_name, slot_type, active FROM pg_replication_slots;

-- Check publications
SELECT pubname, puballtables FROM pg_publication;

-- Monitor replication lag
SELECT slot_name, 
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag
FROM pg_replication_slots;

Message Format

What is Debezium?

Debezium is a popular open-source CDC platform. This library produces events in Debezium's format, making it compatible with existing Debezium-based pipelines and tools.

Message Structure

Messages follow the Debezium-compatible format with before/after states:

{
  "op": "c",                    # Operation: "c" (create/INSERT), "u" (update), "d" (delete)
  "before": None,               # Previous row state (UPDATE/DELETE only)
  "after": {                    # New row state (INSERT/UPDATE only)
    "id": 1,
    "name": "Alice",
    "email": "alice@example.com",
    "created_at": "2024-01-15 10:30:00"
  },
  "source": {                   # Source metadata
    "version": "0.1.0",
    "connector": "pgoutput-decoder",
    "name": "pgoutput-decoder",
    "ts_ms": 1705315800000,
    "snapshot": "false",
    "db": "mydb",
    "schema": "public",
    "table": "users",
    "lsn": 123456789
  },
  "ts_ms": 1705315800000,       # Timestamp in milliseconds
  "ts_us": 1705315800000000,    # Timestamp in microseconds (optional)
  "ts_ns": 1705315800000000000  # Timestamp in nanoseconds (optional)
}
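
PostgreSQL conventionally prints an LSN as two hexadecimal numbers separated by a slash (the high and low 32 bits of the WAL position), which is the form shown in Expected Output, while the structure above shows `lsn` as an integer. The sketch below converts between the two, assuming `source['lsn']` holds the 64-bit position as an integer; `format_lsn` and `parse_lsn` are hypothetical helper names, not library functions.

```python
def format_lsn(lsn: int) -> str:
    """Render a 64-bit LSN in PostgreSQL's 'high/low' hexadecimal text form."""
    return f"{lsn >> 32:X}/{lsn & 0xFFFFFFFF:X}"


def parse_lsn(text: str) -> int:
    """Parse PostgreSQL's 'high/low' hexadecimal text form into an integer."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


print(format_lsn(123456789))   # sample integer from the structure above
print(parse_lsn("0/1234ABC"))  # sample text form from Expected Output
```

Comparing LSNs as integers avoids the pitfalls of comparing the text form lexicographically.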

Visual Operation Flow

DATABASE CHANGE          →    CDC EVENT           →    YOUR APPLICATION
─────────────────             ─────────────            ─────────────────
INSERT INTO users...     →    op: "c"             →    Create in cache
                              after: {new data}   →    Index in search
                              before: None             Send notification

UPDATE users SET...      →    op: "u"             →    Update cache
                              after: {new data}   →    Reindex search
                              before: {old data}       Audit the change

DELETE FROM users...     →    op: "d"             →    Remove from cache
                              after: None         →    Delete from search
                              before: {old data}       Log deletion

Operation Types

  • "c" (Create): INSERT operations - after contains new row, before is None
  • "u" (Update): UPDATE operations - after contains new values, before contains old values (requires REPLICA IDENTITY FULL)
  • "d" (Delete): DELETE operations - before contains deleted row, after is None
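
When both row images are present on an update (which requires REPLICA IDENTITY FULL, as noted above), the changed columns can be derived by comparing them. The following is a minimal sketch; `changed_columns` is a hypothetical helper rather than a library function, and it assumes `before` and `after` are plain dicts as in the examples in this section.

```python
from typing import Optional


def changed_columns(before: Optional[dict], after: Optional[dict]) -> dict:
    """Map each changed column to its (old, new) pair for an UPDATE event.

    Columns missing from `before` are ignored, since without
    REPLICA IDENTITY FULL the old image may only hold key columns.
    """
    if before is None or after is None:
        return {}  # INSERT or DELETE: nothing to compare
    return {
        column: (before[column], new_value)
        for column, new_value in after.items()
        if column in before and before[column] != new_value
    }


diff = changed_columns(
    {"id": 1, "name": "Alice"},
    {"id": 1, "name": "Alice Updated"},
)
print(diff)  # -> {'name': ('Alice', 'Alice Updated')}
```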

Message Examples

INSERT:

message.op == "c"
message.after == {"id": 1, "name": "Alice"}
message.before == None

UPDATE:

message.op == "u"
message.after == {"id": 1, "name": "Alice Updated"}
message.before == {"id": 1, "name": "Alice"}

DELETE:

message.op == "d"
message.before == {"id": 1, "name": "Alice"}
message.after == None

Examples

All examples are in the examples/ directory:

| Example | Description | Complexity |
|---|---|---|
| setup_postgres.sql | PostgreSQL setup script with e-commerce schema | ⭐ |
| basic_cdc.py | Simple CDC monitoring with helper functions | ⭐⭐ |
| example_debezium.py | Debezium format demo with auto/manual acknowledgment | ⭐⭐ |

Running Examples

# 1. Setup PostgreSQL (see PostgreSQL Setup section)

# 2. Run basic CDC example
python examples/basic_cdc.py

# 3. Try Debezium format demo
python examples/example_debezium.py

Advanced Usage

Manual LSN Acknowledgment

By default, LSNs are acknowledged automatically after each message is processed (auto_acknowledge=True). For more control over acknowledgment (e.g., batch processing, transactional guarantees), you can disable auto-acknowledgment:

# Disable auto-acknowledgment for manual control
cdc_reader = pgoutput_decoder.LogicalReplicationReader(
    publication_name="test_pub",
    slot_name="test_slot",
    host="localhost",
    database="mydb",
    port=5432,
    user="postgres",
    password="password",
    auto_acknowledge=False,  # Manual LSN control
)

async for message in cdc_reader:
    try:
        # Process message...
        await process_message(message)
        
        # Manually acknowledge after successful processing
        await cdc_reader.acknowledge()
    except Exception as e:
        print(f"Failed to process message: {e}")
        # Don't acknowledge - will retry from this LSN on restart
        break

When to use manual acknowledgment:

  • Batch processing: Acknowledge after processing N messages
  • Transactional guarantees: Acknowledge only after committing to database
  • Error handling: Skip acknowledgment on failure to replay messages
  • Exactly-once processing: Coordinate acknowledgment with external systems (unacknowledged messages are replayed after a restart, so downstream writes should be idempotent)

Helper Functions

The library provides several helper functions for working with CDC messages:

message_to_debezium_json(message, indent=2)

Convert a message to JSON string in Debezium format. This function is implemented in Rust for high performance.

from pgoutput_decoder import message_to_debezium_json

# Pretty-printed JSON with 2-space indentation (default)
json_str = message_to_debezium_json(message, indent=2)
print(json_str)

# Custom indentation (4 spaces)
json_str = message_to_debezium_json(message, indent=4)

# Compact JSON (no indentation)
json_str = message_to_debezium_json(message, indent=None)

message_to_dict(message)

Convert a message to a Python dictionary:

from pgoutput_decoder import message_to_dict

msg_dict = message_to_dict(message)
# Returns: {"op": "c", "before": None, "after": {...}, "source": {...}, ...}

format_operation(op)

Convert operation codes to human-readable format:

from pgoutput_decoder import format_operation

op_name = format_operation("c")  # Returns: "INSERT"
op_name = format_operation("u")  # Returns: "UPDATE"
op_name = format_operation("d")  # Returns: "DELETE"

get_table_name(message)

Extract fully-qualified table name from a message:

from pgoutput_decoder import get_table_name

table = get_table_name(message)  # Returns: "public.customers"

Filtering by Table

async for message in cdc_reader:
    if message.source["table"] == "users":
        # Process only user table changes
        process_user_change(message)

Error Handling

try:
    async for message in cdc_reader:
        process_message(message)
except Exception as e:
    print(f"Replication error: {e}")
    await cdc_reader.stop()

Manual Slot Management

The library requires you to manually create replication slots for safety. This prevents accidental slot creation that could lead to disk space issues if not properly monitored.

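# Create slot using psycopg2 or asyncpg before starting replication
import asyncio
import asyncpg

async def create_slot() -> None:
    conn = await asyncpg.connect("postgresql://localhost/mydb")
    try:
        await conn.execute(
            "SELECT pg_create_logical_replication_slot('my_slot', 'pgoutput')"
        )
    finally:
        await conn.close()  # Always release the connection

asyncio.run(create_slot())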

Supported PostgreSQL Types

| PostgreSQL Type | Python Type |
|---|---|
| bool | bool |
| int2, int4, int8 | int |
| float4, float8 | float |
| numeric, decimal | float or str |
| text, varchar, char | str |
| bytea | bytes |
| json, jsonb | dict or list |
| uuid | str |
| date, time, timestamp, timestamptz | str (ISO 8601) |
| Arrays | list |
| Composite types | dict |
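
Because numeric values arrive as float or str and timestamps arrive as strings, applications that need exact decimals or datetime objects have to convert them after decoding. The sketch below shows one way to do that with per-column converters. The column names `price` and `created_at` and the helper `coerce_row` are illustrative assumptions, not part of the library.

```python
from datetime import datetime
from decimal import Decimal

# Hypothetical per-column converters; adjust to your own schema.
CONVERTERS = {
    # numeric arrives as float or str; go through str to avoid binary noise
    "price": lambda value: Decimal(str(value)),
    # timestamps arrive as ISO 8601 strings
    "created_at": datetime.fromisoformat,
}


def coerce_row(row: dict, converters: dict = CONVERTERS) -> dict:
    """Return a copy of row with the listed columns converted, leaving NULLs as None."""
    result = dict(row)
    for column, convert in converters.items():
        if result.get(column) is not None:
            result[column] = convert(result[column])
    return result


print(coerce_row({"id": 1, "price": "19.99", "created_at": "2024-01-15 10:30:00"}))
```

Keeping the converters in one mapping makes the schema assumptions explicit and easy to update when columns change.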

Performance

Benchmarks

(Benchmarks coming soon)

Performance Characteristics

  • Throughput: Designed for high-volume streams (1000s of messages/sec)
  • Latency: Sub-millisecond message processing overhead
  • Memory: ~2-5 MB base overhead + message buffer
  • CPU: Minimal Python GIL impact due to Rust core

Optimization Tips

# Batch acknowledgments for higher throughput (requires auto_acknowledge=False)
messages_batch = []
async for message in cdc_reader:
    messages_batch.append(message)
    
    if len(messages_batch) >= 100:
        await process_batch(messages_batch)
        await cdc_reader.acknowledge()  # Acknowledge batch
        messages_batch.clear()
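
If the loop can exit (for example after stop() is called), a final partial batch would otherwise be left unprocessed. The sketch below adds that flush and can be run without a database: `StubReader` is a stand-in written for this example, and it assumes, as in the snippet above, that `acknowledge()` takes no arguments and covers everything delivered so far.

```python
import asyncio


class StubReader:
    """Stand-in for a CDC reader: yields fixed items and counts acknowledgments."""

    def __init__(self, items):
        self._items = list(items)
        self.ack_count = 0

    async def __aiter__(self):
        for item in self._items:
            yield item

    async def acknowledge(self):
        self.ack_count += 1


async def consume_in_batches(reader, process_batch, batch_size=100):
    """Process messages in batches, acknowledging once per processed batch."""
    batch = []
    async for message in reader:
        batch.append(message)
        if len(batch) >= batch_size:
            await process_batch(batch)
            await reader.acknowledge()
            batch = []
    if batch:  # flush the final partial batch when the stream ends
        await process_batch(batch)
        await reader.acknowledge()


async def demo():
    reader = StubReader(range(250))
    sizes = []

    async def record(batch):
        sizes.append(len(batch))

    await consume_in_batches(reader, record, batch_size=100)
    return sizes, reader.ack_count


if __name__ == "__main__":
    print(asyncio.run(demo()))  # -> ([100, 100, 50], 3)
```

Acknowledging only after `process_batch` returns means a crash mid-batch replays the whole batch, so the batch handler should tolerate duplicates.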

FAQ

What is CDC and why do I need it?

Change Data Capture (CDC) is a design pattern that captures and streams database changes in real-time. Unlike polling, CDC:

  • ✅ Has minimal database impact (uses WAL, not queries)
  • ✅ Captures all changes in order
  • ✅ Provides sub-second latency
  • ✅ Doesn't miss changes between polls

How is this different from database triggers?

| Feature | CDC (pgoutput-decoder) | Triggers |
|---|---|---|
| Performance | No query overhead | Runs on every DML |
| Decoupling | External consumer | Tightly coupled |
| Reliability | Durable WAL | Transaction-dependent |
| Replay | Can replay from LSN | No replay capability |
| Schema changes | Handles gracefully | Requires trigger updates |

Can I use this in production?

Yes, but consider:

  • ✅ Monitor replication slots to prevent WAL bloat
  • ✅ Set up alerting for replication lag
  • ✅ Test failover/recovery scenarios
  • ✅ Use manual acknowledgment for critical workloads
  • ⚠️ This library is in active development (v0.1.x)

How do I handle schema changes?

Schema changes are captured in the WAL but may require application updates:

async for message in cdc_reader:
    try:
        # Your processing logic
        process_message(message)
    except KeyError as e:
        # Handle missing columns in old messages
        logger.warning(f"Schema mismatch: {e}")
    except Exception as e:
        # Handle unexpected data types
        logger.error(f"Processing error: {e}")

What happens if my consumer crashes?

The replication slot preserves your position (LSN):

  • ✅ WAL data is retained from your last acknowledged LSN
  • ✅ On restart, you resume from where you left off
  • ⚠️ Un-acknowledged messages will be replayed
  • ⚠️ Monitor slot lag to prevent WAL disk space issues
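
Because un-acknowledged messages are replayed, a consumer may see the same event twice after a restart. One common way to cope is to remember the highest LSN already applied and skip anything at or below it. The sketch below illustrates the idea under a loudly stated assumption: that `source['lsn']` is an integer that increases with every event. `LsnTracker` and `apply_once` are hypothetical names, and a real consumer would persist `last_applied` together with its own writes.

```python
class LsnTracker:
    """Remember the highest LSN applied so replayed events can be skipped."""

    def __init__(self, last_applied: int = 0) -> None:
        self.last_applied = last_applied

    def is_replay(self, lsn: int) -> bool:
        """True if an event at this LSN was already applied."""
        return lsn <= self.last_applied

    def mark_applied(self, lsn: int) -> None:
        """Record that the event at this LSN has been applied."""
        self.last_applied = max(self.last_applied, lsn)


def apply_once(tracker: LsnTracker, lsns) -> list:
    """Return the LSNs that would actually be applied, skipping replays."""
    applied = []
    for lsn in lsns:
        if tracker.is_replay(lsn):
            continue
        applied.append(lsn)  # stand-in for the real side effect
        tracker.mark_applied(lsn)
    return applied


print(apply_once(LsnTracker(), [100, 200, 200, 150, 300]))  # -> [100, 200, 300]
```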

How do I monitor replication lag?

-- Check replication lag
SELECT 
    slot_name,
    active,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag_size,
    pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) as lag_bytes
FROM pg_replication_slots
WHERE slot_name = 'your_slot';

Set up monitoring alerts when lag_bytes exceeds an acceptable threshold (e.g., more than 1 GB).
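
The threshold check itself is simple once the query results are in Python. A minimal sketch, assuming each row has been fetched as a dict with the `slot_name` and `lag_bytes` columns from the query above; `slots_over_threshold` is a hypothetical helper name and the 1 GiB default is only an example value.

```python
def slots_over_threshold(rows, max_lag_bytes=1024**3):
    """Return the names of slots whose lag_bytes exceeds the threshold.

    Rows with a NULL lag (no restart_lsn yet) are skipped.
    """
    return [
        row["slot_name"]
        for row in rows
        if row["lag_bytes"] is not None and row["lag_bytes"] > max_lag_bytes
    ]


sample_rows = [
    {"slot_name": "my_slot", "lag_bytes": 5 * 1024**2},
    {"slot_name": "stale_slot", "lag_bytes": 3 * 1024**3},
    {"slot_name": "new_slot", "lag_bytes": None},
]
print(slots_over_threshold(sample_rows))  # -> ['stale_slot']
```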

Testing

Running Tests Locally

Tests use Testcontainers to spin up ephemeral PostgreSQL instances:

# Ensure Docker is running
docker ps

# Run all tests
uv run pytest tests/ -v

# Run specific test file
uv run pytest tests/test_ecommerce_comprehensive.py -v

# Run with coverage
uv run pytest tests/ --cov=pgoutput_decoder --cov-report=html

Test Structure

tests/
├── test_ecommerce_comprehensive.py  # E2E tests with realistic schema
├── test_acknowledgement.py          # LSN acknowledgment tests
├── test_json_serialization.py       # Debezium format validation
└── test_types.py                    # PostgreSQL type conversion

All tests use PostgreSQL 18.1 via Testcontainers and follow the patterns in AGENTS.md.

Security

Principle of Least Privilege

Grant only necessary permissions:

-- Create dedicated replication user
CREATE USER cdc_user WITH REPLICATION PASSWORD 'secure_password';

-- Grant only SELECT on published tables
GRANT SELECT ON TABLE users, orders, products TO cdc_user;

-- Do NOT grant: INSERT, UPDATE, DELETE, or superuser

Connection Security

# Use environment variables, never hardcode credentials
import os

cdc_reader = pgoutput_decoder.LogicalReplicationReader(
    publication_name="my_pub",
    slot_name="my_slot",
    host=os.getenv("PG_HOST", "localhost"),
    database=os.getenv("PG_DATABASE"),
    port=int(os.getenv("PG_PORT", "5432")),
    user=os.getenv("PG_USER"),
    password=os.getenv("PG_PASSWORD"),
)
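
With `os.getenv`, a missing variable silently becomes `None` and only fails later at connection time. The sketch below fails fast instead. `load_pg_settings` is a hypothetical helper, and treating `PG_DATABASE`, `PG_USER`, and `PG_PASSWORD` as required is an assumption for illustration.

```python
import os

# Variables without a sensible default (an assumption for this sketch)
REQUIRED = ("PG_DATABASE", "PG_USER", "PG_PASSWORD")


def load_pg_settings(env=os.environ) -> dict:
    """Build connection keyword arguments, raising if required variables are unset."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing environment variables: {', '.join(missing)}")
    return {
        "host": env.get("PG_HOST", "localhost"),
        "port": int(env.get("PG_PORT", "5432")),
        "database": env["PG_DATABASE"],
        "user": env["PG_USER"],
        "password": env["PG_PASSWORD"],
    }
```

The returned keys match the keyword arguments used in the snippet above, so the result could be passed as `**load_pg_settings()` next to `publication_name` and `slot_name`.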

SSL/TLS

# Enable SSL (implementation depends on tokio-postgres configuration)
# Currently supported via connection parameters
# See: https://www.postgresql.org/docs/current/libpq-ssl.html

Audit Logging

Monitor replication activity:

# Enable connection logging in postgresql.conf
log_connections = on
log_disconnections = on

-- Check active replication connections
SELECT * FROM pg_stat_replication;

Troubleshooting

"replication slot does not exist"

Create the replication slot manually:

SELECT pg_create_logical_replication_slot('your_slot', 'pgoutput');

"must be superuser or replication role"

Grant replication permission:

ALTER USER your_user WITH REPLICATION;

Slot bloating disk space

Monitor and drop unused slots:

-- Check slot lag
SELECT slot_name, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag
FROM pg_replication_slots;

-- Drop unused slot
SELECT pg_drop_replication_slot('unused_slot');

"permission denied for table"

Grant SELECT permission:

GRANT SELECT ON ALL TABLES IN SCHEMA public TO your_user;

Connection keeps dropping

Check your firewall and network settings. Auto-reconnect is enabled by default, so transient connection failures are retried with exponential backoff.
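
For intuition about what exponential backoff means in practice, the sketch below computes a capped retry schedule. The base, factor, and cap values are illustrative assumptions; the library's actual schedule is not documented here, and `backoff_delays` is a hypothetical helper.

```python
def backoff_delays(base=0.5, factor=2.0, cap=30.0, attempts=8):
    """Return capped exponential delays (in seconds) for successive retries."""
    return [min(cap, base * factor**attempt) for attempt in range(attempts)]


print(backoff_delays())  # -> [0.5, 1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

If the connection drops more often than such a schedule can absorb, the cause is more likely an idle timeout on a firewall or proxy than the client itself.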

Missing "before" values in UPDATE/DELETE

Set REPLICA IDENTITY FULL:

ALTER TABLE your_table REPLICA IDENTITY FULL;

Debugging Tips

import logging

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)

# Check PostgreSQL logs
# tail -f /var/log/postgresql/postgresql-16-main.log

Development

Prerequisites

  • Rust: 1.70+ (from Cargo.toml)
  • Python: 3.12+ only
  • PostgreSQL: 12+ (with logical replication support)
  • Docker: For running tests
  • uv: Python package manager (recommended)

Setup Development Environment

# Clone repository
git clone https://github.com/yourusername/pgoutput-decoder
cd pgoutput-decoder

# Install uv (if not already installed)
curl -LsSf https://astral.sh/uv/install.sh | sh

# Sync dependencies
uv sync

# Build Rust extension in development mode
uv run maturin develop

# Run tests
uv run pytest tests/ -v

Development Workflow

Per AGENTS.md:

# Lint Python code
uv run ruff check .
uv run ruff format .

# Lint Rust code
cargo fmt --all -- --check
cargo clippy --all-targets --all-features

# Run tests
uv run pytest tests/ -v

# Build release
uv run maturin build --release

Code Coverage

The project supports both Python and Rust code coverage:

# Python coverage only (skips Docker tests)
just coverage

# Rust coverage only (skips Docker tests, requires cargo-llvm-cov)
just install-llvm-cov  # One-time installation
just coverage-rust

# Combined Python + Rust coverage (skips Docker tests)
just coverage-all

# Include Docker tests (requires Docker running)
just coverage-docker              # Python only with Docker
just coverage-rust-docker         # Rust with Docker
just coverage-all-docker          # Both with Docker

Local Development: By default, coverage commands skip Docker-dependent tests for faster iteration. Use the -docker variants when you need complete coverage including integration tests.

GitHub Actions: CI automatically generates and uploads both Python and Rust coverage to Codecov:

  • Python coverage: Measures python/pgoutput_decoder/ code
  • Rust coverage: Measures src/ code exercised by Python tests
  • Flags: Separate python and rust flags for tracking

View coverage reports at: https://codecov.io/gh/yourusername/pgoutput-decoder

Project Structure

pgoutput-decoder/
├── src/                  # Rust source code
│   ├── lib.rs           # PyO3 module definitions
│   ├── pgoutput/        # pgoutput decoder implementation
│   └── replication.rs   # Replication connection logic
├── python/              # Python source code
│   └── pgoutput_decoder/
│       ├── __init__.py  # Python API
│       └── core.py      # Helper functions
├── tests/               # Test suite (uses testcontainers)
├── examples/            # Example scripts
├── Cargo.toml           # Rust dependencies
├── pyproject.toml       # Python metadata & build config
└── README.md            # This file

Contributing

Contributions welcome! Please:

  1. Fork the repository
  2. Create a feature branch
  3. Add tests for new functionality
  4. Ensure cargo fmt, cargo clippy, and ruff pass
  5. Submit a pull request

See CONTRIBUTING.md for detailed guidelines (if available).

Version Compatibility

Supported Versions

| Component | Version | Status |
|---|---|---|
| Python | 3.12+ | ✅ Required |
| PostgreSQL | 12+ | ✅ Tested |
| PostgreSQL | 13-16 | ✅ Tested |
| Rust | 1.70+ | ✅ Required |
| PyO3 | 0.20 | ✅ Current |

Python Version Support

This library requires Python 3.12 or later and uses:

  • Modern type hints
  • async/await patterns
  • PyO3 0.20 with abi3-py312

Why Python 3.12+?

  • Better performance
  • Improved async capabilities
  • Modern standard library features
  • Rust binding compatibility

PostgreSQL Version Testing

Tested with:

  • PostgreSQL 12 (minimum)
  • PostgreSQL 13, 14, 15, 16 (CI tested)
  • PostgreSQL 18.1-alpine (testcontainer default)

Architecture

┌─────────────────────────────────────────────┐
│           Python Application                │
│                                             │
│  async for message in cdc_reader:           │
│      process(message)                       │
└──────────────────┬──────────────────────────┘
                   │ Python asyncio
                   │
┌──────────────────▼──────────────────────────┐
│         PyO3 Bridge (Rust ↔ Python)         │
│                                             │
│  • pyo3-asyncio (event loop integration)    │
│  • Type conversion (Rust → Python)          │
└──────────────────┬──────────────────────────┘
                   │
┌──────────────────▼──────────────────────────┐
│      Rust Core (tokio-postgres)             │
│                                             │
│  • Replication connection                   │
│  • pgoutput binary decoder                  │
│  • Auto-reconnect with backoff              │
│  • Type conversion (PG → Rust)              │
└──────────────────┬──────────────────────────┘
                   │ PostgreSQL Protocol
                   │
┌──────────────────▼──────────────────────────┐
│          PostgreSQL Server                  │
│                                             │
│  • WAL stream via replication protocol      │
│  • pgoutput plugin                          │
└─────────────────────────────────────────────┘


License

MIT License - see LICENSE file for details.

Credits

Built with:

  • PyO3 - Rust ↔ Python bindings
  • tokio-postgres - PostgreSQL async client
  • maturin - Build tool for Rust Python extensions
  • Debezium - Inspiration for message format

Inspired by and compatible with the Debezium CDC ecosystem.


💝 Support

If you find this project useful, please ⭐ star the repository on GitHub!


Built with ❤️ using Rust and Python
