Skip to main content

Create AWS Step Functions asl.json with Python

Project description

pystepfunctions

Create AWS Stepfunction asl.json files (state machine definition files) using python.
Pre-made tasks that are easy to use.
Test dataflow through the stepfunction using the same python code.
Visualise the stepfunction using pyvis for easier dubugging without the use of the AWS console.

Docs

https://mrhopko.github.io/pystepfunction/pystepfunction/

Installation

pip3 install pystepfunctions

Complete example

from logging import getLogger
from pystepfunction.tasks import (
    LambdaTask,
    GlueTask,
    PassTask,
    SucceedTask,
    FailTask,
)
from pystepfunction.branch import Branch
from pystepfunction.viz import BranchViz
from pystepfunction.state import StateMachine
from pystepfunction.errors import ERROR_STATE_ALL

logger = getLogger(__name__)

# create a lambda task
lambda_task = (
    LambdaTask("lambda_task", function_arn="aws::my-lambda-arn")
    .with_retry(error_equals=[ERROR_STATE_ALL], interval_seconds=1, max_attempts=3)
    .with_catcher(
        error=[ERROR_STATE_ALL],
        task=FailTask(
            "lambda_task_fail", cause="lambda task failed", error="MyLambdaError"
        ),
    )
    .with_resource_result({"Payload": {"Result": "LambdaResult"}})
    .with_output(
        result_path="$.LambdaTaskResult",
        result_selector={"SelectThis.$": "$.Payload.Result"},
    )
)

# create a glue task
glue_task = (
    GlueTask(
        "glue_task",
        job_name="my-glue-job-name",
        job_args={"job_input_arg.$": "$.LambdaTaskResult.SelectThis"},
    )
    .with_catcher(
        error=[ERROR_STATE_ALL],
        task=FailTask(
            "glue_task_fail", cause="glue task failed", error="MyGlueError"
        ),
    )
    .with_resource_result({"JobResult": "GlueResult"})
    .with_output(result_path="$.GlueTaskResult")
)

# chain them together and create a branch
lambda_task = lambda_task >> glue_task >> SucceedTask("succeeded")
branch = Branch(comment="Lambda and Glue", start_task=lambda_task)

# view the asl
print(branch)

# asl as a dict
asl = branch.to_asl()

# write the asl to a file
branch.write_asl("my_asl_file.asl.json")

# visualise the asl
BranchViz(branch).show()

# create a state machine
sm = StateMachine(state={"Input1": "Input1Value"}, logger=logger)
sm.apply_branch(branch)
sm.show_logs()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pystepfunction-0.1.109.tar.gz (18.9 kB view details)

Uploaded Source

Built Distribution

pystepfunction-0.1.109-py3-none-any.whl (17.7 kB view details)

Uploaded Python 3

File details

Details for the file pystepfunction-0.1.109.tar.gz.

File metadata

  • Download URL: pystepfunction-0.1.109.tar.gz
  • Upload date:
  • Size: 18.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.11.5

File hashes

Hashes for pystepfunction-0.1.109.tar.gz
Algorithm Hash digest
SHA256 e343c20339c997aa67dbda586d1b9acdf5f56720a60a20f17fd0c67544a4eaed
MD5 11da9096f29f6a725a653a83d628d2c7
BLAKE2b-256 1b8489f3d1d064687ab4937ce2085ea277769c188fb9f2a955bfe3d7c9104f52

See more details on using hashes here.

File details

Details for the file pystepfunction-0.1.109-py3-none-any.whl.

File metadata

File hashes

Hashes for pystepfunction-0.1.109-py3-none-any.whl
Algorithm Hash digest
SHA256 423e3b42cf6c38e93e707665fe55ccecd0ff5a300347396e51b1a3aa03a2c805
MD5 651c1f63bc1a3377e58f861436972b87
BLAKE2b-256 46f74393683f5a4347d495d0f1a7d867e48dad695e9f93e18ea288085c32fd4d

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page