FlowFn Python SDK

Distributed job queues, event streams, and workflow orchestration for Python.

Features

  • Queues: Background job processing with retries, priorities, and batching
  • Streams: Pub/sub event streaming with consumer groups and replay
  • Workflows: Multi-step orchestration with state management and compensation
  • Patterns: Rate limiting, batching, priority queues, circuit breakers
  • Storage: Pluggable storage backends (Memory, Redis, PostgreSQL)
  • Monitoring: Health checks, metrics, and event tracking
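
To make the "retries" feature above concrete, here is a minimal, library-independent sketch of retrying an async job handler with exponential backoff. It uses only the standard library and is not FlowFn's implementation or API; `run_with_retries`, `flaky_handler`, `max_attempts`, and `base_delay` are hypothetical names chosen for illustration.

```python
import asyncio


async def run_with_retries(handler, payload, max_attempts=3, base_delay=0.01):
    """Call an async handler, retrying on failure with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await handler(payload)
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Delay doubles on each attempt: base, 2*base, 4*base, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


calls = []


async def flaky_handler(payload):
    """Hypothetical job handler that fails twice, then succeeds."""
    calls.append(payload)
    if len(calls) < 3:
        raise RuntimeError("transient failure")
    return {"sent": True, "attempts": len(calls)}


result = asyncio.run(run_with_retries(flaky_handler, {"to": "user@example.com"}))
print(result)
```

A real queue would typically persist the attempt count with the job so that retries survive a worker restart; this sketch keeps everything in memory.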

Installation

pip install flowfn

With optional adapters:

# Quote the extras so shells such as zsh do not expand the brackets
pip install "flowfn[redis]"     # Redis adapter
pip install "flowfn[postgres]"  # PostgreSQL adapter
pip install "flowfn[all]"       # All adapters

Quick Start

Queue Example

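import asyncio

from flowfn import create_flow

flow = create_flow(adapter='memory')

# Create a queue
queue = flow.queue('emails')


async def send_email(to, template):
    # Stand-in for your own mail-sending code; not part of flowfn
    print(f"Sending {template!r} email to {to}")


async def main():
    # Add jobs (await is only valid inside a coroutine)
    await queue.add('send-welcome', {
        'to': 'user@example.com',
        'template': 'welcome'
    })


# Process jobs
@queue.process()
async def process_email(job):
    await send_email(job.data['to'], job.data['template'])
    return {'sent': True}


asyncio.run(main())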

Stream Example

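import asyncio

# Create a stream (`flow` comes from the queue example)
stream = flow.stream('events')


async def main():
    # Publish events (await is only valid inside a coroutine)
    await stream.publish({
        'type': 'user.created',
        'user_id': '123'
    })


# Subscribe to events
@stream.subscribe()
async def handle_event(message):
    print(f"Received: {message.data}")
    await message.ack()


asyncio.run(main())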
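
The feature list mentions consumer groups and replay, which the stream example does not show. As a rough, library-independent illustration of the idea (not FlowFn's API), the sketch below keeps an append-only log plus one read offset per group; `InMemoryStream`, `read`, and `replay` are hypothetical names.

```python
class InMemoryStream:
    """Toy append-only stream with one read offset per consumer group."""

    def __init__(self):
        self._log = []       # every published message, in order
        self._offsets = {}   # group name -> index of the next unread message

    def publish(self, message):
        self._log.append(message)

    def read(self, group):
        """Return messages the group has not seen yet and advance its offset."""
        start = self._offsets.get(group, 0)
        batch = self._log[start:]
        self._offsets[group] = len(self._log)
        return batch

    def replay(self, group, from_offset=0):
        """Rewind a group's offset so earlier messages are delivered again."""
        self._offsets[group] = from_offset


events = InMemoryStream()
events.publish({"type": "user.created", "user_id": "123"})
events.publish({"type": "user.deleted", "user_id": "123"})

first = events.read("billing")     # both messages
second = events.read("billing")    # nothing new for this group
audit = events.read("audit")       # a new group starts from the beginning

events.replay("billing")
replayed = events.read("billing")  # both messages again
print(len(first), len(second), len(audit), len(replayed))
```

Each group tracks its own position, so one slow or rewound group does not affect what the others receive.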

Workflow Example

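import asyncio

# Define a workflow (`flow` comes from the queue example).
# validate_order, charge_payment, and fulfill_order are your own step
# handlers; flowfn does not provide them.
workflow = (
    flow.workflow('process-order')
    .step('validate', validate_order)
    .step('charge', charge_payment)
    .step('fulfill', fulfill_order)
    .build()
)


async def main():
    # Execute workflow (await is only valid inside a coroutine)
    execution = await workflow.execute({
        'order_id': '123',
        'amount': 99.99
    })


asyncio.run(main())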
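
The feature list mentions compensation, which the workflow example does not show. The sketch below illustrates the general idea in plain Python: run steps in order and, if one fails, undo the completed steps in reverse. It is not FlowFn's API; `run_with_compensation`, `refund_payment`, and the `(name, action, compensate)` tuple layout are assumptions made for illustration.

```python
import asyncio


async def run_with_compensation(steps, context):
    """Run (name, action, compensate) steps; undo completed ones on failure."""
    completed = []
    try:
        for name, action, compensate in steps:
            await action(context)
            completed.append((name, compensate))
    except Exception as exc:
        # Roll back in reverse order, most recent step first
        for name, compensate in reversed(completed):
            if compensate is not None:
                await compensate(context)
        context["failed"] = str(exc)
        return False
    return True


log = []


async def validate_order(ctx):
    log.append("validate")


async def charge_payment(ctx):
    log.append("charge")


async def refund_payment(ctx):
    log.append("refund")


async def fulfill_order(ctx):
    raise RuntimeError("out of stock")  # simulate a failing final step


steps = [
    ("validate", validate_order, None),
    ("charge", charge_payment, refund_payment),
    ("fulfill", fulfill_order, None),
]

order = {"order_id": "123", "amount": 99.99}
ok = asyncio.run(run_with_compensation(steps, order))
print(ok, log)
```

Here the failed fulfillment triggers the refund, while the validation step has nothing to undo.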

Documentation

Development

# Install development dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Format code
black flowfn tests

# Type checking
mypy flowfn

License

MIT

Download files

Source Distribution

flowfn-0.0.1.tar.gz (25.9 kB)

Uploaded Source

Built Distribution

flowfn-0.0.1-py3-none-any.whl (30.7 kB)

Uploaded Python 3

File details

Details for the file flowfn-0.0.1.tar.gz.

File metadata

  • Download URL: flowfn-0.0.1.tar.gz
  • Upload date:
  • Size: 25.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for flowfn-0.0.1.tar.gz
Algorithm    Hash digest
SHA256       d7020e17eb12a6fef3a325703e6298690fefd5be0f79660625863433389cb546
MD5          eccaeef538cec387e95b3089bfefa07b
BLAKE2b-256  6d4281dffb1883ab282be451204219ea0b92c2a98c91f7bfc6b1b01cf34a8b36

File details

Details for the file flowfn-0.0.1-py3-none-any.whl.

File metadata

  • Download URL: flowfn-0.0.1-py3-none-any.whl
  • Upload date:
  • Size: 30.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for flowfn-0.0.1-py3-none-any.whl
Algorithm    Hash digest
SHA256       5866cbd5d8120ee4b0e4b54d8e79e4fca643cc735adfaa1d6c5d4ff77b6837bf
MD5          98706d7eacb86f73e3c23e85379656f4
BLAKE2b-256  9e39ae07f3f1d2c6c8304b1013ee75c4d186b6d72cd8c4572bb9fc1184b37704
