A general purpose Task and TaskQueue for running tasks with dependencies and failure/retry, potentially in parallel.
Latest release 20220805: Initial PyPI release.
Class BlockedError(TaskError, cs.fsm.FSMError, builtins.Exception, builtins.BaseException)
Raised by a blocked Task if attempted.
Function main(argv)
Dummy main programme to exercise something.
Function make(*tasks, fail_fast=False, queue=None)
Generator which completes all the supplied tasks by dispatching them once they are no longer blocked.
Yield each task from tasks as it completes (or becomes cancelled).
Parameters:
- tasks: Tasks as positional parameters
- fail_fast: default False; if true, cease evaluation as soon as a task completes in a state which is not DONE
- queue: optional callable to submit a task for execution later via some queue such as Later or celery
The following rules are applied by this function:
- if a task is being prepared, raise an FSMError
- if a task is already running or queued, wait for its completion
- if a task is pending:
  - if any prerequisite has failed, fail this task
  - if any prerequisite is cancelled, cancel this task
  - if any prerequisite is pending, make it first
  - if any prerequisite is not done, fail this task
  - otherwise dispatch this task and then yield it
- if fail_fast and the task is not done, return
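The rules above can be sketched with a toy model. This is not the cs.taskqueue implementation: ToyTask and toy_make are hypothetical names, everything runs synchronously (so the "already running or queued" rule never applies), and the state names PREPARE, FAILED and CANCELLED are assumptions; only PENDING, RUNNING and DONE appear in the documented output.

```python
from dataclasses import dataclass, field
from typing import Callable, Iterator, List


@dataclass
class ToyTask:
    """Hypothetical stand-in for a Task: name, function, state, prerequisites."""
    name: str
    func: Callable[[], object]
    state: str = "PENDING"
    required: List["ToyTask"] = field(default_factory=list)

    def dispatch(self) -> None:
        """Run func synchronously: FAILED on exception, DONE otherwise."""
        self.state = "RUNNING"
        try:
            self.func()
        except Exception:
            self.state = "FAILED"   # assumed state name
        else:
            self.state = "DONE"


def toy_make(*tasks: ToyTask, fail_fast: bool = False) -> Iterator[ToyTask]:
    """Apply the listed rules to each task, yielding only the supplied tasks."""
    for task in tasks:
        if task.state == "PREPARE":   # assumed name for "being prepared"
            raise RuntimeError(f"{task.name} is being prepared")
        if task.state == "PENDING":
            prereqs = task.required
            if any(p.state == "FAILED" for p in prereqs):
                task.state = "FAILED"
            elif any(p.state == "CANCELLED" for p in prereqs):
                task.state = "CANCELLED"
            else:
                # Make pending prerequisites first, without yielding them.
                for p in prereqs:
                    if p.state == "PENDING":
                        for _ in toy_make(p):
                            pass
                if any(p.state != "DONE" for p in prereqs):
                    task.state = "FAILED"
                else:
                    task.dispatch()
        yield task
        if fail_fast and task.state != "DONE":
            return


log = []
a = ToyTask("a", lambda: log.append("a"))
b = ToyTask("b", lambda: log.append("b"), required=[a])
completed = list(toy_make(b))
```

In this sketch, making b runs a first and then b, and only b is yielded, mirroring the shape of the documented example where list(make(t2)) returns just t2.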
Examples:
>>> t1 = Task('t1', lambda: print('doing t1'), track=True)
>>> t2 = t1.then('t2', lambda: print('doing t2'), track=True)
>>> list(make(t2)) # doctest: +ELLIPSIS
t1 PENDING->dispatch->RUNNING
doing t1
t1 RUNNING->done->DONE
t2 PENDING->dispatch->RUNNING
doing t2
t2 RUNNING->done->DONE
[Task('t2',<function <lambda> at ...>,state='DONE')]
Function make_later(L, *tasks, fail_fast=False)
Dispatch the tasks via L:Later for asynchronous execution if they are not already completed.
The caller can wait on each task's result (t.result) for completion.
This calls make_now() in a thread and uses L.defer to queue each task and its prerequisites for execution.
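The general pattern described here, a blocking runner started in its own thread with a queue callable that defers the actual work, can be sketched with the standard library alone. This is an illustration under assumptions, not the library's code: run_all_now and run_all_later are hypothetical names, and ThreadPoolExecutor.submit stands in for L.defer.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def run_all_now(jobs, queue=None):
    """Blocking runner: call each job directly, or via queue(job) if supplied.

    queue is assumed to return an object with a .result() method,
    as ThreadPoolExecutor.submit does.
    """
    results = []
    for job in jobs:
        if queue is None:
            results.append(job())
        else:
            results.append(queue(job).result())
    return results


def run_all_later(executor, jobs):
    """Start run_all_now in its own thread so that the caller is not blocked."""
    outcome = {}

    def worker():
        outcome["results"] = run_all_now(jobs, queue=executor.submit)

    thread = threading.Thread(target=worker)
    thread.start()
    return thread, outcome


with ThreadPoolExecutor(max_workers=2) as executor:
    thread, outcome = run_all_later(executor, [lambda: "a", lambda: "b"])
    thread.join()  # the caller chooses when to wait for completion
```

The caller gets control back immediately and decides when to wait, which is the role t.result plays in the description above.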
Function make_now(*tasks, fail_fast=False, queue=None)
Run the generator make(*tasks) to completion and return the list of completed tasks.
Class Task(cs.fsm.FSM, cs.gvutils.DOTNodeMixin, cs.resources.RunStateMixin)
A task which may require the completion of other tasks.
The model here may not be quite as expected; it is aimed at tasks which can be repaired and rerun.
As such, if self.run(func,...) raises an exception from func then this Task will still block dependent Tasks.
Dually, a Task which completes without an exception is considered complete and does not block dependent Tasks.
Keyword parameters:
- cancel_on_exception: if true, cancel this Task if .run raises an exception; the default is False, allowing repair and retry
- cancel_on_result: optional callable to test the Task.result after .run; if the callable returns True the Task is marked as cancelled, allowing repair and retry
- func: the function to call to complete the Task; it will be called as func(*func_args,**func_kwargs)
- func_args: optional positional arguments, default ()
- func_kwargs: optional keyword arguments, default {}
- lock: optional lock, default an RLock
- state: initial state, default from self._state.initial_state, which is initially 'PENDING'
- track: default False; if True then apply a callback for all states to print task transitions; otherwise it should be a callback function suitable for FSM.fsm_callback
Other arguments are passed to the Result initialiser.
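To illustrate what a transition-tracking callback such as the one enabled by track might do, here is a minimal sketch of a state machine that reports each transition. ToyFSM, print_transition and the callback signature are hypothetical; the actual FSM.fsm_callback interface is not shown in this document. The state and event names are taken from the make example output.

```python
class ToyFSM:
    """Hypothetical two-step state machine with an optional transition callback."""

    # Event names taken from the documented output: dispatch, done.
    TRANSITIONS = {
        "PENDING": {"dispatch": "RUNNING"},
        "RUNNING": {"done": "DONE"},
    }

    def __init__(self, name, callback=None):
        self.name = name
        self.state = "PENDING"
        self.callback = callback

    def event(self, event_name):
        """Apply event_name to the current state and report the transition."""
        old_state = self.state
        new_state = self.TRANSITIONS[old_state][event_name]
        self.state = new_state
        if self.callback is not None:
            self.callback(self, old_state, event_name, new_state)
        return new_state


def print_transition(fsm, old_state, event_name, new_state):
    """Print a transition in the same shape as the documented output."""
    print(f"{fsm.name} {old_state}->{event_name}->{new_state}")


t1 = ToyFSM("t1", callback=print_transition)
t1.event("dispatch")  # prints: t1 PENDING->dispatch->RUNNING
t1.event("done")      # prints: t1 RUNNING->done->DONE
```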
Example:
import time
from cs.taskqueue import Task
t1 = Task(name="task1")
t1.bg(time.sleep, 10)
t2 = Task(name="task2")
# prevent t2 from running until t1 completes
t2.require(t1)
# try to run sleep(5) for t2 immediately after t1 completes
t1.notify(t2.call, time.sleep, 5)
Users wanting more immediate semantics can supply cancel_on_exception and/or cancel_on_result to control these behaviours.
Example:
t1 = Task(name="task1")
t1.bg(time.sleep, 2)
t2 = Task(name="task2")
# prevent t2 from running until t1 completes
t2.require(t1)
# try to run sleep(5) for t2 immediately after t1 completes
t1.notify(t2.call, time.sleep, 5)
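The effect of cancel_on_exception and cancel_on_result on a single run can be sketched as a small decision function. This is a toy model of the behaviour described in the keyword parameters, not the library's code: settle is a hypothetical helper, and the state names FAILED and CANCELLED are assumptions.

```python
def settle(func, *, cancel_on_exception=False, cancel_on_result=None):
    """Run func once and return (state, result) under the two options."""
    try:
        result = func()
    except Exception:
        # By default an exception leaves the task failed but repairable;
        # with cancel_on_exception it is cancelled instead.
        return ("CANCELLED" if cancel_on_exception else "FAILED"), None
    if cancel_on_result is not None and cancel_on_result(result):
        # The result was computed but judged unacceptable by the caller's test.
        return "CANCELLED", result
    return "DONE", result


def boom():
    raise ValueError("broken")


default_state, _ = settle(boom)
strict_state, _ = settle(boom, cancel_on_exception=True)
checked_state, checked_result = settle(lambda: -1, cancel_on_result=lambda r: r < 0)
```

With the defaults, a run which raises stays in a failed state that still blocks dependents; the two options trade that for cancellation.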
Class TaskError(cs.fsm.FSMError, builtins.Exception, builtins.BaseException)
Raised by Task related errors.
Class TaskQueue
A task queue for managing and running a set of related tasks.
Unlike make and Task.make, this is aimed at a "dispatch" worker which dispatches individual tasks as required.
Example 1, put 2 dependent tasks in a queue and run:
>>> t1 = Task("t1", lambda: print("t1"))
>>> t2 = t1.then("t2", lambda: print("t2"))
>>> q = TaskQueue(t1, t2)
>>> for _ in q.run(): pass
...
t1
t2
Example 2, put 1 task in a queue and run. The queue only runs the specified tasks:
>>> t1 = Task("t1", lambda: print("t1"))
>>> t2 = t1.then("t2", lambda: print("t2"))
>>> q = TaskQueue(t1)
>>> for _ in q.run(): pass
...
t1
Example 3, put 1 task in a queue with run_dependent_tasks=True and run.
The queue pulls in the tasks which depend on completed tasks and also runs those:
>>> t1 = Task("t1", lambda: print("t1"))
>>> t2 = t1.then("t2", lambda: print("t2"))
>>> q = TaskQueue(t1, run_dependent_tasks=True)
>>> for _ in q.run(): pass
...
t1
t2
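The difference between the last two examples can be modelled with a small standalone queue. This is a sketch under assumptions, not TaskQueue itself: ToyQueue and its jobs and dependents mappings are hypothetical, and it ignores prerequisite checks for dependents which have more than one prerequisite.

```python
from collections import deque


class ToyQueue:
    """Hypothetical dispatch queue over named jobs and their dependents."""

    def __init__(self, *names, jobs, dependents, run_dependent_tasks=False):
        self.pending = deque(names)
        self.jobs = jobs              # name -> callable
        self.dependents = dependents  # name -> names which depend on it
        self.run_dependent_tasks = run_dependent_tasks

    def run(self):
        """Run queued jobs in order, yielding each name as it completes."""
        seen = set()
        while self.pending:
            name = self.pending.popleft()
            if name in seen:
                continue
            seen.add(name)
            self.jobs[name]()
            yield name
            if self.run_dependent_tasks:
                # Pull in whatever was waiting on the task just completed.
                self.pending.extend(self.dependents.get(name, ()))


log = []
jobs = {"t1": lambda: log.append("t1"), "t2": lambda: log.append("t2")}
dependents = {"t1": ["t2"]}

only_listed = list(ToyQueue("t1", jobs=jobs, dependents=dependents).run())
with_dependents = list(
    ToyQueue("t1", jobs=jobs, dependents=dependents, run_dependent_tasks=True).run()
)
```

Here only_listed contains just t1, while with_dependents also picks up t2 once t1 has completed.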
Method TaskQueue.__init__(self, *tasks, run_dependent_tasks=False):
Initialise the queue with the supplied tasks.
TaskSubType = ~TaskSubType
Type variable.
Usage:
T = TypeVar('T')  # Can be anything
A = TypeVar('A', str, bytes)  # Must be str or bytes
Type variables exist primarily for the benefit of static type checkers. They serve as the parameters for generic types as well as for generic function definitions. See class Generic for more information on generic types. Generic functions work as follows:
def repeat(x: T, n: int) -> List[T]:
    '''Return a list containing n references to x.'''
    return [x]*n
def longest(x: A, y: A) -> A:
    '''Return the longest of two strings.'''
    return x if len(x) >= len(y) else y
The latter example's signature is essentially the overloading of (str, str) -> str and (bytes, bytes) -> bytes. Also note that if the arguments are instances of some subclass of str, the return type is still plain str.
At runtime, isinstance(x, T) and issubclass(C, T) will raise TypeError.
Type variables defined with covariant=True or contravariant=True can be used to declare covariant or contravariant generic types. See PEP 484 for more details. By default generic types are invariant in all type variables.
Type variables can be introspected. e.g.:
T.__name__ == 'T'
T.__constraints__ == ()
T.__covariant__ == False
T.__contravariant__ == False
A.__constraints__ == (str, bytes)
Note that only type variables defined in global scope can be pickled.
Release Log
Release 20220805: Initial PyPI release.