No project description provided
Project description
dagster-ssis
Package to allow observability of SSIS and MSSQL jobs
This simplifies the tracking of SSIS and MSSQL jobs in SQL Server to use as upstream dependencies for other dagster assets.
Requirements
Drivers
You need the ODBC drivers installed on the machine that is running the dagster pipeline.
See Microsoft's documentation for more information.
Permissions
For SSIS the user must be a member of the role ssis_logreader or have permissions to read from the catalog.executions view.
For MSSQL Jobs the user must be able to select from msdb.dbo.sysjobs and msdb.dbo.sysjobhistory.
The Easiest method is to grant SELECT on these two tables as the default roles grant much higher levels of privilage then required.
SSIS
Creates an asset to represent an SSIS Package and the assets (tables, etc.) that are generated by it.
This captures executions outside of dagster, but reports events to be triggered and responded to by downstream assets, such as dbt.
SSISAsset Usage
Two primary methods to use the assets.
The asset key will be Folder + Project Name + Package Name.
# Create a single SSIS Asset
ssis_asset = SSISAsset(
project_name='Project',
package_name'Package.dtsx'
)
# get a list with a single asset spec for the SSIS package
asset_spec = ssis_asset.asset_specs
# Create a single SSIS Asset representing multiple other assets, such as table in MSSQL, or a stored proceedure
# This pattern is good if the asset should be defined differently then the package itself.
table_assets = [
AssetSpec(key='my_table'), AssetSpec(key='my_other_table')
]
ssis_asset = SSISAsset(
project_name='Project',
package_name'Package.dtsx',
asset_list=table_assets
)
# get the list of all the assets, including the package asset
asset_spec = ssis_asset.asset_specs
# using helper function `build_ssis_assets`, produce the same as above
# this assigns the key of the sub assets to the ssis path
# Good for composing the ssis package and all related assets together
ssis_asset = build_ssis_assets(
project_name='Project',
package_name'Package.dtsx',
asset_list=['my_table', 'my_other_table']
)
# get the list of all the assets, including the package asset
asset_spec = ssis_asset.asset_specs
To pass anything along to the asset spec created, pass a dictionary of arguments to asset_spec_kwargs. These will be sent along to the AssetSpec for the asset, or sub assets.
ssis_asset = build_ssis_assets(
project_name='Project',
package_name'Package.dtsx',
asset_list=['my_table', 'my_other_table'],
asset_spec_kwargs={
'owner': 'someone',
'skippable': True,
'metadata': {
'my_meta': 'data'
}
}
)
Sensor
To report the materialization events, a sensor can be used to check the ssisdb for completed or successful events from the catalog.executions view.
You will need a resource defined that connects to the database and exposes a connect function, which returns a SQLAlchemy connection object.
this can be created with build_ssis_asset_sensor
Continuing from the example above
ssis_sensor = build_ssis_asset_sensor(
ssis_assets=[ssis_asset],
sensor_name='ssis_sensor',
database_resource_key='my_db_resource'
)
Jobs
To represent the execution of a MSSQL Job, it is defined through a MSSQLJobSpec.
The user must be able to select from msdb.dbo.sysjobs and msdb.dbo.sysjobhistory
Easiest method is to grant SELECT on these two tables.
MSSQL Job Asset
Two primary methods to use the assets.
The asset key will either be MSSQLJob + Job Name or if using the helper function, MSSQLJob + Job Name + Job Name to allow other assets to sit along side it.
# single asset
job_asset = MSSQLJobAsset(
job_name='job_name',
)
# or adding specific specs to capture alongside
job_asset = MSSQLJobAsset(
job_name='job_name', asset_list=[AssetSpec('a'), AssetSpec('b')]
)
# single asset from helper with child assets
job_asset = build_mssql_job_assets(
"job_name", asset_list=["other_asset"]
)
As above, you can also pass the keyword args to the AssetSpec using asset_spec_kwargs
MSSQL Job Sensor
To report the materialization events, a sensor can be used to check the msdb for completed or successful events from the dbo.sysjobhistories table.
You will need a resource defined that connects to the database and exposes a connect function, which returns a SQLAlchemy connection object.
The sensor can be created with build_mssql_job_asset_sensor
Complete Example
from dagster_ssis import (
SQLServerResource,
build_ssis_assets, build_ssis_asset_sensor,
MSSQLJobAsset, build_mssql_job_asset_sensor
)
my_db_resource = SQLServerResource(
host='localhost',
database='MyDB',
username='...',
password='...',
query_props={
"driver": "ODBC Driver 18 for SQL Server",
"TrustServerCertificate": "yes",
}
)
ssis_asset_a = build_ssis_assets(
project_name='Project',
package_name'Package.dtsx',
asset_list=['my_table', 'my_other_table'],
asset_spec_kwargs={
'owner': 'someone',
'skippable': True,
'metadata': {
'my_meta': 'data'
}
}
)
ssis_asset_b = build_ssis_assets(
project_name='Project2',
package_name'Package.dtsx',
asset_list=['something_else'],
asset_spec_kwargs={
'owner': 'someone',
'skippable': True,
'metadata': {
'my_meta': 'data'
}
}
)
# a single job sensor
job_assets = MSSQLJobAsset(
'job_name'
)
# tie ssis assets to a job exeuction check
# you probably want to exclude checking using ssis and use the job instead if adding ssis assets
ssis_job_assets = MSSQLJobAsset(
'job_name_ssis',
asset_list=ssis_asset_b.asset_specs
)
# only check for a, as b is now tied to a job sensor
ssis_sensor = build_ssis_asset_sensor(
ssis_assets=[ssis_asset_a],
sensor_name='ssis_sensor',
database_resource_key='my_db_resource'
)
mssql_job_sensor = build_mssql_job_asset_sensor(
[job_assets, ssis_job_assets],
sensor_name='mssql_job_sensor',
database_resource_key='my_db_resource'
)
Definition(
# add all the assets. only add either ssis_asset_b or the job specs otherwise it will find duplicates
assets=ssis_asset_a.asset_specs + ssis_job_assets.asset_specs + job_assets.asset_specs,
sensors=[ssis_sensor, mssql_job_sensor],
resources={
'my_db_resource': my_db_resource
}
)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file dagster_ssis-0.0.1.tar.gz.
File metadata
- Download URL: dagster_ssis-0.0.1.tar.gz
- Upload date:
- Size: 17.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
24cc795417e647db19baeb6d47bf7eed2a7f419fa3269a9847fc7ab283dc228f
|
|
| MD5 |
7de18df73b23165cf90ecb7171a3471a
|
|
| BLAKE2b-256 |
6db15e25a5e040c567663c0e4655395d6732df9d548b5128598d79c06c7bf641
|
Provenance
The following attestation bundles were made for dagster_ssis-0.0.1.tar.gz:
Publisher:
python-publish.yml on cody-scott/dagster-ssis
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
dagster_ssis-0.0.1.tar.gz -
Subject digest:
24cc795417e647db19baeb6d47bf7eed2a7f419fa3269a9847fc7ab283dc228f - Sigstore transparency entry: 175446028
- Sigstore integration time:
-
Permalink:
cody-scott/dagster-ssis@5f9e888976f184cff03cde2628e2d4267a48d721 -
Branch / Tag:
refs/tags/v0.0.1 - Owner: https://github.com/cody-scott
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
python-publish.yml@5f9e888976f184cff03cde2628e2d4267a48d721 -
Trigger Event:
release
-
Statement type:
File details
Details for the file dagster_ssis-0.0.1-py3-none-any.whl.
File metadata
- Download URL: dagster_ssis-0.0.1-py3-none-any.whl
- Upload date:
- Size: 19.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
bdc21df9a913d4451ebb52d3f20b3264fe02f4b2d595bda5a9cd684413ed7050
|
|
| MD5 |
7d401502ea984ea123d3a3229a53fe82
|
|
| BLAKE2b-256 |
3cb68e7bff2ac5a426f612026a7025f58994d099070e44e22042d91cfa863ad0
|
Provenance
The following attestation bundles were made for dagster_ssis-0.0.1-py3-none-any.whl:
Publisher:
python-publish.yml on cody-scott/dagster-ssis
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
dagster_ssis-0.0.1-py3-none-any.whl -
Subject digest:
bdc21df9a913d4451ebb52d3f20b3264fe02f4b2d595bda5a9cd684413ed7050 - Sigstore transparency entry: 175446029
- Sigstore integration time:
-
Permalink:
cody-scott/dagster-ssis@5f9e888976f184cff03cde2628e2d4267a48d721 -
Branch / Tag:
refs/tags/v0.0.1 - Owner: https://github.com/cody-scott
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
python-publish.yml@5f9e888976f184cff03cde2628e2d4267a48d721 -
Trigger Event:
release
-
Statement type: