Skip to main content

Result and friends: various subclassable classes for deferred delivery of values.

Project description

Result and friends: various subclassable classes for deferred delivery of values.

Latest release 20220311:

  • Result: class local Seq instance.
  • Result.call: thread safe runtime check of self.state==pending.
  • New Task and @task decorator, prototype for rerunnable blocking chaining tasks scheme - very alpha.

A Result is the base class for several callable subclasses which will receive values at a later point in time, and can also be used standalone without subclassing.

A call to a Result will block until the value is received or the Result is cancelled, which will raise an exception in the caller. A Result may be called by multiple users, before or after the value has been delivered; if the value has been delivered the caller returns with it immediately. A Result's state may be inspected (pending, running, ready, cancelled). Callbacks can be registered via a Result's .notify method.

An incomplete Result can be told to call a function to compute its value; the function return will be stored as the value unless the function raises an exception, in which case the exception information is recorded instead. If an exception occurred, it will be reraised for any caller of the Result.

Trite example:

R = Result(name="my demo")

Thread 1:

# this blocks until the Result is ready
value = R()
print(value)
# prints 3 once Thread 2 (below) assigns to it

Thread 2:

R.result = 3

Thread 3:

value = R()
# returns immediately with 3

You can also collect multiple Results in completion order using the report() function:

Rs = [ ... list of Results of whatever type ... ]
...
for R in report(Rs):
    x = R()     # collect result, will return immediately because
                # the Result is complete
    print(x)    # print result

Function after(Rs, R, func, *a, **kw)

After the completion of Rs call func(*a,**kw) and return its result via R; return the Result object.

Parameters:

  • Rs: an iterable of Results.
  • R: a Result to collect to result of calling func. If None, one will be created.
  • func, a, kw: a callable and its arguments.

Class AsynchState(enum.Enum)

State tokens for Results.

Function bg(func, *a, **kw)

Dispatch a Thread to run func, return a Result to collect its value.

Parameters:

  • _name: optional name for the Result, passed to the initialiser
  • _extra: optional extra data for the Result, passed to the initialiser

Other parameters are passed to func.

Class BlockedError(builtins.Exception, builtins.BaseException)

Raised by a blocked Task if attempted.

Class CancellationError(builtins.Exception, builtins.BaseException)

Raised when accessing result or exc_info after cancellation.

Class OnDemandFunction(Result)

Wrap a callable, run it when required.

Class OnDemandResult(Result)

Wrap a callable, run it when required.

Function report(LFs)

Generator which yields completed Results.

This is a generator that yields Results as they complete, useful for waiting for a sequence of Results that may complete in an arbitrary order.

Class Result

Basic class for asynchronous collection of a result. This is also used to make OnDemandFunctions, LateFunctions and other objects with asynchronous termination.

Method Result.__init__(self, name=None, lock=None, result=None, extra=None): Base initialiser for Result objects and subclasses.

Parameter:

  • name: optional parameter naming this object.
  • lock: optional locking object, defaults to a new threading.Lock.
  • result: if not None, prefill the .result property.
  • extra: a mapping of extra information to associate with the Result, useful to provide context when collecting the result; the Result has a public attribute .extra which is an AttrableMapping to hold this information.

Method Result.__call__(self, *a, **kw): Call the Result: wait for it to be ready and then return or raise.

You can optionally supply a callable and arguments, in which case callable(*args,**kwargs) will be called via Result.call and the results applied to this Result.

Method Result.bg(self, func, *a, **kw): Submit a function to compute the result in a separate Thread, returning the Thread.

This dispatches a Thread to run self.call(func,*a,**kw) and as such the Result must be in "pending" state, and transitions to "running".

Method Result.call(self, func, *a, **kw): Have the Result call func(*a,**kw) and store its return value as self.result. If func raises an exception, store it as self.exc_info.

Method Result.cancel(self): Cancel this function. If self.state is pending or cancelled, return True. Otherwise return False (too late to cancel).

Property Result.cancelled: Test whether this Result has been cancelled.

Method Result.empty(self): Analogue to Queue.empty().

Property Result.exc_info: The exception information from a completed Result. This is not available before completion.

Method Result.get(self, default=None): Wait for readiness; return the result if self.exc_info is None, otherwise default.

Method Result.join(self, *a, **kw): Calling the .join() method waits for the function to run to completion and returns a tuple as for the WorkerThreadPool's .dispatch() return queue, a tuple of (result,exc_info).

On completion the sequence (result,None) is returned. If an exception occurred computing the result the sequence (None,exc_info) is returned where exc_info is a tuple of (exc_type,exc_value,exc_traceback). If the function was cancelled the sequence (None,None) is returned.

Method Result.notify(self, notifier): After the function completes, call notifier(self).

If the function has already completed this will happen immediately. example: if you'd rather self got put on some Queue Q, supply Q.put.

Property Result.pending: Whether the Result is pending.

Method Result.put(self, value): Store the value. Queue-like idiom.

Method Result.raise_(self, exc=None): Convenience wrapper for self.exc_info to store an exception result exc. If exc is omitted or None, use sys.exc_info().

Property Result.ready: Whether the Result state is ready or cancelled.

Property Result.result: The result. This property is not available before completion.

Method Result.with_result(self, submitter, prefix=None): On completion without an exception, call submitter(self.result) or report exception.

Class ResultSet(builtins.set)

A set subclass containing Results, on which one may iterate as Results complete.

Method ResultSet.__iter__(self): Iterating on a ResultSet yields Results as they complete.

Method ResultSet.wait(self): Convenience function to wait for all the Results.

Class ResultState(enum.Enum)

State tokens for Results.

Class Task(Result)

A task which may require the completion of other tasks. This is a subclass of Result.

Keyword parameters:

  • cancel_on_exception: if true, cancel this Task if .call raises an exception; the default is False, allowing repair and retry
  • cancel_on_result: optional callable to test the Task.result after .call; if it returns True the Task is marked as cancelled
  • func: the function to call to complete the Task; it will be called as func(*func_args,**func_kwargs)
  • func_args: optional positional arguments, default ()
  • func_kwargs: optional keyword arguments, default {}
  • lock: optional lock, default an RLock Other arguments are passed to the Result initialiser.

Example:

t1 = Task(name="task1")
t1.bg(time.sleep, 10)
t2 = Task("name="task2")
# prevent t2 from running until t1 completes
t2.require(t1)
# try to run sleep(5) for t2 immediately after t1 completes
t1.notify(t2.call, sleep, 5)

The model here may not be quite as expected; it is aimed at tasks which can be repaired and rerun. As such, if self.call(func,...) raises an exception from func then this Task will still block dependent Tasks. Dually, a Task which completes without an exception is considered complete and does not block dependent Tasks. To cancel dependent Tasks the function should raise a CancellationError.

Users wanting more immediate semantics can supply cancel_on_exception and/or cancel_on_result to control these behaviours.

Example:

t1 = Task(name="task1")
t1.bg(time.sleep, 2)
t2 = Task("name="task2")
# prevent t2 from running until t1 completes
t2.require(t1)
# try to run sleep(5) for t2 immediately after t1 completes
t1.notify(t2.call, sleep, 5)

>>>

Method Task.abort(self): Calling abort() calls self.runstate.cancel() to indicate to the running function that it should cease operation.

Method Task.bg(self): Submit a function to complete the Task in a separate Thread, returning the Thread.

This dispatches a Thread to run self.call() and as such the Task must be in "pending" state, and transitions to "running".

Method Task.block(self, otask): Block another task until we are complete.

Method Task.blockers(self): A generator yielding tasks from self.required() which should block this task. Cancelled tasks are not blockers but if we encounter one we do cancel the current task.

Method Task.call(self): Attempt to perform the Task by calling func(*func_args,**func_kwargs).

If we are cancelled, raise CancellationError. If there are blocking required tasks, raise BlockedError. Otherwise run r=func(self,*self.func_args,**self.func_kwargsw) with the following effects:

  • if func() raises a CancellationError, cancel the Task
  • otherwise, if an exception is raised and self.cancel_on_exception is true, cancel the Task; store the exception information from sys.exc_info() as self.exc_info regardless
  • otherwise, if self.cancel_on_result is not None and self.cancel_on_result(r) is true, cancel the Task; store r as self.result regardless If we were cancelled, raise CancellationError.

During the duration of the call the property Task.current_task is set to self allowing access to the Task. A typical use is to access the current Task's .runstate attribute which can be polled by long running tasks to honour calls to Task.abort().

Method Task.callif(self): Trigger a call to func(self,*self.func_args,**self.func_kwargsw) if we're pending and not blocked or cancelled.

Method Task.current_task(): The current Task, valid during Task.call(). This allows the function called by the Task to access the task, typically to poll its .runstate attribute.

Method Task.require(self, otask): Add a requirement that otask be complete before we proceed.

Method Task.required(self): Return a set containing any required tasks.

Method Task.then(self, func, *a, **kw): Queue a call to func(*a,**kw) to run after the completion of this task.

This supports a chain of actions:

>>> t = Task(func=lambda: 1)
>>> final_t = t.then(print,1).then(print,2)
>>> final_t.ready   # the final task has not yet run
False
>>> # finalise t, wait for final_t (which runs immediately)
>>> t.call(); print(final_t.join())
1
2
(None, None)
>>> final_t.ready
True

Function task(*da, **dkw)

Decorator for a function which runs it as a Task. The function may still be called directly. The function should accept a Task as its first argument.

The following function attributes are provided:

  • dispatch(after=(),deferred=False,delay=0.0): run this function after the completion of the tasks specified by after and after at least delay seconds; return the Task for the queued function

Examples:

>>> import time
>>> @task
... def f(x):
...     return x * 2
...
>>> print(f(3))  # call the function normally
6
>>> # dispatch f(5) after 0.5s, get Task
>>> t0 = time.time()
>>> ft = f.dispatch((5,), delay=0.5)
>>> # calling a Task, as with a Result, is like calling the function
>>> print(ft())
10
>>> # check that we were blocked for 0.5s
>>> now = time.time()
>>> now - t0 >= 0.5
True

Release Log

Release 20220311:

  • Result: class local Seq instance.
  • Result.call: thread safe runtime check of self.state==pending.
  • New Task and @task decorator, prototype for rerunnable blocking chaining tasks scheme - very alpha.

Release 20210420: Update dependencies, add docstring.

Release 20210407: New ResultSet(set) class, with context manager and wait methods, and whose iter iterates completed Results.

Release 20210123: bg: accept optional _extra parameter for use by the Result.

Release 20201102: Result: now .extra attribute for associated data and a new optional "extra" parameter in the initialiser.

Release 20200521:

  • OnDemandResult: bugfixes and improvements.
  • Result.bg: accept optional _name parameter to specify the Result.name.

Release 20191007:

  • Simplify ResultState definition.
  • Result.bg: use cs.threads.bg to dispatch the Thread.

Release 20190522:

  • Result.call now accepts an optional callable and args.
  • Result.call: set the Result state to "running" before dispatching the function.
  • Rename OnDemandFunction to OnDemandResult, keep old name around for compatibility.
  • Result._complete: also permitted if state==cancelled.

Release 20190309: Small bugfix.

Release 20181231:

  • Result.call: report baser exceptions than BaseException.
  • Drop _PendingFunction abstract class.

Release 20181109.1: DISTINFO update.

Release 20181109:

  • Derive CancellationError from Exception instead of RuntimeError, fix initialiser.
  • Rename AsynchState to ResultState and make it an Enum.
  • Make Results hashable and comparable for equality for use as mapping keys: equality is identity.
  • New Result.collected attribute, set true if .result or .exc_info are accessed, logs an error if Result.del is called when false, may be set true externally if a Result is not required.
  • Drop final parameter; never used and supplanted by Result.notify.
  • Result.join: return the .result and .exc_info properties in order to mark the Result as collected.
  • Result: set .collected to True when a notifier has been called successfully.
  • Bugfix Result.cancel: apply the new cancelled state.

Release 20171231:

  • Bugfix Result.call to catch BaseException instead of Exception.
  • New convenience function bg(func) to dispatch func in a separate Thread and return a Result to collect its value.

Release 20171030.1: Fix module requirements specification.

Release 20171030: New Result.bg(func, *a, **kw) method to dispatch function in separate Thread to compute the Result value.

Release 20170903: rename cs.asynchron to cs.result

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

cs.result-20220311.tar.gz (16.9 kB view details)

Uploaded Source

File details

Details for the file cs.result-20220311.tar.gz.

File metadata

  • Download URL: cs.result-20220311.tar.gz
  • Upload date:
  • Size: 16.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.7.1 importlib_metadata/4.8.2 pkginfo/1.8.1 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.9.10

File hashes

Hashes for cs.result-20220311.tar.gz
Algorithm Hash digest
SHA256 33f5da5471db0d1ab8f3cef018436661510e88073590e5a28735e80cc61bc24a
MD5 05a85d91de98ee3ea2c8c8ecbcbe64f9
BLAKE2b-256 dc2740833f3212607cf5278119cf0f2dd9f14fa50705840702766cdbae9d6c12

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page