
Chronon Python API library

Project description

Chronon Python API

Overview

Chronon Python API for materializing configs to be run by the Chronon engine. It contains Python helpers for managing a repo of feature and join definitions that are executed by the Chronon Scala engine.

User API Overview

Sources

Most fields are self-explanatory. Time columns are expected to be in epoch milliseconds (Unix time).

# File <repo>/sources/test_sources.py
from ai.chronon.query import (
  Query,
  select,
)
from ai.chronon.api.ttypes import Source, EventSource, EntitySource

# Sample query
Query(
  selects=select(
      user="user_id",
      created_at="created_at",
  ),
  wheres=["has_availability = 1"],
  start_partition="2021-01-01",  # Defines the beginning of time for computations related to the source.
  setups=["...UDF..."],
  time_column="ts",
  end_partition=None,
  mutation_time_column="mutation_timestamp",
  reversal_column="CASE WHEN mutation_type IN ('DELETE', 'UPDATE_BEFORE') THEN true ELSE false END"
)

user_activity = Source(entities=EntitySource(
  snapshotTable="db_exports.table",
  mutationTable="mutations_namespace.table_mutations",
  mutationTopic="mutationsKafkaTopic",
  query=Query(...)
))

website__views = Source(events=EventSource(
  table="namespace.table",
  topic="kafkaTopicForEvents",
  query=Query(...)
))
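
For illustration, here is how the sample query above could be wired into an event source end to end. This is a hypothetical sketch: the table, topic, and column names are placeholders, not part of the library.

# Illustrative only: an event source with an inline Query.
website__views_with_query = Source(events=EventSource(
  table="namespace.table",              # Hive table containing the historical events.
  topic="kafkaTopicForEvents",          # Kafka topic carrying the same events in streaming.
  query=Query(
    selects=select(
      user="user_id",
      created_at="created_at",
    ),
    wheres=["has_availability = 1"],
    start_partition="2021-01-01",
    time_column="ts",                   # Event time in epoch milliseconds.
  ),
))
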
Group By (Features)

Group Bys are aggregations over sources that define features. For example:

# File <repo>/group_bys/example_team/example_group_by.py
from ai.chronon.group_by import (
  GroupBy,
  Window,
  TimeUnit,
  Accuracy,
  Operation,
  Aggregations,
  Aggregation,
  DefaultAggregation,
)
from sources import test_sources

sum_cols = [f"active_{x}_days" for x in [30, 90, 120]]


v0 = GroupBy(
  sources=test_sources.user_activity,
  keys=["user"],
  aggregations=Aggregations(
    user_active_1_day=Aggregation(operation=Operation.LAST),
    second_feature=Aggregation(
      input_column="active_7_days",
      operation=Operation.SUM,
      windows=[
        Window(n, TimeUnit.DAYS) for n in [3, 5, 9]
      ]
    ),
  ) + [
    Aggregation(
      input_column=col,
      operation=Operation.SUM
    ) for col in sum_cols              # Alternative syntax for defining aggregations.
  ] + [
    Aggregation(
      input_column="device",
      operation=Operation.LAST_K(10)
    )
  ],
  dependencies=[
    "db_exports.table/ds={{ ds }}"      # If not defined will be derived from the Source info.
  ],
  accuracy=Accuracy.SNAPSHOT,          # This could be TEMPORAL for point-in-time correctness.
  env={
    "backfill": {                      # Execution environment variables for each of the modes for `run.py`
      "EXECUTOR_MEMORY": "4G"
     },
  },
  online=True,                         # True if this group by needs to be uploaded to a KV Store.
  production=False                     # True if this group by is production level.
)
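
The comment on accuracy above hints at the alternative mode. As a rough sketch (not taken from the library docs), a point-in-time correct aggregation over the event source defined earlier might look like the following; the key and input column names are hypothetical and assume the source's query selects them.

# Illustrative only: a TEMPORAL (point-in-time correct) group by over an event source.
views_v0 = GroupBy(
  sources=test_sources.website__views,
  keys=["user"],                        # Assumes the source query selects a "user" column.
  aggregations=[
    Aggregation(
      input_column="page_id",           # Hypothetical column selected by the source query.
      operation=Operation.COUNT,
      windows=[Window(n, TimeUnit.DAYS) for n in [1, 7, 30]],
    ),
  ],
  accuracy=Accuracy.TEMPORAL,           # Feature values computed as of each left-side event's timestamp.
  online=True,
)
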
Join

A Join is a collection of feature values for the keys (and times, if applicable) defined on the left side (source). Example:

# File <repo>/joins/example_team/example_join.py
from ai.chronon.join import Join, JoinPart
from sources import test_sources
from group_bys.example_team import example_group_by

v1 = Join(
    left=test_sources.website__views,
    right_parts=[
        JoinPart(group_by=example_group_by.v0),
    ],
    online=True,       # True if this join will be fetched in production.
    production=False,  # True if this join should only use production group bys.
    env={"backfill": {"PARALLELISM": "10"}, "streaming": {"STREAMING_ENV_VAR": "VALUE"}},
)
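
Beyond the minimal example above, a join part can also rename keys and prefix the generated feature columns. The sketch below assumes JoinPart accepts key_mapping and prefix arguments (as in recent Chronon versions); the column names are placeholders.

# Illustrative only: mapping a differently named left-side key onto the group by key.
v2 = Join(
    left=test_sources.website__views,
    right_parts=[
        JoinPart(
            group_by=example_group_by.v0,
            key_mapping={"user_id": "user"},  # Left-side column -> group by key (hypothetical names).
            prefix="activity",                # Prepended to the generated feature column names.
        ),
    ],
    online=False,
)
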
Pre-commit Setup
  1. Install pre-commit and other dev libraries:
pip install -r requirements/dev.txt
  2. Run the following command under api/py to install the git hook scripts:
pre-commit install

To support more pre-commit hooks, add them to the .pre-commit-config.yaml file.
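
As a rough sketch, a hook entry in .pre-commit-config.yaml follows the standard pre-commit format shown below; the repository, revision, and hook ids are generic examples, not specific to this project.

# Illustrative only: generic pre-commit hook entries.
repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.4.0
    hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer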

Download files

Download the file for your platform.

Source Distribution

chronon_ai-0.0.83.tar.gz (79.6 kB, Source)

Built Distribution

chronon_ai-0.0.83-py3-none-any.whl (99.1 kB, Python 3)

File details

Details for the file chronon_ai-0.0.83.tar.gz.

File metadata

  • Download URL: chronon_ai-0.0.83.tar.gz
  • Upload date:
  • Size: 79.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.9.17

File hashes

Hashes for chronon_ai-0.0.83.tar.gz

  • SHA256: d5d19c8d91e1aceae0bb73cdb92490eb453e18ba192282c971be5e362653a26c
  • MD5: 62f96e9cb068e7986b97b91817c568aa
  • BLAKE2b-256: 42957c39697df350f32fe58c47d5921d16773e39d32630af26f8f8a97112df6a


File details

Details for the file chronon_ai-0.0.83-py3-none-any.whl.

File metadata

  • Download URL: chronon_ai-0.0.83-py3-none-any.whl
  • Upload date:
  • Size: 99.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.9.17

File hashes

Hashes for chronon_ai-0.0.83-py3-none-any.whl

  • SHA256: beca794f732e3f24f3f51b0a43f3b789697641c4751f262ebe53860613526d23
  • MD5: 0ca9775eb3c41ed472f1e6842a6a7cca
  • BLAKE2b-256: 9c25c63924dcac05758a8b4c914068e764a1783188b37ad6e713dcc10ad34328

