Skip to main content

Dagstd

Project description

Dagstd is a Python package containing a set of helper modules for use with the Dagster data orchestration tool.

Dagster is a great tool, but there are occasions where you just need to pass in a simple integer or string as input to a Dagster op, but in Dagster, inputs to ops can only be outputs of other ops. This results in a lot of boilerplate functions being written that just return a formatted string or even just an integer. This is why Dagstd was created.

Features

  • Simple ops for common numbers

  • Constant value ops

  • Helper ops for mathematical and string operations

  • Ops for retrieving environment variables

  • Sphinx autodoc support for Dagster ops

Usage

Here’s an example of a pure-Dagster graph that downloads a daily zip file and extracts a known file name. Note: the download_large_file op has been omitted for brevity.

import zipfile

from datetime import datetime

from dagster import op, job


@op
def get_todays_date() -> str:
    return datetime.today().strftime()


@op
def five() -> int:
    return 5


@op
def get_download_file_url(date: str) -> str:
    return f'https://example.com/{date}.csv'


@op
def get_nth_file_name(n: int) -> str:
    return f'file_{n:02}.txt'


@op
def extract_file_from_zip(context, zip_path: str, file_name: str) -> str:
    with zipfile.ZipFile(zip_path) as zip_file:
        with(f'/tmp/{file_name}', 'wb') as f:
            f.write(zip_file.read(file_name))
        context.log.info(f'Extracted {file_name} from {zip_path}')
        return f'/tmp/{file_name}'


@job
def process_data():
    date = get_todays_date()
    url = get_download_file_url(date)
    zip_path = download_large_file(url)

    file_name = get_nth_file_name(five())
    file_path = extract_file_from_zip(zip_path, file_name)

And here’s the same graph, but with Dagstd ops.

import zipfile

from datetime import datetime

from dagster import op, job
from dagstd.constants import Constant, Five
from dagstd.operations import fmt


@op
def get_todays_date_string() -> str:
    return datetime.today().strftime("%Y-%m-%d")


@op
def extract_file_from_zip(context, zip_path: str, file_name: str) -> str:
    with zipfile.ZipFile(zip_path) as zip_file:
        with(f'/tmp/{file_name}', 'wb') as f:
            f.write(zip_file.read(file_name))
        context.log.info(f'Extracted {file_name} from {zip_path}')
        return f'/tmp/{file_name}'


@job
def process_data():
    date = get_todays_date_string()
    url = fmt(Constant('https://example.com/{}.csv'), [date])
    zip_path = download_large_file(url)

    file_name = fmt(Constant('file_{}.txt'), [Five()])
    file_path = extract_file_from_zip(zip_path, file_name)

This was just a small example, but it serves to show how much boilerplate can be avoided when using Dagstd.

Sphinx Autodoc Plugin

Dagstd includes a Sphinx autodoc plugin that can be used to generate documentation for Dagster ops. To use the autodoc plugin, add the following to your conf.py file:

extensions = [
    ...
    'dagstd.sphinx.parser',
]

By default, this will prefix all op documentation with (op). To change this, add the following to your conf.py file:

dagstd_op_prefix = 'My Op'

Documentation

Documentation can be found at https://dagstd.readthedocs.io/en/latest/readme.html.

Installation

Install Dagstd with pip:

pip install dagstd

Dependencies

Contribute

I’m always looking for more ideas to add to Dagstd. If you have an idea, please open an issue or pull request, or message me on GitHub.

Support

If you are having issues, please let me know.

License

The project is licensed under the GNU GPLv3 license.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dagstd-0.1.3.tar.gz (21.4 kB view details)

Uploaded Source

Built Distribution

dagstd-0.1.3-py3-none-any.whl (22.6 kB view details)

Uploaded Python 3

File details

Details for the file dagstd-0.1.3.tar.gz.

File metadata

  • Download URL: dagstd-0.1.3.tar.gz
  • Upload date:
  • Size: 21.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.10.4

File hashes

Hashes for dagstd-0.1.3.tar.gz
Algorithm Hash digest
SHA256 3fd8478d460db5cbfd8cc50cd7c1c1e0597cfe20d26e62cd170003a5d08111bf
MD5 87e990faa92c79036eeda29722bb8105
BLAKE2b-256 bc5a51b37f76e21b3c0f254e3e9d5dc9c68dd8e8f5eefcdcdf618cd603c8a8fe

See more details on using hashes here.

File details

Details for the file dagstd-0.1.3-py3-none-any.whl.

File metadata

  • Download URL: dagstd-0.1.3-py3-none-any.whl
  • Upload date:
  • Size: 22.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.10.4

File hashes

Hashes for dagstd-0.1.3-py3-none-any.whl
Algorithm Hash digest
SHA256 2a0c53f88884d4d7d5bf7c256674d147b3cccbe4348da0c3a864116596aa975c
MD5 4d42ef6e3ddd917f9c5490b691c1db97
BLAKE2b-256 f33e3d1cf4dabe265a397ea393e41a62475cd1b10f773a3283eccbff0193cc52

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page