mongo-pipebuilder

Python 3.9+ | License: MIT | Code style: black

Type-safe, fluent MongoDB aggregation pipeline builder for Python.

Overview

mongo-pipebuilder provides a clean, type-safe way to build MongoDB aggregation pipelines using the Builder Pattern with a fluent interface for maximum readability and safety.
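The fluent interface described above comes down to each stage method appending a stage and returning the builder itself. The sketch below illustrates that idea only; `MiniBuilder` is a hypothetical stand-in written for this example, not the library's actual implementation.

```python
from typing import Any, Dict, List


class MiniBuilder:
    """Hypothetical stand-in for illustration; not mongo-pipebuilder's code."""

    def __init__(self) -> None:
        self._stages: List[Dict[str, Any]] = []

    def match(self, conditions: Dict[str, Any]) -> "MiniBuilder":
        self._stages.append({"$match": conditions})
        return self  # returning self is what makes chaining possible

    def limit(self, n: int) -> "MiniBuilder":
        self._stages.append({"$limit": n})
        return self

    def build(self) -> List[Dict[str, Any]]:
        # Hand back a new list so callers cannot reorder the internal one
        return list(self._stages)


pipeline = MiniBuilder().match({"status": "active"}).limit(10).build()
print(pipeline)
```

The result is an ordinary list of stage dictionaries, which is the shape that pymongo's `aggregate()` accepts.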

Features

  • Type-safe: Full type hints support with IDE autocomplete
  • Fluent interface: Chain methods for readable, maintainable code
  • Zero dependencies: Pure Python, lightweight package
  • Extensible: Easy to add custom stages via add_stage()
  • Well tested: Comprehensive test suite with 96%+ coverage

Installation

pip install mongo-pipebuilder

Quick Start

from mongo_pipebuilder import PipelineBuilder

# Build a pipeline
pipeline = (
    PipelineBuilder()
    .match({"status": "active"})
    .lookup(
        from_collection="users",
        local_field="userId",
        foreign_field="_id",
        as_field="user"
    )
    .project({"name": 1, "user.email": 1})
    .sort({"name": 1})
    .limit(10)
    .build()
)

# Use with pymongo
from pymongo import MongoClient
client = MongoClient()
collection = client.db.my_collection
results = collection.aggregate(pipeline)

API Reference

PipelineBuilder

Main class for building aggregation pipelines.

Methods

match(conditions: Dict[str, Any]) -> Self

Adds a $match stage to filter documents.

.match({"status": "active", "age": {"$gte": 18}})
match_expr(expr: Dict[str, Any]) -> Self

Adds a $match stage with an $expr condition (expression-based filter; useful for comparing fields or using variables from let in subpipelines).

.match_expr({"$eq": ["$id", "$$teamId"]})
.match_expr({"$and": [{"$gte": ["$field", "$other"]}, {"$lte": ["$score", 100]}]})
lookup(from_collection: str, local_field: str, foreign_field: str, as_field: str, pipeline: Optional[List[Dict[str, Any]]] = None) -> Self

Adds a $lookup stage to join with another collection.

.lookup(
    from_collection="users",
    local_field="userId",
    foreign_field="_id",
    as_field="user",
    pipeline=[{"$match": {"active": True}}]  # Optional nested pipeline
)
lookup_let(from_collection: str, let: Dict[str, Any], pipeline: Union[List[Dict[str, Any]], PipelineBuilder], as_field: str) -> Self

Adds a $lookup stage with let and pipeline (join by expression; variables from the current document are available in the subpipeline as $$var). Use this when the join condition is an expression (e.g. $expr) rather than equality of two fields.

# With list of stages
.lookup_let(
    from_collection="teams",
    let={"teamId": "$idTeam"},
    pipeline=[
        {"$match": {"$expr": {"$eq": ["$_id", "$$teamId"]}}},
        {"$project": {"name": 1, "_id": 0}}
    ],
    as_field="team"
)

# With PipelineBuilder for the subpipeline (optionally using match_expr)
sub = PipelineBuilder().match_expr({"$eq": ["$_id", "$$teamId"]}).project({"name": 1, "_id": 0})
.lookup_let("teams", {"teamId": "$idTeam"}, sub, as_field="team")
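For reference, a let/pipeline lookup written directly in MongoDB's own syntax looks like the dictionary below. The stage syntax is standard MongoDB; that lookup_let emits exactly this shape is an assumption based on its parameter names.

```python
from typing import Any, Dict

# Raw MongoDB $lookup with let + pipeline (standard aggregation syntax).
lookup_stage: Dict[str, Any] = {
    "$lookup": {
        "from": "teams",
        # Expose the outer document's idTeam to the subpipeline as $$teamId
        "let": {"teamId": "$idTeam"},
        "pipeline": [
            # $expr is required to reference $$ variables inside $match
            {"$match": {"$expr": {"$eq": ["$_id", "$$teamId"]}}},
            {"$project": {"name": 1, "_id": 0}},
        ],
        "as": "team",
    }
}

print(lookup_stage["$lookup"]["let"])
```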
union_with(coll: str, pipeline: Optional[Union[List[Dict[str, Any]], PipelineBuilder]] = None) -> Self

Adds a $unionWith stage to combine documents from the current pipeline with documents from another collection. Optionally runs a subpipeline on the other collection before merging.

# Union with another collection (no subpipeline)
.union_with("other_coll")

# With subpipeline as list of stages
.union_with("logs", [{"$match": {"level": "error"}}, {"$limit": 100}])

# With PipelineBuilder for the subpipeline
sub = PipelineBuilder().match({"source": "individual"}).project({"name": 1})
.union_with("sso_individual_statistics", sub)
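In raw MongoDB syntax, $unionWith accepts either a bare collection name or a document with coll and pipeline. The helper below is a hypothetical sketch of how the two forms relate; which form union_with actually emits is not stated here, so treat the choice as an assumption.

```python
from typing import Any, Dict, List, Optional

Stage = Dict[str, Any]


def union_with_stage(coll: str, pipeline: Optional[List[Stage]] = None) -> Stage:
    """Hypothetical helper that builds a raw $unionWith stage."""
    if not pipeline:
        # Shorthand form: only the collection name
        return {"$unionWith": coll}
    # Document form: collection plus a subpipeline run on it before merging
    return {"$unionWith": {"coll": coll, "pipeline": list(pipeline)}}


plain = union_with_stage("other_coll")
filtered = union_with_stage(
    "logs", [{"$match": {"level": "error"}}, {"$limit": 100}]
)
print(plain)
print(filtered)
```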
lookup_hybrid(from_collection: str, as_field: str, local_field: Optional[str] = None, foreign_field: Optional[str] = None, let: Optional[Dict[str, Any]] = None, pipeline: Optional[Union[List[Dict[str, Any]], PipelineBuilder]] = None) -> Self

Adds a combined $lookup stage for hybrid join cases where localField/foreignField is used together with let and pipeline.

Rules:

  • local_field and foreign_field must be provided together.
  • let requires pipeline.
  • pipeline requires let.
  • Empty pipeline is not allowed.
.lookup_hybrid(
    from_collection="sso_matches",
    as_field="match",
    local_field="idMatch",
    foreign_field="id",
    let={"local_season_id": "$$season_id", "local_tournament_id": "$$tournament_id"},
    pipeline=[
        {"$match": {"$expr": {"$and": [
            {"$eq": ["$$local_season_id", "$idSeason"]},
            {"$eq": ["$$local_tournament_id", "$idTournament"]}
        ]}}}
    ],
)
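The argument rules listed above can be sketched as a small standalone check. `hybrid_arg_errors` is a hypothetical helper with invented messages; the library's own validation and error wording may differ.

```python
from typing import Any, Dict, List, Optional


def hybrid_arg_errors(
    local_field: Optional[str] = None,
    foreign_field: Optional[str] = None,
    let: Optional[Dict[str, Any]] = None,
    pipeline: Optional[List[Dict[str, Any]]] = None,
) -> List[str]:
    """Return the rule violations for a hybrid lookup call (hypothetical)."""
    errors: List[str] = []
    # Rule 1: both join fields or neither
    if (local_field is None) != (foreign_field is None):
        errors.append("local_field and foreign_field must be provided together")
    # Rules 2 and 3: let and pipeline only make sense as a pair
    if let is not None and pipeline is None:
        errors.append("let requires pipeline")
    if pipeline is not None and let is None:
        errors.append("pipeline requires let")
    # Rule 4: a pipeline must contain at least one stage
    if pipeline is not None and len(pipeline) == 0:
        errors.append("empty pipeline is not allowed")
    return errors


ok = hybrid_arg_errors("idMatch", "id", {"sid": "$idSeason"}, [{"$match": {}}])
bad = hybrid_arg_errors(local_field="idMatch", let={"sid": "$idSeason"})
print(ok)
print(bad)
```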
add_fields(fields: Dict[str, Any]) -> Self

Adds a $addFields stage to add or modify fields.

.add_fields({"fullName": {"$concat": ["$firstName", " ", "$lastName"]}})
project(fields: Dict[str, Any]) -> Self

Adds a $project stage to reshape documents.

.project({"name": 1, "email": 1, "_id": 0})
group(group_by: Dict[str, Any], accumulators: Dict[str, Any]) -> Self

Adds a $group stage to group documents.

.group(
    group_by={"category": "$category"},
    accumulators={"total": {"$sum": "$amount"}}
)
unwind(path: str, preserve_null_and_empty_arrays: bool = False, include_array_index: Optional[str] = None) -> Self

Adds a $unwind stage to deconstruct arrays.

.unwind("tags", preserve_null_and_empty_arrays=True)
.unwind("items", include_array_index="itemIndex")
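The keyword arguments map onto the options of MongoDB's document form of $unwind. The sketch below shows that mapping with a hypothetical helper; the automatic "$" prefix is an assumption inferred from the examples in this README, which pass both "tags" and "$match".

```python
from typing import Any, Dict, Optional


def unwind_stage(
    path: str,
    preserve_null_and_empty_arrays: bool = False,
    include_array_index: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper that builds a raw $unwind stage."""
    # Assumption: a missing "$" prefix is added for convenience
    field_path = path if path.startswith("$") else f"${path}"
    spec: Dict[str, Any] = {"path": field_path}
    if preserve_null_and_empty_arrays:
        spec["preserveNullAndEmptyArrays"] = True
    if include_array_index is not None:
        spec["includeArrayIndex"] = include_array_index
    return {"$unwind": spec}


tags = unwind_stage("tags", preserve_null_and_empty_arrays=True)
items = unwind_stage("$items", include_array_index="itemIndex")
print(tags)
print(items)
```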
sort(fields: Dict[str, int]) -> Self

Adds a $sort stage.

.sort({"createdAt": -1, "name": 1})
limit(limit: int) -> Self

Adds a $limit stage.

.limit(10)
skip(skip: int) -> Self

Adds a $skip stage.

.skip(20)
unset(fields: Union[str, List[str]]) -> Self

Adds a $unset stage to remove fields from documents.

.unset("temp_field")
.unset(["field1", "field2", "field3"])
replace_root(new_root: Dict[str, Any]) -> Self

Adds a $replaceRoot stage to replace the root document.

.replace_root({"newRoot": "$embedded"})
.replace_root({"newRoot": {"$mergeObjects": ["$doc1", "$doc2"]}})
replace_with(replacement: Any) -> Self

Adds a $replaceWith stage (alias for $replaceRoot in MongoDB 4.2+).

.replace_with("$embedded")
.replace_with({"$mergeObjects": ["$doc1", "$doc2"]})
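The two stages differ only in how the replacement expression is wrapped, which is why replace_root takes a dict with newRoot while replace_with takes the expression itself. In raw MongoDB syntax (the variable names are illustrative):

```python
from typing import Any, Dict

# $replaceRoot wraps the expression in a document with a newRoot key
replace_root_stage: Dict[str, Any] = {"$replaceRoot": {"newRoot": "$embedded"}}

# $replaceWith takes the same expression directly
replace_with_stage: Dict[str, Any] = {"$replaceWith": "$embedded"}

same_expression = (
    replace_root_stage["$replaceRoot"]["newRoot"]
    == replace_with_stage["$replaceWith"]
)
print(same_expression)
```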
facet(facets: Dict[str, List[Dict[str, Any]]]) -> Self

Adds a $facet stage for parallel execution of multiple sub-pipelines.

.facet({
    "items": [{"$skip": 10}, {"$limit": 20}],
    "meta": [{"$count": "total"}]
})
count(field_name: str = "count") -> Self

Adds a $count stage to count documents.

.match({"status": "active"}).count("active_count")
set_field(fields: Dict[str, Any]) -> Self

Adds a $set stage (alias for $addFields, available in MongoDB 4.2+).

.set_field({"status": "active", "updatedAt": "$$NOW"})
add_stage(stage: Dict[str, Any]) -> Self

Adds a custom stage for advanced use cases.

.add_stage({"$facet": {
    "categories": [{"$group": {"_id": "$category"}}],
    "total": [{"$count": "count"}]
}})
add_stages(stages: Iterable[Dict[str, Any]]) -> Self

Adds multiple stages at once (e.g. a subpipeline from another builder). Empty dicts are skipped. Useful to avoid loops when inserting a ready-made list of stages.

# From a list
.add_stages([{"$match": {"level": "error"}}, {"$limit": 100}])

# From another builder
sub = PipelineBuilder().match({"source": "api"}).project({"name": 1})
.add_stages(sub.build())
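The "empty dicts are skipped" behaviour can be pictured with a plain list. `extend_stages` is a hypothetical helper for illustration and says nothing about how the library implements add_stages internally.

```python
from typing import Any, Dict, Iterable, List

Stage = Dict[str, Any]


def extend_stages(target: List[Stage], stages: Iterable[Stage]) -> List[Stage]:
    """Append every non-empty stage to target and return it (hypothetical)."""
    # An empty dict is falsy, so `if stage` filters it out
    target.extend(stage for stage in stages if stage)
    return target


stages: List[Stage] = [{"$match": {"status": "active"}}]
extend_stages(stages, [{"$match": {"level": "error"}}, {}, {"$limit": 100}])
print(stages)
```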
prepend(stage: Dict[str, Any]) -> Self

Adds a stage at the beginning of the pipeline.

builder.match({"status": "active"})
builder.prepend({"$match": {"deleted": False}})
# Pipeline: [{"$match": {"deleted": False}}, {"$match": {"status": "active"}}]
insert_at(position: int, stage: Dict[str, Any]) -> Self

Inserts a stage at a specific position (0-based index) in the pipeline.

builder.match({"status": "active"}).group("$category", {"count": {"$sum": 1}})
builder.insert_at(1, {"$sort": {"name": 1}})
# Pipeline: [{"$match": {...}}, {"$sort": {...}}, {"$group": {...}}]

Note: For inserting before a specific stage type, combine with get_stage_types():

stage_types = builder.get_stage_types()
group_index = stage_types.index("$group")
builder.insert_at(group_index, {"$addFields": {"x": 1}})
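One caveat with the pattern above: list.index raises ValueError when the stage type is absent. The sketch below reproduces the lookup on a plain list of stages; `stage_types` and `insert_before` are hypothetical helpers, not library functions.

```python
from typing import Any, Dict, List

Stage = Dict[str, Any]


def stage_types(pipeline: List[Stage]) -> List[str]:
    """Return the operator name of each stage, e.g. "$match"."""
    return [next(iter(stage), "") for stage in pipeline]


def insert_before(pipeline: List[Stage], stage_type: str, new_stage: Stage) -> List[Stage]:
    """Return a new pipeline with new_stage placed before the first stage_type."""
    types = stage_types(pipeline)
    if stage_type not in types:
        raise ValueError(f"no {stage_type} stage in pipeline")
    result = list(pipeline)  # leave the input list untouched
    result.insert(types.index(stage_type), new_stage)
    return result


pipeline = [{"$match": {"status": "active"}}, {"$group": {"_id": "$category"}}]
updated = insert_before(pipeline, "$group", {"$addFields": {"x": 1}})
print(stage_types(updated))
```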
copy() -> PipelineBuilder

Creates an independent copy of the builder with the current stages. Useful for deriving variants without mutating the original and for composing pipelines.

builder1 = PipelineBuilder().match({"status": "active"})
builder2 = builder1.copy()
builder2.limit(10)

# Original unchanged
assert len(builder1) == 1
assert len(builder2) == 2

See Composing and Reusing Pipelines for practical examples.
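Why independence matters can be shown with plain lists: a shallow copy still shares the nested stage dictionaries, while a deep copy does not. Whether copy() uses a deep copy internally is not stated here, so this is only a sketch of the general pitfall.

```python
import copy
from typing import Any, Dict, List

base: List[Dict[str, Any]] = [{"$match": {"status": "active"}}]

shallow = list(base)        # new list, but the same stage dicts
deep = copy.deepcopy(base)  # new list and new stage dicts

# Editing a nested dict through the shallow copy leaks into base
shallow[0]["$match"]["status"] = "inactive"

print(base[0]["$match"]["status"])
print(deep[0]["$match"]["status"])
```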

validate() -> bool

Validates the pipeline before execution. Checks that:

  • Pipeline is not empty
  • $out and $merge stages are the last stages (critical MongoDB rule)
  • $out and $merge are not used together
builder = PipelineBuilder()
builder.match({"status": "active"}).validate()  # Returns True

# Invalid: $out not last
builder.add_stage({"$out": "output"}).match({"status": "active"})
builder.validate()  # Raises ValueError: $out stage must be the last stage
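The three checks listed above can be sketched on a raw list of stages. `pipeline_problem` is a hypothetical helper that returns a message instead of raising, purely so the result is easy to inspect; the example above shows validate() itself raising ValueError.

```python
from typing import Any, Dict, List, Optional

Stage = Dict[str, Any]


def pipeline_problem(pipeline: List[Stage]) -> Optional[str]:
    """Return a description of the first rule violation, or None if valid."""
    if not pipeline:
        return "pipeline is empty"
    types = [next(iter(stage), "") for stage in pipeline]
    if "$out" in types and "$merge" in types:
        return "$out and $merge cannot be used together"
    for terminal in ("$out", "$merge"):
        # Anything before the final position means the stage is not last
        if terminal in types[:-1]:
            return f"{terminal} stage must be the last stage"
    return None


valid = pipeline_problem([{"$match": {}}, {"$out": "output"}])
misplaced = pipeline_problem([{"$out": "output"}, {"$match": {}}])
print(valid)
print(misplaced)
```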
get_stage_at(index: int) -> Dict[str, Any]

Gets a specific stage from the pipeline by index. Returns a copy of the stage.

builder = PipelineBuilder()
builder.match({"status": "active"}).limit(10)
stage = builder.get_stage_at(0)  # Returns {"$match": {"status": "active"}}
pretty_print(indent: int = 2, ensure_ascii: bool = False) -> str

Returns a formatted JSON string representation of the pipeline. Useful for debugging.

builder = PipelineBuilder()
builder.match({"status": "active"}).limit(10)
print(builder.pretty_print())
# [
#   {
#     "$match": {
#       "status": "active"
#     }
#   },
#   {
#     "$limit": 10
#   }
# ]
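The indent and ensure_ascii parameters carry the same names as those of the standard library's json.dumps. Assuming the output is produced in a similar way (not confirmed here), their effect can be seen on a plain pipeline list:

```python
import json

pipeline = [{"$match": {"city": "München"}}, {"$limit": 10}]

# ensure_ascii=False keeps non-ASCII characters readable
readable = json.dumps(pipeline, indent=2, ensure_ascii=False)

# ensure_ascii=True escapes them as \uXXXX sequences
escaped = json.dumps(pipeline, indent=2, ensure_ascii=True)

print(readable)
print(escaped)
```

Both strings parse back to the same pipeline; only the on-screen representation differs.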
pretty_print_stage(stage: Union[int, Dict[str, Any]], indent: int = 2, ensure_ascii: bool = False) -> str

Returns a formatted JSON string representation of a single stage (by index or by dict).

builder = PipelineBuilder().match({"status": "active"}).limit(10)
print(builder.pretty_print_stage(0))  # Prints the $match stage
to_json_file(filepath: Union[str, Path], indent: int = 2, ensure_ascii: bool = False, metadata: Optional[Dict[str, Any]] = None) -> None

Saves the pipeline to a JSON file. Useful for debugging, comparison, or versioning.

builder = PipelineBuilder()
builder.match({"status": "active"}).limit(10)

# Basic usage
builder.to_json_file("debug_pipeline.json")

# With metadata
builder.to_json_file(
    "pipeline.json",
    metadata={"version": "1.0", "author": "developer"}
)
compare_with(other: PipelineBuilder, context_lines: int = 3) -> str

Returns a unified diff between two pipelines (useful for comparing “new” builder pipelines vs legacy/template pipelines).

legacy = PipelineBuilder().match({"status": "active"}).limit(10)
new = PipelineBuilder().match({"status": "inactive"}).limit(10)

print(new.compare_with(legacy))
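A unified diff of two pipelines can be approximated with the standard library by diffing their pretty-printed JSON. This is a sketch of the general technique; `diff_pipelines`, its labels, and the diff direction are assumptions, not a description of compare_with internals.

```python
import difflib
import json
from typing import Any, Dict, List

Stage = Dict[str, Any]


def diff_pipelines(old: List[Stage], new: List[Stage], context_lines: int = 3) -> str:
    """Return a unified diff between two pipelines (hypothetical helper)."""
    old_lines = json.dumps(old, indent=2, ensure_ascii=False).splitlines()
    new_lines = json.dumps(new, indent=2, ensure_ascii=False).splitlines()
    diff = difflib.unified_diff(
        old_lines,
        new_lines,
        fromfile="legacy",
        tofile="new",
        n=context_lines,
        lineterm="",  # lines carry no trailing newline, so add none
    )
    return "\n".join(diff)


legacy = [{"$match": {"status": "active"}}, {"$limit": 10}]
new = [{"$match": {"status": "inactive"}}, {"$limit": 10}]
diff_text = diff_pipelines(legacy, new)
print(diff_text)
```

Identical pipelines produce an empty string, which makes the helper convenient in tests.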
build() -> List[Dict[str, Any]]

Returns the complete pipeline as a list of stage dictionaries.

Examples

Complex Pipeline with Nested Lookup

pipeline = (
    PipelineBuilder()
    .match({"status": "published"})
    .lookup(
        from_collection="authors",
        local_field="authorId",
        foreign_field="_id",
        as_field="author"
    )
    .unwind("author", preserve_null_and_empty_arrays=True)
    .lookup(
        from_collection="categories",
        local_field="categoryId",
        foreign_field="_id",
        as_field="category",
        pipeline=[
            {"$match": {"active": True}},
            {"$project": {"name": 1, "slug": 1}}
        ]
    )
    .unwind("category")
    .add_fields({
        "authorName": "$author.name",
        "categoryName": "$category.name"
    })
    .project({
        "title": 1,
        "authorName": 1,
        "categoryName": 1,
        "publishedAt": 1
    })
    .sort({"publishedAt": -1})
    .limit(20)
    .build()
)

Lookup by expression (lookup_let)

When the join condition is an expression (e.g. $expr) rather than matching two fields, use lookup_let. The subpipeline can be built with match_expr():

sub = (
    PipelineBuilder()
    .match_expr({"$eq": ["$_id", "$$teamId"]})
    .project({"name": 1, "slug": 1, "_id": 0})
)
pipeline = (
    PipelineBuilder()
    .match({"status": "active"})
    .lookup_let(
        from_collection="teams",
        let={"teamId": "$idTeam"},
        pipeline=sub,
        as_field="team"
    )
    .unwind("team", preserve_null_and_empty_arrays=True)
    .project({"title": 1, "teamName": "$team.name"})
    .build()
)

Hybrid lookup migration (raw stage -> lookup_hybrid)

For a complex $lookup that combines localField/foreignField with let and pipeline, you can replace a raw add_stage({"$lookup": ...}) call with lookup_hybrid(...):

subpipeline = (
    PipelineBuilder()
    .match({"idPlayer": player_id, "idMatch": {"$ne": None}})
    .lookup_hybrid(
        from_collection="sso_matches",
        as_field="match",
        local_field="idMatch",
        foreign_field="id",
        let={
            "local_season_id": "$$season_id",
            "local_tournament_id": "$$tournament_id",
        },
        pipeline=[
            {
                "$match": {
                    "$expr": {
                        "$and": [
                            {"$eq": ["$$local_season_id", "$idSeason"]},
                            {"$eq": ["$$local_tournament_id", "$idTournament"]},
                        ]
                    }
                }
            }
        ],
    )
    .unwind("$match")
    .limit(1)
)

Hybrid lookup with PipelineBuilder subpipeline

You can also build the hybrid $lookup subpipeline with PipelineBuilder and pass it directly:

match_sub = (
    PipelineBuilder()
    .match_expr({"$and": [
        {"$eq": ["$$local_season_id", "$idSeason"]},
        {"$eq": ["$$local_tournament_id", "$idTournament"]},
    ]})
    .project({"idSeason": 1, "idTournament": 1, "_id": 0})
)

pipeline = (
    PipelineBuilder()
    .lookup_hybrid(
        from_collection="sso_matches",
        as_field="match",
        local_field="idMatch",
        foreign_field="id",
        let={
            "local_season_id": "$$season_id",
            "local_tournament_id": "$$tournament_id",
        },
        pipeline=match_sub,
    )
    .build()
)

Aggregation with Grouping

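from datetime import datetime

pipeline = (
    PipelineBuilder()
    # Compare against a datetime: $dateToString below needs a real date field
    .match({"date": {"$gte": datetime(2024, 1, 1)}})
    .group(
        group_by={"month": {"$dateToString": {"format": "%Y-%m", "date": "$date"}}},
        accumulators={
            "totalSales": {"$sum": "$amount"},
            "avgAmount": {"$avg": "$amount"},
            "count": {"$sum": 1}
        }
    )
    # After $group the grouping key lives under _id
    .sort({"_id.month": 1})
    .build()
)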

Composing and Reusing Pipelines

The copy() method lets you create independent variants of a pipeline without mutating the original, enabling safe composition and reuse. This is useful when you need to:

  • Create multiple variants from a base pipeline
  • Compose pipelines functionally
  • Cache base pipelines safely
  • Pass pipelines to functions without side effects

Example: Building Multiple Variants from a Base Pipeline

from mongo_pipebuilder import PipelineBuilder

# Base pipeline with common filtering and joining
base_pipeline = (
    PipelineBuilder()
    .match({"status": "published", "deleted": False})
    .lookup(
        from_collection="authors",
        local_field="authorId",
        foreign_field="_id",
        as_field="author"
    )
    .unwind("author", preserve_null_and_empty_arrays=True)
    .project({
        "title": 1,
        "authorName": "$author.name",
        "publishedAt": 1,
        "views": 1  # kept so variants can sort by it
    })
)

# Create variants with different sorting and limits
recent_posts = base_pipeline.copy().sort({"publishedAt": -1}).limit(10).build()
popular_posts = base_pipeline.copy().sort({"views": -1}).limit(5).build()
author_posts = base_pipeline.copy().match({"authorName": "John Doe"}).build()

# Base pipeline remains unchanged
assert len(base_pipeline) == 4  # Still has 4 stages

Example: Functional Composition Pattern

def add_pagination(builder, page: int, page_size: int = 10):
    """Add pagination to a pipeline."""
    return builder.copy().skip(page * page_size).limit(page_size)

def add_sorting(builder, sort_field: str, ascending: bool = True):
    """Add sorting to a pipeline."""
    return builder.copy().sort({sort_field: 1 if ascending else -1})

# Compose pipelines functionally
base = PipelineBuilder().match({"status": "active"})

# Create different variants
page1 = add_pagination(add_sorting(base, "createdAt"), page=0)
page2 = add_pagination(add_sorting(base, "createdAt"), page=1)
sorted_by_name = add_sorting(base, "name", ascending=True)

# All variants are independent
assert len(base) == 1  # Base unchanged
assert len(page1) == 4  # match + sort + skip + limit

Example: Caching Base Pipelines

from functools import lru_cache

@lru_cache(maxsize=100)
def get_base_pipeline(user_id: str):
    """Cache base pipeline for a user."""
    return (
        PipelineBuilder()
        .match({"userId": user_id, "status": "active"})
        .lookup(
            from_collection="profiles",
            local_field="userId",
            foreign_field="_id",
            as_field="profile"
        )
    )

# Reuse cached base pipeline with different modifications
user_id = "12345"
base = get_base_pipeline(user_id)

# Create multiple queries from cached base
recent = base.copy().sort({"createdAt": -1}).limit(10).build()
by_category = base.copy().match({"category": "tech"}).build()
with_stats = base.copy().group("$category", {"count": {"$sum": 1}}).build()

# Base pipeline is safely cached and reused

Best Practices

Array _id after $group: prefer $arrayElemAt and materialize fields

If you use $group with an array _id (e.g. ["$idSeason", "$idTournament"]), avoid relying on $_id later in the pipeline. Instead, extract the elements with $arrayElemAt, store them in explicit fields, and use those fields in subsequent stages.

pipeline = (
    PipelineBuilder()
    .group(
        group_by=["$idSeason", "$idTournament"],
        accumulators={"idTeams": {"$addToSet": "$idTeam"}},
    )
    .project({
        "idSeason": {"$arrayElemAt": ["$_id", 0]},
        "idTournament": {"$arrayElemAt": ["$_id", 1]},
        "idTeams": 1,
        # Optional: preserve array _id explicitly if you really need it later
        # "_id": "$_id",
    })
    .build()
)

This pattern reduces surprises and helps avoid errors like: $first's argument must be an array, but is object.

Example: Pipeline Factories

class PipelineFactory:
    """Factory for creating common pipeline patterns."""
    
    @staticmethod
    def base_article_pipeline():
        """Base pipeline for articles."""
        return (
            PipelineBuilder()
            .match({"status": "published"})
            .lookup(
                from_collection="authors",
                local_field="authorId",
                foreign_field="_id",
                as_field="author"
            )
        )
    
    @staticmethod
    def with_author_filter(builder, author_name: str):
        """Add author filter to pipeline."""
        return builder.copy().match({"author.name": author_name})
    
    @staticmethod
    def with_date_range(builder, start_date: str, end_date: str):
        """Add date range filter to pipeline."""
        return builder.copy().match({
            "publishedAt": {"$gte": start_date, "$lte": end_date}
        })

# Usage
base = PipelineFactory.base_article_pipeline()
johns_articles = PipelineFactory.with_author_filter(base, "John Doe")
recent_johns = PipelineFactory.with_date_range(
    johns_articles, 
    start_date="2024-01-01",
    end_date="2024-12-31"
).sort({"publishedAt": -1}).limit(10).build()

Key Benefits:

  • Safe reuse: Base pipelines remain unchanged
  • Functional composition: Build pipelines from smaller parts
  • Caching friendly: Base pipelines can be safely cached
  • No side effects: Functions can safely modify copies
  • Thread-safe: Multiple threads can use copies independently

Development

Project Structure

mongo-pipebuilder/
├── src/
│   └── mongo_pipebuilder/
│       ├── __init__.py
│       └── builder.py
├── tests/
│   └── test_builder.py
├── examples/
│   └── examples.py
├── pyproject.toml
├── README.md
└── LICENSE

Running Tests

pytest tests/

Contributing

See DEVELOPMENT.md for development guidelines.

License

MIT License - see LICENSE file for details.
