Skip to main content

A headless ETL / ELT / data pipeline and integration SDK for Python.

Project description

Hyperion

A headless ETL / ELT / data pipeline and integration SDK for Python.

pre-commit pytest codecov PyPI - License PyPI - Python Version GitHub Release

Features

  • Data Catalog System: Manage and organize data assets across S3 buckets
  • Schema Management: Validate and store schema definitions for data assets
  • AWS Infrastructure Abstractions: Simplified interfaces for S3, DynamoDB, SQS, and Secrets Manager
  • Source Framework: Define data sources that extract data and store in the catalog
  • Caching: In-memory, local file, and DynamoDB caching options
  • Asynchronous Processing: Utilities for async operations and task queues
  • Geo Utilities: Location-based services with Google Maps integration

Core Concepts

Assets

Assets are the fundamental units of data in Hyperion. Each asset represents a dataset stored in a specific location with a defined schema. Hyperion supports three types of assets:

DataLakeAsset

  • Represents raw, immutable data stored in a data lake
  • Time-partitioned by date
  • Each partition has a schema version
  • Example use cases: raw API responses, event logs, or any source data that needs to be preserved in its original form

FeatureAsset

  • Represents processed feature data with time resolution
  • Used for analytics, machine learning features, and derived datasets
  • Supports different time resolutions (seconds, minutes, hours, days, weeks, months, years)
  • Can include additional partition keys for finer-grained organization
  • Example use cases: aggregated metrics, processed signals, ML features

PersistentStoreAsset

  • Represents persistent data storage without time partitioning
  • Used for reference data, lookup tables, or any data that doesn't change frequently
  • Example use cases: reference data, configuration settings, master data

Schema Management

All assets in Hyperion have associated schemas that define their structure:

  • Schema Store: The SchemaStore manages asset schemas in Avro format
  • Schema Validation: All data is validated against its schema during storage
  • Schema Versioning: Assets include a schema version to support evolution over time
  • Schema Storage: Schemas can be stored in the local filesystem or S3

If a schema is missing for an asset:

  1. An error will be raised when attempting to store or retrieve the asset
  2. You need to define the schema in Avro format and store it in the schema store
  3. The schema should be named according to the pattern: {asset_type}/{asset_name}.v{version}.avro.json

Catalog

The Catalog is the central component that manages asset storage and retrieval:

  • Storage Location: Maps asset types to their appropriate storage buckets
  • Asset Retrieval: Provides methods to retrieve assets by name, date, and schema version
  • Partitioning: Handles partitioning logic for different asset types
  • Notifications: Can send notifications when new assets arrive

Source Framework

Sources are responsible for extracting data from external systems and storing it in the catalog:

  • Standardized Interface: All sources implement a common interface
  • AWS Lambda Support: Easy integration with AWS Lambda for scheduled extraction
  • Backfill Capability: Support for historical data backfill
  • Incremental Processing: Extract data with date-based filtering

Installation

From PyPI

If you only want to use hyperion, you can install it from PyPI:

pip install hyperion-etl
# or
poetry add hyperion-etl

We'll try to respect Semantic Versioning, so you can pin your dependencies to a specific version, or use a version selector to make sure no breaking changes are introduced (e.g., ^0.4.0).

From Source

Hyperion uses Poetry for dependency management:

# Clone the repository
git clone https://github.com/tomasvotava/hyperion.git
cd hyperion

# Install dependencies
poetry install

Configuration

Hyperion is configured through environment variables. You can use a .env file for local development:

# Common settings
HYPERION_COMMON_LOG_PRETTY=True
HYPERION_COMMON_LOG_LEVEL=INFO
HYPERION_COMMON_SERVICE_NAME=my-service

# Storage settings
HYPERION_STORAGE_DATA_LAKE_BUCKET=my-data-lake-bucket
HYPERION_STORAGE_FEATURE_STORE_BUCKET=my-feature-store-bucket
HYPERION_STORAGE_PERSISTENT_STORE_BUCKET=my-persistent-store-bucket
HYPERION_STORAGE_SCHEMA_PATH=s3://my-schema-bucket/schemas
HYPERION_STORAGE_MAX_CONCURRENCY=5

# Queue settings
HYPERION_QUEUE_URL=https://sqs.us-east-1.amazonaws.com/123456789012/my-queue

# Secrets settings
HYPERION_SECRETS_BACKEND=AWSSecretsManager

# HTTP settings (optional)
HYPERION_HTTP_PROXY_HTTP=http://proxy:8080
HYPERION_HTTP_PROXY_HTTPS=http://proxy:8080

# Geo settings (optional)
HYPERION_GEO_GMAPS_API_KEY=your-google-maps-api-key

Before any real documentation is written, you can check the hyperion/config.py file for all available configuration options. Hyperion is using EnvProxy for configuration.

Usage Examples

Working with Assets

Creating and Storing a DataLakeAsset

from hyperion.catalog import Catalog
from hyperion.entities.catalog import DataLakeAsset
from datetime import datetime, timezone

# Initialize the catalog
catalog = Catalog.from_config()

# Create a data lake asset
asset = DataLakeAsset(
    name="customer_data",
    date=datetime.now(timezone.utc),
    schema_version=1
)

# Store data in the asset
data = [
    {"id": 1, "name": "Customer 1", "timestamp": datetime.now(timezone.utc)},
    {"id": 2, "name": "Customer 2", "timestamp": datetime.now(timezone.utc)},
]

catalog.store_asset(asset, data)

Working with FeatureAssets

from hyperion.catalog import Catalog
from hyperion.entities.catalog import FeatureAsset
from hyperion.dateutils import TimeResolution
from datetime import datetime, timezone

# Initialize the catalog
catalog = Catalog.from_config()

# Create a feature asset with daily resolution
resolution = TimeResolution(1, "d")  # 1 day resolution
asset = FeatureAsset(
    name="customer_activity",
    partition_date=datetime.now(timezone.utc),
    resolution=resolution,
    schema_version=1
)

# Store aggregated feature data
feature_data = [
    {"customer_id": 1, "activity_score": 87.5, "date": datetime.now(timezone.utc)},
    {"customer_id": 2, "activity_score": 92.1, "date": datetime.now(timezone.utc)},
]

catalog.store_asset(asset, feature_data)

# Retrieve feature data for a specific time period
from_date = datetime(2023, 1, 1, tzinfo=timezone.utc)
to_date = datetime(2023, 1, 31, tzinfo=timezone.utc)

for feature_asset in catalog.iter_feature_store_partitions(
    feature_name="customer_activity",
    resolution="1d",  # Can use string format too
    date_from=from_date,
    date_to=to_date
):
    data = catalog.retrieve_asset(feature_asset)
    for record in data:
        print(record)

Working with PersistentStoreAssets

from hyperion.catalog import Catalog
from hyperion.entities.catalog import PersistentStoreAsset

# Initialize the catalog
catalog = Catalog.from_config()

# Create a persistent store asset
asset = PersistentStoreAsset(
    name="product_catalog",
    schema_version=1
)

# Store reference data
products = [
    {"id": "P001", "name": "Product 1", "category": "Electronics"},
    {"id": "P002", "name": "Product 2", "category": "Clothing"},
]

catalog.store_asset(asset, products)

# Retrieve reference data
for product in catalog.retrieve_asset(asset):
    print(product)

Creating a Custom Source

import asyncio
from datetime import datetime, timezone
from typing import AsyncIterator

from hyperion.catalog import Catalog
from hyperion.entities.catalog import DataLakeAsset
from hyperion.sources.base import Source, SourceAsset


class MyCustomSource(Source):
    source = "my-custom-source"

    async def run(self, start_date=None, end_date=None) -> AsyncIterator[SourceAsset]:
        # Fetch your data (this is where you'd implement your data extraction logic)
        data = [
            {"id": 1, "name": "Item 1", "timestamp": datetime.now(timezone.utc)},
            {"id": 2, "name": "Item 2", "timestamp": datetime.now(timezone.utc)},
        ]

        # Create asset
        asset = DataLakeAsset(
            name="my-custom-data",
            date=datetime.now(timezone.utc)
        )

        # Yield source asset
        yield SourceAsset(asset=asset, data=data)


# Use with AWS Lambda
def lambda_handler(event, context):
    MyCustomSource.handle_aws_lambda_event(event, context)


# Use standalone
if __name__ == "__main__":
    asyncio.run(MyCustomSource._run(Catalog.from_config()))

Working with Schemas

To create and register a schema for an asset:

import json
from pathlib import Path

# Define schema in Avro format
schema = {
    "type": "record",
    "name": "CustomerData",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}}
    ]
}

# Save schema to local file
schema_path = Path("schemas/data_lake/customer_data.v1.avro.json")
schema_path.parent.mkdir(parents=True, exist_ok=True)
with open(schema_path, "w") as f:
    json.dump(schema, f)

# Or upload to S3 if using S3SchemaStore
import boto3
s3_client = boto3.client('s3')
s3_client.put_object(
    Bucket="my-schema-bucket",
    Key="data_lake/customer_data.v1.avro.json",
    Body=json.dumps(schema)
)

Advanced Features

Asset Collections

Asset collections provide a high-level interface for fetching and working with groups of assets. You can define a collection class that specifies the assets you need and fetch them all at once.

See docs/asset_collections.md for more information.

Repartitioning Data

from hyperion.catalog import Catalog
from hyperion.entities.catalog import DataLakeAsset
from hyperion.dateutils import TimeResolutionUnit
from datetime import datetime, timezone
import asyncio

async def repartition_data():
    catalog = Catalog.from_config()

    # Original asset with day-level partitioning
    asset = DataLakeAsset(
        name="web_logs",
        date=datetime.now(timezone.utc),
        schema_version=1
    )

    # Repartition by hour
    await catalog.repartition(
        asset,
        granularity=TimeResolutionUnit("h"),
        date_attribute="timestamp"
    )

asyncio.run(repartition_data())

Caching

from hyperion.infrastructure.cache import Cache

# Get cache from configuration
cache = Cache.from_config()

# Store data in cache
cache.set("my-key", "my-value")

# Retrieve data from cache
value = cache.get("my-key")
print(value)  # "my-value"

# Check if key exists
if cache.hit("my-key"):
    print("Cache hit!")

# Delete key
cache.delete("my-key")

Geo Utilities

from hyperion.infrastructure.geo import GoogleMaps, Location

# Initialize Google Maps client
gmaps = GoogleMaps.from_config()

# Geocode an address
with gmaps:
    location = gmaps.geocode("1600 Amphitheatre Parkway, Mountain View, CA")
    print(f"Latitude: {location.latitude}, Longitude: {location.longitude}")

    # Reverse geocode a location
    named_location = gmaps.reverse_geocode(location)
    print(f"Address: {named_location.address}")
    print(f"Country: {named_location.country}")

Development

Setup Development Environment

# Install development dependencies
poetry install

# Install pre-commit hooks
poetry run pre-commit install

Running Tests

# Run all tests
poetry run pytest

# Run with coverage
poetry run pytest --cov=hyperion

# Run specific test files
poetry run pytest tests/test_asyncutils.py

Code Style

This project uses pre-commit hooks to enforce code style:

# Run pre-commit on all files
poetry run pre-commit run -a

The project uses:

Architecture

Core Components

  • Catalog: Manages data assets and their storage in S3
  • SchemaStore: Handles schema validation and storage
  • Source: Base class for implementing data sources
  • Infrastructure: Abstractions for AWS services (S3, DynamoDB, SQS, etc.)
  • Utils: Helper functions for dates, async operations, etc.

Asset Types

  • DataLakeAsset: Raw data stored in a data lake
  • FeatureAsset: Processed features with time resolution
  • PersistentStoreAsset: Persistent data storage

Contributing

See CONTRIBUTING.md for guidelines on contributing to this project.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

hyperion_sdk-0.5.0.tar.gz (40.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

hyperion_sdk-0.5.0-py3-none-any.whl (45.0 kB view details)

Uploaded Python 3

File details

Details for the file hyperion_sdk-0.5.0.tar.gz.

File metadata

  • Download URL: hyperion_sdk-0.5.0.tar.gz
  • Upload date:
  • Size: 40.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for hyperion_sdk-0.5.0.tar.gz
Algorithm Hash digest
SHA256 09f939eb835d1a494d6a16000b5d8d550af5912dcaabc0509d7b3e24f89f375f
MD5 415bfbdc34fec35f45c1680acbdcbfbf
BLAKE2b-256 86c7b9afa2ca2859a35813f478420da39cc3641f022a76fbca55db00cb04edce

See more details on using hashes here.

Provenance

The following attestation bundles were made for hyperion_sdk-0.5.0.tar.gz:

Publisher: create-release.yml on tomasvotava/hyperion

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file hyperion_sdk-0.5.0-py3-none-any.whl.

File metadata

  • Download URL: hyperion_sdk-0.5.0-py3-none-any.whl
  • Upload date:
  • Size: 45.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for hyperion_sdk-0.5.0-py3-none-any.whl
Algorithm Hash digest
SHA256 e15779a44517463750d661019e27e95cde30ab833ba1fd943b3c80c37c5ec64f
MD5 4a6cfef079cfd3c41b1c534104c08b40
BLAKE2b-256 2d8dd986e952a240abbae1af10f0b102b365e0ebd43167a6bd1aed036219a342

See more details on using hashes here.

Provenance

The following attestation bundles were made for hyperion_sdk-0.5.0-py3-none-any.whl:

Publisher: create-release.yml on tomasvotava/hyperion

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page