No project description provided
Project description
Celery service task
Classe permettant d'implémenter une tâche pour un worker Celery spécifique
Usage
# import
from celery_service_task.task import TaskBase
# implémentation d'une tâche simple
class Task(TaskBase):
def task(self, payload: Dict[str, Any]) -> bool:
print(self.conf) # la configuration est donnée par le worker Celery
print(payload) # le payload est déjà sous forme d'un dictionnaire
print(payload['transaction_id']) # identifiant de la transaction issue du payload
# Initialisation de la classe avec une configuration
t = Task(conf={'token': 'tk'})
# simulation d'un payload JSON parsé
payload={'transaction_id': '123', 'hello': 'world'}
# lance la tâche si l'id de transaction n'est pas déjà enregistré
# Ici la tâche se lance
t.run_task(
payload=payload,
transaction_id=payload['transaction_id']
)
# Ici la tâche est considérée comme un replica
t.run_task(
payload=payload,
transaction_id=payload['transaction_id']
)
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Close
Hashes for celery_service_task-0.2.0.tar.gz
Algorithm | Hash digest | |
---|---|---|
SHA256 | a2f9084d05f4ab674341f919878fca77ad730fcd2a3d515323ca3a8f2fb0cfbb |
|
MD5 | d23ba38bb2119eb3730d89956a76dbb5 |
|
BLAKE2b-256 | bf6aad9bcf5358196e7169b95cf8f9d62db196abf62c6e423a2baf4967a35d56 |
Close
Hashes for celery_service_task-0.2.0-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 991ab31d3deea58e67f79e6a9f3f8372131704e3d75fe23cb52e524694815995 |
|
MD5 | c13bd6ce99121c4ef338a9dfc92889c6 |
|
BLAKE2b-256 | d49bf5ffd4056bcfd716642d211503c4df2b9dec00e77e21d2da04bf0143272d |