
Collection of transforms for the Apache Beam Python SDK.

Project description

A collection of random transforms for the Apache Beam Python SDK. Many are simple (or trivial) transforms. The most useful ones are those for reading from and writing to relational databases.

Installation

  • Using pip
pip install beam-nuggets
  • From source
git clone git@github.com:mohaseeb/beam-nuggets.git
cd beam-nuggets
pip install .

Usage

Write data to an SQLite table using beam-nuggets' relational_db.Write transform.

# write_sqlite.py contents
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

records = [
    {'name': 'Jan', 'num': 1},
    {'name': 'Feb', 'num': 2}
]

source_config = relational_db.SourceConfiguration(
    drivername='sqlite',
    database='/tmp/months_db.sqlite',
    create_if_missing=True  # create the database if not there 
)

table_config = relational_db.TableConfiguration(
    name='months',
    create_if_missing=True,  # automatically create the table if not there
    primary_key_columns=['num']  # and use 'num' column as a primary key
)

with beam.Pipeline(options=PipelineOptions()) as p:  # Will use local runner
    months = p | "Reading month records" >> beam.Create(records)
    months | 'Writing to DB' >> relational_db.Write(
        source_config=source_config,
        table_config=table_config
    )
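Each element passed to relational_db.Write in the example above is a plain Python dict, and the table's columns appear to follow its keys (name and num). As a minimal sketch, the records for all twelve months could be generated with the standard calendar module instead of being typed by hand; month_records is a hypothetical helper, not part of beam-nuggets.

```python
import calendar


def month_records():
    """Build one {'name': ..., 'num': ...} dict per month (hypothetical helper).

    calendar.month_abbr is locale-dependent; under the default C locale it
    yields 'Jan', 'Feb', ... (index 0 is an empty string, so start at 1).
    """
    return [
        {'name': calendar.month_abbr[num], 'num': num}
        for num in range(1, 13)
    ]


records = month_records()
```

The resulting list can be passed to beam.Create(records) in place of the hand-written two-element list.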

Execute the pipeline

python write_sqlite.py 

Examine the contents

sqlite3 /tmp/months_db.sqlite 'select * from months'
# output:
# 1.0|Jan
# 2.0|Feb
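The same check can be done from Python with the standard library's sqlite3 module, which is handy in a test or notebook where the sqlite3 command-line tool is not installed. A minimal sketch; read_table is a hypothetical helper, and because the table name is interpolated into the SQL text it should only ever be a trusted, hard-coded name.

```python
import sqlite3


def read_table(db_path, table):
    """Return every row of `table` as a dict keyed by column name.

    `table` is interpolated into the SQL text (identifiers cannot be bound
    as query parameters), so only pass trusted, hard-coded table names.
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.row_factory = sqlite3.Row  # rows support access by column name
        rows = conn.execute('SELECT * FROM "{}"'.format(table)).fetchall()
        return [dict(row) for row in rows]
    finally:
        conn.close()
```

For example, read_table('/tmp/months_db.sqlite', 'months') should return the two month rows written by the pipeline above.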

To write the same data to a PostgreSQL table instead, just create a suitable relational_db.SourceConfiguration as follows.

source_config = relational_db.SourceConfiguration(
    drivername='postgresql+pg8000',
    host='localhost',
    port=5432,
    username='postgres',
    password='password',
    database='calendar',
    create_if_missing=True  # create the database if not there 
)
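Hard-coding the password is fine for a local demo, but in practice you may prefer to take the connection settings from the environment. A minimal sketch, assuming you reuse libpq's conventional variable names (PGHOST, PGPORT, PGUSER, PGPASSWORD, PGDATABASE); the helper pg_config_from_env is hypothetical and reads the variables explicitly rather than relying on the pg8000 driver to do so. It returns keyword arguments with the same names used in the SourceConfiguration call above.

```python
import os


def pg_config_from_env(environ=None):
    """Collect SourceConfiguration keyword arguments from environment variables.

    Hypothetical helper: variable names mirror libpq's conventions, and the
    defaults match the values used in the example above.
    """
    env = os.environ if environ is None else environ
    return {
        'drivername': 'postgresql+pg8000',
        'host': env.get('PGHOST', 'localhost'),
        'port': int(env.get('PGPORT', '5432')),
        'username': env.get('PGUSER', 'postgres'),
        'password': env['PGPASSWORD'],  # fail loudly instead of defaulting a secret
        'database': env.get('PGDATABASE', 'calendar'),
    }
```

Usage would then look like relational_db.SourceConfiguration(create_if_missing=True, **pg_config_from_env()).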

The following example shows how to use beam-nuggets' relational_db.Read transform to read from a PostgreSQL database table.

from __future__ import print_function  # lets beam.Map(print) work on Python 2 too
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

with beam.Pipeline(options=PipelineOptions()) as p:
    source_config = relational_db.SourceConfiguration(
        drivername='postgresql+pg8000',
        host='localhost',
        port=5432,
        username='postgres',
        password='password',
        database='calendar',
    )
    records = p | "Reading records from db" >> relational_db.Read(
        source_config=source_config,
        table_name='months',
    )
    records | 'Writing to stdout' >> beam.Map(print)
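The SQLite output earlier shows num coming back as 1.0 rather than 1, which suggests the auto-created column was given a floating-point type. If downstream steps need integers, a small function applied with beam.Map can normalize each record. This sketch assumes Read emits one dict per row keyed by column name (consistent with the dicts Write consumes); normalize_month is a hypothetical name.

```python
def normalize_month(record):
    """Return a copy of a month record with 'num' coerced to int.

    Assumes each record is a dict such as {'name': 'Jan', 'num': 1.0}.
    The input dict is copied rather than mutated, since Beam elements
    should be treated as immutable.
    """
    normalized = dict(record)
    normalized['num'] = int(normalized['num'])
    return normalized
```

In the pipeline it would sit between the read and the print, e.g. records | 'Normalize' >> beam.Map(normalize_month) | 'Writing to stdout' >> beam.Map(print).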

See here for more examples.

Supported transforms

IO

  • relational_db.Read for reading from relational database tables.
  • relational_db.Write for writing to relational database tables.
    The above transforms use SQLAlchemy to communicate with the database, so they can read from and write to any relational database supported by SQLAlchemy. They are tested against PostgreSQL, MySQL and SQLite.
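SQLAlchemy identifies a database with a URL of the form dialect+driver://username:password@host:port/database, and the drivername values used above ('sqlite', 'postgresql+pg8000') match the dialect+driver prefix of such URLs. The sketch below assembles that URL string from the same fields, for example to check a configuration with sqlalchemy.create_engine() outside Beam. build_db_url is hypothetical, and this is not a claim about how SourceConfiguration builds its connection internally. Another backend would use its own prefix (e.g. mysql+pymysql), provided that driver package is installed.

```python
from urllib.parse import quote_plus


def build_db_url(drivername, database, host=None, port=None,
                 username=None, password=None):
    """Assemble a SQLAlchemy-style database URL string (hypothetical helper).

    Credentials are percent-encoded with quote_plus so characters such as
    '@' or '/' in a password do not break the URL.
    """
    if drivername.startswith('sqlite'):
        # SQLite has no server: the file path follows 'sqlite:///', so an
        # absolute path such as /tmp/db.sqlite gives four slashes in total.
        return '{}:///{}'.format(drivername, database)
    auth = ''
    if username:
        auth = quote_plus(username)
        if password:
            auth += ':' + quote_plus(password)
        auth += '@'
    netloc = host or ''
    if port:
        netloc += ':{}'.format(port)
    return '{}://{}{}/{}'.format(drivername, auth, netloc, database)
```

With the PostgreSQL settings above this yields postgresql+pg8000://postgres:password@localhost:5432/calendar.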

Others

Documentation

See here.

Development

  • Install
git clone git@github.com:mohaseeb/beam-nuggets.git
cd beam-nuggets
export BEAM_NUGGETS_ROOT=`pwd`
pip install -e .[dev]
  • Make changes on dedicated dev branches
  • Run tests
cd $BEAM_NUGGETS_ROOT
python -m unittest discover -v
  • Generate docs
cd $BEAM_NUGGETS_ROOT
docs/generate_docs.sh
  • Create a PR against master.
  • After merging the accepted PR and updating the local master, upload a new build to PyPI.
cd $BEAM_NUGGETS_ROOT
scripts/build_test_deploy.sh
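The "Run tests" step relies on python -m unittest discover, which collects test cases from modules whose names match the default pattern test*.py. A minimal sketch of such a module; the file name test_month_records.py and the helper to_record are hypothetical, and pure helpers like this one can be tested without starting a Beam pipeline.

```python
# test_month_records.py (hypothetical file name)
import unittest


def to_record(name, num):
    """Hypothetical helper under test: build a record dict for the DB writer."""
    return {'name': name, 'num': int(num)}


class ToRecordTest(unittest.TestCase):
    def test_builds_dict_with_expected_keys(self):
        self.assertEqual(to_record('Jan', 1), {'name': 'Jan', 'num': 1})

    def test_coerces_float_num_to_int(self):
        # SQLite may hand back 1.0-style floats; the record should hold an int
        self.assertIsInstance(to_record('Feb', 2.0)['num'], int)
```

Because discovery imports the module and finds the TestCase subclass itself, no unittest.main() call is needed.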

Backlog

  • versioned docs?
  • Summarize the investigation of using Source/Sink vs. ParDo (and GroupBy) for IO
  • Document how to use other DB drivers supported by SQLAlchemy
  • Example of how to run on GCP
  • SQL query support in relational_db.Read
  • More nuggets: WriteToCsv
  • Integration tests
  • Failure handling in the DB IO transforms
  • More nuggets: Elasticsearch, Mongo
  • WriteToRelationalDB, logging

Licence

MIT



Download files


Source Distribution

beam-nuggets-0.11.0.tar.gz (18.9 kB)

Built Distribution


beam_nuggets-0.11.0-py2-none-any.whl (23.4 kB)

File details

Details for the file beam-nuggets-0.11.0.tar.gz.

File metadata

  • Download URL: beam-nuggets-0.11.0.tar.gz
  • Upload date:
  • Size: 18.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.12.1 pkginfo/1.5.0.1 requests/2.21.0 setuptools/40.6.3 requests-toolbelt/0.8.0 tqdm/4.29.1 CPython/2.7.12

File hashes

Hashes for beam-nuggets-0.11.0.tar.gz
Algorithm     Hash digest
SHA256        cdf591b5600c44ddfb5a9121f336b71d7274db81ca3c7a421c315fab8366a083
MD5           7b6ec59fdf3c5d4bbf857e4d7988e1cf
BLAKE2b-256   64c28601b967752902d73d515ee4b9f3f090fd4f165415349070899579e6408f


File details

Details for the file beam_nuggets-0.11.0-py2-none-any.whl.

File metadata

  • Download URL: beam_nuggets-0.11.0-py2-none-any.whl
  • Upload date:
  • Size: 23.4 kB
  • Tags: Python 2
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.12.1 pkginfo/1.5.0.1 requests/2.21.0 setuptools/40.6.3 requests-toolbelt/0.8.0 tqdm/4.29.1 CPython/2.7.12

File hashes

Hashes for beam_nuggets-0.11.0-py2-none-any.whl
Algorithm     Hash digest
SHA256        3a8d4a1c08ae18fb3717f9aece76aed5054b1a04da94757058290c2c071d223d
MD5           138edcd6c876b571be9a011ad36d2ed8
BLAKE2b-256   4943d5b49e91a1a51a1eaf2e113959cb9cabc7254b53571cb6142d5d7c73ea9e
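To check a downloaded file against the SHA256 digest listed for it, you can hash it with Python's hashlib. A minimal sketch; sha256_matches is a hypothetical helper, and it reads the file in chunks so large archives need not fit in memory. (pip can also enforce digests itself via --hash entries in a requirements file combined with --require-hashes.)

```python
import hashlib


def sha256_matches(path, expected_hex, chunk_size=1 << 16):
    """Return True if the SHA256 digest of the file at `path` equals expected_hex.

    The expected digest is compared case-insensitively, since hex digests
    are sometimes published in upper case.
    """
    digest = hashlib.sha256()
    with open(path, 'rb') as fh:
        # iter() with a sentinel keeps reading fixed-size chunks until EOF (b'')
        for chunk in iter(lambda: fh.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest() == expected_hex.strip().lower()
```

For the source distribution this would be sha256_matches('beam-nuggets-0.11.0.tar.gz', 'cdf591b5600c44ddfb5a9121f336b71d7274db81ca3c7a421c315fab8366a083').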

