Result and friends: various subclassable classes for deferred delivery of values.
Latest release 20220311:
- Result: class local Seq instance.
- Result.call: thread safe runtime check of self.state==pending.
- New Task and @task decorator, prototype for rerunnable blocking chaining tasks scheme - very alpha.
A Result is the base class for several callable subclasses
which will receive values at a later point in time,
and can also be used standalone without subclassing.
A call to a Result will block until the value is received or the Result is cancelled,
which will raise an exception in the caller.
A Result may be called by multiple users, before or after the value has been delivered;
if the value has been delivered the caller returns with it immediately.
A Result's state may be inspected (pending, running, ready, cancelled).
Callbacks can be registered via a Result's .notify method.
An incomplete Result can be told to call a function to compute its value;
the function return will be stored as the value unless the function raises an exception,
in which case the exception information is recorded instead.
If an exception occurred, it will be reraised for any caller of the Result.
Trite example:
    from cs.result import Result
    R = Result(name="my demo")
Thread 1:
    # this blocks until the Result is ready
    value = R()
    print(value)
    # prints 3 once Thread 2 (below) assigns to it
Thread 2:
    R.result = 3
Thread 3:
    value = R()
    # returns immediately with 3
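The blocking behaviour described above can be sketched with the standard library alone. This is not the cs.result implementation: MiniResult and demo_blocking are hypothetical names invented for illustration, and only the call, put and raise_ ideas are modelled.

```python
import threading


class MiniResult:
    """Hypothetical stand-in for a Result: one value, delivered later."""

    def __init__(self, name=None):
        self.name = name
        self._done = threading.Event()
        self._value = None
        self._exc = None

    def put(self, value):
        # Store the value and wake every caller blocked in __call__.
        self._value = value
        self._done.set()

    def raise_(self, exc):
        # Store an exception instead; callers will see it re-raised.
        self._exc = exc
        self._done.set()

    def __call__(self):
        # Block until a value or an exception has been delivered.
        self._done.wait()
        if self._exc is not None:
            raise self._exc
        return self._value


def demo_blocking():
    R = MiniResult(name="my demo")
    seen = []
    # "Thread 1": blocks inside R() until the value arrives.
    waiter = threading.Thread(target=lambda: seen.append(R()))
    waiter.start()
    R.put(3)            # "Thread 2": deliver the value
    waiter.join()
    seen.append(R())    # "Thread 3": already delivered, returns at once
    return seen


print(demo_blocking())  # prints [3, 3]
```

A threading.Event is used here because any number of callers can wait on it, before or after it is set, which matches the "multiple users" behaviour described above.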
You can also collect multiple Results in completion order using the report() function:
    Rs = [ ... list of Results of whatever type ... ]
    ...
    for R in report(Rs):
        x = R()     # collect result, will return immediately because
                    # the Result is complete
        print(x)    # print result
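For readers more familiar with the standard library, completion-order collection resembles concurrent.futures.as_completed. The sketch below uses that stdlib function, not report(), and the helper names slow_square and collect_in_completion_order are invented for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_square(n, delay):
    # Simulate work which takes `delay` seconds.
    time.sleep(delay)
    return n * n


def collect_in_completion_order():
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = [
            pool.submit(slow_square, 1, 0.3),
            pool.submit(slow_square, 2, 0.1),
            pool.submit(slow_square, 3, 0.2),
        ]
        # as_completed yields each future once it has finished,
        # so .result() returns immediately, as R() does above.
        return [f.result() for f in as_completed(futures)]


print(collect_in_completion_order())
```

With these delays the values will usually arrive shortest delay first rather than in submission order, though exact ordering depends on thread scheduling.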
Function after(Rs, R, func, *a, **kw)
After the completion of Rs call func(*a,**kw) and return its result via R; return the Result object.
Parameters:
- Rs: an iterable of Results.
- R: a Result to collect the result of calling func. If None, one will be created.
- func, a, kw: a callable and its arguments.
Class AsynchState(enum.Enum)
State tokens for Results.
Function bg(func, *a, **kw)
Dispatch a Thread to run func, return a Result to collect its value.
Parameters:
- _name: optional name for the Result, passed to the initialiser
- _extra: optional extra data for the Result, passed to the initialiser
Other parameters are passed to func.
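The dispatch-and-collect pattern can be sketched with a standard library Future standing in for the Result. The name bg_sketch is hypothetical and this is an illustration of the idea rather than the module's code; it catches BaseException only because the release log says Result.call does.

```python
import threading
from concurrent.futures import Future


def bg_sketch(func, *a, **kw):
    """Run func(*a, **kw) in a new Thread; return a Future for its value."""
    fut = Future()

    def runner():
        try:
            fut.set_result(func(*a, **kw))
        except BaseException as e:
            # Record the exception so that it is re-raised for collectors.
            fut.set_exception(e)

    threading.Thread(target=runner, daemon=True).start()
    return fut


fut = bg_sketch(pow, 2, 5)
print(fut.result(timeout=5))  # prints 32
```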
Class BlockedError(builtins.Exception, builtins.BaseException)
Raised by a blocked Task if attempted.
Class CancellationError(builtins.Exception, builtins.BaseException)
Raised when accessing result or exc_info after cancellation.
Class OnDemandFunction(Result)
Wrap a callable, run it when required.
Class OnDemandResult(Result)
Wrap a callable, run it when required.
Function report(LFs)
Generator which yields completed Results.
This is a generator that yields Results as they complete,
useful for waiting for a sequence of Results
that may complete in an arbitrary order.
Class Result
Basic class for asynchronous collection of a result.
This is also used to make OnDemandFunctions, LateFunctions and other
objects with asynchronous termination.
Method Result.__init__(self, name=None, lock=None, result=None, extra=None):
Base initialiser for Result objects and subclasses.
Parameters:
- name: optional parameter naming this object.
- lock: optional locking object, defaults to a new threading.Lock.
- result: if not None, prefill the .result property.
- extra: a mapping of extra information to associate with the Result, useful to provide context when collecting the result; the Result has a public attribute .extra which is an AttrableMapping to hold this information.
Method Result.__call__(self, *a, **kw):
Call the Result: wait for it to be ready and then return or raise.
You can optionally supply a callable and arguments,
in which case callable(*args,**kwargs) will be called
via Result.call and the results applied to this Result.
Method Result.bg(self, func, *a, **kw):
Submit a function to compute the result in a separate Thread,
returning the Thread.
This dispatches a Thread to run self.call(func,*a,**kw)
and as such the Result must be in "pending" state,
and transitions to "running".
Method Result.call(self, func, *a, **kw):
Have the Result call func(*a,**kw) and store its return value as self.result.
If func raises an exception, store it as self.exc_info.
Method Result.cancel(self):
Cancel this function.
If self.state is pending or cancelled, return True.
Otherwise return False (too late to cancel).
Property Result.cancelled:
Test whether this Result has been cancelled.
Method Result.empty(self):
Analogue to Queue.empty().
Property Result.exc_info:
The exception information from a completed Result.
This is not available before completion.
Method Result.get(self, default=None):
Wait for readiness; return the result if self.exc_info is None,
otherwise default.
Method Result.join(self, *a, **kw):
Calling the .join() method waits for the function to run to
completion and returns a tuple as for the WorkerThreadPool's
.dispatch() return queue, a tuple of (result,exc_info).
On completion the sequence (result,None) is returned.
If an exception occurred computing the result the sequence
(None,exc_info) is returned
where exc_info is a tuple of (exc_type,exc_value,exc_traceback).
If the function was cancelled the sequence (None,None) is returned.
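The shape of the (result,exc_info) pair can be illustrated without any threads. The helper below is a hypothetical sketch, not part of the module: it runs a function synchronously and returns the same two tuple shapes described above for success and for failure.

```python
import sys


def call_and_join(func, *a, **kw):
    """Run func now; return (result, None) or (None, exc_info)."""
    try:
        result = func(*a, **kw)
    except Exception:
        # exc_info is the usual (exc_type, exc_value, exc_traceback) tuple.
        return None, sys.exc_info()
    return result, None


print(call_and_join(int, "7"))    # prints (7, None)
result, exc_info = call_and_join(int, "seven")
print(result, exc_info[0])        # prints None <class 'ValueError'>
```

Returning a pair rather than raising lets a collector handle many outcomes in one loop and decide per item whether to inspect or re-raise the exception.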
Method Result.notify(self, notifier):
After the function completes, call notifier(self).
If the function has already completed this will happen immediately.
Example: if you'd rather self got put on some Queue Q, supply Q.put.
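The Q.put idiom has a close standard library analogue in Future.add_done_callback, which also passes the completed object to the callback and also fires immediately if completion has already happened. The sketch below uses that stdlib call as a stand-in for .notify; notify_via_queue is a hypothetical name.

```python
import queue
from concurrent.futures import Future


def notify_via_queue():
    Q = queue.Queue()
    fut = Future()
    # Registered before completion: called when the value arrives.
    fut.add_done_callback(Q.put)
    fut.set_result(3)
    first = Q.get(timeout=1)
    # Registered after completion: called immediately.
    fut.add_done_callback(Q.put)
    second = Q.get(timeout=1)
    return first is fut, second is fut, first.result()


print(notify_via_queue())  # prints (True, True, 3)
```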
Property Result.pending:
Whether the Result is pending.
Method Result.put(self, value):
Store the value. Queue-like idiom.
Method Result.raise_(self, exc=None):
Convenience wrapper for self.exc_info to store an exception result exc.
If exc is omitted or None, use sys.exc_info().
Property Result.ready:
Whether the Result state is ready or cancelled.
Property Result.result:
The result.
This property is not available before completion.
Method Result.with_result(self, submitter, prefix=None):
On completion without an exception, call submitter(self.result)
or report exception.
Class ResultSet(builtins.set)
A set subclass containing Results,
on which one may iterate as Results complete.
Method ResultSet.__iter__(self):
Iterating on a ResultSet yields Results as they complete.
Method ResultSet.wait(self):
Convenience function to wait for all the Results.
Class ResultState(enum.Enum)
State tokens for Results.
Class Task(Result)
A task which may require the completion of other tasks.
This is a subclass of Result.
Keyword parameters:
- cancel_on_exception: if true, cancel this Task if .call raises an exception; the default is False, allowing repair and retry
- cancel_on_result: optional callable to test the Task.result after .call; if it returns True the Task is marked as cancelled
- func: the function to call to complete the Task; it will be called as func(*func_args,**func_kwargs)
- func_args: optional positional arguments, default ()
- func_kwargs: optional keyword arguments, default {}
- lock: optional lock, default an RLock
Other arguments are passed to the Result initialiser.
Example:
    import time
    t1 = Task(name="task1")
    t1.bg(time.sleep, 10)
    t2 = Task(name="task2")
    # prevent t2 from running until t1 completes
    t2.require(t1)
    # try to run sleep(5) for t2 immediately after t1 completes
    t1.notify(t2.call, time.sleep, 5)
The model here may not be quite as expected; it is aimed at
tasks which can be repaired and rerun.
As such, if self.call(func,...) raises an exception from
func then this Task will still block dependent Tasks.
Conversely, a Task which completes without an exception is
considered complete and does not block dependent Tasks.
To cancel dependent Tasks the function should raise a
CancellationError.
Users wanting more immediate semantics can supply cancel_on_exception
and/or cancel_on_result to control these behaviours.
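The repair-and-rerun model can be sketched in a few lines of plain Python. Everything here is a hypothetical stand-in (MiniTask, its done flag, a local BlockedError, demo_rerun) chosen to show the blocking rule only; it omits threads, cancellation and the cancel_on_* options.

```python
class BlockedError(Exception):
    """Hypothetical stand-in: raised when a blocked task is attempted."""


class MiniTask:
    """Hypothetical stand-in for a Task with prerequisites."""

    def __init__(self, name, func):
        self.name = name
        self.func = func
        self.done = False
        self.result = None
        self._required = set()

    def require(self, other):
        # `other` must complete before this task may run.
        self._required.add(other)

    def blockers(self):
        # Prerequisites which have not completed successfully.
        return [t for t in self._required if not t.done]

    def call(self):
        if self.blockers():
            raise BlockedError(self.name)
        # If func raises, done stays False, so dependents remain
        # blocked and this task can be repaired and rerun.
        self.result = self.func()
        self.done = True
        return self.result


def demo_rerun():
    attempts = []

    def flaky():
        attempts.append(1)
        if len(attempts) == 1:
            raise RuntimeError("first attempt fails")
        return "repaired"

    t1 = MiniTask("task1", flaky)
    t2 = MiniTask("task2", lambda: "ran after task1")
    t2.require(t1)
    log = []
    try:
        t1.call()
    except RuntimeError:
        log.append("task1 failed")
    try:
        t2.call()
    except BlockedError:
        log.append("task2 blocked")
    log.append(t1.call())   # the rerun succeeds
    log.append(t2.call())   # no longer blocked
    return log


print(demo_rerun())
```

The point of the sketch is that a failed prerequisite is neither complete nor cancelled, so its dependents wait until it is rerun successfully.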
Example:
    import time
    t1 = Task(name="task1")
    t1.bg(time.sleep, 2)
    t2 = Task(name="task2")
    # prevent t2 from running until t1 completes
    t2.require(t1)
    # try to run sleep(5) for t2 immediately after t1 completes
    t1.notify(t2.call, time.sleep, 5)
Method Task.abort(self):
Calling abort() calls self.runstate.cancel() to indicate
to the running function that it should cease operation.
Method Task.bg(self):
Submit a function to complete the Task in a separate Thread,
returning the Thread.
This dispatches a Thread to run self.call()
and as such the Task must be in "pending" state,
and transitions to "running".
Method Task.block(self, otask):
Block another task until we are complete.
Method Task.blockers(self):
A generator yielding tasks from self.required()
which should block this task.
Cancelled tasks are not blockers
but if we encounter one we do cancel the current task.
Method Task.call(self):
Attempt to perform the Task by calling func(*func_args,**func_kwargs).
If we are cancelled, raise CancellationError.
If there are blocking required tasks, raise BlockedError.
Otherwise run r=func(self,*self.func_args,**self.func_kwargs)
with the following effects:
- if func() raises a CancellationError, cancel the Task
- otherwise, if an exception is raised and self.cancel_on_exception is true, cancel the Task; store the exception information from sys.exc_info() as self.exc_info regardless
- otherwise, if self.cancel_on_result is not None and self.cancel_on_result(r) is true, cancel the Task; store r as self.result regardless
If we were cancelled, raise CancellationError.
For the duration of the call the property Task.current_task
is set to self allowing access to the Task.
A typical use is to access the current Task's .runstate
attribute which can be polled by long running tasks to
honour calls to Task.abort().
Method Task.callif(self):
Trigger a call to func(self,*self.func_args,**self.func_kwargs)
if we're pending and not blocked or cancelled.
Method Task.current_task():
The current Task, valid during Task.call().
This allows the function called by the Task to access the
task, typically to poll its .runstate attribute.
Method Task.require(self, otask):
Add a requirement that otask be complete before we proceed.
Method Task.required(self):
Return a set containing any required tasks.
Method Task.then(self, func, *a, **kw):
Queue a call to func(*a,**kw) to run after the completion of
this task.
This supports a chain of actions:
>>> t = Task(func=lambda: 1)
>>> final_t = t.then(print,1).then(print,2)
>>> final_t.ready # the final task has not yet run
False
>>> # finalise t, wait for final_t (which runs immediately)
>>> t.call(); print(final_t.join())
1
2
(None, None)
>>> final_t.ready
True
Function task(*da, **dkw)
Decorator for a function which runs it as a Task.
The function may still be called directly.
The function should accept a Task as its first argument.
The following function attributes are provided:
- dispatch(after=(),deferred=False,delay=0.0): run this function after the completion of the tasks specified by after and after at least delay seconds; return the Task for the queued function
Examples:
>>> import time
>>> @task
... def f(x):
...     return x * 2
...
>>> print(f(3)) # call the function normally
6
>>> # dispatch f(5) after 0.5s, get Task
>>> t0 = time.time()
>>> ft = f.dispatch((5,), delay=0.5)
>>> # calling a Task, as with a Result, is like calling the function
>>> print(ft())
10
>>> # check that we were blocked for 0.5s
>>> now = time.time()
>>> now - t0 >= 0.5
True
Release Log
Release 20220311:
- Result: class local Seq instance.
- Result.call: thread safe runtime check of self.state==pending.
- New Task and @task decorator, prototype for rerunnable blocking chaining tasks scheme - very alpha.
Release 20210420: Update dependencies, add docstring.
Release 20210407: New ResultSet(set) class, with context manager and wait methods, and whose __iter__ iterates completed Results.
Release 20210123: bg: accept optional _extra parameter for use by the Result.
Release 20201102: Result: now .extra attribute for associated data and a new optional "extra" parameter in the initialiser.
Release 20200521:
- OnDemandResult: bugfixes and improvements.
- Result.bg: accept optional _name parameter to specify the Result.name.
Release 20191007:
- Simplify ResultState definition.
- Result.bg: use cs.threads.bg to dispatch the Thread.
Release 20190522:
- Result.call now accepts an optional callable and args.
- Result.call: set the Result state to "running" before dispatching the function.
- Rename OnDemandFunction to OnDemandResult, keep old name around for compatibility.
- Result._complete: also permitted if state==cancelled.
Release 20190309: Small bugfix.
Release 20181231:
- Result.call: report baser exceptions than BaseException.
- Drop _PendingFunction abstract class.
Release 20181109.1: DISTINFO update.
Release 20181109:
- Derive CancellationError from Exception instead of RuntimeError, fix initialiser.
- Rename AsynchState to ResultState and make it an Enum.
- Make Results hashable and comparable for equality for use as mapping keys: equality is identity.
- New Result.collected attribute, set true if .result or .exc_info are accessed, logs an error if Result.__del__ is called when false, may be set true externally if a Result is not required.
- Drop final parameter; never used and supplanted by Result.notify.
- Result.join: return the .result and .exc_info properties in order to mark the Result as collected.
- Result: set .collected to True when a notifier has been called successfully.
- Bugfix Result.cancel: apply the new cancelled state.
Release 20171231:
- Bugfix Result.call to catch BaseException instead of Exception.
- New convenience function bg(func) to dispatch
func
in a separate Thread and return a Result to collect its value.
Release 20171030.1: Fix module requirements specification.
Release 20171030: New Result.bg(func, *a, **kw) method to dispatch function in separate Thread to compute the Result value.
Release 20170903: rename cs.asynchron to cs.result