
yaflux

A declarative framework for managing complex analytical workflows in Python.

Overview

yaflux provides a structured approach to managing complex data analysis pipelines where tracking transformations, ensuring reproducibility, and maintaining clear provenance are essential. It offers a pure Python solution for declaring dependencies between analysis steps and managing results immutably.

Key Features

  • Declarative Workflow Definition: Analysis steps are defined through decorators that explicitly state their inputs and outputs
  • Immutable Results Management: Results are tracked and protected from inadvertent mutation (demonstrated in the example below)
  • Dependency Tracking: Automatic tracking of dependencies between analysis steps
  • Progress Monitoring: Built-in tracking of completed analysis steps
  • Serialization: Simple persistence of complete analysis states
  • Portable Results: Analysis results can be shared and loaded without original class definitions

Example

With yaflux, you can define complex analytical workflows in a structured and reproducible way.

All steps are written functionally: each returns its result, and the step decorator handles mutations to the analysis object. You declare dependencies between steps and yaflux tracks them automatically, letting you focus on the functional implementation of each step and limiting side effects.

import yaflux as yf

class MyAnalysis(yf.Base):
    """An example analysis class."""

    # Define analysis steps
    @yf.step(creates="raw_data")
    def workflow_step_a(self) -> list[int]:
        return list(range(10))

    # Specify dependencies between steps
    @yf.step(creates="processed_data", requires="raw_data")
    def workflow_step_b(self) -> list[int]:
        return [i * 2 for i in self.results.raw_data]

    # Combine results from previous steps
    @yf.step(creates="final_data", requires=["raw_data", "processed_data"])
    def workflow_step_c(self) -> list[int]:
        return [i + j for i in self.results.raw_data for j in self.results.processed_data]

    # Define a complete workflow however you'd like
    def run(self):
        self.workflow_step_a()
        self.workflow_step_b()
        self.workflow_step_c()

# Define and run an analysis
analysis = MyAnalysis()
analysis.run()

# Access results
final = analysis.results.final_data
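
# Results are protected from inadvertent mutation: direct assignment is
# expected to raise an error (the concrete exception type is yaflux-specific
# and assumed here for illustration)
try:
    analysis.results.final_data = []
except Exception as err:
    print(f"mutation blocked: {err}")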

# Save analysis state
analysis.save("analysis.pkl")

# Load analysis state
loaded = MyAnalysis.load("analysis.pkl")

# Load analysis without original class definition
loaded = yf.load_portable("analysis.pkl")
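
# The portable object exposes the stored results without the original class
# (attribute-style access mirroring `results` is an assumption here)
final_from_portable = loaded.results.final_data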

# Skip redundant steps
analysis.workflow_step_a() # skipped

# Force re-run of a step
analysis.workflow_step_a(force=True) # re-run

# Visualize the analysis (using graphviz)
analysis.visualize_dependencies()

# See how an analysis step was run and its metadata
metadata = analysis.get_step_metadata("workflow_step_b")

Visualizing Complex Workflows

yaflux provides a built-in method for visualizing the dependencies between analysis steps. This can be useful for understanding complex workflows and ensuring that all dependencies are correctly specified. (Dependency visualization uses graphviz; install the viz extra described under Installation.)

Let's first define a complex analysis with multiple steps and dependencies:

import yaflux as yf


class MyAnalysis(yf.Base):

    @yf.step(creates=["x", "y", "z"])
    def load_data(self) -> tuple[int, int, int]:
        return 1, 2, 3

    @yf.step(creates="proc_x", requires="x")
    def process_x(self) -> int:
        return self.results.x + 1

    @yf.step(creates=["proc_y1", "proc_y2"], requires="y")
    def process_y(self) -> tuple[int, int]:
        return (
            self.results.y + 1,
            self.results.y + 2,
        )

    @yf.step(creates="proc_z", requires=["proc_y1", "proc_y2", "z"])
    def process_z(self) -> int:
        return self.results.proc_y1 + self.results.proc_y2 + self.results.z

    @yf.step(creates="final", requires=["proc_x", "proc_z"])
    def final(self) -> int:
        return self.results.proc_x + self.results.proc_z

    def run(self):
        self.load_data()
        self.process_x()
        self.process_y()
        self.process_z()
        self.final()

Now we can visualize the dependencies between the analysis steps:

analysis = MyAnalysis()
analysis.visualize_dependencies()

[Figure: dependency graph for MyAnalysis]

As we run the analysis, we can fill in the dependency graph and see where we are in the workflow.

analysis.load_data()
analysis.process_x()
analysis.process_y()

# Visualize the updated dependencies
analysis.visualize_dependencies()

[Figure: dependency graph with completed steps marked]

Avoiding Dependency Errors

One of the benefits of a declarative workflow is that you can avoid a whole class of errors related to missing or incorrect dependencies.

In yaflux you specify dependencies between steps using the requires argument of the @step decorator. The decorator parses the decorated method's abstract syntax tree (AST) to determine which results the method accesses and verifies that each one is declared in requires.

This means that if a step's body accesses a result it does not declare, yaflux raises an error at class definition time rather than at runtime.

The code below will raise an error at class definition time because step_b uses z but does not require it:

import yaflux as yf

class BadAnalysis(yf.Base):

    @yf.step(creates="x")
    def step_a(self) -> int:
        return 1

    @yf.step(creates="y") # Missing `z` in `requires`
    def step_b(self) -> int:
        return self.results.z + 1
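
Because the check runs when Python executes the class body, the failure can be observed without instantiating or running anything. A minimal sketch wrapping the same definition (the concrete exception class yaflux raises is an assumption here):

import yaflux as yf

try:
    class BadAnalysis(yf.Base):
        @yf.step(creates="x")
        def step_a(self) -> int:
            return 1

        @yf.step(creates="y")  # missing `z` in `requires`
        def step_b(self) -> int:
            return self.results.z + 1
except Exception as err:  # exact exception type is yaflux-specific
    print(f"caught at class definition time: {err}")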

This is especially useful when you have a typo in your analysis but don't realize it until much later in the workflow. yaflux acts as a static analysis tool for your workflow, catching such errors early and saving you debugging time.

import yaflux as yf

class BadAnalysis(yf.Base):

    @yf.step(creates="some_complex_name")
    def step_a(self) -> int:
        return 1

    @yf.step(creates="y", requires="some_complex_name") # Typo in `requires`
    def step_b(self) -> int:
        return self.results.some_complx_name + 1

Installation

For a base installation with zero external dependencies, use:

pip install yaflux

For a more feature-rich installation with additional dependencies, use:

pip install yaflux[full]

Or, if you want a specific subset of features, install individual extras (e.g. viz for graphviz-based dependency visualization):

pip install yaflux[viz]
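
Note for zsh users: square brackets are glob characters, so quote the requirement:

pip install 'yaflux[viz]'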
