Project description
A small set of utilities to help write synchronous code flows in a cooperative multitasking context. It is designed around the feature set of gevent (http://www.gevent.org).
deferred calls
async.DeferredCallHandler is a wrapper for asynchronously handled function calls. It lets you control the context in which those functions are executed, which is essential in cooperative multitasking.
There are two types of calls:
- sync (synchronous): the call blocks until the deferred call handler has processed it, then returns. From the caller's perspective, it behaves like a regular function call.
- oneway (one way): the call returns immediately. Because of this, the caller has no way to know whether the call succeeded or failed once it has been processed.
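The difference between the two call types can be illustrated with a toy model. The sketch below is not the library's implementation: ToyDeferredCallHandler and its method signatures are hypothetical, and it uses the standard library's threading and queue modules in place of gevent greenlets so that it runs on its own. It only shows the idea that calls are queued and later run in the context of whoever calls process().

```python
import queue
import threading


class ToyDeferredCallHandler:
    """Hypothetical stand-in for DeferredCallHandler; not the library's code."""

    def __init__(self):
        self._pending = queue.Queue()

    def sync(self, func, *args):
        """Queue a call, block until it is processed, then return its result."""
        done = threading.Event()
        box = {}
        self._pending.put((func, args, done, box))
        done.wait()
        if "error" in box:
            raise box["error"]  # forwarded to the caller, like a regular call
        return box["result"]

    def oneway(self, func, *args):
        """Queue a call and return immediately; the outcome is not reported."""
        self._pending.put((func, args, None, None))

    def process(self):
        """Run every call queued so far, in the context of the caller."""
        while True:
            try:
                func, args, done, box = self._pending.get_nowait()
            except queue.Empty:
                return
            try:
                result = func(*args)
                if box is not None:
                    box["result"] = result
            except Exception as exc:
                if box is not None:
                    box["error"] = exc
            finally:
                if done is not None:
                    done.set()


handler = ToyDeferredCallHandler()
log = []

handler.oneway(log.append, "queued")  # returns at once; nothing has run yet
pending_before = list(log)            # still empty

# A sync call blocks its caller, so issue it from another thread.
results = []
caller = threading.Thread(target=lambda: results.append(handler.sync(len, log)))
caller.start()

# The owner of the handler decides when deferred calls actually run.
while caller.is_alive():
    handler.process()
    caller.join(timeout=0.01)

print(pending_before)  # []
print(log)             # ['queued']
print(results)         # [1]
```

In the real library the calls are spelled handler.sync.method() and handler.oneway.method(); the toy takes the function as an argument only to stay short.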
Example
For instance, suppose we have a Manager entity that must handle some resources atomically:
    # note: `async` is a reserved word from Python 3.7 on, so this import
    # requires an older interpreter
    import gevent
    from async import DeferredCallHandler

    class Manager(DeferredCallHandler):
        def manage(self):
            # do things with resources
            # with the assurance that the resources won't
            # be modified during the process
            self.process()  # process pending calls
            # do more things

        def access_resources(self):
            # returns the resources the manager has properly managed
            pass

        def update_resource(self, data):
            # updates a resource's info
            pass

        def run(self):
            while True:
                self.manage()
We can start the manager and call functions on it from multiple greenlets:
    manager = Manager()
    gevent.spawn(manager.run)
    # At this point, the manager entity is doing resource management

    resources = ...  # we have an array of resources

    def monitor(target):
        for event in target.events():
            # we could apply some transformation to the event, and then
            # forward it to the manager.
            manager.oneway.update_resource(event)

    for resource in resources:
        gevent.spawn(monitor, resource)

    def consumer():
        while True:
            # a sync call is only processed inside manager.process()
            resources = manager.sync.access_resources()
            # at this point, we have the guarantee that the resources
            # are properly managed and will not become stale or corrupted
            # during processing.

    consumer()
DeferredCallHandler API documentation
def process(forever=False, whitelist=None):
Processes all the pending deferred calls.
If forever is set to True, process keeps waiting for new calls until stop_processing() is called.
If whitelist is set to a list of strings, only functions whose names match an element of the whitelist are executed.
def stop_processing():
Interrupts the iteration over incoming calls started by a DeferredCallHandler's call to process(forever=True).
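How whitelist narrows a call to process() can be sketched with a toy queue. Everything here is hypothetical (ToyHandler, defer, the explicit name argument), and one behaviour is an assumption that the description above does not settle: calls filtered out by the whitelist are kept pending rather than dropped.

```python
from collections import deque


class ToyHandler:
    """Hypothetical model of whitelist filtering; not the library's code."""

    def __init__(self):
        self.pending = deque()

    def defer(self, name, func, *args):
        """Record a call under the name of the function it targets."""
        self.pending.append((name, func, args))

    def process(self, whitelist=None):
        """Run pending calls; with a whitelist, run only the named ones.

        Assumption: calls filtered out by the whitelist stay pending.
        """
        kept = deque()
        while self.pending:
            name, func, args = self.pending.popleft()
            if whitelist is None or name in whitelist:
                func(*args)
            else:
                kept.append((name, func, args))
        self.pending = kept


handler = ToyHandler()
seen = []
handler.defer("update_resource", seen.append, "update")
handler.defer("access_resources", seen.append, "access")

handler.process(whitelist=["update_resource"])
after_first = list(seen)
print(after_first)  # ['update']

handler.process()   # no whitelist: everything still pending runs
print(seen)         # ['update', 'access']
```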
Exceptions
sync calls propagate exceptions to the caller, just like regular function calls:
    from gevent import spawn
    from async import DeferredCallHandler

    class Lemming(DeferredCallHandler):
        def kaboom(self):
            raise Exception("#high pitched# oh no!")

    lemming = Lemming()
    spawn(lemming.process, forever=True)

    try:
        lemming.sync.kaboom()
    except Exception:
        pass  # We should hit that

    # This should not raise here; the exception should only produce a log entry.
    lemming.oneway.kaboom()
Regular function calls
DeferredCallHandler objects don’t prevent direct function calls. Use at your own risk:
    import gevent
    from async import DeferredCallHandler

    class Manager(DeferredCallHandler):
        def manage(self):
            # do things with resources
            # with the assurance that the resources won't
            # be modified during the process
            self.process()  # process pending calls
            # do more things

        def access_resources(self):
            # returns the resources the manager has properly managed
            pass

        def update_resource(self, data):
            # updates a resource's info
            pass

        def run(self):
            while True:
                self.manage()

    manager = Manager()
    gevent.spawn(manager.run)

    resources = manager.access_resources()
    # !!! The resources may be in the middle of a management process and
    # their state may be inconsistent

    resources = manager.sync.access_resources()
    # In this case, we're guaranteed the management process is not running.
Timeouts
sync calls accept an optional timeout, to ensure actions are performed within a given time frame:
    import gevent
    from async import DeferredCallHandler

    class ABitSlow(DeferredCallHandler):
        def taking_my_time(self):
            gevent.sleep(10)

    slow = ABitSlow()
    gevent.spawn(slow.process, forever=True)

    try:
        slow.sync(timeout=1).taking_my_time()
    except gevent.Timeout:
        pass  # We should hit that
multitask state handling
Partially inspired by the mechanism of tail recursion, this module provides a way to contain and manage the code that drives state machines within greenlets.
The @state decorator transforms a function or method into a state greenlet. When another state function is invoked, it creates a new state greenlet that replaces the current one, effectively replicating the behaviour of tail recursion.
For instance:
    @state(transitions_to="growing")
    def sprouting():
        # germination process here
        growing()  # the sprouting greenlet terminates and gives way to the growing one

    @state(transitions_to="flowering")
    def growing():
        # transform CO2 and sunlight into biomass
        flowering()  # the growing greenlet terminates and gives way to the flowering one

    @state(transitions_to=["dead", "withering"])
    def flowering():
        # grow flowers
        if is_eaten:
            # parameters can be passed to state changes
            dead(is_eaten=True)  # the flowering greenlet terminates and gives way to the dead one
        else:
            withering()  # the flowering greenlet terminates and gives way to the withering one

    @state(transitions_to="dead")
    def withering():
        # dry up
        dead()  # the withering greenlet terminates and gives way to the dead one

    @state  # terminal state, no transitions
    def dead(is_eaten=False):
        if not is_eaten:
            # clean up phase
            pass

    sprouting()  # spawns the initial state
The @state decorator can also be used for methods:
    class Flower(object):
        @state(transitions_to="growing")
        def sprouting(self):
            # germination process here
            self.growing()  # the sprouting greenlet terminates and gives way to the growing one

        # ...
Valid transitions must be declared through the transitions_to parameter; any undeclared transition raises a ValidationError exception.
Callbacks
Callbacks can be defined on transitions. Passing the on_start parameter to a state registers a callback that is invoked whenever that state is started.
The expected callback signature is def on_start(state, *args, **kwargs), where state is the async.state.State state greenlet that will handle the execution of the state (not yet started at that point), and *args and **kwargs are the parameters given to the state call.
For instance:
    import gevent

    def on_transition(new_state, target, *args, **kwargs):
        if kwargs.get("store"):
            target.state = new_state

    class Object(object):
        def __init__(self):
            self.state = None

        @state(on_start=on_transition)
        def a_state(self, store=False):
            pass

    obj = Object()
    obj.a_state(store=True)
    gevent.sleep()  # yield to other greenlets
    obj.state  # => now holds the current state object
Execution started event
When a state object has started its execution (i.e. when gevent has scheduled it for execution), its execution_started event, a gevent.event.Event, is set:
    def on_transition(new_state, target):
        target.state = new_state

    class Object(object):
        def __init__(self):
            self.state = None

        @state(on_start=on_transition)
        def a_state(self):
            pass

    obj = Object()
    obj.a_state()
    # at this point, obj.state is set
    # now we wait until it's scheduled
    obj.state.execution_started.wait()
Download files
Source Distribution
File details
Details for the file gevent_async-0.8.3.dev2-g9c0eaf9.tar.gz.
File metadata
- Download URL: gevent_async-0.8.3.dev2-g9c0eaf9.tar.gz
- Upload date:
- Size: 7.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
File hashes
Algorithm | Hash digest
---|---
SHA256 | c2e1ca9aad4f24de60e629c3d36ede85ecd8a957b8b7c58fb88dd852222f2684
MD5 | fb59fcec12a9ed08ba95f0563e7d84d2
BLAKE2b-256 | 79ea0c71597736ebf746b00313a811178e5b6790d97f098ec56cf1cfdbf5b9fe