Skip to main content

A lightweight, asset-based task flow engine

Project description

kazeflow

kazeflow is a lightweight, asset-based task flow engine inspired by Dagster. It is designed to be simple, flexible, and easy to use.

Example

When you run this script, kazeflow will execute the assets in the correct order, handle the failure of failing_asset gracefully, and provide a rich terminal UI to visualize the progress.

Here is a simple example of how to use kazeflow to define and execute a data flow with dependencies, inputs/outputs, and logging.

example.py:

import asyncio

from kazeflow.assets import asset
from kazeflow.context import AssetContext
from kazeflow.flow import Flow


# A simple asset with no dependencies
@asset()
async def users() -> list[str]:
    """This asset returns a list of user names."""
    return ["Alice", "Bob", "Charlie"]


# This asset depends on the `users` asset.
# The output of `users` is automatically passed as an argument.
@asset(deps=["users"])
async def greetings(users: list[str], context: AssetContext) -> list[str]:
    """This asset receives the list of users and a context object.

    It uses the context to get a logger and log a message.
    """
    context.logger.info(f"Generating greetings for {len(users)} users.")
    return [f"Hello, {user}!" for user in users]


# This asset fails intentionally to demonstrate error handling.
@asset(deps=["users"])
async def failing_asset(users: list[str]):
    """This asset always fails."""
    raise ValueError("This asset is designed to fail.")


if __name__ == "__main__":
    # Define a flow that includes the final assets we want to generate.
    # kazeflow automatically includes all upstream dependencies.
    flow = Flow(asset_names=["greetings", "failing_asset"])

    # Run the flow asynchronously.
    # You can limit the number of concurrent assets with `max_concurrency`.
    asyncio.run(flow.run_async(max_concurrency=2))
 uv run example.py
Task Flow (Execution Order)
└── users
    ├── failing_asset
    └── greetings

Execution Logs
INFO     Executing asset: users                                         
INFO     Finished executing asset: users in 0.00s                       
INFO     Executing asset: greetings                                     
INFO     Generating greetings for 3 users.                              
INFO     Finished executing asset: greetings in 0.00s                   
INFO     Executing asset: failing_asset                                 
ERROR    Error executing asset failing_asset: This asset is designed to 
         fail.                                                          
         ╭───────────── Traceback (most recent call last) ─────────────╮
          /Users/kh03/work/repos/myflow/src/kazeflow/flow.py:82 in              _execute_asset                                                                                                                                 79             input_kwargs["context"] = context                  80                                                                 81          if asyncio.iscoroutinefunction(asset_func             82             output = await asset_func(**input_kwa              83          else:                                                  84             # Run sync function in a thread pool  │
             85             loop = asyncio.get_running_loop()                                                                                     /Users/kh03/work/repos/myflow/example.py:31 in                        failing_asset                                                                                                                                 28 @asset(deps=["users"])                                             29 async def failing_asset(users: list[str]):                         30    """This asset always fails."""                                31    raise ValueError("This asset is designed to fail."             32                                                                    33                                                                    34 if __name__ == "__main__":                                      ╰─────────────────────────────────────────────────────────────╯
         ValueError: This asset is designed to fail.                    
╭─────────────────────────────── Assets ───────────────────────────────╮
│  users                          (0.00s)                             │
│  greetings                      (0.00s)                             │
│  failing_asset                  (0.04s)                             │
╰──────────────────────────────────────────────────────────────────────╯
Overall Progress ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 3/3 0:00:00

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

kazeflow-0.1.0a0.tar.gz (6.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

kazeflow-0.1.0a0-py3-none-any.whl (9.3 kB view details)

Uploaded Python 3

File details

Details for the file kazeflow-0.1.0a0.tar.gz.

File metadata

  • Download URL: kazeflow-0.1.0a0.tar.gz
  • Upload date:
  • Size: 6.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for kazeflow-0.1.0a0.tar.gz
Algorithm Hash digest
SHA256 5873e1250c99a6536bd2f7f57a5a981104e794135fb44bb809dcfb1df8507565
MD5 b90b0894b3cc7e4318c5042c6eb666bc
BLAKE2b-256 040614bc6afe1dcbadd10fc21e488c1bd6db9935664018151d7b5990e70d1a32

See more details on using hashes here.

Provenance

The following attestation bundles were made for kazeflow-0.1.0a0.tar.gz:

Publisher: publish.yml on kj-9/kazeflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file kazeflow-0.1.0a0-py3-none-any.whl.

File metadata

  • Download URL: kazeflow-0.1.0a0-py3-none-any.whl
  • Upload date:
  • Size: 9.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for kazeflow-0.1.0a0-py3-none-any.whl
Algorithm Hash digest
SHA256 dbf2cd316dbf869960cc99a837dbe6144d6dfd8fdce1b984fa6b8ff5b0047e3c
MD5 249812d19976025ebc56d6989dc624bc
BLAKE2b-256 b785ef62d2f1ad059f2eaeccdc726747e9a93f3d650a577e41300b3da515bb6c

See more details on using hashes here.

Provenance

The following attestation bundles were made for kazeflow-0.1.0a0-py3-none-any.whl:

Publisher: publish.yml on kj-9/kazeflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page