Safe atomic file writer for Pandas, Polars, NumPy, and other data objects
📖 Overview
Atio is a Python library that prevents data loss and ensures safe file writing. Through atomic writing, it protects existing data even when errors occur during file writing, and supports various data formats and database connections.
✨ Key Features
- 🔒 Atomic File Writing: Safe writing using temporary files
- 📊 Multiple Format Support: CSV, Parquet, Excel, JSON, etc.
- 🗄️ Database Support: Direct SQL and Database writing
- 📈 Progress Display: Progress monitoring for large data processing
- 🔄 Rollback Function: Automatic recovery when errors occur
- 🎯 Simple API: Intuitive and easy-to-use interface
- 📋 Version Management: Snapshot-based data version management
- 🧹 Auto Cleanup: Automatic deletion of old data
🚀 Installation
pip install atio
📚 Usage
atio.write() - Basic File/Database Writing
Purpose: Save data to a single file or database
Key Parameters:
- obj: Data to save (pandas.DataFrame, polars.DataFrame, numpy.ndarray)
- target_path: File save path (required for file writing)
- format: Save format ('csv', 'parquet', 'excel', 'json', 'sql', 'database')
- show_progress: Whether to display progress
- verbose: Whether to output detailed performance information
Basic File Writing
import atio
import pandas as pd
df = pd.DataFrame({
"name": ["Alice", "Bob", "Charlie"],
"age": [25, 30, 35],
"city": ["Seoul", "Busan", "Incheon"]
})
# Save in various formats
atio.write(df, "users.parquet", format="parquet")
atio.write(df, "users.csv", format="csv", index=False)
atio.write(df, "users.xlsx", format="excel", sheet_name="Users")
Database Writing
import atio
import pandas as pd
from sqlalchemy import create_engine
df = pd.DataFrame({
"product_id": [101, 102, 103],
"product_name": ["Laptop", "Mouse", "Keyboard"],
"price": [1200, 25, 75]
})
# Save to SQL database
engine = create_engine('postgresql://user:password@localhost/dbname')
atio.write(df, format="sql", name="products", con=engine, if_exists="replace")
Advanced Features (Progress, Performance Monitoring)
# Save with progress display (large_df stands for any large DataFrame you have already loaded)
atio.write(large_df, "big_data.parquet", format="parquet", show_progress=True)
# Output detailed performance information
atio.write(df, "data.parquet", format="parquet", verbose=True)
# Use Polars DataFrame
import polars as pl
polars_df = pl.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
atio.write(polars_df, "data.parquet", format="parquet")
atio.write_snapshot() - Version-Managed Table Storage
Purpose: Save data in table format with version management
Key Parameters:
- obj: Data to save
- table_path: Table save path
- mode: Save mode ('overwrite', 'append')
- format: Save format
Version Management Usage
# Save with version management in table format
atio.write_snapshot(df, "my_table", mode="overwrite", format="parquet")
# Add to existing data (append mode)
new_data = pd.DataFrame({"name": ["David"], "age": [40], "city": ["Daejeon"]})
atio.write_snapshot(new_data, "my_table", mode="append", format="parquet")
atio.read_table() - Table Data Reading
Purpose: Read data from table
Key Parameters:
- table_path: Table path
- version: Version number to read (None for latest)
- output_as: Output format ('pandas', 'polars')
Table Reading Usage
# Read latest data
latest_data = atio.read_table("my_table", output_as="pandas")
# Read specific version
version_1_data = atio.read_table("my_table", version=1, output_as="pandas")
# Read in Polars format
polars_data = atio.read_table("my_table", output_as="polars")
atio.expire_snapshots() - Old Data Cleanup
Purpose: Clean up old snapshots and orphaned files
Key Parameters:
- table_path: Table path
- keep_for: Retention period (for example, timedelta(days=7))
- dry_run: If True, only preview what would be deleted; if False, actually delete
Data Cleanup Usage
from datetime import timedelta
# Clean up old data (preview)
atio.expire_snapshots("my_table", keep_for=timedelta(days=7), dry_run=True)
# Execute actual deletion
atio.expire_snapshots("my_table", keep_for=timedelta(days=7), dry_run=False)
📊 Supported Formats
| Format | Description | Required Parameters | Example |
|---|---|---|---|
| csv | CSV file | target_path | atio.write(df, "data.csv", format="csv") |
| parquet | Parquet file | target_path | atio.write(df, "data.parquet", format="parquet") |
| excel | Excel file | target_path | atio.write(df, "data.xlsx", format="excel") |
| json | JSON file | target_path | atio.write(df, "data.json", format="json") |
| sql | SQL database | name, con | atio.write(df, format="sql", name="table", con=engine) |
| database | Database (Polars) | table_name, connection_uri | atio.write(df, format="database", table_name="table", connection_uri="...") |
🎯 Real-World Usage Scenarios
Scenario 1: Large CSV File Writing Interruption
Problem: A user was saving large analysis results to a .csv file using Pandas when an unexpected power outage or kernel force termination occurred. The result file was corrupted with only 3MB saved out of 50MB, and could not be read afterward.
Atio Solution: atio.write() first writes to a temporary file, then only replaces the original after all writing is successful. Therefore, even if interrupted, the existing file is preserved and corrupted temporary files are automatically cleaned up, ensuring stability.
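The write-then-replace pattern described above can be sketched with the standard library alone. This is a generic illustration of the technique, not Atio's actual implementation. The helper name atomic_write_text is hypothetical, and placing the temporary file in the target directory is an assumption made here so that the final rename stays on one filesystem.

```python
import os
import tempfile


def atomic_write_text(path, text):
    """Write text to path so readers see either the old file or the new one."""
    directory = os.path.dirname(os.path.abspath(path))
    # Assumption: the temp file lives next to the target so that os.replace
    # stays on one filesystem, where the rename is atomic.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(text)
            handle.flush()
            os.fsync(handle.fileno())  # push the bytes to disk before renaming
        os.replace(tmp_path, path)  # swap the finished file into place
    except BaseException:
        # Any failure leaves the original untouched; drop the partial temp file.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```

If the write fails partway, the exception propagates, the temporary file is removed, and whatever was previously at path is still intact.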
Scenario 2: File Conflicts in Multiprocessing Environment
Problem: In a Python multiprocessing-based data collection pipeline, multiple processes were simultaneously saving to the same file, causing conflicts. As a result, log files were overwritten and lost, or some JSON files were saved in corrupted, unparseable forms.
Atio Solution: atio.write() uses atomic replacement, so a file is moved to its final path only after it has been written completely. Concurrent writers therefore cannot leave a partially written or interleaved file at the final path.
Scenario 3: Data Pipeline Validation Issues
Problem: In ETL operations, the automated system could not determine whether .parquet saving was completed, so corrupted or incomplete data was used in the next stage. This resulted in missing values in model training data, causing quality degradation.
Atio Solution: Using atio.write_snapshot() creates a _SUCCESS flag file only when saving is successfully completed. Subsequent stages can safely run the pipeline based on the presence or absence of _SUCCESS.
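A downstream stage might guard its input like this. The sketch assumes the _SUCCESS file sits directly inside the table directory, which the description above does not specify. The helper names is_complete and load_if_complete are hypothetical and not part of Atio.

```python
from pathlib import Path


def is_complete(output_dir, marker="_SUCCESS"):
    """Return True only if the marker file exists in output_dir."""
    # Assumption: the marker is stored at the top level of the directory.
    return (Path(output_dir) / marker).is_file()


def load_if_complete(output_dir, loader):
    """Call loader(output_dir) only when the write finished successfully."""
    if not is_complete(output_dir):
        raise RuntimeError(f"{output_dir} has no _SUCCESS marker; refusing to read")
    return loader(output_dir)
```

Here loader would be whatever function reads the table in the next stage, for example a small wrapper around atio.read_table().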
Scenario 4: Lack of Data Version Management
Problem: As datasets for machine learning model training were updated multiple times, it became impossible to track which version of data was used to train which model. Experimental result reproducibility decreased and model performance comparison became difficult.
Atio Solution: Using atio.write_snapshot() and atio.read_table() allows automatic management of data versions. Snapshots are created for each version, allowing you to return to data from any specific point in time, ensuring experimental reproducibility.
Scenario 5: System Interruption Due to Disk Space Shortage
Problem: During large data processing, the system was interrupted due to insufficient disk space. Incomplete files from processing remained, continuing to occupy disk space and requiring manual cleanup.
Atio Solution: Using atio.expire_snapshots() allows automatic cleanup of snapshots and orphaned files older than the set retention period. You can preview files to be deleted with dry_run=True option, then safely perform cleanup operations.
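The preview-then-delete workflow can be illustrated with a small standard-library sketch. It selects files purely by modification time, which is a simplification chosen here; Atio's own cleanup works on snapshots and may use different criteria. The function expire_files is hypothetical.

```python
import time
from datetime import timedelta
from pathlib import Path


def expire_files(directory, keep_for, dry_run=True, now=None):
    """Return files older than keep_for; delete them unless dry_run is True."""
    now = time.time() if now is None else now
    cutoff = now - keep_for.total_seconds()
    expired = sorted(
        p for p in Path(directory).iterdir()
        if p.is_file() and p.stat().st_mtime < cutoff
    )
    if not dry_run:
        for path in expired:
            path.unlink()  # only reached when deletion was explicitly requested
    return expired
```

Defaulting dry_run to True in this sketch means a caller has to opt in to deletion, so a first run only reports what would be removed.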
Scenario 6: Network Error During Database Storage
Problem: While saving analysis results to a PostgreSQL database, the network connection was interrupted, stopping the save operation. Partially saved tables remained in the database, breaking data integrity.
Atio Solution: atio.write()'s database storage feature uses transactions to ensure all data is either successfully saved or not saved at all. When network errors occur, automatic rollback maintains data integrity.
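The all-or-nothing behavior of a transaction can be demonstrated with Python's built-in sqlite3 module. This is a generic sketch rather than Atio's code: the function insert_all_or_nothing is hypothetical, SQLite stands in for PostgreSQL, and the products table reuses the columns from the database example earlier in this document.

```python
import sqlite3


def insert_all_or_nothing(conn, rows):
    """Insert every row inside one transaction; roll back all on any error."""
    try:
        with conn:  # commits on success, rolls back if the block raises
            conn.executemany(
                "INSERT INTO products (product_id, product_name, price) "
                "VALUES (?, ?, ?)",
                rows,
            )
    except sqlite3.Error:
        return False  # nothing from this batch was kept
    return True
```

If any row in the batch fails, for instance on a duplicate primary key, the rows inserted earlier in the same batch are rolled back as well, so the table never holds a partial batch.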
Scenario 7: Complexity in Experimental Data Management
Problem: A research team was conducting multiple experiments simultaneously, causing experimental data to mix and making it difficult to track which data was used for which experiment. Experimental result reliability decreased and reproduction became impossible.
Atio Solution: Using atio.write_snapshot() creates independent tables for each experiment, and atio.read_table() can read the exact data for specific experiments. Automated version management and metadata tracking for each experiment ensures research reproducibility and reliability.
Scenario 8: Data Loss During Cloud Streaming
Problem: While processing real-time data collected from IoT sensors, system restart or network errors occurred. Data being processed was lost, breaking the continuity of important sensor data.
Atio Solution: Using atio.write_snapshot() buffers real-time data and saves it atomically at regular intervals. After system restart, data collection can resume from the last save point, ensuring data continuity.
Scenario 9: Memory Shortage During Large Data Processing
Problem: While processing DataFrames larger than 10GB, the process was force-terminated due to memory shortage. All intermediate results being processed were lost, requiring restart from the beginning.
Atio Solution: Process the data in chunks and save each chunk with atio.write(), using show_progress=True to monitor progress. Chunking keeps memory usage bounded, and because each chunk is processed only after the previous one has been saved successfully, a failure partway through preserves the chunks already saved.
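A chunk-by-chunk save with resume support could look roughly like the sketch below. It uses only the standard library and JSON part files for illustration; in practice each chunk would be a DataFrame slice passed to atio.write(). The helper names chunked and save_in_chunks and the part-file naming scheme are assumptions, not Atio features.

```python
import json
import os
from pathlib import Path


def chunked(rows, size):
    """Yield consecutive slices of rows with at most size items each."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def save_in_chunks(rows, out_dir, size):
    """Write each chunk to its own file; skip chunks saved by an earlier run."""
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    written = []
    for index, chunk in enumerate(chunked(rows, size)):
        final = out / f"part-{index:05d}.json"
        if final.exists():
            continue  # already saved before an interruption
        tmp = final.with_name(final.name + ".tmp")
        tmp.write_text(json.dumps(chunk), encoding="utf-8")
        os.replace(tmp, final)  # publish the chunk only once it is complete
        written.append(final.name)
    return written
```

Running the function again after an interruption writes only the missing parts, since finished chunks are detected by their final file names.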
Scenario 10: Conflicts with Backup Systems
Problem: While trying to save a large file during automatic backup system execution, the backup software attempted to backup a file being written, causing file corruption. The backup file was also saved in an incomplete state.
Atio Solution: Using atio.write()'s atomic replacement method for file saving ensures that backup systems only see complete files when reading. Temporary files are excluded from backup targets, enabling conflict-free, safe backups.
🔍 Performance Monitoring
# Output detailed performance information
atio.write(df, "data.parquet", format="parquet", verbose=True)
Output example:
[INFO] Temporary directory created: /tmp/tmp12345
[INFO] Temporary file path: /tmp/tmp12345/data.parquet
[INFO] Writer to use: to_parquet (format: parquet)
[INFO] ✅ File writing completed (total time: 0.1234s)
📦 Dependencies
Required Dependencies
- Python 3.7+
- pandas
- numpy
Optional Dependencies
- pyarrow or fastparquet: Parquet format support
- openpyxl or xlsxwriter: Excel format support
- sqlalchemy: SQL database support
- polars: Polars DataFrame support
📄 License
This project is distributed under the Apache 2.0 License. See the LICENSE file for details.
🐛 Bug Reports
Found a bug? Please report it on the Issues page.
Atio - Safe and Fast Data Writing Library 🚀