
LiteFlow

A lightweight task flow framework for Python

Getting started

Installation

pip install liteflow

Basic Usage

A Flow is constructed with an executor, here a thread pool. Tasks are registered on the flow under a name and run by that name.

from concurrent.futures import ThreadPoolExecutor
from liteflow import Flow, TaskOutput, NextTask, Context, StreamChunk
from liteflow.executor import PoolExecutor

executor = PoolExecutor(ThreadPoolExecutor(max_workers=4))

# The executor argument is optional; Flow defaults to a thread pool with 4 workers
flow = Flow(executor=executor)

# Simple task that returns a result
@flow.task("greet")
def my_task(context: Context) -> TaskOutput:
    return TaskOutput(output="Hello World!")

result = flow.run("greet")  # Returns {"greet": "Hello World!"}
print(result)
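
Since the executor argument is optional (per the comment above), the setup can be shortened. A minimal sketch, assuming the default 4-worker thread pool:

from liteflow import Flow, TaskOutput, Context

# No executor passed; assumed to fall back to a 4-worker thread pool
flow = Flow()

@flow.task("hello")
def hello(context: Context) -> TaskOutput:
    return TaskOutput(output="Hi!")

print(flow.run("hello"))  # Prints {"hello": "Hi!"}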

Using Ray for Distributed Execution

LiteFlow supports distributed execution using Ray. This allows you to scale your workflows across multiple cores or even multiple machines.

Installation with Ray Support

Ray is an optional dependency. To use Ray with LiteFlow, install it with:

pip install liteflow[ray]

Example Usage

from liteflow import Flow, TaskOutput, NextTask, RayExecutor

# Initialize Ray executor
executor = RayExecutor()  # Connects to local Ray instance

# For connecting to an existing Ray cluster:
# executor = RayExecutor(address="auto")

# Create flow with Ray executor
flow = Flow(executor=executor)

@flow.task("distributed_task")
def distributed_task(context):
    # This task will be executed as a Ray task
    return TaskOutput(output="Executed in Ray!")

result = flow.run("distributed_task")
print(result)  # Returns {"distributed_task": "Executed in Ray!"}

# Don't forget to shut down Ray when done
executor.shutdown()

Task Chaining

# Tasks can trigger other tasks
@flow.task("task1")
def task1(context: Context) -> TaskOutput:
    return TaskOutput(output="result1", next_tasks=[NextTask("task2")])

@flow.task("task2")
def task2(context: Context) -> TaskOutput:
    # Access results from previous tasks
    t1_result = context.get("task1")  # waits for task1 to complete
    print(t1_result)
    return TaskOutput(output="result2")

result = flow.run("task1")  # Returns {"task2": "result2"}
print(result)

Parallel Execution

import time

@flow.task("starter")
def starter(context: Context) -> TaskOutput:
    # Launch multiple tasks in parallel by simply adding them to the next_tasks list
    return TaskOutput(output="started", next_tasks=[NextTask("slow_task1"), NextTask("slow_task2")])

@flow.task("slow_task1")
def slow_task1(context: Context) -> TaskOutput:
    time.sleep(1)
    return TaskOutput(output="result1")

@flow.task("slow_task2")
def slow_task2(context: Context) -> TaskOutput:
    time.sleep(1)
    return TaskOutput(output="result2")

# Both slow_tasks execute in parallel, taking ~1 second total
result = flow.run("starter")
print(flow.context.get("starter"))
print(result)
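
To fan the results back in, a downstream task can rely on context.get, which (as noted under Task Chaining) waits for the named task to complete. A sketch of a fan-out/fan-in join, assuming those same blocking semantics; the join task here is illustrative:

@flow.task("fan_out")
def fan_out(context: Context) -> TaskOutput:
    # Spawn both slow tasks and the join together
    return TaskOutput(output="started", next_tasks=[
        NextTask("slow_task1"),
        NextTask("slow_task2"),
        NextTask("join"),
    ])

@flow.task("join")
def join(context: Context) -> TaskOutput:
    # Each get blocks until the corresponding task has finished
    return TaskOutput(output=[context.get("slow_task1"), context.get("slow_task2")])

result = flow.run("fan_out")
print(result)  # Expected to include {"join": ["result1", "result2"]}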

Streaming Results

@flow.task("streaming_task")
def streaming_task(context: Context) -> TaskOutput:
    # Stream intermediate results
    stream = context.get_stream()
    for i in range(3):
        # StreamChunk(task_id, chunk_value)
        stream.put(StreamChunk("streaming_task", f"interim_{i}"))
    return TaskOutput(output="final")

# Get results as they arrive
for stream_chunk in flow.stream("streaming_task"):
    print(f"{stream_chunk.task_id}: {stream_chunk.value}")

# Prints:
# streaming_task: interim_0
# streaming_task: interim_1
# streaming_task: interim_2
# streaming_task: final

Dynamic Workflows

@flow.task("conditional_task")
def conditional_task(context: Context) -> TaskOutput:
    count = context.get("count", 0)

    if count >= 3:
        return TaskOutput(output="done")

    context.set("count", count + 1)
    return TaskOutput(
        output=f"iteration_{count}", next_tasks=[NextTask("conditional_task")]
    )


# The task re-queues itself, looping 3 times before returning "done"
result = flow.run("conditional_task")
print(result)

# Prints:
# {'conditional_task': 'done'}

Input Parameters

@flow.task("greet")
def parameterized_task(context: Context) -> TaskOutput:
    name = context.get("user_name")
    return TaskOutput(output=f"Hello {name}!")

result = flow.run("greet", inputs={"user_name": "Alice"})
print(result)
# Returns {"greet": "Hello Alice!"}

Push Next Task with Inputs

def task1(ctx):
    return TaskOutput("result1", [NextTask("task2", inputs={"input1": "value1"})])

def task2(ctx, inputs):
    assert inputs == {"input1": "value1"}
    return TaskOutput("result2")

flow.add_task("task1", task1)
flow.add_task("task2", task2)
result = flow.run("task1")
print(result)
# Returns {"task2": "result2"}

Dynamic Routing

@flow.task("router")
def router(context: Context) -> TaskOutput:
    task_type = context.get("type")
    routes = {
        "process": [NextTask("process_task")],
        "analyze": [NextTask("analyze_task")],
        "report": [NextTask("report_task")]
    }
    return TaskOutput(output=f"routing to {task_type}", next_tasks=routes.get(task_type, []))

@flow.task("process_task")
def process_task(context: Context) -> TaskOutput:
    return TaskOutput(output="processed data")

result = flow.run("router", inputs={"type": "process"})
print(result)
# Returns {"process_task": "processed data"}

State Management

context = Context()
context.from_dict({"task1": "result1"})

flow = Flow(executor=executor, context=context)
flow.add_task("task2", lambda ctx: TaskOutput("result2"))
flow.run("task2")

print(flow.context.get("task1"))  # Should print "result1"
print(flow.context.get("task2"))  # Should print "result2"

# Serialize the context to a dictionary
print(flow.get_context().to_dict())
# Returns {"task1": "result1", "task2": "result2"}

Map Reduce

@flow.task("task1")
def task1(ctx):
    ctx.set("collector", [])

    return TaskOutput("result1", [
        NextTask("task2", spawn_another=True),
        NextTask("task2", spawn_another=True),
        NextTask("task2", spawn_another=True)
    ])

@flow.task("task2")
def task2(ctx):
    collector = ctx.get("collector")
    collector.append("result2")
    ctx.set("collector", collector)

    return TaskOutput("", [NextTask("task3")])

@flow.task("task3")
def task3(ctx):
    collector = ctx.get("collector")
    return TaskOutput(collector)

result = flow.run("task1")
print(result)
assert result == {"task3": ["result2", "result2", "result2"]}
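
The same pattern becomes a true map step when each spawned task receives its own inputs (see Push Next Task with Inputs above). A sketch combining the two features, assuming NextTask accepts inputs and spawn_another together; the squaring logic is illustrative, and the read-modify-write on the context carries the same caveats as the collector above:

@flow.task("map")
def map_task(ctx):
    ctx.set("squares", [])
    return TaskOutput("mapped", [
        NextTask("square", inputs={"n": n}, spawn_another=True)
        for n in (1, 2, 3)
    ])

# Same (ctx, inputs) signature as in the add_task example above
def square(ctx, inputs):
    squares = ctx.get("squares")
    squares.append(inputs["n"] ** 2)
    ctx.set("squares", squares)
    return TaskOutput("", [NextTask("reduce")])

flow.add_task("square", square)

@flow.task("reduce")
def reduce_task(ctx):
    # Sort because the parallel squares may append in any order
    return TaskOutput(sorted(ctx.get("squares")))

result = flow.run("map")
print(result)  # Expected: {"reduce": [1, 4, 9]}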

