AWS Step Function Activity server framework
stefuna
Stefuna is a simple AWS Step Function Activity server framework. It makes it incredibly quick and easy to write workers to process activity tasks in Python.
Implementation
Stefuna uses a multiprocessing Pool of pre-forked worker processes to run the activity tasks. There is a single instance of a Worker class in each worker process. To implement your task, create a Worker subclass, override the run_task(self, task_token, input_data) method, and start the server.
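The pre-forked pool with one worker instance per process can be sketched with the standard library alone. This is an illustration of the general pattern, not Stefuna's actual internals: EchoWorker, init_worker, handle_task and run_demo are hypothetical names, and EchoWorker only stands in for a stefuna.Worker subclass.

```python
import json
import multiprocessing
import os


class EchoWorker:
    """Hypothetical stand-in for a stefuna.Worker subclass."""

    def __init__(self):
        self.pid = os.getpid()

    def run_task(self, task_token, input_data):
        return {"echo": input_data, "pid": self.pid}


_worker = None  # the single worker instance owned by this process


def init_worker():
    """Pool initializer: runs once in each worker process."""
    global _worker
    _worker = EchoWorker()


def handle_task(task_token, input_json):
    """Runs inside a worker process and delegates to its single instance."""
    return _worker.run_task(task_token, json.loads(input_json))


def run_demo(processes=None, maxtasksperchild=None):
    """Dispatch four dummy tasks to the pool and collect the results."""
    # processes=None lets multiprocessing.Pool default to os.cpu_count().
    with multiprocessing.Pool(processes=processes,
                              initializer=init_worker,
                              maxtasksperchild=maxtasksperchild) as pool:
        pending = [
            pool.apply_async(handle_task, ("token-%d" % i, json.dumps({"n": i})))
            for i in range(4)
        ]
        return [p.get(timeout=30) for p in pending]
```

Call run_demo() under an `if __name__ == "__main__":` guard. Each result carries the pid of the process that handled it, which makes the one-instance-per-process behaviour visible.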
The run_task method can do whatever work it requires and then return a result as a string or dict (which is automatically JSON stringified). It can be a long-running task but the worker process won’t be released until the method returns.
If run_task raises an exception, the task is failed with a Task.Failure error, which can be handled in the Step Function state machine. Alternatively, a worker can call `self.send_task_failure(error, cause)` with a custom error string, in which case the return value from `run_task` is ignored.
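On the state machine side, both kinds of failure can be caught with a Catch clause. The sketch below builds an Amazon States Language definition as a Python dict. The state names (SayHello, HandleFailure, HandleBadInput, Done) and the custom error name Hello.InvalidInput are hypothetical. Only Task.Failure and the activity ARN come from the text above.

```python
import json

# Placeholder ARN taken from the example config in this README.
ACTIVITY_ARN = "arn:aws:states:us-west-2:00000000000000:activity:hello"

definition = {
    "StartAt": "SayHello",
    "States": {
        "SayHello": {
            "Type": "Task",
            "Resource": ACTIVITY_ARN,
            "Catch": [
                # Hypothetical custom error passed to send_task_failure().
                {"ErrorEquals": ["Hello.InvalidInput"],
                 "ResultPath": "$.error",
                 "Next": "HandleBadInput"},
                # Error reported when run_task raises an exception.
                {"ErrorEquals": ["Task.Failure"],
                 "ResultPath": "$.error",
                 "Next": "HandleFailure"},
            ],
            "Next": "Done",
        },
        "HandleBadInput": {"Type": "Fail", "Error": "Hello.InvalidInput",
                           "Cause": "Input rejected by the worker"},
        "HandleFailure": {"Type": "Fail", "Error": "Task.Failure",
                          "Cause": "Worker raised an exception"},
        "Done": {"Type": "Succeed"},
    },
}

# The JSON text is what would be supplied as the state machine definition.
definition_json = json.dumps(definition, indent=2)
```

Catchers are evaluated in order, so the more specific custom error is listed before the general Task.Failure entry.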
Configurable heartbeats are supported for longer-running tasks.
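How Stefuna schedules heartbeats is not described here, so the following is only a sketch of one common approach: a background thread that calls a heartbeat function at a fixed interval while the task runs. The Heartbeat class and its parameters are hypothetical.

```python
import threading


class Heartbeat:
    """Call send_heartbeat() every `interval` seconds until the block exits."""

    def __init__(self, send_heartbeat, interval):
        self._send = send_heartbeat
        self._interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        # Event.wait() returns False on timeout and True once stop is set,
        # so the loop sends a heartbeat per interval and exits promptly.
        while not self._stop.wait(self._interval):
            self._send()

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, exc_type, exc, tb):
        self._stop.set()
        self._thread.join()
        return False  # never swallow exceptions from the task body
```

With a boto3 Step Functions client, the heartbeat function would typically wrap `client.send_task_heartbeat(taskToken=task_token)`, and the long-running work would go inside `with Heartbeat(send, interval):`. The interval should be shorter than the HeartbeatSeconds value set on the state.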
Getting Started
See the examples folder for the files described below.
Create a worker class, a subclass of stefuna.Worker, in the file hello_worker.py:
import logging
from stefuna import Worker

logger = logging.getLogger('stefuna.example')


class HelloWorker(Worker):

    def init(self):
        """Initialize the single instance in a worker"""
        pass

    def run_task(self, task_token, input_data):
        self.logger.debug('Worker in run_task')
        # Do some work!
        # Return value can be a string or a dict/array that
        # will be JSON stringified.
        return {"message": "Hello World"}
Create a config file hello_config.py, setting the worker class, server name, and activity ARN:
#
# Stefuna server worker config file
#
# The module path of the worker class
worker = 'examples.hello_worker.HelloWorker'
# The base name of the server that will be combined with host and pid
# and used when requesting tasks from AWS.
name = 'HelloExample'
# Set the ARN for the activity that this server will work on.
activity_arn = 'arn:aws:states:us-west-2:00000000000000:activity:hello'
# The number of worker processes.
# If None, it will be set to the number of cores.
processes = None
# Maximum number of seconds between heartbeats.
# None or 0 means there is no heartbeat.
heartbeat = 120
# Maximum number of tasks for a worker to run before the worker
# process is automatically killed and a new one created.
# If None, workers will not be killed.
maxtasksperchild = None
# The worker_config is an arbitrary dictionary that is available
# in the worker instance as self.config
# Use it for worker-specific configuration.
worker_config = {
    'foo': 'bar'
}
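Because the config file is an ordinary Python module, a server could read it with importlib and resolve the worker setting to a class. The sketch below shows that idea under stated assumptions: load_class and effective_settings are hypothetical helpers, not Stefuna functions, and treating every setting except worker as optional is an assumption.

```python
import importlib
import os


def load_class(dotted_path):
    """Resolve 'package.module.ClassName' to the class object."""
    module_path, _, class_name = dotted_path.rpartition(".")
    return getattr(importlib.import_module(module_path), class_name)


def effective_settings(config):
    """Apply the defaults described in the config file comments."""
    return {
        "worker_class": load_class(config.worker),
        # None means "use the number of cores".
        "processes": getattr(config, "processes", None) or os.cpu_count(),
        # None and 0 both mean "no heartbeat".
        "heartbeat": getattr(config, "heartbeat", None) or None,
        # None means worker processes are never recycled.
        "maxtasksperchild": getattr(config, "maxtasksperchild", None),
        "worker_config": getattr(config, "worker_config", {}),
    }
```

For example, `effective_settings(importlib.import_module("hello_config"))` would work if hello_config is importable from the current path, which is assumed here.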
Run the server:
$ stefuna --config=hello_config
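Once running, an activity server repeatedly polls Step Functions for a task, runs it, and reports the outcome. The sketch below shows one iteration of such a loop. It is not Stefuna's code: worker_name and poll_once are hypothetical, the exact worker name format is an assumption, and so is passing string results through unchanged. The client is expected to behave like a boto3 Step Functions client (`get_activity_task`, `send_task_success`, `send_task_failure`).

```python
import json
import os
import socket


def worker_name(base_name):
    """Combine the base name with host and pid (assumed format)."""
    return "%s-%s-%d" % (base_name, socket.gethostname(), os.getpid())


def poll_once(client, activity_arn, name, run_task):
    """Fetch at most one task, run it, and report success or failure.

    Returns False when the poll came back without a task.
    """
    task = client.get_activity_task(activityArn=activity_arn, workerName=name)
    token = task.get("taskToken")
    if not token:
        return False  # the long poll timed out with no work available

    try:
        result = run_task(token, json.loads(task["input"]))
    except Exception as exc:
        # Mirrors the behaviour described above: exceptions fail the task.
        client.send_task_failure(taskToken=token, error="Task.Failure",
                                 cause=str(exc))
        return True

    # Assumption: strings are sent as-is, anything else is JSON-encoded.
    output = result if isinstance(result, str) else json.dumps(result)
    client.send_task_success(taskToken=token, output=output)
    return True
```

Taking the client as a parameter keeps the function testable with a stub. In a real deployment it would be something like `boto3.client("stepfunctions")`, and poll_once would be called in a loop.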
History (Change Log)
See HISTORY.rst
License
MIT License
See LICENSE
History
0.8.3 [2017-09-18]
* Add processes arg to stefuna
* Add timestamp to log format
0.8.2 [2017-08-16]
* Set the region based on region in activity ARN.
0.8.1 [2017-08-15]
* First public release