Framework for building data pipelines
Batchout
Framework for building data pipelines that:
- Extract a batch of payloads with a hierarchical structure, such as JSON or XML;
- Transform the batch of payloads into a batch of rows for a table defined by its columns;
- Load the batch of table rows into persistent storage.
Install
pip install batchout
Usage
Usage is best explained by example.
Let's say we receive a stream of JSON messages from the Kafka topic auth.sessions:
{
    "user": {
        "id": "someuserid",
        "last_seen": "2019-11-01T00:00:00"
    },
    "sessions": [
        {
            "id": "somesessionid",
            "created_at": "2019-10-01T00:00:00",
            "device": {
                "useragent": "chrome"
            }
        },
        {
            "id": "othersessionid",
            "created_at": "2019-11-01T00:00:00",
            "device": {
                "useragent": "firefox"
            }
        }
    ]
}
The stream data has to be written to the database table user_session, one row per session, like this:
user_id     user_last_seen       session_id      session_created_at   session_useragent
---------------------------------------------------------------------------------------
someuserid  2019-11-01T00:00:00  somesessionid   2019-10-01T00:00:00  chrome
someuserid  2019-11-01T00:00:00  othersessionid  2019-11-01T00:00:00  firefox
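For comparison, here is a minimal sketch of the hand-written flattening that this example implies, using only the standard library. The function name flatten_sessions is hypothetical and is not part of Batchout; it only illustrates how one nested message becomes one row per session.

```python
import json


def flatten_sessions(message: str) -> list:
    """Turn one JSON message into one row (dict) per session."""
    payload = json.loads(message)
    user = payload["user"]
    rows = []
    for session in payload.get("sessions", []):
        # User fields repeat on every row; session fields vary.
        rows.append({
            "user_id": user["id"],
            "user_last_seen": user["last_seen"],
            "session_id": session["id"],
            "session_created_at": session["created_at"],
            "session_useragent": session["device"]["useragent"],
        })
    return rows


message = json.dumps({
    "user": {"id": "someuserid", "last_seen": "2019-11-01T00:00:00"},
    "sessions": [
        {"id": "somesessionid", "created_at": "2019-10-01T00:00:00",
         "device": {"useragent": "chrome"}},
        {"id": "othersessionid", "created_at": "2019-11-01T00:00:00",
         "device": {"useragent": "firefox"}},
    ],
})

rows = flatten_sessions(message)
for row in rows:
    print(row)
```

Code like this has to be rewritten for every new payload shape, and it still lacks type casting, batching, and loading into the database.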
With Batchout, you don't need to write boilerplate code.
Just use batchout.core.Batch for configuring and running your pipeline.
from batchout.core import Batch

batch = Batch.from_config(
    inputs=dict(
        kafka=dict(
            type='kafka',
            bootstrap_servers=['kafka:9092'],
            consumer_group='batchout',
            topic='auth.sessions',
        ),
    ),
    indexes=dict(
        session_idx=dict(
            type='for_list',
            path='sessions',
        )
    ),
    columns=dict(
        user_id=dict(
            type='extracted',
            cast='string',
            path='user.id',
        ),
        user_last_seen=dict(
            type='extracted',
            cast='string',
            path='user.last_seen',
        ),
        session_id=dict(
            type='extracted',
            cast='string',
            path='sessions[{session_idx}].id',  # notice usage of session_idx defined as index above
        ),
        session_created_at=dict(
            type='extracted',
            cast='timestamp',
            path='sessions[{session_idx}].created_at',
        ),
        session_useragent=dict(
            type='extracted',
            cast='string',  # useragent is plain text such as "chrome", not a timestamp
            path='sessions[{session_idx}].device.useragent',
        ),
    ),
    outputs=dict(
        local=dict(
            type='postgres',
            mode='upsert',
            keys=['user_id', 'session_id'],
            host='localhost',
            port=5432,
            dbname='user_session',
            from_env=dict(
                user='PG_USER',          # PG_USER and
                password='PG_PASSWORD',  # PG_PASSWORD are read from environment
            ),
        ),
    ),
)

batch.run_once()
Batch.run_once() processes exactly one batch of payloads from each input.
Batch.run_forever() does the same in an infinite loop.
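The {session_idx} placeholder in the column paths above can be read as a template that is filled in once per element of the sessions list. The sketch below illustrates that reading in plain Python. It is an assumption about how a for_list index behaves, not Batchout's actual implementation, and expand_paths is a hypothetical helper.

```python
def expand_paths(template: str, index_name: str, payload: dict, list_key: str) -> list:
    """Fill the index placeholder once per element of payload[list_key]."""
    count = len(payload[list_key])
    return [template.format(**{index_name: i}) for i in range(count)]


payload = {"sessions": [{"id": "somesessionid"}, {"id": "othersessionid"}]}

paths = expand_paths("sessions[{session_idx}].id", "session_idx", payload, "sessions")
print(paths)  # ['sessions[0].id', 'sessions[1].id']
```

Each concrete path then addresses exactly one value, which is why a payload with two sessions yields two table rows.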
Integrations
Extractors
jsonpath_rw
JSON parser built on top of jsonpath_rw.
Inputs
kafka
Simple consumer built on top of kafka-python.
Outputs
postgres
Postgres table writer built on top of psycopg2.
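To make the mode='upsert' and keys settings from the usage example more concrete, here is a sketch of the kind of statement an upsert keyed on those columns usually corresponds to in Postgres (INSERT ... ON CONFLICT ... DO UPDATE). The SQL that Batchout actually generates is not shown in this document, so build_upsert is a hypothetical helper and the table name is taken from the example only for illustration.

```python
def build_upsert(table: str, columns: list, keys: list) -> str:
    """Build a parameterized Postgres upsert statement.

    Identifiers are interpolated directly, so they must come from
    trusted configuration, never from user input.
    """
    non_keys = [c for c in columns if c not in keys]
    col_list = ", ".join(columns)
    # psycopg2 uses %s as the placeholder for every parameter type.
    placeholders = ", ".join(["%s"] * len(columns))
    updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in non_keys)
    return (
        f"INSERT INTO {table} ({col_list}) VALUES ({placeholders}) "
        f"ON CONFLICT ({', '.join(keys)}) DO UPDATE SET {updates}"
    )


sql = build_upsert(
    "user_session",
    ["user_id", "user_last_seen", "session_id",
     "session_created_at", "session_useragent"],
    ["user_id", "session_id"],
)
print(sql)
```

A statement of this form requires a unique constraint or primary key on the key columns (here user_id and session_id); otherwise Postgres rejects the ON CONFLICT clause.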