An ETL/ELT transformation library for converting nested JSON structures into flat, tabular formats

Transmog

A Python library for transforming complex nested JSON data into flat, structured formats while preserving parent-child relationships.

Features

  • Multiple Input Formats: Process JSON, JSONL (line-delimited JSON), and CSV files
  • Flattening: Flatten deeply nested structures with customizable delimiter options
  • Output Flexibility:
    • Native formats: Python dictionaries, JSON objects, PyArrow Tables
    • Bytes output: Serialize directly to Parquet, CSV, or JSON bytes
    • File export: Write to various file formats (JSON, CSV, Parquet)
  • Performance Optimization:
    • Process large datasets with configurable memory management
    • Single-pass or chunked processing depending on data size
    • Stream data from files efficiently
  • Metadata Generation: Track data lineage with automatic ID generation and parent-child relationships
  • Error Recovery: Recover from malformed data with customizable strategies
  • Consistent IDs: Deterministic ID generation for data consistency across processing runs
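
The features above all serve one core transformation. As a rough illustration (a hand-rolled sketch in plain Python, not Transmog's implementation or API), nested objects become delimiter-joined columns, and nested arrays become child tables whose rows point back to their parent record:

```python
import itertools

def flatten_record(record, sep="_"):
    """Flatten nested dicts into one row; collect list-of-dict fields as children."""
    row, children = {}, []

    def walk(obj, prefix):
        for key, value in obj.items():
            path = f"{prefix}{sep}{key}" if prefix else key
            if isinstance(value, dict):
                walk(value, path)  # nested object: merge into the current row
            elif isinstance(value, list) and value and all(isinstance(v, dict) for v in value):
                children.extend((path, v) for v in value)  # array of objects: child table
            else:
                row[path] = value

    walk(record, "")
    return row, children

def to_tables(record, entity="main", sep="_"):
    """Build {table_name: [rows]}, linking child rows to parents via generated IDs."""
    counter = itertools.count(1)
    tables = {entity: []}

    def add(obj, table, parent_id):
        row, children = flatten_record(obj, sep)
        row["_id"] = next(counter)
        if parent_id is not None:
            row["_parent_id"] = parent_id
        tables.setdefault(table, []).append(row)
        for path, child in children:
            # top-level children are named by their path (e.g. "user_orders")
            name = path if table == entity else f"{table}{sep}{path}"
            add(child, name, row["_id"])

    add(record, entity, None)
    return tables

data = {"user": {"id": 1, "contact": {"email": "john@example.com"},
                 "orders": [{"id": 101, "amount": 99.99}, {"id": 102, "amount": 45.50}]}}
tables = to_tables(data)
# tables["main"][0] has flattened columns like "user_contact_email";
# tables["user_orders"] has two rows, each carrying a _parent_id
```

Transmog layers configurable naming, deterministic ID generation, and error handling on top of this basic shape.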

Installation

pip install transmog

For minimal installation without optional dependencies:

pip install transmog[minimal]

For development installation:

pip install transmog[dev]

See the installation guide for more details.

Quick Example

import transmog as tm

# Sample nested data
data = {
    "user": {
        "id": 1,
        "name": "John Doe",
        "contact": {
            "email": "john@example.com"
        },
        "orders": [
            {"id": 101, "amount": 99.99},
            {"id": 102, "amount": 45.50}
        ]
    }
}

# Process the data with default configuration
processor = tm.Processor()
result = processor.process(data)

# Native data structure output
tables = result.to_dict()                # Get all tables as Python dictionaries
pa_tables = result.to_pyarrow_tables()   # Get as PyArrow Tables

# Access the data in memory
main_table = tables["main"]              # Main table as Python dict
orders = tables["user_orders"]           # Child table as Python dict

# Bytes output for direct writing
json_bytes = result.to_json_bytes(indent=2)  # Get all tables as JSON bytes
csv_bytes = result.to_csv_bytes()        # Get all tables as CSV bytes
parquet_bytes = result.to_parquet_bytes()    # Get all tables as Parquet bytes

# Direct write to files
with open("main_table.json", "wb") as f:
    f.write(json_bytes["main"])

# Or use PyArrow tables directly
pa_table = pa_tables["main"]       # Work with PyArrow Table directly
print(f"Table has {pa_table.num_rows} rows and {pa_table.num_columns} columns")

# File output (still supported)
result.write_all_json("output_dir/json")
result.write_all_csv("output_dir/csv")
result.write_all_parquet("output_dir/parquet")
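
Because each bytes API returns one payload per table, writing every table out is a short loop. A generic helper along these lines (plain Python; `write_table_bytes` is a hypothetical name, not part of Transmog) works for all three formats:

```python
from pathlib import Path

def write_table_bytes(table_bytes, out_dir, suffix=".json"):
    """Write each serialized table ({name: bytes}) to its own file; return the paths."""
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    written = {}
    for name, payload in table_bytes.items():
        path = out / f"{name}{suffix}"
        path.write_bytes(payload)
        written[name] = path
    return written

# e.g. write_table_bytes(result.to_json_bytes(indent=2), "output_dir/json")
# or   write_table_bytes(result.to_parquet_bytes(), "output_dir/parquet", ".parquet")
```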

Configuration

Transmog provides a flexible configuration system through the TransmogConfig class:

import transmog as tm

# Use pre-configured modes
config = tm.TransmogConfig.memory_optimized()  # For large datasets
# or
config = tm.TransmogConfig.performance_optimized()  # For speed-critical processing

# Create custom configuration
config = (
    tm.TransmogConfig.default()
    .with_naming(
        separator=".",
        abbreviate_table_names=False
    )
    .with_processing(
        batch_size=5000,
        cast_to_string=True
    )
    .with_metadata(
        id_field="custom_id"
    )
    .with_error_handling(
        max_retries=3
    )
)

# Use the configuration
processor = tm.Processor(config=config)

See the configuration guide for more details.

Error Recovery Strategies

Transmog provides several recovery strategies for handling problematic data:

# Default strict recovery (fails on any error)
processor = tm.Processor.default()

# Skip and log recovery (skips problematic records)
processor = tm.Processor().with_error_handling(recovery_strategy="skip")

# Partial recovery (preserves valid portions of problematic records)
processor = tm.Processor.with_partial_recovery()

# Process data that may contain errors
result = processor.process(problematic_data, entity_name="records")

The partial recovery strategy is particularly valuable when working with:

  • Data migration from legacy systems
  • Processing API responses with inconsistent structures
  • Recovering data from malformed files
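
As a mental model (not Transmog's internals), the skip strategy behaves like a loop that isolates failures per record instead of aborting the whole run:

```python
def process_with_skip(records, transform):
    """Apply transform to each record; collect failures instead of raising."""
    results, errors = [], []
    for index, record in enumerate(records):
        try:
            results.append(transform(record))
        except Exception as exc:
            errors.append((index, repr(exc)))  # record the failure and move on
    return results, errors

rows = [{"id": 1}, {"oops": True}, {"id": 3}]
good, bad = process_with_skip(rows, lambda r: r["id"])
# good == [1, 3]; bad holds one entry for the record at index 1
```

Partial recovery goes a step further by salvaging the valid fields of a failing record rather than dropping it entirely.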

See the error handling guide for more information.

Cache Configuration

Transmog provides a configurable value-processing cache for performance optimization:

import transmog as tm

# Default configuration
processor = tm.Processor()

# Disable caching completely
processor = tm.Processor(
    tm.TransmogConfig.default().with_caching(enabled=False)
)

# Configure cache size
processor = tm.Processor(
    tm.TransmogConfig.default().with_caching(maxsize=50000)
)

# Clear cache after batch processing
processor = tm.Processor(
    tm.TransmogConfig.default().with_caching(clear_after_batch=True)
)

# Manually clear cache
processor.clear_cache()

The cache system improves performance when processing datasets with repeated values while keeping memory usage under control. See the cache configuration guide for more details.
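
The options above follow a familiar caching pattern; `functools.lru_cache` from the standard library illustrates the same trade-offs (an analogy only, not Transmog's cache implementation):

```python
from functools import lru_cache

@lru_cache(maxsize=50000)            # bounded cache, like with_caching(maxsize=50000)
def normalize(value):
    # stand-in for per-value work such as casting to string
    return str(value).strip()

normalize("  42 ")
normalize("  42 ")                   # same argument: served from the cache
print(normalize.cache_info())        # CacheInfo(hits=1, misses=1, maxsize=50000, currsize=1)
normalize.cache_clear()              # analogous to processor.clear_cache()
```

A larger `maxsize` raises the hit rate on repetitive data at the cost of memory; clearing between batches caps memory growth for long-running jobs.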

Processing Large Datasets

For large datasets, use memory-optimized configuration:

# Memory-optimized processor
config = tm.TransmogConfig.memory_optimized()
processor = tm.Processor(config=config)

# Process a large file in chunks
result = processor.process_chunked(
    "large_data.jsonl",
    entity_name="records",
    chunk_size=1000  # Process 1000 records at a time
)
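
To see what chunked consumption of a JSONL file amounts to, a generator along these lines (a hypothetical helper, not Transmog's API) yields fixed-size batches so that only one chunk of parsed records is in memory at a time:

```python
import json

def iter_jsonl_chunks(path, chunk_size=1000):
    """Yield lists of up to chunk_size records parsed from a JSONL file."""
    chunk = []
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue  # tolerate blank lines
            chunk.append(json.loads(line))
            if len(chunk) == chunk_size:
                yield chunk
                chunk = []
    if chunk:
        yield chunk  # final partial chunk
```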

Performance Benchmarking

Transmog provides tools to benchmark performance and identify optimization opportunities:

# Command-line benchmarking script
python scripts/run_benchmarks.py --records 5000 --complexity complex
python scripts/run_benchmarks.py --mode streaming
python scripts/run_benchmarks.py --strategy memory

# Pytest benchmarks
pytest tests/benchmarks/

The benchmarking tools help evaluate:

  • Overall performance with different configurations
  • Performance impact of different processing modes and strategies
  • Memory usage characteristics
  • Component-level performance metrics

For more details, see the Benchmarking Guide.

Deterministic ID Generation

Configure deterministic IDs through the configuration system:

# Configure deterministic IDs based on specific fields
config = tm.TransmogConfig.with_deterministic_ids({
    "": "id",                     # Root level uses "id" field
    "user_orders": "id"           # Order records use "id" field
})

# Or use a custom ID generation strategy
def custom_id_strategy(record):
    return f"CUSTOM-{record['id']}"

config = tm.TransmogConfig.with_custom_id_generation(custom_id_strategy)

# Use the configuration
processor = tm.Processor(config=config)
result = processor.process(data)
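
The property being configured is that the same source field always produces the same ID. A stable hash such as `uuid5` over the chosen field demonstrates the idea (illustrative only; Transmog's actual ID scheme may differ):

```python
import uuid

# Arbitrary fixed namespace so results are reproducible across runs
NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "example.transmog")

def deterministic_id(record, field="id"):
    """Derive a stable UUID from a record's natural key."""
    return str(uuid.uuid5(NAMESPACE, str(record[field])))

a = deterministic_id({"id": 101})
b = deterministic_id({"id": 101})
# a == b on every run, so re-processing the same data keeps IDs stable
```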

See the deterministic IDs guide for more information.

Output Format Options

Transmog provides four main categories of output formats:

  1. Native Data Structures - Python objects like dictionaries and PyArrow Tables

    result.to_dict()              # Python dictionaries
    result.to_json_objects()      # JSON-serializable Python objects
    result.to_pyarrow_tables()    # PyArrow Tables
    
  2. Bytes Serialization - Raw bytes in JSON, CSV, or Parquet format

    result.to_json_bytes()        # JSON bytes
    result.to_csv_bytes()         # CSV bytes
    result.to_parquet_bytes()     # Parquet bytes
    
  3. File Output - Direct writing to files in different formats

    result.write_all_json()       # Write to JSON files
    result.write_all_csv()        # Write to CSV files
    result.write_all_parquet()    # Write to Parquet files
    result.stream_to_parquet()    # Stream to Parquet files with optimal memory usage
    
  4. Streaming Output - Direct streaming for memory-efficient processing

    # Process and stream data directly to Parquet files
    from transmog.io import create_streaming_writer
    
    writer = create_streaming_writer(
        "parquet",
        destination="output_dir",
        compression="snappy",
        row_group_size=10000
    )
    
    with writer:
        # Process and write data in batches
        for batch in data_batches:
            batch_result = processor.process_batch(batch)
            writer.write_main_records(batch_result.get_main_table())
    
            for table_name in batch_result.get_table_names():
                writer.initialize_child_table(table_name)
                writer.write_child_records(table_name, batch_result.get_child_table(table_name))
    

Use Cases

  • Data ETL pipelines
  • API response processing
  • JSON/CSV conversion
  • Preparing nested data for tabular analysis
  • Data normalization and standardization
  • Integration with data processing frameworks
  • In-memory data transformation
  • Cloud-based serverless processing
  • Incremental data processing with consistent IDs

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

Please make sure to update tests as appropriate.

License

Distributed under the MIT License. See LICENSE for more information.
