High-performance bitemporal timeseries update processor

PyTemporal Library

A high-performance Rust library with Python bindings for processing bitemporal timeseries data. Optimized for financial services and applications requiring immutable audit trails with both business and system time dimensions.

Features

  • High Performance: ~82,500 rows/second throughput with full bitemporal processing
  • Arrow-Direct Hashing: Zero-deserialization hash computation for improved performance
  • Configurable Hash Algorithms: XxHash (default) or SHA256 for legacy compatibility
  • Optimized Python Wrapper: Batch consolidation reduces conversion overhead to <0.1 seconds
  • Zero-Copy Processing: Apache Arrow columnar data format for efficient memory usage
  • Flexible Date/Time Support: Handles Date32, Date64, and all Timestamp types (Second/Millisecond/Microsecond/Nanosecond)
  • Parallel Processing: Rayon-based parallelization with adaptive thresholds
  • Conflation: Automatic merging of adjacent segments with identical values to reduce storage
  • Full State Mode: Complete state replacement with tombstone records for audit trails
  • Flexible Schema: Dynamic ID and value column configuration
  • Python Integration: High-level DataFrame API with seamless PyO3 bindings
  • Modular Architecture: Clean separation of concerns with dedicated modules
  • Performance Monitoring: Integrated flamegraph generation and GitHub Pages benchmark reports

Installation

Build from source (requires Rust):

git clone <your-repository-url>
cd pytemporal
uv run maturin develop --release

Quick Start

High-Level DataFrame API (Recommended)

import pandas as pd
from pytemporal import BitemporalTimeseriesProcessor

# Initialize processor
processor = BitemporalTimeseriesProcessor(
    id_columns=['id', 'field'],
    value_columns=['mv', 'price']
)

# Current state
current_state = pd.DataFrame({
    'id': [1234, 1234],
    'field': ['test', 'fielda'], 
    'mv': [300, 400],
    'price': [400, 500],
    'effective_from': pd.to_datetime(['2020-01-01', '2020-01-01']),
    'effective_to': pd.to_datetime(['2021-01-01', '2021-01-01']),
    'as_of_from': pd.to_datetime(['2025-01-01', '2025-01-01']),
    'as_of_to': pd.to_datetime(['2262-04-11', '2262-04-11'])
})

# Updates
updates = pd.DataFrame({
    'id': [1234],
    'field': ['test'],
    'mv': [400], 
    'price': [300],
    'effective_from': pd.to_datetime(['2020-06-01']),
    'effective_to': pd.to_datetime(['2020-09-01']),
    'as_of_from': pd.to_datetime(['2025-07-27']),
    'as_of_to': pd.to_datetime(['2262-04-11'])
})

# Process updates (delta mode - incremental updates)
rows_to_expire, rows_to_insert = processor.compute_changes(
    current_state, 
    updates,
    update_mode='delta'
)

print(f"Records to expire: {len(rows_to_expire)}")
print(f"Records to insert: {len(rows_to_insert)}")

# Process updates (full_state mode - complete state replacement)
rows_to_expire, rows_to_insert = processor.compute_changes(
    current_state, 
    updates,
    update_mode='full_state'
)

Low-Level Arrow API

import pandas as pd
from pytemporal import compute_changes, compute_changes_with_hash_algorithm
import pyarrow as pa

# Convert pandas DataFrames to Arrow RecordBatches
current_batch = pa.RecordBatch.from_pandas(current_state)
updates_batch = pa.RecordBatch.from_pandas(updates)

# Direct Arrow processing (XxHash default - fastest)
expire_indices, insert_batches, expired_batches = compute_changes(
    current_batch,
    updates_batch,
    id_columns=['id', 'field'],
    value_columns=['mv', 'price'],
    system_date='2025-07-27',
    update_mode='delta'
)

# Optional: Use SHA256 for legacy compatibility
expire_indices, insert_batches, expired_batches = compute_changes_with_hash_algorithm(
    current_batch,
    updates_batch,
    id_columns=['id', 'field'],
    value_columns=['mv', 'price'],
    system_date='2025-07-27',
    update_mode='delta',
    hash_algorithm='sha256'  # or 'xxhash'
)
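
The low-level API returns Arrow structures rather than DataFrames. A minimal sketch for turning the results back into pandas, assuming insert_batches is a non-empty list of pyarrow.RecordBatch objects and expire_indices holds row positions in current_batch:

# Combine the inserted batches into a single pandas DataFrame
inserts_df = pa.Table.from_batches(insert_batches).to_pandas()

# Look up the full rows to expire in the original current state
expired_rows = current_state.iloc[list(expire_indices)]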

Hash Computation API

from pytemporal import add_hash_key_with_algorithm

# Add a hash column with different algorithms
# (`batch` is a pyarrow RecordBatch; `value_columns` is a list of column names)
batch_xxhash = add_hash_key_with_algorithm(batch, value_columns, 'xxhash')
batch_sha256 = add_hash_key_with_algorithm(batch, value_columns, 'sha256')

print(f"XxHash: {batch_xxhash.column('value_hash')[0]}")  # 16-char hex
print(f"SHA256: {batch_sha256.column('value_hash')[0]}")  # 64-char hex

Algorithm Explanation with Examples

Bitemporal Model

Each record tracks two time dimensions:

  • Effective Time (effective_from, effective_to): When the data is valid in the real world
  • As-Of Time (as_of_from, as_of_to): When the data was known to the system

Both dimensions are stored with microsecond timestamp precision (Arrow TimestampMicrosecond).
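
To make the two dimensions concrete, a point-in-time query is just a filter over both intervals. A minimal pandas sketch (this helper is not part of the library API; it assumes the half-open [from, to) convention used in the examples below):

def as_known_at(df, effective, as_of):
    # Rows valid at `effective`, as the system believed them at `as_of`
    mask = (
        (df['effective_from'] <= effective) & (effective < df['effective_to'])
        & (df['as_of_from'] <= as_of) & (as_of < df['as_of_to'])
    )
    return df[mask]

snapshot = as_known_at(current_state, pd.Timestamp('2020-07-01'), pd.Timestamp('2025-06-01'))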

Core Algorithm: Timeline Processing

The algorithm processes updates by creating a timeline of events and determining what should be active at each point in time.

Example 1: Simple Overwrite

Current State:

ID=123, effective: [2020-01-01, 2021-01-01], as_of: [2025-01-01, max], mv=100

Update:

ID=123, effective: [2020-06-01, 2020-09-01], as_of: [2025-07-27, max], mv=200

Timeline Processing:

  1. Create Events:

    • 2020-01-01: Current starts (mv=100)
    • 2020-06-01: Update starts (mv=200)
    • 2020-09-01: Update ends
    • 2021-01-01: Current ends
  2. Process Timeline:

    • [2020-01-01, 2020-06-01): Current active → emit mv=100
    • [2020-06-01, 2020-09-01): Update active → emit mv=200
    • [2020-09-01, 2021-01-01): Current active → emit mv=100
  3. Result:

    • Expire: Original record (index 0)
    • Insert: Three new records covering the split timeline

Visual Representation:

Before:
Current |=======mv=100========|
        2020-01-01      2021-01-01

Update       |==mv=200==|
             2020-06-01  2020-09-01

After:
New     |=100=|=mv=200=|=100=|
        2020   2020     2020  2021
        01-01  06-01    09-01 01-01

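The sketch below reproduces Example 1 end to end with the high-level API; the expected counts (one expire, three inserts) follow directly from the timeline split above, assuming delta mode and the default column names:

import pandas as pd
from pytemporal import BitemporalTimeseriesProcessor

processor = BitemporalTimeseriesProcessor(id_columns=['id'], value_columns=['mv'])

current = pd.DataFrame({
    'id': [123], 'mv': [100],
    'effective_from': pd.to_datetime(['2020-01-01']),
    'effective_to':   pd.to_datetime(['2021-01-01']),
    'as_of_from':     pd.to_datetime(['2025-01-01']),
    'as_of_to':       pd.to_datetime(['2262-04-11']),
})

update = pd.DataFrame({
    'id': [123], 'mv': [200],
    'effective_from': pd.to_datetime(['2020-06-01']),
    'effective_to':   pd.to_datetime(['2020-09-01']),
    'as_of_from':     pd.to_datetime(['2025-07-27']),
    'as_of_to':       pd.to_datetime(['2262-04-11']),
})

to_expire, to_insert = processor.compute_changes(current, update, update_mode='delta')
print(len(to_expire), len(to_insert))  # expected: 1 expire, 3 inserts
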
Example 2: Conflation (Adjacent Identical Values)

Current State:

ID=123, effective: [2020-01-01, 2020-06-01], as_of: [2025-01-01, max], mv=100
ID=123, effective: [2020-06-01, 2021-01-01], as_of: [2025-01-01, max], mv=100  

Update:

ID=123, effective: [2020-03-01, 2020-04-01], as_of: [2025-07-27, max], mv=100

Since the update has the same value (mv=100) as the current state, the algorithm detects this as a no-change scenario and skips processing entirely.
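
Reusing the processor from the Example 1 sketch, a hedged check of the no-change case: the same value over a sub-range that is already covered should produce no expires and no inserts.

current = pd.DataFrame({
    'id': [123, 123], 'mv': [100, 100],
    'effective_from': pd.to_datetime(['2020-01-01', '2020-06-01']),
    'effective_to':   pd.to_datetime(['2020-06-01', '2021-01-01']),
    'as_of_from':     pd.to_datetime(['2025-01-01', '2025-01-01']),
    'as_of_to':       pd.to_datetime(['2262-04-11', '2262-04-11']),
})

noop_update = pd.DataFrame({
    'id': [123], 'mv': [100],
    'effective_from': pd.to_datetime(['2020-03-01']),
    'effective_to':   pd.to_datetime(['2020-04-01']),
    'as_of_from':     pd.to_datetime(['2025-07-27']),
    'as_of_to':       pd.to_datetime(['2262-04-11']),
})

to_expire, to_insert = processor.compute_changes(current, noop_update, update_mode='delta')
print(len(to_expire), len(to_insert))  # expected: 0 expires, 0 inserts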

Example 3: Complex Multi-Update

Current State:

ID=123, effective: [2020-01-01, 2021-01-01], as_of: [2025-01-01, max], mv=100

Updates:

ID=123, effective: [2020-03-01, 2020-06-01], as_of: [2025-07-27, max], mv=200
ID=123, effective: [2020-09-01, 2020-12-01], as_of: [2025-07-27, max], mv=300

Timeline Processing:

  1. Events: 2020-01-01 (current start), 2020-03-01 (update1 start), 2020-06-01 (update1 end), 2020-09-01 (update2 start), 2020-12-01 (update2 end), 2021-01-01 (current end)

  2. Result:

    • [2020-01-01, 2020-03-01): mv=100 (current)
    • [2020-03-01, 2020-06-01): mv=200 (update1)
    • [2020-06-01, 2020-09-01): mv=100 (current)
    • [2020-09-01, 2020-12-01): mv=300 (update2)
    • [2020-12-01, 2021-01-01): mv=100 (current)

Post-Processing Conflation

After timeline processing, the algorithm merges adjacent segments with identical value hashes:

Before Conflation:

|--mv=100--|--mv=100--|--mv=200--|--mv=100--|--mv=100--|

After Conflation:

|--------mv=100--------|--mv=200--|--------mv=100--------|

This significantly reduces database row count while preserving temporal accuracy.
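
For intuition, here is a pure-Python sketch of the merge rule (the actual implementation lives in src/conflation.rs and operates on Arrow batches): adjacent segments with identical value hashes and touching effective ranges collapse into one.

def conflate(segments):
    # segments: list of (effective_from, effective_to, value_hash) tuples,
    # sorted by effective_from, with half-open [from, to) ranges
    merged = []
    for start, end, value_hash in segments:
        if merged and merged[-1][2] == value_hash and merged[-1][1] == start:
            # Same value and contiguous range: extend the previous segment
            merged[-1] = (merged[-1][0], end, value_hash)
        else:
            merged.append((start, end, value_hash))
    return merged

# |--A--|--A--|--B--|--A--|--A--|  ->  |----A----|--B--|----A----|
print(conflate([(1, 2, 'A'), (2, 3, 'A'), (3, 4, 'B'), (4, 5, 'A'), (5, 6, 'A')]))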

Update Modes

The algorithm supports two distinct update modes that determine how updates interact with existing current state:

1. Delta Mode (default)

  • Behavior: Only provided records are treated as updates
  • Existing State: Preserved where not temporally overlapped by updates
  • Use Case: Incremental updates where you only want to modify specific time periods
  • Processing: Uses timeline-based algorithm to handle temporal overlaps

2. Full State Mode

  • Behavior: Provided records represent the complete desired state for each ID group
  • Existing State: Only preserved if identical records exist in the updates (same values and temporal ranges)
  • Value Comparison: Records are compared using hashes of the value columns (XxHash by default, SHA256 if configured)
  • Processing Rules:
    • Unchanged Records: If an update has identical values and temporal range as current state, neither expire nor insert
    • Changed Records: If values differ, expire old record and insert new record
    • New Records: Insert records that don't exist in current state
    • Deleted Records: Records not present in the updates are expired AND have tombstone records created for them
  • Tombstone Records: When records are deleted (exist in current state but not in updates):
    • The original record is expired with its original as_of_from
    • A tombstone record is created with the same ID and values but effective_to = system_date
    • This maintains a complete audit trail showing exactly when the data became inactive

Full State Mode Example

Current State:

ID=1, mv=100, price=250, effective=[2020-01-01, INFINITY]
ID=2, mv=300, price=400, effective=[2020-01-01, INFINITY]  
ID=3, mv=500, price=600, effective=[2020-01-01, INFINITY] 

Updates (Full State):

ID=1, mv=150, price=250, effective=[2020-01-01, 2020-02-01]  # Values changed
ID=2, mv=300, price=400, effective=[2020-01-01, INFINITY]   # Values identical  
# Note: ID=3 is not in updates = deleted

Result:

  • Expire: ID=1 (values changed), ID=3 (deleted)
  • Insert:
    • ID=1 (new values with updated effective_to)
    • ID=3 (tombstone: same values but effective_to=system_date)
  • No Action: ID=2 (identical values, no change needed)

Tombstone Example:

Original:  ID=3, mv=500, price=600, effective=[2020-01-01, INFINITY]
Tombstone: ID=3, mv=500, price=600, effective=[2020-01-01, 2025-08-30]  # effective_to truncated to system_date (2025-08-30 in this example)
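
A hedged sketch of this scenario through the high-level API (the column names and the 2025-08-30 system date are assumptions matching the walkthrough above):

processor = BitemporalTimeseriesProcessor(id_columns=['id'], value_columns=['mv', 'price'])

inf = pd.Timestamp('2262-04-11')  # stand-in for INFINITY / max timestamp

current = pd.DataFrame({
    'id': [1, 2, 3],
    'mv': [100, 300, 500],
    'price': [250, 400, 600],
    'effective_from': pd.to_datetime(['2020-01-01'] * 3),
    'effective_to': [inf] * 3,
    'as_of_from': pd.to_datetime(['2025-01-01'] * 3),
    'as_of_to': [inf] * 3,
})

updates = pd.DataFrame({
    'id': [1, 2],
    'mv': [150, 300],
    'price': [250, 400],
    'effective_from': pd.to_datetime(['2020-01-01'] * 2),
    'effective_to': [pd.Timestamp('2020-02-01'), inf],
    'as_of_from': pd.to_datetime(['2025-08-30'] * 2),
    'as_of_to': [inf] * 2,
})

to_expire, to_insert = processor.compute_changes(current, updates, update_mode='full_state')
# Expected: ID=1 and ID=3 expired; ID=1 re-inserted with new values,
# plus a tombstone row for ID=3 truncated at the system date
print(len(to_expire), len(to_insert))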

Timezone Handling

The library seamlessly handles timezone-aware timestamps from databases like PostgreSQL:

# PostgreSQL timestamptz columns (timezone-aware)
current_state = pd.DataFrame({
    'id': [1],
    'value': ['test'],
    'effective_from': pd.Timestamp("2024-01-01", tz='UTC'),  # timezone-aware
    'effective_to': pd.Timestamp("2099-12-31", tz='UTC'),
    # ... other columns
})

# Client updates (often timezone-naive)  
updates = pd.DataFrame({
    'id': [1],
    'value': ['updated'],
    'effective_from': pd.Timestamp("2024-01-02"),  # timezone-naive
    'effective_to': pd.Timestamp("2099-12-31"),
    # ... other columns
})

# Automatically handles timezone normalization
rows_to_expire, rows_to_insert = processor.compute_changes(current_state, updates)

Key Features:

  • Automatic Schema Normalization: Mixed timezone-aware and timezone-naive timestamps are automatically reconciled
  • PostgreSQL Compatible: Seamless integration with timestamptz columns
  • Timezone Preservation: Original timezone information is preserved throughout the processing pipeline
  • Arrow Schema Compatibility: Ensures consistent timestamp[us, tz=UTC] schemas for Rust processing
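
If you prefer to normalize timestamps yourself before handing data to the library, a minimal pandas sketch (not required, since the wrapper reconciles schemas automatically; the column names are the defaults used throughout this README):

def to_utc(df, cols=('effective_from', 'effective_to', 'as_of_from', 'as_of_to')):
    # Coerce the temporal columns to timezone-aware UTC timestamps
    out = df.copy()
    for col in cols:
        s = pd.to_datetime(out[col])
        out[col] = s.dt.tz_localize('UTC') if s.dt.tz is None else s.dt.tz_convert('UTC')
    return out

current_state = to_utc(current_state)
updates = to_utc(updates)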

Parallelization Strategy

The algorithm uses adaptive parallelization:

  • Serial Processing: Small datasets (<50 ID groups AND <10k records)
  • Parallel Processing: Large datasets using Rayon for CPU-bound operations
  • ID Group Independence: Each ID group processes independently, enabling perfect parallelization

Performance

Current Performance (Optimized)

Benchmarked on modern hardware with Arrow-direct hashing:

Large Scale Performance (Tested Configuration):

  • 800k rows × 80 columns: ~10 seconds total processing
  • Throughput: ~82,500 rows/second
  • Memory Usage: ~10-12GB for full dataset
  • Data Complexity: 70.4 million cell evaluations

Hash Computation Performance:

  • XxHash: 1.13M rows/second (16-character hashes)
  • SHA256: 925K rows/second (64-character hashes)
  • Performance Gain: ~1.6× faster than the previous ScalarValue-based path (717K → 1.13M rows/second)
  • Memory Efficiency: XxHash provides better memory usage than SHA256

Optimization History

Arrow-Direct Hashing Breakthrough:

  • Before: ScalarValue conversion bottleneck (717K rows/sec)
  • After: Direct Arrow array access (1.13M rows/sec)
  • Improvement: ~1.6× faster hash computation via direct Arrow array processing
  • Key Innovation: Eliminated expensive object allocations during hash computation

Python Wrapper Performance (Optimized):

  • Batch Consolidation: Individual record batches → 10k-row batches
  • Conversion Overhead: <0.1 seconds (was 30+ seconds)
  • Overhead Ratio: Under 1% of total processing time with batch consolidation

Performance Breakdown

Component                    Time    Throughput         Memory
Hash Computation (XxHash)    0.7s    1.13M rows/sec     Low
Hash Computation (SHA256)    0.9s    925K rows/sec      Higher
Full Pipeline (800k rows)    ~10s    ~82.5K rows/sec    ~10-12GB

Hash Algorithm Comparison

Algorithm           Speed             Hash Length    Memory Usage    Use Case
XxHash (default)    1.13M rows/sec    16 chars       1.5MB           Production workloads
SHA256 (legacy)     925K rows/sec     64 chars       66MB            Legacy compatibility

Recommendation: Use XxHash for optimal performance unless cryptographic properties or legacy SHA256 compatibility is required.
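
To sanity-check the trade-off on your own data, a hedged micro-benchmark sketch using the add_hash_key_with_algorithm function shown earlier (`batch` is assumed to be a pyarrow RecordBatch of your workload, and 'mv'/'price' stand in for your value columns):

import time
from pytemporal import add_hash_key_with_algorithm

def hash_throughput(batch, algorithm, value_columns=('mv', 'price')):
    start = time.perf_counter()
    add_hash_key_with_algorithm(batch, list(value_columns), algorithm)
    return batch.num_rows / (time.perf_counter() - start)

print(f"xxhash: {hash_throughput(batch, 'xxhash'):,.0f} rows/sec")
print(f"sha256: {hash_throughput(batch, 'sha256'):,.0f} rows/sec")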

Testing

Run the test suites:

# Rust tests
cargo test

# Python tests  
uv run python -m pytest tests/test_bitemporal.py -v

# Benchmarks
cargo bench

Development

Requirements

  • Rust (latest stable)
  • Python 3.8+ with development headers
  • uv for Python dependency management

Python Version Compatibility

This project uses PyO3 v0.21.2, which officially supports Python up to 3.12. If you're using Python 3.13+:

For Rust tests:

export PYO3_USE_ABI3_FORWARD_COMPATIBILITY=1
cargo test

For Python bindings (recommended):

uv run maturin develop  # Uses venv Python 3.12 automatically

Setup

# Install Rust dependencies and run tests
export PYO3_USE_ABI3_FORWARD_COMPATIBILITY=1  # If using Python 3.13+
cargo test

# Build Python extension (development mode)
uv run maturin develop

# Run Python tests  
uv run python -m pytest tests/test_bitemporal_manual.py -v

Permanent Fix for Python 3.13+

Add to your shell profile (~/.bashrc or ~/.zshrc):

export PYO3_USE_ABI3_FORWARD_COMPATIBILITY=1

Project Structure

Modular Architecture (main file reduced from 1,085 lines to under 300):

  • src/lib.rs - Main processing function and Python bindings (280 lines)
  • src/types.rs - Core data structures and constants (90 lines)
  • src/overlap.rs - Overlap detection and record categorization (70 lines)
  • src/timeline.rs - Timeline event processing algorithm (250 lines)
  • src/conflation.rs - Record conflation and batch consolidation (280 lines)
  • src/batch_utils.rs - Arrow RecordBatch utilities with hash functions (490 lines)
  • tests/integration_tests.rs - Rust integration tests (5 test scenarios)
  • tests/test_bitemporal_manual.py - Python test suite (22 test scenarios)
  • benches/bitemporal_benchmarks.rs - Performance benchmarks
  • CLAUDE.md - Project context and development notes

Key Commands

# Build release version
cargo build --release

# Run benchmarks with HTML reports
cargo bench

# Build Python wheel  
uv run maturin build --release

# Development install
uv run maturin develop

Module Responsibilities

  1. types.rs - Data structures (BitemporalRecord, ChangeSet, UpdateMode) and type conversions
  2. overlap.rs - Determines which records overlap in time and need timeline processing vs direct insertion
  3. timeline.rs - Core algorithm that processes overlapping records through event timeline
  4. conflation.rs - Post-processes results to merge adjacent segments and consolidate batches for optimal performance
  5. batch_utils.rs - Arrow utilities for RecordBatch creation, timestamp handling, and hash computation (XxHash/SHA256)

Dependencies

  • arrow (53.4) - Columnar data processing
  • pyo3 (0.21) - Python bindings
  • chrono (0.4) - Date/time handling
  • sha2 (0.10) - SHA256 hashing for legacy compatibility
  • xxhash-rust (0.8) - Fast XxHash algorithm for optimal performance
  • rayon (1.8) - Parallel processing
  • criterion (0.5) - Benchmarking framework

Architecture

Rust Core

  • Arrow-Direct Processing: Zero-deserialization hash computation for maximum speed
  • Configurable Hashing: XxHash (default) or SHA256 (legacy) algorithms
  • Parallel Execution: Rayon-based parallelization with adaptive thresholds
  • Post-Processing Conflation: Optimal storage through adjacent segment merging
  • Batch Consolidation: Efficient Python conversion with consolidated batches
  • Modular Design: Clean separation of concerns across specialized modules

Python Interface

  • High-level DataFrame API (BitemporalTimeseriesProcessor)
  • Low-level Arrow API (compute_changes)
  • Optimized batch conversion with minimal overhead
  • PyO3 bindings for seamless integration
  • Compatible with pandas DataFrames via efficient conversion

Performance Monitoring

This project includes comprehensive performance monitoring with flamegraph analysis:

📊 Release Performance Reports

View performance metrics and flamegraphs for each release at: Release Benchmarks

Each version tag automatically generates comprehensive performance documentation with flamegraphs, creating a historical record of performance evolution across releases.

🔥 Generating Flamegraphs Locally

# Generate flamegraphs for key benchmarks
cargo bench --bench bitemporal_benchmarks medium_dataset -- --profile-time 5
cargo bench --bench bitemporal_benchmarks conflation_effectiveness -- --profile-time 5 
cargo bench --bench bitemporal_benchmarks "scaling_by_dataset_size/records/500000" -- --profile-time 5

# Add flamegraph links to HTML reports  
python3 scripts/add_flamegraphs_to_html.py

# View reports locally
python3 -m http.server 8000 --directory target/criterion
# Then visit: http://localhost:8000/report/

📈 Performance Expectations

Dataset Size             Processing Time
Small (5 records)        ~30-35 µs
Medium (100 records)     ~165-170 µs
Large (500k records)     ~900-950 ms
Conflation test          ~28 µs

🎯 Key Optimization Areas (from Flamegraph Analysis)

  • process_id_timeline: Core algorithm logic
  • Rayon parallelization: Thread management overhead
  • Arrow operations: Columnar data processing
  • Value hashing: XxHash/SHA256 fingerprinting for conflation

See docs/benchmark-publishing.md for complete setup details.

Contributing

  1. Check CLAUDE.md for project context and conventions
  2. Run tests before submitting changes
  3. Follow existing code style and patterns
  4. Update benchmarks for performance-related changes
  5. Use flamegraphs to validate performance improvements
  6. Maintain modular architecture when adding features

License

MIT License - see LICENSE file for details.
