
Chronon Python API

Overview

The Chronon Python API is used to materialize configs that are run by the Chronon engine. It contains Python helpers for managing a repo of feature (group by) and join definitions that are executed by the Chronon Scala engine.

User API Overview

Sources

Most fields are self-explanatory. Time columns are expected to be in milliseconds (unixtime).

# File <repo>/sources/test_sources.py
from ai.chronon.query import (
  Query,
  select,
)
from ai.chronon.api.ttypes import Source, EventSource, EntitySource

# Sample query
Query(
  selects=select(
      user="user_id",
      created_at="created_at",
  ),
  wheres=["has_availability = 1"],       # Filter clauses applied to the source.
  start_partition="2021-01-01",          # Defines the beginning of time for computations related to the source.
  setups=["...UDF..."],                  # SQL setup statements (e.g. UDF registration) run before the query.
  time_column="ts",                      # Event time column, in milliseconds (unixtime).
  end_partition=None,                    # No upper bound on partitions.
  mutation_time_column="mutation_timestamp",  # Time of the mutation, for entity sources with mutations.
  reversal_column="CASE WHEN mutation_type IN ('DELETE', 'UPDATE_BEFORE') THEN true ELSE false END"  # Marks rows that reverse a prior value.
)

user_activity = Source(entities=EntitySource(
  snapshotTable="db_exports.table",
  mutationTable="mutations_namespace.table_mutations",
  mutationTopic="mutationsKafkaTopic",
  query=Query(...)
))

website__views = Source(events=EventSource(
  table="namespace.table",
  topic="kafkaTopicForEvents",
))
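
For reference, here is a rough sketch of an event source with its query spelled out inline; the table, topic, and column names below are illustrative placeholders, not part of the example repo:

# Hypothetical, fully specified event source; names are placeholders.
website_views_full = Source(events=EventSource(
  table="namespace.website_views",      # Hive table of historical events.
  topic="websiteViewsKafkaTopic",       # Kafka topic for streaming updates.
  query=Query(
    selects=select(
      user="user_id",
      page="page_id",
    ),
    time_column="ts",                   # Event time, in milliseconds (unixtime).
    start_partition="2021-01-01",
  ),
))
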
Group By (Features)

Group Bys are aggregations over sources that define features. For example:

# File <repo>/group_bys/example_team/example_group_by.py
from ai.chronon.group_by import (
  GroupBy,
  Window,
  TimeUnit,
  Accuracy,
  Operation,
  Aggregations,
  Aggregation,
  DefaultAggregation,
)
from sources import test_sources

sum_cols = [f"active_{x}_days" for x in [30, 90, 120]]


v0 = GroupBy(
  sources=test_sources.user_activity,
  keys=["user"],
  aggregations=Aggregations(
    user_active_1_day=Aggregation(operation=Operation.LAST),
    second_feature=Aggregation(
      input_column="active_7_days",
      operation=Operation.SUM,
      windows=[
        Window(n, TimeUnit.DAYS) for n in [3, 5, 9]
      ]
    ),
  ) + [
    Aggregation(
      input_column=col,
      operation=Operation.SUM
    ) for col in sum_cols              # Alternative syntax for defining aggregations.
  ] + [
    Aggregation(
      input_column="device",
      operation=Operation.LAST_K(10)
    )
  ],
  dependencies=[
    "db_exports.table/ds={{ ds }}"      # If not defined will be derived from the Source info.
  ],
  accuracy=Accuracy.SNAPSHOT,          # This could be TEMPORAL for point in time correctness.
  env={
    "backfill": {                      # Execution environment variables for each of the modes for `run.py`
      "EXECUTOR_MEMORY": "4G"
     },
  },
  online=True,                         # True if this group by needs to be uploaded to a KV Store.
  production=False                     # True if this group by is production level.
)
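
For contrast, a minimal point-in-time accurate group by over an event source could be sketched as follows; the view_count feature name and the "page" column come from the hypothetical event source above, not from the example repo:

# Hypothetical minimal example: 7-day view counts per user, point-in-time correct.
views_v0 = GroupBy(
  sources=test_sources.website__views,
  keys=["user"],
  aggregations=Aggregations(
    view_count=Aggregation(
      input_column="page",              # Assumes the source query selects a "page" column.
      operation=Operation.COUNT,
      windows=[Window(7, TimeUnit.DAYS)],
    ),
  ),
  accuracy=Accuracy.TEMPORAL,           # Aggregates are correct as of each left-side timestamp.
  online=True,
)
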
Join

A Join is a collection of feature values for the keys (and timestamps, if applicable) defined on the left side (the source). Example:

# File <repo>/joins/example_team/example_join.py
from ai.chronon.join import Join, JoinPart
from sources import test_sources
from group_bys.example_team import example_group_by

v1 = Join(
    left=test_sources.website__views,
    right_parts=[
        JoinPart(group_by=example_group_by.v0),
    ],
    online=True,       # True if this join will be fetched in production.
    production=False,  # True if this join should not use non-production group bys.
    env={"backfill": {"PARALLELISM": "10"}, "streaming": {"STREAMING_ENV_VAR": "VALUE"}},
)
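
When a key column on the left has a different name than the group by key, JoinPart can remap it. The sketch below assumes the key_mapping and prefix arguments behave as described; verify against your version of ai.chronon.join:

# Hedged sketch: remap the left column "user_id" onto the group by key "user".
v2 = Join(
    left=test_sources.website__views,
    right_parts=[
        JoinPart(
            group_by=example_group_by.v0,
            key_mapping={"user_id": "user"},  # left column name -> group by key name (assumed)
            prefix="viewer",                  # assumed: prefixes the output feature columns
        ),
    ],
    online=True,
)
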
Pre-commit Setup
  1. Install pre-commit and other dev libraries:
pip install -r requirements/dev.txt
  2. Run the following command under api/py to install the git hook scripts:
pre-commit install

To support more pre-commit hooks, add them to the .pre-commit-config.yaml file.
