
Canso Platform's Python package for Data & Machine Learning Use Cases


Overview

Canso is a Managed Data and Feature Platform for operationalizing Machine Learning initiatives. The goal of Canso is to enable ML teams (Data Engineers, Data Scientists, ML Engineers) to define their requirements in a declarative, standardized manner via a concise DSL, without having to write custom code for features, DAGs, and the like, or to manage infrastructure. This enables ML teams to:

  • Iterate fast, i.e., move from development to production in hours or days as opposed to weeks
  • Promote reliability, i.e., build standardized ML pipelines

Canso's core focus is on user experience and speed of iteration, without compromising on reliability:

  • Define data sources from which features can be created and computed.
  • Specify data sinks where processed data is stored after a successful ML pipeline run.
  • Define Machine Learning features in a standardized manner on top of existing data sources and deploy them. These features can be used for model training as well as for model inference. Canso currently supports Raw, Derived, and Streaming features.
  • Register and deploy features to execute the ML pipeline.

User Experience

Getting Started

1. Install Gru Package

To install the gru package, you will need your GitHub username and a Personal Access Token (PAT) as the password.

  • A Personal Access Token (PAT) is a credential that authenticates a user across all applications they have access to.
pip3 install git+https://github.com/Yugen-ai/gru.git
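
Since the repository is hosted on GitHub, pip will prompt for these credentials; they can also be embedded directly in the install URL. The placeholders below must be replaced with your own values:

pip3 install git+https://<username>:<PAT>@github.com/Yugen-ai/gru.git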

2. Create Yugen client
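
The snippets in this guide assume imports along the following lines. The exact module paths are assumptions inferred from the class names used below (check the gru package for the real locations), and the access token is read from a hypothetical environment variable:

import os
from datetime import datetime

# Assumed import paths -- inferred from the class names used in this guide;
# the actual module layout of the gru package may differ.
from gru import (
    YugenClient, S3DataSource, CSVType, DataType, EntityType,
    RawFeature, FeatureLogic, SlidingWindowAggregation, DerivedFeature,
    PreProcessingTransform, TrainingData, RegisterInfrastructure,
)

access_token = os.environ["CANSO_ACCESS_TOKEN"]  # hypothetical variable name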

yugen_client = YugenClient(access_token=access_token, config_path="./gru/config.yaml")

3. Define an S3 Data Source

s3_data_source_obj = S3DataSource(
    name="survey_telemetry_data",
    bucket="internal-ml-demos",
    base_key="recsys/survey-data/phase3_3/survey-telemetry/",
    varying_key_suffix_format="%Y-%m-%d/%H%M",
    varying_key_suffix_freq="30min",
    time_offset=0,
    description="random desc of data source",
    owners=["xyz"],
    created_at=datetime.now(),
    file_type=CSVType(header=True),
    schema=schema_obj,
    event_timestamp_field="time",
    event_timestamp_format="yyyy-MM-dd HH:mm:ssXXX",
)
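
Note that schema_obj is referenced above but never defined in this walkthrough; it must be built beforehand with gru's schema utilities (not shown here). As a purely hypothetical illustration of the information it carries, based on the fields used elsewhere in this guide:

# Hypothetical stand-in only -- the real schema object must be constructed
# with gru's schema API. Conceptually it pairs field names with types:
schema_fields = {
    "time": "timestamp",
    "ad_id": "string",
    "provider": "string",
    "cpi": "float",
}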

4. Register Data Source

yugen_client.register(s3_data_source_obj)

5. Define a Raw Feature

raw_feature = RawFeature(
    name="avg_order_val_3_days",
    description="Avg order per cusotmer for last 3 days",
    data_type=DataType.FLOAT,
    source_name=["survey_telemetry_data"],
    staging_sink=["s3_sink_ml_yugen_internal"],
    online_sink=["elasticache-redis-yugen"],
    owners=["vanshika@yugen.ai"],
    entity=["test"],
    feature_logic=FeatureLogic(
        field=["ad_id"],
        filter="""ad_id is NOT NULL""",
        transform=SlidingWindowAggregation(
            function="avg",
            partition_by="provider",
            order_by="cpi",
            # rangeBetween={"frame_start": 1, "frame_end": 6},
            rowsBetween={"frame_start": 1, "frame_end": 2},
        ),
        time_window="3d",
        groupby_keys=["project_id"],
        timestamp_field="time",
        timestamp_format="yyyy-MM-dd HH:mm:ssXXX",
    ),
    online=True,
    offline=True,
    schedule="0 0 * * *",
    active=True,
    start_time=datetime(2023, 4, 1, 0, 0, 0),
)
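
For intuition, and assuming Spark-style window-frame semantics (an assumption, not confirmed by the package), the SlidingWindowAggregation above corresponds roughly to the window expression sketched below; the time_window and groupby_keys settings are omitted from the analogy for brevity:

# Rough SQL analogy of the feature logic above. Assumption: frame_start=1 /
# frame_end=2 means the frame spans the first through second rows after the
# current row, as in Spark's rowsBetween(1, 2).
window_sql = """
SELECT AVG(ad_id) OVER (
           PARTITION BY provider
           ORDER BY cpi
           ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING
       )
FROM survey_telemetry_data
WHERE ad_id IS NOT NULL
"""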

6. Register Raw Feature

yugen_client.register(raw_feature)

7. Dry run Raw Feature

yugen_client.dry_run("avg_order_val_3_days", entity_type=EntityType.RAW_FEATURE, start_date=datetime(2023, 4, 1, 0, 0, 0), end_date=datetime(2023, 4, 2, 0, 0, 0))

8. Deploy Raw Feature

yugen_client.deploy("avg_order_val_3_days", EntityType.RAW_FEATURE)

9. Define a Derived Feature

derived_feature = DerivedFeature(
    name="total_purchases",
    description="Total purchase amount for the store",
    staging_sink=["s3_sink_ml_yugen_internal"],
    online_sink=["elasticache-redis-yugen"],
    data_type=DataType.FLOAT,
    owners=["all-ds@company.com"],
    schedule="0 0 * * *",
    entity=["CASE WHEN cpi> 0.5 THEN 10 ELSE 0 END"],
    online=False,
    offline=True,
    transform=multiply("avg_orders_last_3_days", "number_users"),
    start_time=datetime(2022, 8, 26, 0, 0, 0),
)

10. Register Derived Feature

yugen_client.register(derived_feature)

11. Dry run Derived Feature

yugen_client.dry_run("total_purchases", entity_type=EntityType.DERIVED_FEATURE, start_date=datetime(2022, 8, 26, 0, 0, 0), end_date=datetime(2022, 8, 27, 0, 0, 0))

12. Deploy Derived Feature

yugen_client.deploy("total_purchases", EntityType.DERIVED_FEATURE)

13. Define Pre-Processing Transform
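
The transform below passes logic=sql_logic, which is not defined in the original walkthrough; presumably it is a SQL string computed over the named data sources. A hypothetical example (the join keys and columns are assumptions):

# Hypothetical SQL for the transform -- the real query depends on the
# schemas of the registered data sources.
sql_logic = """
SELECT s.user_id,
       AVG(t.spend) AS user_avg_spend
FROM marketing_survey_data_info AS s
JOIN data_telemetry_info AS t
  ON s.user_id = t.user_id
GROUP BY s.user_id
"""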

ppt = PreProcessingTransform(
    transform_name="user_avg_spend_transform_final_testing_for_dry_run",
    description="test preprocess transform",
    data_source_names=["marketing_survey_data_info", "data_telemetry_info"],
    data_source_lookback_config={
        "marketing_survey_data_info": "1d",
        "data_telemetry_info": "1d",
    },
    staging_sink=["s3_sink_ml_yugen_internal"],
    logic=sql_logic,
    schedule="0 0 * * *",
    output_schema=schema_obj,
    owners=["john.doe@company.ai"],
    active=True,
    transform_start_time=datetime(2022, 8, 27, 0, 0, 0),
)

14. Register Pre-Processing Transform

yugen_client.register(ppt)

15. Dry run Pre-Processing Transform

yugen_client.dry_run(
    "user_avg_spend_transform_final_testing_for_dry_run",
    entity_type=EntityType.PRE_PROCESSING_TRANSFORM,
    start_date=datetime(2022, 8, 27, 0, 0, 0),
    end_date=datetime(2022, 8, 28, 0, 0, 0),
)

16. Deploy Pre-Processing Transform

yugen_client.deploy(
    "user_avg_spend_transform_final_testing_for_dry_run", EntityType.PRE_PROCESSING_TRANSFORM
)

17. Define Training Data

training_run = TrainingData(
    name="example_training_run",
    description="A sample run to generate training data",
    historical_data_source="user_events",
    entities=["project_id","cpi"],
    features=["raw_for_testing_gtd","raw_for_testing_gtd_for_telemetry","raw_for_testing_gtd_for_survey","raw_for_testing_gtd_for_user_events","raw_for_testing_gtd_for_weather_forecats"],
    ttl=5,
    owners=["john.doe@company.ai"],
)

18. Register Training Data

yugen_client.register(training_run)

19. Deploy Training Data

yugen_client.deploy("example_training_run", EntityType.TRAINING_DATA)

20. Define Infrastructure Data

register_infrastructure = RegisterInfrastructure(
    client_id="platform-release-1",
    cloud_provider="aws",
    cluster_name="yugen-platform-v2",
    region="ap-south-1",
    subnet_ids=["subnet-4b250507", "subnet-fac53691", "subnet-86cea5fd"],
    resource_node_group_instance_types={
        "instance_types": {"node_group_1": "t3.medium", "node_group_2": "t3.large"}
    },
    resource_node_group_scaling_config={
        "scaling_config": {
            "node_group_1": {"max_size": 6, "min_size": 2, "desired_size": 3},
            "node_group_2": {"max_size": 6, "min_size": 2, "desired_size": 3},
        }
    },
    admins=[
        "arn:aws:iam::832344679060:user/ashish.prajapati@yugen.ai",
        "arn:aws:iam::832344679060:user/john.doe@company.ai",
        "arn:aws:iam::832344679060:user/sandeep.mishra@yugen.ai",
        "arn:aws:iam::832344679060:user/shaktimaan@yugen.ai",
        "arn:aws:iam::832344679060:user/shashank.mishra@yugen.ai",
        "arn:aws:iam::832344679060:user/soumanta@yugen.ai",
        "arn:aws:iam::832344679060:user/vanshika.agrawal@yugen.ai",
    ],
    airflow_users={
        "admin": {
            "username": "admin",
            "password": "yugen@123",
            "email": "admin@example.com",
            "firstName": "admin",
            "lastName": "admin",
        }
    },
    slack_details={
        "failure_alerts": {
            "host_url": "https://hooks.slack.com/services",
            "host_password": "/TSRAELEL9/B04Q09X9W75/PhfxMaFBE81ZBXjeAktTTIyN",
        },
        "notifications": {
            "host_url": "https://hooks.slack.com/services",
            "host_password": "/TSRAELEL9/B04Q09X9W75/PhfxMaFBE81ZBXjeAktTTIyN",
        },
    },
    created_at=datetime(2023, 4, 28, 0, 0, 0),
)

21. Register Infrastructure Data

yugen_client.register(register_infrastructure)

22. Deploy Infrastructure Data

yugen_client.deploy("platform-release-1_yugen-platform-v2_2023-04-28 00:00:00", EntityType.INFRASTRUCTURE)

Roadmap

DataSources

Batch

  • S3
  • GCS
  • RedShift
  • BigQuery
  • Snowflake

Streaming

  • Kafka

DataSinks

Online DataSinks

Online data sinks offer real-time storage optimized for fast write operations. They ensure low-latency access to data, making them suitable for applications requiring immediate data retrieval and updates, such as feature retrieval for ML predictions. Currently, Canso supports Redis for storing data online.
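
For intuition, an online lookup at inference time might resemble the sketch below, assuming the redis-py client and a hypothetical host and key layout (neither is documented here):

import redis

# Hypothetical: connect to the Redis online sink and fetch one feature
# value for an entity. Host, port, and key format are assumptions.
r = redis.Redis(host="elasticache-redis-yugen.example.internal", port=6379)
value = r.get("avg_order_val_3_days:project_id=1234")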

Offline DataSinks

Offline data sinks provide durable and scalable storage for batch-processed and historical data. They support large volumes of data with high reliability, making them ideal for data warehousing and archival storage. Currently, Canso supports S3 for storing data offline.
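
For example, offline feature output could be read back for analysis roughly as follows, assuming the staging sink materializes Parquet files under the bucket used earlier (the prefix and file format are assumptions, and reading s3:// paths with pandas requires s3fs):

import pandas as pd

# Hypothetical: load offline feature output from the S3 staging sink.
# The key prefix and the Parquet format are assumptions.
df = pd.read_parquet("s3://internal-ml-demos/features/avg_order_val_3_days/")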

Batch

  • S3
  • Redis
  • RedShift
  • DynamoDB

Streaming

  • Kafka

Online Feature Store

  • Elasticache for Redis (AWS)
  • Memorystore for Redis (GCP)
  • DynamoDB
  • Bigtable
