Async-first Celery integration for Pico-IoC. Enables defining Celery tasks as async methods on IoC-managed components, with automatic discovery, dependency injection, and container-scoped execution.
pico-celery
Pico-Celery integrates Pico-IoC with Celery 5, giving you true inversion of control for background task execution.
It lets you define Celery tasks as async methods inside IoC-managed components, with automatic discovery, dependency injection, and container-scoped execution.
- Requires Python 3.10+
- Async-native: tasks run as real `async def`, with no thread pools
- Works with Celery 5.x
- Full constructor-based DI
- Perfect for FastAPI apps, worker daemons, and distributed pipelines
With pico-celery, you get predictable scoping, a clean separation of concerns, and a unified dependency model across HTTP, CLI, and background execution.
Why pico-celery?
Celery is powerful, but typical usage introduces:
- Module-level tasks
- Global Celery apps
- No dependency injection
- Shared mutable state
- Difficult testing setups
pico-celery fixes all of that:
- Tasks become async methods inside components
- Dependency injection is constructor-based
- Task handlers are resolved through Pico-IoC
- Each execution receives a fresh instance (`prototype` scope)
- Workers bootstrap the IoC container exactly once
- No global state, no magic imports, no tight coupling
| Feature | Default Celery | pico-celery |
|---|---|---|
| Task Definition | Global functions | Component methods |
| Dependency Injection | None | Constructor injection |
| State Isolation | Manual | Automatic (prototype scope) |
| Testability | Hard | Container-managed |
| Async Tasks | Requires custom pools | First-class async |
| Task Clients | Manual (`app.send_task`) | Declarative (`@send_task`) |
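To make the "State Isolation" row concrete, here is a minimal, library-free sketch of the difference between a module-level task that shares mutable state and a component that is built fresh for each execution. `FakeUserService`, `run_task`, and the local `UserTasks` class are hypothetical stand-ins, not pico-celery APIs; in the real library the instance comes from the Pico-IoC container.

```python
import asyncio

# Module-level state shared by every invocation (the pattern to avoid).
_seen: list[str] = []


async def create_user_global(username: str) -> int:
    _seen.append(username)
    return len(_seen)  # the result depends on earlier calls


class FakeUserService:
    """Hypothetical dependency standing in for real business logic."""

    async def create(self, username: str) -> dict:
        return {"username": username}


class UserTasks:
    """Plain class mimicking a prototype-scoped component."""

    def __init__(self, user_service: FakeUserService) -> None:
        self.user_service = user_service
        self.calls = 0  # per-instance state, never shared

    async def create_user(self, username: str) -> dict:
        self.calls += 1
        user = await self.user_service.create(username)
        return {**user, "calls_on_this_instance": self.calls}


async def run_task(username: str) -> dict:
    # A new instance per execution, as a prototype scope would provide.
    handler = UserTasks(FakeUserService())
    return await handler.create_user(username)


first = asyncio.run(run_task("ada"))
second = asyncio.run(run_task("grace"))
global_first = asyncio.run(create_user_global("ada"))
global_second = asyncio.run(create_user_global("grace"))
```

The component-based calls each see a counter of 1, while the module-level function leaks state from one call into the next.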
Core Features
- `@task` decorator for async component methods
- `@celery` and `@send_task` decorators for declarative, injectable clients
- Auto-Discovery: Automatically loaded when using `pico-stack` (v0.2.0+)
- Automatic task discovery inside Pico-IoC
- Dependency injection for all task handlers
- Container-scoped execution (`prototype` by default)
- Async-safe task execution wrappers
- Unified config via `CelerySettings`
- Method interception for client-side task sending
Installation
pip install pico-celery
You will also need:
pip install pico-ioc celery
If using Redis (recommended):
pip install "celery[redis]"
Zero-Config with Pico-Stack
If you are using pico-stack, you do not need to manually register "pico_celery" in your modules list. It is automatically discovered via entry points.
from pico_stack import init
# pico_celery is automatically loaded!
container = init(modules=[__name__, "my_app"])
If you are using standard pico-ioc, follow the manual registration in the example below.
Quick Example
This example shows both a worker and a client that sends the task.
1. Define a Task Component (Worker)
This component defines the task logic and its dependencies.
# my_app/tasks.py
from pico_ioc import component
from pico_celery import task
from my_app.services import UserService  # Your business logic

@component(scope="prototype")
class UserTasks:
    def __init__(self, user_service: UserService):
        self.user_service = user_service

    @task(name="tasks.create_user")
    async def create_user(self, username: str, email: str) -> dict:
        # Real async logic with injected dependencies
        user = await self.user_service.create(username, email)
        return user.to_dict()
2. Define a Task Client (Sender)
This is a declarative client that your web API (e.g., FastAPI) can inject and use.
# my_app/clients.py
from pico_celery import celery, send_task, CeleryClient

@celery  # Marks it as a pico-celery client component
class UserTaskClient(CeleryClient):
    @send_task(name="tasks.create_user")
    def create_user(self, username: str, email: str):
        # This body is never executed.
        # pico-celery intercepts the call and sends it to Celery.
        pass
3. Create the Worker Entrypoint
This file (worker.py) is what Celery will use to boot up.
# my_app/worker.py
from pico_ioc import init, configuration, DictSource
from celery import Celery
# Your application's configuration (broker, backend, etc.)
cfg = configuration(DictSource({
    "celery": {
        "broker_url": "redis://localhost:6379/0",
        "backend_url": "redis://localhost:6379/1"
    }
}))

# Modules to scan for @component, @task, @celery
# NOTE: If using pico-stack, 'pico_celery' can be omitted here.
modules = [
    "pico_celery",
    "my_app.services",
    "my_app.tasks",
    "my_app.clients"
]

# Initialize the container
container = init(modules=modules, config=cfg)
# Get the IoC-managed Celery app
# The PicoTaskRegistrar has already found and registered
# the 'tasks.create_user' task.
celery_app = container.get(Celery)
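The `"celery"` section in the configuration above is what feeds the library's settings object. As a rough illustration of that kind of binding, the sketch below maps the same two keys onto a dataclass. `CelerySettingsSketch` and `load_settings` are hypothetical names; the actual fields and binding logic of `CelerySettings` may differ.

```python
from dataclasses import dataclass
from typing import Any, Mapping


@dataclass(frozen=True)
class CelerySettingsSketch:
    """Hypothetical mirror of the two keys used in the worker example."""

    broker_url: str
    backend_url: str


def load_settings(
    raw: Mapping[str, Any], prefix: str = "celery"
) -> CelerySettingsSketch:
    """Bind the section stored under `prefix` to the settings dataclass."""
    section = raw.get(prefix)
    if not isinstance(section, Mapping):
        raise KeyError(f"missing configuration section: {prefix!r}")
    missing = [k for k in ("broker_url", "backend_url") if k not in section]
    if missing:
        raise KeyError(f"missing keys under {prefix!r}: {missing}")
    return CelerySettingsSketch(section["broker_url"], section["backend_url"])


settings = load_settings({
    "celery": {
        "broker_url": "redis://localhost:6379/0",
        "backend_url": "redis://localhost:6379/1",
    }
})
```

Failing fast on a missing section keeps a misconfigured worker from starting with a half-built Celery app.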
4. Run the Worker
You will need an async pool like eventlet or gevent.
# Install the pool: pip install eventlet
celery -A my_app.worker:celery_app worker -P eventlet -l info
5. Use the Client in your API
Your web API (e.g., FastAPI) can now inject the UserTaskClient and use it.
# my_app/main.py
from fastapi import FastAPI
from pico_ioc import init
from my_app.clients import UserTaskClient
from my_app.worker import container  # Reuse the worker's container

app = FastAPI()

@app.post("/users/")
async def create_user_endpoint(username: str, email: str):
    # Resolve the client from the container
    client = await container.aget(UserTaskClient)
    # Call the client method
    # This sends the task to Celery and returns an AsyncResult
    result = client.create_user(username, email)
    return {"message": "Task submitted", "task_id": result.id}
Task Execution Semantics (Worker)
When Celery receives a task:
Celery Worker
    ↓
Async Wrapper (generated by PicoTaskRegistrar)
    ↓
await container.aget(UserTasks) (Resolves component + dependencies)
    ↓
component_instance.create_user(...) (Executes your async method)
    ↓
await self.user_service.create(...)
    ↓
'prototype' scope is destroyed
Key benefits:
- True async execution.
- No global state.
- Fully injected services.
- Guaranteed isolation via `prototype` scope.
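The flow above can be approximated with a small, self-contained sketch. `PrototypeContainer` and `make_wrapper` are hypothetical stand-ins written for this illustration; they are not the code generated by `PicoTaskRegistrar`, and the only detail borrowed from the source is that the wrapper awaits `container.aget(...)` before calling the method.

```python
import asyncio
from typing import Any, Awaitable, Callable


class PrototypeContainer:
    """Toy container: every aget() call builds a new instance."""

    def __init__(self) -> None:
        self._factories: dict[type, Callable[[], Any]] = {}
        self.created: list[Any] = []  # kept only so the sketch can inspect it

    def register(self, cls: type, factory: Callable[[], Any]) -> None:
        self._factories[cls] = factory

    async def aget(self, cls: type) -> Any:
        instance = self._factories[cls]()
        self.created.append(instance)
        return instance


class UserService:
    async def create(self, username: str, email: str) -> dict:
        return {"username": username, "email": email}


class UserTasks:
    def __init__(self, user_service: UserService) -> None:
        self.user_service = user_service

    async def create_user(self, username: str, email: str) -> dict:
        return await self.user_service.create(username, email)


def make_wrapper(
    container: PrototypeContainer, component_cls: type, method_name: str
) -> Callable[..., Awaitable[Any]]:
    """Build the callable that a task registry would hold."""

    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        # Resolve a fresh component, then delegate to the original method.
        instance = await container.aget(component_cls)
        return await getattr(instance, method_name)(*args, **kwargs)

    return wrapper


container = PrototypeContainer()
container.register(UserTasks, lambda: UserTasks(UserService()))
create_user_task = make_wrapper(container, UserTasks, "create_user")

result_a = asyncio.run(create_user_task("ada", "ada@example.com"))
result_b = asyncio.run(create_user_task("grace", "grace@example.com"))
```

Two executions produce two distinct `UserTasks` instances, which is the isolation property the `prototype` scope is meant to guarantee.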
Testing with Pico-IoC
You can test your task logic just like any other component, with no Celery worker needed.
import pytest
from pico_ioc import init, configuration, DictSource
from my_app.tasks import UserTasks
from unittest.mock import AsyncMock, MagicMock

# Mock the dependencies
@pytest.fixture
def mock_user_service():
    service = AsyncMock()
    service.create.return_value = MagicMock(to_dict=lambda: {"id": 1})
    return service

@pytest.mark.asyncio
async def test_user_task_logic(mock_user_service):
    cfg = configuration(DictSource({}))
    # Initialize the container with only the task
    container = init(modules=[UserTasks], config=cfg)
    # Register the mocked dependency
    container.register_instance(mock_user_service)
    # Resolve the task component
    task_component = await container.aget(UserTasks)
    # Call the async method directly
    result = await task_component.create_user("test", "test@example.com")
    # Assert the logic
    assert result == {"id": 1}
    mock_user_service.create.assert_called_with("test", "test@example.com")
    await container.cleanup_all_async()
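Because dependencies arrive through the constructor, the same logic can also be exercised with no container at all. The sketch below uses only the standard library; the local `UserTasks` class is a stand-in for `my_app.tasks.UserTasks` with the decorators left out, on the assumption that they do not change the method's behavior when it is called directly.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


class UserTasks:
    """Local stand-in for my_app.tasks.UserTasks, minus the decorators."""

    def __init__(self, user_service) -> None:
        self.user_service = user_service

    async def create_user(self, username: str, email: str) -> dict:
        user = await self.user_service.create(username, email)
        return user.to_dict()


def run_direct_test() -> dict:
    # Build the mocked dependency by hand instead of registering it.
    service = AsyncMock()
    user = MagicMock()
    user.to_dict.return_value = {"id": 1}
    service.create.return_value = user

    tasks = UserTasks(service)
    result = asyncio.run(tasks.create_user("test", "test@example.com"))

    # The async dependency must have been awaited exactly once.
    service.create.assert_awaited_once_with("test", "test@example.com")
    return result


outcome = run_direct_test()
```

This style is handy for fast unit tests, while the container-based test above also checks that wiring resolves correctly.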
How It Works
1. `@task` (in `decorators.py`) flags `async` methods inside components.
2. `PicoTaskRegistrar` (in `registrar.py`) is a component that scans IoC metadata upon configuration.
3. For each `@task` method found, it generates an async wrapper.
4. This wrapper is what gets registered with Celery (`celery_app.task(...)`).
5. When Celery executes the task, it invokes the wrapper, which in turn uses `await container.aget(Component)` to get a fresh instance (thanks to `prototype`) and then calls your original method, ensuring DI.
6. `@send_task` (in `client.py`) flags methods on client classes.
7. `@celery` (in `client.py`) applies an interceptor (`CeleryClientInterceptor`) to all methods flagged with `@send_task`.
8. When you call a client method (e.g., `client.create_user(...)`), the interceptor activates, extracts the `@send_task` metadata (like the task name) and the call arguments, and executes `self._celery.send_task(...)` on your behalf.
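The client-side half of this description (flag a method, then forward the call to `send_task`) can be sketched without any dependencies. This is an illustration of the idea, not the library's code: a plain class decorator is swapped in for the Pico-IoC interceptor, and `celery_client`, `RecordingCelery`, and the `_sketch_send_task_name` attribute are all hypothetical names.

```python
import functools
from typing import Any, Callable

_SEND_TASK_ATTR = "_sketch_send_task_name"  # hypothetical metadata attribute


def send_task(name: str) -> Callable:
    """Flag a method with the Celery task name it should send."""

    def decorator(func: Callable) -> Callable:
        setattr(func, _SEND_TASK_ATTR, name)
        return func

    return decorator


def _make_sender(original: Callable, task_name: str) -> Callable:
    @functools.wraps(original)
    def sender(self: Any, *args: Any, **kwargs: Any) -> Any:
        # The original body is never executed; the call is forwarded.
        return self._celery.send_task(task_name, args=args, kwargs=kwargs)

    return sender


def celery_client(cls: type) -> type:
    """Replace every flagged method with one that forwards to send_task."""
    for attr_name, attr in list(vars(cls).items()):
        task_name = getattr(attr, _SEND_TASK_ATTR, None)
        if task_name is not None:
            setattr(cls, attr_name, _make_sender(attr, task_name))
    return cls


class RecordingCelery:
    """Fake app that records calls instead of contacting a broker."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, tuple, dict]] = []

    def send_task(self, name: str, args=(), kwargs=None) -> str:
        self.sent.append((name, tuple(args), dict(kwargs or {})))
        return f"fake-id-{len(self.sent)}"


@celery_client
class UserTaskClient:
    def __init__(self, celery_app: RecordingCelery) -> None:
        self._celery = celery_app

    @send_task(name="tasks.create_user")
    def create_user(self, username: str, email: str):
        raise AssertionError("never runs: the call is intercepted")


app = RecordingCelery()
client = UserTaskClient(app)
task_id = client.create_user("ada", email="ada@example.com")
```

The fake app mirrors the `name`, `args`, and `kwargs` parameters of Celery's real `send_task`, so the recorded tuple shows exactly what would be handed to the broker.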
Architecture Overview
pico-celery manages both sides: the Worker (execution) and the Client (sending).
Worker Flow (Task Execution)
┌─────────────────────────────┐
│        Celery Worker        │
└──────────────┬──────────────┘
               │
Async Wrapper (from pico-celery)
               │
┌──────────────▼──────────────┐
│         pico-celery         │
│ (@task, Registrar, Scopes)  │
└──────────────┬──────────────┘
               │
IoC Resolution (await aget)
               │
┌──────────────▼──────────────┐
│          Pico-IoC           │
│   (Container, Scopes, DI)   │
└──────────────┬──────────────┘
               │
Your Business Logic (Services, Repos)
Client Flow (Task Sending)
┌─────────────────────────────────────┐
│      Your App (e.g., FastAPI)       │
└──────────────────┬──────────────────┘
                   │
Call to: client.create_user(...)
                   │
┌──────────────────▼──────────────────┐
│             pico-celery             │
│ (@celery, @send_task, Interceptor)  │
└──────────────────┬──────────────────┘
                   │
Intercepts call and transforms it into:
celery_app.send_task("tasks.create_user", ...)
                   │
┌──────────────────▼──────────────────┐
│        Broker (e.g., Redis)         │
└─────────────────────────────────────┘
License
MIT