Apache Beam 2.72.0 transforms and utilities for Python 3.12

Project description

DOJO-Beam-Transforms

DOJO-Beam-Transforms is a reusable collection of Apache Beam transforms and utility components for data cleaning, enrichment, and ingestion workflows.

Current Release

  • Package: dojo-beam-transforms
  • Current release: 3.1.1
  • Python target: 3.12

Compatibility Matrix (v3.1.1)

Apache Beam SDK

  • apache-beam[dataframe,gcp,interactive]==2.72.0

Core Dependencies

  • pandas==2.1.1
  • numpy==1.26.3
  • pytz==2025.2
  • openpyxl==3.1.5

Installation

Install from PyPI

pip install dojo-beam-transforms==3.1.1

Install from GitHub tag

pip install "git+https://github.com/DOJO-Smart-Ways/DOJO-Beam-Transforms.git@release-v3.1.1"
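Either install can be verified from Python using only the standard library. A small illustrative helper (not part of the package) that reports the installed distribution version:

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(dist_name: str):
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None
```

After a successful install, `installed_version("dojo-beam-transforms")` should return `"3.1.1"`.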

Usage Examples

Example 1: CSV input + cleaning + enrichment

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from pipeline_components.input_file import read_csvs_union
from pipeline_components import data_cleaning as dc
from pipeline_components import data_enrichment as de


pipeline_options = PipelineOptions()

with beam.Pipeline(options=pipeline_options) as pipeline:
    records = read_csvs_union(
        pipeline=pipeline,
        input_pattern="gs://my-bucket/input/*.csv",
        delimiter=";",
        identifier="orders"
    )

    cleaned = (
        records
        | "Keep Relevant Columns" >> beam.ParDo(
            dc.KeepColumns(["order_id", "status", "amount"])
        )
        | "Normalize Status" >> beam.ParDo(
            dc.ReplaceValues(["status"], {"": "UNKNOWN", None: "UNKNOWN"})
        )
        | "Clean Amount Regex" >> beam.ParDo(
            dc.ReplaceRegex(["amount"], [(r",", ".")])
        )
    )

    enriched = (
        cleaned
        | "Cast Amount To Float" >> beam.ParDo(de.ColumnsToFloat(["amount"]))
        | "Force Order Id As String" >> beam.ParDo(de.ColumnsToString(["order_id"]))
    )

    _ = enriched | "Write Output" >> beam.io.WriteToText("gs://my-bucket/output/orders")
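The cleaning DoFns above operate on dict-shaped rows. As an illustration only (these functions are not the package's actual internals), the per-row logic behind KeepColumns, ReplaceValues, and ReplaceRegex can be sketched in plain Python:

```python
import re


def keep_columns(row: dict, columns: list) -> dict:
    """Keep only the listed columns, mirroring what KeepColumns does per row."""
    return {col: row[col] for col in columns if col in row}


def replace_values(row: dict, columns: list, mapping: dict) -> dict:
    """Map exact values (e.g. "" or None -> "UNKNOWN"), mirroring ReplaceValues."""
    out = dict(row)
    for col in columns:
        if col in out and out[col] in mapping:
            out[col] = mapping[out[col]]
    return out


def replace_regex(row: dict, columns: list, patterns: list) -> dict:
    """Apply (pattern, replacement) pairs to string columns, mirroring ReplaceRegex."""
    out = dict(row)
    for col in columns:
        if isinstance(out.get(col), str):
            for pattern, repl in patterns:
                out[col] = re.sub(pattern, repl, out[col])
    return out
```

In the pipeline, each of these would run inside a DoFn's `process` method, one element at a time.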

Example 2: Read from BigQuery and write to BigQuery

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from pipeline_components.input_file import read_bq
from pipeline_components.data_cleaning.TrimValues import TrimValues
from pipeline_components.data_cleaning.DropDuplicates import DropDuplicates


temp_location = "gs://my-project-temp/dataflow"
query = """
SELECT order_id, customer_name, city
FROM `my_project.my_dataset.orders`
"""

pipeline_options = PipelineOptions()

with beam.Pipeline(options=pipeline_options) as pipeline:
    rows = read_bq(
        pipeline=pipeline,
        query=query,
        temp_location=temp_location,
        use_standard_sql=True,
        identifier="orders_bq"
    )

    transformed = (
        rows
        | "Trim Customer Name" >> beam.ParDo(TrimValues(["customer_name"]))
        | "Drop Duplicates" >> beam.ParDo(DropDuplicates(["order_id"]))
    )

    transformed | "Write To BQ" >> beam.io.WriteToBigQuery(
        table="my_project:my_dataset.orders_clean",
        # A schema is required when create_disposition is CREATE_IF_NEEDED.
        schema="order_id:STRING,customer_name:STRING,city:STRING",
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    )

Docker Image for Dataflow Custom Container

The repository includes a Dockerfile aligned with Beam SDK 2.72.0 and Python 3.12.

docker build -t dojo_beam:3.1.1 .
docker tag dojo_beam:3.1.1 REGION-docker.pkg.dev/PROJECT_ID/dojo-beam/dojo_beam:3.1.1
docker push REGION-docker.pkg.dev/PROJECT_ID/dojo-beam/dojo_beam:3.1.1

For Dataflow custom container runs, set:

  • sdk_container_image=REGION-docker.pkg.dev/PROJECT_ID/dojo-beam/dojo_beam:3.1.1
  • sdk_location=container
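These options can be passed to the pipeline as command-line style flags. A short sketch (REGION and PROJECT_ID are placeholders, and the temp bucket is an assumption reused from the BigQuery example above):

```python
image = "REGION-docker.pkg.dev/PROJECT_ID/dojo-beam/dojo_beam:3.1.1"

# Flags for a Dataflow run with a custom SDK container.
flags = [
    "--runner=DataflowRunner",
    f"--sdk_container_image={image}",
    "--sdk_location=container",
    "--temp_location=gs://my-project-temp/dataflow",
]

# These flags would then be handed to PipelineOptions(flags)
# before constructing beam.Pipeline(options=...).
```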

Release Preparation Checklist (GitHub + PyPI)

  1. Confirm versions are aligned in:
    • setup.py
    • pyproject.toml
    • enums/DojoBeamTransformVersion.py
    • Docker image/tag references
  2. Build distributions:
python -m pip install --upgrade build twine
python -m build
python -m twine check dist/*
  3. Tag and publish GitHub release:
git tag -a release-v3.1.1 -m "Release v3.1.1"
git push origin release-v3.1.1
  4. Publish to PyPI:
python -m twine upload dist/*
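Step 1 of the checklist (version alignment) is easy to automate. A hedged sketch that greps each file for its version string; the paths and regex patterns below are assumptions based on the checklist, not the repository's actual tooling:

```python
import pathlib
import re


def read_version(path: str, pattern: str):
    """Extract the first regex group matching a version string in a file."""
    match = re.search(pattern, pathlib.Path(path).read_text())
    return match.group(1) if match else None


def versions_aligned(expected: str, sources: dict) -> bool:
    """True when every (path, pattern) pair yields the expected version."""
    return all(read_version(path, pat) == expected for path, pat in sources.items())
```

For example, `versions_aligned("3.1.1", {"pyproject.toml": r'version\s*=\s*"([^"]+)"'})` would confirm the pyproject pin.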

Release Text (Ready to Use)

See RELEASE_TEXT_v3.1.1.md for prewritten notes for:

  • GitHub Release description
  • PyPI long-description changelog section

