# csv-batcher

A utility that splits a large CSV file into smaller ones and uses multiprocessing to process them in parallel.

## Installation

```bash
pip install csv-batcher
```

## Documentation

https://tangledpath.github.io/csv-batcher/csv_batcher.html

## Usage

The arguments passed to the callback function are controlled by creating the pooler with the `callback_with` argument, using one of the `CallbackWith` enum values:
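For quick reference, these are the three modes demonstrated below (descriptions paraphrase the examples' comments; this is an orientation, not full API docs):

```python
from csv_batcher.csv_pooler import CallbackWith

CallbackWith.DATAFRAME_ROW  # callback receives one row as a pandas Series
CallbackWith.DATAFRAME      # callback receives a whole chunk as a DataFrame
CallbackWith.CSV_FILENAME   # callback receives the filename of a chunk CSV
```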

### As dataframe row

```python
from csv_batcher.csv_pooler import CSVPooler, CallbackWith

# Callback function passed to pooler; accepts a dataframe row
#   as a pandas Series (via apply)
def process_dataframe_row(row):
    return row.iloc[0]

pooler = CSVPooler(
    "5mSalesRecords.csv",
    process_dataframe_row,
    callback_with=CallbackWith.DATAFRAME_ROW,
    pool_size=16
)
pooler.process()
```

### As dataframe
```python
from csv_batcher.csv_pooler import CSVPooler, CallbackWith

# Used in DataFrame.apply:
def process_dataframe_row(row):
    return row.iloc[0]

# Callback function passed to pooler; accepts a dataframe:
def process_dataframe(df):
    foo = df.apply(process_dataframe_row, axis=1)
    # Or do something more complicated....
    return len(df)

pooler = CSVPooler(
    "5mSalesRecords.csv",
    process_dataframe,
    callback_with=CallbackWith.DATAFRAME,
    pool_size=16
)
pooler.process()
```

### As CSV filename
```python
import pandas as pd

from csv_batcher.csv_pooler import CSVPooler, CallbackWith

# Used in DataFrame.apply:
def process_dataframe_row(row):
    return row.iloc[0]

# Callback function passed to pooler; accepts a chunk's CSV filename:
def process_csv_filename(csv_chunk_filename):
    # print("processing ", csv_chunk_filename)
    df = pd.read_csv(csv_chunk_filename, skipinitialspace=True, index_col=None)
    foo = df.apply(process_dataframe_row, axis=1)
    return len(df)

pooler = CSVPooler(
    "5mSalesRecords.csv",
    process_csv_filename,
    callback_with=CallbackWith.CSV_FILENAME,
    chunk_lines=10000,
    pool_size=16
)
pooler.process()
```
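For orientation, the pattern csv-batcher automates is roughly what you might write by hand: split the source CSV into chunk files (repeating the header in each), then map a worker over those files with a `multiprocessing.Pool`. The sketch below is illustrative only, not csv-batcher's actual internals; helpers such as `split_csv` and `count_rows` are hypothetical names:

```python
import csv
import multiprocessing
import os
import tempfile

def _write_chunk(header, rows):
    # Write one chunk file containing the header plus `rows`.
    fd, chunk_path = tempfile.mkstemp(suffix=".csv")
    with os.fdopen(fd, "w", newline="") as out:
        writer = csv.writer(out)
        writer.writerow(header)
        writer.writerows(rows)
    return chunk_path

def split_csv(path, chunk_lines=10000):
    # Split `path` into chunk files of at most `chunk_lines` data rows each.
    chunk_paths, rows = [], []
    with open(path, newline="") as f:
        reader = csv.reader(f)
        header = next(reader)
        for row in reader:
            rows.append(row)
            if len(rows) >= chunk_lines:
                chunk_paths.append(_write_chunk(header, rows))
                rows = []
    if rows:
        chunk_paths.append(_write_chunk(header, rows))
    return chunk_paths

def count_rows(chunk_path):
    # Stand-in for a per-chunk callback (CSV_FILENAME style).
    with open(chunk_path, newline="") as f:
        return sum(1 for _ in csv.reader(f)) - 1  # exclude header

if __name__ == "__main__":
    chunks = split_csv("5mSalesRecords.csv", chunk_lines=10000)
    try:
        with multiprocessing.Pool(processes=16) as pool:
            print(sum(pool.map(count_rows, chunks)))
    finally:
        for p in chunks:
            os.remove(p)
```

The `chunk_lines` and `pool_size` arguments in the examples above correspond to the two tuning knobs in this sketch: how many rows go into each chunk file, and how many worker processes consume them.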

## Development

### Linting

```bash
ruff check .        # Find linting errors
ruff check . --fix  # Auto-fix linting errors (where possible)
```

### Documentation

```bash
# Shows in browser
poetry run pdoc csv_batcher

# Generates to ./docs
poetry run pdoc csv_batcher -o ./docs
```

### Testing

```bash
clear; pytest
```

### Publishing

```bash
poetry publish --build -u __token__ -p $PYPI_TOKEN
```
