A lightweight, asset-based task flow engine

Project description

kazeflow

kazeflow is a lightweight, asset-based task flow engine inspired by Dagster. It is designed to be simple, flexible, and easy to use.

Example

Here is a simple example of how to use kazeflow to define and execute a data flow with dependencies, inputs/outputs, and logging.

When you run this script, kazeflow executes the assets in dependency order (create_raw_data, then process_data, then summarize) and renders the progress in a rich terminal UI.

example.py:

import time
from pathlib import Path

import kazeflow


# Asset 1: Create a raw data file
@kazeflow.asset
def create_raw_data(context: kazeflow.AssetContext) -> Path:
    """Creates a dummy raw data file."""
    context.logger.info("Creating raw data file...")
    raw_data_path = Path("raw_data.txt")
    raw_data_path.write_text("hello world\nkazeflow is awesome\nhello kazeflow")
    time.sleep(1)
    context.logger.info(f"Raw data created at {raw_data_path}")
    return raw_data_path


# Asset 2: Process the raw data file
@kazeflow.asset
def process_data(create_raw_data: Path, context: kazeflow.AssetContext) -> Path:
    """Reads the raw data, processes it, and saves to a new file."""
    context.logger.info(f"Processing data from {create_raw_data}...")
    processed_data_path = Path("processed_data.txt")
    content = create_raw_data.read_text()
    processed_content = content.upper()
    processed_data_path.write_text(processed_content)
    time.sleep(1)
    context.logger.info(f"Processed data saved at {processed_data_path}")
    return processed_data_path


# Asset 3: Summarize the results
@kazeflow.asset
def summarize(process_data: Path, context: kazeflow.AssetContext):
    """Reads the processed data and prints a summary."""
    context.logger.info(f"Summarizing data from {process_data}...")
    content = process_data.read_text()
    word_count = len(content.split())
    context.logger.info(f"Summary: The processed file contains {word_count} words.")
    time.sleep(1)


if __name__ == "__main__":
    kazeflow.run(
        asset_names=["summarize"],
        run_config={"max_concurrency": 2},
    )

This will produce the following output:

$ uv run python example.py
Task Flow (Execution Order)
└── create_raw_data
    └── process_data
        └── summarize

Execution Logs
INFO     Executing asset: create_raw_data
INFO     Creating raw data file...
INFO     Raw data created at raw_data.txt
INFO     Finished executing asset: create_raw_data in 1.01s
INFO     Executing asset: process_data
INFO     Processing data from raw_data.txt...
INFO     Processed data saved at processed_data.txt
INFO     Finished executing asset: process_data in 1.01s
INFO     Executing asset: summarize
INFO     Summarizing data from processed_data.txt...
INFO     Summary: The processed file contains 7 words.
INFO     Finished executing asset: summarize in 1.01s
╭─────────────────────────────────── Assets ───────────────────────────────────╮
│  create_raw_data                (1.01s)                                     │
│  process_data                   (1.01s)                                     │
│  summarize                      (1.01s)                                     │
╰──────────────────────────────────────────────────────────────────────────────╯
Overall Progress ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 3/3 0:00:03
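The example suggests that kazeflow wires each asset to its upstream assets by parameter name: process_data takes a parameter called create_raw_data, and summarize takes one called process_data. The sketch below is a hypothetical, stdlib-only mini engine, not kazeflow's actual code. It illustrates one way such name-based wiring and dependency ordering could work. The names `asset`, `dependencies`, `plan`, and `run` are assumptions for illustration, and this `run` executes assets one at a time, ignoring anything like `max_concurrency`.

```python
import inspect
import logging
from graphlib import TopologicalSorter
from types import SimpleNamespace
from typing import Any, Callable

# Hypothetical registry: asset name -> function (not kazeflow's internals)
ASSETS: dict[str, Callable[..., Any]] = {}


def asset(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Register a function as an asset under its own name."""
    ASSETS[fn.__name__] = fn
    return fn


def dependencies(name: str) -> list[str]:
    """Assumption: every parameter except `context` names an upstream asset."""
    params = inspect.signature(ASSETS[name]).parameters
    return [p for p in params if p != "context"]


def plan(targets: list[str]) -> list[str]:
    """Collect the targets plus all their ancestors, then order them topologically."""
    graph: dict[str, list[str]] = {}
    pending = list(targets)
    while pending:
        name = pending.pop()
        if name in graph:
            continue
        graph[name] = dependencies(name)
        pending.extend(graph[name])
    # static_order() yields a node only after all of its predecessors
    # and raises graphlib.CycleError if the dependencies form a cycle.
    return list(TopologicalSorter(graph).static_order())


def run(targets: list[str]) -> dict[str, Any]:
    """Run assets sequentially in planned order, feeding results downstream by name."""
    results: dict[str, Any] = {}
    for name in plan(targets):
        fn = ASSETS[name]
        kwargs = {dep: results[dep] for dep in dependencies(name)}
        if "context" in inspect.signature(fn).parameters:
            # Minimal stand-in for a context object that exposes a logger
            kwargs["context"] = SimpleNamespace(logger=logging.getLogger(name))
        results[name] = fn(**kwargs)
    return results


# In-memory versions of the three assets from example.py
@asset
def create_raw_data(context):
    context.logger.info("Creating raw data...")
    return "hello world\nkazeflow is awesome\nhello kazeflow"


@asset
def process_data(create_raw_data):
    return create_raw_data.upper()


@asset
def summarize(process_data):
    return len(process_data.split())


print(plan(["summarize"]))  # ['create_raw_data', 'process_data', 'summarize']
print(run(["summarize"])["summarize"])  # 7
```

Requesting only `summarize` pulls in its ancestors automatically, and the planned order matches the Task Flow tree above. The summary of 7 words follows from the uppercased text HELLO WORLD KAZEFLOW IS AWESOME HELLO KAZEFLOW.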

Download files

Download the file for your platform.

Source Distribution

kazeflow-0.1.0a2.tar.gz (6.9 kB)

Uploaded Source

Built Distribution

kazeflow-0.1.0a2-py3-none-any.whl (9.0 kB)

Uploaded Python 3

File details

Details for the file kazeflow-0.1.0a2.tar.gz.

File metadata

  • Download URL: kazeflow-0.1.0a2.tar.gz
  • Size: 6.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for kazeflow-0.1.0a2.tar.gz
Algorithm    Hash digest
SHA256       3e67dcb6c8ed004eb18672540f6e8440fab2ff1896521f3435b21bc61045fdaf
MD5          5f7eb618dac815f032be778e33c05b2c
BLAKE2b-256  4e78b1c63d88113a3d2441310596d2c053080ebf7dca5e9920c896e15f72af2b
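These digests let you check that a downloaded file is byte-for-byte the one PyPI published. The following is a minimal stdlib sketch using `hashlib`. The `EXPECTED_SDIST` values are copied from the table for kazeflow-0.1.0a2.tar.gz. The helper names `digests`, `matches`, and `verify` are illustrative. BLAKE2b-256 corresponds to `hashlib.blake2b` with `digest_size=32`. MD5 is included only because the page lists it; SHA256 is the stronger check.

```python
import hashlib
from pathlib import Path

# Digests copied from the table for kazeflow-0.1.0a2.tar.gz
EXPECTED_SDIST = {
    "sha256": "3e67dcb6c8ed004eb18672540f6e8440fab2ff1896521f3435b21bc61045fdaf",
    "md5": "5f7eb618dac815f032be778e33c05b2c",
    "blake2b-256": "4e78b1c63d88113a3d2441310596d2c053080ebf7dca5e9920c896e15f72af2b",
}


def digests(data: bytes) -> dict[str, str]:
    """Compute the three digests PyPI lists, as lowercase hex strings."""
    return {
        "sha256": hashlib.sha256(data).hexdigest(),
        "md5": hashlib.md5(data).hexdigest(),
        # BLAKE2b-256 is BLAKE2b truncated to a 32-byte (256-bit) digest
        "blake2b-256": hashlib.blake2b(data, digest_size=32).hexdigest(),
    }


def matches(data: bytes, expected: dict[str, str]) -> bool:
    """Return True only if every expected digest matches the data."""
    actual = digests(data)
    return all(actual[alg] == value for alg, value in expected.items())


def verify(path: Path, expected: dict[str, str]) -> bool:
    """Check a downloaded file on disk against the expected digests."""
    return matches(path.read_bytes(), expected)
```

For example, `verify(Path("kazeflow-0.1.0a2.tar.gz"), EXPECTED_SDIST)` should return `True` for an intact download. The same check works for the wheel if you substitute the digests from its table.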

Provenance

The following attestation bundles were made for kazeflow-0.1.0a2.tar.gz:

Publisher: publish.yml on kj-9/kazeflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file kazeflow-0.1.0a2-py3-none-any.whl.

File metadata

  • Download URL: kazeflow-0.1.0a2-py3-none-any.whl
  • Size: 9.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for kazeflow-0.1.0a2-py3-none-any.whl
Algorithm    Hash digest
SHA256       3eca63c75dd9fad6f2e23058610bc21bd8ffc1add0224b3325632efdfed653c2
MD5          3b64677e8f1e5271a70253d65d07974b
BLAKE2b-256  7df1c1c221f5de6dc88088fd04a948b8b60064f248ff5e1d4416ad6b315200fb

Provenance

The following attestation bundles were made for kazeflow-0.1.0a2-py3-none-any.whl:

Publisher: publish.yml on kj-9/kazeflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
