
Simply define DAG-workflows in Python where artefacts are stored on a filesystem.


fsdag

This library allows you to simply define DAG-workflows in Python where artefacts are stored on a filesystem.

Fsdag targets small personal or group projects where simplicity and freedom from dependencies are paramount. It is implemented in fewer than 100 lines of code.

For more complex workflow libraries, see:

Approach

You define the nodes of your workflow and resolve them lazily. Each node corresponds to one artefact. If the artefact already exists on the filesystem, it is loaded; otherwise, it is computed and saved. Once loaded or computed, the artefact is kept in memory for further access.
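The load-or-compute behaviour described above can be sketched in plain Python. The class below is a hypothetical stand-in named `SketchNode`, not fsdag's actual implementation; it assumes that "the artefact already exists on the filesystem" means that the file at `_path()` exists, and that the in-memory cache belongs to each node instance.

```python
import abc
import pathlib
import tempfile
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


class SketchNode(abc.ABC, Generic[T]):
    """Hypothetical stand-in for a filesystem-backed node (not fsdag's code)."""

    def __init__(self) -> None:
        self._artefact: Optional[T] = None
        # A separate flag, because None is a legitimate artefact value.
        self._resolved = False

    @abc.abstractmethod
    def _path(self) -> pathlib.Path: ...

    @abc.abstractmethod
    def _save(self, artefact: T) -> None: ...

    @abc.abstractmethod
    def _load(self) -> T: ...

    @abc.abstractmethod
    def _compute(self) -> T: ...

    def resolve(self) -> T:
        if not self._resolved:
            if self._path().exists():
                # The artefact is already on disk: load it.
                self._artefact = self._load()
            else:
                # Otherwise compute it and persist it for later runs.
                self._artefact = self._compute()
                self._save(self._artefact)
            self._resolved = True
        # Later calls return the in-memory copy directly.
        return self._artefact  # type: ignore[return-value]


class Answer(SketchNode[int]):
    computations = 0  # counts how often _compute actually runs

    def __init__(self, directory: pathlib.Path) -> None:
        super().__init__()
        self.directory = directory

    def _path(self) -> pathlib.Path:
        return self.directory / "answer.txt"

    def _save(self, artefact: int) -> None:
        self._path().write_text(str(artefact))

    def _load(self) -> int:
        return int(self._path().read_text())

    def _compute(self) -> int:
        Answer.computations += 1
        return 42


with tempfile.TemporaryDirectory() as tmp:
    first = Answer(pathlib.Path(tmp))
    second = Answer(pathlib.Path(tmp))  # fresh instance, empty memory cache
    results = [first.resolve(), first.resolve(), second.resolve()]

print(results)  # [42, 42, 42]
print(Answer.computations)  # 1
```

The first call computes and saves, the second hits the memory cache, and the fresh instance loads the saved file, so `_compute` runs only once.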

Installation

To install fsdag, simply run the following command in your virtual environment:

pip3 install fsdag

Usage

The workflow node is implemented as the abstract generic class fsdag.Node, parameterized by the artefact type (e.g., fsdag.Node[List[int]]). For each concrete node, you have to implement the following methods:

  • _path: where the artefact should be stored on disk,
  • _save: how to store the artefact to _path(),
  • _load: how to load the artefact from _path(), and
  • _compute: how to compute the artefact.

To resolve a node and obtain its artefact, call resolve().

Examples

Basic Example

The following example models a node whose artefact is serialized to and deserialized from JSON.

import json
import pathlib
from typing import List

import fsdag

class Something(fsdag.Node[List[int]]):
    def _path(self) -> pathlib.Path:
        return pathlib.Path("/some/path/something.json")

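    def _save(self, artefact: List[int]) -> None:
        # Create the parent directory if it does not exist yet.
        self._path().parent.mkdir(parents=True, exist_ok=True)
        self._path().write_text(json.dumps(artefact))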

    def _load(self) -> List[int]:
        return json.loads(
            self._path().read_text()
        )  # type: ignore

    def _compute(self) -> List[int]:
        return [1, 2, 3]

something = Something()
print(something.resolve())
# Outputs: [1, 2, 3]
# The artefact is now saved to the filesystem. It is also kept
# in memory for faster access if you ever resolve it again.

# For example, calling ``resolve`` here again retrieves
# the artefact from the memory cache:
print(something.resolve())
# Outputs: [1, 2, 3]

another_something = Something()
# ``another_something`` starts with an empty memory cache, so this
# call loads the artefact from the filesystem instead of computing it.
print(another_something.resolve())
# Outputs: [1, 2, 3]
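Every JSON-backed node in these examples repeats the same _save and _load methods. One way to factor them out is a small mixin. `JsonArtefactMixin` below is a hypothetical helper, not part of fsdag; it only assumes that the class it is mixed into defines `_path()`.

```python
import json
import pathlib
import tempfile
from typing import Any


class JsonArtefactMixin:
    """Hypothetical helper providing JSON-based _save and _load.

    The class using it must define ``_path()``.
    """

    def _save(self, artefact: Any) -> None:
        path = self._path()  # type: ignore[attr-defined]
        # Create the parent directory so the first write does not fail.
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(artefact))

    def _load(self) -> Any:
        return json.loads(self._path().read_text())  # type: ignore[attr-defined]


class Demo(JsonArtefactMixin):
    """Minimal stand-in class used only to exercise the mixin."""

    def __init__(self, directory: pathlib.Path) -> None:
        self.directory = directory

    def _path(self) -> pathlib.Path:
        return self.directory / "nested" / "demo.json"


with tempfile.TemporaryDirectory() as tmp:
    demo = Demo(pathlib.Path(tmp))
    demo._save([1, 2, 3])
    loaded = demo._load()

print(loaded)  # [1, 2, 3]
```

With fsdag, the mixin would presumably be listed before the base class, for example `class Something(JsonArtefactMixin, fsdag.Node[List[int]])`, so that its methods take precedence in the method resolution order. This has not been checked against fsdag and assumes Node places no further requirements on _save and _load.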

None Artefact

Some tasks produce no artefact: they are procedures that should be executed once but return nothing. To model such a procedure, use None as the generic parameter and write a marker file at _path() to record that the procedure has run:

import pathlib

import fsdag

class Something(fsdag.Node[None]):
    def _path(self) -> pathlib.Path:
        return pathlib.Path("/path/to/somewhere/done")

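    def _save(self, artefact: None) -> None:
        # Create the parent directory, then write the marker file.
        self._path().parent.mkdir(parents=True, exist_ok=True)
        self._path().write_text("done")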

    def _load(self) -> None:
        return

    def _compute(self) -> None:
        # Perform some complex procedure.
        ...
        return

something = Something()
# The procedure is executed here once.
something.resolve()

another_something = Something()
# This resolution does not run the procedure again,
# because the marker file already exists.
another_something.resolve()
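The marker-file pattern itself does not depend on fsdag. The stdlib-only sketch below uses a hypothetical `run_once` helper to show the idea: the procedure runs only while the marker is absent, and the marker is written after the procedure returns, so a procedure that raises is simply retried on the next run.

```python
import pathlib
import tempfile
from typing import Callable, List


def run_once(marker: pathlib.Path, procedure: Callable[[], None]) -> bool:
    """Hypothetical helper: run ``procedure`` unless ``marker`` exists.

    Returns True if the procedure ran, False if it was skipped.
    """
    if marker.exists():
        return False
    procedure()
    # Write the marker only after the procedure succeeded.
    marker.parent.mkdir(parents=True, exist_ok=True)
    marker.write_text("done")
    return True


calls: List[str] = []

with tempfile.TemporaryDirectory() as tmp:
    marker = pathlib.Path(tmp) / "done"
    first = run_once(marker, lambda: calls.append("ran"))
    second = run_once(marker, lambda: calls.append("ran"))

print(first, second, calls)  # True False ['ran']
```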

Workflow Graph

Here is a complete example of a simple workflow graph in which the Sum node depends on two other nodes.

import json
import pathlib

import fsdag

class Something(fsdag.Node[int]):
    def _path(self) -> pathlib.Path:
        return pathlib.Path("/some/path/something.json")

    def _save(self, artefact: int) -> None:
        self._path().parent.mkdir(parents=True, exist_ok=True)
        self._path().write_text(json.dumps(artefact))

    def _load(self) -> int:
        return json.loads(
            self._path().read_text()
        )  # type: ignore

    def _compute(self) -> int:
        return 1


class Another(fsdag.Node[int]):
    def _path(self) -> pathlib.Path:
        return pathlib.Path("/some/path/another.json")

    def _save(self, artefact: int) -> None:
        self._path().parent.mkdir(parents=True, exist_ok=True)
        self._path().write_text(json.dumps(artefact))

    def _load(self) -> int:
        return json.loads(
            self._path().read_text()
        )  # type: ignore

    def _compute(self) -> int:
        return 2

class Sum(fsdag.Node[int]):
    def __init__(
            self, 
            something: Something, 
            another: Another
    ) -> None:
        super().__init__()
        self.something = something
        self.another = another
    
    def _path(self) -> pathlib.Path:
        return pathlib.Path("/some/path/sum.json")

    def _save(self, artefact: int) -> None:
        self._path().parent.mkdir(parents=True, exist_ok=True)
        self._path().write_text(json.dumps(artefact))

    def _load(self) -> int:
        return json.loads(
            self._path().read_text()
        )  # type: ignore

    def _compute(self) -> int:
        # Note the calls to ``resolve`` methods here.
        return (
            self.something.resolve() 
            + self.another.resolve()
        )

something = Something()
another = Another()

result = Sum(something=something, another=another)

# The call to ``result.resolve`` recursively and lazily resolves
# ``something`` and ``another`` (unless sum.json already exists).
print(result.resolve())
# Outputs: 3
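Because resolved artefacts are kept in memory, passing the same node instance to several dependants should resolve it only once per process. The sketch below illustrates this for a diamond-shaped graph with a hypothetical, in-memory-only `Lazy` class; it is not fsdag's API and omits persistence entirely.

```python
from typing import Callable, Generic, List, Optional, TypeVar

T = TypeVar("T")


class Lazy(Generic[T]):
    """Hypothetical in-memory node: computes its value on first resolve() only."""

    def __init__(self, name: str, compute: Callable[[], T], log: List[str]) -> None:
        self.name = name
        self._compute = compute
        self._log = log
        self._done = False
        self._value: Optional[T] = None

    def resolve(self) -> T:
        if not self._done:
            self._log.append(self.name)  # record each actual computation
            self._value = self._compute()
            self._done = True
        return self._value  # type: ignore[return-value]


log: List[str] = []
base = Lazy("base", lambda: 1, log)
# Two nodes depend on the same ``base`` instance (a diamond-shaped DAG).
left = Lazy("left", lambda: base.resolve() + 10, log)
right = Lazy("right", lambda: base.resolve() + 20, log)
total = Lazy("total", lambda: left.resolve() + right.resolve(), log)

print(total.resolve())  # 32
print(log)  # ['total', 'left', 'base', 'right']
```

`base` appears only once in the log even though two nodes depend on it, because both hold the same instance and its value is cached after the first call.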


Download files


Source Distribution

fsdag-1.0.0.tar.gz (5.3 kB)

Built Distribution

fsdag-1.0.0-py3-none-any.whl (4.7 kB)

File details

Details for the file fsdag-1.0.0.tar.gz.

File metadata

  • Download URL: fsdag-1.0.0.tar.gz
  • Upload date:
  • Size: 5.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.0.1 CPython/3.8.5

File hashes

Hashes for fsdag-1.0.0.tar.gz
Algorithm Hash digest
SHA256 c48e125d614a2fa120abc4aa53932a22eae44172c17b4f7b67d057e8cdda2988
MD5 0967e0a1915ecb7f975bf7a73ed0f0aa
BLAKE2b-256 e166129bf2751ccd3a888461673276aee00e3a6ca0a99813a25be0335e6f5221


File details

Details for the file fsdag-1.0.0-py3-none-any.whl.

File metadata

  • Download URL: fsdag-1.0.0-py3-none-any.whl
  • Upload date:
  • Size: 4.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.0.1 CPython/3.8.5

File hashes

Hashes for fsdag-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 005e55e0c1c54a9f4fb668b26dcf0a0d2bdf6d8f1bcbfecd465172d13153c0dd
MD5 96a271aec2c3825d945b6779b97fff8c
BLAKE2b-256 c3a56d377a6713141de21e1dc565cd9375c66b4794cd7b47ff285e01ff38d1d9

