Task integrations for AWS StepFunctions
Tasks for AWS Step Functions
The APIs of higher level constructs in this module are experimental and under active development. They are subject to non-backward compatible changes or removal in any future version. These are not subject to the Semantic Versioning model and breaking changes will be announced in the release notes. This means that while you may use them, you may need to update your source code when upgrading to a newer version of this package.
AWS Step Functions is a web service that enables you to coordinate the components of distributed applications and microservices using visual workflows. You build applications from individual components that each perform a discrete function, or task, allowing you to scale and change applications quickly.
A Task state represents a single unit of work performed by a state machine. All work in your state machine is performed by tasks.
This module is part of the AWS Cloud Development Kit project.
Task
A Task state represents a single unit of work performed by a state machine. In the CDK, the exact work to be done is determined by a class that implements IStepFunctionsTask.
AWS Step Functions integrates with some AWS services so that you can call API actions and coordinate executions directly from the Amazon States Language in Step Functions. You can directly call and pass parameters to the APIs of those services.
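In the Amazon States Language, each such call is a Task state whose Resource is an `arn:aws:states:::<service>:<action>` ARN. A suffix selects the integration pattern: none for request-response (what the CDK calls FIRE_AND_FORGET), `.sync` to run a job and wait for it, and `.waitForTaskToken` for the callback pattern. The sketch below only illustrates that naming; `integration_resource_arn` and the pattern keys are hypothetical, the CDK renders these ARNs for you, and not every service supports every pattern.

```python
# Hypothetical helper: maps an integration pattern onto the Resource ARN of
# an Amazon States Language Task state. The CDK generates this for you.
PATTERN_SUFFIXES = {
    "FIRE_AND_FORGET": "",                       # request-response: no suffix
    "SYNC": ".sync",                             # run a job and wait for it
    "WAIT_FOR_TASK_TOKEN": ".waitForTaskToken",  # callback pattern
}


def integration_resource_arn(service: str, action: str,
                             pattern: str = "FIRE_AND_FORGET",
                             partition: str = "aws") -> str:
    """Return the Task state Resource ARN for a service integration."""
    try:
        suffix = PATTERN_SUFFIXES[pattern]
    except KeyError:
        raise ValueError(f"unknown integration pattern: {pattern}") from None
    return f"arn:{partition}:states:::{service}:{action}{suffix}"


if __name__ == "__main__":
    print(integration_resource_arn("sns", "publish"))
    print(integration_resource_arn("batch", "submitJob", "SYNC"))
    print(integration_resource_arn("lambda", "invoke", "WAIT_FOR_TASK_TOKEN"))
```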
Task parameters from the state JSON
Most tasks take parameters. Parameter values can either be static, supplied directly in the workflow definition (by specifying their values), or a value available at runtime in the state machine's execution (either as its input or an output of a prior state).
Parameter values available at runtime can be specified via the Data class, using methods such as Data.stringAt(). If so, the value is taken from the indicated location in the state JSON, similar to (for example) inputPath.
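To make the lookup concrete, here is a minimal sketch of how a reference such as `$.bar` or `$.Item.TotalCount.N` selects a value from the state JSON. It handles only `$` and dotted field names (no array indexes, filters, or wildcards), and `resolve_path` is a hypothetical helper, not part of the CDK; Step Functions evaluates the full JsonPath syntax itself at runtime.

```python
import json
from typing import Any


def resolve_path(state: Any, path: str) -> Any:
    """Resolve a simple reference path such as "$.a.b" against the state JSON.

    Only "$" and dot-separated field names are supported in this sketch.
    """
    if path == "$":
        return state  # "$" selects the whole state JSON
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path}")
    value = state
    for field in path[2:].split("."):
        if not isinstance(value, dict) or field not in value:
            raise KeyError(f"{path}: field {field!r} not found")
        value = value[field]
    return value


state = json.loads('{"bar": "hello", "Item": {"TotalCount": {"N": "10"}}}')
print(resolve_path(state, "$.bar"))                # hello
print(resolve_path(state, "$.Item.TotalCount.N"))  # 10
```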
Evaluate Expression
Use the EvaluateExpression task to perform simple operations referencing state paths. The expression referenced in the task will be evaluated in a Lambda function (eval()). This lets you perform simple operations without writing your own Lambda code.
Example: convert a wait time from milliseconds to seconds, concatenate this in a message and wait:
# Example automatically generated without compilation. See https://github.com/aws/jsii/issues/826
import aws_cdk.aws_lambda as lambda_
import aws_cdk.aws_stepfunctions as sfn
import aws_cdk.aws_stepfunctions_tasks as tasks

# 'topic' is assumed to be an existing sns.Topic defined elsewhere in the stack.
convert_to_seconds = sfn.Task(self, "Convert to seconds",
    task=tasks.EvaluateExpression(expression="$.waitMilliseconds / 1000"),
    result_path="$.waitSeconds"
)
create_message = sfn.Task(self, "Create message",
    # Note: this is a string inside a string.
    task=tasks.EvaluateExpression(
        expression="`Now waiting ${$.waitSeconds} seconds...`",
        runtime=lambda_.Runtime.NODEJS_10_X
    ),
    result_path="$.message"
)
publish_message = sfn.Task(self, "Publish message",
    task=tasks.PublishToTopic(topic,
        message=sfn.TaskInput.from_data_at("$.message")
    ),
    result_path="$.sns"
)
wait = sfn.Wait(self, "Wait",
    time=sfn.WaitTime.seconds_path("$.waitSeconds")
)
sfn.StateMachine(self, "StateMachine",
    definition=convert_to_seconds.next(create_message).next(publish_message).next(wait)
)
The EvaluateExpression task supports a runtime prop to specify the Lambda runtime to use to evaluate the expression. Currently, the only runtime supported is lambda.Runtime.NODEJS_10_X.
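The evaluation step can be pictured with the Python analogue below: every `$.field` reference in the expression is replaced by the JSON encoding of the value found in the state, and the resulting string is evaluated. This is a sketch of the idea under that assumption only; the real task runs JavaScript `eval()` inside a Node.js Lambda function, and `evaluate_expression` is a hypothetical name. As with any `eval`, use it only on expressions you control.

```python
import json
import re
from typing import Any

# Matches references like "$.waitMilliseconds" or "$.a.b" (dotted fields only).
PATH_PATTERN = re.compile(r"\$(?:\.[A-Za-z_][A-Za-z0-9_]*)+")


def _lookup(state: dict, path: str) -> Any:
    value: Any = state
    for field in path[2:].split("."):
        value = value[field]
    return value


def evaluate_expression(expression: str, state: dict) -> Any:
    """Substitute state values into the expression, then evaluate it.

    Only numbers and strings are supported, because their JSON encodings
    are also valid Python literals. Never use eval() on untrusted input.
    """
    def substitute(match):
        value = _lookup(state, match.group(0))
        if isinstance(value, bool) or not isinstance(value, (int, float, str)):
            raise TypeError(f"unsupported value for {match.group(0)}: {value!r}")
        return json.dumps(value)

    source = PATH_PATTERN.sub(substitute, expression)
    # Empty builtins: the expression can only use literals and operators.
    return eval(source, {"__builtins__": {}}, {})


print(evaluate_expression("$.waitMilliseconds / 1000", {"waitMilliseconds": 5000}))
```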
Batch
Step Functions supports Batch through the service integration pattern.
SubmitJob
The SubmitJob API submits an AWS Batch job from a job definition.
# Example automatically generated without compilation. See https://github.com/aws/jsii/issues/826
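import os
import aws_cdk.aws_batch as batch
import aws_cdk.aws_ecs as ecs
import aws_cdk.aws_stepfunctions as sfn
import aws_cdk.aws_stepfunctions_tasks as tasks

# 'vpc' is assumed to be an existing ec2.Vpc defined elsewhere in the stack.
batch_queue = batch.JobQueue(self, "JobQueue",
    compute_environments=[batch.JobQueueComputeEnvironment(
        order=1,
        compute_environment=batch.ComputeEnvironment(self, "ComputeEnv",
            compute_resources=batch.ComputeResources(vpc=vpc)
        )
    )]
)
batch_job_definition = batch.JobDefinition(self, "JobDefinition",
    container=batch.JobDefinitionContainer(
        # The Docker image is built from the local "batchjob-image" directory.
        image=ecs.ContainerImage.from_asset(
            os.path.join(os.path.dirname(__file__), "batchjob-image"))
    )
)
task = sfn.Task(self, "Submit Job",
    task=tasks.RunBatchJob(
        job_definition=batch_job_definition,
        job_name="MyJob",
        job_queue=batch_queue
    )
)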
DynamoDB
You can call DynamoDB APIs from a Task state. Read more about calling DynamoDB APIs in the AWS Step Functions Developer Guide.
GetItem
The GetItem operation returns a set of attributes for the item with the given primary key.
# Example automatically generated without compilation. See https://github.com/aws/jsii/issues/826
sfn.Task(self, "Get Item",
    task=tasks.CallDynamoDB.get_item(
        partition_key={
            "name": "MessageId",
            "value": tasks.DynamoAttributeValue().with_s("message-007")
        },
        table_name="my-table"
    )
)
PutItem
The PutItem operation creates a new item, or replaces an old item with a new item.
# Example automatically generated without compilation. See https://github.com/aws/jsii/issues/826
sfn.Task(self, "PutItem",
    task=tasks.CallDynamoDB.put_item(
        item={
            "MessageId": tasks.DynamoAttributeValue().with_s("message-007"),
            "Text": tasks.DynamoAttributeValue().with_s(sfn.Data.string_at("$.bar")),
            "TotalCount": tasks.DynamoAttributeValue().with_n("10")
        },
        table_name="my-table"
    )
)
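DynamoDB expects every attribute value in its typed JSON format: a single-key object whose key names the type (`S` for string, `N` for number, and so on). That is why `with_n()` takes the number as a string (`"10"`); DynamoDB transmits numbers as strings. The sketch below converts plain Python values into that format; `to_attribute_value` is a hypothetical helper covering only the common types, not part of the CDK.

```python
from decimal import Decimal
from typing import Any, Dict


def to_attribute_value(value: Any) -> Dict[str, Any]:
    """Convert a Python value into DynamoDB's typed attribute-value JSON."""
    if value is None:
        return {"NULL": True}
    if isinstance(value, bool):  # check bool before int: bool subclasses int
        return {"BOOL": value}
    if isinstance(value, (int, float, Decimal)):
        return {"N": str(value)}  # numbers are sent as strings
    if isinstance(value, str):
        return {"S": value}
    if isinstance(value, list):
        return {"L": [to_attribute_value(v) for v in value]}
    if isinstance(value, dict):
        return {"M": {k: to_attribute_value(v) for k, v in value.items()}}
    raise TypeError(f"unsupported type: {type(value).__name__}")


item = {"MessageId": "message-007", "TotalCount": 10}
print({k: to_attribute_value(v) for k, v in item.items()})
# {'MessageId': {'S': 'message-007'}, 'TotalCount': {'N': '10'}}
```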
DeleteItem
The DeleteItem operation deletes a single item in a table by primary key.
# Example automatically generated without compilation. See https://github.com/aws/jsii/issues/826
sfn.Task(self, "DeleteItem",
    task=tasks.CallDynamoDB.delete_item(
        partition_key={
            "name": "MessageId",
            "value": tasks.DynamoAttributeValue().with_s("message-007")
        },
        table_name="my-table"
    ),
    result_path="DISCARD"
)
UpdateItem
The UpdateItem operation edits an existing item's attributes, or adds a new item to the table if it does not already exist.
# Example automatically generated without compilation. See https://github.com/aws/jsii/issues/826
update_item_task = sfn.Task(self, "UpdateItem",
    task=tasks.CallDynamoDB.update_item(
        partition_key={
            "name": "MessageId",
            "value": tasks.DynamoAttributeValue().with_s("message-007")
        },
        table_name="my-table",
        expression_attribute_values={
            ":val": tasks.DynamoAttributeValue().with_n(sfn.Data.string_at("$.Item.TotalCount.N")),
            ":rand": tasks.DynamoAttributeValue().with_n("20")
        },
        update_expression="SET TotalCount = :val + :rand"
    )
)
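The update above reads `:val` from `$.Item.TotalCount.N`, which is where a preceding GetItem result places the stored count when it becomes the state JSON, and DynamoDB then adds `:rand` server-side. A local simulation of that arithmetic is sketched below; the example data and the `simulate_set_sum` helper are hypothetical and only mirror the `SET TotalCount = :val + :rand` expression for number attributes.

```python
from decimal import Decimal
from typing import Any, Dict

# Example shape of a GetItem result as it appears in the state JSON.
get_item_output: Dict[str, Any] = {
    "Item": {
        "MessageId": {"S": "message-007"},
        "TotalCount": {"N": "10"},
    }
}


def simulate_set_sum(state: Dict[str, Any], rand: str) -> Dict[str, str]:
    """Mimic 'SET TotalCount = :val + :rand' for number (N) attributes.

    :val comes from $.Item.TotalCount.N. DynamoDB N values are strings, so
    they are parsed as Decimal before adding and re-encoded as a string.
    """
    val = state["Item"]["TotalCount"]["N"]
    return {"N": str(Decimal(val) + Decimal(rand))}


print(simulate_set_sum(get_item_output, "20"))  # {'N': '30'}
```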
ECS
Step Functions supports ECS/Fargate through the service integration pattern.
RunTask
RunTask starts a new task using the specified task definition.
# Example automatically generated without compilation. See https://github.com/aws/jsii/issues/826
import aws_cdk.aws_stepfunctions as sfn
import aws_cdk.aws_stepfunctions_tasks as tasks

# See examples in the ECS library for the initialization of 'cluster' and
# 'task_definition'; 'rds_cluster' is assumed to be an existing database cluster.
fargate_task = tasks.RunEcsFargateTask(
    cluster=cluster,
    task_definition=task_definition,
    container_overrides=[{
        "container_name": "TheContainer",
        "environment": [{
            "name": "CONTAINER_INPUT",
            "value": sfn.Data.string_at("$.valueFromStateData")
        }]
    }]
)
# Allow the Fargate task to reach the database on its default port.
fargate_task.connections.allow_to_default_port(rds_cluster, "Read the database")
sfn.Task(self, "CallFargate",
    task=fargate_task
)
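In the Amazon States Language, container overrides become part of the ECS RunTask `Overrides` parameter, and a value taken from the state JSON is marked by a key ending in `.$` (for example `"Value.$": "$.valueFromStateData"`). The sketch below assembles that structure by hand to show the convention; `container_overrides` is a hypothetical helper, and the exact output the CDK renders may differ in detail.

```python
from typing import Any, Dict


def container_overrides(container_name: str,
                        environment: Dict[str, str]) -> Dict[str, Any]:
    """Build an ECS RunTask 'Overrides' block for one container.

    Values starting with "$" are treated as paths into the state JSON and
    emitted under a "Value.$" key; other values are passed as literals.
    """
    env = []
    for name, value in environment.items():
        key = "Value.$" if value.startswith("$") else "Value"
        env.append({"Name": name, key: value})
    return {"ContainerOverrides": [{"Name": container_name, "Environment": env}]}


overrides = container_overrides("TheContainer",
                                {"CONTAINER_INPUT": "$.valueFromStateData"})
print(overrides)
```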
EMR
Step Functions supports Amazon EMR through the service integration pattern. The service integration APIs correspond to Amazon EMR APIs but differ in the parameters that are used.
Read more about the differences when using these service integrations.
Create Cluster
Creates and starts running a cluster (job flow).
Corresponds to the runJobFlow API in EMR.
# Example automatically generated without compilation. See https://github.com/aws/jsii/issues/826
import aws_cdk.aws_iam as iam
import aws_cdk.aws_stepfunctions as sfn
import aws_cdk.aws_stepfunctions_tasks as tasks

cluster_role = iam.Role(stack, "ClusterRole",
    assumed_by=iam.ServicePrincipal("ec2.amazonaws.com")
)
service_role = iam.Role(stack, "ServiceRole",
    assumed_by=iam.ServicePrincipal("elasticmapreduce.amazonaws.com")
)
auto_scaling_role = iam.Role(stack, "AutoScalingRole",
    assumed_by=iam.ServicePrincipal("elasticmapreduce.amazonaws.com")
)
# Application Auto Scaling must also be able to assume the auto-scaling role.
auto_scaling_role.assume_role_policy.add_statements(
    iam.PolicyStatement(
        effect=iam.Effect.ALLOW,
        principals=[iam.ServicePrincipal("application-autoscaling.amazonaws.com")],
        actions=["sts:AssumeRole"]
    )
)
sfn.Task(stack, "Create Cluster",
    task=tasks.EmrCreateCluster(
        instances={},
        cluster_role=cluster_role,
        name=sfn.TaskInput.from_data_at("$.ClusterName").value,
        service_role=service_role,
        auto_scaling_role=auto_scaling_role,
        integration_pattern=sfn.ServiceIntegrationPattern.FIRE_AND_FORGET
    )
)
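After `add_statements`, the auto-scaling role can be assumed by both Amazon EMR and Application Auto Scaling. The trust policy below is a rough hand-written equivalent in IAM policy JSON, assuming both principals are merged into a single statement; the CDK may render them as separate statements, and `trust_policy` is a hypothetical helper.

```python
import json
from typing import Any, Dict


def trust_policy(*services: str) -> Dict[str, Any]:
    """Return an IAM trust policy that lets the given services assume a role."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": list(services)},
            "Action": "sts:AssumeRole",
        }],
    }


policy = trust_policy("elasticmapreduce.amazonaws.com",
                      "application-autoscaling.amazonaws.com")
print(json.dumps(policy, indent=2))
```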
Termination Protection
Locks a cluster (job flow) so the EC2 instances in the cluster cannot be terminated by user intervention, an API call, or a job-flow error.
Corresponds to the setTerminationProtection API in EMR.
# Example automatically generated without compilation. See https://github.com/aws/jsii/issues/826
sfn.Task(stack, "Task",
    task=tasks.EmrSetClusterTerminationProtection(
        cluster_id="ClusterId",
        termination_protected=False
    )
)
Terminate Cluster
Shuts down a cluster (job flow).
Corresponds to the terminateJobFlows API in EMR.
# Example automatically generated without compilation. See https://github.com/aws/jsii/issues/826
sfn.Task(stack, "Task",
    task=tasks.EmrTerminateCluster(
        cluster_id="ClusterId"
    )
)
Add Step
Adds a new step to a running cluster.
Corresponds to the addJobFlowSteps API in EMR.
# Example automatically generated without compilation. See https://github.com/aws/jsii/issues/826
sfn.Task(stack, "Task",
    task=tasks.EmrAddStep(
        cluster_id="ClusterId",
        name="StepName",
        jar="Jar",
        action_on_failure=tasks.ActionOnFailure.CONTINUE
    )
)
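A step added through addJobFlowSteps consists of a name, an action to take on failure, and a Hadoop JAR step. The sketch below builds the cluster ID and step definition by hand and checks the action against the values the EMR API accepts (TERMINATE_JOB_FLOW, TERMINATE_CLUSTER, CANCEL_AND_WAIT, CONTINUE); `emr_add_step_parameters` is a hypothetical helper, not part of the CDK, and the CDK task may render additional fields.

```python
from typing import Any, Dict, List, Optional

ACTIONS_ON_FAILURE = {"TERMINATE_JOB_FLOW", "TERMINATE_CLUSTER",
                      "CANCEL_AND_WAIT", "CONTINUE"}


def emr_add_step_parameters(cluster_id: str, name: str, jar: str,
                            action_on_failure: str = "CONTINUE",
                            args: Optional[List[str]] = None) -> Dict[str, Any]:
    """Build ClusterId/Step parameters for adding a Hadoop JAR step."""
    if action_on_failure not in ACTIONS_ON_FAILURE:
        raise ValueError(f"invalid ActionOnFailure: {action_on_failure}")
    jar_step: Dict[str, Any] = {"Jar": jar}
    if args:
        jar_step["Args"] = list(args)  # command-line arguments for the JAR
    return {
        "ClusterId": cluster_id,
        "Step": {
            "Name": name,
            "ActionOnFailure": action_on_failure,
            "HadoopJarStep": jar_step,
        },
    }


print(emr_add_step_parameters("ClusterId", "StepName", "Jar"))
```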
Cancel Step
Cancels a pending step in a running cluster.
Corresponds to the cancelSteps API in EMR.
# Example automatically generated without compilation. See https://github.com/aws/jsii/issues/826
sfn.Task(stack, "Task",
    task=tasks.EmrCancelStep(
        cluster_id="ClusterId",
        step_id="StepId"
    )
)
Modify Instance Fleet
Modifies the target On-Demand and target Spot capacities for the instance fleet with the specified InstanceFleetName.
Corresponds to the modifyInstanceFleet API in EMR.
# Example automatically generated without compilation. See https://github.com/aws/jsii/issues/826
sfn.Task(stack, "Task",
    task=tasks.EmrModifyInstanceFleetByName(
        cluster_id="ClusterId",
        instance_fleet_name="InstanceFleetName",
        target_on_demand_capacity=2,
        target_spot_capacity=0
    )
)
Modify Instance Group
Modifies the number of nodes and configuration settings of an instance group.
Corresponds to the modifyInstanceGroups API in EMR.
# Example automatically generated without compilation. See https://github.com/aws/jsii/issues/826
sfn.Task(stack, "Task",
    task=tasks.EmrModifyInstanceGroupByName(
        cluster_id="ClusterId",
        instance_group_name=sfn.Data.string_at("$.InstanceGroupName"),
        instance_group={
            "instance_count": 1
        }
    )
)
Glue
Step Functions supports AWS Glue through the service integration pattern.
You can call the StartJobRun API from a Task state.
# Example automatically generated without compilation. See https://github.com/aws/jsii/issues/826
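import aws_cdk.core as cdk
import aws_cdk.aws_stepfunctions as sfn
import aws_cdk.aws_stepfunctions_tasks as tasks

# 'job_name' is assumed to hold the name of an existing Glue job.
sfn.Task(stack, "Task",
    task=tasks.RunGlueJobTask(job_name,
        arguments={
            "key": "value"
        },
        timeout=cdk.Duration.minutes(30),
        notify_delay_after=cdk.Duration.minutes(5)
    )
)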
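Glue's StartJobRun API expresses both `Timeout` and `NotificationProperty.NotifyDelayAfter` in minutes, so the `Duration` values above end up as minute counts. The sketch below performs that conversion with `datetime.timedelta` to make the units explicit; `glue_start_job_run_parameters` and the job name `my-glue-job` are hypothetical, and rejecting fractional minutes is an assumption of this sketch.

```python
from datetime import timedelta
from typing import Any, Dict


def _whole_minutes(duration: timedelta) -> int:
    """Convert a duration to whole minutes, rejecting partial minutes."""
    seconds = duration.total_seconds()
    if seconds <= 0 or seconds % 60:
        raise ValueError(f"{duration} is not a positive whole number of minutes")
    return int(seconds // 60)


def glue_start_job_run_parameters(job_name: str, arguments: Dict[str, str],
                                  timeout: timedelta,
                                  notify_delay_after: timedelta) -> Dict[str, Any]:
    """Build StartJobRun parameters with durations converted to minutes."""
    return {
        "JobName": job_name,
        "Arguments": dict(arguments),
        "Timeout": _whole_minutes(timeout),
        "NotificationProperty": {
            "NotifyDelayAfter": _whole_minutes(notify_delay_after),
        },
    }


params = glue_start_job_run_parameters("my-glue-job", {"key": "value"},
                                       timedelta(minutes=30), timedelta(minutes=5))
print(params["Timeout"], params["NotificationProperty"]["NotifyDelayAfter"])  # 30 5
```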
Lambda
Invoke a Lambda function.
You can specify the input to your Lambda function through the payload attribute. By default, Step Functions invokes the Lambda function with the state input (JSON path '$') as the input.
The following snippet invokes a Lambda function with the state input as the payload by referencing the $ path.
# Example automatically generated without compilation. See https://github.com/aws/jsii/issues/826
sfn.Task(self, "Invoke with state input",
    task=tasks.RunLambdaTask(my_lambda, payload=sfn.TaskInput.from_data_at("$")))
When a function is invoked, the Lambda service sends back a set of response elements, such as StatusCode and Payload.
⚠️ The response from the Lambda function is in an attribute called Payload.
The following snippets invoke one Lambda function with an empty object as the payload, and another one with the $.Payload path of the state input, which holds the output of a Lambda function executed before it.
# Example automatically generated without compilation. See https://github.com/aws/jsii/issues/826
sfn.Task(self, "Invoke with empty object as payload",
    task=tasks.RunLambdaTask(my_lambda,
        payload=sfn.TaskInput.from_object({})
    )
)
sfn.Task(self, "Invoke with payload field in the state input",
    task=tasks.RunLambdaTask(my_other_lambda,
        payload=sfn.TaskInput.from_data_at("$.Payload")
    )
)
The following snippet invokes a Lambda and sets the task output to only include the Lambda function response.
# Example automatically generated without compilation. See https://github.com/aws/jsii/issues/826
sfn.Task(self, "Invoke and set function response as task output",
    task=tasks.RunLambdaTask(my_lambda,
        payload=sfn.TaskInput.from_data_at("$")
    ),
    output_path="$.Payload"
)
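The effect of `output_path="$.Payload"` can be shown with plain dictionaries. With the default result path, the task result becomes the state output, and the Lambda integration nests the function's return value under `Payload` next to metadata such as `StatusCode`; selecting `$.Payload` keeps only the function's own response. The result below is abridged, made-up example data (real results carry more metadata fields), and `select_output` is a hypothetical helper.

```python
from typing import Any, Dict

# Abridged example of what the Lambda integration returns as the task result.
task_result: Dict[str, Any] = {
    "ExecutedVersion": "$LATEST",
    "Payload": {"greeting": "hello"},  # what the function itself returned
    "StatusCode": 200,
}


def select_output(result: Dict[str, Any], output_path: str) -> Any:
    """Apply a simple '$' or '$.field' OutputPath to a task result."""
    if output_path == "$":
        return result
    if not output_path.startswith("$."):
        raise ValueError(f"unsupported path: {output_path}")
    value: Any = result
    for field in output_path[2:].split("."):
        value = value[field]
    return value


print(select_output(task_result, "$.Payload"))  # {'greeting': 'hello'}
```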
You can have Step Functions pause a task and wait for an external process to return a task token. Read more about the callback pattern in the AWS Step Functions Developer Guide.
To use the callback pattern, set integration_pattern to WAIT_FOR_TASK_TOKEN and pass the task token (sfn.Context.task_token) to the external process, for example in the payload. Call the Step Functions SendTaskSuccess or SendTaskFailure APIs with the token to indicate that the task has completed and the state machine should resume execution.
The following snippet invokes a Lambda with the task token as part of the input to the Lambda.
# Example automatically generated without compilation. See https://github.com/aws/jsii/issues/826
task = sfn.Task(stack, "Invoke with callback",
    task=tasks.RunLambdaTask(my_lambda,
        integration_pattern=sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN,
        payload=sfn.TaskInput.from_object({
            "token": sfn.Context.task_token,
            "input": sfn.Data.string_at("$.someField")
        })
    )
)
⚠️ The task will pause until it receives that task token back with a SendTaskSuccess or SendTaskFailure call. Learn more about Callback with the Task Token.
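The pause-and-resume behavior of the callback pattern can be simulated locally. In the sketch below, a waiting task is parked under its token until a worker thread calls `send_task_success` with that token. The `TaskTokenRegistry` class and its methods are hypothetical in-memory stand-ins for the Step Functions service and its SendTaskSuccess API, used only to show the control flow; they do not talk to AWS.

```python
import secrets
import threading
from typing import Any, Dict, Tuple


class TaskTokenRegistry:
    """In-memory stand-in for Step Functions tracking paused tasks by token."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._waiting: Dict[str, Tuple[threading.Event, Dict[str, Any]]] = {}

    def issue_token(self) -> str:
        """Register a paused task and return its opaque token."""
        token = secrets.token_hex(16)
        with self._lock:
            self._waiting[token] = (threading.Event(), {})
        return token

    def send_task_success(self, token: str, output: Any) -> None:
        """Called by the external process to resume the paused task."""
        with self._lock:
            event, slot = self._waiting[token]  # KeyError for unknown tokens
        slot["output"] = output
        event.set()

    def wait(self, token: str, timeout: float) -> Any:
        """Block until the token is returned, then hand back its output."""
        with self._lock:
            event, slot = self._waiting[token]
        if not event.wait(timeout):
            raise TimeoutError("task token was not returned in time")
        with self._lock:
            del self._waiting[token]
        return slot["output"]


registry = TaskTokenRegistry()
token = registry.issue_token()
# The "external process" receives the token and reports completion later.
worker = threading.Thread(target=registry.send_task_success,
                          args=(token, {"status": "done"}))
worker.start()
print(registry.wait(token, timeout=5.0))  # {'status': 'done'}
worker.join()
```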
SageMaker
Step Functions supports Amazon SageMaker through the service integration pattern.
Create Training Job
You can call the CreateTrainingJob API from a Task state.
# Example automatically generated without compilation. See https://github.com/aws/jsii/issues/826
import aws_cdk.core as cdk
import aws_cdk.aws_ec2 as ec2
import aws_cdk.aws_s3 as s3
import aws_cdk.aws_stepfunctions as sfn
import aws_cdk.aws_stepfunctions_tasks as tasks

# 'role' is assumed to be an existing iam.Role that SageMaker can assume.
sfn.Task(stack, "TrainSagemaker",
    task=tasks.SagemakerTrainTask(
        training_job_name=sfn.Data.string_at("$.JobName"),
        role=role,
        algorithm_specification={
            "algorithm_name": "BlazingText",
            "training_input_mode": tasks.InputMode.FILE
        },
        input_data_config=[{
            "channel_name": "train",
            "data_source": {
                "s3_data_source": {
                    "s3_data_type": tasks.S3DataType.S3_PREFIX,
                    "s3_location": tasks.S3Location.from_json_expression("$.S3Bucket")
                }
            }
        }],
        output_data_config={
            "s3_output_location": tasks.S3Location.from_bucket(
                s3.Bucket.from_bucket_name(stack, "Bucket", "mybucket"), "myoutputpath")
        },
        resource_config={
            "instance_count": 1,
            "instance_type": ec2.InstanceType.of(ec2.InstanceClass.P3, ec2.InstanceSize.XLARGE2),
            "volume_size_in_gb": 50
        },
        stopping_condition={
            "max_runtime": cdk.Duration.hours(1)
        }
    )
)
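SageMaker's CreateTrainingJob API takes the stopping condition as `MaxRuntimeInSeconds` and the storage size as `VolumeSizeInGB`, so `cdk.Duration.hours(1)` corresponds to 3600 seconds. The sketch below makes those conversions with `datetime.timedelta`. `training_limits` is a hypothetical helper, and adding the `ml.` prefix reflects SageMaker's naming of training instances (for example `ml.p3.2xlarge`); that this is how the EC2-style type above gets rendered is an assumption of this sketch.

```python
from datetime import timedelta
from typing import Any, Dict


def training_limits(instance_type: str, instance_count: int,
                    volume_size_gb: int, max_runtime: timedelta) -> Dict[str, Any]:
    """Build CreateTrainingJob ResourceConfig and StoppingCondition blocks."""
    if not instance_type.startswith("ml."):
        instance_type = "ml." + instance_type  # SageMaker instance types use "ml."
    return {
        "ResourceConfig": {
            "InstanceCount": instance_count,
            "InstanceType": instance_type,
            "VolumeSizeInGB": volume_size_gb,
        },
        "StoppingCondition": {
            "MaxRuntimeInSeconds": int(max_runtime.total_seconds()),
        },
    }


limits = training_limits("p3.2xlarge", 1, 50, timedelta(hours=1))
print(limits["StoppingCondition"])  # {'MaxRuntimeInSeconds': 3600}
```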
Create Transform Job
You can call the CreateTransformJob API from a Task state.
# Example automatically generated without compilation. See https://github.com/aws/jsii/issues/826
import aws_cdk.aws_ec2 as ec2
import aws_cdk.aws_stepfunctions as sfn
import aws_cdk.aws_stepfunctions_tasks as tasks

# 'role' is assumed to be an existing iam.Role that SageMaker can assume.
transform_job = tasks.SagemakerTransformTask(
    transform_job_name="MyTransformJob",
    model_name="MyModelName",
    role=role,
    transform_input={
        "transform_data_source": {
            "s3_data_source": {
                "s3_uri": "s3://inputbucket/train",
                "s3_data_type": tasks.S3DataType.S3_PREFIX
            }
        }
    },
    transform_output={
        "s3_output_path": "s3://outputbucket/TransformJobOutputPath"
    },
    transform_resources={
        "instance_count": 1,
        "instance_type": ec2.InstanceType.of(ec2.InstanceClass.M4, ec2.InstanceSize.XLARGE)
    }
)
task = sfn.Task(self, "Batch Inference",
    task=transform_job
)
SNS
Step Functions supports Amazon SNS through the service integration pattern.
You can call the Publish API from a Task state to publish to an SNS topic.
# Example automatically generated without compilation. See https://github.com/aws/jsii/issues/826
import aws_cdk.aws_sns as sns
import aws_cdk.aws_stepfunctions as sfn
import aws_cdk.aws_stepfunctions_tasks as tasks
# ...
topic = sns.Topic(self, "Topic")
# Use a field from the execution data as message.
task1 = sfn.Task(self, "Publish1",
    task=tasks.PublishToTopic(topic,
        integration_pattern=sfn.ServiceIntegrationPattern.FIRE_AND_FORGET,
        message=sfn.TaskInput.from_data_at("$.state.message")
    )
)
# Combine a field from the execution data with
# a literal object.
task2 = sfn.Task(self, "Publish2",
    task=tasks.PublishToTopic(topic,
        message=sfn.TaskInput.from_object({
            "field1": "somedata",
            "field2": sfn.Data.string_at("$.field2")
        })
    )
)
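`TaskInput.from_object` mixes literals with state references. In the Amazon States Language, a reference appears as a key ending in `.$`, which Step Functions resolves against the state input at runtime, while other keys are copied unchanged. A minimal sketch of that resolution, supporting only dotted `$.` paths and nested objects, with `render_parameters` as a hypothetical helper:

```python
from typing import Any, Dict


def _resolve(state: Dict[str, Any], path: str) -> Any:
    if path == "$":
        return state
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path}")
    value: Any = state
    for field in path[2:].split("."):
        value = value[field]
    return value


def render_parameters(template: Dict[str, Any],
                      state: Dict[str, Any]) -> Dict[str, Any]:
    """Resolve 'key.$' entries against the state; copy other entries as-is."""
    rendered: Dict[str, Any] = {}
    for key, value in template.items():
        if key.endswith(".$"):
            rendered[key[:-2]] = _resolve(state, value)  # strip the ".$" marker
        elif isinstance(value, dict):
            rendered[key] = render_parameters(value, state)
        else:
            rendered[key] = value
    return rendered


message_template = {"field1": "somedata", "field2.$": "$.field2"}
print(render_parameters(message_template, {"field2": "from state"}))
# {'field1': 'somedata', 'field2': 'from state'}
```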
Step Functions
You can manage AWS Step Functions executions.
AWS Step Functions supports its own StartExecution API as a service integration.
# Example automatically generated without compilation. See https://github.com/aws/jsii/issues/826
# Define a state machine with one Pass state
child = sfn.StateMachine(stack, "ChildStateMachine",
    definition=sfn.Chain.start(sfn.Pass(stack, "PassState"))
)
# Include the state machine in a Task state with callback pattern
task = sfn.Task(stack, "ChildTask",
    task=tasks.ExecuteStateMachine(child,
        integration_pattern=sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN,
        input={
            "token": sfn.Context.task_token,
            "foo": "bar"
        },
        name="MyExecutionName"
    )
)
# Define a second state machine with the Task state above
sfn.StateMachine(stack, "ParentStateMachine",
    definition=task
)
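`sfn.Context.task_token` refers to the context object rather than to the state input: in the Amazon States Language it is written `$$.Task.Token`, where the leading `$$` selects the context object. The sketch below resolves both kinds of reference to build the child execution's input; `resolve_reference`, the context data, and the token value are hypothetical examples.

```python
from typing import Any, Dict


def resolve_reference(path: str, state: Dict[str, Any],
                      context: Dict[str, Any]) -> Any:
    """Resolve '$$.x.y' against the context object and '$.x.y' against the state."""
    if path.startswith("$$."):
        root, fields = context, path[3:]
    elif path.startswith("$."):
        root, fields = state, path[2:]
    else:
        raise ValueError(f"unsupported path: {path}")
    value: Any = root
    for field in fields.split("."):
        value = value[field]
    return value


context = {"Task": {"Token": "example-token-123"}}  # made-up token value
state = {"foo": "ignored"}
child_input = {
    "token": resolve_reference("$$.Task.Token", state=state, context=context),
    "foo": "bar",  # literal value, copied as-is
}
print(child_input)  # {'token': 'example-token-123', 'foo': 'bar'}
```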
SQS
Step Functions supports Amazon SQS through the service integration pattern. You can call the SendMessage API from a Task state to send a message to an SQS queue.
# Example automatically generated without compilation. See https://github.com/aws/jsii/issues/826
import aws_cdk.aws_sqs as sqs
import aws_cdk.aws_stepfunctions as sfn
import aws_cdk.aws_stepfunctions_tasks as tasks
# ...
queue = sqs.Queue(self, "Queue")
# Use a field from the execution data as message.
task1 = sfn.Task(self, "Send1",
    task=tasks.SendToQueue(queue,
        message_body=sfn.TaskInput.from_data_at("$.message"),
        # Only for FIFO queues
        message_group_id="1234"
    )
)
# Combine a field from the execution data with
# a literal object.
task2 = sfn.Task(self, "Send2",
    task=tasks.SendToQueue(queue,
        message_body=sfn.TaskInput.from_object({
            "field1": "somedata",
            "field2": sfn.Data.string_at("$.field2")
        }),
        # Only for FIFO queues
        message_group_id="1234"
    )
)
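FIFO queue names end in `.fifo`, and FIFO queues require a message group ID, which orders messages within a group. The sketch below builds SendMessage parameters and, as a simplifying assumption of this example only, attaches `MessageGroupId` solely when the queue URL names a FIFO queue. `send_message_parameters` and the queue URLs are hypothetical.

```python
from typing import Any, Dict, Optional


def send_message_parameters(queue_url: str, message_body: Any,
                            message_group_id: Optional[str] = None) -> Dict[str, Any]:
    """Build SendMessage parameters, adding MessageGroupId only for FIFO queues."""
    params: Dict[str, Any] = {"QueueUrl": queue_url, "MessageBody": message_body}
    if queue_url.endswith(".fifo"):  # FIFO queue names end in ".fifo"
        if not message_group_id:
            raise ValueError("FIFO queues require a message group ID")
        params["MessageGroupId"] = message_group_id
    return params


fifo_url = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue.fifo"
std_url = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"
print(send_message_parameters(fifo_url, {"field1": "somedata"}, "1234"))
print(send_message_parameters(std_url, {"field1": "somedata"}, "1234"))
```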
Source Distribution
Hashes for aws-cdk.aws-stepfunctions-tasks-1.37.0.tar.gz
Algorithm | Hash digest
---|---
SHA256 | 42144b68897c96d214cc1879a34b16b3afd4ba51b0b0dc479011e740eeb22246
MD5 | c49a6341b4c2688e3a02f919569af613
BLAKE2b-256 | 17bf5eeed6e04477c1516514db5f7678e73ecba27df68a3055945e4f0b1e7383
Built Distribution
Hashes for aws_cdk.aws_stepfunctions_tasks-1.37.0-py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | 0845481564789b35910219819077ef834646a6e8fc8f8ae7ceb237e30b3b1097
MD5 | a4c1864569a72a612697bb964366a5c2
BLAKE2b-256 | da8d80f872984079d488b92bc24395fefee78a62630f6b2b5ce7e1712293cde7