Skip to main content

Celery support for Flask without breaking PyCharm inspections.

Project description

Even though the Flask documentation says Celery extensions are unnecessary now, I found that I still need an extension to properly use Celery in large Flask applications. Specifically I need an init_app() method to initialize Celery after I instantiate it.

This extension also comes with a single_instance method using Redis locks.

Currently only works with Redis backends.

Attribution

Single instance decorator inspired by Ryan Roemer.

Supported Platforms

Probably works on other versions too.

Quickstart

Install:

pip install Flask-Celery-Helper

Example:

# example.py
from flask import Flask
from flask.ext.celery import Celery

app = Flask('example')
app.config['REDIS_URL'] = 'redis://localhost'
celery = Celery(app)

@celery.task()
def add_together(a, b):
    return a + b

if __name__ == '__main__':
    result = add_together.delay(23, 42)
    print(result.get())

Run these two commands in separate terminals:

celery -A example.celery worker
python example.py

Factory Example

# extensions.py
from flask.ext.celery import Celery

celery = Celery()
# application.py
from flask import Flask
from extensions import celery

def create_app():
    app = Flask(__name__)
    app.config['CELERY_IMPORTS'] = ('tasks.add_together', )
    app.config['REDIS_URL'] = 'redis://localhost'
    celery.init_app(app)
    return app
# tasks.py
from extensions import celery

@celery.task()
def add_together(a, b):
    return a + b
# manage.py
from application import create_app

app = create_app()
app.run()

Single Instance Example

# example.py
import time
from flask import Flask
from flask.ext.celery import Celery, single_instance
from flask.ext.redis import Redis

app = Flask('example')
app.config['REDIS_URL'] = 'redis://localhost'
celery = Celery(app)
Redis(app)

@celery.task(bind=True)
@single_instance
def sleep_one_second(a, b):
    time.sleep(1)
    return a + b

if __name__ == '__main__':
    task1 = sleep_one_second.delay(23, 42)
    time.sleep(0.1)
    task2 = sleep_one_second.delay(20, 40)
    results1 = task1.get(propagate=False)
    results2 = task2.get(propagate=False)
    print(results1)  # 65
    if isinstance(results2, Exception) and str(results2) == 'Failed to acquire lock.':
        print('Another instance is already running.')
    else:
        print(results2)  # Should not happen.

Changelog

0.2.0

  • Added include_args argument to single_instance.

0.1.0

  • Initial release.

Project details


Release history Release notifications

History Node

1.1.0

History Node

1.0.0

History Node

0.2.2

History Node

0.2.1

This version
History Node

0.2.0

History Node

0.1.0

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Filename, size & hash SHA256 hash help File type Python version Upload date
Flask-Celery-Helper-0.2.0.tar.gz (5.6 kB) Copy SHA256 hash SHA256 Source None Jun 18, 2014

Supported by

Elastic Elastic Search Pingdom Pingdom Monitoring Google Google BigQuery Sentry Sentry Error logging CloudAMQP CloudAMQP RabbitMQ AWS AWS Cloud computing Fastly Fastly CDN DigiCert DigiCert EV certificate StatusPage StatusPage Status page