Inference Logging Client
A Python SDK for decoding MPLog feature logs from proto, arrow, or parquet format. This client enables you to decode binary-encoded feature data from machine learning inference logging pipelines into Spark DataFrames.
Table of Contents
- Overview
- Installation
- Quick Start
- Configuration
- Core API Reference
- Data Types
- Supported Feature Types
- Encoding Formats Explained
- Exception Handling
- Command Line Interface
- Advanced Usage Examples
- Architecture & Internals
- Troubleshooting
- Development
Overview
The Inference Logging Client is designed to decode MPLog (Model Proxy Log) feature data that has been encoded for efficient storage and transmission. It supports three encoding formats:
| Format | Description | Use Case |
|---|---|---|
| Proto | Custom binary encoding with generated flag + sequential features | Default, most compact |
| Arrow | Arrow IPC format with binary columns | Columnar analytics |
| Parquet | Parquet format with feature map | Long-term storage |
Key Features
- Multi-format support: Decode Proto, Arrow, and Parquet encoded logs
- Automatic format detection: Detects encoding format from metadata byte
- Zstd compression support: Automatic decompression of zstd-compressed data
- Schema fetching: Retrieves feature schemas from inference API with caching
- Spark integration: Returns data as PySpark DataFrames
- CLI tool: Command-line interface for quick decoding
- Thread-safe caching: LRU cache for schemas with thread-safe access
Installation
From PyPI
pip install inference-logging-client
From Source
cd py-sdk/inference_logging_client
pip install -e .
With Development Dependencies
pip install -e ".[dev]"
Dependencies
| Package | Version | Purpose |
|---|---|---|
| pyspark | >=3.3.0 | Spark DataFrame operations |
| pyarrow | >=5.0.0 | Arrow/Parquet format support |
| zstandard | >=0.15.0 | Zstd decompression |
Quick Start
Basic Decoding from Bytes
from pyspark.sql import SparkSession
import inference_logging_client
# Create SparkSession
spark = SparkSession.builder \
.appName("inference-decode") \
.getOrCreate()
# Read binary MPLog data
with open("inference_log.bin", "rb") as f:
data = f.read()
# Decode to Spark DataFrame
df = inference_logging_client.decode_mplog(
log_data=data,
model_proxy_id="product-ranking-model",
version=1,
spark=spark
)
# View the results
df.show()
# entity_id feature_price feature_category embedding_vector
# 0 prod_123 29.99 5 [0.1, 0.2, ...]
# 1 prod_456 49.99 3 [0.3, 0.4, ...]
# Stop SparkSession when done
spark.stop()
Decoding from a Spark DataFrame
from pyspark.sql import SparkSession
import inference_logging_client
# Create SparkSession
spark = SparkSession.builder \
.appName("inference-decode") \
.getOrCreate()
# Read parquet file containing MPLog data
df = spark.read.parquet("inference_logs.parquet")
# Expected columns: features, metadata, mp_config_id, entities, ...
print(df.columns)
# ['prism_ingested_at', 'features', 'metadata', 'mp_config_id', 'entities', ...]
# Decode features from each row
decoded_df = inference_logging_client.decode_mplog_dataframe(df, spark)
decoded_df.show()
# entity_id prism_ingested_at mp_config_id feature_1 feature_2
# 0 user_123 2024-01-15 10:30 my-model 42 3.14
# 1 user_456 2024-01-15 10:30 my-model 17 2.71
spark.stop()
Configuration
Environment Variables
| Variable | Default | Description |
|---|---|---|
| INFERENCE_HOST | http://localhost:8082 | Inference service base URL |
| INFERENCE_PATH | /api/v1/inference/mp-config-registry/get_feature_schema | Schema fetch API path |
Setting Environment Variables
export INFERENCE_HOST="https://inference.prod.example.com"
export INFERENCE_PATH="/api/v1/inference/mp-config-registry/get_feature_schema"
Programmatic Configuration
from pyspark.sql import SparkSession
import inference_logging_client
spark = SparkSession.builder.appName("decode").getOrCreate()
# Pass host directly to functions
df = inference_logging_client.decode_mplog(
log_data=data,
model_proxy_id="my-model",
version=1,
spark=spark,
inference_host="https://inference.staging.example.com"
)
Core API Reference
decode_mplog()
Main function to decode MPLog bytes to a Spark DataFrame.
def decode_mplog(
log_data: bytes,
model_proxy_id: str,
version: int,
spark: SparkSession,
format_type: Optional[Format] = None,
inference_host: Optional[str] = None,
decompress: bool = True,
schema: Optional[list] = None
) -> pyspark.sql.DataFrame:
Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| log_data | bytes | Yes | - | The MPLog bytes (possibly zstd compressed) |
| model_proxy_id | str | Yes | - | The model proxy config ID for schema lookup |
| version | int | Yes | - | The schema version (0-15) |
| spark | SparkSession | Yes | - | The SparkSession to use for creating DataFrames |
| format_type | Format | No | None | Encoding format. If None, auto-detects from metadata |
| inference_host | str | No | None | Inference service URL. Falls back to INFERENCE_HOST env |
| decompress | bool | No | True | Whether to attempt zstd decompression |
| schema | list | No | None | Pre-fetched schema to skip API call |
Returns
pyspark.sql.DataFrame with:
- First column: entity_id - identifier for each entity
- Remaining columns: decoded feature values
Exceptions
| Exception | When Raised |
|---|---|
| ValueError | Version out of range (0-15) |
| ImportError | Data is zstd-compressed but zstandard not installed |
| FormatError | Unsupported format or parse error |
| SchemaFetchError | Failed to fetch schema from API |
| SchemaNotFoundError | No features in schema response |
Example: Basic Usage
from pyspark.sql import SparkSession
import inference_logging_client
spark = SparkSession.builder.appName("decode").getOrCreate()
with open("log.bin", "rb") as f:
data = f.read()
df = inference_logging_client.decode_mplog(
log_data=data,
model_proxy_id="recommendation-model",
version=2,
spark=spark
)
print(f"Decoded {df.count()} entities with {len(df.columns) - 1} features")
Example: Explicit Format
from pyspark.sql import SparkSession
from inference_logging_client import decode_mplog, Format
spark = SparkSession.builder.appName("decode").getOrCreate()
df = decode_mplog(
log_data=arrow_encoded_data,
model_proxy_id="my-model",
version=1,
spark=spark,
format_type=Format.ARROW # Skip auto-detection
)
Example: Pre-fetched Schema (Performance Optimization)
from pyspark.sql import SparkSession
from inference_logging_client import decode_mplog, get_feature_schema
spark = SparkSession.builder.appName("decode").getOrCreate()
# Fetch schema once
schema = get_feature_schema("my-model", 1, "https://inference.example.com")
# Decode multiple logs with same schema
for log_bytes in batch_of_logs:
df = decode_mplog(
log_data=log_bytes,
model_proxy_id="my-model",
version=1,
spark=spark,
schema=schema # Reuse cached schema
)
process(df)
decode_mplog_dataframe()
Decode MPLog features from a Spark DataFrame containing encoded feature data.
def decode_mplog_dataframe(
df: pyspark.sql.DataFrame,
spark: SparkSession,
inference_host: Optional[str] = None,
decompress: bool = True,
features_column: str = "features",
metadata_column: str = "metadata",
mp_config_id_column: str = "mp_config_id"
) -> pyspark.sql.DataFrame:
Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| df | pyspark.sql.DataFrame | Yes | - | Input Spark DataFrame with MPLog columns |
| spark | SparkSession | Yes | - | The SparkSession to use |
| inference_host | str | No | None | Inference service URL |
| decompress | bool | No | True | Attempt zstd decompression |
| features_column | str | No | "features" | Column containing encoded features |
| metadata_column | str | No | "metadata" | Column containing metadata byte |
| mp_config_id_column | str | No | "mp_config_id" | Column containing model proxy ID |
Expected Input DataFrame Columns
| Column | Type | Required | Description |
|---|---|---|---|
| features | bytes/str | Yes | Encoded feature bytes (raw, base64, or hex) |
| metadata | int/bytes | Yes | Metadata byte for version/format detection |
| mp_config_id | str | Yes | Model proxy config ID |
| entities | list/str | No | Entity IDs (JSON list or single value) |
| prism_ingested_at | datetime | No | Preserved in output |
| prism_extracted_at | datetime | No | Preserved in output |
| created_at | datetime | No | Preserved in output |
| parent_entity | str/list | No | Preserved in output |
| tracking_id | str | No | Preserved in output |
| user_id | str | No | Preserved in output |
| year, month, day, hour | int | No | Partition columns, preserved |
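The features column may arrive as raw bytes, a hex string, or a base64 string; decode_mplog_dataframe() handles that normalization internally. The helper below is only an illustrative sketch of the idea (the function name is ours, not part of the library):
import base64
def normalize_features_cell(value) -> bytes:
    # Coerce a features cell (raw bytes, hex text, or base64 text) into raw bytes.
    # Hex is tried first; anything that is not valid hex falls back to base64.
    if isinstance(value, (bytes, bytearray)):
        return bytes(value)
    text = value.strip()
    try:
        return bytes.fromhex(text)
    except ValueError:
        return base64.b64decode(text)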
Returns
pyspark.sql.DataFrame with:
- entity_id: Entity identifier (one row per entity)
- Metadata columns (if present in input)
- Decoded feature columns
Example: Processing Parquet Logs
from pyspark.sql import SparkSession
import inference_logging_client
spark = SparkSession.builder.appName("decode").getOrCreate()
# Read from data lake
df = spark.read.parquet("s3://bucket/inference-logs/dt=2024-01-15/")
# Decode all rows
decoded = inference_logging_client.decode_mplog_dataframe(df, spark)
# Analyze features
decoded.groupBy('mp_config_id').avg('feature_score').show()
Example: Custom Column Names
from pyspark.sql import SparkSession
import inference_logging_client
spark = SparkSession.builder.appName("decode").getOrCreate()
# Your DataFrame has different column names
df = spark.read.csv("custom_logs.csv", header=True)
decoded = inference_logging_client.decode_mplog_dataframe(
df,
spark,
features_column="encoded_data", # Custom name
metadata_column="meta", # Custom name
mp_config_id_column="model_id" # Custom name
)
get_mplog_metadata()
Extract metadata from MPLog bytes without full decoding. Useful for inspecting format and version.
def get_mplog_metadata(
log_data: bytes,
decompress: bool = True
) -> DecodedMPLog:
Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| log_data | bytes | Yes | - | The MPLog bytes |
| decompress | bool | No | True | Attempt zstd decompression |
Returns
DecodedMPLog dataclass with:
- user_id: User identifier
- tracking_id: Request tracking ID
- model_proxy_config_id: Model proxy config ID
- entities: List of entity IDs
- parent_entity: List of parent entity IDs
- metadata_byte: Raw metadata byte
- compression_enabled: Whether compression was enabled
- version: Schema version (0-15)
- format_type: Format type int (0=proto, 1=arrow, 2=parquet)
Example: Inspect Log Before Decoding
import inference_logging_client
with open("unknown_log.bin", "rb") as f:
data = f.read()
metadata = inference_logging_client.get_mplog_metadata(data)
print(f"Model: {metadata.model_proxy_config_id}")
print(f"Version: {metadata.version}")
print(f"Format: {inference_logging_client.get_format_name(metadata.format_type)}")
print(f"Compression: {'enabled' if metadata.compression_enabled else 'disabled'}")
print(f"Entities: {len(metadata.entities)}")
get_feature_schema()
Fetch feature schema from the inference API with automatic caching.
def get_feature_schema(
model_config_id: str,
version: int,
inference_host: Optional[str] = None,
api_path: Optional[str] = None
) -> list[FeatureInfo]:
Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| model_config_id | str | Yes | - | Model proxy config ID |
| version | int | Yes | - | Schema version |
| inference_host | str | No | None | Inference service URL |
| api_path | str | No | None | API path for schema endpoint |
Returns
list[FeatureInfo]: List of feature definitions with:
- name: Feature name
- feature_type: Feature data type (e.g., "FP32", "INT64", "FP32VECTOR")
- index: Feature index in encoding order
Caching Behavior
- Schemas are cached by (model_config_id, version) tuple
- Cache is thread-safe (uses threading.Lock)
- Maximum 100 cached schemas (LRU eviction)
- Host/path are NOT part of cache key (schemas are canonical)
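As a rough illustration, the caching strategy described above could look like the sketch below: an OrderedDict-based LRU keyed by (model_config_id, version). The library's internal cache may differ in the details:
import threading
from collections import OrderedDict
_SCHEMA_CACHE = OrderedDict()
_SCHEMA_LOCK = threading.Lock()
_MAX_ENTRIES = 100
def cached_schema(model_config_id, version, fetch):
    key = (model_config_id, version)
    with _SCHEMA_LOCK:
        if key in _SCHEMA_CACHE:
            _SCHEMA_CACHE.move_to_end(key)  # mark as most recently used
            return _SCHEMA_CACHE[key]
    schema = fetch(model_config_id, version)  # network call happens outside the lock
    with _SCHEMA_LOCK:
        _SCHEMA_CACHE[key] = schema
        _SCHEMA_CACHE.move_to_end(key)
        if len(_SCHEMA_CACHE) > _MAX_ENTRIES:
            _SCHEMA_CACHE.popitem(last=False)  # evict the least recently used entry
    return schema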
Example: Manual Schema Fetch
from inference_logging_client import get_feature_schema
schema = get_feature_schema(
model_config_id="product-ranking",
version=3,
inference_host="https://inference.example.com"
)
for feature in schema:
print(f" {feature.index}: {feature.name} ({feature.feature_type})")
clear_schema_cache()
Clear the internal schema cache. Useful for testing or when schemas have changed.
def clear_schema_cache() -> None:
Example
from inference_logging_client import clear_schema_cache, get_feature_schema
# Clear before testing
clear_schema_cache()
# This will fetch fresh from API
schema = get_feature_schema("my-model", 1)
Data Types
Format Enum
from inference_logging_client import Format
class Format(Enum):
PROTO = "proto" # Custom binary encoding
ARROW = "arrow" # Arrow IPC format
PARQUET = "parquet" # Parquet format
FeatureInfo
from inference_logging_client import FeatureInfo
@dataclass
class FeatureInfo:
name: str # Feature name (e.g., "user_embedding")
feature_type: str # Type string (e.g., "FP32VECTOR")
index: int # Position in encoded data
DecodedMPLog
from inference_logging_client import DecodedMPLog
@dataclass
class DecodedMPLog:
user_id: str = ""
tracking_id: str = ""
model_proxy_config_id: str = ""
entities: list[str] = field(default_factory=list)
parent_entity: list[str] = field(default_factory=list)
metadata_byte: int = 0
compression_enabled: bool = False
version: int = 0
format_type: int = 0 # 0=proto, 1=arrow, 2=parquet
Supported Feature Types
Scalar Types
| Type Aliases | Size | Description |
|---|---|---|
| INT8, I8 | 1 byte | Signed 8-bit integer |
| INT16, I16, SHORT | 2 bytes | Signed 16-bit integer |
| INT32, I32, INT | 4 bytes | Signed 32-bit integer |
| INT64, I64, LONG | 8 bytes | Signed 64-bit integer |
| UINT8, U8 | 1 byte | Unsigned 8-bit integer |
| UINT16, U16 | 2 bytes | Unsigned 16-bit integer |
| UINT32, U32 | 4 bytes | Unsigned 32-bit integer |
| UINT64, U64 | 8 bytes | Unsigned 64-bit integer |
| FP16, FLOAT16, F16 | 2 bytes | IEEE 754 half-precision float |
| FP32, FLOAT32, F32, FLOAT | 4 bytes | IEEE 754 single-precision float |
| FP64, FLOAT64, F64, DOUBLE | 8 bytes | IEEE 754 double-precision float |
| FP8E5M2, FP8E4M3 | 1 byte | 8-bit floating point (raw byte) |
| BOOL, BOOLEAN | 1 byte | Boolean value |
String Types
| Type | Description |
|---|---|
| STRING, STR | UTF-8 encoded string |
| BYTES | Binary bytes with 2-byte length prefix |
Vector Types
All scalar types have vector variants:
| Type Pattern | Description |
|---|---|
| {TYPE}VECTOR | e.g., FP32VECTOR, INT64VECTOR |
| VECTOR_{TYPE} | e.g., VECTOR_FP32, VECTOR_INT64 |
| DATATYPE{TYPE}VECTOR | e.g., DATATYPEFP32VECTOR |
Vectors can be encoded as:
- Binary: Packed element bytes (most common for feature stores)
- JSON: JSON array string (fallback)
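For illustration, an FP32 vector could be decoded as sketched below, assuming little-endian element bytes for the binary case and a JSON array string for the fallback (the helper name is ours, not a library API):
import json
import struct
def decode_fp32_vector(raw: bytes) -> list:
    # JSON fallback: the payload is a UTF-8 JSON array string
    if raw[:1] == b"[":
        return json.loads(raw.decode("utf-8"))
    # Binary: packed 4-byte floats (little-endian assumed here)
    count = len(raw) // 4
    return list(struct.unpack(f"<{count}f", raw[:count * 4]))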
Example: Working with Vectors
from pyspark.sql import SparkSession
import inference_logging_client
spark = SparkSession.builder.appName("decode").getOrCreate()
df = inference_logging_client.decode_mplog(data, "model", 1, spark)
# Vector columns contain arrays
df.select("entity_id", "user_embedding").show(truncate=False)
# Access vector elements with Spark SQL functions
from pyspark.sql import functions as F
df.select("entity_id", F.element_at("user_embedding", 1).alias("first_elem")).show()
Encoding Formats Explained
Proto Format
The default and most compact encoding format.
┌─────────────────────────────────────────────────────────────┐
│ Byte 0: Generated Flag (1 = no generated values) │
├─────────────────────────────────────────────────────────────┤
│ Feature 0: [fixed bytes OR 2-byte size + data] │
├─────────────────────────────────────────────────────────────┤
│ Feature 1: [fixed bytes OR 2-byte size + data] │
├─────────────────────────────────────────────────────────────┤
│ ... │
└─────────────────────────────────────────────────────────────┘
- Scalars: Fixed size based on type (e.g., 4 bytes for FP32)
- Strings/Vectors: 2-byte little-endian size prefix + data
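A minimal sketch of that sequential walk is shown below, assuming a small subset of fixed-size scalar types and little-endian byte order; the real decoder in decoder.py covers the full type table:
import struct
FIXED_SIZE = {"FP32": ("<f", 4), "INT64": ("<q", 8), "BOOL": ("<?", 1)}  # illustrative subset
def decode_proto_features(buf: bytes, schema):
    offset = 1  # byte 0 is the generated flag
    values = {}
    for feature in sorted(schema, key=lambda f: f.index):
        if feature.feature_type in FIXED_SIZE:
            fmt, size = FIXED_SIZE[feature.feature_type]
            values[feature.name] = struct.unpack_from(fmt, buf, offset)[0]
        else:
            # Variable-length feature: 2-byte little-endian size prefix + payload
            size = struct.unpack_from("<H", buf, offset)[0]
            offset += 2
            values[feature.name] = buf[offset:offset + size]
        offset += size
    return values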
Arrow Format
Uses Arrow IPC (Inter-Process Communication) format.
┌─────────────────────────────────────────────────────────────┐
│ Arrow IPC Stream │
│ ├── Schema: columns "0", "1", "2", ... (binary type) │
│ └── RecordBatch │
│ ├── Column "0": [entity0_feature0_bytes, ...] │
│ ├── Column "1": [entity0_feature1_bytes, ...] │
│ └── ... │
└─────────────────────────────────────────────────────────────┘
- Column names are feature indices as strings ("0", "1", "2", ...)
- Each cell contains raw binary feature bytes
- All entities in a single IPC blob
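Reading such a payload with pyarrow could look like the sketch below, assuming the feature blob is a standard IPC stream (the library's own reader may differ):
import pyarrow as pa
def read_arrow_feature_columns(feature_bytes: bytes) -> dict:
    reader = pa.ipc.open_stream(feature_bytes)
    table = reader.read_all()
    # Column names are feature indices as strings; each cell holds raw feature bytes
    return {name: table.column(name).to_pylist() for name in table.column_names}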
Parquet Format
Uses Parquet columnar format.
┌─────────────────────────────────────────────────────────────┐
│ Parquet File │
│ └── Column "Features": map<int, binary> │
│ ├── Row 0: {0: bytes, 1: bytes, ...} │
│ ├── Row 1: {0: bytes, 1: bytes, ...} │
│ └── ... │
└─────────────────────────────────────────────────────────────┘
- Features column is a map from feature index to binary bytes
- Each row represents one entity
- Alternative: columnar format with index-named columns (like Arrow)
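A rough sketch of reading the map-based layout with pyarrow, assuming map cells convert to key/value pairs via to_pylist() (the column name "Features" follows the diagram above):
import io
import pyarrow.parquet as pq
def read_parquet_feature_maps(feature_bytes: bytes) -> list:
    table = pq.read_table(io.BytesIO(feature_bytes))
    rows = table.column("Features").to_pylist()
    # Each row is a map of feature index -> raw feature bytes
    return [dict(pairs) for pairs in rows]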
Metadata Byte Layout
Bit Layout:
┌─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┐
│ 7 │ 6 │ 5 │ 4 │ 3 │ 2 │ 1 │ 0 │
├─────┴─────┼─────┴─────┴─────┴─────┼─────┼─────┤
│ Format │ Version │ Rsv │Comp │
│ (2 bits) │ (4 bits) │ │ │
└───────────┴───────────────────────┴─────┴─────┘
Format Type:
00 = Proto
01 = Arrow
10 = Parquet
11 = Reserved
Version: 0-15 (4 bits)
Compression: 0 = disabled, 1 = enabled (zstd)
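Read literally, the layout above can be unpacked with a few bit operations; this sketch mirrors the diagram rather than the library's internal parser:
def parse_metadata_byte(meta: int) -> dict:
    return {
        "format_type": (meta >> 6) & 0b11,         # bits 7-6: 0=proto, 1=arrow, 2=parquet
        "version": (meta >> 2) & 0b1111,           # bits 5-2: schema version 0-15
        "compression_enabled": bool(meta & 0b01),  # bit 0: 1 = zstd compressed
    }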
Exception Handling
Exception Hierarchy
InferenceLoggingError (base)
├── SchemaFetchError # API request failed
├── SchemaNotFoundError # No features in response
├── DecodeError # Feature decoding failed
├── FormatError # Invalid format or parse error
└── ProtobufError # Protobuf parsing failed
Example: Comprehensive Error Handling
from pyspark.sql import SparkSession
from inference_logging_client import (
decode_mplog,
InferenceLoggingError,
SchemaFetchError,
SchemaNotFoundError,
FormatError,
)
spark = SparkSession.builder.appName("decode").getOrCreate()
try:
df = decode_mplog(data, "my-model", 1, spark)
except SchemaFetchError as e:
print(f"Failed to fetch schema: {e}")
# Check network, inference service availability
except SchemaNotFoundError as e:
print(f"Schema not found: {e}")
# Check model_proxy_id and version
except FormatError as e:
print(f"Invalid data format: {e}")
# Check data integrity, correct format
except ValueError as e:
print(f"Invalid parameter: {e}")
# Check version range (0-15)
except ImportError as e:
print(f"Missing dependency: {e}")
# Install zstandard if data is compressed
except InferenceLoggingError as e:
print(f"Decode error: {e}")
# Generic fallback
finally:
spark.stop()
Command Line Interface
Basic Usage
# Decode with auto-detection
inference-logging-client --model-proxy-id my-model --version 1 input.bin
# Short form
inference-logging-client -m my-model -v 1 input.bin
CLI Arguments
| Argument | Short | Required | Default | Description |
|---|---|---|---|---|
| input | - | Yes | - | Input file or - for stdin |
| --model-proxy-id | -m | Yes | - | Model proxy config ID |
| --version | -v | Yes | - | Schema version |
| --format | -f | No | auto | Format: proto, arrow, parquet, auto |
| --inference-host | - | No | env/localhost | Inference service URL |
| --hex | - | No | - | Input is hex-encoded |
| --base64 | - | No | - | Input is base64-encoded |
| --no-decompress | - | No | - | Skip zstd decompression |
| --output | -o | No | stdout | Output directory (CSV/JSON) |
| --json | - | No | - | Output as JSON |
| --spark-master | - | No | local[*] | Spark master URL |
Examples
# Output to CSV directory
inference-logging-client -m my-model -v 1 input.bin -o output_dir
# Output as JSON
inference-logging-client -m my-model -v 1 input.bin --json
# Read from stdin (base64 encoded)
echo "BASE64_DATA" | inference-logging-client -m my-model -v 1 --base64 -
# Read from stdin (hex encoded)
cat hex_data.txt | inference-logging-client -m my-model -v 1 --hex -
# Explicit Arrow format
inference-logging-client -m my-model -v 1 --format arrow input.bin
# Custom inference host
inference-logging-client -m my-model -v 1 \
--inference-host https://inference.prod.example.com \
input.bin
# Custom Spark master
inference-logging-client -m my-model -v 1 \
--spark-master spark://master:7077 \
input.bin
# Skip decompression (for pre-decompressed data)
inference-logging-client -m my-model -v 1 --no-decompress input.bin
CLI Output Format
+----------+----------+----------+----------+
| entity_id| feature_1| feature_2| feature_3|
+----------+----------+----------+----------+
| entity_0 | 1.5 | 2.5 | 3.5 |
| entity_1 | 4.5 | 5.5 | 6.5 |
+----------+----------+----------+----------+
--- Summary ---
Format: proto (from metadata)
Version: 1
Compression: disabled
Rows: 2
Columns: 4
Features: feature_1, feature_2, feature_3...
Advanced Usage Examples
Batch Processing with Schema Reuse
import os
import glob
from pyspark.sql import SparkSession
import inference_logging_client
spark = SparkSession.builder.appName("batch-decode").getOrCreate()
# Pre-fetch schema once
schema = inference_logging_client.get_feature_schema(
"batch-model", 2, "https://inference.example.com"
)
def process_file(filepath):
with open(filepath, "rb") as f:
data = f.read()
return inference_logging_client.decode_mplog(
log_data=data,
model_proxy_id="batch-model",
version=2,
spark=spark,
schema=schema # Reuse cached schema
)
# Process files sequentially
log_files = glob.glob("/data/logs/*.bin")
all_dfs = [process_file(f) for f in log_files]
# Union all DataFrames
from functools import reduce
all_data = reduce(lambda a, b: a.union(b), all_dfs)
print(f"Total entities: {all_data.count()}")
spark.stop()
Feature Analysis Pipeline
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import inference_logging_client
spark = SparkSession.builder.appName("analysis").getOrCreate()
# Decode logs
df = inference_logging_client.decode_mplog_dataframe(
spark.read.parquet("logs.parquet"),
spark
)
# Analyze vector features
embedding_col = "user_embedding"
# Get embedding statistics
df.select(
F.size(F.col(embedding_col)).alias("dimension"),
F.aggregate(F.col(embedding_col), F.lit(0.0), lambda acc, x: acc + x).alias("sum")
).show()
# Find entities with unusual embeddings (using array functions)
df.withColumn(
"embedding_norm",
F.sqrt(F.aggregate(
F.col(embedding_col),
F.lit(0.0),
lambda acc, x: acc + x * x
))
).filter(F.col("embedding_norm") > 10.0).show()
spark.stop()
Integration with Feature Store
from pyspark.sql import SparkSession
import inference_logging_client
spark = SparkSession.builder.appName("feature-compare").getOrCreate()
# Decode inference logs
df = inference_logging_client.decode_mplog(data, "ranking-model", 1, spark)
# Compare with feature store values
from feature_store import FeatureStoreClient
fs = FeatureStoreClient()
# Collect for comparison (for small datasets)
for row in df.collect():
entity_id = row['entity_id']
# Get fresh features from store
fresh_features = fs.get_features(entity_id, ["feature_a", "feature_b"])
# Compare logged vs fresh
for feature_name in ["feature_a", "feature_b"]:
logged = row[feature_name]
fresh = fresh_features[feature_name]
if logged != fresh:
print(f"Drift detected for {entity_id}.{feature_name}:")
print(f" Logged: {logged}")
print(f" Fresh: {fresh}")
spark.stop()
Custom Schema Source
from pyspark.sql import SparkSession
from inference_logging_client import decode_mplog, FeatureInfo
spark = SparkSession.builder.appName("custom-schema").getOrCreate()
# Define schema manually (useful for testing or offline processing)
custom_schema = [
FeatureInfo(name="user_age", feature_type="INT32", index=0),
FeatureInfo(name="user_score", feature_type="FP32", index=1),
FeatureInfo(name="user_embedding", feature_type="FP32VECTOR", index=2),
FeatureInfo(name="user_category", feature_type="STRING", index=3),
]
df = decode_mplog(
log_data=data,
model_proxy_id="my-model", # Not used when schema provided
version=1, # Not used when schema provided
spark=spark,
schema=custom_schema
)
spark.stop()
Architecture & Internals
Module Structure
inference_logging_client/
├── __init__.py # Public API exports, decode_mplog(), decode_mplog_dataframe()
├── __main__.py # Module execution entry point
├── cli.py # Command-line interface
├── decoder.py # Core byte decoding, type conversion
├── exceptions.py # Exception classes
├── formats.py # Proto/Arrow/Parquet format decoders
├── io.py # Schema fetching, protobuf parsing
├── types.py # Data type definitions (Format, FeatureInfo, DecodedMPLog)
└── utils.py # Utility functions (type normalization, formatting)
Decoding Flow
┌──────────────────┐
│ MPLog Bytes │
│ (compressed?) │
└────────┬─────────┘
│
┌────────▼─────────┐
│ Zstd Decompress │
│ (if enabled) │
└────────┬─────────┘
│
┌────────▼─────────┐
│ Parse Protobuf │
│ (outer wrapper) │
└────────┬─────────┘
│
┌──────────────┼──────────────┐
│ │ │
┌───────▼───────┐ ┌────▼────┐ ┌───────▼───────┐
│ Proto Format │ │ Arrow │ │ Parquet Format│
│ Decoder │ │ Decoder │ │ Decoder │
└───────┬───────┘ └────┬────┘ └───────┬───────┘
│ │ │
└──────────────┼──────────────┘
│
┌────────▼─────────┐
│ Feature Schema │◄──── API Fetch
│ (cached) │
└────────┬─────────┘
│
┌────────▼─────────┐
│ Decode Features │
│ (by type) │
└────────┬─────────┘
│
┌────────▼─────────┐
│ Spark DataFrame │
└──────────────────┘
Schema Cache
# Thread-safe LRU cache with max 100 entries
# Key: (model_config_id, version)
# Value: list[FeatureInfo]
# Cache is NOT keyed by host/path because schemas are canonical
# Same model+version = same schema regardless of which host serves it
Troubleshooting
Common Issues
"No features found in schema"
SchemaNotFoundError: No features found in schema for model_config_id=xxx, version=1
Causes:
- Incorrect model_proxy_id
- Wrong version number
- Schema not yet registered
Solutions:
- Verify model_proxy_id matches exactly
- Check available versions in inference service
- Use get_mplog_metadata() to see the version in the data
"Version out of valid range"
ValueError: Version 16 is out of valid range (0-15)
Cause: Version is encoded in 4 bits (0-15 only)
Solution: Check the version number passed to decode functions
"Data appears to be zstd-compressed but zstandard not installed"
ImportError: Data appears to be zstd-compressed but the 'zstandard' package is not installed.
Solution:
pip install zstandard
"Failed to read Arrow IPC data"
Causes:
- Corrupted data
- Wrong format specified
- Incomplete data
Solutions:
- Use format_type=None for auto-detection
- Check data integrity
- Try get_mplog_metadata() to inspect format
Empty DataFrame Returned
Causes:
- No entities in the log
- All features decoded as None
- Schema mismatch
Solutions:
- Check get_mplog_metadata() to verify entity count
- Verify schema matches data version
- Check for decode warnings
Debug Mode
import warnings
import logging
from pyspark.sql import SparkSession
# Enable all warnings
warnings.simplefilter("always")
# Enable debug logging for HTTP requests
logging.basicConfig(level=logging.DEBUG)
# Create Spark session with verbose logging
spark = SparkSession.builder \
.appName("debug") \
.config("spark.driver.extraJavaOptions", "-Dlog4j.logger.org.apache.spark=DEBUG") \
.getOrCreate()
# Inspect before decoding
import inference_logging_client
metadata = inference_logging_client.get_mplog_metadata(data)
print(f"Format: {metadata.format_type}")
print(f"Version: {metadata.version}")
print(f"Entities: {len(metadata.entities)}")
print(f"Model: {metadata.model_proxy_config_id}")
Development
Setup
# Clone repository
git clone https://github.com/Meesho/BharatMLStack.git
cd BharatMLStack/py-sdk/inference_logging_client
# Create virtual environment
python -m venv venv
source venv/bin/activate
# Install in editable mode with dev dependencies
pip install -e ".[dev]"
Running Tests
pytest
# With coverage
pytest --cov=inference_logging_client --cov-report=html
Code Formatting
# Format with black
black inference_logging_client/
# Lint with ruff
ruff check inference_logging_client/
Building Package
python -m build
License
MIT License
Repository
https://github.com/Meesho/BharatMLStack
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
- Fork the repository
- Create your feature branch (git checkout -b feature/amazing-feature)
- Commit your changes (git commit -m 'Add amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
File details
Details for the file inference_logging_client-0.2.5.tar.gz.
File metadata
- Download URL: inference_logging_client-0.2.5.tar.gz
- Upload date:
- Size: 47.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 931f73376f870d38ebd2b08c8e54909fe98804f243b7a7d41046ce55a2207678 |
| MD5 | 3ac86f6303e948bd662baacc2ef61801 |
| BLAKE2b-256 | 99d20fa249a2dcbdb92809863a7999d89ae0167d1abe52482f477ead82a86884 |
File details
Details for the file inference_logging_client-0.2.5-py3-none-any.whl.
File metadata
- Download URL: inference_logging_client-0.2.5-py3-none-any.whl
- Upload date:
- Size: 32.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | d8aab9992bbacccf1ab61739ab8a78b7212177159fea8b49a12d7b1857fee03e |
| MD5 | 85c4924ac044dded89280891f390c8f0 |
| BLAKE2b-256 | 40dddb923c50b7aa6743089534ca6b5ad8c15b7f86fd1f6ad4595012ee38e0ae |