Chronon Python API library

Chronon Python API

Overview

The Chronon Python API materializes configs to be run by the Chronon engine. It contains Python helpers for managing a repo of feature and join definitions that are executed by the Chronon Scala engine.

User API Overview

Sources

Most fields are self-explanatory. Time columns are expected to hold Unix time in milliseconds.
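
Because time columns are expected in milliseconds, values stored as datetimes or as Unix seconds need converting before they are used. The following is a minimal plain-Python sketch of that conversion. The helper names `to_epoch_millis` and `seconds_to_millis` are hypothetical and are not part of the Chronon API.

```python
from datetime import datetime, timedelta, timezone

# Reference point for Unix time.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(dt: datetime) -> int:
    """Convert a timezone-aware datetime to Unix time in milliseconds.

    Hypothetical helper for illustration; not part of Chronon.
    """
    if dt.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    # Integer division of timedeltas avoids floating-point rounding.
    return (dt - EPOCH) // timedelta(milliseconds=1)


def seconds_to_millis(ts_seconds: int) -> int:
    """Scale a Unix timestamp in seconds to milliseconds."""
    return ts_seconds * 1000


start_of_2021 = to_epoch_millis(datetime(2021, 1, 1, tzinfo=timezone.utc))
```

Here `start_of_2021` and `seconds_to_millis(1609459200)` both evaluate to 1609459200000.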

# File <repo>/sources/test_sources.py
from ai.chronon.query import (
  Query,
  select,
)
from ai.chronon.api.ttypes import Source, EventSource, EntitySource

# Sample query
Query(
  selects=select(
      user="user_id",
      created_at="created_at",
  ),
  wheres=["has_availability = 1"],
  start_partition="2021-01-01",  # Defines the beginning of time for computations related to the source.
  setups=["...UDF..."],
  time_column="ts",
  end_partition=None,
  mutation_time_column="mutation_timestamp",
  reversal_column="CASE WHEN mutation_type IN ('DELETE', 'UPDATE_BEFORE') THEN true ELSE false END"
)

user_activity = Source(entities=EntitySource(
  snapshotTable="db_exports.table",
  mutationTable="mutations_namespace.table_mutations",
  mutationTopic="mutationsKafkaTopic",
  query=Query(...)
))

website__views = Source(events=EventSource(
  table="namespace.table",
  topic="kafkaTopicForEvents",
))

Group By (Features)

Group Bys are aggregations over sources that define features. For example:

# File <repo>/group_bys/example_team/example_group_by.py
from ai.chronon.group_by import (
  GroupBy,
  Window,
  TimeUnit,
  Accuracy,
  Operation,
  Aggregations,
  Aggregation,
  DefaultAggregation,
)
from sources import test_sources

sum_cols = [f"active_{x}_days" for x in [30, 90, 120]]


v0 = GroupBy(
  sources=test_sources.user_activity,
  keys=["user"],
  aggregations=Aggregations(
    user_active_1_day=Aggregation(operation=Operation.LAST),
    second_feature=Aggregation(
      input_column="active_7_days",
      operation=Operation.SUM,
      windows=[
        Window(n, TimeUnit.DAYS) for n in [3, 5, 9]
      ]
    ),
  ) + [
    Aggregation(
      input_column=col,
      operation=Operation.SUM
    ) for col in sum_cols              # Alternative syntax for defining aggregations.
  ] + [
    Aggregation(
      input_column="device",
      operation=Operation.LAST_K(10)
    )
  ],
  dependencies=[
    "db_exports.table/ds={{ ds }}"      # If not defined will be derived from the Source info.
  ],
  accuracy=Accuracy.SNAPSHOT,          # This could be TEMPORAL for point in time correctness.
  env={
    "backfill": {                      # Execution environment variables for each of the modes for `run.py`
      "EXECUTOR_MEMORY": "4G"
    },
  },
  online=True,                         # True if this group by needs to be uploaded to a KV Store.
  production=False                     # True if this group by is production level.
)
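
To make the windowed SUM in the example concrete, the plain-Python sketch below shows what a sum over a trailing window amounts to for a single key. It is an illustration only, not Chronon's implementation: the function name `windowed_sum`, the `(ts, value)` event tuples, and the half-open window boundary `[query_ts - window, query_ts)` are all assumptions.

```python
from typing import Iterable, Tuple

MILLIS_PER_DAY = 24 * 60 * 60 * 1000


def windowed_sum(
    events: Iterable[Tuple[int, float]], query_ts: int, window_days: int
) -> float:
    """Sum the values of events inside a trailing window.

    Hypothetical helper: events are (timestamp_ms, value) pairs for one key.
    Assumes a half-open window [query_ts - window, query_ts).
    """
    start = query_ts - window_days * MILLIS_PER_DAY
    return sum(value for ts, value in events if start <= ts < query_ts)


# Events for one user on days 0, 2, 6 and 9, as millisecond timestamps.
events = [
    (0 * MILLIS_PER_DAY, 1.0),
    (2 * MILLIS_PER_DAY, 2.0),
    (6 * MILLIS_PER_DAY, 4.0),
    (9 * MILLIS_PER_DAY, 8.0),
]
query_ts = 10 * MILLIS_PER_DAY

# One value per window, mirroring the windows [3, 5, 9] in the example.
sums = {n: windowed_sum(events, query_ts, n) for n in [3, 5, 9]}
```

With this data, `sums` is `{3: 8.0, 5: 12.0, 9: 14.0}`: each wider window picks up one more of the earlier events.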

Join

A Join is a collection of feature values for the keys (and times, if applicable) defined on the left source. Example:

# File <repo>/joins/example_team/example_join.py
from ai.chronon.join import Join, JoinPart
from sources import test_sources
from group_bys.example_team import example_group_by

v1 = Join(
    left=test_sources.website__views,
    right_parts=[
        JoinPart(group_by=example_group_by.v0),
    ],
    online=True,       # True if this join will be fetched in production.
    production=False,  # True if this join should not use non-production group bys.
    env={"backfill": {"PARALLELISM": "10"}, "streaming": {"STREAMING_ENV_VAR": "VALUE"}},
)
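
As a rough mental model of what a join with point-in-time correctness produces, the plain-Python sketch below attaches to each left row a feature computed only from right-side events that precede that row's timestamp. It is not how the Chronon engine is implemented: the function `point_in_time_join`, the row shapes, the output column name `value_sum`, and returning 0 for keys without events are all assumptions made for illustration.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def point_in_time_join(
    left_rows: List[Tuple[str, int]],
    right_events: List[Tuple[str, int, float]],
    window_ms: int,
) -> List[Dict[str, object]]:
    """Attach a trailing-window sum to every (key, ts) row on the left.

    Hypothetical helper: right_events are (key, ts_ms, value) triples.
    Only events in [ts - window_ms, ts) contribute, so no left row sees
    data from its own future.
    """
    events_by_key = defaultdict(list)
    for key, ts, value in right_events:
        events_by_key[key].append((ts, value))

    joined = []
    for key, ts in left_rows:
        total = sum(
            value
            for event_ts, value in events_by_key.get(key, [])
            if ts - window_ms <= event_ts < ts
        )
        joined.append({"user": key, "ts": ts, "value_sum": total})
    return joined


left_rows = [("u1", 1000), ("u2", 1000), ("u1", 2000)]
right_events = [("u1", 500, 1.0), ("u1", 1500, 2.0), ("u2", 900, 5.0)]
rows = point_in_time_join(left_rows, right_events, window_ms=1000)
```

The two rows for `u1` get different values (1.0 at `ts=1000`, 2.0 at `ts=2000`) because each is computed as of its own timestamp.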

Pre-commit Setup

  1. Install pre-commit and other dev libraries:
pip install -r requirements/dev.txt
  2. Run the following command under api/py to install the git hook scripts:
pre-commit install

To support more pre-commit hooks, add them to the .pre-commit-config.yaml file.
