Framework for building data pipelines
Batchout
Framework for building data pipelines that:
- Extract a batch of payloads with a hierarchical structure, such as JSON or XML;
- Transform the batch of payloads into a batch of rows for a table defined by columns;
- Load the batch of table rows into persistent storage.
Install
pip install batchout
Usage
Usage is best explained by example.
Let's say we periodically fetch a JSON message from some REST API:
{
    "user": {
        "id": "someuserid",
        "last_seen": "2019-11-01T00:00:00"
    },
    "sessions": [
        {
            "id": "somesessionid",
            "created_at": "2019-10-01T00:00:00",
            "device": {
                "useragent": "chrome"
            }
        },
        {
            "id": "othersessionid",
            "created_at": "2019-11-01T00:00:00",
            "device": {
                "useragent": "firefox"
            }
        }
    ]
}
The fetched data has to be put into the database table user_session like this:
user_id     user_last_seen       session_id      session_created_at   session_useragent
---------------------------------------------------------------------------------------
someuserid  2019-11-01T00:00:00  somesessionid   2019-10-01T00:00:00  chrome
someuserid  2019-11-01T00:00:00  othersessionid  2019-11-01T00:00:00  firefox
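For comparison, flattening this payload by hand in plain Python could look roughly like the sketch below. It does not use Batchout at all; the function name flatten_sessions is made up for illustration, and fetching the payload and writing to the database are left out.

```python
import json

# The example payload from above, inlined so the sketch is self-contained.
payload = json.loads("""
{
  "user": {"id": "someuserid", "last_seen": "2019-11-01T00:00:00"},
  "sessions": [
    {"id": "somesessionid", "created_at": "2019-10-01T00:00:00",
     "device": {"useragent": "chrome"}},
    {"id": "othersessionid", "created_at": "2019-11-01T00:00:00",
     "device": {"useragent": "firefox"}}
  ]
}
""")


def flatten_sessions(payload):
    """Return one user_session row (as a dict) per entry in payload['sessions']."""
    user = payload["user"]
    return [
        {
            # User fields are repeated on every row.
            "user_id": user["id"],
            "user_last_seen": user["last_seen"],
            # Session fields change from row to row.
            "session_id": session["id"],
            "session_created_at": session["created_at"],
            "session_useragent": session["device"]["useragent"],
        }
        for session in payload["sessions"]
    ]


rows = flatten_sessions(payload)
for row in rows:
    print(row)
```

Every new payload shape or target table needs another function like this one, plus the fetching and upsert code around it.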
With Batchout, you don't need to write boilerplate code.
Just use batchout.Batch to configure and run your pipeline.
from batchout import Batch

batch = Batch.from_config(
    dict(
        inputs=dict(
            some_api=dict(
                type='http',
                method='get',
                uri='https://some.api/my/user/sessions',
            ),
        ),
        extractors=dict(
            first_match_in_json=dict(
                type='jsonpath',
            ),
        ),
        indexes=dict(
            session_idx=dict(
                type='for_list',
                path='sessions',
            )
        ),
        columns=dict(
            user_id=dict(
                type='string',
                path='user.id',
            ),
            user_last_seen=dict(
                type='string',
                path='user.last_seen',
            ),
            session_id=dict(
                type='string',
                path='sessions[{session_idx}].id',  # notice usage of session_idx defined as index above
            ),
            session_created_at=dict(
                type='timestamp',
                path='sessions[{session_idx}].created_at',
            ),
            session_useragent=dict(
                type='string',  # the user agent is plain text such as "chrome", not a timestamp
                path='sessions[{session_idx}].device.useragent',
            ),
        ),
        selectors=dict(
            all_sessions=dict(
                type='sql',
                query='select * from some_api',
                columns=[
                    'user_id',
                    'user_last_seen',
                    'session_id',
                    'session_created_at',
                    'session_useragent',
                ]
            )
        ),
        outputs=dict(
            local_db=dict(
                type='postgres',
                mode='upsert',
                keys=['user_id', 'session_id'],
                host='localhost',
                port=5432,
                dbname='somedb',
                table='user_session',
                from_env=dict(
                    user='DB_USER',  # DB_USER and
                    password='DB_PASSWORD',  # DB_PASSWORD are read from environment
                ),
            ),
        ),
        tasks=dict(
            read_sessions=dict(
                type='reader',
                inputs=['some_api'],
            ),
            walk_sessions=dict(
                type='walker',
                inputs=['some_api'],
                indexes=['session_idx'],
                columns=[
                    'user_id',
                    'user_last_seen',
                    'session_id',
                    'session_created_at',
                    'session_useragent',
                ],
            ),
            write_sessions_to_local_db=dict(
                type='writer',
                selector='all_sessions',
                outputs=['local_db']
            )
        )
    ),
    defaults=dict(
        columns=dict(
            extractor='first_match_in_json',
        ),
        indexes=dict(
            extractor='first_match_in_json',
        ),
    ),
)

batch.run_once()
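The local_db output in the configuration above takes the database user and password from the DB_USER and DB_PASSWORD environment variables. How Batchout reacts when one of them is missing is not covered here, so a pre-flight check before running the batch may be useful. A minimal sketch, where missing_env_vars is a hypothetical helper and not part of Batchout:

```python
import os


def missing_env_vars(names, environ=None):
    """Return the names that are unset or empty in the given environment mapping."""
    # Default to the real process environment; a plain dict can be passed for testing.
    environ = os.environ if environ is None else environ
    return [name for name in names if not environ.get(name)]


missing = missing_env_vars(["DB_USER", "DB_PASSWORD"])
if missing:
    print("Set these variables before running the batch:", ", ".join(missing))
```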
Batch.run_once() processes exactly one batch of payloads from each input.
Batch.run_forever() does the same in an infinite loop.
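If you want something in between, such as a fixed number of runs with a pause between them, a small wrapper around any zero-argument callable can be sketched as follows. The helper run_repeatedly and its parameters are assumptions made for illustration, not part of Batchout, and a stand-in callable replaces batch.run_once so that the sketch runs on its own.

```python
import time


def run_repeatedly(run_once, max_runs, pause_seconds=0.0):
    """Call run_once() max_runs times, sleeping pause_seconds between calls.

    Returns the number of completed runs.
    """
    completed = 0
    for i in range(max_runs):
        run_once()
        completed += 1
        # Do not sleep after the final run.
        if i < max_runs - 1:
            time.sleep(pause_seconds)
    return completed


# Stand-in for batch.run_once so the sketch runs without Batchout installed.
calls = []
completed = run_repeatedly(lambda: calls.append("batch"), max_runs=3)
print(completed)
```

With a configured pipeline, the call would presumably become run_repeatedly(batch.run_once, max_runs=24, pause_seconds=3600) or similar.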
File details
Details for the file batchout-0.2.0.tar.gz.
File metadata
- Download URL: batchout-0.2.0.tar.gz
- Upload date:
- Size: 15.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/2.0.0 pkginfo/1.5.0.1 requests/2.22.0 setuptools/42.0.1 requests-toolbelt/0.9.1 tqdm/4.37.0 CPython/3.6.8
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 13be095c97be0d54e52531ff606e2fc70c877409979576201787c038253919a2 |
| MD5 | b84dc0bed9f995184cc40e2fd442c811 |
| BLAKE2b-256 | 98028754ba4af5d9b1ffb2b04184d30e7a735c0ffe198f511377b0fb4c0d2c7f |
File details
Details for the file batchout-0.2.0-py3-none-any.whl.
File metadata
- Download URL: batchout-0.2.0-py3-none-any.whl
- Upload date:
- Size: 30.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/2.0.0 pkginfo/1.5.0.1 requests/2.22.0 setuptools/42.0.1 requests-toolbelt/0.9.1 tqdm/4.37.0 CPython/3.6.8
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | fb35a0a4b5a188edbd6e78e74e82fab3f174e3a2a1511bd8a6897b2a89d2a8ae |
| MD5 | 2735889bcd986ef41501fcb2bd98c160 |
| BLAKE2b-256 | 5fc20cf92e3d3ea0c5f7f4df77d4f2990add9725b55057fcf05aed466e1c147d |