Skip to main content

Result and friends: various subclassable classes for deferred delivery of values.

Project description

Result and friends: various subclassable classes for deferred delivery of values.

Latest release 20240630:

  • Move CancellationError from cs.result to cs.fsm.
  • report: avoid iterating over the live set.
  • Result: drop the PREPARE state, allow 'cancel' no-op event in CANCELLED and DONE states.
  • Result.result,exc_info getters: raise CancellationError if the Result is cancelled.
  • Result._complete: fire the 'complete' event after the ._result and ._exc_info attributes are set but still before the self._get_lock.release().
  • Result: preserve message passed to .cancel(), _complete raises RuntimeError if already complete.

A Result is the base class for several callable subclasses which will receive values at a later point in time, and can also be used standalone without subclassing.

A call to a Result will block until the value is received or the Result is cancelled, which will raise an exception in the caller. A Result may be called by multiple users, before or after the value has been delivered; if the value has been delivered the caller returns with it immediately. A Result's state may be inspected (pending, running, ready, cancelled). Callbacks can be registered via a Result's .notify method.

An incomplete Result can be told to call a function to compute its value; the function return will be stored as the value unless the function raises an exception, in which case the exception information is recorded instead. If an exception occurred, it will be reraised for any caller of the Result.

Trite example:

R = Result(name="my demo")

Thread 1:

# this blocks until the Result is ready
value = R()
print(value)
# prints 3 once Thread 2 (below) assigns to it

Thread 2:

R.result = 3

Thread 3:

value = R()
# returns immediately with 3

You can also collect multiple Results in completion order using the report() function:

Rs = [ ... list of Results of whatever type ... ]
...
for R in report(Rs):
    x = R()     # collect result, will return immediately because
                # the Result is complete
    print(x)    # print result

Function after(Rs, R, func, *a, **kw)

After the completion of Rs call func(*a,**kw) and return its result via R; return the Result object.

Parameters:

  • Rs: an iterable of Results.
  • R: a Result to collect to result of calling func. If None, one will be created.
  • func, a, kw: a callable and its arguments.

Function bg(func, *a, **kw)

Dispatch a Thread to run func, return a Result to collect its value.

Parameters:

  • _name: optional name for the Result, passed to the initialiser
  • _extra: optional extra data for the Result, passed to the initialiser

Other parameters are passed to func.

Function call_in_thread(func, *a, **kw)

Run func(*a,**kw) in a separate Thread via the @in_thread decorator. Return or exception is as for the original function.

Function in_thread(func)

Decorator to evaluate func in a separate Thread. Return or exception is as for the original function.

This exists to step out of the current Thread's thread local context, such as a database transaction associated with Django's implicit per-Thread database context.

Class OnDemandFunction(Result)

Wrap a callable, run it when required.

Class OnDemandResult(Result)

Wrap a callable, run it when required.

Function report(LFs)

Generator which yields completed Results.

This is a generator that yields Results as they complete, useful for waiting for a sequence of Results that may complete in an arbitrary order.

Class Result(cs.fsm.FSM)

Base class for asynchronous collection of a result. This is used to make Result, OnDemandFunctions, LateFunctions and other objects with asynchronous termination.

In addition to the methods below, for each state value such as self.PENDING there is a corresponding attribute is_pending testing whether the Result is in that state.

Method Result.__init__(self, name=None, *, lock=None, result=None, state=None, extra=None): Base initialiser for Result objects and subclasses.

Parameter:

  • name: optional parameter naming this object.
  • lock: optional locking object, defaults to a new threading.Lock.
  • result: if not None, prefill the .result property.
  • extra: an optional mapping of extra information to associate with the Result, useful to provide context when collecting the result; the Result has a public attribute .extra which is an AttrableMapping to hold this information.

Method Result.__call__(self, *a, **kw): Call the Result: wait for it to be ready and then return or raise.

You can optionally supply a callable and arguments, in which case callable(*args,**kwargs) will be called via Result.call and the results applied to this Result.

Basic example:

R = Result()
... hand R to something which will fulfil it later ...
x = R() # wait for fulfilment - value lands in x

Direct call:

R = Result()
... pass R to something which wants the result ...
# call func(1,2,z=3), save result in R
# ready for collection by whatever received R
R(func,1,2,z=3)

Method Result.bg(self, func, *a, **kw): Submit a function to compute the result in a separate Thread, returning the Thread.

Keyword arguments for cs.threads.bg may be supplied by prefixing their names with an underscore, for example:

T = R.bg(mainloop, _pre_enter_objects=(S, fs))

This dispatches a Thread to run self.run_func(func,*a,**kw) and as such the Result must be in "pending" state, and transitions to "running".

Method Result.cancel(self, msg: Optional[str] = None): Cancel this Result.

Property Result.cancelled: Test whether this Result has been cancelled. Obsolete: use .is_cancelled.

Method Result.empty(self): Analogue to Queue.empty().

Property Result.exc_info: The exception information from a completed Result. This is not available before completion. Accessing this on a cancelled Result raises CancellationError.

Method Result.get(self, default=None): Wait for readiness; return the result if self.exc_info is None, otherwise default.

Method Result.join(self): Calling the .join() method waits for the function to run to completion and returns a tuple of (result,exc_info).

On completion the sequence (result,None) is returned. If an exception occurred computing the result the sequence (None,exc_info) is returned where exc_info is a tuple of (exc_type,exc_value,exc_traceback). If the function was cancelled the sequence (None,None) is returned.

Method Result.notify(self, notifier: Callable[[ForwardRef('Result')], NoneType]): After the Result completes, call notifier(self).

If the Result has already completed this will happen immediately. If you'd rather self got put on some queue Q, supply Q.put.

Property Result.pending: Whether the Result is pending. Obsolete: use .is_pending.

Method Result.post_notify(self, post_func) -> 'Result': Return a secondary Result which processes the result of self.

After the self completes, call post_func(retval) where retval is the result of self, and use that to complete the secondary Result.

Important note: because the completion lock object is released after the internal FSM.fsm_event call, the callback used to implement .post_notify is fired before the lock object is released. As such, it would deadlock as it waits for completion of self by using that lock. Therefore the callback dispatches a separate Thread to wait for self and then run post_func.

Example:

# submit packet to data stream
R = submit_packet()
# arrange that when the response is received, decode the response
R2 = R.post_notify(lambda response: decode(response))
# collect decoded response
decoded = R2()

If the Result has already completed this will happen immediately.

Method Result.put(self, value): Store the value. Queue-like idiom.

Method Result.raise_(self, exc=None): Convenience wrapper for self.exc_info to store an exception result exc. If exc is omitted or None, uses sys.exc_info().

Examples:

# complete the result using the current exception state
R.raise_()

# complete the result with an exception type
R.raise_(RuntimeError)

# complete the result with an exception
R.raise_(ValueError("bad value!"))

Property Result.ready: True if the Result state is DONE or CANCELLED..

Property Result.result: The result. This property is not available before completion. Accessing this on a cancelled Result raises CancellationError.

Method Result.run_func(self, func, *a, **kw): Fulfil the Result by running func(*a,**kw).

Method Result.run_func_in_thread(self, func, *a, **kw): Fulfil the Result by running func(*a,**kw) in a separate Thread.

This exists to step out of the current Thread's thread local context, such as a database transaction associated with Django's implicit per-Thread database context.

Property Result.state: The FSM state (obsolete). Obsolete: use .fsm_state.

Class ResultSet(builtins.set)

A set subclass containing Results, on which one may iterate as Results complete.

Method ResultSet.__iter__(self): Iterating on a ResultSet yields Results as they complete.

Method ResultSet.wait(self): Convenience function to wait for all the Results.

Release Log

Release 20240630:

  • Move CancellationError from cs.result to cs.fsm.
  • report: avoid iterating over the live set.
  • Result: drop the PREPARE state, allow 'cancel' no-op event in CANCELLED and DONE states.
  • Result.result,exc_info getters: raise CancellationError if the Result is cancelled.
  • Result._complete: fire the 'complete' event after the ._result and ._exc_info attributes are set but still before the self._get_lock.release().
  • Result: preserve message passed to .cancel(), _complete raises RuntimeError if already complete.

Release 20240412: Result.bg: plumb _foo arguments to cs.threads.bg as foo.

Release 20240316: Fixed release upload artifacts.

Release 20240305: Result.str: handle early use where dict lacks various entries.

Release 20231221: Doc update.

Release 20231129: Result.del: issue a warning about no collection instead of raising an exception.

Release 20230331: Result.join: access self._result instead of the property.

Release 20230212:

  • Result._complete: release self._get_lock before firing the event, as the event is what fires the notifiers.
  • Result.notify: when we make a direct notifier call, call the notifier outside the lock and remember to set self.collected=True.
  • Result: new post_notify() method to queue a function of the Result.result, returning a Result for the completion of the post function.

Release 20221207: CancellationError: accept keyword arguments, apply as attributes.

Release 20221118:

  • CancellationError: rename msg to message.
  • Result.run_func_in_thread: new method to run an arbitrary function in a separate Thread and return it via the Result.
  • New @in_thread decorator to cause a function to run in a separate Thread using Result.run_in_thread.
  • New call_in_thread to run an arbitrary function in a distinct Thread.
  • @in_thread: expose the original function as the decorated function's .direct attribute.

Release 20220918: OnDemandResult: modern "pending" check.

Release 20220805: Result now subclasses cs.fsm.FSM.

Release 20220311:

  • Result: class local Seq instance.
  • Result.call: thread safe runtime check of self.state==pending.
  • New Task and @task decorator, prototype for rerunnable blocking chaining tasks scheme - very alpha.

Release 20210420: Update dependencies, add docstring.

Release 20210407: New ResultSet(set) class, with context manager and wait methods, and whose iter iterates completed Results.

Release 20210123: bg: accept optional _extra parameter for use by the Result.

Release 20201102: Result: now .extra attribute for associated data and a new optional "extra" parameter in the initialiser.

Release 20200521:

  • OnDemandResult: bugfixes and improvements.
  • Result.bg: accept optional _name parameter to specify the Result.name.

Release 20191007:

  • Simplify ResultState definition.
  • Result.bg: use cs.threads.bg to dispatch the Thread.

Release 20190522:

  • Result.call now accepts an optional callable and args.
  • Result.call: set the Result state to "running" before dispatching the function.
  • Rename OnDemandFunction to OnDemandResult, keep old name around for compatibility.
  • Result._complete: also permitted if state==cancelled.

Release 20190309: Small bugfix.

Release 20181231:

  • Result.call: report baser exceptions than BaseException.
  • Drop _PendingFunction abstract class.

Release 20181109.1: DISTINFO update.

Release 20181109:

  • Derive CancellationError from Exception instead of RuntimeError, fix initialiser.
  • Rename AsynchState to ResultState and make it an Enum.
  • Make Results hashable and comparable for equality for use as mapping keys: equality is identity.
  • New Result.collected attribute, set true if .result or .exc_info are accessed, logs an error if Result.del is called when false, may be set true externally if a Result is not required.
  • Drop final parameter; never used and supplanted by Result.notify.
  • Result.join: return the .result and .exc_info properties in order to mark the Result as collected.
  • Result: set .collected to True when a notifier has been called successfully.
  • Bugfix Result.cancel: apply the new cancelled state.

Release 20171231:

  • Bugfix Result.call to catch BaseException instead of Exception.
  • New convenience function bg(func) to dispatch func in a separate Thread and return a Result to collect its value.

Release 20171030.1: Fix module requirements specification.

Release 20171030: New Result.bg(func, *a, **kw) method to dispatch function in separate Thread to compute the Result value.

Release 20170903: rename cs.asynchron to cs.result

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

cs.result-20240630.tar.gz (13.7 kB view details)

Uploaded Source

Built Distribution

cs.result-20240630-py3-none-any.whl (12.5 kB view details)

Uploaded Python 3

File details

Details for the file cs.result-20240630.tar.gz.

File metadata

  • Download URL: cs.result-20240630.tar.gz
  • Upload date:
  • Size: 13.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.10.6

File hashes

Hashes for cs.result-20240630.tar.gz
Algorithm Hash digest
SHA256 27d7c490b66a905968c17d46cda96b3cdf6620f353cb44ccfbe3151cd92003a5
MD5 3fb809914020629204af4e6252acfffb
BLAKE2b-256 6efd996c67b392889e8d4c6ba96258118d48426fe3d66070e13169a77cb4f293

See more details on using hashes here.

File details

Details for the file cs.result-20240630-py3-none-any.whl.

File metadata

  • Download URL: cs.result-20240630-py3-none-any.whl
  • Upload date:
  • Size: 12.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.10.6

File hashes

Hashes for cs.result-20240630-py3-none-any.whl
Algorithm Hash digest
SHA256 518a8e5da1ff562587c3c0f9e66c179c71f719388975021be76fca6ce7eadb36
MD5 fc045962a5d1b5bae6888b5e5b31cd8f
BLAKE2b-256 e58de546b30076698c91a611ad48dcd21958abafd98135c0118c83ecd350c57f

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page