necroflow
Python pipeline framework inspired by Snakemake. Define rules, wire them into pipelines, run with automatic parallelism and caching.
A local browser GUI for visualising pipelines and launching runs is available at necroflow_gui.
Core ideas
- Rules describe how to produce outputs from inputs — shell command templates with typed I/O.
- Pipelines wire rule calls together for a single config.
- DAG runs many pipelines at once, deduplicating shared upstream work across samples automatically.
- Paths are derived from a content-addressed hash of the full input chain — same inputs always produce the same path, different inputs produce different paths. The filesystem is the cache.
Install
```shell
cd necroflow
make venv
source .venv/bin/activate
```
Quick example
```python
from __future__ import annotations  # required for Type[name] return annotations

from necroflow import NodeType, Rules, Pipeline, DAG

# 1. Define types
class Fastq(NodeType):
    """Raw sequencing reads."""
    filename = "reads.fastq.gz"

class Bam(NodeType):
    """Aligned reads."""
    filename = "aligned.bam"

class Counts(NodeType):
    """Per-gene read counts."""
    filename = "counts.txt"

# 2. Register rules
r = Rules()

@r.command("ln -s {path} {fastq}")
def raw_fastq(path: str) -> Fastq[fastq]:
    """Symlink a raw FASTQ file into the output tree."""

@r.command("bwa mem {ref} {fastq} > {bam}", threads=4)
def align(fastq: Fastq, ref: str) -> Bam[bam]:
    """Align reads to a reference genome with BWA-MEM."""

@r.command("featureCounts -a {gene_model} {bam} -o {counts}")
def count(bam: Bam, gene_model: str) -> Counts[counts]:
    """Count reads per gene using featureCounts."""

# 3. Build a pipeline
def rna_pipeline(config, r):
    P = Pipeline()
    P.fastq = r.raw_fastq(path=config.path)
    P.bam = r.align(P.fastq, ref=config.ref)
    P.counts = r.count(P.bam, gene_model=config.gene_model)
    return P
```
The decorator syntax requires from __future__ import annotations so that Fastq[fastq] in the return annotation is stored as a string rather than evaluated (which would raise NameError on fastq). The original R.register(...) API continues to work unchanged.
Running one sample
DAG("results") sets the output directory where all computed files will be written (you can use any path you like).
```python
from types import SimpleNamespace

config = SimpleNamespace(path="/data/s1.fastq.gz", ref="hg38", gene_model="gencode_v44")
dag = DAG("results")  # output directory; change to any writable path
dag.add(rna_pipeline(config, r))  # r is the Rules() instance defined above
dag.execute()
```
Running many samples
```python
configs = [
    SimpleNamespace(path="/data/s1.fastq.gz", ref="hg38", gene_model="gencode_v44"),
    SimpleNamespace(path="/data/s2.fastq.gz", ref="hg38", gene_model="gencode_v44"),
    SimpleNamespace(path="/data/s3.fastq.gz", ref="hg38", gene_model="gencode_v44"),
]
dag = DAG("results")
for config in configs:
    dag.add(rna_pipeline(config, r))
dag.execute()  # runs all samples in parallel, skips any already-computed outputs
```
Nodes with identical upstream configs (e.g. a shared reference index) are deduplicated across samples — recognised by hash, run once.
Conditional pipelines
Pipeline factory functions are plain Python, so if/else branching on config values works naturally:
```python
def my_pipeline(config, R):
    P = Pipeline()
    P.a = R.align(path=config.path, ref=config.ref)
    if config.call_variants:
        P.result = R.call_snps(P.a)
    else:
        P.result = R.count_reads(P.a)
    return P
```
The branching config value (config.call_variants) does not need to be passed to any node. The rule name is part of the fingerprint, so it already records which branch was taken: call_snps and count_reads always produce distinct output paths.
Two pipelines sharing the same upstream config (e.g. same path and ref) will reuse the align output — recognised as a cache hit — even if they take different branches downstream.
Pipeline attribute names cannot be overwritten. Assigning to the same name twice raises ValueError. If you want to build a pipeline in a loop, use distinct names:
```python
for i, step in enumerate(steps):
    setattr(P, f"result_{i}", R.process(step_node, mode=step))
```
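The write-once rule behaves like a namespace whose __setattr__ refuses to rebind an existing name. The class below is a hypothetical stand-in written only to illustrate that behaviour; it is not necroflow's Pipeline, which may enforce the rule differently.

```python
class WriteOnceNamespace:
    """Hypothetical stand-in: attributes can be set once, never rebound."""

    def __setattr__(self, name: str, value) -> None:
        # self.__dict__ holds every attribute assigned so far
        if name in self.__dict__:
            raise ValueError(f"pipeline attribute {name!r} is already set")
        super().__setattr__(name, value)


P = WriteOnceNamespace()
P.bam = "aligned"
for i, step in enumerate(["trim", "filter"]):
    setattr(P, f"result_{i}", step)  # distinct names, so no error
```

Assigning P.bam a second time would raise ValueError, which is why loops need generated names such as result_0 and result_1.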
The idiomatic pattern for multi-sample or multi-condition work is separate Pipeline objects added to a shared DAG — one pipeline per config, one dag.add(P) call per pipeline.
Inspecting a pipeline
```python
from necroflow import resolve_command

P = rna_pipeline(config, r)
print(P)                   # layered ASCII DAG to stdout
P.save("pipeline.txt")     # same render to a file
dag.save("dag.txt")        # works on DAG too
P.resolve_paths("results")
for node in P.nodes:
    print(resolve_command(node))  # fully-resolved shell command
```
Caching
Each output lives at outdir/{rule}/{hash16}/{filename}. The 16-character hash captures the entire upstream config chain, including rule name, command, config values, parent fingerprints, and declared Inputs/Outputs types (Constraints are excluded — execution resources don't affect output identity).
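The path scheme can be illustrated with a small sketch. It is not necroflow's implementation: the helper names fingerprint and output_path, the JSON encoding and the use of SHA-256 for the path hash are assumptions. What it takes from the paragraph above is the set of hashed fields, the exclusion of resource constraints, and the outdir/{rule}/{hash16}/{filename} layout.

```python
import hashlib
import json
from pathlib import Path


def fingerprint(rule: str, command: str, config: dict,
                parents: list[str], inputs: list[str], outputs: list[str]) -> str:
    """Hypothetical fingerprint over every field that defines output identity."""
    payload = {
        "rule": rule,
        "command": command,
        "config": config,    # config values passed to this node
        "parents": parents,  # upstream fingerprints chain the hash
        "inputs": inputs,    # declared input type names
        "outputs": outputs,  # declared output type names
        # resource constraints (threads, ram) are deliberately left out
    }
    # sort_keys gives a canonical encoding, so dict ordering cannot change the hash
    blob = json.dumps(payload, sort_keys=True).encode()
    return hashlib.sha256(blob).hexdigest()


def output_path(outdir: str, rule: str, fp: str, filename: str) -> Path:
    """outdir/{rule}/{hash16}/{filename}, keeping the first 16 hex characters."""
    return Path(outdir) / rule / fp[:16] / filename


fq = fingerprint("raw_fastq", "ln -s {path} {fastq}",
                 {"path": "/data/s1.fastq.gz"}, [], [], ["Fastq"])
bam = fingerprint("align", "bwa mem {ref} {fastq} > {bam}",
                  {"ref": "hg38"}, [fq], ["Fastq"], ["Bam"])
print(output_path("results", "align", bam, "aligned.bam"))
```

Because each fingerprint embeds its parents' fingerprints, changing ref on align moves the align output and everything downstream of it to new paths, while raw_fastq keeps its path. Identical chains built for different samples hash to the same path, which is how shared upstream work is deduplicated.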
- Re-running with the same inputs is a no-op (cache hit).
- Changing any upstream parameter, command, or declared type produces a new path — old results are never overwritten.
- A parent whose mtime is newer than a child triggers a content-hash check: if the parent's bytes are unchanged, the child is not re-run. Only a genuine content change marks children STALE.
- Each output folder contains a .rip/ subdirectory with:
  - dependencies.toml: the full accumulated config, for provenance.
  - {filename}.hash: SHA-256 content hash, used for STALE detection on the next run.
  - job.log: captured stdout/stderr.
  - state: the last recorded run state (running / up_to_date / failed / interrupted). If a process is killed mid-run, the state file is left as running; on the next invocation necroflow detects this and re-runs the node even if its output exists on disk.
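The mtime-then-content check can be sketched as follows. The function names are hypothetical, and treating a missing hash file as "changed" is an assumption; in necroflow the recorded hash would live at .rip/{filename}.hash inside the parent's output folder.

```python
import hashlib
from pathlib import Path


def sha256_file(path: Path, chunk_size: int = 1 << 20) -> str:
    """Stream a file through SHA-256 so large outputs need not fit in memory."""
    h = hashlib.sha256()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            h.update(chunk)
    return h.hexdigest()


def child_is_stale(parent: Path, parent_hash_file: Path, child: Path) -> bool:
    """True only if the parent is newer than the child AND its bytes changed."""
    if parent.stat().st_mtime <= child.stat().st_mtime:
        return False  # cheap path: parent is not newer, no hashing needed
    if not parent_hash_file.exists():
        return True  # assumption: no recorded hash means we cannot prove it unchanged
    recorded = parent_hash_file.read_text().strip()
    return sha256_file(parent) != recorded
```

Hashing only runs when the mtime check fails, so a touched-but-identical parent costs one read and leaves its children untouched.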
Concurrency
Only one necroflow instance may run against a given outdir at a time. execute() acquires an exclusive lock on outdir/.rip/necroflow.lock (via fcntl.flock) at startup and releases it on exit. A second instance targeting the same outdir will fail immediately with a clear error. Running two instances against overlapping outdirs (e.g. results and results/sub) is unsupported — there is no OS primitive to detect this, so avoid it.
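A lock of this kind can be sketched with the standard library's fcntl.flock. The class name OutdirLock is hypothetical; the lock path comes from the paragraph above, and using LOCK_NB (so a second instance fails instead of waiting) is an assumption consistent with "fail immediately". POSIX only.

```python
import fcntl
import os
from pathlib import Path


class OutdirLock:
    """Hypothetical exclusive lock on outdir/.rip/necroflow.lock."""

    def __init__(self, outdir: str):
        self.path = Path(outdir) / ".rip" / "necroflow.lock"
        self.fd = None

    def __enter__(self) -> "OutdirLock":
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self.fd = os.open(self.path, os.O_RDWR | os.O_CREAT, 0o644)
        try:
            # LOCK_NB: raise instead of blocking if another holder exists
            fcntl.flock(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except BlockingIOError:
            os.close(self.fd)
            self.fd = None
            raise RuntimeError(f"another necroflow run holds {self.path}") from None
        return self

    def __exit__(self, *exc_info) -> None:
        if self.fd is not None:
            fcntl.flock(self.fd, fcntl.LOCK_UN)
            os.close(self.fd)
            self.fd = None
```

flock locks belong to the open file, not the path string, which is also why results and results/sub cannot be detected as overlapping: they lock two different files.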
Parallelism and scheduling
execute() runs nodes in parallel subject to resource caps. By default the thread cap is all available CPUs. Declare per-job requirements with Constraints; set global caps via resource_caps (Python API) or CLI flags.
```python
from necroflow import parse_resource  # assumed to be exported by the top-level package

@r.command("bwa mem {ref} {fastq} > {bam}", threads=4, ram="8Gi")
def align(fastq: Fastq, ref: str) -> Bam[bam]:
    """Align reads with BWA-MEM."""

dag.execute(resource_caps={"threads": 16, "ram": parse_resource("64Gi")})
```
Resource values accept SI (K M G T P = powers of 1000) and binary (Ki Mi Gi Ti Pi = powers of 1024) suffixes — e.g. "8Gi" is 8 GiB, "8G" is 8 GB. A job whose requirement exceeds the cap still runs solo when nothing else is running.
By default the scheduler prioritises nodes from the smallest connected component of remaining work — this tends to finish whole samples before starting new ones, keeping memory pressure low.
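The smallest-component heuristic can be sketched over a plain edge list. Nodes are strings and the function names are hypothetical; necroflow's scheduler callback receives (ready, remaining), whereas this sketch takes the edges as an explicit third argument because how nodes expose their parents is not documented here.

```python
from collections import defaultdict


def component_sizes(nodes: set[str], edges: list[tuple[str, str]]) -> dict[str, int]:
    """Map each node to the size of its connected component (edges treated as undirected)."""
    adj = defaultdict(set)
    for a, b in edges:
        if a in nodes and b in nodes:  # ignore edges touching finished nodes
            adj[a].add(b)
            adj[b].add(a)
    size_of: dict[str, int] = {}
    for start in nodes:
        if start in size_of:
            continue
        seen, stack = {start}, [start]
        while stack:  # iterative depth-first search
            for nxt in adj[stack.pop()]:
                if nxt not in seen:
                    seen.add(nxt)
                    stack.append(nxt)
        for n in seen:
            size_of[n] = len(seen)
    return size_of


def smallest_component_first(ready, remaining, edges):
    """Order ready nodes so the smallest block of leftover work is finished first."""
    size_of = component_sizes(set(remaining), edges)
    return sorted(ready, key=lambda n: (size_of.get(n, 0), n))  # name breaks ties
```

A sample with two nodes left is ordered ahead of one with three, so nearly finished samples are completed (and their intermediates can be released) before fresh ones start.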
```python
from necroflow import fifo_scheduler

dag.execute(resource_caps={"threads": 16}, scheduler=fifo_scheduler)  # topological order instead
```
Custom schedulers:
```python
def my_scheduler(ready, remaining):
    # start the most thread-hungry ready nodes first
    return sorted(ready, key=lambda n: n.rule.constraints.get("threads", 1), reverse=True)

dag.execute(scheduler=my_scheduler)
```
Types and subtypes
NodeTypes form an inheritance hierarchy — a rule accepting Bam also accepts SortedBam:
```python
class SortedBam(Bam):
    """Coordinate-sorted BAM."""
    filename = "sorted.bam"

@r.command("samtools sort {bam} -o {sorted_bam}")
def sort(bam: Bam) -> SortedBam[sorted_bam]:
    """Sort BAM by coordinate with samtools."""

@r.command("featureCounts -a {gene_model} {bam} -o {counts}")
def quantify(bam: SortedBam, gene_model: str) -> Counts[counts]:  # only accepts sorted bam
    """Count reads per gene using featureCounts."""
```
Failure handling
```python
dag.execute(keep_going=True)  # continue independent branches past failures
```
With keep_going=False (default) the first failure raises immediately. With keep_going=True independent branches keep running and all failures are collected into an ExceptionGroup at the end.
After each successful job, necroflow verifies that the declared output file exists. A command that exits 0 but writes no output is treated as a failure.
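The post-run check can be sketched with subprocess. run_job is a hypothetical helper; shell=True is used because necroflow's commands are shell templates with redirections, and capturing output to job.log is omitted for brevity.

```python
import subprocess
from pathlib import Path


def run_job(command: str, outputs: list[Path]) -> None:
    """Run a shell command, then treat any missing declared output as a failure."""
    subprocess.run(command, shell=True, check=True)  # non-zero exit raises CalledProcessError
    missing = [str(p) for p in outputs if not p.exists()]
    if missing:
        raise RuntimeError(f"command exited 0 but did not create: {', '.join(missing)}")
```

A command such as true succeeds at the shell level, but with a declared output it would still be reported as a failure.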
Run state is persisted to a plain-text state file inside each node's .rip/ directory between invocations. A node whose output exists on disk but whose previous run was interrupted by a signal or left in an unknown state is automatically re-executed next time.
Each job's stdout/stderr is captured to outdir/{rule}/{hash}/.rip/job.log. On failure the log is printed to the terminal.
Multi-output rules
A rule with multiple declared outputs runs its command once; all co-outputs are marked complete when the command finishes:
```python
# Log is a NodeType declared the same way as Fastq and Bam above
@r.command("bwa mem {ref} {fastq} > {bam} 2> {log}", threads=4)
def align(fastq: Fastq, ref: str) -> (Bam[bam], Log[log]):
    """Align reads with BWA-MEM, capturing the log."""

P = Pipeline()
P.fastq = r.raw_fastq(path=config.path)
P.bam, P.log = r.align(P.fastq, ref="hg38")
```
Cleaning orphan outputs
Outputs that existed from a previous run but are no longer in the required subgraph are classified as ORPHAN. Pass autoclean=True to delete them per-file (files via unlink, directories via rmtree):
```python
dag.execute(autoclean=True)
```
Or via CLI:
```shell
necroflow --outdir results --autoclean job.toml  # job.toml contains a ".pipeline" key
```
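Orphan classification and per-file deletion amount to a set difference followed by unlink or rmtree. Both function names are hypothetical, and the set of existing outputs is passed in rather than discovered by scanning outdir.

```python
import shutil
from pathlib import Path


def find_orphans(existing: set[Path], required: set[Path]) -> set[Path]:
    """Outputs present on disk that the current run no longer needs."""
    return existing - required


def delete_orphans(orphans: set[Path]) -> None:
    for path in sorted(orphans):
        if path.is_dir() and not path.is_symlink():
            shutil.rmtree(path)  # directory outputs are removed recursively
        elif path.exists() or path.is_symlink():
            path.unlink()  # files (and symlinks) are unlinked
```

Checking is_symlink before rmtree ensures a symlinked directory is unlinked rather than having its target's contents removed.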
Command-line interface
necroflow ships a necroflow command. Each positional argument is a job TOML — a self-contained file that specifies the pipeline factory, optional requested outputs, and user config params.
```shell
necroflow --outdir results [-c N|all] [--constraint KEY=VALUE ...] \
    [--keep-going] [--autoclean] [--dry-run] JOB.toml [JOB2.toml ...]
```
| Flag | Meaning |
|---|---|
| -c N / -c all | Thread cap: an integer, or all (default: all CPUs). |
| --constraint KEY=VALUE | Additional resource cap. Repeatable. Accepts SI/binary suffixes. |
| --keep-going / -k | Continue past failures; collect all errors at the end. |
| --autoclean | Delete orphan and intermediate outputs. |
| --dry-run / -n | Show what would run without executing. |
```shell
necroflow --outdir results -c8 --constraint ram=64Gi job.toml
```
Job TOML format
```toml
# required: path resolved from the directory where necroflow is invoked
".pipeline" = "path/to/factory.py:function_name"

# optional: pipeline_label names to request (defaults to all sinks)
".requests" = ["counts", "qc"]

# user config: passed as a plain dict to the factory
ref = "hg38"
sample = "NA12878"
```
Keys starting with . are necroflow metadata — stripped before the dict reaches the factory. They never appear in node configs or affect output hashes. User config can freely use any name, including pipeline or request.
Parameter grids
Any TOML key ending in __grid takes a list of values, and the job is expanded into the Cartesian product of all such lists. The resulting output subfolders use the same naming scheme as snakemakeconfigs.

```toml
".pipeline" = "factory.py:factory"
ref__grid = ["hg38", "mm10"]
aligner__grid = ["bwa", "bowtie2"]
```

This produces four pipelines (2 refs × 2 aligners): experiment__ref+hg38__aligner+bwa, experiment__ref+hg38__aligner+bowtie2, and so on. Grid expansion also applies to pipeline itself, so a single job TOML can fan out across different factory functions.
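The expansion and naming can be sketched with itertools.product. expand_grid is hypothetical, and because the source of the experiment prefix is not stated here, the sketch takes it as an explicit argument.

```python
import itertools

GRID_SUFFIX = "__grid"


def expand_grid(config: dict, prefix: str) -> dict[str, dict]:
    """Expand every '<key>__grid' list into one config per combination."""
    grid = {k[: -len(GRID_SUFFIX)]: v for k, v in config.items() if k.endswith(GRID_SUFFIX)}
    fixed = {k: v for k, v in config.items() if not k.endswith(GRID_SUFFIX)}
    combos = {}
    # product() varies the last grid key fastest; with no grid keys it yields one empty combo
    for values in itertools.product(*grid.values()):
        chosen = dict(zip(grid, values))
        label = "__".join([prefix] + [f"{k}+{v}" for k, v in chosen.items()])
        combos[label] = {**fixed, **chosen}
    return combos


combos = expand_grid(
    {".pipeline": "factory.py:factory",
     "ref__grid": ["hg38", "mm10"],
     "aligner__grid": ["bwa", "bowtie2"]},
    prefix="experiment",
)
```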
Linked outputs
After every run the CLI creates one subfolder per grid combo under outdir/:
```
results/
  {rule}/{hash16}/{file}               ← real outputs (content-addressed)
  experiment__ref+hg38__aligner+bwa/
    {rule}/{hash16}/{file}             ← symlinks to requested outputs only
    manifest.toml                      ← requested output paths for this combo
  experiment__ref+hg38__aligner+bowtie2/
    ...
```
Only the requested outputs (defaults to pipeline sinks) get a symlink — intermediate ancestors are excluded. manifest.toml lists the same outputs keyed by the Pipeline attribute name assigned in the factory:
```toml
[outputs]
counts = "count/a3f1bc92/counts.txt"
```
The key (counts) matches P.counts = R.count(...) in the factory function.
See examples/necroalchemy_grid.toml and examples/necroalchemy_factory.py for a runnable example.
What is not yet implemented
- Cluster / cloud backends