Skip to main content

Horizon’s core data SDK

Project description

Horizon Data Core SDK

The Horizon Data Core SDK provides a simple interface for working with both PostgreSQL and Iceberg tables in the Horizon system.

Getting started

Install the SDK:

uv add horizon-data-core

Features

  • Pydantic BaseModels: Type-safe data models for all entities
  • PostgreSQL Operations: Full CRUD operations for PostgreSQL tables
  • Iceberg Operations: Write and read operations for Iceberg tables
  • Automatic Conversion: Seamless conversion between Pydantic models and SQLAlchemy ORM models

Quick Start

1. Initialize the SDK

from horizon_data_core.api import initialize_sdk
from horizon_data_core.client import PostgresClient
from pyiceberg.catalog import load_catalog
from uuid import uuid4

# Set up PostgreSQL client
postgres_client = PostgresClient(
    organization_id=uuid4(),
    user="postgres",
    passwd="password",
    host="localhost",
    port=5432,
    database_name="horizon"
)

# Set up Iceberg catalog
iceberg_catalog = load_catalog("rest", uri="http://localhost:8181")

# Initialize the SDK with organization_id
organization_id = uuid4()  # This should come from your user context
initialize_sdk(postgres_client, iceberg_catalog, organization_id)

2. Working with PostgreSQL Tables

from horizon_data_core.api import create_entity, read_entity, update_entity, delete_entity, list_entities
from horizon_data_core.base_types import Entity
from uuid import uuid4
from datetime import datetime

# Create a new entity
entity = Entity(
    id=uuid4(),
    name="My Entity",
    kind_id=uuid4(),
    # organization_id will be automatically set by the SDK
)
created_entity = create_entity(entity)

# Read the entity
retrieved_entity = read_entity(created_entity.id)

# Update the entity
retrieved_entity.name = "Updated Entity Name"
updated_entity = update_entity(retrieved_entity)

# List entities with filters (organization_id is automatically applied)
entities = list_entities()

# Delete the entity
delete_entity(updated_entity.id)

3. Working with Iceberg Tables

from horizon_data_core.api import create_data_row, create_metadata_row, list_data_rows, list_metadata_rows
from horizon_data_core.base_types import DataRow, MetadataRow
from datetime import datetime

# Create a data row
data_row = DataRow(
    data_stream_id="stream-123",
    datetime=datetime.now(),
    vector=[1.0, 2.0, 3.0, 4.0, 5.0],
    data_type="sensor_data",
    vector_start_bound=0.0,
    vector_end_bound=10.0
)
create_data_row(data_row)

# Create a metadata row
metadata_row = MetadataRow(
    data_stream_id="stream-123",
    datetime=datetime.now(),
    latitude=40.7128,
    longitude=-74.0060,
    altitude=10.5,
    speed=25.0,
    heading=90.0
)
create_metadata_row(metadata_row)

# List data rows with filters
data_rows = list_data_rows(data_stream_id="stream-123")
metadata_rows = list_metadata_rows(data_stream_id="stream-123")

Available Models

PostgreSQL Models

  • EntityKind: A descriptive class of which an entity is a physical instantiation of.
  • Entity: Core entity instances
  • DataStream: Data streams associated with entities
  • Mission: Mission definitions
  • MissionEntity: Mission-entity relationships
  • Ontology: Ontology definitions
  • OntologyClass: Classes within ontologies
  • BeamgramSpecification: The set of parameters used to specify how a beamgram is constructed
  • BearingTimeRecordSpecification: The set of parameters used to specify how a bearing time record is constructed
  • DataRow: Time-series data with vector information
  • MetadataRow: Location and movement metadata

Iceberg Models

  • DataRow: Time-series data with vector information
  • MetadataRow: Location and movement metadata

API Reference

PostgreSQL Operations

For each PostgreSQL model, the following operations are available:

  • create_[model](model_instance): Create a new record
  • read_[model](id): Read a record by ID
  • read_[model](model_instance): Read a record by matching non-id fields
  • update_[model](model_instance): Update an existing record
  • delete_[model](id): Delete a record by ID
  • list_[models](**filters): List records with optional filters

Iceberg Operations

For Iceberg models, the following operations are available:

  • create_[model](model_instance): Create a new record
  • list_[models](**filters): List records with optional filters

Note: Update and delete operations for Iceberg tables require table-specific implementation due to the nature of Iceberg's data model.

Error Handling

The SDK includes proper error handling for:

  • Invalid model data
  • Database connection issues
  • Missing records
  • Iceberg catalog connectivity

Examples

See below for complete working examples of SDK operations.

"""Example usage of the Horizon Data Core SDK."""

from datetime import datetime
from uuid import uuid4

from .api import initialize_sdk
from .base_types import DataRow, DataStream, Entity, MetadataRow, Mission
from .client import PostgresClient
from .helpers import name_to_uuid


def example_usage() -> None:
    """Example of how to use the Horizon Data Core SDK."""
    # Initialize the SDK
    postgres_client = PostgresClient(
        user="postgres",
        passwd="password",
        host="localhost",
        port=5432,
        database_name="horizon",
    )

    # Initialize the SDK with organization_id
    organization_id = uuid4()  # This should come from your user context
    sdk = initialize_sdk(postgres_client, {}, organization_id)

    # Create a new entity
    entity = Entity(
        id=uuid4(),
        name="Example Entity",
        kind_id=uuid4(),
        free_text="This is an example entity",
        # organization_id will be automatically set by the SDK
    )
    created_entity = sdk.create_entity(entity)
    print(f"Created entity: {created_entity}")

    # Read the entity back
    assert created_entity.id is not None
    retrieved_entity = sdk.read_entity(created_entity.id)
    print(f"Retrieved entity: {retrieved_entity}")

    # List entities with filters (organization_id is automatically applied)
    entities = sdk.list_entities()
    print(f"Found {len(entities)} entities")

    # Create a data stream
    data_stream = DataStream(
        id=uuid4(),
        entity_id=created_entity.id,
        # organization_id will be automatically set by the SDK
    )
    created_data_stream = sdk.create_data_stream(data_stream)
    print(f"Created data stream: {created_data_stream}")
    assert created_data_stream.id is not None
    # Create a mission
    mission = Mission(
        id=uuid4(),
        name="Example Mission",
        start_datetime=datetime.now(),
        # organization_id will be automatically set by the SDK
    )
    created_mission = sdk.create_mission(mission)
    print(f"Created mission: {created_mission}")

    # Create a data row in Postgres
    data_row = DataRow(
        data_stream_id=created_data_stream.id,
        datetime=datetime.now(),
        vector=[1.0, 2.0, 3.0, 4.0, 5.0],
        data_type="example_data",
        track_id=name_to_uuid(
            "example_track"
        ),  # This should be a valid specification/track_id that has been created previously
        vector_start_bound=0.0,
        vector_end_bound=10.0,
    )
    created_data_row = sdk.create_data_row(data_row)
    print(f"Created data row: {created_data_row}")

    # Create a metadata row in Iceberg table
    metadata_row = MetadataRow(
        data_stream_id=created_data_stream.id,
        datetime=datetime.now(),
        latitude=40.7128,
        longitude=-74.0060,
        altitude=10.5,
        speed=25.0,
        heading=90.0,
    )
    created_metadata_row = sdk.create_metadata_row(metadata_row)
    print(f"Created metadata row: {created_metadata_row}")


if __name__ == "__main__":
    example_usage()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

horizon_data_core-4.2.1.tar.gz (119.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

horizon_data_core-4.2.1-py3-none-any.whl (40.9 kB view details)

Uploaded Python 3

File details

Details for the file horizon_data_core-4.2.1.tar.gz.

File metadata

  • Download URL: horizon_data_core-4.2.1.tar.gz
  • Upload date:
  • Size: 119.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.15 {"installer":{"name":"uv","version":"0.9.15","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for horizon_data_core-4.2.1.tar.gz
Algorithm Hash digest
SHA256 98a35a5445d7aad6c6a21ee822f1856b216bf2ddbcbef885864b5b2b18371af6
MD5 c36fbe05b21d16e3fdf7606df71ae7b7
BLAKE2b-256 b2ac5f480dcbefd41d122b0dd8ccd4bb62b974617990550f1ff2b0545c2b1676

See more details on using hashes here.

File details

Details for the file horizon_data_core-4.2.1-py3-none-any.whl.

File metadata

  • Download URL: horizon_data_core-4.2.1-py3-none-any.whl
  • Upload date:
  • Size: 40.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.15 {"installer":{"name":"uv","version":"0.9.15","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for horizon_data_core-4.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 610c0017256c32094a9f083bfa51be8236062fe865fc449a5ce99e2b94bbb1bf
MD5 57b5d0c46a9732c328afd6fd89e37b86
BLAKE2b-256 65d5525f8ee743488ecf66bb7a641aab2405a7250898a708e0e14d9bfd531c0e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page