Pristy Alfresco Operators for Apache Airflow

Custom Apache Airflow operators for interacting with the Alfresco ECM REST API and transforming content to the Pristy pivot format.

Features

  • Search & Fetch: Query Alfresco nodes via the Search API, with pagination support
  • Transform: Convert Alfresco nodes to the standardized Pristy pivot format
  • Export: Push transformed nodes to Kafka or to the filesystem
  • State Tracking: PostgreSQL-based migration state management
  • Schema Validation: JSON Schema validation before export

Installation

pip install pristy-alfresco-operators

Or with Poetry:

poetry add pristy-alfresco-operators

Requirements

  • Python 3.12
  • Apache Airflow 2.9+
  • PostgreSQL (for state tracking)
  • Apache Kafka (optional, for Kafka export)

Operators

Search & Fetch Operators

AlfrescoSearchOperator

Search Alfresco nodes using FTS (Full Text Search) with pagination.

from pristy.alfresco_operator.search_node_operator import AlfrescoSearchOperator

search_task = AlfrescoSearchOperator(
    task_id="search_documents",
    query="TYPE:'cm:content' AND ANCESTOR:'workspace://SpacesStore/site-id'",
    page_size=100,
    max_items=1000,
    sort_field="cm:modified",
    sort_ascending=False,
    http_conn_id="alfresco_api"
)
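
As a rough illustration of what a paginated search involves, the sketch below builds the JSON body for one page of an Alfresco Search API request (`POST .../search/versions/1/search`). The function name `build_search_body` and the way the operator arguments map onto the body are assumptions made for illustration; the operator's real internals are not shown in this README.

```python
from __future__ import annotations

from typing import Any


def build_search_body(
    query: str,
    page_size: int = 100,
    skip_count: int = 0,
    sort_field: str | None = None,
    sort_ascending: bool = True,
) -> dict[str, Any]:
    """Build the JSON body for one page of an Alfresco Search API request.

    Hypothetical helper: the argument names mirror the operator's parameters,
    but this is not the operator's actual code.
    """
    body: dict[str, Any] = {
        # "afts" selects Alfresco Full Text Search as the query language.
        "query": {"query": query, "language": "afts"},
        # skipCount moves the window forward; maxItems is the page size.
        "paging": {"maxItems": page_size, "skipCount": skip_count},
    }
    if sort_field is not None:
        body["sort"] = [
            {"type": "FIELD", "field": sort_field, "ascending": sort_ascending}
        ]
    return body


body = build_search_body(
    "TYPE:'cm:content'",
    page_size=100,
    sort_field="cm:modified",
    sort_ascending=False,
)
```

Requesting the next page would reuse the same body with `skip_count` increased by the number of entries already received.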

AlfrescoFetchChildrenOperator

Fetch all children of a folder node.

from pristy.alfresco_operator.fetch_children_node_operator import AlfrescoFetchChildrenOperator

fetch_children = AlfrescoFetchChildrenOperator(
    task_id="fetch_children",
    folders="workspace://SpacesStore/folder-uuid",
    page_size=50,
    max_items=2000
)
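
The interplay of `page_size` and `max_items` can be pictured with a small paging loop. This is a minimal sketch, assuming `page_size` is the per-request window and `max_items` caps the total number of nodes returned; `iter_entries` and `fake_fetch` are hypothetical names, and `fake_fetch` stands in for the HTTP call while returning Alfresco's usual `list.pagination` / `list.entries` envelope.

```python
from __future__ import annotations

from typing import Any, Callable, Iterator

Page = dict[str, Any]


def iter_entries(
    fetch_page: Callable[[int, int], Page],
    page_size: int,
    max_items: int,
) -> Iterator[dict[str, Any]]:
    """Yield node entries page by page, stopping at max_items or the last page."""
    skip_count = 0
    yielded = 0
    while yielded < max_items:
        page = fetch_page(skip_count, page_size)["list"]
        for item in page["entries"]:
            if yielded >= max_items:
                return
            yield item["entry"]
            yielded += 1
        pagination = page["pagination"]
        # Stop on the last page, or on an empty page to avoid looping forever.
        if not pagination["hasMoreItems"] or pagination["count"] == 0:
            return
        skip_count += pagination["count"]


def fake_fetch(skip_count: int, max_items: int) -> Page:
    """Stand-in for the HTTP request: serves 5 fake children."""
    nodes = [{"id": f"node-{i}"} for i in range(5)]
    chunk = nodes[skip_count : skip_count + max_items]
    return {
        "list": {
            "pagination": {
                "count": len(chunk),
                "hasMoreItems": skip_count + len(chunk) < len(nodes),
                "skipCount": skip_count,
                "maxItems": max_items,
            },
            "entries": [{"entry": node} for node in chunk],
        }
    }


ids = [entry["id"] for entry in iter_entries(fake_fetch, page_size=2, max_items=4)]
```

With these values the loop makes two requests and stops after four nodes, even though the fake server holds five.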

AlfrescoFetchNodeOperator

Fetch a single node by UUID.

from pristy.alfresco_operator.fetch_node_operator import AlfrescoFetchNodeOperator

fetch_node = AlfrescoFetchNodeOperator(
    task_id="fetch_node",
    node_id="workspace://SpacesStore/node-uuid"
)

Transform Operators

TransformFileOperator

Transform Alfresco file nodes to Pristy pivot format.

from pristy.alfresco_operator.transform_file import TransformFileOperator

transform_files = TransformFileOperator(
    task_id="transform_files",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}",
    mapping_func=custom_metadata_mapper  # Optional; a user-defined callable, not provided by this package
)
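
The `custom_metadata_mapper` used above is not defined in this README, and the signature expected by `mapping_func` is not documented here. The sketch below only shows the general idea and rests on an assumption: that the callable receives the source Alfresco node and the pivot node being built, and enriches the latter in place. Check the operator's source before relying on this shape.

```python
def custom_metadata_mapper(src_node: dict, new_node: dict) -> None:
    """Hypothetical mapper: copy selected source properties into the pivot node.

    The (src_node, new_node) signature is an assumption, not the documented API.
    """
    src_props = src_node.get("properties", {})
    target = new_node.setdefault("properties", {})
    # Copy the title and description only when the source node carries them.
    for key in ("cm:title", "cm:description"):
        if key in src_props:
            target[key] = src_props[key]


source = {"properties": {"cm:title": "Annual report", "cm:author": "someone"}}
pivot = {"name": "report.pdf"}
custom_metadata_mapper(source, pivot)
```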

TransformFolderOperator

Transform Alfresco folder nodes to Pristy pivot format.

from pristy.alfresco_operator.transform_folder import TransformFolderOperator

transform_folders = TransformFolderOperator(
    task_id="transform_folders",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}"
)

Export Operators

PushToKafkaOperator

Push nodes to Kafka with JSON Schema validation.

from pristy.alfresco_operator.push_node_to_kafka import PushToKafkaOperator

push_to_kafka = PushToKafkaOperator(
    task_id="push_to_kafka",
    nodes="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
    table_name="migration_tracking",
    source_key="uuid"
)
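
The validate-then-export step can be pictured as a gate that separates valid nodes from rejected ones before anything is sent. The sketch below is a stdlib-only stand-in, not JSON Schema validation itself: it checks a handful of required fields and their types. The field list in `REQUIRED_FIELDS` is an assumption derived from the example node in this README, not the schema the operator actually ships.

```python
# Assumed required fields and types; the real JSON Schema may differ.
REQUIRED_FIELDS = {"name": str, "type": str, "path": dict, "source": dict}


def validation_errors(node: dict) -> list:
    """Return a list of human-readable problems; an empty list means valid."""
    errors = []
    for key, expected in REQUIRED_FIELDS.items():
        if key not in node:
            errors.append(f"missing required field: {key}")
        elif not isinstance(node[key], expected):
            errors.append(f"{key} must be of type {expected.__name__}")
    return errors


def split_valid(nodes: list) -> tuple:
    """Separate nodes that may be exported from nodes that must be rejected."""
    valid, rejected = [], []
    for node in nodes:
        errors = validation_errors(node)
        if errors:
            rejected.append((node, errors))
        else:
            valid.append(node)
    return valid, rejected


good = {
    "name": "document.pdf",
    "type": "cm:content",
    "path": {"root": "site:my-site", "short": "/Documents/Folder"},
    "source": {"type": "alfresco", "uuid": "workspace://SpacesStore/node-uuid"},
}
bad = {"name": "orphan.pdf"}
valid, rejected = split_valid([good, bad])
```

Only the nodes in `valid` would be handed to the producer; the entries in `rejected` keep their error messages so the failure can be recorded.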

PushToDirectoryOperator

Export nodes as JSON files to the filesystem.

from pristy.alfresco_operator.push_node_to_directory import PushToDirectoryOperator

push_to_dir = PushToDirectoryOperator(
    task_id="export_to_dir",
    node="{{ task_instance.xcom_pull(task_ids='transform_files') }}"
)
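
A filesystem export boils down to serializing each pivot node to its own JSON file. The sketch below shows one way to do that with the standard library; the `write_node` helper and the file naming (last segment of the source UUID) are assumptions for illustration, and the operator may lay files out differently.

```python
import json
import tempfile
from pathlib import Path


def write_node(node: dict, out_dir: Path) -> Path:
    """Write one pivot node as a pretty-printed JSON file and return its path."""
    out_dir.mkdir(parents=True, exist_ok=True)
    # Use the last segment of the node reference as a filesystem-safe name.
    uuid = node["source"]["uuid"].rsplit("/", 1)[-1]
    target = out_dir / f"{uuid}.json"
    target.write_text(
        json.dumps(node, indent=2, ensure_ascii=False), encoding="utf-8"
    )
    return target


node = {
    "name": "document.pdf",
    "source": {"uuid": "workspace://SpacesStore/node-uuid"},
}
with tempfile.TemporaryDirectory() as tmp:
    path = write_node(node, Path(tmp) / "export")
    restored = json.loads(path.read_text(encoding="utf-8"))
```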

Database Operators

CreateChildrenTableOperator

Create the PostgreSQL tracking table.

from pristy.alfresco_operator.create_children_table import CreateChildrenTableOperator

create_table = CreateChildrenTableOperator(
    task_id="create_table",
    table_name="export_alfresco_folder_children"
)

SaveFolderToDbOperator

Save folder children to the tracking table.

from pristy.alfresco_operator.save_folder_to_db import SaveFolderToDbOperator

save_to_db = SaveFolderToDbOperator(
    task_id="save_folders",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}",
    table_name="export_alfresco_folder_children"
)

Configuration

Airflow Connections

Define these connections in Airflow:

# Alfresco API connection
airflow connections add alfresco_api \
    --conn-type http \
    --conn-host alfresco.example.com \
    --conn-login admin \
    --conn-password admin \
    --conn-port 443 \
    --conn-schema https

# PostgreSQL tracking database
airflow connections add local_pg \
    --conn-type postgres \
    --conn-host localhost \
    --conn-login airflow \
    --conn-password airflow \
    --conn-schema airflow \
    --conn-port 5432

# Kafka (optional)
airflow connections add kafka_pristy \
    --conn-type kafka \
    --conn-extra '{"bootstrap.servers": "localhost:9092"}'

Airflow Variables

# Source Alfresco server URL
airflow variables set alfresco_source_server "https://alfresco.example.com"

# Kafka export topic (optional)
airflow variables set kafka_export_topic "pristy-node-injector"

# Target site for migration (optional)
airflow variables set alfresco_export_target_site "my-target-site"

# Target root UUID for migration (optional)
airflow variables set alfresco_target_root_uuid "workspace://SpacesStore/target-folder-uuid"

Pristy Pivot Format

The operators transform Alfresco nodes to a standardized format as defined in the Pristy Injector documentation.

Example node structure:

{
  "name": "document.pdf",
  "type": "cm:content",
  "dateCreated": "2024-01-15T10:30:00Z",
  "owner": "admin",
  "path": {
    "root": "site:my-site",
    "short": "/Documents/Folder"
  },
  "properties": {
    "cm:created": "2024-01-15T10:30:00Z",
    "cm:creator": "admin",
    "cm:modified": "2024-01-20T14:45:00Z",
    "cm:modifier": "editor"
  },
  "source": {
    "type": "alfresco",
    "server": "https://alfresco.example.com",
    "uuid": "workspace://SpacesStore/node-uuid",
    "mimetype": "application/pdf",
    "size": 102400
  }
}

For the complete format specification and the available fields, see the Pristy Injector documentation.
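
To make the mapping concrete, here is a minimal sketch of how an entry returned by the Alfresco REST API could be reshaped into the structure shown above. The input field names (`nodeType`, `createdAt`, `createdByUser`, `content.mimeType`, `content.sizeInBytes`) are those of the Alfresco REST API; the `to_pivot` function, its parameters, and the choice of the creator as `owner` are assumptions, and timestamps are passed through without normalization. The real transform operators may behave differently.

```python
def to_pivot(entry: dict, server: str, root: str, short_path: str) -> dict:
    """Reshape one Alfresco node entry into a pivot-style dict (illustrative only)."""
    node = {
        "name": entry["name"],
        "type": entry["nodeType"],
        "dateCreated": entry["createdAt"],
        # Assumption: the creator is used as the owner.
        "owner": entry["createdByUser"]["id"],
        "path": {"root": root, "short": short_path},
        "properties": {
            "cm:created": entry["createdAt"],
            "cm:creator": entry["createdByUser"]["id"],
            "cm:modified": entry["modifiedAt"],
            "cm:modifier": entry["modifiedByUser"]["id"],
        },
        "source": {
            "type": "alfresco",
            "server": server,
            "uuid": f"workspace://SpacesStore/{entry['id']}",
        },
    }
    # Folders have no content block, so mimetype and size are optional.
    content = entry.get("content")
    if content:
        node["source"]["mimetype"] = content["mimeType"]
        node["source"]["size"] = content["sizeInBytes"]
    return node


entry = {
    "id": "node-uuid",
    "name": "document.pdf",
    "nodeType": "cm:content",
    "createdAt": "2024-01-15T10:30:00.000+0000",
    "modifiedAt": "2024-01-20T14:45:00.000+0000",
    "createdByUser": {"id": "admin", "displayName": "Administrator"},
    "modifiedByUser": {"id": "editor", "displayName": "Editor"},
    "content": {"mimeType": "application/pdf", "sizeInBytes": 102400},
}
pivot = to_pivot(
    entry,
    server="https://alfresco.example.com",
    root="site:my-site",
    short_path="/Documents/Folder",
)
```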

Example DAG

from datetime import datetime

from airflow import DAG
from pristy.alfresco_operator.search_node_operator import AlfrescoSearchOperator
from pristy.alfresco_operator.transform_file import TransformFileOperator
from pristy.alfresco_operator.push_node_to_kafka import PushToKafkaOperator

# days_ago() and schedule_interval are deprecated in recent Airflow 2.x releases;
# a fixed start_date and the schedule argument are used instead.
with DAG(
    dag_id="alfresco_to_kafka",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False
) as dag:

    search = AlfrescoSearchOperator(
        task_id="search_documents",
        query="TYPE:'cm:content'",
        page_size=100
    )

    transform = TransformFileOperator(
        task_id="transform_files",
        child="{{ task_instance.xcom_pull(task_ids='search_documents') }}"
    )

    push = PushToKafkaOperator(
        task_id="push_to_kafka",
        nodes="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
        table_name="migration_tracking"
    )

    search >> transform >> push

Development

Setup

# Clone repository
git clone https://github.com/your-org/pristy-alfresco-operators.git
cd pristy-alfresco-operators

# Create virtual environment
python3.12 -m venv .venv
source .venv/bin/activate

# Install dependencies
pip install poetry
poetry install

Testing

# Run tests
pytest tests/

# Run specific test
pytest tests/schema/test_schema.py -v

Code Quality

This project follows strict code quality standards:

  • Security: All SQL queries use parameterized statements
  • Type hints: PEP 604 union syntax (str | None)
  • Imports: Lazy imports in execute() methods for Airflow performance
  • Resource management: try/finally blocks for connections
  • Error handling: Granular error states with proper tracking

See CONVENTIONS.md for detailed guidelines.
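
Three of these conventions (parameterized SQL, lazy imports, and try/finally around connections) fit in one short sketch. It uses the standard library's `sqlite3` as a stand-in for PostgreSQL so that it runs anywhere; the `tracking` table, its columns, and the `track_node` function are hypothetical. With a PostgreSQL driver such as psycopg2 the placeholder would be `%s` rather than `?`.

```python
def track_node(uuid: str, final_state: str) -> str:
    """Insert a row in state 'new', move it to final_state, return the stored state."""
    # Lazy import: the module is loaded when the task runs, not at DAG parse time.
    import sqlite3

    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "CREATE TABLE tracking (uuid TEXT PRIMARY KEY, state TEXT NOT NULL)"
        )
        # Values travel as parameters and are never interpolated into the SQL text.
        conn.execute(
            "INSERT INTO tracking (uuid, state) VALUES (?, ?)", (uuid, "new")
        )
        conn.execute(
            "UPDATE tracking SET state = ? WHERE uuid = ?", (final_state, uuid)
        )
        conn.commit()
        row = conn.execute(
            "SELECT state FROM tracking WHERE uuid = ?", (uuid,)
        ).fetchone()
        return row[0]
    finally:
        # The connection is closed even if one of the statements raises.
        conn.close()


# A hostile-looking identifier is stored as plain data, not executed as SQL.
stored = track_node("node-uuid'; DROP TABLE tracking; --", "success")
```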

Release Process

  1. Update version in pyproject.toml
  2. Update CHANGELOG.md (if present)
  3. Create release:
TAG=0.4.2
git add pyproject.toml README.md
git commit -m "version $TAG"
git tag "$TAG"
git push
git push origin "tags/$TAG"
poetry build
poetry publish

Architecture

  • Operators: Extend BaseOperator with task-specific logic
  • Utils: Shared utilities (parse_alfresco_pagination, create_base_node)
  • State Tracking: PostgreSQL tables with new → running → success/error states
  • Schema Validation: JSON Schema validation before export

License

Apache License 2.0 - see LICENSE file for details.

Contributing

Contributions are welcome! Please:

  1. Follow the code conventions in CONVENTIONS.md
  2. Write tests for new features
  3. Ensure all tests pass
  4. Submit a pull request

Acknowledgments

Developed by Jeci for integration with the Pristy services platform.
