Celery Application and Task Loader Examples
Examples for Celery Applications and Task Loading
This example shows how to run Celery applications that load multiple task modules from different files using environment variables. It also shows a get celery config approach that decouples the inclusion of tasks from the initialization of the Celery application, and it includes a derived Celery task class called CustomTask that shows how to handle the on_success and on_failure task events. I find this makes it easier to manage my Celery workers and the applications that publish tasks to them.
This is not an official Celery project; it is just a set of examples I use to test my workers and tasks.
I hope you find it valuable.
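To make the idea of loading task modules from environment variables concrete, here is a minimal sketch. It is not the package's actual code: the variable names WORKER_TASKS, WORKER_BROKER_URL and WORKER_BACKEND_URL and the helper names read_task_modules and build_app are assumptions chosen for illustration. The default values are taken from the worker log shown further down.

```python
import os

# Defaults mirror the values printed in the worker log in this README.
DEFAULT_TASKS = (
    "celery_loaders.work_tasks.tasks,"
    "celery_loaders.work_tasks.always_fails_tasks"
)


def read_task_modules(env=None, var_name="WORKER_TASKS"):
    """Return task module paths from a comma-separated variable.

    WORKER_TASKS is a hypothetical variable name used for this sketch.
    """
    env = os.environ if env is None else env
    raw = env.get(var_name, DEFAULT_TASKS)
    # Ignore empty entries and surrounding whitespace.
    return [name.strip() for name in raw.split(",") if name.strip()]


def build_app(name, env=None):
    """Create a Celery app whose task list comes from the environment."""
    # Imported here so the parsing helper above works without Celery.
    from celery import Celery

    env = os.environ if env is None else env
    return Celery(
        name,
        broker=env.get("WORKER_BROKER_URL", "redis://localhost:6379/9"),
        backend=env.get("WORKER_BACKEND_URL", "redis://localhost:6379/10"),
        include=read_task_modules(env),
    )
```

With a layout like this, a worker and a publisher can share the same build_app call and differ only in the environment they are started with, which is one way to keep task inclusion separate from application setup.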
Install
pip install celery-loaders
Start Redis
This is only needed if Redis is not already running on port 6379. It also requires docker and docker-compose to be available to your user.
./run-redis.sh
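If you are not sure whether something is already listening on the Redis port, a quick check like the one below can help. This is only a sketch using the standard library; the helper name port_is_open is made up for this example, and it only confirms that a TCP port accepts connections, not that the service behind it is Redis.

```python
import socket


def port_is_open(host="localhost", port=6379, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host not reachable.
        return False


if __name__ == "__main__":
    print("redis port open:", port_is_open())
```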
Start Celery Worker
Please run this from the base repository directory; otherwise the module paths to the Celery tasks will fail to resolve.
./run-celery-loaders-worker.sh
Starting Worker=celery_worker
celery worker -A celery_worker -c 1 -l INFO -n default@%h
2018-02-24 13:01:49,843 - worker - INFO - start - worker
2018-02-24 13:01:49,843 - worker - INFO - broker=redis://localhost:6379/9 backend=redis://localhost:6379/10 include_tasks=['celery_loaders.work_tasks.tasks', 'celery_loaders.work_tasks.always_fails_tasks']
2018-02-24 13:01:49,844 - get_celery_app - INFO - creating celery app=worker tasks=['celery_loaders.work_tasks.tasks', 'celery_loaders.work_tasks.always_fails_tasks']
2018-02-24 13:01:49,845 - worker - INFO - starting celery
-------------- default@dev v4.1.0 (latentcall)
---- **** -----
--- * *** * -- Linux-4.13.0-16-generic-x86_64-with-Ubuntu-17.10-artful 2018-02-24 13:01:49
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: worker:0x7f384346e550
- ** ---------- .> transport: redis://localhost:6379/9
- ** ---------- .> results: redis://localhost:6379/10
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. celery_loaders.work_tasks.always_fails_tasks.always_fails
. celery_loaders.work_tasks.tasks.do_some_work
2018-02-24 13:01:49,956 - celery.worker.consumer.connection - INFO - Connected to redis://localhost:6379/9
2018-02-24 13:01:49,963 - celery.worker.consumer.mingle - INFO - mingle: searching for neighbors
2018-02-24 13:01:50,981 - celery.worker.consumer.mingle - INFO - mingle: all alone
2018-02-24 13:01:51,013 - celery.apps.worker - INFO - default@dev ready.
Run Success and Failure Tests
Run this in another terminal with the celery-loaders package installed in the Python 3 environment.
./run-celery-loaders-tests.py
2018-02-24 13:01:56,768 - worker - INFO - start - worker
2018-02-24 13:01:56,768 - worker - INFO - broker=redis://localhost:6379/9 backend=redis://localhost:6379/10 include_tasks=['celery_loaders.work_tasks.tasks', 'celery_loaders.work_tasks.always_fails_tasks']
2018-02-24 13:01:56,768 - worker - INFO - broker=redis://localhost:6379/9 backend=redis://localhost:6379/10
2018-02-24 13:01:56,769 - get_celery_app - INFO - creating celery app=worker tasks=['celery_loaders.work_tasks.tasks', 'celery_loaders.work_tasks.always_fails_tasks']
2018-02-24 13:01:56,781 - worker - INFO - calling task - success testing
2018-02-24 13:01:56,822 - worker - INFO - calling task=celery_loaders.work_tasks.tasks.do_some_work - success job_id=3dcd5066-46fe-43cc-b0c6-8cff5499a7b1
2018-02-24 13:01:56,822 - worker - INFO - calling task - failure testing
2018-02-24 13:01:56,823 - worker - INFO - calling task=celery_loaders.work_tasks.always_fails_tasks.always_fails - failure job_id=c6f91e65-f541-40ad-9226-eaf97b223723
2018-02-24 13:01:56,823 - worker - INFO - end - worker
Confirm the Celery Worker Processed the Tasks
2018-02-24 13:01:56,822 - celery.worker.strategy - INFO - Received task: celery_loaders.work_tasks.tasks.do_some_work[3dcd5066-46fe-43cc-b0c6-8cff5499a7b1]
2018-02-24 13:01:56,824 - tasks - INFO - task - do_some_work - start work_dict={'user_id': 1}
2018-02-24 13:01:56,824 - tasks - INFO - task - do_some_work - done
2018-02-24 13:01:56,828 - custom_task - INFO - custom_task SUCCESS - retval=True task_id=3dcd5066-46fe-43cc-b0c6-8cff5499a7b1 args=[{'user_id': 1}] kwargs={}
2018-02-24 13:01:56,828 - celery.app.trace - INFO - Task celery_loaders.work_tasks.tasks.do_some_work[3dcd5066-46fe-43cc-b0c6-8cff5499a7b1] succeeded in 0.004043873999762582s: True
2018-02-24 13:01:57,007 - celery.worker.strategy - INFO - Received task: celery_loaders.work_tasks.always_fails_tasks.always_fails[c6f91e65-f541-40ad-9226-eaf97b223723]
2018-02-24 13:01:57,009 - always_fails_tasks - INFO - task - always_fails - start work_dict={'test_failure': 'Should fail now 2018-02-24T13:01:56.781481'}
2018-02-24 13:01:57,010 - custom_task - ERROR - custom_task FAIL - exc=Should fail now 2018-02-24T13:01:56.781481 args=[{'test_failure': 'Should fail now 2018-02-24T13:01:56.781481'}] kwargs={}
2018-02-24 13:01:57,010 - celery.app.trace - ERROR - Task celery_loaders.work_tasks.always_fails_tasks.always_fails[c6f91e65-f541-40ad-9226-eaf97b223723] raised unexpected: Exception('Should fail now 2018-02-24T13:01:56.781481',)
Traceback (most recent call last):
File "/home/jay/.venvs/celeryloaders/lib/python3.6/site-packages/celery/app/trace.py", line 374, in trace_task
R = retval = fun(*args, **kwargs)
File "/home/jay/.venvs/celeryloaders/lib/python3.6/site-packages/celery/app/trace.py", line 629, in __protected_call__
return self.run(*args, **kwargs)
File "/opt/redten/stack/jwt/webapp/celery_examples/build/lib/celery_loaders/work_tasks/always_fails_tasks.py", line 32, in always_fails
"simulating a failure"))
Exception: Should fail now 2018-02-24T13:01:56.781481
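The custom_task SUCCESS and custom_task FAIL lines in the log above come from the task event handlers. A minimal sketch of how such a base class might look is below. It is an assumption about the shape of CustomTask, not a copy of it; the names TaskEventLogging and make_custom_task_base are hypothetical. The on_success and on_failure signatures are the ones Celery's Task class defines.

```python
import logging

log = logging.getLogger("custom_task")


class TaskEventLogging:
    """Handlers that Celery calls in the worker after a task finishes."""

    def on_success(self, retval, task_id, args, kwargs):
        # Called when the task returns without raising.
        log.info(
            "custom_task SUCCESS - retval=%s task_id=%s args=%s kwargs=%s",
            retval, task_id, args, kwargs,
        )

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # Called when the task raises; einfo carries the traceback details.
        log.error(
            "custom_task FAIL - exc=%s args=%s kwargs=%s",
            exc, args, kwargs,
        )


def make_custom_task_base():
    """Combine the handlers with celery.Task (requires Celery installed)."""
    from celery import Task

    class CustomTask(TaskEventLogging, Task):
        pass

    return CustomTask
```

A task would then opt in with something like @app.task(base=CustomTask), so the handlers run for every task that uses the base class without repeating the logging in each task body.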
Debug a Celery Task from the Command Line
Start a named Celery task and pass in data from a file containing a test JSON payload:
./run-celery-task.py -f tests/data/user_lookup_test.json -t celery_loaders.work_tasks.tasks.do_some_work -w 5.0
2018-02-26 00:11:50,192 - run-celery-task - INFO - start - run-celery-task
2018-02-26 00:11:50,192 - run-celery-task - INFO - connecting Celery=run-celery-task broker=redis://localhost:6379/9 backend=redis://localhost:6379/10 tasks=['celery_loaders.work_tasks.tasks']
2018-02-26 00:11:50,192 - get_celery_app - INFO - creating celery app=run-celery-task tasks=['celery_loaders.work_tasks.tasks']
2018-02-26 00:11:50,207 - run-celery-task - INFO - app.broker_url=redis://localhost:6379/9 calling task=celery_loaders.work_tasks.tasks.do_some_work data={'user_id': 1}
2018-02-26 00:11:50,264 - run-celery-task - INFO - calling task=celery_loaders.work_tasks.tasks.do_some_work - started job_id=561c80bc-0bab-4387-a842-4372d11c8291
2018-02-26 00:11:50,264 - run-celery-task - INFO - task=561c80bc-0bab-4387-a842-4372d11c8291 - waiting seconds=5.0 for results
2018-02-26 00:11:50,268 - run-celery-task - INFO - task=celery_loaders.work_tasks.tasks.do_some_work - success job_id=561c80bc-0bab-4387-a842-4372d11c8291 task_result={'job_results': 'some response key=c60adfdc-e27c-41f4-8440-666be599ab4a'}
2018-02-26 00:11:50,268 - run-celery-task - INFO - end - run-celery-task
Verify the Worker Processed the Task
2018-02-26 00:11:50,265 - celery.worker.strategy - INFO - Received task: celery_loaders.work_tasks.tasks.do_some_work[561c80bc-0bab-4387-a842-4372d11c8291]
2018-02-26 00:11:50,266 - tasks - INFO - task - do_some_work - start work_dict={'user_id': 1}
2018-02-26 00:11:50,266 - tasks - INFO - task - {'job_results': 'some response key=c60adfdc-e27c-41f4-8440-666be599ab4a'} - result=do_some_work done
2018-02-26 00:11:50,267 - custom_task - INFO - custom_task SUCCESS - retval={'job_results': 'some response key=c60adfdc-e27c-41f4-8440-666be599ab4a'} task_id=561c80bc-0bab-4387-a842-4372d11c8291 args=[{'user_id': 1}] kwargs={}
2018-02-26 00:11:50,268 - celery.app.trace - INFO - Task celery_loaders.work_tasks.tasks.do_some_work[561c80bc-0bab-4387-a842-4372d11c8291] succeeded in 0.001375271000142675s: {'job_results': 'some response key=c60adfdc-e27c-41f4-8440-666be599ab4a'}
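The command-line run above boils down to three steps: read a JSON payload from a file, publish a task by name, and wait a bounded time for the result. The sketch below shows one way to do that with Celery's send_task and AsyncResult.get. It is not the source of run-celery-task.py; the helper names load_payload and run_named_task are assumptions for illustration, and app is expected to be a Celery application already configured with a broker and result backend.

```python
import json


def load_payload(path):
    """Read the JSON test payload that will be passed to the task."""
    with open(path) as handle:
        return json.load(handle)


def run_named_task(app, task_name, payload, wait_seconds=5.0):
    """Publish a task by its dotted name and wait for the result.

    Returns a (job_id, result) tuple. AsyncResult.get raises a
    timeout error if no result arrives within wait_seconds.
    """
    # send_task does not need the task module to be importable here.
    job = app.send_task(task_name, args=[payload])
    return job.id, job.get(timeout=wait_seconds)
```

Because send_task looks the task up by name on the worker side, the publishing process only needs the broker and backend settings, not the task code itself.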
Development
virtualenv -p python3 ~/.venvs/celeryloaders && source ~/.venvs/celeryloaders/bin/activate && pip install -e .
Run tests
python setup.py test
Linting
flake8 .
pycodestyle .
License
Apache 2.0