
A data management and lineage tool for ML applications.


Aligned

A data management tool for ML applications.

Similar to how DBT is a data management tool for business analytics, Aligned manages ML projects. Aligned therefore makes it possible to collect data lineage between models, feature transformations, and more, while also making it easy to reduce data leakage with point-in-time valid data and to fix other problems described in Sculley et al. [2015].
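To make "point-in-time valid data" concrete, here is a minimal standard-library sketch of the idea. It is not Aligned's implementation; the `history` data and the `value_as_of` helper are hypothetical names used only for illustration. The point is that a training row should only see the feature value that was known at the time of the event, never a later one.

```python
from bisect import bisect_right
from datetime import datetime

# Hypothetical feature history for one entity: (valid_from, value), sorted by time.
history = [
    (datetime(2024, 1, 1), 10.0),
    (datetime(2024, 2, 1), 12.5),
    (datetime(2024, 3, 1), 15.0),
]


def value_as_of(history, event_time):
    """Return the latest value recorded at or before event_time, or None."""
    timestamps = [timestamp for timestamp, _ in history]
    index = bisect_right(timestamps, event_time)
    if index == 0:
        return None  # nothing was known yet at event_time
    return history[index - 1][1]


# The label was observed on 15 February, so the March value must not leak in.
label_time = datetime(2024, 2, 15)
feature_at_label_time = value_as_of(history, label_time)
```

Using the value from 1 March for a label observed on 15 February would be data leakage, since that value did not exist yet when the event happened.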

Docs

Check out the Aligned Docs, but keep in mind that they are still a work in progress.


Want to look at examples of how to use aligned? View the MatsMoll/aligned-example repo.

This is done by providing a new, innovative way of describing feature transformations and data flow in ML systems, while also collecting dependency metadata that would otherwise be too inconvenient and error-prone to type out manually.

Therefore, you get the following:

All from the simple API of defining

As a result, loading model features is as easy as:

entities = {"passenger_id": [1, 2, 3, 4]}
await store.model("titanic").features_for(entities).to_pandas()

Aligned is still in active development, so changes are likely.

Feature Views

Write features as they should be, as data models. Then get code completion and type safety by referencing them in other features.

This makes the features lightweight, data-source independent, and flexible.

@feature_view(
    name="passenger",
    description="Some features from the titanic dataset",
    source=FileSource.csv_at("titanic.csv"),
    materialized_source=FileSource.parquet_at("titanic.parquet"),
)
class TitanicPassenger:

    passenger_id = Int32().as_entity()

    age = (
        Float()
            .description("A float as some have decimals")
            .lower_bound(0)
            .upper_bound(110)
    )

    name = String()
    sex = String().accepted_values(["male", "female"])
    survived = Bool().description("If the passenger survived")
    sibsp = Int32().lower_bound(0).description("Number of siblings on titanic")
    cabin = String().is_optional()

    # Creates two one hot encoded values
    is_male, is_female = sex.one_hot_encode(['male', 'female'])
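As a rough illustration of what the one-hot encoded features above represent, the following plain-Python sketch produces one boolean column per category. The `one_hot_encode` function here is a hypothetical stand-in written for this example, not Aligned's own transformation.

```python
def one_hot_encode(values, categories):
    """Return one boolean column per category, in the order given."""
    return [[value == category for value in values] for category in categories]


# Hypothetical values for the `sex` feature of three passengers.
sex = ["male", "female", "female"]

# Unpacks into one column per category, mirroring the feature definition.
is_male, is_female = one_hot_encode(sex, ["male", "female"])
```

Each resulting column has one entry per input row, so it can be used directly as a model input.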

Data sources

Aligned makes handling data sources easy, as you do not have to think about how the data is loaded. You only define where the data is, and Aligned handles the dirty work.

Furthermore, you can also add materialised sources which can be used as intermediate sources.

my_db = PostgreSQLConfig(env_var="DATABASE_URL")
redis = RedisConfig(env_var="REDIS_URL")

@feature_view(
    name="passenger",
    description="Some features from the titanic dataset",
    source=my_db.table(
        "passenger",
        mapping_keys={
            "Passenger_Id": "passenger_id"
        }
    ),
    materialized_source=my_db.with_schema("inter").table("passenger"),
    stream_source=redis.stream(topic="titanic")
)
class TitanicPassenger:

    passenger_id = Int32().as_entity()

    # Some features
    ...
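The mapping_keys argument in the example suggests that source column names are renamed to feature names when the data is loaded. The sketch below shows that idea in plain Python, under that assumption; `apply_mapping_keys` and the sample rows are hypothetical and only illustrate the renaming step.

```python
def apply_mapping_keys(rows, mapping_keys):
    """Rename source columns to feature names; unmapped columns are kept as-is."""
    return [
        {mapping_keys.get(column, column): value for column, value in row.items()}
        for row in rows
    ]


# Hypothetical rows as they might come back from the "passenger" table.
source_rows = [
    {"Passenger_Id": 1, "age": 22.0},
    {"Passenger_Id": 2, "age": 38.0},
]

renamed_rows = apply_mapping_keys(source_rows, {"Passenger_Id": "passenger_id"})
```

Keeping the mapping next to the source definition means the feature view itself can keep using one consistent name, regardless of how each source spells the column.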

Fast development

Fast, iterative exploration is important in ML. This is why Aligned also makes it easy to combine and test multiple sources.

my_db = PostgreSQLConfig.localhost()

aws_bucket = AwsS3Config(...)

@feature_view(
    name="passengers",
    description="...",
    source=my_db.table("passengers")
)
class TitanicPassenger:

    passenger_id = Int32().as_entity()

    # Some features
    ...

# Change data source
passenger_view = TitanicPassenger.query()

psql_passengers = await passenger_view.all().to_pandas()
aws_passengers = await passenger_view.using_source(
    aws_bucket.parquet_at("passengers.parquet")
).to_pandas()

Describe Models

Usually you will need to combine multiple features for each model. This is where a Model comes in. Here you can define which features should be exposed.

passenger = TitanicPassenger()
location = LocationFeatures()

@model_contract(
    name="titanic",
    features=[ # aka. the model input
        passenger.constant_filled_age,
        passenger.ordinal_sex,
        passenger.sibsp,

        location.distance_to_shore,
        location.distance_to_closest_boat
    ]
)
class Titanic:

    # Referencing the passenger's survived feature as the target
    did_survive = passenger.survived.as_classification_target()
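Because a model contract lists the features it consumes, lineage between models and feature views can be derived from it. The sketch below illustrates that idea with plain data classes. `FeatureReference`, `model_inputs`, and `views_used_by` are hypothetical names, and the view name "location" is an assumption since the example does not show how LocationFeatures is registered.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class FeatureReference:
    view: str  # name of the feature view the feature belongs to
    name: str  # name of the feature inside that view


# Hypothetical metadata mirroring the model contract above.
model_inputs = {
    "titanic": [
        FeatureReference("passenger", "constant_filled_age"),
        FeatureReference("passenger", "ordinal_sex"),
        FeatureReference("passenger", "sibsp"),
        FeatureReference("location", "distance_to_shore"),
        FeatureReference("location", "distance_to_closest_boat"),
    ]
}


def views_used_by(model_name):
    """Group a model's input features by the feature view they come from."""
    grouped = defaultdict(list)
    for reference in model_inputs[model_name]:
        grouped[reference.view].append(reference.name)
    return dict(grouped)


lineage = views_used_by("titanic")
```

With metadata like this, it becomes possible to answer questions such as which models are affected when a feature view changes.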

Data Freshness

Making sure a source contains fresh data is a crucial part of creating proper ML applications. Therefore, Aligned provides an easy way to check how fresh a source is.

@feature_view(
    name="departures",
    description="Features related to the departure of a taxi ride",
    source=taxi_db.table("departures"),
)
class TaxiDepartures:

    trip_id = UUID().as_entity()

    pickuped_at = EventTimestamp()

    number_of_passengers = Int32()

    dropoff_latitude = Float().is_required()
    dropoff_longitude = Float().is_required()

    pickup_latitude = Float().is_required()
    pickup_longitude = Float().is_required()


freshness = await TaxiDepartures.freshness_in_batch_source()

if freshness < datetime.now() - timedelta(days=2):
    raise ValueError("Data is too old to create an ML model")
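The freshness check can be wrapped in a small helper that is easy to test. This is a sketch using only the standard library; `is_stale` is a hypothetical name, and it assumes the freshness value is a timezone-aware datetime. Note that Python raises a TypeError when a naive and an aware datetime are compared, so both sides need to agree.

```python
from datetime import datetime, timedelta, timezone


def is_stale(freshness, max_age, now=None):
    """Return True when the newest record is older than max_age."""
    if now is None:
        now = datetime.now(timezone.utc)
    return now - freshness > max_age


# Hypothetical timestamps, fixed so the example is deterministic.
now = datetime(2024, 1, 10, tzinfo=timezone.utc)
fresh_source = datetime(2024, 1, 9, 12, tzinfo=timezone.utc)
stale_source = datetime(2024, 1, 5, tzinfo=timezone.utc)

fresh_is_stale = is_stale(fresh_source, timedelta(days=2), now=now)
stale_is_stale = is_stale(stale_source, timedelta(days=2), now=now)
```

Passing `now` explicitly keeps the check reproducible in tests, while production code can rely on the default.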

Data Enrichers

In many cases, extra data will be needed in order to generate some features. We therefore need some way of enriching the data. This can easily be done with Aligned's DataEnrichers.

my_db = PostgreSQLConfig.localhost()
redis = RedisConfig.localhost()

user_location = my_db.data_enricher( # Fetch all user locations
    sql="SELECT * FROM user_location"
).cache( # Cache them for one day
    ttl=timedelta(days=1),
    cache_key="user_location_cache"
).lock( # Make sure only one process fetches the data at a time
    lock_name="user_location_lock",
    redis_config=redis
)


async def distance_to_users(df: DataFrame) -> Series:
    user_location_df = await user_location.load()
    ...
    return distances

@feature_view(...)
class SomeFeatures:

    latitude = Float()
    longitude = Float()

    distance_to_users = Float().transformed_using_features_pandas(
        [latitude, longitude],
        distance_to_users
    )
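The cache and lock steps of the enricher can be pictured with a small in-memory sketch. This is not how Aligned implements it: the example above uses a Redis-backed lock that works across processes, while the hypothetical `CachedLoader` below only coordinates coroutines within a single process using asyncio.Lock.

```python
import asyncio
import time


class CachedLoader:
    """Cache the result of an async loader for ttl_seconds (in-memory sketch)."""

    def __init__(self, loader, ttl_seconds):
        self._loader = loader
        self._ttl_seconds = ttl_seconds
        self._lock = asyncio.Lock()
        self._value = None
        self._expires_at = None  # None means nothing has been cached yet

    async def load(self):
        async with self._lock:  # only one caller refreshes at a time
            expired = self._expires_at is None or time.monotonic() >= self._expires_at
            if expired:
                self._value = await self._loader()
                self._expires_at = time.monotonic() + self._ttl_seconds
            return self._value


# Counts how often the (hypothetical) expensive query actually runs.
call_count = {"n": 0}


async def fetch_user_locations():
    call_count["n"] += 1
    return [{"user_id": 1, "latitude": 59.9, "longitude": 10.7}]


async def main():
    cached = CachedLoader(fetch_user_locations, ttl_seconds=60)
    # Three concurrent callers, but the loader should only run once.
    return await asyncio.gather(cached.load(), cached.load(), cached.load())


results = asyncio.run(main())
```

The lock matters because, without it, several concurrent callers could all see an expired cache and trigger the same expensive fetch at once.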

Access Data

You can easily create a feature store that contains all your feature definitions. This can then be used to generate data sets, set up an instance to serve features, build DAGs, etc.

store = await FileSource.json_at("./feature-store.json").feature_store()

# Select all features from a single feature view
df = await store.all_for("passenger", limit=100).to_pandas()

Centralized Feature Store Definition

You will often want to share the features with coworkers, or split them into different stages, like staging, shadow, or production. One option is therefore to reference the storage you use, and load the FeatureStore from there.

aws_bucket = AwsS3Config(...)
store = await aws_bucket.json_at("production.json").feature_store()

# This switches from the production online store to the offline store
# Aka. the batch sources defined on the feature views
experimental_store = store.offline_store()

This json file can be generated by running aligned apply.

Select multiple feature views

df = await store.features_for({
    "passenger_id": [1, 50, 110]
}, features=[
    "passenger:scaled_age",
    "passenger:is_male",
    "passenger:sibsp",

    "other_features:distance_to_closest_boat",
]).to_polars()
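The feature references above follow a "view:feature" pattern. The sketch below shows one way such references could be grouped by feature view before loading. `group_by_view` is a hypothetical helper, not part of Aligned's API. It also rejects malformed references, which is useful because Python joins adjacent string literals, so a missing comma between two references silently produces one invalid string.

```python
from collections import defaultdict


def group_by_view(feature_refs):
    """Group 'view:feature' references by view, rejecting malformed ones."""
    grouped = defaultdict(list)
    for ref in feature_refs:
        view, separator, feature = ref.partition(":")
        if not separator or not view or not feature or ":" in feature:
            raise ValueError(f"Expected 'view:feature', got {ref!r}")
        grouped[view].append(feature)
    return dict(grouped)


requested = group_by_view([
    "passenger:scaled_age",
    "passenger:is_male",
    "passenger:sibsp",
    "other_features:distance_to_closest_boat",
])
```

Grouping the references first makes it possible to issue one load per feature view and then join the results on the shared entity.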

Model Service

Selecting features for a model is super simple.

df = await store.model("titanic_model").features_for({
    "passenger_id": [1, 50, 110]
}).to_pandas()

Feature View

If you want to only select features for a specific feature view, then this is also possible.

prev_30_days = await store.feature_view("match").previous(days=30).to_pandas()
sample_of_20 = await store.feature_view("match").all(limit=20).to_pandas()

Data quality

Aligned will make sure all the different features are formatted as the correct data type. In addition, Aligned will make sure that the returned features align with the defined constraints.

@feature_view(...)
class TitanicPassenger:

    ...

    age = (
        Float()
            .is_required()
            .lower_bound(0)
            .upper_bound(110)
    )
    sibsp = Int32().lower_bound(0, is_inclusive=True)

Since our feature view has an is_required and a lower_bound constraint, the .validate(...) command will filter out the entities that do not satisfy them.

from aligned.validation.pandera import PanderaValidator

df = await store.model("titanic_model").features_for({
    "passenger_id": [1, 50, 110]
}).validate(
    PanderaValidator()  # Validates all features
).to_pandas()
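To illustrate what filtering on these constraints means, here is a plain-Python sketch that applies the same rules to a few hypothetical rows. It does not use Pandera or Aligned, and it assumes that the bounds on age are inclusive, which the example does not state.

```python
def is_valid(row):
    """Check the constraints from the feature view: required age in [0, 110], sibsp >= 0."""
    age = row.get("age")
    sibsp = row.get("sibsp")
    if age is None or not (0 <= age <= 110):
        return False  # age is required and must be within its bounds
    if sibsp is not None and sibsp < 0:
        return False  # sibsp may be missing, but not negative
    return True


# Hypothetical rows; only the first one satisfies every constraint.
rows = [
    {"passenger_id": 1, "age": 22.0, "sibsp": 1},
    {"passenger_id": 50, "age": None, "sibsp": 0},
    {"passenger_id": 110, "age": 130.0, "sibsp": 2},
    {"passenger_id": 7, "age": 30.0, "sibsp": -1},
]

valid_rows = [row for row in rows if is_valid(row)]
```

Rows that break a constraint are dropped rather than corrected, so the number of returned entities can be smaller than the number requested.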

Feature Server

You can define how to serve your features with the FeatureServer. Here you can define where you want to load your features from, and potentially where to write them to.

By default, aligned will look for a file called server.py and a FeatureServer object called server. However, this can be defined manually as well.

from aligned import RedisConfig, FileSource
from aligned.schemas.repo_definition import FeatureServer

store = FileSource.json_at("feature-store.json")

server = FeatureServer.from_reference(
    store,
    RedisConfig.localhost()
)

Then run aligned serve, and a FastAPI server will start. Here you can push new features, which are then transformed and stored, or just fetch them.

Stream Worker

You can also set up stream processing with a similar structure. However, here a StreamWorker will be used.

By default, aligned will look for a worker.py file with an object called worker. An example would be the following.

from aligned import RedisConfig, FileSource
from aligned.schemas.repo_definition import FeatureServer

store = FileSource.json_at("feature-store.json")

server = FeatureServer.from_reference(
    store,
    RedisConfig.localhost()
)
