High-performance PostgreSQL logical replication (CDC) library with Rust backend
pgoutput-decoder
Rust-powered PostgreSQL CDC (Change Data Capture) library with Debezium-compatible output for Python 3.12+
Transform PostgreSQL changes into Debezium-format events with blazing-fast Rust performance and Python's async simplicity.
Table of Contents
- Features
- Why pgoutput-decoder?
- Installation
- Quick Start
- Use Cases
- PostgreSQL Setup
- Message Format
- Examples
- Advanced Usage
- Supported Types
- Performance
- FAQ
- Testing
- Security
- Troubleshooting
- Development
- Version Compatibility
- Architecture
- Contributing
- License
Features
Performance
- Rust-Powered Core: Critical path implemented in Rust using tokio-postgres
- Zero-Copy Decoding: Minimal allocations for high-throughput scenarios
- Async/Await Native: Built on tokio and pyo3-asyncio for true async Python integration
Compatibility
- Debezium Format: Drop-in compatible with Debezium CDC event format
- pgoutput Plugin: Uses PostgreSQL's native logical replication protocol
- Python 3.12+: Modern Python with full type hints
Reliability
- Auto-Reconnect: Exponential backoff for connection failures
- Manual LSN Control: Optional manual acknowledgment for at-least-once processing (exactly-once effects additionally need an idempotent or transactional consumer)
- Type-Safe: Comprehensive PostgreSQL type support with proper conversions
Developer Experience
- Simple API: Pythonic async iteration over CDC events
- Helper Functions: Ready-to-use utilities for common tasks
- Testcontainers: Easy testing with ephemeral PostgreSQL instances
- Comprehensive Examples: Real-world usage patterns included
Why pgoutput-decoder?
What is CDC (Change Data Capture)?
CDC captures changes (inserts, updates, deletes) from your database and streams them as events. This enables:
- Real-time data synchronization
- Event-driven architectures
- Audit logging
- Cache invalidation
- Microservice data replication
Comparison with Alternatives
| Feature | pgoutput-decoder | psycopg2 | py-postgresql | Pure Python |
|---|---|---|---|---|
| Performance | 🟢 Native Rust | 🟡 C Extension | 🟡 C Extension | 🔴 Pure Python |
| Async Support | 🟢 Native async | 🔴 Sync only | 🟡 Limited | 🟢 asyncio |
| Debezium Format | 🟢 Built-in | 🔴 Manual | 🔴 Manual | 🔴 Manual |
| Type Safety | 🟢 Full | 🟡 Partial | 🟡 Partial | 🟡 Partial |
| Auto-reconnect | 🟢 Yes | 🔴 No | 🔴 No | 🔴 No |
| Python 3.12+ | 🟢 Optimized | 🟡 Supported | 🟡 Supported | 🟡 Supported |
When to Use pgoutput-decoder
Good fit when you need:
- Real-time change streaming from PostgreSQL
- Debezium-compatible event format
- High-performance async Python CDC
- Simple, batteries-included solution
- Python 3.12+ modern features
Consider alternatives if:
- You need Python < 3.12 support
- You're already using Debezium/Kafka Connect
- You only need occasional polling (triggers might be simpler)
- Your use case doesn't require sub-second latency
Installation
From PyPI (Recommended)
```bash
# Using uv (recommended)
uv pip install pgoutput-decoder

# Or using pip
pip install pgoutput-decoder
```
From Source
Requires Rust 1.70+ and Python 3.12+:
```bash
git clone https://github.com/yourusername/pgoutput-decoder
cd pgoutput-decoder

# Using uv (recommended)
uv sync
uv run maturin develop

# Or using pip
pip install maturin
maturin develop
```
Quick Start
Prerequisites
Before running this example, ensure:
- PostgreSQL 12+ with wal_level = logical (see PostgreSQL Setup)
- A publication and replication slot created
- A user with the REPLICATION privilege
Basic Example
```python
import asyncio

import pgoutput_decoder


async def main():
    # Create replication reader
    cdc_reader = pgoutput_decoder.LogicalReplicationReader(
        publication_name="test_pub",
        slot_name="test_slot",
        host="localhost",
        database="mydb",
        port=5432,
        user="postgres",
        password="password",
    )

    try:
        # Consume replication messages (Debezium-compatible format)
        async for message in cdc_reader:
            if message.op == "c":  # INSERT
                print(f"New row: {message.after}")
            elif message.op == "u":  # UPDATE
                print(f"Updated from {message.before} to {message.after}")
            elif message.op == "d":  # DELETE
                print(f"Deleted row: {message.before}")

            # Access source metadata
            print(f"Table: {message.source['schema']}.{message.source['table']}")
            print(f"LSN: {message.source['lsn']}")
    finally:
        # Stop the reader when the loop ends or is interrupted
        await cdc_reader.stop()


if __name__ == "__main__":
    asyncio.run(main())
```
Expected Output
```text
# When you INSERT a row:
New row: {'id': 1, 'name': 'Alice', 'email': 'alice@example.com'}
Table: public.users
LSN: 0/1234ABC

# When you UPDATE a row:
Updated from {'id': 1, 'name': 'Alice'} to {'id': 1, 'name': 'Alice Smith'}
Table: public.users
LSN: 0/1234ABD

# When you DELETE a row:
Deleted row: {'id': 1, 'name': 'Alice Smith'}
Table: public.users
LSN: 0/1234ABE
```
More Examples
- Basic CDC Usage - Simple monitoring with helper functions
- Debezium Format Demo - Working with Debezium-compatible messages
- Manual Acknowledgment - Exactly-once processing patterns
Use Cases
1. Real-Time Data Synchronization
Keep secondary databases, search indexes, or caches in sync:
```python
async for message in cdc_reader:
    if message.op in ("c", "u"):
        # Update Elasticsearch index
        await es_client.index(
            index=message.source['table'],
            id=message.after['id'],
            document=message.after,
        )
    elif message.op == "d":
        # Remove from index
        await es_client.delete(
            index=message.source['table'],
            id=message.before['id'],
        )
```
2. Event-Driven Microservices
Publish database changes to message queues:
```python
from pgoutput_decoder import message_to_debezium_json

async for message in cdc_reader:
    # Publish to Kafka, RabbitMQ, etc.
    await kafka_producer.send(
        topic=f"db.{message.source['table']}",
        value=message_to_debezium_json(message),
    )
```
3. Audit Logging
Track all data changes with full history:
```python
async for message in cdc_reader:
    audit_entry = {
        "timestamp": message.ts_ms,
        "operation": message.op,
        "table": f"{message.source['schema']}.{message.source['table']}",
        "before": message.before,
        "after": message.after,
        "lsn": message.source['lsn'],
    }
    await audit_log.write(audit_entry)
```
4. Cache Invalidation
Invalidate caches when data changes:
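```python
async for message in cdc_reader:
    # DELETE events have after=None, so fall back to the old row
    row = message.after if message.after is not None else message.before
    cache_key = f"{message.source['table']}:{row.get('id')}"
    await redis.delete(cache_key)
    logger.info(f"Invalidated cache: {cache_key}")
```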
PostgreSQL Setup
Step-by-Step Configuration
1. Enable Logical Replication
Edit postgresql.conf and restart PostgreSQL:
```ini
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
```

```bash
# On Linux
sudo systemctl restart postgresql

# On macOS (Homebrew)
brew services restart postgresql

# Verify settings
psql -c "SHOW wal_level;"  # Should output: logical
```
2. Set Replica Identity (Critical!)
By default (REPLICA IDENTITY DEFAULT), PostgreSQL logs only the old primary-key columns for DELETEs, and for UPDATEs only when a key column changes. To include the complete previous row in before:

```sql
-- For specific tables (recommended)
ALTER TABLE users REPLICA IDENTITY FULL;
ALTER TABLE orders REPLICA IDENTITY FULL;

-- Or for all tables in a schema (use cautiously)
DO $$
DECLARE
    r RECORD;
BEGIN
    FOR r IN SELECT tablename FROM pg_tables WHERE schemaname = 'public'
    LOOP
        EXECUTE format('ALTER TABLE public.%I REPLICA IDENTITY FULL', r.tablename);
    END LOOP;
END$$;
```
⚠️ Warning: REPLICA IDENTITY FULL increases WAL size. Apply it only to tables where you need old values in UPDATE/DELETE events.
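You can find published tables that still lack REPLICA IDENTITY FULL by reading pg_class.relreplident. Its values are d (default: primary key), n (nothing), f (full) and i (a chosen index). The sketch below keeps the query as a string and does the classification in plain Python, so it works with whichever driver you use. The query uses an asyncpg-style $1 placeholder. The constant and function names are illustrative and not part of pgoutput-decoder.

```python
REPLICA_IDENTITY_QUERY = """
SELECT c.relname, c.relreplident
FROM pg_publication_tables pt
JOIN pg_namespace n ON n.nspname = pt.schemaname
JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = pt.tablename
WHERE pt.pubname = $1
"""

# Meaning of the pg_class.relreplident codes
REPLICA_IDENTITY_LABELS = {
    "d": "DEFAULT (primary key only)",
    "n": "NOTHING",
    "f": "FULL",
    "i": "USING INDEX",
}


def tables_without_full_identity(rows) -> dict[str, str]:
    """Return {table: identity label} for rows whose identity is not FULL.

    ``rows`` is an iterable of (relname, relreplident) pairs as returned by
    REPLICA_IDENTITY_QUERY; some drivers return the "char" column as bytes.
    """
    missing = {}
    for relname, ident in rows:
        code = ident.decode() if isinstance(ident, bytes) else ident
        if code != "f":
            missing[relname] = REPLICA_IDENTITY_LABELS.get(code, f"unknown ({code})")
    return missing
```

With asyncpg, for example: rows = await conn.fetch(REPLICA_IDENTITY_QUERY, "my_pub"), then tables_without_full_identity(rows).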
3. Create Publication
```sql
-- Create a publication for specific tables
CREATE PUBLICATION my_pub FOR TABLE users, orders, products;

-- Or for all tables
CREATE PUBLICATION my_pub FOR ALL TABLES;

-- Verify
SELECT * FROM pg_publication;
```
4. Create Replication Slot
```sql
-- Create a logical replication slot using pgoutput
SELECT pg_create_logical_replication_slot('my_slot', 'pgoutput');

-- Verify
SELECT * FROM pg_replication_slots;
```
5. Grant Permissions
```sql
-- Grant replication permission to your user
ALTER USER myuser WITH REPLICATION;

-- Grant SELECT on published tables
GRANT SELECT ON ALL TABLES IN SCHEMA public TO myuser;
GRANT SELECT ON ALL SEQUENCES IN SCHEMA public TO myuser;
```
Quick Start with Docker Compose
For local development and testing:
```yaml
# docker-compose.yml
version: '3.8'
services:
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_PASSWORD: password
      POSTGRES_DB: testdb
    ports:
      - "5432:5432"
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
      - "-c"
      - "max_replication_slots=10"
      - "-c"
      - "max_wal_senders=10"
    volumes:
      - ./examples/setup_postgres.sql:/docker-entrypoint-initdb.d/init.sql
```
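```bash
# Start PostgreSQL (scripts in /docker-entrypoint-initdb.d run automatically
# the first time the data directory is initialized)
docker-compose up -d

# Re-run the setup script manually only if the database was already initialized
docker-compose exec postgres psql -U postgres -d testdb -f /docker-entrypoint-initdb.d/init.sql

# Test connection
python examples/example_debezium.py
```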
Verify Configuration
```sql
-- Check WAL level
SHOW wal_level;  -- Must be 'logical'

-- Check replication slots
SELECT slot_name, slot_type, active FROM pg_replication_slots;

-- Check publications
SELECT pubname, puballtables FROM pg_publication;

-- Monitor replication lag
SELECT slot_name,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
FROM pg_replication_slots;
```
Message Format
What is Debezium?
Debezium is a popular open-source CDC platform. This library produces events in Debezium's format, making it compatible with existing Debezium-based pipelines and tools.
Message Structure
Messages follow the Debezium-compatible format with before/after states:
```python
{
    "op": "c",  # Operation: "c" (create/INSERT), "u" (update), "d" (delete)
    "before": None,  # Previous row state (UPDATE/DELETE only)
    "after": {  # New row state (INSERT/UPDATE only)
        "id": 1,
        "name": "Alice",
        "email": "alice@example.com",
        "created_at": "2024-01-15 10:30:00",
    },
    "source": {  # Source metadata
        "version": "0.1.0",
        "connector": "pgoutput-decoder",
        "name": "pgoutput-decoder",
        "ts_ms": 1705315800000,
        "snapshot": "false",
        "db": "mydb",
        "schema": "public",
        "table": "users",
        "lsn": 123456789,
    },
    "ts_ms": 1705315800000,  # Timestamp in milliseconds
    "ts_us": 1705315800000000,  # Timestamp in microseconds (optional)
    "ts_ns": 1705315800000000000,  # Timestamp in nanoseconds (optional)
}
```
Visual Operation Flow
```text
DATABASE CHANGE       →  CDC EVENT           →  YOUR APPLICATION
───────────────          ─────────              ────────────────
INSERT INTO users...  →  op: "c"             →  Create in cache
                         after: {new data}   →  Index in search
                         before: None           Send notification

UPDATE users SET...   →  op: "u"             →  Update cache
                         after: {new data}   →  Reindex search
                         before: {old data}     Audit the change

DELETE FROM users...  →  op: "d"             →  Remove from cache
                         after: None         →  Delete from search
                         before: {old data}     Log deletion
```
Operation Types
- "c" (Create): INSERT operations. after contains the new row; before is None.
- "u" (Update): UPDATE operations. after contains the new values; before contains the old values (requires REPLICA IDENTITY FULL).
- "d" (Delete): DELETE operations. before contains the deleted row (the full row requires REPLICA IDENTITY FULL); after is None.
Message Examples
```python
# INSERT
message.op == "c"
message.after == {"id": 1, "name": "Alice"}
message.before is None

# UPDATE
message.op == "u"
message.after == {"id": 1, "name": "Alice Updated"}
message.before == {"id": 1, "name": "Alice"}

# DELETE
message.op == "d"
message.before == {"id": 1, "name": "Alice"}
message.after is None
```
Examples
All examples are in the examples/ directory:
| Example | Description | Complexity |
|---|---|---|
| setup_postgres.sql | PostgreSQL setup script with e-commerce schema | ⭐ |
| basic_cdc.py | Simple CDC monitoring with helper functions | ⭐⭐ |
| example_debezium.py | Debezium format demo with auto/manual acknowledgment | ⭐⭐ |
Running Examples
```bash
# 1. Set up PostgreSQL (see PostgreSQL Setup section)

# 2. Run basic CDC example
python examples/basic_cdc.py

# 3. Try Debezium format demo
python examples/example_debezium.py
```
Advanced Usage
Manual LSN Acknowledgment
By default, LSNs are acknowledged automatically after each message is processed (auto_acknowledge=True). For more control over acknowledgment (e.g., batch processing, transactional guarantees), you can disable auto-acknowledgment:
```python
import pgoutput_decoder

# Disable auto-acknowledgment for manual control
cdc_reader = pgoutput_decoder.LogicalReplicationReader(
    publication_name="test_pub",
    slot_name="test_slot",
    host="localhost",
    database="mydb",
    port=5432,
    user="postgres",
    password="password",
    auto_acknowledge=False,  # Manual LSN control
)

async for message in cdc_reader:
    try:
        # Process message...
        await process_message(message)
        # Manually acknowledge after successful processing
        await cdc_reader.acknowledge()
    except Exception as e:
        print(f"Failed to process message: {e}")
        # Don't acknowledge - will retry from this LSN on restart
        break
```
When to use manual acknowledgment:
- Batch processing: Acknowledge after processing N messages
- Transactional guarantees: Acknowledge only after committing to database
- Error handling: Skip acknowledgment on failure to replay messages
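- Exactly-once effects: Coordinate acknowledgment with external systems. Unacknowledged messages are replayed after a restart, so the consumer must be idempotent or transactional.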
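The following sketch shows "acknowledge only after committing" combined with idempotent writes. Each event is stored in its own database transaction, rows that a replay delivers again are ignored through the primary key, and acknowledge() is awaited only after the commit succeeds. The standard-library sqlite3 module stands in for your real sink; its calls block, so a production version would use an async driver. The sketch assumes the reader was created with auto_acknowledge=False and that source['lsn'] uniquely identifies an event. init_sink, apply_events and the cdc_events table are hypothetical names.

```python
import json
import sqlite3
from typing import Any


def init_sink(conn: sqlite3.Connection) -> None:
    conn.execute(
        "CREATE TABLE IF NOT EXISTS cdc_events ("
        " lsn INTEGER PRIMARY KEY, op TEXT NOT NULL, tbl TEXT NOT NULL, payload TEXT)"
    )
    conn.commit()


async def apply_events(reader: Any, conn: sqlite3.Connection) -> None:
    """Store each event, commit, then acknowledge its LSN."""
    async for message in reader:
        row = message.after if message.after is not None else message.before
        with conn:  # commits on success, rolls back on exception
            # INSERT OR IGNORE turns a replayed (already stored) LSN into a no-op
            conn.execute(
                "INSERT OR IGNORE INTO cdc_events (lsn, op, tbl, payload) VALUES (?, ?, ?, ?)",
                (
                    message.source["lsn"],
                    message.op,
                    f"{message.source['schema']}.{message.source['table']}",
                    json.dumps(row, default=str),
                ),
            )
        # Only reached after a successful commit
        await reader.acknowledge()
```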
Helper Functions
The library provides several helper functions for working with CDC messages:
message_to_debezium_json(message, indent=2)
Converts a message to a JSON string in Debezium format. This function is implemented in Rust for high performance.
```python
from pgoutput_decoder import message_to_debezium_json

# Pretty-printed JSON with 2-space indentation (default)
json_str = message_to_debezium_json(message, indent=2)
print(json_str)

# Custom indentation (4 spaces)
json_str = message_to_debezium_json(message, indent=4)

# Compact JSON (no indentation)
json_str = message_to_debezium_json(message, indent=None)
```
message_to_dict(message)
Convert a message to a Python dictionary:
```python
from pgoutput_decoder import message_to_dict

msg_dict = message_to_dict(message)
# Returns: {"op": "c", "before": None, "after": {...}, "source": {...}, ...}
```
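If you serialize message_to_dict output with the standard json module instead of message_to_debezium_json, watch out for bytea columns. The Supported PostgreSQL Types table lists them as Python bytes, which json.dumps rejects. A default hook such as the sketch below handles them. Base64 for bytes and str() as the fallback are choices made here, not library behavior; json_default and dumps_event are illustrative names.

```python
import base64
import json
from typing import Any


def json_default(value: Any) -> Any:
    """Fallback encoder for values json.dumps cannot handle natively."""
    if isinstance(value, (bytes, bytearray, memoryview)):
        # bytea columns: encode as base64 text
        return base64.b64encode(bytes(value)).decode("ascii")
    # e.g. Decimal or datetime objects you may have added yourself
    return str(value)


def dumps_event(event: dict[str, Any]) -> str:
    """Serialize an event dict to compact JSON."""
    return json.dumps(event, default=json_default, separators=(",", ":"))
```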
format_operation(op)
Convert operation codes to human-readable format:
```python
from pgoutput_decoder import format_operation

op_name = format_operation("c")  # Returns: "INSERT"
op_name = format_operation("u")  # Returns: "UPDATE"
op_name = format_operation("d")  # Returns: "DELETE"
```
get_table_name(message)
Extract fully-qualified table name from a message:
```python
from pgoutput_decoder import get_table_name

table = get_table_name(message)  # Returns: "public.customers"
```
Filtering by Table
```python
async for message in cdc_reader:
    if message.source["table"] == "users":
        # Process only user table changes
        process_user_change(message)
```
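When several tables need different handling, a dispatch table keyed by schema-qualified name avoids a growing if/elif chain. This is a minimal sketch. TableRouter and the handler names are hypothetical, and handlers are assumed to be async functions that take the message.

```python
from collections.abc import Awaitable, Callable
from typing import Any

Handler = Callable[[Any], Awaitable[None]]


class TableRouter:
    """Dispatch CDC messages to per-table async handlers."""

    def __init__(self) -> None:
        self._handlers: dict[str, Handler] = {}

    def route(self, qualified_table: str) -> Callable[[Handler], Handler]:
        """Decorator registering a handler for a table such as "public.users"."""
        def register(handler: Handler) -> Handler:
            self._handlers[qualified_table] = handler
            return handler
        return register

    async def dispatch(self, message: Any) -> bool:
        """Run the matching handler; return False if the table is not routed."""
        name = f"{message.source['schema']}.{message.source['table']}"
        handler = self._handlers.get(name)
        if handler is None:
            return False
        await handler(message)
        return True
```

Usage: create router = TableRouter(), decorate handlers with @router.route("public.users"), and call await router.dispatch(message) inside the async for loop.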
Error Handling
```python
try:
    async for message in cdc_reader:
        process_message(message)
except Exception as e:
    print(f"Replication error: {e}")
finally:
    # Always release the replication connection
    await cdc_reader.stop()
```
Manual Slot Management
The library requires you to manually create replication slots for safety. This prevents accidental slot creation that could lead to disk space issues if not properly monitored.
```python
# Create the slot with psycopg2 or asyncpg before starting replication
import asyncio
import asyncpg

async def create_slot() -> None:
    conn = await asyncpg.connect("postgresql://localhost/mydb")
    await conn.execute("SELECT pg_create_logical_replication_slot('my_slot', 'pgoutput')")
    await conn.close()

asyncio.run(create_slot())
```
Supported PostgreSQL Types
| PostgreSQL Type | Python Type |
|---|---|
| bool | bool |
| int2, int4, int8 | int |
| float4, float8 | float |
| numeric, decimal | float or str |
| text, varchar, char | str |
| bytea | bytes |
| json, jsonb | dict or list |
| uuid | str |
| date, time, timestamp, timestamptz | str (ISO 8601) |
| Arrays | list |
| Composite types | dict |
Performance
Benchmarks
(Benchmarks coming soon)
Performance Characteristics
- Throughput: Designed for high-volume streams (thousands of messages/sec)
- Latency: Aims for sub-millisecond per-message processing overhead
- Memory: ~2-5 MB base overhead + message buffer
- CPU: Minimal Python GIL impact due to Rust core
Optimization Tips
```python
# Batch acknowledgments for higher throughput
# (requires a reader created with auto_acknowledge=False)
messages_batch = []
async for message in cdc_reader:
    messages_batch.append(message)
    if len(messages_batch) >= 100:
        await process_batch(messages_batch)
        await cdc_reader.acknowledge()  # Acknowledge the whole batch
        messages_batch.clear()
```
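The loop above never flushes a partially filled batch. The sketch below flushes on size and once more when the stream ends, and it starts a fresh list so that process_batch can safely keep a reference to the batch. It assumes the reader was created with auto_acknowledge=False and that acknowledge() confirms everything received so far, as in the batch example above. consume_in_batches and process_batch are hypothetical names. A production version would also flush on a timer, so a quiet stream does not hold events indefinitely.

```python
from collections.abc import Awaitable, Callable
from typing import Any


async def consume_in_batches(
    reader: Any,
    process_batch: Callable[[list[Any]], Awaitable[None]],
    batch_size: int = 100,
) -> int:
    """Process messages in batches, acknowledging after each flushed batch.

    Returns the number of batches flushed.
    """
    batch: list[Any] = []
    flushed = 0
    async for message in reader:
        batch.append(message)
        if len(batch) >= batch_size:
            await process_batch(batch)
            await reader.acknowledge()  # confirm only after the batch succeeded
            batch = []  # new list: process_batch may keep a reference to the old one
            flushed += 1
    if batch:  # stream ended (e.g. reader stopped) with a partial batch
        await process_batch(batch)
        await reader.acknowledge()
        flushed += 1
    return flushed
```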
FAQ
What is CDC and why do I need it?
Change Data Capture (CDC) is a design pattern that captures and streams database changes in real-time. Unlike polling, CDC:
- Has minimal database impact (uses WAL, not queries)
- Captures all changes in order
- Provides sub-second latency
- Doesn't miss changes between polls
How is this different from database triggers?
| Feature | CDC (pgoutput-decoder) | Triggers |
|---|---|---|
| Performance | No query overhead | Runs on every DML |
| Decoupling | External consumer | Tightly coupled |
| Reliability | Durable WAL | Transaction-dependent |
| Replay | Can replay from LSN | No replay capability |
| Schema changes | Handles gracefully | Requires trigger updates |
Can I use this in production?
Yes, but consider:
- Monitor replication slots to prevent WAL bloat
- Set up alerting for replication lag
- Test failover/recovery scenarios
- Use manual acknowledgment for critical workloads
- ⚠️ This library is in active development (v0.1.x)
How do I handle schema changes?
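Logical replication does not stream DDL itself. pgoutput sends updated table metadata before the next change to an altered table, so events simply start carrying the new column set. Your application may need to handle both the old and new row shapes: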
```python
async for message in cdc_reader:
    try:
        # Your processing logic
        process_message(message)
    except KeyError as e:
        # Handle missing columns in old messages
        logger.warning(f"Schema mismatch: {e}")
    except Exception as e:
        # Handle unexpected data types
        logger.error(f"Processing error: {e}")
```
What happens if my consumer crashes?
The replication slot preserves your position (LSN):
- WAL data is retained from your last acknowledged LSN
- On restart, you resume from where you left off
- ⚠️ Un-acknowledged messages will be replayed
- ⚠️ Monitor slot lag to prevent WAL disk space issues
How do I monitor replication lag?
```sql
-- Check replication lag
SELECT
    slot_name,
    active,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag_size,
    pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS lag_bytes
FROM pg_replication_slots
WHERE slot_name = 'your_slot';
```
Set up monitoring alerts for when lag_bytes exceeds an acceptable threshold (e.g., > 1 GB). lag_size is the same value formatted for humans.
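PostgreSQL prints an LSN as two hexadecimal halves, X/Y, which encode the 64-bit WAL position (X << 32) | Y. pg_wal_lsn_diff is the difference of those integers. The sketch below does the same conversion client-side. This is useful, for example, when comparing a textual LSN such as the 0/1234ABC in the Quick Start output with an integer one. The function names are illustrative.

```python
def lsn_to_int(lsn: str) -> int:
    """Convert a textual LSN such as '16/B374D848' to its 64-bit integer."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def int_to_lsn(value: int) -> str:
    """Format a 64-bit WAL position the way PostgreSQL prints pg_lsn."""
    return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"


def lsn_diff_bytes(current: str, confirmed: str) -> int:
    """Bytes of WAL between two LSNs, like pg_wal_lsn_diff(current, confirmed)."""
    return lsn_to_int(current) - lsn_to_int(confirmed)
```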
Testing
Running Tests Locally
Tests use Testcontainers to spin up ephemeral PostgreSQL instances:
```bash
# Ensure Docker is running
docker ps

# Run all tests
uv run pytest tests/ -v

# Run specific test file
uv run pytest tests/test_ecommerce_comprehensive.py -v

# Run with coverage
uv run pytest tests/ --cov=pgoutput_decoder --cov-report=html
```
Test Structure
```text
tests/
├── test_ecommerce_comprehensive.py   # E2E tests with realistic schema
├── test_acknowledgement.py           # LSN acknowledgment tests
├── test_json_serialization.py        # Debezium format validation
└── test_types.py                     # PostgreSQL type conversion
```
All tests use PostgreSQL 18.1 via Testcontainers and follow the patterns in AGENTS.md.
Security
Principle of Least Privilege
Grant only necessary permissions:
```sql
-- Create dedicated replication user
CREATE USER cdc_user WITH REPLICATION PASSWORD 'secure_password';

-- Grant only SELECT on published tables
GRANT SELECT ON TABLE users, orders, products TO cdc_user;

-- Do NOT grant: INSERT, UPDATE, DELETE, or superuser
```
Connection Security
```python
# Use environment variables, never hardcode credentials
import os

import pgoutput_decoder

cdc_reader = pgoutput_decoder.LogicalReplicationReader(
    publication_name="my_pub",
    slot_name="my_slot",
    host=os.getenv("PG_HOST", "localhost"),
    database=os.getenv("PG_DATABASE"),
    port=int(os.getenv("PG_PORT", "5432")),
    user=os.getenv("PG_USER"),
    password=os.getenv("PG_PASSWORD"),
)
```
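A small variation fails fast with a clear message when a required variable is missing, instead of passing None into the connection. reader_kwargs_from_env is a hypothetical helper. Its keyword names match the LogicalReplicationReader arguments used throughout this README, and the publication and slot names stay fixed as in the example above.

```python
import os
from collections.abc import Mapping
from typing import Any

REQUIRED_VARS = ("PG_DATABASE", "PG_USER", "PG_PASSWORD")


def reader_kwargs_from_env(env: Mapping[str, str] = os.environ) -> dict[str, Any]:
    """Build LogicalReplicationReader keyword arguments from the environment."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")
    return {
        "publication_name": "my_pub",
        "slot_name": "my_slot",
        "host": env.get("PG_HOST", "localhost"),
        "database": env["PG_DATABASE"],
        "port": int(env.get("PG_PORT", "5432")),
        "user": env["PG_USER"],
        "password": env["PG_PASSWORD"],
    }
```

Usage: cdc_reader = pgoutput_decoder.LogicalReplicationReader(**reader_kwargs_from_env()).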
SSL/TLS
Enabling SSL depends on the tokio-postgres connection configuration; it is currently supported via connection parameters. See https://www.postgresql.org/docs/current/libpq-ssl.html for PostgreSQL's SSL modes.
Audit Logging
Monitor replication activity:
```ini
# Enable connection logging in postgresql.conf
log_connections = on
log_disconnections = on
```

```sql
-- Check active replication connections
SELECT * FROM pg_stat_replication;
```
Troubleshooting
"replication slot does not exist"
Create the replication slot manually:
```sql
SELECT pg_create_logical_replication_slot('your_slot', 'pgoutput');
```
"must be superuser or replication role"
Grant replication permission:
```sql
ALTER USER your_user WITH REPLICATION;
```
Slot bloating disk space
Monitor and drop unused slots:
```sql
-- Check slot lag
SELECT slot_name, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
FROM pg_replication_slots;

-- Drop unused slot
SELECT pg_drop_replication_slot('unused_slot');
```
"permission denied for table"
Grant SELECT permission:
```sql
GRANT SELECT ON ALL TABLES IN SCHEMA public TO your_user;
```
Connection keeps dropping
Check your firewall and network settings. Auto-reconnect with exponential backoff is enabled by default.
Missing "before" values in UPDATE/DELETE
Set REPLICA IDENTITY FULL:
```sql
ALTER TABLE your_table REPLICA IDENTITY FULL;
```
Debugging Tips
```python
import logging

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)

# Check PostgreSQL logs
# tail -f /var/log/postgresql/postgresql-16-main.log
```
Development
Prerequisites
- Rust: 1.70+ (from Cargo.toml)
- Python: 3.12+ only
- PostgreSQL: 12+ (with logical replication support)
- Docker: For running tests
- uv: Python package manager (recommended)
Setup Development Environment
```bash
# Clone repository
git clone https://github.com/yourusername/pgoutput-decoder
cd pgoutput-decoder

# Install uv (if not already installed)
curl -LsSf https://astral.sh/uv/install.sh | sh

# Sync dependencies
uv sync

# Build Rust extension in development mode
uv run maturin develop

# Run tests
uv run pytest tests/ -v
```
Development Workflow
Per AGENTS.md:
```bash
# Lint Python code
uv run ruff check .
uv run ruff format .

# Lint Rust code
cargo fmt --all -- --check
cargo clippy --all-targets --all-features

# Run tests
uv run pytest tests/ -v

# Build release
uv run maturin build --release
```
Code Coverage
The project supports both Python and Rust code coverage:
```bash
# Python coverage only (skips Docker tests)
just coverage

# Rust coverage only (skips Docker tests, requires cargo-llvm-cov)
just install-llvm-cov  # One-time installation
just coverage-rust

# Combined Python + Rust coverage (skips Docker tests)
just coverage-all

# Include Docker tests (requires Docker running)
just coverage-docker       # Python only with Docker
just coverage-rust-docker  # Rust with Docker
just coverage-all-docker   # Both with Docker
```
Local Development: By default, coverage commands skip Docker-dependent tests for faster iteration. Use the -docker variants when you need complete coverage including integration tests.
GitHub Actions: CI automatically generates and uploads both Python and Rust coverage to Codecov:
- Python coverage: Measures python/pgoutput_decoder/ code
- Rust coverage: Measures src/ code exercised by Python tests
- Flags: Separate python and rust flags for tracking
View coverage reports at: https://codecov.io/gh/yourusername/pgoutput-decoder
Project Structure
```text
pgoutput-decoder/
├── src/                      # Rust source code
│   ├── lib.rs                # PyO3 module definitions
│   ├── pgoutput/             # pgoutput decoder implementation
│   └── replication.rs        # Replication connection logic
├── python/                   # Python source code
│   └── pgoutput_decoder/
│       ├── __init__.py       # Python API
│       └── core.py           # Helper functions
├── tests/                    # Test suite (uses testcontainers)
├── examples/                 # Example scripts
├── Cargo.toml                # Rust dependencies
├── pyproject.toml            # Python metadata & build config
└── README.md                 # This file
```
Contributing
Contributions welcome! Please:
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Ensure cargo fmt, cargo clippy, and ruff pass
- Submit a pull request
See CONTRIBUTING.md for detailed guidelines (if available).
Version Compatibility
Supported Versions
| Component | Version | Status |
|---|---|---|
| Python | 3.12+ | Required |
| PostgreSQL | 12 | Tested (minimum) |
| PostgreSQL | 13-16 | CI tested |
| Rust | 1.70+ | Required |
| PyO3 | 0.20 | Current |
Python Version Support
This library requires Python 3.12 or later and uses:
- Modern type hints
- async/await patterns
- PyO3 0.20 with abi3-py312
Why Python 3.12+?
- Better performance
- Improved async capabilities
- Modern standard library features
- Rust binding compatibility
PostgreSQL Version Testing
Tested with:
- PostgreSQL 12 (minimum)
- PostgreSQL 13, 14, 15, 16 (CI tested)
- PostgreSQL 18.1-alpine (testcontainer default)
Architecture
```text
┌───────────────────────────────────────────────┐
│  Python Application                           │
│                                               │
│  async for message in cdc_reader:             │
│      process(message)                         │
└──────────────────┬────────────────────────────┘
                   │ Python asyncio
                   │
┌──────────────────▼────────────────────────────┐
│  PyO3 Bridge (Rust ↔ Python)                  │
│                                               │
│  • pyo3-asyncio (event loop integration)      │
│  • Type conversion (Rust → Python)            │
└──────────────────┬────────────────────────────┘
                   │
┌──────────────────▼────────────────────────────┐
│  Rust Core (tokio-postgres)                   │
│                                               │
│  • Replication connection                     │
│  • pgoutput binary decoder                    │
│  • Auto-reconnect with backoff                │
│  • Type conversion (PG → Rust)                │
└──────────────────┬────────────────────────────┘
                   │ PostgreSQL Protocol
                   │
┌──────────────────▼────────────────────────────┐
│  PostgreSQL Server                            │
│                                               │
│  • WAL stream via replication protocol        │
│  • pgoutput plugin                            │
└───────────────────────────────────────────────┘
```
License
MIT License - see LICENSE file for details.
Credits
Built with:
- PyO3 - Rust ↔ Python bindings
- tokio-postgres - PostgreSQL async client
- maturin - Build tool for Rust Python extensions
- Debezium - Inspiration for message format
Inspired by and compatible with the Debezium CDC ecosystem.
Resources
- Documentation: Full API Docs (coming soon)
- Examples: examples/
- Issues: GitHub Issues
- Discussions: GitHub Discussions
Support
If you find this project useful, please ⭐ star the repository on GitHub!
Built with ❤️ using Rust and Python