A general purpose Task and TaskQueue for running tasks with dependencies and failure/retry, potentially in parallel.

Latest release 20230331:

  • Task: subclass BaseTask instead of (FSM, RunStateMixin).
  • BaseTask.__init__: use @uses_runstate to ensure we've got a RunState.

Class BaseTask(cs.fsm.FSM, cs.gvutils.DOTNodeMixin, cs.resources.RunStateMixin)

A base class subclassing cs.fsm.FSM with a RunStateMixin.

Note that this class and the FSM base class do not provide an FSM_DEFAULT_STATE attribute; a default state value of None will leave .fsm_state unset.

This behaviour is chosen mostly to support subclasses with unusual behaviour, particularly Django's Model class, whose refresh_from_db method seems not to refresh fields which already exist; setting .fsm_state from an FSM_DEFAULT_STATE class attribute thus breaks this method. Subclasses of both this class and Model should not provide an FSM_DEFAULT_STATE attribute, instead relying on the field definition to provide this default in the usual way.
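
The "leave .fsm_state unset when the default is None" behaviour can be illustrated with a toy class. This is only a sketch of the idea, not the cs.fsm or BaseTask code: MiniFSM, WithDefault and WithoutDefault are hypothetical names, and the lookup via getattr is an assumption about how such a default might be consulted.

```python
class MiniFSM:
    """Toy stand-in for an FSM base class; hypothetical, not cs.fsm.FSM."""

    def __init__(self, state=None):
        if state is None:
            # Consult an optional class-level default, if a subclass defines one.
            state = getattr(type(self), "FSM_DEFAULT_STATE", None)
        if state is not None:
            # Only set the attribute when there is a real value, so that
            # another mechanism (such as an ORM field) can supply it otherwise.
            self.fsm_state = state


class WithDefault(MiniFSM):
    FSM_DEFAULT_STATE = "PENDING"


class WithoutDefault(MiniFSM):
    pass


print(hasattr(WithDefault(), "fsm_state"))     # True
print(hasattr(WithoutDefault(), "fsm_state"))  # False
print(WithoutDefault("RUNNING").fsm_state)     # RUNNING
```

In the WithoutDefault case the instance has no fsm_state attribute at all, which is the situation the note above describes for classes that also subclass Django's Model.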

Class BlockedError(TaskError, cs.fsm.FSMError, builtins.Exception, builtins.BaseException)

Raised by a blocked Task if an attempt is made to run it.

Function main(argv)

Dummy main programme to exercise something.

Function make(*tasks, fail_fast=False, queue=None)

Generator which completes all the supplied tasks by dispatching them once they are no longer blocked. Yield each task from tasks as it completes (or becomes cancelled).

Parameters:

  • tasks: Tasks as positional parameters
  • fail_fast: default False; if true, cease evaluation as soon as a task completes in a state which is not DONE
  • queue: optional callable to submit a task for execution later via some queue such as Later or celery

The following rules are applied by this function:

  • if a task is being prepared, raise an FSMError
  • if a task is already running or queued, wait for its completion
  • if a task is pending:
    • if any prerequisite has failed, fail this task
    • if any prerequisite is cancelled, cancel this task
    • if any prerequisite is pending, make it first
    • if any prerequisite is not done, fail this task
    • otherwise dispatch this task and then yield it
  • if fail_fast and the task is not done, return

Examples:

>>> t1 = Task('t1', lambda: print('doing t1'), track=True)
>>> t2 = t1.then('t2', lambda: print('doing t2'), track=True)
>>> list(make(t2))    # doctest: +ELLIPSIS
t1 PENDING->dispatch->RUNNING
doing t1
t1 RUNNING->done->DONE
t2 PENDING->dispatch->RUNNING
doing t2
t2 RUNNING->done->DONE
[Task('t2',<function <lambda> at ...>,state='DONE')]
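
The dispatch rules listed above can be sketched with a small synchronous stand-in. This is an illustration under stated assumptions, not the library's implementation: MiniTask and mini_make are hypothetical names, the state names are chosen for the sketch, and the "being prepared" and "running or queued" rules are omitted because nothing here runs concurrently.

```python
class MiniTask:
    """Toy task with a state and prerequisites; hypothetical, not cs.taskqueue.Task."""

    def __init__(self, name, func, *prereqs):
        self.name = name
        self.func = func
        self.prereqs = list(prereqs)
        self.state = "PENDING"

    def dispatch(self):
        """Run func; the state becomes DONE on success, FAILED if it raises."""
        self.state = "RUNNING"
        try:
            self.func()
        except Exception:
            self.state = "FAILED"
        else:
            self.state = "DONE"


def mini_make(*tasks, fail_fast=False):
    """Yield each supplied task once it has completed, failed or been cancelled."""
    for task in tasks:
        if task.state == "PENDING":
            if any(p.state == "FAILED" for p in task.prereqs):
                task.state = "FAILED"
            elif any(p.state == "CANCELLED" for p in task.prereqs):
                task.state = "CANCELLED"
            else:
                # Make pending prerequisites first; they are not yielded here.
                pending = [p for p in task.prereqs if p.state == "PENDING"]
                for _ in mini_make(*pending, fail_fast=fail_fast):
                    pass
                if any(p.state != "DONE" for p in task.prereqs):
                    task.state = "FAILED"
                else:
                    task.dispatch()
        yield task
        if fail_fast and task.state != "DONE":
            return


log = []
t1 = MiniTask("t1", lambda: log.append("t1"))
t2 = MiniTask("t2", lambda: log.append("t2"), t1)
done = list(mini_make(t2))
print(log)                     # ['t1', 't2']
print([t.name for t in done])  # ['t2']
```

As in the doctest above, only the tasks passed in are yielded; t1 is run as a prerequisite of t2 but does not appear in the result list.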

Function make_later(L, *tasks, fail_fast=False)

Dispatch the tasks via L:Later for asynchronous execution if they are not already completed. The caller can wait on each task's t.result for completion.

This calls make_now() in a thread and uses L.defer to queue the task and its prerequisites for execution.
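
The general pattern of handing a batch of work to a background worker and waiting on a result can be sketched with the standard library. This is not how make_later or Later are implemented: concurrent.futures.ThreadPoolExecutor stands in for the Later instance as an assumption, and run_all and run_later are hypothetical helper names.

```python
from concurrent.futures import ThreadPoolExecutor


def run_all(jobs):
    """Run each zero-argument callable in order and return the list of results."""
    return [job() for job in jobs]


def run_later(executor, *jobs):
    """Submit the whole batch to the executor and return a Future immediately."""
    return executor.submit(run_all, jobs)


with ThreadPoolExecutor(max_workers=1) as executor:
    future = run_later(executor, lambda: "t1 done", lambda: "t2 done")
    # The caller is free to do other work here, then wait for completion.
    results = future.result()

print(results)  # ['t1 done', 't2 done']
```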

Function make_now(*tasks, fail_fast=False, queue=None)

Run the generator make(*tasks) to completion and return the list of completed tasks.

Class Task(BaseTask, cs.fsm.FSM, cs.gvutils.DOTNodeMixin, cs.resources.RunStateMixin, cs.threads.HasThreadState, cs.context.ContextManagerMixin)

A task which may require the completion of other tasks.

The model here may not be quite as expected; it is aimed at tasks which can be repaired and rerun. As such, if self.run(func,...) raises an exception from func then this Task will still block dependent Tasks. Dually, a Task which completes without an exception is considered complete and does not block dependent Tasks.

Keyword parameters:

  • cancel_on_exception: if true, cancel this Task if .run raises an exception; the default is False, allowing repair and retry
  • cancel_on_result: optional callable to test the Task.result after .run; if the callable returns True the Task is marked as cancelled, allowing repair and retry
  • func: the function to call to complete the Task; it will be called as func(*func_args,**func_kwargs)
  • func_args: optional positional arguments, default ()
  • func_kwargs: optional keyword arguments, default {}
  • lock: optional lock, default an RLock
  • state: initial state, default from self._state.initial_state, which is initially 'PENDING'
  • track: default False; if True then apply a callback for all states to print task transitions; otherwise it should be a callback function suitable for FSM.fsm_callback

Other arguments are passed to the Result initialiser.

Example:

import time

t1 = Task(name="task1")
t1.bg(time.sleep, 10)
t2 = Task(name="task2")
# prevent t2 from running until t1 completes
t2.require(t1)
# try to run time.sleep(5) for t2 immediately after t1 completes
t1.notify(t2.call, time.sleep, 5)

Users wanting more immediate semantics can supply cancel_on_exception and/or cancel_on_result to control these behaviours.

Example:

t1 = Task(name="task1")
t1.bg(time.sleep, 2)
t2 = Task(name="task2")
# prevent t2 from running until t1 completes
t2.require(t1)
# try to run time.sleep(5) for t2 immediately after t1 completes
t1.notify(t2.call, time.sleep, 5)
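
The difference between the default repair-and-retry model and the more immediate cancel_on_exception / cancel_on_result behaviour can be sketched with a toy class. This is an illustration only: RetryableTask, its state names and its blocks_dependents property are hypothetical and chosen for the sketch, not taken from the Task implementation.

```python
class RetryableTask:
    """Toy task illustrating repair-and-retry; hypothetical, not cs.taskqueue.Task."""

    def __init__(self, func, cancel_on_exception=False, cancel_on_result=None):
        self.func = func
        self.cancel_on_exception = cancel_on_exception
        self.cancel_on_result = cancel_on_result
        self.state = "PENDING"
        self.result = None

    def run(self):
        """Call func once and update the state from the outcome."""
        try:
            self.result = self.func()
        except Exception:
            # By default the task stays incomplete so it can be repaired and rerun.
            self.state = "CANCELLED" if self.cancel_on_exception else "FAILED"
            return
        if self.cancel_on_result is not None and self.cancel_on_result(self.result):
            self.state = "CANCELLED"
        else:
            self.state = "DONE"

    @property
    def blocks_dependents(self):
        """Anything other than a clean completion still blocks dependents."""
        return self.state != "DONE"


attempts = []


def flaky():
    """Fail on the first call, succeed on the second."""
    attempts.append(1)
    if len(attempts) < 2:
        raise RuntimeError("not ready yet")
    return "ok"


task = RetryableTask(flaky)
task.run()
print(task.state, task.blocks_dependents)  # FAILED True
task.run()  # rerun after the "repair"
print(task.state, task.blocks_dependents)  # DONE False
```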

Class TaskError(cs.fsm.FSMError, builtins.Exception, builtins.BaseException)

Raised by Task related errors.

Class TaskQueue

A task queue for managing and running a set of related tasks.

Unlike make and Task.make, this is aimed at a "dispatch" worker which dispatches individual tasks as required.

Example 1, put 2 dependent tasks in a queue and run:

 >>> t1 = Task("t1", lambda: print("t1"))
 >>> t2 = t1.then("t2", lambda: print("t2"))
 >>> q = TaskQueue(t1, t2)
 >>> for _ in q.run(): pass
 ...
 t1
 t2

Example 2, put 1 task in a queue and run. The queue only runs the specified tasks:

 >>> t1 = Task("t1", lambda: print("t1"))
 >>> t2 = t1.then("t2", lambda: print("t2"))
 >>> q = TaskQueue(t1)
 >>> for _ in q.run(): pass
 ...
 t1

Example 3, put 1 task in a queue with run_dependent_tasks=True and run. The queue pulls in the tasks which depend on completed tasks and also runs those:

 >>> t1 = Task("t1", lambda: print("t1"))
 >>> t2 = t1.then("t2", lambda: print("t2"))
 >>> q = TaskQueue(t1, run_dependent_tasks=True)
 >>> for _ in q.run(): pass
 ...
 t1
 t2
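
The effect of run_dependent_tasks in the examples above can be sketched with a toy queue. This is a simplified illustration, not the TaskQueue implementation: Job, run_queue and the run_dependent_jobs flag are hypothetical names, and jobs here do no work beyond being marked done.

```python
class Job:
    """Toy job which knows its prerequisites and dependents; hypothetical."""

    def __init__(self, name, *prereqs):
        self.name = name
        self.prereqs = list(prereqs)
        self.dependents = []
        self.done = False
        for prereq in prereqs:
            prereq.dependents.append(self)

    def then(self, name):
        """Create and return a new job which depends on this one."""
        return Job(name, self)


def run_queue(*jobs, run_dependent_jobs=False):
    """Yield jobs as they complete, optionally pulling in their dependents."""
    pending = list(jobs)
    while True:
        # A job is ready once all of its prerequisites are done.
        ready = [j for j in pending if all(p.done for p in j.prereqs)]
        if not ready:
            return
        job = ready[0]
        pending.remove(job)
        job.done = True
        yield job
        if run_dependent_jobs:
            for dependent in job.dependents:
                if not dependent.done and dependent not in pending:
                    pending.append(dependent)


t1 = Job("t1")
t2 = t1.then("t2")
print([j.name for j in run_queue(t1)])  # ['t1']

t1 = Job("t1")
t2 = t1.then("t2")
print([j.name for j in run_queue(t1, run_dependent_jobs=True)])  # ['t1', 't2']
```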

Method TaskQueue.__init__(self, *tasks, run_dependent_tasks=False): Initialise the queue with the supplied tasks.

Release Log

Release 20230331:

  • Task: subclass BaseTask instead of (FSM, RunStateMixin).
  • BaseTask.__init__: use @uses_runstate to ensure we've got a RunState.

Release 20230217: Task: subclass HasThreadState, drop .current_task() class method.

Release 20221207:

  • Pull out core stuff from Task into BaseTask, aids subclassing.
  • BaseTask: explanatory docstring about unusual FSM_DEFAULT_STATE design choice.
  • BaseTask.tasks_as_dot: express the edges using the node ids instead of their labels.
  • BaseTask: new tasks_as_svg() method like tasks_as_dot() but returning SVG.

Release 20220805: Initial PyPI release.
