
Project description

Batchout

Framework for building data pipelines that:

  1. Extract a batch of payloads with a hierarchical structure, such as JSON or XML;
  2. Transform the batch of payloads into a batch of rows for a table defined by its columns;
  3. Load the batch of table rows into persistent storage.

Install

pip install batchout

Usage

Batchout is best explained by example.

Let's say we periodically fetch a JSON message from some REST API:

{
  "user": {
    "id": "someuserid",
    "last_seen": "2019-11-01T00:00:00"
  },
  "sessions": [
     {
        "id": "somesessionid",
        "created_at": "2019-10-01T00:00:00",
        "device": {
           "useragent": "chrome"
        }
     },
     {
        "id": "othersessionid",
        "created_at": "2019-11-01T00:00:00",
        "device": {
           "useragent": "firefox"
        }
     }
  ]
}

The fetched data has to be loaded into a database table user_session like this:

user_id       user_last_seen        session_id      session_created_at    session_useragent
-------------------------------------------------------------------------------------------
someuserid    2019-11-01T00:00:00   somesessionid   2019-10-01T00:00:00   chrome
someuserid    2019-11-01T00:00:00   othersessionid  2019-11-01T00:00:00   firefox
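
Without a framework, flattening such a payload means hand-written loops and insert statements. A minimal sketch of that boilerplate in plain Python (using requests for the HTTP call; db_cursor is a hypothetical database cursor):

import requests

payload = requests.get('https://some.api/my/user/sessions').json()

# one row per session, with the user fields repeated on every row
rows = []
for session in payload['sessions']:
    rows.append((
        payload['user']['id'],
        payload['user']['last_seen'],
        session['id'],
        session['created_at'],
        session['device']['useragent'],
    ))

db_cursor.executemany(
    'insert into user_session values (%s, %s, %s, %s, %s)', rows,
)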

With Batchout, you don't need to write boilerplate code.

Just use batchout.Batch to configure and run your pipeline.

from batchout import Batch


batch = Batch.from_config(
    dict(
        inputs=dict(
            some_api=dict(
                type='http',
                method='get',
                uri='https://some.api/my/user/sessions',
            ),
        ),
        extractors=dict(
            first_match_in_json=dict(
                type='jsonpath',
            ),
        ),
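        # an index produces one row per element of the list at the given
        # path, so each payload fans out into one row per session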
        indexes=dict(
            session_idx=dict(
                type='for_list',
                path='sessions',
            )
        ),
        columns=dict(
            user_id=dict(
                type='string',
                path='user.id',
            ),
            user_last_seen=dict(
                type='string',
                path='user.last_seen',
            ),
            session_id=dict(
                type='string',
                path='sessions[{session_idx}].id',  # note the use of session_idx, defined under indexes above
            ),
            session_created_at=dict(
                type='timestamp',
                path='sessions[{session_idx}].created_at',
            ),
            session_useragent=dict(
                type='string',
                path='sessions[{session_idx}].device.useragent',
            ),
        ),
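        # selectors query the rows collected from an input; the input
        # name ('some_api') doubles as the table name in the SQL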
        selectors=dict(
            all_sessions=dict(
                type='sql',
                query='select * from some_api',
                columns=[
                    'user_id',
                    'user_last_seen',
                    'session_id',
                    'session_created_at',
                    'session_useragent',
                ]
            )
        ),
        outputs=dict(
            local_db=dict(
                type='postgres',
                mode='upsert',
                keys=['user_id', 'session_id'],
                host='localhost',
                port=5432,
                dbname='somedb',
                table='user_session',
                from_env=dict(
                    user='DB_USER',          # DB_USER and
                    password='DB_PASSWORD',  # DB_PASSWORD are read from environment
                ),
            ),
        ),
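        # tasks wire the pipeline together: a reader fetches payloads
        # from inputs, a walker turns payloads into table rows, and a
        # writer sends selected rows to outputs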
        tasks=dict(
            read_sessions=dict(
                type='reader',
                inputs=['some_api'],
            ),
            walk_sessions=dict(
                type='walker',
                inputs=['some_api'],
                indexes=['session_idx'],
                columns=[
                    'user_id',
                    'user_last_seen',
                    'session_id',
                    'session_created_at',
                    'session_useragent',
                ],
            ),
            write_sessions_to_local_db=dict(
                type='writer',
                selector='all_sessions',
                outputs=['local_db']
            )
        )
    ),
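    # defaults are merged into every column and index, so the extractor
    # does not need to be repeated for each one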
    defaults=dict(
        columns=dict(
            extractor='first_match_in_json',
        ),
        indexes=dict(
            extractor='first_match_in_json',
        ),
    ),
)
batch.run_once()

Batch.run_once() processes exactly one batch of payloads from each input.

Batch.run_forever() does the same in an infinite loop.
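
Since the local_db output reads its credentials from the environment (via from_env), DB_USER and DB_PASSWORD must be set before the batch runs. A minimal sketch with placeholder values for local testing:

import os

# referenced by the from_env section of the local_db output above
os.environ['DB_USER'] = 'someuser'
os.environ['DB_PASSWORD'] = 'somepassword'

batch.run_once()       # process one batch from each input, then return
# batch.run_forever()  # or keep polling the API indefinitely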

Download files

Download the file for your platform.

Source Distribution

batchout-0.2.0.tar.gz (15.5 kB)

Uploaded Source

Built Distribution


batchout-0.2.0-py3-none-any.whl (30.9 kB)

Uploaded Python 3

File details

Details for the file batchout-0.2.0.tar.gz.

File metadata

  • Download URL: batchout-0.2.0.tar.gz
  • Upload date:
  • Size: 15.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/2.0.0 pkginfo/1.5.0.1 requests/2.22.0 setuptools/42.0.1 requests-toolbelt/0.9.1 tqdm/4.37.0 CPython/3.6.8

File hashes

Hashes for batchout-0.2.0.tar.gz

Algorithm     Hash digest
------------------------------------------------------------------------------
SHA256        13be095c97be0d54e52531ff606e2fc70c877409979576201787c038253919a2
MD5           b84dc0bed9f995184cc40e2fd442c811
BLAKE2b-256   98028754ba4af5d9b1ffb2b04184d30e7a735c0ffe198f511377b0fb4c0d2c7f


File details

Details for the file batchout-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: batchout-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 30.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/2.0.0 pkginfo/1.5.0.1 requests/2.22.0 setuptools/42.0.1 requests-toolbelt/0.9.1 tqdm/4.37.0 CPython/3.6.8

File hashes

Hashes for batchout-0.2.0-py3-none-any.whl

Algorithm     Hash digest
------------------------------------------------------------------------------
SHA256        fb35a0a4b5a188edbd6e78e74e82fab3f174e3a2a1511bd8a6897b2a89d2a8ae
MD5           2735889bcd986ef41501fcb2bd98c160
BLAKE2b-256   5fc20cf92e3d3ea0c5f7f4df77d4f2990add9725b55057fcf05aed466e1c147d

