Celery Application and Task Loader Examples
Project description
Examples for Celery Applications and Task Loading
This repository contains examples for running Celery applications. It demonstrates how to load multiple task modules from different files using environment variables, and how a get celery config approach decouples the inclusion of tasks from the initialization of the Celery application. It also includes a derived Celery task, CustomTask, that shows how to handle the on_success and on_failure task control events. I find this makes it easier to manage my Celery workers and the applications that publish tasks to them.
This is not an official Celery project; it is just a set of examples I use to test my workers and tasks.
I hope you find it valuable.
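The CustomTask idea mentioned above can be sketched as a `celery.Task` subclass that overrides the standard `on_success` and `on_failure` hooks. This is a minimal illustration, not the repository's exact code; the class name matches the description, but the log wording is an assumption modeled on the worker output shown later.

```python
import logging

log = logging.getLogger("custom_task")

try:
    from celery import Task
except ImportError:
    # Stand-in base class so this sketch can be read and run
    # even without Celery installed.
    class Task:
        pass


class CustomTask(Task):
    """Sketch of a task base class that logs task outcomes."""

    def on_success(self, retval, task_id, args, kwargs):
        # Called by the worker after the task returns without raising.
        log.info(
            "custom_task SUCCESS - retval=%s task_id=%s args=%s kwargs=%s",
            retval, task_id, args, kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # Called by the worker after the task raises an exception.
        log.error(
            "custom_task FAIL - exc=%s args=%s kwargs=%s",
            exc, args, kwargs)
```

Tasks would opt in with something like `@app.task(base=CustomTask)` (assumed decorator usage), which produces the `custom_task SUCCESS` / `custom_task FAIL` lines visible in the worker logs below.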
Install
```
pip install celery-loaders
```
Start Redis
This is only needed if Redis is not already running on port 6379. It also requires having docker and docker-compose available to your user.
```
./run-redis.sh
```
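Since the script requires docker-compose, it presumably brings up a single Redis container. A minimal compose file matching the broker and backend URLs used below might look like this (this is an assumed sketch, not the repository's actual file; the service name and image tag are illustrative):

```yaml
# Hypothetical docker-compose.yml: one Redis container on the
# default port 6379 used by the broker/backend URLs in the examples.
version: "2"
services:
  redis:
    image: redis:4.0-alpine
    container_name: redis
    ports:
      - "6379:6379"
```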
Start Celery Worker
Please run this from the base repository directory, or the module paths to the Celery tasks will fail to resolve.
```
./run-celery-loaders-worker.sh
Starting Worker=celery_worker
celery worker -A celery_worker -c 1 -l INFO -n default@%h
2018-02-24 13:01:49,843 - worker - INFO - start - worker
2018-02-24 13:01:49,843 - worker - INFO - broker=redis://localhost:6379/9 backend=redis://localhost:6379/10 include_tasks=['celery_loaders.work_tasks.tasks', 'celery_loaders.work_tasks.always_fails_tasks']
2018-02-24 13:01:49,844 - get_celery_app - INFO - creating celery app=worker tasks=['celery_loaders.work_tasks.tasks', 'celery_loaders.work_tasks.always_fails_tasks']
2018-02-24 13:01:49,845 - worker - INFO - starting celery

 -------------- default@dev v4.1.0 (latentcall)
---- **** -----
--- * ***  * -- Linux-4.13.0-16-generic-x86_64-with-Ubuntu-17.10-artful 2018-02-24 13:01:49
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         worker:0x7f384346e550
- ** ---------- .> transport:   redis://localhost:6379/9
- ** ---------- .> results:     redis://localhost:6379/10
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . celery_loaders.work_tasks.always_fails_tasks.always_fails
  . celery_loaders.work_tasks.tasks.do_some_work

2018-02-24 13:01:49,956 - celery.worker.consumer.connection - INFO - Connected to redis://localhost:6379/9
2018-02-24 13:01:49,963 - celery.worker.consumer.mingle - INFO - mingle: searching for neighbors
2018-02-24 13:01:50,981 - celery.worker.consumer.mingle - INFO - mingle: all alone
2018-02-24 13:01:51,013 - celery.apps.worker - INFO - default@dev ready.
```
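The log above shows the include_tasks list being assembled before the Celery app is created. A sketch of that pattern, where a comma-separated environment variable is parsed into the app's `include` list (the variable name `WORKER_TASKS` and these function names are assumptions, not the repository's exact API):

```python
import os


def parse_include_tasks(env_value):
    """Split a comma-separated module list into Celery's include list."""
    return [m.strip() for m in env_value.split(",") if m.strip()]


def get_celery_app(name, broker_url, backend_url, include_tasks):
    """Build a Celery app with its task modules injected at init time."""
    # Deferred import keeps the parsing helper usable without Celery.
    from celery import Celery
    return Celery(name, broker=broker_url, backend=backend_url,
                  include=include_tasks)


if __name__ == "__main__":
    # Defaults mirror the modules shown in the worker log above.
    tasks = parse_include_tasks(os.getenv(
        "WORKER_TASKS",
        "celery_loaders.work_tasks.tasks,"
        "celery_loaders.work_tasks.always_fails_tasks"))
    print(tasks)
```

Because the module list is resolved before the app exists, the same worker script can host entirely different task sets just by changing the environment, which is the decoupling the introduction describes.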
Run Success and Failure Tests
Run this in another terminal with the celery-loaders pip package installed in the Python 3 runtime.
```
./run-celery-loaders-tests.py
2018-02-24 13:01:56,768 - worker - INFO - start - worker
2018-02-24 13:01:56,768 - worker - INFO - broker=redis://localhost:6379/9 backend=redis://localhost:6379/10 include_tasks=['celery_loaders.work_tasks.tasks', 'celery_loaders.work_tasks.always_fails_tasks']
2018-02-24 13:01:56,768 - worker - INFO - broker=redis://localhost:6379/9 backend=redis://localhost:6379/10
2018-02-24 13:01:56,769 - get_celery_app - INFO - creating celery app=worker tasks=['celery_loaders.work_tasks.tasks', 'celery_loaders.work_tasks.always_fails_tasks']
2018-02-24 13:01:56,781 - worker - INFO - calling task - success testing
2018-02-24 13:01:56,822 - worker - INFO - calling task=celery_loaders.work_tasks.tasks.do_some_work - success job_id=3dcd5066-46fe-43cc-b0c6-8cff5499a7b1
2018-02-24 13:01:56,822 - worker - INFO - calling task - failure testing
2018-02-24 13:01:56,823 - worker - INFO - calling task=celery_loaders.work_tasks.always_fails_tasks.always_fails - failure job_id=c6f91e65-f541-40ad-9226-eaf97b223723
2018-02-24 13:01:56,823 - worker - INFO - end - worker
```
Confirm the Celery Worker Processed the Tasks
```
2018-02-24 13:01:56,822 - celery.worker.strategy - INFO - Received task: celery_loaders.work_tasks.tasks.do_some_work[3dcd5066-46fe-43cc-b0c6-8cff5499a7b1]
2018-02-24 13:01:56,824 - tasks - INFO - task - do_some_work - start work_dict={'user_id': 1}
2018-02-24 13:01:56,824 - tasks - INFO - task - do_some_work - done
2018-02-24 13:01:56,828 - custom_task - INFO - custom_task SUCCESS - retval=True task_id=3dcd5066-46fe-43cc-b0c6-8cff5499a7b1 args=[{'user_id': 1}] kwargs={}
2018-02-24 13:01:56,828 - celery.app.trace - INFO - Task celery_loaders.work_tasks.tasks.do_some_work[3dcd5066-46fe-43cc-b0c6-8cff5499a7b1] succeeded in 0.004043873999762582s: True
2018-02-24 13:01:57,007 - celery.worker.strategy - INFO - Received task: celery_loaders.work_tasks.always_fails_tasks.always_fails[c6f91e65-f541-40ad-9226-eaf97b223723]
2018-02-24 13:01:57,009 - always_fails_tasks - INFO - task - always_fails - start work_dict={'test_failure': 'Should fail now 2018-02-24T13:01:56.781481'}
2018-02-24 13:01:57,010 - custom_task - ERROR - custom_task FAIL - exc=Should fail now 2018-02-24T13:01:56.781481 args=[{'test_failure': 'Should fail now 2018-02-24T13:01:56.781481'}] kwargs={}
2018-02-24 13:01:57,010 - celery.app.trace - ERROR - Task celery_loaders.work_tasks.always_fails_tasks.always_fails[c6f91e65-f541-40ad-9226-eaf97b223723] raised unexpected: Exception('Should fail now 2018-02-24T13:01:56.781481',)
Traceback (most recent call last):
  File "/home/jay/.venvs/celeryloaders/lib/python3.6/site-packages/celery/app/trace.py", line 374, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/jay/.venvs/celeryloaders/lib/python3.6/site-packages/celery/app/trace.py", line 629, in __protected_call__
    return self.run(*args, **kwargs)
  File "/opt/redten/stack/jwt/webapp/celery_examples/build/lib/celery_loaders/work_tasks/always_fails_tasks.py", line 32, in always_fails
    "simulating a failure"))
Exception: Should fail now 2018-02-24T13:01:56.781481
```
Debug a Celery Task from the Command Line
Start a named Celery task and pass in data from a file containing a testing JSON payload:
```
./run-celery-task.py -f tests/data/user_lookup_test.json -t celery_loaders.work_tasks.tasks.do_some_work -w 5.0
2018-02-26 00:11:50,192 - run-celery-task - INFO - start - run-celery-task
2018-02-26 00:11:50,192 - run-celery-task - INFO - connecting Celery=run-celery-task broker=redis://localhost:6379/9 backend=redis://localhost:6379/10 tasks=['celery_loaders.work_tasks.tasks']
2018-02-26 00:11:50,192 - get_celery_app - INFO - creating celery app=run-celery-task tasks=['celery_loaders.work_tasks.tasks']
2018-02-26 00:11:50,207 - run-celery-task - INFO - app.broker_url=redis://localhost:6379/9 calling task=celery_loaders.work_tasks.tasks.do_some_work data={'user_id': 1}
2018-02-26 00:11:50,264 - run-celery-task - INFO - calling task=celery_loaders.work_tasks.tasks.do_some_work - started job_id=561c80bc-0bab-4387-a842-4372d11c8291
2018-02-26 00:11:50,264 - run-celery-task - INFO - task=561c80bc-0bab-4387-a842-4372d11c8291 - waiting seconds=5.0 for results
2018-02-26 00:11:50,268 - run-celery-task - INFO - task=celery_loaders.work_tasks.tasks.do_some_work - success job_id=561c80bc-0bab-4387-a842-4372d11c8291 task_result={'job_results': 'some response key=c60adfdc-e27c-41f4-8440-666be599ab4a'}
2018-02-26 00:11:50,268 - run-celery-task - INFO - end - run-celery-task
```
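Judging from the log, `run-celery-task.py` loads a JSON payload from the `-f` file, publishes it to the task named by `-t`, and waits up to `-w` seconds for the result. A sketch of that flow under those assumptions (the helper names here are hypothetical; `send_task` and `AsyncResult.get` are standard Celery API):

```python
import json


def load_payload(path):
    """Read a testing JSON payload, e.g. tests/data/user_lookup_test.json."""
    with open(path) as f:
        return json.load(f)


def run_task(app, task_name, payload, wait_seconds):
    """Publish a task by its dotted name and block for the result."""
    # send_task publishes by name, so the caller does not need to
    # import the task module; get() waits up to the timeout.
    result = app.send_task(task_name, args=[payload])
    return result.get(timeout=wait_seconds)
```

Publishing by name is what makes this useful for debugging: you can exercise any registered task from a throwaway script with only the broker URL and a payload file.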
Verify the Worker Processed the Task
```
2018-02-26 00:11:50,265 - celery.worker.strategy - INFO - Received task: celery_loaders.work_tasks.tasks.do_some_work[561c80bc-0bab-4387-a842-4372d11c8291]
2018-02-26 00:11:50,266 - tasks - INFO - task - do_some_work - start work_dict={'user_id': 1}
2018-02-26 00:11:50,266 - tasks - INFO - task - {'job_results': 'some response key=c60adfdc-e27c-41f4-8440-666be599ab4a'} - result=do_some_work done
2018-02-26 00:11:50,267 - custom_task - INFO - custom_task SUCCESS - retval={'job_results': 'some response key=c60adfdc-e27c-41f4-8440-666be599ab4a'} task_id=561c80bc-0bab-4387-a842-4372d11c8291 args=[{'user_id': 1}] kwargs={}
2018-02-26 00:11:50,268 - celery.app.trace - INFO - Task celery_loaders.work_tasks.tasks.do_some_work[561c80bc-0bab-4387-a842-4372d11c8291] succeeded in 0.001375271000142675s: {'job_results': 'some response key=c60adfdc-e27c-41f4-8440-666be599ab4a'}
```
Development
```
virtualenv -p python3 ~/.venvs/celeryloaders
source ~/.venvs/celeryloaders/bin/activate
pip install -e .
```
Run tests
```
python setup.py test
```
Linting
```
flake8 .
pycodestyle .
```
License
Apache 2.0
Project details
Release history
Download files
Download the file for your platform.
Source Distribution
Built Distribution
File details
Details for the file celery-loaders-1.0.8.tar.gz
File metadata
- Download URL: celery-loaders-1.0.8.tar.gz
- Upload date:
- Size: 8.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/1.12.1 pkginfo/1.4.2 requests/2.19.1 setuptools/40.4.3 requests-toolbelt/0.8.0 tqdm/4.26.0 CPython/3.6.5
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `11519b71f7abf6d4503b0638db59710666042d3d26635a3158d9af3c4403704a` |
| MD5 | `4fc7b898628c6bf2d53882f0c57cd041` |
| BLAKE2b-256 | `6d9bf72f7628e5518ddb00b10ddd7b324fc75f93c396d13a431c3ab5f2e5ce11` |
File details
Details for the file celery_loaders-1.0.8-py2.py3-none-any.whl
File metadata
- Download URL: celery_loaders-1.0.8-py2.py3-none-any.whl
- Upload date:
- Size: 15.2 kB
- Tags: Python 2, Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/1.12.1 pkginfo/1.4.2 requests/2.19.1 setuptools/40.4.3 requests-toolbelt/0.8.0 tqdm/4.26.0 CPython/3.6.5
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `d631fbec13fcc3f53315c8273383a278c7e8196e323604fec090cba8c75e65ba` |
| MD5 | `ef38d3b5aa2b0fbd14217f46b4d929d5` |
| BLAKE2b-256 | `e7ccf79e48dfd78a3680d1f0ca65276078c440851632930f0649d7b0e8b7bc92` |