Safe concurrent data operations for ML/AI workloads, Python implementation of Apache Iceberg concepts

D A T A S H A R D


===============================================================================
DataShard - Iceberg-Inspired Safe Concurrent Data Operations for Python
===============================================================================

What is DataShard?
==================

DataShard is a Python implementation of Apache Iceberg's core concepts, providing
ACID-compliant data operations for ML/AI workloads. It ensures your data stays
safe even when multiple processes read and write simultaneously.

Key Features:
- ✅ ACID Transactions: Operations fully complete or fully rollback
- ✅ Time Travel: Query data as it existed at any point in time
- ✅ Safe Concurrency: Multiple processes can write without corruption
- ✅ Optimistic Concurrency Control (OCC): Automatic conflict resolution
- ✅ Pure Python: No Java dependencies, easy setup
- ✅ pandas Integration: Native DataFrame support


Why DataShard Matters: The Data Corruption Problem
===================================================

**The Problem:** Regular file operations in Python are NOT safe for concurrent access.

When multiple processes write to the same file, you get:
❌ Data loss from race conditions
❌ Partial writes that corrupt your data
❌ Silent failures that go unnoticed
❌ Unpredictable results

**Our Stress Test Results:**

Test Setup: 12 processes, each performing 10,000 operations
Expected Result: 120,000 total operations

Regular Files:
- Actual Result: Only 8,566 operations completed
- Data Loss: 111,434 operations LOST (93% failure rate!)
- Cause: Race conditions from simultaneous file access

DataShard (Iceberg):
- Actual Result: 120,000 operations completed
- Data Loss: ZERO (0% failure rate)
- Reason: ACID transactions with OCC detect conflicting commits and retry them, so no write is lost
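
The lost-update race behind the "Regular Files" numbers can be reproduced deterministically. The sketch below is not the stress test itself (its code is not shown here); it replays one unlucky interleaving of two writers doing an unsynchronized read-modify-write on a counter file. The file name and helper functions are illustrative only.

```python
import os
import tempfile


def read_count(path):
    """Read the integer counter stored in the file."""
    with open(path) as f:
        return int(f.read())


def write_count(path, value):
    """Overwrite the file with a new counter value."""
    with open(path, "w") as f:
        f.write(str(value))


def simulate_lost_update():
    """Replay one bad interleaving of two unsynchronized writers."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "counter.txt")
        write_count(path, 0)

        seen_by_a = read_count(path)      # writer A reads 0
        seen_by_b = read_count(path)      # writer B reads 0 before A writes
        write_count(path, seen_by_a + 1)  # A writes 1
        write_count(path, seen_by_b + 1)  # B also writes 1, erasing A's update

        return read_count(path)


final = simulate_lost_update()
print(f"2 increments attempted, counter = {final}")
```

Both writers believe they succeeded, yet one increment is gone. With real processes the interleaving is a matter of timing, which is why the loss is silent and varies from run to run.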


Installation
============

Basic installation:
pip install datashard

With pandas support (recommended; the quotes keep shells such as zsh from
interpreting the square brackets):
pip install "datashard[pandas]"

With development tools:
pip install "datashard[dev]"


Quick Start
===========

from datashard import create_table, Schema

# 1. Define your schema
schema = Schema(
    schema_id=1,
    fields=[
        {"id": 1, "name": "user_id", "type": "long", "required": True},
        {"id": 2, "name": "event", "type": "string", "required": True},
        {"id": 3, "name": "timestamp", "type": "long", "required": True}
    ]
)

# 2. Create a table
table = create_table("/path/to/my_table", schema)

# 3. Append records safely (even from multiple processes!)
records = [
    {"user_id": 100, "event": "login", "timestamp": 1700000000},
    {"user_id": 101, "event": "logout", "timestamp": 1700000100}
]
success = table.append_records(records, schema)

# 4. Query current data
snapshot = table.current_snapshot()
print(f"Snapshot ID: {snapshot.snapshot_id}")

# 5. Time travel to previous state
# previous_snapshot_id is a snapshot ID you recorded earlier, for example
# snapshot.snapshot_id saved before a later append.
old_snapshot = table.snapshot_by_id(previous_snapshot_id)


Real-World Use Case: Workflow Execution Logging
================================================

**Problem:** A distributed workflow engine runs hundreds of tasks across multiple
workers. Each task needs to log its execution details (status, duration, errors)
to a central location. Traditional approaches fail:

❌ Database: Too slow, adds latency to task execution
❌ Log Files: Corrupted by concurrent writes, not queryable
❌ JSON Files: Race conditions cause data loss

**Solution: DataShard as a Queryable Audit Log**

DataShard provides ACID-compliant logging with pandas-queryable tables:

```python
from datashard import create_table, Schema

# Define task log schema
task_log_schema = Schema(
    schema_id=1,
    fields=[
        {"id": 1, "name": "task_id", "type": "string", "required": True},
        {"id": 2, "name": "workflow_id", "type": "string", "required": True},
        {"id": 3, "name": "status", "type": "string", "required": True},
        {"id": 4, "name": "started_at", "type": "long", "required": True},
        {"id": 5, "name": "completed_at", "type": "long", "required": True},
        {"id": 6, "name": "duration_ms", "type": "long", "required": True},
        {"id": 7, "name": "error_message", "type": "string", "required": False},
        {"id": 8, "name": "worker_id", "type": "string", "required": True}
    ]
)

# Create task logs table (shared across all workers)
task_logs = create_table("/shared/storage/task_logs", task_log_schema)

# Worker 1: Log successful task execution
task_logs.append_records([{
    "task_id": "task-001",
    "workflow_id": "wf-123",
    "status": "success",
    "started_at": 1700000000000,
    "completed_at": 1700000150000,
    "duration_ms": 150000,
    "error_message": "",
    "worker_id": "worker-1"
}], task_log_schema)

# Worker 2: Log failed task (concurrent with Worker 1)
task_logs.append_records([{
    "task_id": "task-002",
    "workflow_id": "wf-123",
    "status": "failed",
    "started_at": 1700000000000,
    "completed_at": 1700000200000,
    "duration_ms": 200000,
    "error_message": "Connection timeout",
    "worker_id": "worker-2"
}], task_log_schema)

# Query logs with pandas (from any machine)
import os

import pandas as pd
from datashard import load_table
from datashard.file_manager import FileManager

table = load_table("/shared/storage/task_logs")
snapshot = table.current_snapshot()
file_manager = FileManager(table.table_path, table.metadata_manager)

# Read the manifest list referenced by the current snapshot
manifest_list_path = os.path.join(table.table_path, snapshot.manifest_list)
manifests = file_manager.read_manifest_list_file(manifest_list_path)

# Read every data file listed in every manifest
data_frames = []
for manifest_file in manifests:
    manifest_path = os.path.join(table.table_path, manifest_file.manifest_path)
    data_files = file_manager.read_manifest_file(manifest_path)

    for data_file in data_files:
        # Data file paths are stored with a leading slash; strip it so
        # os.path.join keeps them relative to the table directory.
        parquet_path = os.path.join(table.table_path, data_file.file_path.lstrip('/'))
        df = pd.read_parquet(parquet_path)
        data_frames.append(df)

logs_df = pd.concat(data_frames, ignore_index=True)

# Analyze logs
failed_tasks = logs_df[logs_df['status'] == 'failed']
avg_duration = logs_df['duration_ms'].mean()
print(f"Failed tasks: {len(failed_tasks)}")
print(f"Average duration: {avg_duration}ms")
```
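
In a worker, the record fields above would normally be derived from measured timestamps rather than typed by hand. A minimal sketch, assuming millisecond epoch timestamps as in the example; `build_task_log` and `now_ms` are hypothetical helpers and not part of DataShard:

```python
import time


def build_task_log(task_id, workflow_id, worker_id, started_at_ms,
                   completed_at_ms, error_message=""):
    """Build one record with the task log schema fields shown above."""
    if completed_at_ms < started_at_ms:
        raise ValueError("completed_at_ms must not precede started_at_ms")
    return {
        "task_id": task_id,
        "workflow_id": workflow_id,
        # A non-empty error message marks the task as failed.
        "status": "failed" if error_message else "success",
        "started_at": started_at_ms,
        "completed_at": completed_at_ms,
        "duration_ms": completed_at_ms - started_at_ms,
        "error_message": error_message,
        "worker_id": worker_id,
    }


def now_ms():
    """Current wall-clock time in milliseconds since the epoch."""
    return time.time_ns() // 1_000_000


started = now_ms()
# ... run the task here ...
record = build_task_log("task-003", "wf-123", "worker-1", started, now_ms())
print(record["status"], record["duration_ms"])
```

The resulting dictionary can then be passed to `task_logs.append_records([record], task_log_schema)` as in the example above.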

**Why DataShard Wins for Logging:**

✅ **Safe Concurrent Writes:** 100 workers can log simultaneously without corruption
✅ **Queryable with pandas:** Analyze logs with familiar DataFrame operations
✅ **Time Travel:** Debug by querying logs as they existed at incident time
✅ **ACID Guarantees:** Never lose log entries, even during crashes
✅ **Scalable:** Handles millions of log entries efficiently
✅ **No Database Needed:** File-based storage, no DB maintenance overhead


Other Real-World Use Cases
===========================

1. **ML Training Metrics:**
   - Multiple training runs logging metrics simultaneously
   - Query metrics history for model comparison
   - Time travel to analyze model performance at specific epochs

2. **A/B Test Results:**
   - Concurrent experiments writing results safely
   - Aggregate results across all test variants
   - Historical analysis of test performance

3. **Data Pipeline Checkpointing:**
   - Multiple pipeline stages writing progress markers
   - Safe recovery from failures using checkpoints
   - Query pipeline state for monitoring

4. **Feature Store:**
   - Multiple processes computing and storing features
   - Time travel for point-in-time feature retrieval
   - Consistent feature snapshots for reproducibility


API Reference
=============

Creating and Loading Tables
----------------------------

from datashard import create_table, load_table, Schema

# Create new table
schema = Schema(schema_id=1, fields=[...])
table = create_table("/path/to/table", schema)

# Load existing table
table = load_table("/path/to/table")


Writing Data
------------

# Append records
records = [{"id": 1, "value": "data"}]
success = table.append_records(records, schema)

# Append with transactions (for complex operations)
with table.new_transaction() as tx:
    tx.append_data(records, schema)
    tx.commit()


Reading Data
------------

# Get current snapshot
snapshot = table.current_snapshot()

# Get specific snapshot
snapshot = table.snapshot_by_id(snapshot_id)

# List all snapshots
snapshots = table.snapshots()

# Time travel
snapshot = table.time_travel(snapshot_id=12345)
snapshot = table.time_travel(timestamp=1700000000000)
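
In Iceberg-style tables, time travel by timestamp is commonly resolved to the most recent snapshot committed at or before the requested time. The sketch below illustrates that lookup rule only. The `Snapshot` dataclass and `snapshot_as_of` function are hypothetical, not DataShard internals, and whether `time_travel` follows exactly this rule is an assumption.

```python
from bisect import bisect_right
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Snapshot:
    """Hypothetical stand-in for a table snapshot."""
    snapshot_id: int
    timestamp_ms: int


def snapshot_as_of(snapshots: List[Snapshot], timestamp_ms: int) -> Optional[Snapshot]:
    """Return the latest snapshot with timestamp_ms <= the requested time."""
    ordered = sorted(snapshots, key=lambda s: s.timestamp_ms)
    times = [s.timestamp_ms for s in ordered]
    # bisect_right gives the count of snapshots at or before timestamp_ms.
    idx = bisect_right(times, timestamp_ms)
    return ordered[idx - 1] if idx else None


history = [
    Snapshot(1, 1700000000000),
    Snapshot(2, 1700000100000),
    Snapshot(3, 1700000200000),
]
print(snapshot_as_of(history, 1700000150000))
```

A request that predates the first snapshot returns `None` in this sketch; a real table may raise an error instead.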


Schema Definition
-----------------

from datashard import Schema

schema = Schema(
    schema_id=1,               # Required: unique schema version ID
    fields=[                   # Required: list of field definitions
        {
            "id": 1,           # Field ID (unique within schema)
            "name": "user_id", # Field name
            "type": "long",    # Data type: long, int, string, double, boolean
            "required": True   # Whether field is required
        },
        # ... more fields
    ]
)
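
Checking records against the field definitions before appending can surface mistakes early. A minimal sketch, assuming the field dictionaries shown above; `validate_record` and the `PYTHON_TYPES` mapping are hypothetical and not part of DataShard, which may perform its own validation.

```python
# Hypothetical mapping from the schema type names above to Python types.
PYTHON_TYPES = {
    "long": int,
    "int": int,
    "string": str,
    "double": float,
    "boolean": bool,
}


def validate_record(record, fields):
    """Return a list of problems found in one record (empty if none)."""
    problems = []
    known = {f["name"] for f in fields}
    for field in fields:
        name = field["name"]
        value = record.get(name)
        if value is None:
            if field.get("required", False):
                problems.append(f"missing required field: {name}")
            continue
        expected = PYTHON_TYPES[field["type"]]
        # bool is a subclass of int in Python, so reject it for non-boolean fields.
        wrong_bool = isinstance(value, bool) and expected is not bool
        # Accept plain ints where a double is expected.
        ok = isinstance(value, expected) or (expected is float and isinstance(value, int))
        if wrong_bool or not ok:
            problems.append(
                f"{name}: expected {field['type']}, got {type(value).__name__}"
            )
    for name in record:
        if name not in known:
            problems.append(f"unknown field: {name}")
    return problems


fields = [
    {"id": 1, "name": "user_id", "type": "long", "required": True},
    {"id": 2, "name": "event", "type": "string", "required": True},
]
print(validate_record({"user_id": 100, "event": "login"}, fields))
print(validate_record({"user_id": "100"}, fields))
```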


How DataShard Compares to Apache Iceberg
=========================================

Apache Iceberg:
- Reference implementation is Java-based and runs on the JVM
- Built for big data platforms (Spark, Flink, Hive)
- Excellent performance for petabyte-scale data
- Complex setup, requires distributed infrastructure
- Production-grade for enterprise data lakes

DataShard:
- Pure Python, no JVM required
- Built for individual data scientists and ML engineers
- Optimized for personal projects and small teams
- Simple pip install, file-based storage
- Production-ready for Python-centric workflows

**Use Apache Iceberg if:** You're working with petabytes of data in a big data
platform with Spark/Flink/Hive infrastructure.

**Use DataShard if:** You're a Python developer who needs safe concurrent data
operations without Java dependencies or complex setup.


Technical Details
=================

DataShard implements:
- Optimistic Concurrency Control (OCC) with automatic retry
- Snapshot isolation for consistent reads
- Manifest-based metadata tracking (Iceberg's approach)
- Parquet format for efficient columnar storage
- ACID transaction semantics
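
The optimistic commit loop can be sketched in a few lines. This is a generic, in-memory illustration of OCC with automatic retry, not DataShard's actual implementation: `VersionedTable`, `try_commit`, and `append_with_retry` are hypothetical names, and a lock stands in for whatever atomic metadata swap a real table uses.

```python
import threading


class VersionedTable:
    """In-memory stand-in for table metadata guarded by a version number."""

    def __init__(self):
        self._lock = threading.Lock()  # makes the compare-and-swap atomic
        self.version = 0
        self.files = ()                # immutable tuple of data file names

    def read(self):
        """Return a consistent (version, files) pair."""
        with self._lock:
            return self.version, self.files

    def try_commit(self, expected_version, new_files):
        """Commit only if nobody else committed since expected_version."""
        with self._lock:
            if self.version != expected_version:
                return False           # conflict: the caller must retry
            self.files = new_files
            self.version += 1
            return True


def append_with_retry(table, file_name, max_retries=1000):
    """Optimistically append a file, re-reading and retrying on conflict."""
    for _ in range(max_retries):
        version, files = table.read()
        if table.try_commit(version, files + (file_name,)):
            return True
    return False


table = VersionedTable()


def worker(worker_id):
    for i in range(50):
        append_with_retry(table, f"w{worker_id}-{i}.parquet")


threads = [threading.Thread(target=worker, args=(n,)) for n in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(table.version, len(table.files))
```

No writer holds a lock while preparing its change; the lock covers only the short compare-and-swap. A writer that loses the race re-reads the latest state and tries again, so every append lands exactly once. Because `files` is replaced rather than mutated, a reader holding an older tuple keeps a consistent view, which is the idea behind snapshot isolation.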


Performance Characteristics
===========================

- Writes: O(1) for append operations (no data rewriting)
- Reads: O(n) where n = number of data files in snapshot
- Concurrency: Scales linearly with number of processes
- Storage: Efficient columnar compression with Parquet
- Metadata: Lightweight JSON-based manifest tracking
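
Because read cost grows with the number of data files, writers that produce many small appends may benefit from buffering records and appending them in batches. A minimal sketch, assuming each append call produces at least one new data file (an assumption about DataShard, not something stated above). `RecordBatcher` is a hypothetical helper, and `flush_fn` stands in for a call such as `lambda batch: table.append_records(batch, schema)`.

```python
class RecordBatcher:
    """Buffer records and hand them to flush_fn in batches."""

    def __init__(self, flush_fn, max_records=1000):
        if max_records < 1:
            raise ValueError("max_records must be at least 1")
        self._flush_fn = flush_fn
        self._max_records = max_records
        self._buffer = []

    def add(self, record):
        """Queue one record, flushing when the buffer is full."""
        self._buffer.append(record)
        if len(self._buffer) >= self._max_records:
            self.flush()

    def flush(self):
        """Send all buffered records as one batch, if there are any."""
        if not self._buffer:
            return
        batch, self._buffer = self._buffer, []
        self._flush_fn(batch)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Flush whatever is left, even if the block raised.
        self.flush()
        return False


# Demo with a plain list standing in for the table.
batches = []
with RecordBatcher(batches.append, max_records=3) as batcher:
    for i in range(7):
        batcher.add({"id": i, "value": "data"})

print([len(b) for b in batches])
```

The trade-off is durability: records still in the buffer are lost if the process dies before a flush, so the batch size should reflect how much unlogged work is acceptable.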


Limitations
===========

DataShard is optimized for:
✅ Append-heavy workloads (logging, metrics, events)
✅ Hundreds of concurrent writers
✅ Datasets up to hundreds of millions of records
✅ Local or network file systems

DataShard is NOT optimized for:
❌ High-frequency updates/deletes (use a database)
❌ Complex queries (use a query engine like DuckDB)
❌ Petabyte-scale data (use Apache Iceberg with Spark)
❌ Distributed query processing (use Presto/Trino)


Bug Fix: v0.1.3
================

**Fixed:** Manifest list creation bug where empty manifest arrays were written
during append operations, causing queries to return no data despite
parquet files being written correctly.

**Impact:** Queries using the manifest API now return correct data. If you're
upgrading from v0.1.2, existing tables will continue to work but
won't benefit from the fix until new snapshots are created.

**Details:** Transaction.commit() now correctly extracts data files from queued
operations and passes them to _create_manifest_list_with_id().


Development and Testing
=======================

Run tests:
pytest tests/ -v

Run specific test:
pytest tests/test_manifest_creation.py -v

With coverage:
pytest tests/ --cov=datashard --cov-report=html


Why I Built DataShard
======================

DataShard was born from my work on the Highway Workflow Engine, a strictly
atomic workflow system. I needed to record massive amounts of execution metadata
from hundreds of concurrent workers without risking data corruption.

I've had painful experiences with Java-based solutions in the past. Apache
Iceberg conceptually fit my needs perfectly, but I wanted a pure Python solution
that was simple to deploy and maintain.

Building a production-grade concurrent storage system seemed daunting, but we're
in 2025 now, and modern AI tools (specifically Gemini) made it possible to
implement complex Optimistic Concurrency Control logic in pure Python.

The result is DataShard: a battle-tested, production-ready solution for safe
concurrent data operations that's helped Highway log millions of workflow
executions without a single data corruption issue.

If you're building distributed systems in Python and need safe concurrent data
operations, DataShard can help.

Enjoy using DataShard!
Farshid.


License
=======

Copyright (c) RODMENA LIMITED
Licensed under Apache License 2.0

For full license text, see LICENSE file in the repository.


Links
=====

Homepage: https://github.com/rodmena-limited/datashard
Documentation: https://datashard.readthedocs.io/
Issues: https://github.com/rodmena-limited/datashard/issues
