Skip to main content

Create AWS Step Functions asl.json with Python

Project description

pystepfunctions

Create AWS Stepfunction asl.json files (state machine definition files) using python.
Pre-made tasks that are easy to use.
Test dataflow through the stepfunction using the same python code.
Visualise the stepfunction using pyvis for easier dubugging without the use of the AWS console.

Docs

https://mrhopko.github.io/pystepfunction/pystepfunction/

Installation

pip3 install pystepfunction

Complete example

from logging import getLogger
from pystepfunction.tasks import (
    LambdaTask,
    GlueTask,
    PassTask,
    SucceedTask,
    FailTask,
)
from pystepfunction.branch import Branch
from pystepfunction.viz import BranchViz
from pystepfunction.state import StateMachine
from pystepfunction.errors import ERROR_STATE_ALL

logger = getLogger(__name__)

# create a lambda task
lambda_task = (
    LambdaTask("lambda_task", function_arn="aws::my-lambda-arn")
    .with_retry(error_equals=[ERROR_STATE_ALL], interval_seconds=1, max_attempts=3)
    .with_catcher(
        error=[ERROR_STATE_ALL],
        task=FailTask(
            "lambda_task_fail", cause="lambda task failed", error="MyLambdaError"
        ),
    )
    .with_resource_result({"Payload": {"Result": "LambdaResult"}})
    .with_output(
        result_path="$.LambdaTaskResult",
        result_selector={"SelectThis.$": "$.Payload.Result"},
    )
)

# create a glue task
glue_task = (
    GlueTask(
        "glue_task",
        job_name="my-glue-job-name",
        job_args={"job_input_arg.$": "$.LambdaTaskResult.SelectThis"},
    )
    .with_catcher(
        error=[ERROR_STATE_ALL],
        task=FailTask(
            "glue_task_fail", cause="glue task failed", error="MyGlueError"
        ),
    )
    .with_resource_result({"JobResult": "GlueResult"})
    .with_output(result_path="$.GlueTaskResult")
)

# chain them together and create a branch
lambda_task = lambda_task >> glue_task >> SucceedTask("succeeded")
branch = Branch(comment="Lambda and Glue", start_task=lambda_task)

# view the asl
print(branch)

# asl as a dict
asl = branch.to_asl()

# write the asl to a file
branch.write_asl("my_asl_file.asl.json")

# visualise the asl
BranchViz(branch).show()

# create a state machine
sm = StateMachine(state={"Input1": "Input1Value"}, logger=logger)
sm.apply_branch(branch)
sm.show_logs()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pystepfunction-1.1.129.tar.gz (21.3 kB view details)

Uploaded Source

Built Distribution

pystepfunction-1.1.129-py3-none-any.whl (20.7 kB view details)

Uploaded Python 3

File details

Details for the file pystepfunction-1.1.129.tar.gz.

File metadata

  • Download URL: pystepfunction-1.1.129.tar.gz
  • Upload date:
  • Size: 21.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.11.5

File hashes

Hashes for pystepfunction-1.1.129.tar.gz
Algorithm Hash digest
SHA256 fcb32626d2070c9602b8c5163371d0b2b643b36a9cad6ba5e33e25010e1b359c
MD5 5a2b1d81a2a0adb6888789590af7b11d
BLAKE2b-256 3f3e8f7acb91b7ee8020da05904065f92ba50979bc05f1e65fe87c7df9615e94

See more details on using hashes here.

File details

Details for the file pystepfunction-1.1.129-py3-none-any.whl.

File metadata

File hashes

Hashes for pystepfunction-1.1.129-py3-none-any.whl
Algorithm Hash digest
SHA256 10a85f436d8c1eab0789aa751f94bf6242cc8e49d107b60ed165c0f18f015708
MD5 d5a153704284c899ca57f8f5d28334aa
BLAKE2b-256 56990f915de7bb7c2d554f0fd5241fe5d78385e7cf0c5f1b1561ec304d82e613

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page