EDWH pgqueuer pipeline (pg-skEWer)

pgskewer

A minimalistic pipeline layer built on top of pgqueuer that provides enhanced task orchestration for PostgreSQL-based job queues.

The name "pgskewer" reflects how the library "skewers" tasks together in a pipeline. It also rhymes with "pgqueuer" and contains "EW" (Education Warehouse).

Features

  • Sequential and Parallel Task Pipelines: Define complex workflows with a mix of sequential and parallel task execution
  • Result Storage: Automatically store task results and pass them on to the following tasks in the sequence
  • Cancelable Tasks: Gracefully cancel tasks when needed
  • Improved Error Handling: A failed task stops the pipeline, and errors are propagated to the pipeline result
  • Real-time Log Streaming: Stream logs from blocking functions in real-time
  • Type Annotations: Comprehensive typing support for better IDE integration

Installation

uv pip install pgskewer

Requirements

  • Python ≥ 3.13
  • PostgreSQL database
  • Dependencies:
    • pgqueuer
    • asyncpg
    • anyio
    • uvloop

Quick Start

import asyncio
import asyncpg
from pgqueuer import Job
from pgqueuer.db import AsyncpgDriver
from pgskewer import ImprovedQueuer, parse_payload, TaskResult

async def main():
    # Initialize the queuer with your database connection
    connection = await asyncpg.connect("postgresql://user:password@localhost/dbname")
    driver = AsyncpgDriver(connection)
    pgq = ImprovedQueuer(driver)
    
    # Define some tasks as entrypoints
    @pgq.entrypoint("fetch_data")
    async def fetch_data(job: Job):
        # Fetch some data
        return {"data": "example data"}
    
    @pgq.entrypoint("process_data")
    async def process_data(job: Job):
        # Process the data from the previous step
        payload = parse_payload(job.payload)
        data = payload["tasks"]["fetch_data"]["result"]["data"]
        return {"processed": data.upper()}
    
    @pgq.entrypoint("store_results")
    async def store_results(job: Job):
        # Store the processed data
        payload = parse_payload(job.payload)
        processed = payload["tasks"]["process_data"]["result"]["processed"]
        # Store the processed data somewhere
        return {"status": "completed", "stored": processed}
    
    # Create a pipeline that runs these tasks in sequence
    pgq.entrypoint_pipeline(
        "my_pipeline",
        # start steps as a mix of entrypoint names and function references:
        fetch_data,
        "process_data",
        store_results
    )
    
    # Enqueue the pipeline with an empty initial payload
    job_id = await pgq.qm.queries.enqueue("my_pipeline", b'')
    # Once the pipeline completes, pgqueuer_result should have an entry for this job_id.
    # timeout=None waits indefinitely for that entry.
    result: TaskResult = await pgq.result(job_id, timeout=None)
    
    
if __name__ == "__main__":
    asyncio.run(main())

Advanced Usage

Creating Pipelines with Parallel Tasks

You can define pipelines with a mix of sequential and parallel tasks:

# Define a pipeline with parallel tasks
pipeline = pgq.pipeline([
    "task_1",                    # Run task_1 first
    ["task_2a", "task_2b"],      # Then run task_2a and task_2b in parallel
    "task_3"                     # Finally run task_3 after both task_2a and task_2b complete
])
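
To make the ordering concrete, here is a minimal sketch in plain asyncio. It is not pgskewer's implementation and does not touch the database. The names run_steps, registry and the toy tasks are hypothetical and exist only for this illustration. A string step runs on its own, and a nested list is run concurrently before the next step starts.

```python
import asyncio
from typing import Awaitable, Callable, Union

# Hypothetical types for this sketch; they are not part of pgskewer.
Task = Callable[[dict], Awaitable[object]]
Step = Union[str, list[str]]


async def run_steps(steps: list[Step], registry: dict[str, Task]) -> dict:
    """Run steps in order; a nested list of names is run concurrently."""
    results: dict[str, object] = {}
    for step in steps:
        names = [step] if isinstance(step, str) else step
        # Every task in the group sees the results gathered before the group.
        snapshot = dict(results)
        outputs = await asyncio.gather(*(registry[name](snapshot) for name in names))
        results.update(zip(names, outputs))
    return results


async def demo() -> tuple[list[str], dict]:
    order: list[str] = []

    def make(name: str) -> Task:
        async def task(previous: dict) -> str:
            order.append(name)
            await asyncio.sleep(0)
            return f"{name} saw {sorted(previous)}"

        return task

    names = ["task_1", "task_2a", "task_2b", "task_3"]
    registry = {name: make(name) for name in names}
    results = await run_steps(["task_1", ["task_2a", "task_2b"], "task_3"], registry)
    return order, results


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch task_3 only starts once both task_2a and task_2b have returned, which mirrors the ordering described in the comments above.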

Register a Pipeline as an Entrypoint

You can register a pipeline as an entrypoint for reuse:

# Register the pipeline as an entrypoint
pgq.entrypoint_pipeline(
    "data_processing_pipeline",
    "fetch_data",
    ["validate_data", "normalize_data"],
    "store_data"
)

# Now you can enqueue this pipeline like any other task
job_id = await pgq.enqueue("data_processing_pipeline", {"source": "api"})

Running Blocking Functions Asynchronously

pgskewer provides utilities to run blocking functions asynchronously with real-time log streaming:

from pgskewer import unblock

def cpu_intensive_task(data):
    # This is a blocking function
    print("Processing data...")
    result = process_data(data)
    print("Processing complete!")
    return result

# Run the blocking function asynchronously with log streaming
result = await unblock(cpu_intensive_task, data)
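
For comparison, the general idea of keeping the event loop responsive while blocking code runs can be sketched with the standard library alone. This is not how unblock is implemented and it does not stream logs. It uses asyncio.to_thread, and the names blocking_work and heartbeat are hypothetical.

```python
import asyncio
import time


def blocking_work(seconds: float) -> str:
    # Stands in for any blocking call (heavy computation, legacy I/O, ...).
    time.sleep(seconds)
    return "done"


async def demo() -> tuple[str, int]:
    ticks = 0

    async def heartbeat() -> None:
        # Keeps counting while the blocking call runs in a worker thread.
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.01)

    beat = asyncio.create_task(heartbeat())
    result = await asyncio.to_thread(blocking_work, 0.1)
    beat.cancel()
    # Let the cancelled heartbeat finish unwinding before returning.
    await asyncio.gather(beat, return_exceptions=True)
    return result, ticks


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The heartbeat keeps ticking during the blocking call, which shows that the event loop was not stalled.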

Pipeline Result Structure

The pipeline returns a structured result with information about each task:

{
    "initial": {
        # The initial payload provided to the pipeline
    },
    "tasks": {
        "task_1": {
            "status": "successful",
            "ok": true,
            "result": {
                # Task 1's result data
            }
        },
        "task_2a": {
            "status": "successful",
            "ok": true,
            "result": {
                # Task 2a's result data
            }
        },
        # ... other tasks
    }
}
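
A small helper can make reading this structure less error-prone than chained key lookups. The sketch below assumes only the keys shown above ("tasks", "status", "ok", "result"). The helper name task_result and the "failed" status value in the example data are hypothetical, as the source only shows "successful".

```python
from typing import Any


def task_result(payload: dict[str, Any], task_name: str) -> Any:
    """Return one task's result, or raise if it is missing or not ok."""
    tasks = payload.get("tasks", {})
    if task_name not in tasks:
        raise KeyError(f"no result recorded for task {task_name!r}")
    entry = tasks[task_name]
    if not entry.get("ok", False):
        raise RuntimeError(
            f"task {task_name!r} ended with status {entry.get('status')!r}"
        )
    return entry.get("result")


# Example data in the shape shown above; "failed" is an assumed status value.
example_payload = {
    "initial": {"source": "api"},
    "tasks": {
        "task_1": {"status": "successful", "ok": True, "result": {"rows": 3}},
        "task_2a": {"status": "failed", "ok": False, "result": None},
    },
}

if __name__ == "__main__":
    print(task_result(example_payload, "task_1"))
```

Inside an entrypoint, a helper like this would be applied to the dictionary returned by parse_payload(job.payload).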

Error Handling

If any task in a pipeline fails:

  • In sequential execution, the pipeline stops and no further tasks are executed
  • In parallel execution, sibling tasks are terminated
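
The parallel case can be illustrated with plain asyncio. This is a sketch of the fail-fast behaviour described above, not pgskewer's actual mechanism. The function run_parallel_fail_fast and the toy tasks are hypothetical.

```python
import asyncio


async def run_parallel_fail_fast(coros):
    """Run coroutines concurrently; on the first failure cancel the rest."""
    tasks = [asyncio.ensure_future(coro) for coro in coros]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    for task in pending:
        task.cancel()
    # Wait for cancelled siblings to finish unwinding.
    await asyncio.gather(*pending, return_exceptions=True)
    for task in tasks:
        if task in done and task.exception() is not None:
            raise task.exception()
    return [task.result() for task in tasks]


async def demo() -> list[str]:
    events: list[str] = []

    async def slow_sibling() -> None:
        try:
            await asyncio.sleep(10)
            events.append("sibling finished")
        except asyncio.CancelledError:
            events.append("sibling cancelled")
            raise

    async def failing() -> None:
        await asyncio.sleep(0.01)
        raise ValueError("boom")

    try:
        await run_parallel_fail_fast([slow_sibling(), failing()])
    except ValueError as exc:
        events.append(f"pipeline stopped: {exc}")
    return events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Here the slow sibling is cancelled as soon as the other task raises, and the error is re-raised to the caller so that no later step would run.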

License

pgskewer is distributed under the terms of the MIT license.

Credits

Developed by Education Warehouse.
