A Python utility that splits a large CSV into smaller ones and uses multiprocessing to process the resulting CSVs in parallel.
csv-batcher
Vertical scaling
A lightweight, Python-based, multiprocess CSV batcher suitable for use with dataframes or other tools that deal with large CSV files (or those that require timely processing).
Installation
pip install csv-batcher
GitHub
https://github.com/tangledpath/csv-batcher
Documentation
https://tangledpath.github.io/csv-batcher/csv_batcher.html
Further exercises
- Possibly implement pooling with Celery (for use in Django apps, etc.), which would enable horizontal scaling; see the sketch below.
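A minimal sketch of how that could look, assuming a hypothetical Celery task that receives one chunk filename per worker. None of this exists in csv-batcher today; the app name, broker URL, and task body are illustrative only:

import pandas as pd
from celery import Celery

# Hypothetical wiring -- csv-batcher does not currently ship Celery support.
app = Celery("csv_tasks", broker="redis://localhost:6379/0")

@app.task
def process_chunk(csv_chunk_filename):
    # Each Celery worker (potentially on another machine) processes one
    # chunk file, which is what would make the scaling horizontal.
    df = pd.read_csv(csv_chunk_filename)
    return len(df)

A real implementation would also need the chunk files on storage every worker can reach (or the chunk contents sent in the message body).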
Usage
Arguments sent to the callback function are controlled by creating the pooler with callback_with set to one of the CallbackWith enum values (DATAFRAME_ROW, DATAFRAME, or CSV_FILENAME):
As dataframe row
from csv_batcher.csv_pooler import CSVPooler, CallbackWith

# Callback function passed to pooler; accepts a dataframe row
# as a pandas Series (via apply)
def process_dataframe_row(row):
    return row.iloc[0]

pooler = CSVPooler(
    "5mSalesRecords.csv",
    process_dataframe_row,
    callback_with=CallbackWith.DATAFRAME_ROW,
    pool_size=16
)
for processed_batch in pooler.process():
    print(processed_batch)
As dataframe
from csv_batcher.csv_pooler import CSVPooler, CallbackWith

# Used from process_dataframe's apply:
def process_dataframe_row(row):
    return row.iloc[0]

# Callback function passed to pooler; accepts a dataframe:
def process_dataframe(df):
    foo = df.apply(process_dataframe_row, axis=1)
    # Or do something more complicated....
    return len(df)

pooler = CSVPooler(
    "5mSalesRecords.csv",
    process_dataframe,
    callback_with=CallbackWith.DATAFRAME,
    pool_size=16
)
for processed_batch in pooler.process():
    print(processed_batch)
As CSV filename
import pandas as pd
from csv_batcher.csv_pooler import CSVPooler, CallbackWith

# Used from process_csv_filename's apply:
def process_dataframe_row(row):
    return row.iloc[0]

# Callback function passed to pooler; accepts a chunked CSV filename:
def process_csv_filename(csv_chunk_filename):
    # print("processing ", csv_chunk_filename)
    df = pd.read_csv(csv_chunk_filename, skipinitialspace=True, index_col=None)
    foo = df.apply(process_dataframe_row, axis=1)
    return len(df)

pooler = CSVPooler(
    "5mSalesRecords.csv",
    process_csv_filename,
    callback_with=CallbackWith.CSV_FILENAME,
    chunk_lines=10000,
    pool_size=16
)
for processed_batch in pooler.process():
    print(processed_batch)
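The examples above suggest that pooler.process() yields each callback's return value per chunk; assuming that holds, the per-chunk results can be aggregated, e.g. to confirm every row was seen. A minimal sketch reusing the CSV-filename style of callback from above:

import pandas as pd
from csv_batcher.csv_pooler import CSVPooler, CallbackWith

# Each worker loads one chunk and reports its row count
def count_rows(csv_chunk_filename):
    df = pd.read_csv(csv_chunk_filename, skipinitialspace=True, index_col=None)
    return len(df)

pooler = CSVPooler(
    "5mSalesRecords.csv",
    count_rows,
    callback_with=CallbackWith.CSV_FILENAME,
    chunk_lines=10000,
    pool_size=16
)

# Sum the per-chunk counts; this should equal the row count of the
# original file if every chunk was processed.
total_rows = sum(pooler.process())
print(total_rows)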
Development
Linting
ruff check . # Find linting errors
ruff check . --fix # Auto-fix linting errors (where possible)
Documentation
# Shows in browser
poetry run pdoc csv_batcher
# Generates to ./docs
poetry run pdoc csv_batcher -o ./docs
# OR (recommended)
bin/build.sh
Testing
clear; pytest
Publishing
poetry publish --build -u __token__ -p $PYPI_TOKEN
# OR (recommended)
bin/publish.sh
File details
Details for the file csv_batcher-0.1.6.tar.gz.
File metadata
- Download URL: csv_batcher-0.1.6.tar.gz
- Upload date:
- Size: 4.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.1 CPython/3.12.2 Darwin/22.6.0
File hashes
Algorithm | Hash digest
---|---
SHA256 | 8218825e8033a71a0760d925fb8c3e21ad61211b17b61f8dfc18cf3779f90a62
MD5 | 5f130f937fbe77857b1f3b243c16b501
BLAKE2b-256 | e0116aea2e0c8d03d383f8f8217a7afa889384383b133765036f07791b47b117
File details
Details for the file csv_batcher-0.1.6-py3-none-any.whl.
File metadata
- Download URL: csv_batcher-0.1.6-py3-none-any.whl
- Upload date:
- Size: 6.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.1 CPython/3.12.2 Darwin/22.6.0
File hashes
Algorithm | Hash digest
---|---
SHA256 | 2b20bd48a47a0d5e3e8c687101020617532387a3ddcd181ae013d919f9a8b044
MD5 | 6214fa0df46455da49da07cd8052452f
BLAKE2b-256 | 5c65858f1cabc9697a50b6098fc9c44b50cba1916f699403912979492904eabd