
PostgreSQL integration for taskiq


TaskIQ PostgreSQL

TaskIQ PostgreSQL is a plugin for TaskIQ that provides a PostgreSQL-backed broker, result backend, and scheduler source, with support for multiple PostgreSQL drivers.

Features

  • 🚀 PostgreSQL Broker: High-performance message broker using PostgreSQL LISTEN/NOTIFY
  • 📦 Result Backend: Persistent task result storage with configurable retention
  • ⏰ Scheduler Source: Cron-like task scheduling with PostgreSQL persistence
  • 🔌 Multiple Drivers: Support for asyncpg, psycopg3, and psqlpy
  • ⚡ Async/Await: Built for high-performance async operations
  • 🛠️ Flexible Configuration: Customizable table names, field types, and connection options
  • 🔄 Multiple Serializers: Support for different serialization methods (Pickle, JSON, etc.)
  • 🔐 Connection Pooling: Built-in connection pool management for all drivers

Installation

Basic Installation

pip install taskiq-postgresql

With Driver Dependencies

Choose your preferred PostgreSQL driver:

AsyncPG (Recommended)

pip install taskiq-postgresql[asyncpg]

Psycopg3

pip install taskiq-postgresql[psycopg]

PSQLPy

pip install taskiq-postgresql[psqlpy]

Using Package Managers

Poetry:

poetry add taskiq-postgresql[asyncpg]

UV:

uv add taskiq-postgresql[asyncpg]

Rye:

rye add taskiq-postgresql[asyncpg]

Note: A driver extra is required. The PostgreSQL drivers are optional dependencies, and the plugin cannot connect to PostgreSQL unless one of them is installed.
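To see which of these optional drivers actually made it into your environment, a quick standard-library check (not part of taskiq-postgresql itself) works:

```python
import importlib.util

# Drivers taskiq-postgresql can use; each is an optional dependency.
DRIVERS = ("asyncpg", "psycopg", "psqlpy")


def available_drivers() -> list[str]:
    """Return the PostgreSQL drivers importable in this environment."""
    return [name for name in DRIVERS if importlib.util.find_spec(name) is not None]


print(available_drivers())  # e.g. ['asyncpg'] after installing that extra
```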

Quick Start

Basic Task Processing

import asyncio
from taskiq_postgresql import PostgresqlBroker, PostgresqlResultBackend

# Configure the result backend
result_backend = PostgresqlResultBackend(
    dsn="postgresql://postgres:postgres@localhost:5432/taskiq_db",
)

# Configure the broker with result backend
broker = PostgresqlBroker(
    dsn="postgresql://postgres:postgres@localhost:5432/taskiq_db",
).with_result_backend(result_backend)


@broker.task
async def calculate_sum(a: int, b: int) -> int:
    """Calculate the sum of two numbers."""
    await asyncio.sleep(1)  # Simulate some work
    return a + b


async def main():
    # Start the broker
    await broker.startup()
    
    # Send a task
    task = await calculate_sum.kiq(10, 20)
    
    # Wait for the result
    result = await task.wait_result()
    print(f"Result: {result.return_value}")  # Result: 30
    
    # Shutdown the broker
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

Task Scheduling

from taskiq_postgresql import PostgresqlBroker, PostgresqlSchedulerSource
from taskiq import TaskiqScheduler

# Initialize broker
broker = PostgresqlBroker(
    dsn="postgresql://postgres:postgres@localhost:5432/taskiq_db"
)

# Initialize scheduler source
scheduler_source = PostgresqlSchedulerSource(
    dsn="postgresql://postgres:postgres@localhost:5432/taskiq_db",
    table_name="taskiq_schedules",
    driver="asyncpg"
)

# Create scheduler
scheduler = TaskiqScheduler(
    broker=broker,
    sources=[scheduler_source],
)

@broker.task
async def scheduled_task():
    print("This task runs on schedule!")

# Schedule task to run every minute
async def setup_schedule():
    await scheduler_source.add_schedule(
        schedule_id="task-every-minute",
        task_name="scheduled_task",
        cron="* * * * *",  # Every minute
        args=[],
        kwargs={}
    )
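The cron string above uses the standard five-field format: minute, hour, day-of-month, month, day-of-week. As a rough illustration of how such an expression is matched against a timestamp (a minimal sketch supporting only `*` and literal numbers, not taskiq's actual matching code):

```python
from datetime import datetime


def cron_matches(expr: str, when: datetime) -> bool:
    """Check a five-field cron expression ("*" or a literal number per field)."""
    fields = expr.split()
    values = (when.minute, when.hour, when.day, when.month, when.isoweekday() % 7)
    return all(f == "*" or int(f) == v for f, v in zip(fields, values))


now = datetime(2024, 1, 1, 12, 30)
print(cron_matches("* * * * *", now))    # True: matches every minute
print(cron_matches("30 12 * * *", now))  # True: 12:30 daily
print(cron_matches("0 0 * * *", now))    # False: midnight only
```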

Configuration

PostgresqlBroker

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `dsn` | `str` | Required | PostgreSQL connection string |
| `queue_name` | `str` | `"taskiq_queue"` | Name of the queue table |
| `field_for_task_id` | `Literal["VarChar", "Text", "Uuid"]` | `"Uuid"` | Field type for task IDs |
| `driver` | `Literal["asyncpg", "psycopg", "psqlpy"]` | `"asyncpg"` | Database driver |
| `**connect_kwargs` | `Any` | - | Additional driver-specific connection parameters |

PostgresqlResultBackend

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `dsn` | `str` | Required | PostgreSQL connection string |
| `keep_results` | `bool` | `True` | Whether to keep results after reading |
| `table_name` | `str` | `"taskiq_results"` | Name of the results table |
| `field_for_task_id` | `Literal["VarChar", "Text", "Uuid"]` | `"Uuid"` | Field type for task IDs |
| `serializer` | `BaseSerializer` | `PickleSerializer()` | Serializer instance |
| `driver` | `Literal["asyncpg", "psycopg", "psqlpy"]` | `"asyncpg"` | Database driver |
| `**connect_kwargs` | `Any` | - | Additional driver-specific connection parameters |

PostgresqlSchedulerSource

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `dsn` | `str` | Required | PostgreSQL connection string |
| `table_name` | `str` | `"taskiq_schedules"` | Name of the schedules table |
| `driver` | `Literal["asyncpg", "psycopg", "psqlpy"]` | `"asyncpg"` | Database driver |
| `startup_schedule` | `dict` | `None` | Schedule definitions to create on startup |
| `**connect_kwargs` | `Any` | - | Additional driver-specific connection parameters |

Database Drivers

AsyncPG (Recommended)

  • Performance: Fastest PostgreSQL driver for Python
  • Features: Full asyncio support, prepared statements, connection pooling
  • Use case: High-performance applications

Psycopg3

  • Performance: Good performance with extensive features
  • Features: Full PostgreSQL feature support, mature ecosystem
  • Use case: Feature-rich applications needing advanced PostgreSQL features

PSQLPy

  • Performance: Rust-based driver with excellent performance
  • Features: Modern async implementation
  • Use case: Applications prioritizing performance and modern architecture

Advanced Configuration

Custom Serializer

from taskiq.serializers import JSONSerializer

result_backend = PostgresqlResultBackend(
    dsn="postgresql://postgres:postgres@localhost:5432/taskiq_db",
    serializer=JSONSerializer(),
)
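The trade-off: JSONSerializer produces portable, human-readable rows, but only handles JSON-compatible values, while the default PickleSerializer round-trips most Python objects. A quick standard-library illustration of the limitation:

```python
import json
import pickle
from datetime import datetime

value = {"when": datetime(2024, 1, 1), "n": 42}

# Pickle round-trips arbitrary Python objects.
assert pickle.loads(pickle.dumps(value)) == value

# JSON rejects types it does not know about, such as datetime.
try:
    json.dumps(value)
except TypeError as exc:
    print(f"JSON cannot serialize this payload: {exc}")
```

If your task results stick to JSON-compatible types (numbers, strings, lists, dicts), JSONSerializer keeps the stored rows readable from plain SQL.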

Custom Table Names and Field Types

broker = PostgresqlBroker(
    dsn="postgresql://postgres:postgres@localhost:5432/taskiq_db",
    queue_name="my_custom_queue",
    field_for_task_id="Text",  # Use TEXT instead of UUID
)

result_backend = PostgresqlResultBackend(
    dsn="postgresql://postgres:postgres@localhost:5432/taskiq_db",
    table_name="my_custom_results",
    field_for_task_id="VarChar",  # Use VARCHAR instead of UUID
)

Connection Pool Configuration

# AsyncPG
broker = PostgresqlBroker(
    dsn="postgresql://postgres:postgres@localhost:5432/taskiq_db",
    driver="asyncpg",
    min_size=5,
    max_size=20,
    max_inactive_connection_lifetime=300,
)

# Psycopg3
broker = PostgresqlBroker(
    dsn="postgresql://postgres:postgres@localhost:5432/taskiq_db",
    driver="psycopg",
    min_size=5,
    max_size=20,
    max_lifetime=3600,
)

Using Environment Variables

import os

# From environment
dsn = os.getenv("DATABASE_URL", "postgresql://localhost/taskiq")

# With SSL
dsn = "postgresql://user:pass@localhost:5432/db?sslmode=require"

broker = PostgresqlBroker(dsn=dsn)
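If credentials contain special characters such as `@` or `:`, percent-encode them when assembling the DSN. A small stdlib helper (not part of taskiq-postgresql) for doing this safely:

```python
from urllib.parse import quote


def build_dsn(user: str, password: str, host: str, port: int, db: str) -> str:
    """Assemble a PostgreSQL DSN, percent-encoding the user and password."""
    return (
        f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{db}"
    )


print(build_dsn("taskiq", "p@ss:word", "localhost", 5432, "taskiq_db"))
# postgresql://taskiq:p%40ss%3Aword@localhost:5432/taskiq_db
```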

Database Schema

The library automatically creates the necessary tables:

Queue Table (default: taskiq_queue)

CREATE TABLE taskiq_queue (
    id SERIAL PRIMARY KEY,
    task_id UUID NOT NULL,
    task_name VARCHAR NOT NULL,
    message BYTEA NOT NULL,
    labels JSONB,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

Results Table (default: taskiq_results)

CREATE TABLE taskiq_results (
    task_id UUID PRIMARY KEY,
    result BYTEA,
    is_err BOOLEAN DEFAULT FALSE,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

Schedules Table (default: taskiq_schedules)

CREATE TABLE taskiq_schedules (
    id UUID PRIMARY KEY,
    task_name VARCHAR(100) NOT NULL,
    schedule JSONB NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

Performance Tips

  1. Choose the Right Driver: AsyncPG typically offers the best performance
  2. Connection Pooling: Configure appropriate pool sizes for your workload
  3. Field Types: Use UUID for high-performance task IDs, TEXT for debugging
  4. Indexes: Consider adding indexes on frequently queried columns
  5. Connection Reuse: Keep broker connections alive during application lifetime
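For tip 4, indexes on frequently queried columns of the default tables are a natural starting point. The statements below are suggestions derived from the default schemas shown above, not something the library creates for you; adjust the table and column names if you customized them:

```python
# Candidate indexes for the default tables; run once via psql or your driver.
INDEX_STATEMENTS = [
    "CREATE INDEX IF NOT EXISTS idx_taskiq_queue_created_at"
    " ON taskiq_queue (created_at);",
    "CREATE INDEX IF NOT EXISTS idx_taskiq_results_created_at"
    " ON taskiq_results (created_at);",
    "CREATE INDEX IF NOT EXISTS idx_taskiq_schedules_task_name"
    " ON taskiq_schedules (task_name);",
]

for stmt in INDEX_STATEMENTS:
    print(stmt)
```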

Troubleshooting

Common Issues

Connection Errors

# Ensure your connection string is correct
dsn = "postgresql://username:password@host:port/database"

# Check that PostgreSQL is running and accessible
import asyncio
import asyncpg

async def check_connection() -> None:
    conn = await asyncpg.connect(dsn)
    await conn.close()

asyncio.run(check_connection())

Table Creation Issues

Ensure the database user has CREATE TABLE permission, or create the tables manually using the schemas shown above.

Driver Import Errors

# Install the appropriate driver extra
pip install taskiq-postgresql[asyncpg]

Requirements

  • Python: 3.9+
  • TaskIQ: 0.11.7+
  • PostgreSQL: 10+

Driver Dependencies

| Driver | Version | Extra |
|--------|---------|-------|
| AsyncPG | 0.30.0+ | `[asyncpg]` |
| Psycopg3 | 3.2.9+ | `[psycopg]` |
| PSQLPy | 0.11.3+ | `[psqlpy]` |

Development

This project uses modern Python development tools:

  • UV: Fast Python package installer and resolver
  • Ruff: Extremely fast Python linter and formatter
  • Pytest: Testing framework

Setup Development Environment

# Clone the repository
git clone https://github.com/z22092/taskiq-postgresql.git
cd taskiq-postgresql

# Install dependencies with UV
uv sync

# Install with all driver extras
uv sync --extra asyncpg --extra psycopg --extra psqlpy

# Run tests
uv run pytest

# Format and lint
uv run ruff format
uv run ruff check

Contributing

Contributions are welcome! Please:

  1. Fork the repository
  2. Create a feature branch
  3. Add tests for new functionality
  4. Ensure all tests pass
  5. Submit a pull request

License

This project is licensed under the MIT License. See the LICENSE file for details.
