Core utilities for building MongoDB/Cosmos DB aggregation pipelines
abs-pipelines-core
MongoDB aggregation pipeline builders and file operations for record enrichment.
Installation
# From PyPI
pip install abs-pipelines-core
# Or with Poetry
poetry add abs-pipelines-core
Quick Start - Record Enrichment
Use prepare_enrichment() to get everything you need in one call:
from abs_pipelines_core import RecordEnricher
enricher = RecordEnricher(
    blob_service=blob_service,
    user_collection_name="user_documents",
)
# Fetch ALL fields for the entity
fields = (await field_repository.get_all(filters)).get("founds", [])
# Prepare enrichment config (with optional record filter)
config = enricher.prepare_enrichment(
    fields=fields,
    record_ids=["rec_id_1", "rec_id_2"],  # Optional: filter by specific IDs
    option_field_ids=["status_field_id"],  # Optional: fields needing label resolution
)
# Apply to query
find = ListFilter()
if config.pre_filter:
    find.pre_filters = config.pre_filter
find.reference_fields = AggregationStage(pipeline=config.pipeline)
# Get records
response = await repository.get_all(find, collection_name)
# Add file URLs (post-processing)
response = await enricher.process_file_fields(config.file_field_ids, response)
File Operations - EntityRecordBlobHandler
File upload/download operations for entity records:
from abs_pipelines_core import EntityRecordBlobHandler
# Initialize with blob repository
blob_handler = EntityRecordBlobHandler(blob_repository)
# Upload single file
result = await blob_handler.upload_record_field_file(entity_id, field_id, file)
# Returns: {field_id: file_id}
# Upload multiple files (filename format: {field_id}_actualfilename)
results = await blob_handler.upload_record_field_files(entity_id, files)
# Returns: [{field_id: file_id}, ...]
# Get file URL
url = await blob_handler.get_record_field_file(file_id)
# Get multiple file URLs
urls = await blob_handler.get_record_field_files(entity_id, file_ids)
# Returns: [{file_id: url}, ...]
# Delete file
await blob_handler.delete_record_field_file(file_id)
# Delete field folder
await blob_handler.delete_record_field_folder(entity_id, field_id)
# Delete entity folder
await blob_handler.delete_record_folder(entity_id)
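The multi-file upload above relies on the {field_id}_actualfilename naming convention. As a minimal sketch of how such a name could be split back into its parts, the helper below assumes that field IDs never contain an underscore. The function name split_field_filename is hypothetical and is not part of the package; the handler's own parsing may differ.

```python
def split_field_filename(filename: str) -> tuple[str, str]:
    """Split '{field_id}_actualfilename' into (field_id, actual_filename).

    Assumption: the field ID itself contains no underscore, so the first
    underscore marks the boundary between the two parts.
    """
    field_id, separator, actual_name = filename.partition("_")
    if not separator or not field_id or not actual_name:
        raise ValueError(f"expected '{{field_id}}_filename', got {filename!r}")
    return field_id, actual_name


if __name__ == "__main__":
    print(split_field_filename("abc123_report_final.pdf"))
```

Splitting on the first underscore only keeps underscores in the original file name intact.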
Storage Path Convention
Files are stored with this path structure:
entity-{entity_id}/field-{field_id}/{file_id}
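To make the convention concrete, here is a small sketch that builds the blob path and the folder prefixes that the field-level and entity-level delete operations would presumably target. The helper names are hypothetical, and the trailing slash on the prefixes is an assumption rather than something the package documents.

```python
def record_file_path(entity_id: str, field_id: str, file_id: str) -> str:
    """Full blob path for one file, following the documented convention."""
    return f"entity-{entity_id}/field-{field_id}/{file_id}"


def field_folder_prefix(entity_id: str, field_id: str) -> str:
    """Prefix shared by every file stored under one field (assumed form)."""
    return f"entity-{entity_id}/field-{field_id}/"


def entity_folder_prefix(entity_id: str) -> str:
    """Prefix shared by every file stored under one entity (assumed form)."""
    return f"entity-{entity_id}/"


if __name__ == "__main__":
    print(record_file_path("456", "123", "f-789"))
```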
Fields with Associations Pipeline
Use build_fields_with_associations_pipeline() to fetch fields with their associated definitions populated:
from abs_pipelines_core import build_fields_with_associations_pipeline
# Build the pipeline
pipeline = build_fields_with_associations_pipeline(user_collection_name="users_documents")
# Add a match stage and run
full_pipeline = [{"$match": {"entity_id": entity_id}}, *pipeline]
fields = await db.fields.aggregate(full_pipeline).to_list(1000)
# Result for association field:
# {
#   "_id": "field_123",
#   "type": "association",
#   "reference": {
#     "entity_id": "entity_456",
#     "entity_fields": [  # <-- Populated!
#       {"_id": "...", "name": "Name", "type": "text", ...},
#     ]
#   }
# }
# Result for user field:
# {
#   "_id": "field_789",
#   "type": "user",
#   "user_filter": {
#     "is_active": true,
#     "users": [  # <-- Populated!
#       {"_id": "...", "name": "John Doe", ...},
#     ]
#   }
# }
EnrichmentConfig
The prepare_enrichment() method returns an EnrichmentConfig with:
| Property | Type | Description |
|---|---|---|
| pipeline | List[Dict] | Combined $lookup and $addFields aggregation stages |
| pre_filter | Optional[Dict] | Filter schema for record_ids (use with find.pre_filters) |
| file_field_ids | List[str] | Field IDs for file URL processing |
Individual Pipeline Methods
build_lookup_pipeline(fields)
Build $lookup stages for association and user fields:
lookup_pipeline = enricher.build_lookup_pipeline(fields)
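For readers unfamiliar with $lookup, the sketch below shows the general shape of a stage that joins an association field to its target collection. It is an illustration only: the function name, the target collection name, and the "_resolved" output suffix are all assumptions, and the stages actually produced by build_lookup_pipeline() may look different.

```python
def association_lookup_stage(field_id: str, target_collection: str) -> dict:
    """Build a plain $lookup stage for one association field (illustrative)."""
    return {
        "$lookup": {
            "from": target_collection,         # collection holding the referenced records
            "localField": field_id,            # field on the record that stores the ID(s)
            "foreignField": "_id",             # matched against the referenced record's _id
            "as": f"{field_id}_resolved",      # hypothetical output field name
        }
    }


if __name__ == "__main__":
    print(association_lookup_stage("owner", "entity_456_records"))
```

The output of $lookup is always an array, so a single-valued association would still need to be unwrapped in a later stage.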
build_option_label_pipeline(field_id, value_to_label, is_multi_select)
Build $addFields stage for option label resolution:
value_to_label = {"uuid-1": "Active", "uuid-2": "Inactive"}
option_pipeline = enricher.build_option_label_pipeline(
    field_id="status",
    value_to_label=value_to_label,
    is_multi_select=False,
)
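One way such a stage could be expressed is with $switch for single-select fields and $map for multi-select fields. The sketch below is an assumption about the general technique, not the package's implementation: the function name option_label_stage and the "_label" output suffix are hypothetical, and unknown values fall back to the stored value.

```python
def option_label_stage(
    field_id: str,
    value_to_label: dict[str, str],
    is_multi_select: bool = False,
) -> dict:
    """Build an $addFields stage mapping stored option values to labels."""

    def label_for(value_expr: str):
        # $switch requires at least one branch, so pass the value through
        # unchanged when there is nothing to map.
        if not value_to_label:
            return value_expr
        return {
            "$switch": {
                "branches": [
                    {
                        "case": {"$eq": [value_expr, {"$literal": value}]},
                        "then": {"$literal": label},
                    }
                    for value, label in value_to_label.items()
                ],
                "default": value_expr,  # unknown values keep the stored value
            }
        }

    if is_multi_select:
        # Resolve every element of the array; a missing field becomes [].
        expr = {
            "$map": {
                "input": {"$ifNull": [f"${field_id}", []]},
                "as": "value",
                "in": label_for("$$value"),
            }
        }
    else:
        expr = label_for(f"${field_id}")

    return {"$addFields": {f"{field_id}_label": expr}}  # hypothetical field name


if __name__ == "__main__":
    print(option_label_stage("status", {"uuid-1": "Active", "uuid-2": "Inactive"}))
```

Wrapping values and labels in $literal keeps a label that happens to start with "$" from being read as a field path.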
process_file_fields(file_fields, response, expiry_minutes)
Attach file URLs to records (async, post-processing):
response = await enricher.process_file_fields(
    file_fields=["file_field_id"],
    response=response,
    expiry_minutes=5,
)
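The post-processing step can be pictured as a loop that asks the blob service for a URL per stored file ID. The sketch below is an illustration under several assumptions: the get_file_url signature, the FakeBlobService stub, the "_url" output suffix, and the use of a plain list of records are all hypothetical and may not match how process_file_fields() works.

```python
import asyncio
from typing import Any, Protocol


class BlobService(Protocol):
    """Assumed shape of the blob service; the real protocol may differ."""

    async def get_file_url(self, file_id: str, expiry_minutes: int) -> str: ...


class FakeBlobService:
    """In-memory stand-in so the sketch runs without any storage backend."""

    async def get_file_url(self, file_id: str, expiry_minutes: int) -> str:
        return f"https://example.invalid/{file_id}?ttl={expiry_minutes}m"


async def attach_file_urls(
    blob_service: BlobService,
    file_fields: list[str],
    records: list[dict[str, Any]],
    expiry_minutes: int = 5,
) -> list[dict[str, Any]]:
    """Add a '<field_id>_url' key for every file field that holds a file ID."""
    for record in records:
        for field_id in file_fields:
            file_id = record.get(field_id)
            if file_id:  # skip records where the file field is empty or missing
                record[f"{field_id}_url"] = await blob_service.get_file_url(
                    file_id, expiry_minutes
                )
    return records


if __name__ == "__main__":
    sample = [{"_id": "r1", "avatar": "f1"}, {"_id": "r2"}]
    print(asyncio.run(attach_file_urls(FakeBlobService(), ["avatar"], sample)))
```

Because the URLs expire, this step runs after the query rather than inside the aggregation pipeline.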
Package Structure
abs_pipelines_core/
├── __init__.py # Public API exports
├── record_enricher.py # RecordEnricher, schemas, and protocols
├── blob_handler.py # EntityRecordBlobHandler for file operations
└── fields_pipeline.py # build_fields_with_associations_pipeline
Exports
from abs_pipelines_core import (
    # Record enrichment
    RecordEnricher,           # Aggregation pipeline builder
    EnrichmentConfig,         # Return type for prepare_enrichment()
    FieldConfig,              # Schema for field configurations
    FieldReference,           # Schema for association references
    OptionConfig,             # Schema for select options
    BlobStorageProtocol,      # Protocol for blob service (get_file_url)
    # Blob/file operations
    EntityRecordBlobHandler,  # File operations handler
    RequestData,              # Request data for file uploads
    BlobRepositoryProtocol,   # Protocol for full blob repository
    # Fields pipeline
    build_fields_with_associations_pipeline,  # Fields with associations aggregation
)
Requirements
- Python 3.11+
- motor >= 3.6.0
- pymongo >= 4.10.0
- pydantic >= 2.10.0