EDWH pgqueuer pipeline (pg-skEWer)

Project description

pgskewer

A minimal pipeline layer built on top of pgqueuer that adds task orchestration (sequential and parallel steps, stored results, cancellation) to PostgreSQL-based job queues.

The name "pgskewer" was chosen because the library "skewers" tasks together into a pipeline; it also rhymes with "pgqueuer" and contains "EW" (Education Warehouse).

Features

  • Sequential and Parallel Task Pipelines: Define workflows that mix sequential and parallel task execution
  • Result Storage: Task results are stored automatically and passed on to later tasks in the pipeline
  • Cancelable Tasks: Gracefully cancel tasks when needed
  • Improved Error Handling: A failing task stops the rest of a sequential pipeline and terminates its parallel siblings
  • Real-time Log Streaming: Stream log output from blocking functions while they run
  • Type Annotations: Comprehensive typing support for better IDE integration

Installation

uv pip install pgskewer

Requirements

  • Python ≥ 3.13
  • PostgreSQL database
  • Dependencies:
    • pgqueuer
    • asyncpg
    • anyio
    • uvloop

Quick Start

import asyncio
import asyncpg
from pgqueuer import Job
from pgqueuer.db import AsyncpgDriver
from pgskewer import ImprovedQueuer, parse_payload, TaskResult

async def main():
    # Initialize the queuer with your database connection
    connection = await asyncpg.connect("postgresql://user:password@localhost/dbname")
    driver = AsyncpgDriver(connection)
    pgq = ImprovedQueuer(driver)
    
    # Define some tasks as entrypoints
    @pgq.entrypoint("fetch_data")
    async def fetch_data(job: Job):
        # Fetch some data
        return {"data": "example data"}
    
    @pgq.entrypoint("process_data")
    async def process_data(job: Job):
        # Process the data from the previous step
        payload = parse_payload(job.payload)
        data = payload["tasks"]["fetch_data"]["result"]["data"]
        return {"processed": data.upper()}
    
    @pgq.entrypoint("store_results")
    async def store_results(job: Job):
        # Store the processed data
        payload = parse_payload(job.payload)
        processed = payload["tasks"]["process_data"]["result"]["processed"]
        # Store the processed data somewhere
        return {"status": "completed", "stored": processed}
    
    # Create a pipeline that runs these tasks in sequence
    pgq.entrypoint_pipeline(
        "my_pipeline",
        # steps may be given as entrypoint names or as the decorated functions themselves:
        fetch_data,
        "process_data",
        store_results
    )
    
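    # Enqueue the pipeline with an empty initial payload
    job_id = await pgq.qm.queries.enqueue("my_pipeline", b'')
    # This only completes if a pgqueuer worker with these entrypoints registered
    # is consuming the queue. When the pipeline finishes, pgqueuer_result
    # should contain an entry for this job_id:
    result: TaskResult = await pgq.result(job_id, timeout=None)
    print(result)
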
if __name__ == "__main__":
    asyncio.run(main())

Advanced Usage

Creating Pipelines with Parallel Tasks

You can define pipelines with a mix of sequential and parallel tasks:

# Define a pipeline with parallel tasks
pipeline = pgq.pipeline([
    "task_1",                    # Run task_1 first
    ["task_2a", "task_2b"],      # Then run task_2a and task_2b in parallel
    "task_3"                     # Finally run task_3 after both task_2a and task_2b complete
])

Register a Pipeline as an Entrypoint

You can register a pipeline as an entrypoint for reuse:

# Register the pipeline as an entrypoint
pgq.entrypoint_pipeline(
    "data_processing_pipeline",
    "fetch_data",
    ["validate_data", "normalize_data"],
    "store_data"
)

# Now you can enqueue this pipeline like any other task
job_id = await pgq.enqueue("data_processing_pipeline", {"source": "api"})

Running Blocking Functions Asynchronously

pgskewer provides unblock, which runs a blocking function without blocking the event loop and streams its log output in real time:

from pgskewer import unblock

def cpu_intensive_task(data):
    # A regular blocking function; the lines it prints are the logs that get streamed
    print("Processing data...")
    result = sum(x * x for x in data)  # placeholder for the real CPU-heavy work
    print("Processing complete!")
    return result

async def handle(data):
    # Run the blocking function asynchronously with log streaming
    result = await unblock(cpu_intensive_task, data)
    return result
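How unblock works internally is not described here, so the following is only a sketch of one way to get a similar effect with the standard library, not pgskewer's implementation. The blocking function runs in the default thread pool via loop.run_in_executor, contextlib.redirect_stdout captures its print() output, and each complete line is handed back to the event loop through an asyncio.Queue as soon as it is written. The names run_blocking_with_logs, on_line and _LineForwarder are hypothetical. One caveat of this approach: redirect_stdout replaces the process-wide sys.stdout, so anything other threads print during the call is captured too, and on_line must not itself print to stdout.

```python
import asyncio
import contextlib
from collections.abc import Callable
from typing import Any


class _LineForwarder:
    """Minimal file-like object that forwards each complete line to an asyncio.Queue."""

    def __init__(self, loop: asyncio.AbstractEventLoop, queue: asyncio.Queue) -> None:
        self._loop = loop
        self._queue = queue
        self._buffer = ""

    def write(self, text: str) -> int:
        self._buffer += text
        while "\n" in self._buffer:
            line, self._buffer = self._buffer.split("\n", 1)
            # asyncio.Queue is not thread-safe, so the put is scheduled on the loop's thread
            self._loop.call_soon_threadsafe(self._queue.put_nowait, line)
        return len(text)

    def flush(self) -> None:
        pass  # complete lines are already forwarded in write()

    def drain(self) -> None:
        # Forward trailing text that never got a final newline
        if self._buffer:
            self._loop.call_soon_threadsafe(self._queue.put_nowait, self._buffer)
            self._buffer = ""


async def run_blocking_with_logs(
    func: Callable[..., Any], *args: Any, on_line: Callable[[str], None]
) -> Any:
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # sentinel: the blocking call has returned or raised

    def target() -> Any:
        writer = _LineForwarder(loop, queue)
        try:
            with contextlib.redirect_stdout(writer):
                return func(*args)
        finally:
            writer.drain()
            loop.call_soon_threadsafe(queue.put_nowait, done)

    future = loop.run_in_executor(None, target)
    # Handle each line as soon as it is printed, until the sentinel arrives
    while (line := await queue.get()) is not done:
        on_line(line)
    return await future  # re-raises any exception from func


def blocking_job(n: int) -> int:
    print("starting")
    print("n =", n)
    return n * 2


lines: list[str] = []
result = asyncio.run(run_blocking_with_logs(blocking_job, 21, on_line=lines.append))
print(result, lines)  # 42 ['starting', 'n = 21']
```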

Pipeline Result Structure

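The pipeline returns a structured result with information about each task. Later tasks read earlier results from the same structure in their job payload, as process_data does in the Quick Start with payload["tasks"]["fetch_data"]["result"]: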

{
    "initial": {
        # The initial payload provided to the pipeline
    },
    "tasks": {
        "task_1": {
            "status": "successful",
            "ok": true,
            "result": {
                # Task 1's result data
            }
        },
        "task_2a": {
            "status": "successful",
            "ok": true,
            "result": {
                # Task 2a's result data
            }
        },
        # ... other tasks
    }
}
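When a task depends on an earlier task's output, checking the ok flag before indexing into result gives a clearer error than a bare KeyError or TypeError. A minimal sketch, assuming the payload has already been parsed into a plain dict shaped like the structure above; previous_result is a hypothetical helper, not part of pgskewer:

```python
from typing import Any


def previous_result(payload: dict[str, Any], task_name: str) -> Any:
    """Return an earlier task's stored result, failing loudly if it is missing or unsuccessful."""
    task = payload.get("tasks", {}).get(task_name)
    if task is None:
        raise KeyError(f"no result recorded for task {task_name!r}")
    if not task.get("ok"):
        raise RuntimeError(f"task {task_name!r} did not succeed (status={task.get('status')!r})")
    return task["result"]


payload = {
    "initial": {"source": "api"},
    "tasks": {"task_1": {"status": "successful", "ok": True, "result": {"rows": 3}}},
}
print(previous_result(payload, "task_1"))  # {'rows': 3}
```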

Error Handling

If any task in a pipeline fails:

  • In sequential execution, the pipeline stops and no further tasks are executed
  • In parallel execution, the failing task's siblings in the same parallel group are terminated

License

pgskewer is distributed under the terms of the MIT license.

Credits

Developed by Education Warehouse.

Download files

Source Distribution

pgskewer-0.3.0.tar.gz (23.6 kB)

Uploaded Source

Built Distribution

pgskewer-0.3.0-py3-none-any.whl (19.7 kB)

Uploaded Python 3

File details

Details for the file pgskewer-0.3.0.tar.gz.

File metadata

  • Download URL: pgskewer-0.3.0.tar.gz
  • Upload date:
  • Size: 23.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.4 (Linux Mint 22.3)

File hashes

Hashes for pgskewer-0.3.0.tar.gz
Algorithm    Hash digest
SHA256       fad5513a2e5c6c08f3cf9de0084b1376831f96cc91af7d6a0853c08c5f9f3e59
MD5          dc8f073d2e9ce4f4df1a88814f605663
BLAKE2b-256  0af37a699b2e59bed9353d9f34c08d2fa0d636312bae65756fa4c8a73d167e7a

File details

Details for the file pgskewer-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: pgskewer-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 19.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.4 (Linux Mint 22.3)

File hashes

Hashes for pgskewer-0.3.0-py3-none-any.whl
Algorithm    Hash digest
SHA256       a0692ea683bce8f8cb81275776b57fc7726e7e2afb3aa9c0d6b71750846f583a
MD5          294673d77bf018211bdf598298d6dd13
BLAKE2b-256  81144073a4a541847299a19ed3e0aa5854e7341b8b2f6ad4d856e20b2ae72c70
