dagster-ssis

Package to allow observability of SSIS and MSSQL jobs

It tracks SSIS package executions and MSSQL jobs in SQL Server so they can be used as upstream dependencies for other Dagster assets.

Requirements

Drivers

You need the ODBC drivers installed on the machine that is running the dagster pipeline.

See Microsoft's documentation for more information.

Permissions

For SSIS, the user must be a member of the ssis_logreader role or have permission to read from the catalog.executions view.

For MSSQL jobs, the user must be able to select from msdb.dbo.sysjobs and msdb.dbo.sysjobhistory. The easiest method is to grant SELECT on these two tables, as the default roles grant much higher levels of privilege than required.
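The permissions above can be sketched as T-SQL statements. The helper below is only an illustration and is not part of dagster-ssis: the function name `permission_statements` and the user name `dagster_reader` are hypothetical, and it assumes a database user with that name already exists in both SSISDB and msdb.

```python
def permission_statements(user_name: str) -> list[str]:
    """Return T-SQL statements granting the read access described above.

    `user_name` is a hypothetical database user; it is wrapped in brackets
    (with any closing bracket doubled) so unusual names stay valid identifiers.
    """
    user = "[" + user_name.replace("]", "]]") + "]"
    return [
        # SSIS: add the user to the ssis_logreader role in SSISDB
        f"USE SSISDB; ALTER ROLE ssis_logreader ADD MEMBER {user};",
        # MSSQL jobs: SELECT on only the two msdb tables that are read
        f"USE msdb; GRANT SELECT ON dbo.sysjobs TO {user};",
        f"USE msdb; GRANT SELECT ON dbo.sysjobhistory TO {user};",
    ]


for statement in permission_statements("dagster_reader"):
    print(statement)
```

Granting SELECT on the two tables directly keeps the account narrower than the built-in SQL Server Agent roles would.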

SSIS

Creates an asset to represent an SSIS Package and the assets (tables, etc.) that are generated by it.

This captures executions outside of dagster, but reports events to be triggered and responded to by downstream assets, such as dbt.

SSISAsset Usage

There are two primary ways to use the assets. The asset key will be Folder + Project Name + Package Name.

# Create a single SSIS Asset
ssis_asset = SSISAsset(
    project_name='Project',
    package_name='Package.dtsx'
)

# get a list with a single asset spec for the SSIS package
asset_spec = ssis_asset.asset_specs

# Create a single SSIS asset representing multiple other assets, such as a table in MSSQL or a stored procedure.
# This pattern is good if the asset should be defined differently than the package itself.
from dagster import AssetSpec

table_assets = [
    AssetSpec(key='my_table'), AssetSpec(key='my_other_table')
]

ssis_asset = SSISAsset(
    project_name='Project',
    package_name='Package.dtsx',
    asset_list=table_assets
)
# get the list of all the assets, including the package asset
asset_spec = ssis_asset.asset_specs

# Using the helper function `build_ssis_assets` produces the same result as above.
# It places the keys of the sub-assets under the SSIS path.

# Good for composing the ssis package and all related assets together
ssis_asset = build_ssis_assets(
    project_name='Project',
    package_name='Package.dtsx',
    asset_list=['my_table', 'my_other_table']
)

# get the list of all the assets, including the package asset
asset_spec = ssis_asset.asset_specs
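
The key layout described above (Folder + Project Name + Package Name, with sub-assets placed under the SSIS path) can be pictured with plain lists. This is only a sketch of the naming scheme, not the library's code: the function names are hypothetical, `'Folder'` is a placeholder value, and whether dagster-ssis normalizes names (for example the `.dtsx` suffix) or nests sub-assets exactly this way is an assumption.

```python
def ssis_key_path(folder: str, project: str, package: str) -> list[str]:
    """Key path for the package asset: Folder + Project Name + Package Name."""
    return [folder, project, package]


def sub_asset_key_path(package_path: list[str], asset_name: str) -> list[str]:
    """Assumed layout: a sub-asset key is the package path plus the asset name."""
    return [*package_path, asset_name]


# 'Folder' is a placeholder; the examples above do not pass a folder explicitly.
package_path = ssis_key_path("Folder", "Project", "Package.dtsx")
table_path = sub_asset_key_path(package_path, "my_table")
print(package_path)
print(table_path)
```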

To pass additional arguments to the generated asset specs, pass a dictionary to asset_spec_kwargs. These are forwarded to the AssetSpec for the asset and for any sub-assets.

ssis_asset = build_ssis_assets(
    project_name='Project',
    package_name='Package.dtsx',
    asset_list=['my_table', 'my_other_table'],
    asset_spec_kwargs={
        'owner': 'someone',
        'skippable': True,
        'metadata': {
            'my_meta': 'data'
        }
    }
)

Sensor

To report materialization events, a sensor can check SSISDB for completed or successful executions in the catalog.executions view.

You will need a resource defined that connects to the database and exposes a connect function, which returns a SQLAlchemy connection object.

The sensor can be created with build_ssis_asset_sensor.

Continuing from the example above:

ssis_sensor = build_ssis_asset_sensor(
    ssis_assets=[ssis_asset],
    sensor_name='ssis_sensor',
    database_resource_key='my_db_resource'
)
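
To give a feel for what such a sensor has to do, here is a minimal polling sketch. It is not the implementation used by dagster-ssis. It assumes a cursor based on `execution_id`, uses an in-memory SQLite table named `executions` as a stand-in for the SSISDB `catalog.executions` view, and relies on the SSISDB status codes 7 (succeeded) and 9 (completed). The function name is hypothetical.

```python
import sqlite3

# SSISDB catalog.executions status codes: 7 = succeeded, 9 = completed
FINISHED_STATUSES = (7, 9)


def new_finished_executions(conn, last_seen_id):
    """Return (execution_id, project_name, package_name) rows past the cursor."""
    return conn.execute(
        "SELECT execution_id, project_name, package_name "
        "FROM executions "
        "WHERE execution_id > ? AND status IN (?, ?) "
        "ORDER BY execution_id",
        (last_seen_id, *FINISHED_STATUSES),
    ).fetchall()


# Stand-in data; on SQL Server the query would target catalog.executions.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE executions ("
    "execution_id INTEGER, project_name TEXT, package_name TEXT, status INTEGER)"
)
conn.executemany(
    "INSERT INTO executions VALUES (?, ?, ?, ?)",
    [
        (1, "Project", "Package.dtsx", 7),   # succeeded
        (2, "Project", "Package.dtsx", 4),   # failed, not reported
        (3, "Project2", "Package.dtsx", 9),  # completed
    ],
)

cursor = 0
first_poll = new_finished_executions(conn, cursor)
cursor = first_poll[-1][0] if first_poll else cursor
second_poll = new_finished_executions(conn, cursor)  # nothing new yet
```

A cursor on `execution_id` alone would skip an execution that was still running when a later one finished, so a production sensor would need to account for in-flight executions.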

Jobs

The execution of an MSSQL job is represented by an MSSQLJobSpec.

The user must be able to select from msdb.dbo.sysjobs and msdb.dbo.sysjobhistory. The easiest method is to grant SELECT on these two tables.

MSSQL Job Asset

There are two primary ways to use the assets. The asset key will be either MSSQLJob + Job Name or, if using the helper function, MSSQLJob + Job Name + Job Name, which allows other assets to sit alongside it.

# single asset
job_asset = MSSQLJobAsset(
    job_name='job_name',
)

# or adding specific specs to capture alongside
job_asset = MSSQLJobAsset(
    job_name='job_name', asset_list=[AssetSpec('a'), AssetSpec('b')]
)

# single asset from helper with child assets
job_asset = build_mssql_job_assets(
    "job_name", asset_list=["other_asset"]
)

As above, you can also pass keyword arguments to the AssetSpec using asset_spec_kwargs.

MSSQL Job Sensor

To report materialization events, a sensor can check msdb for completed or successful runs in the dbo.sysjobhistory table.

You will need a resource defined that connects to the database and exposes a connect function, which returns a SQLAlchemy connection object.
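
A resource of this kind typically wraps a SQLAlchemy engine built from an `mssql+pyodbc` URL. The sketch below shows only the URL construction, using the standard library. The function name `build_mssql_url` is hypothetical, the credentials are placeholders, and whether SQLServerResource assembles its URL this way is an assumption.

```python
from urllib.parse import quote_plus, urlencode


def build_mssql_url(host, database, username, password, query_props):
    """Build a SQLAlchemy-style mssql+pyodbc URL from connection settings."""
    # urlencode escapes spaces in the driver name as '+'
    query = urlencode(query_props)
    return (
        f"mssql+pyodbc://{quote_plus(username)}:{quote_plus(password)}"
        f"@{host}/{database}?{query}"
    )


url = build_mssql_url(
    host="localhost",
    database="MyDB",
    username="user",      # placeholder credentials
    password="p@ss",
    query_props={
        "driver": "ODBC Driver 18 for SQL Server",
        "TrustServerCertificate": "yes",
    },
)
print(url)
```

With SQLAlchemy installed, a `connect` method could then return `sqlalchemy.create_engine(url).connect()`.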

The sensor can be created with build_mssql_job_asset_sensor.
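
When reading msdb.dbo.sysjobhistory, note that run_date and run_time are stored as integers (YYYYMMDD and HHMMSS), that step_id 0 is the overall job outcome row, and that run_status 1 means the run succeeded. The helpers below are a sketch of how those columns can be interpreted. The function names are hypothetical, and this is not necessarily how the dagster-ssis sensor does it.

```python
from datetime import datetime


def job_run_datetime(run_date: int, run_time: int) -> datetime:
    """Combine sysjobhistory run_date (YYYYMMDD) and run_time (HHMMSS) integers."""
    # run_time drops leading zeros (e.g. 93015 for 09:30:15), so pad to 6 digits
    return datetime.strptime(f"{run_date:08d}{run_time:06d}", "%Y%m%d%H%M%S")


def is_successful_job_outcome(step_id: int, run_status: int) -> bool:
    """True for the job outcome row (step_id 0) with run_status 1 (succeeded)."""
    return step_id == 0 and run_status == 1


finished_at = job_run_datetime(20240305, 93015)
print(finished_at.isoformat())
```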

Complete Example

from dagster import Definitions
from dagster_ssis import (
    SQLServerResource,
    build_ssis_assets, build_ssis_asset_sensor,
    MSSQLJobAsset, build_mssql_job_asset_sensor
)

my_db_resource = SQLServerResource(
    host='localhost',
    database='MyDB',
    username='...',
    password='...',
    query_props={
        "driver": "ODBC Driver 18 for SQL Server",
        "TrustServerCertificate": "yes",
    }
)

ssis_asset_a = build_ssis_assets(
    project_name='Project',
    package_name='Package.dtsx',
    asset_list=['my_table', 'my_other_table'],
    asset_spec_kwargs={
        'owner': 'someone',
        'skippable': True,
        'metadata': {
            'my_meta': 'data'
        }
    }
)

ssis_asset_b = build_ssis_assets(
    project_name='Project2',
    package_name='Package.dtsx',
    asset_list=['something_else'],
    asset_spec_kwargs={
        'owner': 'someone',
        'skippable': True,
        'metadata': {
            'my_meta': 'data'
        }
    }
)

# a single job asset
job_assets = MSSQLJobAsset(
    'job_name'
)

# tie SSIS assets to a job execution check
# you probably want to exclude checking using SSIS and use the job instead if adding SSIS assets
ssis_job_assets = MSSQLJobAsset(
    'job_name_ssis',
    asset_list=ssis_asset_b.asset_specs
)

# only check for a, as b is now tied to a job sensor
ssis_sensor = build_ssis_asset_sensor(
    ssis_assets=[ssis_asset_a],
    sensor_name='ssis_sensor',
    database_resource_key='my_db_resource'
)

mssql_job_sensor = build_mssql_job_asset_sensor(
    [job_assets, ssis_job_assets],
    sensor_name='mssql_job_sensor',
    database_resource_key='my_db_resource'
)

defs = Definitions(
    # Add all the assets. Only add either ssis_asset_b or the job specs; otherwise duplicates will be found.
    assets=ssis_asset_a.asset_specs + ssis_job_assets.asset_specs + job_assets.asset_specs,
    sensors=[ssis_sensor, mssql_job_sensor],
    resources={
        'my_db_resource': my_db_resource
    }
)
