Run and schedule task pipelines
Project description
Ginny
A simple, convenient task manager, similar to the luigi framework but less bloated. It allows easy execution and scheduling of tasks, both locally and remotely via Argo Workflows.
Run locally
```python
from ginny import DownloadTask, run

result = run(DownloadTask(
    url="https://static.wikia.nocookie.net/harrypotter/images/e/e9/Ginny-HQ-ginevra-ginny-weasley.jpg/revision/latest/scale-to-width-down/250?cb=20150228082608&path-prefix=de",
    destination="image.jpg",
))
```
Schedule tasks via command line
```bash
ginny --task ginny.DownloadTask url "https://static.wikia.nocookie.net/harrypotter/images/e/e9/Ginny-HQ-ginevra-ginny-weasley.jpg/revision/latest/scale-to-width-down/250?cb=20150228082608&path-prefix=de" destination "image.jpg"

# run every 5 minutes
ginny --task ginny.DownloadTask --every 'minute' --count 5 url "https://static.wikia.nocookie.net/harrypotter/images/e/e9/Ginny-HQ-ginevra-ginny-weasley.jpg/revision/latest/scale-to-width-down/250?cb=20150228082608&path-prefix=de" destination "image.jpg"

# run every day at 00:00
ginny --task ginny.DownloadTask --every 'day' --at "00:00" url "https://static.wikia.nocookie.net/harrypotter/images/e/e9/Ginny-HQ-ginevra-ginny-weasley.jpg/revision/latest/scale-to-width-down/250?cb=20150228082608&path-prefix=de" destination "image.jpg"
```
Build your own tasks
```python
import dataclasses

from ginny import DownloadTask, LocalTarget, Task, run


@dataclasses.dataclass(frozen=True)
class MyTask(Task):
    url: str

    def depends(self):
        # return tasks or targets that this task depends on, e.g.
        # return LocalTarget("/tmp/data.json")
        # return [LocalTarget("/tmp/data.json"), LocalTarget("/tmp/data2.json")]
        return [LocalTarget("/tmp/data.json"), DownloadTask(self.url, "/tmp/data2.json")]

    def run(self, *args, **kwargs):
        target, download_task = self.depends()
        data1 = target.read_json()
        data2 = download_task.target().read_json()
        data1.update(data2)

        with self.target().open("w") as writer:
            writer.write("done")

    def target(self):
        # define a target if the task has output data / should not be executed every time
        return LocalTarget("/tmp/target.json")


# run the task (the results of all executed tasks are returned in `results`)
task = MyTask(url=...)

# delete any previous results of the task
task.delete(recursive=False)  # set recursive=True to also delete the results of subtasks

results = run(task)
```
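After a successful run, the task's target exists and holds the written output. A quick check, continuing from the block above and using only target methods shown in this README:

```python
# the target should now exist and contain what MyTask wrote
assert task.target().exists()
print(task.target().read_text())
```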
Built-in tasks
```python
from ginny import BashTask, S3DownloadTask, DownloadTask, S3UploadTask, Task, SSHCommandTask, DepTask, TempDownloadTask, run

r = run(BashTask(['ls', '-lha']))
```
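These built-ins follow the same pattern as custom tasks. A minimal sketch chaining two of them, using only constructor arguments that appear elsewhere in this README (the URL and paths are placeholders):

```python
from ginny import BashTask, DownloadTask, run

# download a file, then list the directory it was written to
run(DownloadTask(url="https://example.com/data.json", destination="/tmp/data.json"))
run(BashTask(["ls", "-lha", "/tmp"]))
```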
Run a DAG/task with Argo Workflows (local targets automatically become S3 targets)
Define the Argo config with storage via YAML (preferred) and save it as `argo_config.yaml`, or use `.from_env()` to load it from environment variables:
```yaml
namespace: "argo" # default
serviceAccountName: "argo-workflows" # default
storage:
  key: "argo-workflows" # default
  bucket: "ai-datastore" # required
  region: "us-east-1" # required
  endpoint: "s3.amazonaws.com" # default
  accessKeySecret: # default
    name: "argo-secret"
    key: "ARGO_WORKFLOWS_ACCESS"
  secretKeySecret: # default
    name: "argo-secret"
    key: "ARGO_WORKFLOWS_SECRET2"
```
Define tasks:
```python
import dataclasses
from typing import List

from ginny import GlobalVar, LocalTarget, Task


@dataclasses.dataclass(frozen=True)
class A(Task):
    pano_id: str
    order_id: str = GlobalVar("order_id")

    def run(self, *args, **kwargs):
        self.target().write_text("hello")

    def target(self):
        return LocalTarget("/tmp/a.txt")


@dataclasses.dataclass(frozen=True)
class B(Task):
    def run(self, *args, **kwargs):
        self.target().write_text("hello")

    def target(self):
        return LocalTarget("/tmp/b.txt")


# define the workflow (global variables can be declared here; they are
# necessary to make the workflow run)
@dataclasses.dataclass(frozen=True)
class Pipeline(Task):
    order_id: str = GlobalVar("order_id")

    def depends(self) -> List[Task]:
        a = A(order_id=self.order_id, pano_id="testing123")
        b = B()
        return [a, b]

    def run(self, *args, **kwargs):
        print("Running pipeline")

        data1 = self.depends()[0].target().read_text()
        print("Task A exists: ", self.depends()[0].target().exists())
        print("Task A result: ", data1)

        data2 = self.depends()[1].target().read_text()
        print("Task B exists: ", self.depends()[1].target().exists())
        print("Task B result: ", data2)

        print("Total result: ")
        print(data1 + data2)
```
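Before exporting, the same DAG can be checked locally with the `run` helper from earlier (the `order_id` value here is just an example):

```python
from ginny import run

# runs A and B first (writing /tmp/a.txt and /tmp/b.txt), then Pipeline itself
results = run(Pipeline(order_id="order-123"))
```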
Create the workflow YAML from the task:
```python
from ginny import ArgoConfig, schedule_to_workflow

# export the task graph as a workflow
task = Pipeline()
config = ArgoConfig.from_yaml("argo_config.yaml")

# use a base image that has your workflow code and its requirements (including ginny) installed
workflow = schedule_to_workflow(task, "a-b-process-test", config, base_image="baudcode/ginny_test:latest")
workflow.save("test_workflow.yaml")
```
Push `test_workflow.yaml` to Argo Workflows:

```bash
argo submit -n argo --watch test_workflow.yaml
```
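If the `GlobalVar("order_id")` above is exposed as a workflow parameter (an assumption, not spelled out in this README), a value can be passed at submit time with argo's standard `-p` flag:

```bash
argo submit -n argo --watch test_workflow.yaml -p order_id=order-123
```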
Run dynamic tasks
Limitation: a dynamic task cannot depend on another dynamic task.
```python
import dataclasses

from ginny import DynamicTask, IterableParameterMap, LocalTarget, Task


# generate some parameters within a producer task
@dataclasses.dataclass(frozen=True)
class GenerateLines(Task):
    def run(self, *args, **kwargs):
        self.target()[0].set([
            {"key": "testing123", "dummy": "1"},
            {"key": "testing456", "dummy": "2"},
            {"key": "testing4567", "dummy": "3"},
        ])

    def target(self):
        return [IterableParameterMap(name='data', keys=['key', 'dummy'])]


# consume one item
@dataclasses.dataclass(frozen=True)
class ProcessLine(Task):
    key: str
    dummy: str

    def run(self, *args, **kwargs):
        self.target().write_text(f"processed {self.key} {self.dummy}")

    def target(self):
        return LocalTarget(f"/tmp/processed_{self.key}.txt")


# run all items in parallel
@dataclasses.dataclass(frozen=True)
class ProcessLines(DynamicTask):
    @property
    def taskclass(self):
        return ProcessLine

    @property
    def parameter(self):
        return [IterableParameterMap(name='data', keys=['key', 'dummy'])]

    def depends(self):
        return [GenerateLines()]
```
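Running the dynamic task locally executes the producer first and then fans out one `ProcessLine` per emitted parameter set:

```python
from ginny import run

# GenerateLines runs first; each {"key": ..., "dummy": ...} dict becomes one ProcessLine
results = run(ProcessLines())
```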
Connect tasks to Argo Events
WIP
Development
```bash
python setup.py clean
pip install .
```
TODO
- implement Argo Events and Argo Sensors so that tasks can be connected to them, and make it possible to simulate events coming from them
- use logging
- make dynamic tasks work with Argo Workflows
File details
Details for the file ginny-0.1.1.tar.gz.

File metadata
- Download URL: ginny-0.1.1.tar.gz
- Size: 33.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | `5c744ead20a9f99f04861e0b74fb0832310b0173999b5dcaab9fa88ce6cc729e` |
| MD5 | `eac850db8e667c7242027195c3808f22` |
| BLAKE2b-256 | `1c04094a68d4ff06aea8c384d920d1d96875b753417a315db11110d18cfc723b` |
Provenance
The following attestation bundles were made for ginny-0.1.1.tar.gz:

Publisher: python-publish.yml on baudcode/ginny

Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: ginny-0.1.1.tar.gz
- Subject digest: `5c744ead20a9f99f04861e0b74fb0832310b0173999b5dcaab9fa88ce6cc729e`
- Sigstore transparency entry: 175783733
- Permalink: baudcode/ginny@558fdc1707cda930169a244c28242890b55bcc7e
- Branch / Tag: refs/heads/main
- Owner: https://github.com/baudcode
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: python-publish.yml@558fdc1707cda930169a244c28242890b55bcc7e
- Trigger Event: workflow_dispatch
File details
Details for the file ginny-0.1.1-py3-none-any.whl.

File metadata
- Download URL: ginny-0.1.1-py3-none-any.whl
- Size: 34.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | `836f3f1990226bbe88cfc2f73c005ae7a380ac689f4a9dcce670113ba6099b0a` |
| MD5 | `a52d85d11b107ab0043f7a96c8198ada` |
| BLAKE2b-256 | `83d43a0f19e13591d06b514ed0220c5e69a77373f9f961cba471510c43236653` |
Provenance
The following attestation bundles were made for ginny-0.1.1-py3-none-any.whl:

Publisher: python-publish.yml on baudcode/ginny

Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: ginny-0.1.1-py3-none-any.whl
- Subject digest: `836f3f1990226bbe88cfc2f73c005ae7a380ac689f4a9dcce670113ba6099b0a`
- Sigstore transparency entry: 175783736
- Permalink: baudcode/ginny@558fdc1707cda930169a244c28242890b55bcc7e
- Branch / Tag: refs/heads/main
- Owner: https://github.com/baudcode
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: python-publish.yml@558fdc1707cda930169a244c28242890b55bcc7e
- Trigger Event: workflow_dispatch