mongo-pipebuilder

Type-safe, fluent MongoDB aggregation pipeline builder for Python.

Overview

mongo-pipebuilder builds MongoDB aggregation pipelines through the Builder Pattern with a fluent interface: each method adds one stage and returns the builder, and build() returns the finished pipeline as a list of stage dictionaries.
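
To make the pattern concrete, here is a minimal sketch of a fluent builder. MiniPipelineBuilder is a hypothetical toy class written for this illustration, not the library's source; it only shows why returning self from every method allows chaining.

```python
from typing import Any, Dict, List


class MiniPipelineBuilder:
    """Toy stand-in illustrating a fluent builder (hypothetical, not the library's code)."""

    def __init__(self) -> None:
        self._stages: List[Dict[str, Any]] = []

    def match(self, conditions: Dict[str, Any]) -> "MiniPipelineBuilder":
        self._stages.append({"$match": conditions})
        return self  # returning self is what makes chaining possible

    def limit(self, limit: int) -> "MiniPipelineBuilder":
        self._stages.append({"$limit": limit})
        return self

    def build(self) -> List[Dict[str, Any]]:
        # Return a copy so callers cannot mutate the builder's internal list.
        return list(self._stages)


pipeline = MiniPipelineBuilder().match({"status": "active"}).limit(10).build()
print(pipeline)
```

The result is an ordinary list of dictionaries, which is the shape that pymongo's aggregate() accepts.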

Features

  • Type-safe: Full type-hint support with IDE autocomplete
  • Fluent interface: Chain methods for readable, maintainable code
  • Zero dependencies: Pure Python, lightweight package
  • Extensible: Easy to add custom stages via add_stage()
  • Well tested: Comprehensive test suite with 96%+ coverage

Installation

pip install mongo-pipebuilder

Quick Start

from mongo_pipebuilder import PipelineBuilder

# Build a pipeline
pipeline = (
    PipelineBuilder()
    .match({"status": "active"})
    .lookup(
        from_collection="users",
        local_field="userId",
        foreign_field="_id",
        as_field="user"
    )
    .project({"name": 1, "user.email": 1})
    .sort({"name": 1})
    .limit(10)
    .build()
)

# Use with pymongo
from pymongo import MongoClient
client = MongoClient()
collection = client.db.my_collection
results = collection.aggregate(pipeline)
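
For reference, the Quick Start chain should correspond to a stage list like the one sketched below. This assumes the builder emits standard MongoDB stage documents in call order; the exact output of build() is not shown in this README, so compare against your own build() result.

```python
# Assumed equivalent of the Quick Start pipeline, written as raw stage documents.
expected_pipeline = [
    {"$match": {"status": "active"}},
    {
        "$lookup": {
            "from": "users",
            "localField": "userId",
            "foreignField": "_id",
            "as": "user",
        }
    },
    {"$project": {"name": 1, "user.email": 1}},
    {"$sort": {"name": 1}},
    {"$limit": 10},
]

# Each stage document has exactly one key: the stage operator.
stage_names = [next(iter(stage)) for stage in expected_pipeline]
print(stage_names)
```

Note that pymongo's aggregate() returns a cursor, so iterate over results (or wrap it in list()) to fetch the documents.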

API Reference

PipelineBuilder

Main class for building aggregation pipelines.

Methods

match(conditions: Dict[str, Any]) -> Self

Adds a $match stage to filter documents.

.match({"status": "active", "age": {"$gte": 18}})

lookup(from_collection: str, local_field: str, foreign_field: str, as_field: str, pipeline: Optional[List[Dict[str, Any]]] = None) -> Self

Adds a $lookup stage to join with another collection.

.lookup(
    from_collection="users",
    local_field="userId",
    foreign_field="_id",
    as_field="user",
    pipeline=[{"$match": {"active": True}}]  # Optional nested pipeline
)
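
The keyword arguments above map naturally onto the fields of a raw $lookup document. The sketch below shows one plausible mapping; lookup_stage is a hypothetical helper, and how the library itself combines the nested pipeline with the field arguments is an assumption here.

```python
from typing import Any, Dict, List, Optional


def lookup_stage(
    from_collection: str,
    local_field: str,
    foreign_field: str,
    as_field: str,
    pipeline: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: build a raw $lookup stage document."""
    spec: Dict[str, Any] = {
        "from": from_collection,
        "localField": local_field,
        "foreignField": foreign_field,
        "as": as_field,
    }
    if pipeline is not None:
        # Combining localField/foreignField with a pipeline needs MongoDB 5.0+.
        spec["pipeline"] = pipeline
    return {"$lookup": spec}


stage = lookup_stage(
    "users", "userId", "_id", "user", pipeline=[{"$match": {"active": True}}]
)
print(stage)
```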

add_fields(fields: Dict[str, Any]) -> Self

Adds a $addFields stage to add or modify fields.

.add_fields({"fullName": {"$concat": ["$firstName", " ", "$lastName"]}})

project(fields: Dict[str, Any]) -> Self

Adds a $project stage to reshape documents.

.project({"name": 1, "email": 1, "_id": 0})

group(group_by: Dict[str, Any], accumulators: Dict[str, Any]) -> Self

Adds a $group stage to group documents.

.group(
    group_by={"category": "$category"},
    accumulators={"total": {"$sum": "$amount"}}
)
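
A raw $group stage keeps its grouping key under _id and its accumulators as sibling fields. The sketch below assumes group() maps its two arguments that way; group_stage is a hypothetical helper, not the library's implementation.

```python
from typing import Any, Dict


def group_stage(group_by: Any, accumulators: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: place the grouping key under _id, accumulators beside it."""
    return {"$group": {"_id": group_by, **accumulators}}


stage = group_stage(
    group_by={"category": "$category"},
    accumulators={"total": {"$sum": "$amount"}},
)
print(stage)
```

Under this mapping, later stages see the grouping key as _id.category rather than category, which matters when sorting or projecting after the group.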

unwind(path: str, preserve_null_and_empty_arrays: bool = False, include_array_index: Optional[str] = None) -> Self

Adds a $unwind stage to deconstruct arrays.

.unwind("tags", preserve_null_and_empty_arrays=True)
.unwind("items", include_array_index="itemIndex")
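
In raw MongoDB syntax the $unwind path is a field path with a leading "$", and the two options are spelled preserveNullAndEmptyArrays and includeArrayIndex. The sketch below shows one way the snake_case arguments could translate; unwind_stage is a hypothetical helper, and adding the "$" prefix automatically is an assumption based on the examples above passing bare field names.

```python
from typing import Any, Dict, Optional


def unwind_stage(
    path: str,
    preserve_null_and_empty_arrays: bool = False,
    include_array_index: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: build a raw $unwind stage document."""
    # MongoDB expects a field path such as "$tags"; add the prefix if missing.
    field_path = path if path.startswith("$") else "$" + path
    spec: Dict[str, Any] = {"path": field_path}
    if preserve_null_and_empty_arrays:
        spec["preserveNullAndEmptyArrays"] = True
    if include_array_index is not None:
        spec["includeArrayIndex"] = include_array_index
    return {"$unwind": spec}


tags_stage = unwind_stage("tags", preserve_null_and_empty_arrays=True)
items_stage = unwind_stage("items", include_array_index="itemIndex")
print(tags_stage, items_stage)
```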

sort(fields: Dict[str, int]) -> Self

Adds a $sort stage.

.sort({"createdAt": -1, "name": 1})

limit(limit: int) -> Self

Adds a $limit stage.

.limit(10)

skip(skip: int) -> Self

Adds a $skip stage.

.skip(20)

unset(fields: Union[str, List[str]]) -> Self

Adds a $unset stage to remove fields from documents.

.unset("temp_field")
.unset(["field1", "field2", "field3"])

replace_root(new_root: Dict[str, Any]) -> Self

Adds a $replaceRoot stage to replace the root document.

.replace_root({"newRoot": "$embedded"})
.replace_root({"newRoot": {"$mergeObjects": ["$doc1", "$doc2"]}})

replace_with(replacement: Any) -> Self

Adds a $replaceWith stage (an alias for $replaceRoot, available in MongoDB 4.2+).

.replace_with("$embedded")
.replace_with({"$mergeObjects": ["$doc1", "$doc2"]})
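
The two stages differ only in shape: $replaceRoot wraps the expression in a newRoot field, while $replaceWith takes the expression directly. The sketch below converts one raw stage into the other; to_replace_with is a hypothetical helper for illustration.

```python
from typing import Any, Dict


def to_replace_with(replace_root_stage: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: turn {"$replaceRoot": {"newRoot": X}} into {"$replaceWith": X}."""
    return {"$replaceWith": replace_root_stage["$replaceRoot"]["newRoot"]}


root_stage = {"$replaceRoot": {"newRoot": {"$mergeObjects": ["$doc1", "$doc2"]}}}
with_stage = to_replace_with(root_stage)
print(with_stage)
```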

facet(facets: Dict[str, List[Dict[str, Any]]]) -> Self

Adds a $facet stage, which runs several independent sub-pipelines over the same input documents within a single stage.

.facet({
    "items": [{"$skip": 10}, {"$limit": 20}],
    "meta": [{"$count": "total"}]
})
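
A common use of this shape is pagination: one facet returns the requested page and another returns the total count. The sketch below builds such a facets dictionary from a page number; pagination_facets and its parameters are hypothetical names, not part of the library.

```python
from typing import Any, Dict, List


def pagination_facets(page: int, page_size: int) -> Dict[str, List[Dict[str, Any]]]:
    """Hypothetical helper: build a facets dict for 1-based page numbers."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be positive")
    skip = (page - 1) * page_size  # documents to pass over before this page
    return {
        "items": [{"$skip": skip}, {"$limit": page_size}],
        "meta": [{"$count": "total"}],
    }


facets = pagination_facets(page=3, page_size=20)
print(facets)
```

The returned dictionary can then be passed to .facet(), assuming facet() accepts a mapping of names to sub-pipelines as its signature above indicates.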

count(field_name: str = "count") -> Self

Adds a $count stage to count documents.

.match({"status": "active"}).count("active_count")
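
In MongoDB, $count behaves like a $group over all documents followed by a $project that drops _id. The sketch below spells out that equivalent pair of raw stages; count_equivalent is a hypothetical helper for illustration only.

```python
from typing import Any, Dict, List


def count_equivalent(field_name: str = "count") -> List[Dict[str, Any]]:
    """Hypothetical helper: the $group + $project pair that $count stands for."""
    return [
        {"$group": {"_id": None, field_name: {"$sum": 1}}},  # one group for all docs
        {"$project": {"_id": 0}},  # drop the null _id, keep only the counter
    ]


stages = count_equivalent("active_count")
print(stages)
```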

set_field(fields: Dict[str, Any]) -> Self

Adds a $set stage (an alias for $addFields, available in MongoDB 4.2+).

.set_field({"status": "active", "updatedAt": "$$NOW"})

add_stage(stage: Dict[str, Any]) -> Self

Adds a custom stage for advanced use cases.

.add_stage({"$facet": {
    "categories": [{"$group": {"_id": "$category"}}],
    "total": [{"$count": "count"}]
}})

prepend(stage: Dict[str, Any]) -> Self

Adds a stage at the beginning of the pipeline.

builder.match({"status": "active"})
builder.prepend({"$match": {"deleted": False}})
# Pipeline: [{"$match": {"deleted": False}}, {"$match": {"status": "active"}}]

insert_at(position: int, stage: Dict[str, Any]) -> Self

Inserts a stage at a specific position (0-based index) in the pipeline.

builder.match({"status": "active"}).group({"_id": "$category"}, {"count": {"$sum": 1}})
builder.insert_at(1, {"$sort": {"name": 1}})
# Pipeline: [{"$match": {...}}, {"$sort": {...}}, {"$group": {...}}]

Note: To insert before a specific stage type, combine insert_at() with get_stage_types():

stage_types = builder.get_stage_types()
group_index = stage_types.index("$group")
builder.insert_at(group_index, {"$addFields": {"x": 1}})
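
The same lookup-then-insert idea can be sketched on a plain list of stages. This assumes get_stage_types() returns the operator key of each stage in order; stage_types below is a hypothetical stand-in for it.

```python
from typing import Any, Dict, List


def stage_types(pipeline: List[Dict[str, Any]]) -> List[str]:
    """Hypothetical stand-in for get_stage_types(): the operator key of each stage."""
    return [next(iter(stage)) for stage in pipeline]


pipeline = [
    {"$match": {"status": "active"}},
    {"$group": {"_id": "$category", "count": {"$sum": 1}}},
]

# Find the first $group stage and insert a new stage just before it.
group_index = stage_types(pipeline).index("$group")
pipeline.insert(group_index, {"$addFields": {"x": 1}})
print(stage_types(pipeline))
```

list.index() raises ValueError when the stage type is absent, so guard the lookup if the pipeline may not contain a $group stage.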

validate() -> bool

Validates the pipeline before execution. Checks that:

  • Pipeline is not empty
  • $out and $merge stages are the last stages (critical MongoDB rule)
  • $out and $merge are not used together

builder = PipelineBuilder()
builder.match({"status": "active"}).validate()  # Returns True

# Invalid: $out not last
builder.add_stage({"$out": "output"}).match({"status": "active"})
builder.validate()  # Raises ValueError: $out stage must be the last stage
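
The three checks listed above can be sketched on a raw stage list as follows. validate_pipeline is a hypothetical function, and raising ValueError for every failed check (including an empty pipeline) is an assumption; only the $out error is shown in the example above.

```python
from typing import Any, Dict, List


def validate_pipeline(pipeline: List[Dict[str, Any]]) -> bool:
    """Hypothetical sketch of the three documented checks."""
    if not pipeline:
        raise ValueError("Pipeline is empty")
    names = [next(iter(stage)) for stage in pipeline]
    if "$out" in names and "$merge" in names:
        raise ValueError("$out and $merge cannot be used together")
    for terminal in ("$out", "$merge"):
        # A terminal stage is only valid in the final position.
        if terminal in names and names.index(terminal) != len(names) - 1:
            raise ValueError(f"{terminal} stage must be the last stage")
    return True


ok = validate_pipeline([{"$match": {"status": "active"}}, {"$out": "output"}])
print(ok)
```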

build() -> List[Dict[str, Any]]

Returns the complete pipeline as a list of stage dictionaries.

Examples

Complex Pipeline with Nested Lookup

pipeline = (
    PipelineBuilder()
    .match({"status": "published"})
    .lookup(
        from_collection="authors",
        local_field="authorId",
        foreign_field="_id",
        as_field="author"
    )
    .unwind("author", preserve_null_and_empty_arrays=True)
    .lookup(
        from_collection="categories",
        local_field="categoryId",
        foreign_field="_id",
        as_field="category",
        pipeline=[
            {"$match": {"active": True}},
            {"$project": {"name": 1, "slug": 1}}
        ]
    )
    .unwind("category")
    .add_fields({
        "authorName": "$author.name",
        "categoryName": "$category.name"
    })
    .project({
        "title": 1,
        "authorName": 1,
        "categoryName": 1,
        "publishedAt": 1
    })
    .sort({"publishedAt": -1})
    .limit(20)
    .build()
)

Aggregation with Grouping

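from datetime import datetime

pipeline = (
    PipelineBuilder()
    # Compare against a datetime: $dateToString below needs "date" to be a BSON date
    .match({"date": {"$gte": datetime(2024, 1, 1)}})
    .group(
        group_by={"month": {"$dateToString": {"format": "%Y-%m", "date": "$date"}}},
        accumulators={
            "totalSales": {"$sum": "$amount"},
            "avgAmount": {"$avg": "$amount"},
            "count": {"$sum": 1}
        }
    )
    # $group emits the grouping key under _id, so sort on _id.month
    .sort({"_id.month": 1})
    .build()
)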
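
To see what this pipeline computes, the sketch below performs the same filter, monthly grouping, and sort in plain Python on a few made-up documents. The sample data is invented for illustration, and the output shape assumes the grouping key ends up under _id, as it does in a raw $group stage.

```python
from collections import defaultdict
from datetime import datetime

# Made-up sample documents; "date" is assumed to hold datetime values.
docs = [
    {"date": datetime(2024, 1, 5), "amount": 10.0},
    {"date": datetime(2024, 1, 20), "amount": 30.0},
    {"date": datetime(2024, 2, 1), "amount": 5.0},
    {"date": datetime(2023, 12, 31), "amount": 99.0},  # filtered out by the match
]

buckets = defaultdict(list)
for doc in docs:
    if doc["date"] >= datetime(2024, 1, 1):  # plays the role of $match
        month = doc["date"].strftime("%Y-%m")  # plays the role of $dateToString
        buckets[month].append(doc["amount"])

summary = [
    {
        "_id": {"month": month},
        "totalSales": sum(amounts),
        "avgAmount": sum(amounts) / len(amounts),
        "count": len(amounts),
    }
    for month, amounts in sorted(buckets.items())  # plays the role of $sort
]
print(summary)
```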

Development

Project Structure

mongo-pipebuilder/
├── src/
│   └── mongo_pipebuilder/
│       ├── __init__.py
│       └── builder.py
├── tests/
│   └── test_builder.py
├── examples/
│   └── examples.py
├── pyproject.toml
├── README.md
└── LICENSE

Running Tests

pytest tests/

Contributing

See DEVELOPMENT.md for development guidelines.

License

MIT License - see LICENSE file for details.
