Pristy Alfresco Operators for Apache Airflow
Custom Apache Airflow operators for interacting with the Alfresco ECM REST API and transforming content to the Pristy pivot format.
Features
- Search & Fetch: Query Alfresco nodes via Search API with pagination support
- Transform: Convert Alfresco nodes to standardized Pristy pivot format
- Export: Push transformed nodes to Kafka or filesystem
- State Tracking: PostgreSQL-based migration state management
- Schema Validation: JSON Schema validation before export
Installation
pip install pristy-alfresco-operators
Or with Poetry:
poetry add pristy-alfresco-operators
Requirements
- Python 3.12
- Apache Airflow 2.9+
- PostgreSQL (for state tracking)
- Apache Kafka (optional, for Kafka export)
Operators
Search & Fetch Operators
AlfrescoSearchOperator
Search Alfresco nodes using FTS (Full Text Search) with pagination.
from pristy.alfresco_operator.search_node_operator import AlfrescoSearchOperator

search_task = AlfrescoSearchOperator(
    task_id="search_documents",
    query="TYPE:'cm:content' AND ANCESTOR:'workspace://SpacesStore/site-id'",
    page_size=100,
    max_items=1000,
    sort_field="cm:modified",
    sort_ascending=False,
    http_conn_id="alfresco_api",
)
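Alfresco REST responses carry a `list.pagination` block (`skipCount`, `maxItems`, `count`, `hasMoreItems`) next to `list.entries`. The loop below is a minimal sketch of how paging over such responses could work; it is not the operator's actual implementation. `iter_entries`, the `fetch_page` callback, and `fake_fetch_page` are hypothetical names, and the fake function stands in for a real HTTP call.

```python
from __future__ import annotations

from typing import Callable, Iterator


def iter_entries(
    fetch_page: Callable[[int, int], dict],
    page_size: int = 100,
    max_items: int | None = None,
) -> Iterator[dict]:
    """Yield node entries page by page until the server reports no more items.

    fetch_page(skip_count, page_size) is a hypothetical callback returning one
    Alfresco-style response: {"list": {"pagination": {...}, "entries": [...]}}.
    """
    skip_count = 0
    yielded = 0
    while True:
        response = fetch_page(skip_count, page_size)
        entries = response["list"]["entries"]
        for item in entries:
            if max_items is not None and yielded >= max_items:
                return
            yield item["entry"]
            yielded += 1
        pagination = response["list"]["pagination"]
        # Stop on an empty page or when the server says there is nothing left.
        if not entries or not pagination.get("hasMoreItems", False):
            return
        skip_count += pagination.get("count", len(entries))


def fake_fetch_page(skip_count: int, page_size: int) -> dict:
    """Stand-in for an HTTP call: serves 5 fake nodes in Alfresco list format."""
    nodes = [{"id": f"node-{i}"} for i in range(5)]
    chunk = nodes[skip_count:skip_count + page_size]
    return {
        "list": {
            "pagination": {
                "count": len(chunk),
                "hasMoreItems": skip_count + len(chunk) < len(nodes),
                "skipCount": skip_count,
                "maxItems": page_size,
            },
            "entries": [{"entry": node} for node in chunk],
        }
    }


all_ids = [e["id"] for e in iter_entries(fake_fetch_page, page_size=2)]
first_three = [e["id"] for e in iter_entries(fake_fetch_page, page_size=2, max_items=3)]
```

Here `page_size` and `max_items` mirror the operator parameters of the same name: the first bounds each request, the second caps the total number of nodes returned.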
AlfrescoFetchChildrenOperator
Fetch all children of a folder node.
from pristy.alfresco_operator.fetch_children_node_operator import AlfrescoFetchChildrenOperator

fetch_children = AlfrescoFetchChildrenOperator(
    task_id="fetch_children",
    folders="workspace://SpacesStore/folder-uuid",
    page_size=50,
    max_items=2000,
)
AlfrescoFetchNodeOperator
Fetch a single node by UUID.
from pristy.alfresco_operator.fetch_node_operator import AlfrescoFetchNodeOperator

fetch_node = AlfrescoFetchNodeOperator(
    task_id="fetch_node",
    node_id="workspace://SpacesStore/node-uuid",
)
Transform Operators
TransformFileOperator
Transform Alfresco file nodes to Pristy pivot format.
from pristy.alfresco_operator.transform_file import TransformFileOperator

transform_files = TransformFileOperator(
    task_id="transform_files",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}",
    mapping_func=custom_metadata_mapper,  # Optional; a user-defined callable, not shown here
)
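To make the transformation step concrete, here is a rough sketch of mapping one Alfresco REST node entry to the pivot keys shown in the "Pristy Pivot Format" section. It is not the operator's code. The Alfresco-side field names (`nodeType`, `createdAt`, `createdByUser`, `content.mimeType`, `content.sizeInBytes`) come from the Alfresco REST API; the mapping choices, the `to_pivot` function, the `add_title` mapper, and the `mapping_func(entry, pivot)` signature are all assumptions made for illustration.

```python
from __future__ import annotations

from typing import Callable


def to_pivot(
    entry: dict,
    server: str,
    mapping_func: Callable[[dict, dict], None] | None = None,
) -> dict:
    """Map one Alfresco REST node entry to a pivot-style dict (illustrative only)."""
    created_by = entry.get("createdByUser", {}).get("id")
    modified_by = entry.get("modifiedByUser", {}).get("id")
    content = entry.get("content", {})
    pivot = {
        "name": entry["name"],
        "type": entry["nodeType"],
        "dateCreated": entry.get("createdAt"),
        # Assumption: the creator is used as owner when no explicit owner is known.
        "owner": created_by,
        "properties": {
            "cm:created": entry.get("createdAt"),
            "cm:creator": created_by,
            "cm:modified": entry.get("modifiedAt"),
            "cm:modifier": modified_by,
        },
        "source": {
            "type": "alfresco",
            "server": server,
            # Assumption: the source UUID is the node id prefixed with the store reference.
            "uuid": f"workspace://SpacesStore/{entry['id']}",
            "mimetype": content.get("mimeType"),
            "size": content.get("sizeInBytes"),
        },
    }
    if mapping_func is not None:
        # Hypothetical hook signature: receives the raw entry and the pivot dict to enrich.
        mapping_func(entry, pivot)
    return pivot


def add_title(entry: dict, pivot: dict) -> None:
    """Example custom mapper: copy cm:title when the source node has one."""
    title = entry.get("properties", {}).get("cm:title")
    if title:
        pivot["properties"]["cm:title"] = title


sample_entry = {
    "id": "node-uuid",
    "name": "document.pdf",
    "nodeType": "cm:content",
    "createdAt": "2024-01-15T10:30:00Z",
    "modifiedAt": "2024-01-20T14:45:00Z",
    "createdByUser": {"id": "admin"},
    "modifiedByUser": {"id": "editor"},
    "content": {"mimeType": "application/pdf", "sizeInBytes": 102400},
    "properties": {"cm:title": "Quarterly report"},
}
pivot_node = to_pivot(sample_entry, "https://alfresco.example.com", add_title)
```

The `path` block of the pivot format is left out on purpose, since deriving `root` and `short` depends on site resolution rules not described here.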
TransformFolderOperator
Transform Alfresco folder nodes to Pristy pivot format.
from pristy.alfresco_operator.transform_folder import TransformFolderOperator

transform_folders = TransformFolderOperator(
    task_id="transform_folders",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}",
)
Export Operators
PushToKafkaOperator
Push nodes to Kafka with JSON Schema validation.
from pristy.alfresco_operator.push_node_to_kafka import PushToKafkaOperator

push_to_kafka = PushToKafkaOperator(
    task_id="push_to_kafka",
    nodes="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
    table_name="migration_tracking",
    source_key="uuid",
)
PushToDirectoryOperator
Export nodes as JSON files to filesystem.
from pristy.alfresco_operator.push_node_to_directory import PushToDirectoryOperator

push_to_dir = PushToDirectoryOperator(
    task_id="export_to_dir",
    node="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
)
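As an illustration of what a filesystem export can look like, the sketch below writes one pivot node to a JSON file. It is only an assumption about the general approach: `write_node` is a hypothetical helper, and naming the file after the last segment of the source UUID is an illustrative choice, not the operator's documented behaviour.

```python
import json
import tempfile
from pathlib import Path


def write_node(node: dict, directory: Path) -> Path:
    """Write one node as pretty-printed JSON and return the file path."""
    directory.mkdir(parents=True, exist_ok=True)
    # Assumption: name files after the last segment of the source UUID.
    node_id = node["source"]["uuid"].rsplit("/", 1)[-1]
    target = directory / f"{node_id}.json"
    target.write_text(json.dumps(node, indent=2, ensure_ascii=False), encoding="utf-8")
    return target


node = {
    "name": "document.pdf",
    "type": "cm:content",
    "source": {"type": "alfresco", "uuid": "workspace://SpacesStore/node-uuid"},
}

with tempfile.TemporaryDirectory() as tmp:
    path = write_node(node, Path(tmp) / "export")
    written_name = path.name
    round_trip = json.loads(path.read_text(encoding="utf-8"))
```

Passing `ensure_ascii=False` keeps accented file names readable in the exported JSON, which matters for content repositories with non-English names.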
Database Operators
CreateChildrenTableOperator
Create PostgreSQL tracking table.
from pristy.alfresco_operator.create_children_table import CreateChildrenTableOperator

create_table = CreateChildrenTableOperator(
    task_id="create_table",
    table_name="export_alfresco_folder_children",
)
SaveFolderToDbOperator
Save folder children to tracking table.
from pristy.alfresco_operator.save_folder_to_db import SaveFolderToDbOperator

save_to_db = SaveFolderToDbOperator(
    task_id="save_folders",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}",
    table_name="export_alfresco_folder_children",
)
Configuration
Airflow Connections
Define these connections in Airflow:
# Alfresco API connection
airflow connections add alfresco_api \
    --conn-type http \
    --conn-host alfresco.example.com \
    --conn-login admin \
    --conn-password admin \
    --conn-port 443 \
    --conn-schema https

# PostgreSQL tracking database
airflow connections add local_pg \
    --conn-type postgres \
    --conn-host localhost \
    --conn-login airflow \
    --conn-password airflow \
    --conn-schema airflow \
    --conn-port 5432

# Kafka (optional)
airflow connections add kafka_pristy \
    --conn-type kafka \
    --conn-extra '{"bootstrap.servers": "localhost:9092"}'
Airflow Variables
# Source Alfresco server URL
airflow variables set alfresco_source_server "https://alfresco.example.com"
# Kafka export topic (optional)
airflow variables set kafka_export_topic "pristy-node-injector"
# Target site for migration (optional)
airflow variables set alfresco_export_target_site "my-target-site"
# Target root UUID for migration (optional)
airflow variables set alfresco_target_root_uuid "workspace://SpacesStore/target-folder-uuid"
Pristy Pivot Format
The operators transform Alfresco nodes to a standardized format as defined in the Pristy Injector documentation.
Example node structure:
{
  "name": "document.pdf",
  "type": "cm:content",
  "dateCreated": "2024-01-15T10:30:00Z",
  "owner": "admin",
  "path": {
    "root": "site:my-site",
    "short": "/Documents/Folder"
  },
  "properties": {
    "cm:created": "2024-01-15T10:30:00Z",
    "cm:creator": "admin",
    "cm:modified": "2024-01-20T14:45:00Z",
    "cm:modifier": "editor"
  },
  "source": {
    "type": "alfresco",
    "server": "https://alfresco.example.com",
    "uuid": "workspace://SpacesStore/node-uuid",
    "mimetype": "application/pdf",
    "size": 102400
  }
}
For complete format specification and available fields, see the Pristy Injector documentation.
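Before export, nodes are checked against a JSON Schema. The snippet below is a simplified stand-in that only checks for the presence of a few keys visible in the example above; it is not JSON Schema validation proper, and the set of required keys in `REQUIRED_KEYS` is an assumption, since the authoritative schema lives in the Pristy Injector documentation. `missing_keys` is a hypothetical helper name.

```python
# Assumption: these keys are treated as mandatory for illustration only.
REQUIRED_KEYS: dict[str, tuple[str, ...]] = {
    "name": (),
    "type": (),
    "path": ("root", "short"),
    "source": ("type", "server", "uuid"),
}


def missing_keys(node: dict) -> list[str]:
    """Return dotted names of required keys absent from a pivot node."""
    missing: list[str] = []
    for key, children in REQUIRED_KEYS.items():
        if key not in node:
            missing.append(key)
            continue
        value = node[key]
        for child in children:
            # A non-dict value cannot hold the nested keys, so report them all.
            if not isinstance(value, dict) or child not in value:
                missing.append(f"{key}.{child}")
    return missing


valid_node = {
    "name": "document.pdf",
    "type": "cm:content",
    "path": {"root": "site:my-site", "short": "/Documents/Folder"},
    "source": {
        "type": "alfresco",
        "server": "https://alfresco.example.com",
        "uuid": "workspace://SpacesStore/node-uuid",
    },
}
broken_node = {
    "name": "document.pdf",
    "path": {"root": "site:my-site"},
    "source": {"type": "alfresco", "server": "https://alfresco.example.com"},
}

valid_report = missing_keys(valid_node)
broken_report = missing_keys(broken_node)
```

Collecting every missing key, rather than failing on the first one, makes it easier to record a complete error message against the node in the tracking table.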
Example DAG
from datetime import datetime

from airflow import DAG
from pristy.alfresco_operator.search_node_operator import AlfrescoSearchOperator
from pristy.alfresco_operator.transform_file import TransformFileOperator
from pristy.alfresco_operator.push_node_to_kafka import PushToKafkaOperator

with DAG(
    dag_id="alfresco_to_kafka",
    start_date=datetime(2024, 1, 1),
    schedule=None,  # Manual trigger only
    catchup=False,
) as dag:
    # Find every content node in the repository
    search = AlfrescoSearchOperator(
        task_id="search_documents",
        query="TYPE:'cm:content'",
        page_size=100,
    )

    # Convert the search results to the Pristy pivot format
    transform = TransformFileOperator(
        task_id="transform_files",
        child="{{ task_instance.xcom_pull(task_ids='search_documents') }}",
    )

    # Validate and publish the transformed nodes to Kafka
    push = PushToKafkaOperator(
        task_id="push_to_kafka",
        nodes="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
        table_name="migration_tracking",
    )

    search >> transform >> push
Development
Setup
# Clone repository
git clone https://gitlab.com/pristy-oss/pristy-alfresco-operators.git
cd pristy-alfresco-operators
# Create virtual environment
python3.12 -m venv .venv
source .venv/bin/activate
# Install dependencies
pip install poetry
poetry install
Testing
# Run tests
pytest tests/
# Run specific test
pytest tests/schema/test_schema.py -v
Code Quality
This project follows strict code quality standards:
- Security: All SQL queries use parameterized statements
- Type hints: PEP 604 union syntax (str | None)
- Imports: Lazy imports in execute() methods for Airflow performance
- Resource management: try/finally blocks for connections
- Error handling: Granular error states with proper tracking
See CONVENTIONS.md for detailed guidelines.
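The short sketch below shows three of these conventions together: a lazy import inside the function body, a parameterized query, and a try/finally block that always closes the connection. It uses the standard-library sqlite3 module as a stand-in for PostgreSQL so that it runs anywhere; `record_node` and the `tracking` table are hypothetical and are not part of this package.

```python
def record_node(db_path: str, uuid: str, state: str) -> list[tuple[str, str]]:
    """Insert one tracking row and return all rows, using bound parameters."""
    # Lazy import: the driver is loaded when the task runs, not when the DAG file is parsed.
    import sqlite3

    conn = sqlite3.connect(db_path)
    try:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS tracking (uuid TEXT PRIMARY KEY, state TEXT)"
        )
        # Values are passed as parameters, never interpolated into the SQL string.
        conn.execute(
            "INSERT INTO tracking (uuid, state) VALUES (?, ?)", (uuid, state)
        )
        conn.commit()
        return conn.execute("SELECT uuid, state FROM tracking").fetchall()
    finally:
        # Runs on success and on error, so the connection is never leaked.
        conn.close()


# A hostile-looking value is stored verbatim instead of being executed as SQL.
rows = record_node(":memory:", "node-uuid'); DROP TABLE tracking; --", "new")
```

With psycopg2 against PostgreSQL the same pattern applies, with `%s` placeholders in place of `?`.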
Release Process
- Update version in pyproject.toml
- Update CHANGELOG.md (if present)
- Create release:
TAG=0.4.2
git add pyproject.toml README.md
git commit -m "version $TAG"
git tag "$TAG"
git push
git push origin "tags/$TAG"
poetry build
poetry publish
Architecture
- Operators: Extend BaseOperator with task-specific logic
- Utils: Shared utilities (parse_alfresco_pagination, create_base_node)
- State Tracking: PostgreSQL tables with new → running → success/error states
- Schema Validation: JSON Schema validation before export
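The state progression listed above can be pictured as a small state machine. The following is a sketch under the assumption that `success` and `error` are terminal states; whether the real tracking tables allow a retry from `error` is not stated here. `State`, `ALLOWED`, and `advance` are hypothetical names.

```python
from enum import Enum


class State(str, Enum):
    NEW = "new"
    RUNNING = "running"
    SUCCESS = "success"
    ERROR = "error"


# Assumption: success and error are terminal; retries are not modelled.
ALLOWED: dict[State, set[State]] = {
    State.NEW: {State.RUNNING},
    State.RUNNING: {State.SUCCESS, State.ERROR},
    State.SUCCESS: set(),
    State.ERROR: set(),
}


def advance(current: State, target: State) -> State:
    """Return the target state if the transition is allowed, else raise ValueError."""
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal transition: {current.value} -> {target.value}")
    return target


state = advance(State.NEW, State.RUNNING)
state = advance(state, State.SUCCESS)
```

Because `State` subclasses `str`, its members compare equal to the plain strings stored in a database column, for example `State.SUCCESS == "success"`.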
License
Apache License 2.0 - see LICENSE file for details.
Contributing
Contributions are welcome! Please:
- Follow the code conventions in CONVENTIONS.md
- Write tests for new features
- Ensure all tests pass
- Submit a pull request
Support
- Issues: https://gitlab.com/pristy-oss/pristy-alfresco-operators/-/issues
- Documentation: https://docs.pristy.fr/
Acknowledgments
Developed by Jeci for integration with the Pristy services platform.