
MLFastFlow

A Python package for fast dataflow and workflow processing.

Installation

pip install mlfastflow

Features

  • FastKNN — label-anchored vector search for ML data sourcing
    • Exact and approximate search across four index strategies
    • L2 and cosine (inner product) distance metrics
    • Built-in label-recall validation
  • BigQueryClient — unified Pandas + Polars BigQuery client with GCS integration
  • Utility functions — CSV↔Parquet conversion, file concatenation, data profiling, get_info inspector, logging helpers, timer decorator

Quick Start

from mlfastflow import FastKNN

# query_df: labelled data — rows where label == 1 are the positive anchors
# search_df: the pool to search through
# knn_keys: feature columns used for similarity vectors
# label: binary column marking your positive examples

knn = FastKNN(
    query_df=labelled_df,
    search_df=pool_df,
    knn_keys=["feature_1", "feature_2", "feature_3"],
    label="is_positive",
)

sourced_df, sourced_df_with_label = knn.run()
knn.validate()

FastKNN — Vector Similarity Search

FastKNN is designed for ML data-sourcing workflows. It uses the label column as an anchor: rows where label == 1 in query_df are the positive examples, and the search finds the most similar candidates from search_df.
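The anchoring idea can be illustrated with a tiny brute-force sketch (pure Python, not the library's implementation): take the rows of the query set where the label is 1, find each one's nearest pool rows by L2 distance, and pool the results.

```python
import math

# Toy data: each query row is (features, label); label 1 marks a positive anchor.
query_rows = [([0.0, 0.0], 1), ([5.0, 5.0], 0), ([0.2, 0.1], 1)]
pool_rows = [[0.1, 0.1], [4.9, 5.1], [0.3, 0.2], [9.0, 9.0]]

def l2(a, b):
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

def source_candidates(query_rows, pool_rows, k=2):
    """Brute-force stand-in for the sourcing step: for every positive anchor,
    collect the indices of its k nearest pool rows."""
    anchors = [vec for vec, label in query_rows if label == 1]
    sourced = set()
    for anchor in anchors:
        ranked = sorted(range(len(pool_rows)), key=lambda i: l2(anchor, pool_rows[i]))
        sourced.update(ranked[:k])
    return sorted(sourced)

print(source_candidates(query_rows, pool_rows))  # → [0, 2]
```

Both anchors sit near pool rows 0 and 2, so those are the sourced candidates; FastKNN does the same search with a FAISS-style index instead of a brute-force loop.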

Key Parameters

knn_keys — the feature columns used to measure similarity between rows. These should be numeric columns that represent the vector space you want to search in (e.g. embeddings, normalised features, encoded variables). Only these columns are used for the distance computation; all other columns are preserved in the output via search_df_raw.

# Example: use three numeric feature columns as the similarity vector
knn_keys=["age_scaled", "income_scaled", "spend_scaled"]

fillna_method — strategy for handling missing values before the search. Missing values must be imputed because distance metrics require complete numeric vectors.

Value             Fills with
'zero' (default)  0 — safe and fast, assumes absence = zero signal
'mean'            Column mean — good for normally distributed features
'median'          Column median — robust to outliers
'mode'            Most frequent value — useful for categorical-encoded columns
'max' / 'min'     Column max or min — use when extremes are meaningful
# Example: use column means to preserve feature scale
knn = FastKNN(..., fillna_method="mean")

Index Types

index_type        Exact?     Speed      Memory    Best for
'flat' (default)  ✅ Yes     Moderate   High      Small datasets, baseline
'ivf_flat'        ≈ tunable  Fast       Moderate  Large datasets (>100K)
'ivf_pq'          ≈ tunable  Very fast  Low       Very large / memory-constrained
'hnsw'            ≈ tunable  Very fast  Moderate  High recall + speed

Examples

from mlfastflow import FastKNN

# Exact search (default) — L2 distance
knn = FastKNN(
    query_df=labelled_df, search_df=pool_df,
    knn_keys=["f1", "f2", "f3"], label="is_positive",
)

# Cosine similarity
knn = FastKNN(
    query_df=labelled_df, search_df=pool_df,
    knn_keys=["f1", "f2", "f3"], label="is_positive",
    metric="ip", normalize=True,
)

# Approximate search — IVF Flat
knn = FastKNN(
    query_df=labelled_df, search_df=pool_df,
    knn_keys=["f1", "f2", "f3"], label="is_positive",
    index_type="ivf_flat", nlist=200, nprobe=16,
)

# Memory-efficient — IVF-PQ
knn = FastKNN(
    query_df=labelled_df, search_df=pool_df,
    knn_keys=[f"emb_{i}" for i in range(64)], label="is_positive",
    index_type="ivf_pq", nlist=500, nprobe=32, pq_m=8,
)

# Graph-based — HNSW
knn = FastKNN(
    query_df=labelled_df, search_df=pool_df,
    knn_keys=["f1", "f2", "f3"], label="is_positive",
    index_type="hnsw", hnsw_m=32, ef_construction=64, ef_search=32,
)

sourced_df, sourced_df_with_label = knn.run()
knn.validate()  # logs rows, label counts, and recall %

BigQuery Integration

MLFastFlow provides a BigQueryClient class that integrates with Google BigQuery and Google Cloud Storage (GCS). It supports both Pandas and Polars DataFrames in a single unified client.

Initialization

from mlfastflow import BigQueryClient

# With a service account key file
client = BigQueryClient(
    project_id="your-gcp-project",
    dataset_id="your_dataset",
    key_file="/path/to/service-account.json"
)

# With Application Default Credentials (recommended on GCP / CI)
client = BigQueryClient(project_id="your-gcp-project", dataset_id="your_dataset")

# From environment variables (BQ_PROJECT_ID, BQ_DATASET_ID, BQ_KEY_FILE)
client = BigQueryClient.from_env()

# Context manager — client closes automatically
with BigQueryClient.from_env() as client:
    df = client.sql2df("SELECT * FROM orders LIMIT 10")

Running SQL Queries

# Pandas DataFrame (uses Arrow internally for consistent result types)
df = client.sql2df("SELECT * FROM orders WHERE status = 'active'")

# Polars DataFrame (Arrow transfer, no pandas overhead)
df = client.sql2polars("SELECT * FROM orders")

# Polars LazyFrame for deferred computation
import polars as pl

lf = client.sql2polars("SELECT * FROM orders", lazy=True)
result = lf.filter(pl.col("amount") > 100).collect()

# Save directly to a local file (.parquet, .csv, .json)
client.sql2file(sql="SELECT * FROM orders", file_path="export.parquet")

# DDL / DML — returns job ID for tracking
job_id = client.run_sql("CREATE TABLE summary AS SELECT ...")

# Cost guard — raises before executing if scan > N GB
df = client.sql2df("SELECT * FROM huge_table", max_gb=1.0)

# Dry run — check cost without executing
estimate = client.sql2df("SELECT * FROM huge_table", dry_run=True)
print(f"Would scan {estimate['estimated_gb']} GB")

# BQ Storage Read API — opt-in for faster downloads on large result sets.
# Requires roles/bigquery.readSessionUser on the service account.
# Note: Both standard and Storage API paths now use Arrow-based conversion
# to ensure consistent datetime64 types and microsecond precision.
df = client.sql2df("SELECT * FROM huge_table", use_bqstorage=True)
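The max_gb guard amounts to a pre-flight check against the dry-run estimate. A minimal sketch of that check (the class and function names here are hypothetical, not the client's internals):

```python
class ScanCostExceeded(RuntimeError):
    """Raised when a query's estimated scan exceeds the configured cap."""

def check_scan_cost(estimated_bytes: int, max_gb: float) -> float:
    """Convert the dry-run byte estimate to GB and raise before executing
    if it exceeds the cap; otherwise return the estimate in GB."""
    estimated_gb = estimated_bytes / 1024**3
    if estimated_gb > max_gb:
        raise ScanCostExceeded(
            f"Query would scan {estimated_gb:.2f} GB (limit {max_gb} GB)"
        )
    return estimated_gb

# A 0.5 GB estimate passes a 1 GB cap.
print(check_scan_cost(512 * 1024**2, max_gb=1.0))  # → 0.5
```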

Exploration & Inspection

# Peek at any table — no SQL required
df = client.preview("orders")               # first 10 rows (Pandas)
df = client.preview("orders", n=100, use_polars=True)

# Table metadata: row count, storage size, timestamps, schema
info = client.get_table_info("orders")
print(info["rows"], info["size_gb"], info["modified"])

# Schema only
schema = client.describe_table("orders")
# [{"name": "id", "type": "INT64", "mode": "NULLABLE", ...}, ...]

# List tables / check existence
tables = client.list_tables()              # ['orders', 'customers', ...]
exists = client.table_exists("orders")     # True / False

Table Operations

# Materialise a query result into a table (server-side, no data transferred)
client.query_to_table(
    "SELECT user_id, COUNT(*) AS orders FROM orders GROUP BY 1",
    dest_table_id="summary",
    if_exists="replace",   # 'fail' | 'replace' | 'append'
)

# Server-side table copy (no scan cost)
client.copy_table("orders_staging", "orders", if_exists="replace")
client.copy_table("orders", "orders_backup_20260304")

# Truncate (requires typing table name to confirm)
client.truncate_table("orders")

# Drop permanently (requires typing table name to confirm)
client.delete_table("old_staging_table")

# Batch SQL execution
client.execute_many([
    "TRUNCATE TABLE staging.orders",
    "INSERT INTO staging.orders SELECT ...",
])
client.execute_many([...], parallel=True)   # submit all jobs concurrently

Async Job Management

# Submit a long-running job and get the ID immediately
job_id = client.run_sql("CREATE TABLE archive AS SELECT ...", priority="INTERACTIVE")

# Check status at any time
status = client.check_job_status(job_id)

# Block until the job finishes (raises on failure or timeout)
client.wait_for_job(job_id, timeout=600)
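wait_for_job follows the standard poll-with-deadline pattern. A generic sketch of that pattern (not the client's code):

```python
import time

def wait_until(check, timeout: float, interval: float = 0.01):
    """Poll check() until it reports a done state or the deadline passes."""
    deadline = time.monotonic() + timeout
    state = "UNKNOWN"
    while time.monotonic() < deadline:
        state = check()
        if state == "DONE":
            return state
        time.sleep(interval)
    raise TimeoutError(f"job still {state!r} after {timeout}s")

# Simulate a job that finishes on the third poll.
states = iter(["PENDING", "RUNNING", "DONE"])
print(wait_until(lambda: next(states), timeout=1.0))  # → DONE
```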

DataFrame to BigQuery

# Pandas DataFrame → BigQuery (Arrow/Parquet path, no pandas round-trip)
client.df2table(df=pandas_df, table_id="orders", if_exists="replace")

# Polars DataFrame → BigQuery
client.polars2table(df=polars_df, table_id="orders", if_exists="append")

BigQuery ↔ Google Cloud Storage

# SQL → GCS (server-side export, no data through client)
client.sql2gcs(
    sql="SELECT * FROM orders",
    destination_uri="gs://bucket/export.parquet",
    format="PARQUET",       # 'PARQUET' | 'CSV' | 'JSON' | 'AVRO'
    compression="SNAPPY",
    use_sharding=True,      # auto-adds wildcard for parallel export
)

# DataFrame → GCS
client.df2gcs(df=polars_df, destination_uri="gs://bucket/export.parquet")

# GCS file → DataFrame (single file, no BQ query)
df = client.gcs2df("gs://bucket/export.parquet")
df = client.gcs2df("gs://bucket/data.csv", format="CSV", use_polars=True)

# GCS file → BigQuery table (supports wildcards)
client.gcs2table(
    gcs_uri="gs://bucket/data/*.parquet",
    table_id="orders",
    write_disposition="WRITE_TRUNCATE",
)

GCS Folder Management

client.create_gcs_folder("gs://bucket/new-folder/")

success, count = client.delete_gcs_folder(
    gcs_folder_path="gs://bucket/old-folder/",
    dry_run=True   # preview before deleting
)

Entity Relationship Diagram

client.erd(
    table_list=["project.dataset.orders", "project.dataset.customers"],
    output_filename="bq_erd",
    output_format="png",
    view_diagram=True,
)

Data Type Handling

# Fix mixed-type columns before uploading to BigQuery
df = BigQueryClient.fix_mixed_types(
    df=your_dataframe,
    columns=["col1", "col2"],  # optional — defaults to all object-dtype columns
    strategy="infer",          # 'infer' (numeric → datetime → string) | 'to_string'
)
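The 'infer' strategy's numeric → datetime → string cascade can be sketched per value (an illustration only; the real method operates column-wise on DataFrames):

```python
from datetime import datetime

def infer_value(raw: str):
    """Try numeric casts first, then ISO datetime, then fall back to string."""
    for cast in (int, float, datetime.fromisoformat):
        try:
            return cast(raw)
        except (TypeError, ValueError):
            continue
    return str(raw)

print(infer_value("42"))          # → 42
print(infer_value("3.14"))        # → 3.14
print(infer_value("2026-03-04"))  # → 2026-03-04 00:00:00
print(infer_value("hello"))       # → hello
```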

Utility Functions

CSV to Parquet Conversion

Convert CSV files to Parquet using Polars streaming (sink_parquet) for constant memory usage, even on multi-GB files:

from mlfastflow import csv2parquet

# Convert a single CSV file
csv2parquet("path/to/file.csv")

# Convert all CSV files in a directory (recursively)
csv2parquet("path/to/directory", sub_folders=True)

# Custom output directory, zstd compression, skip already-converted files
csv2parquet(
    "path/to/source",
    output_dir="path/to/destination",
    compression="zstd",    # Options: 'snappy', 'zstd', 'lz4', 'gzip', 'uncompressed'
    overwrite=False         # Skip files that already have a .parquet counterpart
)

Parquet to CSV Conversion

Convert Parquet file(s) back to CSV — mirror image of csv2parquet:

from mlfastflow import parquet2csv

# Single file
parquet2csv("path/to/file.parquet")

# Whole directory, custom output location
parquet2csv("path/to/directory", output_dir="path/to/csv_export/", overwrite=True)

File Concatenation

Combine all CSV or Parquet files in a folder into a single file using lazy scanning:

from mlfastflow import concat_files

# Combine all Parquet files in a folder
output_path = concat_files("path/to/folder", file_type="parquet")

# Combine CSVs with a source column, custom output location
output_path = concat_files(
    "path/to/folder",
    file_type="csv",
    add_source_column=True,             # Adds a SOURCE column with the filename
    output_file="path/to/combined.csv",
    how="diagonal_relaxed"              # Handles schema mismatches across files
)
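Polars' diagonal_relaxed behaviour (union the column sets, fill nulls where a file lacks a column) can be illustrated with plain dicts:

```python
def diagonal_concat(tables):
    """Union the column sets across tables; missing columns become None."""
    columns = []
    for table in tables:
        for col in (table[0].keys() if table else []):
            if col not in columns:
                columns.append(col)
    return [{col: row.get(col) for col in columns}
            for table in tables for row in table]

file_a = [{"id": 1, "amount": 10.0}]
file_b = [{"id": 2, "currency": "EUR"}]
print(diagonal_concat([file_a, file_b]))
# → [{'id': 1, 'amount': 10.0, 'currency': None},
#    {'id': 2, 'amount': None, 'currency': 'EUR'}]
```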

Timer Decorator

Log the execution time of any function with adaptive formatting (ms/s/min):

from mlfastflow import timer_decorator

@timer_decorator
def my_function():
    # ... your code ...
    pass

my_function()
# INFO:mlfastflow.utils:Finished 'my_function' in 342.5 ms
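A minimal version of such a decorator (perf_counter plus adaptive unit formatting) might look like this; it is a sketch, not the package's implementation:

```python
import functools
import time

def format_elapsed(seconds: float) -> str:
    """Pick ms, s, or min depending on magnitude."""
    if seconds < 1:
        return f"{seconds * 1000:.1f} ms"
    if seconds < 60:
        return f"{seconds:.2f} s"
    return f"{seconds / 60:.2f} min"

def timed(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed = time.perf_counter() - start
            print(f"Finished {func.__name__!r} in {format_elapsed(elapsed)}")
    return wrapper

@timed
def work():
    time.sleep(0.01)

work()
```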

Data Profiling

Generate an HTML profiling report for any Pandas or Polars DataFrame:

from mlfastflow import profile

# Minimal report (fast)
profile(df, title="Customer Data Report", output_path="reports/")

# Full report with correlations
profile(df, title="Full Analysis", minimal=False)

# Append timestamp so successive runs don't overwrite each other
profile(df, title="Daily QC", timestamp=True)
# → Daily_QC_20260306_012345.html

Requires ydata-profiling: pip install ydata-profiling

get_info — Instant Object Inspector

Return a concise summary dict for any supported object — no optional dependencies:

from mlfastflow import get_info

# DataFrame (Pandas or Polars)
get_info(df)
# {'rows': 50000, 'columns': 8, 'backend': 'polars', 'memory_mb': 3.2, ...}

# BigQueryClient
get_info(client)
# {'project_id': 'my-project', 'dataset_id': 'my_ds',
#  'bq_client': 'initialized', 'query_count': 3,
#  'tables': ['orders', 'customers', ...]}

New types are added by registering a handler — no changes to existing code needed.
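The extension mechanism described above is a classic dispatch registry. A generic sketch of the idea (the names here are hypothetical, not the package's API):

```python
_HANDLERS = {}

def register_handler(cls):
    """Decorator: map a type to the function that summarises it."""
    def wrap(func):
        _HANDLERS[cls] = func
        return func
    return wrap

def get_info_sketch(obj):
    """Dispatch to the first registered handler whose type matches."""
    for cls, handler in _HANDLERS.items():
        if isinstance(obj, cls):
            return handler(obj)
    return {"type": type(obj).__name__}  # fallback for unregistered types

@register_handler(dict)
def _dict_info(d):
    return {"type": "dict", "keys": len(d)}

print(get_info_sketch({"a": 1, "b": 2}))  # → {'type': 'dict', 'keys': 2}
print(get_info_sketch(3.5))               # → {'type': 'float'}
```

Adding support for a new type means registering one more handler; the dispatch loop itself never changes.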

Logging Setup

from mlfastflow import configure_logging

configure_logging()          # INFO and above, clean timestamp format
configure_logging("DEBUG")   # verbose

License

MIT

Author

genX.AI


Download files

Download the file for your platform.

Source Distribution

mlfastflow-0.2.9.9.tar.gz (47.8 kB)

Built Distribution


mlfastflow-0.2.9.9-py3-none-any.whl (46.1 kB)

File details

Details for the file mlfastflow-0.2.9.9.tar.gz.

File metadata

  • Download URL: mlfastflow-0.2.9.9.tar.gz
  • Upload date:
  • Size: 47.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.5

File hashes

Hashes for mlfastflow-0.2.9.9.tar.gz

Algorithm    Hash digest
SHA256       91931fe978b183aaca72a0a832055ad24feb9adfcb9552dcd2866ddb48a20f07
MD5          5bb44dc815e222bd00dc87f5a0890c89
BLAKE2b-256  32f7e14aa078c43602d0e68fad69e918e488cc1c352c01821ebb79d27cf17540


File details

Details for the file mlfastflow-0.2.9.9-py3-none-any.whl.

File metadata

  • Download URL: mlfastflow-0.2.9.9-py3-none-any.whl
  • Upload date:
  • Size: 46.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.5

File hashes

Hashes for mlfastflow-0.2.9.9-py3-none-any.whl

Algorithm    Hash digest
SHA256       9cd1fff63a0734c5aa321f47f1f447f1b4495867b496d5d9052c8e200e29f2d3
MD5          b4ae367a30eb7f8a29c24634a7c47c1b
BLAKE2b-256  ea87773007188299c84d25b698a8ecadd88ba6dc73e5fffcc3d8454b49abede8

