
Pristy Alfresco Operators for Apache Airflow

Custom Apache Airflow operators for interacting with Alfresco ECM REST API and transforming content to Pristy pivot format.


Features

  • Ticket Authentication: AlfrescoHook with ticket-based auth and optional PostgreSQL cache to minimize server-side sessions (v0.9.0+)
  • Search & Fetch: Query Alfresco nodes via Search API with pagination support
  • Transform: Convert Alfresco nodes to standardized Pristy pivot format
  • Export: Push transformed nodes to Kafka or filesystem
  • Backup & Restore: Download/upload binary content with SHA1 checksum verification (v0.7.0+)
  • SQLite Storage: Save/load node metadata to/from SQLite databases for backup portability (v0.7.0+)
  • State Tracking: PostgreSQL-based migration state management
  • Schema Validation: JSON Schema validation before export

Installation

pip install pristy-alfresco-operators

Or with uv (recommended):

uv add pristy-alfresco-operators

Requirements

  • Python 3.12
  • Apache Airflow 3.1+
  • PostgreSQL (for state tracking)
  • Apache Kafka (optional, for Kafka export)

Version Compatibility

  • Version 0.9.0+: Adds AlfrescoHook with ticket authentication (backward compatible)
  • Version 0.6.0+: Requires Apache Airflow 3.1+ (breaking change), uses uv for dependency management
  • Version 0.5.x and below: Compatible with Apache Airflow 2.9+

Migrating from Airflow 2.x

If you're upgrading from Airflow 2.x to 3.x, version 0.6.0 includes the following changes:

  • Replaced deprecated airflow.utils.helpers.merge_dicts with Python's native | operator
  • Updated dependency constraint to apache-airflow>=3.1.1,<4.0.0
  • Fixed deprecation warnings for Airflow 3.1+ imports

All operators remain API-compatible. No changes required in your DAG code.
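
For reference, the dict merge now relies on Python 3.9+'s native union operator, where the right-hand operand wins on key conflicts:

defaults = {"timeout": 30, "retries": 3}
overrides = {"retries": 5}
merged = defaults | overrides  # {'timeout': 30, 'retries': 5}: right-hand operand wins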

AlfrescoHook (v0.9.0+)

All Alfresco operators now use AlfrescoHook instead of the standard HttpHook. This hook authenticates via the Alfresco Ticket API, acquiring a single ticket and reusing it across all HTTP calls within a task execution, instead of sending Basic Auth credentials on every request.

Why?

With Basic Auth, every HTTP request creates a new server-side session in Alfresco. High-volume DAGs (pagination, recursive traversals) can generate hundreds of thousands of sessions, overwhelming the default Alfresco session cache (1000 entries).

Two operating modes

In-memory only (default): A ticket is acquired once per task and reused for all HTTP calls within that task.

from pristy.alfresco_operator.alfresco_hook import AlfrescoHook

hook = AlfrescoHook(http_conn_id="alfresco_api")

PostgreSQL cache: When ticket_cache_conn_id is provided, tickets are stored in a shared alfresco_ticket_cache table so that all tasks and DAGs reuse the same ticket.

hook = AlfrescoHook(
    http_conn_id="alfresco_api",
    ticket_cache_conn_id="local_pg",  # PostgreSQL connection for shared cache
    ticket_ttl=3000,                  # 50 min (default), 10 min safety margin on 1h server TTL
)

Automatic 401 retry

If the server responds with 401 Unauthorized (expired ticket), the hook automatically invalidates the current ticket, acquires a new one, and retries the request once.
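
The behavior follows this pattern (a simplified sketch for illustration, not the hook's actual source; get_ticket and invalidate_ticket are hypothetical helpers standing in for the hook's internals):

import base64
import requests

def fetch_with_ticket_retry(session: requests.Session, url: str, get_ticket, invalidate_ticket) -> requests.Response:
    """Send a request with the current ticket; on 401, refresh the ticket and retry once."""
    def auth_header() -> dict:
        # The Alfresco v1 REST API accepts a ticket as the Basic credential, base64-encoded
        ticket = get_ticket()
        return {"Authorization": "Basic " + base64.b64encode(ticket.encode()).decode()}

    response = session.get(url, headers=auth_header())
    if response.status_code == 401:
        invalidate_ticket()                                  # discard the expired ticket (and any cached copy)
        response = session.get(url, headers=auth_header())   # retry exactly once with a fresh ticket
    return response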

Backward compatibility

AlfrescoHook extends HttpHook, so isinstance(hook, HttpHook) returns True. All operators accept the new optional parameters ticket_cache_conn_id and ticket_ttl with backward-compatible defaults (None and 3000).
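
Existing type checks therefore keep working:

from airflow.providers.http.hooks.http import HttpHook
from pristy.alfresco_operator.alfresco_hook import AlfrescoHook

hook = AlfrescoHook(http_conn_id="alfresco_api")
assert isinstance(hook, HttpHook)  # True: drop-in replacement for code expecting an HttpHook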

Using the hook directly in DAGs

For custom Alfresco API calls outside the provided operators:

from pristy.alfresco_operator.alfresco_hook import AlfrescoHook

hook = AlfrescoHook(
    method="GET",
    http_conn_id="alfresco_api",
    ticket_cache_conn_id="local_pg",  # Optional
)

response = hook.run(
    endpoint="/alfresco/api/-default-/public/alfresco/versions/1/nodes/node-uuid",
    headers={"Accept": "application/json"},
)
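
hook.run returns a standard requests.Response; the v1 Node API wraps a single resource in an "entry" envelope:

node = response.json()["entry"]   # Alfresco v1 responses nest the node under "entry"
print(node["id"], node["name"])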

Operators

Search & Fetch Operators

AlfrescoSearchOperator

Search Alfresco nodes using FTS (Full Text Search) with pagination.

from pristy.alfresco_operator.search_node_operator import AlfrescoSearchOperator

search_task = AlfrescoSearchOperator(
    task_id="search_documents",
    query="TYPE:'cm:content' AND ANCESTOR:'workspace://SpacesStore/site-id'",
    page_size=100,
    max_items=1000,
    sort_field="cm:modified",
    sort_ascending=False,
    http_conn_id="alfresco_api"
)

AlfrescoFetchChildrenOperator

Fetch all children of a folder node.

from pristy.alfresco_operator.fetch_children_node_operator import AlfrescoFetchChildrenOperator

fetch_children = AlfrescoFetchChildrenOperator(
    task_id="fetch_children",
    folders="workspace://SpacesStore/folder-uuid",
    page_size=50,
    max_items=2000
)

AlfrescoFetchNodeOperator

Fetch a single node by UUID.

from pristy.alfresco_operator.fetch_node_operator import AlfrescoFetchNodeOperator

fetch_node = AlfrescoFetchNodeOperator(
    task_id="fetch_node",
    node_id="workspace://SpacesStore/node-uuid"
)

Transform Operators

TransformFileOperator

Transform Alfresco file nodes to Pristy pivot format.

from pristy.alfresco_operator.transform_file import TransformFileOperator

transform_files = TransformFileOperator(
    task_id="transform_files",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}",
    mapping_func=custom_metadata_mapper  # Optional
)
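
The mapper's exact contract is defined by TransformFileOperator (check the operator's source); as an illustrative sketch only, assuming the mapper receives the source Alfresco node and returns extra properties to merge into the pivot:

def custom_metadata_mapper(src_node: dict) -> dict:
    """Hypothetical mapper: derive extra pivot properties from the Alfresco node.
    The real signature is defined by TransformFileOperator; adjust accordingly."""
    props = src_node.get("properties", {})
    extra = {}
    if "my:reference" in props:
        extra["my:reference"] = props["my:reference"]  # carry a custom property over to the pivot
    return extra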

TransformFolderOperator

Transform Alfresco folder nodes to Pristy pivot format.

from pristy.alfresco_operator.transform_folder import TransformFolderOperator

transform_folders = TransformFolderOperator(
    task_id="transform_folders",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}"
)

Export Operators

PushToKafkaOperator

Push nodes to Kafka with JSON Schema validation.

from pristy.alfresco_operator.push_node_to_kafka import PushToKafkaOperator

push_to_kafka = PushToKafkaOperator(
    task_id="push_to_kafka",
    nodes="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
    table_name="migration_tracking",
    source_key="uuid"
)

PushToDirectoryOperator

Export nodes as JSON files to filesystem.

from pristy.alfresco_operator.push_node_to_directory import PushToDirectoryOperator

push_to_dir = PushToDirectoryOperator(
    task_id="export_to_dir",
    node="{{ task_instance.xcom_pull(task_ids='transform_files') }}"
)

Database Operators

CreateChildrenTableOperator

Create PostgreSQL tracking table.

from pristy.alfresco_operator.create_children_table import CreateChildrenTableOperator

create_table = CreateChildrenTableOperator(
    task_id="create_table",
    table_name="export_alfresco_folder_children"
)

SaveFolderToDbOperator

Save folder children to tracking table.

from pristy.alfresco_operator.save_folder_to_db import SaveFolderToDbOperator

save_to_db = SaveFolderToDbOperator(
    task_id="save_folders",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}",
    table_name="export_alfresco_folder_children"
)

Backup & Restore Operators (v0.7.0+)

AlfrescoDownloadContentOperator

Download binary content from Alfresco nodes with SHA1 checksum verification.

from pristy.alfresco_operator.download_content_operator import AlfrescoDownloadContentOperator

download_file = AlfrescoDownloadContentOperator(
    task_id="download_file",
    node_id="workspace://SpacesStore/file-uuid",
    output_dir="/datas/backups/files",
    filename="{uuid}.bin",  # Supports {uuid}, {name} placeholders
    calculate_checksum=True,  # SHA1 checksum calculation
    http_conn_id="alfresco_api"
)
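
To re-verify a downloaded file against a known digest outside the operator, a standalone check needs only the standard library:

import hashlib

def sha1_of(path: str) -> str:
    """Stream the file in chunks and return its SHA1 hex digest."""
    digest = hashlib.sha1()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()

expected = "2fd4e1c67a2d28fced849ee1bb76e7391b93eb12"  # example digest
assert sha1_of("/datas/backups/files/file-uuid.bin") == expected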

AlfrescoUploadPristyOperator

Upload files or folders to Alfresco via Pristy custom endpoint with multipart/form-data.

from pristy.alfresco_operator.upload_pristy_operator import AlfrescoUploadPristyOperator

upload_file = AlfrescoUploadPristyOperator(
    task_id="upload_file",
    file_path="/datas/backups/files/abc-123.bin",
    metadata={
        "name": "document.pdf",
        "type": "cm:content",
        "path": {"parent_uuid": "workspace://SpacesStore/parent-uuid"},
        "properties": {"cm:title": "My Document"}
    },
    http_conn_id="alfresco_api_target",
    endpoint="/alfresco/service/fr/jeci/pristy/nodes/inject"
)

SaveNodeToSqliteOperator

Save node metadata to SQLite database for backup purposes.

from pristy.alfresco_operator.save_node_to_sqlite import SaveNodeToSqliteOperator

save_to_sqlite = SaveNodeToSqliteOperator(
    task_id="save_metadata",
    nodes="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
    sqlite_path="/datas/backups/backup_20250110/backup.db",
    file_downloaded=True,
    checksums={"uuid-123": "2fd4e1c67a2d28fced849ee1bb76e7391b93eb12"}
)

LoadNodesFromSqliteOperator

Load node metadata from SQLite database for restore operations.

from pristy.alfresco_operator.load_nodes_from_sqlite import LoadNodesFromSqliteOperator

load_from_sqlite = LoadNodesFromSqliteOperator(
    task_id="load_metadata",
    sqlite_path="/datas/backups/backup_20250110/backup.db",
    batch_size=20,
    node_type_filter="file",  # 'folder', 'file', or None
    order_by="file_size DESC"  # Custom sorting
)

Complete Backup & Restore System: See the dag-alfresco-pristy project for a complete implementation using these operators to backup and restore Alfresco content hierarchies.

Configuration

Airflow Connections

Define these connections in Airflow:

# Alfresco API connection
airflow connections add alfresco_api \
    --conn-type http \
    --conn-host alfresco.example.com \
    --conn-login admin \
    --conn-password admin \
    --conn-port 443 \
    --conn-schema https

# PostgreSQL tracking database
airflow connections add local_pg \
    --conn-type postgres \
    --conn-host localhost \
    --conn-login airflow \
    --conn-password airflow \
    --conn-schema airflow \
    --conn-port 5432

# Kafka (optional)
airflow connections add kafka_pristy \
    --conn-type kafka \
    --conn-extra '{"bootstrap.servers": "localhost:9092"}'

Airflow Variables

# Source Alfresco server URL
airflow variables set alfresco_source_server "https://alfresco.example.com"

# Kafka export topic (optional)
airflow variables set kafka_export_topic "pristy-node-injector"

# Target site for migration (optional)
airflow variables set alfresco_export_target_site "my-target-site"

# Target root UUID for migration (optional)
airflow variables set alfresco_target_root_uuid "workspace://SpacesStore/target-folder-uuid"

Configurable Connection and Variable Names (v0.6.0+)

Starting with version 0.6.0, all operators support custom connection IDs and variable names, allowing you to use the same operators in different contexts within a single Airflow instance.

PostgreSQL Operators

All PostgreSQL-based operators now accept postgres_conn_id:

# Use a custom PostgreSQL connection
create_table = CreateChildrenTableOperator(
    task_id="create_table",
    table_name="my_tracking_table",
    postgres_conn_id="postgres_target"  # Default: "local_pg"
)

save_to_db = SaveFolderToDbOperator(
    task_id="save_folders",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}",
    table_name="my_tracking_table",
    postgres_conn_id="postgres_target"  # Default: "local_pg"
)

Alfresco HTTP Operators

All Alfresco operators now accept http_conn_id:

# Use different Alfresco connections for source and target
search_source = AlfrescoSearchOperator(
    task_id="search_source",
    query="TYPE:'cm:content'",
    http_conn_id="alfresco_source"  # Default: "alfresco_api"
)

fetch_from_target = AlfrescoFetchNodeOperator(
    task_id="fetch_target",
    node_id="workspace://SpacesStore/node-uuid",
    http_conn_id="alfresco_target"  # Default: "alfresco_api"
)

Kafka Operator

Kafka operator accepts custom connection and topic variable:

push_to_kafka = PushToKafkaOperator(
    task_id="push_to_kafka",
    nodes="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
    table_name="migration_tracking",
    kafka_conn_id="kafka_prod",           # Default: "kafka_pristy"
    topic_var="kafka_export_topic_prod"   # Default: "kafka_export_topic"
)

Transform Operators

Transform operators accept custom variable names:

transform = TransformFileOperator(
    task_id="transform_files",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}",
    source_server_var="alfresco_source_server_prod"  # Default: "alfresco_source_server"
)

Path Update Function

The update_node_path function (used internally by transform operators) accepts custom variable names:

from pristy.alfresco_operator.update_node_path import update_node_path

# Customize variable names for path transformation
update_node_path(
    src_node,
    new_node,
    target_site_var="alfresco_export_site_custom",      # Default: "alfresco_export_target_site"
    target_root_uuid_var="alfresco_root_uuid_custom",   # Default: "alfresco_target_root_uuid"
    short_path_remove_var="path_prefix_remove_custom"   # Default: "short_path_remove"
)

Directory Export

Directory export operator accepts custom output path:

push_to_dir = PushToDirectoryOperator(
    task_id="export_to_dir",
    node="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
    output_base_dir="/data/exports"  # Default: "/usr/local/airflow/output"
)

Multi-Context Example

Run multiple parallel migration workflows with different configurations:

# Migration from Production Alfresco to Dev Alfresco
search_prod = AlfrescoSearchOperator(
    task_id="search_prod",
    query="TYPE:'cm:content'",
    http_conn_id="alfresco_prod"
)

# Migration from Staging Alfresco to Test Kafka
search_staging = AlfrescoSearchOperator(
    task_id="search_staging",
    query="TYPE:'cm:content'",
    http_conn_id="alfresco_staging"
)

push_prod = PushToKafkaOperator(
    task_id="push_prod",
    nodes="{{ task_instance.xcom_pull(task_ids='transform_prod') }}",
    table_name="migration_prod",
    kafka_conn_id="kafka_dev",
    topic_var="kafka_topic_dev",
    postgres_conn_id="postgres_dev"
)

push_staging = PushToKafkaOperator(
    task_id="push_staging",
    nodes="{{ task_instance.xcom_pull(task_ids='transform_staging') }}",
    table_name="migration_staging",
    kafka_conn_id="kafka_test",
    topic_var="kafka_topic_test",
    postgres_conn_id="postgres_test"
)

Pristy Pivot Format

The operators transform Alfresco nodes to a standardized format as defined in the Pristy Injector documentation.

Example node structure:

{
  "name": "document.pdf",
  "type": "cm:content",
  "dateCreated": "2024-01-15T10:30:00Z",
  "owner": "admin",
  "path": {
    "root": "site:my-site",
    "short": "/Documents/Folder"
  },
  "properties": {
    "cm:created": "2024-01-15T10:30:00Z",
    "cm:creator": "admin",
    "cm:modified": "2024-01-20T14:45:00Z",
    "cm:modifier": "editor"
  },
  "source": {
    "type": "alfresco",
    "server": "https://alfresco.example.com",
    "uuid": "workspace://SpacesStore/node-uuid",
    "mimetype": "application/pdf",
    "size": 102400
  }
}

For complete format specification and available fields, see the Pristy Injector documentation.

Internal DAG Metadata (__dag_param)

The __dag_param field is reserved for internal DAG metadata and is NOT part of the Pristy pivot format. It should not be sent to the Pristy Injector API or Kafka.

Common uses:

  • parent_uuid: Parent node UUID for hierarchical processing
  • isFile, isFolder: Node type flags for SQLite storage
  • Other DAG-specific tracking data

Example with __dag_param:

{
  "name": "document.pdf",
  "type": "cm:content",
  "path": {
    "root": "site:my-site",
    "short": "/Documents/Folder"
  },
  "properties": {
    "cm:created": "2024-01-15T10:30:00Z",
    "cm:creator": "admin"
  },
  "source": {
    "type": "alfresco",
    "server": "https://alfresco.example.com",
    "uuid": "workspace://SpacesStore/node-uuid"
  },
  "__dag_param": {
    "parent_uuid": "workspace://SpacesStore/parent-123",
    "isFile": true,
    "isFolder": false
  }
}

⚠️ Important: Only fields defined in the InjectorNode schema should be included in the pivot. Never add custom fields like isFile, isFolder, etc. directly to the pivot structure.
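
If you build a custom export step, strip the field before sending; a minimal sketch:

def to_pivot(node: dict) -> dict:
    """Return a copy of the node without the internal __dag_param metadata."""
    return {key: value for key, value in node.items() if key != "__dag_param"}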

Example DAG

from datetime import datetime
from airflow import DAG
from pristy.alfresco_operator.search_node_operator import AlfrescoSearchOperator
from pristy.alfresco_operator.transform_file import TransformFileOperator
from pristy.alfresco_operator.push_node_to_kafka import PushToKafkaOperator

with DAG(
    dag_id="alfresco_to_kafka",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False
) as dag:

    search = AlfrescoSearchOperator(
        task_id="search_documents",
        query="TYPE:'cm:content'",
        page_size=100
    )

    transform = TransformFileOperator(
        task_id="transform_files",
        child="{{ task_instance.xcom_pull(task_ids='search_documents') }}"
    )

    push = PushToKafkaOperator(
        task_id="push_to_kafka",
        nodes="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
        table_name="migration_tracking"
    )

    search >> transform >> push

Development

Setup

# Clone repository
git clone https://github.com/your-org/pristy-alfresco-operators.git
cd pristy-alfresco-operators

# Install dependencies (creates venv automatically)
uv sync --group test

Testing

# Run all tests with coverage
uv run pytest

# Run specific test
uv run pytest tests/schema/test_schema.py -v

# Run tests without coverage
uv run pytest --no-cov

# Run tests in verbose mode
uv run pytest -vv

Code Quality

This project follows strict code quality standards:

  • Security: All SQL queries use parameterized statements (see the sketch after this list)
  • Type hints: PEP 604 union syntax (str | None)
  • Imports: Lazy imports in execute() methods for Airflow performance
  • Resource management: try/finally blocks for connections
  • Error handling: Granular error states with proper tracking
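
For example, a parameterized insert through Airflow's Postgres hook binds values instead of interpolating them into the SQL string (table and column names here are illustrative, not the project's actual schema):

from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="local_pg")
hook.run(
    "INSERT INTO export_alfresco_folder_children (node_id, state) VALUES (%s, %s)",
    parameters=("workspace://SpacesStore/node-uuid", "new"),  # bound parameters, never string-formatted
)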

See CONVENTIONS.md for detailed guidelines.

Release Process

  1. Update version in pyproject.toml
  2. Update CHANGELOG.md (if present)
  3. Create release:
TAG=0.9.1
git add pyproject.toml README.md
git commit -m "version $TAG"
git tag "$TAG"
git push
git push origin "tags/$TAG"
uv build
uv publish

Architecture

  • AlfrescoHook: Extends HttpHook with ticket-based authentication and optional PostgreSQL cache
  • Operators: Extend BaseOperator with task-specific logic, all using AlfrescoHook for Alfresco API calls
  • Utils: Shared utilities (parse_alfresco_pagination, create_base_node)
  • State Tracking: PostgreSQL tables with new/running/success/error states
  • Schema Validation: JSON Schema validation before export

License

Apache License 2.0 - see LICENSE file for details.

Contributing

Contributions are welcome! Please:

  1. Follow the code conventions in CONVENTIONS.md
  2. Write tests for new features
  3. Ensure all tests pass
  4. Submit a pull request

Acknowledgments

Developed by Jeci for integration with the Pristy services platform.
