
Apache Beam 2.72.0 transforms and utilities for Python 3.12

Project description

DOJO-Beam-Transforms

DOJO-Beam-Transforms is a reusable collection of Apache Beam transforms and utility components for data cleaning, enrichment, and ingestion workflows.

Current Release

  • Package: dojo-beam-transforms
  • Current release: 3.1.1
  • Python target: 3.12

Compatibility Matrix (v3.1.1)

Apache Beam SDK

  • apache-beam[dataframe,gcp,interactive]==2.72.0

Core Dependencies

  • pandas==2.1.1
  • numpy==1.26.3
  • pytz==2025.2
  • openpyxl==3.1.5

Installation

Install from PyPI

pip install dojo-beam-transforms==3.1.1

Install from GitHub tag

pip install "git+https://github.com/DOJO-Smart-Ways/DOJO-Beam-Transforms.git@release-v3.1.1"

Updated Usage Examples

Example 1: CSV input + cleaning + enrichment

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from pipeline_components.input_file import read_csvs_union
from pipeline_components.data_cleaning.KeepColumns import KeepColumns
from pipeline_components.data_cleaning.ReplaceValues import ReplaceValues
from pipeline_components.data_cleaning.ReplaceRegex import ReplaceRegex
from pipeline_components.data_enrichment.ColumnsToFloat import ColumnsToFloat
from pipeline_components.data_enrichment.ColumnsToString import ColumnsToString


pipeline_options = PipelineOptions()

with beam.Pipeline(options=pipeline_options) as pipeline:
    records = read_csvs_union(
        pipeline=pipeline,
        input_pattern="gs://my-bucket/input/*.csv",
        delimiter=";",
        identifier="orders"
    )

    cleaned = (
        records
        | "Keep Relevant Columns" >> beam.ParDo(
            KeepColumns(["order_id", "status", "amount"])
        )
        | "Normalize Status" >> beam.ParDo(
            ReplaceValues(["status"], {"": "UNKNOWN", None: "UNKNOWN"})
        )
        | "Clean Amount Regex" >> beam.ParDo(
            ReplaceRegex(["amount"], [(r",", ".")])
        )
    )

    enriched = (
        cleaned
        | "Cast Amount To Float" >> beam.ParDo(ColumnsToFloat(["amount"]))
        | "Force Order Id As String" >> beam.ParDo(ColumnsToString(["order_id"]))
    )

    _ = enriched | "Write Output" >> beam.io.WriteToText("gs://my-bucket/output/orders")
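The cleaning and enrichment steps above operate record by record. As a rough mental model, here is a minimal pure-Python sketch of the intended per-record semantics; the behavior is inferred from the transform names and arguments in the example, not taken from the library's actual implementation:

```python
import re

def clean_record(record):
    """Illustrative per-record logic mirroring the pipeline steps above."""
    # KeepColumns: keep only the listed keys
    record = {k: record[k] for k in ("order_id", "status", "amount") if k in record}
    # ReplaceValues: map empty or missing status to a sentinel
    if record.get("status") in ("", None):
        record["status"] = "UNKNOWN"
    # ReplaceRegex: normalize a decimal comma to a dot
    record["amount"] = re.sub(r",", ".", record["amount"])
    # ColumnsToFloat / ColumnsToString: type casts
    record["amount"] = float(record["amount"])
    record["order_id"] = str(record["order_id"])
    return record

print(clean_record({"order_id": 1, "status": "", "amount": "12,50", "extra": "x"}))
# → {'order_id': '1', 'status': 'UNKNOWN', 'amount': 12.5}
```

In the real pipeline each of these steps runs as a separate `ParDo` over the PCollection, so they can be reordered or reused independently.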

Example 2: Read from BigQuery and write to BigQuery

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from pipeline_components.input_file import read_bq
from pipeline_components.data_cleaning.TrimValues import TrimValues
from pipeline_components.data_cleaning.DropDuplicates import DropDuplicates


temp_location = "gs://my-project-temp/dataflow"
query = """
SELECT order_id, customer_name, city
FROM `my_project.my_dataset.orders`
"""

pipeline_options = PipelineOptions()

with beam.Pipeline(options=pipeline_options) as pipeline:
    rows = read_bq(
        pipeline=pipeline,
        query=query,
        temp_location=temp_location,
        use_standard_sql=True,
        identifier="orders_bq"
    )

    transformed = (
        rows
        | "Trim Customer Name" >> beam.ParDo(TrimValues(["customer_name"]))
        | "Drop Duplicates" >> beam.ParDo(DropDuplicates(["order_id"]))
    )

    transformed | "Write To BQ" >> beam.io.WriteToBigQuery(
        table="my_project:my_dataset.orders_clean",
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    )
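For intuition, the two transforms in this example can be sketched as plain Python generators. This is a local, non-distributed approximation of the intended semantics (in Beam, deduplicating across a whole PCollection would normally involve a grouping step rather than in-memory state):

```python
def trim_values(records, columns):
    """TrimValues sketch: strip surrounding whitespace in the named columns."""
    for r in records:
        yield {**r, **{c: r[c].strip() for c in columns if isinstance(r.get(c), str)}}

def drop_duplicates(records, key_columns):
    """DropDuplicates sketch: keep the first record seen per key tuple."""
    seen = set()
    for r in records:
        key = tuple(r[c] for c in key_columns)
        if key not in seen:
            seen.add(key)
            yield r

rows = [
    {"order_id": 1, "customer_name": "  Ada  ", "city": "Lisbon"},
    {"order_id": 1, "customer_name": "Ada", "city": "Lisbon"},
    {"order_id": 2, "customer_name": "Grace", "city": "Porto"},
]
cleaned = list(drop_duplicates(trim_values(rows, ["customer_name"]), ["order_id"]))
print(len(cleaned))  # → 2
```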

Docker Image for Dataflow Custom Container

The repository includes a Dockerfile aligned with Beam SDK 2.72.0 and Python 3.12.

docker build -t dojo_beam:3.1.1 .
docker tag dojo_beam:3.1.1 REGION-docker.pkg.dev/PROJECT_ID/dojo-beam/dojo_beam:3.1.1
docker push REGION-docker.pkg.dev/PROJECT_ID/dojo-beam/dojo_beam:3.1.1

For Dataflow custom container runs, set:

  • sdk_container_image=REGION-docker.pkg.dev/PROJECT_ID/dojo-beam/dojo_beam:3.1.1
  • sdk_location=container
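These options are typically passed to the pipeline as command-line flags. A hedged sketch of assembling them in Python (the image path and temp location are placeholders, and the helper function is purely illustrative):

```python
def dataflow_flags(image, temp_location):
    """Build the flag list for a Dataflow custom-container run (illustrative)."""
    return [
        "--runner=DataflowRunner",
        f"--sdk_container_image={image}",
        "--sdk_location=container",
        f"--temp_location={temp_location}",
    ]

flags = dataflow_flags(
    "REGION-docker.pkg.dev/PROJECT_ID/dojo-beam/dojo_beam:3.1.1",
    "gs://my-project-temp/dataflow",
)
```

The resulting list can be handed to `PipelineOptions(flags)` when constructing the pipeline.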

Release Preparation Checklist (GitHub + PyPI)

  1. Confirm versions are aligned in:
    • setup.py
    • pyproject.toml
    • enums/DojoBeamTransformVersion.py
    • Docker image/tag references
  2. Build distributions:
python -m pip install --upgrade build twine
python -m build
python -m twine check dist/*
  3. Tag and publish GitHub release:
git tag -a release-v3.1.1 -m "Release v3.1.1"
git push origin release-v3.1.1
  4. Publish to PyPI:
python -m twine upload dist/*
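Step 1 of the checklist (version alignment) is easy to automate. A small sketch, where the keys mirror the files listed in the checklist and the extraction of each version string is left out:

```python
def versions_aligned(versions):
    """Return True when every tracked location reports the same version string."""
    return len(set(versions.values())) == 1

versions = {
    "setup.py": "3.1.1",
    "pyproject.toml": "3.1.1",
    "enums/DojoBeamTransformVersion.py": "3.1.1",
    "docker_tag": "3.1.1",
}
assert versions_aligned(versions)
```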

Release Text (Ready to Use)

See RELEASE_TEXT_v3.1.1.md for prewritten notes for:

  • GitHub Release description
  • PyPI long-description changelog section

Download files

Source Distribution

  • dojo_beam_transforms-3.1.1.post1.tar.gz (24.5 kB)

Built Distribution

  • dojo_beam_transforms-3.1.1.post1-py3-none-any.whl (43.1 kB)

File hashes

Hashes for dojo_beam_transforms-3.1.1.post1.tar.gz:

  • SHA256: 271be8ebb21c04caca34285febb982feabdff32814887007f595fbe90664f1b6
  • MD5: b7bb2551f173f01cc761e160b463c8cb
  • BLAKE2b-256: 21b0a4e3927e947720917807f847bd5b3e060043917f2485df97eca3afbc2b50

Hashes for dojo_beam_transforms-3.1.1.post1-py3-none-any.whl:

  • SHA256: b7bd22bd6c34c687c96af639d9bb7a9faa03aa1c87db9a50fa0bfce239e89128
  • MD5: a2e8f4940c0e04e45492a12d871a3880
  • BLAKE2b-256: 23f408077d4a4368ac2523b9ecfe05e1e7e76cceb255599a4bcc78f21ca5dc81
