Packages for fast dataflow and workflow processing

MLFastFlow

A Python package for fast dataflow and workflow processing.

Installation

pip install mlfastflow

Features

  • Easy-to-use data sourcing with the Sourcing class
  • Flexible vector search capabilities
  • Optimized for data processing workflows
  • Powerful BigQuery integration with support for:
    • Table operations (create, truncate, delete)
    • Asynchronous query execution for long-running jobs
    • Efficient data transfer between BigQuery and GCS
    • Advanced GCS folder management capabilities

Quick Start

from mlfastflow import Sourcing

# Create a sourcing instance
sourcing = Sourcing(
    query_df=your_query_dataframe,
    db_df=your_database_dataframe,
    columns_for_sourcing=["column1", "column2"],
    label="your_label"
)

# Process your data
sourced_db_df_without_label, sourced_db_df_with_label = (
    sourcing.sourcing()
)

BigQuery Integration

MLFastFlow provides a BigQueryClient class that wraps common Google BigQuery and Google Cloud Storage (GCS) operations: running queries, loading and exporting tables, and managing GCS folders.

Initialization

from mlfastflow import BigQueryClient

# Initialize the client with your GCP credentials
bq_client = BigQueryClient(
    project_id="your-gcp-project-id",
    dataset_id="your_dataset",
    key_file="/path/to/your/service-account-key.json"
)
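
To keep the project ID and key file path out of source code, the constructor arguments can be read from the environment. The following is a minimal sketch, not part of MLFastFlow: the helper `client_kwargs_from_env` and the `MLFF_*` variable names are hypothetical, and only the three keyword arguments shown above are assumed.

```python
import os

# Hypothetical environment variable names (not defined by MLFastFlow);
# each maps to one of the constructor keyword arguments shown above.
ENV_TO_KWARG = {
    "MLFF_PROJECT_ID": "project_id",
    "MLFF_DATASET_ID": "dataset_id",
    "MLFF_KEY_FILE": "key_file",
}


def client_kwargs_from_env(env=None):
    """Build the BigQueryClient keyword arguments from environment variables."""
    env = os.environ if env is None else env
    # Fail early with a clear message if anything is unset or empty.
    missing = [name for name in ENV_TO_KWARG if not env.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(sorted(missing))}")
    return {kwarg: env[name] for name, kwarg in ENV_TO_KWARG.items()}


# Usage (requires mlfastflow and valid credentials):
# bq_client = BigQueryClient(**client_kwargs_from_env())
```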

Running SQL Queries

# Execute a SQL query and get results as a pandas DataFrame
df = bq_client.sql2df("SELECT * FROM your_dataset.your_table LIMIT 10")

# Run a query without returning results
bq_client.run_sql("CREATE TABLE your_dataset.new_table AS SELECT * FROM your_dataset.source_table")

# Run a long-running query and keep the returned job ID for status checking
# (this is the same run_sql call as above; this README does not show a separate async option)
job_id = bq_client.run_sql("CREATE TABLE your_dataset.large_table AS SELECT * FROM your_dataset.huge_table")

# Check the status of an asynchronous query job
job_status = bq_client.check_job_status(job_id)
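
A single status check is rarely enough for a long-running job, so a small polling loop is a common companion. The sketch below is not part of MLFastFlow. It assumes `check_job_status` returns a plain state value such as `"DONE"` (BigQuery's own job states are `PENDING`, `RUNNING`, and `DONE`); this README does not document the return type, so adjust `done_states` to whatever the method actually returns. The helper name `wait_for_job` is hypothetical.

```python
import time


def wait_for_job(check_status, job_id, done_states=("DONE",), poll_seconds=5.0,
                 timeout_seconds=3600.0, sleep=time.sleep, clock=time.monotonic):
    """Poll check_status(job_id) until it returns a value in done_states.

    Raises TimeoutError if the job has not finished within timeout_seconds.
    """
    deadline = clock() + timeout_seconds
    while True:
        status = check_status(job_id)
        if status in done_states:
            return status
        if clock() >= deadline:
            raise TimeoutError(
                f"Job {job_id} still {status!r} after {timeout_seconds}s"
            )
        sleep(poll_seconds)


# Usage (assumes the return value described in the lead-in):
# final_state = wait_for_job(bq_client.check_job_status, job_id)
```

Passing `sleep` and `clock` as parameters keeps the loop easy to test without real waiting.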

Table Operations

# Truncate a table (remove all rows while preserving schema)
bq_client.truncate_table("your_table_name")

DataFrame to BigQuery

import pandas as pd

# Create a sample DataFrame
df = pd.DataFrame({
    'id': [1, 2, 3],
    'name': ['Alice', 'Bob', 'Charlie'],
    'value': [100, 200, 300]
})

# Upload DataFrame to BigQuery
bq_client.df2table(
    df=df,
    table_id="your_table_name",
    if_exists="fail"  # Options: 'fail', 'append'
)

BigQuery to Google Cloud Storage

# Export query results to GCS as Parquet files (default)
bq_client.sql2gcs(
    sql="SELECT * FROM your_dataset.your_table",
    destination_uri="gs://your-bucket/path/to/export/",
    destination_format="PARQUET"  # Options: 'PARQUET', 'CSV', 'JSON', 'AVRO'
)

# Export large query results with control over file sizes using SQL EXPORT DATA
bq_client.sql2gcs_via_query(
    sql="SELECT * FROM your_dataset.large_table",
    destination_uri="gs://your-bucket/path/to/export/data-*.parquet",
    destination_format="PARQUET",
    max_file_size="5GB"  # Control output file size
)

# Save SQL query text to GCS for documentation/audit purposes
bq_client.save_sql_to_gcs(
    sql_content="SELECT * FROM your_dataset.your_table WHERE date = '2025-05-08'",
    bucket_name="your-bucket",
    blob_name="queries/daily_extract.sql",
    metadata={"purpose": "daily_extraction", "author": "data_team"}
)
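
Note that the export methods above take a full `gs://` URI, while `save_sql_to_gcs` takes `bucket_name` and `blob_name` separately. If you store locations as URIs, a small helper can split them. This is a sketch using only the standard library; `split_gcs_uri` is a hypothetical name, not an MLFastFlow function.

```python
def split_gcs_uri(gcs_uri):
    """Split 'gs://bucket/path/to/blob' into ('bucket', 'path/to/blob')."""
    prefix = "gs://"
    if not gcs_uri.startswith(prefix):
        raise ValueError(f"Not a gs:// URI: {gcs_uri!r}")
    # Everything up to the first '/' is the bucket; the rest is the blob name.
    bucket, _, blob_name = gcs_uri[len(prefix):].partition("/")
    if not bucket:
        raise ValueError(f"Missing bucket name in {gcs_uri!r}")
    return bucket, blob_name


# Usage:
# bucket, blob = split_gcs_uri("gs://your-bucket/queries/daily_extract.sql")
# bq_client.save_sql_to_gcs(sql_content=sql, bucket_name=bucket, blob_name=blob)
```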

Google Cloud Storage to BigQuery

# Load data from GCS to BigQuery
bq_client.gcs2table(
    gcs_uri="gs://your-bucket/path/to/data/*.parquet",
    table_id="your_destination_table",
    write_disposition="WRITE_TRUNCATE",  # Options: 'WRITE_TRUNCATE', 'WRITE_APPEND', 'WRITE_EMPTY'
    source_format="PARQUET"  # Options: 'PARQUET', 'CSV', 'JSON', 'AVRO', 'ORC'
)

GCS Folder Management

# Create a proper folder in GCS that appears in the GCS Console
bq_client.create_gcs_folder("gs://your-bucket/new-folder/")

# Delete a folder and all its contents
success, deleted_count = bq_client.delete_gcs_folder(
    gcs_folder_path="gs://your-bucket/folder-to-delete/",
    dry_run=True  # Set to False to actually delete
)
print(f"Would delete {deleted_count} files" if success else "Error occurred")
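
Because folder deletion is destructive, one cautious pattern is to run the dry run first and only delete for real when the reported count is within an expected bound. The sketch below assumes only what the example above shows: `delete_gcs_folder` accepts `gcs_folder_path` and `dry_run` and returns a `(success, count)` tuple. The wrapper `delete_folder_if_small` and its `max_files` threshold are hypothetical.

```python
def delete_folder_if_small(client, gcs_folder_path, max_files=1000):
    """Dry-run first, then delete only if the dry run succeeded and is small enough."""
    ok, count = client.delete_gcs_folder(
        gcs_folder_path=gcs_folder_path, dry_run=True
    )
    if not ok:
        raise RuntimeError(f"Dry run failed for {gcs_folder_path}")
    if count > max_files:
        raise RuntimeError(
            f"Refusing to delete {count} files from {gcs_folder_path} "
            f"(limit is {max_files})"
        )
    # The dry run looked safe, so perform the real deletion.
    return client.delete_gcs_folder(gcs_folder_path=gcs_folder_path, dry_run=False)


# Usage:
# success, deleted = delete_folder_if_small(bq_client, "gs://your-bucket/folder-to-delete/")
```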

Resource Management

# Explicitly close the client when done to free resources
bq_client.close()
# Drop the reference so the closed client cannot be reused by accident
bq_client = None
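
To make sure `close()` runs even when a query raises, the client can be wrapped with `contextlib.closing` from the standard library, which works with any object that has a `close()` method. This is a sketch: this README does not say whether `BigQueryClient` supports the `with` statement on its own, and `run_with_client` is a hypothetical helper.

```python
from contextlib import closing


def run_with_client(make_client, work):
    """Create a client, run work(client), and always call client.close()."""
    with closing(make_client()) as client:
        return work(client)


# Usage (requires mlfastflow and valid credentials):
# df = run_with_client(
#     lambda: BigQueryClient(project_id="your-gcp-project-id",
#                            dataset_id="your_dataset",
#                            key_file="/path/to/your/service-account-key.json"),
#     lambda client: client.sql2df("SELECT * FROM your_dataset.your_table LIMIT 10"),
# )
```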

For more detailed examples and advanced usage, refer to the documentation.

License

MIT

Author

Xileven

This version

0.2.1
