EDWH pgqueuer pipeline (pg-skEWer)
pgskewer
A minimalistic pipeline layer built on top of pgqueuer, providing enhanced task orchestration for PostgreSQL-based job queues.
The name "pgskewer" was chosen because it metaphorically "skewers" tasks together into a pipeline. It also rhymes with "pgqueuer" and contains "EW" (Education Warehouse).
Features
- Sequential and Parallel Task Pipelines: Define complex workflows with a mix of sequential and parallel task execution
- Result Storage: Automatically store task results and pass them along to later steps in the pipeline
- Cancelable Tasks: Gracefully cancel tasks when needed
- Improved Error Handling: A failed task stops the pipeline, and its parallel siblings are terminated
- Real-time Log Streaming: Stream logs from blocking functions in real time
- Type Annotations: Comprehensive typing support for better IDE integration
Installation
```bash
uv pip install pgskewer
```
Requirements
- Python ≥ 3.13
- PostgreSQL database
- Dependencies:
  - pgqueuer
  - asyncpg
  - anyio
  - uvloop
Quick Start
```python
import asyncio

import asyncpg
from pgqueuer import Job
from pgqueuer.db import AsyncpgDriver
from pgskewer import ImprovedQueuer, TaskResult, parse_payload


async def main():
    # Initialize the queuer with your database connection
    connection = await asyncpg.connect("postgresql://user:password@localhost/dbname")
    driver = AsyncpgDriver(connection)
    pgq = ImprovedQueuer(driver)

    # Define some tasks as entrypoints
    @pgq.entrypoint("fetch_data")
    async def fetch_data(job: Job):
        # Fetch some data
        return {"data": "example data"}

    @pgq.entrypoint("process_data")
    async def process_data(job: Job):
        # Process the data from the previous step
        payload = parse_payload(job.payload)
        data = payload["tasks"]["fetch_data"]["result"]["data"]
        return {"processed": data.upper()}

    @pgq.entrypoint("store_results")
    async def store_results(job: Job):
        # Store the processed data somewhere
        payload = parse_payload(job.payload)
        processed = payload["tasks"]["process_data"]["result"]["processed"]
        return {"status": "completed", "stored": processed}

    # Create a pipeline that runs these tasks in sequence
    pgq.entrypoint_pipeline(
        "my_pipeline",
        # steps can be given as a mix of entrypoint names and function references:
        fetch_data,
        "process_data",
        store_results,
    )

    # Execute the pipeline (empty initial data)
    job_id = await pgq.qm.queries.enqueue("my_pipeline", b"")

    # When the pipeline completes, pgqueuer_result should have an entry for this job_id.
    # This waits until a worker consuming this queue has actually run the pipeline.
    result: TaskResult = await pgq.result(job_id, timeout=None)
    print(result)


if __name__ == "__main__":
    asyncio.run(main())
```
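Each step above reaches into `payload["tasks"][<name>]["result"]` by hand, which fails with a bare `KeyError` if a step name is misspelled. A small helper can give a clearer error and refuse to use the output of a step that did not succeed. This is a minimal sketch that assumes the parsed payload is a plain dict shaped like the one described under Pipeline Result Structure; `step_result` is a hypothetical helper, not part of pgskewer.

```python
from typing import Any


def step_result(payload: dict[str, Any], step: str) -> Any:
    """Return the result of an earlier pipeline step (hypothetical helper).

    Assumes the layout {"initial": ..., "tasks": {name: {"ok": ..., "result": ...}}}.
    """
    tasks = payload.get("tasks", {})
    if step not in tasks:
        known = ", ".join(sorted(tasks)) or "none"
        raise KeyError(f"no step named {step!r} in payload (known steps: {known})")
    entry = tasks[step]
    # Only hand back results from steps that are flagged as successful
    if not entry.get("ok", False):
        raise RuntimeError(f"step {step!r} did not succeed (status: {entry.get('status')!r})")
    return entry.get("result")


# Example payload shaped like the one process_data receives in the Quick Start
payload = {
    "initial": {},
    "tasks": {
        "fetch_data": {"status": "successful", "ok": True, "result": {"data": "example data"}},
    },
}
print(step_result(payload, "fetch_data")["data"].upper())  # EXAMPLE DATA
```

Inside an entrypoint this would replace the nested lookup, e.g. `data = step_result(parse_payload(job.payload), "fetch_data")["data"]`.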
Advanced Usage
Creating Pipelines with Parallel Tasks
You can define pipelines with a mix of sequential and parallel tasks:
```python
# Define a pipeline with parallel tasks
pipeline = pgq.pipeline([
    "task_1",                # Run task_1 first
    ["task_2a", "task_2b"],  # Then run task_2a and task_2b in parallel
    "task_3",                # Finally run task_3 after both task_2a and task_2b complete
])
```
Register a Pipeline as an Entrypoint
You can register a pipeline as an entrypoint for reuse:
```python
# Register the pipeline as an entrypoint
pgq.entrypoint_pipeline(
    "data_processing_pipeline",
    "fetch_data",
    ["validate_data", "normalize_data"],
    "store_data",
)

# Now you can enqueue this pipeline like any other task (from within an async function)
job_id = await pgq.enqueue("data_processing_pipeline", {"source": "api"})
```
Running Blocking Functions Asynchronously
pgskewer provides utilities to run blocking functions asynchronously with real-time log streaming:
```python
from pgskewer import unblock


def cpu_intensive_task(data):
    # This is a blocking (synchronous) function
    print("Processing data...")
    result = expensive_computation(data)  # placeholder for your own blocking work
    print("Processing complete!")
    return result


# Run the blocking function asynchronously with log streaming (from within an async function)
result = await unblock(cpu_intensive_task, data)
```
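As an illustration of the general technique (not pgskewer's `unblock`, whose internals are not described here), a blocking function can run in a worker thread via `asyncio.to_thread` while it hands log lines back to the event loop with `loop.call_soon_threadsafe`, so they are consumed while the thread is still working. This sketch assumes the blocking function accepts a `log` callback instead of calling `print`; `run_blocking_with_logs` is a hypothetical name.

```python
import asyncio
import time
from typing import Any, Callable


async def run_blocking_with_logs(
    func: Callable[..., Any], *args: Any, on_log: Callable[[str], None]
) -> Any:
    """Run func(*args, log=...) in a worker thread, passing its log lines to on_log as they arrive."""
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue[str | None] = asyncio.Queue()

    def log(line: str) -> None:
        # Runs in the worker thread: hand the line to the event loop thread-safely
        loop.call_soon_threadsafe(queue.put_nowait, line)

    async def drain() -> None:
        # Consume lines on the event loop while the thread is still running
        while (line := await queue.get()) is not None:
            on_log(line)

    consumer = asyncio.create_task(drain())
    try:
        return await asyncio.to_thread(func, *args, log=log)
    finally:
        queue.put_nowait(None)  # sentinel, queued after every line the thread emitted
        await consumer


# Usage: a blocking function that reports progress through the log callback
def cpu_intensive_task(n: int, log: Callable[[str], None]) -> int:
    log("Processing data...")
    total = sum(i * i for i in range(n))  # stand-in for real CPU-bound work
    time.sleep(0.05)  # simulate a slow, blocking call
    log("Processing complete!")
    return total


lines: list[str] = []
result = asyncio.run(run_blocking_with_logs(cpu_intensive_task, 1000, on_log=lines.append))
print(result, lines)
```

The sentinel is safe to enqueue once `to_thread` returns because the thread's own completion is signalled to the loop after all of its earlier `call_soon_threadsafe` callbacks.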
Pipeline Result Structure
The pipeline returns a structured result with information about each task:
```
{
    "initial": {
        # The initial payload provided to the pipeline
    },
    "tasks": {
        "task_1": {
            "status": "successful",
            "ok": true,
            "result": {
                # Task 1's result data
            }
        },
        "task_2a": {
            "status": "successful",
            "ok": true,
            "result": {
                # Task 2a's result data
            }
        },
        # ... other tasks
    }
}
```
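Because every task entry carries an `ok` flag, checking a finished pipeline for failures is a simple dictionary scan. A minimal sketch, assuming the result has been decoded into a plain dict with the shape above; `failed_tasks` is hypothetical, and the non-successful status string in the example is an assumption, since only `"successful"` is documented here.

```python
from typing import Any


def failed_tasks(result: dict[str, Any]) -> dict[str, str]:
    """Map each task whose "ok" flag is not true to its status (hypothetical helper)."""
    return {
        name: entry.get("status", "unknown")
        for name, entry in result.get("tasks", {}).items()
        if not entry.get("ok", False)
    }


result = {
    "initial": {"source": "api"},
    "tasks": {
        "task_1": {"status": "successful", "ok": True, "result": {"rows": 3}},
        # Assumed status value for a failed task, for illustration only
        "task_2a": {"status": "exception", "ok": False, "result": None},
    },
}
print(failed_tasks(result))  # {'task_2a': 'exception'}
```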
Error Handling
If any task in a pipeline fails:
- In sequential execution, the pipeline stops and no further tasks are executed
- In parallel execution, sibling tasks are terminated
License
pgskewer is distributed under the terms of the MIT license.
Credits
Developed by Education Warehouse.
Download files
File details
Details for the file pgskewer-0.2.0.tar.gz.
File metadata
- Download URL: pgskewer-0.2.0.tar.gz
- Upload date:
- Size: 22.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.4
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 9c3ca61fedc5c4b8850a968382b43035c1c0ca2dac31790dcd30be8a0c72212f |
| MD5 | 18fff832a95e07e125329d97daa8e50a |
| BLAKE2b-256 | 725f341012d8651c4f2dca408a2f69fb33afeafac3d84d515faddc200e882a61 |
File details
Details for the file pgskewer-0.2.0-py3-none-any.whl.
File metadata
- Download URL: pgskewer-0.2.0-py3-none-any.whl
- Upload date:
- Size: 19.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.4
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 8dce3d5c47f033901f2fa5363dcbb54d2cd97019016fde1b9124a5e5716f1d89 |
| MD5 | dcc85ffec97be108afcc6b4b92353d77 |
| BLAKE2b-256 | cb3e5e7d7a9e12746862b8066aa2c705cb63e120389fc3bcd2dd6d7bc9309a87 |