EDWH pgqueuer pipeline (pg-skEWer)

pgskewer

A minimal pipeline layer built on top of pgqueuer that adds task orchestration to PostgreSQL-based job queues.

The name "pgskewer" reflects how the library "skewers" tasks together into a pipeline. It also rhymes with "pgqueuer" and contains "EW", for Education Warehouse.

Features

  • Sequential and Parallel Task Pipelines: Define complex workflows with a mix of sequential and parallel task execution
  • Result Storage: Automatically store each task's result and pass it on to the tasks that follow
  • Cancelable Tasks: Gracefully cancel tasks when needed
  • Improved Error Handling: Better error propagation and handling in pipelines
  • Real-time Log Streaming: Stream logs from blocking functions in real-time
  • Type Annotations: Comprehensive typing support for better IDE integration

Installation

uv pip install pgskewer

Requirements

  • Python ≥ 3.13
  • PostgreSQL database
  • Dependencies:
    • pgqueuer
    • asyncpg
    • anyio
    • uvloop

Quick Start

import asyncio
import asyncpg
from pgqueuer import Job
from pgqueuer.db import AsyncpgDriver
from pgskewer import ImprovedQueuer, parse_payload, TaskResult

async def main():
    # Initialize the queuer with your database connection
    connection = await asyncpg.connect("postgresql://user:password@localhost/dbname")
    driver = AsyncpgDriver(connection)
    pgq = ImprovedQueuer(driver)
    
    # Define some tasks as entrypoints
    @pgq.entrypoint("fetch_data")
    async def fetch_data(job: Job):
        # Fetch some data
        return {"data": "example data"}
    
    @pgq.entrypoint("process_data")
    async def process_data(job: Job):
        # Process the data from the previous step
        payload = parse_payload(job.payload)
        data = payload["tasks"]["fetch_data"]["result"]["data"]
        return {"processed": data.upper()}
    
    @pgq.entrypoint("store_results")
    async def store_results(job: Job):
        # Store the processed data
        payload = parse_payload(job.payload)
        processed = payload["tasks"]["process_data"]["result"]["processed"]
        # Store the processed data somewhere
        return {"status": "completed", "stored": processed}
    
    # Create a pipeline that runs these tasks in sequence
    pgq.entrypoint_pipeline(
        "my_pipeline",
        # Steps can be given as entrypoint names or as function references:
        fetch_data,
        "process_data",
        store_results
    )
    
    # Execute the pipeline (empty initial data)
    job_id = await pgq.qm.queries.enqueue("my_pipeline", b'')
    # When the pipeline completes, pgqueuer_result should have an entry for this job_id:
    result: TaskResult = await pgq.result(job_id, timeout=None)
    
    
if __name__ == "__main__":
    asyncio.run(main())

Advanced Usage

Creating Pipelines with Parallel Tasks

You can define pipelines with a mix of sequential and parallel tasks:

# Define a pipeline with parallel tasks
pipeline = pgq.pipeline([
    "task_1",                    # Run task_1 first
    ["task_2a", "task_2b"],      # Then run task_2a and task_2b in parallel
    "task_3"                     # Finally run task_3 after both task_2a and task_2b complete
])
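
The ordering described in the comments above can be illustrated without a database. The following is a minimal sketch of the semantics only, written with plain asyncio; it is not pgskewer's implementation, and `run_task` and `run_steps` are hypothetical names introduced for this illustration.

```python
from __future__ import annotations

import asyncio


# Hypothetical stand-in for a registered task: waits briefly, then records its name.
async def run_task(name: str, log: list[str]) -> str:
    await asyncio.sleep(0.01)
    log.append(name)
    return name


async def run_steps(steps: list[str | list[str]]) -> list[str]:
    """Run steps in order; a nested list is run concurrently as a single step."""
    log: list[str] = []
    for step in steps:
        if isinstance(step, list):
            # Parallel group: wait for every member before moving on.
            await asyncio.gather(*(run_task(name, log) for name in step))
        else:
            await run_task(step, log)
    return log


order = asyncio.run(run_steps(["task_1", ["task_2a", "task_2b"], "task_3"]))
print(order)
```

In this sketch `task_1` always finishes first and `task_3` always finishes last, while the relative order of `task_2a` and `task_2b` is not guaranteed.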

Register a Pipeline as an Entrypoint

You can register a pipeline as an entrypoint for reuse:

# Register the pipeline as an entrypoint
pgq.entrypoint_pipeline(
    "data_processing_pipeline",
    "fetch_data",
    ["validate_data", "normalize_data"],
    "store_data"
)

# Now you can enqueue this pipeline like any other task
job_id = await pgq.enqueue("data_processing_pipeline", {"source": "api"})

Running Blocking Functions Asynchronously

pgskewer provides utilities to run blocking functions asynchronously with real-time log streaming:

from pgskewer import unblock

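def cpu_intensive_task(data):
    # This is a blocking function; the sum below is a placeholder for real work
    print("Processing data...")
    result = sum(x * x for x in data)
    print("Processing complete!")
    return result

# Run the blocking function asynchronously with log streaming
# (await it from inside an async function, such as an entrypoint)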
result = await unblock(cpu_intensive_task, data)
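
How `unblock` works internally is not described here. As a point of comparison, the standard library's `asyncio.to_thread` shows the general idea of moving a blocking call off the event loop so other coroutines keep running. This sketch does not cover log streaming, and `blocking_sum_of_squares` and `heartbeat` are hypothetical names used only for illustration.

```python
import asyncio
import time


def blocking_sum_of_squares(values):
    # Blocking stand-in for real work; time.sleep would stall the event loop
    # if this function were called directly from a coroutine.
    time.sleep(0.05)
    return sum(v * v for v in values)


async def heartbeat(ticks):
    # Keeps ticking while the blocking call runs, showing the loop stays free.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def main():
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    # Run the blocking function in a worker thread and await its result.
    result = await asyncio.to_thread(blocking_sum_of_squares, [1, 2, 3])
    beat.cancel()
    return result, len(ticks)


result, tick_count = asyncio.run(main())
print(result, tick_count)
```

The heartbeat coroutine records ticks during the blocking call, which would not happen if the function were called directly inside `main`.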

Pipeline Result Structure

The pipeline returns a structured result with information about each task:

{
    "initial": {
        # The initial payload provided to the pipeline
    },
    "tasks": {
        "task_1": {
            "status": "successful",
            "ok": true,
            "result": {
                # Task 1's result data
            }
        },
        "task_2a": {
            "status": "successful",
            "ok": true,
            "result": {
                # Task 2a's result data
            }
        },
        # ... other tasks
    }
}
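
A result shaped like this can be inspected with ordinary dictionary access. The helpers below are a small sketch, not part of pgskewer: `failed_tasks` and `task_result` are hypothetical names, and the sample data is hand-written. In particular, the `"failed"` status and the `None` result of the failing task are assumptions, since only the successful case is shown above.

```python
from typing import Any, Dict, List


def failed_tasks(pipeline_result: Dict[str, Any]) -> List[str]:
    """Return the names of tasks whose "ok" flag is not true."""
    tasks = pipeline_result.get("tasks", {})
    return [name for name, info in tasks.items() if not info.get("ok")]


def task_result(pipeline_result: Dict[str, Any], name: str) -> Any:
    """Return a task's "result" value, or None if the task is absent."""
    return pipeline_result.get("tasks", {}).get(name, {}).get("result")


# Hand-written sample; the values and the shape of the failed entry are assumed.
sample = {
    "initial": {"source": "api"},
    "tasks": {
        "task_1": {"status": "successful", "ok": True, "result": {"rows": 3}},
        "task_2a": {"status": "failed", "ok": False, "result": None},
    },
}

print(failed_tasks(sample))
print(task_result(sample, "task_1"))
```

Using `.get` with defaults keeps the helpers from raising `KeyError` when a task has not run yet and is therefore missing from `"tasks"`.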

Error Handling

If any task in a pipeline fails:

  • In sequential execution, the pipeline stops and no further tasks are executed
  • In parallel execution, sibling tasks are terminated
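
The two rules above can be pictured with a database-free simulation. This is a sketch of the described behaviour using plain asyncio, not pgskewer's own code; `ok`, `boom`, `run_group`, and `demo` are hypothetical names introduced for the illustration.

```python
import asyncio


async def ok(name, done, delay=0.01):
    # A task that succeeds after a short delay and records its name.
    await asyncio.sleep(delay)
    done.append(name)


async def boom(name, done):
    # A task that fails after a short delay.
    await asyncio.sleep(0.01)
    raise RuntimeError(f"{name} failed")


async def run_group(coros):
    """Run a parallel group; on the first failure, cancel the siblings."""
    tasks = [asyncio.ensure_future(c) for c in coros]
    finished, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_EXCEPTION
    )
    for task in pending:
        task.cancel()
    # Let the cancelled siblings unwind before reporting the failure.
    await asyncio.gather(*pending, return_exceptions=True)
    for task in finished:
        task.result()  # re-raises the failure, if there was one


async def demo():
    done = []
    error = ""
    try:
        await ok("task_1", done)  # sequential step
        # Parallel step: task_2a fails while task_2b is still running.
        await run_group([boom("task_2a", done), ok("task_2b", done, delay=1.0)])
        await ok("task_3", done)  # not reached, because the pipeline stopped
    except RuntimeError as exc:
        error = str(exc)
    return done, error


done, error = asyncio.run(demo())
print(done, error)
```

Here only `task_1` completes: `task_2b` is cancelled when its sibling fails, and `task_3` never starts.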

License

pgskewer is distributed under the terms of the MIT license.

Credits

Developed by Education Warehouse.
