Parallel Iteration with File-Based Coordination
Laufband: Embarrassingly parallel, embarrassingly simple!
Laufband enables parallel iteration over a dataset from multiple processes, utilizing file-based locking and communication to ensure each item is processed exactly once.
Installation
Install Laufband using pip:
```bash
pip install laufband
```
Usage
Using Laufband is similar to the familiar tqdm progress bar for sequential iteration.
```python
from laufband import Laufband

data = list(range(100))
for item in Laufband(data):
    # Process each item in the dataset
    pass
```
The true power of Laufband emerges when you run your script in parallel. Multiple processes will coordinate using file-based locking to ensure that each item in the dataset is processed by only one process.
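To make the exactly-once idea concrete, here is a minimal sketch of one file-based claiming scheme. It is an illustration under stated assumptions, not Laufband's implementation (Laufband coordinates through its own database and locks). Each worker tries to create a per-item claim file with `os.O_CREAT | os.O_EXCL`, which fails if the file already exists, so only the first worker to get there processes that item. `claim` and `iterate_claimed` are hypothetical helpers, and the atomicity of `O_EXCL` is assumed to hold on a local filesystem.

```python
import os
from pathlib import Path
from typing import Iterable, Iterator


def claim(item, claim_dir: Path) -> bool:
    """Try to claim `item`; only the first caller across all processes succeeds."""
    path = claim_dir / f"{item}.claim"
    try:
        # O_CREAT | O_EXCL creates the file atomically and fails if it already exists.
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False  # another worker already owns this item
    with os.fdopen(fd, "w") as handle:
        handle.write(str(os.getpid()))  # record the owner, handy for debugging
    return True


def iterate_claimed(data: Iterable, claim_dir: Path) -> Iterator:
    """Hypothetical helper: yield only the items this worker managed to claim."""
    claim_dir.mkdir(parents=True, exist_ok=True)
    for item in data:
        if claim(item, claim_dir):
            yield item
```

Two workers running `iterate_claimed(range(10), Path("claims"))` side by side would split the items between them with no overlap. Unlike Laufband, this sketch has no notion of failed jobs, worker status, or progress reporting.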
Here's a typical example demonstrating parallel processing with Laufband and file-based locking for shared resource access:
```python
import json
import time
from pathlib import Path

from laufband import Laufband

output_file = Path("data.json")
data = list(range(100))

worker = Laufband(data, tqdm_kwargs={"desc": "using Laufband"})
for item in worker:
    # Simulate some computationally intensive task
    time.sleep(0.1)
    with worker.lock:
        # Access and modify a shared resource (e.g., a file) safely using the lock.
        # The file is created lazily inside the lock: resetting it at startup would let a
        # late-starting process wipe results that other processes have already written.
        if output_file.exists():
            file_content = json.loads(output_file.read_text())
        else:
            file_content = {"processed_data": []}
        file_content["processed_data"].append(item)
        output_file.write_text(json.dumps(file_content))
```
To execute this script (main.py) in parallel, you can use a command like the following in your terminal (this example launches 10 background processes):
```bash
for i in {1..10} ; do python main.py & done
```
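The bash loop above starts the workers in the background and returns immediately. If you would rather launch them from Python, for example on a system without bash, a minimal sketch using the standard-library `subprocess` module could look like this. `launch_workers` is a hypothetical helper, and unlike the bash loop it waits for every worker to exit.

```python
import subprocess
import sys


def launch_workers(script: str, n_workers: int = 10) -> list[int]:
    """Start `n_workers` copies of `script` concurrently and wait for all of them."""
    # Use the same interpreter (and virtual environment) as the launcher.
    command = [sys.executable, script]
    processes = [subprocess.Popen(command) for _ in range(n_workers)]
    # Popen.wait() blocks until that process exits and returns its exit code.
    return [process.wait() for process in processes]
```

Calling `launch_workers("main.py")` starts ten workers and returns their exit codes, so a non-zero entry points at a worker that crashed.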
[!IMPORTANT] The different processes may finish at different times. Therefore, the order of items in `file_content` is not guaranteed. If the order is important, you will need to implement sorting logic afterwards.
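For the example above, where each entry of `processed_data` is the item itself, the sorting step can be as simple as the following sketch, run once all workers have finished. `sorted_results` is a hypothetical helper; for richer records you would pass a `key=` argument to `sorted`.

```python
import json
from pathlib import Path


def sorted_results(output_file: Path) -> list:
    """Load the shared results file and return the processed items in sorted order."""
    file_content = json.loads(output_file.read_text())
    return sorted(file_content["processed_data"])
```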
Failure Policy
In Laufband, a job is automatically marked as failed if the iteration is interrupted by:
- an unhandled exception, or
- an explicit `break`.
```python
from laufband import Laufband

data = list(range(100))

# Example 1: break
for item in Laufband(data):
    if item == 50:
        break  # Job 50 will be marked as failed

# Example 2: Exception
for item in Laufband(data):
    if item == 70:
        raise ValueError("Something went wrong")  # Job 70 will be marked as failed
```
If you want to exit early but still mark the job as successfully completed,
you should use Laufband.close() instead of break:
```python
from laufband import Laufband

data = list(range(100))
worker = Laufband(data)
for item in worker:
    if item == 50:
        worker.close()  # Job 50 will be marked as completed, and iteration will stop cleanly
```
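These break/exception/close semantics map naturally onto Python's generator protocol. The sketch below is a hypothetical `TrackedIterator`, not Laufband's code. Leaving the loop early closes the generator, which raises `GeneratorExit` at the paused `yield`, so the `finally` clause can mark the in-flight item as failed; `close()` only sets a flag, so the current item is completed before iteration stops.

```python
class TrackedIterator:
    """Hypothetical sketch: record per-item status following the failure policy above."""

    def __init__(self, data):
        self.data = list(data)
        self.status = {}  # item -> "running" | "completed" | "failed"
        self._stop_requested = False

    def close(self):
        # Ask the iterator to finish the current item and then stop cleanly.
        self._stop_requested = True

    def __iter__(self):
        current = None
        try:
            for item in self.data:
                current = item
                self.status[item] = "running"
                yield item
                # Control came back from the loop body, so the item succeeded.
                self.status[item] = "completed"
                current = None
                if self._stop_requested:
                    return
        finally:
            # Reached with `current` set only if the generator was closed at the yield,
            # i.e. after a `break` or an exception raised in the loop body.
            if current is not None:
                self.status[current] = "failed"
```

In CPython, a generator referenced only by its `for` loop is closed right after a `break` or an escaping exception, so the `finally` clause runs immediately; other Python implementations may defer this until garbage collection. If you need the status update at a deterministic point, keep a reference to the generator and call its `close()` method yourself.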
Context Manager Protocol
Laufband supports context manager usage for better worker lifecycle management.
This is recommended if you want to iterate over a Laufband or Graphband instance multiple times.
```python
from laufband import Laufband

data = list(range(100))

# Using context manager for multiple iterations (recommended)
with Laufband(data) as worker:
    while True:  # keeps polling for work; add your own exit condition or stop with Ctrl+C
        for item in worker:
            # do something
            pass
# Worker automatically goes offline when exiting the context
```
Context Manager Benefits:
- Proper cleanup: Workers are automatically set to `OFFLINE` status when exiting the context
Worker Status Lifecycle:
- Without context manager: Worker goes from `IDLE` → `BUSY` → `OFFLINE` (after completing all tasks)
- With context manager: Worker goes from `IDLE` → `BUSY` → `IDLE` (during processing) → `OFFLINE` (on context exit)
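The lifecycle above can be modelled as a small state machine. `SketchWorker` and `Status` below are hypothetical, not Laufband's classes; the sketch only shows why the context manager matters: finishing a pass inside the `with` block returns the worker to `IDLE`, and only `__exit__` moves it to `OFFLINE`.

```python
from enum import Enum


class Status(Enum):
    IDLE = "idle"
    BUSY = "busy"
    OFFLINE = "offline"


class SketchWorker:
    """Hypothetical worker that records the status transitions described above."""

    def __init__(self, data):
        self.data = list(data)
        self.status = Status.IDLE
        self.history = [self.status]
        self._in_context = False

    def _set(self, status: Status) -> None:
        # Only record actual transitions, not repeated BUSY -> BUSY updates.
        if status is not self.status:
            self.status = status
            self.history.append(status)

    def __enter__(self):
        self._in_context = True
        return self

    def __exit__(self, exc_type, exc, tb):
        self._in_context = False
        self._set(Status.OFFLINE)
        return False  # never swallow exceptions

    def __iter__(self):
        for item in self.data:
            self._set(Status.BUSY)
            yield item
        # Inside a context the worker may iterate again, so it only goes back to IDLE.
        self._set(Status.IDLE if self._in_context else Status.OFFLINE)
```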
[!NOTE] Graphband uses two locks:
- `worker.lock`: a user-facing lock for protecting shared resources in your code (files, sockets, etc.).
- An internal `db_lock`: used by laufband for database coordination. It is managed by the library and typically should not be acquired directly by user code.
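The role of a user-facing lock such as `worker.lock` can be illustrated with a plain advisory file lock. The sketch below is POSIX-only (it relies on `fcntl.flock`), and `file_lock` and `append_item` are hypothetical helpers rather than Laufband's API. It mirrors the read-modify-write pattern from the `data.json` example; being advisory, the lock only protects against processes that also acquire it.

```python
import fcntl
import json
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def file_lock(lock_path: Path):
    """Hold an exclusive advisory lock on `lock_path` for the duration of the block."""
    with open(lock_path, "a") as handle:
        fcntl.flock(handle, fcntl.LOCK_EX)  # blocks until no other holder remains
        try:
            yield
        finally:
            fcntl.flock(handle, fcntl.LOCK_UN)


def append_item(output_file: Path, item, lock_path: Path) -> None:
    """Append `item` to a shared JSON file without losing concurrent updates."""
    with file_lock(lock_path):
        if output_file.exists():
            content = json.loads(output_file.read_text())
        else:
            content = {"processed_data": []}
        content["processed_data"].append(item)
        output_file.write_text(json.dumps(content))
```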
Examples
ASE Calculator
For atomistic data, the ASE package is widely used to calculate energies and forces of atomic configurations using either ab initio methods or machine-learned interatomic potentials (MLIPs).
You can use Laufband to parallelize these calculations without duplicated work or manual bookkeeping, and with automatic checkpointing.
The following example uses a MACE foundation model to compute energies and forces on the ASE S22 dataset.
[!TIP] You can safely run this script multiple times, even across multiple SLURM jobs, without any modifications. Laufband will automatically coordinate which configurations are processed. For local parallelization, you can use bash:
```bash
for i in {1..10} ; do python main.py & done
```
```python
import ase.io
from ase.collections import s22
from laufband import Laufband
from mace.calculators import mace_mp

# Initialize calculator
calc = mace_mp(model="medium", dispersion=False, default_dtype="float32")

worker = Laufband(list(s22))
for atoms in worker:
    atoms.calc = calc
    energy = atoms.get_potential_energy()
    worker.iterator.set_description(f"{energy = }")
    with worker.lock:
        # Serialize appends so frames from different processes do not interleave
        ase.io.write("frames.xyz", atoms, append=True)
```
You can use `laufband watch` to follow the progress across all active workers.
Graphband
Laufband supports dependency-aware tasks through laufband.Graphband.
To use this, provide an iterator that yields tasks in a valid execution order.
[!NOTE] `laufband.Laufband` effectively uses a directed graph with no edges as the input to `laufband.Graphband`.
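A valid execution order is simply a topological order of the dependency graph. If you prefer not to depend on networkx, the standard-library `graphlib` module (Python 3.9+) can compute one; the sketch below is an alternative to the networkx example, and `execution_order` is a hypothetical helper. The input maps each node to the set of nodes it depends on.

```python
from graphlib import TopologicalSorter


def execution_order(dependencies: dict[str, set[str]]) -> list[str]:
    """Return node ids so that every node comes after all of its dependencies."""
    # static_order() raises graphlib.CycleError if the graph contains a cycle,
    # in which case no valid execution order exists.
    return list(TopologicalSorter(dependencies).static_order())
```

Each node could then be yielded as `Task(id=node, dependencies=dependencies.get(node, set()))`, just as the networkx version yields its tasks.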
```python
import networkx as nx
from laufband import Task


def graph_tasks():
    digraph = nx.DiGraph()
    edges = [
        ("a", "b"),
        ("a", "c"),
        ("b", "d"),
        ("b", "e"),
        ("c", "f"),
        ("c", "g"),
    ]
    digraph.add_edges_from(edges)
    for node in nx.topological_sort(digraph):
        yield Task(
            id=node,  # unique string representation of the task
            data=node,  # optional data associated with the task
            dependencies=set(digraph.predecessors(node)),  # dependencies of the task
        )
```
Given this generator, you can iterate the graph in parallel using laufband.Graphband.
[!WARNING] Once `laufband.Graphband` has executed a task, it will not re-execute it, even if a dependency is added later on.
```python
from laufband import Graphband

worker = Graphband(graph_tasks())  # graph_tasks() from the example above
for task in worker:
    print(task.id, task.data)
```
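One way to state the dependency rule: a task becomes available only once all of its dependencies have completed and no other worker has claimed it. The single-process sketch below captures just that rule; `ready_tasks` is a hypothetical helper, and the `completed`/`claimed` sets are assumed stand-ins for state that Laufband keeps in its shared database.

```python
def ready_tasks(
    dependencies: dict[str, set[str]], completed: set[str], claimed: set[str]
) -> list[str]:
    """Tasks that are not done, not claimed, and whose dependencies have all completed."""
    return sorted(
        task
        for task, deps in dependencies.items()
        if task not in completed and task not in claimed and deps <= completed
    )
```

With `{"a": set(), "b": {"a"}, "c": {"a"}}`, only `"a"` is ready at first; once `"a"` completes, `"b"` and `"c"` can be handed to different workers in parallel.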
Labels, Requirements, and Multiple Workers per Task
You can assign requirements to your tasks and labels to workers to control their execution.
```python
from laufband import Task, Graphband


def iterator():
    yield Task(id="task1")
    yield Task(id="task2", requirements={"gpu"})


w1 = Graphband(iterator(), identifier="w1")
w2 = Graphband(iterator(), identifier="w2", labels={"gpu"})

print([x.id for x in w1])
# ["task1"]
print([x.id for x in w2])
# ["task2"]
```
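The example above is consistent with a simple subset rule: a worker may take a task when every requirement of the task appears among the worker's labels. The sketch below replays the `w1`/`w2` example under that assumption; `run_worker` and the shared `done` set are hypothetical stand-ins, not Laufband's implementation.

```python
def run_worker(tasks: dict[str, set[str]], labels: set[str], done: set[str]) -> list[str]:
    """Take every unfinished task whose requirements are all covered by `labels`."""
    taken = []
    for task_id, requirements in tasks.items():
        # Assumed rule: eligible if the requirements are a subset of the worker's labels.
        if task_id not in done and requirements <= labels:
            done.add(task_id)  # the shared set stands in for Laufband's database
            taken.append(task_id)
    return taken
```

Under this rule, `w2` only receives `task2` because `w1` has already completed `task1`; a fresh worker labelled `gpu` would be eligible for both tasks.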
Sometimes a task supports internal parallel execution (e.g., nested use of laufband). In such a case, you can assign multiple workers to one task.
[!NOTE] Keep in mind that `laufband` does not actually schedule the execution. The number of available workers per task depends on how many workers are spawned.
```python
from laufband import Task, Graphband


def iterator():
    yield Task(id="task1", max_parallel_workers=2)
    # At most 2 workers will be assigned to this job until both successfully finish.


worker = Graphband(iterator())
for item in worker:
    # Code that can be executed multiple times, e.g., via laufband itself.
    ...
```
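The `max_parallel_workers` cap can be pictured as a per-task slot counter. In the hypothetical `SharedTask` sketch below, a worker is assigned only while fewer than `max_parallel_workers` workers hold the task. In a real multi-process setting, the check and the update would have to happen atomically under a shared lock (presumably the kind of database coordination the internal `db_lock` handles); otherwise two workers could grab the last slot at the same time.

```python
from dataclasses import dataclass, field


@dataclass
class SharedTask:
    """Hypothetical bookkeeping for a task that several workers may run at once."""

    id: str
    max_parallel_workers: int = 1
    workers: set[str] = field(default_factory=set)

    def try_assign(self, worker_id: str) -> bool:
        """Assign `worker_id` if a slot is free; each worker is assigned at most once."""
        if worker_id in self.workers or len(self.workers) >= self.max_parallel_workers:
            return False
        self.workers.add(worker_id)
        return True
```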
Download files
Source distribution: laufband-0.2.1.tar.gz
Built distribution: laufband-0.2.1-py3-none-any.whl
File details
Details for the file laufband-0.2.1.tar.gz.
File metadata
- Download URL: laufband-0.2.1.tar.gz
- Upload date:
- Size: 198.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.8.18
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 550083fe4bce1a95db99f4b61d9ec523121c9e37d8cbf352d02325fb2b4da06a |
| MD5 | a56a544be2e45e8b5050c04565f97862 |
| BLAKE2b-256 | 9ca854a04b70936d162395c556c859047810da1a2829e5d9e077228f0132b8c9 |
File details
Details for the file laufband-0.2.1-py3-none-any.whl.
File metadata
- Download URL: laufband-0.2.1-py3-none-any.whl
- Upload date:
- Size: 19.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.8.18
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | d9adcae469029bf06a0ac3a9741daceb7f754b10d0661cd8b0184eb45589f019 |
| MD5 | 1577ec2ce619596b840dde4f7287ab89 |
| BLAKE2b-256 | caccae496d34403f0e27a845d8ebb63dc32c8ce0418c4df34a97ddef188c6fbd |