zerocs
Project description
zerocs Description Document
zerocs: zero config service
introduction
1. zerocs is a distributed task framework with extremely simple configuration. Enable users to quickly build distributed task systems with minimal configuration and learning costs
2. The framework has functions such as service management, work management, and task management, which can meet most usage scenarios
3. It is divided into Master node and Slave node. The Master node is a management node and can only have one. set up ZERO_FW_NODE_MASTER=True/False ,Change node type
start using
1. Please install RabbitMQ before use, rabbitmq official website : https://www.rabbitmq.com, It is recommended to use Docker installation during testing:
docker run -d --hostname my-rabbit --name rabbit \ -e RABBITMQ_DEFAULT_USER=user \ -e RABBITMQ_DEFAULT_PASS=password \ -p 15672:15672 -p 5672:5672 rabbitmq:management
2. To use the framework, it is very simple, just refer to zerocs in the first line of your startup script Please create the corresponding directory and files before starting:
├─zerocs_files //Temporary file directory ├─zerocs_logs //log directory ├─zerocs_service //Directory of microservice codes │ │─t_service //A folder with the same name as the microservice name │ │ ├─t_service.py //Microservice startup file with the same name as the service name ├─zerocs_demo_master.py //Master node startup script └─zerocs_demo_slave.py //Slave node startup script └─zerocs_test.py //Test Script Master Node start zerocs_demo_master, Slave Node start zerocs_demo_slave
zerocs_demo_master.py
# Master startup file, please refer to zerocs first # Flask API is not mandatory and can be connected to other management systems from zerocs import Service import os from flask import Flask, request from flask_cors import CORS OS_PATH = os.path.dirname(os.path.abspath(__file__)) app = Flask(__name__) CORS(app, supports_credentials=True) class Main(Service): def get_service_list(self): """ Obtain Service List :return: """ return self._get_all_service() def create_task(self, queue, task_data): """ Create task :param queue: service name == task type :param task_data: task json, Task data must include task_id field :return: """ self.send_message(queue, task_data) def restart_service(self, node_ip, service_name): """ Restart the specified node service :param node_ip: :param service_name: :return: """ return self._restart_service(node_ip, service_name) def stop_service(self, node_ip, service_name): """ Stop the specified node service :param node_ip: :param service_name: :return: """ return self._stop_service(node_ip, service_name) def stop_task(self, service_name, task_id): """ Stop Task,Only tasks that are still queued can be stopped, and ongoing tasks cannot be stopped :param service_name: service name == task type == queue name :param task_id: task id :return: """ self._stop_task(service_name, task_id) return {"code": 0} def get_stop_task_list(self): """ Obtain all stopped tasks :return: """ return self._get_all_stop_task() def get_max_work_num(self, node_ip, service_name): """ Obtain the maximum concurrent number of service work for the specified node :param node_ip: :param service_name: :return: """ num = self._get_max_work(node_ip, service_name) return {"code": 0, "num": num} def update_max_work_num(self, node_ip, service_name, work_num): """ Update the maximum concurrent number of service work for the specified node :param node_ip: :param service_name: :param work_num: :return: """ return self._update_max_work(node_ip, service_name, work_num) @app.route("/get_service_list") def get_service_list(): data = _main.get_service_list() return {"code": 0, "data": data} @app.route("/create_task", methods=["POST"]) def create_task(): json_data = request.get_json() _main.create_task(json_data['queue'], json_data['task_data']) return {"code": 0, "msg": "send msg"} @app.route("/restart_service", methods=["POST"]) def restart_service(): json_data = request.get_json() return _main.restart_service(json_data['node_ip'], json_data['service_name']) @app.route("/stop_service", methods=["POST"]) def stop_service(): json_data = request.get_json() return _main.stop_service(json_data['node_ip'], json_data['service_name']) @app.route("/stop_task", methods=["POST"]) def stop_task(): json_data = request.get_json() return _main.stop_task(json_data['service_name'], json_data['task_id']) @app.route("/get_all_stop_task", methods=["GET"]) def get_all_stop_task(): return _main.get_stop_task_list() @app.route("/get_max_work_num", methods=["POST"]) def get_max_work_num(): json_data = request.get_json() return _main.get_max_work_num(json_data['node_ip'], json_data['service_name']) @app.route("/update_max_work_num", methods=["POST"]) def update_max_work_num(): json_data = request.get_json() return _main.update_max_work_num(json_data['node_ip'], json_data['service_name'], json_data['work_num']) if __name__ == '__main__': """ A test.db will be created in the current directory to store node information and task IDs. Task details will not be saved """ _main = Main( ZERO_PROJECT_PATH=OS_PATH, ZERO_FW_LOGS_PATH='zerocs_logs', ZERO_FW_FILE_PATH='zerocs_files', ZERO_FW_SERVICE_PATH='zerocs_service', RABBITMQ_PORT='5672', RABBITMQ_CONFIG='amqp://admin:123456@127.0.0.1', RABBITMQ_CONFIG_URL='http://127.0.0.1/get_config', ZERO_FW_DB_FILE="test.db", ZERO_FW_NODE_MASTER="True" ) app.run(host='0.0.0.0', port=5002)
zerocs_demo_slave.py
# Slave startup file, please refer to zerocs first from zerocs import Service import os OS_PATH = os.path.dirname(os.path.abspath(__file__)) class Main(Service): def run(self): self.logger('slave').error('start slave') if __name__ == '__main__': _main = Main( ZERO_PROJECT_PATH=OS_PATH, ZERO_FW_LOGS_PATH='zerocs_logs', ZERO_FW_FILE_PATH='zerocs_files', ZERO_FW_SERVICE_PATH='zerocs_service', RABBITMQ_PORT='5672', RABBITMQ_CONFIG='amqp://admin:123456@127.0.0.1', RABBITMQ_CONFIG_URL='http://127.0.0.1/get_config', ZERO_FW_DB_FILE="test.db", ZERO_FW_NODE_MASTER="False" )
t_service.py
import time from nameko.rpc import rpc class RpcFunction: """ Class Name Not modifiable, Define RPC functions """ service_name = 't_service' # service name @rpc def get_testing_message(self): return {"code": 0, "msg": "service deployment succeeded"} class WorkFunction: """ Class Name Not modifiable, Work Code """ def __init__(self, service, config, task_data): """ :param service: service name :param config: rabbitMq config :param task_data: Task data JSON format """ self.run(service, {"AMQP_URI": config}, task_data) @staticmethod def run(service, config, task_data): """ Business related logic, In zero ce, a service corresponds to a task queue and a task type. But this does not mean that multiple services need to be written, which can be subdivided in the work code to complete all types of tasks using one service, but it is not recommended to do so """ print(service, config, task_data, '--work') time.sleep(25) print(service, config, task_data, '--end')
zerocs_test.py
import json import os import requests from nameko.standalone.rpc import ClusterRpcProxy OS_PATH = os.path.dirname(os.path.abspath(__file__)) class Tests: def __init__(self): self.host = '127.0.0.1' self.config = {'AMQP_URI': f'amqp://admin:123456@{self.host}'} def get_service_list(self): res = requests.get(url=f"http://{self.host}:5002/get_service_list") print(res.text) def create_task(self, queue): headers = { "Content-Type": "application/json" } # Task information must have a task_id field, and in the first layer of JSON data = { "queue": queue, "task_data": {"task_id": "10001", "task_data": {}} } res = requests.post(url=f"http://{self.host}:5002/create_task", headers=headers, data=json.dumps(data)) print(res.text) def restart_service(self, node_ip, service_name): headers = { "Content-Type": "application/json" } data = { "node_ip": node_ip, "service_name": service_name } res = requests.post(url=f"http://{self.host}:5002/restart_service", headers=headers, data=json.dumps(data)) print(res.text) def stop_task(self, service_name, task_id): headers = { "Content-Type": "application/json" } data = { "service_name": service_name, "task_id": task_id } res = requests.post(url=f"http://{self.host}:5002/stop_task", headers=headers, data=json.dumps(data)) print(res.text) def get_all_stop_task(self): res = requests.get(url=f"http://{self.host}:5002/get_all_stop_task") print(res.text) def get_max_work_num(self, node_ip, service_name): headers = { "Content-Type": "application/json" } data = { "node_ip": node_ip, "service_name": service_name } res = requests.post(url=f"http://{self.host}:5002/get_max_work_num", headers=headers, data=json.dumps(data)) print(res.text) def update_max_work_num(self, node_ip, service_name, work_num): headers = { "Content-Type": "application/json" } data = { "node_ip": node_ip, "service_name": service_name, "work_num": work_num } res = requests.post(url=f"http://{self.host}:5002/update_max_work_num", headers=headers, data=json.dumps(data)) print(res.text) # Tests().restart_service('127.0.0.1', 't_master') # Tests().get_service_list() # Tests().create_task('t_master') # Tests().stop_task('t_master', '10001') # Tests().get_all_stop_task() # Tests().get_max_work_num('127.0.0.1', 't_master') # Tests().update_max_work_num('127.0.0.1', 't_master', '3') # To call the RPC function of a specified node or service, you can use the following method # First obtain the service_id of the corresponding service, Tests().get_service_list() service_list = { "code": 0, "data": [ { "max_work": "1", "node_ip": "127.0.0.1", "queue_pid": "--", "rpc_pid": "--", "service_id": "zerocs_service", "service_name": "zerocs_service", "status": "init" }, { "max_work": "1", "node_ip": "127.0.0.1", "queue_pid": "--", "rpc_pid": "--", "service_id": "99fe426c65a0bd7fa16d0b37737f8eb1", "service_name": "zerocs_config", "status": "init" }, { "max_work": "1", "node_ip": "127.0.0.1", "queue_pid": "333055", "rpc_pid": "333056", "service_id": "f0c8d374d93b01191ccd77753c1740c7", "service_name": "t_service", "status": "init" } ] } # Pass in the service ID of the corresponding node to obtain the RPC remote call object # Assuming you want to call the get_testing_message function of the 127.0.0.1 node t_service # Then the service_id should be f0c8d374d93b01191ccd77753c1740c7 service_id = 'f0c8d374d93b01191ccd77753c1740c7' rm_config = Tests().config with ClusterRpcProxy(rm_config) as rpc_obj: service = getattr(rpc_obj, service_id) print(service.get_testing_message())
A distributed task scheduling system was completed in just a few steps
Disclaimers
Before using the zerocs framework, please carefully read and fully understand this statement. You can choose not to use the zerocs framework, but once you use the zerocs framework, Your usage behavior is deemed to be recognition and acceptance of the entire content of this statement.
You promise to use the zerocs framework in a legal and reasonable manner, Do not use the zerocs board framework to engage in any illegal or malicious behavior that infringes on the legitimate interests of others, We will not apply the zerocs framework to any platform that violates Chinese laws and regulations.
Any accident, negligence, contract damage, defamation This project does not assume any legal responsibility for copyright or intellectual property infringement and any losses caused (including but not limited to direct, indirect, incidental or derivative losses).
The user clearly and agrees to all the contents listed in the terms of this statement, The potential risks and related consequences of using the zerocs framework will be entirely borne by the user, and this project will not bear any legal responsibility.
After reading this disclaimer, any unit or individual should obtain the MIT Open Source License Conduct legitimate publishing, dissemination, and use of the zerocs framework within the permitted scope, If the breach of this disclaimer clause or the violation of laws and regulations results in legal liability (including but not limited to civil compensation and criminal liability), the defaulter shall bear the responsibility on their own.
The author owns intellectual property rights (including but not limited to trademark rights, patents, Copyrights, trade secrets, etc.) of zerocs framework, and the above products are protected by relevant laws and regulations
No entity or individual shall apply for intellectual property rights related to the zerocs Framework itself without the written authorization of the Author.
If any part of this statement is deemed invalid or unenforceable, the remaining parts shall remain in full force and effect. An unenforceable partial declaration does not constitute a waiver of our right to enforce the declaration.
This project has the right to make unilateral changes to the terms and attachments of this statement at any time, and publish them through message push, webpage announcement, and other means. Once published, it will automatically take effect without the need for separate notice; If you continue to use this statement after the announcement of changes, it means that you have fully read, understood, and accepted the revised statement.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file zerocs-1.3.tar.gz
.
File metadata
- Download URL: zerocs-1.3.tar.gz
- Upload date:
- Size: 22.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.16
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | a78ca86283ce3e8887925e42042314f4625d852b546553ead20554eb34c9928a |
|
MD5 | fc6ecb8f8828221c29479f04ae8be13d |
|
BLAKE2b-256 | 765ab48a6c27d5defe2d9a97db48c5a6c42c644d7c10f3dbcb209c6c9691fa5f |
File details
Details for the file zerocs-1.3-py3-none-any.whl
.
File metadata
- Download URL: zerocs-1.3-py3-none-any.whl
- Upload date:
- Size: 27.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.16
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 9e7711a57443e50d7b2b45e4cde4853dcdb295b067e0d15eeb7ecaf5001e34c0 |
|
MD5 | f5829c9d1f29baf0096166e07b120ace |
|
BLAKE2b-256 | 6d8af4b550b9f7d6d3c056f728e45084aa5e4ed2d193d93b9a5c3fdb4829e41e |