Cacheable, nested pipelines for Python. Define computations as configs; furu handles caching, state tracking, and result reuse across runs.
Project description
furu
Note:
v0.0.xis alpha and may include breaking changes.
A Python library for building cacheable, nested pipelines. Define computations as config objects; furu turns configs into stable on-disk artifact directories, records metadata/state, and reuses results across runs.
Built on chz for declarative configs.
Installation
uv add "furu[dashboard]"
Or with pip:
pip install "furu[dashboard]"
The [dashboard] extra includes the web dashboard. Omit it for the core library only.
Quickstart
- Subclass
furu.Furu[T] - Implement
_create(self) -> T(compute and write toself.furu_dir) - Implement
_load(self) -> T(load fromself.furu_dir) - Call
get()
# my_project/pipelines.py
import json
from pathlib import Path
import furu
class TrainModel(furu.Furu[Path]):
lr: float = furu.chz.field(default=1e-3)
steps: int = furu.chz.field(default=1000)
def _create(self) -> Path:
# Write outputs into the artifact directory
(self.furu_dir / "metrics.json").write_text(
json.dumps({"lr": self.lr, "steps": self.steps})
)
ckpt = self.furu_dir / "checkpoint.bin"
ckpt.write_bytes(b"...")
return ckpt
def _load(self) -> Path:
# Load outputs back from disk
return self.furu_dir / "checkpoint.bin"
# run_train.py
from my_project.pipelines import TrainModel
# First call: runs _create(), caches result
artifact = TrainModel(lr=3e-4, steps=5000).get()
# Second call with same config: loads from cache via _load()
artifact = TrainModel(lr=3e-4, steps=5000).get()
Tip: Define Furu classes in importable modules (not
__main__); the artifact namespace is derived from the class's module + qualified name.
Core Concepts
How Caching Works
Each Furu instance maps deterministically to a directory based on its config:
<root>/<namespace>/<hash>/
- namespace: Derived from the class's module + qualified name (e.g.,
my_project.pipelines/TrainModel) - hash: Computed from the object's config values using Blake2s
When you call get():
- If no cached result exists → run
_create(), save state as "success" - If cached result exists → run
_load()to retrieve it - If another process is running → wait for it to finish, then load
Nested Pipelines (Dependencies)
Furu objects compose via nested configs. Each dependency gets its own artifact folder:
import furu
class Dataset(furu.Furu[str]):
name: str = furu.chz.field(default="toy")
def _create(self) -> str:
(self.furu_dir / "data.txt").write_text("hello\nworld\n")
return "ready"
def _load(self) -> str:
return (self.furu_dir / "data.txt").read_text()
class TrainTextModel(furu.Furu[str]):
dataset: Dataset = furu.chz.field(default_factory=Dataset)
def _create(self) -> str:
data = self.dataset.get() # Triggers Dataset cache
(self.furu_dir / "model.txt").write_text(f"trained on:\n{data}")
return "trained"
def _load(self) -> str:
return (self.furu_dir / "model.txt").read_text()
Executors (Local + Slurm)
Use the execution helpers for batch runs and cluster scheduling:
from furu.execution import run_local
run_local(
[TrainModel(lr=3e-4, steps=5000), TrainModel(lr=1e-3, steps=2000)],
max_workers=8,
window_size="bfs",
)
import furu
from furu.execution import SlurmSpec, submit_slurm_dag
class TrainModel(furu.Furu[str]):
use_gpu: bool = furu.chz.field(default=False)
def _executor(self) -> SlurmSpec:
if self.use_gpu:
return SlurmSpec(partition="gpu", gpus=1, cpus=8, mem_gb=64, time_min=720)
return SlurmSpec(partition="cpu", cpus=8, mem_gb=32, time_min=120)
submit_slurm_dag([TrainModel(use_gpu=True)])
from furu.execution import SlurmSpec, run_slurm_pool
run_slurm_pool(
[TrainModel(use_gpu=True), TrainModel(use_gpu=False)],
max_workers_total=50,
window_size="bfs",
plan_refresh_interval_sec=60.0,
worker_health_check_interval_sec=15.0,
done_scan_interval_sec=2.0,
failed_scan_interval_sec=2.0,
stale_scan_interval_sec=10.0,
ready_refresh_interval_sec=2.0,
)
run_slurm_pool(
[TrainModel(use_gpu=True), TrainModel(use_gpu=False)],
max_workers_total={
SlurmSpec(partition="gpu", gpus=1, cpus=8, mem_gb=64, time_min=720): 4,
SlurmSpec(partition="cpu", cpus=8, mem_gb=32, time_min=120): 12,
"total": 14,
},
)
_executor() can return a single SlurmSpec or an ordered sequence of specs.
submit_slurm_dag and run_local use the first spec. run_slurm_pool workers can
claim tasks using any listed spec key, so you can offer fallback queues.
run_slurm_pool(max_workers_total=...) accepts either an integer or a mapping
of SlurmSpec | Literal["total"] to integer limits. In mapping mode, every
executor spec key seen in the active plan must have a per-spec limit entry.
"total" is optional; when omitted, the total cap defaults to the sum of all
per-spec limits. Extra mapping keys that do not appear in the plan are allowed.
run_local keeps an in-memory scheduler state with explicit buckets (READY,
BLOCKED, IN_PROGRESS_SELF, IN_PROGRESS_EXTERNAL, FAILED, COMPLETED)
and dispatches from READY without rebuilding the full dependency plan each loop.
It polls external in-progress nodes (external_poll_interval_sec, default 5.0),
runs underutilized reconciles when below capacity
(underutilized_reconcile_interval_sec, default 15.0), and runs a broader
periodic reconcile (plan_refresh_interval_sec, default 60.0). Nodes observed
as DONE stay done for the remainder of that executor run (snapshot semantics).
run_slurm_pool keeps run-local scheduler state in memory and periodically
refreshes from storage. Queue scans can be tuned independently with
done_scan_interval_sec, failed_scan_interval_sec, stale_scan_interval_sec,
and ready_refresh_interval_sec (all default to poll_interval_sec when
unspecified).
Submitit logs are stored under <FURU_PATH>/submitit by default. Override with
FURU_SUBMITIT_PATH when you want a different logs root. In run_slurm_pool,
worker stdout/stderr are written to each worker queue directory under
<run_dir>/queue/running/<spec_key>/<worker_id>/stdout.log|stderr.log.
Breaking Changes and Executor Semantics
load_or_create()is removed; useget()exclusively.get()no longer accepts per-callretry_failedoverrides. Configure retries viaFURU_RETRY_FAILEDorFURU_CONFIG.retry_failed.- Slurm executor selection is now
Furu._executor() -> SlurmSpec | Sequence[SlurmSpec]; remove_executor_spec_key()and do not passspecs=torun_slurm_poolorsubmit_slurm_dag. - Executor runs (
run_local,run_slurm_pool,submit_slurm_dag) fail fast if a dependency is FAILED whileretry_failedis disabled; with retries enabled, failed compute nodes are retried (bounded byFURU_MAX_COMPUTE_RETRIESretries). - Pool protocol/queue failures (invalid payloads, spec mismatch, missing artifacts) are
fatal even when
retry_failedis enabled; only compute failures are retried. FURU_ALWAYS_RERUNcauses matching nodes to recompute once per executor run, but repeated references in the same run reuse that result.
Storage Structure
Furu uses two roots: FURU_PATH for data/ + raw/, and
FURU_VERSION_CONTROLLED_PATH for artifacts/. Defaults:
FURU_PATH=<project>/furu-data
FURU_VERSION_CONTROLLED_PATH=<project>/furu-data/artifacts
<project> is the nearest directory containing pyproject.toml (falling back to
the git root). This means you can move FURU_PATH without relocating artifacts.
$FURU_PATH/
├── data/ # version_controlled=False
│ └── <module>/<Class>/<hash>/
└── raw/
$FURU_VERSION_CONTROLLED_PATH/ # version_controlled=True
└── <module>/<Class>/<hash>/
Features
FuruList: Managing Experiment Collections
FuruList provides a collection interface for organizing related experiments:
import furu
class MyExperiments(furu.FuruList[TrainModel]):
baseline = TrainModel(lr=1e-3, steps=1000)
fast_lr = TrainModel(lr=1e-2, steps=1000)
long_run = TrainModel(lr=1e-3, steps=10000)
# Can also use a dict for dynamic configs
configs = {
"tiny": TrainModel(lr=1e-3, steps=100),
"huge": TrainModel(lr=1e-4, steps=100000),
}
# Iterate over all experiments
for exp in MyExperiments:
exp.get()
# Access by name
exp = MyExperiments.by_name("baseline")
# Get all as list
all_exps = MyExperiments.all()
# Get (name, instance) pairs
for name, exp in MyExperiments.items():
print(f"{name}: {exp.exists()}")
Custom Validation
Override _validate() to add custom cache invalidation logic. Return False or
raise furu.FuruValidationError to force re-computation. In executor planning,
any other exception is logged and treated as invalid (no crash); in interactive
exists() calls, exceptions still surface:
class ModelWithValidation(furu.Furu[Path]):
checkpoint_name: str = "model.pt"
def _validate(self) -> bool:
# Return False (or raise FuruValidationError) to force re-computation
ckpt = self.furu_dir / self.checkpoint_name
return ckpt.exists() and ckpt.stat().st_size > 0
def _create(self) -> Path:
...
def _load(self) -> Path:
...
Checking State Without Loading
obj = TrainModel(lr=3e-4, steps=5000)
# Check if cached result exists (runs _validate())
if obj.exists():
print("Already computed!")
# Get metadata without triggering computation
metadata = obj.get_metadata()
print(f"Hash: {obj.furu_hash}")
print(f"Dir: {obj.furu_dir}")
Serialization
Furu objects can be serialized to/from dictionaries:
obj = TrainModel(lr=3e-4, steps=5000)
# Serialize to dict (for storage, transmission)
data = obj.to_dict()
# Reconstruct from dict
obj2 = TrainModel.from_dict(data)
# Get Python code representation (useful for logging)
print(obj.to_python())
# Output: TrainModel(lr=0.0003, steps=5000)
Raw Directory
For large files that shouldn't be versioned per-config, use the shared raw directory:
class LargeDataProcessor(furu.Furu[Path]):
def _create(self) -> Path:
# self.raw_dir is shared across all configs
# Create a subfolder for isolation if needed
my_raw = self.raw_dir / self.furu_hash
my_raw.mkdir(exist_ok=True)
large_file = my_raw / "huge_dataset.bin"
# ... write large file ...
return large_file
Version-Controlled Storage
For artifacts that should be stored separately (e.g., checked into git):
class VersionedConfig(furu.Furu[dict], version_controlled=True):
# Stored under $FURU_VERSION_CONTROLLED_PATH
# Default: <project>/furu-data/artifacts
...
<project> is the nearest directory containing pyproject.toml, or the git root
if pyproject.toml is missing.
It is typical to keep furu-data/data/ and furu-data/raw/ in .gitignore while
committing furu-data/artifacts/.
Logging
Furu installs stdlib logging handlers that capture logs to per-artifact files.
import logging
import furu
log = logging.getLogger(__name__)
class MyPipeline(furu.Furu[str]):
def _create(self) -> str:
log.info("Starting computation...") # Goes to furu.log
log.debug("Debug details...")
return "done"
Console Output
By default, furu logs to console using Rich in a compact format:
HHMMSS file.py:line message
Furu emits status messages like:
get TrainModel abc123def (missing->create)
get TrainModel abc123def (success->load)
Explicit Setup
import furu
# Eagerly install logging handlers (optional, happens automatically)
furu.configure_logging()
# Get the furu logger
logger = furu.get_logger()
Error Handling
from furu import FuruComputeError, FuruWaitTimeout, FuruLockNotAcquired
try:
result = obj.get()
except FuruComputeError as e:
print(f"Computation failed: {e}")
print(f"State file: {e.state_path}")
print(f"Original error: {e.original_error}")
except FuruWaitTimeout:
print("Timed out waiting for another process")
except FuruLockNotAcquired:
print("Could not acquire lock")
By default, failed artifacts are retried on the next get() call. Set
FURU_RETRY_FAILED=0 to keep failures sticky.
FURU_MAX_WAIT_SECS overrides the per-class _max_wait_time_sec (default 600s)
timeout used when waiting for compute locks before raising FuruWaitTimeout.
Failures during metadata collection or signal handler setup (before _create()
runs) raise FuruComputeError with the original exception attached. These
failures still mark the attempt as failed and record details in state.json
and furu.log.
Submitit Integration
Furu includes a SubmititAdapter for integrating submitit executors with the
state system. Executor helpers in furu.execution handle submission workflows.
Dashboard
The web dashboard provides experiment browsing, filtering, and dependency visualization.
Running the Dashboard
# Full dashboard with React frontend
furu-dashboard serve
# Or with options
furu-dashboard serve --host 0.0.0.0 --port 8000 --reload
# API server only (no frontend)
furu-dashboard api
Or via Python:
python -m furu.dashboard serve
API Endpoints
| Endpoint | Description |
|---|---|
GET /api/experiments |
List experiments with filtering/pagination |
GET /api/experiments/{namespace}/{hash} |
Get experiment details |
GET /api/experiments/{namespace}/{hash}/relationships |
Get dependencies |
GET /api/stats |
Aggregate statistics |
GET /api/dag |
Dependency graph for visualization |
Filtering
The /api/experiments endpoint supports:
result_status:absent,incomplete,success,failedattempt_status:queued,running,success,failed,cancelled,preempted,crashednamespace: Filter by namespace prefixbackend:local,submitithostname,user: Filter by execution environmentstarted_after,started_before: ISO datetime filtersconfig_filter: Filter by config field (e.g.,lr=0.001)
Configuration Reference
Environment Variables
| Variable | Default | Description |
|---|---|---|
FURU_PATH |
<project>/furu-data |
Base storage directory for non-versioned artifacts |
FURU_VERSION_CONTROLLED_PATH |
<project>/furu-data/artifacts |
Override version-controlled storage root |
FURU_SUBMITIT_PATH |
<FURU_PATH>/submitit |
Override submitit logs root |
FURU_LOG_LEVEL |
INFO |
Console verbosity (DEBUG, INFO, WARNING, ERROR) |
FURU_RICH_UNCAUGHT_TRACEBACKS |
true |
Use Rich for exception formatting (set 0 to disable) |
FURU_RECORD_GIT |
cached |
Git provenance capture: ignore skips git metadata, cached records once per process, uncached records every time |
FURU_ALLOW_NO_GIT_ORIGIN |
false |
Allow missing git origin when recording git metadata (invalid with FURU_RECORD_GIT=ignore) |
FURU_ALWAYS_RERUN |
"" |
Comma-separated class qualnames to always rerun (use ALL to bypass cache globally; cannot combine with other entries; entries must be importable) |
FURU_RETRY_FAILED |
true |
Retry failed artifacts by default (set to 0 to keep failures sticky) |
FURU_MAX_COMPUTE_RETRIES |
3 |
Maximum compute retries per node after the first failure |
FURU_POLL_INTERVAL_SECS |
10 |
Polling interval for queued/running jobs |
FURU_MAX_WAIT_SECS |
unset | Override wait timeout (falls back to _max_wait_time_sec, default 600s) |
FURU_WAIT_LOG_EVERY_SECS |
10 |
Interval between "waiting" log messages |
FURU_STALE_AFTER_SECS |
1800 |
Consider running jobs stale after this duration |
FURU_LEASE_SECS |
120 |
Compute lock lease duration |
FURU_HEARTBEAT_SECS |
lease/3 |
Heartbeat interval for running jobs (min 1s) |
FURU_PREEMPT_MAX |
5 |
Maximum submitit requeues on preemption |
FURU_CANCELLED_IS_PREEMPTED |
false |
Treat SLURM CANCELLED as preempted |
SLURM_JOB_ID |
unset | Read-only; set by Slurm to record job id and enable submitit context |
Local .env files are not loaded automatically. Call furu.load_env() when you
want to load .env values (requires python-dotenv).
Test and CI Environment Variables
| Variable | Default | Description |
|---|---|---|
FURU_DASHBOARD_DEV_DATA_DIR |
unset | Override data dir for make dashboard-dev (defaults to a temp dir) |
FURU_E2E_DATA_DIR |
unset | Required for Playwright e2e runs; used as the data root and to set FURU_PATH |
CI |
unset | Enables CI-friendly Playwright settings (retries, single worker, traces, screenshots, video) |
Programmatic Configuration
import furu
from pathlib import Path
# Set/get root directory
furu.set_furu_root(Path("/my/storage"))
root = furu.get_furu_root()
# Access config directly
furu.FURU_CONFIG.record_git = "uncached"
furu.FURU_CONFIG.poll_interval = 5.0
Testing with pytest
Use the built-in pytest fixture to isolate Furu storage in tests (each test gets its own temp root, so identical configs in separate tests will not collide):
# conftest.py
pytest_plugins = ["furu.testing"]
# test_pipeline.py
import json
from pathlib import Path
import furu
class TrainModel(furu.Furu[Path]):
lr: float = furu.chz.field(default=1e-3)
def _create(self) -> Path:
path = self.furu_dir / "metrics.json"
path.write_text(json.dumps({"lr": self.lr}))
return path
def _load(self) -> Path:
return self.furu_dir / "metrics.json"
def test_create_and_reload(furu_tmp_root):
obj = TrainModel(lr=1e-3)
first = obj.get()
second = obj.get()
assert first.read_text() == second.read_text()
assert (furu_tmp_root / "data").exists()
Override specific dependencies when you want to skip deeper chains:
from furu.testing import override_results
class Normalize(furu.Furu[str]):
def _create(self) -> str:
return "normalized"
def _load(self) -> str:
return "normalized"
class TrainModel(furu.Furu[str]):
normalizer: Normalize = furu.chz.field(default_factory=Normalize)
def _create(self) -> str:
return f"trained:{self.normalizer.get()}"
def _load(self) -> str:
return "trained"
def test_override_dependency(furu_tmp_root):
normalizer = Normalize()
model = TrainModel(normalizer=normalizer)
with override_results({normalizer: "stub"}):
assert model.get() == "trained:stub"
If you want to override without instantiating the dependency directly, target it
by dotted path from the root object (chz-style paths, e.g. deps.0 for lists and
deps.key for mappings):
from furu.testing import override_results_for
def test_override_by_path(furu_tmp_root):
model = TrainModel()
with override_results_for(model, {"normalizer": "stub"}):
assert model.get() == "trained:stub"
Class-Level Options
class MyPipeline(furu.Furu[Path], version_controlled=True):
_max_wait_time_sec = 3600.0 # Wait up to 1 hour (default: 600)
...
Metadata
Each artifact records:
| Category | Fields |
|---|---|
| Config | furu_python_def, furu_obj, furu_hash, furu_path |
| Git | git_commit, git_branch, git_remote, git_patch, git_submodules |
| Environment | timestamp, command, python_version, executable, platform, hostname, user, pid |
Access via:
metadata = obj.get_metadata()
print(metadata.git_commit)
print(metadata.hostname)
Public API
from furu import (
# Core
Furu,
FuruList,
FURU_CONFIG,
# Configuration
get_furu_root,
set_furu_root,
# Errors
FuruError,
FuruComputeError,
FuruLockNotAcquired,
FuruWaitTimeout,
MISSING,
# Serialization
FuruSerializer,
# Storage
StateManager,
MetadataManager,
# Runtime
configure_logging,
get_logger,
load_env,
# Adapters
SubmititAdapter,
# Re-exports
chz,
submitit,
# Version
__version__,
)
Non-goals / Caveats
- Prototype status: APIs and on-disk formats may change
- Not a workflow scheduler (for now): It's a lightweight caching layer for Python code
- No distributed coordination: Lock files work on shared filesystems but aren't distributed
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file furu-0.0.10.tar.gz.
File metadata
- Download URL: furu-0.0.10.tar.gz
- Upload date:
- Size: 269.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
9ba3bc7a942f13ff2453faea9119fa85f840147d0ccaffeb7ff393490d3eb156
|
|
| MD5 |
6a22eacd5e92406af90066a46eb4d9dd
|
|
| BLAKE2b-256 |
08da4f30ae535369c7f4bed3dcbdd293025ae17441de2a19327bb777f20ec04a
|
Provenance
The following attestation bundles were made for furu-0.0.10.tar.gz:
Publisher:
release.yml on hermabr/furu
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
furu-0.0.10.tar.gz -
Subject digest:
9ba3bc7a942f13ff2453faea9119fa85f840147d0ccaffeb7ff393490d3eb156 - Sigstore transparency entry: 945301775
- Sigstore integration time:
-
Permalink:
hermabr/furu@2de5fac9798178a9350b1b13b1632069c8e0bc62 -
Branch / Tag:
refs/heads/main - Owner: https://github.com/hermabr
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@2de5fac9798178a9350b1b13b1632069c8e0bc62 -
Trigger Event:
push
-
Statement type:
File details
Details for the file furu-0.0.10-py3-none-any.whl.
File metadata
- Download URL: furu-0.0.10-py3-none-any.whl
- Upload date:
- Size: 286.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ced06719c1d980a0d7c9419564b874d253147cf00b38533066a96a053d558313
|
|
| MD5 |
17d42fb53555b96bfa1b1637757715b6
|
|
| BLAKE2b-256 |
6050a17f70c949a5b973f9e0bae8d776ae25cb03caa7a3a8a94fc8088b30174a
|
Provenance
The following attestation bundles were made for furu-0.0.10-py3-none-any.whl:
Publisher:
release.yml on hermabr/furu
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
furu-0.0.10-py3-none-any.whl -
Subject digest:
ced06719c1d980a0d7c9419564b874d253147cf00b38533066a96a053d558313 - Sigstore transparency entry: 945301792
- Sigstore integration time:
-
Permalink:
hermabr/furu@2de5fac9798178a9350b1b13b1632069c8e0bc62 -
Branch / Tag:
refs/heads/main - Owner: https://github.com/hermabr
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@2de5fac9798178a9350b1b13b1632069c8e0bc62 -
Trigger Event:
push
-
Statement type: