Skip to main content

Arrow -> PostgreSQL encoder

Project description

pgpq

Convert PyArrow RecordBatches to Postgres' native binary format.

Usage

Copying a dataset to PostgreSQL using psycopg

"""Example for README.md"""
from tempfile import mkdtemp
import psycopg
import pyarrow.dataset as ds
import requests
from pgpq import ArrowToPostgresBinaryEncoder

# let's get some example data
# the download is streamed to disk in chunks so the whole parquet file is never
# held in memory, and a timeout makes a stalled connection fail instead of hanging
tmpdir = mkdtemp()
with requests.get(
    "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet",
    stream=True,
    timeout=60,
) as resp:
    # check the status before creating the file so an HTTP error
    # does not leave an empty parquet file behind in tmpdir
    resp.raise_for_status()
    with open(f"{tmpdir}/yellow_tripdata_2023-01.parquet", mode="wb") as f:
        for chunk in resp.iter_content(chunk_size=1024 * 1024):
            f.write(chunk)

# load an arrow dataset
# arrow can load datasets from partitioned parquet files locally or in S3/GCS
# it handles buffering, matching globs, etc.
dataset = ds.dataset(tmpdir)

# create an encoder object which will do the encoding
# and give us the expected Postgres table schema
encoder = ArrowToPostgresBinaryEncoder(dataset.schema)
# get the expected Postgres destination schema
# note that this is _not_ the same as the incoming arrow schema
# and not necessarily the schema of your permanent table
# instead it's the schema of the data that will be sent over the wire
# which for example does not have timezones on any timestamps
pg_schema = encoder.schema()
# assemble ddl for a temporary table
# it's often a good idea to bulk load into a temp table to:
# (1) Avoid indexes
# (2) Stay in-memory as long as possible
# (3) Be more flexible with types
#     (you can't load a SMALLINT into a BIGINT column without casting)
cols = [f'"{col_name}" {col.data_type.ddl()}' for col_name, col in pg_schema.columns]
ddl = f"CREATE TEMP TABLE data ({','.join(cols)})"

with psycopg.connect("postgres://postgres:postgres@localhost:5432/postgres") as conn:
    with conn.cursor() as cursor:
        cursor.execute(ddl)  # type: ignore
        with cursor.copy("COPY data FROM STDIN WITH (FORMAT BINARY)") as copy:
            # a binary COPY stream is: header, then the encoded batches, then the trailer
            copy.write(encoder.write_header())
            for batch in dataset.to_batches():
                copy.write(encoder.write_batch(batch))
            copy.write(encoder.finish())
        # load into your actual table, possibly doing type casts
        # cursor.execute("INSERT INTO \"table\" SELECT * FROM data")

Defining field encoders

"""Showcase defining encoders for fields."""
import pgpq
import psycopg
import pyarrow as pa
from pgpq import encoders
from pgpq import schema


# Both COPY operations below must target the same database: the table is created
# through the first connection and reused by the second one.
dsn = "postgres://postgres:postgres@localhost:5432/postgres"

data = [
    pa.array([1, 2, 3, 4]),
    pa.array(['{"age": 33, "name": "alice"}', '{"age": 24, "name": "bob"}', "{}", "null"]),
]
arrow_schema = pa.schema([("id", pa.int64()), ("properties", pa.string())])
record_batch = pa.RecordBatch.from_arrays(data, schema=arrow_schema)

# With the default encoders, the arrow string column maps to TEXT.
encoder = pgpq.ArrowToPostgresBinaryEncoder(record_batch.schema)
pg_schema_with_text_properties = encoder.schema()

assert [
    (col_name, col.data_type.ddl())
    for col_name, col in pg_schema_with_text_properties.columns
] == [("id", "INT8"), ("properties", "TEXT")]

# To support a different PostgreSQL schema, we change the default encoders generated by pgpq:
# * 'id' encoded as INT8 (BIGINT).
# * 'properties' encoded as JSONB.
field_encoders = {
    "id": encoders.Int64EncoderBuilder(pa.field("id", pa.int64())),
    "properties": encoders.StringEncoderBuilder.new_with_output(
        pa.field("properties", pa.string()), schema.Jsonb()
    ),
}
encoder = pgpq.ArrowToPostgresBinaryEncoder.new_with_encoders(record_batch.schema, field_encoders)
pg_schema_with_jsonb_properties = encoder.schema()

assert [
    (col_name, col.data_type.ddl())
    for col_name, col in pg_schema_with_jsonb_properties.columns
] == [("id", "INT8"), ("properties", "JSONB")]

ddl = """
CREATE TABLE id_properties (
    id INT8, -- Alternative: BIGINT
    properties JSONB
)
"""

# Without the right encoding, PostgreSQL will report errors in the binary data format when
# executing the following COPY: It expects properties to be encoded as JSONB not TEXT.
with psycopg.connect(dsn) as conn:
    with conn.cursor() as cursor:
        cursor.execute(ddl)

        with cursor.copy("COPY id_properties FROM STDIN WITH (FORMAT BINARY)") as copy:
            copy.write(encoder.write_header())
            copy.write(encoder.write_batch(record_batch))
            copy.write(encoder.finish())

# The 'id' field matches our schema, so we can use the default encoder for it.
# But, we still need to encode properties as JSONB.
# `infer_encoder` can be used to obtain the default encoder for a field.
field_encoders = {
    "id": pgpq.ArrowToPostgresBinaryEncoder.infer_encoder(record_batch.field("id")),
    "properties": encoders.StringEncoderBuilder.new_with_output(
        pa.field("properties", pa.string()), schema.Jsonb()
    ),
}
encoder = pgpq.ArrowToPostgresBinaryEncoder.new_with_encoders(record_batch.schema, field_encoders)
pg_schema_inferred_id_and_jsonb_properties = encoder.schema()

assert [
    (col_name, col.data_type.ddl())
    for col_name, col in pg_schema_inferred_id_and_jsonb_properties.columns
] == [("id", "INT8"), ("properties", "JSONB")]

# Reuses the id_properties table created above, so it connects to the same database.
with psycopg.connect(dsn) as conn:
    with conn.cursor() as cursor:
        with cursor.copy("COPY id_properties FROM STDIN WITH (FORMAT BINARY)") as copy:
            copy.write(encoder.write_header())
            copy.write(encoder.write_batch(record_batch))
            copy.write(encoder.finish())

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pgpq-0.11.1.tar.gz (64.6 kB view details)

Uploaded Source

Built Distributions

If you're not sure about the file name format, learn more about wheel file names.

pgpq-0.11.1-cp39-abi3-win_amd64.whl (614.0 kB view details)

Uploaded: CPython 3.9+, Windows x86-64

pgpq-0.11.1-cp39-abi3-win32.whl (550.2 kB view details)

Uploaded: CPython 3.9+, Windows x86

pgpq-0.11.1-cp39-abi3-musllinux_1_2_x86_64.whl (909.0 kB view details)

Uploaded: CPython 3.9+, musllinux: musl 1.2+ x86-64

pgpq-0.11.1-cp39-abi3-musllinux_1_2_i686.whl (951.5 kB view details)

Uploaded: CPython 3.9+, musllinux: musl 1.2+ i686

pgpq-0.11.1-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (713.5 kB view details)

Uploaded: CPython 3.9+, manylinux: glibc 2.17+ x86-64

pgpq-0.11.1-cp39-abi3-manylinux_2_5_i686.manylinux1_i686.whl (766.4 kB view details)

Uploaded: CPython 3.9+, manylinux: glibc 2.5+ i686

pgpq-0.11.1-cp39-abi3-macosx_10_12_x86_64.whl (676.4 kB view details)

Uploaded: CPython 3.9+, macOS 10.12+ x86-64

pgpq-0.11.1-cp39-abi3-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl (1.3 MB view details)

Uploaded: CPython 3.9+, macOS 10.12+ universal2 (ARM64, x86-64), macOS 10.12+ x86-64, macOS 11.0+ ARM64

File details

Details for the file pgpq-0.11.1.tar.gz.

File metadata

  • Download URL: pgpq-0.11.1.tar.gz
  • Upload date:
  • Size: 64.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for pgpq-0.11.1.tar.gz
Algorithm Hash digest
SHA256 b72e70ba77bb06446b54960efa53aa6271c0106502953e3deecd1927b5c33ce3
MD5 69916137e70023034da53fa5fe958340
BLAKE2b-256 2d7bd6966024f8f0b70d46ab332fb12a33716f7b4a34a32860be95544561e76e

See more details on using hashes here.

File details

Details for the file pgpq-0.11.1-cp39-abi3-win_amd64.whl.

File metadata

  • Download URL: pgpq-0.11.1-cp39-abi3-win_amd64.whl
  • Upload date:
  • Size: 614.0 kB
  • Tags: CPython 3.9+, Windows x86-64
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for pgpq-0.11.1-cp39-abi3-win_amd64.whl
Algorithm Hash digest
SHA256 105acef7713a8599f975c700d8f1f0ab80815637472c1655b30e77179ace985e
MD5 c0a63fdf921b83f7e983f11fe8185bc7
BLAKE2b-256 2dcfbb26a8f16f5cfb752ee8570f1ba09e7b10a6dd71a1a100e298027ffbdd16

See more details on using hashes here.

File details

Details for the file pgpq-0.11.1-cp39-abi3-win32.whl.

File metadata

  • Download URL: pgpq-0.11.1-cp39-abi3-win32.whl
  • Upload date:
  • Size: 550.2 kB
  • Tags: CPython 3.9+, Windows x86
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for pgpq-0.11.1-cp39-abi3-win32.whl
Algorithm Hash digest
SHA256 af105452ccefb21cba5c90c5c4dc03611161e718df60a7c28485e0b2b5aa48c5
MD5 9d232fff6ee7143e606de240e18fa77e
BLAKE2b-256 fdc410ee6abc903461f0737707e77be5d3865aaf3e79030ec77ff00f66d805e6

See more details on using hashes here.

File details

Details for the file pgpq-0.11.1-cp39-abi3-musllinux_1_2_x86_64.whl.

File metadata

  • Download URL: pgpq-0.11.1-cp39-abi3-musllinux_1_2_x86_64.whl
  • Upload date:
  • Size: 909.0 kB
  • Tags: CPython 3.9+, musllinux: musl 1.2+ x86-64
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for pgpq-0.11.1-cp39-abi3-musllinux_1_2_x86_64.whl
Algorithm Hash digest
SHA256 a1708210690320966cedc752ccc76e5fe458a3a5f7a6618430756e82432a29ab
MD5 539851330a38c1e2c722b7e9478677ac
BLAKE2b-256 c41bef63ab4fb097ee2ddc840e4353fabe5dfe10ba5a806f2b0d5d3d0dff96de

See more details on using hashes here.

File details

Details for the file pgpq-0.11.1-cp39-abi3-musllinux_1_2_i686.whl.

File metadata

  • Download URL: pgpq-0.11.1-cp39-abi3-musllinux_1_2_i686.whl
  • Upload date:
  • Size: 951.5 kB
  • Tags: CPython 3.9+, musllinux: musl 1.2+ i686
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for pgpq-0.11.1-cp39-abi3-musllinux_1_2_i686.whl
Algorithm Hash digest
SHA256 e4db86dfe71528a9c3090d9e8d4992e2a3189e58fdf8848504789364e256df66
MD5 6fbabbd70cb62fbad32810c1cc59626d
BLAKE2b-256 9f41c5511d683445f069c458b1d61e79f12a12f2864413b5cd920fccc150cda4

See more details on using hashes here.

File details

Details for the file pgpq-0.11.1-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.

File metadata

File hashes

Hashes for pgpq-0.11.1-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
Algorithm Hash digest
SHA256 a24d25276b0726dce4340ff2b73acaeeb08ff6881820a4167f75eea3ac532c3d
MD5 5997632fcf8e7110b128d9fd72124663
BLAKE2b-256 e131a1937aa40e9c78b36675031b1fbe0401a95da6187f533e8829acbfa7dafa

See more details on using hashes here.

File details

Details for the file pgpq-0.11.1-cp39-abi3-manylinux_2_5_i686.manylinux1_i686.whl.

File metadata

File hashes

Hashes for pgpq-0.11.1-cp39-abi3-manylinux_2_5_i686.manylinux1_i686.whl
Algorithm Hash digest
SHA256 44cb2eaa85804bbc172da8e2afd6c9df9ab39c3d5d589a9905fa86903c998f19
MD5 16da9757dd2c3b58bbb3c13fe2819509
BLAKE2b-256 d2bdc42382d6943eb526b4901779d4e828379ed7098a31f1bb7cd8d22664203f

See more details on using hashes here.

File details

Details for the file pgpq-0.11.1-cp39-abi3-macosx_10_12_x86_64.whl.

File metadata

File hashes

Hashes for pgpq-0.11.1-cp39-abi3-macosx_10_12_x86_64.whl
Algorithm Hash digest
SHA256 6a20b0bd4dbc7e67924c2b68442f089075e0809451385e155eee92556faf90fb
MD5 3c40261d968b090d3d788925557c152d
BLAKE2b-256 3fbe72695bace77a66fc266dcd72a84ba6fbe0072db53e57c60fe5affcd2340d

See more details on using hashes here.

File details

Details for the file pgpq-0.11.1-cp39-abi3-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl.

File metadata

File hashes

Hashes for pgpq-0.11.1-cp39-abi3-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl
Algorithm Hash digest
SHA256 ada6c215526202859e9e5077f971bdda5caa6e907bc335f5b819f5a873e069ae
MD5 746a5a8a30d7f52fda2d0979ea2c8db9
BLAKE2b-256 8e01a7d286ba036ad782970b851992f42d80121e6c5d994a68bb648ff01526e8

See more details on using hashes here.

Supported by

AWS (Cloud computing and Security Sponsor), Datadog (Monitoring), Depot (Continuous Integration), Fastly (CDN), Google (Download Analytics), Pingdom (Monitoring), Sentry (Error logging), StatusPage (Status page)