Batchout
Framework for building data pipelines that:
- Extract a batch of payloads with a hierarchical structure, such as JSON or XML;
- Transform the batch of payloads into a batch of rows for a table defined by columns;
- Load the batch of table rows into persistent storage.
Install
pip install batchout
Usage
It is best explained by example.
Let's say we periodically fetch a JSON message from some REST API:
{
  "user": {
    "id": "someuserid",
    "last_seen": "2019-11-01T00:00:00"
  },
  "sessions": [
    {
      "id": "somesessionid",
      "created_at": "2019-10-01T00:00:00",
      "device": { "useragent": "chrome" }
    },
    {
      "id": "othersessionid",
      "created_at": "2019-11-01T00:00:00",
      "device": { "useragent": "firefox" }
    }
  ]
}
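Before looking at the Batchout config, here is what the transformation amounts to in plain Python. This is a hand-rolled sketch, not Batchout's API: it walks the nested payload above and emits one flat row per entry in sessions, which is exactly the role the session_idx index plays below.

```python
# Hand-rolled equivalent of the extract/transform step (not Batchout's API):
# flatten the nested payload into one row per session.
payload = {
    "user": {"id": "someuserid", "last_seen": "2019-11-01T00:00:00"},
    "sessions": [
        {"id": "somesessionid", "created_at": "2019-10-01T00:00:00",
         "device": {"useragent": "chrome"}},
        {"id": "othersessionid", "created_at": "2019-11-01T00:00:00",
         "device": {"useragent": "firefox"}},
    ],
}

rows = [
    {
        "user_id": payload["user"]["id"],                      # user.id
        "user_last_seen": payload["user"]["last_seen"],        # user.last_seen
        "session_id": session["id"],                           # sessions[i].id
        "session_created_at": session["created_at"],           # sessions[i].created_at
        "session_useragent": session["device"]["useragent"],   # sessions[i].device.useragent
    }
    # iterating over the list is what session_idx does in the config below
    for session in payload["sessions"]
]

for row in rows:
    print(row)
```

Batchout replaces this boilerplate with declarative path expressions and an index over the sessions list.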
The fetched data has to be put into a database table user_session like this:
user_id     user_last_seen       session_id      session_created_at   session_useragent
----------  -------------------  --------------  -------------------  -----------------
someuserid  2019-11-01T00:00:00  somesessionid   2019-10-01T00:00:00  chrome
someuserid  2019-11-01T00:00:00  othersessionid  2019-11-01T00:00:00  firefox
With Batchout, you don't need to write boilerplate code.
Just use batchout.Batch
for configuring and running your pipeline.
from batchout import Batch

batch = Batch.from_config(
    dict(
        inputs=dict(
            some_api=dict(
                type='http',
                method='get',
                uri='https://some.api/my/user/sessions',
            ),
        ),
        extractors=dict(
            first_match_in_json=dict(
                type='jsonpath',
            ),
        ),
        indexes=dict(
            session_idx=dict(
                type='for_list',
                path='sessions',
            ),
        ),
        columns=dict(
            user_id=dict(
                type='string',
                path='user.id',
            ),
            user_last_seen=dict(
                type='string',
                path='user.last_seen',
            ),
            session_id=dict(
                type='string',
                path='sessions[{session_idx}].id',  # notice the usage of session_idx, defined as an index above
            ),
            session_created_at=dict(
                type='timestamp',
                path='sessions[{session_idx}].created_at',
            ),
            session_useragent=dict(
                type='string',  # a useragent is a string, not a timestamp
                path='sessions[{session_idx}].device.useragent',
            ),
        ),
        selectors=dict(
            all_sessions=dict(
                type='sql',
                query='select * from some_api',
                columns=[
                    'user_id',
                    'user_last_seen',
                    'session_id',
                    'session_created_at',
                    'session_useragent',
                ],
            ),
        ),
        outputs=dict(
            local_db=dict(
                type='postgres',
                mode='upsert',
                keys=['user_id', 'session_id'],
                host='localhost',
                port=5432,
                dbname='somedb',
                table='user_session',
                from_env=dict(
                    user='DB_USER',          # DB_USER and
                    password='DB_PASSWORD',  # DB_PASSWORD are read from the environment
                ),
            ),
        ),
        tasks=dict(
            read_sessions=dict(
                type='reader',
                inputs=['some_api'],
            ),
            walk_sessions=dict(
                type='walker',
                inputs=['some_api'],
                indexes=['session_idx'],
                columns=[
                    'user_id',
                    'user_last_seen',
                    'session_id',
                    'session_created_at',
                    'session_useragent',
                ],
            ),
            write_sessions_to_local_db=dict(
                type='writer',
                selector='all_sessions',
                outputs=['local_db'],
            ),
        ),
    ),
    defaults=dict(
        columns=dict(
            extractor='first_match_in_json',
        ),
        indexes=dict(
            extractor='first_match_in_json',
        ),
    ),
)

batch.run_once()
Batch.run_once() processes exactly one batch of payloads from each input.
Batch.run_forever() does the same in an infinite loop.
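Conceptually, run_forever is just run_once repeated. The sketch below illustrates that loop; it is not Batchout's implementation, and both interval_seconds and max_iterations are illustrative assumptions rather than Batchout parameters (max_iterations exists only so the sketch can terminate).

```python
import time


def run_forever(run_once, interval_seconds=60, max_iterations=None):
    """Call run_once repeatedly, sleeping between runs.

    Conceptual sketch only: Batch.run_forever handles looping internally.
    interval_seconds and max_iterations are assumptions for illustration,
    not actual Batchout parameters.
    """
    iterations = 0
    while max_iterations is None or iterations < max_iterations:
        run_once()  # process one batch of payloads from each input
        iterations += 1
        time.sleep(interval_seconds)
```

For example, run_forever(batch.run_once, interval_seconds=0, max_iterations=3) would process three batches back to back.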