Alfresco Operators for Pristy

Pristy Alfresco Operators for Apache Airflow

Custom Apache Airflow operators for interacting with the Alfresco ECM REST API and transforming content to the Pristy pivot format.

Features

  • Search & Fetch: Query Alfresco nodes via Search API with pagination support
  • Transform: Convert Alfresco nodes to standardized Pristy pivot format
  • Export: Push transformed nodes to Kafka or filesystem
  • Backup & Restore: Download/upload binary content with SHA1 checksum verification (v0.7.0+)
  • SQLite Storage: Save/load node metadata to/from SQLite databases for backup portability (v0.7.0+)
  • State Tracking: PostgreSQL-based migration state management
  • Schema Validation: JSON Schema validation before export

Installation

pip install pristy-alfresco-operators

Or with uv (recommended):

uv add pristy-alfresco-operators

Requirements

  • Python 3.12
  • Apache Airflow 3.1+
  • PostgreSQL (for state tracking)
  • Apache Kafka (optional, for Kafka export)

Version Compatibility

  • Version 0.6.0+: Requires Apache Airflow 3.1+ (breaking change), uses uv for dependency management
  • Version 0.5.x and below: Compatible with Apache Airflow 2.9+

Migrating from Airflow 2.x

If you're upgrading from Airflow 2.x to 3.x, version 0.6.0 includes the following changes:

  • Replaced deprecated airflow.utils.helpers.merge_dicts with Python's native | operator
  • Updated dependency constraint to apache-airflow>=3.1.1,<4.0.0
  • Fixed deprecation warnings for Airflow 3.1+ imports

All operators remain API-compatible. No changes required in your DAG code.
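
If your own DAG helpers also imported merge_dicts, the same replacement can be applied there. The snippet below is a minimal sketch of the | operator on plain dictionaries; the keys and values are made up for illustration. Note that | performs a shallow merge, so check whether that is sufficient if you relied on merging nested dictionaries.

```python
# Hypothetical default and override settings, for illustration only.
defaults = {"page_size": 100, "sort": {"field": "cm:modified", "ascending": True}}
overrides = {"page_size": 50, "sort": {"ascending": False}}

# The | operator (Python 3.9+) returns a new dict; keys from the right-hand side win.
merged = defaults | overrides

print(merged["page_size"])  # 50
# The merge is shallow: the nested "sort" dict is replaced as a whole, not combined.
print(merged["sort"])       # {'ascending': False}
# The inputs are left untouched.
print(defaults["page_size"])  # 100
```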

Operators

Search & Fetch Operators

AlfrescoSearchOperator

Search Alfresco nodes using FTS (Full Text Search) with pagination.

from pristy.alfresco_operator.search_node_operator import AlfrescoSearchOperator

search_task = AlfrescoSearchOperator(
    task_id="search_documents",
    query="TYPE:'cm:content' AND ANCESTOR:'workspace://SpacesStore/site-id'",
    page_size=100,
    max_items=1000,
    sort_field="cm:modified",
    sort_ascending=False,
    http_conn_id="alfresco_api"
)

AlfrescoFetchChildrenOperator

Fetch all children of a folder node.

from pristy.alfresco_operator.fetch_children_node_operator import AlfrescoFetchChildrenOperator

fetch_children = AlfrescoFetchChildrenOperator(
    task_id="fetch_children",
    folders="workspace://SpacesStore/folder-uuid",
    page_size=50,
    max_items=2000
)

AlfrescoFetchNodeOperator

Fetch a single node by UUID.

from pristy.alfresco_operator.fetch_node_operator import AlfrescoFetchNodeOperator

fetch_node = AlfrescoFetchNodeOperator(
    task_id="fetch_node",
    node_id="workspace://SpacesStore/node-uuid"
)

Transform Operators

TransformFileOperator

Transform Alfresco file nodes to Pristy pivot format.

from pristy.alfresco_operator.transform_file import TransformFileOperator

transform_files = TransformFileOperator(
    task_id="transform_files",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}",
    mapping_func=custom_metadata_mapper  # Optional: a mapping function you define yourself
)

TransformFolderOperator

Transform Alfresco folder nodes to Pristy pivot format.

from pristy.alfresco_operator.transform_folder import TransformFolderOperator

transform_folders = TransformFolderOperator(
    task_id="transform_folders",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}"
)

Export Operators

PushToKafkaOperator

Push nodes to Kafka with JSON Schema validation.

from pristy.alfresco_operator.push_node_to_kafka import PushToKafkaOperator

push_to_kafka = PushToKafkaOperator(
    task_id="push_to_kafka",
    nodes="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
    table_name="migration_tracking",
    source_key="uuid"
)

PushToDirectoryOperator

Export nodes as JSON files to filesystem.

from pristy.alfresco_operator.push_node_to_directory import PushToDirectoryOperator

push_to_dir = PushToDirectoryOperator(
    task_id="export_to_dir",
    node="{{ task_instance.xcom_pull(task_ids='transform_files') }}"
)

Database Operators

CreateChildrenTableOperator

Create PostgreSQL tracking table.

from pristy.alfresco_operator.create_children_table import CreateChildrenTableOperator

create_table = CreateChildrenTableOperator(
    task_id="create_table",
    table_name="export_alfresco_folder_children"
)

SaveFolderToDbOperator

Save folder children to tracking table.

from pristy.alfresco_operator.save_folder_to_db import SaveFolderToDbOperator

save_to_db = SaveFolderToDbOperator(
    task_id="save_folders",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}",
    table_name="export_alfresco_folder_children"
)

Backup & Restore Operators (v0.7.0+)

AlfrescoDownloadContentOperator

Download binary content from Alfresco nodes with SHA1 checksum verification.

from pristy.alfresco_operator.download_content_operator import AlfrescoDownloadContentOperator

download_file = AlfrescoDownloadContentOperator(
    task_id="download_file",
    node_id="workspace://SpacesStore/file-uuid",
    output_dir="/datas/backups/files",
    filename="{uuid}.bin",  # Supports {uuid}, {name} placeholders
    calculate_checksum=True,  # SHA1 checksum calculation
    http_conn_id="alfresco_api"
)
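
For readers who want to verify a downloaded file independently, the sketch below shows one way to compute a SHA1 checksum in chunks with the standard library and to expand a filename template. sha1_of_file and render_filename are hypothetical helpers written for this example; the operator's internals may differ.

```python
import hashlib
import tempfile
from pathlib import Path


def sha1_of_file(path: Path, chunk_size: int = 1024 * 1024) -> str:
    """Compute the SHA1 hex digest of a file without loading it fully into memory."""
    digest = hashlib.sha1()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def render_filename(template: str, uuid: str, name: str) -> str:
    """Fill the {uuid} and {name} placeholders (hypothetical helper)."""
    return template.format(uuid=uuid, name=name)


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / render_filename("{uuid}.bin", uuid="abc-123", name="document.pdf")
    target.write_bytes(b"The quick brown fox jumps over the lazy dog")
    checksum = sha1_of_file(target)
    filename = target.name

print(filename)  # abc-123.bin
print(checksum)  # 2fd4e1c67a2d28fced849ee1bb76e7391b93eb12
```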

AlfrescoUploadPristyOperator

Upload files or folders to Alfresco via Pristy custom endpoint with multipart/form-data.

from pristy.alfresco_operator.upload_pristy_operator import AlfrescoUploadPristyOperator

upload_file = AlfrescoUploadPristyOperator(
    task_id="upload_file",
    file_path="/datas/backups/files/abc-123.bin",
    metadata={
        "name": "document.pdf",
        "type": "cm:content",
        "path": {"parent_uuid": "workspace://SpacesStore/parent-uuid"},
        "properties": {"cm:title": "My Document"}
    },
    http_conn_id="alfresco_api_target",
    endpoint="/alfresco/service/fr/jeci/pristy/nodes/inject"
)

SaveNodeToSqliteOperator

Save node metadata to SQLite database for backup purposes.

from pristy.alfresco_operator.save_node_to_sqlite import SaveNodeToSqliteOperator

save_to_sqlite = SaveNodeToSqliteOperator(
    task_id="save_metadata",
    nodes="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
    sqlite_path="/datas/backups/backup_20250110/backup.db",
    file_downloaded=True,
    checksums={"uuid-123": "2fd4e1c67a2d28fced849ee1bb76e7391b93eb12"}
)

LoadNodesFromSqliteOperator

Load node metadata from SQLite database for restore operations.

from pristy.alfresco_operator.load_nodes_from_sqlite import LoadNodesFromSqliteOperator

load_from_sqlite = LoadNodesFromSqliteOperator(
    task_id="load_metadata",
    sqlite_path="/datas/backups/backup_20250110/backup.db",
    batch_size=20,
    node_type_filter="file",  # 'folder', 'file', or None
    order_by="file_size DESC"  # Custom sorting
)
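
The save and load steps can be pictured with the standard sqlite3 module. This is only a sketch under assumptions: the nodes table, its columns, and the save_nodes and load_batches helpers are hypothetical and do not reflect the schema the operators actually use.

```python
import json
import sqlite3

# Hypothetical schema: the real table layout used by the operators may differ.
SCHEMA = """
CREATE TABLE IF NOT EXISTS nodes (
    uuid TEXT PRIMARY KEY,
    node_type TEXT NOT NULL,
    file_size INTEGER,
    checksum TEXT,
    payload TEXT NOT NULL
)
"""

# ORDER BY cannot be bound as a parameter, so it is checked against an allow-list.
ALLOWED_ORDER_BY = {"file_size DESC", "file_size ASC", "uuid ASC"}


def save_nodes(conn, nodes, checksums):
    """Insert or replace node rows using bound parameters."""
    rows = [
        (n["uuid"], n["node_type"], n.get("file_size"), checksums.get(n["uuid"]), json.dumps(n))
        for n in nodes
    ]
    with conn:  # commits on success, rolls back on error
        conn.executemany("INSERT OR REPLACE INTO nodes VALUES (?, ?, ?, ?, ?)", rows)


def load_batches(conn, batch_size, node_type_filter=None, order_by="uuid ASC"):
    """Yield lists of at most batch_size nodes, optionally filtered by type."""
    if order_by not in ALLOWED_ORDER_BY:
        raise ValueError(f"unsupported order_by: {order_by!r}")
    sql, params = "SELECT payload FROM nodes", ()
    if node_type_filter is not None:
        sql += " WHERE node_type = ?"
        params = (node_type_filter,)
    cursor = conn.execute(f"{sql} ORDER BY {order_by}", params)
    while rows := cursor.fetchmany(batch_size):
        yield [json.loads(row[0]) for row in rows]


conn = sqlite3.connect(":memory:")
try:
    conn.execute(SCHEMA)
    save_nodes(
        conn,
        [
            {"uuid": "a", "node_type": "file", "file_size": 10},
            {"uuid": "b", "node_type": "folder"},
            {"uuid": "c", "node_type": "file", "file_size": 30},
            {"uuid": "d", "node_type": "file", "file_size": 20},
        ],
        checksums={"a": "2fd4e1c67a2d28fced849ee1bb76e7391b93eb12"},
    )
    batches = list(load_batches(conn, batch_size=2, node_type_filter="file", order_by="file_size DESC"))
finally:
    conn.close()

print([[n["uuid"] for n in batch] for batch in batches])  # [['c', 'd'], ['a']]
```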

Complete Backup & Restore System: See the dag-alfresco-pristy project for a complete implementation using these operators to backup and restore Alfresco content hierarchies.

Configuration

Airflow Connections

Define these connections in Airflow:

# Alfresco API connection
airflow connections add alfresco_api \
    --conn-type http \
    --conn-host alfresco.example.com \
    --conn-login admin \
    --conn-password admin \
    --conn-port 443 \
    --conn-schema https

# PostgreSQL tracking database
airflow connections add local_pg \
    --conn-type postgres \
    --conn-host localhost \
    --conn-login airflow \
    --conn-password airflow \
    --conn-schema airflow \
    --conn-port 5432

# Kafka (optional)
airflow connections add kafka_pristy \
    --conn-type kafka \
    --conn-extra '{"bootstrap.servers": "localhost:9092"}'
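
As an alternative to the CLI, Airflow can also read connections from environment variables named AIRFLOW_CONN_<CONN_ID> holding a JSON document. The sketch below builds such a value for the alfresco_api connection; connection_env is a hypothetical helper, and the credentials are the same placeholder values as above.

```python
import json


def connection_env(conn_id: str, **fields) -> tuple[str, str]:
    """Return the environment variable name and JSON value for an Airflow connection."""
    return f"AIRFLOW_CONN_{conn_id.upper()}", json.dumps(fields)


name, value = connection_env(
    "alfresco_api",
    conn_type="http",
    host="alfresco.example.com",
    login="admin",
    password="admin",
    port=443,
    schema="https",
)

# Print a shell line that could be pasted into an environment file.
print(f"export {name}='{value}'")
```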

Airflow Variables

# Source Alfresco server URL
airflow variables set alfresco_source_server "https://alfresco.example.com"

# Kafka export topic (optional)
airflow variables set kafka_export_topic "pristy-node-injector"

# Target site for migration (optional)
airflow variables set alfresco_export_target_site "my-target-site"

# Target root UUID for migration (optional)
airflow variables set alfresco_target_root_uuid "workspace://SpacesStore/target-folder-uuid"

Configurable Connection and Variable Names (v0.6.0+)

Starting with version 0.6.0, all operators support custom connection IDs and variable names, allowing you to use the same operators in different contexts within a single Airflow instance.

PostgreSQL Operators

All PostgreSQL-based operators now accept postgres_conn_id:

# Use a custom PostgreSQL connection
create_table = CreateChildrenTableOperator(
    task_id="create_table",
    table_name="my_tracking_table",
    postgres_conn_id="postgres_target"  # Default: "local_pg"
)

save_to_db = SaveFolderToDbOperator(
    task_id="save_folders",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}",
    table_name="my_tracking_table",
    postgres_conn_id="postgres_target"  # Default: "local_pg"
)

Alfresco HTTP Operators

All Alfresco operators now accept http_conn_id:

# Use different Alfresco connections for source and target
search_source = AlfrescoSearchOperator(
    task_id="search_source",
    query="TYPE:'cm:content'",
    http_conn_id="alfresco_source"  # Default: "alfresco_api"
)

fetch_from_target = AlfrescoFetchNodeOperator(
    task_id="fetch_target",
    node_id="workspace://SpacesStore/node-uuid",
    http_conn_id="alfresco_target"  # Default: "alfresco_api"
)

Kafka Operator

The Kafka operator accepts a custom connection ID and a custom topic variable name:

push_to_kafka = PushToKafkaOperator(
    task_id="push_to_kafka",
    nodes="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
    table_name="migration_tracking",
    kafka_conn_id="kafka_prod",           # Default: "kafka_pristy"
    topic_var="kafka_export_topic_prod"   # Default: "kafka_export_topic"
)

Transform Operators

Transform operators accept custom variable names:

transform = TransformFileOperator(
    task_id="transform_files",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}",
    source_server_var="alfresco_source_server_prod"  # Default: "alfresco_source_server"
)

Path Update Function

The update_node_path function (used internally by transform operators) accepts custom variable names:

from pristy.alfresco_operator.update_node_path import update_node_path

# Customize variable names for path transformation
update_node_path(
    src_node,
    new_node,
    target_site_var="alfresco_export_site_custom",      # Default: "alfresco_export_target_site"
    target_root_uuid_var="alfresco_root_uuid_custom",   # Default: "alfresco_target_root_uuid"
    short_path_remove_var="path_prefix_remove_custom"   # Default: "short_path_remove"
)

Directory Export

The directory export operator accepts a custom output path:

push_to_dir = PushToDirectoryOperator(
    task_id="export_to_dir",
    node="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
    output_base_dir="/data/exports"  # Default: "/usr/local/airflow/output"
)
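
To give an idea of what a directory export can look like, the sketch below writes one pivot node as a JSON file under a base directory. The layout (one file per node, placed under the node's short path) and the write_node helper are assumptions made for this example, not the operator's documented behaviour.

```python
import json
import tempfile
from pathlib import Path


def write_node(node: dict, output_base_dir: Path) -> Path:
    """Write a node as <base>/<short path>/<name>.json (assumed layout)."""
    relative_dir = node["path"]["short"].strip("/")
    target_dir = output_base_dir / relative_dir
    target_dir.mkdir(parents=True, exist_ok=True)
    target = target_dir / f"{node['name']}.json"
    target.write_text(json.dumps(node, indent=2), encoding="utf-8")
    return target


node = {
    "name": "document.pdf",
    "type": "cm:content",
    "path": {"root": "site:my-site", "short": "/Documents/Folder"},
}

with tempfile.TemporaryDirectory() as tmp:
    written = write_node(node, Path(tmp))
    relative = written.relative_to(tmp).as_posix()
    reloaded = json.loads(written.read_text(encoding="utf-8"))

print(relative)  # Documents/Folder/document.pdf.json
```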

Multi-Context Example

Run multiple parallel migration workflows with different configurations:

# Migration from Production Alfresco to Dev Kafka
search_prod = AlfrescoSearchOperator(
    task_id="search_prod",
    query="TYPE:'cm:content'",
    http_conn_id="alfresco_prod"
)

# Migration from Staging Alfresco to Test Kafka
search_staging = AlfrescoSearchOperator(
    task_id="search_staging",
    query="TYPE:'cm:content'",
    http_conn_id="alfresco_staging"
)

push_prod = PushToKafkaOperator(
    task_id="push_prod",
    nodes="{{ task_instance.xcom_pull(task_ids='transform_prod') }}",
    table_name="migration_prod",
    kafka_conn_id="kafka_dev",
    topic_var="kafka_topic_dev",
    postgres_conn_id="postgres_dev"
)

push_staging = PushToKafkaOperator(
    task_id="push_staging",
    nodes="{{ task_instance.xcom_pull(task_ids='transform_staging') }}",
    table_name="migration_staging",
    kafka_conn_id="kafka_test",
    topic_var="kafka_topic_test",
    postgres_conn_id="postgres_test"
)

Pristy Pivot Format

The operators transform Alfresco nodes to a standardized format as defined in the Pristy Injector documentation.

Example node structure:

{
  "name": "document.pdf",
  "type": "cm:content",
  "dateCreated": "2024-01-15T10:30:00Z",
  "owner": "admin",
  "path": {
    "root": "site:my-site",
    "short": "/Documents/Folder"
  },
  "properties": {
    "cm:created": "2024-01-15T10:30:00Z",
    "cm:creator": "admin",
    "cm:modified": "2024-01-20T14:45:00Z",
    "cm:modifier": "editor"
  },
  "source": {
    "type": "alfresco",
    "server": "https://alfresco.example.com",
    "uuid": "workspace://SpacesStore/node-uuid",
    "mimetype": "application/pdf",
    "size": 102400
  }
}

For complete format specification and available fields, see the Pristy Injector documentation.
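
As a rough illustration of the mapping, the sketch below converts an entry shaped like an Alfresco REST API node (id, name, nodeType, createdAt, createdByUser, content) into the structure shown above. The to_pivot function is hypothetical, and choices such as using the creator as owner are assumptions; the transform operators may apply different rules.

```python
def to_pivot(entry: dict, source_server: str, root: str, short_path: str) -> dict:
    """Build a pivot-style dict from an Alfresco node entry (illustrative mapping)."""
    content = entry.get("content", {})
    creator = entry["createdByUser"]["id"]
    return {
        "name": entry["name"],
        "type": entry["nodeType"],
        "dateCreated": entry["createdAt"],
        "owner": creator,  # assumption: the creator is used as owner
        "path": {"root": root, "short": short_path},
        "properties": {
            "cm:created": entry["createdAt"],
            "cm:creator": creator,
            "cm:modified": entry["modifiedAt"],
            "cm:modifier": entry["modifiedByUser"]["id"],
        },
        "source": {
            "type": "alfresco",
            "server": source_server,
            "uuid": f"workspace://SpacesStore/{entry['id']}",
            "mimetype": content.get("mimeType"),
            "size": content.get("sizeInBytes"),
        },
    }


entry = {
    "id": "node-uuid",
    "name": "document.pdf",
    "nodeType": "cm:content",
    "createdAt": "2024-01-15T10:30:00Z",
    "createdByUser": {"id": "admin"},
    "modifiedAt": "2024-01-20T14:45:00Z",
    "modifiedByUser": {"id": "editor"},
    "content": {"mimeType": "application/pdf", "sizeInBytes": 102400},
}

pivot = to_pivot(entry, "https://alfresco.example.com", "site:my-site", "/Documents/Folder")
print(pivot["source"]["uuid"])  # workspace://SpacesStore/node-uuid
```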

Example DAG

from datetime import datetime
from airflow import DAG
from pristy.alfresco_operator.search_node_operator import AlfrescoSearchOperator
from pristy.alfresco_operator.transform_file import TransformFileOperator
from pristy.alfresco_operator.push_node_to_kafka import PushToKafkaOperator

with DAG(
    dag_id="alfresco_to_kafka",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False
) as dag:

    search = AlfrescoSearchOperator(
        task_id="search_documents",
        query="TYPE:'cm:content'",
        page_size=100
    )

    transform = TransformFileOperator(
        task_id="transform_files",
        child="{{ task_instance.xcom_pull(task_ids='search_documents') }}"
    )

    push = PushToKafkaOperator(
        task_id="push_to_kafka",
        nodes="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
        table_name="migration_tracking"
    )

    search >> transform >> push

Development

Setup

# Clone repository
git clone https://github.com/your-org/pristy-alfresco-operators.git
cd pristy-alfresco-operators

# Install dependencies (creates venv automatically)
uv sync --group test

Testing

# Run all tests with coverage
uv run pytest

# Run specific test
uv run pytest tests/schema/test_schema.py -v

# Run tests without coverage
uv run pytest --no-cov

# Run tests in verbose mode
uv run pytest -vv

Code Quality

This project follows strict code quality standards:

  • Security: All SQL queries use parameterized statements
  • Type hints: PEP 604 union syntax (str | None)
  • Imports: Lazy imports in execute() methods for Airflow performance
  • Resource management: try/finally blocks for connections
  • Error handling: Granular error states with proper tracking

See CONVENTIONS.md for detailed guidelines.

Release Process

  1. Update version in pyproject.toml
  2. Update CHANGELOG.md (if present)
  3. Create release:
TAG=0.8.0
git add pyproject.toml README.md
git commit -m "version $TAG"
git tag "$TAG"
git push
git push origin "tags/$TAG"
uv build
uv publish

Architecture

  • Operators: Extend BaseOperator with task-specific logic
  • Utils: Shared utilities (parse_alfresco_pagination, create_base_node)
  • State Tracking: PostgreSQL tables with new -> running -> success/error states
  • Schema Validation: JSON Schema validation before export
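
The state tracking entry above names the states new, running, success, and error. A minimal sketch of such a lifecycle as a transition table follows; the table itself is an assumption (for example, it treats error as final, whereas the real implementation might allow retries).

```python
# Assumed transition table: which states may follow which.
ALLOWED_TRANSITIONS = {
    "new": {"running"},
    "running": {"success", "error"},
    "success": set(),
    "error": set(),
}


def advance(current: str, target: str) -> str:
    """Return the new state, or raise if the transition is not allowed."""
    if target not in ALLOWED_TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal transition: {current} -> {target}")
    return target


state = "new"
state = advance(state, "running")
state = advance(state, "success")
print(state)  # success
```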

License

Apache License 2.0 - see LICENSE file for details.

Contributing

Contributions are welcome! Please:

  1. Follow the code conventions in CONVENTIONS.md
  2. Write tests for new features
  3. Ensure all tests pass
  4. Submit a pull request

Acknowledgments

Developed by Jeci for integration with the Pristy services platform.
