Distributed job queues, event streams, and workflow orchestration for Python
Project description
FlowFn Python SDK
Distributed job queues, event streams, and workflow orchestration for Python.
Features
- Queues: Background job processing with retries, priorities, and batch processing
- Streams: Pub/sub event streaming with consumer groups and replay
- Workflows: Multi-step orchestration with state management and compensation
- Patterns: Rate limiting, batching, priority queues, circuit breakers
- Storage: Pluggable storage backends (Memory, Redis, PostgreSQL)
- Monitoring: Health checks, metrics, and event tracking
Installation
pip install flowfn
With optional adapters:
pip install flowfn[redis] # Redis adapter
pip install flowfn[postgres] # PostgreSQL adapter
pip install flowfn[all] # All adapters
Quick Start
Queue Example
from flowfn import create_flow
flow = create_flow(adapter='memory')
# Create a queue
queue = flow.queue('emails')
# Add jobs
await queue.add('send-welcome', {
'to': 'user@example.com',
'template': 'welcome'
})
# Process jobs
@queue.process()
async def process_email(job):
await send_email(job.data['to'], job.data['template'])
return {'sent': True}
Stream Example
# Create a stream
stream = flow.stream('events')
# Publish events
await stream.publish({
'type': 'user.created',
'user_id': '123'
})
# Subscribe to events
@stream.subscribe()
async def handle_event(message):
print(f"Received: {message.data}")
await message.ack()
Workflow Example
# Define a workflow
workflow = (
flow.workflow('process-order')
.step('validate', validate_order)
.step('charge', charge_payment)
.step('fulfill', fulfill_order)
.build()
)
# Execute workflow
execution = await workflow.execute({
'order_id': '123',
'amount': 99.99
})
Documentation
Development
# Install development dependencies
pip install -e ".[dev]"
# Run tests
pytest
# Format code
black flowfn tests
# Type checking
mypy flowfn
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
flowfn-0.0.1.tar.gz
(25.9 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
flowfn-0.0.1-py3-none-any.whl
(30.7 kB
view details)
File details
Details for the file flowfn-0.0.1.tar.gz.
File metadata
- Download URL: flowfn-0.0.1.tar.gz
- Upload date:
- Size: 25.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
d7020e17eb12a6fef3a325703e6298690fefd5be0f79660625863433389cb546
|
|
| MD5 |
eccaeef538cec387e95b3089bfefa07b
|
|
| BLAKE2b-256 |
6d4281dffb1883ab282be451204219ea0b92c2a98c91f7bfc6b1b01cf34a8b36
|
File details
Details for the file flowfn-0.0.1-py3-none-any.whl.
File metadata
- Download URL: flowfn-0.0.1-py3-none-any.whl
- Upload date:
- Size: 30.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
5866cbd5d8120ee4b0e4b54d8e79e4fca643cc735adfaa1d6c5d4ff77b6837bf
|
|
| MD5 |
98706d7eacb86f73e3c23e85379656f4
|
|
| BLAKE2b-256 |
9e39ae07f3f1d2c6c8304b1013ee75c4d186b6d72cd8c4572bb9fc1184b37704
|