Skip to main content

Horizon’s core data SDK

Project description

Horizon Data Core SDK

The Horizon Data Core SDK provides a simple interface for working with both PostgreSQL and Iceberg tables in the Horizon system.

Getting started

Install the SDK:

uv add horizon-data-core

Features

  • Pydantic BaseModels: Type-safe data models for all entities
  • PostgreSQL Operations: Full CRUD operations for PostgreSQL tables
  • Iceberg Operations: Write and read operations for Iceberg tables
  • Automatic Conversion: Seamless conversion between Pydantic models and SQLAlchemy ORM models

Quick Start

1. Initialize the SDK

from horizon_data_core.api import initialize_sdk
from horizon_data_core.client import PostgresClient
from pyiceberg.catalog import load_catalog
from uuid import uuid4

# Set up PostgreSQL client
postgres_client = PostgresClient(
    organization_id=uuid4(),
    user="postgres",
    passwd="password",
    host="localhost",
    port=5432,
    database_name="horizon"
)

# Set up Iceberg catalog
iceberg_catalog = load_catalog("rest", uri="http://localhost:8181")

# Initialize the SDK with organization_id
organization_id = uuid4()  # This should come from your user context
initialize_sdk(postgres_client, iceberg_catalog, organization_id)

2. Working with PostgreSQL Tables

from horizon_data_core.api import create_entity, read_entity, update_entity, delete_entity, list_entities
from horizon_data_core.base_types import Entity
from uuid import uuid4
from datetime import datetime

# Create a new entity
entity = Entity(
    id=uuid4(),
    name="My Entity",
    kind_id=uuid4(),
    # organization_id will be automatically set by the SDK
)
created_entity = create_entity(entity)

# Read the entity
retrieved_entity = read_entity(created_entity.id)

# Update the entity
retrieved_entity.name = "Updated Entity Name"
updated_entity = update_entity(retrieved_entity)

# List entities with filters (organization_id is automatically applied)
entities = list_entities()

# Delete the entity
delete_entity(updated_entity.id)

3. Working with Iceberg Tables

from horizon_data_core.api import create_data_row, create_metadata_row, list_data_rows, list_metadata_rows
from horizon_data_core.base_types import DataRow, MetadataRow
from datetime import datetime

# Create a data row
data_row = DataRow(
    data_stream_id="stream-123",
    datetime=datetime.now(),
    vector=[1.0, 2.0, 3.0, 4.0, 5.0],
    data_type="sensor_data",
    vector_start_bound=0.0,
    vector_end_bound=10.0
)
create_data_row(data_row)

# Create a metadata row
metadata_row = MetadataRow(
    data_stream_id="stream-123",
    datetime=datetime.now(),
    latitude=40.7128,
    longitude=-74.0060,
    altitude=10.5,
    speed=25.0,
    heading=90.0
)
create_metadata_row(metadata_row)

# List data rows with filters
data_rows = list_data_rows(data_stream_id="stream-123")
metadata_rows = list_metadata_rows(data_stream_id="stream-123")

Available Models

PostgreSQL Models

  • EntityKind: A descriptive class of which an entity is a physical instantiation of.
  • Entity: Core entity instances
  • DataStream: Data streams associated with entities
  • Mission: Mission definitions
  • MissionEntity: Mission-entity relationships
  • Ontology: Ontology definitions
  • OntologyClass: Classes within ontologies
  • BeamgramSpecification: The set of parameters used to specify how a beamgram is constructed
  • BearingTimeRecordSpecification: The set of parameters used to specify how a bearing time record is constructed
  • DataRow: Time-series data with vector information
  • MetadataRow: Location and movement metadata

Iceberg Models

  • DataRow: Time-series data with vector information
  • MetadataRow: Location and movement metadata

API Reference

PostgreSQL Operations

For each PostgreSQL model, the following operations are available:

  • create_[model](model_instance): Create a new record
  • read_[model](id): Read a record by ID
  • read_[model](model_instance): Read a record by matching non-id fields
  • update_[model](model_instance): Update an existing record
  • delete_[model](id): Delete a record by ID
  • list_[models](**filters): List records with optional filters

Iceberg Operations

For Iceberg models, the following operations are available:

  • create_[model](model_instance): Create a new record
  • list_[models](**filters): List records with optional filters

Note: Update and delete operations for Iceberg tables require table-specific implementation due to the nature of Iceberg's data model.

Error Handling

The SDK includes proper error handling for:

  • Invalid model data
  • Database connection issues
  • Missing records
  • Iceberg catalog connectivity

Examples

See below for complete working examples of SDK operations.

"""Example usage of the Horizon Data Core SDK."""

from datetime import datetime
from uuid import uuid4

from .api import initialize_sdk
from .base_types import DataRow, DataStream, Entity, MetadataRow, Mission
from .client import PostgresClient
from .helpers import name_to_uuid


def example_usage() -> None:
    """Example of how to use the Horizon Data Core SDK."""
    # Initialize the SDK
    postgres_client = PostgresClient(
        user="postgres",
        passwd="password",
        host="localhost",
        port=5432,
        database_name="horizon",
    )

    # Initialize the SDK with organization_id
    organization_id = uuid4()  # This should come from your user context
    sdk = initialize_sdk(postgres_client, {}, organization_id)

    # Create a new entity
    entity = Entity(
        id=uuid4(),
        name="Example Entity",
        kind_id=uuid4(),
        free_text="This is an example entity",
        # organization_id will be automatically set by the SDK
    )
    created_entity = sdk.create_entity(entity)
    print(f"Created entity: {created_entity}")

    # Read the entity back
    assert created_entity.id is not None
    retrieved_entity = sdk.read_entity(created_entity.id)
    print(f"Retrieved entity: {retrieved_entity}")

    # List entities with filters (organization_id is automatically applied)
    entities = sdk.list_entities()
    print(f"Found {len(entities)} entities")

    # Create a data stream
    data_stream = DataStream(
        id=uuid4(),
        entity_id=created_entity.id,
        # organization_id will be automatically set by the SDK
    )
    created_data_stream = sdk.create_data_stream(data_stream)
    print(f"Created data stream: {created_data_stream}")
    assert created_data_stream.id is not None
    # Create a mission
    mission = Mission(
        id=uuid4(),
        name="Example Mission",
        start_datetime=datetime.now(),
        # organization_id will be automatically set by the SDK
    )
    created_mission = sdk.create_mission(mission)
    print(f"Created mission: {created_mission}")

    # Create a data row in Postgres
    data_row = DataRow(
        data_stream_id=created_data_stream.id,
        datetime=datetime.now(),
        vector=[1.0, 2.0, 3.0, 4.0, 5.0],
        data_type="example_data",
        track_id=name_to_uuid(
            "example_track"
        ),  # This should be a valid specification/track_id that has been created previously
        vector_start_bound=0.0,
        vector_end_bound=10.0,
    )
    created_data_row = sdk.create_data_row(data_row)
    print(f"Created data row: {created_data_row}")

    # Create a metadata row in Iceberg table
    metadata_row = MetadataRow(
        data_stream_id=created_data_stream.id,
        datetime=datetime.now(),
        latitude=40.7128,
        longitude=-74.0060,
        altitude=10.5,
        speed=25.0,
        heading=90.0,
    )
    created_metadata_row = sdk.create_metadata_row(metadata_row)
    print(f"Created metadata row: {created_metadata_row}")


if __name__ == "__main__":
    example_usage()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

horizon_data_core-4.0.1.tar.gz (119.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

horizon_data_core-4.0.1-py3-none-any.whl (40.7 kB view details)

Uploaded Python 3

File details

Details for the file horizon_data_core-4.0.1.tar.gz.

File metadata

  • Download URL: horizon_data_core-4.0.1.tar.gz
  • Upload date:
  • Size: 119.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.15 {"installer":{"name":"uv","version":"0.9.15","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for horizon_data_core-4.0.1.tar.gz
Algorithm Hash digest
SHA256 23b6cda3b3f2354b338b2ff677bbbf31c79be5e1c34e00b0da2e5cdd15c3291a
MD5 9723e66b14d8c2dfeca217b245f5dc56
BLAKE2b-256 172fddd3997ab25f811fa681dac937a82a2264f8f1f8ff76baf00804d9c6cec5

See more details on using hashes here.

File details

Details for the file horizon_data_core-4.0.1-py3-none-any.whl.

File metadata

  • Download URL: horizon_data_core-4.0.1-py3-none-any.whl
  • Upload date:
  • Size: 40.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.15 {"installer":{"name":"uv","version":"0.9.15","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for horizon_data_core-4.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 0693604568c0eaf0137fe93d402f9408ce863a8652b645f8e92200297748f0cd
MD5 fa0ef92542496a98dcd72853ffdc9c7a
BLAKE2b-256 2ad3d5239cf798477cf5dc260c77ee424459ce5ba6dcfa3b6be22dca1b9724cf

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page