Skip to main content

Reader for AWS VPC Flow Logs

Project description

Introduction

Build Status PyPI Version

Amazon's VPC Flow Logs are analogous to NetFlow and IPFIX logs, and can be used for security and performance analysis. Observable Networks uses VPC Flow logs as an input to endpoint modeling for security monitoring.

This project contains:

  • A utility for working with VPC Flow Logs on the command line
  • A Python library for retrieving and working with VPC Flow logs

The tools support reading Flow Logs from both CloudWatch Logs and S3. For S3 destinations, version 3 custom log formats are supported.

The library builds on boto3 and should work on the supported versions of Python 3.

For information on VPC Flow Logs and how to enable them see this post at the AWS blog. You may use this library with the kinesis-logs-reader library when retrieving VPC flow logs from Amazon Kinesis.

Installation

You can get flowlogs_reader by using pip:

pip install flowlogs_reader

Or if you want to install from source and/or contribute you can clone from GitHub:

git clone https://github.com/obsrvbl-oss/flowlogs-reader.git
cd flowlogs-reader
python setup.py develop

CLI Usage

flowlogs-reader provides a command line interface called flowlogs_reader that allows you to print VPC Flow Log records to your screen. It assumes your AWS credentials are available through environment variables, a boto configuration file, or through IAM metadata. Some example uses are below.

Location types

flowlogs_reader has one required argument, location. By default that is interpreted as a CloudWatch Logs group.

To use an S3 location, specify --location-type='s3':

  • flowlogs_reader --location-type="s3" "bucket-name/optional-prefix"

Printing flows

The default action is to print flows. You may also specify the ipset, findip, and aggregate actions:

  • flowlogs_reader location - print all flows in the past hour
  • flowlogs_reader location print 10 - print the first 10 flows from the past hour
  • flowlogs_reader location ipset - print the unique IPs seen in the past hour
  • flowlogs_reader location findip 198.51.100.2 - print all flows involving 198.51.100.2
  • flowlogs_reader location aggregate - aggregate the flows by 5-tuple, then print them as a tab-separated stream (with a header). This requires that each of the fields in the 5-tuple are present in the data format.

You may combine the output of flowlogs_reader with other command line utilities:

  • flowlogs_reader location | grep REJECT - print all REJECTed Flow Log records
  • flowlogs_reader location | awk '$6 = 443' - print all traffic from port 443

Time windows

The default time window is the last hour. You may also specify a --start-time and/or an --end-time. The -s and -e switches may be used also:

  • flowlogs_reader --start-time='2015-08-13 00:00:00' location
  • flowlogs_reader --end-time='2015-08-14 00:00:00' location
  • flowlogs_reader --start-time='2015-08-13 01:00:00' --end-time='2015-08-14 02:00:00' location

Use the --time-format switch to control how start and end times are interpreted. The default is '%Y-%m-%d %H:%M:%S'. See the Python documentation for strptime for information on format strings.

Concurrent reads

Give --thread-count to read from multiple log groups or S3 keys at once:

  • flowlogs_reader --thread_count=4 location

AWS options

Other command line switches:

  • flowlogs_reader --region='us-west-2' location - connect to the given AWS region
  • flowlogs_reader --profile='dev_profile' location - use the profile from your local AWS configuration file to specify credentials and regions
  • flowlogs_reader --role-arn='arn:aws:iam::12345678901:role/myrole' --external-id='0a1b2c3d' location - use the given role and external ID to connect to a 3rd party's account using sts assume-role

For CloudWatch Logs locations:

  • flowlogs_reader --fields='${version} ${account-id} ${interface-id} ${srcaddr} ${dstaddr} ${srcport} ${dstport} ${protocol} ${packets} ${bytes} ${start} ${end} ${action} ${log-status}' - use the given fields to prevent the module from querying EC2 for the log line format
  • flowlogs_reader --filter-pattern='REJECT' location - use the given filter pattern to have the server limit the output

For S3 locations:

  • flowlogs_reader --location-type='s3' --include-accounts='12345678901,12345678902' bucket-name/optional-prefix - return logs only for the given accounts
  • flowlogs_reader --location-type='s3' --include-regions='us-east-1,us-east-2' bucket-name/optional-prefix - return logs only for the given regions

Module Usage

FlowRecord takes an event dictionary retrieved from a log stream. It parses the message in the event, which takes a record like this:

2 123456789010 eni-102010ab 198.51.100.1 192.0.2.1 443 49152 6 10 840 1439387263 1439387264 ACCEPT OK

And turns it into a Python object like this:

>>> flow_record.srcaddr
'198.51.100.1'
>>> flow_record.dstaddr
'192.0.2.1'
>>> flow_record.srcport
443
>>> flow_record.to_dict()
{'account_id': '123456789010',
 'action': 'ACCEPT',
 'bytes': 840,
 'dstaddr': '192.0.2.1',
 'dstport': 49152,
 'end': datetime.datetime(2015, 8, 12, 13, 47, 44),
 'interface_id': 'eni-102010ab',
 'log_status': 'OK',
 'packets': 10,
 'protocol': 6,
 'srcaddr': '198.51.100.1',
 'srcport': 443,
 'start': datetime.datetime(2015, 8, 12, 13, 47, 43),
 'version': 2}

FlowLogsReader reads from CloudWatch Logs. It takes the name of a log group and can then yield all the Flow Log records from that group.

>>> from flowlogs_reader import FlowLogsReader
... flow_log_reader = FlowLogsReader('flowlog_group')
... records = list(flow_log_reader)
... print(len(records))
176

S3FlowLogsReader reads from S3. It takes a bucket name or a bucket/prefix identifier.

By default these classes will yield records from the last hour.

You can control what's retrieved with these parameters:

  • start_time and end_time are Python datetime.datetime objects
  • region_name is a string like 'us-east-1'.
  • boto_client is a boto3 client object.

When using FlowLogsReader with CloudWatch Logs:

  • The fields keyword is a tuple like ('version', 'account-id'). If not supplied then the EC2 API will be queried to find out the log format.
  • The filter_pattern keyword is a string like REJECT or 443 used to filter the logs. See the examples below.

When using S3FlowLogsReader with S3:

  • The include_accounts keyword is an iterable of account identifiers (as strings) used to filter the logs.
  • The include_regions keyword is an iterable of region names used to filter the logs.

Examples

Start by importing FlowLogsReader:

from flowlogs_reader import FlowLogsReader

Find all of the IP addresses communicating inside the VPC:

ip_set = set()
for record in FlowLogsReader('flowlog_group'):
    ip_set.add(record.srcaddr)
    ip_set.add(record.dstaddr)

See all of the traffic for one IP address:

target_ip = '192.0.2.1'
records = []
for record in FlowLogsReader('flowlog_group'):
    if (record.srcaddr == target_ip) or (record.dstaddr == target_ip):
        records.append(record)

Loop through a few preconfigured profiles and collect all of the IP addresses:

ip_set = set()
profile_names = ['profile1', 'profile2']
for profile_name in profile_names:
    for record in FlowLogsReader('flowlog_group', profile_name=profile_name):
        ip_set.add(record.srcaddr)
        ip_set.add(record.dstaddr)

Apply a filter for UDP traffic that was logged normally (CloudWatch Logs only):

FILTER_PATTERN = (
    '[version="2", account_id, interface_id, srcaddr, dstaddr, '
    'srcport, dstport, protocol="17", packets, bytes, '
    'start, end, action, log_status="OK"]'
)

flow_log_reader = FlowLogsReader('flowlog_group', filter_pattern=FILTER_PATTERN)
records = list(flow_log_reader)
print(len(records))

Retrieve logs from a list of regions:

from flowlogs_reader import S3FlowLogsReader

reader = S3FlowLogsReader('example-bucket/optional-prefix', include_regions=['us-east-1', 'us-east-2'])
records = list(reader)
print(len(records))

You may aggregate records with the aggregate_records function. Pass in a FlowLogsReader or S3FlowLogsReader object and optionally a key_fields tuple. Python dict objects will be yielded representing the aggregated flow records. By default the typical ('srcaddr', 'dstaddr', 'srcport', 'dstport', 'protocol') will be used. The start, end, packets, and bytes items will be aggregated.

flow_log_reader = FlowLogsReader('flowlog_group')
key_fields = ('srcaddr', 'dstaddr')
records = list(aggregated_records(flow_log_reader, key_fields=key_fields))

The number of bytes processed after iterating is available in the bytes_processed attribute. For S3FlowLogsReader instances there is also a compressed_bytes_processed attribute.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

flowlogs_reader-5.0.1.tar.gz (27.5 kB view details)

Uploaded Source

Built Distribution

flowlogs_reader-5.0.1-py3-none-any.whl (18.9 kB view details)

Uploaded Python 3

File details

Details for the file flowlogs_reader-5.0.1.tar.gz.

File metadata

  • Download URL: flowlogs_reader-5.0.1.tar.gz
  • Upload date:
  • Size: 27.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/4.0.2 CPython/3.11.7

File hashes

Hashes for flowlogs_reader-5.0.1.tar.gz
Algorithm Hash digest
SHA256 6e47c529c7ba69668d9d96cc7bd39a439c60c4a87d42cd2766a65761a2e763c6
MD5 d585fced87207fc142d0126f91fc8e30
BLAKE2b-256 39c76a24eb5a46942b8677bf4b42e14679ea426e90e7d1b3380b69da7163f86f

See more details on using hashes here.

File details

Details for the file flowlogs_reader-5.0.1-py3-none-any.whl.

File metadata

File hashes

Hashes for flowlogs_reader-5.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 1b6ef8a7e645fe25bd5a86176c2fb11d208a21a253b6902c312c0c13176bb6d1
MD5 62d35c4bd1c710a1f7eae01da32ed65c
BLAKE2b-256 bfdd46a7fb749059b75c45af077102e0ec94713cdbc91bfe18ef97b745312a19

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page