
Dagster integration library for interacting with SFTP servers


dagster-sftp

A Dagster integration for interacting with SFTP servers, providing a high-performance resource for file transfer operations with support for parallel transfers, batch operations, and advanced filtering capabilities.

Features

  • High-Performance Transfers: Automatic parallelization of large file transfers using AsyncSSH
  • Batch Operations: Efficient concurrent processing of multiple files
  • Advanced Filtering: Pattern matching with glob, date-based filtering, and file type filtering
  • Progress Monitoring: Built-in progress callbacks for tracking transfer status
  • Flexible Authentication: Support for both password and key-based authentication
  • Comprehensive File Operations: Upload, download, delete, rename, and directory management
  • Type-Safe: Full type hints and Pydantic configuration
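Progress monitoring usually means passing a callback that the transfer calls repeatedly with running byte counts. The sketch below assumes the (srcpath, dstpath, bytes_copied, total_bytes) convention used by AsyncSSH's `progress_handler`; the README does not document the exact callback signature dagster-sftp expects, so treat that signature, and the helper name `make_progress_logger`, as assumptions. It throttles output to one message per percentage step.

```python
from typing import Callable, Union

PathLike = Union[str, bytes]


def _as_text(path: PathLike) -> str:
    # Transfer libraries may report paths as bytes; decode them for log messages
    return path.decode("utf-8", "replace") if isinstance(path, bytes) else str(path)


def make_progress_logger(
    log: Callable[[str], None], step_percent: int = 25
) -> Callable[[PathLike, PathLike, int, int], None]:
    """Build a (hypothetical) progress callback that logs once per step_percent."""
    next_mark = step_percent

    def on_progress(
        srcpath: PathLike, dstpath: PathLike, bytes_copied: int, total_bytes: int
    ) -> None:
        nonlocal next_mark
        if total_bytes <= 0:
            return  # unknown or empty size: nothing meaningful to report
        percent = bytes_copied * 100 // total_bytes
        # One message per threshold crossed, so frequent small updates don't flood the log
        while next_mark <= 100 and percent >= next_mark:
            log(f"{_as_text(srcpath)} -> {_as_text(dstpath)}: {next_mark}%")
            next_mark += step_percent

    return on_progress
```

Inside an asset, `context.log.info` could be passed as the `log` argument.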

Installation

pip install dagster-sftp

Quick Start

Basic Configuration

from dagster import Definitions, asset
from dagster_sftp import SFTPResource

# Password authentication
sftp_resource = SFTPResource(
    host="sftp.example.com",
    username="myuser",
    password="mypassword",
    port=22,  # optional, defaults to 22
)

# Key-based authentication
sftp_resource = SFTPResource(
    host="sftp.example.com",
    username="myuser",
    private_key="/path/to/private/key",
    passphrase="key_passphrase",  # optional
    known_hosts="/path/to/known_hosts"  # optional
)

defs = Definitions(
    resources={
        "sftp": sftp_resource
    }
)

Basic File Operations

from dagster import asset
from dagster_sftp import SFTPResource

@asset
def download_files(sftp: SFTPResource):
    """Download, upload, and delete files on the SFTP server."""
    # Download a single file
    sftp.get_file(
        remote_path="/data/report.csv",
        local_path="/tmp/report.csv"
    )

    # Upload a file
    sftp.put_file(
        local_path="/tmp/processed_report.csv",
        remote_path="/data/processed/report.csv"
    )

    # Delete a file
    sftp.delete_file("/data/old_report.csv")

    return {"status": "Files processed successfully"}

Complete Example: Sensor for new files + Asset for processing

from datetime import datetime
import tempfile

import dagster as dg
from dagster_sftp import SFTPResource, SFTPFileInfoConfig


@dg.asset
def my_sftp_asset(context: dg.AssetExecutionContext, config: SFTPFileInfoConfig, sftp: SFTPResource):
    """Download the file described by the run config and process it."""
    context.log.info(f"Processing file {config.path}")
    with tempfile.NamedTemporaryFile() as tmp_file:
        sftp.get_file(config.path, tmp_file.name)
        # Process the file...


@dg.sensor(target=my_sftp_asset)
def sftp_file_sensor(context: dg.SensorEvaluationContext, sftp: SFTPResource):
    """Detect new files on the SFTP server and trigger one run per file."""
    # The cursor stores the time of the last check; it is None on the first tick
    last_check = datetime.fromisoformat(context.cursor) if context.cursor else None
    current_check = datetime.now()

    try:
        new_files = sftp.list_files(
            base_path="/incoming",
            pattern="*.csv",
            files_only=True,
            modified_after=last_check,
        )
    except Exception as e:
        context.log.error(f"Error checking SFTP: {e}")
        # Re-raise so the tick is marked as failed (dg.Failure is meant for ops/assets, not sensors)
        raise

    if not new_files:
        if last_check:
            return dg.SkipReason(f"No new files found since {last_check.isoformat()}")
        return dg.SkipReason("No new files found")

    return dg.SensorResult(
        run_requests=[
            dg.RunRequest(
                asset_selection=[my_sftp_asset.key],
                run_key=file.id,  # Dagster skips requests whose run_key was already used
                run_config=dg.RunConfig(
                    ops={my_sftp_asset.key.to_python_identifier(): {"config": file.to_config_dict()}}
                ),
            )
            for file in new_files
        ],
        cursor=current_check.isoformat(),
    )
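The cursor handling in the sensor can be checked in isolation. The sketch below is a plain-Python model of that logic, not dagster-sftp code: `RemoteFile` and `evaluate_cursor` are hypothetical names, and filtering happens locally here, whereas the sensor passes `modified_after` to `list_files()`. It also assumes the server's modification times and `datetime.now()` use the same (naive, local) clock, which you should verify for your server.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class RemoteFile:
    """Hypothetical stand-in for a listing entry: just a path and a modification time."""

    path: str
    modified_time: datetime


def evaluate_cursor(
    cursor: Optional[str], listing: List[RemoteFile], now: datetime
) -> Tuple[List[RemoteFile], Optional[str]]:
    """Return (files newer than the cursor, cursor to store after this tick)."""
    last_check = datetime.fromisoformat(cursor) if cursor else None
    # No cursor means this is the first tick, so every listed file counts as new
    new_files = [
        f for f in listing if last_check is None or f.modified_time > last_check
    ]
    # Like the sensor, only advance the cursor when there is work to request
    next_cursor = now.isoformat() if new_files else cursor
    return new_files, next_cursor
```

Capturing `now` before listing means a file modified while the listing runs is seen again on the next tick rather than missed; a stable `run_key` keeps that from producing a duplicate run.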

File Listing with Filtering

import dagster as dg
from datetime import datetime, timedelta
from dagster_sftp import SFTPResource

@dg.asset
def process_recent_files(sftp: SFTPResource):
    """Process files modified in the last week."""
    # List CSV files modified in the last 7 days
    recent_files = sftp.list_files(
        base_path="/data/exports",
        pattern="*.csv",
        files_only=True,
        modified_after=datetime.now() - timedelta(days=7)
    )

    for file_info in recent_files:
        print(f"Processing {file_info.filename}")
        print(f"  Size: {file_info.size} bytes")
        print(f"  Modified: {file_info.modified_time}")

        # Download and process each file
        local_path = f"/tmp/{file_info.filename}"
        sftp.get_file(file_info.path, local_path)
        # Process the file...

    return {"files_processed": len(recent_files)}
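To make the three filters concrete, here is a local sketch of how `pattern`, `files_only`, and `modified_after` might combine. It is an illustration under assumptions, not the resource's implementation: `Entry` and `filter_entries` are hypothetical, the glob is matched case-sensitively against the basename with `fnmatch`, `modified_after` is treated as a strict "newer than", and recursive `**` patterns are not handled.

```python
import fnmatch
import posixpath
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, List, Optional


@dataclass(frozen=True)
class Entry:
    """Hypothetical listing entry; the real FileInfo carries more fields."""

    path: str
    is_file: bool
    modified_time: datetime


def filter_entries(
    entries: Iterable[Entry],
    pattern: str = "*",
    files_only: bool = False,
    modified_after: Optional[datetime] = None,
) -> List[Entry]:
    """Keep entries whose basename matches the glob and that pass the type/mtime filters."""
    result = []
    for entry in entries:
        name = posixpath.basename(entry.path)
        if not fnmatch.fnmatchcase(name, pattern):
            continue
        if files_only and not entry.is_file:
            continue
        # Assumed strict comparison: a file modified exactly at the cutoff is excluded
        if modified_after is not None and entry.modified_time <= modified_after:
            continue
        result.append(entry)
    return result
```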

Directory Operations

import dagster as dg
from dagster_sftp import SFTPResource

@dg.op
def manage_directories(context: dg.OpExecutionContext, sftp: SFTPResource):
    """Create and manage directory structures."""
    # Create directory structure (including parents)
    sftp.mkdir("/remote/data/2024/reports", mode=0o755)

    # Check if path is directory
    if sftp.is_dir("/remote/data"):
        # Recursively list all Python files
        python_files = sftp.list_files(
            base_path="/remote/data",
            pattern="**/*.py",  # Recursive search
            files_only=True,
        )
        context.log.info(f"Found {len(python_files)} Python files under /remote/data")

    # Move/rename directories
    sftp.rename(
        old_path="/remote/old_reports",
        new_path="/remote/archived_reports"
    )

    # Remove directory tree (use with caution!)
    if sftp.file_exists("/remote/temp"):
        context.log.warning("Removing /remote/temp...")
        sftp.rmtree("/remote/temp")

    return {"directories": "managed"}
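Creating a directory "including parents" (as `mkdir()` does above) amounts to finding which ancestors are missing and creating them top-down, like `mkdir -p`. The sketch below shows only that path arithmetic; `paths_to_create` is a hypothetical helper and the `exists` callback is an assumption (it could, for example, wrap a remote existence check).

```python
import posixpath
from typing import Callable, List


def paths_to_create(path: str, exists: Callable[[str], bool]) -> List[str]:
    """Return the missing directories for `path`, shallowest first (like mkdir -p)."""
    path = posixpath.normpath(path)
    missing = []
    # Walk upward until an existing ancestor, the root, or the start of a relative path
    while path not in ("/", "", ".") and not exists(path):
        missing.append(path)
        path = posixpath.dirname(path)
    missing.reverse()  # parents must be created before their children
    return missing
```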

Performance Tips

The SFTP resource uses AsyncSSH internally for performance:

  1. Parallel Transfers: Large files are automatically split into chunks and transferred in parallel
  2. Concurrent Operations: Batch operations (like deleting multiple files) are performed concurrently
  3. Connection Pooling: Efficiently manages SFTP connections
  4. Configurable Parallelization: Tune max_requests and block_size for your network conditions

# Inside an asset or op that receives `sftp: SFTPResource`

# For large files on fast networks
sftp.get_file(
    remote_path="/data/large_file.zip",
    local_path="/tmp/large_file.zip",
    max_requests=256,  # More chunk requests in flight at once
    block_size=131072,  # 128 KiB blocks
)

# For many small files
files = [f"/data/file_{i}.txt" for i in range(1000)]
sftp.delete_files(files)  # Concurrent deletion, much faster than sequential
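The interplay of `block_size` and `max_requests` can be modeled locally: the file is cut into fixed-size chunks, and a semaphore caps how many chunk requests are in flight. This is a conceptual sketch only; it copies an in-memory `bytes` object instead of reading a remote file, and `plan_chunks`/`parallel_copy` are hypothetical names, not AsyncSSH's actual implementation.

```python
import asyncio
from typing import List, Tuple


def plan_chunks(total_size: int, block_size: int) -> List[Tuple[int, int]]:
    """Split total_size bytes into (offset, length) pieces of at most block_size."""
    return [
        (offset, min(block_size, total_size - offset))
        for offset in range(0, total_size, block_size)
    ]


async def parallel_copy(src: bytes, block_size: int, max_requests: int) -> Tuple[bytes, int]:
    """Copy src chunk by chunk with at most max_requests chunks in flight.

    Returns the copied bytes and the peak number of concurrent chunk copies.
    """
    dst = bytearray(len(src))
    limit = asyncio.Semaphore(max_requests)
    active = 0
    peak = 0

    async def copy_chunk(offset: int, length: int) -> None:
        nonlocal active, peak
        async with limit:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0)  # placeholder for one remote read request
            # Each chunk writes to its own offset, so completion order does not matter
            dst[offset:offset + length] = src[offset:offset + length]
            active -= 1

    await asyncio.gather(
        *(copy_chunk(offset, length) for offset, length in plan_chunks(len(src), block_size))
    )
    return bytes(dst), peak
```

Larger blocks mean fewer requests per file, and a higher request limit hides more round-trip latency; both trade memory for throughput, which is why the README suggests tuning them to network conditions.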

Configuration Reference

SFTPResource Configuration

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| host | str | Required | SFTP server hostname or IP address |
| port | int | 22 | SFTP server port |
| username | str | Required | Username for authentication |
| password | str | None | Password for authentication |
| private_key | str | None | Path to private key file |
| passphrase | str | None | Passphrase for an encrypted private key |
| known_hosts | str | None | Path to known_hosts file (None disables host key checking, which is unsafe outside testing) |
| keepalive_interval | int | 15 | Seconds between keepalive packets |
| connect_timeout | int | 60 | Connection timeout in seconds |
| default_max_requests | int | -1 | Max concurrent SFTP requests (-1 for server default) |
| default_block_size | int | -1 | Block size for parallel transfers (-1 for server default) |
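The table pairs resource-level defaults (`default_max_requests`, `default_block_size`) with the per-call `max_requests`/`block_size` arguments shown under Performance Tips. A natural precedence is "per-call value wins, otherwise the resource default, and -1 means let the underlying client decide". The sketch below encodes that rule; the precedence and the choice to drop -1 rather than forward it are assumptions, and `transfer_kwargs` is a hypothetical helper.

```python
from typing import Dict, Optional


def transfer_kwargs(
    default_max_requests: int,
    default_block_size: int,
    max_requests: Optional[int] = None,
    block_size: Optional[int] = None,
) -> Dict[str, int]:
    """Merge per-call overrides with resource defaults (hypothetical precedence)."""
    resolved = {
        "max_requests": default_max_requests if max_requests is None else max_requests,
        "block_size": default_block_size if block_size is None else block_size,
    }
    # -1 is a sentinel: omit it so the underlying client falls back to its own default
    return {key: value for key, value in resolved.items() if value != -1}
```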

API Reference

Main Classes

SFTPResource

The main resource class for SFTP operations. All methods are synchronous but use AsyncSSH internally for performance.

Key Methods:

  • list_files() - List and filter files with glob patterns
  • get_file() - Download file(s) from server
  • put_file() - Upload file(s) to server
  • delete_file() - Delete a single file
  • delete_files() - Delete multiple files concurrently
  • file_exists() - Check if a file/directory exists
  • mkdir() - Create directory (with parents)
  • rmtree() - Remove directory tree
  • is_dir() - Check if path is a directory
  • is_file() - Check if path is a regular file
  • rename() - Rename or move files/directories
  • get_file_info() - Get detailed file metadata
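A common way to offer synchronous methods on top of an async library is a thin facade that runs each coroutine to completion. The sketch below shows that pattern with an in-memory dict standing in for the server; `SyncFacade`, `read_bytes`, and `exists` are hypothetical and this is not dagster-sftp's actual code.

```python
import asyncio
from typing import Any, Coroutine, Dict, TypeVar

T = TypeVar("T")


class SyncFacade:
    """Hypothetical sketch: blocking methods backed by coroutines."""

    def __init__(self, files: Dict[str, bytes]) -> None:
        # An in-memory dict stands in for the remote server
        self._files = files

    async def _read_async(self, path: str) -> bytes:
        await asyncio.sleep(0)  # placeholder for an awaitable SFTP call
        return self._files[path]

    async def _exists_async(self, path: str) -> bool:
        await asyncio.sleep(0)
        return path in self._files

    @staticmethod
    def _run(coro: Coroutine[Any, Any, T]) -> T:
        # asyncio.run starts a fresh event loop, runs the coroutine, then closes the loop;
        # it raises RuntimeError if this thread is already running an event loop
        return asyncio.run(coro)

    def read_bytes(self, path: str) -> bytes:
        return self._run(self._read_async(path))

    def exists(self, path: str) -> bool:
        return self._run(self._exists_async(path))
```

Running one event loop per call keeps the facade simple; a production resource would more likely keep a loop and connection alive across calls to avoid reconnecting each time.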

FileInfo

Data class containing file/directory metadata returned by list_files() and get_file_info().

Attributes:

  • filename - Name of the file
  • path - Full path to the file
  • size - File size in bytes
  • is_dir - True if directory
  • is_file - True if regular file
  • is_link - True if symbolic link
  • mode - Unix permission mode
  • modified_time - Last modification time
  • accessed_time - Last access time (optional)
  • owner - Owner UID (optional)
  • group - Group GID (optional)
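The `is_dir`, `is_file`, and `is_link` flags carry the same information as the file-type bits of `mode`. The sketch below mirrors the documented attributes in a hypothetical `FileInfoSketch` class and derives the flags with Python's `stat` module; it is not the library's class. Note that `is_link` can only be true if `mode` came from an lstat-style call, since a plain stat follows the link; which one the resource uses is an assumption.

```python
import stat
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
class FileInfoSketch:
    """Hypothetical mirror of the documented attributes, deriving type flags from mode."""

    filename: str
    path: str
    size: int
    mode: int
    modified_time: datetime
    accessed_time: Optional[datetime] = None
    owner: Optional[int] = None
    group: Optional[int] = None

    @property
    def is_dir(self) -> bool:
        return stat.S_ISDIR(self.mode)

    @property
    def is_file(self) -> bool:
        return stat.S_ISREG(self.mode)

    @property
    def is_link(self) -> bool:
        return stat.S_ISLNK(self.mode)

    @property
    def permissions(self) -> str:
        # ls-style string, e.g. "-rw-r--r--" for a regular file with mode 0o644
        return stat.filemode(self.mode)
```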

Development

Testing

make test

Building

make build

Code Quality

# Format and lint
make ruff

# Type checking
make check

License

This integration is part of the Dagster Community Integrations project and follows the same licensing terms as the main repository.

Download files

Source Distribution

dagster_sftp-0.0.2.tar.gz (17.0 kB)

Uploaded Source

Built Distribution

dagster_sftp-0.0.2-py3-none-any.whl (15.0 kB)

Uploaded Python 3

File details

Details for the file dagster_sftp-0.0.2.tar.gz.

File metadata

  • Download URL: dagster_sftp-0.0.2.tar.gz
  • Size: 17.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.8

File hashes

Hashes for dagster_sftp-0.0.2.tar.gz
| Algorithm | Hash digest |
| --- | --- |
| SHA256 | 748e65ef3b7f03bcdfd46993fa0724088e60626d40f493561fc846bf84bebada |
| MD5 | c37988325bc2b20f01f03eaa642c42fa |
| BLAKE2b-256 | ae1313e407ecb61f5d4cba94fa8160b630be561ba831ec4dd173ae9d7f9f814e |

File details

Details for the file dagster_sftp-0.0.2-py3-none-any.whl.

File hashes

Hashes for dagster_sftp-0.0.2-py3-none-any.whl
| Algorithm | Hash digest |
| --- | --- |
| SHA256 | 62c4805fc55b430f262ffc28ca279662830b1b4d9a26d39882be794007dc3a8a |
| MD5 | b25779953ebad7ea91a5f76909acf158 |
| BLAKE2b-256 | abdf93ea53af158f55eeeccd36abb7a7bf324c4446da7c2abfb0441e683004d4 |
