Async Utils

au - Asynchronous Computation Framework

A Python framework for transforming synchronous functions into asynchronous ones with status tracking, result persistence, and pluggable backends.

Features

  • 🚀 Simple decorator-based API - Transform any function into an async computation
  • 💾 Pluggable storage backends - File system, Redis, databases, etc.
  • 🔄 Multiple execution backends - Processes, threads, distributed queues (RQ, Supabase)
  • 🌐 Queue backends - Standard library, Redis Queue, Supabase PostgreSQL
  • 🛡️ Middleware system - Logging, metrics, authentication, rate limiting
  • 🧹 Automatic cleanup - TTL-based expiration of old results
  • 📦 Flexible serialization - JSON, Pickle, or custom formats
  • 🔍 Status tracking - Monitor computation state and progress
  • ⏹️ Cancellation support - Stop long-running computations
  • 🏭 Distributed processing - Scale across multiple machines

Installation

pip install au

Quick Start

from au import async_compute
# For queue backends:
# from au import StdLibQueueBackend
# from au.backends.rq_backend import RQBackend
# from au.backends.supabase_backend import SupabaseQueueBackend

@async_compute()
def expensive_computation(n: int) -> int:
    """Calculate factorial."""
    result = 1
    for i in range(1, n + 1):
        result *= i
    return result

# Launch computation (returns immediately)
handle = expensive_computation(100)

# Check status
print(handle.get_status())  # e.g., ComputationStatus.RUNNING

# Get result (blocks with timeout)
result = handle.get_result(timeout=30)
print(f"100! = {result}")

Use Cases

1. Long-Running Computations

Perfect for computations that take minutes or hours:

  • Machine learning model training
  • Data processing pipelines
  • Scientific simulations
  • Report generation

2. Web Application Background Tasks

Offload heavy work from request handlers:

@app.route('/analyze')
def analyze_data():
    handle = analyze_large_dataset(request.files['data'])
    return {'job_id': handle.key}

@app.route('/status/<job_id>')
def check_status(job_id):
    handle = ComputationHandle(job_id, store)
    return {'status': handle.get_status().value}
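The two routes above launch a job and report its status; a third endpoint typically returns the finished result. The sketch below shows that handler's logic in isolation. The route name, `result_view`, and `_StubHandle` are all illustrative, with the stub standing in for au's `ComputationHandle` so the logic is runnable outside a web app:

```python
class _StubHandle:
    """Stand-in for au's ComputationHandle: ready once the key has a stored value."""
    def __init__(self, key, store):
        self.key, self.store = key, store

    def is_ready(self):
        return self.key in self.store

    def get_result(self, timeout=None):
        return self.store[self.key]


def result_view(job_id, store, handle_cls=_StubHandle):
    """Body of a hypothetical /result/<job_id> handler:
    202 + status while pending, 200 + value once complete."""
    handle = handle_cls(job_id, store)
    if not handle.is_ready():
        return {'status': 'pending'}, 202
    return {'result': handle.get_result()}, 200
```

In a real app this would be wired up with `ComputationHandle(job_id, store)` exactly as in the status route above.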

3. Distributed Computing

Use queue backends to distribute work across multiple machines:

# Using Redis Queue backend
import redis
from rq import Queue

from au import async_compute, FileSystemStore
from au.backends.rq_backend import RQBackend

redis_conn = redis.Redis()
rq_queue = Queue('tasks', connection=redis_conn)
store = FileSystemStore("/tmp/computations")
backend = RQBackend(store, rq_queue)

@async_compute(backend=backend, store=store)
def distributed_task(data):
    return complex_analysis(data)

# Task will be processed by RQ workers on any machine
handle = distributed_task(large_dataset)

4. Batch Processing

Process multiple items with shared infrastructure:

from au import async_compute, FileSystemStore, ProcessBackend

store = FileSystemStore("/var/computations", ttl_seconds=3600)
backend = ProcessBackend(store)

@async_compute(backend=backend, store=store)
def process_item(item_id):
    return transform_item(item_id)

# Launch multiple computations
handles = [process_item(i) for i in range(1000)]
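After launching a batch, you usually want to collect results while tolerating individual failures, since `get_result` raises if a computation failed. A minimal collection helper might look like this; `gather_results` and `_FakeHandle` are illustrative names, with the fake standing in for au's `ComputationHandle` so the pattern is runnable here:

```python
def gather_results(handles, timeout=600):
    """Collect results from a batch of handles, separating failures."""
    results, errors = [], []
    for h in handles:
        try:
            results.append(h.get_result(timeout=timeout))
        except Exception as exc:
            errors.append(exc)
    return results, errors


class _FakeHandle:
    """Stand-in for ComputationHandle (illustration only)."""
    def __init__(self, value=None, error=None):
        self._value, self._error = value, error

    def get_result(self, timeout=None):
        if self._error is not None:
            raise self._error
        return self._value


results, errors = gather_results(
    [_FakeHandle(1), _FakeHandle(error=ValueError("bad item")), _FakeHandle(3)]
)
# results == [1, 3]; errors holds the one ValueError
```

With real handles from `process_item`, the same loop applies unchanged.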

Usage Patterns

Basic Usage

from au import async_compute

# Simple async function with default settings
@async_compute()
def my_function(x):
    return x * 2

handle = my_function(21)
result = handle.get_result(timeout=10)  # Returns 42

Custom Configuration

import logging

from au import async_compute, FileSystemStore, ProcessBackend
from au import LoggingMiddleware, MetricsMiddleware, SerializationFormat

# Configure store with TTL and serialization
store = FileSystemStore(
    "/var/computations",
    ttl_seconds=3600,  # 1 hour TTL
    serialization=SerializationFormat.PICKLE  # For complex objects
)

# Add middleware
middleware = [
    LoggingMiddleware(level=logging.INFO),
    MetricsMiddleware()
]

# Create backend with middleware
backend = ProcessBackend(store, middleware=middleware)

# Apply to function
@async_compute(backend=backend, store=store)
def complex_computation(data):
    return analyze(data)

Shared Infrastructure

# Create shared components
store = FileSystemStore("/var/shared", ttl_seconds=7200)
backend = ProcessBackend(store)

# Multiple functions share the same infrastructure
@async_compute(backend=backend, store=store)
def step1(x):
    return preprocess(x)

@async_compute(backend=backend, store=store)
def step2(x):
    return transform(x)

# Chain computations
data = load_data()
h1 = step1(data)
preprocessed = h1.get_result(timeout=60)
h2 = step2(preprocessed)
final_result = h2.get_result(timeout=60)

Temporary Computations

from au import temporary_async_compute

# Automatic cleanup when context exits
with temporary_async_compute(ttl_seconds=60) as async_func:
    @async_func
    def quick_job(x):
        return x ** 2
    
    handle = quick_job(10)
    result = handle.get_result(timeout=5)
    # Temporary directory cleaned up automatically

Thread Backend for I/O-Bound Tasks

import requests

from au import ThreadBackend, FileSystemStore, async_compute

# Use threads for I/O-bound operations
store = FileSystemStore("/tmp/io_tasks")
backend = ThreadBackend(store)

@async_compute(backend=backend, store=store)
def fetch_data(url):
    return requests.get(url).json()

# Launch multiple I/O operations
handles = [fetch_data(url) for url in urls]

Queue Backends

The au framework supports multiple queue backends for different distributed computing scenarios:

Standard Library Queue Backend

Uses Python's concurrent.futures for in-memory task processing with no external dependencies.

import requests

from au import StdLibQueueBackend, FileSystemStore, async_compute

store = FileSystemStore("/tmp/computations")

# Use ThreadPoolExecutor for I/O-bound tasks
with StdLibQueueBackend(store, max_workers=4, use_processes=False) as backend:
    @async_compute(backend=backend, store=store)
    def fetch_data(url):
        return requests.get(url).text

# Use ProcessPoolExecutor for CPU-bound tasks  
with StdLibQueueBackend(store, max_workers=4, use_processes=True) as backend:
    @async_compute(backend=backend, store=store)
    def cpu_intensive(n):
        return sum(i * i for i in range(n))

Features:

  • No external dependencies
  • Context manager support for clean shutdown
  • Choice between threads and processes
  • In-memory queuing (not persistent)

Redis Queue (RQ) Backend

Distributed task processing using Redis and RQ workers.

Installation:

pip install redis rq

Usage:

import redis
from rq import Queue

from au import async_compute, FileSystemStore
from au.backends.rq_backend import RQBackend

# Setup Redis and RQ
redis_conn = redis.Redis(host='localhost', port=6379, db=0)
rq_queue = Queue('au_tasks', connection=redis_conn)

# Create backend
store = FileSystemStore("/tmp/computations")
backend = RQBackend(store, rq_queue)

@async_compute(backend=backend, store=store)
def heavy_computation(data):
    # This will be processed by RQ workers
    return process_data(data)

# Launch task (enqueued to Redis)
handle = heavy_computation(my_data)

# Start RQ worker in separate process/machine:
# rq worker au_tasks

Features:

  • Distributed processing across multiple machines
  • Persistent task queue (survives restarts)
  • Built-in job monitoring and management
  • Fault tolerance and retry mechanisms

Supabase Queue Backend

PostgreSQL-based task queue using Supabase with internal polling workers.

Installation:

pip install supabase

Database Setup:

CREATE TABLE au_task_queue (
    task_id UUID PRIMARY KEY,
    func_data BYTEA NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    started_at TIMESTAMP WITH TIME ZONE,
    completed_at TIMESTAMP WITH TIME ZONE,
    worker_id TEXT
);

Usage:

from supabase import create_client

from au import async_compute, FileSystemStore
from au.backends.supabase_backend import SupabaseQueueBackend

# Setup Supabase client
supabase = create_client(SUPABASE_URL, SUPABASE_KEY)

# Create backend with internal polling workers
store = FileSystemStore("/tmp/computations")
with SupabaseQueueBackend(
    store, 
    supabase, 
    max_concurrent_tasks=3,
    polling_interval_seconds=2.0
) as backend:
    
    @async_compute(backend=backend, store=store)
    def analyze_data(dataset_id):
        return run_analysis(dataset_id)
    
    handle = analyze_data("dataset_123")
    result = handle.get_result(timeout=60)

Features:

  • PostgreSQL-based persistence
  • Internal polling workers (no separate worker processes needed)
  • SQL-based task management and monitoring
  • Integration with Supabase ecosystem

Backend Comparison

| Backend              | Persistence | Distribution   | Setup Complexity | Best For                                |
|----------------------|-------------|----------------|------------------|-----------------------------------------|
| ProcessBackend       | No          | Single machine | Low              | Development, single-machine processing  |
| StdLibQueueBackend   | No          | Single machine | Low              | Simple queuing, testing                 |
| RQBackend            | Yes         | Multi-machine  | Medium           | Production distributed systems          |
| SupabaseQueueBackend | Yes         | Multi-machine  | Medium           | PostgreSQL-based architectures          |

Function Serialization Requirements

Queue backends require functions to be pickleable:

Good:

# Module-level function
def my_task(x):
    return x * 2

@async_compute(backend=queue_backend)
def another_task(data):
    return process(data)

Bad:

def test_function():
    # Local function - can't be pickled!
    @async_compute(backend=queue_backend)
    def local_task(x):
        return x * 2
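The restriction exists because pickle serializes functions by qualified name, which only resolves for importable, module-level definitions. A quick stdlib check demonstrates why the "Bad" case fails; `make_local` and `can_pickle` are illustrative helpers, not part of au:

```python
import pickle

def make_local():
    def local_task(x):          # defined inside another function
        return x * 2
    return local_task

def can_pickle(obj):
    """True if pickle can serialize obj (functions pickle by qualified name)."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False

print(can_pickle(len))              # True: importable by name
print(can_pickle(make_local()))     # False: no importable qualified name
```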

Architecture & Design

Core Components

  1. Storage Abstraction (ComputationStore)

    • Implements Python's MutableMapping interface
    • Handles result persistence and retrieval
    • Supports TTL-based expiration
    • Extensible for any storage backend
  2. Execution Abstraction (ComputationBackend)

    • Defines how computations are launched
    • Supports different execution models
    • Integrates middleware for cross-cutting concerns
  3. Result Handling (ComputationHandle)

    • Clean API for checking status and retrieving results
    • Supports timeouts and cancellation
    • Provides access to metadata
  4. Middleware System

    • Lifecycle hooks: before, after, error
    • Composable and reusable
    • Examples: logging, metrics, auth, rate limiting

Design Principles

  • Separation of Concerns: Storage, execution, and result handling are independent
  • Dependency Injection: All components are injected, avoiding hardcoded dependencies
  • Open/Closed Principle: Extend functionality without modifying core code
  • Standard Interfaces: Uses Python's collections.abc interfaces
  • Functional Approach: Decorator-based API preserves function signatures
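The last principle, preserving function signatures through a decorator, can be illustrated with plain `functools`. The toy decorator below is a sketch of the general technique, not au's actual implementation:

```python
import functools
import inspect

def async_compute_like(func):
    """Toy decorator showing how functools.wraps preserves metadata
    (illustrative only; au's real decorator returns a ComputationHandle)."""
    @functools.wraps(func)          # copies __name__, __doc__, sets __wrapped__
    def launcher(*args, **kwargs):
        # A real backend would run func elsewhere; here we just tag the result.
        return ("handle-for", func(*args, **kwargs))
    return launcher

@async_compute_like
def factorial(n: int) -> int:
    """Calculate factorial."""
    out = 1
    for i in range(2, n + 1):
        out *= i
    return out

print(factorial.__name__)               # 'factorial' — metadata preserved
print(inspect.signature(factorial))     # (n: int) -> int — original signature visible
```

Because `functools.wraps` records the wrapped function, tools like `inspect.signature` and IDEs still see the original annotations.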

Trade-offs & Considerations

Pros

  • ✅ Clean abstraction allows easy swapping of implementations
  • ✅ Type hints and dataclasses provide excellent IDE support
  • ✅ Follows SOLID principles for maintainability
  • ✅ Minimal dependencies (uses only Python stdlib)
  • ✅ Flexible serialization supports complex objects
  • ✅ Middleware enables cross-cutting concerns

Cons

  • ❌ Process-based backend has overhead for small computations
  • ❌ File-based storage might not scale for high throughput
  • ❌ Metrics middleware doesn't share state across processes by default
  • ❌ No built-in distributed coordination
  • ❌ Fork method required for ProcessBackend (platform-specific)

When to Use

  • ✅ Long-running computations (minutes to hours)
  • ✅ Need to persist results across restarts
  • ✅ Want to separate computation from result retrieval
  • ✅ Building async APIs or job queues
  • ✅ Need cancellation or timeout support

When NOT to Use

  • ❌ Sub-second computations (overhead too high)
  • ❌ Need distributed coordination (use Celery/Dask)
  • ❌ Require complex workflow orchestration
  • ❌ Need real-time streaming results

Advanced Features

Custom Middleware

import time

from au import Middleware

class RateLimitMiddleware(Middleware):
    def __init__(self, max_per_minute: int = 60):
        self.max_per_minute = max_per_minute
        self.requests = []
    
    def before_compute(self, func, args, kwargs, key):
        now = time.time()
        self.requests = [t for t in self.requests if now - t < 60]
        
        if len(self.requests) >= self.max_per_minute:
            raise Exception("Rate limit exceeded")
        
        self.requests.append(now)
    
    def after_compute(self, key, result):
        pass
    
    def on_error(self, key, error):
        pass

# Use the middleware
@async_compute(middleware=[RateLimitMiddleware(max_per_minute=10)])
def rate_limited_function(x):
    return expensive_api_call(x)

Custom Storage Backend

import pickle
import uuid

import redis

from au import ComputationStore, ComputationResult, ComputationStatus

class RedisStore(ComputationStore):
    def __init__(self, redis_client, *, ttl_seconds=None):
        super().__init__(ttl_seconds=ttl_seconds)
        self.redis = redis_client
    
    def create_key(self):
        return f"computation:{uuid.uuid4()}"
    
    def __getitem__(self, key):
        data = self.redis.get(key)
        if data is None:
            return ComputationResult(None, ComputationStatus.PENDING)
        return pickle.loads(data)
    
    def __setitem__(self, key, result):
        data = pickle.dumps(result)
        if self.ttl_seconds:
            self.redis.setex(key, self.ttl_seconds, data)
        else:
            self.redis.set(key, data)
    
    def __delitem__(self, key):
        self.redis.delete(key)
    
    def __iter__(self):
        return iter(self.redis.scan_iter("computation:*"))
    
    def __len__(self):
        return len(list(self))
    
    def cleanup_expired(self):
        # Redis handles expiration automatically
        return 0

# Use Redis backend
redis_client = redis.Redis(host='localhost', port=6379)
store = RedisStore(redis_client, ttl_seconds=3600)

@async_compute(store=store)
def distributed_computation(x):
    return process(x)

Monitoring & Metrics

from au import MetricsMiddleware

# Create shared metrics
metrics = MetricsMiddleware()

@async_compute(middleware=[metrics])
def monitored_function(x):
    return compute(x)

# Launch several computations
for i in range(10):
    monitored_function(i)

# Check metrics
stats = metrics.get_stats()
print(f"Total: {stats['total']}")
print(f"Completed: {stats['completed']}")
print(f"Failed: {stats['failed']}")
print(f"Avg Duration: {stats['avg_duration']:.2f}s")

Error Handling

@async_compute()
def may_fail(x):
    if x < 0:
        raise ValueError("x must be positive")
    return x ** 2

handle = may_fail(-5)

try:
    result = handle.get_result(timeout=5)
except Exception as e:
    print(f"Computation failed: {e}")
    print(f"Status: {handle.get_status()}")  # ComputationStatus.FAILED

Cleanup Strategies

# Manual cleanup
@async_compute(ttl_seconds=3600)
def my_func(x):
    return x * 2

# Clean up expired results
removed = my_func.cleanup_expired()
print(f"Removed {removed} expired results")

# Automatic cleanup with probability
store = FileSystemStore(
    "/tmp/computations",
    ttl_seconds=3600,
    auto_cleanup=True,
    cleanup_probability=0.1  # 10% chance on each access
)

API Reference

Main Decorator

@async_compute(
    backend=None,           # Execution backend (default: ProcessBackend)
    store=None,            # Storage backend (default: FileSystemStore)
    base_path="/tmp/computations",  # Path for default file store
    ttl_seconds=3600,      # Time-to-live for results
    serialization=SerializationFormat.JSON,  # JSON or PICKLE
    middleware=None        # List of middleware components
)

ComputationHandle Methods

  • is_ready() -> bool: Check if computation is complete
  • get_status() -> ComputationStatus: Get current status
  • get_result(timeout=None) -> T: Get result, optionally wait
  • cancel() -> bool: Attempt to cancel computation
  • metadata -> Dict[str, Any]: Access computation metadata

ComputationStatus Enum

  • PENDING: Not started yet
  • RUNNING: Currently executing
  • COMPLETED: Successfully finished
  • FAILED: Failed with error

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

MIT License - see LICENSE file for details.
