
Pristy Alfresco Operators for Apache Airflow

Custom Apache Airflow operators for interacting with the Alfresco ECM REST API and transforming content to the Pristy pivot format.


Features

  • Search & Fetch: Query Alfresco nodes via the Search API, with pagination support
  • Transform: Convert Alfresco nodes to the standardized Pristy pivot format
  • Export: Push transformed nodes to Kafka or the filesystem
  • State Tracking: PostgreSQL-based migration state management
  • Schema Validation: JSON Schema validation before export

Installation

pip install pristy-alfresco-operators

Or with uv (recommended):

uv add pristy-alfresco-operators

Requirements

  • Python 3.12
  • Apache Airflow 3.1+
  • PostgreSQL (for state tracking)
  • Apache Kafka (optional, for Kafka export)

Version Compatibility

  • Version 0.6.0+: Requires Apache Airflow 3.1+ (breaking change), uses uv for dependency management
  • Version 0.5.x and below: Compatible with Apache Airflow 2.9+

Migrating from Airflow 2.x

If you're upgrading from Airflow 2.x to 3.x, version 0.6.0 includes the following changes:

  • Replaced deprecated airflow.utils.helpers.merge_dicts with Python's native | operator
  • Updated dependency constraint to apache-airflow>=3.1.1,<4.0.0

All operators remain API-compatible. No changes required in your DAG code.
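The `|` operator merges dictionaries natively in Python 3.9+, which is what replaced the `merge_dicts` helper; a minimal illustration (note that `|` is a shallow merge, with the right-hand operand winning on key conflicts):

```python
# Python 3.9+ dict union (PEP 584) in place of airflow.utils.helpers.merge_dicts.
defaults = {"page_size": 100, "timeout": 30}
overrides = {"timeout": 60}

# Right-hand operand wins on conflicting keys; nested dicts are not merged.
merged = defaults | overrides
print(merged)  # {'page_size': 100, 'timeout': 60}
```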

Operators

Search & Fetch Operators

AlfrescoSearchOperator

Search Alfresco nodes using FTS (Full Text Search) with pagination.

from pristy.alfresco_operator.search_node_operator import AlfrescoSearchOperator

search_task = AlfrescoSearchOperator(
    task_id="search_documents",
    query="TYPE:'cm:content' AND ANCESTOR:'workspace://SpacesStore/site-id'",
    page_size=100,
    max_items=1000,
    sort_field="cm:modified",
    sort_ascending=False,
    http_conn_id="alfresco_api"
)
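These parameters map onto the public Alfresco Search API (POST `/search/versions/1/search`). A hedged sketch of the request body they presumably produce — field names follow the public Alfresco API, not this package's internals, and the actual payload built by `AlfrescoSearchOperator` may differ:

```python
# Hypothetical request body for the Alfresco Search API.
def build_search_body(query: str, page_size: int, skip_count: int,
                      sort_field: str, sort_ascending: bool) -> dict:
    return {
        # AFTS is the Alfresco Full Text Search query language.
        "query": {"query": query, "language": "afts"},
        # skipCount advances across pages; maxItems is the page size.
        "paging": {"maxItems": page_size, "skipCount": skip_count},
        "sort": [{"type": "FIELD", "field": sort_field,
                  "ascending": sort_ascending}],
    }

body = build_search_body("TYPE:'cm:content'", 100, 0, "cm:modified", False)
```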

AlfrescoFetchChildrenOperator

Fetch all children of a folder node.

from pristy.alfresco_operator.fetch_children_node_operator import AlfrescoFetchChildrenOperator

fetch_children = AlfrescoFetchChildrenOperator(
    task_id="fetch_children",
    folders="workspace://SpacesStore/folder-uuid",
    page_size=50,
    max_items=2000
)

AlfrescoFetchNodeOperator

Fetch a single node by UUID.

from pristy.alfresco_operator.fetch_node_operator import AlfrescoFetchNodeOperator

fetch_node = AlfrescoFetchNodeOperator(
    task_id="fetch_node",
    node_id="workspace://SpacesStore/node-uuid"
)

Transform Operators

TransformFileOperator

Transform Alfresco file nodes to Pristy pivot format.

from pristy.alfresco_operator.transform_file import TransformFileOperator

transform_files = TransformFileOperator(
    task_id="transform_files",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}",
    mapping_func=custom_metadata_mapper  # Optional
)

TransformFolderOperator

Transform Alfresco folder nodes to Pristy pivot format.

from pristy.alfresco_operator.transform_folder import TransformFolderOperator

transform_folders = TransformFolderOperator(
    task_id="transform_folders",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}"
)

Export Operators

PushToKafkaOperator

Push nodes to Kafka with JSON Schema validation.

from pristy.alfresco_operator.push_node_to_kafka import PushToKafkaOperator

push_to_kafka = PushToKafkaOperator(
    task_id="push_to_kafka",
    nodes="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
    table_name="migration_tracking",
    source_key="uuid"
)

PushToDirectoryOperator

Export nodes as JSON files to filesystem.

from pristy.alfresco_operator.push_node_to_directory import PushToDirectoryOperator

push_to_dir = PushToDirectoryOperator(
    task_id="export_to_dir",
    node="{{ task_instance.xcom_pull(task_ids='transform_files') }}"
)

Database Operators

CreateChildrenTableOperator

Create PostgreSQL tracking table.

from pristy.alfresco_operator.create_children_table import CreateChildrenTableOperator

create_table = CreateChildrenTableOperator(
    task_id="create_table",
    table_name="export_alfresco_folder_children"
)

SaveFolderToDbOperator

Save folder children to tracking table.

from pristy.alfresco_operator.save_folder_to_db import SaveFolderToDbOperator

save_to_db = SaveFolderToDbOperator(
    task_id="save_folders",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}",
    table_name="export_alfresco_folder_children"
)
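The tracking table presumably records each child node with a processing state (states like new, running, success, error). A hypothetical sketch of the shape involved, using SQLite as a stand-in for PostgreSQL so it is self-contained; the real table created by `CreateChildrenTableOperator` may differ:

```python
import sqlite3  # stand-in for PostgreSQL, to keep the sketch runnable

# Hypothetical column layout; not this package's actual DDL.
DDL = """
CREATE TABLE IF NOT EXISTS export_alfresco_folder_children (
    node_id   TEXT PRIMARY KEY,
    parent_id TEXT,
    state     TEXT NOT NULL DEFAULT 'new'  -- new / running / success / error
)
"""

conn = sqlite3.connect(":memory:")
conn.execute(DDL)
# Parameterized statement, never string interpolation (see Code Quality).
conn.execute(
    "INSERT INTO export_alfresco_folder_children (node_id, parent_id) VALUES (?, ?)",
    ("workspace://SpacesStore/node-uuid", "workspace://SpacesStore/folder-uuid"),
)
state = conn.execute(
    "SELECT state FROM export_alfresco_folder_children WHERE node_id = ?",
    ("workspace://SpacesStore/node-uuid",),
).fetchone()[0]
```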

Configuration

Airflow Connections

Define these connections in Airflow:

# Alfresco API connection
airflow connections add alfresco_api \
    --conn-type http \
    --conn-host alfresco.example.com \
    --conn-login admin \
    --conn-password admin \
    --conn-port 443 \
    --conn-schema https

# PostgreSQL tracking database
airflow connections add local_pg \
    --conn-type postgres \
    --conn-host localhost \
    --conn-login airflow \
    --conn-password airflow \
    --conn-schema airflow \
    --conn-port 5432

# Kafka (optional)
airflow connections add kafka_pristy \
    --conn-type kafka \
    --conn-extra '{"bootstrap.servers": "localhost:9092"}'

Airflow Variables

# Source Alfresco server URL
airflow variables set alfresco_source_server "https://alfresco.example.com"

# Kafka export topic (optional)
airflow variables set kafka_export_topic "pristy-node-injector"

# Target site for migration (optional)
airflow variables set alfresco_export_target_site "my-target-site"

# Target root UUID for migration (optional)
airflow variables set alfresco_target_root_uuid "workspace://SpacesStore/target-folder-uuid"

Configurable Connection and Variable Names (v0.6.0+)

Starting with version 0.6.0, all operators support custom connection IDs and variable names, allowing you to use the same operators in different contexts within a single Airflow instance.

PostgreSQL Operators

All PostgreSQL-based operators now accept postgres_conn_id:

# Use a custom PostgreSQL connection
create_table = CreateChildrenTableOperator(
    task_id="create_table",
    table_name="my_tracking_table",
    postgres_conn_id="postgres_target"  # Default: "local_pg"
)

save_to_db = SaveFolderToDbOperator(
    task_id="save_folders",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}",
    table_name="my_tracking_table",
    postgres_conn_id="postgres_target"  # Default: "local_pg"
)

Alfresco HTTP Operators

All Alfresco operators now accept http_conn_id:

# Use different Alfresco connections for source and target
search_source = AlfrescoSearchOperator(
    task_id="search_source",
    query="TYPE:'cm:content'",
    http_conn_id="alfresco_source"  # Default: "alfresco_api"
)

fetch_from_target = AlfrescoFetchNodeOperator(
    task_id="fetch_target",
    node_id="workspace://SpacesStore/node-uuid",
    http_conn_id="alfresco_target"  # Default: "alfresco_api"
)

Kafka Operator

Kafka operator accepts custom connection and topic variable:

push_to_kafka = PushToKafkaOperator(
    task_id="push_to_kafka",
    nodes="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
    table_name="migration_tracking",
    kafka_conn_id="kafka_prod",           # Default: "kafka_pristy"
    topic_var="kafka_export_topic_prod"   # Default: "kafka_export_topic"
)

Transform Operators

Transform operators accept custom variable names:

transform = TransformFileOperator(
    task_id="transform_files",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}",
    source_server_var="alfresco_source_server_prod"  # Default: "alfresco_source_server"
)

Path Update Function

The update_node_path function (used internally by transform operators) accepts custom variable names:

from pristy.alfresco_operator.update_node_path import update_node_path

# Customize variable names for path transformation
update_node_path(
    src_node,
    new_node,
    target_site_var="alfresco_export_site_custom",      # Default: "alfresco_export_target_site"
    target_root_uuid_var="alfresco_root_uuid_custom",   # Default: "alfresco_target_root_uuid"
    short_path_remove_var="path_prefix_remove_custom"   # Default: "short_path_remove"
)

Directory Export

Directory export operator accepts custom output path:

push_to_dir = PushToDirectoryOperator(
    task_id="export_to_dir",
    node="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
    output_base_dir="/data/exports"  # Default: "/usr/local/airflow/output"
)

Multi-Context Example

Run multiple parallel migration workflows with different configurations:

# Migration from Production Alfresco to Dev Alfresco
search_prod = AlfrescoSearchOperator(
    task_id="search_prod",
    query="TYPE:'cm:content'",
    http_conn_id="alfresco_prod"
)

# Migration from Staging Alfresco to Test Kafka
search_staging = AlfrescoSearchOperator(
    task_id="search_staging",
    query="TYPE:'cm:content'",
    http_conn_id="alfresco_staging"
)

push_prod = PushToKafkaOperator(
    task_id="push_prod",
    nodes="{{ task_instance.xcom_pull(task_ids='transform_prod') }}",
    table_name="migration_prod",
    kafka_conn_id="kafka_dev",
    topic_var="kafka_topic_dev",
    postgres_conn_id="postgres_dev"
)

push_staging = PushToKafkaOperator(
    task_id="push_staging",
    nodes="{{ task_instance.xcom_pull(task_ids='transform_staging') }}",
    table_name="migration_staging",
    kafka_conn_id="kafka_test",
    topic_var="kafka_topic_test",
    postgres_conn_id="postgres_test"
)

Pristy Pivot Format

The operators transform Alfresco nodes to a standardized format as defined in the Pristy Injector documentation.

Example node structure:

{
  "name": "document.pdf",
  "type": "cm:content",
  "dateCreated": "2024-01-15T10:30:00Z",
  "owner": "admin",
  "path": {
    "root": "site:my-site",
    "short": "/Documents/Folder"
  },
  "properties": {
    "cm:created": "2024-01-15T10:30:00Z",
    "cm:creator": "admin",
    "cm:modified": "2024-01-20T14:45:00Z",
    "cm:modifier": "editor"
  },
  "source": {
    "type": "alfresco",
    "server": "https://alfresco.example.com",
    "uuid": "workspace://SpacesStore/node-uuid",
    "mimetype": "application/pdf",
    "size": 102400
  }
}

For complete format specification and available fields, see the Pristy Injector documentation.
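Validation runs before export; as a rough illustration of the kind of structural check involved, here is a hand-rolled sketch (the package's actual JSON Schema is the authoritative definition, and these required fields are inferred from the example above, not from the spec):

```python
# Hypothetical structural check; the real validation uses JSON Schema.
REQUIRED_TOP_LEVEL = {"name", "type", "path", "source"}

def looks_like_pivot_node(node: dict) -> bool:
    """Cheap check mirroring the pivot format example above."""
    if not REQUIRED_TOP_LEVEL <= node.keys():
        return False
    return {"root", "short"} <= node["path"].keys() and "uuid" in node["source"]

node = {
    "name": "document.pdf",
    "type": "cm:content",
    "path": {"root": "site:my-site", "short": "/Documents/Folder"},
    "source": {"type": "alfresco", "uuid": "workspace://SpacesStore/node-uuid"},
}
assert looks_like_pivot_node(node)
```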

Example DAG

from datetime import datetime
from airflow import DAG
from pristy.alfresco_operator.search_node_operator import AlfrescoSearchOperator
from pristy.alfresco_operator.transform_file import TransformFileOperator
from pristy.alfresco_operator.push_node_to_kafka import PushToKafkaOperator

with DAG(
    dag_id="alfresco_to_kafka",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False
) as dag:

    search = AlfrescoSearchOperator(
        task_id="search_documents",
        query="TYPE:'cm:content'",
        page_size=100
    )

    transform = TransformFileOperator(
        task_id="transform_files",
        child="{{ task_instance.xcom_pull(task_ids='search_documents') }}"
    )

    push = PushToKafkaOperator(
        task_id="push_to_kafka",
        nodes="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
        table_name="migration_tracking"
    )

    search >> transform >> push

Development

Setup

# Clone repository
git clone https://github.com/your-org/pristy-alfresco-operators.git
cd pristy-alfresco-operators

# Install dependencies (creates venv automatically)
uv sync --group test

Testing

# Run all tests with coverage
uv run pytest

# Run specific test
uv run pytest tests/schema/test_schema.py -v

# Run tests without coverage
uv run pytest --no-cov

# Run tests in verbose mode
uv run pytest -vv

Code Quality

This project follows strict code quality standards:

  • Security: All SQL queries use parameterized statements
  • Type hints: PEP 604 union syntax (str | None)
  • Imports: Lazy imports in execute() methods for Airflow performance
  • Resource management: try/finally blocks for connections
  • Error handling: Granular error states with proper tracking

See CONVENTIONS.md for detailed guidelines.
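The conventions above can be sketched in one hypothetical execute() method (names are illustrative, not this package's actual code; SQLite stands in for the PostgreSQL hook so the sketch is self-contained):

```python
class ExampleOperator:  # stands in for an Airflow BaseOperator subclass
    def __init__(self, table_name: str, postgres_conn_id: str = "local_pg"):
        self.table_name = table_name
        self.postgres_conn_id = postgres_conn_id

    def execute(self, context: dict) -> None:
        # Lazy import: importing inside execute() keeps DAG-parse time fast.
        import sqlite3  # stand-in for the PostgreSQL hook

        conn = sqlite3.connect(":memory:")
        try:
            conn.execute("CREATE TABLE tracking (id TEXT, state TEXT)")
            # Parameterized statement: values go through placeholders,
            # never f-strings, so they cannot inject SQL.
            conn.execute("INSERT INTO tracking VALUES (?, ?)", ("n1", "success"))
            conn.commit()
        finally:
            # Resource management: the connection closes even on error.
            conn.close()
```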

Release Process

  1. Update version in pyproject.toml
  2. Update CHANGELOG.md (if present)
  3. Create release:
TAG=0.6.0
git add pyproject.toml README.md
git commit -m "version $TAG"
git tag "$TAG"
git push
git push origin "tags/$TAG"
uv build
uv publish

Note: Version 0.6.0 introduces configurable connection IDs and migrates from Poetry to uv.

Architecture

  • Operators: Extend BaseOperator with task-specific logic
  • Utils: Shared utilities (parse_alfresco_pagination, create_base_node)
  • State Tracking: PostgreSQL tables with new/running/success/error states
  • Schema Validation: JSON Schema validation before export
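Alfresco REST responses wrap results in a `list` object carrying a `pagination` block; a hedged sketch of what a helper like `parse_alfresco_pagination` presumably extracts (the actual function's name and return shape may differ):

```python
# Hypothetical pagination helper for Alfresco REST responses of the shape
# {"list": {"pagination": {...}, "entries": [...]}}.
def parse_pagination(response: dict) -> tuple[bool, int]:
    """Return (has_more, next_skip_count) from an Alfresco response."""
    p = response["list"]["pagination"]
    # The next page starts after the items already returned.
    return p["hasMoreItems"], p["skipCount"] + p["count"]

resp = {"list": {"pagination": {"count": 100, "hasMoreItems": True,
                                "skipCount": 0, "maxItems": 100},
                 "entries": []}}
has_more, next_skip = parse_pagination(resp)
# has_more is True, next_skip is 100
```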

License

Apache License 2.0 - see LICENSE file for details.

Contributing

Contributions are welcome! Please:

  1. Follow the code conventions in CONVENTIONS.md
  2. Write tests for new features
  3. Ensure all tests pass
  4. Submit a pull request


Acknowledgments

Developed by Jeci for integration with Pristy services platform.
