Celery support for Flask without breaking PyCharm inspections.

Project description

Even though the Flask documentation says Celery extensions are unnecessary now, I found that I still need an extension to properly use Celery in large Flask applications. Specifically, I need an init_app() method to initialize Celery after I instantiate it.
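
As a rough illustration of why init_app() matters, here is a minimal sketch of the deferred-initialization pattern in plain Python. FakeApp and DeferredExtension are hypothetical stand-ins invented for this example; this is not Flask's or this extension's actual code, only the general shape of the idea: the extension object is created at import time and bound to an application later.

```python
# Hypothetical illustration of deferred initialization.
# FakeApp and DeferredExtension are made-up names, not part of Flask
# or Flask-Celery-Helper.


class FakeApp:
    """Stand-in for a Flask application: a name, a config dict, an extension registry."""

    def __init__(self, name):
        self.name = name
        self.config = {}
        self.extensions = {}


class DeferredExtension:
    """Extension that can be instantiated before any application exists."""

    def __init__(self, app=None):
        self.app = None
        self.broker_url = None
        # Passing an app up front is still allowed and initializes immediately.
        if app is not None:
            self.init_app(app)

    def init_app(self, app):
        # Read settings only now, when the app and its config are available.
        self.app = app
        self.broker_url = app.config['REDIS_URL']
        app.extensions['deferred'] = self


# Created at import time, so other modules can import it without an app.
extension = DeferredExtension()


def create_app():
    app = FakeApp('example')
    app.config['REDIS_URL'] = 'redis://localhost'
    extension.init_app(app)  # Bind the already-created extension to the app.
    return app


app = create_app()
print(extension.broker_url)  # redis://localhost
```

Because the extension instance exists before the application does, modules that define tasks can import it without importing the application module, which avoids circular imports in larger projects.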

This extension also comes with a single_instance decorator, which uses Redis locks.

Currently only works with Redis backends.
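
To give a feel for what a lock-based single-instance decorator does, here is a minimal sketch that runs without Redis. InMemoryLockStore, store, and single_instance_sketch are hypothetical names invented for this example, and the in-memory store is only a stand-in for a Redis lock; the real single_instance decorator may differ in its lock keys, timeouts, and exception type.

```python
import functools
import threading


class InMemoryLockStore:
    """Hypothetical stand-in for Redis: set-if-absent and delete on string keys."""

    def __init__(self):
        self._keys = set()
        self._guard = threading.Lock()

    def acquire(self, key):
        # Return True only if nobody else currently holds the key.
        with self._guard:
            if key in self._keys:
                return False
            self._keys.add(key)
            return True

    def release(self, key):
        with self._guard:
            self._keys.discard(key)


store = InMemoryLockStore()


def single_instance_sketch(func):
    """Refuse to run func while another call holds its lock (illustrative only)."""
    key = 'single_instance:' + func.__name__

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if not store.acquire(key):
            raise RuntimeError('Failed to acquire lock.')
        try:
            return func(*args, **kwargs)
        finally:
            # Always release, even if func raises.
            store.release(key)

    return wrapper


@single_instance_sketch
def add_together(a, b):
    return a + b


print(add_together(23, 42))  # 65

# Simulate another instance that is still running by holding the lock.
store.acquire('single_instance:add_together')
try:
    add_together(20, 40)
except RuntimeError as exc:
    print(exc)  # Failed to acquire lock.
finally:
    store.release('single_instance:add_together')
```

A lock held in Redis is visible to every worker process, which is what makes this approach useful across machines; the in-memory store above only works inside one process.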

Attribution

Single instance decorator inspired by Ryan Roemer.

Supported Platforms

Probably works on other versions too.

Quickstart

Install:

pip install Flask-Celery-Helper

Example:

# example.py
from flask import Flask
from flask_celery import Celery  # flask.ext.* imports were removed in Flask 1.0

app = Flask('example')
app.config['REDIS_URL'] = 'redis://localhost'
celery = Celery(app)

@celery.task()
def add_together(a, b):
    return a + b

if __name__ == '__main__':
    result = add_together.delay(23, 42)
    print(result.get())

Run these two commands in separate terminals:

celery -A example.celery worker
python example.py

Factory Example

# extensions.py
from flask_celery import Celery  # flask.ext.* imports were removed in Flask 1.0

celery = Celery()

# application.py
from flask import Flask
from extensions import celery

def create_app():
    app = Flask(__name__)
    app.config['CELERY_IMPORTS'] = ('tasks.add_together', )
    app.config['REDIS_URL'] = 'redis://localhost'
    celery.init_app(app)
    return app

# tasks.py
from extensions import celery

@celery.task()
def add_together(a, b):
    return a + b

# manage.py
from application import create_app

app = create_app()
app.run()

Single Instance Example

# example.py
import time
from flask import Flask
from flask_celery import Celery, single_instance  # flask.ext.* imports were removed in Flask 1.0
from flask_redis import Redis

app = Flask('example')
app.config['REDIS_URL'] = 'redis://localhost'
celery = Celery(app)
Redis(app)

@celery.task(bind=True)
@single_instance
def sleep_one_second(a, b):
    time.sleep(1)
    return a + b

if __name__ == '__main__':
    task1 = sleep_one_second.delay(23, 42)
    time.sleep(0.1)
    task2 = sleep_one_second.delay(20, 40)
    results1 = task1.get(propagate=False)
    results2 = task2.get(propagate=False)
    print(results1)  # 65
    if isinstance(results2, Exception) and str(results2) == 'Failed to acquire lock.':
        print('Another instance is already running.')
    else:
        print(results2)  # Should not happen.

Changelog

0.2.1

  • Fixed single_instance arguments with functools.

0.2.0

  • Added include_args argument to single_instance.

0.1.0

  • Initial release.


