Async task executor built on asyncio - a lightweight Celery alternative
Project description
Flowrra
Lightweight async task queue built on pure Python asyncio. A Celery-inspired background job executor with zero dependencies, featuring retries, priority queues, and pluggable backends.
📚 Read the Documentation - Complete guides, API reference, and tutorials
Features
- Zero Dependencies - Built on pure Python 3.11+ asyncio (only aiosqlite required for scheduler)
- Simple API - Decorator-based task registration, just like Celery
- Async First - Native async/await support for I/O-bound tasks
- CPU-Bound Support - ProcessPoolExecutor integration for compute-heavy tasks
- Task Scheduling - Celery Beat-like persistent scheduling with cron, intervals, and one-time tasks
- Web UI - Built-in monitoring dashboard for FastAPI, Flask/Quart, and Django
- Automatic Retries - Configurable retry logic with exponential backoff
- Priority Queues - Control task execution order
- Pluggable Backends - Extensible result storage (in-memory included)
- Multiple Databases - SQLite, PostgreSQL, and MySQL support for scheduler
- Type Safe - Full type hints for better IDE support
Installation
pip install flowrra
With Redis backend support (for distributed execution):
pip install flowrra[redis]
With PostgreSQL scheduler support:
pip install flowrra[postgresql]
With MySQL scheduler support:
pip install flowrra[mysql]
With Web UI support:
# FastAPI
pip install flowrra[ui-fastapi]
# Flask/Quart
pip install flowrra[ui-flask]
# Django
pip install flowrra[ui-django]
# All UI adapters
pip install flowrra[ui]
With all optional dependencies:
pip install flowrra[all]
For development:
pip install flowrra[dev]
Quick Start
Basic Usage
import asyncio
from flowrra import Flowrra
# Create Flowrra application
app = Flowrra.from_urls()
# Register I/O-bound tasks (async functions)
@app.task()
async def send_email(to: str, subject: str):
"""Send an email asynchronously."""
await asyncio.sleep(0.1) # Simulate email sending
print(f"Email sent to {to}: {subject}")
return {"status": "sent", "to": to}
@app.task(max_retries=5, retry_delay=2.0)
async def fetch_data(url: str):
"""Fetch data with automatic retries."""
# Your HTTP request logic here
return {"data": "..."}
# Execute tasks
async def main():
async with app:
# Submit tasks
task_id = await app.submit(send_email, "user@example.com", "Hello")
# Wait for result
result = await app.wait_for_result(task_id, timeout=10.0)
print(f"Result: {result.result}")
asyncio.run(main())
CPU-Bound Tasks
For CPU-intensive computations, use cpu_bound=True with sync functions:
import asyncio
from flowrra import Flowrra
# CPU tasks require Redis backend for cross-process result sharing
app = Flowrra.from_urls(backend="redis://localhost:6379/0")
# Register CPU-bound task (sync function, not async!)
@app.task(cpu_bound=True)
def compute_heavy(n: int):
"""CPU-intensive computation runs in separate process."""
return sum(i ** 2 for i in range(n))
async def main():
async with app:
task_id = await app.submit(compute_heavy, 1000000)
result = await app.wait_for_result(task_id, timeout=10.0)
print(f"Result: {result.result}")
asyncio.run(main())
Note: CPU-bound tasks require a Redis backend. Install with pip install flowrra[redis] and ensure Redis is running.
Distributed Task Queue
Use Redis broker for distributed task queueing across multiple workers:
from flowrra import Flowrra
# Redis broker queues tasks, Redis backend stores results
app = Flowrra.from_urls(
broker="redis://localhost:6379/0", # Task queue
backend="redis://localhost:6379/1" # Result storage
)
@app.task()
async def process_job(job_id: int):
# Workers pull tasks from Redis queue
return f"Processed job {job_id}"
Task Scheduling
Schedule tasks to run periodically using cron expressions, intervals, or one-time execution:
from flowrra import Flowrra
import asyncio
# Create app and define tasks
app = Flowrra.from_urls()
@app.task()
async def send_daily_report():
print("Generating daily report...")
return "Report sent"
@app.task()
async def cleanup_old_data():
print("Cleaning up old data...")
return "Cleanup complete"
# Create integrated scheduler (automatically connected to app's executors)
scheduler = app.create_scheduler()
async def main():
# Schedule tasks
await scheduler.schedule_cron(
task_name="send_daily_report",
cron="0 9 * * *", # Every day at 9 AM
description="Daily report generation"
)
await scheduler.schedule_interval(
task_name="cleanup_old_data",
interval=3600, # Every hour
description="Hourly cleanup"
)
# Start app and scheduler
await app.start()
await scheduler.start()
# Keep running
await asyncio.Event().wait()
asyncio.run(main())
Scheduling options:
- Cron:
schedule_cron(task_name, cron="0 9 * * *")- Standard cron syntax - Interval:
schedule_interval(task_name, interval=300)- Every N seconds - One-time:
schedule_once(task_name, run_at=datetime)- Run once at specific time
Database backends:
- SQLite (default):
get_scheduler_backend() - PostgreSQL:
get_scheduler_backend("postgresql://user:pass@host/db") - MySQL:
get_scheduler_backend("mysql://user:pass@host/db")
See SCHEDULER.md for complete documentation.
Core Concepts
Task Registration
Register tasks using the @app.task() decorator:
I/O-Bound Tasks - Async functions for network, file I/O, API calls:
from flowrra import Flowrra
app = Flowrra.from_urls()
@app.task(name="custom_name", max_retries=3, retry_delay=1.0)
async def my_io_task(arg1: str, arg2: int):
# Must be async for I/O-bound tasks
await asyncio.sleep(1)
return result
CPU-Bound Tasks - Sync functions for heavy computations:
from flowrra import Flowrra
# Requires Redis backend for cross-process result sharing
app = Flowrra.from_urls(backend="redis://localhost:6379/0")
@app.task(cpu_bound=True, max_retries=3, retry_delay=1.0)
def my_cpu_task(arg1: str, arg2: int):
# Must be sync (not async) for CPU-bound tasks
return compute_result(arg1, arg2)
Parameters:
name- Custom task name (defaults to function name)max_retries- Number of retry attempts on failure (default: 3)retry_delay- Seconds between retries (default: 1.0)
Task Submission
Submit tasks for execution and get a unique task ID:
# Submit with positional arguments
task_id = await app.submit(my_task, "hello", 42)
# Submit with keyword arguments
task_id = await app.submit(my_task, arg1="hello", arg2=42)
# Submit with priority (lower number = higher priority)
task_id = await app.submit(my_task, "hello", 42, priority=1)
Result Retrieval
Get task results using the task ID:
# Wait for result with timeout
result = await app.wait_for_result(task_id, timeout=30.0)
# Check result status
if result.status == TaskStatus.SUCCESS:
print(f"Success: {result.result}")
elif result.status == TaskStatus.FAILED:
print(f"Failed: {result.error}")
# Get result without waiting (returns None if not ready)
result = await app.get_result(task_id)
Redis Backend for Distributed Execution
For production deployments and distributed task execution, configure Redis when creating your Flowrra app:
from flowrra import Flowrra
# Flowrra with Redis backend and broker
app = Flowrra.from_urls(
broker="redis://localhost:6379/0", # Task queue
backend="redis://localhost:6379/1" # Result storage
)
# Task registration works the same
@app.task()
async def my_io_task(data: str):
return await process(data)
@app.task(cpu_bound=True)
def my_cpu_task(n: int):
return compute(n)
Supported Redis connection strings:
- Basic:
redis://localhost:6379/0 - With password:
redis://:password@localhost:6379/0 - With username:
redis://username:password@localhost:6379/0 - SSL/TLS:
rediss://localhost:6379/0 - Unix socket:
unix:///tmp/redis.sock
Important:
- CPU-bound tasks must be regular functions (not async)
- Tasks must be defined at module level for ProcessPoolExecutor pickling
- CPU-bound tasks require Redis backend for cross-process result sharing
- Install Redis support:
pip install flowrra[redis]
Framework Integration
FastAPI Integration
Perfect for background tasks in FastAPI applications:
from fastapi import FastAPI
from flowrra import Flowrra
from contextlib import asynccontextmanager
# Initialize Flowrra app
flowrra_app = Flowrra.from_urls()
@flowrra_app.task(max_retries=3)
async def send_welcome_email(email: str, username: str):
"""Send welcome email to new users."""
# Your email logic here
await asyncio.sleep(1)
return {"email": email, "sent": True}
@flowrra_app.task()
async def process_upload(file_path: str, user_id: int):
"""Process uploaded file in background."""
# Your processing logic
return {"processed": True, "file": file_path}
# Lifespan context manager for startup/shutdown
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup: Start Flowrra
await flowrra_app.start()
yield
# Shutdown: Stop Flowrra
await flowrra_app.stop()
# Create FastAPI app with lifespan
app = FastAPI(lifespan=lifespan)
@app.post("/register")
async def register_user(email: str, username: str):
"""Register user and send welcome email."""
# Save user to database...
# Submit background task
task_id = await flowrra_app.submit(send_welcome_email, email, username)
return {
"user_id": 123,
"email_task_id": task_id,
"message": "Registration successful"
}
@app.post("/upload")
async def upload_file(file_path: str, user_id: int):
"""Upload file and process in background."""
# Save file...
# Submit processing task
task_id = await flowrra_app.submit(process_upload, file_path, user_id)
return {
"task_id": task_id,
"status": "processing"
}
@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
"""Check task status."""
result = await flowrra_app.get_result(task_id)
if result is None:
return {"status": "not_found"}
return {
"task_id": task_id,
"status": result.status.value,
"result": result.result,
"error": result.error
}
Django Integration
Use Flowrra in Django applications with async views:
# tasks.py
import asyncio
from flowrra import Flowrra
# Initialize Flowrra app (consider using django.conf for settings)
app = Flowrra.from_urls()
@app.task(max_retries=3, retry_delay=2.0)
async def send_notification(user_id: int, message: str):
"""Send notification to user."""
from myapp.models import User
from asgiref.sync import sync_to_async
# Use sync_to_async for Django ORM queries
user = await sync_to_async(User.objects.get)(id=user_id)
# Send notification logic...
return {"user": user.email, "sent": True}
@app.task()
async def generate_report(report_id: int):
"""Generate report asynchronously."""
from myapp.models import Report
from asgiref.sync import sync_to_async
report = await sync_to_async(Report.objects.get)(id=report_id)
# Generate report...
report.status = "completed"
await sync_to_async(report.save)()
return {"report_id": report_id, "status": "completed"}
# views.py
from django.http import JsonResponse
from asgiref.sync import async_to_sync
from .tasks import app, send_notification, generate_report
# Option 1: Async view (Django 4.1+)
async def create_notification(request):
"""Async view to create notification."""
user_id = request.POST.get('user_id')
message = request.POST.get('message')
task_id = await app.submit(send_notification, int(user_id), message)
return JsonResponse({
"task_id": task_id,
"status": "submitted"
})
# Option 2: Sync view with async_to_sync
def create_report(request):
"""Sync view using async task."""
report_id = request.POST.get('report_id')
# Wrap async call with async_to_sync
task_id = async_to_sync(app.submit)(generate_report, int(report_id))
return JsonResponse({
"task_id": task_id,
"status": "processing"
})
async def check_task(request, task_id):
"""Check task status."""
result = await app.get_result(task_id)
if result is None:
return JsonResponse({"error": "Task not found"}, status=404)
return JsonResponse({
"task_id": task_id,
"status": result.status.value,
"result": result.result
})
# asgi.py
import os
from django.core.asgi import get_asgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
# Start Flowrra on application startup
from .tasks import app
import asyncio
application = get_asgi_application()
# Start Flowrra when Django starts
async def startup():
await app.start()
async def shutdown():
await app.stop()
# Run startup
asyncio.create_task(startup())
Flask Integration
Use Flowrra with Flask and asyncio:
from flask import Flask, jsonify, request
from flowrra import Flowrra
import asyncio
from functools import wraps
flask_app = Flask(__name__)
# Initialize Flowrra app
flowrra_app = Flowrra.from_urls()
# Helper to run async functions in Flask
def async_route(f):
@wraps(f)
def wrapped(*args, **kwargs):
return asyncio.run(f(*args, **kwargs))
return wrapped
# Register tasks
@flowrra_app.task(max_retries=3)
async def send_email_task(to: str, subject: str, body: str):
"""Send email asynchronously."""
await asyncio.sleep(1) # Simulate email sending
return {"to": to, "subject": subject, "sent": True}
@flowrra_app.task()
async def process_data_task(data: dict):
"""Process data in background."""
# Your processing logic
await asyncio.sleep(2)
return {"processed": True, "items": len(data)}
# Flask routes
@flask_app.route('/send-email', methods=['POST'])
@async_route
async def send_email():
"""Submit email task."""
data = request.get_json()
task_id = await flowrra_app.submit(
send_email_task,
data['to'],
data['subject'],
data['body']
)
return jsonify({
"task_id": task_id,
"status": "submitted"
})
@flask_app.route('/process', methods=['POST'])
@async_route
async def process_data():
"""Submit processing task."""
data = request.get_json()
task_id = await flowrra_app.submit(process_data_task, data)
return jsonify({
"task_id": task_id,
"status": "processing"
})
@flask_app.route('/task/<task_id>', methods=['GET'])
@async_route
async def get_task_status(task_id):
"""Get task status and result."""
result = await flowrra_app.get_result(task_id)
if result is None:
return jsonify({"error": "Task not found"}), 404
return jsonify({
"task_id": task_id,
"status": result.status.value,
"result": result.result,
"error": result.error,
"retries": result.retries
})
# Application lifecycle
@flask_app.before_serving
@async_route
async def before_serving():
"""Start Flowrra before serving requests."""
await flowrra_app.start()
print("Flowrra started")
@flask_app.after_serving
@async_route
async def after_serving():
"""Stop Flowrra after serving."""
await flowrra_app.stop()
print("Flowrra stopped")
if __name__ == '__main__':
# For development (use Gunicorn/Uvicorn for production)
flask_app.run(debug=True)
Note: For production Flask apps with Flowrra, consider using an ASGI server like Uvicorn or Hypercorn instead of the default WSGI server.
Web UI
Flowrra includes built-in web interfaces for monitoring and managing your tasks. Mount the UI into your existing web application to get instant visibility into task execution, schedules, and system health.
Features
- 📊 Dashboard - System overview with real-time statistics
- 📋 Task Management - View registered tasks and execution history
- 📅 Schedule Management - Create and manage cron schedules
- 🔧 JSON API - RESTful endpoints for programmatic access
- ⚡ Multiple Frameworks - FastAPI, Flask/Quart, and Django support
- 🔌 WebSocket Support - Optional real-time task updates (no page refresh needed)
Quick Start
FastAPI:
from fastapi import FastAPI
from flowrra import Flowrra
from flowrra.ui.fastapi import create_router
app = FastAPI()
flowrra = Flowrra.from_urls()
# Mount Flowrra UI
app.include_router(create_router(flowrra), prefix="/flowrra")
# Visit: http://localhost:8000/flowrra/
Flask/Quart:
from quart import Quart
from flowrra import Flowrra
from flowrra.ui.flask import create_blueprint
app = Quart(__name__)
flowrra = Flowrra.from_urls()
# Mount Flowrra UI
app.register_blueprint(create_blueprint(flowrra), url_prefix="/flowrra")
# Visit: http://localhost:8000/flowrra/
Django:
# urls.py
from django.urls import path, include
from flowrra.ui.django import get_urls
urlpatterns = [
path('flowrra/', include(get_urls(flowrra))),
]
# Visit: http://localhost:8000/flowrra/
Installation
# FastAPI
pip install flowrra[ui-fastapi]
# Flask/Quart
pip install flowrra[ui-flask]
# Django
pip install flowrra[ui-django]
# All UI adapters
pip install flowrra[ui]
API Endpoints
All UI adapters provide these JSON API endpoints:
GET /api/stats- System statisticsGET /api/health- Health checkGET /api/tasks- List registered tasksGET /api/tasks/{name}- Task detailsGET /api/schedules- List schedulesPOST /api/schedules/cron- Create schedulePUT /api/schedules/{id}/enable- Enable schedulePUT /api/schedules/{id}/disable- Disable scheduleDELETE /api/schedules/{id}- Delete schedule
Real-Time Updates (WebSocket)
The UI supports optional WebSocket connections for real-time task updates without page refreshes:
- FastAPI: WebSocket support is built-in and enabled automatically
- Django: Requires Django-Channels setup (see WEBSOCKET_SETUP.md)
- Flask/Quart: Requires migration to Quart (see WEBSOCKET_SETUP.md)
Without WebSocket, the frontend automatically falls back to polling every 15 seconds. Both modes work seamlessly!
Full Documentation
See UI.md for complete documentation including:
- Detailed integration guides for each framework
- API endpoint reference
- Customization and styling
- Authentication setup
- Production deployment
- Troubleshooting
See docs/WEBSOCKET_SETUP.md for WebSocket configuration:
- Django-Channels setup guide
- Flask to Quart migration
- Testing WebSocket connections
- Production deployment considerations
Advanced Usage
Priority Queues
Control task execution order with priorities:
# Lower priority number = higher priority
await app.submit(critical_task, priority=1) # Executes first
await app.submit(normal_task, priority=5) # Executes second
await app.submit(low_priority_task, priority=10) # Executes last
Custom Retry Logic
Configure retry behavior per task:
@app.task(max_retries=5, retry_delay=2.0)
async def flaky_api_call(url: str):
"""API call with aggressive retries."""
# Will retry up to 5 times with 2 second delay
async with httpx.AsyncClient() as client:
response = await client.get(url)
response.raise_for_status()
return response.json()
Error Handling
Handle task failures gracefully:
@app.task(max_retries=3)
async def risky_operation():
"""Operation that might fail."""
if random.random() < 0.5:
raise ValueError("Random failure")
return "success"
# Check result
result = await app.wait_for_result(task_id, timeout=10.0)
if result.status == TaskStatus.FAILED:
print(f"Task failed after {result.retries} retries")
print(f"Error: {result.error}")
elif result.status == TaskStatus.SUCCESS:
print(f"Task succeeded: {result.result}")
Custom Backends
Flowrra includes a Redis backend, or you can implement your own. For most use cases, we recommend using Redis:
from flowrra import Flowrra
# Recommended: Use Redis backend with connection string
app = Flowrra.from_urls(
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/1"
)
# Or with Config
from flowrra import Config, BrokerConfig, BackendConfig
config = Config(
broker=BrokerConfig(url="redis://localhost:6379/0"),
backend=BackendConfig(url="redis://localhost:6379/1", ttl=3600)
)
app = Flowrra(config=config)
Built-in backends:
InMemoryBackend- Default, single-process (automatically used when no backend specified)RedisBackend- Production-ready, distributed (via connection string)
Configuration
Flowrra Configuration
Using URLs (recommended):
from flowrra import Flowrra
# Simple in-memory configuration
app = Flowrra.from_urls()
# With Redis backend and broker
app = Flowrra.from_urls(
broker="redis://localhost:6379/0", # Task queue
backend="redis://localhost:6379/1" # Result storage
)
Using Config objects (advanced):
from flowrra import Flowrra, Config, BrokerConfig, BackendConfig, ExecutorConfig
config = Config(
broker=BrokerConfig(
url="redis://localhost:6379/0",
max_connections=50,
socket_timeout=5.0
),
backend=BackendConfig(
url="redis://localhost:6379/1",
ttl=3600, # Result expiration in seconds
max_connections=50
),
executor=ExecutorConfig(
num_workers=10, # Async workers for I/O tasks
cpu_workers=4, # Process workers for CPU tasks
max_queue_size=1000,
max_retries=3,
retry_delay=1.0
)
)
app = Flowrra(config=config)
Task Options
@app.task(
name="custom_name", # Custom task name (defaults to function name)
max_retries=3, # Number of retry attempts (default: 3)
retry_delay=1.0, # Seconds between retries (default: 1.0)
cpu_bound=False, # True for CPU-bound, False for I/O-bound (default)
)
async def my_task(): # async for I/O-bound, sync for cpu_bound=True
pass
Best Practices
- Use async for I/O-bound tasks - Network requests, database queries, file I/O (default)
- Use cpu_bound=True for CPU-heavy tasks - Data processing, computations, image processing
- Use Redis for production - Install
flowrra[redis]and configure broker/backend URLs - Set appropriate retry values - Balance reliability with resource usage
- Use priority queues - Critical tasks get priority=1, normal tasks priority=5
- Handle failures gracefully - Check result status before using result
- Use context managers - Ensures proper cleanup with
async with app: - Module-level CPU tasks - Define CPU-bound tasks at module level for pickling
Performance Tips
- Tune worker count - Start with CPU core count for I/O workers
- Batch similar tasks - Group related operations for efficiency
- Use timeouts - Prevent tasks from hanging indefinitely
- Monitor queue size - Track pending tasks with
executor.pending_count() - Profile bottlenecks - Identify slow tasks and optimize
Use Flowrra when:
- Building async-first applications
- Want zero dependencies
- Don't need distributed workers
- Prefer simplicity over scalability
API Reference
Flowrra
Unified application for I/O-bound and CPU-bound task execution:
class Flowrra:
def __init__(self, config: Config | None = None)
@classmethod
def from_urls(
cls,
broker: str | None = None,
backend: str | None = None,
) -> "Flowrra"
def task(
self,
name: str | None = None,
cpu_bound: bool = False, # False = I/O-bound (async), True = CPU-bound (sync)
max_retries: int = 3,
retry_delay: float = 1.0,
**kwargs
) -> Callable
async def submit(
self,
task_func: Callable,
*args,
priority: int = 0,
**kwargs
) -> str
async def wait_for_result(
self,
task_id: str,
timeout: float | None = None
) -> TaskResult
async def get_result(self, task_id: str) -> TaskResult | None
async def start(self) -> None
async def stop(self, wait: bool = True, timeout: float = 30.0) -> None
@property
def is_running(self) -> bool
Config
Configuration for Flowrra application:
@dataclass
class Config:
broker: BrokerConfig | None = None
backend: BackendConfig | None = None
executor: ExecutorConfig = field(default_factory=ExecutorConfig)
@classmethod
def from_env(cls, prefix: str = "FLOWRRA_") -> "Config"
@dataclass
class BrokerConfig:
url: str
max_connections: int = 50
socket_timeout: float = 5.0
retry_on_timeout: bool = True
@dataclass
class BackendConfig:
url: str
ttl: int | None = None
max_connections: int = 50
socket_timeout: float = 5.0
retry_on_timeout: bool = True
@dataclass
class ExecutorConfig:
num_workers: int = 4 # Async workers for I/O tasks
cpu_workers: int | None = None # Process workers for CPU tasks
max_queue_size: int = 1000
max_retries: int = 3
retry_delay: float = 1.0
TaskResult
@dataclass
class TaskResult:
task_id: str
status: TaskStatus
result: Any = None
error: str | None = None
started_at: datetime | None = None
finished_at: datetime | None = None
retries: int = 0
@property
def is_complete(self) -> bool
@property
def is_success(self) -> bool
TaskStatus
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
RETRYING = "retrying"
Development
Running Tests
# Install dev dependencies
pip install -e ".[dev]"
# Run all tests
pytest tests/
# Run with coverage
pytest tests/ --cov=flowrra --cov-report=html
# Run specific test
pytest tests/test_executor.py::TestTaskExecution::test_simple_task_execution
Code Quality
# Lint with ruff
ruff check src/ tests/
# Format code
ruff format src/ tests/
# Type check with mypy
mypy src/flowrra/
Requirements
- Python 3.11+
- Zero runtime dependencies (core functionality)
- Optional:
redis[hiredis]>=5.0.0for Redis backend support
License
Apache License 2.0 - see LICENSE file for details.
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
- Fork the repository
- Create your feature branch (
git checkout -b feature/amazing-feature) - Commit your changes (
git commit -m 'Add amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
Links
- PyPI: https://pypi.org/project/flowrra/
- Documentation: https://flowrra.readthedocs.io/en/latest/
- GitHub: https://github.com/flowrra/flowrra
- Issues: https://github.com/flowrra/flowrra/issues
Acknowledgments
Inspired by Celery's elegant task queue design, built with modern Python async/await patterns.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file flowrra-0.1.2.tar.gz.
File metadata
- Download URL: flowrra-0.1.2.tar.gz
- Upload date:
- Size: 151.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
5f8e6aba44cd3c754d64c8a87645a2041c77627965b70484edd90e37f2fede6b
|
|
| MD5 |
84da26ca7245628e41a2295845129c6e
|
|
| BLAKE2b-256 |
bbf2ca6a00d01a13c5718a79015a33ce2582c0d1d4e89d9e1d7d74bfbdb20a7e
|
File details
Details for the file flowrra-0.1.2-py3-none-any.whl.
File metadata
- Download URL: flowrra-0.1.2-py3-none-any.whl
- Upload date:
- Size: 99.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a6d747368254d78d371ce010fab35be1b3c5dcc82810f51327f188ca87044910
|
|
| MD5 |
e2526688a1de46269d928789b11c8b7c
|
|
| BLAKE2b-256 |
d3928d5d0261f7ffc176b37cbb749c7da1e091ab9ee8874ef76db33588060748
|