
Prefect integrations for interacting with Amazon Web Services.


prefect-aws


Welcome

Build production-ready data workflows that seamlessly integrate with AWS services. prefect-aws provides battle-tested blocks, tasks, and infrastructure integrations for AWS, featuring support for ECS, S3, Secrets Manager, Lambda, Batch, and Glue.

Why use prefect-aws?

While you could integrate with AWS services directly using boto3, prefect-aws offers significant advantages:

  • Production-ready integrations: Pre-built, tested components that handle common AWS patterns and edge cases
  • Unified credential management: Secure, centralized authentication that works consistently across all AWS services
  • Built-in observability: Automatic logging, monitoring, and state tracking for all AWS operations in your workflows
  • Infrastructure as code: Deploy and scale your workflows on AWS ECS with minimal configuration
  • Error handling & retries: Robust error handling with intelligent retry logic for transient AWS service issues
  • Type safety: Full type hints and validation for all AWS service interactions

Getting started

Prerequisites

  • An AWS account and the necessary permissions to access desired services.

Installation

Install prefect-aws as an extra of Prefect. If Prefect is not already installed, this command also installs its newest version.

pip install "prefect[aws]"
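
To confirm the install from Python, one option is to query package metadata with the standard library. This is a minimal sketch: the helper name `installed_version` is hypothetical, and it assumes the distribution is registered under the name `prefect-aws`.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(distribution)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_version("prefect-aws")
    print(f"prefect-aws: {found or 'not installed'}")
```

Returning None instead of raising keeps the check usable in setup scripts that want to decide whether to run the install command.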

Blocks setup

Credentials

Most AWS services require an authenticated session. Prefect makes it simple to provide credentials via AWS Credentials blocks.

Steps:

  1. Refer to the AWS Configuration documentation to retrieve your access key ID and secret access key.
  2. Copy the access key ID and secret access key.
  3. Create an AwsCredentials block in the Prefect UI or use a Python script like the one below.
from prefect_aws import AwsCredentials


AwsCredentials(
    aws_access_key_id="PLACEHOLDER",
    aws_secret_access_key="PLACEHOLDER",
    aws_session_token=None,  # replace this with token if necessary
    region_name="us-east-2"
).save("BLOCK-NAME-PLACEHOLDER")

Prefect uses the Boto3 library under the hood. Prefect creates the session object from the values stored in the block; any value the block does not provide is resolved at runtime in the order described in the Boto3 credentials documentation.
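
The fallback behaviour can be pictured with a small stand-alone sketch. It is an illustration only, not Prefect's or Boto3's implementation: the function `resolve_settings` is hypothetical, and it models just the environment-variable step of the lookup, whereas Boto3 continues on to shared credential files, config files, and role-based providers.

```python
import os
from typing import Dict, Mapping, Optional

# Environment variables Boto3 consults when a value is not passed explicitly.
ENV_FALLBACKS = {
    "aws_access_key_id": "AWS_ACCESS_KEY_ID",
    "aws_secret_access_key": "AWS_SECRET_ACCESS_KEY",
    "aws_session_token": "AWS_SESSION_TOKEN",
    "region_name": "AWS_DEFAULT_REGION",
}


def resolve_settings(
    block_values: Mapping[str, Optional[str]],
    env: Optional[Mapping[str, str]] = None,
) -> Dict[str, Optional[str]]:
    """Prefer values set on the block; fall back to the environment otherwise."""
    env = os.environ if env is None else env
    resolved: Dict[str, Optional[str]] = {}
    for field, env_name in ENV_FALLBACKS.items():
        value = block_values.get(field)
        if value is None:
            # Hypothetical simplification: only the environment is checked here.
            value = env.get(env_name)
        resolved[field] = value
    return resolved


if __name__ == "__main__":
    print(resolve_settings({"region_name": "us-east-2"}))
```

A block that sets only `region_name` would therefore keep that region while picking up keys from the environment, if any are present.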

S3

Create a block for reading and writing files to S3.

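from prefect_aws import AwsCredentials
from prefect_aws.s3 import S3Bucket

# Load the credentials block saved in the previous step
aws_credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")

S3Bucket(
    bucket_name="BUCKET-NAME-PLACEHOLDER",
    credentials=aws_credentials
).save("S3-BLOCK-NAME-PLACEHOLDER")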

Lambda

Invoke AWS Lambdas, synchronously or asynchronously.

from prefect_aws.lambda_function import LambdaFunction
from prefect_aws.credentials import AwsCredentials

# Load the credentials block saved earlier
credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")

LambdaFunction(
    function_name="test_lambda_function",
    aws_credentials=credentials,
).save("LAMBDA-BLOCK-NAME-PLACEHOLDER")

Secrets Manager

Create a block to read, write, and delete AWS Secrets Manager secrets.

from prefect_aws import AwsCredentials
from prefect_aws.secrets_manager import AwsSecret

# Load the credentials block saved earlier
credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")

AwsSecret(
    secret_name="test_secret_name",
    aws_credentials=credentials,
).save("AWS-SECRET-BLOCK-NAME-PLACEHOLDER")

RDS IAM Authentication (Experimental)

prefect-aws includes a plugin to automatically handle authentication for AWS RDS PostgreSQL databases using IAM tokens.

Prerequisites

Before using RDS IAM authentication, you need to configure AWS:

  1. Enable IAM authentication on your RDS instance: In the AWS Console, modify your RDS instance and enable "IAM database authentication".

  2. Create an IAM policy with the rds-db:connect permission:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": "rds-db:connect",
          "Resource": "arn:aws:rds-db:<region>:<account-id>:dbuser:<db-resource-id>/<db-username>"
        }
      ]
    }
    
  3. Create a database user that uses IAM authentication:

    CREATE USER iam_user WITH LOGIN;
    GRANT rds_iam TO iam_user;
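
The policy in step 2 can also be generated rather than edited by hand. The following is a minimal sketch using only the standard library; the function name `rds_connect_policy` and the sample identifiers in the usage block are hypothetical placeholders, not real resources.

```python
import json
from typing import Any, Dict


def rds_connect_policy(
    region: str, account_id: str, db_resource_id: str, db_username: str
) -> Dict[str, Any]:
    """Build the rds-db:connect policy document for one database user."""
    resource = (
        f"arn:aws:rds-db:{region}:{account_id}:dbuser:"
        f"{db_resource_id}/{db_username}"
    )
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": "rds-db:connect", "Resource": resource}
        ],
    }


if __name__ == "__main__":
    # Placeholder values for illustration only.
    policy = rds_connect_policy(
        "us-west-2", "123456789012", "db-EXAMPLERESOURCEID", "iam_user"
    )
    print(json.dumps(policy, indent=2))
```

The printed JSON has the same shape as the policy shown in step 2, with the placeholders filled in.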
    

Enable the Plugin

  1. Enable the experimental plugin system:

    export PREFECT_EXPERIMENTS_PLUGINS_ENABLED=true
    
  2. Enable RDS IAM authentication:

    export PREFECT_INTEGRATIONS_AWS_RDS_IAM_ENABLED=true
    
  3. (Optional) Set the AWS region:

    export PREFECT_INTEGRATIONS_AWS_RDS_IAM_REGION_NAME=us-west-2
    
  4. Configure your database connection URL:

    export PREFECT_API_DATABASE_CONNECTION_URL="postgresql+asyncpg://iam_user@your-rds-host:5432/prefect"
    

The plugin will automatically generate and inject an IAM authentication token as the password when connecting to the database.
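
When these settings are produced by a deployment script, it can help to assemble them in one place. The sketch below is an assumption-laden illustration: the helper `rds_iam_environment` is hypothetical, and it only builds the variables listed above without touching Prefect or AWS. Note that the URL carries no password, since the plugin supplies the token at connection time.

```python
from typing import Dict, Optional
from urllib.parse import quote


def rds_iam_environment(
    user: str,
    host: str,
    database: str,
    port: int = 5432,
    region: Optional[str] = None,
) -> Dict[str, str]:
    """Return the environment variables from the steps above as a dict."""
    url = (
        f"postgresql+asyncpg://{quote(user, safe='')}@{host}:{port}/"
        f"{quote(database, safe='')}"
    )
    env = {
        "PREFECT_EXPERIMENTS_PLUGINS_ENABLED": "true",
        "PREFECT_INTEGRATIONS_AWS_RDS_IAM_ENABLED": "true",
        "PREFECT_API_DATABASE_CONNECTION_URL": url,
    }
    if region is not None:
        # The region setting is optional, so it is emitted only when given.
        env["PREFECT_INTEGRATIONS_AWS_RDS_IAM_REGION_NAME"] = region
    return env


if __name__ == "__main__":
    for name, value in rds_iam_environment(
        "iam_user", "your-rds-host", "prefect", region="us-west-2"
    ).items():
        print(f"export {name}={value}")
```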

Scale workflows with AWS infrastructure

ECS (Elastic Container Service)

Deploy and scale your Prefect workflows on AWS ECS for production workloads. prefect-aws provides:

  • ECS workers: Long-running workers for hybrid deployments with full control over execution environment
  • Auto-scaling: Dynamic resource allocation based on workflow demands
  • Cost optimization: Pay only for compute resources when workflows are running

See the ECS worker deployment guide for a step-by-step walkthrough of deploying production-ready workers to your ECS cluster.

Docker Images

Pre-built Docker images with prefect-aws are available for simplified deployment:

docker pull prefecthq/prefect-aws:latest

Available Tags

Image tags have the following format:

  • prefecthq/prefect-aws:latest - Latest stable release with Python 3.12
  • prefecthq/prefect-aws:latest-python3.11 - Latest stable with Python 3.11
  • prefecthq/prefect-aws:0.5.9-python3.12 - Specific prefect-aws version with Python 3.12
  • prefecthq/prefect-aws:0.5.9-python3.12-prefect3.4.9 - Full version specification
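
The tag format can be expressed as a small helper for scripts that pin images. This is a sketch inferred from the four examples above; the function `image_reference` is hypothetical, the rule that a Prefect version requires the other two parts is an assumption, and nothing here checks that a given tag actually exists in the registry.

```python
from typing import Optional

REPOSITORY = "prefecthq/prefect-aws"


def image_reference(
    aws_version: Optional[str] = None,
    python_version: Optional[str] = None,
    prefect_version: Optional[str] = None,
) -> str:
    """Compose an image reference following the tag patterns listed above."""
    if prefect_version and not (aws_version and python_version):
        # Assumption: the full specification always includes all three parts.
        raise ValueError("prefect_version requires aws_version and python_version")
    parts = [aws_version or "latest"]
    if python_version:
        parts.append(f"python{python_version}")
    if prefect_version:
        parts.append(f"prefect{prefect_version}")
    tag = "-".join(parts)
    return f"{REPOSITORY}:{tag}"


if __name__ == "__main__":
    print(image_reference("0.5.9", "3.12", "3.4.9"))
```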

Usage Examples

Running an ECS worker:

docker run -d \
  --name prefect-ecs-worker \
  -e PREFECT_API_URL=https://api.prefect.cloud/api/accounts/your-account/workspaces/your-workspace \
  -e PREFECT_API_KEY=your-api-key \
  prefecthq/prefect-aws:latest \
  prefect worker start --pool ecs-pool

Local development:

docker run -it --rm \
  -v "$(pwd)":/opt/prefect \
  prefecthq/prefect-aws:latest \
  python your_flow.py

Supported AWS services

prefect-aws provides comprehensive integrations for key AWS services:

Service         | Integration Type         | Use Cases
S3              | S3Bucket block           | File storage, data lake operations, deployment storage
Secrets Manager | AwsSecret block          | Secure credential storage, API key management
Lambda          | LambdaFunction block     | Serverless function execution, event-driven processing
Glue            | GlueJobBlock block       | ETL operations, data transformation pipelines
ECS             | ECSWorker infrastructure | Container orchestration, scalable compute workloads
Batch           | batch_submit task        | High-throughput computing, batch job processing

Integration types explained:

  • Blocks: Reusable configuration objects that can be saved and shared across flows
  • Tasks: Functions decorated with @task for direct use in flows
  • Workers: Infrastructure components for running flows on AWS compute services

Each integration includes full support for AWS IAM roles, cross-region operations, and Prefect's built-in retry and error handling mechanisms.

Examples

Read and write files to AWS S3

Upload a file to an AWS S3 bucket and download the same file under a different filename. The following code assumes that the bucket already exists:

from pathlib import Path
from prefect import flow
from prefect_aws import AwsCredentials, S3Bucket


@flow
def s3_flow():
    # create a dummy file to upload
    file_path = Path("test-example.txt")
    file_path.write_text("Hello, Prefect!")

    aws_credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")
    s3_bucket = S3Bucket(
        bucket_name="BUCKET-NAME-PLACEHOLDER",
        credentials=aws_credentials
    )

    s3_bucket_path = s3_bucket.upload_from_path(file_path)
    downloaded_file_path = s3_bucket.download_object_to_path(
        s3_bucket_path, "downloaded-test-example.txt"
    )
    return downloaded_file_path.read_text()


if __name__ == "__main__":
    s3_flow()

Access secrets with AWS Secrets Manager

Write a secret to AWS Secrets Manager, read the secret data, delete the secret, and return the secret data.

from prefect import flow
from prefect_aws import AwsCredentials, AwsSecret


@flow
def secrets_manager_flow():
    aws_credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")
    aws_secret = AwsSecret(secret_name="test-example", aws_credentials=aws_credentials)
    aws_secret.write_secret(secret_data=b"Hello, Prefect!")
    secret_data = aws_secret.read_secret()
    aws_secret.delete_secret()
    return secret_data


if __name__ == "__main__":
    secrets_manager_flow()
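
Secrets frequently hold JSON rather than plain text. The helper below is a minimal sketch for decoding such a value after it has been read; the name `parse_json_secret` is hypothetical, and it assumes the secret data arrives either as bytes or as a string containing UTF-8 JSON.

```python
import json
from typing import Any, Union


def parse_json_secret(secret_data: Union[bytes, str]) -> Any:
    """Decode secret data that is expected to contain a JSON document."""
    if isinstance(secret_data, bytes):
        secret_data = secret_data.decode("utf-8")
    return json.loads(secret_data)


if __name__ == "__main__":
    # Stand-in for data returned by a secret read; not a real secret.
    print(parse_json_secret(b'{"user": "svc", "port": 5432}'))
```

A secret that is not valid JSON raises `json.JSONDecodeError`, which is usually preferable to silently passing a malformed value downstream.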

Invoke lambdas

from prefect_aws.lambda_function import LambdaFunction
from prefect_aws.credentials import AwsCredentials

credentials = AwsCredentials()
lambda_function = LambdaFunction(
    function_name="test_lambda_function",
    aws_credentials=credentials,
)
response = lambda_function.invoke(
    payload={"foo": "bar"},
    invocation_type="RequestResponse",
)
response["Payload"].read()
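
The last line above returns raw bytes. A possible way to turn an invocation response into a Python value is sketched here, assuming the function returns JSON and that the response follows the Boto3 shape, with a readable `Payload` and a `FunctionError` key present only when the function failed. The helper name `decode_lambda_response` is hypothetical, and the usage block substitutes an in-memory stream for a real response.

```python
import io
import json
from typing import Any, Mapping


def decode_lambda_response(response: Mapping[str, Any]) -> Any:
    """Read the payload stream and raise if the function reported an error."""
    raw = response["Payload"].read()
    # Asynchronous ("Event") invocations return an empty payload.
    body = json.loads(raw) if raw else None
    if "FunctionError" in response:
        raise RuntimeError(
            f"Lambda reported {response['FunctionError']} error: {body}"
        )
    return body


if __name__ == "__main__":
    fake_response = {"StatusCode": 200, "Payload": io.BytesIO(b'{"foo": "bar"}')}
    print(decode_lambda_response(fake_response))
```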

Submit AWS Glue jobs

from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.glue_job import GlueJobBlock


@flow
def example_run_glue_job():
    aws_credentials = AwsCredentials(
        aws_access_key_id="your_access_key_id",
        aws_secret_access_key="your_secret_access_key"
    )
    glue_job_run = GlueJobBlock(
        job_name="your_glue_job_name",
        arguments={"--YOUR_EXTRA_ARGUMENT": "YOUR_EXTRA_ARGUMENT_VALUE"},
        aws_credentials=aws_credentials,  # pass the credentials created above
    ).trigger()

    return glue_job_run.wait_for_completion()


if __name__ == "__main__":
    example_run_glue_job()

Submit AWS Batch jobs

from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.batch import batch_submit


@flow
def example_batch_submit_flow():
    aws_credentials = AwsCredentials(
        aws_access_key_id="access_key_id",
        aws_secret_access_key="secret_access_key"
    )
    job_id = batch_submit(
        "job_name",
        "job_queue",
        "job_definition",
        aws_credentials
    )
    return job_id


if __name__ == "__main__":
    example_batch_submit_flow()
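
The flow above returns as soon as the job is submitted. If a caller needs to wait for the outcome, a polling loop is one option. The sketch below is not part of prefect-aws: `wait_for_batch_job` is a hypothetical helper, and `get_status` is an injected callable that in practice might wrap the AWS Batch `describe_jobs` call. Here it is exercised with a stand-in sequence of statuses.

```python
import time
from typing import Callable

# AWS Batch jobs end in one of these two states.
TERMINAL_STATUSES = {"SUCCEEDED", "FAILED"}


def wait_for_batch_job(
    job_id: str,
    get_status: Callable[[str], str],
    poll_seconds: float = 10.0,
    max_polls: int = 60,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until the job reaches a terminal status or the poll budget runs out."""
    for attempt in range(max_polls):
        status = get_status(job_id)
        if status in TERMINAL_STATUSES:
            return status
        if attempt < max_polls - 1:
            sleep(poll_seconds)
    raise TimeoutError(f"Job {job_id} not finished after {max_polls} polls")


if __name__ == "__main__":
    # Stand-in status source for illustration; no AWS call is made.
    statuses = iter(["SUBMITTED", "RUNNABLE", "RUNNING", "SUCCEEDED"])
    print(wait_for_batch_job("job-123", lambda _id: next(statuses), poll_seconds=0))
```

Injecting the status lookup and the sleep function keeps the loop testable without network access.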
