Skip to main content

PostgreSQL communication manager

Project description

pgcom

Python Build Status codecov pypi Documentation Status License: MIT

Communication manager for PostgreSQL database, provides a collection of wrappers over psycopg adapter to simplify the usage of basic SQL operators.

Installation

To install the package, simply use pip.

$ pip install pgcom

Basic usage

To initialize a new commuter, you need to set the basic connection parameters:

  • host - database host address
  • port - connection port number
  • user - user name used to authenticate
  • password - password used to authenticate
  • db_name - the database name

Any other connection parameter can be passed as a keyword. The list of the supported parameters can be seen here.

from pgcom import Commuter

conn_params = {
    'host': 'localhost',
    'port': '5432',
    'user': 'postgres',
    'password': 'password',
    'db_name': 'test_db'
}

commuter = Commuter(**conn_params)

Basic operations are provided with execute and select, insert methods.

Execute a database operation (query or command):

commuter.execute(f"""
    CREATE TABLE IF NOT EXISTS people (
        name text, 
        age integer)
    """)
commuter.execute(
    cmd='INSERT INTO people VALUES (%s, %s)', 
    vars=('Yeltsin', 76))
commuter.execute('DROP TABLE people')

Use select for reading SQL query into a DataFrame. This method returns a DataFrame corresponding to the result set of the query string.

age = 55
df = commuter.select(f'SELECT * FROM people WHERE age > {age}')

To write records stored in a DataFrame to database, you can use insert method.

import pandas as pd

df = pd.DataFrame({
    'name': ['Gorbachev', 'Yeltsin'], 
    'age': [89, 76]
}) 

commuter.insert(table_name='people', data=df)

Schema

To specify schema, you have two different options. You can either specify the schema attribute in the constructor, or just pass it directly to the method.

When you create a new Commuter instance with specified schema, all the methods will use this schema if other is not specified by method parameter. By default the public schema is used.

print(Commuter(**conn_params))

(host=localhost, user=postgres, db_name=test_db, schema=public)

print(Commuter(schema='model', **conn_params))

(host=localhost, user=postgres, db_name=test_db, schema=model)

If you omit setting schema using class constructor and prefer rather pass it to the methods, you can use any of the following options.

commuter = Commuter(**conn_params)  # public schema is used by default

# specify schema in SQL string, if method doesn't have schema argument
df = commuter.select('SELECT * FROM model.people WHERE age > 55')

# if method contains `schema` and `table_name` argument
commuter.insert(table_name='model.people', data=df)
# or 
commuter.insert(table_name='people', data=df, schema='model')

Select one element

Use the select_one method when your query results in a single element. This method returns a scalar value, not a DataFrame. Specify the default argument if you need the default value to be returned in case the query result is empty, otherwise None will be returned.

n_obs = commuter.select_one(
    cmd='SELECT COUNT(*) FROM people WHERE age > 55',
    default=0)

Insert one row and return serial key

When using a SERIAL column to provide unique identifiers, you may need to return the ID assigned to a new row. To obtain this, insert_return or insert_row method can be used.

If you use insert_row then you need to pass values as kwargs.

commuter.execute(f"""
    CREATE TABLE people (
        num SERIAL PRIMARY KEY, 
        name text, 
        age integer)
    """)

num = commuter.insert_row(
    table_name='people', 
    name='Yeltsin', 
    age=76, 
    return_id='num')

print(f'returned value: {num}')

returned value: 1

Using insert_return, you need to specify SQL string.

num = commuter.insert_return(
    cmd='INSERT INTO people (name, age) VALUES (%s, %s)', 
    values=('Yeltsin', 76),
    return_id='num')

print(f'returned value: {num}')

returned value: 2

Insert data using COPY FROM command

PostgreSQL COPY FROM command copies data from a file-system file to a table (appending the data to whatever is in the table already).

Currently no adaptation is provided between Python and PostgreSQL types on COPY: the file can be any Python file-like object but its format must be in the format accepted by PostgreSQL COPY command (data format, escaped characters, etc).

The copy_from method adapts an interface to efficient PostgreSQL COPY FROM command provided by Psycopg cursor objects to support writing data stored in a DataFrame.

To see a difference, let's try to insert data from the DataFrame with 1M rows and two columns using just a basic insert method.

from time import time
import pandas as pd

df = pd.DataFrame({
    'name': ['Yeltsin'] * int(1e6), 
    'age': [76] * int(1e6)
})

start = time()
commuter.insert(table_name='people', data=df)
print(f'processing time: {time() - start:.1f} sec')

processing time: 47.6 sec

Now implementing the same operation with copy_from.

start = time()
commuter.copy_from(table_name='people', data=df)
print(f'processing time: {time() - start:.1f} sec')

processing time: 1.3 sec

format_data

Set the format_data argument as True, if you need to adjust data before applying copy_from. It will control columns order according the table information stored in database information schema and converts float types to integer if needed.

df = pd.DataFrame({'age': [76.0], 'name': ['Yeltsin']})
commuter.copy_from('people', df)

psycopg2.errors.InvalidTextRepresentation: invalid input syntax for type integer: "Yeltsin"

Without formatting we caught an error trying to insert a text data into the first table column, which has an integer type. Now set format_data as True and repeat the operation.

commuter.copy_from('people', df, format_data=True)
n_obs = commuter.select_one('SELECT COUNT(*) FROM people')
print(f'number of added rows: {n_obs}')

number of added rows: 1

where

When table has a constraint and the DataFrame contains rows conflicted with this constraint, the data cannot be added to the table with the copy_from and ValueError will be raised. It is still possible to insert the data with the execute method, using for example INSERT ON CONFLICT statement (see here for details).

Let's create a table with the primary key and insert one row.

commuter.execute(f"""
CREATE TABLE people (
    name text PRIMARY KEY, 
    age integer)
    """)

commuter.insert_row('people', name='Yeltsin', age=76)

Now, if we try to insert the same row we catch an error.

commuter.copy_from('people', df)

ValueError: duplicate key value violates unique constraint "people_pkey"

DETAIL: Key (name)=(Yeltsin) already exists.

Using where argument, we can specify the WHERE clause of the DELETE statement, which will be executed before calling COPY FROM. This means that all rows where age is equal to 76 will be deleted from the table and then COPY FROM command will be called.

commuter.copy_from('people', df, where='age=76')
n_obs = commuter.select_one('SELECT COUNT(*) FROM people')
print(f'number of added rows: {n_obs}')

number of added rows: 1

Resolve primary conflicts

In the last example, we deleted rows from the table before using copy_from. In contrast to it, the resolve_primary_conflicts method can be used to control the data integrity and, instead of removing rows from the table, remove it from the DataFrame.

To implement it, the method selects data from the table and removes all rows from the given DataFrame, which violate primary key constraint in the selected data. To reduce the amount of querying data (when table is large), you need to specify the where argument. It specifies the WHERE clause in the SELECT query.

commuter.execute(f"""
CREATE TABLE people (
    id integer PRIMARY KEY, 
    name text, 
    age integer)
    """)

df = pd.DataFrame({
    'id': [1,2,3,4,5], 
    'name': ['Brezhnev', 'Andropov', 'Chernenko', 'Gorbachev', 'Yeltsin'],
    'age': [75, 69, 73, 89, 76]})

commuter.copy_from('people', df)
print(df)
id name age
1 Brezhnev 75
2 Andropov 69
3 Chernenko 73
4 Gorbachev 89
5 Yeltsin 76

Assume, that we have the new data we want to add to the table.

df = pd.DataFrame({
    'id': [6,3], 
    'name': ['Khrushchev', 'Putin'],
    'age': [77, 67]})
print(df)
id name age
6 Khrushchev 77
3 Putin 67

We apply resolve_primary_conflicts to sanitize the data before copying and specify where to compare the new entries only across the people older than 60 (to reduce the complexity).

df = commuter.resolve_primary_conflicts(
    table_name='people',
    data=df,
    where='age > 60')
print(df)
id name age
6 Khrushchev 77

Rows with conflicted keys have been deleted and copy_from can be now used without a doubt.

Resolve foreign conflicts

To sanitize the DataFrame for the case of potential conflicts on the foreign key, use resolve_foreign_conflicts. It selects data from the parent_table and removes all rows from the given DataFrame, which violate foreign key constraint in the selected data.

df = commuter.resolve_foreign_conflicts(
    table_name='table_name',
    parent_name='parent_table_name',
    data=df,
    where='condition to reduce the selected data')

License

Package is released under MIT License.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Files for pgcom, version 0.1.3
Filename, size File type Python version Upload date Hashes
Filename, size pgcom-0.1.3-py3-none-any.whl (14.6 kB) File type Wheel Python version py3 Upload date Hashes View hashes
Filename, size pgcom-0.1.3.tar.gz (16.7 kB) File type Source Python version None Upload date Hashes View hashes

Supported by

Elastic Elastic Search Pingdom Pingdom Monitoring Google Google BigQuery Sentry Sentry Error logging AWS AWS Cloud computing DataDog DataDog Monitoring Fastly Fastly CDN DigiCert DigiCert EV certificate StatusPage StatusPage Status page