A lightweight, asset-based task flow engine

kazeflow

kazeflow is a lightweight, asset-based task flow engine inspired by Dagster. It is designed to be simple, flexible, and easy to use.

Example

Here is a simple example of how to use kazeflow to define and execute a data flow with dependencies, inputs/outputs, and logging.

When you run this script, kazeflow will execute the assets in the correct order based on their dependencies and provide a rich terminal UI to visualize the progress.

example.py:

import time
from pathlib import Path

import kazeflow


# Asset 1: Create a raw data file
@kazeflow.asset
def create_raw_data(context: kazeflow.AssetContext) -> Path:
    """Creates a dummy raw data file."""
    context.logger.info("Creating raw data file...")
    raw_data_path = Path("raw_data.txt")
    raw_data_path.write_text("hello world\nkazeflow is awesome\nhello kazeflow")
    time.sleep(1)
    context.logger.info(f"Raw data created at {raw_data_path}")
    return raw_data_path


# Asset 2: Process the raw data file
@kazeflow.asset
def process_data(create_raw_data: Path, context: kazeflow.AssetContext) -> Path:
    """Reads the raw data, processes it, and saves to a new file."""
    context.logger.info(f"Processing data from {create_raw_data}...")
    processed_data_path = Path("processed_data.txt")
    content = create_raw_data.read_text()
    processed_content = content.upper()
    processed_data_path.write_text(processed_content)
    time.sleep(1)
    context.logger.info(f"Processed data saved at {processed_data_path}")
    return processed_data_path


# Asset 3: Summarize the results
@kazeflow.asset
def summarize(process_data: Path, context: kazeflow.AssetContext):
    """Reads the processed data and prints a summary."""
    context.logger.info(f"Summarizing data from {process_data}...")
    content = process_data.read_text()
    word_count = len(content.split())
    context.logger.info(f"Summary: The processed file contains {word_count} words.")
    time.sleep(1)


if __name__ == "__main__":
    kazeflow.run(
        asset_names=["summarize"],
        run_config={"max_concurrency": 2},
    )

This will produce the following output:

$ uv run python example.py
Task Flow (Execution Order)
└── create_raw_data
    └── process_data
        └── summarize

Execution Logs
INFO     Executing asset: create_raw_data                                       
INFO     Creating raw data file...                                              
INFO     Raw data created at raw_data.txt                                       
INFO     Finished executing asset: create_raw_data in 1.01s                     
INFO     Executing asset: process_data                                          
INFO     Processing data from raw_data.txt...                                   
INFO     Processed data saved at processed_data.txt                             
INFO     Finished executing asset: process_data in 1.01s                        
INFO     Executing asset: summarize                                             
INFO     Summarizing data from processed_data.txt...                            
INFO     Summary: The processed file contains 7 words.                          
INFO     Finished executing asset: summarize in 1.01s                           
╭─────────────────────────────────── Assets ───────────────────────────────────╮
│  create_raw_data                (1.01s)                                     │
│  process_data                   (1.01s)                                     │
│  summarize                      (1.01s)                                     │
╰──────────────────────────────────────────────────────────────────────────────╯
Overall Progress ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 3/3 0:00:03
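
In the example, each asset names its upstream asset as a function parameter (`process_data` takes `create_raw_data`, `summarize` takes `process_data`), and the run starts from the requested asset `summarize`. The following is a minimal sketch of how an execution order like the one shown above could be derived from parameter names with the standard library. It is not kazeflow's actual implementation: the `ASSETS` registry, `upstream_names`, and `execution_order` are hypothetical names, and treating `context` as an injected argument rather than a dependency is an assumption.

```python
import inspect
from graphlib import TopologicalSorter  # standard library, Python 3.9+


# Hypothetical stand-ins for the three assets in example.py; bodies are omitted
# because only the signatures matter for ordering.
def create_raw_data(context): ...
def process_data(create_raw_data, context): ...
def summarize(process_data, context): ...


# Hypothetical registry mapping asset name -> function.
ASSETS = {f.__name__: f for f in (create_raw_data, process_data, summarize)}


def upstream_names(func):
    """Return the parameter names that match a registered asset.

    Assumption: any other parameter (such as `context`) is injected by the
    engine and is not a dependency.
    """
    return [name for name in inspect.signature(func).parameters if name in ASSETS]


def execution_order(target):
    """Collect the target and everything upstream of it, dependencies first."""
    graph = {}
    pending = [target]
    while pending:
        name = pending.pop()
        if name in graph:
            continue
        graph[name] = upstream_names(ASSETS[name])
        pending.extend(graph[name])
    # TopologicalSorter expects {node: predecessors} and yields predecessors first.
    return list(TopologicalSorter(graph).static_order())


order = execution_order("summarize")
print(order)
```

Asking for `summarize` pulls in `process_data` and `create_raw_data` transitively, which matches the tree printed under "Task Flow (Execution Order)".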

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

kazeflow-0.1.0a1.tar.gz (7.0 kB)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

kazeflow-0.1.0a1-py3-none-any.whl (9.0 kB)

Uploaded Python 3

File details

Details for the file kazeflow-0.1.0a1.tar.gz.

File metadata

  • Download URL: kazeflow-0.1.0a1.tar.gz
  • Upload date:
  • Size: 7.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for kazeflow-0.1.0a1.tar.gz
Algorithm    Hash digest
SHA256       c415b76b6c419a6a06f6c73e63d922c5f69e4bd80ab75df28e71c4b275c1938b
MD5          2617c1a730b737679f39edbca0aaff1f
BLAKE2b-256  eecdec0da49c17012e9d36e8c14142a58639836e9ee7c44b53e3a0d6b26ed19f
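
A downloaded file can be checked against the SHA256 digest listed above. The sketch below uses only Python's `hashlib`. The helper name `sha256_of` and the assumption that the archive sits in the current directory are illustrative, not part of kazeflow or of any packaging tool.

```python
import hashlib
from pathlib import Path


def sha256_of(path: Path, chunk_size: int = 65536) -> str:
    """Return the hex SHA256 digest of a file, reading it in chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


if __name__ == "__main__":
    # Digest copied from the listing above.
    expected = "c415b76b6c419a6a06f6c73e63d922c5f69e4bd80ab75df28e71c4b275c1938b"
    # Assumed location of the downloaded source distribution.
    archive = Path("kazeflow-0.1.0a1.tar.gz")
    if archive.exists():
        actual = sha256_of(archive)
        print("OK" if actual == expected else f"MISMATCH: {actual}")
    else:
        print(f"{archive} not found; download it first")
```

The same function works for the wheel by substituting its file name and its SHA256 digest.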

See more details on using hashes here.

Provenance

The following attestation bundles were made for kazeflow-0.1.0a1.tar.gz:

Publisher: publish.yml on kj-9/kazeflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file kazeflow-0.1.0a1-py3-none-any.whl.

File metadata

  • Download URL: kazeflow-0.1.0a1-py3-none-any.whl
  • Upload date:
  • Size: 9.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for kazeflow-0.1.0a1-py3-none-any.whl
Algorithm    Hash digest
SHA256       7f229cb572b590493b77464122c64cca45b0aa553cc953640032f640583b38ce
MD5          50b43eef0dedab307578d81235e5d1db
BLAKE2b-256  efe4610723d708002534cf67dd274771b2ccff7f9824d340282f74de01d86293

Provenance

The following attestation bundles were made for kazeflow-0.1.0a1-py3-none-any.whl:

Publisher: publish.yml on kj-9/kazeflow
