abs-pipelines-core

MongoDB/Cosmos DB aggregation pipeline builders and file operations for record enrichment.

Installation

# From PyPI
pip install abs-pipelines-core

# Or with Poetry
poetry add abs-pipelines-core

Quick Start - Record Enrichment

Use prepare_enrichment() to get everything you need in one call:

from abs_pipelines_core import RecordEnricher

# blob_service is your storage client implementing BlobStorageProtocol;
# the repositories, ListFilter, and AggregationStage used below come from
# the host application.
enricher = RecordEnricher(
    blob_service=blob_service,
    user_collection_name="user_documents"
)

# Fetch ALL fields for the entity
fields = (await field_repository.get_all(filters)).get("founds", [])

# Prepare enrichment config (with optional record filter)
config = enricher.prepare_enrichment(
    fields=fields,
    record_ids=["rec_id_1", "rec_id_2"],  # Optional: filter by specific IDs
    option_field_ids=["status_field_id"]   # Optional: fields needing label resolution
)

# Apply to query
find = ListFilter()
if config.pre_filter:
    find.pre_filters = config.pre_filter
find.reference_fields = AggregationStage(pipeline=config.pipeline)

# Get records
response = await repository.get_all(find, collection_name)

# Add file URLs (post-processing)
response = await enricher.process_file_fields(config.file_field_ids, response)

File Operations - EntityRecordBlobHandler

File upload/download operations for entity records:

from abs_pipelines_core import EntityRecordBlobHandler

# Initialize with blob repository
blob_handler = EntityRecordBlobHandler(blob_repository)

# Upload single file
result = await blob_handler.upload_record_field_file(entity_id, field_id, file)
# Returns: {field_id: file_id}

# Upload multiple files (filename format: {field_id}_actualfilename)
results = await blob_handler.upload_record_field_files(entity_id, files)
# Returns: [{field_id: file_id}, ...]

# Get file URL
url = await blob_handler.get_record_field_file(file_id)

# Get multiple file URLs
urls = await blob_handler.get_record_field_files(entity_id, file_ids)
# Returns: [{file_id: url}, ...]

# Delete file
await blob_handler.delete_record_field_file(file_id)

# Delete field folder
await blob_handler.delete_record_field_folder(entity_id, field_id)

# Delete entity folder
await blob_handler.delete_record_folder(entity_id)
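The multi-file upload derives the target field from the filename prefix. A minimal sketch of that {field_id}_actualfilename convention (split_field_prefix is a hypothetical helper, not part of the package; it assumes the field ID itself contains no underscore):

```python
def split_field_prefix(filename: str) -> tuple[str, str]:
    """Split '{field_id}_actualfilename' into (field_id, actual filename)."""
    # Split on the first underscore only, so the rest of the name is preserved.
    field_id, _, name = filename.partition("_")
    return field_id, name

field_id, name = split_field_prefix("f123_report.pdf")
# field_id == "f123", name == "report.pdf"
```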

Storage Path Convention

Files are stored with this path structure:

  • entity-{entity_id}/field-{field_id}/{file_id}
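The convention is easy to reproduce when inspecting storage contents; a small sketch (record_blob_path is a hypothetical helper, not exported by the package):

```python
def record_blob_path(entity_id: str, field_id: str, file_id: str) -> str:
    # Mirrors the documented layout: entity-{entity_id}/field-{field_id}/{file_id}
    return f"entity-{entity_id}/field-{field_id}/{file_id}"

path = record_blob_path("e1", "f2", "abc123")
# "entity-e1/field-f2/abc123"
```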

Fields with Associations Pipeline

Use build_fields_with_associations_pipeline() to fetch fields with their associated definitions populated:

from abs_pipelines_core import build_fields_with_associations_pipeline

# Build the pipeline
pipeline = build_fields_with_associations_pipeline(user_collection_name="users_documents")

# Add a match stage and run
full_pipeline = [{"$match": {"entity_id": entity_id}}, *pipeline]
fields = await db.fields.aggregate(full_pipeline).to_list(1000)

# Result for association field:
# {
#     "_id": "field_123",
#     "type": "association",
#     "reference": {
#         "entity_id": "entity_456",
#         "entity_fields": [  # <-- Populated!
#             {"_id": "...", "name": "Name", "type": "text", ...},
#         ]
#     }
# }

# Result for user field:
# {
#     "_id": "field_789",
#     "type": "user",
#     "user_filter": {
#         "is_active": true,
#         "users": [  # <-- Populated!
#             {"_id": "...", "name": "John Doe", ...},
#         ]
#     }
# }

EnrichmentConfig

The prepare_enrichment() method returns an EnrichmentConfig with:

  Property        Type            Description
  pipeline        List[Dict]      Combined $lookup and $addFields aggregation stages
  pre_filter      Optional[Dict]  Filter schema for record_ids (use with find.pre_filters)
  file_field_ids  List[str]       Field IDs for file URL processing
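The real class ships with the package (likely a pydantic model, given the dependency list); this stdlib-dataclass sketch just mirrors the table above:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

@dataclass
class EnrichmentConfigSketch:
    """Same shape as EnrichmentConfig, per the table above."""
    pipeline: List[Dict[str, Any]] = field(default_factory=list)
    pre_filter: Optional[Dict[str, Any]] = None
    file_field_ids: List[str] = field(default_factory=list)

cfg = EnrichmentConfigSketch(file_field_ids=["file_field_id"])
```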

Individual Pipeline Methods

build_lookup_pipeline(fields)

Build $lookup stages for association and user fields:

lookup_pipeline = enricher.build_lookup_pipeline(fields)
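For intuition only: a $lookup stage for a user field against the configured user collection generally has this shape in MongoDB (an illustration, not the library's literal output; "assignee" is a made-up field key):

```python
user_lookup = {
    "$lookup": {
        "from": "user_documents",   # the user_collection_name given to RecordEnricher
        "localField": "assignee",   # the user field's key on the record
        "foreignField": "_id",
        "as": "assignee_resolved",  # joined user documents land here as an array
    }
}
```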

build_option_label_pipeline(field_id, value_to_label, is_multi_select)

Build an $addFields stage that resolves stored option values to display labels:

value_to_label = {"uuid-1": "Active", "uuid-2": "Inactive"}
option_pipeline = enricher.build_option_label_pipeline(
    field_id="status",
    value_to_label=value_to_label,
    is_multi_select=False
)
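The stage the method emits is internal to the package; in plain MongoDB, option-label resolution for a single-select field is typically a $switch inside $addFields, roughly:

```python
value_to_label = {"uuid-1": "Active", "uuid-2": "Inactive"}

# Illustrative shape only; build_option_label_pipeline's real output may differ.
stage = {
    "$addFields": {
        "status": {
            "$switch": {
                "branches": [
                    {"case": {"$eq": ["$status", value]}, "then": label}
                    for value, label in value_to_label.items()
                ],
                "default": "$status",  # leave unknown values untouched
            }
        }
    }
}
```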

process_file_fields(file_fields, response, expiry_minutes)

Attach file URLs to records (async, post-processing):

response = await enricher.process_file_fields(
    file_fields=["file_field_id"],
    response=response,
    expiry_minutes=5
)
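Conceptually, this post-processing step swaps each stored file ID for a time-limited download URL on every record. A rough stand-in (resolve_file_urls and fake_get_file_url are hypothetical, not package APIs):

```python
import asyncio

async def resolve_file_urls(records, file_fields, get_file_url):
    # Replace the file ID stored under each file field with a download URL.
    for record in records:
        for field_id in file_fields:
            file_id = record.get(field_id)
            if file_id:
                record[field_id] = await get_file_url(file_id)
    return records

async def fake_get_file_url(file_id: str) -> str:
    return f"https://blob.example/{file_id}"

records = asyncio.run(resolve_file_urls(
    [{"doc": "f1"}, {"doc": None}], ["doc"], fake_get_file_url
))
# records[0]["doc"] == "https://blob.example/f1"
```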

Package Structure

abs_pipelines_core/
├── __init__.py          # Public API exports
├── record_enricher.py   # RecordEnricher, schemas, and protocols
├── blob_handler.py      # EntityRecordBlobHandler for file operations
└── fields_pipeline.py   # build_fields_with_associations_pipeline

Exports

from abs_pipelines_core import (
    # Record enrichment
    RecordEnricher,                          # Aggregation pipeline builder
    EnrichmentConfig,                        # Return type for prepare_enrichment()
    FieldConfig,                             # Schema for field configurations
    FieldReference,                          # Schema for association references
    OptionConfig,                            # Schema for select options
    BlobStorageProtocol,                     # Protocol for blob service (get_file_url)

    # Blob/file operations
    EntityRecordBlobHandler,                 # File operations handler
    RequestData,                             # Request data for file uploads
    BlobRepositoryProtocol,                  # Protocol for full blob repository

    # Fields pipeline
    build_fields_with_associations_pipeline, # Fields with associations aggregation
)

Requirements

  • Python 3.11+
  • motor >= 3.6.0
  • pymongo >= 4.10.0
  • pydantic >= 2.10.0
