Lightweight task flow framework for Python
LiteFlow
A lightweight task flow framework
Getting started
Installation
pip install liteflow
Basic Usage
Flow creates a flow backed by an executor (a thread pool by default). Tasks are registered on the flow under a name and are run by that name.
from concurrent.futures import ThreadPoolExecutor

from liteflow import Flow, TaskOutput, NextTask, Context, StreamChunk
from liteflow.executor import PoolExecutor

executor = PoolExecutor(ThreadPoolExecutor(max_workers=4))
# thread pool executor is optional, defaults to 4 workers
flow = Flow(executor=executor)

# Simple task that returns a result
@flow.task("greet")
def my_task(context: Context) -> TaskOutput:
    return TaskOutput(output="Hello World!")

result = flow.run("greet")  # Returns {"greet": "Hello World!"}
print(result)
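To make the name-based registration above more concrete, here is a minimal sketch, using only the standard library, of how a decorator can store a function under a name and run it later. MiniFlow and its methods are hypothetical names chosen for illustration; this is not LiteFlow's implementation, and the real Flow does considerably more (executors, context, chaining).

```python
from typing import Any, Callable, Dict


class MiniFlow:
    """Hypothetical stand-in that only illustrates name-based task registration."""

    def __init__(self) -> None:
        self._tasks: Dict[str, Callable[[dict], Any]] = {}

    def task(self, name: str) -> Callable:
        # Return a decorator that stores the function under `name`.
        def decorator(func: Callable[[dict], Any]) -> Callable[[dict], Any]:
            self._tasks[name] = func
            return func

        return decorator

    def run(self, name: str) -> Dict[str, Any]:
        # Look the task up by name, call it with an empty context,
        # and key the result by the task name.
        context: dict = {}
        return {name: self._tasks[name](context)}


mini = MiniFlow()


@mini.task("greet")
def greet(context: dict) -> str:
    return "Hello World!"


greeting = mini.run("greet")
print(greeting)
```

Returning the original function from the decorator keeps it callable directly, which is convenient in unit tests.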
Using Ray for Distributed Execution
LiteFlow supports distributed execution using Ray. This allows you to scale your workflows across multiple cores or even multiple machines.
Installation with Ray Support
Ray is an optional dependency. To use Ray with LiteFlow, install it with:
pip install liteflow[ray]
Example Usage
from liteflow import Flow, TaskOutput, NextTask, RayExecutor

# Initialize Ray executor
executor = RayExecutor()  # Connects to local Ray instance

# For connecting to an existing Ray cluster:
# executor = RayExecutor(address="auto")

# Create flow with Ray executor
flow = Flow(executor=executor)

@flow.task("distributed_task")
def distributed_task(context):
    # This task will be executed as a Ray task
    return TaskOutput(output="Executed in Ray!")

result = flow.run("distributed_task")
print(result)  # Returns {"distributed_task": "Executed in Ray!"}

# Don't forget to shut down Ray when done
executor.shutdown()
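If a task raises before the final line is reached, the shutdown call above is skipped. One way to guard against that is a small context manager. The sketch below assumes only that the executor exposes a shutdown() method, as shown above; shutting_down and FakeExecutor are hypothetical helpers, and FakeExecutor exists solely so the pattern can be run without Ray installed.

```python
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def shutting_down(executor: Any) -> Iterator[Any]:
    """Yield `executor`, then call its shutdown() even if the body raises."""
    try:
        yield executor
    finally:
        executor.shutdown()


class FakeExecutor:
    """Hypothetical stand-in used only to demonstrate the pattern without Ray."""

    def __init__(self) -> None:
        self.closed = False

    def shutdown(self) -> None:
        self.closed = True


fake = FakeExecutor()
with shutting_down(fake) as ex:
    pass  # build the flow and call flow.run(...) here

print(fake.closed)
```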
Task Chaining
# Tasks can trigger other tasks
@flow.task("task1")
def task1(context: Context) -> TaskOutput:
    return TaskOutput(output="result1", next_tasks=[NextTask("task2")])

@flow.task("task2")
def task2(context: Context) -> TaskOutput:
    # Access results from previous tasks
    t1_result = context.get("task1")  # waits for task1 to complete
    print(t1_result)
    return TaskOutput(output="result2")

result = flow.run("task1")  # Returns {"task2": "result2"}
print(result)
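The chaining behaviour above can be pictured as a loop that runs a task, records its output in a shared context, and queues whatever the task names next. The following is a simplified, sequential sketch under that assumption; Out and run_chain are hypothetical names, and returning only the outputs of tasks that schedule nothing further is a guess made to mirror the return value shown above, not a description of LiteFlow's internals.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class Out:
    """Hypothetical, simplified analogue of TaskOutput."""

    output: Any
    next_tasks: List[str] = field(default_factory=list)


def run_chain(tasks: Dict[str, Callable[[dict], Out]], start: str) -> Dict[str, Any]:
    context: Dict[str, Any] = {}  # every task's output, keyed by task name
    final: Dict[str, Any] = {}    # outputs of tasks that scheduled nothing further
    pending = deque([start])
    while pending:
        name = pending.popleft()
        out = tasks[name](context)
        context[name] = out.output
        if out.next_tasks:
            pending.extend(out.next_tasks)
        else:
            final[name] = out.output
    return final


def first(context: dict) -> Out:
    return Out("result1", next_tasks=["second"])


def second(context: dict) -> Out:
    # The earlier task's output is already in the shared context.
    return Out(f"saw {context['first']}")


chain_result = run_chain({"first": first, "second": second}, "first")
print(chain_result)
```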
Parallel Execution
import time

@flow.task("starter")
def starter(context: Context) -> TaskOutput:
    # Launch multiple tasks in parallel by simply adding them to the next_tasks list
    return TaskOutput(output="started", next_tasks=[NextTask("slow_task1"), NextTask("slow_task2")])

@flow.task("slow_task1")
def slow_task1(context: Context) -> TaskOutput:
    time.sleep(1)
    return TaskOutput(output="result1")

@flow.task("slow_task2")
def slow_task2(context: Context) -> TaskOutput:
    time.sleep(1)
    return TaskOutput(output="result2")

# Both slow_tasks execute in parallel, taking ~1 second total
result = flow.run("starter")
print(flow.context.get("starter"))
print(result)
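The "~1 second total" figure above comes from the two sleeps overlapping rather than running back to back. The effect can be checked without LiteFlow by submitting two sleeping functions to a plain ThreadPoolExecutor. This is a sketch of the underlying idea only; the slow helper is hypothetical and the sleep is shortened to 0.3 seconds to keep the example quick.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow(label: str, seconds: float = 0.3) -> str:
    # Sleeping releases the GIL, so two threads can wait at the same time.
    time.sleep(seconds)
    return label


start = time.perf_counter()
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(slow, "result1"), pool.submit(slow, "result2")]
    outputs = [f.result() for f in futures]
elapsed = time.perf_counter() - start

# Elapsed time should be close to one sleep, not the sum of both.
print(outputs, f"{elapsed:.2f}s")
```

With max_workers=1 the same code would take roughly twice as long, since the second call would wait for the first to finish.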
Streaming Results
@flow.task("streaming_task")
def streaming_task(context: Context) -> TaskOutput:
    # Stream intermediate results
    stream = context.get_stream()
    for i in range(3):
        # (task_id, chunk_value)
        stream.put(StreamChunk("streaming_task", f"interim_{i}"))
    return TaskOutput(output="final")

# Get results as they arrive
for stream_chunk in flow.stream("streaming_task"):
    print(f"{stream_chunk.task_id}: {stream_chunk.value}")

# Prints:
# streaming_task: interim_0
# streaming_task: interim_1
# streaming_task: interim_2
# streaming_task: final
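Streaming of this kind is commonly built on a producer/consumer queue: the task puts chunks on a queue while the caller iterates over them as they arrive. The sketch below shows that pattern with queue.Queue and a sentinel object marking the end of the stream. It is an illustration under that assumption, not LiteFlow's implementation; produce, consume, and _DONE are hypothetical names.

```python
import queue
import threading
from typing import Iterator, Tuple

_DONE = object()  # sentinel marking the end of the stream


def produce(stream: "queue.Queue") -> None:
    # Emit three interim chunks, then the final value, then the sentinel.
    for i in range(3):
        stream.put(("streaming_task", f"interim_{i}"))
    stream.put(("streaming_task", "final"))
    stream.put(_DONE)


def consume(stream: "queue.Queue") -> Iterator[Tuple[str, str]]:
    # Block on the queue and yield chunks until the sentinel arrives.
    while True:
        item = stream.get()
        if item is _DONE:
            return
        yield item


chunk_queue: "queue.Queue" = queue.Queue()
worker = threading.Thread(target=produce, args=(chunk_queue,))
worker.start()
chunks = list(consume(chunk_queue))
worker.join()

for task_id, value in chunks:
    print(f"{task_id}: {value}")
```

Because there is a single producer and the queue is first-in, first-out, the chunks arrive in the order they were put.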
Dynamic Workflows
@flow.task("conditional_task")
def conditional_task(context: Context) -> TaskOutput:
    count = context.get("count", 0)
    if count >= 3:
        return TaskOutput(output="done")
    context.set("count", count + 1)
    return TaskOutput(
        output=f"iteration_{count}", next_tasks=[NextTask("conditional_task")]
    )

# Task will loop 3 times before finishing
flow.add_task("finish", lambda ctx: TaskOutput("completed", None))
result = flow.run("conditional_task")
print(result)
# Prints:
# {'conditional_task': 'done'}
Input Parameters
@flow.task("greet")
def parameterized_task(context: Context) -> TaskOutput:
    name = context.get("user_name")
    return TaskOutput(output=f"Hello {name}!")

result = flow.run("greet", inputs={"user_name": "Alice"})
print(result)
# Returns {"greet": "Hello Alice!"}
Passing Inputs to the Next Task
def task1(ctx):
    return TaskOutput("result1", [NextTask("task2", inputs={"input1": "value1"})])

def task2(ctx, inputs):
    assert inputs == {"input1": "value1"}
    return TaskOutput("result2")

flow.add_task("task1", task1)
flow.add_task("task2", task2)
result = flow.run("task1")
print(result)
# Returns {"task2": "result2"}
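In the example above, task1 takes only the context while task2 also takes inputs. One way a framework could support both shapes is to inspect the function signature before calling it. The sketch below shows that technique with inspect.signature; call_task is a hypothetical helper, and whether LiteFlow dispatches this way is an assumption, not something the documentation above confirms.

```python
import inspect
from typing import Any, Callable, Optional


def call_task(func: Callable, ctx: dict, inputs: Optional[dict] = None) -> Any:
    """Pass `inputs` only to tasks that declare at least two parameters."""
    params = inspect.signature(func).parameters
    if len(params) >= 2:
        return func(ctx, inputs or {})
    return func(ctx)


def without_inputs(ctx):
    return "result1"


def with_inputs(ctx, inputs):
    return f"got {inputs['input1']}"


r1 = call_task(without_inputs, {})
r2 = call_task(with_inputs, {}, {"input1": "value1"})
print(r1, r2)
```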
Dynamic Routing
@flow.task("router")
def router(context: Context) -> TaskOutput:
    task_type = context.get("type")
    routes = {
        "process": [NextTask("process_task")],
        "analyze": [NextTask("analyze_task")],
        "report": [NextTask("report_task")]
    }
    return TaskOutput(output=f"routing to {task_type}", next_tasks=routes.get(task_type, []))

@flow.task("process_task")
def process_task(context: Context) -> TaskOutput:
    return TaskOutput(output="processed data")

result = flow.run("router", inputs={"type": "process"})
print(result)
# Returns {"process_task": "processed data"}
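Note that routes.get(task_type, []) in the router above yields an empty list for an unrecognized type, so no follow-up task is named. If an explicit fallback is preferred, the lookup can supply one. The sketch below shows the lookup in isolation with plain task names; pick_next and the "error_task" name are hypothetical and only illustrate the dictionary-dispatch idea.

```python
from typing import Dict, List

ROUTES: Dict[str, List[str]] = {
    "process": ["process_task"],
    "analyze": ["analyze_task"],
    "report": ["report_task"],
}


def pick_next(task_type: str, fallback: str = "") -> List[str]:
    """Return next-task names; unknown types get [] or the given fallback."""
    default = [fallback] if fallback else []
    # Copy the list so callers cannot mutate the shared routing table.
    return list(ROUTES.get(task_type, default))


print(pick_next("process"))
print(pick_next("unknown"))
print(pick_next("unknown", fallback="error_task"))
```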
State Management
context = Context()
context.from_dict({"task1": "result1"})
flow = Flow(executor=executor, context=context)
flow.add_task("task2", lambda ctx: TaskOutput("result2"))
flow.run("task2")
print(flow.context.get("task1")) # Should print "result1"
print(flow.context.get("task2")) # Should print "result2"
# Serialize the context to a dictionary
print(flow.get_context().to_dict())
# Returns {"task1": "result1", "task2": "result2"}
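Since to_dict() returns a plain dictionary, as shown above, the state can be saved between runs and later passed back through from_dict(). The sketch below shows the save/load half with the json module, using a literal dictionary in place of a real context. It assumes every stored value is JSON-serializable; the file name is arbitrary.

```python
import json
import tempfile
from pathlib import Path

# Same shape as the to_dict() output shown above.
snapshot = {"task1": "result1", "task2": "result2"}

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "context.json"
    path.write_text(json.dumps(snapshot))      # persist the state
    restored = json.loads(path.read_text())    # load it back later

print(restored == snapshot)
```

Values that JSON cannot represent (custom objects, sets, bytes) would need their own encoding step before saving.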
Map Reduce
@flow.task("task1")
def task1(ctx):
    ctx.set("collector", [])
    return TaskOutput("result1", [
        NextTask("task2", spawn_another=True),
        NextTask("task2", spawn_another=True),
        NextTask("task2", spawn_another=True)
    ])

@flow.task("task2")
def task2(ctx):
    collector = ctx.get("collector")
    collector.append("result2")
    ctx.set("collector", collector)
    return TaskOutput("", [NextTask("task3")])

@flow.task("task3")
def task3(ctx):
    collector = ctx.get("collector")
    return TaskOutput(collector)

result = flow.run("task1")
print(result)
assert result == {"task3": ["result2", "result2", "result2"]}
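The example above fans out three task2 instances and gathers their results in a shared list. For comparison, here is the same fan-out-then-combine shape using only the standard library. It is a sketch of the general map-reduce pattern, not of how LiteFlow schedules tasks; count_words and the sample lines are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def count_words(line: str) -> int:
    return len(line.split())


lines = ["a b c", "d e", "f"]

with ThreadPoolExecutor(max_workers=3) as pool:
    # Map step: one call per input; pool.map preserves input order.
    partials = list(pool.map(count_words, lines))

# Reduce step: combine the partial results into a single value.
total = reduce(lambda acc, n: acc + n, partials, 0)
print(partials, total)
```

Collecting results from the return values of the map step, rather than appending to shared state, avoids having to reason about concurrent writes.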
File details
Details for the file liteflow-0.2.0.tar.gz.
File metadata
- Download URL: liteflow-0.2.0.tar.gz
- Upload date:
- Size: 11.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.1.1 CPython/3.12.9 Linux/6.8.0-1021-azure
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 6434bc04da2b33a89c74fbb7ddc41aaf563ac94016a03ca8f03952f6299b51f6 |
| MD5 | 45001c383cefd70052338e1af1700a13 |
| BLAKE2b-256 | fb0c1c74a1c2bbc27542045a7587f00942e4db3337b5379033da8d1a30dd948b |
File details
Details for the file liteflow-0.2.0-py3-none-any.whl.
File metadata
- Download URL: liteflow-0.2.0-py3-none-any.whl
- Upload date:
- Size: 13.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.1.1 CPython/3.12.9 Linux/6.8.0-1021-azure
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 4306fa5f790db95dbb6c08c802ad1bc8cc2092103e2ca0905f2f8bc525d5a906 |
| MD5 | ea403ec781f78dab0213565f865fcc96 |
| BLAKE2b-256 | b095a2331cd34c05f7b07811c05f6c1d9bb24ba047ff9ff962ca6618a8d0e749 |