Skip to main content

Triton - Kinesis Data Pipeline

Project description

# Triton Project

Python Utility code for building a Data Pipeline with AWS Kinesis.

See [Triton](https://github.com/postmates/go-triton)

Kinesis (http://aws.amazon.com/kinesis/) lets you define streams of records.
You put records in one end, and the other end can consumer them. The stream
maintains the records for 24 hours. These streams come in multiple shards
(defined by the adminstrator).

The tooling provided here builds on top of the boto library to make real-world
work with these streams and record easier. This is preferential to using the
Amazon provided KCL (Kinesis Client Library) which is Java-based, or the python
bindings built on top of KCL, because it isn't very pythonic.

This tooling also provides built in support for checkpointing, which allows a client to pick up processing records wherever it stopped last. The raw kinesis libraries require the client to take care of the checkpointing process itself.

### Configuration

Normal AWS credential environment variables or IAM roles for boto apply.

Clients will need to define a yaml file containing definitions for the streams
they will want to use. That yaml file will look like:

my_stream:
name: my_stream_v2
partition_key: value
region: us-west-1

Clients generating records will reference the `my_stream` stream which will
automatically know to use the real underlying stream of `my_stream_v2` in the
`us-west-1` region. Records put into this stream are assumed to have a key
named `value` which is use for partitioning.


### Demo

Triton comes with a command line script `triton` which can be used to demo some simple functionality.

$ echo 'hi' | triton put -s test_stream

And then to consume:

$ triton get -s test_stream
<Record shardId-000000000001 49551479315998225220804774498660962603757541393499684882>
{'msg': 'hi\n', 'ts': 1433969276.172019}

(Note the order is actually important here, this consumer is set to 'latest',
so if your producer produces first, you might miss it.)
You can set the config by using the environment variable TRITON_CONFIG, the default is /etc/triton.yaml

### Producers

Adding records to the stream is easy:

import triton

c = triton.load_config("/etc/triton.yaml")

s = triton.get_stream('my_stream', c)
s.put(value='hi mom', ts=time.time())


For more advanced uses, you can record the shard and sequence number returned
by the put operation.

shard, seq_num = s.put(...)

You could in theory communicate these values to some other process if you want
to ensure they have received this record.

__CAVEAT UTILITOR__: Triton currently only supports data types directly converatible
into [msgpack formated data](https://github.com/msgpack/msgpack/blob/master/spec.md).
Unsupported types will raise a `TypeError`.

### Non-Blocking Producers and `tritond`

Using the producer syntax above, `s.put(value='hi mom', ts=time.time())`, will block until
the operation to put the data into Kinesis is complete, which can take on the order of 100 ms.
This guarantees that the write has succeeded before continuing the flow of control.

To allow for writes that do not block, triton comes with `tritond`;
a daemon that will spool Kinesis messages to local memory and write those messages to Kinesis asynchronously.
Writes via this pathway block for approximately 0.1 ms.
The `tritond` spools messages to memory and writes all recieved messages to Kinesis
every 100 ms.
It is important to note that using this non-blocking pathway eliminates the guarantee
that data will be written to Kinesis.

An instance of `tritond` needs be running to collect Kinesis writes.
It is recomended to run an instance on each host that will be producing Kinesis writes.
By default, `tritond` will listen on `127.0.0.1:3515` or it will
respect the environment variables `TRITON_ZMQ_HOST` and `TRITON_ZMQ_PORT`.
The `tritond` uses the same `triton.yaml` files to configure triton streams;
and will _log errors and skip_ any data if the stream is not configured
or the config file is not found.

`tritond` can be run by simply calling it from the command line. For testing
and/or debugging, it can be run in verbose mode and with its output directed to stdout or a file e.g.

tritond -v --skip-kinesis # writes verbose logs and writes events to stdout

tritond -cc --skip-kinesis --output_file test_output.txt

Once `tritond` is running, usage follows the basic write pattern:

import triton

c = triton.load_config("/etc/triton.yaml")

s = triton.get_nonblocking_stream('my_stream', c)
s.put(value='hi mom', ts=time.time())

Since the actual Kinesis write happens asynchronously, the shard and sequence number
are not returned from this operation.
Also, as mentioned above, Triton currently only supports data types directly converatible
into [msgpack formated data](https://github.com/msgpack/msgpack/blob/master/spec.md).
For data put into a `NonblockingStream` object, unsupported types will log an error and continue.

### Consumers

Writing consumers is more complicated as you must deal with sharding. Even in
the lightest of workloads you'll likely want to have multiple shards. Triton makes this simple:

import triton

c = triton.load_config("/etc/triton.yaml")

s = triton.get_stream('my_stream', c)
i = s.build_iterator_from_latest()

for rec in i:
print rec.data

This will consume only new records from the stream. Since the stream in theory
never ends, you can in your own process management tell it when to stop:

for rec in i:

do_stuff(rec)

if has_reason():
i.stop()

This will cause the iterator to stop fetching new data, but will flush out data
that's already been fetched.

Kinesis supports other types of iterators. For example, if you want to see all the data in the stream:

i = s.build_iterator_for_all()

or if you know a specific shard and offset:

i = s.build_iterator_from_seqnum(shard_num, seq_num)

For building distributed consumers, you'll want to divide up the work by shards.
So if you have 4 shards, the first worker would:

i = s.build_iterator_from_latest([0, 1])

and the second worker would do:

i = s.build_iterator_from_latest([2, 3])

Note that these are 'share numbers', not shard ids. These are indexes into the
actual shard list.

### Checkpointing

Triton supports checkpointing to a DB so that processing can start where
previous processing left off. It requires a postgresDB available.
To specify the DB location, set the ENV variable `TRITON_DB` to the DSN
of the postgres DB, e.g.

export TRITON_DB="dbname=db_name port=5432 host=www.dbhosting.com user=user_name password=password"

Attempting to checkpoint without this DB being configured will raise a
`TritonCheckpointError` exception.

The DB also needs to have a specific table created; calling the following will initialized the table (this call is safe to repeat; it is a no-op if the table already exists):

triton.checkpoint.init_db()

Triton checkpointing also requires a unique client name, since the basic
assumption is that the checkpoint DB will be shared. The client name is specified
by the ENV variable `TRITON_CLIENT_NAME`.
Attempting to checkpoint without this ENV variable will also raise a
`TritonCheckpointError` exception.


Once configured, checkpointing can be used simply by calling the `checkpoint`
method on a stream iterator.

For example:

s = triton.get_stream('my_stream', c)
i = s.build_iterator_from_checkpoint()

for ctr in range(1):
rec = i.next()
print rec.data

i.checkpoint()

The next time this code is run, it will pick up from where the last run left off.


### Consuming Archives

Triton data is typically archived to S3. Using the triton command, you can view that data:

$ triton cat --bucket=triton-data --stream=my_stream --start-date=20150715 --end-date=20150715

Or using the API, something like:

import triton

c = triton.load_config("/etc/triton.yaml")
b = triton.open_bucket("triton-data", "us-west-1")
s = triton.stream_from_s3_store(b, c['my_stream'], start_dt, end_dt)

for rec in s:
... do something ...


## Development

You should be able to configure your development environment using make:

~/python-triton $ make dev

You will likely need to install system libraries as well:

~/python-triton $ sudo apt-get install libsnappy-dev libzmq-dev

The tests should all work:

~/python-triton $ make test
.
PASSED. 1 test / 1 case: 1 passed, 0 failed. (Total test time 0.00s)

If you need to debug your application with ipython:

~/python-triton $ make shell
Python 2.7.3 (default, Apr 27 2012, 21:31:10)
Type "copyright", "credits" or "license" for more information.

IPython 0.12.1 -- An enhanced Interactive Python.
? -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help -> Python's own help system.
object? -> Details about 'object', use 'object??' for extra details.

In [1]: from project.models import Project

In [2]:

## TODO

* It would probably be helpful to have some common code for building a worker
process that just handles records.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

py-triton-0.0.18.tar.gz (19.1 kB view details)

Uploaded Source

File details

Details for the file py-triton-0.0.18.tar.gz.

File metadata

  • Download URL: py-triton-0.0.18.tar.gz
  • Upload date:
  • Size: 19.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No

File hashes

Hashes for py-triton-0.0.18.tar.gz
Algorithm Hash digest
SHA256 df7162d87ff9726d4e8a95c22a31eb2cf6b22f1b9c660412ae7ed31900c61da9
MD5 232957ecd4d16b89ef23ba31f41e4fd2
BLAKE2b-256 01f9e500e87848f8c3bcb78e560ede2d2c646e8088341497cb5325498b43efe2

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page