Skip to main content

Collection of transforms for the Apache beam python SDK. Forks the original Mohamed Haseeb repository to make some workarounds

Project description

PyPI PyPI - Downloads

About

A collection of random transforms for the Apache beam python SDK . Many are simple transforms. The most useful ones are those for reading/writing from/to relational databases.

Installation

  • Using pip
pip install beam-nuggets
  • From source
git clone git@github.com:mohaseeb/beam-nuggets.git
cd beam-nuggets
pip install .

Supported transforms

IO

Others

Documentation

See here.

Usage

Write data to an SQLite table using beam-nugget's relational_db.Write transform.

# write_sqlite.py contents
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

records = [
    {'name': 'Jan', 'num': 1},
    {'name': 'Feb', 'num': 2}
]

source_config = relational_db.SourceConfiguration(
    drivername='sqlite',
    database='/tmp/months_db.sqlite',
    create_if_missing=True  # create the database if not there 
)

table_config = relational_db.TableConfiguration(
    name='months',
    create_if_missing=True,  # automatically create the table if not there
    primary_key_columns=['num']  # and use 'num' column as primary key
)

with beam.Pipeline(options=PipelineOptions()) as p:  # Will use local runner
    months = p | "Reading month records" >> beam.Create(records)
    months | 'Writing to DB' >> relational_db.Write(
        source_config=source_config,
        table_config=table_config
    )

Execute the pipeline

python write_sqlite.py 

Examine the contents

sqlite3 /tmp/months_db.sqlite 'select * from months'
# output:
# 1.0|Jan
# 2.0|Feb

To write the same data to a PostgreSQL table instead, just create a suitable relational_db.SourceConfiguration as follows.

source_config = relational_db.SourceConfiguration(
    drivername='postgresql+pg8000',
    host='localhost',
    port=5432,
    username='postgres',
    password='password',
    database='calendar',
    create_if_missing=True  # create the database if not there 
)

Click here for more examples, including writing to PostgreSQL in Google Cloud Platform using the DataFlowRunner.

An example showing how you can use beam-nugget's relational_db.ReadFromDB transform to read from a PostgreSQL database table.

from __future__ import print_function
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

with beam.Pipeline(options=PipelineOptions()) as p:
    source_config = relational_db.SourceConfiguration(
        drivername='postgresql+pg8000',
        host='localhost',
        port=5432,
        username='postgres',
        password='password',
        database='calendar',
    )
    records = p | "Reading records from db" >> relational_db.ReadFromDB(
        source_config=source_config,
        table_name='months',
        query='select num, name from months'  # optional. When omitted, all table records are returned. 
    )
    records | 'Writing to stdout' >> beam.Map(print)

See here for more examples.

Development

  • Install
git clone git@github.com:mohaseeb/beam-nuggets.git
cd beam-nuggets
export BEAM_NUGGETS_ROOT=`pwd`
pip install -e .[dev]
  • Make changes on dedicated dev branches
  • Run tests
cd $BEAM_NUGGETS_ROOT
python -m unittest discover -v
  • Generate docs
cd $BEAM_NUGGETS_ROOT
docs/generate_docs.sh
  • Create a PR against master.
  • After merging the accepted PR and updating the local master, upload a new build to pypi.
cd $BEAM_NUGGETS_ROOT
scripts/build_test_deploy.sh

Backlog

  • versioned docs?
  • Summarize the investigation of using Source/Sink Vs ParDo(and GroupBy) for IO
  • more nuggets: WriteToCsv
  • Investigate readiness of SDF ParDo, and possibility to use for relational_db.ReadFromDB
  • integration tests
  • DB transforms failures handling on IO transforms
  • more nuggets: Elasticsearch, Mongo
  • WriteToRelationalDB, logging

Contributers

mohaseeb, astrocox, 2514millerj

Licence

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

sd-beam-nuggets-0.15.3.tar.gz (19.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

sd_beam_nuggets-0.15.3-py3-none-any.whl (24.7 kB view details)

Uploaded Python 3

File details

Details for the file sd-beam-nuggets-0.15.3.tar.gz.

File metadata

  • Download URL: sd-beam-nuggets-0.15.3.tar.gz
  • Upload date:
  • Size: 19.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.23.0 setuptools/47.3.1 requests-toolbelt/0.9.1 tqdm/4.43.0 CPython/3.6.9

File hashes

Hashes for sd-beam-nuggets-0.15.3.tar.gz
Algorithm Hash digest
SHA256 c77d7bf9079764fcc5619799a45b918cd0a7c77c949572cc5ec7e15667aec1cb
MD5 1cd7228eb08965ce5a622c74c01ef22c
BLAKE2b-256 0ae3b75c23581cbe76f2f0d79bf356b4965a4bc5b5a127f11d2861569f8120b4

See more details on using hashes here.

File details

Details for the file sd_beam_nuggets-0.15.3-py3-none-any.whl.

File metadata

  • Download URL: sd_beam_nuggets-0.15.3-py3-none-any.whl
  • Upload date:
  • Size: 24.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.23.0 setuptools/47.3.1 requests-toolbelt/0.9.1 tqdm/4.43.0 CPython/3.6.9

File hashes

Hashes for sd_beam_nuggets-0.15.3-py3-none-any.whl
Algorithm Hash digest
SHA256 1ad3681a0c12499b5ed72ebfce277955d5dbb0289e8dd0785ec37b9c9aa3a37f
MD5 3e3684866a7383403babfd7578989b05
BLAKE2b-256 04a3738282453fe49574a7d398798f298890358c58f24bba8da863fe91c9f056

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page