Skip to main content

Parallel processing on pandas with progress bars

Project description

Parallel-pandas

Makes it easy to parallelize your calculations in pandas on all your CPUs.

Installation

pip install --upgrade parallel-pandas

Quickstart

import pandas as pd
import numpy as np
from parallel_pandas import ParallelPandas

#initialize parallel-pandas
ParallelPandas.initialize(n_cpu=16, split_factor=4, disable_pr_bar=True)

# create big DataFrame
df = pd.DataFrame(np.random.random((1_000_000, 100)))

# calculate multiple quantiles. Pandas only uses one core of CPU
%%timeit
res = df.quantile(q=[.25, .5, .95], axis=1)

3.66 s ± 31.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

#p_quantile is parallel analogue of quantile methods. Can use all cores of your CPU.
%%timeit
res = df.p_quantile(q=[.25, .5, .95], axis=1)

679 ms ± 10.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

As you can see the p_quantile method is 5 times faster!

Usage

Under the hood, parallel-pandas works very simply. The Dataframe or Series is split into chunks along the first or second axis. Then these chunks are passed to a pool of processes or threads where the desired method is executed on each part. At the end, the parts are concatenated to get the final result.

When initializing parallel-pandas you can specify the following options:

  1. n_cpu - the number of cores of your CPU that you want to use (default None - use all cores of CPU)
  2. split_factor - Affects the number of chunks into which the DataFrame/Series is split according to the formula chunks_number = split_factor*n_cpu (default 1).
  3. show_vmem - Shows a progress bar with available RAM (default False)
  4. disable_pr_bar - Disable the progress bar for parallel tasks (default False)

For example

import pandas as pd
import numpy as np
from parallel_pandas import ParallelPandas

#initialize parallel-pandas
ParallelPandas.initialize(n_cpu=16, split_factor=4, disable_pr_bar=False)

# create big DataFrame
df = pd.DataFrame(np.random.random((1_000_000, 100)))

During initialization, we specified split_factor=4 and n_cpu = 16, so the DataFrame will be split into 64 chunks (in the case of the describe method, axis = 1) and the progress bar shows the progress for each chunk

You can parallelize any expression with pandas Dataframe. For example, let's do a z-score normalization of columns in a dataframe. Let's look at the execution time and memory consumption. Compare with synchronous execution and with Dask.DataFrame

import pandas as pd
import numpy as np
from parallel_pandas import ParallelPandas
import dask.dataframe as dd
from time import monotonic

#initialize parallel-pandas
ParallelPandas.initialize(n_cpu=16, split_factor=8, disable_pr_bar=True)

# create big DataFrame
df = pd.DataFrame(np.random.random((1_000_000, 1000)))

# create dask DataFrame
ddf = dd.from_pandas(df, npartitions=128)

start = monotonic()
res=(df-df.mean())/df.std()
print(f'synchronous z-score normalization time took: {monotonic()-start:.1f} s.')
synchronous z-score normalization time took: 21.7 s.
#parallel-pandas
start = monotonic()
res=(df-df.p_mean())/df.p_std()
print(f'parallel z-score normalization time took: {monotonic()-start:.1f} s.')
parallel z-score normalization time took: 11.7 s.
#dask dataframe
start = monotonic()
res=((ddf-ddf.mean())/ddf.std()).compute()
print(f'dask parallel z-score normalization time took: {monotonic()-start:.1f} s.')
dask parallel z-score normalization time took: 12.5 s.

Pay attention to memory consumption. parallel-pandas and dask use almost half as much RAM as pandas

For some methods parallel-pandas is faster than dask.DataFrame:

#dask
%%time
res = ddf.nunique().compute()
Wall time: 42.9 s

%%time
res = ddf.rolling(10).mean().compute()
Wall time: 19.1 s

#parallel-pandas
%%time
res = df.p_nunique()
Wall time: 12.9 s

%%time
res = df.rolling(10).p_mean()
Wall time: 12.5 s

API

Parallel counterparts for pandas Series methods

methods parallel analogue executor
pd.Series.apply() pd.Series.p_apply() threads / processes
pd.Series.map() pd.Series.p_map() threads / processes

Parallel counterparts for pandas SeriesGroupBy methods

methods parallel analogue executor
pd.SeriesGroupBy.apply() pd.SeriesGroupBy.p_apply() threads / processes

Parallel counterparts for pandas Dataframe methods

methods parallel analogue executor
df.mean() df.p_mean() threads
df.min() df.p_min() threads
df.max() df.p_max() threads
df.median() df.p_max() threads
df.kurt() df.p_kurt() threads
df.skew() df.p_skew() threads
df.sum() df.p_sum() threads
df.prod() df.p_prod() threads
df.var() df.p_var() threads
df.sem() df.p_sem() threads
df.std() df.p_std() threads
df.cummin() df.p_cummin() threads
df.cumsum() df.p_cumsum() threads
df.cummax() df.p_cummax() threads
df.cumprod() df.p_cumprod() threads
df.apply() df.p_apply() threads / processes
df.applymap() df.p_applymap() processes
df.replace() df.p_replace() threads
df.describe() df.p_describe() threads
df.nunique() df.p_nunique() threads / processes
df.mad() df.p_mad() threads
df.idxmin() df.p_idxmin() threads
df.idxmax() df.p_idxmax() threads
df.rank() df.p_rank() threads
df.mode() df.p_mode() threads/processes
df.agg() df.p_agg() threads/processes
df.aggregate() df.p_aggregate() threads/processes

Parallel counterparts for pandas DataframeGroupBy methods

methods parallel analogue executor
DataFrameGroupBy.apply() DataFrameGroupBy.p_apply() threads / processes

Parallel counterparts for pandas window methods

Rolling

methods parallel analogue executor
pd.core.window.Rolling.apply() pd.core.window.Rolling.p_apply() threads / processes
pd.core.window.Rolling.min() pd.core.window.Rolling.p_min() threads / processes
pd.core.window.Rolling.max() pd.core.window.Rolling.p_max() threads / processes
pd.core.window.Rolling.mean() pd.core.window.Rolling.p_mean() threads / processes
pd.core.window.Rolling.sum() pd.core.window.Rolling.p_sum() threads / processes
pd.core.window.Rolling.var() pd.core.window.Rolling.p_var() threads / processes
pd.core.window.Rolling.sem() pd.core.window.Rolling.p_sem() threads / processes
pd.core.window.Rolling.skew() pd.core.window.Rolling.p_skew() threads / processes
pd.core.window.Rolling.kurt() pd.core.window.Rolling.p_kurt() threads / processes
pd.core.window.Rolling.median() pd.core.window.Rolling.p_median() threads / processes
pd.core.window.Rolling.quantile() pd.core.window.Rolling.p_quantile() threads / processes
pd.core.window.Rolling.rank() pd.core.window.Rolling.p_rank() threads / processes
pd.core.window.Rolling.agg() pd.core.window.Rolling.p_agg() threads / processes
pd.core.window.Rolling.aggregate() pd.core.window.Rolling.p_aggregate() threads / processes

Window

methods parallel analogue executor
pd.core.window.Window.mean() pd.core.window.Window.p_mean() threads / processes
pd.core.window.Window.sum() pd.core.window.Window.p_sum() threads / processes
pd.core.window.Window.var() pd.core.window.Window.p_var() threads / processes
pd.core.window.Window.std() pd.core.window.Window.p_std() threads / processes

RollingGroupby

methods parallel analogue executor
pd.core.window.RollingGroupby.apply() pd.core.window.RollingGroupby.p_apply() threads / processes
pd.core.window.RollingGroupby.min() pd.core.window.RollingGroupby.p_min() threads / processes
pd.core.window.RollingGroupby.max() pd.core.window.RollingGroupby.p_max() threads / processes
pd.core.window.RollingGroupby.mean() pd.core.window.RollingGroupby.p_mean() threads / processes
pd.core.window.RollingGroupby.sum() pd.core.window.RollingGroupby.p_sum() threads / processes
pd.core.window.RollingGroupby.var() pd.core.window.RollingGroupby.p_var() threads / processes
pd.core.window.RollingGroupby.sem() pd.core.window.RollingGroupby.p_sem() threads / processes
pd.core.window.RollingGroupby.skew() pd.core.window.RollingGroupby.p_skew() threads / processes
pd.core.window.RollingGroupby.kurt() pd.core.window.RollingGroupby.p_kurt() threads / processes
pd.core.window.RollingGroupby.median() pd.core.window.RollingGroupby.p_median() threads / processes
pd.core.window.RollingGroupby.quantile() pd.core.window.RollingGroupby.p_quantile() threads / processes
pd.core.window.RollingGroupby.rank() pd.core.window.RollingGroupby.p_rank() threads / processes
pd.core.window.RollingGroupby.agg() pd.core.window.RollingGroupby.p_agg() threads / processes
pd.core.window.RollingGroupby.aggregate() pd.core.window.RollingGroupby.p_aggregate() threads / processes

Expanding

methods parallel analogue executor
pd.core.window.Expanding.apply() pd.core.window.Expanding.p_apply() threads / processes
pd.core.window.Expanding.min() pd.core.window.Expanding.p_min() threads / processes
pd.core.window.Expanding.max() pd.core.window.Expanding.p_max() threads / processes
pd.core.window.Expanding.mean() pd.core.window.Expanding.p_mean() threads / processes
pd.core.window.Expanding.sum() pd.core.window.Expanding.p_sum() threads / processes
pd.core.window.Expanding.var() pd.core.window.Expanding.p_var() threads / processes
pd.core.window.Expanding.sem() pd.core.window.Expanding.p_sem() threads / processes
pd.core.window.Expanding.skew() pd.core.window.Expanding.p_skew() threads / processes
pd.core.window.Expanding.kurt() pd.core.window.Expanding.p_kurt() threads / processes
pd.core.window.Expanding.median() pd.core.window.Expanding.p_median() threads / processes
pd.core.window.Expanding.quantile() pd.core.window.Expanding.p_quantile() threads / processes
pd.core.window.Expanding.rank() pd.core.window.Expanding.p_rank() threads / processes
pd.core.window.Expanding.agg() pd.core.window.Expanding.p_agg() threads / processes
pd.core.window.Expanding.aggregate() pd.core.window.Expanding.p_aggregate() threads / processes

ExpandingGroupby

methods parallel analogue executor
pd.core.window.ExpandingGroupby.apply() pd.core.window.ExpandingGroupby.p_apply() threads / processes
pd.core.window.ExpandingGroupby.min() pd.core.window.ExpandingGroupby.p_min() threads / processes
pd.core.window.ExpandingGroupby.max() pd.core.window.ExpandingGroupby.p_max() threads / processes
pd.core.window.ExpandingGroupby.mean() pd.core.window.ExpandingGroupby.p_mean() threads / processes
pd.core.window.ExpandingGroupby.sum() pd.core.window.ExpandingGroupby.p_sum() threads / processes
pd.core.window.ExpandingGroupby.var() pd.core.window.ExpandingGroupby.p_var() threads / processes
pd.core.window.ExpandingGroupby.sem() pd.core.window.ExpandingGroupby.p_sem() threads / processes
pd.core.window.ExpandingGroupby.skew() pd.core.window.ExpandingGroupby.p_skew() threads / processes
pd.core.window.ExpandingGroupby.kurt() pd.core.window.ExpandingGroupby.p_kurt() threads / processes
pd.core.window.ExpandingGroupby.median() pd.core.window.ExpandingGroupby.p_median() threads / processes
pd.core.window.ExpandingGroupby.quantile() pd.core.window.ExpandingGroupby.p_quantile() threads / processes
pd.core.window.ExpandingGroupby.rank() pd.core.window.ExpandingGroupby.p_rank() threads / processes
pd.core.window.ExpandingGroupby.agg() pd.core.window.ExpandingGroupby.p_agg() threads / processes
pd.core.window.ExpandingGroupby.aggregate() pd.core.window.ExpandingGroupby.p_aggregate() threads / processes

ExponentialMovingWindow

methods parallel analogue executor
pd.core.window.ExponentialMovingWindow.mean() pd.core.window.ExponentialMovingWindow.p_mean() threads / processes
pd.core.window.ExponentialMovingWindow.sum() pd.core.window.ExponentialMovingWindow.p_sum() threads / processes
pd.core.window.ExponentialMovingWindow.var() pd.core.window.ExponentialMovingWindow.p_var() threads / processes
pd.core.window.ExponentialMovingWindow.std() pd.core.window.ExponentialMovingWindow.p_std() threads / processes

ExponentialMovingWindowGroupby

methods parallel analogue executor
pd.core.window.ExponentialMovingWindowGroupby.mean() pd.core.window.ExponentialMovingWindowGroupby.p_mean() threads / processes
pd.core.window.ExponentialMovingWindowGroupby.sum() pd.core.window.ExponentialMovingWindowGroupby.p_sum() threads / processes
pd.core.window.ExponentialMovingWindowGroupby.var() pd.core.window.ExponentialMovingWindowGroupby.p_var() threads / processes
pd.core.window.ExponentialMovingWindowGroupby.std() pd.core.window.ExponentialMovingWindowGroupby.p_std() threads / processes

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

parallel-pandas-0.5.0.tar.gz (18.1 kB view details)

Uploaded Source

Built Distribution

parallel_pandas-0.5.0-py3-none-any.whl (18.4 kB view details)

Uploaded Python 3

File details

Details for the file parallel-pandas-0.5.0.tar.gz.

File metadata

  • Download URL: parallel-pandas-0.5.0.tar.gz
  • Upload date:
  • Size: 18.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.10.6

File hashes

Hashes for parallel-pandas-0.5.0.tar.gz
Algorithm Hash digest
SHA256 d18b9083b9220cc8bd0cc52765f9f4c0f19e2803d557e02151df26814ef8d731
MD5 47d945a9bef28500f0548096ba7ecc08
BLAKE2b-256 c8988f4295808498f819038a4815c96e1573a061200677a25e35fe4bc024be4b

See more details on using hashes here.

File details

Details for the file parallel_pandas-0.5.0-py3-none-any.whl.

File metadata

File hashes

Hashes for parallel_pandas-0.5.0-py3-none-any.whl
Algorithm Hash digest
SHA256 1adf984519e4ec2994443fce300a289f13b671074609f7a4b8a444a9a800942a
MD5 ff04a514314194e66fda0874c52b8f3e
BLAKE2b-256 fb33f0d5fa2b7b328a4de7f6b683bf0035c65b5724e1b30c2a67a53b5de59d11

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page