A lightweight, asset-based task flow engine
Project description
kazeflow
kazeflow is a lightweight, asset-based task flow engine inspired by Dagster. It is designed to be simple, flexible, and easy to use.
Example
When you run this script, kazeflow will execute the assets in the correct order, handle the failure of failing_asset gracefully, and provide a rich terminal UI to visualize the progress.
Here is a simple example of how to use kazeflow to define and execute a data flow with dependencies, inputs/outputs, and logging.
example.py:
import asyncio
from kazeflow.assets import asset
from kazeflow.context import AssetContext
from kazeflow.flow import Flow
# A simple asset with no dependencies
@asset()
async def users() -> list[str]:
"""This asset returns a list of user names."""
return ["Alice", "Bob", "Charlie"]
# This asset depends on the `users` asset.
# The output of `users` is automatically passed as an argument.
@asset(deps=["users"])
async def greetings(users: list[str], context: AssetContext) -> list[str]:
"""This asset receives the list of users and a context object.
It uses the context to get a logger and log a message.
"""
context.logger.info(f"Generating greetings for {len(users)} users.")
return [f"Hello, {user}!" for user in users]
# This asset fails intentionally to demonstrate error handling.
@asset(deps=["users"])
async def failing_asset(users: list[str]):
"""This asset always fails."""
raise ValueError("This asset is designed to fail.")
if __name__ == "__main__":
# Define a flow that includes the final assets we want to generate.
# kazeflow automatically includes all upstream dependencies.
flow = Flow(asset_names=["greetings", "failing_asset"])
# Run the flow asynchronously.
# You can limit the number of concurrent assets with `max_concurrency`.
asyncio.run(flow.run_async(max_concurrency=2))
❯ uv run example.py
Task Flow (Execution Order)
└── users
├── failing_asset
└── greetings
Execution Logs
INFO Executing asset: users
INFO Finished executing asset: users in 0.00s
INFO Executing asset: greetings
INFO Generating greetings for 3 users.
INFO Finished executing asset: greetings in 0.00s
INFO Executing asset: failing_asset
ERROR Error executing asset failing_asset: This asset is designed to
fail.
╭───────────── Traceback (most recent call last) ─────────────╮
│ /Users/kh03/work/repos/myflow/src/kazeflow/flow.py:82 in │
│ _execute_asset │
│ │
│ 79 │ │ │ │ input_kwargs["context"] = context │
│ 80 │ │ │ │
│ 81 │ │ │ if asyncio.iscoroutinefunction(asset_func │
│ ❱ 82 │ │ │ │ output = await asset_func(**input_kwa │
│ 83 │ │ │ else: │
│ 84 │ │ │ │ # Run sync function in a thread pool │
│ 85 │ │ │ │ loop = asyncio.get_running_loop() │
│ │
│ /Users/kh03/work/repos/myflow/example.py:31 in │
│ failing_asset │
│ │
│ 28 @asset(deps=["users"]) │
│ 29 async def failing_asset(users: list[str]): │
│ 30 │ """This asset always fails.""" │
│ ❱ 31 │ raise ValueError("This asset is designed to fail." │
│ 32 │
│ 33 │
│ 34 if __name__ == "__main__": │
╰─────────────────────────────────────────────────────────────╯
ValueError: This asset is designed to fail.
╭─────────────────────────────── Assets ───────────────────────────────╮
│ ✓ users (0.00s) │
│ ✓ greetings (0.00s) │
│ ✗ failing_asset (0.04s) │
╰──────────────────────────────────────────────────────────────────────╯
Overall Progress ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 3/3 0:00:00
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file kazeflow-0.1.0a0.tar.gz.
File metadata
- Download URL: kazeflow-0.1.0a0.tar.gz
- Upload date:
- Size: 6.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
5873e1250c99a6536bd2f7f57a5a981104e794135fb44bb809dcfb1df8507565
|
|
| MD5 |
b90b0894b3cc7e4318c5042c6eb666bc
|
|
| BLAKE2b-256 |
040614bc6afe1dcbadd10fc21e488c1bd6db9935664018151d7b5990e70d1a32
|
Provenance
The following attestation bundles were made for kazeflow-0.1.0a0.tar.gz:
Publisher:
publish.yml on kj-9/kazeflow
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
kazeflow-0.1.0a0.tar.gz -
Subject digest:
5873e1250c99a6536bd2f7f57a5a981104e794135fb44bb809dcfb1df8507565 - Sigstore transparency entry: 518115435
- Sigstore integration time:
-
Permalink:
kj-9/kazeflow@cc76bbe71074ebc0b060997f83330a3d11c79393 -
Branch / Tag:
refs/tags/v0.1.0a0 - Owner: https://github.com/kj-9
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@cc76bbe71074ebc0b060997f83330a3d11c79393 -
Trigger Event:
release
-
Statement type:
File details
Details for the file kazeflow-0.1.0a0-py3-none-any.whl.
File metadata
- Download URL: kazeflow-0.1.0a0-py3-none-any.whl
- Upload date:
- Size: 9.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
dbf2cd316dbf869960cc99a837dbe6144d6dfd8fdce1b984fa6b8ff5b0047e3c
|
|
| MD5 |
249812d19976025ebc56d6989dc624bc
|
|
| BLAKE2b-256 |
b785ef62d2f1ad059f2eaeccdc726747e9a93f3d650a577e41300b3da515bb6c
|
Provenance
The following attestation bundles were made for kazeflow-0.1.0a0-py3-none-any.whl:
Publisher:
publish.yml on kj-9/kazeflow
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
kazeflow-0.1.0a0-py3-none-any.whl -
Subject digest:
dbf2cd316dbf869960cc99a837dbe6144d6dfd8fdce1b984fa6b8ff5b0047e3c - Sigstore transparency entry: 518115440
- Sigstore integration time:
-
Permalink:
kj-9/kazeflow@cc76bbe71074ebc0b060997f83330a3d11c79393 -
Branch / Tag:
refs/tags/v0.1.0a0 - Owner: https://github.com/kj-9
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@cc76bbe71074ebc0b060997f83330a3d11c79393 -
Trigger Event:
release
-
Statement type: