celery-typed

Type-safe Pydantic serialization for Celery tasks

A low-friction library that lets you pass Pydantic models directly to your Celery tasks, with no special decorators or manual serialization. It builds on Kombu's register_type hook to serialize and deserialize models automatically.

Features

  • Zero-friction integration - No special task decorators or parameters needed
  • Type-safe serialization - Full Pydantic model support with proper type hints
  • Automatic registration - One-line setup in your Celery configuration
  • Production ready - Based on Kombu's proven serialization system
  • Extensible - Clean protocol-based design for custom preserializers

Installation

pip install celery-typed

Or with uv:

uv add celery-typed

Quick Start

1. Register the preserializer in your Celery app

# myapp/celery.py
from celery import Celery
from celery_typed import register_pydantic_serializer

app = Celery("myapp")
register_pydantic_serializer()  # One line setup!

# ... rest of your Celery configuration

2. Use Pydantic models directly in tasks

from pydantic import BaseModel
from myapp.celery import app

class UserModel(BaseModel):
    id: int
    name: str
    email: str

class ProcessingResult(BaseModel):
    status: str
    message: str
    user_id: int

@app.task
def process_user(user: UserModel) -> ProcessingResult:
    # Your task logic here
    return ProcessingResult(
        status="success",
        message=f"Processed user {user.name}",
        user_id=user.id
    )

3. Call tasks with Pydantic model instances

# Pass model instances directly
user = UserModel(id=1, name="Alice", email="alice@example.com")
result = process_user.delay(user)

# Get back a proper Pydantic model instance
processing_result: ProcessingResult = result.get()
print(processing_result.status)  # "success"

Comparison with Official Pydantic Support

Before (Official Celery + Pydantic)

@app.task(pydantic=True)  # Must remember this decorator
def process_user(user_data: dict) -> dict:  # Work with dicts
    user = UserModel(**user_data)  # Manual conversion
    # ... process user
    return result.model_dump()  # Manual serialization

# Usage
result = process_user.delay({'id': 1, 'name': 'Alice', 'email': 'alice@example.com'})  # Pass dict
result_dict = result.get()  # Get back dict
result_obj = ProcessingResult(**result_dict)  # Manual conversion

After (celery-typed)

@app.task  # No special decorator needed
def process_user(user: UserModel) -> ProcessingResult:  # Real type hints
    # ... process user
    return result  # Return model directly

# Usage  
user = UserModel(id=1, name='Alice', email='alice@example.com')
result = process_user.delay(user)  # Pass model instance
result_obj = result.get()  # Get back model instance

How It Works

celery-typed uses Kombu's register_type functionality to automatically serialize and deserialize Pydantic models. When you pass a Pydantic model to a task:

  1. Serialization: The model is packed into a JSON-serializable format containing:
    • Module path and class name for reconstruction
    • The model's data via model_dump()
  2. Transmission: Standard Celery message passing (no changes needed)
  3. Deserialization: The worker reconstructs the original model instance using the stored class information and data

This approach is based on the excellent implementation described in Dosu's blog post.
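
The three steps above can be sketched with the standard library alone. This is a minimal illustration of the pack/unpack idea, not celery-typed's actual code: the function names pack and unpack, the envelope keys ("module", "qualname", "dump"), and the dataclass stand-in for a Pydantic model are all assumptions made so the example runs without extra dependencies.

```python
import importlib
import json
from dataclasses import asdict, dataclass
from typing import Any


def pack(model: Any) -> dict[str, Any]:
    """Describe a model as JSON-friendly data: where its class lives, plus its fields."""
    cls = type(model)
    return {
        "module": cls.__module__,      # hypothetical key: module path of the class
        "qualname": cls.__qualname__,  # hypothetical key: class name within that module
        "dump": model.model_dump(),    # the model's field data
    }


def unpack(data: dict[str, Any]) -> Any:
    """Import the recorded class and rebuild the instance from the dumped fields."""
    target: Any = importlib.import_module(data["module"])
    for part in data["qualname"].split("."):
        target = getattr(target, part)
    return target(**data["dump"])


@dataclass
class UserModel:
    """Stand-in for a Pydantic model: keyword construction plus model_dump()."""

    id: int
    name: str
    email: str

    def model_dump(self) -> dict[str, Any]:
        return asdict(self)


if __name__ == "__main__":
    user = UserModel(id=1, name="Alice", email="alice@example.com")
    wire = json.dumps(pack(user))        # what would travel in the task message
    restored = unpack(json.loads(wire))  # what the worker would rebuild
    print(restored == user)
```

Because unpack imports whatever module path the message names, the class must be importable on the worker, and a real implementation would probably also check that the resolved class is a BaseModel subclass before instantiating it.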

API Reference

register_pydantic_serializer()

Convenience function to register Pydantic model serialization for all BaseModel subclasses.

from celery_typed import register_pydantic_serializer

# Call once in your Celery app configuration
register_pydantic_serializer()

register_preserializer(preserializer)

Decorator factory for registering custom preserializers.

from celery_typed import register_preserializer, Preserializer

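# MyCustomType stands for your own class; this example assumes it provides
# a serialize() method and a deserialize() classmethod.
class MyCustomPreserializer:
    @classmethod
    def compatible_with(cls, type_: type) -> bool:
        # type_ is a class, not an instance, so use issubclass
        return issubclass(type_, MyCustomType)

    @classmethod
    def pack(cls, obj):
        return {"data": obj.serialize()}

    @classmethod
    def unpack(cls, data):
        return MyCustomType.deserialize(data["data"])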

# Register for your custom type
register_preserializer(MyCustomPreserializer)(MyCustomType)

Preserializer Protocol

Protocol defining the interface for custom preserializers:

from typing import Any, Literal, Protocol

class Preserializer(Protocol):
    @classmethod
    def compatible_with(cls, type_: type) -> Literal[True]:
        """Check if type is compatible with this preserializer"""

    @classmethod
    def pack(cls, obj: Any) -> Any:
        """Pack object into JSON-serializable format"""

    @classmethod
    def unpack(cls, data: Any) -> object:
        """Unpack data back into original object"""

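As an illustration of the protocol, here is a sketch of a preserializer for the standard library's fractions.Fraction. The protocol is redeclared locally so the example runs without celery-typed installed. FractionPreserializer and its envelope keys are hypothetical, and raising TypeError for an incompatible type is one reading of the Literal[True] return annotation, not confirmed library behavior.

```python
import json
from fractions import Fraction
from typing import Any, Literal, Protocol


class Preserializer(Protocol):
    """Local copy of the protocol above, so the sketch is self-contained."""

    @classmethod
    def compatible_with(cls, type_: type) -> Literal[True]: ...

    @classmethod
    def pack(cls, obj: Any) -> Any: ...

    @classmethod
    def unpack(cls, data: Any) -> object: ...


class FractionPreserializer:
    """Hypothetical preserializer that stores a Fraction as two integers."""

    @classmethod
    def compatible_with(cls, type_: type) -> Literal[True]:
        # Assumption: incompatibility is signalled by raising, never by returning False.
        if not (isinstance(type_, type) and issubclass(type_, Fraction)):
            raise TypeError(f"{type_!r} is not a Fraction subclass")
        return True

    @classmethod
    def pack(cls, obj: Fraction) -> dict[str, int]:
        # Integers survive JSON exactly, unlike a float approximation.
        return {"numerator": obj.numerator, "denominator": obj.denominator}

    @classmethod
    def unpack(cls, data: dict[str, int]) -> Fraction:
        return Fraction(data["numerator"], data["denominator"])


# Static check that the class satisfies the protocol (verified by a type checker).
_conforms: type[Preserializer] = FractionPreserializer

if __name__ == "__main__":
    wire = json.dumps(FractionPreserializer.pack(Fraction(3, 4)))
    print(FractionPreserializer.unpack(json.loads(wire)))
```

Following the registration pattern shown earlier, such a class would be registered with register_preserializer(FractionPreserializer)(Fraction).
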
Requirements

  • Python 3.10+
  • Celery 5.2+
  • Kombu 5.2+
  • Pydantic 2.0+

Development

This project uses uv for dependency management. To set up a development environment:

# Clone the repository
git clone https://github.com/nwcell/celery-typed.git
cd celery-typed

# Install dependencies
uv sync --dev

# Run tests
uv run pytest

# Run linting
uv run ruff check
uv run ruff format

# Type checking
uv run mypy src/

Contributing

Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.

Make sure to:

  • Add tests for any new functionality
  • Update documentation as needed
  • Follow the existing code style (enforced by Ruff)
  • Ensure all tests and type checks pass

License

This project is licensed under the MIT License - see the LICENSE file for details.

Credits
