Rust rewrite of bcftools consensus for online enformer expression data
Project description
pyconsensus
A Rust rewrite of bcftools consensus for the enformer expression-prediction pipeline.
Personal diploid consensus sequences — online, in-memory, byte-exact.
Exposed to Python as the native extension package
pyconsensus.
💡 Why?
The original pipeline (make_consensus_enformer_new.py) spawns one bcftools consensus subprocess per gene × sample × haplotype — on the order of 10M invocations. Each pays for: process start → reopen & reparse the same VCF → re-read the ref slice → write fasta → build .fai.
The bottleneck is the repeated spawning, reparsing, and disk I/O — not the consensus algorithm itself.
⚙️ How
| Original pipeline | pyconsensus |
|
|---|---|---|
| Inputs | Reloaded per call | Loaded once, resident in memory |
| Consensus | Subprocess per task | Long-lived Rust worker pool |
| Output | Fasta + .fai on disk |
bytes via lazy iterator — no intermediate files |
| GIL | — | Released while workers run |
✨ Features
- 🔬 Byte-exact parity with
bcftools consensus1.23 — a faithful line-by-line port offorks/bcftools/consensus.c, covering SNP / MNP / ins / del / complex-indel /<DEL>/ gVCF<*>/<NON_REF>,-H {R,A,I,1,2,1pIu,2pIu,LR,LA,SR,SA}(+NpIu),-I, missing/unphased GT,-a/-M/--mark-{del,ins,snv}/-m --mask-with/-cchain. Pinned by#[ignore]tests diffing against a realbcftoolsbinary. - ⚡ Online, no disk — sequences yielded as
bytesvia a lazy iterator; no intermediate fasta, no temp files. - 🧠 Resident + multithreaded — VCFs parsed once into a compiled in-memory store with a binary-searchable region index, allele-operation metadata, compact GT stores, and an optional
.cvcfcache;.faibuilt once; GIL released while Rust workers run. - 📈 Built-in profile counters —
compile_stats()reports VCF record/allele/GT layout counts, andconsensus_many_profile()reports lane hit counts, fallback reasons, throughput rates, records seen, and output length totals without returning sequence bytes. - 📦 Bounded group size —
max_tasks_per_group=0keeps legacy unlimited grouping; positive values split large region groups for better load balance and a predictable in-flight task bound.
Installation
pip install pyconsensus-rs
Pre-built wheels are published on PyPI
— no Rust toolchain needed. The wheel is abi3-py310 and manylinux_2_34-compliant,
so a single build covers CPython 3.10+ on x86-64 Linux.
Build from source
maturin build --release -o dist # -> dist/py_consensus-*.whl
pip install dist/py_consensus-*.whl
For local development, maturin develop --release installs straight into the
active venv. The first source build downloads htslib plus the compression-lib
tarballs (zlib / bzip2 / xz / libdeflate / zstd) and compiles them with -fPIC
so they link cleanly into the cdylib; on restricted networks run labpon (or
export http_proxy / https_proxy) first. scripts/build_wheels.sh builds
and verifies the wheel across CPython 3.10–3.15.
Usage
from pyconsensus import ConsensusEngine, Task
# The engine preprocesses and holds the inputs (ref + VCFs); the thread count
# is passed per call — the engine itself owns no thread pool.
engine = ConsensusEngine(
ref_path="ref/hg38.fa",
vcfs={"chr1": "data/variants/chr1.vcf.gz"},
iupac_codes=True, # corresponds to -I
max_tasks_per_group=8, # 0 = unlimited; positive values split large region groups
)
# Task coordinates are 1-based inclusive; vcf_key selects an entry of `vcfs`.
# TSS-centered slice (enformer window): start = tss - 393216//2, end = tss + 393216//2 - 1
tss = 2_917_619
start, end = tss - 393216 // 2, tss + 393216 // 2 - 1
tasks = [
Task("chr1", start, end, "chr1", "ENSG00000263280", "NA12878", "1pIu"),
Task("chr1", start, end, "chr1", "ENSG00000263280", "NA12878", "2pIu"),
]
# (1) Eager: run a flat task list, get results back in input order.
results = engine.consensus_many(tasks, threads=8)
for r in results:
print(r.gene_id, r.sample, r.haplotype, len(r.seq or b"")) # r.seq: bytes | None
# (2) Lazy: producer-consumer iterator; GIL released while workers run.
it = engine.consensus_iter(tasks, threads=8, prefetch_steps=16)
for idx, r in it:
... # r.seq feeds the downstream consumer directly
# For very large result streams, batch the Python boundary crossing.
it = engine.consensus_iter(tasks, threads=8, prefetch_steps=16)
while (batch := it.next_batch(256)) is not None:
for idx, r in batch:
...
it = engine.consensus_iter(tasks, threads=8, prefetch_steps=16)
while (batch := it.next_batch_bytes(256)) is not None:
for idx, seq in batch:
...
# (3) Cartesian product (regions × samples × haplotypes) -> lazy iterator.
regions = [Task("chr1", start, end, "chr1", "ENSG00000263280")]
samples = ["NA12878", "NA12879"]
haplotypes = ["1pIu", "2pIu"]
for idx, r in engine.consensus_regions(regions, samples, haplotypes, threads=16):
...
# (4) Observability: key-value lines suitable for logs.
print("\n".join(engine.compile_stats()))
print("\n".join(engine.consensus_many_profile(tasks, threads=8)))
consensus_iter / consensus_regions yield in completion order by default,
each result carrying its input idx for re-pairing; pass ordered=True to
yield in input order (results are buffered to reassemble). Task expansion is
region-major (all haplotypes of a sample for one gene are contiguous, then
the next sample, then the next gene) — matching the original script's layout.
When max_tasks_per_group > 0, the number of in-flight tasks is bounded by
max(prefetch_steps, 1) × max_tasks_per_group; 0 preserves the original
unlimited group behavior.
API reference
ConsensusEngine(ref_path, vcfs, **opts)
Constructor options (besides ref_path and vcfs: {vcf_key: path}):
| Option | Type | Default | bcftools | Notes |
|---|---|---|---|---|
iupac_codes |
bool | False |
-I |
IUPAC-encode ambiguous alleles |
missing |
str|None | None |
-M |
char for missing alleles (single char) |
absent |
str|None | None |
-a |
char for absent alleles (single char) |
mark_del |
str|None | None |
--mark-del |
char marking deletions (single char) |
mark_ins |
str|None | None |
--mark-ins |
"uc" / "lc" / single char |
mark_snv |
str|None | None |
--mark-snv |
"uc" / "lc" / single char |
mask |
str|None | None |
-m |
path to a mask BED |
mask_with |
str | "N" |
--mask-with |
"uc" / "lc" / single char |
chain |
bool | False |
-c |
emit a VCF chain alongside each seq |
regions_overlap |
int | 0 |
— | record filter: 0=POS-in-region, 1=record-span overlap, 2=variant-span overlap |
max_tasks_per_group |
int | 0 |
— | 0=unlimited; positive splits large region groups |
compile_threads |
int|None | None |
— | rayon pool for VCF compile (defaults to available parallelism capped at the unique-VCF count) |
log_level |
str | "info" |
— | off / error / warn / info / debug |
Engine methods
| Method | Returns | Notes |
|---|---|---|
consensus_many(tasks, threads=1) |
list[ConsensusResult] |
input order |
consensus_iter(tasks, prefetch_steps=None, warmup=False, ordered=False, threads=1) |
_ConsensusIter |
lazy; yields (idx, ConsensusResult) |
consensus_regions(regions, samples=None, haplotypes=None, *, threads=1, prefetch_steps=None, warmup=False, ordered=False) |
_ConsensusIter |
cartesian product → lazy iterator |
consensus_many_stats(tasks, threads=1) |
(n, total_len, min_len, max_len) |
consumes bytes in Rust; builds no Python list |
consensus_iter_stats(tasks, prefetch_steps=None, warmup=False, ordered=False, threads=1) |
(n, total_len, min_len, max_len) |
iter version of the above |
consensus_many_profile(tasks, threads=1) |
list[str] |
throughput + lane/fallback kv lines |
compile_stats() |
list[str] |
VCF record/allele/GT layout kv lines |
log_level (read/write property) |
str |
runtime engine log level |
Iterator (_ConsensusIter)
| Member | Returns |
|---|---|
__iter__ / __next__ |
yields (idx, ConsensusResult); GIL released while blocking |
next_batch(batch_size) |
list[(idx, ConsensusResult)] | None |
next_batch_bytes(batch_size) |
list[(idx, bytes)] | None — lowest-overhead streaming |
Module-level functions
from pyconsensus import build_tasks, build_cache, get_htslib_log_level, set_htslib_log_level
# Expand regions × samples × haplotypes into a flat task list (eager).
# A None (or empty) dimension collapses to one task with that field = None.
tasks = build_tasks(regions, samples=["NA12878", "NA12879"], haplotypes=["1pIu", "2pIu"])
# Pre-build .cvcf caches without loading the reference or building the engine.
results = build_cache(
["data/variants/chr1.vcf.gz", "data/variants/chr2.vcf.gz"],
compile_threads=4, force=False,
)
for r in results:
print(r.path, r.cache_path, r.status, r.records, r.samples, f"{r.cache_mb:.1f}MB")
# r.status ∈ {"hit", "built", "rebuilt", "forced"}
# htslib log level (off/error/warn/info/debug/trace) — independent of engine log_level.
set_htslib_log_level("error")
print(get_htslib_log_level())
Data classes
| Class | Fields |
|---|---|
Task |
chr, start, end, vcf_key, gene_id, sample, haplotype (all get/set) |
ConsensusResult |
gene_id, sample, haplotype, seq (bytes|None), chain (str|None), error (str|None) — read-only |
CacheResult |
path, cache_path, status, records, samples, cache_mb, elapsed_sec — read-only |
License
MIT — see LICENSE.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
Built Distributions
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pyconsensus_rs-0.1.0-cp310-abi3-manylinux_2_34_x86_64.whl.
File metadata
- Download URL: pyconsensus_rs-0.1.0-cp310-abi3-manylinux_2_34_x86_64.whl
- Upload date:
- Size: 1.3 MB
- Tags: CPython 3.10+, manylinux: glibc 2.34+ x86-64
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.16 {"installer":{"name":"uv","version":"0.11.16","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"22.04","id":"jammy","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
6f7600e796049af4e892f963b1a370e9f6690a1c966d82fc96095c0f367166cd
|
|
| MD5 |
cca72dddb1f98edd907fee7017eb3e4e
|
|
| BLAKE2b-256 |
b8a68dc3849166385dc658b40cafd7a324b77c8a3e86335bbfacdf88d0068a03
|
File details
Details for the file pyconsensus_rs-0.1.0-cp310-abi3-manylinux_2_28_x86_64.whl.
File metadata
- Download URL: pyconsensus_rs-0.1.0-cp310-abi3-manylinux_2_28_x86_64.whl
- Upload date:
- Size: 1.3 MB
- Tags: CPython 3.10+, manylinux: glibc 2.28+ x86-64
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
5c8920da3506fa1eee3c9427cb4392553c200da2087325efc1ccbe39f3e84eb8
|
|
| MD5 |
0b6a941aea115d3be930f11a68eb8593
|
|
| BLAKE2b-256 |
f04698c771d01654bf48dd893acbea20a41882c9a12a5054e3a4bbec13996816
|