Skip to main content

Multi-source data retrieval with intelligent caching and storage backends

Project description

omnifetch

Multi-source data retrieval with intelligent caching and storage backends

Fetch data from anywhere, any type - with automatic TTL-based caching, multiple storage backends, and smart synchronization.

✨ Features

  • Multi-source data retrieval - Local files, IBM Cloud Object Storage, or custom backends
  • Intelligent TTL-based caching - Automatic freshness management
  • Concurrent access protection - File locking for safe multi-process usage
  • Batch operations - Efficient bulk data retrieval
  • Retry strategies - Configurable retry logic for reliability
  • Multiple data formats - Parquet, CSV, NetCDF, pickle, and more
  • Session caching - In-memory cache for single-run optimization
  • Flexible configuration - YAML-based or programmatic setup

📦 Installation

From GitHub

pip install git+https://github.com/deinnovatie/omnifetch.git

Local development

git clone https://github.com/deinnovatie/omnifetch.git
cd omnifetch
pip install -e ".[dev]"

🚀 Quick Start

Basic Usage

from omnifetch import DataManager

# Initialize with configuration
config = {
    "storage": {
        "backend": "local",
        "local": {
            "base_path": "./data"
        }
    },
    "data_sources": {
        "my_dataset": {
            "backend": "local",
            "ttl_seconds": 3600,  # 1 hour
            "file_patterns": {
                "default": "datasets/my_data.parquet"
            }
        }
    }
}

manager = DataManager(config)

# Fetch data (automatically cached)
data = manager.get_data("my_dataset", "default")

# Force refresh from source
fresh_data = manager.get_data("my_dataset", "default", force_refresh=True)

With IBM Cloud Object Storage

import os
from omnifetch import DataManager

# Set environment variables
os.environ["COS_ENDPOINT"] = "https://s3.us-south.cloud-object-storage.appdomain.cloud"
os.environ["COS_ACCESS_KEY"] = "your-access-key"
os.environ["COS_SECRET_KEY"] = "your-secret-key"
os.environ["COS_BUCKET"] = "your-bucket-name"

config = {
    "storage": {
        "backend": "ibm_cos",
        "ibm_cos": {
            "bucket_name": os.environ["COS_BUCKET"],
            "service_endpoint": os.environ["COS_ENDPOINT"],
            "access_key_env": "COS_ACCESS_KEY",
            "secret_key_env": "COS_SECRET_KEY"
        },
        "local": {
            "base_path": "./cache"
        }
    },
    "data_sources": {
        "cloud_dataset": {
            "backend": "ibm_cos",
            "ttl_seconds": 86400,  # 24 hours
            "file_patterns": {
                "default": "datasets/cloud_data.parquet"
            }
        }
    }
}

manager = DataManager(config)
data = manager.get_data("cloud_dataset", "default")

Using YAML Configuration Adapter

from omnifetch.adapters import create_datamanager_config

# Load configuration from YAML file (e.g., R pipeline config)
config = create_datamanager_config(config_path="./config/data_sources.yml")

manager = DataManager(config)
data = manager.get_data("dataset_name", "default")

Batch Operations

from omnifetch import DataManager, FileSpec

manager = DataManager(config)

# Define multiple files to fetch
specs = [
    FileSpec("dataset_a", "default", {}),
    FileSpec("dataset_b", "default", {}),
    FileSpec("dataset_c", "regional", {"region": "us-east"}),
]

# Fetch all at once
results = manager.get_data_batch(specs)

for spec, data in zip(specs, results):
    print(f"Loaded {spec.data_source}: {data.shape}")

Cache Management

# Get cache statistics
stats = manager.get_cache_stats()
print(f"Total cached files: {stats['file_count']}")
print(f"Fresh files: {stats['fresh_count']}")
print(f"Stale files: {stats['stale_count']}")

# Clear session cache (in-memory only)
manager.clear_session_cache()

🗂️ Configuration

Configuration Structure

storage:
  backend: "ibm_cos"  # or "local"
  ibm_cos:
    bucket_name: "my-bucket"
    service_endpoint: "https://s3.region.cloud-object-storage.appdomain.cloud"
    access_key_env: "COS_ACCESS_KEY"
    secret_key_env: "COS_SECRET_KEY"
  local:
    base_path: "./cache"

data_sources:
  dataset_name:
    backend: "ibm_cos"  # Backend for this specific dataset
    ttl_seconds: 86400  # 24 hours (supports: "30d", "24h", "60m", "3600s", or integer)
    file_patterns:
      default: "path/to/file.parquet"
      regional: "path/to/{region}/file.parquet"  # With parameters
    validation:
      required_columns: ["id", "timestamp", "value"]
    retry_strategy:
      max_retries: 3
      retry_delay: 5
      backoff_multiplier: 2.0

TTL Format

TTL can be specified in multiple formats:

  • Seconds: 3600 or "3600s"
  • Minutes: "60m"
  • Hours: "24h"
  • Days: "30d"
  • Weeks: "4w"

File Patterns with Parameters

# Configuration
data_sources:
  regional_data:
    file_patterns:
      default: "data/{year}/{month}/{region}.parquet"

# Usage
data = manager.get_data(
    "regional_data",
    "default",
    year="2024",
    month="10",
    region="us-east"
)
# Fetches: data/2024/10/us-east.parquet

🔧 Supported Storage Backends

Local Filesystem

config = {
    "storage": {
        "backend": "local",
        "local": {
            "base_path": "./data_repository"
        }
    }
}

IBM Cloud Object Storage (COS)

config = {
    "storage": {
        "backend": "ibm_cos",
        "ibm_cos": {
            "bucket_name": "my-bucket",
            "service_endpoint": "https://s3.us-south.cloud-object-storage.appdomain.cloud",
            "access_key_env": "COS_ACCESS_KEY",
            "secret_key_env": "COS_SECRET_KEY"
        },
        "local": {
            "base_path": "./cache"  # Local cache for downloaded files
        }
    }
}

Custom Backends

Extend StorageBackend to create your own:

from omnifetch.storage_backends import StorageBackend

class MyCustomBackend(StorageBackend):
    def exists(self, path: str) -> bool:
        # Implementation
        pass

    def save(self, data, path: str, format: str = "parquet") -> None:
        # Implementation
        pass

    def load(self, path: str, format: str = "parquet"):
        # Implementation
        pass

    # ... implement other abstract methods

📊 Supported Data Formats

  • Parquet - Columnar storage (via PyArrow)
  • CSV - Comma-separated values (via Pandas)
  • NetCDF - Multidimensional arrays (via xarray/netCDF4)
  • Pickle - Python object serialization
  • JSON - Structured data

Format is auto-detected from file extension or can be specified explicitly.

🔍 How It Works

Data Retrieval Flow

1. Request data from DataManager
   ↓
2. Check session cache (in-memory)
   ↓ (miss)
3. Check local disk cache with TTL
   ↓ (miss or stale)
4. Download from remote backend (IBM COS, etc.)
   ↓
5. Save to local cache with metadata
   ↓
6. Load from cache and store in session
   ↓
7. Return data to caller

Cache Architecture

┌─────────────────────────────────────┐
│     DataManager (Session Cache)     │  <- In-memory, per-run
└─────────────────────────────────────┘
              ↓
┌─────────────────────────────────────┐
│   CacheManager (Disk Cache + TTL)   │  <- Persistent, with metadata
└─────────────────────────────────────┘
              ↓
┌─────────────────────────────────────┐
│  SyncManager (Backend Sync Logic)   │  <- Download/upload coordination
└─────────────────────────────────────┘
              ↓
┌─────────────────────────────────────┐
│    StorageBackend (Local/IBM COS)   │  <- Actual data source
└─────────────────────────────────────┘

🧪 Testing

# Run tests
pytest

# With coverage
pytest --cov=omnifetch --cov-report=html

# Run specific test
pytest tests/test_data_manager.py::test_get_data_from_cache

📝 Examples

See the examples/ directory for complete working examples:

🤝 Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

🙏 Acknowledgments

Originally developed as part of the SIMEG R Toolkit project for energy market simulations.

📧 Contact

Marco Bonoli - marco@deinnovatie.com

Project Link: https://github.com/deinnovatie/omnifetch

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

omnifetch-1.0.0.tar.gz (31.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

omnifetch-1.0.0-py3-none-any.whl (31.3 kB view details)

Uploaded Python 3

File details

Details for the file omnifetch-1.0.0.tar.gz.

File metadata

  • Download URL: omnifetch-1.0.0.tar.gz
  • Upload date:
  • Size: 31.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for omnifetch-1.0.0.tar.gz
Algorithm Hash digest
SHA256 19b24d10d3f99620d0f987ec76b5c38d10ba06e4a115d7264382fda711787c92
MD5 3eadd727ff21607829b5c642cf32da45
BLAKE2b-256 c012e386cc512a278bf854c264c17376973cd3627d624d34e2fb49e66a70d345

See more details on using hashes here.

File details

Details for the file omnifetch-1.0.0-py3-none-any.whl.

File metadata

  • Download URL: omnifetch-1.0.0-py3-none-any.whl
  • Upload date:
  • Size: 31.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for omnifetch-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 19c59e2282300b0c922802682c15cf0696ca7e8a65402d23a1bcd7692e2b5ec4
MD5 6681a17e64272bb6cee4ef9543346d06
BLAKE2b-256 ddde4ebeb7f8cdbcfff85e27e5f7eb5117d8e5caaa0e92d075da78c37a8a1779

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page