
HowdenPipeline

A DAG-based async pipeline for processing PDF documents through configurable steps (parsing, LLM extraction, etc.) with built-in MLflow observability.

Overview

HowdenPipeline orchestrates multi-step document processing pipelines as directed acyclic graphs (DAGs). Each PDF is processed concurrently, with steps executing in topological order per file. Results are cached on disk, so unchanged steps are skipped on re-runs.

PDF files
   │
   ├── [Parser] ──► result.md
   │       │
   │       ├── [LLM: payment_date] ──► result.json
   │       └── [LLM: payment_info] ──► result.json
   │
   └── ... (all files processed concurrently)
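The per-file dependency order shown above can be sketched with the standard-library graphlib (illustrative only; HowdenPipeline itself manages the DAG with NetworkX):

```python
from graphlib import TopologicalSorter

# Each key maps a step to the steps it depends on.
dag = {
    "parser": [],
    "payment_date": ["parser"],
    "payment_info": ["parser"],
}
order = list(TopologicalSorter(dag).static_order())
# "parser" always precedes both LLM extraction steps
```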

Requirements

  • Python >= 3.12, < 3.14
  • uv (recommended)

Installation

uv sync

Usage

import asyncio
from pathlib import Path

from HowdenPipeline.flow.graph_pipeline import GraphPipeline
from HowdenPipeline.manager.tracker import Tracker

# parser, payment_date, and payment_info are your step callables (see
# "add_step parameters" below); tracker is an optional Tracker instance
# and parameter is a HowdenConfig parameter object, both constructed
# beforehand.

async def main():
    pipeline = GraphPipeline(
        pdf=Path("data/pdfs"),    # directory of subfolders, each containing one PDF
        delete_folder=True,       # clean step output folders before each run
        tracker=tracker,          # optional MLflow tracker (or None)
        parameter=parameter,      # HowdenConfig parameter object
    )

    pipeline.add_step(parser, output_filetype="md")
    pipeline.add_step(payment_date, dependencies=[parser], output_filetype="json", track=True)
    pipeline.add_step(payment_info, dependencies=[parser], output_filetype="json", track=True)

    matches = await pipeline.execute()

asyncio.run(main())

add_step parameters

Parameter        Type  Description
step             Any   Callable step (sync or async). Receives the input path, returns a string result.
dependencies     list  Steps that must complete before this one.
output_filetype  str   File extension for the cached result ("md", "json", etc.).
track            bool  If True, logs the result artifact to MLflow.
name             str   Overrides the display name for this step.
input_result     Any   Passes a specific result from an earlier step as secondary input.
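A step is just a callable that takes the input path and returns a string. A minimal sketch, with hypothetical names and placeholder bodies (not part of HowdenPipeline):

```python
import asyncio
from pathlib import Path

def parser(path: Path) -> str:
    """Sync step: produce markdown from the document (stubbed here)."""
    return f"# Parsed {path.name}"

async def payment_date(path: Path) -> str:
    """Async step, e.g. an LLM call; stubbed out here."""
    await asyncio.sleep(0)  # stand-in for an awaited API call
    return '{"payment_date": null}'
```

Both forms can be registered with add_step; the runner awaits async steps automatically.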

Input folder structure

Each subfolder under the pdf root should contain one PDF and optionally a GT.json for accuracy evaluation:

data/pdfs/
├── claim_001/
│   ├── document.pdf
│   └── GT.json          # optional ground truth
└── claim_002/
    └── document.pdf

Step output caching

Step results are written to disk alongside the input PDF. If a result file already exists, the step is skipped. To force re-execution, set delete_folder=True or delete the output folders manually.

data/pdfs/claim_001/
├── document.pdf
└── Parser/
    ├── result.md
    ├── parameter.json
    └── payment_info/
        ├── result.json
        └── parameter.json

MLflow Observability

When a Tracker is provided, the pipeline logs:

  • All pipeline parameters
  • Per-step timing (avg, min, max, total)
  • Step result artifacts
  • LLM token usage and model metadata
  • Accuracy against ground truth JSON files
  • Prompt templates with accuracy annotations

Start the MLflow UI:

mlflow ui --backend-store-uri sqlite:///mlflow.db --port 5000

To disable tracking, pass tracker=None (default).

Running Tests

uv run pytest
uv run pytest -v        # verbose
uv run pytest tests/test_async_script.py::test_files_run_concurrently  # single test

Project Structure

HowdenPipeline/
├── flow/
│   ├── graph_pipeline.py        # Main orchestrator — GraphPipeline class
│   ├── file_pipeline_runner.py  # Single-file execution with step traversal
│   ├── pipeline_graph_manager.py # DAG management with NetworkX
│   ├── match.py                 # Result dataclass (path, ground truth, file path)
│   └── parameter_serializer.py  # Serialization for step parameter logging
└── manager/
    ├── tracker.py               # MLflow / LangSmith logging abstraction
    └── jsonMatcher.py           # Accuracy comparison against ground truth
tests/
├── test_async_script.py         # Async concurrency tests
└── file_pipeline_runner_tests.py

Architecture Notes

  • Concurrency: All PDFs are processed in parallel via asyncio.gather. Steps within a single file are sequential (topological order).
  • Steps: Any callable — sync or async — that accepts a Path input and returns a str. The runner detects asyncio.iscoroutinefunction and awaits accordingly.
  • Graph copy: Each file gets its own copy of the DAG so step state (result paths on edges) does not bleed between files.
  • Git guard: When MLflow tracking is enabled, the pipeline will warn if there are uncommitted changes — ensuring experiment runs are tied to a clean git state.
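The concurrency model above can be sketched in a few lines (illustrative only, not the actual HowdenPipeline internals): files run concurrently via asyncio.gather, while each file's steps run sequentially, and sync and async steps are both supported via the coroutine-function check.

```python
import asyncio

async def run_file(name: str, steps: list) -> list:
    """Run this file's steps sequentially, in topological order."""
    results = []
    for step in steps:
        if asyncio.iscoroutinefunction(step):
            results.append(await step(name))
        else:
            results.append(step(name))
    return results

async def run_all(files: list, steps: list) -> dict:
    """Run all files concurrently; each file traverses its own steps."""
    per_file = await asyncio.gather(*(run_file(f, steps) for f in files))
    return dict(zip(files, per_file))
```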
