omnifetch
Multi-source data retrieval with intelligent caching and storage backends
Fetch data from anywhere, in any format - with automatic TTL-based caching, multiple storage backends, and smart synchronization.
✨ Features
- ✅ Multi-source data retrieval - Local files, IBM Cloud Object Storage, or custom backends
- ✅ Intelligent TTL-based caching - Automatic freshness management
- ✅ Concurrent access protection - File locking for safe multi-process usage
- ✅ Batch operations - Efficient bulk data retrieval
- ✅ Retry strategies - Configurable retry logic for reliability
- ✅ Multiple data formats - Parquet, CSV, NetCDF, pickle, and more
- ✅ Session caching - In-memory cache for single-run optimization
- ✅ Flexible configuration - YAML-based or programmatic setup
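Concurrent access protection relies on OS-level file locking. As a rough illustration of the pattern (the names here are illustrative, not omnifetch's internal API), a POSIX `fcntl` advisory lock keeps two processes from writing the same cache file at once:

```python
import fcntl

def with_file_lock(lock_path: str, action):
    """Run `action` while holding an exclusive advisory lock on lock_path."""
    with open(lock_path, "w") as lock_file:
        fcntl.flock(lock_file, fcntl.LOCK_EX)  # blocks until no other process holds the lock
        try:
            return action()
        finally:
            fcntl.flock(lock_file, fcntl.LOCK_UN)

# Any number of processes can call this safely; only one runs `action` at a time.
result = with_file_lock("/tmp/omnifetch_demo.lock", lambda: "cache updated")
```

Advisory locks only protect cooperating processes, which is why the library (rather than the caller) owns the locking.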
📦 Installation
From GitHub
pip install git+https://github.com/deinnovatie/omnifetch.git
Local development
git clone https://github.com/deinnovatie/omnifetch.git
cd omnifetch
pip install -e ".[dev]"
🚀 Quick Start
Basic Usage
from omnifetch import DataManager
# Initialize with configuration
config = {
    "storage": {
        "backend": "local",
        "local": {
            "base_path": "./data"
        }
    },
    "data_sources": {
        "my_dataset": {
            "backend": "local",
            "ttl_seconds": 3600,  # 1 hour
            "file_patterns": {
                "default": "datasets/my_data.parquet"
            }
        }
    }
}
manager = DataManager(config)
# Fetch data (automatically cached)
data = manager.get_data("my_dataset", "default")
# Force refresh from source
fresh_data = manager.get_data("my_dataset", "default", force_refresh=True)
With IBM Cloud Object Storage
import os
from omnifetch import DataManager
# Set environment variables
os.environ["COS_ENDPOINT"] = "https://s3.us-south.cloud-object-storage.appdomain.cloud"
os.environ["COS_ACCESS_KEY"] = "your-access-key"
os.environ["COS_SECRET_KEY"] = "your-secret-key"
os.environ["COS_BUCKET"] = "your-bucket-name"
config = {
    "storage": {
        "backend": "ibm_cos",
        "ibm_cos": {
            "bucket_name": os.environ["COS_BUCKET"],
            "service_endpoint": os.environ["COS_ENDPOINT"],
            "access_key_env": "COS_ACCESS_KEY",
            "secret_key_env": "COS_SECRET_KEY"
        },
        "local": {
            "base_path": "./cache"
        }
    },
    "data_sources": {
        "cloud_dataset": {
            "backend": "ibm_cos",
            "ttl_seconds": 86400,  # 24 hours
            "file_patterns": {
                "default": "datasets/cloud_data.parquet"
            }
        }
    }
}
manager = DataManager(config)
data = manager.get_data("cloud_dataset", "default")
Using YAML Configuration Adapter
from omnifetch.adapters import create_datamanager_config
# Load configuration from YAML file (e.g., R pipeline config)
config = create_datamanager_config(config_path="./config/data_sources.yml")
manager = DataManager(config)
data = manager.get_data("dataset_name", "default")
Batch Operations
from omnifetch import DataManager, FileSpec
manager = DataManager(config)
# Define multiple files to fetch
specs = [
    FileSpec("dataset_a", "default", {}),
    FileSpec("dataset_b", "default", {}),
    FileSpec("dataset_c", "regional", {"region": "us-east"}),
]
# Fetch all at once
results = manager.get_data_batch(specs)
for spec, data in zip(specs, results):
    print(f"Loaded {spec.data_source}: {data.shape}")
Cache Management
# Get cache statistics
stats = manager.get_cache_stats()
print(f"Total cached files: {stats['file_count']}")
print(f"Fresh files: {stats['fresh_count']}")
print(f"Stale files: {stats['stale_count']}")
# Clear session cache (in-memory only)
manager.clear_session_cache()
🗂️ Configuration
Configuration Structure
storage:
  backend: "ibm_cos"  # or "local"
  ibm_cos:
    bucket_name: "my-bucket"
    service_endpoint: "https://s3.region.cloud-object-storage.appdomain.cloud"
    access_key_env: "COS_ACCESS_KEY"
    secret_key_env: "COS_SECRET_KEY"
  local:
    base_path: "./cache"
data_sources:
  dataset_name:
    backend: "ibm_cos"  # Backend for this specific dataset
    ttl_seconds: 86400  # 24 hours (supports "30d", "24h", "60m", "3600s", or an integer)
    file_patterns:
      default: "path/to/file.parquet"
      regional: "path/to/{region}/file.parquet"  # With parameters
    validation:
      required_columns: ["id", "timestamp", "value"]
    retry_strategy:
      max_retries: 3
      retry_delay: 5
      backoff_multiplier: 2.0
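The retry_strategy fields imply exponential backoff: each retry waits `backoff_multiplier` times longer than the last. A minimal sketch of how such settings translate into wait times (the helper `compute_delays` is hypothetical, not part of the omnifetch API):

```python
def compute_delays(max_retries: int, retry_delay: float, backoff_multiplier: float) -> list:
    """Return the wait (in seconds) before each retry attempt."""
    return [retry_delay * backoff_multiplier ** attempt for attempt in range(max_retries)]

# max_retries=3, retry_delay=5, backoff_multiplier=2.0
print(compute_delays(3, 5, 2.0))  # [5.0, 10.0, 20.0]
```

With the configuration above, a failing download would be retried after roughly 5, 10, and 20 seconds before giving up.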
TTL Format
TTL can be specified in multiple formats:
- Seconds: 3600 or "3600s"
- Minutes: "60m"
- Hours: "24h"
- Days: "30d"
- Weeks: "4w"
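The conversion from these strings to seconds is straightforward; a minimal sketch (the function name `parse_ttl` is illustrative, not omnifetch's actual parser, and real code would want error handling for malformed values):

```python
def parse_ttl(value) -> int:
    """Convert a TTL like 3600, "3600s", "60m", "24h", "30d", or "4w" to seconds."""
    if isinstance(value, int):
        return value
    units = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800}
    return int(value[:-1]) * units[value[-1]]

print(parse_ttl("24h"))  # 86400
```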
File Patterns with Parameters
# Configuration
data_sources:
  regional_data:
    file_patterns:
      default: "data/{year}/{month}/{region}.parquet"
# Usage
data = manager.get_data(
    "regional_data",
    "default",
    year="2024",
    month="10",
    region="us-east",
)
# Fetches: data/2024/10/us-east.parquet
🔧 Supported Storage Backends
Local Filesystem
config = {
    "storage": {
        "backend": "local",
        "local": {
            "base_path": "./data_repository"
        }
    }
}
IBM Cloud Object Storage (COS)
config = {
    "storage": {
        "backend": "ibm_cos",
        "ibm_cos": {
            "bucket_name": "my-bucket",
            "service_endpoint": "https://s3.us-south.cloud-object-storage.appdomain.cloud",
            "access_key_env": "COS_ACCESS_KEY",
            "secret_key_env": "COS_SECRET_KEY"
        },
        "local": {
            "base_path": "./cache"  # Local cache for downloaded files
        }
    }
}
Custom Backends
Extend StorageBackend to create your own:
from omnifetch.storage_backends import StorageBackend
class MyCustomBackend(StorageBackend):
    def exists(self, path: str) -> bool:
        # Implementation
        pass

    def save(self, data, path: str, format: str = "parquet") -> None:
        # Implementation
        pass

    def load(self, path: str, format: str = "parquet"):
        # Implementation
        pass

    # ... implement other abstract methods
📊 Supported Data Formats
- Parquet - Columnar storage (via PyArrow)
- CSV - Comma-separated values (via Pandas)
- NetCDF - Multidimensional arrays (via xarray/netCDF4)
- Pickle - Python object serialization
- JSON - Structured data
Format is auto-detected from file extension or can be specified explicitly.
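Extension-based detection typically amounts to a lookup table; a minimal sketch (the mapping and function name are illustrative, and omnifetch's actual detection may cover more extensions):

```python
from pathlib import Path

# Hypothetical extension-to-format mapping for illustration.
EXTENSION_FORMATS = {
    ".parquet": "parquet",
    ".csv": "csv",
    ".nc": "netcdf",
    ".pkl": "pickle",
    ".json": "json",
}

def detect_format(path: str) -> str:
    """Infer a data format from the file extension."""
    return EXTENSION_FORMATS[Path(path).suffix.lower()]

print(detect_format("datasets/my_data.parquet"))  # parquet
```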
🔍 How It Works
Data Retrieval Flow
1. Request data from DataManager
↓
2. Check session cache (in-memory)
↓ (miss)
3. Check local disk cache with TTL
↓ (miss or stale)
4. Download from remote backend (IBM COS, etc.)
↓
5. Save to local cache with metadata
↓
6. Load from cache and store in session
↓
7. Return data to caller
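The seven steps above can be sketched as a layered lookup. All names here are illustrative, not omnifetch's actual internals; the disk cache is modeled as a dict with timestamps for brevity:

```python
import time

def get_data(key, session_cache, disk_cache, backend, ttl_seconds):
    """Illustrative retrieval flow: session cache -> disk cache (TTL) -> remote backend."""
    # 2. A session-cache (in-memory) hit returns immediately.
    if key in session_cache:
        return session_cache[key]
    # 3. A disk-cache entry is used only while fresher than the TTL.
    entry = disk_cache.get(key)
    if entry is None or time.time() - entry["saved_at"] > ttl_seconds:
        # 4-5. Download from the remote backend and persist with freshness metadata.
        data = backend.load(key)
        disk_cache[key] = {"data": data, "saved_at": time.time()}
        entry = disk_cache[key]
    # 6-7. Promote to the session cache and return to the caller.
    session_cache[key] = entry["data"]
    return entry["data"]
```

The key property is that the remote backend is only contacted on a cache miss or TTL expiry; repeated calls within a run are served from memory.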
Cache Architecture
┌─────────────────────────────────────┐
│ DataManager (Session Cache) │ <- In-memory, per-run
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ CacheManager (Disk Cache + TTL) │ <- Persistent, with metadata
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ SyncManager (Backend Sync Logic) │ <- Download/upload coordination
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ StorageBackend (Local/IBM COS) │ <- Actual data source
└─────────────────────────────────────┘
🧪 Testing
# Run tests
pytest
# With coverage
pytest --cov=omnifetch --cov-report=html
# Run specific test
pytest tests/test_data_manager.py::test_get_data_from_cache
📝 Examples
See the examples/ directory for complete working examples:
- basic_usage.py - Basic local file caching
- ibm_cos_example.py - IBM COS integration
- yaml_config_example.py - YAML configuration
- custom_backend_example.py - Custom storage backend
🤝 Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
- Fork the repository
- Create your feature branch (git checkout -b feature/amazing-feature)
- Commit your changes (git commit -m 'Add amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
📄 License
This project is licensed under the MIT License - see the LICENSE file for details.
🙏 Acknowledgments
Originally developed as part of the SIMEG R Toolkit project for energy market simulations.
📧 Contact
Marco Bonoli - marco@deinnovatie.com
Project Link: https://github.com/deinnovatie/omnifetch
File details
Details for the file omnifetch-1.0.0.tar.gz.
- Size: 31.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 19b24d10d3f99620d0f987ec76b5c38d10ba06e4a115d7264382fda711787c92 |
| MD5 | 3eadd727ff21607829b5c642cf32da45 |
| BLAKE2b-256 | c012e386cc512a278bf854c264c17376973cd3627d624d34e2fb49e66a70d345 |
File details
Details for the file omnifetch-1.0.0-py3-none-any.whl.
- Size: 31.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 19c59e2282300b0c922802682c15cf0696ca7e8a65402d23a1bcd7692e2b5ec4 |
| MD5 | 6681a17e64272bb6cee4ef9543346d06 |
| BLAKE2b-256 | ddde4ebeb7f8cdbcfff85e27e5f7eb5117d8e5caaa0e92d075da78c37a8a1779 |