datachunks

Data chunking for humans, particularly for data engineers. Makes batched data processing less painful and a little bit more joyful.

Install

datachunks requires Python 3.8 or newer. Install it from PyPI:

$ pip install datachunks
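
To check that the installation worked, you can chunk a small range with the chunks helper described in the next section. The chunk size of 2 below is an arbitrary choice; it should print [[0, 1], [2, 3], [4]]:

$ python -c "from datachunks import chunks; print(list(chunks(range(5), 2)))"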

Usage

datachunks implements two chunking strategies:

  1. "Pull" strategy. Wrap your source stream with chunks generator and consume a chunked data stream.
  2. "Push" strategy. Create a special "feeder" object that will send data chunks to a specified consumer function.

The first strategy is simple and suitable for most applications, but the second gives more flexibility for building non-trivial in-memory processing pipelines.

chunks and achunks functions

These functions implement the "pull" chunking strategy for synchronous and async/await applications respectively.

from datachunks import chunks

for chunk in chunks(range(12), 5):
    print(chunk)

Expected output:

[0, 1, 2, 3, 4]
[5, 6, 7, 8, 9]
[10, 11]

Asynchronous version example:

import asyncio
from datachunks import achunks

async def arange(*args, **kwargs):
    for i in range(*args, **kwargs):
        yield i

async def achunks_demo():
    async for chunk in achunks(arange(12), 5):
        print(chunk)

asyncio.run(achunks_demo())

Expected output:

[0, 1, 2, 3, 4]
[5, 6, 7, 8, 9]
[10, 11]

Using "push" strategy

"Push" strategy is implemented in ChunkingFeeder and AsyncChunkingFeeder objects.

Consider a situation where we want to process odd and even numbers separately. For this purpose we create two feeders and put the even and odd values into them accordingly.

from datachunks import ChunkingFeeder

with ChunkingFeeder(lambda c: print(f'evens: {c}'), 5) as print_evens_feeder, \
        ChunkingFeeder(lambda c: print(f'odds: {c}'), 5) as print_odds_feeder:
    for i in range(25):
        if i % 2 == 0:
            print_evens_feeder.put(i)
        else:
            print_odds_feeder.put(i)

Expected output:

evens: [0, 2, 4, 6, 8]
odds: [1, 3, 5, 7, 9]
evens: [10, 12, 14, 16, 18]
odds: [11, 13, 15, 17, 19]
odds: [21, 23]
evens: [20, 22, 24]

Additional features:

  • It is guaranteed that all data has been delivered to the callback functions by the time the context exits.
  • A callback function may itself produce additional items. This allows building flexible and even recursive processing pipelines, but of course it is your responsibility to avoid infinite recursion.
  • By default ChunkingFeeder calls its chunk consumer synchronously. To use multithreading, specify the workers_num parameter (see the sketch below).
  • To use multiprocessing, set the multiprocessing parameter to True in addition to the workers_num parameter.
  • The AsyncChunkingFeeder also supports the workers_num parameter, but does not support multiprocessing.
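
For example, here is a minimal sketch of the multithreaded variant. It assumes the constructor arguments shown above (the chunk consumer and the chunk size positionally, workers_num as a keyword); check the package source for the exact signature.

from datachunks import ChunkingFeeder

def store_chunk(chunk):
    # Stand-in for real work, e.g. a network call or a database insert.
    print(f'stored: {chunk}')

# Two worker threads consume chunks in the background; as noted above,
# everything put into the feeder is still delivered before the context exits.
with ChunkingFeeder(store_chunk, 5, workers_num=2) as feeder:
    for i in range(25):
        feeder.put(i)

With more than one worker the chunks may be consumed concurrently, so the output order is not guaranteed. Using a module-level callback rather than a lambda also keeps the example ready for multiprocessing=True, which typically requires a picklable callback.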

ETL example

Consider a simple ETL task: we have an orders.jsonl file that we need to upload to a MongoDB database. Sending objects one by one is too slow, and the file is too big to upload in one big batch, so we are going to split the data into chunks of a reasonable size.

The read_jsonl function reads the file and yields objects one by one:

import json

def read_jsonl():
    with open('orders.jsonl', 'r', encoding='utf-8') as jsonl:
        for jsoned_obj in jsonl:
            if jsoned_obj.strip():  # skip blank lines
                yield json.loads(jsoned_obj)

The following function pulls objects through the chunks generator and sends them to MongoDB in batches:

from datachunks import chunks

def transfer_orders(db_connection):
    for chunk in chunks(read_jsonl(), chunk_size=200):
        db_connection.orders.insert_many(chunk)

Suppose that after a while we decide to store purchase and sales orders in different MongoDB collections; let's use two chunking feeders:

from datachunks import ChunkingFeeder

class TransferOrders():
    def __init__(self, db_connection):
        self.db_connection = db_connection

    def store_purchase_orders(self, chunk):
        self.db_connection.purchase_orders.insert_many(chunk)

    def store_sales_orders(self, chunk):
        self.db_connection.sales_orders.insert_many(chunk)

    def do_transfer(self):
        with ChunkingFeeder(self.store_purchase_orders, 100, workers_num=1) as purchase_feeder, \
                ChunkingFeeder(self.store_sales_orders, 500, workers_num=1) as sales_feeder:
            for order in read_jsonl():
                if order.get('order_type') == 'purchase':
                    purchase_feeder.put(order)
                elif order.get('order_type') == 'sales':
                    sales_feeder.put(order)

def transfer_orders(db_connection):
    TransferOrders(db_connection).do_transfer()

The pymongo library is thread-safe, so it makes sense to speed up the process by storing the data in separate worker threads, which is why each feeder above is created with workers_num=1.
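
If the MongoDB round-trips dominate the running time, the same feeders can simply be given more workers. Here is a sketch of that variant, under the assumption (per the feature list above) that workers_num sets the number of worker threads per feeder:

    def do_transfer(self):
        # More workers per feeder let several insert_many calls overlap,
        # which helps when the database round-trip is the bottleneck.
        with ChunkingFeeder(self.store_purchase_orders, 100, workers_num=4) as purchase_feeder, \
                ChunkingFeeder(self.store_sales_orders, 500, workers_num=4) as sales_feeder:
            for order in read_jsonl():
                if order.get('order_type') == 'purchase':
                    purchase_feeder.put(order)
                elif order.get('order_type') == 'sales':
                    sales_feeder.put(order)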
