
Core utilities for building MongoDB/Cosmos DB aggregation pipelines


abs-pipelines-core

MongoDB aggregation pipeline builders and file operations for record enrichment.

Installation

poetry add ../libs/abs-pipelines-core

Quick Start - Record Enrichment

Use prepare_enrichment() to get everything you need in one call:

from abs_pipelines_core import RecordEnricher

enricher = RecordEnricher(
    blob_service=blob_service,
    user_collection_name="user_documents"
)

# Fetch ALL fields for the entity
fields = (await field_repository.get_all(filters)).get("founds", [])

# Prepare enrichment config (with optional record filter)
config = enricher.prepare_enrichment(
    fields=fields,
    record_ids=["rec_id_1", "rec_id_2"],  # Optional: filter by specific IDs
    option_field_ids=["status_field_id"]   # Optional: fields needing label resolution
)

# Apply to query
find = ListFilter()
if config.pre_filter:
    find.pre_filters = config.pre_filter
find.reference_fields = AggregationStage(pipeline=config.pipeline)

# Get records
response = await repository.get_all(find, collection_name)

# Add file URLs (post-processing)
response = await enricher.process_file_fields(config.file_field_ids, response)

File Operations - EntityRecordBlobHandler

File upload/download operations for entity records:

from abs_pipelines_core import EntityRecordBlobHandler

# Initialize with blob repository
blob_handler = EntityRecordBlobHandler(blob_repository)

# Upload single file
result = await blob_handler.upload_record_field_file(entity_id, field_id, file)
# Returns: {field_id: file_id}

# Upload multiple files (filename format: {field_id}_actualfilename)
results = await blob_handler.upload_record_field_files(entity_id, files)
# Returns: [{field_id: file_id}, ...]
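The `{field_id}_actualfilename` convention implies the field ID can be recovered by splitting on the first underscore. A minimal sketch of that parsing (the helper name is illustrative, not part of the library, and it assumes field IDs themselves contain no underscores):

```python
def split_field_filename(name: str) -> tuple[str, str]:
    """Split '{field_id}_actualfilename' into (field_id, filename).

    Only the first underscore delimits the field ID, so filenames
    that themselves contain underscores are preserved intact.
    """
    field_id, _, filename = name.partition("_")
    return field_id, filename

split_field_filename("status_report_q1.pdf")  # ("status", "report_q1.pdf")
```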

# Get file URL
url = await blob_handler.get_record_field_file(file_id)

# Get multiple file URLs
urls = await blob_handler.get_record_field_files(entity_id, file_ids)
# Returns: [{file_id: url}, ...]

# Delete file
await blob_handler.delete_record_field_file(file_id)

# Delete field folder
await blob_handler.delete_record_field_folder(entity_id, field_id)

# Delete entity folder
await blob_handler.delete_record_folder(entity_id)

Storage Path Convention

Files are stored with this path structure:

  • entity-{entity_id}/field-{field_id}/{file_id}
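The convention above can be expressed as a one-line path builder; a sketch (the function name is illustrative, not part of the library):

```python
def record_field_blob_path(entity_id: str, field_id: str, file_id: str) -> str:
    """Build the blob path for a record-field file per the documented convention:
    entity-{entity_id}/field-{field_id}/{file_id}
    """
    return f"entity-{entity_id}/field-{field_id}/{file_id}"

record_field_blob_path("e1", "f2", "abc")  # "entity-e1/field-f2/abc"
```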

EnrichmentConfig

The prepare_enrichment() method returns an EnrichmentConfig with:

  • pipeline (List[Dict]): combined $lookup and $addFields aggregation stages
  • pre_filter (Optional[Dict]): filter schema for record_ids (use with find.pre_filters)
  • file_field_ids (List[str]): field IDs for file URL processing
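For illustration, the shape described above corresponds to a dataclass like the following (a sketch; the library's actual class may be defined differently):

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

@dataclass
class EnrichmentConfigSketch:
    """Illustrative shape of the object prepare_enrichment() returns."""
    pipeline: List[Dict[str, Any]] = field(default_factory=list)   # $lookup + $addFields stages
    pre_filter: Optional[Dict[str, Any]] = None                    # filter for record_ids, or None
    file_field_ids: List[str] = field(default_factory=list)        # fields needing URL post-processing
```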

Individual Pipeline Methods

build_lookup_pipeline(fields)

Build $lookup stages for association and user fields:

lookup_pipeline = enricher.build_lookup_pipeline(fields)
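For reference, a $lookup stage for a user field typically has the shape below (field names here are illustrative; the stages actually emitted depend on each field's configuration):

```python
# Sketch of a single $lookup stage resolving a user reference field.
lookup_stage = {
    "$lookup": {
        "from": "user_documents",       # collection named at enricher construction
        "localField": "assigned_to",    # reference field on the record (illustrative)
        "foreignField": "_id",          # key in the target collection
        "as": "assigned_to_resolved",   # output array field on the record
    }
}
```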

build_option_label_pipeline(field_id, value_to_label, is_multi_select)

Build $addFields stage for option label resolution:

value_to_label = {"uuid-1": "Active", "uuid-2": "Inactive"}
option_pipeline = enricher.build_option_label_pipeline(
    field_id="status",
    value_to_label=value_to_label,
    is_multi_select=False
)
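One common way to express label resolution in an aggregation stage is a $switch inside $addFields; a sketch of what such a stage could look like for the mapping above (the output field name and operator choice are assumptions, not the library's confirmed output):

```python
value_to_label = {"uuid-1": "Active", "uuid-2": "Inactive"}

# Map the stored option value on "status" to its human-readable label.
option_stage = {
    "$addFields": {
        "status_label": {
            "$switch": {
                "branches": [
                    {"case": {"$eq": ["$status", value]}, "then": label}
                    for value, label in value_to_label.items()
                ],
                "default": None,  # unmapped values resolve to null
            }
        }
    }
}
```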

process_file_fields(file_fields, response, expiry_minutes)

Attach file URLs to records (async, post-processing):

response = await enricher.process_file_fields(
    file_fields=["file_field_id"],
    response=response,
    expiry_minutes=5
)

Exports

from abs_pipelines_core import (
    # Main classes
    RecordEnricher,           # Aggregation pipeline builder
    EntityRecordBlobHandler,  # File operations handler

    # Config/Schema
    EnrichmentConfig,         # Return type for prepare_enrichment()
    FieldConfig,              # Schema for field configurations
    FieldReference,           # Schema for association references
    OptionConfig,             # Schema for select options
    RequestData,              # Request data for file uploads

    # Protocols
    BlobStorageProtocol,      # Protocol for blob service (get_file_url only)
    BlobRepositoryProtocol,   # Protocol for full blob repository
)
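The comment on BlobStorageProtocol suggests a structural (typing.Protocol) interface: any object exposing get_file_url satisfies it. A minimal sketch of what that could look like (the method signature is an assumption based on the description and the expiry_minutes parameter seen elsewhere in this README):

```python
import asyncio
from typing import Protocol

class BlobStorageProtocolSketch(Protocol):
    """Illustrative structural type: anything with get_file_url qualifies."""
    async def get_file_url(self, file_id: str, expiry_minutes: int = 5) -> str: ...

class DummyBlobService:
    """Stand-in implementation, e.g. for tests."""
    async def get_file_url(self, file_id: str, expiry_minutes: int = 5) -> str:
        return f"https://example.invalid/{file_id}?ttl={expiry_minutes}"

url = asyncio.run(DummyBlobService().get_file_url("abc"))
```

Because Protocol matching is structural, DummyBlobService needs no explicit inheritance to be accepted wherever the protocol is expected.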


