Dagster integration library for interacting with SFTP Servers
dagster-sftp
A Dagster integration for interacting with SFTP servers, providing a high-performance resource for file transfer operations with support for parallel transfers, batch operations, and advanced filtering capabilities.
Features
- High-Performance Transfers: Automatic parallelization of large file transfers using AsyncSSH
- Batch Operations: Efficient concurrent processing of multiple files
- Advanced Filtering: Pattern matching with glob, date-based filtering, and file type filtering
- Progress Monitoring: Built-in progress callbacks for tracking transfer status
- Flexible Authentication: Support for both password and key-based authentication
- Comprehensive File Operations: Upload, download, delete, rename, and directory management
- Type-Safe: Full type hints and Pydantic configuration
Installation
```bash
pip install dagster-sftp
```
Quick Start
Basic Configuration
```python
from dagster import Definitions
from dagster_sftp import SFTPResource

# Password authentication
sftp_resource = SFTPResource(
    host="sftp.example.com",
    username="myuser",
    password="mypassword",
    port=22,  # optional, defaults to 22
)

# Key-based authentication
sftp_resource = SFTPResource(
    host="sftp.example.com",
    username="myuser",
    private_key="/path/to/private/key",
    passphrase="key_passphrase",  # optional
    known_hosts="/path/to/known_hosts",  # optional
)

defs = Definitions(
    resources={
        "sftp": sftp_resource,
    }
)
```
Basic File Operations
```python
from dagster import asset
from dagster_sftp import SFTPResource


@asset
def download_files(sftp: SFTPResource):
    """Download, upload, and delete files on the SFTP server."""
    # Download a single file
    sftp.get_file(
        remote_path="/data/report.csv",
        local_path="/tmp/report.csv",
    )

    # Upload a file
    sftp.put_file(
        local_path="/tmp/processed_report.csv",
        remote_path="/data/processed/report.csv",
    )

    # Delete a file
    sftp.delete_file("/data/old_report.csv")

    return {"status": "Files processed successfully"}
```
Complete Example: Sensor for new files + Asset for processing
```python
import tempfile
from datetime import datetime

import dagster as dg
from dagster_sftp import SFTPFileInfoConfig, SFTPResource


@dg.asset
def my_sftp_asset(
    context: dg.AssetExecutionContext,
    config: SFTPFileInfoConfig,
    sftp: SFTPResource,
):
    """Download and process the file selected by the sensor."""
    context.log.info(f"Processing file {config.path}")
    with tempfile.NamedTemporaryFile() as tmp_file:
        sftp.get_file(config.path, tmp_file.name)
        # Process the file...


@dg.sensor(target=my_sftp_asset)
def sftp_file_sensor(context: dg.SensorEvaluationContext, sftp: SFTPResource):
    """Detect new files on the SFTP server and trigger processing."""
    # The cursor holds the time of the previous check (None on the first tick).
    last_check = datetime.fromisoformat(context.cursor) if context.cursor else None
    current_check = datetime.now()

    try:
        new_files = sftp.list_files(
            base_path="/incoming",
            pattern="*.csv",
            files_only=True,
            modified_after=last_check,
        )
    except Exception as e:
        context.log.error(f"Error checking SFTP: {e}")
        # Re-raise so the sensor tick is reported as failed.
        raise

    if not new_files:
        since = f" since {last_check.isoformat()}" if last_check else ""
        return dg.SkipReason(f"No new files found{since}")

    return dg.SensorResult(
        run_requests=[
            dg.RunRequest(
                asset_selection=[my_sftp_asset.key],
                run_key=file.id,
                run_config=dg.RunConfig(
                    ops={
                        my_sftp_asset.key.to_python_identifier(): {
                            "config": file.to_config_dict()
                        }
                    }
                ),
            )
            for file in new_files
        ],
        cursor=current_check.isoformat(),
    )
```
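Because the cursor logic is the part of this sensor most likely to go wrong, it can help to pull it into a pure function that is testable without an SFTP server. The sketch below is hypothetical and not part of dagster-sftp: `RemoteFile` stands in for the entries returned by `list_files()`, and the `path@mtime` run-key format is an assumption. One deliberate difference from the sensor above: it advances the cursor to the newest modification time it actually saw rather than `datetime.now()`, so a clock difference between the Dagster host and the SFTP server does not shift the cutoff.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class RemoteFile:
    """Hypothetical stand-in for one entry returned by list_files()."""

    path: str
    modified_time: datetime


def select_new_files(
    files: List[RemoteFile], cursor: Optional[str]
) -> Tuple[List[RemoteFile], Optional[str], List[str]]:
    """Return (new files, next cursor, run keys) for one sensor tick."""
    # An empty cursor means this is the first tick: every file counts as new.
    last_check = datetime.fromisoformat(cursor) if cursor else None
    new_files = [
        f for f in files if last_check is None or f.modified_time > last_check
    ]
    if not new_files:
        # Keep the cursor unchanged so the next tick uses the same cutoff.
        return [], cursor, []
    # Advance the cursor to the newest modification time actually observed,
    # which is measured by the server's clock, not the sensor host's clock.
    next_cursor = max(f.modified_time for f in new_files).isoformat()
    # Include the mtime in the run key so a re-uploaded file gets a new run.
    run_keys = [f"{f.path}@{f.modified_time.isoformat()}" for f in new_files]
    return new_files, next_cursor, run_keys
```

Calling it again with the returned cursor yields no files until something newer appears, which is the situation the `SkipReason` branch handles.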
File Listing with Filtering
```python
from datetime import datetime, timedelta

import dagster as dg
from dagster_sftp import SFTPResource


@dg.asset
def process_recent_files(sftp: SFTPResource):
    """Process files modified in the last week."""
    # List CSV files modified in the last 7 days
    recent_files = sftp.list_files(
        base_path="/data/exports",
        pattern="*.csv",
        files_only=True,
        modified_after=datetime.now() - timedelta(days=7),
    )

    for file_info in recent_files:
        print(f"Processing {file_info.filename}")
        print(f"  Size: {file_info.size} bytes")
        print(f"  Modified: {file_info.modified_time}")

        # Download and process each file
        local_path = f"/tmp/{file_info.filename}"
        sftp.get_file(file_info.path, local_path)
        # Process the file...

    return {"files_processed": len(recent_files)}
```
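The three filters in this call combine as a logical AND: a file must match the glob, be a regular file, and be newer than the cutoff. The standard-library sketch below reproduces that semantics on in-memory records so the behaviour can be checked without a server. `Entry` and `filter_entries` are hypothetical names, and treating `modified_after` as a strict comparison is an assumption the README does not confirm.

```python
import fnmatch
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, List, Optional


@dataclass(frozen=True)
class Entry:
    """Hypothetical listing entry carrying only the fields used for filtering."""

    filename: str
    is_file: bool
    modified_time: datetime


def filter_entries(
    entries: Iterable[Entry],
    pattern: str = "*",
    files_only: bool = False,
    modified_after: Optional[datetime] = None,
) -> List[Entry]:
    """Keep entries that pass every enabled filter."""
    result = []
    for entry in entries:
        # Skip directories, links, etc. when only regular files are wanted.
        if files_only and not entry.is_file:
            continue
        # fnmatchcase is case-sensitive, matching typical POSIX SFTP servers.
        if not fnmatch.fnmatchcase(entry.filename, pattern):
            continue
        # Assumed strict comparison: a file modified exactly at the cutoff is dropped.
        if modified_after is not None and entry.modified_time <= modified_after:
            continue
        result.append(entry)
    return result
```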
Directory Operations
```python
import dagster as dg
from dagster_sftp import SFTPResource


@dg.op
def manage_directories(context: dg.OpExecutionContext, sftp: SFTPResource):
    """Create and manage directory structures."""
    # Create directory structure (including parents)
    sftp.mkdir("/remote/data/2024/reports", mode=0o755)

    # Check if path is a directory
    if sftp.is_dir("/remote/data"):
        # Recursively list all Python files
        python_files = sftp.list_files(
            base_path="/remote/data",
            pattern="**/*.py",  # Recursive search
            files_only=True,
        )
        context.log.info(f"Found {len(python_files)} Python files")

    # Move/rename directories
    sftp.rename(
        old_path="/remote/old_reports",
        new_path="/remote/archived_reports",
    )

    # Remove directory tree (use with caution!)
    if sftp.file_exists("/remote/temp"):
        context.log.warning("Removing /remote/temp...")
        sftp.rmtree("/remote/temp")

    return {"directories": "managed"}
```
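The SFTP `MKDIR` request creates a single directory, so a "create including parents" call such as `sftp.mkdir()` has to walk the path and create each missing level in turn. The hypothetical helper below computes that sequence with `pathlib.PurePosixPath`; it is a sketch of the technique under that assumption, not dagster-sftp's implementation.

```python
from pathlib import PurePosixPath
from typing import List


def dirs_to_create(path: str) -> List[str]:
    """Return every directory level of an absolute POSIX path, shallowest first."""
    p = PurePosixPath(path)
    if not p.is_absolute():
        raise ValueError(f"expected an absolute path, got {path!r}")
    # p.parents runs deepest-first and ends at "/"; reverse it and drop the root.
    ancestors = [str(a) for a in reversed(p.parents) if str(a) != "/"]
    return ancestors + [str(p)]
```

A caller would typically issue one mkdir per returned entry and ignore "already exists" errors for the levels that are present.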
Performance Tips
The SFTP resource uses AsyncSSH internally to speed up transfers:
- Parallel Transfers: Large files are automatically split into chunks and transferred in parallel
- Concurrent Operations: Batch operations (like deleting multiple files) are performed concurrently
- Connection Pooling: Efficiently manages SFTP connections
- Configurable Parallelization: Tune `max_requests` and `block_size` for your network conditions
```python
# For large files on fast networks
sftp.get_file(
    remote_path="/data/large_file.zip",
    local_path="/tmp/large_file.zip",
    max_requests=256,  # Increase parallel chunks
    block_size=131072,  # 128 KiB blocks
)

# For many small files
files = [f"/data/file_{i}.txt" for i in range(1000)]
sftp.delete_files(files)  # Deleted concurrently; typically faster than one at a time
```
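The idea behind `max_requests` and `block_size` can be illustrated without a server: a file is cut into `block_size` pieces and up to `max_requests` of them are in flight at once. The asyncio sketch below simulates this on an in-memory buffer; it illustrates the technique and is not dagster-sftp's or AsyncSSH's actual code, and `chunk_ranges` and `parallel_copy` are hypothetical names.

```python
import asyncio
from typing import List, Tuple


def chunk_ranges(size: int, block_size: int) -> List[Tuple[int, int]]:
    """Split [0, size) into (offset, length) pieces of at most block_size bytes."""
    if block_size <= 0:
        raise ValueError("block_size must be positive")
    return [(off, min(block_size, size - off)) for off in range(0, size, block_size)]


async def parallel_copy(data: bytes, block_size: int, max_requests: int) -> bytes:
    """Simulate a chunked transfer with at most max_requests reads in flight."""
    out = bytearray(len(data))
    limit = asyncio.Semaphore(max_requests)

    async def fetch(offset: int, length: int) -> None:
        async with limit:
            await asyncio.sleep(0)  # stand-in for one network round trip
            # Each chunk writes its own slice, so completion order does not matter.
            out[offset:offset + length] = data[offset:offset + length]

    await asyncio.gather(
        *(fetch(off, n) for off, n in chunk_ranges(len(data), block_size))
    )
    return bytes(out)
```

With the values from the large-file example above, `max_requests=256` and `block_size=131072`, up to 256 × 128 KiB = 32 MiB could be outstanding at once, assuming the client keeps that many requests in flight; that memory and bandwidth budget is the trade-off to weigh when raising either setting.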
Configuration Reference
SFTPResource Configuration
| Parameter | Type | Default | Description |
|---|---|---|---|
| `host` | `str` | Required | SFTP server hostname or IP address |
| `port` | `int` | `22` | SFTP server port |
| `username` | `str` | Required | Username for authentication |
| `password` | `str` | `None` | Password for authentication |
| `private_key` | `str` | `None` | Path to private key file |
| `passphrase` | `str` | `None` | Passphrase for an encrypted private key |
| `known_hosts` | `str` | `None` | Path to known_hosts file (`None` disables host key checking) |
| `keepalive_interval` | `int` | `15` | Seconds between keepalive packets |
| `connect_timeout` | `int` | `60` | Connection timeout in seconds |
| `default_max_requests` | `int` | `-1` | Max concurrent SFTP requests (`-1` uses the server default) |
| `default_block_size` | `int` | `-1` | Block size in bytes for parallel transfers (`-1` uses the server default) |
API Reference
Main Classes
SFTPResource
The main resource class for SFTP operations. All methods are synchronous but use AsyncSSH internally for performance.
Key Methods:
- `list_files()` - List and filter files with glob patterns
- `get_file()` - Download file(s) from server
- `put_file()` - Upload file(s) to server
- `delete_file()` - Delete a single file
- `delete_files()` - Delete multiple files concurrently
- `file_exists()` - Check if a file/directory exists
- `mkdir()` - Create directory (with parents)
- `rmtree()` - Remove directory tree
- `is_dir()` - Check if path is a directory
- `is_file()` - Check if path is a regular file
- `rename()` - Rename or move files/directories
- `get_file_info()` - Get detailed file metadata
FileInfo
Data class containing file/directory metadata returned by list_files() and get_file_info().
Attributes:
- `filename` - Name of the file
- `path` - Full path to the file
- `size` - File size in bytes
- `is_dir` - True if directory
- `is_file` - True if regular file
- `is_link` - True if symbolic link
- `mode` - Unix permission mode
- `modified_time` - Last modification time
- `accessed_time` - Last access time (optional)
- `owner` - Owner UID (optional)
- `group` - Group GID (optional)
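The `is_dir`, `is_file`, and `is_link` flags carry the same information as the file-type bits of `mode`. The sketch below shows how those flags, plus an `ls`-style permission string, can be derived from a Unix mode with the standard `stat` module; `FileInfoSketch` is a hypothetical class mirroring the attributes above, and whether the library computes its flags this way is an assumption.

```python
import stat
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
class FileInfoSketch:
    """Hypothetical mirror of the attributes listed above (not the library class)."""

    filename: str
    path: str
    size: int
    mode: int
    modified_time: datetime
    accessed_time: Optional[datetime] = None
    owner: Optional[int] = None
    group: Optional[int] = None

    @property
    def is_dir(self) -> bool:
        # File-type bits say "directory"
        return stat.S_ISDIR(self.mode)

    @property
    def is_file(self) -> bool:
        # File-type bits say "regular file"
        return stat.S_ISREG(self.mode)

    @property
    def is_link(self) -> bool:
        # File-type bits say "symbolic link"
        return stat.S_ISLNK(self.mode)

    @property
    def permissions(self) -> str:
        # ls-style string, e.g. "-rw-r--r--" for a regular file with mode 0o644
        return stat.filemode(self.mode)
```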
Development
Testing
```bash
make test
```
Building
```bash
make build
```
Code Quality
```bash
# Format and lint
make ruff

# Type checking
make check
```
License
This integration is part of the Dagster Community Integrations project and follows the same licensing terms as the main repository.
File details
Details for the file dagster_sftp-0.0.2.tar.gz.
File metadata
- Download URL: dagster_sftp-0.0.2.tar.gz
- Upload date:
- Size: 17.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.8
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 748e65ef3b7f03bcdfd46993fa0724088e60626d40f493561fc846bf84bebada |
| MD5 | c37988325bc2b20f01f03eaa642c42fa |
| BLAKE2b-256 | ae1313e407ecb61f5d4cba94fa8160b630be561ba831ec4dd173ae9d7f9f814e |
File details
Details for the file dagster_sftp-0.0.2-py3-none-any.whl.
File metadata
- Download URL: dagster_sftp-0.0.2-py3-none-any.whl
- Upload date:
- Size: 15.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.8
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 62c4805fc55b430f262ffc28ca279662830b1b4d9a26d39882be794007dc3a8a |
| MD5 | b25779953ebad7ea91a5f76909acf158 |
| BLAKE2b-256 | abdf93ea53af158f55eeeccd36abb7a7bf324c4446da7c2abfb0441e683004d4 |