Light IO transforms for Postgres read/write in Apache Beam pipelines.

beam-postgres

Goal

The project aims to provide highly performant and customizable transforms for Postgres. It is not intended to support many different SQL database engines.

Features

  • ReadAllFromPostgres, ReadFromPostgres and WriteToPostgres transforms
  • Records can be mapped to tuples, dictionaries or dataclasses
  • Reads and writes are in configurable batches

Usage

Printing data from the database table:

import apache_beam as beam
from psycopg.rows import dict_row

from beam_postgres.io import ReadAllFromPostgres

with beam.Pipeline() as p:
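    data = p | "Reading example records from database" >> ReadAllFromPostgres(
        # Connection string for the Postgres database.
        "host=localhost dbname=examples user=postgres password=postgres",
        # Query whose rows are read into the pipeline.
        "select id, data from source",
        # psycopg row factory: each row is returned as a dict.
        dict_row,
    )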
    data | "Writing to stdout" >> beam.Map(print)
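
The dict_row argument above is a psycopg row factory, and psycopg.rows also ships tuple_row and class_row. To make the three record shapes from the feature list concrete, here is a minimal standard-library sketch of what each mapping produces for one row. The column names, the row values and the Source dataclass are hypothetical, and the helper functions are illustrative only; they are not part of beam-postgres or psycopg.

```python
from dataclasses import dataclass

# Hypothetical column names and values, standing in for one row of a query result.
COLUMNS = ("id", "data")
RAW_ROW = (1, "example1")


@dataclass
class Source:
    id: int
    data: str


def as_tuple(row):
    """Keep the row as a plain tuple (positional access)."""
    return tuple(row)


def as_dict(columns, row):
    """Pair each column name with its value."""
    return dict(zip(columns, row))


def as_dataclass(cls, columns, row):
    """Build a dataclass instance from the column/value pairs."""
    return cls(**dict(zip(columns, row)))


print(as_tuple(RAW_ROW))
print(as_dict(COLUMNS, RAW_ROW))
print(as_dataclass(Source, COLUMNS, RAW_ROW))
```

Tuples are the lightest shape, dictionaries make downstream steps independent of column order, and dataclasses add field names that type checkers can verify.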

Writing data to the database table:

from dataclasses import dataclass

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from beam_postgres.io import WriteToPostgres


@dataclass
class Example:
    data: str


with beam.Pipeline(options=PipelineOptions()) as p:
    data = p | "Reading example records" >> beam.Create(
        [
            Example("example1"),
            Example("example2"),
        ]
    )
    data | "Writing example records to database" >> WriteToPostgres(
        "host=localhost dbname=examples user=postgres password=postgres",
        # The named placeholder %(data)s is filled from each record's "data" field.
        "insert into sink (data) values (%(data)s)",
    )
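
To illustrate the idea behind batched writes with named placeholders, the sketch below buffers dataclass records and sends each batch in a single executemany call. It is an assumption-laden stand-in, not the library's implementation: it uses the standard-library sqlite3 module instead of Postgres so that it runs without a server, and the batched and write_in_batches helpers are hypothetical names. Note that sqlite3 spells the named placeholder :data, whereas psycopg spells it %(data)s.

```python
import sqlite3
from dataclasses import asdict, dataclass
from itertools import islice


@dataclass
class Example:
    data: str


def batched(records, batch_size):
    """Yield lists of at most batch_size records."""
    iterator = iter(records)
    while batch := list(islice(iterator, batch_size)):
        yield batch


def write_in_batches(conn, statement, records, batch_size):
    """Run one executemany call and one commit per batch; return the batch count."""
    batches = 0
    for batch in batched(records, batch_size):
        # asdict turns Example(data="x") into {"data": "x"},
        # so the keys line up with the named placeholder in the statement.
        conn.executemany(statement, [asdict(record) for record in batch])
        conn.commit()
        batches += 1
    return batches


conn = sqlite3.connect(":memory:")
conn.execute("create table sink (data text)")
records = [Example(f"example{i}") for i in range(1, 6)]
batch_count = write_in_batches(
    conn, "insert into sink (data) values (:data)", records, batch_size=2
)
rows = conn.execute("select data from sink order by data").fetchall()
print(batch_count, rows)
```

Larger batches mean fewer round trips to the database, at the cost of holding more records in memory and redoing more work if a batch fails.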

See here for more examples.

Reading in batches

When a table holds more data than fits into memory, you will want to read it in batches. You can see example code here (the code reads records in batches of 1).
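
As a rough illustration of the batching idea, the sketch below pulls rows from a cursor in fixed-size chunks with the DB-API fetchmany call, so only one batch is held by the consumer at a time. It uses the standard-library sqlite3 module as a stand-in for Postgres, and read_in_batches is a hypothetical helper; how beam-postgres batches reads internally is not shown here.

```python
import sqlite3


def read_in_batches(conn, query, batch_size):
    """Yield lists of at most batch_size rows until the cursor is exhausted."""
    cursor = conn.execute(query)
    while True:
        batch = cursor.fetchmany(batch_size)
        if not batch:
            break
        yield batch


conn = sqlite3.connect(":memory:")
conn.execute("create table source (id integer primary key, data text)")
conn.executemany(
    "insert into source (id, data) values (?, ?)",
    [(i, f"example{i}") for i in range(1, 6)],
)

batches = list(read_in_batches(conn, "select id, data from source order by id", 2))
batch_sizes = [len(batch) for batch in batches]
print(batch_sizes)
```

With a Postgres driver, whether memory actually stays bounded also depends on the cursor type: psycopg's default client-side cursor transfers the whole result set to the client, while a server-side cursor fetches it in chunks.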
