Skip to main content

Rust rewrite of bcftools consensus for online enformer expression data

Project description

pyconsensus

A Rust rewrite of bcftools consensus for the enformer expression-prediction pipeline.

Personal diploid consensus sequences — online, in-memory, byte-exact.

PyPI License: MIT Rust Python 3.10+ Built with PyO3

Exposed to Python as the native extension package pyconsensus.


💡 Why?

The original pipeline (make_consensus_enformer_new.py) spawns one bcftools consensus subprocess per gene × sample × haplotype — on the order of 10M invocations. Each pays for: process start → reopen & reparse the same VCF → re-read the ref slice → write fasta → build .fai.

The bottleneck is the repeated spawning, reparsing, and disk I/O — not the consensus algorithm itself.

⚙️ How

Original pipeline pyconsensus
Inputs Reloaded per call Loaded once, resident in memory
Consensus Subprocess per task Long-lived Rust worker pool
Output Fasta + .fai on disk bytes via lazy iterator — no intermediate files
GIL Released while workers run

✨ Features

  • 🔬 Byte-exact parity with bcftools consensus 1.23 — a faithful line-by-line port of forks/bcftools/consensus.c, covering SNP / MNP / ins / del / complex-indel / <DEL> / gVCF <*> / <NON_REF>, -H {R,A,I,1,2,1pIu,2pIu,LR,LA,SR,SA} (+NpIu), -I, missing/unphased GT, -a / -M / --mark-{del,ins,snv} / -m --mask-with / -c chain. Pinned by #[ignore] tests diffing against a real bcftools binary.
  • ⚡ Online, no disk — sequences yielded as bytes via a lazy iterator; no intermediate fasta, no temp files.
  • 🧠 Resident + multithreaded — VCFs parsed once into a compiled in-memory store with a binary-searchable region index, allele-operation metadata, compact GT stores, and an optional .cvcf cache; .fai built once; GIL released while Rust workers run.
  • 📈 Built-in profile counterscompile_stats() reports VCF record/allele/GT layout counts, and consensus_many_profile() reports lane hit counts, fallback reasons, throughput rates, records seen, and output length totals without returning sequence bytes.
  • 📦 Bounded group sizemax_tasks_per_group=0 keeps legacy unlimited grouping; positive values split large region groups for better load balance and a predictable in-flight task bound.

Installation

pip install pyconsensus-rs

Pre-built wheels are published on PyPI — no Rust toolchain needed. The wheel is abi3-py310 and manylinux_2_34-compliant, so a single build covers CPython 3.10+ on x86-64 Linux.

Build from source
maturin build --release -o dist            # -> dist/py_consensus-*.whl
pip install dist/py_consensus-*.whl

For local development, maturin develop --release installs straight into the active venv. The first source build downloads htslib plus the compression-lib tarballs (zlib / bzip2 / xz / libdeflate / zstd) and compiles them with -fPIC so they link cleanly into the cdylib; on restricted networks run labpon (or export http_proxy / https_proxy) first. scripts/build_wheels.sh builds and verifies the wheel across CPython 3.10–3.15.

Usage

from pyconsensus import ConsensusEngine, Task

# The engine preprocesses and holds the inputs (ref + VCFs); the thread count
# is passed per call — the engine itself owns no thread pool.
engine = ConsensusEngine(
    ref_path="ref/hg38.fa",
    vcfs={"chr1": "data/variants/chr1.vcf.gz"},
    iupac_codes=True,          # corresponds to -I
    max_tasks_per_group=8,     # 0 = unlimited; positive values split large region groups
)

# Task coordinates are 1-based inclusive; vcf_key selects an entry of `vcfs`.
# TSS-centered slice (enformer window): start = tss - 393216//2, end = tss + 393216//2 - 1
tss = 2_917_619
start, end = tss - 393216 // 2, tss + 393216 // 2 - 1

tasks = [
    Task("chr1", start, end, "chr1", "ENSG00000263280", "NA12878", "1pIu"),
    Task("chr1", start, end, "chr1", "ENSG00000263280", "NA12878", "2pIu"),
]

# (1) Eager: run a flat task list, get results back in input order.
results = engine.consensus_many(tasks, threads=8)
for r in results:
    print(r.gene_id, r.sample, r.haplotype, len(r.seq or b""))   # r.seq: bytes | None

# (2) Lazy: producer-consumer iterator; GIL released while workers run.
it = engine.consensus_iter(tasks, threads=8, prefetch_steps=16)
for idx, r in it:
    ...   # r.seq feeds the downstream consumer directly

# For very large result streams, batch the Python boundary crossing.
it = engine.consensus_iter(tasks, threads=8, prefetch_steps=16)
while (batch := it.next_batch(256)) is not None:
    for idx, r in batch:
        ...

it = engine.consensus_iter(tasks, threads=8, prefetch_steps=16)
while (batch := it.next_batch_bytes(256)) is not None:
    for idx, seq in batch:
        ...

# (3) Cartesian product (regions × samples × haplotypes) -> lazy iterator.
regions    = [Task("chr1", start, end, "chr1", "ENSG00000263280")]
samples    = ["NA12878", "NA12879"]
haplotypes = ["1pIu", "2pIu"]
for idx, r in engine.consensus_regions(regions, samples, haplotypes, threads=16):
    ...

# (4) Observability: key-value lines suitable for logs.
print("\n".join(engine.compile_stats()))
print("\n".join(engine.consensus_many_profile(tasks, threads=8)))

consensus_iter / consensus_regions yield in completion order by default, each result carrying its input idx for re-pairing; pass ordered=True to yield in input order (results are buffered to reassemble). Task expansion is region-major (all haplotypes of a sample for one gene are contiguous, then the next sample, then the next gene) — matching the original script's layout.

When max_tasks_per_group > 0, the number of in-flight tasks is bounded by max(prefetch_steps, 1) × max_tasks_per_group; 0 preserves the original unlimited group behavior.

API reference

ConsensusEngine(ref_path, vcfs, **opts)

Constructor options (besides ref_path and vcfs: {vcf_key: path}):

Option Type Default bcftools Notes
iupac_codes bool False -I IUPAC-encode ambiguous alleles
missing str|None None -M char for missing alleles (single char)
absent str|None None -a char for absent alleles (single char)
mark_del str|None None --mark-del char marking deletions (single char)
mark_ins str|None None --mark-ins "uc" / "lc" / single char
mark_snv str|None None --mark-snv "uc" / "lc" / single char
mask str|None None -m path to a mask BED
mask_with str "N" --mask-with "uc" / "lc" / single char
chain bool False -c emit a VCF chain alongside each seq
regions_overlap int 0 record filter: 0=POS-in-region, 1=record-span overlap, 2=variant-span overlap
max_tasks_per_group int 0 0=unlimited; positive splits large region groups
compile_threads int|None None rayon pool for VCF compile (defaults to available parallelism capped at the unique-VCF count)
log_level str "info" off / error / warn / info / debug

Engine methods

Method Returns Notes
consensus_many(tasks, threads=1) list[ConsensusResult] input order
consensus_iter(tasks, prefetch_steps=None, warmup=False, ordered=False, threads=1) _ConsensusIter lazy; yields (idx, ConsensusResult)
consensus_regions(regions, samples=None, haplotypes=None, *, threads=1, prefetch_steps=None, warmup=False, ordered=False) _ConsensusIter cartesian product → lazy iterator
consensus_many_stats(tasks, threads=1) (n, total_len, min_len, max_len) consumes bytes in Rust; builds no Python list
consensus_iter_stats(tasks, prefetch_steps=None, warmup=False, ordered=False, threads=1) (n, total_len, min_len, max_len) iter version of the above
consensus_many_profile(tasks, threads=1) list[str] throughput + lane/fallback kv lines
compile_stats() list[str] VCF record/allele/GT layout kv lines
log_level (read/write property) str runtime engine log level

Iterator (_ConsensusIter)

Member Returns
__iter__ / __next__ yields (idx, ConsensusResult); GIL released while blocking
next_batch(batch_size) list[(idx, ConsensusResult)] | None
next_batch_bytes(batch_size) list[(idx, bytes)] | None — lowest-overhead streaming

Module-level functions

from pyconsensus import build_tasks, build_cache, get_htslib_log_level, set_htslib_log_level

# Expand regions × samples × haplotypes into a flat task list (eager).
# A None (or empty) dimension collapses to one task with that field = None.
tasks = build_tasks(regions, samples=["NA12878", "NA12879"], haplotypes=["1pIu", "2pIu"])

# Pre-build .cvcf caches without loading the reference or building the engine.
results = build_cache(
    ["data/variants/chr1.vcf.gz", "data/variants/chr2.vcf.gz"],
    compile_threads=4, force=False,
)
for r in results:
    print(r.path, r.cache_path, r.status, r.records, r.samples, f"{r.cache_mb:.1f}MB")
    # r.status ∈ {"hit", "built", "rebuilt", "forced"}

# htslib log level (off/error/warn/info/debug/trace) — independent of engine log_level.
set_htslib_log_level("error")
print(get_htslib_log_level())

Data classes

Class Fields
Task chr, start, end, vcf_key, gene_id, sample, haplotype (all get/set)
ConsensusResult gene_id, sample, haplotype, seq (bytes|None), chain (str|None), error (str|None) — read-only
CacheResult path, cache_path, status, records, samples, cache_mb, elapsed_sec — read-only

License

MIT — see LICENSE.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distributions

If you're not sure about the file name format, learn more about wheel file names.

pyconsensus_rs-0.1.0-cp310-abi3-manylinux_2_34_x86_64.whl (1.3 MB view details)

Uploaded CPython 3.10+manylinux: glibc 2.34+ x86-64

pyconsensus_rs-0.1.0-cp310-abi3-manylinux_2_28_x86_64.whl (1.3 MB view details)

Uploaded CPython 3.10+manylinux: glibc 2.28+ x86-64

File details

Details for the file pyconsensus_rs-0.1.0-cp310-abi3-manylinux_2_34_x86_64.whl.

File metadata

  • Download URL: pyconsensus_rs-0.1.0-cp310-abi3-manylinux_2_34_x86_64.whl
  • Upload date:
  • Size: 1.3 MB
  • Tags: CPython 3.10+, manylinux: glibc 2.34+ x86-64
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.16 {"installer":{"name":"uv","version":"0.11.16","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"22.04","id":"jammy","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for pyconsensus_rs-0.1.0-cp310-abi3-manylinux_2_34_x86_64.whl
Algorithm Hash digest
SHA256 6f7600e796049af4e892f963b1a370e9f6690a1c966d82fc96095c0f367166cd
MD5 cca72dddb1f98edd907fee7017eb3e4e
BLAKE2b-256 b8a68dc3849166385dc658b40cafd7a324b77c8a3e86335bbfacdf88d0068a03

See more details on using hashes here.

File details

Details for the file pyconsensus_rs-0.1.0-cp310-abi3-manylinux_2_28_x86_64.whl.

File metadata

File hashes

Hashes for pyconsensus_rs-0.1.0-cp310-abi3-manylinux_2_28_x86_64.whl
Algorithm Hash digest
SHA256 5c8920da3506fa1eee3c9427cb4392553c200da2087325efc1ccbe39f3e84eb8
MD5 0b6a941aea115d3be930f11a68eb8593
BLAKE2b-256 f04698c771d01654bf48dd893acbea20a41882c9a12a5054e3a4bbec13996816

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page