Skip to main content

A Compute agnostic pipelining software

Project description

Hello from magnus

Logo


python: Pypi Code style: black MyPy Checked Tests: Docs:


Magnus is a simplified workflow definition language that helps in:

Along with the developer friendly features, magnus also acts as an interface to production grade concepts such as data catalog, reproducibility, experiment tracking and secure access to secrets.


What does it do?

works


Documentation

More details about the project and how to use it available here.


Installation

The minimum python version that magnus supports is 3.8

pip install magnus

Please look at the installation guide for more information.


Example

Your application code. Use pydantic models as DTO.

Assumed to be present at functions.py

from pydantic import BaseModel

class InnerModel(BaseModel):
    """
    A pydantic model representing a group of related parameters.
    """

    foo: int
    bar: str


class Parameter(BaseModel):
    """
    A pydantic model representing the parameters of the whole pipeline.
    """

    x: int
    y: InnerModel


def return_parameter() -> Parameter:
    """
    The annotation of the return type of the function is not mandatory
    but it is a good practice.

    Returns:
        Parameter: The parameters that should be used in downstream steps.
    """
    # Return type of a function should be a pydantic model
    return Parameter(x=1, y=InnerModel(foo=10, bar="hello world"))


def display_parameter(x: int, y: InnerModel):
    """
    Annotating the arguments of the function is important for
    magnus to understand the type of parameters you want.

    Input args can be a pydantic model or the individual attributes.
    """
    print(x)
    # >>> prints 1
    print(y)
    # >>> prints InnerModel(foo=10, bar="hello world")

Application code using driver functions.

The code is runnable without any orchestration framework.

from functions import return_parameter, display_parameter

my_param = return_parameter()
display_parameter(my_param.x, my_param.y)

Orchestration using magnus

python SDK yaml

Example present at: examples/python-tasks.py

Run it as: python examples/python-tasks.py

from magnus import Pipeline, Task

def main():
    step1 = Task(
        name="step1",
        command="examples.functions.return_parameter",
    )
    step2 = Task(
        name="step2",
        command="examples.functions.display_parameter",
        terminate_with_success=True,
    )

    step1 >> step2

    pipeline = Pipeline(
        start_at=step1,
        steps=[step1, step2],
        add_terminal_nodes=True,
    )

    pipeline.execute()


if __name__ == "__main__":
    main()

Example present at: examples/python-tasks.yaml

Execute via the cli: magnus execute -f examples/python-tasks.yaml

dag:
  description: |
    This is a simple pipeline that does 3 steps in sequence.
    In this example:
      1. First step: returns a "parameter" x as a Pydantic model
      2. Second step: Consumes that parameter and prints it

    This pipeline demonstrates one way to pass small data from one step to another.

  start_at: step 1
  steps:
    step 1:
      type: task
      command_type: python # (2)
      command: examples.functions.return_parameter # (1)
      next: step 2
    step 2:
      type: task
      command_type: python
      command: examples.functions.display_parameter
      next: success
    success:
      type: success
    fail:
      type: fail

Transpile to argo workflows

No code change, just change the configuration.

executor:
  type: "argo"
  config:
    image: magnus:demo
    persistent_volumes:
      - name: magnus-volume
        mount_path: /mnt

run_log_store:
  type: file-system
  config:
    log_folder: /mnt/run_log_store

More details can be found in argo configuration.

Execute the code as magnus execute -f examples/python-tasks.yaml -c examples/configs/argo-config.yam

Expand
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: magnus-dag-
  annotations: {}
  labels: {}
spec:
  activeDeadlineSeconds: 172800
  entrypoint: magnus-dag
  podGC:
    strategy: OnPodCompletion
  retryStrategy:
    limit: '0'
    retryPolicy: Always
    backoff:
      duration: '120'
      factor: 2
      maxDuration: '3600'
  serviceAccountName: default-editor
  templates:
    - name: magnus-dag
      failFast: true
      dag:
        tasks:
          - name: step-1-task-uvdp7h
            template: step-1-task-uvdp7h
            depends: ''
          - name: step-2-task-772vg3
            template: step-2-task-772vg3
            depends: step-1-task-uvdp7h.Succeeded
          - name: success-success-igzq2e
            template: success-success-igzq2e
            depends: step-2-task-772vg3.Succeeded
    - name: step-1-task-uvdp7h
      container:
        image: magnus:demo
        command:
          - magnus
          - execute_single_node
          - '{{workflow.parameters.run_id}}'
          - step%1
          - --log-level
          - WARNING
          - --file
          - examples/python-tasks.yaml
          - --config-file
          - examples/configs/argo-config.yaml
        volumeMounts:
          - name: executor-0
            mountPath: /mnt
        imagePullPolicy: ''
        resources:
          limits:
            memory: 1Gi
            cpu: 250m
          requests:
            memory: 1Gi
            cpu: 250m
    - name: step-2-task-772vg3
      container:
        image: magnus:demo
        command:
          - magnus
          - execute_single_node
          - '{{workflow.parameters.run_id}}'
          - step%2
          - --log-level
          - WARNING
          - --file
          - examples/python-tasks.yaml
          - --config-file
          - examples/configs/argo-config.yaml
        volumeMounts:
          - name: executor-0
            mountPath: /mnt
        imagePullPolicy: ''
        resources:
          limits:
            memory: 1Gi
            cpu: 250m
          requests:
            memory: 1Gi
            cpu: 250m
    - name: success-success-igzq2e
      container:
        image: magnus:demo
        command:
          - magnus
          - execute_single_node
          - '{{workflow.parameters.run_id}}'
          - success
          - --log-level
          - WARNING
          - --file
          - examples/python-tasks.yaml
          - --config-file
          - examples/configs/argo-config.yaml
        volumeMounts:
          - name: executor-0
            mountPath: /mnt
        imagePullPolicy: ''
        resources:
          limits:
            memory: 1Gi
            cpu: 250m
          requests:
            memory: 1Gi
            cpu: 250m
  templateDefaults:
    activeDeadlineSeconds: 7200
    timeout: 10800s
  arguments:
    parameters:
      - name: run_id
        value: '{{workflow.uid}}'
  volumes:
    - name: executor-0
      persistentVolumeClaim:
        claimName: magnus-volume

Pipelines can be:

Linear

A simple linear pipeline with tasks either python functions, notebooks, or shell scripts

Parallel branches

Execute branches in parallel

loops or map

Execute a pipeline over an iterable parameter.

Arbitrary nesting

Any nesting of parallel within map and so on.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

magnus-0.5.1.tar.gz (111.1 kB view details)

Uploaded Source

Built Distribution

magnus-0.5.1-py3-none-any.whl (132.5 kB view details)

Uploaded Python 3

File details

Details for the file magnus-0.5.1.tar.gz.

File metadata

  • Download URL: magnus-0.5.1.tar.gz
  • Upload date:
  • Size: 111.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.7.1 CPython/3.8.18 Linux/6.2.0-1019-azure

File hashes

Hashes for magnus-0.5.1.tar.gz
Algorithm Hash digest
SHA256 5cca1107306b25ae5ec8591e4a8a4dc97afe5cf4a082e21fbd0f6526f83e4cc5
MD5 dec886756aa501b3238119f5ae777e2f
BLAKE2b-256 46d5c74b1f17e0f2b6ca6587c35881a1984bc726cb39f765668332fe4bcd9fdc

See more details on using hashes here.

File details

Details for the file magnus-0.5.1-py3-none-any.whl.

File metadata

  • Download URL: magnus-0.5.1-py3-none-any.whl
  • Upload date:
  • Size: 132.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.7.1 CPython/3.8.18 Linux/6.2.0-1019-azure

File hashes

Hashes for magnus-0.5.1-py3-none-any.whl
Algorithm Hash digest
SHA256 d375ca1146689c527c466d192bb890303b7840426b9ad6edcf64c9aed39982b1
MD5 e2e6c73fa2ed9a8aa34ee83f556e55c9
BLAKE2b-256 22e12704e0e513b43c71018bf3a26c42280fd9cb2dd99a9a824d51928d122805

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page