Skip to main content

Celery support for Flask without breaking PyCharm inspections.

Project description

Flask-Celery-Tools

This is a fork of Flask-Celery-Helper

Even though the Flask documentation says Celery extensions are
unnecessary now, I found that I still need an extension to properly use Celery in large Flask applications. Specifically
I need an init_app() method to initialize Celery after I instantiate it.

This extension also comes with a single_instance method.

  • Python PyPy, 3.11, 3.12 and 3.13 supported.

Tests

Attribution

Single instance decorator inspired by Ryan Roemer.

Quickstart

Install:

pip install Flask-Celery-Tools  

Examples

Basic Example

# example.py  
from flask import Flask  
from flask_celery import Celery  

app = Flask('example')  
app.config['CELERY_BROKER_URL'] = 'redis://localhost'  
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost'  
app.config['CELERY_TASK_LOCK_BACKEND'] = 'redis://localhost'  
celery = Celery(app)  

@celery.task()  
def add_together(a: int, b: int) -> int:  
    return a + b  

if __name__ == '__main__':  
    result = add_together.delay(23, 42)  
    print(result.get())  

Run these two commands in separate terminals:

celery -A example.celery worker
python example.py

Factory Example

# extensions.py
from flask_celery import Celery

celery = Celery()
# application.py
from flask import Flask
from extensions import celery

def create_app() -> Flask:
    app = Flask(__name__)
    app.config['CELERY_IMPORTS'] = ('tasks.add_together', )
    app.config['CELERY_BROKER_URL'] = 'redis://localhost'
    app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost'
    app.config['CELERY_TASK_LOCK_BACKEND'] = 'redis://localhost'
    celery.init_app(app)
    return app
# tasks.py
from extensions import celery

@celery.task()
def add_together(a: int, b: int) -> int:
    return a + b
# manage.py
from application import create_app

app = create_app()
app.run()

Custom Task Base Class

You can provide your own base task class to Celery() or init_app(). It must extend celery.Task and will be used as the base for the internal FlaskTask, so Flask context injection is preserved while your custom behaviour (retry logic, logging, etc.) is inherited by all tasks.

# example.py
from celery import Task
from flask import Flask
from flask_celery import Celery

class MyBaseTask(Task):
    abstract = True

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print(f"Task {task_id} failed: {exc}")

app = Flask('example')
app.config['CELERY_BROKER_URL'] = 'redis://localhost'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost'
celery = Celery(app, task_cls=MyBaseTask)

@celery.task()
def add_together(a: int, b: int) -> int:
    return a + b

The same parameter is available on init_app() for the factory pattern. A value passed to init_app() takes precedence over one passed to Celery().

celery = Celery(task_cls=FallbackTask)
celery.init_app(app, task_cls=MyBaseTask)  # MyBaseTask wins

Single Instance Example

# example.py
import time
from flask import Flask
from flask_celery import Celery, single_instance
from flask_redis import Redis

app = Flask('example')
app.config['REDIS_URL'] = 'redis://localhost'
app.config['CELERY_BROKER_URL'] = 'redis://localhost'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost'
app.config['CELERY_TASK_LOCK_BACKEND'] = 'redis://localhost'
celery = Celery(app)
Redis(app)

@celery.task(bind=True)
@single_instance
def sleep_one_second(a: int, b: int) -> int:
    time.sleep(1)
    return a + b

if __name__ == '__main__':
    task1 = sleep_one_second.delay(23, 42)
    time.sleep(0.1)
    task2 = sleep_one_second.delay(20, 40)
    results1 = task1.get(propagate=False)
    results2 = task2.get(propagate=False)
    print(results1)  # 65
    if isinstance(results2, Exception) and str(results2) == 'Failed to acquire lock.':
        print('Another instance is already running.')
    else:
        print(results2)  # Should not happen.

Locking backends

Flask-Celery-Tools supports multiple locking backends you can use:

Filesystem

Filesystem locking backend is using file locks on filesystem where worker is running, WARNING this backend is not usable for distributed tasks!!!

Redis

Redis backend is using redis for storing task locks, this backend is good for distributed tasks.

Database (MariaDB, PostgreSQL, etc)

Database backend is using database supported by SqlAlchemy to store task locks, this backend is good for distributed tasks. Except sqlite database that have same limitations as filesystem backend.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

flask_celery_tools-1.6.2.tar.gz (14.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

flask_celery_tools-1.6.2-py3-none-any.whl (14.2 kB view details)

Uploaded Python 3

File details

Details for the file flask_celery_tools-1.6.2.tar.gz.

File metadata

  • Download URL: flask_celery_tools-1.6.2.tar.gz
  • Upload date:
  • Size: 14.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.5

File hashes

Hashes for flask_celery_tools-1.6.2.tar.gz
Algorithm Hash digest
SHA256 a09fe59648653c77257f028766e923241985882e47af14913f8e14a16965bda1
MD5 26af833b50f9520643b75f80190b3923
BLAKE2b-256 d06dcf46fa1f59112c69efbafa4bfa25905b856632c52b5d01282a841cf677b3

See more details on using hashes here.

File details

Details for the file flask_celery_tools-1.6.2-py3-none-any.whl.

File metadata

File hashes

Hashes for flask_celery_tools-1.6.2-py3-none-any.whl
Algorithm Hash digest
SHA256 05bbe95dc071f7de82e9528c94923969c06fcc6055ac21aba7166108483446f4
MD5 dede75286183da8d3ffc006d8e120016
BLAKE2b-256 807624831b019b9e0d8610f70b452189b444709f7bbf2b164a1c7d4b659deb32

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page