Skip to main content

Dagster integration for SharePoint document management using Microsoft Graph API

Project description

dagster-sharepoint

A Dagster integration for interacting with SharePoint document libraries using the Microsoft Graph API. This integration provides a Dagster resource that enables file operations, folder management, and data extraction from SharePoint.

Features

  • Authentication: Secure authentication using Azure AD client credentials
  • File Operations: Upload, download, delete, move, and rename files
  • Folder Management: Create folders, list contents, and navigate folder structures
  • Search: Search for files across SharePoint document libraries
  • Batch Operations: List newly created files, filter by extension, recursive operations

Installation

pip install dagster-sharepoint

Prerequisites

Before using this integration, you need to set up Azure AD authentication:

  1. Register an application in Azure AD
  2. Grant the application appropriate SharePoint permissions (e.g., Sites.ReadWrite.All)
  3. Create a client secret for the application
  4. Note down:
    • Tenant ID
    • Client ID (Application ID)
    • Client Secret
    • SharePoint Site ID

Usage

Basic Setup

import dagster as dg
from dagster_sharepoint import SharePointResource
import os

# Configure the resource
defs = dg.Definitions(
    resources={
        "sharepoint": SharePointResource(
            site_id=os.getenv("SHAREPOINT_SITE_ID"),
            tenant_id=os.getenv("AZURE_TENANT_ID"),
            client_id=os.getenv("AZURE_CLIENT_ID"),
            client_secret=os.getenv("AZURE_CLIENT_SECRET")
        )
    }
)

File Operations

import dagster as dg
from dagster_sharepoint import SharePointResource


@dg.asset
def sharepoint_file_operations(sharepoint: SharePointResource):
    # Upload a file
    with open("local_report.xlsx", "rb") as f:
        result = sharepoint.upload_file(
            file_name="report_2024.xlsx",
            content=f,
            folder_path="Documents/Reports/2024"
        )

    if result.success:
        print(f"Uploaded: {result.file_info.name}")

    # Download a file
    content = sharepoint.download_file_by_path("Documents/Reports/report.xlsx")

    # Move a file
    moved_file = sharepoint.move_file_by_path(
        source_file_path="Documents/Temp/draft.docx",
        destination_folder_path="Documents/Final",
        new_name="final_report.docx"
    )

    # Delete a file
    sharepoint.delete_file_by_path("Documents/Temp/old_file.xlsx")

Folder Operations

import dagster as dg
from dagster_sharepoint import SharePointResource


@dg.asset
def manage_folders(sharepoint: SharePointResource):
    # Create a new folder
    new_folder = sharepoint.create_folder(
        folder_name="2024_Q4",
        parent_path="Documents/Reports"
    )

    # List all folders recursively
    folders = sharepoint.list_folders(
        folder_path="Documents",
        recursive=True
    )

    for folder in folders:
        print(f"Folder: {folder.name} (contains {folder.child_count} items)")

Sensor and Asset Pattern

import dagster as dg
from datetime import datetime, timedelta
from dagster_sharepoint import SharePointResource, FileInfoConfig


@dg.asset
def my_asset(context: dg.AssetExecutionContext, sharepoint: SharePointResource, config: FileInfoConfig):
   """
   Example dg.asset that processes SharePoint files.

   This would be triggered by the sharepoint_new_files.
   """
   context.log.info(f"Processing file from SharePoint {config}")
   contents = sharepoint.download_file(config.id)
   context.log.info(f"Downloaded file {config.parent_path}/{config.name}")
   
   # Process file contents...
   
   return contents


@dg.sensor(
   name="sharepoint_new_files",
   minimum_interval_seconds=600,
   target=[my_asset],
)
def sharepoint_new_files(
        context: dg.SensorEvaluationContext,
        sharepoint: SharePointResource,
) -> dg.SensorResult:
   """
   Sensor that checks for new or created files in SharePoint.
 
   This dg.sensor:
   1. Checks a configured SharePoint folder for files created since the last run
   2. Triggers runs for each new file found
   3. Stores the last check timestamp in cursor storage
   """


   last_check = datetime.fromisoformat(context.cursor) if context.cursor else datetime.now() - timedelta(weeks=999)
   current_check = datetime.now()

   try:
      newly_created_files = sharepoint.list_newly_created_files(
         since_timestamp=last_check,
         file_name_glob_pattern="*/my/file/pattern*.csv",
         recursive=True,
      )
      if not newly_created_files:
         return dg.SkipReason(f"No new files found since {last_check.isoformat()}")

      return dg.SensorResult(
         run_requests=[
            dg.RunRequest(
               asset_selection=[my_asset.key],
               run_key=file.id,
               run_config=dg.RunConfig(
                  ops={my_asset.key.to_python_identifier(): {"config": file.to_config_dict()}}
               ),
            )
            for file in newly_created_files
         ],
         cursor=current_check.isoformat(),
      )

   except Exception as e:
      context.log.error(f"Error checking SharePoint: {str(e)}")
      return dg.Failure(f"Error checking SharePoint: {str(e)}")

Testing

# Run tests
make test

# Run linting and formatting
make ruff

# Run type checking
make check

Development

# Install development dependencies
make install

# Build the package
make build

License

See LICENSE file in the repository.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dagster_sharepoint-0.0.2.tar.gz (12.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dagster_sharepoint-0.0.2-py3-none-any.whl (10.7 kB view details)

Uploaded Python 3

File details

Details for the file dagster_sharepoint-0.0.2.tar.gz.

File metadata

  • Download URL: dagster_sharepoint-0.0.2.tar.gz
  • Upload date:
  • Size: 12.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.8

File hashes

Hashes for dagster_sharepoint-0.0.2.tar.gz
Algorithm Hash digest
SHA256 54d3608726e5077a30b901fa6b6009ffe1bb5eae9cae88907e9fd243bba78a90
MD5 31fb07bd7f4fcf77ef481932b57c391e
BLAKE2b-256 d66b73239774db2a8b4428cac2108daf5fe5bc44090a310ccb8202827df6ec94

See more details on using hashes here.

File details

Details for the file dagster_sharepoint-0.0.2-py3-none-any.whl.

File metadata

File hashes

Hashes for dagster_sharepoint-0.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 68c1bb404cd72b691a3f86db235286f59f883ea53b83cc0b1e516b372f20a5a5
MD5 5ceb92abe74f7bd7f39f9745a8a319f9
BLAKE2b-256 24037be14f68f689ca266a93ce65c6576df1f42d8bbf9305d94acdc6bef461e0

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page