Batchout

A framework for building data pipelines that:

  1. Extract a batch of payloads with a hierarchical structure, such as JSON or XML;
  2. Transform the batch of payloads into a batch of rows for a table defined by columns;
  3. Load the batch of table rows into persistent storage.

Install

pip install batchout

Usage

Usage is best explained by example.

Let's say we have a stream of JSON messages coming from the Kafka topic auth.sessions:

{
  "user": {
    "id": "someuserid",
    "last_seen": "2019-11-01T00:00:00"
  },
  "sessions": [
     {
        "id": "somesessionid",
        "created_at": "2019-10-01T00:00:00",
        "device": {
           "useragent": "chrome"
        }
     },
     {
        "id": "othersessionid",
        "created_at": "2019-11-01T00:00:00",
        "device": {
           "useragent": "firefox"
        }
     }
  ]
}

Each message has to be written to the database table user_session as one row per session, like this:

user_id       user_last_seen        session_id      session_created_at    session_useragent
-------------------------------------------------------------------------------------------
someuserid    2019-11-01T00:00:00   somesessionid   2019-10-01T00:00:00   chrome
someuserid    2019-11-01T00:00:00   othersessionid  2019-11-01T00:00:00   firefox
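
For comparison, a hand-written version of this transformation might look like the sketch below. It uses only the standard library and the sample message above; the helper name flatten_sessions is hypothetical and is not part of Batchout.

```python
import json

# The sample message from the auth.sessions topic shown above.
MESSAGE = """
{
  "user": {"id": "someuserid", "last_seen": "2019-11-01T00:00:00"},
  "sessions": [
    {"id": "somesessionid", "created_at": "2019-10-01T00:00:00",
     "device": {"useragent": "chrome"}},
    {"id": "othersessionid", "created_at": "2019-11-01T00:00:00",
     "device": {"useragent": "firefox"}}
  ]
}
"""


def flatten_sessions(message):
    """Return one row (a dict keyed by column name) per session in the message."""
    user = message["user"]
    return [
        {
            "user_id": user["id"],
            "user_last_seen": user["last_seen"],
            "session_id": session["id"],
            "session_created_at": session["created_at"],
            "session_useragent": session["device"]["useragent"],
        }
        for session in message["sessions"]
    ]


rows = flatten_sessions(json.loads(MESSAGE))
for row in rows:
    print(row)
```

Every new payload shape needs another function like this, plus the code to read from the source and write to the database.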

With Batchout, you don't need to write boilerplate code.

Just use batchout.core.Batch to configure and run your pipeline.

from batchout.core import Batch


batch = Batch.from_config(
    inputs=dict(
        kafka=dict(
            type='kafka',
            bootstrap_servers=['kafka:9092'],
            consumer_group='batchout',
            topic='auth.sessions',
        ),
    ),
    indexes=dict(
        session_idx=dict(
            type='for_list',
            path='sessions',
        )
    ),
    columns=dict(
        user_id=dict(
            type='extracted',
            cast='string',
            path='user.id',
        ),
        user_last_seen=dict(
            type='extracted',
            cast='string',
            path='user.last_seen',
        ),
        session_id=dict(
            type='extracted',
            cast='string',
            path='sessions[{session_idx}].id',  # notice usage of session_idx defined as index above
        ),
        session_created_at=dict(
            type='extracted',
            cast='timestamp',
            path='sessions[{session_idx}].created_at',
        ),
        session_useragent=dict(
            type='extracted',
            cast='string',  # useragent is plain text such as "chrome", not a timestamp
            path='sessions[{session_idx}].device.useragent',
        ),
    ),
    outputs=dict(
        local=dict(
            type='postgres',
            mode='upsert',
            keys=['user_id', 'session_id'],
            host='localhost',
            port=5432,
            dbname='user_session',
            from_env=dict(
                user='PG_USER',          # PG_USER and
                password='PG_PASSWORD',  # PG_PASSWORD are read from environment
            ),
        ),
    ),
)
batch.run_once()

Batch.run_once() processes exactly one batch of payloads from each input.

Batch.run_forever() does the same in an infinite loop.
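
The {session_idx} placeholder in the column paths can be read as: for every position in the sessions list, substitute that position into each path and extract the value found there. The sketch below illustrates the idea in plain Python. It is not Batchout's implementation (which relies on jsonpath_rw), and the helpers resolve and expand are hypothetical names introduced only for this example.

```python
import re

payload = {
    "user": {"id": "someuserid", "last_seen": "2019-11-01T00:00:00"},
    "sessions": [
        {"id": "somesessionid", "device": {"useragent": "chrome"}},
        {"id": "othersessionid", "device": {"useragent": "firefox"}},
    ],
}


def resolve(data, path):
    """Follow a path such as 'sessions[1].device.useragent' through dicts and lists."""
    value = data
    # Each token is either a name (dict key) or [N] (list position).
    for name, index in re.findall(r"([^.\[\]]+)|\[(\d+)\]", path):
        value = value[name] if name else value[int(index)]
    return value


def expand(data, list_path, index_name, column_paths):
    """Build one row per element of the list at list_path (hypothetical helper)."""
    rows = []
    for position in range(len(resolve(data, list_path))):
        rows.append({
            column: resolve(data, template.format(**{index_name: position}))
            for column, template in column_paths.items()
        })
    return rows


rows = expand(
    payload,
    list_path="sessions",
    index_name="session_idx",
    column_paths={
        "user_id": "user.id",
        "session_id": "sessions[{session_idx}].id",
        "session_useragent": "sessions[{session_idx}].device.useragent",
    },
)
for row in rows:
    print(row)
```

Paths without a placeholder, such as user.id, resolve to the same value for every row, which is why the user columns repeat across sessions in the target table.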

Integrations

Extractors

jsonpath_rw

JSON parser built on top of jsonpath_rw.

Inputs

kafka

Simple consumer built on top of kafka-python.

Outputs

postgres

Postgres table writer built on top of psycopg2.
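
The usage example configures this output with mode='upsert' and keys=['user_id', 'session_id']. As a rough illustration of what upserting on a key generally means, the sketch below uses the standard-library sqlite3 module as a stand-in for Postgres. The SQL Batchout actually issues is not shown in this document, so the statement, the reduced column set, and the "safari" value are assumptions made for the example.

```python
import sqlite3

# In-memory SQLite stands in for Postgres (assumption for illustration only).
# The ON CONFLICT clause needs SQLite 3.24 or newer.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE user_session (
        user_id TEXT,
        session_id TEXT,
        session_useragent TEXT,
        PRIMARY KEY (user_id, session_id)
    )
    """
)

# Insert a row, or update the non-key column when the key already exists.
UPSERT = """
    INSERT INTO user_session (user_id, session_id, session_useragent)
    VALUES (?, ?, ?)
    ON CONFLICT (user_id, session_id)
    DO UPDATE SET session_useragent = excluded.session_useragent
"""

first_batch = [("someuserid", "somesessionid", "chrome")]
# The second batch repeats a key with a changed value and adds a new key.
second_batch = [
    ("someuserid", "somesessionid", "safari"),
    ("someuserid", "othersessionid", "firefox"),
]

for batch in (first_batch, second_batch):
    conn.executemany(UPSERT, batch)
conn.commit()

stored = conn.execute(
    "SELECT user_id, session_id, session_useragent "
    "FROM user_session ORDER BY session_id"
).fetchall()
print(stored)
```

After both batches the table holds two rows, not three: the repeated key was updated in place. That property is what makes reprocessing the same batch safe.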
