Skip to main content

CLI tool for the Zipline AI platform

Project description

Chronon Python API

Overview

Chronon Python API for materializing configs to be run by the Chronon Engine. Contains python helpers to help managed a repo of feature and join definitions to be executed by the chronon scala engine.

User API Overview

Sources

Most fields are self explanatory. Time columns are expected to be in milliseconds (unixtime).

# File <repo>/sources/test_sources.py
from ai.chronon.query import (
  Query,
  select,
)
from ai.chronon.api.ttypes import Source, EventSource, EntitySource

# Sample query
Query(
  selects=select(
      user="user_id",
      created_at="created_at",
  ),
  wheres=["has_availability = 1"],
  start_partition="2021-01-01",  # Defines the beginning of time for computations related to the source.
  setups=["...UDF..."],
  time_column="ts",
  end_partition=None,
  mutation_time_column="mutation_timestamp",
  reversal_column="CASE WHEN mutation_type IN ('DELETE', 'UPDATE_BEFORE') THEN true ELSE false END"
)

user_activity = Source(entities=EntitySource(
  snapshotTable="db_exports.table",
  mutationTable="mutations_namespace.table_mutations",
  mutationTopic="mutationsKafkaTopic",
  query=Query(...)
)

website__views = Source(events=EventSource(
  table="namespace.table",
  topic="kafkaTopicForEvents",
)
Group By (Features)

Group Bys are aggregations over sources that define features. For example:

# File <repo>/group_bys/example_team/example_group_by.py
from ai.chronon.group_by import (
  GroupBy,
  Window,
  TimeUnit,
  Accuracy,
  Operation,
  Aggregations,
  Aggregation,
  DefaultAggregation,
)
from sources import test_sources

sum_cols = [f"active_{x}_days" for x in [30, 90, 120]]


v0 = GroupBy(
  sources=test_source.user_activity,
  keys=["user"],
  aggregations=Aggregations(
    user_active_1_day=Aggregation(operation=Operation.LAST),
    second_feature=Aggregation(
      input_column="active_7_days",
      operation=Operation.SUM,
      windows=[
        Window(n, TimeUnit.DAYS) for n in [3, 5, 9]
      ]
    ),
  ) + [
    Aggregation(
      input_column=col,
      operation=Operation.SUM
    ) for col in sum_columns           # Alternative syntax for defining aggregations.
  ] + [
    Aggregation(
      input_column="device",
      operation=LAST_K(10)
    )
  ],
  dependencies=[
    "db_exports.table/ds={{ ds }}"      # If not defined will be derived from the Source info.
  ],
  accuracy=Accuracy.SNAPSHOT,          # This could be TEMPORAL for point in time correctness.
  env={
    "backfill": {                      # Execution environment variables for each of the modes for `run.py`
      "EXECUTOR_MEMORY": "4G"
     },
  },
  online=True,                         # True if this group by needs to be uploaded to a KV Store.
  production=False                     # True if this group by is production level.
)
Join

A Join is a collection of feature values for the keys and (times if applicable) defined on the left (source). Example:

# File <repo>/joins/example_team/example_join.py
from ai.chronon.join import Join, JoinPart
from sources import test_sources
from group_bys.example_team import example_group_by

v1 = Join(
    left=test_sources.website__views,
    right_parts=[
        JoinPart(group_by=example_group_by.v0),
    ],
    online=True,       # True if this join will be fetched in production.
    production=False,  # True if this join should not use non-production group bys.
    env={"backfill": {"PARALLELISM": "10"}, "streaming": {"STREAMING_ENV_VAR": "VALUE"}},
)
Pre-commit Setup
  1. Install pre-commit and other dev libraries:
pip install -r requirements/dev.txt
  1. Run the following command under api/python to install the git hook scripts:
pre-commit install

To support more pre-commit hooks, add them to the .pre-commit-config.yaml file.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

awx_zipline_ai-0.3.0.tar.gz (143.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

awx_zipline_ai-0.3.0-py3-none-any.whl (173.7 kB view details)

Uploaded Python 3

File details

Details for the file awx_zipline_ai-0.3.0.tar.gz.

File metadata

  • Download URL: awx_zipline_ai-0.3.0.tar.gz
  • Upload date:
  • Size: 143.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.10

File hashes

Hashes for awx_zipline_ai-0.3.0.tar.gz
Algorithm Hash digest
SHA256 e9a315ba24ae7016023127c6117daace40c476295263f5427cce373e21ab3e1b
MD5 e4d7eca9fc2d8973cbe21e75d97f94c9
BLAKE2b-256 bc9bb405be69cf8bef52ba713c507de8334f39a532eaff27a1203e4d7fd5bc48

See more details on using hashes here.

File details

Details for the file awx_zipline_ai-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: awx_zipline_ai-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 173.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.10

File hashes

Hashes for awx_zipline_ai-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 0372da8cfa7a5d0f1004c5959594625ab33e9c4558b9583dde14c3595fb6fa9a
MD5 8779424220d454a4234451bdb612adfc
BLAKE2b-256 3f2574b349106153f3b06316abde6a69f34ec4db75138573f32e8b62ff8ae4a3

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page