TaskFlux Description
TaskFlux helps developers and testers quickly build distributed task systems.
1. Introduction
Integrates service discovery, service registration, service governance, workflow, task scheduling, system monitoring, and distributed logging.
Supports priority queues, sharded tasks, and automatic synchronization of subtask status.
2. QuickStart
To help you quickly learn how to use TaskFlux, please follow the steps below to create a test project.
2.1 Install
pip install TaskFlux
# Install RabbitMQ and initialize the administrator account
rabbitmqctl add_user scheduleAdmin scheduleAdminPasswrd
rabbitmqctl set_user_tags scheduleAdmin administrator
rabbitmqctl set_permissions -p / scheduleAdmin ".*" ".*" ".*"
rabbitmqctl list_users
# Install MongoDB and initialize the administrator account
mongo
use admin
db.createUser({user: "scheduleAdmin", pwd: "scheduleAdminPasswrd", roles: [{role: "root", db: "admin"}]})
Note: Use stronger passwords in production environments.
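One way to keep real credentials out of source files is to read them from the environment and build the connection strings at startup. The sketch below is only an illustration under stated assumptions: the environment variable names (TASKFLUX_USER, TASKFLUX_PASSWORD, TASKFLUX_HOST) and the helper build_config are hypothetical and not part of TaskFlux. Only the MONGODB_CONFIG and RABBITMQ_CONFIG keys and the default ports come from the test script in section 2.4.

```python
import os
from urllib.parse import quote


def build_config(env=os.environ):
    """Build connection URIs from environment variables (hypothetical names)."""
    user = quote(env.get("TASKFLUX_USER", "scheduleAdmin"), safe="")
    # Percent-encode the password so characters such as '@' or '/' do not break the URI.
    password = quote(env["TASKFLUX_PASSWORD"], safe="")
    host = env.get("TASKFLUX_HOST", "127.0.0.1")
    return {
        "MONGODB_CONFIG": f"mongodb://{user}:{password}@{host}:27017",
        "RABBITMQ_CONFIG": f"amqp://{user}:{password}@{host}:5672",
    }
```

The returned dictionary could then be merged into the config passed to TaskFlux, assuming the remaining keys are set as in the test script.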
2.2 Create a Test Project
Create the following folders and Python files:
.
├── test_server
│ ├── test_server_1
│ │ ├── test_server_1.py
│ ├── test_server_2
│ │ ├── test_server_2.py
├── taskflux_test.py
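If you prefer to script this step, the layout above can be created with a few lines of standard-library Python. This is a convenience sketch, not part of TaskFlux; the helper name create_layout is hypothetical, and the files it creates are empty placeholders to be filled in with the content from the following sections.

```python
from pathlib import Path

# Relative paths taken from the directory tree above.
PROJECT_FILES = [
    "test_server/test_server_1/test_server_1.py",
    "test_server/test_server_2/test_server_2.py",
    "taskflux_test.py",
]


def create_layout(root="."):
    """Create the folders and empty Python files; existing files keep their content."""
    created = []
    for relative in PROJECT_FILES:
        path = Path(root) / relative
        path.parent.mkdir(parents=True, exist_ok=True)
        path.touch(exist_ok=True)
        created.append(path)
    return created
```

Calling create_layout() with no arguments creates the files under the current directory.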
2.3 test_server Python File Content
from taskflux import *


class RpcFunction(ServiceConstructor):
    '''
    The class name must not be changed. Defines the RPC functions.
    '''
    service_name = 'test_server'
    test_service_name = 'test_server'

    def get_service_name(self):
        return {"service_name": self.service_name}

    def test_function(self, x, y):
        self.logger.info(f'x == {x}, y == {y}')
        return {"test_service_name": self.test_service_name, 'x': x, 'y': y}


class WorkerFunction(WorkerConstructor):
    '''
    The class name must not be changed. Contains the worker code.
    '''
    worker_name = 'test_server'

    def run(self, data):
        self.logger.info(data)
        # Each subtask must carry a source_id equal to the task_id of the current task.
        # After the subtasks are submitted, the scheduling system distributes them
        # automatically and updates the current task's status once all subtasks
        # have completed.
        source_id = data['task_id']
        subtask_data = [
            {"subtask_name": "test_server_2", "xx": "x1"},
            {"subtask_name": "x2", "xx": "x1"},
            {"subtask_name": "x3", "xx": "x1"},
            {"subtask_name": "x4", "xx": "x5", "task_id": snowflake_id()}
        ]
        subtask_ids = TaskFlux.create_subtask(
            subtask_queue='test_server_subtask',
            subtasks=subtask_data,
            source_task_id=source_id
        )
        print(subtask_ids)
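The rule described in the worker (each subtask refers back to the parent's task_id, and the parent is updated once all subtasks have finished) can be illustrated without TaskFlux at all. The following is a minimal in-memory sketch of that bookkeeping. The class ParentTracker, its methods, and the status strings are hypothetical and say nothing about how TaskFlux stores or synchronizes state internally.

```python
class ParentTracker:
    """Track subtasks per parent task and report when a parent is complete."""

    def __init__(self):
        # Maps source_id (the parent's task_id) to {subtask_id: finished flag}.
        self._subtasks = {}

    def add_subtask(self, source_id, subtask_id):
        self._subtasks.setdefault(source_id, {})[subtask_id] = False

    def finish_subtask(self, source_id, subtask_id):
        self._subtasks[source_id][subtask_id] = True

    def parent_status(self, source_id):
        states = self._subtasks.get(source_id, {})
        # A parent with no registered subtasks is reported as still running.
        if states and all(states.values()):
            return "finished"
        return "running"
```

The parent only flips to "finished" after the last registered subtask is marked done, which mirrors the behavior the docstring describes.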
2.4 taskflux_test Python File Content
import os
import time
import logging
import argparse

from taskflux import TaskFlux
from test_services import test_server

logging.basicConfig(level=logging.INFO)
current_dir = os.path.dirname(os.path.abspath(__file__))


class TestUtils:
    '''
    TestUtils class for testing the TaskFlux service. This class provides methods for starting the service,
    testing the RPC service, and submitting tasks to the scheduling system. It also includes a method for
    generating unique IDs.
    '''

    def __init__(self):
        self.config = {
            'MONGODB_CONFIG': 'mongodb://scheduleAdmin:scheduleAdminPasswrd@127.0.0.1:27017',
            'RABBITMQ_CONFIG': 'amqp://scheduleAdmin:scheduleAdminPasswrd@127.0.0.1:5672',
            'ROOT_PATH': current_dir,
            'ADMIN_USERNAME': 'scheduleAdmin',  # default is scheduleAdmin
            'ADMIN_PASSWORD': 'scheduleAdminPasswrd',  # default is scheduleAdminPasswrd
            'DEFAULT_SCHEDULE_TIME': 10,  # default is 10
            'HTTP_SERVER_FORK': False  # default is True
        }
        self.tfx = TaskFlux(config=self.config)

    def start_service(self):
        '''
        Start the service by registering and initializing it.
        '''
        self.tfx.registry(services=[test_server])
        self.tfx.start()

    def test_rpc_service(self):
        '''
        Test the RPC service by calling a method on the service.
        proxy_call:
            service_name: str, method_name: str, **kwargs
        '''
        res = self.tfx.proxy_call(service_name='test_server', method_name='get_service_name', **{'version': 1})
        print(res)

    def send_task_message(self):
        '''
        Send a message directly to the task queue, bypassing the scheduling system.
        '''
        self.tfx.send_message(
            queue_name='test_server',  # queue name
            message={
                'task_id': self.tfx.generate_id,  # task_id is required; a random ID is used if it is not provided
                'is_sub_task': False,  # whether this is a subtask, default is False
                'param1': 'pyrpc_schedule test task',  # task parameter
                'param2': ''  # task parameter
            }
        )

    def submit_task(self, queue='test_server_2'):
        '''
        Submit a task to the scheduling system.
        The scheduling system will automatically assign the task to a worker.
        '''
        self.tfx.send_message(
            queue_name=queue,  # queue name
            message={
                'task_id': self.tfx.generate_id,  # task_id is required; a random ID is used if it is not provided
                'param1': 'pyrpc_schedule test task',  # task parameter
                'param2': ''  # task parameter
            }
        )

    def update_work_max_process(self, worker_name: str, worker_ipaddr: str, worker_max_process: int):
        '''
        Update the maximum number of processes for a worker identified by its name and IP address.
        Args:
            worker_name (str): The name of the worker.
            worker_ipaddr (str): The IP address of the worker.
            worker_max_process (int): The new maximum number of processes for the worker.
        Returns:
            None
        '''
        self.tfx.update_work_max_process(
            worker_name=worker_name, worker_ipaddr=worker_ipaddr, worker_max_process=worker_max_process)

    def get_service_list(self, query: dict, field: dict, limit: int, skip_no: int) -> list:
        '''
        Retrieve a list of services from the database based on the given query, fields, limit, and skip number.
        Args:
            query (dict): A dictionary representing the query conditions for filtering the services.
            field (dict): A dictionary specifying the fields to be included in the result.
            limit (int): The maximum number of services to return.
            skip_no (int): The number of services to skip before starting to return results.
        Returns:
            list: A list of services that match the specified query and field criteria.
        '''
        return self.tfx.query_service_list(query=query, field=field, limit=limit, skip_no=skip_no)

    def get_node_list(self, query: dict, field: dict, limit: int, skip_no: int) -> list:
        '''
        Retrieve a list of nodes from the database based on the given query, fields, limit, and skip number.
        Args:
            query (dict): A dictionary representing the query conditions for filtering the nodes.
            field (dict): A dictionary specifying the fields to be included in the result.
            limit (int): The maximum number of nodes to return.
            skip_no (int): The number of nodes to skip before starting to return results.
        Returns:
            list: A list of nodes that match the specified query and field criteria.
        '''
        return self.tfx.query_node_list(query=query, field=field, limit=limit, skip_no=skip_no)

    def get_task_list(self, query: dict, field: dict, limit: int, skip_no: int) -> list:
        '''
        Retrieve a list of tasks from the database based on the given query, fields, limit, and skip number.
        Args:
            query (dict): A dictionary representing the query conditions for filtering the tasks.
            field (dict): A dictionary specifying the fields to be included in the result.
            limit (int): The maximum number of tasks to return.
            skip_no (int): The number of tasks to skip before starting to return results.
        Returns:
            list: A list of tasks that match the specified query and field criteria.
        '''
        return self.tfx.query_task_list(query=query, field=field, limit=limit, skip_no=skip_no)

    def get_task_status_by_task_id(self, task_id: str):
        '''
        Retrieve the task status by the given task ID.
        Args:
            task_id (str): The unique identifier of the task.
        Returns:
            dict: The first document containing the task status information.
        '''
        # Return the result so the method matches its documented return value.
        return self.tfx.query_task_status_by_task_id(task_id=task_id)

    def stop_task(self, task_id: str):
        '''
        Stop a task by the given task ID.
        Args:
            task_id (str): The unique identifier of the task.
        Returns:
            None
        '''
        self.tfx.stop_task(task_id=task_id)

    def generate_id(self) -> str:
        '''
        Generate a unique ID using the Snowflake algorithm.
        Returns:
            str: A unique ID generated using the Snowflake algorithm.
        '''
        return self.tfx.generate_id

    def kill(self):
        '''
        Placeholder only. To stop everything from a shell, the following command
        force-kills ALL python3 processes on the machine, so use it with care:
        ids=$(ps -ef | grep python3 | grep -v 'grep' | awk '{print $2}') && sudo kill -9 $ids
        '''


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="TaskFlux test script")
    # Note: with type=bool, any non-empty value (even "False") is parsed as True.
    parser.add_argument("--test", type=bool, help="send test task", default=False)
    args = parser.parse_args()

    t = TestUtils()
    if args.test:
        # Test the RPC service by calling a method on the service, then send test tasks.
        t.test_rpc_service()
        t.submit_task(queue='test_server')
        t.send_task_message()
    else:
        # Keep the main process running after the service starts.
        # For a long-running service, loop instead:
        #     while True:
        #         time.sleep(10000)
        t.start_service()
        time.sleep(10000)
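A note on the `--test` flag: argparse calls `bool()` on the raw string when `type=bool` is used, so any non-empty value, including `--test False`, is parsed as `True`. A small converter avoids that surprise. The helper names `str_to_bool` and `build_parser` and the accepted spellings below are assumptions made for illustration; they are not part of TaskFlux.

```python
import argparse


def str_to_bool(value):
    """Convert common true/false spellings to bool; reject anything else."""
    text = value.strip().lower()
    if text in {"true", "1", "yes", "y"}:
        return True
    if text in {"false", "0", "no", "n"}:
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


def build_parser():
    """Build a parser whose --test flag accepts explicit true/false values."""
    parser = argparse.ArgumentParser(description="TaskFlux test script")
    parser.add_argument("--test", type=str_to_bool, default=False, help="send test task")
    return parser
```

With this converter, `--test True` still enables the test path, while `--test False` now disables it instead of silently enabling it.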
2.5 Run the Test Project
# Start Service
python taskflux_test.py
# You can access the backend management page in your browser: http://127.0.0.1:5000
# Default administrator user: admin, Default administrator password: 123456
# Test RPC Service
python taskflux_test.py --test True
# After startup, a logs folder is created in the current directory, with logs organized by service type.
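Because the service takes a moment to start, it can be handy to wait until the management page responds before running the test commands. The sketch below polls the URL given above using only the standard library. The helper name wait_for_http and its timeout and interval defaults are hypothetical, and the check only confirms that something answers HTTP on that address, not that TaskFlux is fully initialized.

```python
import time
import urllib.error
import urllib.request


def wait_for_http(url, timeout=30.0, interval=1.0):
    """Return True once `url` answers with any HTTP response, False after `timeout` seconds."""
    # Bypass any configured proxy, since the target is a local address.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with opener.open(url, timeout=interval):
                return True
        except urllib.error.HTTPError:
            # The server answered, even if with an error status such as 401 or 404.
            return True
        except OSError:
            # Connection refused or timed out: the server is not up yet.
            time.sleep(interval)
    return False
```

For example, `wait_for_http("http://127.0.0.1:5000")` could be called in a separate terminal or script after `python taskflux_test.py` has been launched.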
File details
Details for the file taskflux-1.1.0.tar.gz.
File metadata
- Download URL: taskflux-1.1.0.tar.gz
- Upload date:
- Size: 28.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.11.11
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | d473d39a6ea515cda9831f35b3365c72d194c3125c82ebc862ec0f0416fceb23 |
| MD5 | 010200b167c4160392014b48265aa9f5 |
| BLAKE2b-256 | 8575404ed80850a8ae948bd4d9d9251d5057fa10dfcf3cfc2d433552516c72c0 |
File details
Details for the file TaskFlux-1.1.0-py3-none-any.whl.
File metadata
- Download URL: TaskFlux-1.1.0-py3-none-any.whl
- Upload date:
- Size: 40.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.11.11
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 3cce0432e8ec2a5914f327105abd5030fd1e5a762cc2bda790acd8a905acace7 |
| MD5 | 1d55e615616205d4e8ada3229cec2563 |
| BLAKE2b-256 | c7ab8459cfeb560169330e90bbc6551bf27d5d812bb7bfd56f5c119f75331b34 |