Framework for running and orchestrating Tasks from the CLI or an API. Component-based Task builder/runner for non-programmers.
FlowTask DataIntegration
FlowTask DataIntegration is a plugin-based, component-driven task execution framework for creating complex Tasks.
FlowTask runs Tasks defined in JSON, YAML or TOML files. Every Task is a combination of Components, and the Components in a Task either run sequentially or depend on one another, like a DAG.
You can create a Task by combining Commands, Shell scripts and other specific Components (such as TableInput, which opens a table using a datasource, or DownloadFromIMAP, which downloads a file from an IMAP folder). Any Python callable can be a Component inside a Task, or you can extend UserComponent to build your own components.
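To illustrate the idea of plain Python callables running in dependency order, here is a minimal sketch that uses only the standard library. It does not use FlowTask itself: the `steps` layout, the step names and the `run_steps` helper are all hypothetical and are not FlowTask's task schema or API.

```python
from graphlib import TopologicalSorter

# Three plain callables standing in for components.
# Each one receives the results of the steps that ran before it.
def download(results):
    return ["a.csv", "b.csv"]

def count_files(results):
    return len(results["download"])

def report(results):
    return f"{results['count_files']} files downloaded"

# Hypothetical layout: step name -> (callable, names of steps it depends on).
steps = {
    "download": (download, []),
    "count_files": (count_files, ["download"]),
    "report": (report, ["count_files"]),
}

def run_steps(steps):
    """Run every step after the steps it depends on, like walking a DAG."""
    graph = {name: deps for name, (_, deps) in steps.items()}
    results = {}
    for name in TopologicalSorter(graph).static_order():
        func, _ = steps[name]
        results[name] = func(results)
    return results

results = run_steps(steps)
print(results["report"])
```

A step with an empty dependency list can run at any point before its dependents, which is the property that lets a task mix sequential and dependent components.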
Every Task can be run from the CLI, programmatically, via a RESTful API (using our aiohttp-based Handler), called by WebHooks, or even dispatched to an external Worker using our built-in Scheduler.
Quickstart
pip install flowtask
Tasks can be organized into a directory structure like this:
tasks/
└── programs/
    └── test/
        └── tasks/
The main reason for this structure is to keep tasks organized by tenant/program, rather than filling a single directory with many task files.
FlowTask supports "TaskStorage". A Task Storage is the main repository for tasks. The default Task Storage is a directory at any filesystem path (optionally, you can synchronize that path using git), but Tasks can also be saved to a database or an S3 bucket.
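As a rough sketch of how a filesystem-based storage could map a program and task name to a file, the snippet below searches a directory tree for a task definition in one of the supported formats. The `find_task_file` helper, the `programs/<program>/tasks/` nesting and the list of extensions are assumptions made for illustration; FlowTask's real lookup logic may differ.

```python
import tempfile
from pathlib import Path
from typing import Optional

# Extensions for the formats the README names (JSON, YAML, TOML).
TASK_EXTENSIONS = (".json", ".yaml", ".yml", ".toml")

def find_task_file(storage_root: Path, program: str, task: str) -> Optional[Path]:
    """Return <storage_root>/programs/<program>/tasks/<task>.<ext> if it exists.

    The directory nesting is an assumption, not FlowTask's documented layout.
    """
    task_dir = storage_root / "programs" / program / "tasks"
    for ext in TASK_EXTENSIONS:
        candidate = task_dir / f"{task}{ext}"
        if candidate.is_file():
            return candidate
    return None

# Demo against a throwaway directory so the sketch is self-contained.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp) / "tasks"
    task_dir = root / "programs" / "test" / "tasks"
    task_dir.mkdir(parents=True)
    (task_dir / "example.yaml").write_text("name: example\n")

    found = find_task_file(root, "test", "example")
    missing = find_task_file(root, "test", "does_not_exist")
    found_name = found.name if found else None
    print(found_name, missing)
```

Keeping the lookup behind a single function is one way a storage backed by a database or an S3 bucket could be swapped in without changing callers.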
Dependencies
- aiohttp (Asyncio Web Framework and Server) (required by navigator)
- AsyncDB
- QuerySource
- Navigator-api
- (Optional) Qworker (for distributing asyncio Tasks on distributed workers).
Features
- Component-based Task execution framework with components covering many actions (downloading files, creating pandas DataFrames from files, mapping DataFrame columns to a JSON dictionary, etc.)
- Built-in API for execution of Tasks.
How do I run a Task?
You can run a Task from the CLI:
task --program=test --task=example
On the CLI, you can pass an ENV (environment) variable to change the environment file used during task execution.
ENV=dev task --program=test --task=example
Or programmatically:
from flowtask import Task
import asyncio
task = Task(program='test', task='example')
results = asyncio.run(task.run())
# alternatively, call the task object directly:
results = asyncio.run(task())
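The two calls above can be equivalent when an object's `__call__` returns the same coroutine as its `run()` method. The sketch below shows that pattern with a hypothetical `DemoTask` class; it is a stand-in for illustration and says nothing about how `flowtask.Task` is actually implemented.

```python
import asyncio

class DemoTask:
    """Hypothetical stand-in for illustration; this is not flowtask.Task."""

    def __init__(self, program: str, task: str):
        self.program = program
        self.task = task

    async def run(self) -> str:
        # A real task would execute its components here.
        await asyncio.sleep(0)
        return f"{self.program}.{self.task} done"

    def __call__(self):
        # Calling the object returns the same kind of coroutine run() creates.
        return self.run()

demo = DemoTask(program="test", task="example")
via_run = asyncio.run(demo.run())
via_call = asyncio.run(demo())
print(via_run == via_call)
```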
Requirements
- Python >= 3.9
- asyncio (https://pypi.python.org/pypi/asyncio/)
- aiohttp >= 3.6.2
Contribution guidelines
Please have a look at the Contribution Guide
- Writing tests
- Code review
- Other guidelines
Who do I talk to?
- Repo owner or admin
- Other community or team contact
License
Navigator is licensed under the Apache 2.0 License. See the LICENSE file for more details.