A utility package for faster bulk insertion from a pandas DataFrame into a PostgreSQL table.

Project description

pg-bulk-loader

Overview

pg-bulk-loader is a utility package designed to facilitate faster bulk insertion of a DataFrame into a PostgreSQL database. Currently, it supports loading from pandas DataFrames only.

Purpose

This utility leverages the power of PostgreSQL in combination with Python to efficiently handle the bulk insertion of large datasets. The key features that contribute to its speed include:

  1. Utilization of PostgreSQL's COPY command (a minimal sketch of this mechanism follows this list)
  2. Integration of Psycopg3's pipeline feature
  3. Implementation of Python's coroutines
  4. Harnessing the power of multiprocessing
  5. Capability to drop indexes during insertion and recreate them in parallel
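
For context, below is a minimal, illustrative sketch of how PostgreSQL's COPY protocol can be driven from Python with psycopg3; it is not the package's internal code, and the DSN, table, and column names are placeholders.

import pandas as pd
import psycopg


def copy_dataframe(df: pd.DataFrame, dsn: str, table: str) -> None:
    # COPY ... FROM STDIN streams all rows in a single statement,
    # avoiding per-row INSERT overhead.
    columns = ", ".join(df.columns)
    with psycopg.connect(dsn) as conn:
        with conn.cursor() as cur:
            with cur.copy(f"COPY {table} ({columns}) FROM STDIN") as copy:
                for row in df.itertuples(index=False, name=None):
                    copy.write_row(row)
    # Leaving the connection context manager commits the transaction.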

Package Efficiency

Machine:

  • Resource config: 5 cores, 8 GB RAM
  • Azure hosted PostgreSQL Server
  • Azure hosted Python service (jupyter notebook)

Table info:

  • 12 columns (3 text, 2 date, 7 double)
  • Primary key: 3 columns (2 text and 1 date)
  • Indexes: 2 B-tree (one on a single column and another on three columns)

Runtime:

  • Data size: 20M records
    • Without PK and indexes: ~55s
    • With PK and indexes: ~150s (~85s to insert data with PK enabled and ~65s to create indexes)

Running 1M records without PK and indexes, comparing different approaches (benchmark screenshot not reproduced here; numbers are in seconds).

Usage

The utility provides the following useful functions and classes:

  1. batch_insert_to_postgres
  2. batch_insert_to_postgres_with_multi_process
  3. BatchInsert

batch_insert_to_postgres() function

  • pg_conn_details: Instance of the PgConnectionDetail class containing PostgreSQL server connection details.
  • table_name: Name of the table for bulk insertion.
  • input_data: Data in the form of a pandas DataFrame or Python generator containing DataFrames.
  • batch_size: Number of records to insert and commit at a time.
  • min_conn_pool_size, max_conn_pool_size: Determine the number of PostgreSQL connections in the connection pool.
  • drop_and_create_index: Set to True if indexes need to be dropped during insert and re-created once insertion is complete.
  • use_multi_process_for_create_index: Set to True if indexes need to be re-created in parallel; otherwise, they will be created sequentially.

Note: Provide the input either as a pandas DataFrame or as a generator of DataFrames (both forms are shown in the snippet below).
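
As a quick illustration (the file name is a placeholder), both of these are valid values for input_data:

import pandas as pd

# A single DataFrame loaded fully into memory
input_data = pd.read_csv("data.csv")

# ...or a generator of DataFrame chunks, which keeps memory usage bounded
input_data = pd.read_csv("data.csv", chunksize=250000)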

batch_insert_to_postgres_with_multi_process() function

  • pg_conn_details: Instance of the PgConnectionDetail class containing PostgreSQL server connection details.
  • table_name: Name of the table for bulk insertion.
  • data_generator: Python generator containing DataFrames.
  • batch_size: Number of records to insert and commit at a time.
  • min_conn_pool_size, max_conn_pool_size: Determine the number of PostgreSQL connections in the connection pool.
  • drop_and_create_index: Set to True if indexes need to be dropped during insert and re-created once insertion is complete.
  • no_of_processes: Specify the number of cores for multiprocessing.

BatchInsert class

This class serves as the core logic for the utility and is wrapped by the first two utility functions. Users may find it useful if additional logic needs to be developed around the functionality or if a custom sequential or parallel computation logic is required.

Properties to create an instance of BatchInsert class:

  • batch_size: Number of records to insert and commit at a time.
  • table_name: Name of the table for bulk insertion.
  • pg_conn_details: Instance of the PgConnectionDetail class containing PostgreSQL server connection details.
  • min_conn, max_conn: Determine the number of PostgreSQL connections in the connection pool.
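
A minimal sketch of driving the class directly is shown below, assuming BatchInsert is importable from the top-level package like the other names in the examples. The constructor arguments follow the properties listed above; the method names used for execution (open_connection_pool, execute, close_connection_pool) are assumptions inferred from the wrapper functions' behaviour, so verify them against the package source for your installed version.

import asyncio
import pandas as pd
from pg_bulk_loader import PgConnectionDetail, BatchInsert


async def run_batch_insert(df: pd.DataFrame):
    pg_conn_details = PgConnectionDetail(
        user="<postgres username>",
        password="<postgres password>",
        database="<postgres database>",
        host="<host address to postgres server>",
        port="<port>",
        schema="<schema name where table exist>"
    )
    batch_insert = BatchInsert(
        batch_size=250000,
        table_name="<table_name>",
        pg_conn_details=pg_conn_details,
        min_conn=5,
        max_conn=10
    )
    # Assumed method names -- check the installed version before relying on them.
    await batch_insert.open_connection_pool()
    try:
        await batch_insert.execute(df)
    finally:
        await batch_insert.close_connection_pool()


if __name__ == '__main__':
    asyncio.run(run_batch_insert(pd.DataFrame()))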

Developer Notes:

  • The min_conn or min_conn_pool_size can be equal to or less than ceil(total_data_size / batch_size).
  • The max_conn or max_conn_pool_size can be equal to or greater than ceil(total_data_size / batch_size). A worked example of this sizing rule follows this list.
  • The no_of_processes can be set to the number of available cores, or left as None to let the system determine the optimal number based on resource availability.
  • The ideal batch_size, as observed during testing, typically falls within the range of 100,000 to 250,000; however, this depends on the characteristics of the data and the table structure.
  • The multiprocessing function execution must start in the main block (under if __name__ == '__main__':).
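
A small worked example of the pool-sizing rule from the first two notes (a sketch; the numbers are illustrative):

import math

chunk_size = 1000000   # rows passed to a single insert call
batch_size = 250000    # rows inserted and committed per batch

batches = math.ceil(chunk_size / batch_size)  # 4 batches

# Per the notes above: keep the minimum pool size at or below the batch
# count and the maximum pool size at or above it.
min_conn_pool_size = batches  # any value <= 4 works
max_conn_pool_size = batches  # any value >= 4 works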

Package installation:

`pip install pg-bulk-loader`

Examples:

  1. Loading entire dataset once and sending for bulk insert in batches:
import pandas as pd
import asyncio
from pg_bulk_loader import PgConnectionDetail, batch_insert_to_postgres


async def run():
    # Read data. Suppose the DataFrame below has 20M records
    input_data_df = pd.DataFrame()
    
    # Create Postgres Connection Details object. This will help in creating and managing the database connections 
    pg_conn_details = PgConnectionDetail(
        user="<postgres username>",
        password="<postgres password>",
        database="<postgres database>",
        host="<host address to postgres server>",
        port="<port>",
        schema="<schema name where table exist>"
    )
    
    # Data will be inserted and committed in batches of 250,000
    await batch_insert_to_postgres(
        pg_conn_details=pg_conn_details,
        table_name="<table_name>",
        input_data=input_data_df,
        batch_size=250000,
        min_conn_pool_size=20,
        max_conn_pool_size=25,
        use_multi_process_for_create_index=True,
        drop_and_create_index=True
    )


if __name__ == '__main__':
    asyncio.run(run())
  2. Loading dataset in chunks and sending for bulk insert in batches:
import pandas as pd
import asyncio
from pg_bulk_loader import PgConnectionDetail, batch_insert_to_postgres


async def run():
    # Read the data in chunks of 1M records; suppose the file contains 20M records in total
    input_data_df_generator = pd.read_csv("file.csv", chunksize=1000000)
    
    # Create Postgres Connection Details object. This will help in creating and managing the database connections 
    pg_conn_details = PgConnectionDetail(
        user="<postgres username>",
        password="<postgres password>",
        database="<postgres database>",
        host="<host address to postgres server>",
        port="<port>",
        schema="<schema name where table exist>"
    )
    
    # Data will be inserted and committed in batches of 250,000
    await batch_insert_to_postgres(
        pg_conn_details=pg_conn_details,
        table_name="<table_name>",
        input_data=input_data_df_generator,
        batch_size=250000,
        min_conn_pool_size=20,
        max_conn_pool_size=25,
        use_multi_process_for_create_index=True,
        drop_and_create_index=True
    )


if __name__ == '__main__':
    asyncio.run(run())
  3. Parallel insertion using multiprocessing:

The code below uses 5 cores to process 5M records in parallel, i.e., 1M records per core, inserting and committing 250,000 records at a time.

import pandas as pd
import asyncio
from pg_bulk_loader import PgConnectionDetail, batch_insert_to_postgres_with_multi_process


async def run():
    # Create Postgres Connection Details object. This will help in creating and managing the database connections 
    pg_conn_details = PgConnectionDetail(
        user="<postgres username>",
        password="<postgres password>",
        database="<postgres database>",
        host="<host address to postgres server>",
        port="<port>",
        schema="<schema name where table exist>"
    )
    
    df_generator = pd.read_csv("20M-file.csv", chunksize=1000000)
    
    # Data will be inserted and committed in batches of 250,000
    await batch_insert_to_postgres_with_multi_process(
        pg_conn_details=pg_conn_details,
        table_name="<table_name>",
        data_generator=df_generator,
        batch_size=250000,
        min_conn_pool_size=20,
        max_conn_pool_size=25,
        no_of_processes=5,
        drop_and_create_index=True
    )


# The multiprocessing execution must start in the __main__ block.
if __name__ == '__main__':
    asyncio.run(run())

Development:

  • Install the required development dependencies: `pip install -r dev-requirements.txt`
  • Run the unit tests with `pytest`, or with coverage: `coverage run --source=src.pg_bulk_loader --module pytest --verbose && coverage report --show-missing`

Download files


Source Distribution

pg_bulk_loader-1.1.2.tar.gz (14.3 kB)

Uploaded Source

Built Distribution

pg_bulk_loader-1.1.2-py3-none-any.whl (14.6 kB)

Uploaded Python 3

File details

Details for the file pg_bulk_loader-1.1.2.tar.gz.

File metadata

  • Download URL: pg_bulk_loader-1.1.2.tar.gz
  • Upload date:
  • Size: 14.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.10.4

File hashes

Hashes for pg_bulk_loader-1.1.2.tar.gz

  • SHA256: 6521539b26b35054e95be89844439f92f1ff6c98d35ddc0fc344eb20ec55c5e7
  • MD5: 13cc87165328f0c58e237f493eda665a
  • BLAKE2b-256: 2deeede898f2021347b3edf970ba4c9e9a55945d87542bb92cd0fd31bd67b6f3


File details

Details for the file pg_bulk_loader-1.1.2-py3-none-any.whl.

File hashes

Hashes for pg_bulk_loader-1.1.2-py3-none-any.whl

  • SHA256: 1b44ddea5b318da29def2ec06993480dad456e3ffc11fe87c667fdb1b815c548
  • MD5: 2a38c05e9874c13dd0b91185dedc3381
  • BLAKE2b-256: 3a0fab8d72b61c5d087029c10242e113c096c3b0c08e345ff20ade1c67d544f9

