Create AWS Step Functions asl.json with Python
Project description
pystepfunctions
Create AWS Stepfunction asl.json files (state machine definition files) using python.
Pre-made tasks that are easy to use.
Test dataflow through the stepfunction using the same python code.
Visualise the stepfunction using pyvis for easier dubugging without the use of the AWS console.
Docs
https://mrhopko.github.io/pystepfunction/pystepfunction/
Installation
pip3 install pystepfunctions
Complete example
from logging import getLogger
from pystepfunction.tasks import (
LambdaTask,
GlueTask,
PassTask,
SucceedTask,
FailTask,
)
from pystepfunction.branch import Branch
from pystepfunction.viz import BranchViz
from pystepfunction.state import StateMachine
from pystepfunction.errors import ERROR_STATE_ALL
logger = getLogger(__name__)
# create a lambda task
lambda_task = (
LambdaTask("lambda_task", function_arn="aws::my-lambda-arn")
.with_retry(error_equals=[ERROR_STATE_ALL], interval_seconds=1, max_attempts=3)
.with_catcher(
error=[ERROR_STATE_ALL],
task=FailTask(
"lambda_task_fail", cause="lambda task failed", error="MyLambdaError"
),
)
.with_resource_result({"Payload": {"Result": "LambdaResult"}})
.with_output(
result_path="$.LambdaTaskResult",
result_selector={"SelectThis.$": "$.Payload.Result"},
)
)
# create a glue task
glue_task = (
GlueTask(
"glue_task",
job_name="my-glue-job-name",
job_args={"job_input_arg.$": "$.LambdaTaskResult.SelectThis"},
)
.with_catcher(
error=[ERROR_STATE_ALL],
task=FailTask(
"glue_task_fail", cause="glue task failed", error="MyGlueError"
),
)
.with_resource_result({"JobResult": "GlueResult"})
.with_output(result_path="$.GlueTaskResult")
)
# chain them together and create a branch
lambda_task = lambda_task >> glue_task >> SucceedTask("succeeded")
branch = Branch(comment="Lambda and Glue", start_task=lambda_task)
# view the asl
print(branch)
# asl as a dict
asl = branch.to_asl()
# write the asl to a file
branch.write_asl("my_asl_file.asl.json")
# visualise the asl
BranchViz(branch).show()
# create a state machine
sm = StateMachine(state={"Input1": "Input1Value"}, logger=logger)
sm.apply_branch(branch)
sm.show_logs()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
pystepfunction-0.1.102.tar.gz
(15.7 kB
view details)
Built Distribution
File details
Details for the file pystepfunction-0.1.102.tar.gz
.
File metadata
- Download URL: pystepfunction-0.1.102.tar.gz
- Upload date:
- Size: 15.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.11.5
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 7a3b526311f9ac8f4e0b1dfee1f2f015c313a4b3acb01995219d673e191a233d |
|
MD5 | 1aa4987e658c60ef616329770f8fd9f8 |
|
BLAKE2b-256 | 0aed593165354850a6b0196511e2be572b53032da7e91d446a3f4ecc70ffe799 |
File details
Details for the file pystepfunction-0.1.102-py3-none-any.whl
.
File metadata
- Download URL: pystepfunction-0.1.102-py3-none-any.whl
- Upload date:
- Size: 15.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.11.5
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | ba0e599f609b2896f089f30ba2f1d2d2a40022c07d53799b5b58d95dbb60cc3f |
|
MD5 | d6e01f256efc18699e676288325441c5 |
|
BLAKE2b-256 | fb329a6448e425d7f8210e5251966f4e9b526e8af15661e935f79848792851d9 |