No project description provided
Project description
Celery service task
Classe permettant d'implémenter une tâche pour un worker Celery spécifique
Usage
# import
from celery_service_task.task import TaskBase
# implémentation d'une tâche simple
class Task(TaskBase):
def task(self, payload: Dict[str, Any]) -> bool:
print(self.conf) # la configuration est donnée par le worker Celery
print(payload) # le payload est déjà sous forme d'un dictionnaire
print(payload['transaction_id']) # identifiant de la transaction issue du payload
# Initialisation de la classe avec une configuration
t = Task(conf={'token': 'tk'})
# simulation d'un payload JSON parsé
payload={'transaction_id': '123', 'hello': 'world'}
# lance la tâche si l'id de transaction n'est pas déjà enregistré
# Ici la tâche se lance
t.run_task(
payload=payload,
transaction_id=payload['transaction_id']
)
# Ici la tâche est considérée comme un replica
t.run_task(
payload=payload,
transaction_id=payload['transaction_id']
)
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Close
Hashes for celery_service_task-0.3.0.tar.gz
Algorithm | Hash digest | |
---|---|---|
SHA256 | 3ebc3e9b9be5f30b383924354a99982a64c53cc96beb709aaca9864f7b727b4c |
|
MD5 | 94cf19bdc1cfcb7c0cf9a9a3ce5b37d4 |
|
BLAKE2b-256 | 622ec7a4ba5dc34775bfe56e869f579ea8d9e5b9f8ae94581eb871698e9e0472 |
Close
Hashes for celery_service_task-0.3.0-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 79a8ce667f8f996e096892e787d31d583fbd5c2aff174b0057f12bfded24c5fa |
|
MD5 | 4a97ffeca27afdbc555698d7f3fcdb7d |
|
BLAKE2b-256 | 05412672deb74fc75bc18842fb9840a7145beb6449b9547d76c195da5b69415b |