Apache Beam 2.72.0 transforms and utilities for Python 3.12
DOJO-Beam-Transforms
DOJO-Beam-Transforms is a reusable collection of Apache Beam transforms and utility components for data cleaning, enrichment, and ingestion workflows.
Current Release
- Package: dojo-beam-transforms
- Current release: 3.1.1
- Python target: 3.12
Compatibility Matrix (v3.1.1)
Apache Beam SDK
apache-beam[dataframe,gcp,interactive]==2.72.0
Core Dependencies
- pandas==2.1.1
- numpy==1.26.3
- pytz==2025.2
- openpyxl==3.1.5
Installation
Install from PyPI
pip install dojo-beam-transforms==3.1.1
Install from GitHub tag
pip install "git+https://github.com/DOJO-Smart-Ways/DOJO-Beam-Transforms.git@release-v3.1.1"
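After installing, you can confirm that the resolved environment matches the dependency pins in the compatibility matrix above. The sketch below uses only the standard library's `importlib.metadata`. The helper name `check_pins` is hypothetical and not part of this package.

```python
from __future__ import annotations

from importlib.metadata import PackageNotFoundError, version

# Dependency pins copied from the v3.1.1 compatibility matrix.
EXPECTED_PINS = {
    "apache-beam": "2.72.0",
    "pandas": "2.1.1",
    "numpy": "1.26.3",
    "pytz": "2025.2",
    "openpyxl": "3.1.5",
}


def check_pins(expected: dict[str, str]) -> dict[str, tuple[str | None, bool]]:
    """Return {distribution: (installed_version_or_None, matches_expected)}."""
    report: dict[str, tuple[str | None, bool]] = {}
    for name, wanted in expected.items():
        try:
            installed: str | None = version(name)
        except PackageNotFoundError:
            installed = None  # distribution is not installed in this environment
        report[name] = (installed, installed == wanted)
    return report


if __name__ == "__main__":
    for name, (installed, ok) in check_pins(EXPECTED_PINS).items():
        status = "OK" if ok else "MISMATCH"
        print(f"{status:8} {name}: installed={installed} expected={EXPECTED_PINS[name]}")
```

Note that the exact string comparison will report a mismatch for a post-release such as `3.1.1.post1`. This is why the package's own version is not included in the pins.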
Updated Usage Examples
Example 1: CSV input + cleaning + enrichment
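```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from pipeline_components.input_file import read_csvs_union
# Import paths follow the module-per-class layout used in Example 2
# (pipeline_components.data_cleaning.<Class>); the data_enrichment
# locations of ColumnsToFloat and ColumnsToString are assumed.
from pipeline_components.data_cleaning.KeepColumns import KeepColumns
from pipeline_components.data_cleaning.ReplaceValues import ReplaceValues
from pipeline_components.data_cleaning.ReplaceRegex import ReplaceRegex
from pipeline_components.data_enrichment.ColumnsToFloat import ColumnsToFloat
from pipeline_components.data_enrichment.ColumnsToString import ColumnsToString

pipeline_options = PipelineOptions()

with beam.Pipeline(options=pipeline_options) as pipeline:
    # Read every CSV matching the pattern into a single PCollection of records.
    records = read_csvs_union(
        pipeline=pipeline,
        input_pattern="gs://my-bucket/input/*.csv",
        delimiter=";",
        identifier="orders",
    )

    cleaned = (
        records
        | "Keep Relevant Columns" >> beam.ParDo(
            KeepColumns(["order_id", "status", "amount"])
        )
        # Map empty and missing statuses to a sentinel value.
        | "Normalize Status" >> beam.ParDo(
            ReplaceValues(["status"], {"": "UNKNOWN", None: "UNKNOWN"})
        )
        # Convert decimal commas (e.g. "12,50") to decimal points.
        | "Clean Amount Regex" >> beam.ParDo(
            ReplaceRegex(["amount"], [(r",", ".")])
        )
    )

    enriched = (
        cleaned
        | "Cast Amount To Float" >> beam.ParDo(ColumnsToFloat(["amount"]))
        | "Force Order Id As String" >> beam.ParDo(ColumnsToString(["order_id"]))
    )

    _ = enriched | "Write Output" >> beam.io.WriteToText("gs://my-bucket/output/orders")
```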
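To reason about what this chain does to a single record, the plain-Python sketch below mirrors the intended effect of each step on one dict. It makes several assumptions:

- Records are dicts keyed by column name.
- `ReplaceValues` swaps exact matches.
- `ReplaceRegex` applies each `(pattern, replacement)` pair with `re.sub` in order.
- The casts behave like `float()` and `str()`.

It illustrates those assumptions only and is not the library's implementation. `preview_example_1` is a hypothetical name.

```python
from __future__ import annotations

import re
from typing import Any


def preview_example_1(record: dict[str, Any]) -> dict[str, Any]:
    """Apply the Example 1 steps to one record (assumed semantics, not the library code)."""
    # KeepColumns: retain only the listed columns.
    keep = ("order_id", "status", "amount")
    out = {k: v for k, v in record.items() if k in keep}

    # ReplaceValues: map "" and None in "status" to "UNKNOWN".
    replacements = {"": "UNKNOWN", None: "UNKNOWN"}
    if "status" in out and out["status"] in replacements:
        out["status"] = replacements[out["status"]]

    # ReplaceRegex: apply each (pattern, replacement) pair in order.
    for pattern, repl in [(r",", ".")]:
        out["amount"] = re.sub(pattern, repl, str(out["amount"]))

    # ColumnsToFloat / ColumnsToString: cast the target columns.
    out["amount"] = float(out["amount"])
    out["order_id"] = str(out["order_id"])
    return out
```

For example, `preview_example_1({"order_id": 1001, "status": "", "amount": "12,50", "channel": "web"})` drops `channel`, sets `status` to `"UNKNOWN"`, turns `amount` into `12.5` and `order_id` into `"1001"`.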
Example 2: Read from BigQuery and write to BigQuery
```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from pipeline_components.input_file import read_bq
from pipeline_components.data_cleaning.TrimValues import TrimValues
from pipeline_components.data_cleaning.DropDuplicates import DropDuplicates

temp_location = "gs://my-project-temp/dataflow"

query = """
SELECT order_id, customer_name, city
FROM `my_project.my_dataset.orders`
"""

# In batch mode WriteToBigQuery stages load files in GCS, so the pipeline
# itself also needs a temp_location.
pipeline_options = PipelineOptions(temp_location=temp_location)

with beam.Pipeline(options=pipeline_options) as pipeline:
    rows = read_bq(
        pipeline=pipeline,
        query=query,
        temp_location=temp_location,
        use_standard_sql=True,
        identifier="orders_bq",
    )

    transformed = (
        rows
        | "Trim Customer Name" >> beam.ParDo(TrimValues(["customer_name"]))
        | "Drop Duplicates" >> beam.ParDo(DropDuplicates(["order_id"]))
    )

    _ = transformed | "Write To BQ" >> beam.io.WriteToBigQuery(
        table="my_project:my_dataset.orders_clean",
        # CREATE_IF_NEEDED requires a schema when the table does not exist yet;
        # the column types below are assumed.
        schema="order_id:STRING,customer_name:STRING,city:STRING",
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    )
```
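The same kind of local check works for Example 2. This sketch makes two assumptions:

- `TrimValues` strips leading and trailing whitespace from the named string columns.
- `DropDuplicates` keeps one row per `order_id`.

Keeping the first row seen is the sketch's own choice; it is not documented behavior of the library. `preview_example_2` is a hypothetical helper.

```python
from __future__ import annotations

from typing import Any


def preview_example_2(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Trim customer_name, then keep the first row seen per order_id (assumed semantics)."""
    seen: set[Any] = set()
    result: list[dict[str, Any]] = []
    for row in rows:
        row = dict(row)  # copy so the caller's rows are not mutated
        if isinstance(row.get("customer_name"), str):
            row["customer_name"] = row["customer_name"].strip()
        key = row["order_id"]
        if key in seen:
            continue  # duplicate order_id: drop this row
        seen.add(key)
        result.append(row)
    return result
```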
Docker Image for Dataflow Custom Container
The repository includes a Dockerfile aligned with Beam SDK 2.72.0 and Python 3.12.
docker build -t dojo_beam:3.1.1 .
docker tag dojo_beam:3.1.1 REGION-docker.pkg.dev/PROJECT_ID/dojo-beam/dojo_beam:3.1.1
docker push REGION-docker.pkg.dev/PROJECT_ID/dojo-beam/dojo_beam:3.1.1
For Dataflow custom container runs, set:
- sdk_container_image=REGION-docker.pkg.dev/PROJECT_ID/dojo-beam/dojo_beam:3.1.1
- sdk_location=container
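These two settings are normally passed alongside the usual Dataflow options. The sketch below builds them as command-line flags that can be handed to `PipelineOptions(flags=...)`. The helper name and the example project, region, and bucket values are hypothetical. The image path mirrors the `docker tag` command above.

```python
def dataflow_container_flags(
    project_id: str,
    region: str,
    temp_location: str,
    image_tag: str = "3.1.1",
) -> list[str]:
    """Build flags for a Dataflow run on the dojo_beam custom container."""
    image = f"{region}-docker.pkg.dev/{project_id}/dojo-beam/dojo_beam:{image_tag}"
    return [
        "--runner=DataflowRunner",
        f"--project={project_id}",
        f"--region={region}",
        f"--temp_location={temp_location}",
        f"--sdk_container_image={image}",
        "--sdk_location=container",  # use the SDK already installed in the image
    ]
```

Usage, with placeholder values: `PipelineOptions(flags=dataflow_container_flags("my-project", "us-central1", "gs://my-project-temp/dataflow"))`.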
Release Preparation Checklist (GitHub + PyPI)
- Confirm versions are aligned in:
  - setup.py
  - pyproject.toml
  - enums/DojoBeamTransformVersion.py
  - Docker image/tag references
- Build distributions:
python -m pip install --upgrade build twine
python -m build
python -m twine check dist/*
- Tag and publish GitHub release:
git tag -a release-v3.1.1 -m "Release v3.1.1"
git push origin release-v3.1.1
- Publish to PyPI:
python -m twine upload dist/*
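The first checklist item can be partly automated before building. The sketch below reports which of the listed version files do not mention the expected version string. Docker tag references are left out because they appear in build commands rather than in a tracked file. The substring check is deliberately crude; for example, it would also accept `13.1.1`. `files_missing_version` is a hypothetical helper.

```python
from __future__ import annotations

from pathlib import Path

# Version-bearing files named in the release checklist.
VERSION_FILES = (
    "setup.py",
    "pyproject.toml",
    "enums/DojoBeamTransformVersion.py",
)


def files_missing_version(
    root: str | Path, expected: str, files: tuple[str, ...] = VERSION_FILES
) -> list[str]:
    """Return the files under root that are absent or do not contain `expected`."""
    base = Path(root)
    missing: list[str] = []
    for rel in files:
        path = base / rel
        if not path.is_file() or expected not in path.read_text(encoding="utf-8"):
            missing.append(rel)
    return missing
```

Running `files_missing_version(".", "3.1.1")` from the repository root should return an empty list when the versions are aligned.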
Release Text (Ready to Use)
See RELEASE_TEXT_v3.1.1.md for prewritten notes for:
- GitHub Release description
- PyPI long-description changelog section
Download files
File details
Details for the file dojo_beam_transforms-3.1.1.post1.tar.gz.
File metadata
- Download URL: dojo_beam_transforms-3.1.1.post1.tar.gz
- Upload date:
- Size: 24.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.13
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 271be8ebb21c04caca34285febb982feabdff32814887007f595fbe90664f1b6 |
| MD5 | b7bb2551f173f01cc761e160b463c8cb |
| BLAKE2b-256 | 21b0a4e3927e947720917807f847bd5b3e060043917f2485df97eca3afbc2b50 |
File details
Details for the file dojo_beam_transforms-3.1.1.post1-py3-none-any.whl.
File metadata
- Download URL: dojo_beam_transforms-3.1.1.post1-py3-none-any.whl
- Upload date:
- Size: 43.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.13
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | b7bd22bd6c34c687c96af639d9bb7a9faa03aa1c87db9a50fa0bfce239e89128 |
| MD5 | a2e8f4940c0e04e45492a12d871a3880 |
| BLAKE2b-256 | 23f408077d4a4368ac2523b9ecfe05e1e7e76cceb255599a4bcc78f21ca5dc81 |
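A downloaded file can be checked against the published SHA256 digest before installation. The sketch streams the file through `hashlib.sha256` so large files are not read into memory at once. The function name is hypothetical; the digests are the ones listed above. pip's hash-checking mode (`--require-hashes` with `--hash=sha256:...` in a requirements file) is an alternative that performs the same check during install.

```python
from __future__ import annotations

import hashlib
from pathlib import Path


def sha256_matches(path: str | Path, expected_hex: str, chunk_size: int = 1 << 16) -> bool:
    """Stream the file through SHA-256 and compare with the published hex digest."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest() == expected_hex.strip().lower()


# Published SHA256 digests for the 3.1.1.post1 release files.
PUBLISHED_SHA256 = {
    "dojo_beam_transforms-3.1.1.post1.tar.gz":
        "271be8ebb21c04caca34285febb982feabdff32814887007f595fbe90664f1b6",
    "dojo_beam_transforms-3.1.1.post1-py3-none-any.whl":
        "b7bd22bd6c34c687c96af639d9bb7a9faa03aa1c87db9a50fa0bfce239e89128",
}
```

Usage: `sha256_matches(name, PUBLISHED_SHA256[name])` with `name` set to the downloaded file's name in the current directory.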