beam-postgres
Light IO transforms for Postgres read/write in Apache Beam pipelines.
Goal
The project aims to provide highly performant, customizable transforms for Postgres; it is not intended to support many different SQL database engines.
Features
- `ReadAllFromPostgres`, `ReadFromPostgres` and `WriteToPostgres` transforms
- Records can be mapped to tuples, dictionaries or dataclasses
- Reads and writes are in configurable batches
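The usage example passes `dict_row` from `psycopg.rows` to choose the record shape, so the three shapes listed above likely correspond to psycopg row factories (`tuple_row`, `dict_row`, and presumably `class_row` for dataclasses). The sketch below only mimics what those factories produce, in plain Python so it runs without a database. The helpers `as_tuple`, `as_dict`, `as_dataclass`, the column names, and the `Example` fields are hypothetical illustrations, not part of beam-postgres.

```python
from dataclasses import dataclass
from typing import Any, Sequence


# Hypothetical record type standing in for a user-defined dataclass.
@dataclass
class Example:
    id: int
    data: str


def as_tuple(columns: Sequence[str], values: Sequence[Any]) -> tuple:
    # Similar to psycopg's tuple_row: keep the values in column order.
    return tuple(values)


def as_dict(columns: Sequence[str], values: Sequence[Any]) -> dict:
    # Similar to psycopg's dict_row: key each value by its column name.
    return dict(zip(columns, values))


def as_dataclass(cls, columns: Sequence[str], values: Sequence[Any]):
    # Similar to psycopg's class_row: pass values as keyword arguments.
    return cls(**dict(zip(columns, values)))


columns = ["id", "data"]
values = (1, "example1")

print(as_tuple(columns, values))               # (1, 'example1')
print(as_dict(columns, values))                # {'id': 1, 'data': 'example1'}
print(as_dataclass(Example, columns, values))  # Example(id=1, data='example1')
```

With a dataclass, column names must match the field names, which is why the query's column list matters when mapping rows to a class.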
Usage
Printing data from the database table:
```python
import apache_beam as beam
from psycopg.rows import dict_row

from beam_postgres.io import ReadAllFromPostgres

with beam.Pipeline() as p:
    data = p | "Reading example records from database" >> ReadAllFromPostgres(
        "host=localhost dbname=examples user=postgres password=postgres",
        "select id, data from source",
        dict_row,
    )
    data | "Writing to stdout" >> beam.Map(print)
```
Writing data to the database table:
```python
from dataclasses import dataclass

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from beam_postgres.io import WriteToPostgres


@dataclass
class Example:
    data: str


with beam.Pipeline(options=PipelineOptions()) as p:
    data = p | "Reading example records" >> beam.Create(
        [
            Example("example1"),
            Example("example2"),
        ]
    )
    data | "Writing example records to database" >> WriteToPostgres(
        "host=localhost dbname=examples user=postgres password=postgres",
        "insert into sink (data) values (%s)",
    )
```
See here for more examples.
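In the write example, each `Example` record fills the single `%s` placeholder of the insert statement, and writes are said to happen in configurable batches. One common way to batch writes, shown here as a sketch under stated assumptions and not as the internals of `WriteToPostgres`, is to buffer records, turn each dataclass into a parameter tuple with `dataclasses.astuple`, and flush every full buffer with one `executemany` call. It uses the standard-library `sqlite3` module so it runs without a Postgres server (sqlite3 uses `?` placeholders where psycopg uses `%s`); `write_in_batches` and `batch_size` are hypothetical names.

```python
import sqlite3
from dataclasses import astuple, dataclass
from typing import Iterable, List


@dataclass
class Example:
    data: str


def write_in_batches(conn, records: Iterable[Example], batch_size: int) -> int:
    """Insert records in groups of `batch_size`; return the number of flushes."""
    query = "insert into sink (data) values (?)"  # psycopg would use %s
    buffer: List[tuple] = []
    flushes = 0
    with conn:  # commits on success, rolls back on error
        for record in records:
            buffer.append(astuple(record))  # Example("x") -> ("x",)
            if len(buffer) >= batch_size:
                conn.executemany(query, buffer)
                buffer.clear()
                flushes += 1
        if buffer:  # flush the final, partially filled batch
            conn.executemany(query, buffer)
            flushes += 1
    return flushes


conn = sqlite3.connect(":memory:")
conn.execute("create table sink (id integer primary key, data text)")
records = [Example(f"example{i}") for i in range(1, 6)]
flushes = write_in_batches(conn, records, batch_size=2)
print(flushes)  # 3
print(conn.execute("select count(*) from sink").fetchone()[0])  # 5
```

Larger batches mean fewer round trips to the database at the cost of holding more records in memory before each flush.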
Reading in batches
Sometimes a table holds more data than fits into memory; in that case, read the table data in batches. You can see example code here (it reads records in batches of 1).
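A common way to read a result set in batches is the DB-API `fetchmany(size)` loop sketched below. It is not taken from beam-postgres: it uses the standard-library `sqlite3` module so it runs without a server, and `read_in_batches` is a hypothetical helper. Note that with psycopg a plain client-side cursor receives the whole result on `execute`, so actually bounding client memory would require a server-side cursor (created with `conn.cursor(name=...)`), whose `fetchmany` pulls rows from the server in chunks. With `batch_size=1` the sketch mirrors the batch size mentioned above.

```python
import sqlite3
from typing import Iterator, List, Tuple


def read_in_batches(conn, query: str, batch_size: int) -> Iterator[List[Tuple]]:
    """Yield lists of at most `batch_size` rows until the result is exhausted."""
    cursor = conn.execute(query)
    try:
        while True:
            batch = cursor.fetchmany(batch_size)
            if not batch:  # an empty list means no rows are left
                break
            yield batch
    finally:
        cursor.close()


conn = sqlite3.connect(":memory:")
conn.execute("create table source (id integer primary key, data text)")
conn.executemany(
    "insert into source (data) values (?)",
    [("example1",), ("example2",), ("example3",)],
)

for batch in read_in_batches(conn, "select id, data from source order by id", 1):
    print(batch)
# [(1, 'example1')]
# [(2, 'example2')]
# [(3, 'example3')]
```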
Project details
Download files
Source distribution: beam-postgres-0.4.1.tar.gz (10.6 kB)
Built distribution: beam_postgres-0.4.1-py3-none-any.whl
Hashes for beam_postgres-0.4.1-py3-none-any.whl:

| Algorithm | Hash digest |
|---|---|
| SHA256 | 1d73cf35506548c665101c7f19c21ce0152ea795cedb1dc5392588a6ff2840e7 |
| MD5 | 54937a5ca289e38f3f401cf8336a6d80 |
| BLAKE2b-256 | a73da72457549c731ac9cc86761317a0c1b1a6bdb84c4e05ae1c44517c37bbed |