Async Utils
au - Asynchronous Computation Framework
A Python framework for transforming synchronous functions into asynchronous ones with status tracking, result persistence, and pluggable backends.
Features
- 🚀 Simple decorator-based API - Transform any function into an async computation
- 💾 Pluggable storage backends - File system, Redis, databases, etc.
- 🔄 Multiple execution backends - Processes, threads, remote APIs
- 🛡️ Middleware system - Logging, metrics, authentication, rate limiting
- 🧹 Automatic cleanup - TTL-based expiration of old results
- 📦 Flexible serialization - JSON, Pickle, or custom formats
- 🔍 Status tracking - Monitor computation state and progress
- ❌ Cancellation support - Stop long-running computations
Installation
pip install au
Quick Start
from au import async_compute

@async_compute()
def expensive_computation(n: int) -> int:
    """Calculate factorial."""
    result = 1
    for i in range(1, n + 1):
        result *= i
    return result

# Launch computation (returns immediately)
handle = expensive_computation(100)

# Check status
print(handle.get_status())  # ComputationStatus.RUNNING

# Get result (blocks with timeout)
result = handle.get_result(timeout=30)
print(f"100! = {result}")
Use Cases
1. Long-Running Computations
Perfect for computations that take minutes or hours:
- Machine learning model training
- Data processing pipelines
- Scientific simulations
- Report generation
2. Web Application Background Tasks
Offload heavy work from request handlers:
@app.route('/analyze')
def analyze_data():
    handle = analyze_large_dataset(request.files['data'])
    return {'job_id': handle.key}

@app.route('/status/<job_id>')
def check_status(job_id):
    handle = ComputationHandle(job_id, store)
    return {'status': handle.get_status().value}
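A companion endpoint can return the result once the job finishes. This route is a hypothetical extension of the example above, rehydrating the handle from its key the same way the status endpoint does:

@app.route('/result/<job_id>')
def get_result(job_id):
    handle = ComputationHandle(job_id, store)
    if not handle.is_ready():
        return {'status': handle.get_status().value}, 202  # still in progress
    return {'result': handle.get_result()}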
3. Distributed Computing
Use remote backends to distribute work:
@async_compute(backend=RemoteAPIBackend(api_url="https://compute.example.com"))
def distributed_task(data):
    return complex_analysis(data)
4. Batch Processing
Process multiple items with shared infrastructure:
store = FileSystemStore("/var/computations", ttl_seconds=3600)
backend = ProcessBackend(store)
@async_compute(backend=backend, store=store)
def process_item(item_id):
return transform_item(item_id)
# Launch multiple computations
handles = [process_item(i) for i in range(1000)]
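Collecting the results afterwards is a matter of draining the handles; a minimal sketch (the per-item timeout is illustrative):

# Block on each handle in turn; completed items return immediately
results = [h.get_result(timeout=300) for h in handles]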
Usage Patterns
Basic Usage
from au import async_compute

# Simple async function with default settings
@async_compute()
def my_function(x):
    return x * 2

handle = my_function(21)
result = handle.get_result(timeout=10)  # Returns 42
Custom Configuration
import logging

from au import async_compute, FileSystemStore, ProcessBackend
from au import LoggingMiddleware, MetricsMiddleware, SerializationFormat

# Configure store with TTL and serialization
store = FileSystemStore(
    "/var/computations",
    ttl_seconds=3600,  # 1 hour TTL
    serialization=SerializationFormat.PICKLE  # For complex objects
)

# Add middleware
middleware = [
    LoggingMiddleware(level=logging.INFO),
    MetricsMiddleware()
]

# Create backend with middleware
backend = ProcessBackend(store, middleware=middleware)

# Apply to function
@async_compute(backend=backend, store=store)
def complex_computation(data):
    return analyze(data)
Shared Infrastructure
# Create shared components
store = FileSystemStore("/var/shared", ttl_seconds=7200)
backend = ProcessBackend(store)

# Multiple functions share the same infrastructure
@async_compute(backend=backend, store=store)
def step1(x):
    return preprocess(x)

@async_compute(backend=backend, store=store)
def step2(x):
    return transform(x)

# Chain computations
data = load_data()
h1 = step1(data)
preprocessed = h1.get_result(timeout=60)
h2 = step2(preprocessed)
final_result = h2.get_result(timeout=60)
Temporary Computations
from au import temporary_async_compute

# Automatic cleanup when context exits
with temporary_async_compute(ttl_seconds=60) as async_func:
    @async_func
    def quick_job(x):
        return x ** 2

    handle = quick_job(10)
    result = handle.get_result(timeout=5)
# Temporary directory cleaned up automatically
Thread Backend for I/O-Bound Tasks
import requests

from au import ThreadBackend

# Use threads for I/O-bound operations
store = FileSystemStore("/tmp/io_tasks")
backend = ThreadBackend(store)

@async_compute(backend=backend, store=store)
def fetch_data(url):
    return requests.get(url).json()

# Launch multiple I/O operations
handles = [fetch_data(url) for url in urls]
Architecture & Design
Core Components
1. Storage Abstraction (ComputationStore)
   - Implements Python's MutableMapping interface
   - Handles result persistence and retrieval
   - Supports TTL-based expiration
   - Extensible for any storage backend
2. Execution Abstraction (ComputationBackend)
   - Defines how computations are launched (sketched below)
   - Supports different execution models
   - Integrates middleware for cross-cutting concerns
3. Result Handling (ComputationHandle)
   - Clean API for checking status and retrieving results
   - Supports timeouts and cancellation
   - Provides access to metadata
4. Middleware System
   - Lifecycle hooks: before, after, error
   - Composable and reusable
   - Examples: logging, metrics, auth, rate limiting
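The storage side of this split is shown concretely in the Custom Storage Backend section below. For the execution side, a custom backend might look like the following sketch; the launch() hook name and signature are assumptions for illustration, not the library's confirmed interface:

from au import (
    ComputationBackend,
    ComputationResult,
    ComputationStatus,
    ComputationStore,
)

class InlineBackend(ComputationBackend):
    """Toy backend that runs computations synchronously (handy in tests).

    NOTE: the launch() name and signature are hypothetical; consult the
    ComputationBackend ABC for the actual hook to override.
    """

    def __init__(self, store: ComputationStore):
        self.store = store

    def launch(self, func, args, kwargs, key):  # hypothetical hook
        try:
            value = func(*args, **kwargs)
            self.store[key] = ComputationResult(value, ComputationStatus.COMPLETED)
        except Exception as exc:
            # Storing the exception as the result value is also an assumption
            self.store[key] = ComputationResult(exc, ComputationStatus.FAILED)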
Design Principles
- Separation of Concerns: Storage, execution, and result handling are independent
- Dependency Injection: All components are injected, avoiding hardcoded dependencies
- Open/Closed Principle: Extend functionality without modifying core code
- Standard Interfaces: Uses Python's collections.abc interfaces (see the sketch below)
- Functional Approach: Decorator-based API preserves function signatures
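Because ComputationStore implements MutableMapping, a store can be inspected with ordinary dict-style operations. A minimal sketch, assuming ComputationResult exposes the status it was constructed with:

from au import FileSystemStore

store = FileSystemStore("/tmp/computations")

print(len(store))              # number of stored results
for key in store:              # iterate over computation keys
    result = store[key]        # a ComputationResult
    print(key, result.status)  # .status attribute assumed from the dataclass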
Trade-offs & Considerations
Pros
- ✅ Clean abstraction allows easy swapping of implementations
- ✅ Type hints and dataclasses provide excellent IDE support
- ✅ Follows SOLID principles for maintainability
- ✅ Minimal dependencies (uses only Python stdlib)
- ✅ Flexible serialization supports complex objects
- ✅ Middleware enables cross-cutting concerns
Cons
- ❌ Process-based backend has overhead for small computations
- ❌ File-based storage might not scale for high throughput
- ❌ Metrics middleware doesn't share state across processes by default
- ❌ No built-in distributed coordination
- ❌ Fork method required for ProcessBackend (platform-specific; see the note below)
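On platforms where fork is not the default start method (macOS since Python 3.8, and never on Windows), you may need to opt in explicitly before launching anything through ProcessBackend. A minimal sketch, assuming the fork requirement noted above:

import multiprocessing as mp

# Must run once, before any ProcessBackend computation is launched.
# Raises ValueError on platforms without fork support (e.g. Windows).
if mp.get_start_method(allow_none=True) != "fork":
    mp.set_start_method("fork")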
When to Use
- ✅ Long-running computations (minutes to hours)
- ✅ Need to persist results across restarts
- ✅ Want to separate computation from result retrieval
- ✅ Building async APIs or job queues
- ✅ Need cancellation or timeout support
When NOT to Use
- ❌ Sub-second computations (overhead too high)
- ❌ Need distributed coordination (use Celery/Dask)
- ❌ Require complex workflow orchestration
- ❌ Need real-time streaming results
Advanced Features
Custom Middleware
import time

from au import Middleware

class RateLimitMiddleware(Middleware):
    def __init__(self, max_per_minute: int = 60):
        self.max_per_minute = max_per_minute
        self.requests = []

    def before_compute(self, func, args, kwargs, key):
        # Keep only timestamps from the last 60 seconds
        now = time.time()
        self.requests = [t for t in self.requests if now - t < 60]
        if len(self.requests) >= self.max_per_minute:
            raise Exception("Rate limit exceeded")
        self.requests.append(now)

    def after_compute(self, key, result):
        pass

    def on_error(self, key, error):
        pass

# Use the middleware
@async_compute(middleware=[RateLimitMiddleware(max_per_minute=10)])
def rate_limited_function(x):
    return expensive_api_call(x)
Custom Storage Backend
import pickle
import uuid

import redis

from au import ComputationResult, ComputationStatus, ComputationStore

class RedisStore(ComputationStore):
    def __init__(self, redis_client, *, ttl_seconds=None):
        super().__init__(ttl_seconds=ttl_seconds)
        self.redis = redis_client

    def create_key(self):
        return f"computation:{uuid.uuid4()}"

    def __getitem__(self, key):
        data = self.redis.get(key)
        if data is None:
            return ComputationResult(None, ComputationStatus.PENDING)
        return pickle.loads(data)

    def __setitem__(self, key, result):
        data = pickle.dumps(result)
        if self.ttl_seconds:
            self.redis.setex(key, self.ttl_seconds, data)
        else:
            self.redis.set(key, data)

    def __delitem__(self, key):
        self.redis.delete(key)

    def __iter__(self):
        return iter(self.redis.scan_iter("computation:*"))

    def __len__(self):
        return len(list(self))

    def cleanup_expired(self):
        # Redis handles expiration automatically
        return 0

# Use Redis backend
redis_client = redis.Redis(host='localhost', port=6379)
store = RedisStore(redis_client, ttl_seconds=3600)

@async_compute(store=store)
def distributed_computation(x):
    return process(x)
Monitoring & Metrics
from au import MetricsMiddleware

# Create shared metrics
metrics = MetricsMiddleware()

@async_compute(middleware=[metrics])
def monitored_function(x):
    return compute(x)

# Launch several computations
for i in range(10):
    monitored_function(i)

# Check metrics
stats = metrics.get_stats()
print(f"Total: {stats['total']}")
print(f"Completed: {stats['completed']}")
print(f"Failed: {stats['failed']}")
print(f"Avg Duration: {stats['avg_duration']:.2f}s")
Error Handling
@async_compute()
def may_fail(x):
    if x < 0:
        raise ValueError("x must be positive")
    return x ** 2

handle = may_fail(-5)

try:
    result = handle.get_result(timeout=5)
except Exception as e:
    print(f"Computation failed: {e}")
    print(f"Status: {handle.get_status()}")  # ComputationStatus.FAILED
Cleanup Strategies
# Manual cleanup
@async_compute(ttl_seconds=3600)
def my_func(x):
    return x * 2

# Clean up expired results
removed = my_func.cleanup_expired()
print(f"Removed {removed} expired results")

# Automatic cleanup with probability
store = FileSystemStore(
    "/tmp/computations",
    ttl_seconds=3600,
    auto_cleanup=True,
    cleanup_probability=0.1  # 10% chance on each access
)
API Reference
Main Decorator
@async_compute(
    backend=None,                            # Execution backend (default: ProcessBackend)
    store=None,                              # Storage backend (default: FileSystemStore)
    base_path="/tmp/computations",           # Path for default file store
    ttl_seconds=3600,                        # Time-to-live for results
    serialization=SerializationFormat.JSON,  # JSON or PICKLE
    middleware=None                          # List of middleware components
)
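The backend and store arguments are optional; when omitted, the decorator builds the defaults from the remaining parameters. A minimal sketch using only the documented keyword arguments:

from au import async_compute, SerializationFormat

@async_compute(
    base_path="/tmp/my_jobs",  # location for the default FileSystemStore
    ttl_seconds=600,           # results expire after 10 minutes
    serialization=SerializationFormat.PICKLE
)
def job(x):
    return x + 1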
ComputationHandle Methods
- is_ready() -> bool: Check if the computation is complete
- get_status() -> ComputationStatus: Get the current status
- get_result(timeout=None) -> T: Get the result, optionally waiting
- cancel() -> bool: Attempt to cancel the computation
- metadata -> Dict[str, Any]: Access computation metadata
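Together these support a simple check-then-fetch flow; a brief sketch (the metadata contents depend on what the backend records):

handle = my_function(21)

if handle.is_ready():
    print(handle.get_result())
else:
    print(handle.get_status())

print(handle.metadata)  # backend-recorded metadata dictionary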
ComputationStatus Enum
- PENDING: Not started yet
- RUNNING: Currently executing
- COMPLETED: Successfully finished
- FAILED: Failed with error
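A status-polling loop over these values is straightforward; a minimal sketch, assuming ComputationStatus is importable from au like the other names:

import time

from au import ComputationStatus

# Poll until the computation reaches a terminal state
while handle.get_status() in (ComputationStatus.PENDING, ComputationStatus.RUNNING):
    time.sleep(0.5)

if handle.get_status() is ComputationStatus.COMPLETED:
    print(handle.get_result())
else:
    print("Computation failed")  # ComputationStatus.FAILED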
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
License
MIT License - see LICENSE file for details.
File details
Details for the file au-0.0.11.tar.gz.
File metadata
- Download URL: au-0.0.11.tar.gz
- Upload date:
- Size: 16.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.10.17
File hashes
Algorithm | Hash digest
---|---
SHA256 | 198ff40bde4e8ef72f369dcd0ddc43cdd73356045e158d52db33e616d8ea7a2d
MD5 | 5c1de1c2a1c02f8ff8af2c59f91f4803
BLAKE2b-256 | 9268483c2caf814c17acfee0e9913237e3014e037a1a581ab50199497654a2a3
File details
Details for the file au-0.0.11-py3-none-any.whl.
File metadata
- Download URL: au-0.0.11-py3-none-any.whl
- Upload date:
- Size: 13.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.10.17
File hashes
Algorithm | Hash digest
---|---
SHA256 | e1b54b8f3396b45faadf848c90a9ad3f8a29cdf3dc8f71c4daf9dcfd603ef3d7
MD5 | cdb5298280b8926fdbbb9c56d7ea1620
BLAKE2b-256 | 46e720a64c7e6d5064be0790474ea515ed592a9703ff6e57ea51f10858a484b8