
Batchout

Framework for building data pipelines that:

  1. Extract a batch of payloads of any structure, such as JSON, XML, or plain text/bytes;
  2. Transform the batch of payloads into a batch of rows for a table defined by columns;
  3. Load the batch of table rows into persistent storage.
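Conceptually, a pipeline is just these three stages composed. A minimal plain-Python sketch (the payloads and row shape here are made up for illustration; this is not the batchout API):

```python
import json

def extract():
    """Extract: fetch a batch of raw payloads (hard-coded JSON strings here)."""
    return ['{"id": "a", "value": 1}', '{"id": "b", "value": 2}']

def transform(payloads):
    """Transform: turn each payload into a row matching the table columns."""
    rows = []
    for payload in payloads:
        doc = json.loads(payload)
        rows.append((doc["id"], doc["value"]))
    return rows

def load(rows):
    """Load: persist rows (here we just collect them; batchout writes to storage)."""
    return list(rows)

batch_rows = load(transform(extract()))
# batch_rows == [("a", 1), ("b", 2)]
```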

Install

pip install batchout installs the core package with no dependencies beyond Python 3.9+

pip install batchout[cli] adds a basic CLI for those who prefer not to write Python

pip install batchout[postgres] for Postgres reading/writing

pip install batchout[jsonpath] to extract data from JSON documents

pip install batchout[xpath] to extract data from XML/HTML
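Extras can be combined in a single install using standard pip extras syntax (quoting avoids shell globbing on the brackets):

```shell
pip install "batchout[cli,jsonpath,postgres]"
```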

Use in Python

It is best explained by example.

Let's say we periodically fetch a JSON message from some REST API:

{
  "user": {
    "id": "someuserid",
    "last_seen": "2019-11-01T00:00:00"
  },
  "sessions": [
     {
        "id": "somesessionid",
        "created_at": "2019-10-01T00:00:00",
        "device": {
           "useragent": "chrome"
        }
     },
     {
        "id": "othersessionid",
        "created_at": "2019-11-01T00:00:00",
        "device": {
           "useragent": "firefox"
        }
     }
  ]
}

The fetched data has to be written to a database table user_session like this:

user_id       user_last_seen        session_id      session_created_at    session_useragent
-------------------------------------------------------------------------------------------
someuserid    2019-11-01T00:00:00   somesessionid   2019-10-01T00:00:00   chrome
someuserid    2019-11-01T00:00:00   othersessionid  2019-11-01T00:00:00   firefox
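The fan-out from one document into two rows can be sketched in plain Python. This is not batchout's implementation, just the semantics: the user fields repeat per row, while an index iterating over `sessions` supplies the per-session fields:

```python
# The JSON document from above.
doc = {
    "user": {"id": "someuserid", "last_seen": "2019-11-01T00:00:00"},
    "sessions": [
        {"id": "somesessionid", "created_at": "2019-10-01T00:00:00",
         "device": {"useragent": "chrome"}},
        {"id": "othersessionid", "created_at": "2019-11-01T00:00:00",
         "device": {"useragent": "firefox"}},
    ],
}

# One output row per element of `sessions`; user fields are repeated.
rows = [
    (
        doc["user"]["id"],                # user.id
        doc["user"]["last_seen"],         # user.last_seen
        session["id"],                    # sessions[{session_idx}].id
        session["created_at"],            # sessions[{session_idx}].created_at
        session["device"]["useragent"],   # sessions[{session_idx}].device.useragent
    )
    for session in doc["sessions"]        # session_idx iterates over the list
]
```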

With Batchout, you don't need to write boilerplate code.

Just use batchout.Batch to configure and run your pipeline.

from batchout import Batch

batch = Batch.from_config(
    dict(
        inputs=dict(
            some_api=dict(
                type='http',
                method='get',
                uri='https://some.api/my/user/sessions',
            ),
        ),
        extractors=dict(
            first_match_in_json=dict(
                type='jsonpath',
            ),
        ),
        indexes=dict(
            session_idx=dict(
                type='for_list',
                path='sessions',
            )
        ),
        columns=dict(
            user_id=dict(
                type='string',
                path='user.id',
            ),
            user_last_seen=dict(
                type='string',
                path='user.last_seen',
            ),
            session_id=dict(
                type='string',
                path='sessions[{session_idx}].id',  # notice usage of session_idx defined as index above
            ),
            session_created_at=dict(
                type='datetime',
                path='sessions[{session_idx}].created_at',
            ),
            session_useragent=dict(
                type='string',
                path='sessions[{session_idx}].device.useragent',
            ),
        ),
        maps=dict(
            some_api=[
                'user_id',
                'user_last_seen',
                dict(
                    session_idx=[
                        'session_id',
                        'session_created_at',
                        'session_useragent',
                    ]
                ),
            ],
        ),
        selectors=dict(
            all_sessions=dict(
                type='sql',
                query='select * from some_api',
                columns=[
                    'user_id',
                    'user_last_seen',
                    'session_id',
                    'session_created_at',
                    'session_useragent',
                ]
            )
        ),
        outputs=dict(
            local_db=dict(
                type='postgres',
                mode='upsert',
                keys=['user_id', 'session_id'],
                host='localhost',
                port=5432,
                dbname='somedb',
                table='user_session',
                from_env=dict(
                    user='DB_USER',          # DB_USER and
                    password='DB_PASSWORD',  # DB_PASSWORD are read from environment
                ),
            ),
        ),
        tasks=dict(
            read_sessions=dict(
                type='reader',
                inputs=['some_api'],
            ),
            write_sessions_to_local_db=dict(
                type='writer',
                selector='all_sessions',
                outputs=['local_db']
            )
        )
    ),
    defaults=dict(
        columns=dict(
            extractor='first_match_in_json',
        ),
        indexes=dict(
            extractor='first_match_in_json',
        ),
    ),
)
batch.run_once()
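For the CLI described below, the same configuration is written as YAML. An abridged sketch of the shape, assuming the YAML keys mirror the Python dict above one-to-one (consult the documentation for the exact file layout):

```yaml
inputs:
  some_api:
    type: http
    method: get
    uri: https://some.api/my/user/sessions
extractors:
  first_match_in_json:
    type: jsonpath
indexes:
  session_idx:
    type: for_list
    path: sessions
columns:
  user_id:
    type: string
    path: user.id
```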

Use in terminal

Requires pip install batchout[cli]

$ batchout --help
usage: batchout [-h] -c CONFIG [-I [IMPORT_FROM ...]] [-n NUM_BATCHES] [-w MIN_WAIT_SEC] [-W MAX_WAIT_SEC] [-l LOG_LEVEL]

Run Batchout from a config file (YAML)

optional arguments:
  -h, --help            show this help message and exit
  -c CONFIG, --config CONFIG
                        Path to YAML config file
  -I [IMPORT_FROM ...], --import-from [IMPORT_FROM ...]
                        Import Python modules containing custom Batchout components
  -n NUM_BATCHES, --num-batches NUM_BATCHES
                        Stop after N batches (never stop if -1 or less)
  -w MIN_WAIT_SEC, --min-wait-sec MIN_WAIT_SEC
                        Minimum seconds to wait between batches
  -W MAX_WAIT_SEC, --max-wait-sec MAX_WAIT_SEC
                        Maximum seconds to wait between batches
  -l LOG_LEVEL, --log-level LOG_LEVEL
                        Choose logging level between 10 (DEBUG) and 50 (FATAL)
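An illustrative invocation (the file name pipeline.yaml is a placeholder):

```shell
# Run exactly one batch from a YAML config at DEBUG logging.
batchout -c pipeline.yaml -n 1 -l 10
```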

Read documentation

First time? Proceed to Batchout documentation.
