aws-kinesis-consumer

Consume an AWS Kinesis Data Stream to look over the records from a terminal.

Demo

$ aws-kinesis-consumer --stream-name MyStream
> preparing shards 2/2
> shard_id=shardId-000000000000, records=1
Record-001
> shard_id=shardId-000000000001, records=2
Record-002
Record-003

Usage

Prerequisites

Connect to AWS and set the default AWS environment variables.

export AWS_DEFAULT_REGION=eu-central-1
export AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
export AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY

Alternatively, run aws configure to store the credentials and the default region in the AWS configuration files instead of environment variables.

Use with Python

# install
pip install aws-kinesis-consumer

# consume a stream
aws-kinesis-consumer --stream-name MyStream

Use with Docker

docker run \
  -e AWS_DEFAULT_REGION \
  -e AWS_ACCESS_KEY_ID \
  -e AWS_SECRET_ACCESS_KEY \
  -e AWS_SESSION_TOKEN \
  thinow/aws-kinesis-consumer --stream-name MyStream

Arguments

| Argument | Default | Description |
| --- | --- | --- |
| -s --stream-name | REQUIRED | Name of the AWS Kinesis Stream. |
| -e --endpoint | | Custom AWS endpoint URL used to communicate with the AWS API. Can be used to target a specific region (e.g. https://kinesis.us-east-1.amazonaws.com/). |
| -i --iterator-type | latest | Defines where to start consuming records from the stream. Use latest to consume only new records, or trim-horizon to consume all the records already in the stream. |
| -m --max-records-per-request | 10 | Defines the maximum number of records per request. |
| -r --region | | AWS region of the stream (e.g. us-east-1). Defaults to the AWS region configured in your terminal (e.g. the environment variable AWS_DEFAULT_REGION). |
| -h --help | | Prints the help message. |
| -v --version | | Prints the version of the binary. |
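
To illustrate how the defaults above behave, here is a minimal sketch of an equivalent argument parser built with Python's argparse. It is not the project's actual code: the function name build_parser and the version string are placeholders chosen for this example, and only the flags and defaults come from the table.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical parser mirroring the documented arguments (not the project's code)."""
    parser = argparse.ArgumentParser(prog="aws-kinesis-consumer")
    parser.add_argument("-s", "--stream-name", required=True,
                        help="Name of the AWS Kinesis Stream.")
    parser.add_argument("-e", "--endpoint", default=None,
                        help="Custom AWS endpoint URL.")
    parser.add_argument("-i", "--iterator-type", default="latest",
                        help="latest or trim-horizon.")
    parser.add_argument("-m", "--max-records-per-request", type=int, default=10,
                        help="Maximum number of records per request.")
    parser.add_argument("-r", "--region", default=None,
                        help="AWS region of the stream.")
    # argparse adds -h/--help on its own; the version string is a placeholder.
    parser.add_argument("-v", "--version", action="version",
                        version="%(prog)s placeholder-version")
    return parser


# Only the stream name is mandatory; every other value falls back to its default.
args = build_parser().parse_args(["--stream-name", "MyStream"])
print(args.iterator_type, args.max_records_per_request, args.region)
```

Passing -i trim-horizon -m 100 to the same parser would override the two defaults and leave the region unset.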

FAQ

What is the motivation? What is the issue with the AWS CLI?

The AWS CLI can fetch records from Kinesis, but users have to list the shards, generate iterator tokens, follow the subsequent tokens, delay operations between requests, and so on.

aws-kinesis-consumer, in contrast, gets the records from the stream name alone, so there is no need for an extra script.
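
The manual steps listed above can be sketched in Python as follows. This is an illustration of the general approach under stated assumptions, not the implementation used by aws-kinesis-consumer: the client argument is assumed to be a boto3 Kinesis client (for example boto3.client("kinesis")), and the names list_shard_ids, consume, delay and rounds are made up for this example.

```python
import time
from typing import Any, Iterator, List, Optional, Tuple


def list_shard_ids(client: Any, stream_name: str) -> List[str]:
    """List every shard of the stream, following pagination tokens."""
    shard_ids: List[str] = []
    response = client.list_shards(StreamName=stream_name)
    while True:
        shard_ids.extend(shard["ShardId"] for shard in response["Shards"])
        token = response.get("NextToken")
        if not token:
            return shard_ids
        # When a NextToken is given, the stream name must not be repeated.
        response = client.list_shards(NextToken=token)


def consume(client: Any, stream_name: str, iterator_type: str = "LATEST",
            limit: int = 10, delay: float = 1.0,
            rounds: Optional[int] = None) -> Iterator[Tuple[str, bytes]]:
    """Yield (shard_id, data) pairs; rounds=None polls until all shards close."""
    # Step 1: one iterator token per shard.
    iterators = {}
    for shard_id in list_shard_ids(client, stream_name):
        response = client.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard_id,
            ShardIteratorType=iterator_type,
        )
        iterators[shard_id] = response["ShardIterator"]

    # Step 2: poll each shard, then continue with the subsequent token.
    done = 0
    while iterators and (rounds is None or done < rounds):
        for shard_id, iterator in list(iterators.items()):
            response = client.get_records(ShardIterator=iterator, Limit=limit)
            for record in response["Records"]:
                yield shard_id, record["Data"]
            next_iterator = response.get("NextShardIterator")
            if next_iterator:
                iterators[shard_id] = next_iterator
            else:
                del iterators[shard_id]  # the shard is closed and fully read
        done += 1
        time.sleep(delay)  # Step 3: delay between polling rounds
```

With boto3 installed and credentials configured, calling consume(boto3.client("kinesis"), "MyStream") would iterate over the incoming records until interrupted.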

How to filter the records?

The output of aws-kinesis-consumer can be piped to other commands such as grep, or jq to filter JSON records.

# all the records
$ aws-kinesis-consumer --stream-name MyStream
{"name":"foo", "status":"ok"}
{"name":"bar", "status":"pending"}
{"name":"baz", "status":"error"}

# records containing the text "ba" (e.g. "bar" and "baz", but not "foo")
$ aws-kinesis-consumer --stream-name MyStream | grep "ba"
{"name":"bar", "status":"pending"}
{"name":"baz", "status":"error"}

# records where the JSON property "status" has the value "error"
$ aws-kinesis-consumer --stream-name MyStream | jq -c 'select(.status == "error")'
{"name":"baz","status":"error"}
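
When jq is not available, the same kind of filter can be written in a few lines of Python. The sketch below is an illustration only: select_records is a hypothetical helper, and the sample lines stand in for the output of aws-kinesis-consumer. Lines that are not valid JSON are skipped rather than treated as errors.

```python
import json
from typing import Iterable, Iterator


def select_records(lines: Iterable[str], key: str, value: str) -> Iterator[dict]:
    """Yield the JSON objects whose property `key` equals `value`."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # ignore anything that is not a JSON document
        if isinstance(record, dict) and record.get(key) == value:
            yield record


# Sample lines standing in for the consumer output.
sample = [
    '{"name":"foo", "status":"ok"}',
    '{"name":"bar", "status":"pending"}',
    '{"name":"baz", "status":"error"}',
]
for record in select_records(sample, "status", "error"):
    print(json.dumps(record))
```

To filter a live stream, the same function could be given sys.stdin instead of the sample list, with the script placed at the end of the pipe.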

What are the required AWS permissions?

aws-kinesis-consumer requires the following AWS permissions: kinesis:ListShards, kinesis:GetShardIterator, and kinesis:GetRecords.

The following policy is an example that can be applied to an AWS user or an AWS role:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:ListShards",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords"
            ],
            "Resource": [
                "arn:aws:kinesis:REGION:ACCOUNT-ID:stream/STREAM-NAME"
            ]
        }
    ]
}
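
The REGION, ACCOUNT-ID and STREAM-NAME placeholders have to be replaced with real values before the policy is applied. As a small convenience sketch, the hypothetical helper below (build_policy is not part of aws-kinesis-consumer) fills them in and prints the resulting JSON; the account ID in the example call is a dummy value.

```python
import json


def build_policy(region: str, account_id: str, stream_name: str) -> dict:
    """Return the example read-only policy for a single Kinesis stream."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "kinesis:ListShards",
                    "kinesis:GetShardIterator",
                    "kinesis:GetRecords",
                ],
                "Resource": [
                    f"arn:aws:kinesis:{region}:{account_id}:stream/{stream_name}"
                ],
            }
        ],
    }


# Dummy account ID, used for illustration only.
print(json.dumps(build_policy("eu-central-1", "123456789012", "MyStream"), indent=4))
```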

Changelog

See all the changes per release.

Special thanks

  • Thanks to the contributors of the kinesalite project, which makes testing and developing this project extremely easy and reliable!
