Skip to main content

Packages for fast dataflow and workflow processing

Project description

MLFastFlow

A Python package for fast dataflow and workflow processing.

Installation

pip install mlfastflow

Features

  • FastKNN — FAISS-backed vector similarity search designed for ML data sourcing
    • Four index types: exact (flat) and approximate (ivf_flat, ivf_pq, hnsw)
    • L2 and cosine (inner product) distance metrics
    • Built-in label-recall validation
  • BigQueryClient — unified Pandas + Polars BigQuery client with GCS integration
  • Utility functions — CSV→Parquet conversion, file concatenation, data profiling, timer decorator

Quick Start

from mlfastflow import FastKNN

# query_df: labelled data — rows where label == 1 are the positive anchors
# search_df: the pool to search through
# knn_keys: feature columns used for similarity vectors
# label: binary column marking your positive examples

knn = FastKNN(
    query_df=labelled_df,
    search_df=pool_df,
    knn_keys=["feature_1", "feature_2", "feature_3"],
    label="is_positive",
)

sourced_df, sourced_df_with_label = knn.run()
knn.validate()

FastKNN — Vector Similarity Search

FastKNN is designed for ML data-sourcing workflows. It uses the label column as an anchor: rows where label == 1 in query_df are the positive examples, and the index finds the most similar candidates from search_df.

Index Types

index_type Exact? Speed Memory Best for
'flat' (default) ✅ Yes Moderate High Small datasets, baseline
'ivf_flat' ≈ tunable Fast Moderate Large datasets (>100K)
'ivf_pq' ≈ tunable Very fast Low Very large / memory-constrained
'hnsw' ≈ tunable Very fast Moderate High recall + speed

Examples

from mlfastflow import FastKNN

# Exact search (default) — L2 distance
knn = FastKNN(
    query_df=labelled_df, search_df=pool_df,
    knn_keys=["f1", "f2", "f3"], label="is_positive",
)

# Cosine similarity
knn = FastKNN(
    query_df=labelled_df, search_df=pool_df,
    knn_keys=["f1", "f2", "f3"], label="is_positive",
    metric="ip", normalize=True,
)

# Approximate search — IVF Flat
knn = FastKNN(
    query_df=labelled_df, search_df=pool_df,
    knn_keys=["f1", "f2", "f3"], label="is_positive",
    index_type="ivf_flat", nlist=200, nprobe=16,
)

# Memory-efficient — IVF-PQ
knn = FastKNN(
    query_df=labelled_df, search_df=pool_df,
    knn_keys=[f"emb_{i}" for i in range(64)], label="is_positive",
    index_type="ivf_pq", nlist=500, nprobe=32, pq_m=8,
)

# Graph-based — HNSW
knn = FastKNN(
    query_df=labelled_df, search_df=pool_df,
    knn_keys=["f1", "f2", "f3"], label="is_positive",
    index_type="hnsw", hnsw_m=32, ef_construction=64, ef_search=32,
)

sourced_df, sourced_df_with_label = knn.run()
knn.validate()  # logs rows, label counts, and recall %

BigQuery Integration

MLFastFlow provides a powerful BigQueryClient class for seamless integration with Google BigQuery and Google Cloud Storage (GCS). It supports both Pandas and Polars DataFrames in a single unified client.

Initialization

from mlfastflow import BigQueryClient

# Initialize with a service account key file
bq_client = BigQueryClient(
    project_id="your-gcp-project-id",
    dataset_id="your_dataset",
    key_file="/path/to/your/service-account-key.json"
)

# Or use Application Default Credentials (no key file needed on GCP)
bq_client = BigQueryClient(
    project_id="your-gcp-project-id",
    dataset_id="your_dataset"
)

# Display client configuration
bq_client.show()

Context Manager

The client supports Python's with statement for automatic resource cleanup:

with BigQueryClient(project_id="...", dataset_id="...") as client:
    df = client.sql2df("SELECT * FROM your_table LIMIT 10")
# Client is automatically closed here

Running SQL Queries

# Execute a SQL query and get results as a Pandas DataFrame
df = bq_client.sql2df("SELECT * FROM your_dataset.your_table LIMIT 10")

# Execute a SQL query and get results as a Polars DataFrame
df = bq_client.sql2polars("SELECT * FROM your_dataset.your_table LIMIT 10")

# Get a Polars LazyFrame for deferred computation
lf = bq_client.sql2polars("SELECT * FROM your_table", lazy=True)
result = lf.filter(pl.col("status") == "active").collect()

# Save query results directly to a local file (.parquet, .csv, .json)
bq_client.sql2file(
    sql="SELECT * FROM your_dataset.your_table",
    file_path="output.parquet"
)

# Run a DDL/DML query and get the job ID for tracking
job_id = bq_client.run_sql("CREATE TABLE your_dataset.new_table AS SELECT * FROM your_dataset.source_table")

# Check the status of an asynchronous query job
job_status = bq_client.check_job_status(job_id)

Cost Estimation (Dry Run)

Check how much data a query will scan before executing it:

estimate = bq_client.sql2df("SELECT * FROM huge_table", dry_run=True)
print(f"This query will scan {estimate['estimated_gb']} GB")

Table Operations

# Truncate a table (requires interactive confirmation by typing the table name)
bq_client.truncate_table("your_table_name")

DataFrame to BigQuery

import pandas as pd
import polars as pl

# Upload a Pandas DataFrame
df = pd.DataFrame({
    'id': [1, 2, 3],
    'name': ['Alice', 'Bob', 'Charlie'],
    'value': [100, 200, 300]
})

bq_client.df2table(
    df=df,
    table_id="your_table_name",
    if_exists="fail"  # Options: 'fail', 'replace', 'append'
)

# Upload a Polars DataFrame (uses efficient Arrow → Parquet path, no Pandas conversion)
pl_df = pl.DataFrame({
    'id': [1, 2, 3],
    'name': ['Alice', 'Bob', 'Charlie']
})

bq_client.polars2table(
    df=pl_df,
    table_id="your_table_name",
    if_exists="replace"
)

BigQuery to Google Cloud Storage

# Export query results to GCS (server-side, no data passes through your machine)
bq_client.sql2gcs(
    sql="SELECT * FROM your_dataset.your_table",
    destination_uri="gs://your-bucket/path/to/export.parquet",
    format="PARQUET",       # Options: 'PARQUET', 'CSV', 'JSON', 'AVRO'
    compression="SNAPPY",   # Options: 'NONE', 'GZIP', 'SNAPPY', 'DEFLATE'
    timeout=300,
    use_sharding=True       # Automatically adds wildcard for parallel export
)

# Export a DataFrame (Pandas or Polars) to GCS via a temporary BigQuery table
bq_client.df2gcs(
    df=pl_df,
    destination_uri="gs://your-bucket/path/to/export.parquet"
)

Google Cloud Storage to BigQuery

# Load data from GCS to BigQuery
bq_client.gcs2table(
    gcs_uri="gs://your-bucket/path/to/data/*.parquet",
    table_id="your_destination_table",
    write_disposition="WRITE_TRUNCATE",  # Options: 'WRITE_TRUNCATE', 'WRITE_APPEND', 'WRITE_EMPTY'
    source_format="PARQUET"              # Options: 'PARQUET', 'CSV', 'JSON', 'AVRO', 'ORC'
)

GCS Folder Management

# Create a folder in GCS
bq_client.create_gcs_folder("gs://your-bucket/new-folder/")

# Delete a folder and all its contents (dry run first)
success, deleted_count = bq_client.delete_gcs_folder(
    gcs_folder_path="gs://your-bucket/folder-to-delete/",
    dry_run=True  # Set to False to actually delete
)
print(f"Would delete {deleted_count} files" if success else "Error occurred")

Entity Relationship Diagram (ERD)

# Generate an ERD for a list of BigQuery tables
bq_client.erd(
    table_list=[
        "project.dataset.table1",
        "project.dataset.table2"
    ],
    output_filename="bq_erd",
    output_format="png",       # Options: 'png', 'svg', 'pdf'
    view_diagram=True          # Automatically open the generated diagram
)

Resource Management

# Using context manager (recommended)
with BigQueryClient(...) as client:
    df = client.sql2df("SELECT ...")

# Or close manually
bq_client.close()

Data Type Handling

# Fix mixed data types in a Pandas DataFrame before uploading
# Inference chain: numeric → datetime → string (nulls preserved)
df = BigQueryClient.fix_mixed_types(
    df=your_dataframe,
    columns=["column1", "column2"],  # Optional: defaults to all object-dtype columns
    strategy="infer"                 # Options: 'infer', 'to_string'
)

Utility Functions

CSV to Parquet Conversion

Convert CSV files to Parquet using Polars streaming (sink_parquet) for constant memory usage, even on multi-GB files:

from mlfastflow import csv2parquet

# Convert a single CSV file
csv2parquet("path/to/file.csv")

# Convert all CSV files in a directory (recursively)
csv2parquet("path/to/directory", sub_folders=True)

# Custom output directory, zstd compression, skip already-converted files
csv2parquet(
    "path/to/source",
    output_dir="path/to/destination",
    compression="zstd",    # Options: 'snappy', 'zstd', 'lz4', 'gzip', 'uncompressed'
    overwrite=False         # Skip files that already have a .parquet counterpart
)

File Concatenation

Combine all CSV or Parquet files in a folder into a single file using lazy scanning:

from mlfastflow import concat_files

# Combine all Parquet files in a folder
output_path = concat_files("path/to/folder", file_type="parquet")

# Combine CSVs with a source column, custom output location
output_path = concat_files(
    "path/to/folder",
    file_type="csv",
    add_source_column=True,             # Adds a SOURCE column with the filename
    output_file="path/to/combined.csv",
    how="diagonal_relaxed"              # Handles schema mismatches across files
)

Timer Decorator

Log the execution time of any function with adaptive formatting (ms/s/min):

from mlfastflow import timer_decorator

@timer_decorator
def my_function():
    # ... your code ...
    pass

my_function()
# INFO:mlfastflow.utils:Finished 'my_function' in 342.5 ms

Data Profiling

Generate an HTML profiling report for any Pandas or Polars DataFrame:

from mlfastflow import profile

# Generate a minimal profiling report
profile(df, title="Customer Data Report", output_path="reports/")

# Full report with correlations
profile(df, title="Full Analysis", minimal=False)

Requires ydata-profiling: pip install ydata-profiling

License

MIT

Author

Xileven

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

mlfastflow-0.2.9.0.tar.gz (35.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

mlfastflow-0.2.9.0-py3-none-any.whl (34.3 kB view details)

Uploaded Python 3

File details

Details for the file mlfastflow-0.2.9.0.tar.gz.

File metadata

  • Download URL: mlfastflow-0.2.9.0.tar.gz
  • Upload date:
  • Size: 35.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.11

File hashes

Hashes for mlfastflow-0.2.9.0.tar.gz
Algorithm Hash digest
SHA256 cfcde6cdf7726b733255d0cbc71d27d237abf2cf02dbc395a2543695400e3aeb
MD5 e456d45373d498b9f7b955eaa64da5a2
BLAKE2b-256 bc02b2956680f7b7231631e9c07793a148f81f16f91fe7943ac0ae022e0cc967

See more details on using hashes here.

File details

Details for the file mlfastflow-0.2.9.0-py3-none-any.whl.

File metadata

  • Download URL: mlfastflow-0.2.9.0-py3-none-any.whl
  • Upload date:
  • Size: 34.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.11

File hashes

Hashes for mlfastflow-0.2.9.0-py3-none-any.whl
Algorithm Hash digest
SHA256 b313a45f3767c41e9d07f07bb68018b6dca5ff743c1109bd5bd457633c5f1eb4
MD5 0e9f896d6a8b66456ee4d13456fbc127
BLAKE2b-256 bf9c4a89f42e9b6e582e6d5e79113e1a1db36b995ab0601f974596f83cb965aa

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page