Hyperion
A headless ETL / ELT / data pipeline and integration SDK for Python.
Features
- Data Catalog System: Manage and organize data assets across S3 buckets
- Schema Management: Validate and store schema definitions for data assets
- AWS Infrastructure Abstractions: Simplified interfaces for S3, DynamoDB, SQS, and Secrets Manager
- Source Framework: Define data sources that extract data and store it in the catalog
- Caching: In-memory, local file, and DynamoDB caching options
- Asynchronous Processing: Utilities for async operations and task queues
- Geo Utilities: Location-based services with Google Maps integration
Core Concepts
Assets
Assets are the fundamental units of data in Hyperion. Each asset represents a dataset stored in a specific location with a defined schema. Hyperion supports three types of assets:
DataLakeAsset
- Represents raw, immutable data stored in a data lake
- Time-partitioned by date
- Each partition has a schema version
- Example use cases: raw API responses, event logs, or any source data that needs to be preserved in its original form
FeatureAsset
- Represents processed feature data with time resolution
- Used for analytics, machine learning features, and derived datasets
- Supports different time resolutions (seconds, minutes, hours, days, weeks, months, years)
- Can include additional partition keys for finer-grained organization
- Example use cases: aggregated metrics, processed signals, ML features
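The examples in this README give a resolution either as `TimeResolution(1, "d")` or as the string `"1d"`. The stdlib-only sketch below illustrates what a resolution means for partitioning: it parses such a string and floors a timestamp to the start of its bucket. `parse_resolution`, `partition_start`, and the unit letters (with `m` assumed to mean minutes) are hypothetical and not Hyperion's API. Weeks, months, and years are deliberately left out: months and years need calendar arithmetic, and epoch-aligned weeks would start on a Thursday.

```python
import re
from datetime import datetime, timedelta, timezone

# Hypothetical unit letters; Hyperion's TimeResolution may use different codes.
# Only fixed-length units are covered; months and years need calendar arithmetic.
UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}
RESOLUTION_RE = re.compile(r"(\d+)([smhd])")
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_resolution(text: str) -> timedelta:
    """Parse strings such as '1d' or '15m' into a timedelta."""
    match = RESOLUTION_RE.fullmatch(text)
    if match is None or int(match.group(1)) == 0:
        raise ValueError(f"Unsupported resolution: {text!r}")
    return timedelta(seconds=int(match.group(1)) * UNIT_SECONDS[match.group(2)])


def partition_start(moment: datetime, resolution: timedelta) -> datetime:
    """Floor an aware datetime to the start of its bucket, with buckets aligned to the Unix epoch."""
    # timedelta // timedelta yields an integer bucket index (floor division).
    buckets = (moment - EPOCH) // resolution
    return EPOCH + buckets * resolution
```

For example, a record at 13:45 UTC falls into the partition starting at midnight for `"1d"` and at 12:00 for `"6h"`.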
PersistentStoreAsset
- Represents persistent data storage without time partitioning
- Used for reference data, lookup tables, or any data that doesn't change frequently
- Example use cases: reference data, configuration settings, master data
Schema Management
All assets in Hyperion have associated schemas that define their structure:
- Schema Store: The SchemaStore manages asset schemas in Avro format
- Schema Validation: All data is validated against its schema during storage
- Schema Versioning: Assets include a schema version to support evolution over time
- Schema Storage: Schemas can be stored in the local filesystem or S3
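Hyperion validates records against the full Avro schema during storage. The sketch below is a deliberately simplified, stdlib-only illustration of what such a check looks for: required fields and primitive types. `validate_record` and the type mapping are hypothetical and are not how Hyperion performs validation. Unions, nested records, and logical types such as the `timestamp-millis` field used later in this README are skipped rather than checked.

```python
from typing import Any

# Simplified mapping from Avro primitive type names to accepted Python types.
# Hypothetical: real Avro validation also covers unions, logical types, and nested records.
AVRO_PRIMITIVES: dict[str, tuple[type, ...]] = {
    "null": (type(None),),
    "boolean": (bool,),
    "int": (int,),
    "long": (int,),
    "float": (float, int),
    "double": (float, int),
    "string": (str,),
    "bytes": (bytes,),
}


def validate_record(schema: dict[str, Any], record: dict[str, Any]) -> list[str]:
    """Return a list of problems; an empty list means the record passed these checks."""
    problems: list[str] = []
    for field in schema["fields"]:
        name, avro_type = field["name"], field["type"]
        if name not in record:
            problems.append(f"missing field {name!r}")
            continue
        if not isinstance(avro_type, str) or avro_type not in AVRO_PRIMITIVES:
            continue  # complex types (e.g. logical types given as dicts) are skipped here
        value = record[name]
        # bool is a subclass of int in Python, so reject it explicitly for non-boolean fields.
        if isinstance(value, bool) and avro_type != "boolean":
            problems.append(f"field {name!r} should be {avro_type}, got bool")
        elif not isinstance(value, AVRO_PRIMITIVES[avro_type]):
            problems.append(
                f"field {name!r} should be {avro_type}, got {type(value).__name__}"
            )
    return problems
```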
If a schema is missing for an asset:
- An error will be raised when attempting to store or retrieve the asset
- You need to define the schema in Avro format and store it in the schema store
- The schema should be named according to the pattern:
{asset_type}/{asset_name}.v{version}.avro.json
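The naming pattern can be captured in a small helper. The sketch below is illustrative only. `schema_key` and `schema_location` are hypothetical names. The assumption that the store resolves the relative key under a root (a local directory such as `schemas` or an `s3://` prefix) is based on the examples in this README, not on Hyperion's code. Plain string joining is used on purpose, because `pathlib` would collapse the `//` in `s3://`.

```python
def schema_key(asset_type: str, asset_name: str, version: int) -> str:
    """Build a relative schema path: {asset_type}/{asset_name}.v{version}.avro.json."""
    return f"{asset_type}/{asset_name}.v{version}.avro.json"


def schema_location(root: str, asset_type: str, asset_name: str, version: int) -> str:
    """Join the relative schema path onto a store root (local directory or s3:// prefix).

    Hypothetical helper: assumes the schema store looks keys up relative to its root.
    """
    return f"{root.rstrip('/')}/{schema_key(asset_type, asset_name, version)}"
```

For example, `schema_key("data_lake", "customer_data", 1)` yields `data_lake/customer_data.v1.avro.json`, the same relative path used in the schema examples below.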
Catalog
The Catalog is the central component that manages asset storage and retrieval:
- Storage Location: Maps asset types to their appropriate storage buckets
- Asset Retrieval: Provides methods to retrieve assets by name, date, and schema version
- Partitioning: Handles partitioning logic for different asset types
- Notifications: Can send notifications when new assets arrive
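The mapping from asset types to buckets can be pictured as a lookup driven by the storage settings shown in the Configuration section. In the sketch below, the environment variable names come from that section. The asset-type keys (`data_lake`, `feature_store`, `persistent_store`) and `bucket_for` are hypothetical, so this illustrates the idea rather than the Catalog's implementation.

```python
import os
from collections.abc import Mapping
from typing import Optional

# Variable names are taken from the configuration example; the keys and lookup are hypothetical.
BUCKET_ENV_VARS = {
    "data_lake": "HYPERION_STORAGE_DATA_LAKE_BUCKET",
    "feature_store": "HYPERION_STORAGE_FEATURE_STORE_BUCKET",
    "persistent_store": "HYPERION_STORAGE_PERSISTENT_STORE_BUCKET",
}


def bucket_for(asset_type: str, environ: Optional[Mapping[str, str]] = None) -> str:
    """Resolve the bucket for an asset type from environment-style settings."""
    env = os.environ if environ is None else environ
    if asset_type not in BUCKET_ENV_VARS:
        raise ValueError(f"Unknown asset type: {asset_type!r}")
    variable = BUCKET_ENV_VARS[asset_type]
    bucket = env.get(variable)
    if not bucket:
        raise LookupError(f"{variable} is not set")
    return bucket
```

Passing an explicit mapping instead of `os.environ` keeps the lookup easy to test.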
Source Framework
Sources are responsible for extracting data from external systems and storing it in the catalog:
- Standardized Interface: All sources implement a common interface
- AWS Lambda Support: Easy integration with AWS Lambda for scheduled extraction
- Backfill Capability: Support for historical data backfill
- Incremental Processing: Extract data with date-based filtering
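Backfills and date-filtered extraction are commonly driven by splitting the requested range into per-day windows and extracting each one independently. The sketch below is a generic illustration. `daily_windows` is hypothetical, and the way Hyperion actually passes `start_date` and `end_date` to a source's `run` method may differ.

```python
from datetime import date, timedelta
from typing import Iterator


def daily_windows(start: date, end: date) -> Iterator[tuple[date, date]]:
    """Yield half-open [day, day + 1) windows covering start..end inclusive."""
    if end < start:
        raise ValueError("end must not be before start")
    day = start
    while day <= end:
        yield day, day + timedelta(days=1)
        day += timedelta(days=1)
```

Half-open windows avoid double-counting records that fall exactly on midnight.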
Installation
From PyPI
If you only want to use Hyperion as a library, install it from PyPI:
pip install hyperion-sdk
# or
poetry add hyperion-sdk
We aim to follow Semantic Versioning, so you can pin your dependency to a specific version
or use a version constraint (e.g., ^0.4.0) to make sure no breaking changes are introduced.
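Note that Poetry's caret constraint is stricter for 0.x releases. Per Poetry's documented rules, `^0.4.0` means `>=0.4.0,<0.5.0`, so a 0.5.x release would not satisfy it. The sketch below reproduces those rules for plain `MAJOR.MINOR.PATCH` strings only, without pre-release tags or shortened versions. It is an illustration, not Poetry's resolver.

```python
def caret_range(version: str) -> tuple[tuple[int, int, int], tuple[int, int, int]]:
    """Return (inclusive lower, exclusive upper) bounds for ^MAJOR.MINOR.PATCH."""
    # Raises ValueError unless the version has exactly three numeric parts.
    major, minor, patch = (int(part) for part in version.split("."))
    lower = (major, minor, patch)
    if major > 0:
        upper = (major + 1, 0, 0)  # ^1.2.3 -> <2.0.0
    elif minor > 0:
        upper = (0, minor + 1, 0)  # ^0.4.0 -> <0.5.0
    else:
        upper = (0, 0, patch + 1)  # ^0.0.3 -> <0.0.4
    return lower, upper


def satisfies(version: str, constraint: str) -> bool:
    """Check a concrete version against a caret constraint such as '^0.4.0'."""
    lower, upper = caret_range(constraint.lstrip("^"))
    current = tuple(int(part) for part in version.split("."))
    return lower <= current < upper
```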
From Source
Hyperion uses Poetry for dependency management:
# Clone the repository
git clone https://github.com/tomasvotava/hyperion.git
cd hyperion
# Install dependencies
poetry install
Configuration
Hyperion is configured through environment variables. You can use a .env file for local development:
# Common settings
HYPERION_COMMON_LOG_PRETTY=True
HYPERION_COMMON_LOG_LEVEL=INFO
HYPERION_COMMON_SERVICE_NAME=my-service
# Storage settings
HYPERION_STORAGE_DATA_LAKE_BUCKET=my-data-lake-bucket
HYPERION_STORAGE_FEATURE_STORE_BUCKET=my-feature-store-bucket
HYPERION_STORAGE_PERSISTENT_STORE_BUCKET=my-persistent-store-bucket
HYPERION_STORAGE_SCHEMA_PATH=s3://my-schema-bucket/schemas
HYPERION_STORAGE_MAX_CONCURRENCY=5
# Queue settings
HYPERION_QUEUE_URL=https://sqs.us-east-1.amazonaws.com/123456789012/my-queue
# Secrets settings
HYPERION_SECRETS_BACKEND=AWSSecretsManager
# HTTP settings (optional)
HYPERION_HTTP_PROXY_HTTP=http://proxy:8080
HYPERION_HTTP_PROXY_HTTPS=http://proxy:8080
# Geo settings (optional)
HYPERION_GEO_GMAPS_API_KEY=your-google-maps-api-key
Until fuller documentation is written, see the
hyperion/config.py file for all available configuration options. Hyperion uses EnvProxy to read its configuration.
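Hyperion reads these variables through EnvProxy. The stdlib-only sketch below is an independent illustration of how the `HYPERION_<SECTION>_<OPTION>` naming visible in the example groups settings, which can help when sanity-checking a local .env file. `read_dotenv` and `group_hyperion_settings` are hypothetical helpers. The parser handles only simple `KEY=VALUE` lines (no `export`, escapes, interpolation, or multi-line values), and EnvProxy's own parsing rules may differ.

```python
from collections import defaultdict


def read_dotenv(text: str) -> dict[str, str]:
    """Parse simple KEY=VALUE lines, skipping blank lines and # comments."""
    values: dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        # Strip surrounding quotes; no escapes, interpolation, or multi-line values.
        values[key.strip()] = value.strip().strip("\"'")
    return values


def group_hyperion_settings(values: dict[str, str]) -> dict[str, dict[str, str]]:
    """Group HYPERION_<SECTION>_<OPTION> variables by section, e.g. storage -> data_lake_bucket."""
    groups: defaultdict[str, dict[str, str]] = defaultdict(dict)
    for key, value in values.items():
        if not key.startswith("HYPERION_"):
            continue
        section, _, option = key[len("HYPERION_"):].partition("_")
        groups[section.lower()][option.lower()] = value
    return dict(groups)
```

With the example above, `HYPERION_STORAGE_DATA_LAKE_BUCKET` lands under `storage` as `data_lake_bucket`, and `HYPERION_HTTP_PROXY_HTTP` under `http` as `proxy_http`.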
Usage Examples
Working with Assets
Creating and Storing a DataLakeAsset
from hyperion.catalog import Catalog
from hyperion.entities.catalog import DataLakeAsset
from datetime import datetime, timezone
# Initialize the catalog
catalog = Catalog.from_config()
# Create a data lake asset
asset = DataLakeAsset(
    name="customer_data",
    date=datetime.now(timezone.utc),
    schema_version=1,
)
# Store data in the asset
data = [
{"id": 1, "name": "Customer 1", "timestamp": datetime.now(timezone.utc)},
{"id": 2, "name": "Customer 2", "timestamp": datetime.now(timezone.utc)},
]
catalog.store_asset(asset, data)
Working with FeatureAssets
from hyperion.catalog import Catalog
from hyperion.entities.catalog import FeatureAsset
from hyperion.dateutils import TimeResolution
from datetime import datetime, timezone
# Initialize the catalog
catalog = Catalog.from_config()
# Create a feature asset with daily resolution
resolution = TimeResolution(1, "d") # 1 day resolution
asset = FeatureAsset(
    name="customer_activity",
    partition_date=datetime.now(timezone.utc),
    resolution=resolution,
    schema_version=1,
)
# Store aggregated feature data
feature_data = [
{"customer_id": 1, "activity_score": 87.5, "date": datetime.now(timezone.utc)},
{"customer_id": 2, "activity_score": 92.1, "date": datetime.now(timezone.utc)},
]
catalog.store_asset(asset, feature_data)
# Retrieve feature data for a specific time period
from_date = datetime(2023, 1, 1, tzinfo=timezone.utc)
to_date = datetime(2023, 1, 31, tzinfo=timezone.utc)
for feature_asset in catalog.iter_feature_store_partitions(
    feature_name="customer_activity",
    resolution="1d",  # Can use string format too
    date_from=from_date,
    date_to=to_date,
):
    data = catalog.retrieve_asset(feature_asset)
    for record in data:
        print(record)
Working with PersistentStoreAssets
from hyperion.catalog import Catalog
from hyperion.entities.catalog import PersistentStoreAsset
# Initialize the catalog
catalog = Catalog.from_config()
# Create a persistent store asset
asset = PersistentStoreAsset(
    name="product_catalog",
    schema_version=1,
)
# Store reference data
products = [
{"id": "P001", "name": "Product 1", "category": "Electronics"},
{"id": "P002", "name": "Product 2", "category": "Clothing"},
]
catalog.store_asset(asset, products)
# Retrieve reference data
for product in catalog.retrieve_asset(asset):
    print(product)
Creating a Custom Source
import asyncio
from datetime import datetime, timezone
from typing import AsyncIterator
from hyperion.catalog import Catalog
from hyperion.entities.catalog import DataLakeAsset
from hyperion.sources.base import Source, SourceAsset
class MyCustomSource(Source):
    source = "my-custom-source"
    async def run(self, start_date=None, end_date=None) -> AsyncIterator[SourceAsset]:
        # Fetch your data (this is where you'd implement your data extraction logic)
        data = [
            {"id": 1, "name": "Item 1", "timestamp": datetime.now(timezone.utc)},
            {"id": 2, "name": "Item 2", "timestamp": datetime.now(timezone.utc)},
        ]
        # Create asset
        asset = DataLakeAsset(
            name="my-custom-data",
            date=datetime.now(timezone.utc),
        )
        # Yield source asset
        yield SourceAsset(asset=asset, data=data)
# Use with AWS Lambda
def lambda_handler(event, context):
    MyCustomSource.handle_aws_lambda_event(event, context)
# Use standalone
if __name__ == "__main__":
    asyncio.run(MyCustomSource._run(Catalog.from_config()))
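Because `run` is an async generator, something has to iterate it and persist each yielded `SourceAsset`. The stand-alone sketch below illustrates that consumption pattern with hypothetical stand-in classes (`FakeAsset`, `FakeSourceAsset`, `InMemoryCatalog`) and a hypothetical `run_source` driver. It does not reproduce what `Source._run` or `handle_aws_lambda_event` actually do internally, and Hyperion may store assets differently (for example, asynchronously or in batches).

```python
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, AsyncIterator


@dataclass
class FakeAsset:  # hypothetical stand-in for DataLakeAsset
    name: str
    date: datetime


@dataclass
class FakeSourceAsset:  # hypothetical stand-in for SourceAsset
    asset: FakeAsset
    data: list[dict[str, Any]]


@dataclass
class InMemoryCatalog:  # hypothetical stand-in for Catalog
    stored: dict[str, list[dict[str, Any]]] = field(default_factory=dict)

    def store_asset(self, asset: FakeAsset, data: list[dict[str, Any]]) -> None:
        self.stored[asset.name] = list(data)


async def run_source(source: AsyncIterator[FakeSourceAsset], catalog: InMemoryCatalog) -> int:
    """Drain an async generator of source assets into the catalog; return how many were stored."""
    count = 0
    async for item in source:
        catalog.store_asset(item.asset, item.data)
        count += 1
    return count


async def example_source() -> AsyncIterator[FakeSourceAsset]:
    now = datetime.now(timezone.utc)
    yield FakeSourceAsset(FakeAsset("my-custom-data", now), [{"id": 1}, {"id": 2}])


if __name__ == "__main__":
    catalog = InMemoryCatalog()
    stored = asyncio.run(run_source(example_source(), catalog))
    print(stored, catalog.stored)
```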
Working with Schemas
To create and register a schema for an asset:
import json
from pathlib import Path
# Define schema in Avro format
schema = {
    "type": "record",
    "name": "CustomerData",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    ],
}
# Save schema to local file
schema_path = Path("schemas/data_lake/customer_data.v1.avro.json")
schema_path.parent.mkdir(parents=True, exist_ok=True)
with open(schema_path, "w") as f:
    json.dump(schema, f)
# Or upload to S3 if using S3SchemaStore
import boto3
s3_client = boto3.client('s3')
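s3_client.put_object(
    Bucket="my-schema-bucket",
    # Key assumed to sit under the "schemas/" prefix from HYPERION_STORAGE_SCHEMA_PATH=s3://my-schema-bucket/schemas
    Key="schemas/data_lake/customer_data.v1.avro.json",
    Body=json.dumps(schema),
)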
Advanced Features
Asset Collections
Asset collections provide a high-level interface for fetching and working with groups of assets. You can define a collection class that specifies the assets you need and fetch them all at once.
See docs/asset_collections.md for more information.
Repartitioning Data
from hyperion.catalog import Catalog
from hyperion.entities.catalog import DataLakeAsset
from hyperion.dateutils import TimeResolutionUnit
from datetime import datetime, timezone
import asyncio
async def repartition_data():
    catalog = Catalog.from_config()
    # Original asset with day-level partitioning
    asset = DataLakeAsset(
        name="web_logs",
        date=datetime.now(timezone.utc),
        schema_version=1,
    )
    # Repartition by hour
    await catalog.repartition(
        asset,
        granularity=TimeResolutionUnit("h"),
        date_attribute="timestamp",
    )
asyncio.run(repartition_data())
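Conceptually, repartitioning a day's `web_logs` by hour with `date_attribute="timestamp"` means splitting its records into one group per hour of their timestamp. The sketch below shows only that grouping step. `group_by_hour` is a hypothetical helper, and how `Catalog.repartition` actually reads, writes, and names the new partitions is not shown here.

```python
from collections import defaultdict
from datetime import datetime, timezone
from typing import Any


def group_by_hour(
    records: list[dict[str, Any]], date_attribute: str = "timestamp"
) -> dict[datetime, list[dict[str, Any]]]:
    """Bucket records by the UTC hour of their date attribute, keeping input order per bucket."""
    buckets: dict[datetime, list[dict[str, Any]]] = defaultdict(list)
    for record in records:
        moment = record[date_attribute]
        if moment.tzinfo is None:
            raise ValueError(f"{date_attribute!r} must be timezone-aware")
        hour = moment.astimezone(timezone.utc).replace(minute=0, second=0, microsecond=0)
        buckets[hour].append(record)
    return dict(buckets)
```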
Caching
from hyperion.infrastructure.cache import Cache
# Get cache from configuration
cache = Cache.from_config()
# Store data in cache
cache.set("my-key", "my-value")
# Retrieve data from cache
value = cache.get("my-key")
print(value) # "my-value"
# Check if key exists
if cache.hit("my-key"):
    print("Cache hit!")
# Delete key
cache.delete("my-key")
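Since `Cache.from_config()` presumably picks one of the in-memory, local-file, or DynamoDB backends, calling code can stay backend-agnostic. The sketch below expresses the `set`/`get`/`hit`/`delete` calls from the example as a `typing.Protocol` and adds a get-or-compute helper. `CacheLike`, `DictCache`, and `get_or_compute` are hypothetical names. That values are strings and that `get` returns `None` on a miss are assumptions, not documented Hyperion behavior.

```python
from typing import Callable, Dict, Optional, Protocol


class CacheLike(Protocol):
    """Hypothetical interface mirroring the calls used in the example above."""

    def set(self, key: str, value: str) -> None: ...
    def get(self, key: str) -> Optional[str]: ...
    def hit(self, key: str) -> bool: ...
    def delete(self, key: str) -> None: ...


class DictCache:
    """Minimal dictionary-backed implementation, useful e.g. in tests."""

    def __init__(self) -> None:
        self._store: Dict[str, str] = {}

    def set(self, key: str, value: str) -> None:
        self._store[key] = value

    def get(self, key: str) -> Optional[str]:
        return self._store.get(key)  # assumed: None on a miss

    def hit(self, key: str) -> bool:
        return key in self._store

    def delete(self, key: str) -> None:
        self._store.pop(key, None)


def get_or_compute(cache: CacheLike, key: str, compute: Callable[[], str]) -> str:
    """Return the cached value, computing and storing it only on a miss."""
    if cache.hit(key):
        value = cache.get(key)
        if value is not None:
            return value
    value = compute()
    cache.set(key, value)
    return value
```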
Geo Utilities
from hyperion.infrastructure.geo import GoogleMaps, Location
# Initialize Google Maps client
gmaps = GoogleMaps.from_config()
# Geocode an address
with gmaps:
    location = gmaps.geocode("1600 Amphitheatre Parkway, Mountain View, CA")
    print(f"Latitude: {location.latitude}, Longitude: {location.longitude}")
    # Reverse geocode a location
    named_location = gmaps.reverse_geocode(location)
    print(f"Address: {named_location.address}")
    print(f"Country: {named_location.country}")
Development
Setup Development Environment
# Install development dependencies
poetry install
# Install pre-commit hooks
poetry run pre-commit install
Running Tests
# Run all tests
poetry run pytest
# Run with coverage
poetry run pytest --cov=hyperion
# Run specific test files
poetry run pytest tests/test_asyncutils.py
Code Style
This project uses pre-commit hooks to enforce code style:
# Run pre-commit on all files
poetry run pre-commit run -a
The project uses:
- ruff for linting
- mypy for type checking
- commitizen for standardized commits
Architecture
Core Components
- Catalog: Manages data assets and their storage in S3
- SchemaStore: Handles schema validation and storage
- Source: Base class for implementing data sources
- Infrastructure: Abstractions for AWS services (S3, DynamoDB, SQS, etc.)
- Utils: Helper functions for dates, async operations, etc.
Asset Types
- DataLakeAsset: Raw data stored in a data lake
- FeatureAsset: Processed features with time resolution
- PersistentStoreAsset: Persistent data storage
Contributing
See CONTRIBUTING.md for guidelines on contributing to this project.
License
This project is licensed under the MIT License - see the LICENSE file for details.
File details
Details for the file hyperion_sdk-0.5.1.tar.gz.
File metadata
- Download URL: hyperion_sdk-0.5.1.tar.gz
- Size: 40.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 154923214df44cb30bd8cd46405e1d005fcc8f304750a9a19914cf160740460a |
| MD5 | 69e7a9ec9750d77d50e5308712c2be4e |
| BLAKE2b-256 | afa325526ec07ed0f79bdfd34845c7ce3707607f3018b5e2ae84b577388b4d5f |
Provenance
The following attestation bundles were made for hyperion_sdk-0.5.1.tar.gz:
- Publisher: create-release.yml on tomasvotava/hyperion
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: hyperion_sdk-0.5.1.tar.gz
- Subject digest: 154923214df44cb30bd8cd46405e1d005fcc8f304750a9a19914cf160740460a
- Sigstore transparency entry: 184231770
- Permalink: tomasvotava/hyperion@29a33f8cab5820548faf75fb40a1a3d78167784e
- Branch / Tag: refs/heads/master
- Owner: https://github.com/tomasvotava
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: create-release.yml@5f847e2998ac88d26ec6ffbd49a14146b9109fde
- Trigger Event: pull_request
File details
Details for the file hyperion_sdk-0.5.1-py3-none-any.whl.
File metadata
- Download URL: hyperion_sdk-0.5.1-py3-none-any.whl
- Size: 45.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | e30e088ddf413d03dfa11fd5105eeb4927decc47a7c22ecc356e455430bddbb6 |
| MD5 | 9932e59c7513a7435c9ca145474d598d |
| BLAKE2b-256 | 79af1c6308497f4dbd285568ca93ba8c930058e405d340abd2ff9f2cea1b2035 |
Provenance
The following attestation bundles were made for hyperion_sdk-0.5.1-py3-none-any.whl:
- Publisher: create-release.yml on tomasvotava/hyperion
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: hyperion_sdk-0.5.1-py3-none-any.whl
- Subject digest: e30e088ddf413d03dfa11fd5105eeb4927decc47a7c22ecc356e455430bddbb6
- Sigstore transparency entry: 184231776
- Permalink: tomasvotava/hyperion@29a33f8cab5820548faf75fb40a1a3d78167784e
- Branch / Tag: refs/heads/master
- Owner: https://github.com/tomasvotava
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: create-release.yml@5f847e2998ac88d26ec6ffbd49a14146b9109fde
- Trigger Event: pull_request