
A Python utility that splits a large CSV into smaller ones and uses multiprocessing to process them in parallel.


csv-batcher

Vertical scaling

A lightweight, python-based, multiprocess CSV batcher suitable for use with dataframes or other tools that deal with large CSV files (or those that require timely processing).
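The core idea — splitting one large CSV into header-preserving chunks that a pool of workers can then process independently — can be sketched with the standard library. This is an illustrative sketch, not csv-batcher's internal implementation; the function and parameter names here are placeholders:

```python
import csv
import os
import tempfile

def split_csv(path, chunk_lines):
    """Split a CSV into chunk files of at most chunk_lines data rows,
    repeating the header row in each chunk so every file is self-contained."""
    chunk_paths = []
    with open(path, newline="") as f:
        reader = csv.reader(f)
        header = next(reader)
        writer, out = None, None
        for i, row in enumerate(reader):
            if i % chunk_lines == 0:
                # Start a new chunk file and re-emit the header.
                if out:
                    out.close()
                out_path = os.path.join(
                    tempfile.gettempdir(), f"chunk_{i // chunk_lines}.csv"
                )
                out = open(out_path, "w", newline="")
                writer = csv.writer(out)
                writer.writerow(header)
                chunk_paths.append(out_path)
            writer.writerow(row)
        if out:
            out.close()
    return chunk_paths
```

Each chunk file can then be handed to a worker process, which is what makes the approach scale vertically across cores.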

Installation

pip install csv-batcher

GitHub

https://github.com/tangledpath/csv-batcher

Documentation

https://tangledpath.github.io/csv-batcher/csv_batcher.html

Further exercises

  • Possibly implement pooling with Celery (for use in Django apps, etc.), which could enable horizontal scaling.

Usage

The arguments sent to the callback function are controlled by creating the pooler with the callback_with argument, set to one of the CallbackWith enum values:

As dataframe row

from csv_batcher.csv_pooler import CSVPooler, CallbackWith

# Callback function passed to pooler; accepts a dataframe row
#   as a pandas Series (via apply)
def process_dataframe_row(row):
    return row.iloc[0]

pooler = CSVPooler(
    "5mSalesRecords.csv",
    process_dataframe_row,
    callback_with=CallbackWith.DATAFRAME_ROW,
    pool_size=16
)
for processed_batch in pooler.process():
    print(processed_batch)

As dataframe

from csv_batcher.csv_pooler import CSVPooler, CallbackWith

# Used from process_dataframe's apply:
def process_dataframe_row(row):
    return row.iloc[0]

# Callback function passed to pooler; accepts a dataframe:
def process_dataframe(df):
    foo = df.apply(process_dataframe_row, axis=1)
    # Or do something more complicated....
    return len(df)

pooler = CSVPooler(
    "5mSalesRecords.csv",
    process_dataframe,
    callback_with=CallbackWith.DATAFRAME,
    pool_size=16
)
for processed_batch in pooler.process():
    print(processed_batch)

As CSV filename

import pandas as pd
from csv_batcher.csv_pooler import CSVPooler, CallbackWith

# Used from process_csv_filename's apply:
def process_dataframe_row(row):
    return row.iloc[0]

def process_csv_filename(csv_chunk_filename):
    # print("processing ", csv_chunk_filename)
    df = pd.read_csv(csv_chunk_filename, skipinitialspace=True, index_col=None)
    foo = df.apply(process_dataframe_row, axis=1)
    return len(df)

pooler = CSVPooler(
    "5mSalesRecords.csv",
    process_csv_filename,
    callback_with=CallbackWith.CSV_FILENAME,
    chunk_lines=10000,
    pool_size=16
)
for processed_batch in pooler.process():
    print(processed_batch)
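The examples above read from 5mSalesRecords.csv, which is not bundled with the package. To try the pooler locally, a small stand-in CSV can be generated first; the filename and column names below are arbitrary placeholders:

```python
import csv
import random

# Write a small sample CSV with a header row and a few thousand data rows.
with open("sample_records.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["region", "units", "price"])
    for _ in range(5000):
        writer.writerow([
            random.choice(["North", "South", "East", "West"]),
            random.randint(1, 100),
            round(random.uniform(1.0, 50.0), 2),
        ])
```

Pass "sample_records.csv" in place of "5mSalesRecords.csv" in any of the examples above.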

Development

Linting

ruff check . # Find linting errors
ruff check . --fix # Auto-fix linting errors (where possible)

Documentation

# Shows in browser
poetry run pdoc csv_batcher
# Generates to ./docs
poetry run pdoc csv_batcher -o ./docs
# OR (recommended)
bin/build.sh

Testing

clear; pytest

Publishing

poetry publish --build -u __token__ -p $PYPI_TOKEN
# OR (recommended)
bin/publish.sh
