Skip to main content

Celery Application and Task Loader Examples

Project description

Examples for Celery Applications and Task Loading

This is an example for running Celery applications that demonstrates how to load multiple task modules from different files using environment variables. It also shows how to utilize a get celery config approach that allows for decoupling inclusion of tasks from the initialization of the Celery application. It also includes a derived Celery task called CustomTask for showing how to handle on_success and on_failure task control events. I find this easier to manage my Celery workers and the applications that publish tasks to them.

This is not an official Celery project, it is just examples I use to test my workers and tasks.

I hope you find it valuable.

Install

pip install celery-loaders

Start Redis

This is only needed if redis is not running already on port 6739. It also requires having docker and docker-compose available to your user.

./run-redis.sh

Start Celery Worker

Please run this from the base repository directory or the module paths to the Celery tasks will fail.

./run-celery-loaders-worker.sh
Starting Worker=celery_worker
celery worker -A celery_worker -c 1 -l INFO -n default@%h
2018-02-24 13:01:49,843 - worker - INFO - start - worker
2018-02-24 13:01:49,843 - worker - INFO - broker=redis://localhost:6379/9 backend=redis://localhost:6379/10 include_tasks=['celery_loaders.work_tasks.tasks', 'celery_loaders.work_tasks.always_fails_tasks']
2018-02-24 13:01:49,844 - get_celery_app - INFO - creating celery app=worker tasks=['celery_loaders.work_tasks.tasks', 'celery_loaders.work_tasks.always_fails_tasks']
2018-02-24 13:01:49,845 - worker - INFO - starting celery

-------------- default@dev v4.1.0 (latentcall)
---- **** -----
--- * ***  * -- Linux-4.13.0-16-generic-x86_64-with-Ubuntu-17.10-artful 2018-02-24 13:01:49
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         worker:0x7f384346e550
- ** ---------- .> transport:   redis://localhost:6379/9
- ** ---------- .> results:     redis://localhost:6379/10
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
. celery_loaders.work_tasks.always_fails_tasks.always_fails
. celery_loaders.work_tasks.tasks.do_some_work

2018-02-24 13:01:49,956 - celery.worker.consumer.connection - INFO - Connected to redis://localhost:6379/9
2018-02-24 13:01:49,963 - celery.worker.consumer.mingle - INFO - mingle: searching for neighbors
2018-02-24 13:01:50,981 - celery.worker.consumer.mingle - INFO - mingle: all alone
2018-02-24 13:01:51,013 - celery.apps.worker - INFO - default@dev ready.

Run Success and Failure Tests

Run this in another terminal with the celery_loaders pip available to the python 3 runtime.

./run-celery-loaders-tests.py
2018-02-24 13:01:56,768 - worker - INFO - start - worker
2018-02-24 13:01:56,768 - worker - INFO - broker=redis://localhost:6379/9 backend=redis://localhost:6379/10 include_tasks=['celery_loaders.work_tasks.tasks', 'celery_loaders.work_tasks.always_fails_tasks']
2018-02-24 13:01:56,768 - worker - INFO - broker=redis://localhost:6379/9 backend=redis://localhost:6379/10
2018-02-24 13:01:56,769 - get_celery_app - INFO - creating celery app=worker tasks=['celery_loaders.work_tasks.tasks', 'celery_loaders.work_tasks.always_fails_tasks']
2018-02-24 13:01:56,781 - worker - INFO - calling task - success testing
2018-02-24 13:01:56,822 - worker - INFO - calling task=celery_loaders.work_tasks.tasks.do_some_work - success job_id=3dcd5066-46fe-43cc-b0c6-8cff5499a7b1
2018-02-24 13:01:56,822 - worker - INFO - calling task - failure testing
2018-02-24 13:01:56,823 - worker - INFO - calling task=celery_loaders.work_tasks.always_fails_tasks.always_fails - failure job_id=c6f91e65-f541-40ad-9226-eaf97b223723
2018-02-24 13:01:56,823 - worker - INFO - end - worker

Confirm the Celery Worker Processed the Tasks

2018-02-24 13:01:56,822 - celery.worker.strategy - INFO - Received task: celery_loaders.work_tasks.tasks.do_some_work[3dcd5066-46fe-43cc-b0c6-8cff5499a7b1]
2018-02-24 13:01:56,824 - tasks - INFO - task - do_some_work - start work_dict={'user_id': 1}
2018-02-24 13:01:56,824 - tasks - INFO - task - do_some_work - done
2018-02-24 13:01:56,828 - custom_task - INFO - custom_task SUCCESS - retval=True task_id=3dcd5066-46fe-43cc-b0c6-8cff5499a7b1 args=[{'user_id': 1}] kwargs={}
2018-02-24 13:01:56,828 - celery.app.trace - INFO - Task celery_loaders.work_tasks.tasks.do_some_work[3dcd5066-46fe-43cc-b0c6-8cff5499a7b1] succeeded in 0.004043873999762582s: True
2018-02-24 13:01:57,007 - celery.worker.strategy - INFO - Received task: celery_loaders.work_tasks.always_fails_tasks.always_fails[c6f91e65-f541-40ad-9226-eaf97b223723]
2018-02-24 13:01:57,009 - always_fails_tasks - INFO - task - always_fails - start work_dict={'test_failure': 'Should fail now 2018-02-24T13:01:56.781481'}
2018-02-24 13:01:57,010 - custom_task - ERROR - custom_task FAIL - exc=Should fail now 2018-02-24T13:01:56.781481 args=[{'test_failure': 'Should fail now 2018-02-24T13:01:56.781481'}] kwargs={}
2018-02-24 13:01:57,010 - celery.app.trace - ERROR - Task celery_loaders.work_tasks.always_fails_tasks.always_fails[c6f91e65-f541-40ad-9226-eaf97b223723] raised unexpected: Exception('Should fail now 2018-02-24T13:01:56.781481',)
Traceback (most recent call last):
File "/home/jay/.venvs/celeryloaders/lib/python3.6/site-packages/celery/app/trace.py", line 374, in trace_task
    R = retval = fun(*args, **kwargs)
File "/home/jay/.venvs/celeryloaders/lib/python3.6/site-packages/celery/app/trace.py", line 629, in __protected_call__
    return self.run(*args, **kwargs)
File "/opt/redten/stack/jwt/webapp/celery_examples/build/lib/celery_loaders/work_tasks/always_fails_tasks.py", line 32, in always_fails
    "simulating a failure"))
Exception: Should fail now 2018-02-24T13:01:56.781481

Debug a Celery Task from the Command Line

  1. Start a named Celery Task and pass in data from a file containing a testing JSON payload

    ./run-celery-task.py -f tests/data/user_lookup_test.json -t celery_loaders.work_tasks.tasks.do_some_work -w 5.0
    2018-02-26 00:11:50,192 - run-celery-task - INFO - start - run-celery-task
    2018-02-26 00:11:50,192 - run-celery-task - INFO - connecting Celery=run-celery-task broker=redis://localhost:6379/9 backend=redis://localhost:6379/10 tasks=['celery_loaders.work_tasks.tasks']
    2018-02-26 00:11:50,192 - get_celery_app - INFO - creating celery app=run-celery-task tasks=['celery_loaders.work_tasks.tasks']
    2018-02-26 00:11:50,207 - run-celery-task - INFO - app.broker_url=redis://localhost:6379/9 calling task=celery_loaders.work_tasks.tasks.do_some_work data={'user_id': 1}
    2018-02-26 00:11:50,264 - run-celery-task - INFO - calling task=celery_loaders.work_tasks.tasks.do_some_work - started job_id=561c80bc-0bab-4387-a842-4372d11c8291
    2018-02-26 00:11:50,264 - run-celery-task - INFO - task=561c80bc-0bab-4387-a842-4372d11c8291 - waiting seconds=5.0 for results
    2018-02-26 00:11:50,268 - run-celery-task - INFO - task=celery_loaders.work_tasks.tasks.do_some_work - success job_id=561c80bc-0bab-4387-a842-4372d11c8291 task_result={'job_results': 'some response key=c60adfdc-e27c-41f4-8440-666be599ab4a'}
    2018-02-26 00:11:50,268 - run-celery-task - INFO - end - run-celery-task
  2. Verify the Worker Processed the Task

    2018-02-26 00:11:50,265 - celery.worker.strategy - INFO - Received task: celery_loaders.work_tasks.tasks.do_some_work[561c80bc-0bab-4387-a842-4372d11c8291]
    2018-02-26 00:11:50,266 - tasks - INFO - task - do_some_work - start work_dict={'user_id': 1}
    2018-02-26 00:11:50,266 - tasks - INFO - task - {'job_results': 'some response key=c60adfdc-e27c-41f4-8440-666be599ab4a'} - result=do_some_work done
    2018-02-26 00:11:50,267 - custom_task - INFO - custom_task SUCCESS - retval={'job_results': 'some response key=c60adfdc-e27c-41f4-8440-666be599ab4a'} task_id=561c80bc-0bab-4387-a842-4372d11c8291 args=[{'user_id': 1}] kwargs={}
    2018-02-26 00:11:50,268 - celery.app.trace - INFO - Task celery_loaders.work_tasks.tasks.do_some_work[561c80bc-0bab-4387-a842-4372d11c8291] succeeded in 0.001375271000142675s: {'job_results': 'some response key=c60adfdc-e27c-41f4-8440-666be599ab4a'}

Development

virtualenv -p python3 ~/.venvs/celeryloaders && source ~/.venvs/celeryloaders/bin/activate && pip install -e .

Run tests

python setup.py test

Linting

flake8 .
pycodestyle .

License

Apache 2.0

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

celery-loaders-1.0.8.tar.gz (8.8 kB view details)

Uploaded Source

Built Distribution

celery_loaders-1.0.8-py2.py3-none-any.whl (15.2 kB view details)

Uploaded Python 2 Python 3

File details

Details for the file celery-loaders-1.0.8.tar.gz.

File metadata

  • Download URL: celery-loaders-1.0.8.tar.gz
  • Upload date:
  • Size: 8.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.12.1 pkginfo/1.4.2 requests/2.19.1 setuptools/40.4.3 requests-toolbelt/0.8.0 tqdm/4.26.0 CPython/3.6.5

File hashes

Hashes for celery-loaders-1.0.8.tar.gz
Algorithm Hash digest
SHA256 11519b71f7abf6d4503b0638db59710666042d3d26635a3158d9af3c4403704a
MD5 4fc7b898628c6bf2d53882f0c57cd041
BLAKE2b-256 6d9bf72f7628e5518ddb00b10ddd7b324fc75f93c396d13a431c3ab5f2e5ce11

See more details on using hashes here.

File details

Details for the file celery_loaders-1.0.8-py2.py3-none-any.whl.

File metadata

  • Download URL: celery_loaders-1.0.8-py2.py3-none-any.whl
  • Upload date:
  • Size: 15.2 kB
  • Tags: Python 2, Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.12.1 pkginfo/1.4.2 requests/2.19.1 setuptools/40.4.3 requests-toolbelt/0.8.0 tqdm/4.26.0 CPython/3.6.5

File hashes

Hashes for celery_loaders-1.0.8-py2.py3-none-any.whl
Algorithm Hash digest
SHA256 d631fbec13fcc3f53315c8273383a278c7e8196e323604fec090cba8c75e65ba
MD5 ef38d3b5aa2b0fbd14217f46b4d929d5
BLAKE2b-256 e7ccf79e48dfd78a3680d1f0ca65276078c440851632930f0649d7b0e8b7bc92

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page