The CDK Construct Library for AWS::MSK

Amazon Managed Streaming for Apache Kafka Construct Library

---

cdk-constructs: Experimental

The APIs of higher level constructs in this module are experimental and under active development. They are subject to non-backward compatible changes or removal in any future version. These are not subject to the Semantic Versioning model and breaking changes will be announced in the release notes. This means that while you may use them, you may need to update your source code when upgrading to a newer version of this package.


Amazon MSK is a fully managed service that makes it easy for you to build and run applications that use Apache Kafka to process streaming data.

The following example creates an MSK Cluster.

# vpc: ec2.Vpc

cluster = msk.Cluster(self, "Cluster",
    cluster_name="myCluster",
    kafka_version=msk.KafkaVersion.V4_1_X_KRAFT,
    vpc=vpc
)

Allowing Connections

To control who can access the Cluster, use the .connections attribute. For a list of ports used by MSK, refer to the MSK documentation.

# vpc: ec2.Vpc

cluster = msk.Cluster(self, "Cluster",
    cluster_name="myCluster",
    kafka_version=msk.KafkaVersion.V4_1_X_KRAFT,
    vpc=vpc
)

cluster.connections.allow_from(
    ec2.Peer.ipv4("1.2.3.4/8"),
    ec2.Port.tcp(2181))
cluster.connections.allow_from(
    ec2.Peer.ipv4("1.2.3.4/8"),
    ec2.Port.tcp(9094))
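
As a quick check on the peer CIDR used above, here is a minimal sketch that uses only Python's standard ipaddress module (it is not part of the CDK). It shows which range a /8 mask covers when the host bits are masked off, and how that compares with a single-host /32 rule. How the service treats the value at deploy time is not covered here.

```python
import ipaddress

# The peer CIDR from the example above. strict=False lets the standard
# library accept an address whose host bits are set and mask them off.
network = ipaddress.ip_network("1.2.3.4/8", strict=False)

print(network)                # the enclosing /8 network
print(network.num_addresses)  # number of addresses the range contains

# A rule for a single host would use a /32 mask instead.
single_host = ipaddress.ip_network("1.2.3.4/32")
print(single_host.num_addresses)
```

A /8 range is very broad, so a narrower CIDR is usually the safer choice for real ingress rules.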

Cluster Endpoints

You can use the following attributes to get a list of the Kafka broker or ZooKeeper node endpoints:

# cluster: msk.Cluster

CfnOutput(self, "BootstrapBrokers", value=cluster.bootstrap_brokers)
CfnOutput(self, "BootstrapBrokersTls", value=cluster.bootstrap_brokers_tls)
CfnOutput(self, "BootstrapBrokersSaslScram", value=cluster.bootstrap_brokers_sasl_scram)
CfnOutput(self, "BootstrapBrokerStringSaslIam", value=cluster.bootstrap_brokers_sasl_iam)
CfnOutput(self, "ZookeeperConnection", value=cluster.zookeeper_connection_string)
CfnOutput(self, "ZookeeperConnectionTls", value=cluster.zookeeper_connection_string_tls)
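
Once the stack is deployed, a consumer of these outputs typically needs individual host and port pairs. The following is a minimal sketch, assuming the resolved output is a comma-separated list of host:port entries. The parse_bootstrap_brokers helper and the host names are hypothetical. It applies only to resolved values (for example, read back from stack outputs), since the attributes are unresolved tokens while the app is being synthesized.

```python
from typing import List, Tuple


def parse_bootstrap_brokers(value: str) -> List[Tuple[str, int]]:
    """Split a comma-separated 'host:port' list into (host, port) pairs."""
    brokers = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate a trailing comma or stray whitespace
        # rpartition splits on the last colon; int() raises ValueError
        # if the port part is missing or not numeric.
        host, _, port = entry.rpartition(":")
        brokers.append((host, int(port)))
    return brokers


# Hypothetical value, shaped like a resolved stack output.
example = "b-1.example.internal:9094,b-2.example.internal:9094"
brokers = parse_bootstrap_brokers(example)
print(brokers)
```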

Importing an existing Cluster

To import an existing MSK cluster into your CDK app, use the .from_cluster_arn() method.

cluster = msk.Cluster.from_cluster_arn(self, "Cluster", "arn:aws:kafka:us-west-2:1234567890:cluster/a-cluster/11111111-1111-1111-1111-111111111111-1")
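
The ARN passed above packs several fields into one string. As an illustration only, the sketch below splits an ARN of that shape into its parts with the standard library. The MskClusterArn type and parse_msk_cluster_arn helper are hypothetical names, and the layout is inferred from the example ARN rather than taken from the CDK API.

```python
from typing import NamedTuple


class MskClusterArn(NamedTuple):
    partition: str
    region: str
    account: str
    cluster_name: str
    cluster_uuid: str


def parse_msk_cluster_arn(arn: str) -> MskClusterArn:
    # Assumed layout: arn:<partition>:kafka:<region>:<account>:cluster/<name>/<uuid>
    parts = arn.split(":", 5)
    if len(parts) != 6 or parts[0] != "arn" or parts[2] != "kafka":
        raise ValueError(f"not an MSK ARN: {arn!r}")
    resource = parts[5].split("/")
    if len(resource) != 3 or resource[0] != "cluster":
        raise ValueError(f"not an MSK cluster ARN: {arn!r}")
    return MskClusterArn(parts[1], parts[3], parts[4], resource[1], resource[2])


parsed = parse_msk_cluster_arn(
    "arn:aws:kafka:us-west-2:1234567890:cluster/a-cluster/"
    "11111111-1111-1111-1111-111111111111-1"
)
print(parsed.region, parsed.cluster_name)
```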

Client Authentication

MSK supports the following authentication mechanisms.

TLS

To enable client authentication with TLS, set the certificate_authorities property to reference your ACM Private CA. More info on Private CAs.

import aws_cdk.aws_acmpca as acmpca

# vpc: ec2.Vpc

cluster = msk.Cluster(self, "Cluster",
    cluster_name="myCluster",
    kafka_version=msk.KafkaVersion.V4_1_X_KRAFT,
    vpc=vpc,
    encryption_in_transit=msk.EncryptionInTransitConfig(
        client_broker=msk.ClientBrokerEncryption.TLS
    ),
    client_authentication=msk.ClientAuthentication.tls(
        certificate_authorities=[
            acmpca.CertificateAuthority.from_certificate_authority_arn(self, "CertificateAuthority", "arn:aws:acm-pca:us-west-2:1234567890:certificate-authority/11111111-1111-1111-1111-111111111111")
        ]
    )
)

SASL/SCRAM

Enable client authentication with SASL/SCRAM:

# vpc: ec2.Vpc

cluster = msk.Cluster(self, "cluster",
    cluster_name="myCluster",
    kafka_version=msk.KafkaVersion.V4_1_X_KRAFT,
    vpc=vpc,
    encryption_in_transit=msk.EncryptionInTransitConfig(
        client_broker=msk.ClientBrokerEncryption.TLS
    ),
    client_authentication=msk.ClientAuthentication.sasl(
        scram=True
    )
)

IAM

Enable client authentication with IAM:

# vpc: ec2.Vpc

cluster = msk.Cluster(self, "cluster",
    cluster_name="myCluster",
    kafka_version=msk.KafkaVersion.V4_1_X_KRAFT,
    vpc=vpc,
    encryption_in_transit=msk.EncryptionInTransitConfig(
        client_broker=msk.ClientBrokerEncryption.TLS
    ),
    client_authentication=msk.ClientAuthentication.sasl(
        iam=True
    )
)

SASL/IAM + TLS

To enable client authentication with both IAM and TLS, set iam to True and set the certificate_authorities property to reference your ACM Private CA. More info on Private CAs.

import aws_cdk.aws_acmpca as acmpca

# vpc: ec2.Vpc

cluster = msk.Cluster(self, "Cluster",
    cluster_name="myCluster",
    kafka_version=msk.KafkaVersion.V4_1_X_KRAFT,
    vpc=vpc,
    encryption_in_transit=msk.EncryptionInTransitConfig(
        client_broker=msk.ClientBrokerEncryption.TLS
    ),
    client_authentication=msk.ClientAuthentication.sasl_tls(
        iam=True,
        certificate_authorities=[
            acmpca.CertificateAuthority.from_certificate_authority_arn(self, "CertificateAuthority", "arn:aws:acm-pca:us-west-2:1234567890:certificate-authority/11111111-1111-1111-1111-111111111111")
        ]
    )
)
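
Each mechanism above is served on a different broker port, which is what a .connections rule needs to open. The sketch below records the in-VPC broker ports as they are commonly listed in the MSK port documentation; treat the numbers as an assumption to verify against that reference. The BROKER_PORTS table and the broker_port helper are hypothetical names, not part of the CDK.

```python
# In-VPC broker ports per authentication mechanism.
# Assumption: values as listed in the MSK port documentation at the time
# of writing; check the current reference before relying on them.
BROKER_PORTS = {
    "plaintext": 9092,
    "tls": 9094,
    "sasl_scram": 9096,
    "sasl_iam": 9098,
}


def broker_port(mechanism: str) -> int:
    """Return the broker port for a mechanism key in BROKER_PORTS."""
    try:
        return BROKER_PORTS[mechanism]
    except KeyError:
        known = ", ".join(sorted(BROKER_PORTS))
        raise ValueError(f"unknown mechanism {mechanism!r}; expected one of: {known}") from None


print(broker_port("tls"))
print(broker_port("sasl_iam"))
```

The returned number can then be passed to ec2.Port.tcp() when calling cluster.connections.allow_from().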

Logging

You can deliver Apache Kafka broker logs to one or more of the following destination types: Amazon CloudWatch Logs, Amazon S3, Amazon Data Firehose.

To configure logs to be sent to an S3 bucket, provide a bucket in the logging config.

# vpc: ec2.Vpc
# bucket: s3.IBucket

cluster = msk.Cluster(self, "cluster",
    cluster_name="myCluster",
    kafka_version=msk.KafkaVersion.V4_1_X_KRAFT,
    vpc=vpc,
    logging=msk.BrokerLogging(
        s3=msk.S3LoggingConfiguration(
            bucket=bucket
        )
    )
)

When the S3 destination is configured, AWS will automatically create an S3 bucket policy that allows the service to write logs to the bucket. This makes it impossible to later update that bucket policy. To have CDK create the bucket policy so that future updates can be made, the @aws-cdk/aws-s3:createDefaultLoggingPolicy feature flag can be used. This can be set in the cdk.json file.

{
  "context": {
    "@aws-cdk/aws-s3:createDefaultLoggingPolicy": true
  }
}
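
If you prefer to set the flag from a script rather than by hand, the following minimal sketch edits the file with the standard json module. The enable_flag helper is a hypothetical name, and the example runs against a throwaway file rather than a real project.

```python
import json
import tempfile
from pathlib import Path

FLAG = "@aws-cdk/aws-s3:createDefaultLoggingPolicy"


def enable_flag(cdk_json: Path, flag: str = FLAG) -> dict:
    """Set a feature flag to true under "context", keeping other keys."""
    config = json.loads(cdk_json.read_text()) if cdk_json.exists() else {}
    config.setdefault("context", {})[flag] = True
    cdk_json.write_text(json.dumps(config, indent=2) + "\n")
    return config


# Demonstrate against a temporary cdk.json with one pre-existing key.
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "cdk.json"
    path.write_text(json.dumps({"app": "python3 app.py"}))
    updated = enable_flag(path)
    reloaded = json.loads(path.read_text())

print(json.dumps(updated, indent=2))
```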

Storage Mode

You can configure an MSK cluster's storage mode using the storage_mode property.

Tiered storage is a low-cost storage tier for Amazon MSK that scales to virtually unlimited storage, making it cost-effective to build streaming data applications.

Visit Tiered storage to see the list of compatible Kafka versions and for more details.

# vpc: ec2.Vpc

cluster = msk.Cluster(self, "cluster",
    cluster_name="myCluster",
    kafka_version=msk.KafkaVersion.V4_1_X_KRAFT,
    vpc=vpc,
    storage_mode=msk.StorageMode.TIERED
)

MSK Express Brokers

You can create an MSK cluster with Express Brokers by setting the broker_type property to BrokerType.EXPRESS. Express Brokers are a low-cost option for development, testing, and workloads that don't require the high availability guarantees of a standard MSK cluster. For more information, see Amazon MSK Express Brokers.

Note: When using Express Brokers, the following constraints apply:

  • Apache Kafka version must be 3.6.x, 3.8.x, or 3.9.x
  • You must specify the instance_type
  • The VPC must have at least 3 subnets (across 3 AZs)
  • ebs_storage_info is not supported
  • storage_mode is not supported
  • logging is not supported
  • Supported broker sizes: m7g.xlarge, m7g.2xlarge, m7g.4xlarge, m7g.8xlarge, m7g.12xlarge, m7g.16xlarge

# vpc: ec2.Vpc

express_cluster = msk.Cluster(self, "ExpressCluster",
    cluster_name="MyExpressCluster",
    kafka_version=msk.KafkaVersion.V3_8_X,
    vpc=vpc,
    broker_type=msk.BrokerType.EXPRESS,
    instance_type=ec2.InstanceType.of(ec2.InstanceClass.M7G, ec2.InstanceSize.XLARGE)
)
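
The constraints listed above can be checked before synthesis. The sketch below is one possible pre-flight check written against plain strings and counts, not CDK types. The express_violations helper and its parameter names are hypothetical, and the allowed values are copied from the list above.

```python
from typing import List, Optional

EXPRESS_KAFKA_VERSIONS = {"3.6", "3.8", "3.9"}
EXPRESS_BROKER_SIZES = {
    "m7g.xlarge", "m7g.2xlarge", "m7g.4xlarge",
    "m7g.8xlarge", "m7g.12xlarge", "m7g.16xlarge",
}


def express_violations(
    kafka_version: str,
    instance_type: Optional[str],
    subnet_count: int,
    ebs_storage_info: Optional[object] = None,
    storage_mode: Optional[object] = None,
    logging: Optional[object] = None,
) -> List[str]:
    """Return a list of violated Express Broker constraints (empty if none)."""
    problems = []
    # Compare on major.minor so that "3.8.x" matches "3.8".
    major_minor = ".".join(kafka_version.split(".")[:2])
    if major_minor not in EXPRESS_KAFKA_VERSIONS:
        problems.append(f"Kafka version {kafka_version} is not supported")
    if instance_type is None:
        problems.append("instance_type must be specified")
    elif instance_type not in EXPRESS_BROKER_SIZES:
        problems.append(f"broker size {instance_type} is not supported")
    if subnet_count < 3:
        problems.append("the VPC must have at least 3 subnets")
    unsupported = (
        ("ebs_storage_info", ebs_storage_info),
        ("storage_mode", storage_mode),
        ("logging", logging),
    )
    for name, value in unsupported:
        if value is not None:
            problems.append(f"{name} is not supported")
    return problems


print(express_violations("3.8.x", "m7g.xlarge", 3))
print(express_violations("4.1.x", None, 2, storage_mode="TIERED"))
```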

MSK Serverless

You can also use MSK Serverless by using the ServerlessCluster class.

MSK Serverless is a cluster type for Amazon MSK that makes it possible for you to run Apache Kafka without having to manage and scale cluster capacity.

MSK Serverless requires IAM access control for all clusters.

For more information, see Use MSK Serverless clusters.

# vpc: ec2.Vpc


serverless_cluster = msk.ServerlessCluster(self, "ServerlessCluster",
    cluster_name="MyServerlessCluster",
    vpc_configs=[msk.VpcConfig(vpc=vpc)]
)
