A lightweight, asset-based task flow engine
kazeflow
kazeflow is a lightweight, asset-based task flow engine inspired by Dagster. It is designed to be simple, flexible, and easy to use.
Example
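The example below shows how to use kazeflow to define and execute a small data flow with dependencies, inputs/outputs, and logging. Each asset is a function decorated with `@kazeflow.asset` that can take a `context: kazeflow.AssetContext` parameter for logging through `context.logger`. An asset declares an upstream dependency by taking a parameter named after that asset: `process_data` takes `create_raw_data`, so it receives the `Path` returned by `create_raw_data`. Requesting only `summarize` in `kazeflow.run` therefore runs all three assets.
When you run this script, kazeflow executes the assets in dependency order and renders a rich terminal UI that shows their progress.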
example.py:
```python
import time
from pathlib import Path

import kazeflow


# Asset 1: Create a raw data file
@kazeflow.asset
def create_raw_data(context: kazeflow.AssetContext) -> Path:
    """Creates a dummy raw data file."""
    context.logger.info("Creating raw data file...")
    raw_data_path = Path("raw_data.txt")
    raw_data_path.write_text("hello world\nkazeflow is awesome\nhello kazeflow")
    time.sleep(1)
    context.logger.info(f"Raw data created at {raw_data_path}")
    return raw_data_path


# Asset 2: Process the raw data file (depends on create_raw_data via the parameter name)
@kazeflow.asset
def process_data(create_raw_data: Path, context: kazeflow.AssetContext) -> Path:
    """Reads the raw data, processes it, and saves to a new file."""
    context.logger.info(f"Processing data from {create_raw_data}...")
    processed_data_path = Path("processed_data.txt")
    content = create_raw_data.read_text()
    processed_content = content.upper()
    processed_data_path.write_text(processed_content)
    time.sleep(1)
    context.logger.info(f"Processed data saved at {processed_data_path}")
    return processed_data_path


# Asset 3: Summarize the results (depends on process_data)
@kazeflow.asset
def summarize(process_data: Path, context: kazeflow.AssetContext):
    """Reads the processed data and prints a summary."""
    context.logger.info(f"Summarizing data from {process_data}...")
    content = process_data.read_text()
    word_count = len(content.split())
    context.logger.info(f"Summary: The processed file contains {word_count} words.")
    time.sleep(1)


if __name__ == "__main__":
    # Request the final asset; its upstream assets are run first.
    kazeflow.run(
        asset_names=["summarize"],
        run_config={"max_concurrency": 2},
    )
```
This will produce the following output:
```text
❯ uv run python example.py
Task Flow (Execution Order)
└── create_raw_data
    └── process_data
        └── summarize
Execution Logs
INFO Executing asset: create_raw_data
INFO Creating raw data file...
INFO Raw data created at raw_data.txt
INFO Finished executing asset: create_raw_data in 1.01s
INFO Executing asset: process_data
INFO Processing data from raw_data.txt...
INFO Processed data saved at processed_data.txt
INFO Finished executing asset: process_data in 1.01s
INFO Executing asset: summarize
INFO Summarizing data from processed_data.txt...
INFO Summary: The processed file contains 7 words.
INFO Finished executing asset: summarize in 1.01s
╭─────────────────────────────────── Assets ───────────────────────────────────╮
│ ✓ create_raw_data (1.01s) │
│ ✓ process_data (1.01s) │
│ ✓ summarize (1.01s) │
╰──────────────────────────────────────────────────────────────────────────────╯
Overall Progress ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 3/3 0:00:03
```
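The run above executes the three assets in dependency order. kazeflow's own scheduler is not described here, so the following is only a minimal, hypothetical sketch of how an asset-based engine could do this: it treats any parameter named after a registered asset as a dependency, orders assets with the standard library's `graphlib.TopologicalSorter`, and runs every asset whose inputs are ready on a thread pool capped at `max_concurrency` workers (loosely mirroring `run_config={"max_concurrency": 2}` in the example). The `asset` decorator, `ASSETS` registry, `upstream_of`, and `run` names are illustrative assumptions, not kazeflow's API, and unlike kazeflow the sketch injects no `context` object.

```python
import inspect
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter
from typing import Any, Callable

# Hypothetical registry standing in for what a decorator like @kazeflow.asset might collect.
ASSETS: dict[str, Callable[..., Any]] = {}


def asset(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Register fn under its own name (illustrative stand-in, not kazeflow's decorator)."""
    ASSETS[fn.__name__] = fn
    return fn


def upstream_of(name: str) -> set[str]:
    """Treat every parameter whose name matches a registered asset as a dependency."""
    params = inspect.signature(ASSETS[name]).parameters
    return {p for p in params if p in ASSETS}


def run(targets: list[str], max_concurrency: int = 2) -> dict[str, Any]:
    """Run the targets and their transitive upstream assets; return each asset's result."""
    # Collect the targets plus everything they transitively depend on.
    graph: dict[str, set[str]] = {}
    stack = list(targets)
    while stack:
        name = stack.pop()
        if name not in graph:
            graph[name] = upstream_of(name)
            stack.extend(graph[name])

    sorter = TopologicalSorter(graph)  # maps each asset to its predecessors
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    results: dict[str, Any] = {}
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        running = {}  # future -> asset name
        while sorter.is_active():
            # Submit every asset whose upstream assets have all finished.
            for name in sorter.get_ready():
                kwargs = {dep: results[dep] for dep in graph[name]}
                running[pool.submit(ASSETS[name], **kwargs)] = name
            # Block until at least one running asset completes, then unlock its dependents.
            done, _ = wait(running, return_when=FIRST_COMPLETED)
            for future in done:
                name = running.pop(future)
                results[name] = future.result()  # re-raises the asset's exception, if any
                sorter.done(name)
    return results


# Usage with three toy assets forming a linear chain, like the kazeflow example.
@asset
def raw() -> str:
    return "hello world"


@asset
def upper(raw: str) -> str:
    return raw.upper()


@asset
def count(upper: str) -> int:
    return len(upper.split())


print(run(["count"]))
# → {'raw': 'hello world', 'upper': 'HELLO WORLD', 'count': 2}
```

In a linear chain only one asset is ever ready at a time, so the concurrency cap only matters when independent assets (for example, two assets that both depend on `raw`) become ready together.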
Download files
File details
Details for the file kazeflow-0.1.0a1.tar.gz.
File metadata
- Download URL: kazeflow-0.1.0a1.tar.gz
- Size: 7.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | c415b76b6c419a6a06f6c73e63d922c5f69e4bd80ab75df28e71c4b275c1938b |
| MD5 | 2617c1a730b737679f39edbca0aaff1f |
| BLAKE2b-256 | eecdec0da49c17012e9d36e8c14142a58639836e9ee7c44b53e3a0d6b26ed19f |
Provenance
The following attestation bundles were made for kazeflow-0.1.0a1.tar.gz:
- Publisher: publish.yml on kj-9/kazeflow
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: kazeflow-0.1.0a1.tar.gz
- Subject digest: c415b76b6c419a6a06f6c73e63d922c5f69e4bd80ab75df28e71c4b275c1938b
- Sigstore transparency entry: 550207205
- Permalink: kj-9/kazeflow@de6fef8cc30eb8cf320077edea3111d92d1a6ba6
- Branch / Tag: refs/tags/v0.1.0a1
- Owner: https://github.com/kj-9
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@de6fef8cc30eb8cf320077edea3111d92d1a6ba6
- Trigger Event: release
File details
Details for the file kazeflow-0.1.0a1-py3-none-any.whl.
File metadata
- Download URL: kazeflow-0.1.0a1-py3-none-any.whl
- Size: 9.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 7f229cb572b590493b77464122c64cca45b0aa553cc953640032f640583b38ce |
| MD5 | 50b43eef0dedab307578d81235e5d1db |
| BLAKE2b-256 | efe4610723d708002534cf67dd274771b2ccff7f9824d340282f74de01d86293 |
Provenance
The following attestation bundles were made for kazeflow-0.1.0a1-py3-none-any.whl:
- Publisher: publish.yml on kj-9/kazeflow
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: kazeflow-0.1.0a1-py3-none-any.whl
- Subject digest: 7f229cb572b590493b77464122c64cca45b0aa553cc953640032f640583b38ce
- Sigstore transparency entry: 550207211
- Permalink: kj-9/kazeflow@de6fef8cc30eb8cf320077edea3111d92d1a6ba6
- Branch / Tag: refs/tags/v0.1.0a1
- Owner: https://github.com/kj-9
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@de6fef8cc30eb8cf320077edea3111d92d1a6ba6
- Trigger Event: release
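To check a downloaded distribution against the SHA256 digests published for this release, a short standard-library script is enough. The `EXPECTED_SHA256` mapping and the `sha256_of` and `verify` helpers are illustrative names, not part of kazeflow; the digests are the SHA256 values listed in the file details for `kazeflow-0.1.0a1.tar.gz` and `kazeflow-0.1.0a1-py3-none-any.whl`.

```python
import hashlib
from pathlib import Path

# SHA256 digests copied from the file details for kazeflow 0.1.0a1 (illustrative helper data).
EXPECTED_SHA256 = {
    "kazeflow-0.1.0a1.tar.gz": "c415b76b6c419a6a06f6c73e63d922c5f69e4bd80ab75df28e71c4b275c1938b",
    "kazeflow-0.1.0a1-py3-none-any.whl": "7f229cb572b590493b77464122c64cca45b0aa553cc953640032f640583b38ce",
}


def sha256_of(path: Path, chunk_size: int = 1 << 16) -> str:
    """Hash the file in chunks so large downloads need not fit in memory."""
    digest = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify(path: Path) -> bool:
    """Return True only if the file's name is known and its SHA256 matches the published digest."""
    expected = EXPECTED_SHA256.get(path.name)
    return expected is not None and sha256_of(path) == expected
```

pip can enforce the same check at install time in hash-checking mode: a requirements line such as `kazeflow==0.1.0a1 --hash=sha256:<digest>` makes pip reject any downloaded file whose digest differs.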