
📦 pico-celery


Pico-Celery integrates Pico-IoC with Celery 5, giving you true inversion of control for background task execution.

It lets you define Celery tasks as async methods inside IoC-managed components, with automatic discovery, dependency injection, and container-scoped execution.

๐Ÿ Requires Python 3.10+ โšก Async-native: tasks run as real async def, with no thread pools ๐Ÿ”ง Works with Celery 5.x ๐Ÿงฉ Full constructor-based DI ๐Ÿš€ Perfect for FastAPI apps, worker daemons, and distributed pipelines

With pico-celery, you get predictable scoping, a clean separation of concerns, and a unified dependency model across HTTP, CLI, and background execution.


🎯 Why pico-celery?

Celery is powerful, but typical usage introduces:

  • Module-level tasks
  • Global Celery apps
  • No dependency injection
  • Shared mutable state
  • Difficult testing setups

pico-celery fixes all of that:

  • Tasks become async methods inside components
  • Dependency injection is constructor-based
  • Task handlers are resolved through Pico-IoC
  • Each execution receives a fresh instance (prototype scope)
  • Workers bootstrap the IoC container exactly once
  • No global state, no magic imports, no tight coupling

Feature               Default Celery           pico-celery
--------------------  -----------------------  ---------------------------
Task Definition       Global functions         Component methods
Dependency Injection  None                     Constructor injection
State Isolation       Manual                   Automatic (prototype scope)
Testability           Hard                     Container-managed
Async Tasks           Requires custom pools    First-class async
Task Clients          Manual (app.send_task)   Declarative (@send_task)

🧱 Core Features

  • @task decorator for async component methods
  • @celery and @send_task decorators for declarative, injectable clients
  • Automatic task discovery inside Pico-IoC
  • Dependency injection for all task handlers
  • Container-scoped execution (prototype by default)
  • Async-safe task execution wrappers
  • Unified config via CelerySettings
  • Method interception for client-side task sending

📦 Installation

pip install pico-celery

You will also need:

pip install pico-ioc celery

If using Redis (recommended):

pip install celery[redis]

🚀 Quick Example

This example shows both a worker and a client that sends the task.

1. Define a Task Component (Worker)

This component defines the task logic and its dependencies.

# my_app/tasks.py
from pico_ioc import component
from pico_celery import task
from my_app.services import UserService # Your business logic

@component(scope="prototype")
class UserTasks:
    def __init__(self, user_service: UserService):
        self.user_service = user_service

    @task(name="tasks.create_user")
    async def create_user(self, username: str, email: str) -> dict:
        # Real async logic with injected dependencies
        user = await self.user_service.create(username, email)
        return user.to_dict()
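Because create_user is an ordinary async method, its logic can be exercised in plain Python, with no Celery and no container; a minimal sketch using a hand-written stand-in for UserService (the stub's API is assumed, not pico-celery's):

```python
import asyncio

# Hand-written stand-ins for the injected service and its return value
# (their API is assumed for illustration)
class StubUser:
    def to_dict(self):
        return {"id": 1, "username": "alice"}

class StubUserService:
    async def create(self, username, email):
        return StubUser()

# Same shape as the UserTasks component above, minus the decorators
class UserTasks:
    def __init__(self, user_service):
        self.user_service = user_service

    async def create_user(self, username, email):
        user = await self.user_service.create(username, email)
        return user.to_dict()

result = asyncio.run(
    UserTasks(StubUserService()).create_user("alice", "a@example.com")
)
print(result)  # {'id': 1, 'username': 'alice'}
```

This is the same property the Testing section below relies on: the task body is just a method on a component.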

2. Define a Task Client (Sender)

This is a declarative client that your web API (e.g., FastAPI) can inject and use.

# my_app/clients.py
from pico_celery import celery, send_task, CeleryClient

@celery # Marks it as a pico-celery client component
class UserTaskClient(CeleryClient):

    @send_task(name="tasks.create_user")
    def create_user(self, username: str, email: str):
        # This body is never executed.
        # pico-celery intercepts the call and sends it to Celery.
        pass

3. Create the Worker Entrypoint

This file (worker.py) is what Celery will use to boot up.

# my_app/worker.py
from pico_ioc import init, configuration, DictSource
from celery import Celery

# Your application's configuration (broker, backend, etc.)
cfg = configuration(DictSource({
    "celery": {
        "broker_url": "redis://localhost:6379/0",
        "backend_url": "redis://localhost:6379/1"
    }
}))

# Modules to scan for @component, @task, @celery
modules = [
    "pico_celery",
    "my_app.services",
    "my_app.tasks",
    "my_app.clients"
]

# Initialize the container
container = init(modules=modules, config=cfg)

# Get the IoC-managed Celery app
# The PicoTaskRegistrar has already found and registered
# the 'tasks.create_user' task.
celery_app = container.get(Celery)

4. Run the Worker

You will need a cooperative concurrency pool such as eventlet or gevent.

# Install the pool: pip install eventlet
celery -A my_app.worker:celery_app worker -P eventlet -l info

5. Use the Client in your API

Your web API (e.g., FastAPI) can now inject the UserTaskClient and use it.

# my_app/main.py
from fastapi import FastAPI
from pico_ioc import init
from my_app.clients import UserTaskClient
from my_app.worker import container # Reuse the worker's container

app = FastAPI()

@app.post("/users/")
async def create_user_endpoint(username: str, email: str):
    # Resolve the client from the container
    client = await container.aget(UserTaskClient)
    
    # Call the client method
    # This sends the task to Celery and returns an AsyncResult
    result = client.create_user(username, email)
    
    return {"message": "Task submitted", "task_id": result.id}

🔄 Task Execution Semantics (Worker)

When Celery receives a task:

Celery Worker
     ↓
Async Wrapper (generated by PicoTaskRegistrar)
     ↓
await container.aget(UserTasks)  (Resolves component + dependencies)
     ↓
component_instance.create_user(...)  (Executes your async method)
     ↓
await self.user_service.create(...)
     ↓
'prototype' scope is destroyed
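Conceptually, the generated wrapper behaves like the sketch below. The Container class and make_wrapper are simplified stand-ins invented for illustration, not Pico-IoC's or pico-celery's actual internals:

```python
import asyncio

# Minimal stand-in for the IoC container: every aget() builds a fresh
# instance, mimicking prototype scope (illustrative, not Pico-IoC's API)
class Container:
    def __init__(self, factories):
        self.factories = factories

    async def aget(self, cls):
        return self.factories[cls]()

class UserTasks:
    async def create_user(self, username, email):
        return {"username": username, "email": email}

container = Container({UserTasks: UserTasks})

# The kind of wrapper that gets registered with Celery: resolve a fresh
# component per execution, then await the bound async method
def make_wrapper(container, cls, method_name):
    async def wrapper(*args, **kwargs):
        instance = await container.aget(cls)  # fresh instance each call
        return await getattr(instance, method_name)(*args, **kwargs)
    return wrapper

wrapper = make_wrapper(container, UserTasks, "create_user")
print(asyncio.run(wrapper("alice", "a@example.com")))
# {'username': 'alice', 'email': 'a@example.com'}
```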

Key benefits:

  • True async execution.
  • No global state.
  • Fully injected services.
  • Guaranteed isolation via prototype scope.

🧪 Testing with Pico-IoC

You can test your task logic just like any other component, with no Celery worker needed.

import pytest
from pico_ioc import init, configuration, DictSource
from my_app.tasks import UserTasks
from unittest.mock import AsyncMock, MagicMock

# Mock the dependencies
@pytest.fixture
def mock_user_service():
    service = AsyncMock()
    service.create.return_value = MagicMock(to_dict=lambda: {"id": 1})
    return service

@pytest.mark.asyncio
async def test_user_task_logic(mock_user_service):
    cfg = configuration(DictSource({}))
    
    # Initialize the container with only the task
    container = init(modules=[UserTasks], config=cfg)
    
    # Register the mocked dependency
    container.register_instance(mock_user_service)

    # Resolve the task component
    task_component = await container.aget(UserTasks)
    
    # Call the async method directly
    result = await task_component.create_user("test", "test@example.com")

    # Assert the logic
    assert result == {"id": 1}
    mock_user_service.create.assert_called_with("test", "test@example.com")
    
    await container.cleanup_all_async()

โš™๏ธ How It Works

  • @task (in decorators.py) flags async methods inside components.
  • PicoTaskRegistrar (in registrar.py) is a component that scans IoC metadata upon configuration.
  • For each @task method found, it generates an async wrapper.
  • This wrapper is what gets registered with Celery (celery_app.task(...)).
  • When Celery executes the task, it invokes the wrapper. The wrapper calls await container.aget(Component) to obtain a fresh instance (thanks to prototype scope), then invokes your original method, so every execution is fully dependency-injected.
  • @send_task (in client.py) flags methods on client classes.
  • @celery (in client.py) applies an interceptor (CeleryClientInterceptor) to all methods flagged with @send_task.
  • When you call a client method (e.g., client.create_user(...)), the interceptor activates, extracts the @send_task metadata (like the task name) and the call arguments, and executes self._celery.send_task(...) on your behalf.
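The client-side mechanism can be pictured with a toy interceptor. This sketch only mirrors the behavior described above; the decorator internals and the _task_name attribute are invented for illustration and are not pico-celery's real implementation:

```python
# Toy version of the @send_task/@celery pair: @send_task records the task
# name on the method; the class decorator swaps each flagged method for a
# stub that forwards the name and call arguments to a send_task-like callable.
def send_task(name):
    def mark(fn):
        fn._task_name = name  # invented marker attribute
        return fn
    return mark

def celery_client(send):
    def decorate(cls):
        for attr, fn in list(vars(cls).items()):
            task_name = getattr(fn, "_task_name", None)
            if task_name is None:
                continue
            def stub(self, *args, _name=task_name, **kwargs):
                # The original body is never run; the call is rerouted
                return send(_name, args, kwargs)
            setattr(cls, attr, stub)
        return cls
    return decorate

sent = []  # stands in for celery_app.send_task
def fake_send(name, args, kwargs):
    sent.append((name, args, kwargs))
    return f"queued:{name}"

@celery_client(fake_send)
class UserTaskClient:
    @send_task(name="tasks.create_user")
    def create_user(self, username, email):
        pass  # body intentionally unused

client = UserTaskClient()
print(client.create_user("alice", email="a@example.com"))
# queued:tasks.create_user
print(sent)
# [('tasks.create_user', ('alice',), {'email': 'a@example.com'})]
```

In the real library, the recording callable corresponds to self._celery.send_task(...), which is Celery's standard API for sending a task by name.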

💡 Architecture Overview

pico-celery manages both sides: the Worker (execution) and the Client (sending).

Worker Flow (Task Execution)

        ┌─────────────────────────────┐
        │        Celery Worker        │
        └──────────────┬──────────────┘
                       │
          Async Wrapper (from pico-celery)
                       │
        ┌──────────────▼──────────────┐
        │         pico-celery         │
        │ (@task, Registrar, Scopes)  │
        └──────────────┬──────────────┘
                       │
           IoC Resolution (await aget)
                       │
        ┌──────────────▼──────────────┐
        │          Pico-IoC           │
        │   (Container, Scopes, DI)   │
        └──────────────┬──────────────┘
                       │
       Your Business Logic (Services, Repos)

Client Flow (Task Sending)

        ┌─────────────────────────────┐
        │   Your App (e.g., FastAPI)  │
        └──────────────┬──────────────┘
                       │
         Call to: client.create_user(...)
                       │
        ┌──────────────▼──────────────┐
        │         pico-celery         │
        │   (@celery, @send_task,     │
        │        Interceptor)         │
        └──────────────┬──────────────┘
                       │
        Intercepts the call and transforms it into:
        celery_app.send_task("tasks.create_user", ...)
                       │
        ┌──────────────▼──────────────┐
        │    Broker (e.g., Redis)     │
        └─────────────────────────────┘

๐Ÿ“ License

MIT
