Async CPU-Intensive Task Pipeline

A framework for processing streaming data through CPU-intensive tasks while maintaining order and tracking latency.

Overview

The pipeline combines async I/O with threaded CPU processing:

  • Async streams: Non-blocking input/output
  • Pipeline parallelism: Each stage runs in its own thread
  • Order preservation: Output maintains input sequence
  • Latency tracking: Monitor end-to-end and per-stage performance
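
Order preservation is commonly implemented by tagging each item with a sequence number on entry and holding early arrivals until the next expected number shows up. The sketch below shows that technique on its own. `ReorderBuffer` is a hypothetical name, not part of `async_task_pipeline`, and the library's internals may work differently.

```python
from typing import Any


class ReorderBuffer:
    """Release results strictly in sequence-number order.

    Hypothetical helper for illustration; not part of async_task_pipeline.
    """

    def __init__(self) -> None:
        self._next_seq = 0                   # next sequence number allowed out
        self._pending: dict[int, Any] = {}   # early arrivals, keyed by sequence number

    def push(self, seq: int, item: Any) -> list[Any]:
        """Store one result and return every item that can now be emitted in order."""
        self._pending[seq] = item
        ready = []
        # Drain the run of consecutive sequence numbers starting at _next_seq.
        while self._next_seq in self._pending:
            ready.append(self._pending.pop(self._next_seq))
            self._next_seq += 1
        return ready


if __name__ == "__main__":
    buf = ReorderBuffer()
    # Results arriving out of order, e.g. from several workers sharing a stage.
    for seq, item in [(1, "B"), (0, "A"), (3, "D"), (2, "C")]:
        print(seq, buf.push(seq, item))
```

With exactly one thread per stage and FIFO queues between stages, items cannot overtake each other, so a buffer like this only becomes necessary when results can finish out of order (for example, several workers sharing one stage).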

Workflow

sequenceDiagram
    participant Input as Async Input Stream
    participant Main as Main Thread<br/>(Asyncio Event Loop)
    participant Q1 as Input Queue
    participant T1 as Thread 1<br/>(Stage 1: Validate)
    participant Q2 as Queue 1
    participant T2 as Thread 2<br/>(Stage 2: Transform)
    participant Q3 as Queue 2
    participant T3 as Thread 3<br/>(Stage 3: Serialize)
    participant Q4 as Output Queue
    participant Output as Async Output Stream

    Note over Main: Pipeline Parallelism - Multiple items processed simultaneously

    Input->>Main: yield Item A
    Main->>Q1: put Item A
    Q1->>T1: get Item A

    Input->>Main: yield Item B
    Main->>Q1: put Item B
    Q1->>T1: get Item B

    par Item A flows through pipeline
        T1->>Q2: put processed Item A
        Q2->>T2: get Item A
        T2->>Q3: put processed Item A
        Q3->>T3: get Item A
        T3->>Q4: put processed Item A
    and Item B follows behind
        T1->>Q2: put processed Item B
        Q2->>T2: get Item B
        T2->>Q3: put processed Item B
    and Item C enters pipeline
        Input->>Main: yield Item C
        Main->>Q1: put Item C
        Q1->>T1: get Item C
        T1->>Q2: put processed Item C
    end

    Q4->>Main: get Item A (ordered)
    Main->>Output: yield Item A

    Q4->>Main: get Item B (ordered)
    Main->>Output: yield Item B

    Note over Main,Output: Output buffer ensures<br/>items maintain input order

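The asyncio event loop handles I/O while each pipeline stage runs in its own thread. On standard CPython, those threads execute on multiple cores at the same time only while the stage code releases the GIL (as NumPy and PyTorch do for many operations); stages written in pure Python interleave rather than run in parallel.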
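
The architecture in the diagram (one thread per stage, bounded queues between stages, fed and drained from the event loop) can be sketched with the standard library alone. This is a simplified model, not the library's implementation: `run_pipeline`, `_stage_worker` and the sentinel-based shutdown are hypothetical choices made for illustration.

```python
import asyncio
import queue
import threading
from typing import Any, AsyncIterator, Callable

_SENTINEL = object()  # hypothetical end-of-stream marker


def _stage_worker(func: Callable[[Any], Any], inq: queue.Queue, outq: queue.Queue) -> None:
    """Apply func to every item from inq and forward results to outq in FIFO order."""
    while True:
        item = inq.get()
        if item is _SENTINEL:
            outq.put(_SENTINEL)  # propagate shutdown to the next stage
            return
        outq.put(func(item))


async def run_pipeline(
    source: AsyncIterator[Any],
    stages: list[Callable[[Any], Any]],
    max_queue_size: int = 100,
) -> AsyncIterator[Any]:
    """Hypothetical thread-per-stage pipeline driven from the asyncio event loop."""
    # Bounded queues: queues[0] is the input, queues[-1] the output, one between each stage.
    queues = [queue.Queue(maxsize=max_queue_size) for _ in range(len(stages) + 1)]
    threads = [
        threading.Thread(target=_stage_worker, args=(f, queues[i], queues[i + 1]), daemon=True)
        for i, f in enumerate(stages)
    ]
    for t in threads:
        t.start()

    async def feed() -> None:
        async for item in source:
            # put() blocks when the queue is full, so run it in a worker thread.
            await asyncio.to_thread(queues[0].put, item)
        await asyncio.to_thread(queues[0].put, _SENTINEL)

    feeder = asyncio.create_task(feed())
    while True:
        # get() blocks until the last stage emits a result; keep it off the event loop.
        result = await asyncio.to_thread(queues[-1].get)
        if result is _SENTINEL:
            break
        yield result
    await feeder
    for t in threads:
        t.join()  # every worker has already forwarded the sentinel and is exiting


async def _demo() -> None:
    async def numbers() -> AsyncIterator[int]:
        for i in range(5):
            yield i

    stages = [lambda x: x + 1, lambda x: x * 10]
    print([r async for r in run_pipeline(numbers(), stages)])


if __name__ == "__main__":
    asyncio.run(_demo())
```

Because each stage is a single thread reading a FIFO queue, results leave in input order without any extra bookkeeping; the bounded queues provide backpressure when a downstream stage falls behind.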

Quick Start

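import asyncio
from async_task_pipeline import AsyncTaskPipeline

# Placeholder stages: replace with your own CPU-heavy functions
def validate_function(item):
    return item

def transform_function(item):
    return item * 2

def serialize_function(item):
    return str(item)

async def your_input_stream():
    for i in range(10):
        yield i

async def consume_output(output_stream):
    async for result in output_stream:
        print(result)

async def main():
    # Create pipeline
    pipeline = AsyncTaskPipeline(max_queue_size=100)

    # Add processing stages (one thread per stage)
    pipeline.add_stage("validate", validate_function)
    pipeline.add_stage("transform", transform_function)
    pipeline.add_stage("serialize", serialize_function)

    # Start the stage threads
    await pipeline.start()

    # Feed input and consume output concurrently
    await asyncio.gather(
        pipeline.process_input_stream(your_input_stream()),
        consume_output(pipeline.generate_output_stream()),
    )
    await pipeline.stop()

asyncio.run(main())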

Usage Patterns

Basic Processing Function

def cpu_intensive_task(data):
    # Your CPU-heavy computation here
    result = complex_computation(data)
    return result
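
Stage functions are plain synchronous callables: each receives an item and returns the value handed to the next stage. Since stages share the GIL, a stage only runs in parallel with the others when its heavy work releases it. As an illustration (an assumption, not taken from the project's examples), hashing with `hashlib` qualifies, because CPython releases the GIL while hashing large buffers. `checksum_stage` is a hypothetical name.

```python
import hashlib


def checksum_stage(data: bytes) -> str:
    """Hypothetical CPU-heavy stage: repeatedly SHA-256 a payload.

    CPython's hashlib releases the GIL while hashing large buffers, so several
    threads running this kind of work can keep several cores busy.
    """
    digest = hashlib.sha256(data).digest()
    for _ in range(1_000):
        # Chain each round on the previous digest plus the full payload.
        digest = hashlib.sha256(digest + data).digest()
    return digest.hex()
```

It would be registered like any other stage, e.g. `pipeline.add_stage("checksum", checksum_stage)`.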

Input Stream

import asyncio

async def input_stream(data_source):
    for item in data_source:
        yield item
        await asyncio.sleep(0)  # Yield control to the event loop between items
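
If the data comes from a blocking iterator (a file, a database cursor, a synchronous client), fetching the next item directly inside the async generator stalls the event loop while it waits. One option, sketched below as an assumption rather than a library feature, is to fetch each item in a worker thread with `asyncio.to_thread`. `from_blocking_iterable` is a hypothetical helper name.

```python
import asyncio
from typing import Any, AsyncIterator, Iterable

_DONE = object()  # returned by next() once the iterator is exhausted


async def from_blocking_iterable(iterable: Iterable[Any]) -> AsyncIterator[Any]:
    """Yield items from a blocking iterable without blocking the event loop."""
    it = iter(iterable)
    while True:
        # next(it, _DONE) runs in a worker thread, so slow reads don't stall the loop.
        item = await asyncio.to_thread(next, it, _DONE)
        if item is _DONE:
            return
        yield item
```

The result is an ordinary async iterator, so it can be passed wherever an input stream is expected, e.g. `pipeline.process_input_stream(from_blocking_iterable(records))` with `records` standing in for your own data source.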

Output Consumer

async def consume_output(output_stream):
    async for result in output_stream:
        # Handle processed result
        print(f"Processed: {result}")

Pipeline Management

# Clear pipeline state
pipeline.clear()

# Stop gracefully
await pipeline.stop()

# Get performance metrics
summary = pipeline.get_latency_summary()
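
`get_latency_summary()` reports the library's own measurements, and its return format is not described here. To illustrate the kind of per-stage measurement involved, the following sketch wraps a stage function and records call durations with `time.perf_counter`. `LatencyTracker` and its summary layout are hypothetical, not the library's API.

```python
import statistics
import time
from collections import defaultdict
from typing import Any, Callable


class LatencyTracker:
    """Hypothetical per-stage latency recorder (not the library's implementation)."""

    def __init__(self) -> None:
        # stage name -> call durations in seconds
        self.samples: defaultdict[str, list[float]] = defaultdict(list)

    def timed(self, name: str, func: Callable[[Any], Any]) -> Callable[[Any], Any]:
        """Wrap a stage function so each call records its duration under `name`."""

        def wrapper(item: Any) -> Any:
            start = time.perf_counter()
            try:
                return func(item)
            finally:
                # list.append is atomic in CPython; assumes one stage thread per name.
                self.samples[name].append(time.perf_counter() - start)

        return wrapper

    def summary(self) -> dict[str, dict[str, float]]:
        """Return call count, mean and max duration (seconds) for each stage."""
        return {
            name: {
                "count": float(len(durations)),
                "mean": statistics.fmean(durations),
                "max": max(durations),
            }
            for name, durations in self.samples.items()
        }
```

A wrapped function can be registered like any other stage, e.g. `pipeline.add_stage("transform", tracker.timed("transform", transform_function))`, though this overlaps with the built-in timing. End-to-end latency would additionally need a timestamp attached to each item when it enters the pipeline.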

Running the Example

python example.py --enable-timing

The example demonstrates a 4-stage pipeline processing 50 items with simulated CPU-intensive tasks.

Development

Development tooling (ruff, mypy, pytest, pre-commit) is driven through a Makefile and uv.

Quick Setup

# Install development dependencies and set up pre-commit hooks
make dev-setup

# Run all quality checks
make check

Available Commands

# Development setup
make install          # Install the package
make install-dev      # Install with development dependencies
make dev-setup        # Complete development environment setup

# Code quality
make format           # Format code with ruff
make lint             # Lint code with ruff
make type-check       # Run type checking with mypy
make test             # Run tests with pytest
make test-cov         # Run tests with coverage
make check            # Run all quality checks

# Pre-commit
make pre-commit-install  # Install pre-commit hooks
make pre-commit         # Run pre-commit on all files

# Building and publishing
make build            # Build the package
make publish-test     # Publish to TestPyPI
make publish          # Publish to PyPI

# Version management
make version-patch    # Bump patch version
make version-minor    # Bump minor version
make version-major    # Bump major version

# Utilities
make clean            # Clean up cache and build files
make watch-test       # Run tests in watch mode
make help             # Show all available commands

Code Quality Standards

The project enforces the following quality checks:

  • Formatting: ruff format for consistent code style
  • Linting: ruff check for code quality and best practices
  • Type Checking: mypy for static type analysis
  • Testing: pytest with coverage reporting
  • Pre-commit hooks: Automated checks before each commit
  • Security: bandit for security vulnerability scanning

Publishing Workflow

  1. Make your changes and ensure all tests pass:

    make check
    
  2. Bump the version:

    make version-patch  # or version-minor/version-major
    
  3. Build and publish:

    make publish  # or publish-test for TestPyPI
    

When to Use

  • Streaming data with CPU-heavy processing
  • Need to maintain input order in output
  • Want pipeline parallelism (different stages processing different items)
  • CPU-heavy work runs in libraries that release Python's GIL (NumPy, PyTorch, etc.), so stage threads can actually run in parallel

Download files

Source Distribution

async_task_pipeline-0.1.9.tar.gz (38.1 kB)

Uploaded Source

Built Distribution

async_task_pipeline-0.1.9-py3-none-any.whl (12.8 kB)

Uploaded Python 3

File details

Details for the file async_task_pipeline-0.1.9.tar.gz.

File metadata

  • Download URL: async_task_pipeline-0.1.9.tar.gz
  • Upload date:
  • Size: 38.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.6.11

File hashes

Hashes for async_task_pipeline-0.1.9.tar.gz
Algorithm    Hash digest
SHA256       0a24f4fa467844a4399bc6078e3cf33b0261fef66a7f1c6fec6ab5379f321304
MD5          af3c175eb60ee6a04310d058b1d6f04a
BLAKE2b-256  e608e6d25a2c315359d07716575cc4875c9f0548fd2d451a08bdbafee6d2ed14

File details

Details for the file async_task_pipeline-0.1.9-py3-none-any.whl.

File hashes

Hashes for async_task_pipeline-0.1.9-py3-none-any.whl
Algorithm    Hash digest
SHA256       37224dec87256fa7a3465d7b07f00baea88fe5ff77242fe29b172a96beaeeb05
MD5          8793f3b3cfd5727c1b3a753300c0978e
BLAKE2b-256  2186bde51350422c074e2a0ca70810181e2ee9a96f25ac54e5e2f1a40ee74673
