Skip to main content

A python utility to split a large CSV into smaller ones, and uses multiprocessing to process the CSVs in parallel.

Project description

csv-batcher

 

 

 

 

 

 

 

Vertical scaling

A lightweight, python-based, multiprocess CSV batcher suitable for use with dataframes or other tools that deal with large CSV files (or those that require timely processing).

Installation

pip install csv-batcher

GitHub

https://github.com/tangledpath/csv-batcher

Documentation

https://tangledpath.github.io/csv-batcher/csv_batcher.html

Further excercises

  • Possibly implement pooling with celery (for use in django apps, etc.), which can bring about horizontal scaling.

Usage

Arguments sent to callback function can be controlled by creating pooler with callback_with and the CallbackWith enum values:

As dataframe row

from csv_batcher.csv_pooler import CSVPooler, CallbackWith

# Callback function passed to pooler; accepts a dataframe row
#   as a pandas Series (via apply)
def process_dataframe_row(row):
    return row.iloc[0]

pooler = CSVPooler(
    "5mSalesRecords.csv",
    process_dataframe_row,
    callback_with=CallbackWith.DATAFRAME_ROW,
    pool_size=16
)
for processed_batch in pooler.process():
    print(processed_batch)

As dataframe

from csv_batcher.csv_pooler import CSVPooler, CallbackWith

# Used from process_datafrom's apply:
def process_dataframe_row(row):
    return row.iloc[0]

# Callback function passed to pooler; accepts a dataframe:
def process_dataframe(df):
    foo = df.apply(process_dataframe_row, axis=1)
    # Or do something more complicated....
    return len(df)

pooler = CSVPooler(
    "5mSalesRecords.csv",
    process_dataframe,
    callback_with=CallbackWith.DATAFRAME,
    pool_size=16
)
for processed_batch in pooler.process():
    print(processed_batch)

As CSV filename

import pandas as pd
from csv_batcher.csv_pooler import CSVPooler, CallbackWith

# Used from process_csv_filename's apply:
def process_dataframe_row(row):
    return row.iloc[0]

def process_csv_filename(csv_chunk_filename):
    # print("processing ", csv_chunk_filename)
    df = pd.read_csv(csv_chunk_filename, skipinitialspace=True, index_col=None)
    foo = df.apply(process_dataframe_row, axis=1)
    return len(df)

pooler = CSVPooler(
    "5mSalesRecords.csv",
    process_csv_filename,
    callback_with=CallbackWith.CSV_FILENAME,
    chunk_lines=10000,
    pool_size=16
)
for processed_batch in pooler.process():
    print(processed_batch)

Development

Linting

ruff check . # Find linting errors
ruff check . --fix # Auto-fix linting errors (where possible)

Documentation

# Shows in browser
poetry run pdoc csv_batcher
# Generates to ./docs
poetry run pdoc csv_batcher -o ./docs
# OR (recommended)
bin/build.sh

Testing

clear; pytest

Publishing

poetry publish --build -u __token__ -p $PYPI_TOKEN`
# OR (recommended)
bin/publish.sh

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

csv_batcher-0.1.6.tar.gz (4.6 kB view hashes)

Uploaded Source

Built Distribution

csv_batcher-0.1.6-py3-none-any.whl (6.1 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page