PRISMA 2020 systematic literature review agent powered by Pydantic AI
PRISMA Agent — Pydantic AI Systematic Review
A standalone, agent-based systematic literature review tool following PRISMA 2020 guidelines. Built with pydantic-ai for structured LLM interactions and typed outputs via OpenRouter.
Architecture
prisma-review-agent/
├── models.py # Pydantic v2 models (Article, Protocol, Evidence, GRADE, etc.)
├── clients.py # HTTP clients: PubMed (NCBI E-utilities), bioRxiv, SQLite cache
├── agents.py # 12 pydantic-ai agents with typed outputs + runner functions
├── evidence.py # Evidence extraction + source grounding validation gate
├── validation.py # Source grounding validator — rapidfuzz fuzzy matching
├── pipeline.py # Async orchestrator — 16-step PRISMA pipeline with cache
├── compare.py # Multi-model compare mode — parallel runs + consensus synthesis
├── export.py # Export: Markdown, JSON, BibTeX, CSV formats
├── main.py # Standalone CLI with argparse + interactive mode
└── prisma_review_agent/
    └── cache/                   # PostgreSQL cache sub-package
        ├── models.py            # CacheEntry, SimilarityConfig, StoredArticle, PipelineCheckpoint
        ├── similarity.py        # SHA-256 fingerprinting + weighted fuzzy scoring
        ├── store.py             # CacheStore — async PostgreSQL CRUD
        ├── article_store.py     # ArticleStore — article persistence + full-text search
        ├── skill.py             # pydantic-ai CacheAgent with @agent.tool tools
        ├── admin.py             # list/inspect/clear cache entries
        └── migrations/001_initial.sql
Design Principles
- Agent-per-task: Each PRISMA step that requires LLM reasoning has a dedicated pydantic-ai Agent with a typed output_type. No raw string parsing: the LLM returns validated Pydantic models.
- No hardcoded heuristics: Evidence extraction, screening, bias assessment, and synthesis are all handled by specialized LLM agents. No keyword lists or regex scoring.
- Source grounding: Every extracted evidence span is verified against its source article using rapidfuzz fuzzy matching before being included. Ungrounded spans are silently dropped.
- Typed throughout: Every data structure is a Pydantic BaseModel with validation. Structured outputs from agents are parsed and validated automatically by pydantic-ai.
- PostgreSQL result cache: Reviews with ≥ 95% similar criteria are served from cache in seconds instead of minutes. All fetched articles are indexed for future source reuse.
- Parallel per-article processing: Steps 7–15 (T/A screening, FT screening, evidence extraction, data extraction, RoB, charting, appraisal, narrative rows) run with configurable asyncio concurrency in both the standard pipeline and compare mode. Two shared module-level helpers (_parallel_ta_screening, _parallel_ft_screening in pipeline.py) are reused by both paths, keeping the parallelism logic in one place. At the default of 5 parallel LLM calls, a 100-article review that would take ~70 min sequentially completes in ~15 min. Set --concurrency 10 for larger reviews.
- Standalone: No web framework dependency. PostgreSQL is optional; the pipeline degrades gracefully without it.
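As a rough illustration of the source-grounding principle above, the sketch below checks whether an extracted span appears, at least approximately, in its source text and drops spans that do not. It uses the standard library's difflib as a stand-in for rapidfuzz so that it runs without extra dependencies. The function names and the 0.85 threshold are assumptions for illustration, not the package's actual API or settings.

```python
from difflib import SequenceMatcher


def _normalize(text: str) -> str:
    """Lowercase and collapse whitespace so layout differences do not matter."""
    return " ".join(text.lower().split())


def best_match_ratio(span: str, source: str) -> float:
    """Best similarity (0.0 to 1.0) between the span and same-length windows of the source."""
    span_n, source_n = _normalize(span), _normalize(source)
    if not span_n or not source_n:
        return 0.0
    if span_n in source_n:
        return 1.0  # exact match after normalization
    width = len(span_n)
    step = max(1, width // 4)  # coarse sliding window keeps the scan cheap
    last_start = max(0, len(source_n) - width)
    best = 0.0
    for start in range(0, last_start + 1, step):
        window = source_n[start:start + width]
        best = max(best, SequenceMatcher(None, span_n, window).ratio())
    return best


def keep_grounded(spans: list, source: str, threshold: float = 0.85) -> list:
    """Return only the spans whose best match meets the (assumed) threshold."""
    return [s for s in spans if best_match_ratio(s, source) >= threshold]
```

A span that fails the check is simply left out of the returned list, which mirrors the "silently dropped" behaviour described above.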
Installation
From PyPI (recommended)
pip install prisma-review-agent
From source
git clone https://github.com/tekrajchhetri/prisma-review-agent.git
cd prisma-review-agent
python -m pip install uv
uv sync
Quick Start
Set API Key
export OPENROUTER_API_KEY="sk-or-v1-..."
# Optional: higher PubMed rate limits (10 req/s vs 3 req/s)
export NCBI_API_KEY="your-ncbi-key"
Alternatively, pass the key inline with --api-key (takes precedence over the env var):
prisma-review --title "CRISPR gene therapy" --api-key "sk-or-v1-..."
CLI — installed package
After pip install prisma-review-agent the prisma-review command is available globally:
# Simple review
prisma-review \
--title "CRISPR gene therapy efficacy" \
--inclusion "Clinical trials, human subjects, English" \
--exclusion "Animal-only studies, reviews, commentaries"
# Full PICO specification
prisma-review \
--title "GLP-1 agonists for type 2 diabetes: a systematic review" \
--objective "Evaluate efficacy of GLP-1 RAs vs placebo for glycemic control" \
--population "Adults with type 2 diabetes mellitus" \
--intervention "GLP-1 receptor agonists" \
--comparison "Placebo or standard care" \
--outcome "HbA1c reduction, weight change, adverse events" \
--inclusion "RCTs, English, 2019-2024, peer-reviewed" \
--exclusion "Case reports, editorials, conference abstracts" \
--model "anthropic/claude-sonnet-4" \
--max-results 30 \
--hops 2 \
--rob-tool "RoB 2" \
--extract-data \
--concurrency 10 \
--export md json bib
# Interactive mode
prisma-review --interactive
CLI — from source (without installing)
python main.py --title "..." --interactive
Plan Confirmation (CLI)
By default, the pipeline pauses after generating the search strategy, shows the plan, and waits for your input before fetching any articles.
# Default — prompts for confirmation when running in a terminal
prisma-review \
--title "CRISPR gene therapy efficacy" \
--inclusion "Clinical trials, human subjects" \
--exclusion "Animal-only studies"
# Auto mode — no prompt (for scripts, CI, batch jobs)
prisma-review \
--title "CRISPR gene therapy efficacy" \
--auto \
--export ttl jsonld
# Limit re-generation attempts to 2
prisma-review \
--title "CRISPR gene therapy efficacy" \
--max-plan-iterations 2
Confirmation prompt:
══════════════════════════════════════════════════
Generated Search Plan (Iteration 1)
══════════════════════════════════════════════════
Research question: CRISPR gene therapy efficacy in clinical trials
PubMed queries (3):
1. CRISPR gene therapy clinical trials efficacy
2. CRISPR-Cas9 human trials outcomes
3. gene editing therapy safety efficacy RCT
MeSH terms: CRISPR-Cas Systems, Gene Therapy, Clinical Trials as Topic
Rationale: Focused on clinical evidence to match inclusion criteria...
══════════════════════════════════════════════════
Confirm plan? [yes / no / <feedback>]:
- Press Enter or type yes → proceed to article retrieval
- Type no → halt with rejection message and exit 1
- Type feedback (e.g., "add pediatric studies") → plan is re-generated with your input
Python API
import asyncio
from pathlib import Path
from prisma_review_agent import (
PRISMAReviewPipeline,
ReviewProtocol,
RoBTool,
to_markdown,
to_json,
)
protocol = ReviewProtocol(
title="Gut microbiome and depression",
objective="Examine the relationship between gut microbiota composition and depressive disorders",
pico_population="Adults with major depressive disorder",
pico_intervention="Gut microbiome profiling",
pico_comparison="Healthy controls",
pico_outcome="Microbiome diversity, specific taxa abundance",
inclusion_criteria="Human studies, English, 2018-2024",
exclusion_criteria="Animal studies, reviews, case reports",
max_hops=10,
rob_tool=RoBTool.NEWCASTLE_OTTAWA,
article_concurrency=10, # parallel LLM calls per article step (default: 5)
# Domain-specific charting questions — answered per included article and stored
# in DataChartingRubric.custom_fields (question text → extracted answer).
# Leave out entirely to use only the built-in sections A–G.
charting_questions=[
"What sequencing method was used (16S rRNA, shotgun metagenomics, or other)?",
"Which taxonomic level was the primary analysis performed at?",
"What alpha-diversity indices were reported (Shannon, Simpson, Chao1, …)?",
"Was the gut-brain axis or HPA axis explicitly discussed?",
"Were dietary intake data collected and reported?",
],
# Override the four default appraisal domain names for this review type.
# Unspecified positions (here: 3 and 4) keep their defaults.
appraisal_domains=[
"Participant Recruitment and Microbiome Sampling Quality",
"Sequencing and Bioinformatic Pipeline Quality",
],
)
async def run():
    pipeline = PRISMAReviewPipeline(
        api_key="sk-or-v1-...",
        model_name="anthropic/claude-sonnet-4",
        protocol=protocol,
        max_per_query=25,
        related_depth=1,
    )
    result = await pipeline.run()
    # Export
    Path("review.md").write_text(to_markdown(result))
    Path("review.json").write_text(to_json(result))
    # Access structured data
    print(f"Included: {result.flow.included_synthesis} studies")
    for article in result.included_articles:
        rob = article.risk_of_bias.overall.value if article.risk_of_bias else "?"
        print(f" [{article.pmid}] {article.authors} ({article.year}) — RoB: {rob}")
    for span in result.evidence_spans[:5]:
        print(f" Evidence [{span.paper_pmid}]: {span.text[:100]}...")

asyncio.run(run())
Python API — Plan Confirmation
Use the confirm_callback parameter to intercept the generated plan from any Python environment (scripts, Jupyter, web APIs) without any terminal dependency.
import asyncio
from prisma_review_agent.models import ReviewPlan, PlanRejectedError, MaxIterationsReachedError
from prisma_review_agent.pipeline import PRISMAReviewPipeline
from prisma_review_agent.models import ReviewProtocol
protocol = ReviewProtocol(
title="CRISPR gene therapy efficacy",
inclusion_criteria="Clinical trials, human subjects",
exclusion_criteria="Animal-only studies",
)
def confirm(plan: ReviewPlan) -> bool | str:
    """Inspect the plan and return True, False, or feedback text."""
    print(f"Iteration {plan.iteration}: {len(plan.pubmed_queries)} PubMed queries")
    for q in plan.pubmed_queries:
        print(f" - {q}")
    answer = input("Approve? [yes/no/feedback]: ").strip()
    if answer.lower() in ("yes", "y", ""):
        return True
    if answer.lower() in ("no", "abort"):
        return False
    return answer # feedback string → triggers re-generation

async def main():
    pipeline = PRISMAReviewPipeline(
        api_key="sk-or-v1-...",
        model_name="anthropic/claude-sonnet-4",
        protocol=protocol,
    )
    try:
        result = await pipeline.run(
            confirm_callback=confirm,
            max_plan_iterations=3,
        )
        print(f"Review complete: {result.flow.included_synthesis} studies included")
    except PlanRejectedError as e:
        print(f"Stopped: {e}")
    except MaxIterationsReachedError as e:
        print(f"Too many iterations: {e}")

asyncio.run(main())
Auto mode — skip confirmation entirely:
# No confirmation prompts; runs end-to-end
result = await pipeline.run(auto_confirm=True)
Multi-Model Compare Mode
Run the same protocol through two or more LLMs in parallel. Article acquisition (PubMed/bioRxiv search, deduplication) runs once; all LLM-dependent steps (screening, evidence extraction, data extraction, RoB, charting, appraisal, narrative) run independently per model using the same shared parallel helpers as the standard pipeline — steps 7–15 are fully parallel within each model's pipeline. Results are merged into a single CompareReviewResult with per-field agreement indicators and an LLM-generated consensus synthesis.
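One common way to cap the number of in-flight LLM calls for per-article steps like these is an asyncio.Semaphore. The sketch below is a generic illustration of that technique, not the package's own parallel helpers; bounded_gather, worker, and the default of 5 are hypothetical names and values chosen to match the description above.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable, List


async def bounded_gather(
    items: Iterable[Any],
    worker: Callable[[Any], Awaitable[Any]],
    concurrency: int = 5,
) -> List[Any]:
    """Run worker(item) for every item with at most `concurrency` calls active at once.

    Results come back in the same order as the input items.
    """
    semaphore = asyncio.Semaphore(concurrency)

    async def guarded(item: Any) -> Any:
        # Each task waits here until one of the `concurrency` slots is free.
        async with semaphore:
            return await worker(item)

    return await asyncio.gather(*(guarded(item) for item in items))


if __name__ == "__main__":
    async def fake_llm_call(pmid: str) -> str:
        await asyncio.sleep(0.05)  # stands in for a slow LLM request
        return f"{pmid}: screened"

    print(asyncio.run(bounded_gather(["111", "222", "333"], fake_llm_call, concurrency=2)))
```

Because every per-article step goes through the same wrapper, raising or lowering the cap is a single-parameter change.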
Plan confirmation and strategy revision in compare mode
The "Generated Search Plan" review/approve/revise gate works identically in compare mode. Article acquisition (search strategy generation, PubMed/bioRxiv search, deduplication) runs once and is shared across all models. The plan confirmation loop fires during that shared step — before any per-model LLM work begins.
CLI — plan prompt in compare mode
Omit --auto to see the prompt. The same three-way response applies:
prisma-review \
--title "CRISPR gene therapy efficacy" \
--inclusion "Clinical trials, human subjects" \
--exclusion "Animal-only studies" \
--compare-models anthropic/claude-sonnet-4 openai/gpt-4o
══════════════════════════════════════════════════
Generated Search Plan (Iteration 1)
══════════════════════════════════════════════════
Research question: CRISPR gene therapy efficacy in clinical trials
PubMed queries (3):
1. CRISPR gene therapy clinical trials efficacy
2. CRISPR-Cas9 human trials outcomes
3. gene editing therapy safety efficacy RCT
MeSH terms: CRISPR-Cas Systems, Gene Therapy, Clinical Trials as Topic
Rationale: Focused on clinical evidence to match inclusion criteria...
══════════════════════════════════════════════════
Confirm plan? [yes / no / <feedback>]:
- Press Enter or type yes → proceed; the approved strategy is used for the shared article fetch, then all models run in parallel
- Type no → halt with PlanRejectedError before any search is executed
- Type feedback (e.g., "add pediatric CRISPR trials and broaden to gene editing") → the search strategy is revised and the updated plan is shown again for re-approval (up to --max-plan-iterations rounds)
Revised plan after feedback:
══════════════════════════════════════════════════
Generated Search Plan (Iteration 2)
══════════════════════════════════════════════════
PubMed queries (4):
1. CRISPR gene therapy pediatric clinical trials efficacy
2. CRISPR-Cas9 children adolescents outcomes
3. gene editing therapy sickle cell beta-thalassemia pediatric
4. base editing clinical trial safety efficacy
Rationale: Expanded to include pediatric subgroups and broader gene editing
approaches as requested...
══════════════════════════════════════════════════
Confirm plan? [yes / no / <feedback>]:
CLI — skip confirmation (unattended / CI)
prisma-review \
--title "CRISPR gene therapy efficacy" \
--compare-models anthropic/claude-sonnet-4 openai/gpt-4o \
--auto
Requires at least 2 models; up to 5 supported per run.
Python API — compare mode
import asyncio
from pathlib import Path
from prisma_review_agent import (
PRISMAReviewPipeline, ReviewProtocol,
to_compare_markdown, to_compare_json,
to_compare_charting_markdown, to_compare_charting_json,
)
from prisma_review_agent.models import ReviewPlan, PlanRejectedError, MaxIterationsReachedError
protocol = ReviewProtocol(
title="CRISPR gene therapy efficacy",
inclusion_criteria="Clinical trials, human subjects, English",
exclusion_criteria="Animal-only studies, reviews",
)
def confirm_and_revise(plan: ReviewPlan) -> bool | str:
    """Called once per iteration. Return True to approve, False to abort,
    or a feedback string to revise the strategy and re-prompt."""
    print(f"\n--- Search Plan (iteration {plan.iteration}) ---")
    print(f"Research question: {plan.research_question}")
    print(f"PubMed queries ({len(plan.pubmed_queries)}):")
    for q in plan.pubmed_queries:
        print(f" - {q}")
    if plan.mesh_terms:
        print(f"MeSH: {', '.join(plan.mesh_terms)}")
    print(f"Rationale: {plan.rationale[:120]}...")
    answer = input("Approve? [yes / no / feedback to revise]: ").strip()
    if answer.lower() in ("", "yes", "y"):
        return True # approved — proceed with shared article acquisition
    if answer.lower() in ("no", "abort"):
        return False # rejected — raises PlanRejectedError
    return answer # feedback string — strategy is re-generated and callback fires again

async def run():
    pipeline = PRISMAReviewPipeline(
        api_key="sk-or-v1-...",
        model_name="anthropic/claude-sonnet-4", # used for search strategy generation
        protocol=protocol,
    )
    try:
        compare_result = await pipeline.run_compare(
            models=["anthropic/claude-sonnet-4", "openai/gpt-4o"],
            auto_confirm=False, # enable plan review + revision
            confirm_callback=confirm_and_revise, # same interface as pipeline.run()
            max_plan_iterations=3, # max revision rounds (default 3)
            consensus_model="anthropic/claude-sonnet-4",
            assemble_timeout=3600.0,
        )
    except PlanRejectedError:
        print("Review aborted — plan rejected by user.")
        return
    except MaxIterationsReachedError as e:
        print(f"Stopped after {e.max_allowed} revision rounds without approval.")
        return
    # Per-model and merged exports
    Path("compare.md").write_text(to_compare_markdown(compare_result))
    Path("compare.json").write_text(to_compare_json(compare_result))
    Path("charting_compare.md").write_text(to_compare_charting_markdown(compare_result))
    # Access structured results
    for run in compare_result.model_results:
        if run.succeeded:
            print(f"{run.model_name}: {len(run.result.included_articles or [])} included")
        else:
            print(f"{run.model_name}: FAILED — {run.error}")
    print("\nConsensus:")
    print(compare_result.merged.consensus_synthesis[:300])
    print(f"\nDivergences: {len(compare_result.merged.synthesis_divergences)}")
    for div in compare_result.merged.synthesis_divergences:
        print(f" [{div.topic}]")
        for model, pos in div.positions.items():
            print(f" {model}: {pos[:80]}")
    # Field-level agreement
    agreed = sum(1 for fa in compare_result.merged.field_agreement.values() if fa.agreed)
    total = len(compare_result.merged.field_agreement)
    print(f"\nField agreement: {agreed}/{total} fields agreed")

asyncio.run(run())
CompareReviewResult structure
| Attribute | Type | Description |
|---|---|---|
| compare_models | list[str] | Ordered list of model names used |
| model_results | list[ModelReviewRun] | One entry per model; .succeeded / .result / .error |
| merged.consensus_synthesis | str | LLM-generated prose summarising agreed findings |
| merged.synthesis_divergences | list[SynthesisDivergence] | Per-topic disagreements with per-model positions |
| merged.field_agreement | dict[str, FieldAgreement] | Key: "{source_id}::{section_key}::{field_name}" |
| protocol | ReviewProtocol | Shared protocol used for all model runs |
Partial failures are handled gracefully: if one model fails, its ModelReviewRun has error set and result=None; the remaining models' results and the consensus synthesis (if ≥2 succeeded) are still returned.
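The partial-failure behaviour described above can be sketched with asyncio.gather(..., return_exceptions=True), which collects exceptions as values instead of cancelling the other runs. This is an illustrative sketch only: ModelRun is a hypothetical stand-in for ModelReviewRun, and run_models and can_build_consensus are made-up helper names, not part of the package.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, List, Optional


@dataclass
class ModelRun:
    """Hypothetical stand-in for ModelReviewRun: exactly one of result / error is set."""
    model_name: str
    result: Optional[Any] = None
    error: Optional[str] = None

    @property
    def succeeded(self) -> bool:
        return self.error is None


async def run_models(
    models: List[str],
    run_one: Callable[[str], Awaitable[Any]],
) -> List[ModelRun]:
    """Run every model concurrently; a failure in one does not abort the others."""
    outcomes = await asyncio.gather(
        *(run_one(name) for name in models), return_exceptions=True
    )
    runs = []
    for name, outcome in zip(models, outcomes):
        if isinstance(outcome, BaseException):
            runs.append(ModelRun(name, error=f"{type(outcome).__name__}: {outcome}"))
        else:
            runs.append(ModelRun(name, result=outcome))
    return runs


def can_build_consensus(runs: List[ModelRun]) -> bool:
    """A consensus synthesis needs at least two successful runs."""
    return sum(1 for run in runs if run.succeeded) >= 2
```

Keeping the error as a string on the run object lets callers report per-model failures without wrapping every access in try/except.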
Structured Report Output (result.prisma_review)
Every successful run with at least one included study produces a PrismaReview object on result.prisma_review. It is a complete, publication-ready PRISMA 2020 document with all major sections as typed Pydantic models.
import asyncio
from prisma_review_agent.models import ReviewProtocol
from prisma_review_agent.pipeline import PRISMAReviewPipeline
async def run():
    protocol = ReviewProtocol(
        title="Machine learning for ADHD diagnosis",
        objective="Evaluate ML classifiers for ADHD detection from EEG signals",
        inclusion_criteria="EEG studies, human subjects, ML classifier reported",
        exclusion_criteria="Animal studies, reviews without primary data",
    )
    pipeline = PRISMAReviewPipeline(
        api_key="sk-or-v1-...",
        protocol=protocol,
        enable_cache=False,
    )
    result = await pipeline.run(auto_confirm=True)
    review = result.prisma_review
    if review:
        # Access structured sections
        print(review.abstract.background)
        print(review.abstract.conclusion)
        print(f"{len(review.results.themes)} themes identified")
        for theme in review.results.themes:
            print(f" - {theme.theme_name}: {', '.join(theme.key_findings[:2])}")
        print(review.conclusion.recommendations)

asyncio.run(run())
Per-study structured data:
review = result.prisma_review
if review and review.results.extracted_studies:
    for study in review.results.extracted_studies:
        print(f"[{study.metadata.source_id}] {study.metadata.title[:60]}")
        print(f" Design: {study.design.study_design}")
        print(f" Country: {study.design.country_or_region}")
        print(f" Year: {study.metadata.year}")
Configurable rendering format:
Pass output_synthesis_style to control how results are rendered. Default is "paragraph"; also supports "question_answer", "bullet_list", "table".
result = await pipeline.run(
auto_confirm=True,
output_synthesis_style="question_answer",
)
review = result.prisma_review
for qa in (review.results.question_answer_summary or []):
    print(f"Q: {qa.question}")
    print(f"A: {qa.answer}\n")
Backward compatibility: All existing flat fields (result.synthesis_text, result.structured_abstract, result.introduction_text, result.conclusions_text) are automatically backfilled from the structured report and continue to work unchanged.
Per-Rubric Section Output Formats
Configure how each data charting section (A–G + custom) renders its answer. Five format types are supported: descriptive (default), yes_no, table, bullet_list, numeric. For table, bullet_list, and numeric sections a prose summary is also generated automatically.
Simple API — section_output_formats dict:
from prisma_review_agent.models import ReviewProtocol
protocol = ReviewProtocol(
title="Digital biomarkers for Parkinson's disease",
objective="Identify ML-based biomarkers from wearable sensor data",
inclusion_criteria="Wearable sensor studies, PD patients, ML classifier",
exclusion_criteria="Non-PD populations, no ML methods",
section_output_formats={
"Study Design": "table",
"Participants: Disordered Group": "yes_no",
"Features and Models": "bullet_list",
"Data Collection": "table",
},
)
result = await pipeline.run(auto_confirm=True)
# Access structured section outputs per study
for rubric in result.data_charting_rubrics:
    for section_title, out in rubric.section_outputs.items():
        print(f"[{rubric.source_id}] {section_title} ({out.format_used})")
        print(out.formatted_answer)
        if out.section_summary:
            print(f" Summary: {out.section_summary}")
Full config API — custom titles, ordering, and formats:
from prisma_review_agent.models import ReviewProtocol, RubricSectionConfig
protocol = ReviewProtocol(
title="Emotion recognition from physiological signals",
objective="...",
inclusion_criteria="...",
exclusion_criteria="...",
rubric_section_config=[
RubricSectionConfig(section_key="F", section_name="ML Models & Performance", order=1, output_format="table"),
RubricSectionConfig(section_key="B", section_name="Study Design", order=2, output_format="table"),
RubricSectionConfig(section_key="C", section_name="Patient Cohort", order=3, output_format="yes_no"),
RubricSectionConfig(section_key="G", section_name="Key Findings", order=4, output_format="bullet_list"),
],
)
Export per-rubric outputs:
from pathlib import Path
from prisma_review_agent.export import to_rubric_markdown, to_rubric_json
# Markdown: one heading per study, one sub-heading per section
Path("rubric_extraction.md").write_text(to_rubric_markdown(result))
# JSON: list of {source_id, title, sections: {title: {format_used, formatted_answer, section_summary}}}
Path("rubric_extraction.json").write_text(to_rubric_json(result))
The combined per-study outputs are also available on result.prisma_review.methods.data_extraction (one StudyDataExtractionReport per included study, sections in configured order).
Validation: Invalid format values raise ValueError at ReviewProtocol construction time. Unknown section names in section_output_formats log a UserWarning and are ignored. If the LLM cannot produce the requested format for a section it falls back to descriptive and logs a warning — formatted_answer is never empty.
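The validation rules above can be sketched in plain Python as follows. This is only an illustration of the described behaviour (ValueError for an invalid format, a UserWarning for an unknown section name). The function name and the known_sections argument are hypothetical; in the package these checks happen inside ReviewProtocol.

```python
import warnings
from typing import Dict, Set

# The five format types listed in this section.
ALLOWED_FORMATS = {"descriptive", "yes_no", "table", "bullet_list", "numeric"}


def validate_section_formats(
    formats: Dict[str, str],
    known_sections: Set[str],
) -> Dict[str, str]:
    """Return the usable subset of `formats`.

    Raises ValueError for an unsupported format value; warns about and skips
    section names that are not in `known_sections`.
    """
    cleaned = {}
    for section, fmt in formats.items():
        if fmt not in ALLOWED_FORMATS:
            raise ValueError(
                f"Invalid output format {fmt!r} for section {section!r}; "
                f"expected one of {sorted(ALLOWED_FORMATS)}"
            )
        if section not in known_sections:
            warnings.warn(f"Unknown section {section!r} ignored", UserWarning)
            continue
        cleaned[section] = fmt
    return cleaned
```

Failing fast on a bad format value while only warning on an unknown section name means a typo in a format is caught before any LLM calls are made, whereas a renamed section does not block the run.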
Field-Level Charting & Appraisal Output
Configure per-field answer constraints (enumerated options, yes/no, free text, numeric) and a structured critical appraisal instrument with domain-level concern aggregation.
Zero-config — built-in defaults:
from prisma_review_agent import PRISMAReviewPipeline, ReviewProtocol
from prisma_review_agent.export import to_charting_markdown, to_charting_json, to_appraisal_markdown, to_appraisal_json
from pathlib import Path
protocol = ReviewProtocol(
title="Bio-acoustic ML in neurological disorders",
inclusion_criteria="...",
exclusion_criteria="...",
# charting_template and critical_appraisal_config default to built-in schemas
)
result = await PRISMAReviewPipeline(api_key="...", protocol=protocol).run(auto_confirm=True)
# Per-study field-level extraction
Path("charting.md").write_text(to_charting_markdown(result))
Path("charting.json").write_text(to_charting_json(result))
# Structured appraisal with cross-study summary
Path("appraisal.md").write_text(to_appraisal_markdown(result))
Path("appraisal.json").write_text(to_appraisal_json(result))
Access the structured data directly:
for study in result.prisma_review.methods.data_extraction:
    print(f"\n=== {study.source_id} ===")
    for section_key, section in study.field_answers.items():
        print(f" {section.section_title}")
        for fa in section.field_answers:
            print(f" {fa.field_name}: {fa.value} [{fa.confidence}]")
for appraisal in result.prisma_review.methods.critical_appraisal_results:
    print(f"\n=== {appraisal.source_id} ===")
    for domain in appraisal.domains:
        print(f" {domain.domain_name}: {domain.domain_concern}")
Customise a single field's options:
from prisma_review_agent.agents import default_charting_template
template = default_charting_template()
custom = template.override_field(
section_key="B",
field_name="Study Design",
options=["Cross-sectional", "Longitudinal", "Retrospective cohort", "Prospective cohort"],
)
protocol = ReviewProtocol(..., charting_template=custom)
Fully custom charting template:
from prisma_review_agent.models import ChartingTemplate, ChartingSection, FieldDefinition
template = ChartingTemplate(sections=[
ChartingSection(
section_key="1",
section_title="Study Overview",
fields=[
FieldDefinition(
field_name="Design",
description="Overall study design",
answer_type="enumerated",
options=["RCT", "Cohort", "Case-control", "Cross-sectional"],
),
FieldDefinition(field_name="Sample Size", description="Total N", answer_type="numeric"),
FieldDefinition(field_name="Country", description="Study country", answer_type="free_text"),
],
),
ChartingSection(
section_key="2",
section_title="Outcomes",
fields=[
FieldDefinition(
field_name="Primary Outcome Reported",
description="Was the primary outcome clearly reported?",
answer_type="yes_no_extended",
options=["Yes", "No", "Not Reported"],
),
FieldDefinition(
field_name="Key Results",
description="Headline result",
answer_type="free_text",
),
FieldDefinition(
field_name="Reviewer Assessment",
description="Qualitative assessment — filled by reviewer",
answer_type="free_text",
reviewer_only=True, # excluded from LLM extraction
),
],
),
])
protocol = ReviewProtocol(..., charting_template=template)
reviewer_only=True fields are excluded from the LLM prompt and rendered as [Human reviewer] in Markdown exports and {"value": null, "reviewer_only": true} in JSON exports.
Custom critical appraisal instrument:
from prisma_review_agent.models import CriticalAppraisalConfig, AppraisalDomainSpec, AppraisalItemSpec
config = CriticalAppraisalConfig(domains=[
AppraisalDomainSpec(
domain_name="Reporting Quality",
concern_aggregation_rule="majority_yes", # or "strict" / "lenient"
items=[
AppraisalItemSpec(
item_text="Were CONSORT/STROBE reporting guidelines followed?",
allowed_ratings=["Yes", "Partial", "No", "Not Reported"],
),
AppraisalItemSpec(
item_text="Was the primary outcome pre-registered?",
allowed_ratings=["Yes", "No", "N/A"],
),
],
),
])
protocol = ReviewProtocol(..., critical_appraisal_config=config)
domain_concern (Low / Some / High) is derived deterministically in Python from item ratings — it is never left to the LLM. The three aggregation rules:
| Rule | Low | Some | High |
|---|---|---|---|
| majority_yes | > 50% Yes | mixed | > 50% No / Not Reported |
| strict | all Yes | any Partial or one No | two or more No |
| lenient | any Yes | all Partial / mixed | all No / Not Reported |
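One possible reading of the three rules in the table, written as a deterministic function. The table leaves some cases open, so the sketch makes labelled assumptions: "High" takes precedence over "Some" under strict, ratings such as "N/A" count as neither Yes nor No, and anything not matching Low or High falls through to "Some". The function name is hypothetical and this is not the package's implementation.

```python
from collections import Counter
from typing import List

NEGATIVE_RATINGS = ("No", "Not Reported")


def domain_concern(ratings: List[str], rule: str = "majority_yes") -> str:
    """Map a domain's item ratings to 'Low', 'Some', or 'High' concern."""
    if not ratings:
        raise ValueError("at least one item rating is required")
    counts = Counter(ratings)
    total = len(ratings)
    yes = counts["Yes"]
    negative = sum(counts[r] for r in NEGATIVE_RATINGS)

    if rule == "majority_yes":
        if yes * 2 > total:        # more than 50% Yes
            return "Low"
        if negative * 2 > total:   # more than 50% No / Not Reported
            return "High"
        return "Some"
    if rule == "strict":
        if yes == total:           # all Yes
            return "Low"
        if counts["No"] >= 2:      # assumption: checked before the 'Some' cases
            return "High"
        return "Some"              # any Partial, a single No, or other ratings
    if rule == "lenient":
        if yes >= 1:               # any Yes
            return "Low"
        if negative == total:      # all No / Not Reported
            return "High"
        return "Some"
    raise ValueError(f"unknown aggregation rule: {rule!r}")
```

For example, the ratings ["Yes", "Partial"] give "Some" under strict but "Low" under lenient, which is the kind of difference the rule choice is meant to control.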
Save and reload a template:
from pathlib import Path
from prisma_review_agent.models import ChartingTemplate
from prisma_review_agent.agents import default_charting_template
template = default_charting_template()
Path("my_template.json").write_text(template.model_dump_json(indent=2))
loaded = ChartingTemplate.model_validate_json(Path("my_template.json").read_text())
assert loaded == template # full round-trip fidelity
confirm_callback return value semantics:
| Return value | Meaning | Pipeline action |
|---|---|---|
| True | Plan approved | Continue to article retrieval |
| False | Plan rejected | Raise PlanRejectedError |
| "" (empty string) | Treated as approval | Continue to article retrieval |
| "<feedback text>" | Re-generate with feedback | Call agent again with feedback; increment iteration |
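The return-value semantics in the table amount to a small loop. The sketch below is a simplified, synchronous illustration under stated assumptions: confirmation_loop, generate_plan, PlanRejected, and MaxIterationsReached are hypothetical names standing in for the pipeline's internals and for PlanRejectedError / MaxIterationsReachedError.

```python
from typing import Any, Callable, Optional, Union


class PlanRejected(Exception):
    """Illustrative stand-in for PlanRejectedError."""


class MaxIterationsReached(Exception):
    """Illustrative stand-in for MaxIterationsReachedError."""


def confirmation_loop(
    generate_plan: Callable[[int, Optional[str]], Any],
    confirm: Callable[[Any], Union[bool, str]],
    max_iterations: int = 3,
) -> Any:
    """Generate a plan, ask for confirmation, and re-generate on feedback."""
    feedback: Optional[str] = None
    for iteration in range(1, max_iterations + 1):
        plan = generate_plan(iteration, feedback)
        answer = confirm(plan)
        if answer is True or answer == "":
            return plan  # approved: continue to article retrieval
        if answer is False:
            raise PlanRejected(f"plan rejected at iteration {iteration}")
        feedback = answer  # non-empty string: re-generate with this feedback
    raise MaxIterationsReached(f"no approval after {max_iterations} iterations")
```

The `is True` / `is False` checks are deliberate: they keep a feedback string from ever being mistaken for a boolean answer.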
FastAPI Integration
The pipeline's confirm_callback and progress_callback hooks make it straightforward to build a live UI on top of FastAPI. Progress messages now carry structured per-article information (stage name, done/total/remaining counts) that the server parses into typed SSE events so the UI can render live progress bars, article cards, and stage indicators without any client-side string parsing.
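For readers unfamiliar with the SSE wire format, a typed event dict can be serialised into a frame roughly as follows. This is a minimal sketch; to_sse is a hypothetical helper name, and using the dict's "type" key as the SSE event name is an assumption about how a server might map events.

```python
import json


def to_sse(event: dict) -> str:
    """Serialise an event dict as one Server-Sent Events frame.

    A frame is an optional 'event:' line, a 'data:' line, and a blank line.
    json.dumps escapes newlines, so the payload always fits on one data line.
    """
    event_type = event.get("type", "message")
    payload = json.dumps(event, ensure_ascii=False)
    return f"event: {event_type}\ndata: {payload}\n\n"


if __name__ == "__main__":
    print(to_sse({"type": "article_done", "pmid": "28087124", "done": 5, "total": 38}), end="")
```

On the client side, an EventSource listener registered for "article_done" would then receive the JSON payload in event.data.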
Shared helpers (used by all patterns)
Put these in a shared module (e.g. shared.py) imported by every pattern below.
# shared.py
import os
import re
import json
from pydantic import BaseModel as PydanticBase, Field
# ── concurrency default ───────────────────────────────────────────────────────
def _default_concurrency() -> int:
    """2× logical CPUs, clamped to [8, 10]. Falls back to 8 in restricted containers."""
    try:
        return max(8, min((os.cpu_count() or 4) * 2, 10))
    except Exception:
        return 8

# ── request model ─────────────────────────────────────────────────────────────
class ReviewRequest(PydanticBase):
    title: str
    inclusion: str = ""
    exclusion: str = ""
    assemble_timeout: float = 3600.0
    concurrency: int = Field(
        default_factory=_default_concurrency,
        ge=1, le=20,
        description="Max concurrent LLM calls per article step. Auto-detected from CPU count (8–10).",
    )
    section_output_formats: dict[str, str] = {}
    rubric_section_config: list[dict] = []

# ── progress message parser ───────────────────────────────────────────────────
# Matches: "✓ 28087124 [5/38 done, 33 remaining]" (completion line)
_RE_ARTICLE_DONE = re.compile(r"✓\s+(\S+)\s+\[(\d+)/(\d+) done,\s*(\d+) remaining\]")
# Matches: "Charting [3/38, 35 remaining] 28087124…" (start line)
_RE_ARTICLE_START = re.compile(r"\[(\d+)/(\d+),\s*(\d+) remaining\]\s+(\S+)")
# Matches: "[1/38] Some title…" (extraction / RoB start line)
_RE_IDX_TITLE = re.compile(r"\[(\d+)/(\d+)\]\s+(.+)")
# Stage keywords → canonical stage name
_STAGE_KEYWORDS = {
"Screening": "screening",
"Extracting evidence": "evidence_extraction",
"Extracting data from": "data_extraction",
"Assessing risk of bias": "risk_of_bias",
"Charting": "data_charting",
"Appraising": "critical_appraisal",
"Narrative": "narrative_synthesis",
"Synthesizing": "synthesis",
"Assessing bias": "bias_assessment",
"GRADE": "grade",
}
def parse_progress_message(msg: str, session: dict) -> dict:
"""Convert a raw pipeline progress string into a typed event dict.
Updates session["stage"], session["stage_done"], session["stage_total"],
and session["stage_remaining"] in place so /progress can serve a snapshot.
Returned dict always has a "type" key. Types:
log — generic informational line
stage_start — a new pipeline stage has begun
article_start — a single article started processing (non-blocking)
article_done — a single article finished; includes done/total/remaining
stage_done — all articles in the current stage finished (remaining == 0)
"""
    stripped = msg.strip()
    # ── article completion line ────────────────────────────────────────────────
    m = _RE_ARTICLE_DONE.search(stripped)
    if m:
        pmid, done, total, remaining = m.group(1), int(m.group(2)), int(m.group(3)), int(m.group(4))
        session["stage_done"] = done
        session["stage_total"] = total
        session["stage_remaining"] = remaining
        event = {
            "type": "article_done",
            "pmid": pmid,
            "done": done,
            "total": total,
            "remaining": remaining,
            "stage": session.get("stage", ""),
            "message": stripped,
        }
        if remaining == 0:
            event["type"] = "stage_done"
        return event
    # ── article start line (Charting / Appraising / Narrative) ────────────────
    m = _RE_ARTICLE_START.search(stripped)
    if m:
        idx, total, remaining, pmid = int(m.group(1)), int(m.group(2)), int(m.group(3)), m.group(4)
        session["stage_total"] = total
        session["stage_remaining"] = remaining
        return {
            "type": "article_start",
            "pmid": pmid,
            "index": idx,
            "total": total,
            "remaining": remaining,
            "stage": session.get("stage", ""),
            "message": stripped,
        }
    # ── extraction / RoB start line "[1/38] Title…" ───────────────────────────
    m = _RE_IDX_TITLE.search(stripped)
    if m:
        idx, total, title = int(m.group(1)), int(m.group(2)), m.group(3)
        session["stage_total"] = total
        return {
            "type": "article_start",
            "index": idx,
            "total": total,
            "title": title[:80],
            "stage": session.get("stage", ""),
            "message": stripped,
        }
    # ── stage start line ──────────────────────────────────────────────────────
    for keyword, stage_name in _STAGE_KEYWORDS.items():
        if keyword in stripped:
            session["stage"] = stage_name
            session["stage_done"] = 0
            session["stage_remaining"] = 0
            # extract total if present: "Extracting data from 38 studies"
            m_total = re.search(r"(\d+)\s+(?:studies|articles)", stripped)
            if m_total:
                session["stage_total"] = int(m_total.group(1))
            return {
                "type": "stage_start",
                "stage": stage_name,
                "total": session.get("stage_total", 0),
                "message": stripped,
            }
    # ── generic log line ──────────────────────────────────────────────────────
    return {"type": "log", "message": stripped}
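As a rough illustration of the parser above, the sketch below shows how one completion line could become a typed event. The regex `ARTICLE_DONE` and the helper `parse_done_line` are hypothetical stand-ins shaped after the sample log line `✓ Charted 28087124 [12/38 done, 26 remaining]`; the project's own `_RE_ARTICLE_DONE` pattern is not shown in this section and may differ.

```python
import re

# Hypothetical pattern modelled on the sample log line
# "✓ Charted 28087124 [12/38 done, 26 remaining]"; the real _RE_ARTICLE_DONE may differ.
ARTICLE_DONE = re.compile(r"✓\s+\w+\s+(\S+)\s+\[(\d+)/(\d+) done,\s*(\d+) remaining\]")


def parse_done_line(msg: str, session: dict) -> dict:
    """Return an article_done / stage_done event, or a generic log event."""
    stripped = msg.strip()
    m = ARTICLE_DONE.search(stripped)
    if not m:
        return {"type": "log", "message": stripped}
    pmid = m.group(1)
    done, total, remaining = (int(m.group(i)) for i in (2, 3, 4))
    # Mirror the counters into the session so polling clients see the same state.
    session.update(stage_done=done, stage_total=total, stage_remaining=remaining)
    return {
        "type": "stage_done" if remaining == 0 else "article_done",
        "pmid": pmid,
        "done": done,
        "total": total,
        "remaining": remaining,
        "stage": session.get("stage", ""),
        "message": stripped,
    }


session = {"stage": "data_charting"}
event = parse_done_line("  ✓ Charted 28087124 [12/38 done, 26 remaining]", session)
print(event["type"], event["pmid"], session["stage_remaining"])
```

The last article of a stage (`remaining == 0`) is reported as `stage_done` rather than `article_done`, which is why a client has to handle both types when updating its counters.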
Pattern 1 — Full session with plan confirmation, structured progress, and SSE
This is the recommended pattern for a production UI. It exposes:
- `POST /review/start`: start a review, get a `session_id`
- `GET /review/{id}/stream`: SSE stream of typed events
- `GET /review/{id}/progress`: polling snapshot (alternative to SSE)
- `GET /review/{id}/plan`: retrieve the generated search plan
- `POST /review/confirm`: approve / reject / give feedback on the plan
- `GET /review/{id}/status`: final result once complete
import asyncio
import json
import uuid
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel as PydanticBase
from prisma_review_agent.models import (
ReviewPlan, ReviewProtocol, PlanRejectedError, MaxIterationsReachedError,
RubricSectionConfig,
)
from prisma_review_agent.pipeline import PRISMAReviewPipeline
from shared import ReviewRequest, _default_concurrency, parse_progress_message
app = FastAPI()
_sessions: dict[str, dict] = {}
def _new_session() -> dict:
    return {
        "status": "starting",       # starting | running | awaiting_confirmation
                                    # | complete | rejected | error | timeout | max_iterations
        "stage": None,              # current pipeline stage name
        "stage_total": 0,           # articles in current stage
        "stage_done": 0,            # articles completed in current stage
        "stage_remaining": 0,       # articles still pending in current stage
        "articles_included": 0,     # running count of included articles
        "plan": None,               # ReviewPlan dict (set when awaiting confirmation)
        "events": [],               # structured event dicts (consumed by SSE)
        "log": [],                  # raw progress strings (full audit trail)
        "result": None,             # PRISMAReviewResult dict once complete
        "error": None,
        # internal only, not serialised to clients
        "_confirm_event": asyncio.Event(),
        "_confirm_response": None,
    }


class ConfirmRequest(PydanticBase):
    session_id: str
    response: str  # "yes" | "no" | feedback text
# ── helpers ───────────────────────────────────────────────────────────────────
def _append_event(session: dict, msg: str) -> None:
    """Parse a progress message, update session state, and append a typed event."""
    import re  # local import; the module-level imports above do not include re

    session["log"].append(msg)
    event = parse_progress_message(msg, session)
    session["events"].append(event)
    # track running included count from flow summary lines
    m_inc = re.search(r"Final included:\s*(\d+)", msg)
    if m_inc:
        session["articles_included"] = int(m_inc.group(1))


def _public_session(session: dict) -> dict:
    """Session fields safe to return to the client (no internal asyncio objects)."""
    return {
        "status": session["status"],
        "stage": session["stage"],
        "stage_total": session["stage_total"],
        "stage_done": session["stage_done"],
        "stage_remaining": session["stage_remaining"],
        "articles_included": session["articles_included"],
        "plan": session["plan"],
        "result": session["result"],
        "error": session["error"],
    }
# ── endpoints ─────────────────────────────────────────────────────────────────
@app.post("/review/start")
async def start_review(req: ReviewRequest):
    session_id = str(uuid.uuid4())
    session = _new_session()
    _sessions[session_id] = session
    concurrency = min(req.concurrency, 10)  # server-side hard cap
    timeout = min(req.assemble_timeout, 7200.0)
    rubric_cfg = [RubricSectionConfig(**c) for c in req.rubric_section_config]
    protocol = ReviewProtocol(
        title=req.title,
        inclusion_criteria=req.inclusion,
        exclusion_criteria=req.exclusion,
        section_output_formats=req.section_output_formats,
        rubric_section_config=rubric_cfg,
        article_concurrency=concurrency,
    )
    pipeline = PRISMAReviewPipeline(
        api_key="sk-or-v1-...",
        model_name="anthropic/claude-sonnet-4",
        protocol=protocol,
    )

    def confirm_callback(plan: ReviewPlan) -> bool | str:
        session["plan"] = plan.model_dump()
        session["status"] = "awaiting_confirmation"
        session["_confirm_event"].clear()
        _append_event(session, f"Plan ready — iteration {plan.iteration}")
        # Caveat: run_until_complete() raises RuntimeError when called on a loop
        # that is already running, so this line only works if the pipeline invokes
        # the callback outside the server's running event loop.
        asyncio.get_event_loop().run_until_complete(session["_confirm_event"].wait())
        return session["_confirm_response"]

    def progress_callback(msg: str) -> None:
        session["status"] = "running"
        _append_event(session, msg)

    # Keep a reference to the task so it is not garbage-collected mid-run.
    session["_task"] = asyncio.create_task(
        _run(pipeline, session, confirm_callback, progress_callback, timeout)
    )
    return {"session_id": session_id, "concurrency": concurrency}
async def _run(pipeline, session, confirm_cb, progress_cb, timeout: float):
    try:
        result = await pipeline.run(
            confirm_callback=confirm_cb,
            progress_callback=progress_cb,
            assemble_timeout=timeout,
        )
        session["result"] = result.model_dump(mode="json")
        session["status"] = "complete"
        session["events"].append({"type": "done", "status": "complete"})
    except PlanRejectedError:
        session["status"] = "rejected"
        session["events"].append({"type": "done", "status": "rejected"})
    except asyncio.TimeoutError:
        session["status"] = "timeout"
        session["error"] = f"Assembly exceeded {timeout:.0f}s"
        session["events"].append({"type": "done", "status": "timeout"})
    except MaxIterationsReachedError as e:
        session["status"] = "max_iterations"
        session["error"] = str(e)
        session["events"].append({"type": "done", "status": "max_iterations"})
    except Exception as e:
        session["status"] = "error"
        session["error"] = str(e)
        session["events"].append({"type": "done", "status": "error", "detail": str(e)})
@app.get("/review/{session_id}/stream")
async def stream_events(session_id: str):
    """SSE stream of typed events. Each event has a named type and a JSON data payload.

    Event types emitted:
      log           - generic informational line        {"message": "..."}
      stage_start   - new pipeline stage began          {"stage": "data_charting", "total": 38, ...}
      article_start - single article started            {"pmid": "...", "index": 3, "total": 38, "remaining": 35, ...}
      article_done  - single article finished           {"pmid": "...", "done": 3, "total": 38, "remaining": 35, ...}
      stage_done    - all articles in stage finished    {"stage": "data_charting", "done": 38, ...}
      plan_ready    - search plan awaits confirmation   {"plan": {...}}
      done          - pipeline finished (any outcome)   {"status": "complete"|"error"|"rejected"|...}
    """
    session = _sessions.get(session_id)
    if not session:
        raise HTTPException(404, "Session not found")
    last_idx = 0

    async def generator():
        nonlocal last_idx
        while True:
            events = session["events"]
            while last_idx < len(events):
                ev = events[last_idx]
                last_idx += 1
                ev_type = ev.get("type", "log")
                # emit plan_ready as a named SSE event so the browser can
                # listen with addEventListener("plan_ready", ...)
                if ev_type == "log" and session["status"] == "awaiting_confirmation":
                    yield f"event: plan_ready\ndata: {json.dumps({'plan': session['plan']})}\n\n"
                else:
                    yield f"event: {ev_type}\ndata: {json.dumps(ev)}\n\n"
                if ev_type == "done":
                    return
            await asyncio.sleep(0.2)

    return StreamingResponse(generator(), media_type="text/event-stream",
                             headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
@app.get("/review/{session_id}/progress")
async def get_progress(session_id: str):
    """Polling alternative to SSE: returns current stage snapshot and recent log lines."""
    session = _sessions.get(session_id)
    if not session:
        raise HTTPException(404, "Session not found")
    snap = _public_session(session)
    snap["recent_log"] = session["log"][-20:]  # last 20 raw lines for debugging
    return snap


@app.get("/review/{session_id}/plan")
async def get_plan(session_id: str):
    """Returns the generated search plan when status == 'awaiting_confirmation'."""
    session = _sessions.get(session_id)
    if not session:
        raise HTTPException(404, "Session not found")
    if session["status"] == "awaiting_confirmation" and session["plan"]:
        return {"status": "awaiting_confirmation", "plan": session["plan"]}
    return {"status": session["status"]}


@app.post("/review/confirm")
async def confirm_plan(req: ConfirmRequest):
    """Approve, reject, or give feedback on the search plan."""
    session = _sessions.get(req.session_id)
    if not session:
        raise HTTPException(404, "Session not found")
    r = req.response.strip()
    if r.lower() in ("yes", "y", ""):
        session["_confirm_response"] = True
    elif r.lower() in ("no", "abort"):
        session["_confirm_response"] = False
    else:
        session["_confirm_response"] = r  # feedback → plan re-generation
    session["_confirm_event"].set()
    return {"status": "acknowledged"}


@app.get("/review/{session_id}/status")
async def get_status(session_id: str):
    """Returns the full result once status == 'complete'."""
    session = _sessions.get(session_id)
    if not session:
        raise HTTPException(404, "Session not found")
    if session["status"] == "timeout":
        raise HTTPException(504, session.get("error", "Assembly timed out"))
    return _public_session(session)
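For readers unfamiliar with the wire format, the stream endpoint above writes each event as an `event:` line, a `data:` line carrying JSON, and a blank line. The sketch below reproduces that framing in isolation and reads it back; `format_sse` and `parse_sse` are hypothetical helper names, not part of the package.

```python
import json


def format_sse(event: dict) -> str:
    """Serialise one event dict as a named SSE frame (hypothetical helper)."""
    ev_type = event.get("type", "log")
    # With default settings json.dumps emits no raw newlines,
    # so the payload always fits on a single data: line.
    return f"event: {ev_type}\ndata: {json.dumps(event)}\n\n"


def parse_sse(frame: str) -> tuple[str, dict]:
    """Inverse of format_sse for a single well-formed frame."""
    fields = dict(line.split(": ", 1) for line in frame.strip().splitlines())
    return fields["event"], json.loads(fields["data"])


frame = format_sse(
    {"type": "article_done", "pmid": "28087124", "done": 12, "total": 38, "remaining": 26}
)
name, payload = parse_sse(frame)
print(name, payload["remaining"])
```

Because every frame carries a named `event:` field, a browser `EventSource` needs one `addEventListener` call per event type; the default `onmessage` handler only fires for unnamed events.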
Pattern 2 — JavaScript / TypeScript client
Consume the SSE stream and drive a live progress UI. Paste this into any framework (React, Vue, plain JS).
// reviewStream.ts
export type EventType =
| "log" | "stage_start" | "article_start" | "article_done"
| "stage_done" | "plan_ready" | "done";
export interface ProgressEvent {
type: EventType;
message?: string;
stage?: string;
pmid?: string;
index?: number;
done?: number;
total?: number;
remaining?: number;
plan?: Record<string, unknown>;
status?: string;
detail?: string;
}
export interface ProgressState {
status: string;
stage: string | null;
stageTotal: number;
stageDone: number;
stageRemaining: number;
articlesIncluded: number;
log: string[];
}
export function connectReviewStream(
sessionId: string,
onEvent: (ev: ProgressEvent, state: ProgressState) => void,
onDone: (status: string) => void,
): () => void {
const state: ProgressState = {
status: "running",
stage: null,
stageTotal: 0,
stageDone: 0,
stageRemaining: 0,
articlesIncluded: 0,
log: [],
};
const es = new EventSource(`/review/${sessionId}/stream`);
const handle = (type: EventType) => (raw: MessageEvent) => {
const ev: ProgressEvent = { ...JSON.parse(raw.data), type };
// keep local state in sync
if (type === "stage_start") {
state.stage = ev.stage ?? state.stage;
state.stageTotal = ev.total ?? 0;
state.stageDone = 0;
state.stageRemaining = ev.total ?? 0;
}
if (type === "article_done" || type === "stage_done") {
state.stageDone = ev.done ?? state.stageDone;
state.stageRemaining = ev.remaining ?? 0;
}
if (type === "log" && ev.message) {
state.log.push(ev.message);
if (state.log.length > 200) state.log.shift();
// parse "Final included: N" from log
const m = ev.message.match(/Final included:\s*(\d+)/);
if (m) state.articlesIncluded = parseInt(m[1], 10);
}
if (type === "done") {
state.status = ev.status ?? "done";
es.close();
onDone(state.status);
return;
}
onEvent(ev, { ...state });
};
// one listener per event type
(["log","stage_start","article_start","article_done","stage_done","plan_ready","done"] as EventType[])
.forEach(t => es.addEventListener(t, handle(t) as EventListener));
es.onerror = () => {
state.status = "error";
es.close();
onDone("error");
};
return () => es.close(); // call to disconnect early
}
Usage in a React component:
import { useEffect, useState } from "react";
import { connectReviewStream, ProgressState, ProgressEvent } from "./reviewStream";
export function ReviewProgress({ sessionId }: { sessionId: string }) {
const [state, setState] = useState<ProgressState | null>(null);
const [events, setEvents] = useState<ProgressEvent[]>([]);
useEffect(() => {
const disconnect = connectReviewStream(
sessionId,
(ev, snap) => {
setState({ ...snap });
setEvents(prev => [...prev.slice(-100), ev]); // keep last 100
},
(finalStatus) => console.log("done:", finalStatus),
);
return disconnect;
}, [sessionId]);
if (!state) return <p>Connecting…</p>;
const pct = state.stageTotal > 0
? Math.round((state.stageDone / state.stageTotal) * 100)
: 0;
return (
<div>
<p>Status: {state.status} · Stage: {state.stage ?? "—"}</p>
<p>Articles included so far: {state.articlesIncluded}</p>
{/* progress bar */}
{state.stageTotal > 0 && (
<div style={{ background: "#eee", borderRadius: 4, height: 8, width: "100%" }}>
<div style={{ background: "#4f46e5", width: `${pct}%`, height: "100%", borderRadius: 4 }} />
</div>
)}
<p>{state.stageDone}/{state.stageTotal} done · {state.stageRemaining} remaining</p>
{/* live log */}
<ul style={{ fontFamily: "monospace", fontSize: 12 }}>
{state.log.slice(-15).map((line, i) => <li key={i}>{line}</li>)}
</ul>
{/* article cards for done events */}
{events.filter(e => e.type === "article_done").slice(-5).map((e, i) => (
<div key={i} style={{ border: "1px solid #ccc", padding: 6, marginTop: 4 }}>
✓ {e.pmid}
<span style={{ color: "#888" }}>
[{e.done}/{e.total}, {e.remaining} remaining]
</span>
</div>
))}
</div>
);
}
Pattern 3 — Polling fallback (no SSE)
For environments where SSE is unavailable (some proxies, load balancers), poll /progress every 2 seconds instead:
async function pollProgress(sessionId: string, onUpdate: (snap: object) => void) {
while (true) {
const res = await fetch(`/review/${sessionId}/progress`);
const snap = await res.json();
onUpdate(snap);
if (["complete","error","rejected","timeout","max_iterations"].includes(snap.status)) break;
await new Promise(r => setTimeout(r, 2000));
}
}
/progress returns:
{
"status": "running",
"stage": "data_charting",
"stage_total": 38,
"stage_done": 12,
"stage_remaining": 26,
"articles_included": 38,
"plan": null,
"result": null,
"error": null,
"recent_log": [
" Charting [11/38, 27 remaining] 28087124…",
" ✓ Charted 28087124 [12/38 done, 26 remaining]",
" Charting [13/38, 25 remaining] 36175756…"
]
}
Key points:
- `parse_progress_message` runs server-side: the UI receives clean typed events instead of parsing raw progress strings.
- `stage_remaining` counts down to 0 as parallel article tasks complete; the UI can use it to drive a live countdown or progress bar for each stage.
- The `X-Accel-Buffering: no` SSE response header is required when running behind nginx so that it does not buffer the stream.
- The in-memory `_sessions` dict works for single-process development. In production use Redis pub/sub for SSE fan-out and a persistent store for session state.
- `asyncio.get_event_loop().run_until_complete(event.wait())` in `confirm_callback` only works if the callback is invoked outside a running event loop; called on a loop that is already running, `run_until_complete()` raises `RuntimeError`. If the pipeline invokes the callback from a worker thread, block that thread with `asyncio.run_coroutine_threadsafe(event.wait(), loop).result()`, and if the event is ever set from a non-loop thread, use `loop.call_soon_threadsafe(event.set)`.
- The server-side cap `min(req.concurrency, 10)` prevents an untrusted caller from flooding the LLM API.
- `_default_concurrency()` uses `os.cpu_count() * 2`, clamped to 8–10, with a hard fallback of 8 when `cpu_count()` returns `None` (common in Docker with restricted cgroups).
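The confirmation hand-off is the most delicate part of this pattern. Below is a minimal sketch of one way to wait for the user's answer, assuming the pipeline calls the synchronous confirm callback from a worker thread (this README does not confirm how the callback is invoked). `wait_for_confirmation` and `demo` are hypothetical names; only the `_confirm_event` and `_confirm_response` session keys come from the server code above.

```python
import asyncio


def wait_for_confirmation(loop: asyncio.AbstractEventLoop, session: dict):
    """Block the calling worker thread until the server loop sets the event.

    Assumption: this runs in a worker thread, never on the loop thread itself
    (blocking the loop thread here would deadlock).
    """
    future = asyncio.run_coroutine_threadsafe(session["_confirm_event"].wait(), loop)
    future.result()  # blocks this thread only; the event loop keeps serving requests
    return session["_confirm_response"]


async def demo():
    loop = asyncio.get_running_loop()
    session = {"_confirm_event": asyncio.Event(), "_confirm_response": None}

    # Stand-in for the pipeline invoking the sync callback off the loop thread.
    waiter = asyncio.create_task(asyncio.to_thread(wait_for_confirmation, loop, session))

    # Stand-in for POST /review/confirm, which runs on the loop thread.
    await asyncio.sleep(0.05)
    session["_confirm_response"] = True
    session["_confirm_event"].set()
    return await waiter


print(asyncio.run(demo()))
```

If the pipeline instead calls the callback directly on the event loop thread, no blocking wait is safe there, and an awaitable callback would be needed.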
Suggested UI for the Plan Confirmation Phase
Inspired by research review tools (see design reference in the project), the plan confirmation screen should feel like a structured "contract" the user approves before the pipeline does any expensive work. Suggested layout (following the KSynth-style design):
flowchart TB
subgraph Screen["Plan Confirmation — Protocol Tab"]
direction TB
Nav["Protocol ← selected · Progress · Synthesis · PRISMA Flow · Export"]
subgraph Plan["Generated Search Plan — Iteration 1"]
direction TB
RQ["Research Question\nCRISPR gene therapy efficacy in clinical trials"]
subgraph QBox["Queries — editable before approval"]
direction LR
PQ["PubMed × 3\n1. CRISPR gene therapy clinical trials\n2. CRISPR-Cas9 human trials outcomes\n3. gene editing therapy safety RCT"]
BQ["bioRxiv × 2\n1. CRISPR Cas9 gene editing safety\n2. CRISPR therapy clinical outcomes preprint"]
end
MeSH["MeSH pills · CRISPR-Cas Systems · Gene Therapy · Clinical Trials as RCTs"]
KC["Key concepts · efficacy · safety · clinical trial · gene editing"]
RT["Rationale: Focused on clinical evidence matching inclusion criteria..."]
FB["Feedback optional: Add pediatric studies, extend date range..."]
end
subgraph Actions["Actions"]
direction LR
B1["✗ Reject"] --- B2["↻ Regenerate"] --- B3["✓ Approve →"]
end
end
Nav --> Plan --> Actions
RQ --> QBox --> MeSH --> KC --> RT --> FB
Key UX decisions:
- Plan appears inline in the "Progress" tab (not a modal) — so the user can scroll up to review the protocol they entered before approving
- Queries are editable before approval: send edited queries back as feedback text via `confirm_callback`
- MeSH terms and key concepts render as pill badges (matching the "Charting Questions" style from the screenshot)
- Feedback textarea is pre-populated with `""` and only sent if non-empty; empty submit = `"yes"`
- Reject posts `response: "no"` and redirects to the project list
- Regenerate posts the feedback text; the plan card replaces itself with the new iteration
- Approve posts `response: "yes"` and transitions the Progress tab to the live SSE log view
Progress tab after approval (SSE stream view):
flowchart TB
subgraph Screen["Progress Tab — Live SSE View"]
direction TB
Hdr["Running · 0 included so far Cancel"]
subgraph Log["Pipeline Log"]
direction TB
S1["✓ Plan approved — Iteration 1"]
S2["✓ Searching PubMed — 3 queries sent"]
S3["✓ 47 records retrieved"]
S4["✓ Deduplication — 6 duplicates removed"]
S5["⟳ Screening title/abstract — 41 records in progress"]
subgraph Cards["Per-article decisions streamed live"]
direction LR
C1["PMID 33283989\n✓ Include\nCRISPR-Cas9 for SCD trial"]
C2["PMID 38661449\n✓ Include\nExagamglogene Autotemcel"]
C3["PMID 29301234\n✗ Exclude\nAnimal model only"]
end
end
end
Hdr --> Log
S1 --> S2 --> S3 --> S4 --> S5 --> Cards
This mirrors the "Running · 0 included" sidebar state in the KSynth screenshot and the evidence card grid in the Evidence tab.
Enhanced Output Formats
The PRISMA Agent now includes comprehensive structured outputs for systematic review documentation:
Data Charting Rubric (CSV)
Structured extraction of study characteristics across 7 sections (A-G):
- Section A: Publication Information (title, authors, year, journal, DOI, database)
- Section B: Study Design (goals, design type, sample size, tasks, settings)
- Section C: Disordered Group Participants (diagnosis, assessment, demographics)
- Section D: Healthy Controls (inclusion, matching criteria)
- Section E: Data Collection (data types, tasks, equipment, datasets)
- Section F: Features & Models (feature types, algorithms, performance metrics)
- Section G: Synthesis (key findings, limitations, future directions)
PRISMA Narrative Rows (CSV)
Condensed 6-cell summary format derived from charting data:
- Study design/sample/dataset
- Methods (feature extraction, modeling, validation)
- Outcomes (key performance results + findings)
- Key limitations
- Relevance notes
- Review-specific questions
Critical Appraisal Rubric (CSV)
Quality assessment across 4 domains:
- Domain 1: Participant & Sample Quality (5 items)
- Domain 2: Data Collection Quality (3 items)
- Domain 3: Feature & Model Quality (5 items)
- Domain 4: Bias & Transparency (4 items)
Each domain includes item-level ratings (Yes/Partial/No/Not Reported/N/A) and overall concern (Low/Some/High).
Enhanced Markdown
Professional systematic literature review brief with HTML styling, figures, and comprehensive documentation including:
- Executive Summary with key findings and statistics
- Background & Rationale with PICO framework
- Detailed Methods with eligibility criteria tables and search strategies
- Comprehensive Results with PRISMA flow diagrams, study characteristics, and visual data representations
- Discussion with implications for practice and research
- Conclusions with key takeaways
- References in academic format
- Detailed Appendices with data charting rubrics, critical appraisal results, and evidence spans
The enhanced format produces publication-ready SLR briefs with professional styling, color-coded sections, and visual elements suitable for stakeholder presentations and academic publications.
Export Options
# Default enhanced format
prisma-review --title "..." --export enhanced_md
# All structured formats
prisma-review --title "..." --export enhanced_md charting_csv narrative_csv appraisal_csv
# Individual formats
prisma-review --title "..." --export charting narrative appraisal json
# Compare-mode exports (after running with --compare-models)
prisma-review --title "..." --compare-models anthropic/claude-sonnet-4 openai/gpt-4o \
--auto --export md json
# RDF / Linked Data formats
prisma-review --title "..." --export ttl # Turtle RDF
prisma-review --title "..." --export jsonld # JSON-LD
prisma-review --title "..." --export ttl jsonld md # all three together
# Persist a queryable pyoxigraph store
prisma-review --title "..." --export ttl --rdf-store-path review.ttl
RDF / Linked Data Export
Export results as RDF using the SLR Ontology (v0.2.0). The Turtle and JSON-LD files are self-contained linked-data documents that can be loaded into any SPARQL endpoint (Apache Jena, Oxigraph, Blazegraph, etc.) or processed with standard RDF tools.
Namespace prefixes used:
| Prefix | URI |
|---|---|
| `slr:` | https://w3id.org/slr-ontology/ |
| `prov:` | http://www.w3.org/ns/prov# |
| `dcterms:` | http://purl.org/dc/terms/ |
| `fabio:` | http://purl.org/spar/fabio/ |
| `bibo:` | http://purl.org/ontology/bibo/ |
| `oa:` | http://www.w3.org/ns/oa# |
| `xsd:` | http://www.w3.org/2001/XMLSchema# |
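When writing queries against the exported data it can help to have the prefix table available in code. The sketch below copies the table above into a dict and shows two small helpers; `expand` and `sparql_prologue` are hypothetical and not part of `prisma_review_agent.export`.

```python
# Prefix table copied from the README; the helpers below are illustrative only.
PREFIXES = {
    "slr": "https://w3id.org/slr-ontology/",
    "prov": "http://www.w3.org/ns/prov#",
    "dcterms": "http://purl.org/dc/terms/",
    "fabio": "http://purl.org/spar/fabio/",
    "bibo": "http://purl.org/ontology/bibo/",
    "oa": "http://www.w3.org/ns/oa#",
    "xsd": "http://www.w3.org/2001/XMLSchema#",
}


def expand(curie: str) -> str:
    """Expand a compact name such as 'dcterms:title' into a full IRI."""
    prefix, local = curie.split(":", 1)
    return PREFIXES[prefix] + local


def sparql_prologue(*prefixes: str) -> str:
    """Build the PREFIX lines for a SPARQL query from the table above."""
    return "\n".join(f"PREFIX {p}: <{PREFIXES[p]}>" for p in prefixes)


print(expand("dcterms:title"))
print(sparql_prologue("slr", "dcterms"))
```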
Python API:
from prisma_review_agent.export import to_turtle, to_jsonld
turtle_str = to_turtle(result)
jsonld_str = to_jsonld(result)
Pyoxigraph SPARQL Store
For in-process SPARQL queries, load the result directly into a pyoxigraph store:
from prisma_review_agent.export import to_oxigraph_store
store = to_oxigraph_store(result)
# Find all included sources
rows = store.query("""
PREFIX slr: <https://w3id.org/slr-ontology/>
PREFIX dcterms: <http://purl.org/dc/terms/>
SELECT ?src ?title WHERE {
?src a slr:IncludedSource ;
dcterms:title ?title .
}
""")
# Check provenance timestamp
rows = store.query("""
PREFIX prov: <http://www.w3.org/ns/prov#>
SELECT ?review ?t WHERE { ?review prov:generatedAtTime ?t }
""")
# Save store to disk for later re-use
store.save("review_store.ttl")
Or from the CLI — pass --rdf-store-path to write the store after export:
prisma-review --title "..." --export ttl --rdf-store-path review_store.ttl
Note: The system processes ALL studies that pass screening criteria through complete data charting and critical appraisal. There are no artificial limits on corpus size — from small pilot reviews (5-10 studies) to comprehensive systematic reviews (50+ studies).
Performance
Steps 7–15 are fully parallelised with asyncio in both the standard pipeline and compare mode. Multiple articles are processed concurrently, bounded by a semaphore so you never exceed your LLM provider's rate limit. T/A screening (step 7) and FT screening (step 9) share the same helper functions across both execution paths, so tuning --concurrency applies uniformly everywhere.
In compare mode, per-model pipelines run concurrently with each other and each pipeline internally parallelises all article-level steps — so the combined speedup compounds.
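The semaphore-bounded pattern described above can be sketched in a few lines. This is a generic asyncio illustration, not the pipeline's actual helper; `process_all`, `fake_llm_call`, and `demo` are hypothetical names.

```python
import asyncio


async def process_all(articles, worker, concurrency: int = 5):
    """Run worker(article) for every article, at most `concurrency` at a time."""
    semaphore = asyncio.Semaphore(concurrency)

    async def bounded(article):
        async with semaphore:
            return await worker(article)

    # gather returns results in the same order as the input articles
    return await asyncio.gather(*(bounded(a) for a in articles))


async def demo():
    in_flight = peak = 0

    async def fake_llm_call(pmid: str) -> str:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # stand-in for model latency
        in_flight -= 1
        return f"charted:{pmid}"

    results = await process_all([str(i) for i in range(20)], fake_llm_call, concurrency=5)
    return results, peak


results, peak = asyncio.run(demo())
print(len(results), peak)
```

The `peak` counter never exceeds the semaphore size, which is the property that keeps a run inside a provider's rate limit regardless of how many articles are queued.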
Expected speedup
| Articles included | Sequential | Concurrency 5 | Concurrency 10 |
|---|---|---|---|
| 38 | ~70 min | ~15 min | ~8 min |
| 100 | ~3 h | ~40 min | ~20 min |
| 1 000 + 10 citation hops | ~15 h | ~3 h | ~1.5 h |
Times are approximate and depend on model latency and API rate limits. Compare-mode runs see an additional multiplier because each model pipeline is itself fully parallel.
Tuning concurrency
CLI:
# Moderate parallelism (default) — safe for most OpenRouter tiers
prisma-review --title "..." --concurrency 5
# High parallelism — use if your API tier supports it
prisma-review --title "..." --concurrency 10
# Sequential (debugging / strict rate-limit compliance)
prisma-review --title "..." --concurrency 1
Python API:
protocol = ReviewProtocol(
title="...",
inclusion_criteria="...",
exclusion_criteria="...",
article_concurrency=10, # 1–20; default 5
)
Guidance:
- Default (5) — good starting point; respects most provider rate limits.
- 10 — recommended for large reviews (100+ included articles) when your OpenRouter tier allows higher throughput.
- 1 — fully sequential; useful for debugging or very strict rate-limit environments.
- Values above 10 rarely improve wall-clock time because the bottleneck shifts to LLM latency rather than throughput.
Pipeline Steps (18-step Enhanced PRISMA)
| Step | Agent | Output Type | Description |
|---|---|---|---|
| 1. Search Strategy | `search_strategy_agent` | `SearchStrategy` | Generates PubMed + bioRxiv queries from protocol |
| 2. PubMed Search | none (HTTP) | `list[Article]` | E-utilities esearch + efetch |
| 3. bioRxiv Search | none (HTTP) | `list[Article]` | bioRxiv API keyword matching |
| 4. Related Articles | none (HTTP) | `list[str]` | elink neighbor_score |
| 5. Citation Hops | none (HTTP) | `list[Article]` | Forward (cited-by) + backward navigation |
| 6. Deduplication | none (logic) | `list[Article]` | DOI/PMID dedup |
| 7. Title/Abstract Screening ⚡ | `screening_agent` | `ScreeningBatchResult` | LLM batch screening (inclusive); parallel batches of 15 |
| 8. Full-text Retrieval | none (HTTP) | `dict[str, str]` | PMC efetch |
| 9. Full-text Screening ⚡ | `screening_agent` | `ScreeningBatchResult` | LLM batch screening (strict); parallel batches of 10 |
| 10. Evidence Extraction ⚡ | `evidence_extraction_agent` | `BatchEvidenceExtraction` | LLM identifies claims + evidence spans; parallel batches of 5 |
| 11. Data Extraction ⚡ | `data_extraction_agent` | `StudyDataExtraction` | Per-study structured data; fully parallel |
| 12. Risk of Bias ⚡ | `rob_agent` | `RiskOfBiasResult` | Per-study RoB 2 / ROBINS-I / NOS; fully parallel |
| 13. Data Charting ⚡ | `data_charting_agent` | `DataChartingRubric` | Structured charting across 7 sections (A-G); fully parallel |
| 14. Critical Appraisal ⚡ | `critical_appraisal_agent` | `CriticalAppraisalRubric` | Quality assessment across 4 domains; fully parallel |
| 15. Narrative Rows ⚡ | `narrative_row_agent` | `PRISMANarrativeRow` | Condensed 6-cell summary format; fully parallel |
| 16. Synthesis | `synthesis_agent` | `str` | Grounded narrative with PMID citations |
| 17. Bias + GRADE | `bias_summary_agent` + `grade_agent` | `str` + `GRADEAssessment` | Parallel assessment |
| 18. Limitations | `limitations_agent` | `str` | Review limitations section |
⚡ = runs with asyncio concurrency bounded by article_concurrency (default 5, set via --concurrency). Steps 7 and 9 use shared helpers _parallel_ta_screening / _parallel_ft_screening reused by both the standard pipeline and compare mode.
Agents Reference
Agent Architecture
Each agent is defined as a module-level pydantic_ai.Agent with:
- Typed output: Pydantic model that the LLM must conform to
- System prompt: Static instructions + dynamic context from `RunContext[AgentDeps]`
- Deferred model: `defer_model_check=True`; the model is provided at runtime via `build_model()`
- Dependencies: `AgentDeps` dataclass carrying protocol + API credentials
from agents import AgentDeps, build_model, rob_agent
from models import ReviewProtocol, RiskOfBiasResult
deps = AgentDeps(
protocol=ReviewProtocol(title="..."),
api_key="sk-or-v1-...",
model_name="anthropic/claude-sonnet-4",
)
model = build_model(deps.api_key, deps.model_name)
# Run directly
result = await rob_agent.run(
"Title: ...\nAbstract: ...",
deps=deps,
model=model,
)
rob: RiskOfBiasResult = result.output
print(rob.overall) # RoBJudgment.LOW
Selecting a Model
Pass any OpenRouter model ID via --model on the CLI or the model_name argument in Python.
CLI
# Claude Sonnet 4 (default)
prisma-review --title "..." --model anthropic/claude-sonnet-4
# Gemini 2.5 Pro
prisma-review --title "..." --model google/gemini-2.5-pro
# GPT-4o
prisma-review --title "..." --model openai/gpt-4o
# DeepSeek (cost-effective)
prisma-review --title "..." --model deepseek/deepseek-chat
Python API
pipeline = PRISMAReviewPipeline(
api_key="sk-or-v1-...",
model_name="google/gemini-2.5-pro", # ← change here
protocol=protocol,
)
Interactive mode — prompts you to type a model name at startup:
prisma-review --interactive
# Enter model ID when prompted, or press Enter for the default
Supported Models (via OpenRouter)
Any model available on OpenRouter works. Tested with:
| Model | ID | Notes |
|---|---|---|
| Claude Sonnet 4 | `anthropic/claude-sonnet-4` | Best balance of quality/speed |
| Claude Haiku 4 | `anthropic/claude-haiku-4` | Faster, good for screening |
| Gemini 2.5 Pro | `google/gemini-2.5-pro` | Good structured output |
| Gemini 2.5 Flash | `google/gemini-2.5-flash` | Fast; uses text fallback for charting/appraisal |
| GPT-4o | `openai/gpt-4o` | Strong general performance |
| DeepSeek Chat | `deepseek/deepseek-chat` | Cost-effective |
| Llama 3.1 70B | `meta-llama/llama-3.1-70b-instruct` | Open-source option |
Schema compatibility note: Google Gemini models reject structured-output schemas with many optional properties (HTTP 400 "too much branching"). The charting and critical appraisal steps automatically detect this error and retry in text mode — the model returns JSON as plain text, which is then parsed into the same data model. All other pipeline steps are unaffected. No configuration is needed; the fallback is transparent.
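The fallback described in the note above follows a common pattern: try the structured call, and on a schema rejection ask for plain text and parse the JSON yourself. The sketch below illustrates that pattern with stand-in callables; `SchemaRejected`, `extract_json`, `run_with_fallback`, and the two fake model functions are all hypothetical, and the real pipeline would additionally validate the parsed dict into its Pydantic model.

```python
import json
import re


class SchemaRejected(Exception):
    """Hypothetical stand-in for the provider's HTTP 400 schema error."""


def extract_json(text: str) -> dict:
    """Pull a JSON object out of plain model text, tolerating a Markdown code fence."""
    fenced = re.search(r"`{3}(?:json)?\s*(.*?)`{3}", text, re.DOTALL)
    return json.loads(fenced.group(1) if fenced else text)


def run_with_fallback(structured_call, text_call, prompt: str) -> dict:
    """Prefer structured output; retry in text mode only on a schema rejection."""
    try:
        return structured_call(prompt)
    except SchemaRejected:
        return extract_json(text_call(prompt))


def fake_structured(prompt: str) -> dict:
    raise SchemaRejected("too much branching")


def fake_text(prompt: str) -> str:
    fence = "`" * 3
    return fence + 'json\n{"overall_concern": "Low"}\n' + fence


print(run_with_fallback(fake_structured, fake_text, "Appraise study ..."))
```

Catching only the schema error keeps other failures (network errors, rate limits) visible instead of silently retrying them in text mode.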
Data Models
Core Models
| Model | Purpose |
|---|---|
| `Article` | Research article with metadata, full text, RoB, extracted data |
| `EvidenceSpan` | Single evidence sentence with source, claim label, relevance score |
| `ReviewProtocol` | Full PRISMA protocol: PICO, criteria, databases, registration |
| `PRISMAFlowCounts` | PRISMA flow diagram counts for all stages |
| `PRISMAReviewResult` | Complete review result with all outputs |
LLM Output Models
| Model | Used By | Description |
|---|---|---|
| `SearchStrategy` | `search_strategy_agent` | PubMed/bioRxiv queries, MeSH terms |
| `ScreeningBatchResult` | `screening_agent` | Batch of include/exclude decisions |
| `RiskOfBiasResult` | `rob_agent` | Per-domain RoB with overall judgment |
| `StudyDataExtraction` | `data_extraction_agent` | Study design, findings, effect measures |
| `GRADEAssessment` | `grade_agent` | GRADE domains + overall certainty |
| `BatchEvidenceExtraction` | `evidence_extraction_agent` | Evidence spans per article |
Export Formats
Markdown
Full PRISMA 2020 structured report with:
- Abstract, Introduction (rationale + PICO), Methods (criteria, search strategy, selection, RoB)
- Results (flow table, study characteristics, synthesis, RoB, GRADE)
- Discussion (limitations), Other Information (registration, funding)
- References, Appendix (evidence spans)
JSON
Complete PRISMAReviewResult serialized via model_dump_json().
BibTeX
Standard @article{} entries for all included studies.
Caching
HTTP Cache (SQLite)
SQLite cache (prisma_agent_cache.db) stores raw HTTP responses with a 72-hour TTL:
- PubMed search results
- Article metadata and full text
- Related article links
- bioRxiv search results
Disable with --no-cache or enable_cache=False.
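To make the TTL behaviour concrete, here is a minimal sketch of a SQLite-backed response cache with a 72-hour expiry. The `HttpCache` class, its table, and its column names are hypothetical; the schema actually used in `prisma_agent_cache.db` is not documented here.

```python
import sqlite3
import time
from typing import Optional

TTL_SECONDS = 72 * 3600  # 72-hour TTL, as stated above


class HttpCache:
    """Hypothetical minimal cache; table and column names are illustrative."""

    def __init__(self, path: str = ":memory:"):
        self.db = sqlite3.connect(path)
        self.db.execute(
            "CREATE TABLE IF NOT EXISTS http_cache "
            "(url TEXT PRIMARY KEY, body TEXT NOT NULL, fetched_at REAL NOT NULL)"
        )

    def get(self, url: str, now: Optional[float] = None) -> Optional[str]:
        now = time.time() if now is None else now
        row = self.db.execute(
            "SELECT body, fetched_at FROM http_cache WHERE url = ?", (url,)
        ).fetchone()
        if row is None or now - row[1] > TTL_SECONDS:
            return None  # miss or expired: the caller should refetch
        return row[0]

    def put(self, url: str, body: str, now: Optional[float] = None) -> None:
        now = time.time() if now is None else now
        self.db.execute(
            "INSERT OR REPLACE INTO http_cache VALUES (?, ?, ?)", (url, body, now)
        )
        self.db.commit()


cache = HttpCache()
cache.put("esearch?term=crispr", "<xml/>", now=0.0)
print(cache.get("esearch?term=crispr", now=71 * 3600))
print(cache.get("esearch?term=crispr", now=73 * 3600))
```

Passing `now` explicitly is only there to make expiry easy to demonstrate; normal callers would omit it.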
Review Result Cache (PostgreSQL)
When --pg-dsn is provided, completed review results are cached in PostgreSQL. On subsequent runs with ≥ 95% similar criteria (configurable), the full result is served from cache in seconds rather than minutes.
# Run with PostgreSQL cache
prisma-review \
--title "GLP-1 agonists for type 2 diabetes" \
--inclusion "RCTs, English, 2019-2024" \
--pg-dsn "postgresql://user:pass@localhost/prisma_db" \
--cache-threshold 0.95 \
--export md
# Force a fresh run (bypass cache)
prisma-review --title "..." --pg-dsn "..." --force-refresh
Setup — run the migration once before first use:
psql "$PRISMA_PG_DSN" -f prisma_review_agent/cache/migrations/001_initial.sql
Or set the DSN via environment variable:
export PRISMA_PG_DSN="postgresql://user:pass@localhost/prisma_db"
prisma-review --title "..."
The Markdown export includes a cache banner when a result is served from cache:
⚡ Served from cache (similarity 97.3%) — matched: *GLP-1 agonists for type 2 diabetes*
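The cache lookup combines an exact fingerprint with a fuzzy score compared against `--cache-threshold`. The sketch below shows the general idea using only the standard library: `difflib.SequenceMatcher` is swapped in for rapidfuzz, and the field weights are invented for illustration. The project's `similarity.py` and `SimilarityConfig` may score differently.

```python
import hashlib
from difflib import SequenceMatcher

# Hypothetical field weights; the project's SimilarityConfig may use different ones.
WEIGHTS = {"title": 0.5, "inclusion": 0.3, "exclusion": 0.2}


def _norm(text: str) -> str:
    """Lower-case and collapse whitespace so trivial edits do not change the result."""
    return " ".join(text.lower().split())


def fingerprint(criteria: dict) -> str:
    canonical = "|".join(_norm(criteria.get(k, "")) for k in WEIGHTS)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def similarity(a: dict, b: dict) -> float:
    if fingerprint(a) == fingerprint(b):
        return 1.0  # exact match after normalisation: skip fuzzy scoring
    return sum(
        w * SequenceMatcher(None, _norm(a.get(k, "")), _norm(b.get(k, ""))).ratio()
        for k, w in WEIGHTS.items()
    )


def is_cache_hit(a: dict, b: dict, threshold: float = 0.95) -> bool:
    return similarity(a, b) >= threshold


cached = {"title": "GLP-1 agonists for type 2 diabetes", "inclusion": "RCTs, English, 2019-2024"}
query = {"title": "glp-1 agonists for  type 2 diabetes", "inclusion": "RCTs, English, 2019-2024"}
print(is_cache_hit(cached, query))
```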
Article Store (PostgreSQL)
All fetched articles are persisted to the article_store table (same PostgreSQL connection). Full-text content is indexed with a GIN/tsvector index for fast retrieval. On subsequent runs, stored full text is used as the primary source before falling back to live PubMed fetch — reducing API calls and improving reproducibility.
Iterative Large-Review Processing (PostgreSQL)
For reviews with hundreds of included articles, the pipeline automatically processes each stage in batches and checkpoints results to a pipeline_checkpoints table after every batch. If the process crashes or times out, re-running with the same review_id resumes from the last completed batch rather than restarting from scratch.
Setup — run the migration once:
psql "$PRISMA_PG_DSN" -f prisma_review_agent/cache/migrations/003_add_pipeline_checkpoints.sql
CLI:
# Run a large review with a stable review ID so it can be resumed
prisma-review \
--title "CRISPR gene editing: systematic review" \
--pg-dsn "postgresql://user:pass@localhost/prisma_db" \
--review-id "crispr-2026-001" \
--synthesis-batch-size 20
# If interrupted, re-run the same command — completed batches are skipped automatically
prisma-review --title "..." --pg-dsn "..." --review-id "crispr-2026-001"
Python API:
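# Assumes ReviewProtocol is imported and `pipeline` is a constructed pipeline instance; run inside an async function.
protocol = ReviewProtocol(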
title="CRISPR gene editing: systematic review",
pg_dsn="postgresql://user:pass@localhost/prisma_db",
review_id="crispr-2026-001", # stable ID enables resume
synthesis_batch_size=20, # articles per synthesis chunk (default: 20)
max_batch_retries=3, # retries per failed batch (default: 3)
)
result = await pipeline.run(protocol)
# Re-run with same review_id → completed stages are skipped
result = await pipeline.run(protocol)
# Force a complete re-run
protocol.force_refresh = True
result = await pipeline.run(protocol)
How it works:
- Each pipeline stage (screening, charting, RoB, appraisal, narrative, synthesis) writes per-batch results to pipeline_checkpoints keyed by (review_id, stage_name, batch_index).
- Synthesis is split into chunks of synthesis_batch_size articles. If there is more than one chunk, a dedicated merge agent combines the partial syntheses into a single coherent output, replacing the previous hardcoded top-20 limit.
- CacheStore.load_completed_stages(review_id) returns all stages where every batch is complete; the pipeline skips those stages on startup.
- BatchMaxRetriesError is raised if a batch exceeds max_batch_retries consecutive failures.
- When pg_dsn is not set, checkpointing is silently skipped and the pipeline runs as before.
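The checkpoint-and-resume behaviour can be sketched in a few lines. This is a simplified illustration, not the pipeline's code: an in-memory dict stands in for the pipeline_checkpoints table, run_stage and chunked are hypothetical helpers, and the BatchMaxRetriesError class here is a local stand-in for the package's exception of the same name.

```python
from typing import Callable, Dict, List, Tuple


class BatchMaxRetriesError(RuntimeError):
    """Local stand-in for the package's exception of the same name."""


# Stand-in for the pipeline_checkpoints table:
# (review_id, stage_name, batch_index) -> batch output.
Checkpoints = Dict[Tuple[str, str, int], List[str]]


def chunked(items: List[str], size: int) -> List[List[str]]:
    """Split items into consecutive batches of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def run_stage(
    review_id: str,
    stage_name: str,
    items: List[str],
    process_batch: Callable[[List[str]], List[str]],
    checkpoints: Checkpoints,
    batch_size: int = 20,
    max_batch_retries: int = 3,
) -> List[str]:
    results: List[str] = []
    for batch_index, batch in enumerate(chunked(items, batch_size)):
        key = (review_id, stage_name, batch_index)
        if key in checkpoints:
            # Completed in an earlier run: reuse the stored output.
            results.extend(checkpoints[key])
            continue
        failures = 0
        while True:
            try:
                output = process_batch(batch)
                break
            except Exception as exc:
                failures += 1
                # Assumption: "exceeds" means strictly more failures than the limit.
                if failures > max_batch_retries:
                    raise BatchMaxRetriesError(
                        f"{stage_name} batch {batch_index} failed {failures} times"
                    ) from exc
        checkpoints[key] = output  # checkpoint after every batch
        results.extend(output)
    return results
```

Calling run_stage again with the same review_id and the same checkpoints mapping processes only the batches that have no stored result, which is the resume behaviour described above.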
CLI Reference
prisma-review [OPTIONS]
Protocol:
--title, -t Review title / research question
--objective Detailed objective
--population PICO: Population
--intervention PICO: Intervention
--comparison PICO: Comparison
--outcome PICO: Outcome
--inclusion Inclusion criteria
--exclusion Exclusion criteria
--registration PROSPERO registration number
Search:
--model, -m OpenRouter model (default: anthropic/claude-sonnet-4)
--databases Databases to search (default: PubMed bioRxiv)
--max-results Max results per query (default: 20)
--related-depth Related article depth (default: 1)
--hops Citation hop depth 0-4 (default: 1)
--biorxiv-days bioRxiv lookback days (default: 180)
--date-start Start date YYYY-MM-DD
--date-end End date YYYY-MM-DD
--rob-tool RoB 2 | ROBINS-I | Newcastle-Ottawa Scale
Pipeline:
--no-cache Disable SQLite cache
--extract-data Enable per-study data extraction
--auto Skip plan confirmation; run end-to-end without prompts
--max-plan-iterations Max plan re-generation attempts before aborting (default: 3)
--concurrency N Max concurrent LLM calls per article step (default: 5, max: 20)
Cache (PostgreSQL):
--pg-dsn PostgreSQL DSN (or set PRISMA_PG_DSN env var)
--force-refresh Bypass cache and run fresh pipeline
--cache-threshold Similarity threshold for cache hit (default: 0.95)
--cache-ttl-days Cache entry TTL in days; 0=never expire (default: 30)
Output:
--export, -e Export formats: md json bib ttl jsonld (default: md)
--rdf-store-path Save pyoxigraph RDF store to this Turtle file path
--interactive, -i Interactive protocol setup
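One common way to implement a cap such as --concurrency is an asyncio semaphore around each per-article call. The sketch below shows that general pattern only; bounded_gather is a hypothetical helper, and clamping to the documented maximum of 20 (rather than rejecting larger values) is an assumption.

```python
import asyncio
from typing import Awaitable, Callable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")

MAX_CONCURRENCY = 20  # documented upper bound for --concurrency


async def bounded_gather(
    items: List[T],
    worker: Callable[[T], Awaitable[R]],
    concurrency: int = 5,
) -> List[R]:
    """Run worker over items with at most `concurrency` calls in flight."""
    limit = max(1, min(concurrency, MAX_CONCURRENCY))
    semaphore = asyncio.Semaphore(limit)

    async def guarded(item: T) -> R:
        async with semaphore:  # wait here while `limit` calls are running
            return await worker(item)

    # gather preserves input order in its result list.
    return await asyncio.gather(*(guarded(item) for item in items))
```

A higher limit shortens wall-clock time for large reviews but makes provider rate limits more likely to be hit.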
Extending
Add a New Agent
- Define the output model in models.py:
from pydantic import BaseModel

class MyOutput(BaseModel):
    field: str
    score: float
- Create the agent in agents.py:
from pydantic_ai import Agent

# AgentDeps and build_model are the project's own helpers.
my_agent = Agent(
    output_type=MyOutput,
    deps_type=AgentDeps,
    system_prompt="...",
    defer_model_check=True,  # the model is supplied at run time
    name="my_agent",
)

async def run_my_agent(data: str, deps: AgentDeps) -> MyOutput:
    model = build_model(deps.api_key, deps.model_name)
    result = await my_agent.run(data, deps=deps, model=model)
    return result.output
- Integrate into pipeline.py.
Add a New Data Source
- Create a client class in clients.py following the PubMedClient pattern.
- Add it to PRISMAReviewPipeline.__init__().
- Add a search step in pipeline.py.
Running E2E Tests
The tests/e2e/ suite exercises the full review workflow through both the CLI and the Python API. All mock tests use pydantic-ai TestModel — no API key required.
# All e2e tests (mock mode — no API key needed)
pytest tests/e2e/ -v
# CLI tests only
pytest tests/e2e/test_cli_e2e.py -v
# Python API tests only
pytest tests/e2e/test_python_api_e2e.py -v
# Export format tests (requires tests/fixtures/minimal_review_result.json)
pytest tests/e2e/test_export_validation.py -v
# Full real-API smoke tests
export OPENROUTER_API_KEY="sk-..."
export RUN_E2E=1
pytest tests/e2e/ -m smoke -v
Build the export fixture (once, then commit):
export OPENROUTER_API_KEY="sk-..."
python scripts/build_e2e_fixture.py
See specs/011-e2e-review-tests/quickstart.md for full details.
Dependencies
| Package | Version | Purpose |
|---|---|---|
| pydantic-ai | >=1.0 | Agent framework with typed outputs |
| pydantic | >=2.0 | Data validation and serialization |
| httpx | >=0.25 | Async-capable HTTP client |
| psycopg[async] | >=3.1 | Async PostgreSQL driver (optional) |
| psycopg-pool | >=3.1 | Async connection pooling (optional) |
| rapidfuzz | >=3.0 | Fuzzy string matching for cache similarity + source grounding |
| rdflib | >=6.0 | RDF graph construction and Turtle / JSON-LD serialization |
| pyoxigraph | >=0.3 | Fast in-process SPARQL store for queryable RDF output |
License
Apache 2.0 — see LICENSE.