Lightweight, extensible Python library for building, managing, and executing command workflows.
Project description
Marsh
Marsh is a lightweight Python library for building, managing, and executing command workflows. It allows chaining commands, defining custom pre/post processing logic, creating DAG workflows, and structuring flexible CLI workflows. With support for local, remote, Docker-based, and Python execution, Marsh simplifies automating pipelines and integrating external processes.
Installation
Install Marsh via pip:
pip install marsh-lib
Key Features
- Command Chains: Chain multiple commands into reusable workflows.
- Pre/Post Processors: Add validation, logging, or error handling without modifying data.
- Pre/Post Modifiers: Transform input/output data during command execution.
- Execution Options: Local, Remote, Docker, Python, and custom runners.
- DAG Workflows: Support for DAG to define and run task dependencies.
Quick Start
Workflow with Conveyor
from marsh import Conveyor
def cmd_1(stdout, stderr): return stdout.upper(), stderr
def cmd_2(stdout, stderr): return stdout, stderr.lower()
# Chain commands with Conveyor
conveyor = Conveyor().add_cmd_runner(cmd_1).add_cmd_runner(cmd_2)
stdout, stderr = conveyor(b"input", b"ERROR")
print(stdout, stderr)
Output:
INPUT error
Pre/Post Processors: Validating or Logging Data
Processors perform actions (e.g., logging, validation) without modifying data.
from marsh import CmdRunDecorator
def validate(stdout, stderr): assert not stderr.strip() # Validate no errors
def log(stdout, stderr): print(f"LOG: {stdout.decode()}") # Log output
decorator = CmdRunDecorator()\
.add_processor(validate, before=True)\
.add_processor(log, before=False)
def cmd_runner(stdout, stderr): return stdout, stderr
decorated_runner = decorator.decorate(cmd_runner)
stdout, stderr = decorated_runner(b"Hello", b"")
Output:
LOG: Hello
Pre/Post Modifiers: Transforming Data
Modifiers transform the data before or after a command runs. Unlike processors, modifiers must return (stdout, stderr).
def to_upper(stdout, stderr): return stdout.upper(), stderr
def add_prefix(stdout, stderr): return b"Prefix: " + stdout, stderr
decorator = CmdRunDecorator()\
.add_mod_processor(to_upper, before=True)\
.add_mod_processor(add_prefix, before=False)
def cmd_runner(stdout, stderr): return stdout, stderr
decorated_runner = decorator.decorate(cmd_runner)
stdout, stderr = decorated_runner(b"hello", b"")
print(stdout.decode())
Output:
Prefix: HELLO
Key Difference:
- Processors act on data without altering it.
- Modifiers transform the data and return new values.
⚠️ IMPORTANT:
Order of Evaluation for Processors and Modifiers
- Pre-Modifiers
- Pre-Processors
- Command Runner
- Post-Modifiers
- Post-Processors
Passing CmdRunDecorator instance as parameter for reusabiliity
from marsh import Conveyor, CmdRunDecorator
decorator = CmdRunDecorator().add_processor(...).add_mod_processor(...)
conveyor = Conveyor().add_cmd_runner(cmd_runner, cmd_runner_decorator=decorator)
stdout, stderr = conveyor()
Using @add_processors_and_modifiers for decorating command runners
from marsh import add_processors_and_modifiers
@add_processors_and_modifiers(
("mod", True, pre_mod_func, arg_tuple, kwg_dict), # Pre-Modifier
("proc", True, pre_proc_func, arg_tuple, kwg_dict), # Pre-Processor
("mod", False, post_mod_func, arg_tuple, kwg_dict), # Post-Modifier
("proc", False, post_proc_func, arg_tuple, kwg_dict), # Post-Processor
)
def cmd_runner(x_stdout: bytes, x_stderr: bytes):
...
return b"stdout", b"stderr"
Running Local Commands with BashFactory
Simple Local Command
from marsh.bash import BashFactory
bash = BashFactory()
cmd = bash.create_cmd_runner(r'echo "Hello, $NAME"', env={"NAME": "World"})
stdout, stderr = cmd(b"", b"")
Examples with BashFactory
from pathlib import Path
from marsh.bash import BashFactory
bash = BashFactory()
# Inject Environment Variables
cmd1 = bash.create_cmd_runner(
r'echo "($ENV_VAR_1, $ENV_VAR_2)"',
env={
"ENV_VAR_1": "value1",
"ENV_VAR_2": "value2"
}
)
# Change Working Directory
cmd2 = bash.create_cmd_runner(r'echo "CWD: $PWD"', cwd=str(Path.cwd().parent))
# Unix Pipes
cmd3 = bash.create_cmd_runner(r'echo -e "Line1\nLine2\nLine3"')
cmd4 = bash.create_cmd_runner(r'grep 2 | sort', executor_kwargs={"pipe_prev_stdout": True})
# Python Command
cmd5 = bash.create_cmd_runner(r'python -c "print(\"Hello Python\")"')
# Custom Callback
import subprocess
def custom_callback(popen: subprocess.Popen, stdout, stderr):
return popen.communicate(input=b"Custom Input")
cmd6 = bash.create_cmd_runner(r'xargs echo', callback=custom_callback)
# Combine in Conveyor
from marsh import Conveyor
conveyor = Conveyor()\
.add_cmd_runner(cmd1)\
.add_cmd_runner(cmd2)\
.add_cmd_runner(cmd3)\
.add_cmd_runner(cmd4)\
.add_cmd_runner(cmd5)\
.add_cmd_runner(cmd6)
stdout, stderr = conveyor()
Running Remote Commands with SshFactory
from marsh import Conveyor
from marsh.ssh import SshFactory
ssh = SshFactory(("user@host:port",), {"connect_kwargs": {"password": "the_ssh_password"}})
cmd1 = ssh.create_cmd_runner("echo Hello, Remote World")
cmd2 = ssh.create_chained_cmd_runner(["echo Hi", "echo there"])
conveyor = Conveyor().add_cmd_runner(cmd1).add_cmd_runner(cmd2)
stdout, stderr = conveyor()
Running Commands with DockerCommandExecutor
from marsh.docker.docker_executor import DockerCommandExecutor
docker_executor = DockerCommandExecutor("bash:latest", ...)
stdout, stderr = docker_executor.run(
b"x_stdout", b"x_stderr",
environment=dict(ENV_VAR_1="value1", ENV_VAR_2="value2"),
workdir="/app"
)
Running Commands with PythonExecutor
eval mode for evaluating python expressions
from marsh import PythonExecutor
py_code = """x + y""" # Python Evaluatable Expression
python_executor = PythonExecutor(
py_code,
mode="eval",
namespace=dict(x=1, y=2),
use_pickle=False,
)
stdout, stderr = python_executor.run(b"x_stdout", b"x_stderr", ...)
exec mode for executing python statements
from marsh import PythonExecutor
py_code = """
import os
import sys
prev_stdout = x_stdout #<-- Use `x_stdout` to get the previous STDOUT
prev_stderr = x_stderr #<-- Use `x_stderr` to get the previous STDERR
exec_result = x + y #<-- Use `exec_result` for storing results and passing to STDOUT
"""
python_executor = PythonExecutor(
py_code,
mode="exec",
namespace=dict(x=1, y=2),
use_pickle=False,
)
stdout, stderr = python_executor.run(b"x_stdout", b"x_stderr", ...)
Note: eval mode also have access to x_stdout and x_stderr but not exec_result.
DAG Workflow
The DAG extends the capabilities of the core components by allowing non-linear dependencies between tasks.
The DAG subpackage has two main components: Node and Dag. The Node encapsulates a Conveyor that represents a
task in the workflow, while the Dag represents the whole workflow and task dependencies.
Note that the Dag manages Startable objects, which is the abstract base class for both Node and Dag. This means
that a Dag can contain both Node objects and other Dag objects.
Different kinds of Dag:
SyncDagAsyncDagThreadDagThreadPoolDagMultiprocessDagProcessPoolDag
Defining Nodes
from marsh import Conveyor
from marsh.dag import Node
conveyor = Conveyor().add_cmd_runner(cmd_runner, ...)
node = Node("node_name", conveyor, **run_kwargs)
Defining and Running a Dag
from marsh.dag import SyncDag
dag = SyncDag("dag_name")
# Register Nodes
dag.do(node_a)
dag.do(node_a).then(node_b, node_c) # A --> {B, C}
dag.do(node_a).when(node_b, node_c) # {B, C} --> A
dag.do(other_dag).then(node_a) # Register other Dag
...
result_dict = dag.start() # Run the Dag
result = result_dict["node_or_dag_name"] # Get result from individual startables
⚠️ IMPORTANT:
MultiprocessDagandProcessPoolDagrequires thestart()method to run in scope ofif __name__ == "__main__".from marsh.dag import MultiprocessDag, ProcessPoolDag ... if __name__ == "__main__": ... dag.start() ...
- As of the latest version, marsh DAG does not support result passing between task dependencies.
License
This project is licensed under the MIT License - see the LICENSE file for details.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file marsh_lib-0.2.0-py3-none-any.whl.
File metadata
- Download URL: marsh_lib-0.2.0-py3-none-any.whl
- Upload date:
- Size: 41.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.0.1 CPython/3.11.11 Linux/6.8.0-1017-azure
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a611fe15adb0dbde2ce964c25a9bbb0d22b6a32473b96c4750fd718ad49062e3
|
|
| MD5 |
d3d0707333971538a9f98dd58b138f2a
|
|
| BLAKE2b-256 |
536c890f588b08d1b89656fddbf427db179eadf329c68d4051220c03191ea3f8
|