Core utilities for building MongoDB/Cosmos DB aggregation pipelines
abs-pipelines-core
MongoDB aggregation pipeline builders and file operations for record enrichment.
Installation
poetry add ../libs/abs-pipelines-core
Quick Start - Record Enrichment
Use prepare_enrichment() to build the aggregation pipeline, the optional record-ID pre-filter, and the list of file field IDs in one call:
from abs_pipelines_core import RecordEnricher
# ListFilter and AggregationStage are not listed in this package's Exports;
# they presumably come from your application's repository/query layer.
enricher = RecordEnricher(
    blob_service=blob_service,
    user_collection_name="user_documents",
)
# Fetch ALL fields for the entity
fields = (await field_repository.get_all(filters)).get("founds", [])
# Prepare enrichment config (with optional record filter)
config = enricher.prepare_enrichment(
    fields=fields,
    record_ids=["rec_id_1", "rec_id_2"],  # Optional: filter by specific IDs
    option_field_ids=["status_field_id"],  # Optional: fields needing label resolution
)
# Apply to query
find = ListFilter()
if config.pre_filter:
    find.pre_filters = config.pre_filter
find.reference_fields = AggregationStage(pipeline=config.pipeline)
# Get records
response = await repository.get_all(find, collection_name)
# Add file URLs (post-processing)
response = await enricher.process_file_fields(config.file_field_ids, response)
File Operations - EntityRecordBlobHandler
File upload/download operations for entity records:
from abs_pipelines_core import EntityRecordBlobHandler
# Initialize with blob repository
blob_handler = EntityRecordBlobHandler(blob_repository)
# Upload single file
result = await blob_handler.upload_record_field_file(entity_id, field_id, file)
# Returns: {field_id: file_id}
# Upload multiple files (filename format: {field_id}_actualfilename)
results = await blob_handler.upload_record_field_files(entity_id, files)
# Returns: [{field_id: file_id}, ...]
# Get file URL
url = await blob_handler.get_record_field_file(file_id)
# Get multiple file URLs
urls = await blob_handler.get_record_field_files(entity_id, file_ids)
# Returns: [{file_id: url}, ...]
# Delete file
await blob_handler.delete_record_field_file(file_id)
# Delete field folder
await blob_handler.delete_record_field_folder(entity_id, field_id)
# Delete entity folder
await blob_handler.delete_record_folder(entity_id)
Storage Path Convention
Files are stored with this path structure:
entity-{entity_id}/field-{field_id}/{file_id}
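The path convention above can be sketched as a few small helpers. This is only an illustration: the function names are hypothetical and not part of abs_pipelines_core, and the idea that the folder-level delete methods operate on these prefixes is an assumption based on the path layout.

```python
def record_file_path(entity_id: str, field_id: str, file_id: str) -> str:
    """Blob path for one file, following the documented convention."""
    return f"entity-{entity_id}/field-{field_id}/{file_id}"


def field_folder_prefix(entity_id: str, field_id: str) -> str:
    """Prefix shared by every file stored under one field of an entity."""
    return f"entity-{entity_id}/field-{field_id}/"


def entity_folder_prefix(entity_id: str) -> str:
    """Prefix shared by every file stored under one entity."""
    return f"entity-{entity_id}/"


# Hypothetical IDs, used only to show how the three levels nest.
path = record_file_path("e1", "f1", "file-123")
print(path)
```

Because each level is a strict prefix of the next, a file path always starts with its field prefix, which in turn starts with its entity prefix.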
EnrichmentConfig
The prepare_enrichment() method returns an EnrichmentConfig with:
| Property | Type | Description |
|---|---|---|
| pipeline | List[Dict] | Combined $lookup and $addFields aggregation stages |
| pre_filter | Optional[Dict] | Filter schema for record_ids (use with find.pre_filters) |
| file_field_ids | List[str] | Field IDs for file URL processing |
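To make the shape of these three properties concrete, here is a minimal stand-in written as a dataclass. The class name EnrichmentConfigSketch, the default values, and the sample data are assumptions for illustration; the real EnrichmentConfig may be defined differently.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class EnrichmentConfigSketch:
    """Hypothetical stand-in mirroring the three documented properties."""

    pipeline: List[Dict[str, Any]] = field(default_factory=list)
    pre_filter: Optional[Dict[str, Any]] = None
    file_field_ids: List[str] = field(default_factory=list)


# Sample values are invented purely to exercise the structure.
config = EnrichmentConfigSketch(
    pipeline=[{"$addFields": {"status_label": "Active"}}],
    file_field_ids=["attachment"],
)

# pre_filter is optional, so callers should guard before applying it.
has_filter = config.pre_filter is not None

# Each aggregation stage is a single-key dict; collect the operator names.
stage_names = [next(iter(stage)) for stage in config.pipeline]
```

The guard on pre_filter matches the `if config.pre_filter:` check in the Quick Start.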
Individual Pipeline Methods
build_lookup_pipeline(fields)
Build $lookup stages for association and user fields:
lookup_pipeline = enricher.build_lookup_pipeline(fields)
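For orientation, the sketch below shows what a list of $lookup stages for association and user fields could look like. It is not the library's implementation: the field dictionary keys ("id", "type", "target_collection"), the "_ref" output suffix, and the join on _id are all assumptions. Only the $lookup stage syntax itself is standard MongoDB.

```python
from typing import Any, Dict, List


def lookup_stage(local_field: str, from_collection: str, as_field: str) -> Dict[str, Any]:
    """One equality-match $lookup joining local_field to the foreign _id."""
    return {
        "$lookup": {
            "from": from_collection,
            "localField": local_field,
            "foreignField": "_id",
            "as": as_field,
        }
    }


def sketch_lookup_pipeline(
    fields: List[Dict[str, str]], user_collection: str
) -> List[Dict[str, Any]]:
    """Emit one $lookup per association or user field; skip other types."""
    stages = []
    for f in fields:
        if f["type"] == "association":
            stages.append(lookup_stage(f["id"], f["target_collection"], f"{f['id']}_ref"))
        elif f["type"] == "user":
            stages.append(lookup_stage(f["id"], user_collection, f"{f['id']}_ref"))
    return stages


# Hypothetical field definitions; the real FieldConfig schema may differ.
fields = [
    {"id": "owner", "type": "user"},
    {"id": "company", "type": "association", "target_collection": "companies"},
    {"id": "title", "type": "text"},
]
pipeline = sketch_lookup_pipeline(fields, "user_documents")
```

The text field produces no stage, so the resulting pipeline has two $lookup entries.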
build_option_label_pipeline(field_id, value_to_label, is_multi_select)
Build $addFields stage for option label resolution:
value_to_label = {"uuid-1": "Active", "uuid-2": "Inactive"}
option_pipeline = enricher.build_option_label_pipeline(
    field_id="status",
    value_to_label=value_to_label,
    is_multi_select=False,
)
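One way such an $addFields stage might be shaped is sketched below, using $switch for single-select fields and $map over the stored array for multi-select fields. The function name, the "_label" output suffix, and the choice of operators are assumptions; the library may build the stage differently.

```python
from typing import Any, Dict


def sketch_option_label_stage(
    field_id: str, value_to_label: Dict[str, str], is_multi_select: bool
) -> Dict[str, Any]:
    """Build an $addFields stage that maps stored option values to labels."""

    def resolve(value_expr: str) -> Dict[str, Any]:
        # $literal keeps values and labels from being parsed as field paths.
        branches = [
            {"case": {"$eq": [value_expr, {"$literal": value}]}, "then": {"$literal": label}}
            for value, label in value_to_label.items()
        ]
        # Unknown values fall through unchanged.
        return {"$switch": {"branches": branches, "default": value_expr}}

    if is_multi_select:
        # Treat a missing field as an empty list, then resolve each element.
        expr: Dict[str, Any] = {
            "$map": {
                "input": {"$ifNull": [f"${field_id}", []]},
                "as": "v",
                "in": resolve("$$v"),
            }
        }
    else:
        expr = resolve(f"${field_id}")
    return {"$addFields": {f"{field_id}_label": expr}}


value_to_label = {"uuid-1": "Active", "uuid-2": "Inactive"}
single = sketch_option_label_stage("status", value_to_label, is_multi_select=False)
multi = sketch_option_label_stage("tags", value_to_label, is_multi_select=True)
```

MongoDB rejects a $switch with no branches, so a real builder would need to handle an empty value_to_label mapping separately.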
process_file_fields(file_fields, response, expiry_minutes)
Attach file URLs to records (async, post-processing):
response = await enricher.process_file_fields(
    file_fields=["file_field_id"],
    response=response,
    expiry_minutes=5,
)
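The post-processing step can be pictured with the self-contained sketch below, which runs against an in-memory fake instead of real blob storage. Several details are assumptions: the get_file_url signature, the "founds" response key (borrowed from the Quick Start), and the "_url" suffix for the attached value. The names UrlSource, FakeBlobService, and attach_file_urls are hypothetical.

```python
import asyncio
from typing import Any, Dict, List, Protocol


class UrlSource(Protocol):
    """Assumed minimal interface: anything that can turn a file ID into a URL."""

    async def get_file_url(self, file_id: str, expiry_minutes: int) -> str: ...


class FakeBlobService:
    """In-memory stand-in so the sketch runs without a storage account."""

    async def get_file_url(self, file_id: str, expiry_minutes: int) -> str:
        return f"https://example.invalid/{file_id}?ttl={expiry_minutes}m"


async def attach_file_urls(
    blob: UrlSource,
    file_fields: List[str],
    response: Dict[str, Any],
    expiry_minutes: int = 5,
) -> Dict[str, Any]:
    """Add a '<field>_url' entry to every record that has a file ID in that field."""
    for record in response.get("founds", []):
        for field_id in file_fields:
            file_id = record.get(field_id)
            if file_id:
                record[f"{field_id}_url"] = await blob.get_file_url(file_id, expiry_minutes)
    return response


response = {"founds": [{"_id": "r1", "file_field_id": "abc"}, {"_id": "r2"}]}
result = asyncio.run(attach_file_urls(FakeBlobService(), ["file_field_id"], response))
```

Records without a value in the file field are left untouched, which keeps the step safe to run on mixed result sets.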
Exports
from abs_pipelines_core import (
    # Main classes
    RecordEnricher,           # Aggregation pipeline builder
    EntityRecordBlobHandler,  # File operations handler
    # Config/Schema
    EnrichmentConfig,         # Return type for prepare_enrichment()
    FieldConfig,              # Schema for field configurations
    FieldReference,           # Schema for association references
    OptionConfig,             # Schema for select options
    RequestData,              # Request data for file uploads
    # Protocols
    BlobStorageProtocol,      # Protocol for blob service (get_file_url only)
    BlobRepositoryProtocol,   # Protocol for full blob repository
)
File details
Details for the file abs_pipelines_core-0.1.0.tar.gz.
File metadata
- Download URL: abs_pipelines_core-0.1.0.tar.gz
- Upload date:
- Size: 8.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.1.2 CPython/3.13.7 Darwin/23.6.0
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | c35c534609d9f99071a0e65f22e48bf215cce4172a4464a10fd6cf1fd16ed0f8 |
| MD5 | 3dfd5bc9559fb74bdb30664dbb5e80d1 |
| BLAKE2b-256 | f24ba46ed4687d35688b1f1f49bbe791ed0b38dea8660ecd20bc05fffd1b8372 |
File details
Details for the file abs_pipelines_core-0.1.0-py3-none-any.whl.
File metadata
- Download URL: abs_pipelines_core-0.1.0-py3-none-any.whl
- Upload date:
- Size: 8.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.1.2 CPython/3.13.7 Darwin/23.6.0
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 7afee63e87f5705e6b3c2d20c924753e80e3ca6c1c69db23b108bdc32cdc944e |
| MD5 | e7d11307b9de6cb9e42e1b37387891c3 |
| BLAKE2b-256 | abe0f768e2f9223e7c003aeda9b9f899d6a4f20ad263d0a7913c5f9703b77f6b |