
A lightweight, declarative framework for Python analysis workflows.


yaflux

A declarative framework for managing complex analytical workflows in Python.

Overview

yaflux provides a structured approach to managing complex data analysis pipelines where tracking transformations, ensuring reproducibility, and maintaining clear provenance are essential. It offers a pure Python solution for declaring dependencies between analysis steps and managing results immutably.

Key Features

  • Declarative Workflow Definition: Analysis steps are defined through decorators that explicitly state their inputs and outputs
  • Immutable Results Management: Results are tracked and protected from inadvertent mutation
  • Dependency Tracking: Automatic tracking of dependencies between analysis steps
  • Progress Monitoring: Built-in tracking of completed analysis steps
  • Serialization: Simple persistence of complete analysis states
  • Portable Results: Analysis results can be shared and loaded without original class definitions
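The immutability idea behind the results container can be illustrated with a small stdlib-only sketch (this is illustrative only, not yaflux's actual implementation): a read-only container that rejects reassignment by overriding `__setattr__`.

```python
class LockedResults:
    """Toy read-only results container (illustrative, not yaflux's API)."""

    def __init__(self, **values):
        # Bypass our own guard while populating the initial values.
        object.__setattr__(self, "_values", dict(values))

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails.
        try:
            return self._values[name]
        except KeyError:
            raise AttributeError(f"no result named {name!r}") from None

    def __setattr__(self, name, value):
        raise AttributeError("results are immutable once recorded")


results = LockedResults(raw_data=[1, 2, 3])
print(results.raw_data)  # [1, 2, 3]
try:
    results.raw_data = []  # blocked
except AttributeError as err:
    print(err)  # results are immutable once recorded
```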

Documentation

The full documentation for yaflux can be found at yaflux.readthedocs.io.

Example

With yaflux, you can define complex analytical workflows in a structured and reproducible way.

All steps are written functionally; the step decorator handles mutations to the analysis object. You declare dependencies between steps and yaflux tracks them automatically, so you can focus on the functional implementation of each step and limit side effects.

import yaflux as yf

class MyAnalysis(yf.Base):
    """An example analysis class."""

    # Define analysis steps
    @yf.step(creates="raw_data")
    def workflow_step_a(self) -> list[int]:
        return list(range(10))

    # Specify dependencies between steps
    @yf.step(creates="processed_data", requires="raw_data")
    def workflow_step_b(self) -> list[int]:
        return [i * 2 for i in self.results.raw_data]

    # Combine results from previous steps
    @yf.step(creates="final_data", requires=["raw_data", "processed_data"])
    def workflow_step_c(self) -> list[int]:
        return [i + j for i in self.results.raw_data for j in self.results.processed_data]

# Initialize the analysis
analysis = MyAnalysis()

# yaflux will infer the correct order of execution
analysis.execute_all()

# Access results
final = analysis.results.final_data

# Save analysis state
analysis.save("analysis.yax")

# Load analysis state
loaded = MyAnalysis.load("analysis.yax")

# Load analysis without original class definition
loaded = yf.Base.load("analysis.yax")

# Skip redundant steps
analysis.workflow_step_a() # skipped

# Force re-run of a step
analysis.workflow_step_a(force=True) # re-run

# Visualize the analysis (using graphviz)
analysis.visualize_dependencies()

# See how an analysis step was run and its metadata
metadata = analysis.get_step_metadata("workflow_step_b")
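The order inference performed by `execute_all` amounts to a topological sort of the step graph. A simplified model using the standard library's `graphlib` (the mappings below are hand-written for illustration; yaflux derives them from the decorators):

```python
from graphlib import TopologicalSorter

# Which result each step creates, and which results it requires.
creates = {
    "workflow_step_a": {"raw_data"},
    "workflow_step_b": {"processed_data"},
    "workflow_step_c": {"final_data"},
}
requires = {
    "workflow_step_a": set(),
    "workflow_step_b": {"raw_data"},
    "workflow_step_c": {"raw_data", "processed_data"},
}

# A step depends on whichever step creates each of its required results.
producer = {res: step for step, made in creates.items() for res in made}
graph = {step: {producer[res] for res in reqs} for step, reqs in requires.items()}

order = list(TopologicalSorter(graph).static_order())
print(order)  # ['workflow_step_a', 'workflow_step_b', 'workflow_step_c']
```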

Visualizing Complex Workflows

yaflux provides a built-in method for visualizing the dependencies between analysis steps. This can be useful for understanding complex workflows and ensuring that all dependencies are correctly specified.

Let's first define a complex analysis with multiple steps and dependencies:

import yaflux as yf

class MyAnalysis(yf.Base):

    @yf.step(creates=["x", "y", "z"])
    def load_data(self) -> tuple[int, int, int]:
        return 1, 2, 3

    @yf.step(creates="proc_x", requires="x")
    def process_x(self) -> int:
        return self.results.x + 1

    @yf.step(creates=["proc_y1", "proc_y2", "_marked"], requires="y")
    def process_y(self) -> tuple[int, int]:
        return (
            self.results.y + 1,
            self.results.y + 2,
        )

    @yf.step(creates="proc_z", requires=["proc_y1", "proc_y2", "z"])
    def process_z(self) -> int:
        return self.results.proc_y1 + self.results.proc_y2 + self.results.z

    @yf.step(creates="final", requires=["proc_x", "proc_z", "_marked"])
    def final(self) -> int:
        return self.results.proc_x + self.results.proc_z

Now we can visualize the dependencies between the analysis steps:

analysis = MyAnalysis()
analysis.visualize_dependencies()

Dependency Graph

As we run the analysis, we can fill in the dependency graph and see where we are in the workflow.

analysis.load_data()
analysis.execute(target_step="process_y") # Run up to `process_y`

# Visualize the updated dependencies
analysis.visualize_dependencies()

Dependency Graph
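Under the hood, a dependency visualization like this reduces to a Graphviz DOT document. A minimal, stdlib-only sketch of generating DOT text from a requires mapping (the `to_dot` helper and its signature are hypothetical, not yaflux's API):

```python
def to_dot(requires: dict[str, list[str]], completed: set[str]) -> str:
    """Render a step-dependency graph as Graphviz DOT text."""
    lines = ["digraph workflow {"]
    # Nodes: highlight completed steps.
    for step in requires:
        fill = ", style=filled, fillcolor=lightgreen" if step in completed else ""
        lines.append(f'    "{step}" [shape=box{fill}]')
    # Edges: dependency -> dependent step.
    for step, deps in requires.items():
        for dep in deps:
            lines.append(f'    "{dep}" -> "{step}"')
    lines.append("}")
    return "\n".join(lines)


dot = to_dot(
    {"load_data": [], "process_x": ["load_data"], "final": ["process_x"]},
    completed={"load_data"},
)
print(dot)
```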

Avoiding Dependency Errors

One of the benefits of a declarative workflow is that you can avoid a whole class of errors related to missing or incorrect dependencies.

In yaflux you specify dependencies between steps using the requires argument of the @step decorator. The decorator parses the decorated method's abstract syntax tree (AST) to determine which results the method accesses and to verify that each one is declared.

This means that if you try to access a result that hasn't been created yet, yaflux will raise an error at definition time rather than at runtime.
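The core of such a check can be sketched with the standard library's `ast` module: walk the parsed function body, collect every `self.results.<name>` attribute access, and compare against the declared requirements. This is a simplified model of the mechanism, not yaflux's actual code:

```python
import ast


def results_accessed(src: str) -> set[str]:
    """Collect names read via `self.results.<name>` in the given source."""
    found = set()
    for node in ast.walk(ast.parse(src)):
        if (
            isinstance(node, ast.Attribute)
            and isinstance(node.value, ast.Attribute)
            and node.value.attr == "results"
            and isinstance(node.value.value, ast.Name)
            and node.value.value.id == "self"
        ):
            found.add(node.attr)
    return found


src = """
def step_b(self):
    return self.results.z + 1
"""
used = results_accessed(src)
declared = set()  # step_b declares no requirements
missing = used - declared
print(missing)  # {'z'}
```

Any name in `missing` can then be reported as an error when the class body is executed, i.e. at definition time.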

The code below raises an error at class definition time because step_b uses z but does not require it:

import yaflux as yf

class BadAnalysis(yf.Base):

    @yf.step(creates="x")
    def step_a(self) -> int:
        return 1

    @yf.step(creates="y") # Missing `z` in `requires`
    def step_b(self) -> int:
        return self.results.z + 1

This is especially useful when you have a typo in your analysis but don't realize it until much later in the workflow. yaflux acts as a static analysis tool for your analysis workflow, catching errors early and saving you time debugging.

import yaflux as yf

class BadAnalysis(yf.Base):

    @yf.step(creates="some_complex_name")
    def step_a(self) -> int:
        return 1

    @yf.step(creates="y", requires="some_complex_name")
    def step_b(self) -> int:
        return self.results.some_complx_name + 1  # Typo: `some_complx_name`

Installation

For a base Python installation with zero external dependencies, use:

pip install yaflux

For a more feature-rich installation with additional dependencies use:

pip install yaflux[full]

Or if you want a specific subset of features, you can install individual extras:

pip install yaflux[viz]
