Skip to main content

zerocs

Project description

zerocs Description Document

zerocs: zero config service

  1. introduction

    1. zerocs is a distributed task framework with extremely simple configuration. Enable users to quickly build distributed task systems with minimal configuration and learning costs

    2. The framework has functions such as service management, work management, and task management, which can meet most usage scenarios

    3. It is divided into Master node and Slave node. The Master node is a management node and can only have one. set up ZERO_FW_NODE_MASTER=True/False ,Change node type

  2. start using

    1. Please install RabbitMQ before use, rabbitmq official website : https://www.rabbitmq.com, It is recommended to use Docker installation during testing:

    docker run -d --hostname my-rabbit --name rabbit \
    -e RABBITMQ_DEFAULT_USER=user \
    -e RABBITMQ_DEFAULT_PASS=password \
    -p 15672:15672 -p 5672:5672 rabbitmq:management

    2. To use the framework, it is very simple, just refer to zerocs in the first line of your startup script Please create the corresponding directory and files before starting:

      ├─zerocs_files //Temporary file directory
      ├─zerocs_logs //log directory
      ├─zerocs_service //Directory of microservice codes
      │  │─t_service //A folder with the same name as the microservice name
      │  │  ├─t_service.py //Microservice startup file with the same name as the service name
      ├─zerocs_demo_master.py  //Master node startup script
      └─zerocs_demo_slave.py //Slave node startup script
      └─zerocs_test.py //Test Script
    
    Master Node start zerocs_demo_master,
    Slave Node start zerocs_demo_slave
    1. zerocs_demo_master.py

      # Master startup file, please refer to zerocs first
      # Flask API is not mandatory and can be connected to other management systems
      
      from zerocs import Service
      import os
      from flask import Flask, request
      from flask_cors import CORS
      
      OS_PATH = os.path.dirname(os.path.abspath(__file__))
      app = Flask(__name__)
      CORS(app, supports_credentials=True)
      
      
      class Main(Service):
      
          def get_service_list(self):
              """
              Obtain Service List
              :return:
              """
              return self._get_all_service()
      
          def create_task(self, queue, task_data):
              """
              Create task
              :param queue: service name == task type
              :param task_data: task json, Task data must include task_id field
              :return:
              """
              self.send_message(queue, task_data)
      
          def restart_service(self, node_ip, service_name):
              """
              Restart the specified node service
              :param node_ip:
              :param service_name:
              :return:
              """
              return self._restart_service(node_ip, service_name)
      
          def stop_service(self, node_ip, service_name):
              """
              Stop the specified node service
              :param node_ip:
              :param service_name:
              :return:
              """
              return self._stop_service(node_ip, service_name)
      
          def stop_task(self, service_name, task_id):
              """
              Stop Task,Only tasks that are still queued can be stopped, and ongoing tasks cannot be stopped
              :param service_name: service name == task type == queue name
              :param task_id: task id
              :return:
              """
              self._stop_task(service_name, task_id)
              return {"code": 0}
      
          def get_stop_task_list(self):
              """
              Obtain all stopped tasks
              :return:
              """
              return self._get_all_stop_task()
      
          def get_max_work_num(self, node_ip, service_name):
              """
              Obtain the maximum concurrent number of service work for the specified node
              :param node_ip:
              :param service_name:
              :return:
              """
              num = self._get_max_work(node_ip, service_name)
              return {"code": 0, "num": num}
      
          def update_max_work_num(self, node_ip, service_name, work_num):
              """
              Update the maximum concurrent number of service work for the specified node
              :param node_ip:
              :param service_name:
              :param work_num:
              :return:
              """
              return self._update_max_work(node_ip, service_name, work_num)
      
      
      @app.route("/get_service_list")
      def get_service_list():
          data = _main.get_service_list()
          return {"code": 0, "data": data}
      
      
      @app.route("/create_task", methods=["POST"])
      def create_task():
          json_data = request.get_json()
          _main.create_task(json_data['queue'], json_data['task_data'])
          return {"code": 0, "msg": "send msg"}
      
      
      @app.route("/restart_service", methods=["POST"])
      def restart_service():
          json_data = request.get_json()
          return _main.restart_service(json_data['node_ip'], json_data['service_name'])
      
      
      @app.route("/stop_service", methods=["POST"])
      def stop_service():
          json_data = request.get_json()
          return _main.stop_service(json_data['node_ip'], json_data['service_name'])
      
      
      @app.route("/stop_task", methods=["POST"])
      def stop_task():
          json_data = request.get_json()
          return _main.stop_task(json_data['service_name'], json_data['task_id'])
      
      
      @app.route("/get_all_stop_task", methods=["GET"])
      def get_all_stop_task():
          return _main.get_stop_task_list()
      
      
      @app.route("/get_max_work_num", methods=["POST"])
      def get_max_work_num():
          json_data = request.get_json()
          return _main.get_max_work_num(json_data['node_ip'], json_data['service_name'])
      
      
      @app.route("/update_max_work_num", methods=["POST"])
      def update_max_work_num():
          json_data = request.get_json()
          return _main.update_max_work_num(json_data['node_ip'],
                                           json_data['service_name'], json_data['work_num'])
      
      
      if __name__ == '__main__':
          """
          A test.db will be created in the current directory
          to store node information and task IDs. Task details will not be saved
          """
          _main = Main(
              ZERO_PROJECT_PATH=OS_PATH,
              ZERO_FW_LOGS_PATH='zerocs_logs',
              ZERO_FW_FILE_PATH='zerocs_files',
              ZERO_FW_SERVICE_PATH='zerocs_service',
              RABBITMQ_PORT='5672',
              RABBITMQ_CONFIG='amqp://admin:123456@127.0.0.1',
              RABBITMQ_CONFIG_URL='http://127.0.0.1/get_config',
              ZERO_FW_DB_FILE="test.db",
              ZERO_FW_NODE_MASTER="True"
          )
          app.run(host='0.0.0.0', port=5002)
    2. zerocs_demo_slave.py

      # Slave startup file, please refer to zerocs first
      
      from zerocs import Service
      import os
      
      OS_PATH = os.path.dirname(os.path.abspath(__file__))
      
      
      class Main(Service):
          def run(self):
              self.logger('slave').error('start slave')
      
      
      if __name__ == '__main__':
          _main = Main(
              ZERO_PROJECT_PATH=OS_PATH,
              ZERO_FW_LOGS_PATH='zerocs_logs',
              ZERO_FW_FILE_PATH='zerocs_files',
              ZERO_FW_SERVICE_PATH='zerocs_service',
              RABBITMQ_PORT='5672',
              RABBITMQ_CONFIG='amqp://admin:123456@127.0.0.1',
              RABBITMQ_CONFIG_URL='http://127.0.0.1/get_config',
              ZERO_FW_DB_FILE="test.db",
              ZERO_FW_NODE_MASTER="False"
          )
    3. t_service.py

      import time
      from nameko.rpc import rpc
      
      
      class RpcFunction:
          """
          Class Name Not modifiable, Define RPC functions
          """
          service_name = 't_service'  # service name
      
          @rpc
          def get_testing_message(self):
              return {"code": 0, "msg": "service deployment succeeded"}
      
      
      class WorkFunction:
          """
          Class Name Not modifiable, Work Code
          """
      
          def __init__(self, service, config, task_data):
              """
              :param service: service name
              :param config: rabbitMq config
              :param task_data: Task data JSON format
              """
              self.run(service, {"AMQP_URI": config}, task_data)
      
          @staticmethod
          def run(service, config, task_data):
              """
              Business related logic,
              In zero ce, a service corresponds to a task queue and a task type.
              But this does not mean that multiple services need to be written,
              which can be subdivided in the work code to complete all types of
              tasks using one service, but it is not recommended to do so
              """
              print(service, config, task_data, '--work')
              time.sleep(25)
              print(service, config, task_data, '--end')
    4. zerocs_test.py

      import json
      import os
      import requests
      from nameko.standalone.rpc import ClusterRpcProxy
      
      OS_PATH = os.path.dirname(os.path.abspath(__file__))
      
      
      class Tests:
        def __init__(self):
          self.host = '127.0.0.1'
          self.config = {'AMQP_URI': f'amqp://admin:123456@{self.host}'}
      
        def get_service_list(self):
          res = requests.get(url=f"http://{self.host}:5002/get_service_list")
          print(res.text)
      
        def create_task(self, queue):
          headers = {
            "Content-Type": "application/json"
          }
          # Task information must have a task_id field, and in the first layer of JSON
          data = {
            "queue": queue,
            "task_data": {"task_id": "10001", "task_data": {}}
          }
          res = requests.post(url=f"http://{self.host}:5002/create_task", headers=headers, data=json.dumps(data))
          print(res.text)
      
        def restart_service(self, node_ip, service_name):
          headers = {
            "Content-Type": "application/json"
          }
          data = {
            "node_ip": node_ip,
            "service_name": service_name
          }
          res = requests.post(url=f"http://{self.host}:5002/restart_service", headers=headers, data=json.dumps(data))
          print(res.text)
      
        def stop_task(self, service_name, task_id):
          headers = {
            "Content-Type": "application/json"
          }
          data = {
            "service_name": service_name,
            "task_id": task_id
          }
          res = requests.post(url=f"http://{self.host}:5002/stop_task", headers=headers, data=json.dumps(data))
          print(res.text)
      
        def get_all_stop_task(self):
          res = requests.get(url=f"http://{self.host}:5002/get_all_stop_task")
          print(res.text)
      
        def get_max_work_num(self, node_ip, service_name):
          headers = {
            "Content-Type": "application/json"
          }
          data = {
            "node_ip": node_ip,
            "service_name": service_name
          }
          res = requests.post(url=f"http://{self.host}:5002/get_max_work_num",
                              headers=headers, data=json.dumps(data))
          print(res.text)
      
        def update_max_work_num(self, node_ip, service_name, work_num):
          headers = {
            "Content-Type": "application/json"
          }
          data = {
            "node_ip": node_ip,
            "service_name": service_name,
            "work_num": work_num
          }
          res = requests.post(url=f"http://{self.host}:5002/update_max_work_num",
                              headers=headers, data=json.dumps(data))
          print(res.text)
      
      
      # Tests().restart_service('127.0.0.1', 't_master')
      # Tests().get_service_list()
      # Tests().create_task('t_master')
      # Tests().stop_task('t_master', '10001')
      # Tests().get_all_stop_task()
      # Tests().get_max_work_num('127.0.0.1', 't_master')
      # Tests().update_max_work_num('127.0.0.1', 't_master', '3')
      
      # To call the RPC function of a specified node or service, you can use the following method
      # First obtain the service_id of the corresponding service, Tests().get_service_list()
      
      service_list = {
        "code": 0,
        "data": [
          {
            "max_work": "1",
            "node_ip": "127.0.0.1",
            "queue_pid": "--",
            "rpc_pid": "--",
            "service_id": "zerocs_service",
            "service_name": "zerocs_service",
            "status": "init"
          },
          {
            "max_work": "1",
            "node_ip": "127.0.0.1",
            "queue_pid": "--",
            "rpc_pid": "--",
            "service_id": "99fe426c65a0bd7fa16d0b37737f8eb1",
            "service_name": "zerocs_config",
            "status": "init"
          },
          {
            "max_work": "1",
            "node_ip": "127.0.0.1",
            "queue_pid": "333055",
            "rpc_pid": "333056",
            "service_id": "f0c8d374d93b01191ccd77753c1740c7",
            "service_name": "t_service",
            "status": "init"
          }
        ]
      }
      
      # Pass in the service ID of the corresponding node to obtain the RPC remote call object
      # Assuming you want to call the get_testing_message function of the 127.0.0.1 node t_service
      # Then the service_id should be f0c8d374d93b01191ccd77753c1740c7
      service_id = 'f0c8d374d93b01191ccd77753c1740c7'
      
      rm_config = Tests().config
      with ClusterRpcProxy(rm_config) as rpc_obj:
              service = getattr(rpc_obj, service_id)
              print(service.get_testing_message())

A distributed task scheduling system was completed in just a few steps

Disclaimers

  • Before using the zerocs framework, please carefully read and fully understand this statement. You can choose not to use the zerocs framework, but once you use the zerocs framework, Your usage behavior is deemed to be recognition and acceptance of the entire content of this statement.

  • You promise to use the zerocs framework in a legal and reasonable manner, Do not use the zerocs board framework to engage in any illegal or malicious behavior that infringes on the legitimate interests of others, We will not apply the zerocs framework to any platform that violates Chinese laws and regulations.

  • Any accident, negligence, contract damage, defamation This project does not assume any legal responsibility for copyright or intellectual property infringement and any losses caused (including but not limited to direct, indirect, incidental or derivative losses).

  • The user clearly and agrees to all the contents listed in the terms of this statement, The potential risks and related consequences of using the zerocs framework will be entirely borne by the user, and this project will not bear any legal responsibility.

  • After reading this disclaimer, any unit or individual should obtain the MIT Open Source License Conduct legitimate publishing, dissemination, and use of the zerocs framework within the permitted scope, If the breach of this disclaimer clause or the violation of laws and regulations results in legal liability (including but not limited to civil compensation and criminal liability), the defaulter shall bear the responsibility on their own.

  • The author owns intellectual property rights (including but not limited to trademark rights, patents, Copyrights, trade secrets, etc.) of zerocs framework, and the above products are protected by relevant laws and regulations

  • No entity or individual shall apply for intellectual property rights related to the zerocs Framework itself without the written authorization of the Author.

  • If any part of this statement is deemed invalid or unenforceable, the remaining parts shall remain in full force and effect. An unenforceable partial declaration does not constitute a waiver of our right to enforce the declaration.

  • This project has the right to make unilateral changes to the terms and attachments of this statement at any time, and publish them through message push, webpage announcement, and other means. Once published, it will automatically take effect without the need for separate notice; If you continue to use this statement after the announcement of changes, it means that you have fully read, understood, and accepted the revised statement.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

zerocs-1.3.tar.gz (22.4 kB view details)

Uploaded Source

Built Distribution

zerocs-1.3-py3-none-any.whl (27.3 kB view details)

Uploaded Python 3

File details

Details for the file zerocs-1.3.tar.gz.

File metadata

  • Download URL: zerocs-1.3.tar.gz
  • Upload date:
  • Size: 22.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.9.16

File hashes

Hashes for zerocs-1.3.tar.gz
Algorithm Hash digest
SHA256 a78ca86283ce3e8887925e42042314f4625d852b546553ead20554eb34c9928a
MD5 fc6ecb8f8828221c29479f04ae8be13d
BLAKE2b-256 765ab48a6c27d5defe2d9a97db48c5a6c42c644d7c10f3dbcb209c6c9691fa5f

See more details on using hashes here.

File details

Details for the file zerocs-1.3-py3-none-any.whl.

File metadata

  • Download URL: zerocs-1.3-py3-none-any.whl
  • Upload date:
  • Size: 27.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.9.16

File hashes

Hashes for zerocs-1.3-py3-none-any.whl
Algorithm Hash digest
SHA256 9e7711a57443e50d7b2b45e4cde4853dcdb295b067e0d15eeb7ecaf5001e34c0
MD5 f5829c9d1f29baf0096166e07b120ace
BLAKE2b-256 6d8af4b550b9f7d6d3c056f728e45084aa5e4ed2d193d93b9a5c3fdb4829e41e

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page