
spark_feature_push_client


Apache Spark-based client for pushing ML features from offline batch sources to the BharatML Stack Online Feature Store via Kafka. This client is designed for data pipeline operations - reading from batch sources and publishing to Kafka for online consumption.

Installation

pip install spark_feature_push_client

Dependencies

This package depends on:

  • bharatml_commons: Common utilities and protobuf definitions
  • PySpark 3.0+: For distributed data processing
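
If the package and its dependencies installed correctly, the imports below should succeed on the Spark driver. This is only an illustrative sanity check, not part of the client API:

# Illustrative sanity check: confirm PySpark and the client import cleanly.
import pyspark
from spark_feature_push_client import OnlineFeatureStorePyClient

print(pyspark.__version__)  # expect 3.0 or newer, per the dependency list above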

Architecture Role

┌─────────────────┐    ┌──────────────────────┐    ┌─────────────┐    ┌─────────────────┐
│   Batch Sources │───▶│ Spark Feature Push   │───▶│   Kafka     │───▶│ Online Feature  │
│ • Tables        │    │      Client          │    │             │    │     Store       │
│ • Parquet       │    │ • Read & Transform   │    │             │    │                 │
│ • Delta         │    │ • Protobuf Serialize │    │             │    │                 │
│ • S3/GCS/ADLS   │    │ • Batch Processing   │    │             │    │                 │
└─────────────────┘    └──────────────────────┘    └─────────────┘    └─────────────────┘
                                                                                ▲
                                                                                │
                                                                      ┌─────────────────┐
                                                                      │ grpc_feature_   │
                                                                      │ client          │
                                                                      │ (Real-time)     │
                                                                      └─────────────────┘

Features

  • Batch Source Integration: Read from Tables (Hive/Delta), Parquet, and Delta files on cloud storage
  • Spark Processing: Leverage Apache Spark for distributed data processing
  • Protobuf Serialization: Convert feature data to protobuf format using bharatml_commons schemas
  • Kafka Publishing: Push serialized features to Kafka topics for online consumption
  • Metadata Integration: Fetch feature schemas and configurations via REST API
  • Data Type Support: Handle scalar and vector types (strings, numbers, booleans, arrays)
  • Batch Optimization: Configurable batch sizes for optimal Kafka throughput

When to Use This Client

Use spark_feature_push_client for:

  • 🔄 Batch ETL Pipelines: Scheduled feature computation and publishing
  • 📊 Historical Data Backfill: Loading historical features into the online store
  • 🏗️ Data Engineering: Spark-based feature transformations
  • 📈 Large Scale Processing: Processing millions of records efficiently
  • Offline-to-Online: Bridge between batch and real-time systems

Use grpc_feature_client for:

  • 🚀 Real-time Operations: Direct persist/retrieve operations
  • 🔍 Interactive Queries: Low-latency feature lookups
  • 🎯 API Integration: Service-to-service communication
  • 💨 Single Records: Persisting individual feature records

Quick Start

from spark_feature_push_client import OnlineFeatureStorePyClient

# Initialize client with metadata source
client = OnlineFeatureStorePyClient(
    features_metadata_source_url="https://api.example.com/metadata",
    job_id="feature-pipeline-job",
    job_token="your-auth-token"
)

# Get feature configuration (source mappings, default values, entity columns)
feature_details = client.get_features_details()

# Process your Spark DataFrame
proto_df = client.generate_df_with_protobuf_messages(your_spark_df)

# Push to Kafka
client.write_protobuf_df_to_kafka(
    proto_df,
    kafka_bootstrap_servers="localhost:9092",
    kafka_topic="features.user_features"
)

Related Packages

This package is part of the BharatML Stack ecosystem, alongside:

  • bharatml_commons: Common utilities and protobuf definitions shared across clients
  • grpc_feature_client: Real-time gRPC client for persisting and retrieving features from the Online Feature Store

Prerequisites

  • Apache Spark 3.0+: For distributed processing
  • Kafka Connector: the spark-sql-kafka-0-10 package for Kafka integration (see the session setup below)
  • Java 8/11: Required by Spark
  • bharatml_commons: For protobuf schemas

from pyspark.sql import SparkSession

# Example Spark session setup with the Kafka connector package
spark = SparkSession.builder \
    .appName("FeaturePipeline") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0") \
    .getOrCreate()

Supported Data Sources

1. Database Tables

# Hive/Delta tables
df = spark.sql("SELECT * FROM feature_db.user_features")

2. Cloud Storage - Parquet

# AWS S3
df = spark.read.parquet("s3a://bucket/path/to/features/")

# Google Cloud Storage  
df = spark.read.parquet("gs://bucket/path/to/features/")

# Azure Data Lake
df = spark.read.parquet("abfss://container@account.dfs.core.windows.net/path/")

3. Cloud Storage - Delta

# Delta format on cloud storage
df = spark.read.format("delta").load("s3a://bucket/delta-table/")

Configuration Examples

Basic Pipeline

from pyspark.sql import SparkSession
from spark_feature_push_client import OnlineFeatureStorePyClient

# Create Spark session
spark = SparkSession.builder \
    .appName("FeatureETL") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0") \
    .getOrCreate()

# Initialize client
client = OnlineFeatureStorePyClient(
    features_metadata_source_url="https://metadata-service.example.com/api/v1/features",
    job_id="daily-feature-pipeline",
    job_token="pipeline-secret-token",
    fgs_to_consider=["user_demographics", "user_behavior"]  # Optional: filter feature groups
)

# Get metadata and column mappings
(
    offline_src_type_columns,
    offline_col_to_default_values_map, 
    entity_column_names
) = client.get_features_details()

print(f"Entity columns: {entity_column_names}")
print(f"Feature mappings: {offline_src_type_columns}")

Reading from Multiple Sources

def get_features_from_all_sources(spark, entity_columns, feature_mapping, default_values):
    """
    Read and combine features from multiple offline sources
    """
    dataframes = []
    
    for source_info in feature_mapping:
        table_name, source_type, feature_list = source_info
        
        if source_type == "TABLE":
            # Read from Hive/Delta table
            df = spark.table(table_name)

        elif source_type.startswith("PARQUET_"):
            # Read from Parquet files
            df = spark.read.parquet(table_name)

        elif source_type.startswith("DELTA_"):
            # Read from Delta files
            df = spark.read.format("delta").load(table_name)

        else:
            # Fail fast on unknown source types instead of reusing a stale df
            raise ValueError(f"Unsupported source type: {source_type}")
        
        # Select and rename columns
        select_cols = entity_columns.copy()
        for original_col, renamed_col in feature_list:
            if original_col in df.columns:
                df = df.withColumnRenamed(original_col, renamed_col)
                select_cols.append(renamed_col)
        
        df = df.select(select_cols)
        dataframes.append(df)
    
    # Union all dataframes
    if dataframes:
        combined_df = dataframes[0]
        for df in dataframes[1:]:
            combined_df = combined_df.unionByName(df, allowMissingColumns=True)
        
        # Fill missing values with defaults
        for col, default_val in default_values.items():
            if col in combined_df.columns:
                combined_df = combined_df.fillna({col: default_val})
        
        return combined_df
    
    return None

# Use the function
df = get_features_from_all_sources(
    spark, 
    entity_column_names, 
    offline_src_type_columns, 
    offline_col_to_default_values_map
)

Protobuf Serialization & Kafka Publishing

# Convert DataFrame to protobuf messages
# This creates binary protobuf messages suitable for Kafka
proto_df = client.generate_df_with_protobuf_messages(
    df, 
    intra_batch_size=20  # Batch size for serialization
)

# The proto_df has schema: [value: binary, intra_batch_id: long]
proto_df.printSchema()
# root
#  |-- value: binary (nullable = false)  
#  |-- intra_batch_id: long (nullable = false)

# Write to Kafka with batching for better throughput
client.write_protobuf_df_to_kafka(
    proto_df,
    kafka_bootstrap_servers="broker1:9092,broker2:9092,broker3:9092",
    kafka_topic="features.user_features",
    additional_options={
        "kafka.acks": "all",
        "kafka.retries": "3",
        "kafka.compression.type": "snappy"
    },
    kafka_num_batches=4  # Split into 4 parallel Kafka writes
)

Data Type Handling

The client automatically handles the protobuf data type mappings:

Scalar Types

# Example DataFrame with different types
data = [
    ("user123", 25, 185.5, True, "premium"),      # int, float, bool, string
    ("user456", 30, 170.0, False, "basic")
]
df = spark.createDataFrame(data, ["user_id", "age", "height", "is_premium", "tier"])

# Automatically mapped to protobuf:
# age -> int32_values
# height -> fp32_values  
# is_premium -> bool_values
# tier -> string_values

Vector Types

# Example with vector/array features

df = spark.createDataFrame([
    ("user123", [0.1, 0.2, 0.3], ["tech", "sports"], [1, 2, 3])
], ["user_id", "embeddings", "interests", "scores"])

# Automatically mapped to protobuf vectors:
# embeddings -> fp32_values in Vector
# interests -> string_values in Vector
# scores -> int32_values in Vector

Production Pipeline Example

import os

from pyspark.sql import SparkSession
from spark_feature_push_client import OnlineFeatureStorePyClient


def run_feature_pipeline():
    """
    Complete feature pipeline from batch sources to Kafka
    (reuses get_features_from_all_sources defined above)
    """
    
    # 1. Initialize Spark
    spark = SparkSession.builder \
        .appName("DailyFeaturePipeline") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0") \
        .getOrCreate()
    
    try:
        # 2. Initialize feature client
        client = OnlineFeatureStorePyClient(
            features_metadata_source_url=os.getenv("METADATA_URL"),
            job_id=os.getenv("JOB_ID"),
            job_token=os.getenv("JOB_TOKEN")
        )
        
        # 3. Get feature configuration
        feature_mapping, default_values, entity_columns = client.get_features_details()
        
        # 4. Read and process data
        df = get_features_from_all_sources(spark, entity_columns, feature_mapping, default_values)

        record_count = df.count() if df is not None else 0
        if record_count == 0:
            raise ValueError("No data found in sources")
        
        # 5. Convert to protobuf
        proto_df = client.generate_df_with_protobuf_messages(df, intra_batch_size=50)
        
        # 6. Publish to Kafka
        client.write_protobuf_df_to_kafka(
            proto_df,
            kafka_bootstrap_servers=os.getenv("KAFKA_BROKERS"),
            kafka_topic=os.getenv("KAFKA_TOPIC"),
            additional_options={
                "kafka.acks": "all",
                "kafka.compression.type": "snappy",
                "kafka.max.request.size": "10485760"  # 10MB
            },
            kafka_num_batches=int(os.getenv("KAFKA_BATCHES", "4"))
        )
        
        print(f"✅ Successfully processed {df.count()} records")
        
    finally:
        spark.stop()

if __name__ == "__main__":
    run_feature_pipeline()

Configuration Options

Client Configuration

client = OnlineFeatureStorePyClient(
    features_metadata_source_url="https://api.example.com/metadata",  # Required
    job_id="pipeline-job-001",                                       # Required  
    job_token="secret-token-123",                                    # Required
    fgs_to_consider=["user_features", "item_features"]               # Optional: filter feature groups
)

Protobuf Serialization Options

proto_df = client.generate_df_with_protobuf_messages(
    df,
    intra_batch_size=20  # Records per protobuf message (default: 20)
)

Kafka Publishing Options

client.write_protobuf_df_to_kafka(
    proto_df,
    kafka_bootstrap_servers="localhost:9092",
    kafka_topic="features.topic",
    additional_options={
        "kafka.acks": "all",                    # Acknowledgment level
        "kafka.retries": "3",                   # Retry attempts
        "kafka.compression.type": "snappy",     # Compression
        "kafka.batch.size": "16384",            # Batch size
        "kafka.linger.ms": "100",               # Batching delay
        "kafka.max.request.size": "10485760"    # Max message size
    },
    kafka_num_batches=1  # Number of parallel Kafka writers (default: 1)
)

Performance Tuning

Spark Optimizations

spark = SparkSession.builder \
    .appName("FeaturePipeline") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

Memory Management

# For large datasets, consider:
df = df.repartition(200)  # Optimal partition count
df.cache()  # Cache if reused multiple times

Kafka Throughput

# For high-throughput scenarios:
client.write_protobuf_df_to_kafka(
    proto_df,
    kafka_bootstrap_servers="brokers",
    kafka_topic="topic", 
    kafka_num_batches=8,  # Increase parallel writers
    additional_options={
        "kafka.batch.size": "65536",      # Larger batches
        "kafka.linger.ms": "100",         # Allow batching delay
        "kafka.compression.type": "lz4"   # Fast compression
    }
)

Monitoring & Debugging

DataFrame Inspection

# Check data before processing
print(f"Records: {df.count()}")
print(f"Columns: {df.columns}")
df.printSchema()
df.show(5)

# Check protobuf output
proto_df.show(5, truncate=False)
print(f"Protobuf messages: {proto_df.count()}")

Error Handling

try:
    proto_df = client.generate_df_with_protobuf_messages(df)
    client.write_protobuf_df_to_kafka(proto_df, brokers, topic)
    
except Exception as e:
    print(f"Pipeline failed: {e}")
    # Log to monitoring system
    # Send alerts
    raise

Integration with Other SDKs

With gRPC Feature Client

# Spark client pushes features to Kafka
spark_client = OnlineFeatureStorePyClient(...)
spark_client.write_protobuf_df_to_kafka(proto_df, brokers, topic)

# gRPC client retrieves features in real-time
from grpc_feature_client import GRPCFeatureClient
grpc_client = GRPCFeatureClient(config)
features = grpc_client.retrieve_decoded_features(...)

With HTTP Feature Client (bharatml_common)

# Use HTTP client for metadata validation
from bharatml_common import HTTPFeatureClient
http_client = HTTPFeatureClient(base_url, job_id, token)
metadata = http_client.get_feature_metadata()

# Validate feature names using shared utilities
from bharatml_common import clean_column_name
clean_features = [clean_column_name(name) for name in feature_names]

# Process with Spark client
spark_client.generate_df_with_protobuf_messages(df)

Common Use Cases

1. Daily Batch ETL

# Cron job: 0 2 * * * (daily at 2 AM)
spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0 \
  --conf spark.sql.adaptive.enabled=true \
  daily_feature_pipeline.py

2. Historical Backfill

# Backfill last 30 days
from datetime import datetime, timedelta

for i in range(30):
    date = datetime.now() - timedelta(days=i)
    df = spark.sql(f"""
        SELECT * FROM features 
        WHERE date = '{date.strftime('%Y-%m-%d')}'
    """)
    
    proto_df = client.generate_df_with_protobuf_messages(df)
    client.write_protobuf_df_to_kafka(proto_df, brokers, f"backfill.{date.strftime('%Y%m%d')}")

3. Real-time Streaming (Advanced)

# Read from streaming source, process, and publish
streaming_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", input_brokers) \
    .option("subscribe", input_topic) \
    .load()

# Process streaming DataFrame
processed_df = streaming_df.select(...)

# Write to output Kafka (the Kafka sink requires a checkpoint location;
# checkpoint_dir is any durable path, e.g. on HDFS or cloud storage)
query = processed_df.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", output_brokers) \
    .option("topic", output_topic) \
    .option("checkpointLocation", checkpoint_dir) \
    .start()

Troubleshooting

Common Issues

  1. OutOfMemoryError

    # Increase driver memory or reduce partition size
    spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "50")
    
  2. Kafka Connection Timeout

    # Check network connectivity and broker addresses
    additional_options = {
        "kafka.request.timeout.ms": "60000",
        "kafka.session.timeout.ms": "30000"
    }
    
  3. Protobuf Serialization Errors

    # Check data types and null values
    df = df.fillna({"string_col": "", "numeric_col": 0})
    
  4. Metadata API Errors

    # Verify job_id, job_token, and URL (see the reachability check below)
    # Check API server logs
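
For the Metadata API Errors case, here is a minimal reachability check, assuming the metadata endpoint answers plain HTTPS GET requests and that the requests library is available; the URL is a placeholder and authentication details depend on your deployment:

# Hypothetical reachability check; URL is a placeholder and auth is omitted.
import requests

resp = requests.get("https://metadata-service.example.com/api/v1/features", timeout=10)
print(resp.status_code)  # any response (even 401/403) means the endpoint is reachable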
    

Debug Mode

import logging
logging.basicConfig(level=logging.DEBUG)

# Set the Spark log level
spark.sparkContext.setLogLevel("INFO")

Migration from Legacy Clients

If migrating from older versions:

# Old import
# from online_feature_store_py_client import OnlineFeatureStorePyClient

# New import (same interface)
from spark_feature_push_client import OnlineFeatureStorePyClient

# API remains the same - no code changes needed!

Best Practices

  1. Resource Management: Always stop Spark sessions
  2. Error Handling: Implement proper exception handling and retries (see the sketch after this list)
  3. Monitoring: Add metrics and logging to your pipelines
  4. Testing: Test with sample data before production runs
  5. Security: Use secure Kafka configurations in production
  6. Performance: Monitor Spark UI for optimization opportunities
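
As a sketch of items 1 and 2, assuming the run_feature_pipeline function from the production example above; the retry count and delay are arbitrary illustrative values, not recommendations from this package:

import time

def run_with_retries(pipeline_fn, max_attempts=3, delay_seconds=60):
    """Run a pipeline function with simple retries; re-raise after the final attempt."""
    for attempt in range(1, max_attempts + 1):
        try:
            pipeline_fn()
            return
        except Exception as exc:
            print(f"Attempt {attempt}/{max_attempts} failed: {exc}")
            if attempt == max_attempts:
                raise
            time.sleep(delay_seconds)

# run_feature_pipeline() stops its Spark session in a finally block,
# so every retry starts from a fresh session.
run_with_retries(run_feature_pipeline)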

The Spark Feature Push Client is your gateway from batch data sources to the real-time online feature store! 🚀

Contributing

We welcome contributions from the community! Please see our Contributing Guide for details on how to get started.

Community & Support

License

BharatMLStack is open-source software licensed under the BharatMLStack Business Source License 1.1.


Built with ❤️ for the ML community from Meesho
If you find this useful, ⭐️ the repo — your support means the world to us!
