Alfresco Operators for Pristy
Pristy Alfresco Operators for Apache Airflow
Custom Apache Airflow operators for interacting with Alfresco ECM REST API and transforming content to Pristy pivot format.
Features
- Search & Fetch: Query Alfresco nodes via Search API with pagination support
- Transform: Convert Alfresco nodes to standardized Pristy pivot format
- Export: Push transformed nodes to Kafka or filesystem
- Backup & Restore: Download/upload binary content with SHA1 checksum verification (v0.7.0+)
- SQLite Storage: Save/load node metadata to/from SQLite databases for backup portability (v0.7.0+)
- State Tracking: PostgreSQL-based migration state management
- Schema Validation: JSON Schema validation before export
Installation
pip install pristy-alfresco-operators
Or with uv (recommended):
uv add pristy-alfresco-operators
Requirements
- Python 3.12
- Apache Airflow 3.1+
- PostgreSQL (for state tracking)
- Apache Kafka (optional, for Kafka export)
Version Compatibility
- Version 0.6.0+: Requires Apache Airflow 3.1+ (breaking change), uses uv for dependency management
- Version 0.5.x and below: Compatible with Apache Airflow 2.9+
Migrating from Airflow 2.x
If you're upgrading from Airflow 2.x to 3.x, version 0.6.0 includes the following changes:
- Replaced the deprecated airflow.utils.helpers.merge_dicts with Python's native | operator
- Updated the dependency constraint to apache-airflow>=3.1.1,<4.0.0
- Fixed deprecation warnings for Airflow 3.1+ imports
All operators remain API-compatible. No changes required in your DAG code.
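If your own DAG code also relied on a dictionary-merge helper, the | operator (Python 3.9+) is the built-in alternative. The snippet below is only an illustration with made-up dictionaries; note that | merges the top level only, so a nested dictionary on the right-hand side replaces the one on the left instead of being combined with it.

```python
# Made-up example values; not taken from the package.
defaults = {"page_size": 100, "headers": {"Accept": "application/json"}}
overrides = {"page_size": 50, "headers": {"X-Trace": "1"}}

# The right-hand operand wins on key conflicts; neither input is mutated.
merged = defaults | overrides

print(merged["page_size"])  # value from overrides
print(merged["headers"])    # nested dict replaced as a whole, not merged
```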
Operators
Search & Fetch Operators
AlfrescoSearchOperator
Search Alfresco nodes using FTS (Full Text Search) with pagination.
from pristy.alfresco_operator.search_node_operator import AlfrescoSearchOperator
search_task = AlfrescoSearchOperator(
task_id="search_documents",
query="TYPE:'cm:content' AND ANCESTOR:'workspace://SpacesStore/site-id'",
page_size=100,
max_items=1000,
sort_field="cm:modified",
sort_ascending=False,
http_conn_id="alfresco_api"
)
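To make the interplay of page_size and max_items concrete, here is a minimal sketch of a pagination loop over the list.pagination envelope used by the Alfresco REST API (count, hasMoreItems, skipCount, maxItems). It is not the operator's actual implementation: iter_entries and fake_fetch_page are hypothetical names, and the fake fetcher stands in for the HTTP call.

```python
from collections.abc import Callable, Iterator


def iter_entries(
    fetch_page: Callable[[int, int], dict],
    page_size: int,
    max_items: int,
) -> Iterator[dict]:
    """Yield node entries page by page, stopping at max_items or the last page."""
    skip_count = 0
    yielded = 0
    while yielded < max_items:
        # Never ask for more than what is still allowed by max_items.
        page = fetch_page(skip_count, min(page_size, max_items - yielded))
        entries = page["list"]["entries"]
        for item in entries:
            yield item["entry"]
            yielded += 1
            if yielded >= max_items:
                return
        pagination = page["list"]["pagination"]
        if not entries or not pagination["hasMoreItems"]:
            break
        skip_count += pagination["count"]


def fake_fetch_page(skip_count: int, max_items: int) -> dict:
    """Stand-in for the HTTP request: serves 7 fake nodes."""
    all_ids = [f"node-{i}" for i in range(7)]
    chunk = all_ids[skip_count:skip_count + max_items]
    return {
        "list": {
            "pagination": {
                "count": len(chunk),
                "hasMoreItems": skip_count + len(chunk) < len(all_ids),
                "skipCount": skip_count,
                "maxItems": max_items,
            },
            "entries": [{"entry": {"id": node_id}} for node_id in chunk],
        }
    }


limited = [e["id"] for e in iter_entries(fake_fetch_page, page_size=3, max_items=5)]
everything = [e["id"] for e in iter_entries(fake_fetch_page, page_size=3, max_items=100)]
```

With page_size=3, the first call is capped at 5 nodes after two requests, while the second walks all three pages and stops when hasMoreItems is false.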
AlfrescoFetchChildrenOperator
Fetch all children of a folder node.
from pristy.alfresco_operator.fetch_children_node_operator import AlfrescoFetchChildrenOperator
fetch_children = AlfrescoFetchChildrenOperator(
task_id="fetch_children",
folders="workspace://SpacesStore/folder-uuid",
page_size=50,
max_items=2000
)
AlfrescoFetchNodeOperator
Fetch a single node by UUID.
from pristy.alfresco_operator.fetch_node_operator import AlfrescoFetchNodeOperator
fetch_node = AlfrescoFetchNodeOperator(
task_id="fetch_node",
node_id="workspace://SpacesStore/node-uuid"
)
Transform Operators
TransformFileOperator
Transform Alfresco file nodes to Pristy pivot format.
from pristy.alfresco_operator.transform_file import TransformFileOperator
transform_files = TransformFileOperator(
task_id="transform_files",
child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}",
mapping_func=custom_metadata_mapper # Optional
)
TransformFolderOperator
Transform Alfresco folder nodes to Pristy pivot format.
from pristy.alfresco_operator.transform_folder import TransformFolderOperator
transform_folders = TransformFolderOperator(
task_id="transform_folders",
child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}"
)
Export Operators
PushToKafkaOperator
Push nodes to Kafka with JSON Schema validation.
from pristy.alfresco_operator.push_node_to_kafka import PushToKafkaOperator
push_to_kafka = PushToKafkaOperator(
task_id="push_to_kafka",
nodes="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
table_name="migration_tracking",
source_key="uuid"
)
PushToDirectoryOperator
Export nodes as JSON files to filesystem.
from pristy.alfresco_operator.push_node_to_directory import PushToDirectoryOperator
push_to_dir = PushToDirectoryOperator(
task_id="export_to_dir",
node="{{ task_instance.xcom_pull(task_ids='transform_files') }}"
)
Database Operators
CreateChildrenTableOperator
Create PostgreSQL tracking table.
from pristy.alfresco_operator.create_children_table import CreateChildrenTableOperator
create_table = CreateChildrenTableOperator(
task_id="create_table",
table_name="export_alfresco_folder_children"
)
SaveFolderToDbOperator
Save folder children to tracking table.
from pristy.alfresco_operator.save_folder_to_db import SaveFolderToDbOperator
save_to_db = SaveFolderToDbOperator(
task_id="save_folders",
child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}",
table_name="export_alfresco_folder_children"
)
Backup & Restore Operators (v0.7.0+)
AlfrescoDownloadContentOperator
Download binary content from Alfresco nodes with SHA1 checksum verification.
from pristy.alfresco_operator.download_content_operator import AlfrescoDownloadContentOperator
download_file = AlfrescoDownloadContentOperator(
task_id="download_file",
node_id="workspace://SpacesStore/file-uuid",
output_dir="/datas/backups/files",
filename="{uuid}.bin", # Supports {uuid}, {name} placeholders
calculate_checksum=True, # SHA1 checksum calculation
http_conn_id="alfresco_api"
)
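The checksum and filename options can be pictured with the standard library alone. This is a sketch under assumptions, not the operator's code: sha1_of_stream and render_filename are hypothetical helpers, and an in-memory buffer stands in for the downloaded content.

```python
import hashlib
import io
from typing import BinaryIO


def sha1_of_stream(stream: BinaryIO, chunk_size: int = 64 * 1024) -> str:
    """Hash a stream chunk by chunk so large files never sit fully in memory."""
    digest = hashlib.sha1()
    while chunk := stream.read(chunk_size):
        digest.update(chunk)
    return digest.hexdigest()


def render_filename(template: str, uuid: str, name: str) -> str:
    """Fill the {uuid} and {name} placeholders of a filename template."""
    return template.format(uuid=uuid, name=name)


content = io.BytesIO(b"The quick brown fox jumps over the lazy dog")
checksum = sha1_of_stream(content)
filename = render_filename("{uuid}.bin", uuid="abc-123", name="document.pdf")
```

Comparing the digest computed at download time with the one computed before upload is what makes a restored file verifiable.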
AlfrescoUploadPristyOperator
Upload files or folders to Alfresco via Pristy custom endpoint with multipart/form-data.
from pristy.alfresco_operator.upload_pristy_operator import AlfrescoUploadPristyOperator
upload_file = AlfrescoUploadPristyOperator(
task_id="upload_file",
file_path="/datas/backups/files/abc-123.bin",
metadata={
"name": "document.pdf",
"type": "cm:content",
"path": {"parent_uuid": "workspace://SpacesStore/parent-uuid"},
"properties": {"cm:title": "My Document"}
},
http_conn_id="alfresco_api_target",
endpoint="/alfresco/service/fr/jeci/pristy/nodes/inject"
)
SaveNodeToSqliteOperator
Save node metadata to SQLite database for backup purposes.
from pristy.alfresco_operator.save_node_to_sqlite import SaveNodeToSqliteOperator
save_to_sqlite = SaveNodeToSqliteOperator(
task_id="save_metadata",
nodes="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
sqlite_path="/datas/backups/backup_20250110/backup.db",
file_downloaded=True,
checksums={"uuid-123": "2fd4e1c67a2d28fced849ee1bb76e7391b93eb12"}
)
LoadNodesFromSqliteOperator
Load node metadata from SQLite database for restore operations.
from pristy.alfresco_operator.load_nodes_from_sqlite import LoadNodesFromSqliteOperator
load_from_sqlite = LoadNodesFromSqliteOperator(
task_id="load_metadata",
sqlite_path="/datas/backups/backup_20250110/backup.db",
batch_size=20,
node_type_filter="file", # 'folder', 'file', or None
order_by="file_size DESC" # Custom sorting
)
Complete Backup & Restore System: See the dag-alfresco-pristy project for a complete implementation that uses these operators to back up and restore Alfresco content hierarchies.
Configuration
Airflow Connections
Define these connections in Airflow:
# Alfresco API connection
airflow connections add alfresco_api \
--conn-type http \
--conn-host alfresco.example.com \
--conn-login admin \
--conn-password admin \
--conn-port 443 \
--conn-schema https
# PostgreSQL tracking database
airflow connections add local_pg \
--conn-type postgres \
--conn-host localhost \
--conn-login airflow \
--conn-password airflow \
--conn-schema airflow \
--conn-port 5432
# Kafka (optional)
airflow connections add kafka_pristy \
--conn-type kafka \
--conn-extra '{"bootstrap.servers": "localhost:9092"}'
Airflow Variables
# Source Alfresco server URL
airflow variables set alfresco_source_server "https://alfresco.example.com"
# Kafka export topic (optional)
airflow variables set kafka_export_topic "pristy-node-injector"
# Target site for migration (optional)
airflow variables set alfresco_export_target_site "my-target-site"
# Target root UUID for migration (optional)
airflow variables set alfresco_target_root_uuid "workspace://SpacesStore/target-folder-uuid"
Configurable Connection and Variable Names (v0.6.0+)
Starting with version 0.6.0, all operators support custom connection IDs and variable names, allowing you to use the same operators in different contexts within a single Airflow instance.
PostgreSQL Operators
All PostgreSQL-based operators now accept postgres_conn_id:
# Use a custom PostgreSQL connection
create_table = CreateChildrenTableOperator(
task_id="create_table",
table_name="my_tracking_table",
postgres_conn_id="postgres_target" # Default: "local_pg"
)
save_to_db = SaveFolderToDbOperator(
task_id="save_folders",
child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}",
table_name="my_tracking_table",
postgres_conn_id="postgres_target" # Default: "local_pg"
)
Alfresco HTTP Operators
All Alfresco operators now accept http_conn_id:
# Use different Alfresco connections for source and target
search_source = AlfrescoSearchOperator(
task_id="search_source",
query="TYPE:'cm:content'",
http_conn_id="alfresco_source" # Default: "alfresco_api"
)
fetch_from_target = AlfrescoFetchNodeOperator(
task_id="fetch_target",
node_id="workspace://SpacesStore/node-uuid",
http_conn_id="alfresco_target" # Default: "alfresco_api"
)
Kafka Operator
The Kafka operator accepts a custom connection ID and a custom topic variable name:
push_to_kafka = PushToKafkaOperator(
task_id="push_to_kafka",
nodes="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
table_name="migration_tracking",
kafka_conn_id="kafka_prod", # Default: "kafka_pristy"
topic_var="kafka_export_topic_prod" # Default: "kafka_export_topic"
)
Transform Operators
Transform operators accept custom variable names:
transform = TransformFileOperator(
task_id="transform_files",
child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}",
source_server_var="alfresco_source_server_prod" # Default: "alfresco_source_server"
)
Path Update Function
The update_node_path function (used internally by transform operators) accepts custom variable names:
from pristy.alfresco_operator.update_node_path import update_node_path
# Customize variable names for path transformation
update_node_path(
src_node,
new_node,
target_site_var="alfresco_export_site_custom", # Default: "alfresco_export_target_site"
target_root_uuid_var="alfresco_root_uuid_custom", # Default: "alfresco_target_root_uuid"
short_path_remove_var="path_prefix_remove_custom" # Default: "short_path_remove"
)
Directory Export
The directory export operator accepts a custom output path:
push_to_dir = PushToDirectoryOperator(
task_id="export_to_dir",
node="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
output_base_dir="/data/exports" # Default: "/usr/local/airflow/output"
)
Multi-Context Example
Run multiple parallel migration workflows with different configurations:
# Migration from Production Alfresco to Dev Kafka
search_prod = AlfrescoSearchOperator(
task_id="search_prod",
query="TYPE:'cm:content'",
http_conn_id="alfresco_prod"
)
# Migration from Staging Alfresco to Test Kafka
search_staging = AlfrescoSearchOperator(
task_id="search_staging",
query="TYPE:'cm:content'",
http_conn_id="alfresco_staging"
)
push_prod = PushToKafkaOperator(
task_id="push_prod",
nodes="{{ task_instance.xcom_pull(task_ids='transform_prod') }}",
table_name="migration_prod",
kafka_conn_id="kafka_dev",
topic_var="kafka_topic_dev",
postgres_conn_id="postgres_dev"
)
push_staging = PushToKafkaOperator(
task_id="push_staging",
nodes="{{ task_instance.xcom_pull(task_ids='transform_staging') }}",
table_name="migration_staging",
kafka_conn_id="kafka_test",
topic_var="kafka_topic_test",
postgres_conn_id="postgres_test"
)
Pristy Pivot Format
The operators transform Alfresco nodes to a standardized format as defined in the Pristy Injector documentation.
Example node structure:
{
"name": "document.pdf",
"type": "cm:content",
"dateCreated": "2024-01-15T10:30:00Z",
"owner": "admin",
"path": {
"root": "site:my-site",
"short": "/Documents/Folder"
},
"properties": {
"cm:created": "2024-01-15T10:30:00Z",
"cm:creator": "admin",
"cm:modified": "2024-01-20T14:45:00Z",
"cm:modifier": "editor"
},
"source": {
"type": "alfresco",
"server": "https://alfresco.example.com",
"uuid": "workspace://SpacesStore/node-uuid",
"mimetype": "application/pdf",
"size": 102400
}
}
For complete format specification and available fields, see the Pristy Injector documentation.
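As a rough picture of what such a transformation involves, the sketch below maps a node entry shaped like an Alfresco REST API response (id, name, nodeType, createdAt, createdByUser, modifiedAt, modifiedByUser, content) onto the example structure above. The function to_pivot and the mapping choices, such as using the creator as owner, are assumptions for illustration; the transform operators and the Pristy Injector documentation remain the reference.

```python
def to_pivot(entry: dict, source_server: str, root: str, short_path: str) -> dict:
    """Build a pivot-like dict from an Alfresco node entry (illustrative mapping)."""
    content = entry.get("content", {})
    return {
        "name": entry["name"],
        "type": entry["nodeType"],
        "dateCreated": entry["createdAt"],
        # Assumption: the creator is used as owner when nothing else is known.
        "owner": entry["createdByUser"]["id"],
        "path": {"root": root, "short": short_path},
        "properties": {
            "cm:created": entry["createdAt"],
            "cm:creator": entry["createdByUser"]["id"],
            "cm:modified": entry["modifiedAt"],
            "cm:modifier": entry["modifiedByUser"]["id"],
        },
        "source": {
            "type": "alfresco",
            "server": source_server,
            "uuid": f"workspace://SpacesStore/{entry['id']}",
            "mimetype": content.get("mimeType"),
            "size": content.get("sizeInBytes"),
        },
    }


entry = {
    "id": "node-uuid",
    "name": "document.pdf",
    "nodeType": "cm:content",
    "createdAt": "2024-01-15T10:30:00Z",
    "createdByUser": {"id": "admin"},
    "modifiedAt": "2024-01-20T14:45:00Z",
    "modifiedByUser": {"id": "editor"},
    "content": {"mimeType": "application/pdf", "sizeInBytes": 102400},
}
pivot = to_pivot(
    entry,
    source_server="https://alfresco.example.com",
    root="site:my-site",
    short_path="/Documents/Folder",
)
```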
Internal DAG Metadata (__dag_param)
The __dag_param field is reserved for internal DAG metadata and is NOT part of the Pristy pivot format. It should not be sent to the Pristy Injector API or Kafka.
Common uses:
- parent_uuid: Parent node UUID for hierarchical processing
- isFile, isFolder: Node type flags for SQLite storage
- Other DAG-specific tracking data
Example with __dag_param:
{
"name": "document.pdf",
"type": "cm:content",
"path": {
"root": "site:my-site",
"short": "/Documents/Folder"
},
"properties": {
"cm:created": "2024-01-15T10:30:00Z",
"cm:creator": "admin"
},
"source": {
"type": "alfresco",
"server": "https://alfresco.example.com",
"uuid": "workspace://SpacesStore/node-uuid"
},
"__dag_param": {
"parent_uuid": "workspace://SpacesStore/parent-123",
"isFile": true,
"isFolder": false
}
}
⚠️ Important: Only fields defined in the InjectorNode schema should be included in the pivot. Never add custom fields like isFile, isFolder, etc. directly to the pivot structure.
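One way to honor this rule in custom DAG code is to separate the internal metadata from the payload just before export. The helper below is a hypothetical sketch, not a function provided by the package.

```python
def split_dag_param(node: dict) -> tuple[dict, dict]:
    """Return (payload without __dag_param, copy of the __dag_param metadata)."""
    payload = {key: value for key, value in node.items() if key != "__dag_param"}
    dag_param = dict(node.get("__dag_param", {}))
    return payload, dag_param


node = {
    "name": "document.pdf",
    "type": "cm:content",
    "__dag_param": {
        "parent_uuid": "workspace://SpacesStore/parent-123",
        "isFile": True,
        "isFolder": False,
    },
}
payload, dag_param = split_dag_param(node)
```

The payload is what would go to Kafka or the Injector API, while dag_param stays inside the DAG for things like SQLite storage; the original node is left untouched.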
Example DAG
from datetime import datetime
from airflow import DAG
from pristy.alfresco_operator.search_node_operator import AlfrescoSearchOperator
from pristy.alfresco_operator.transform_file import TransformFileOperator
from pristy.alfresco_operator.push_node_to_kafka import PushToKafkaOperator
with DAG(
    dag_id="alfresco_to_kafka",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False
) as dag:
    search = AlfrescoSearchOperator(
        task_id="search_documents",
        query="TYPE:'cm:content'",
        page_size=100
    )
    transform = TransformFileOperator(
        task_id="transform_files",
        child="{{ task_instance.xcom_pull(task_ids='search_documents') }}"
    )
    push = PushToKafkaOperator(
        task_id="push_to_kafka",
        nodes="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
        table_name="migration_tracking"
    )
    search >> transform >> push
Development
Setup
# Clone repository
git clone https://github.com/your-org/pristy-alfresco-operators.git
cd pristy-alfresco-operators
# Install dependencies (creates venv automatically)
uv sync --group test
Testing
# Run all tests with coverage
uv run pytest
# Run specific test
uv run pytest tests/schema/test_schema.py -v
# Run tests without coverage
uv run pytest --no-cov
# Run tests in verbose mode
uv run pytest -vv
Code Quality
This project follows strict code quality standards:
- Security: All SQL queries use parameterized statements
- Type hints: PEP 604 union syntax (str | None)
- Imports: Lazy imports in execute() methods for Airflow performance
- Resource management: try/finally blocks for connections
- Error handling: Granular error states with proper tracking
See CONVENTIONS.md for detailed guidelines.
Release Process
- Update version in pyproject.toml
- Update CHANGELOG.md (if present)
- Create release:
TAG=0.8.0
git add pyproject.toml README.md
git commit -m "version $TAG"
git tag "$TAG"
git push
git push origin "tags/$TAG"
uv build
uv publish
Architecture
- Operators: Extend BaseOperator with task-specific logic
- Utils: Shared utilities (parse_alfresco_pagination, create_base_node)
- State Tracking: PostgreSQL tables with new → running → success/error states
- Schema Validation: JSON Schema validation before export
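The state-tracking lifecycle can be pictured as a small state machine. This sketch is an assumption about how the transitions might be constrained, not the package's code: the names State, ALLOWED_TRANSITIONS, and advance are hypothetical, and whether an error row may be retried is left out.

```python
from enum import Enum


class State(str, Enum):
    NEW = "new"
    RUNNING = "running"
    SUCCESS = "success"
    ERROR = "error"


# Assumed transition table: success and error are treated as terminal here.
ALLOWED_TRANSITIONS: dict[State, set[State]] = {
    State.NEW: {State.RUNNING},
    State.RUNNING: {State.SUCCESS, State.ERROR},
    State.SUCCESS: set(),
    State.ERROR: set(),
}


def advance(current: State, target: State) -> State:
    """Return the new state, or raise if the transition is not allowed."""
    if target not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"illegal transition: {current.value} -> {target.value}")
    return target


state = advance(State.NEW, State.RUNNING)
state = advance(state, State.SUCCESS)
```

Rejecting illegal transitions early makes it easier to spot rows that were processed twice or skipped a step.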
License
Apache License 2.0 - see LICENSE file for details.
Contributing
Contributions are welcome! Please:
- Follow the code conventions in CONVENTIONS.md
- Write tests for new features
- Ensure all tests pass
- Submit a pull request
Support
- Issues: https://gitlab.com/pristy-oss/pristy-alfresco-operators/-/issues
- Documentation: https://docs.pristy.fr/
Acknowledgments
Developed by Jeci for integration with the Pristy services platform.