
It's a load balancer for multiprocessing gRPC servers.

Project description

Multiprocessing_gRPC_Load_Balancer

A load balancer for multiprocessing gRPC servers on Linux.

[!IMPORTANT] The server does not work on Windows: it relies on the SO_REUSEPORT socket option (socket.SO_REUSEPORT), which Linux supports and Windows does not.

The client-side server search works on both operating systems.
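
For background, SO_REUSEPORT lets several sockets bind the same port so the Linux kernel can spread incoming connections across processes. A minimal sketch with plain sockets (not this library):

import socket

def make_listener(port: int) -> socket.socket:
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # SO_REUSEPORT is what makes a second binding of the same port legal;
    # it exists on Linux but not on Windows, hence the restriction above.
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    s.bind(('0.0.0.0', port))
    s.listen()
    return s

a = make_listener(50000)
b = make_listener(50000)  # would raise OSError without SO_REUSEPORT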

Installation

pip install Multiprocessing_gRPC_Load_Balancer

How to use

Here is a short example showing how this library works.
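
It assumes stubs generated from a Test.proto roughly like the following (reconstructed from the example code below; the exact file is not shipped with the package, so treat the field numbers as assumptions):

syntax = "proto3";

message send_msg {
    string send = 1;
}

message response_msg {
    bool success = 1;
    string msg = 2;
}

service Test_Streamer {
    rpc One_to_One (send_msg) returns (response_msg);
    rpc Many_to_One (stream send_msg) returns (response_msg);
    rpc One_to_Many (send_msg) returns (stream response_msg);
    rpc Many_to_Many (stream send_msg) returns (stream response_msg);
}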

[!IMPORTANT] If you want to share data, instances, or classes between your server processes, you have to handle that yourself. The gRPC servicer class is instantiated in each process, but you can pass it arguments that were already initialized with multiprocessing capabilities.
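
For example, a minimal sketch of passing shared state through those arguments using multiprocessing.Manager (Counting_Server and shared_counts are hypothetical names, not part of the library; the argument is forwarded via the *args shown in the server example below):

from multiprocessing import Manager

from proto_test.Test_pb2_grpc import Test_StreamerServicer, add_Test_StreamerServicer_to_server

class Counting_Server(Test_StreamerServicer):
    def __init__(self, counts):
        # every child process receives the same Manager proxy
        self.counts = counts

manager = Manager()
shared_counts = manager.dict()  # hypothetical shared state

# later, instead of Test_Server:
# server.start(Counting_Server, add_Test_StreamerServicer_to_server, True, shared_counts)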

Server

import grpc, time
from typing import Iterable

from Multiprocessing_gRPC_Load_Balancer import Multiprocessing_gRPC_Load_Balancer_Server

from proto_test.Test_pb2_grpc import Test_StreamerServicer, add_Test_StreamerServicer_to_server
from proto_test.Test_pb2 import send_msg, response_msg

class Test_Server(Test_StreamerServicer):
    def __init__(self):
        pass
    
    def One_to_One(self, request: send_msg, context: grpc.ServicerContext) -> response_msg:
        print(request)
        time.sleep(1)
        return response_msg(success = True, msg = str(len(request.send)))
    
    def Many_to_One(self, request_iterator: Iterable[send_msg], context: grpc.ServicerContext) -> response_msg:
        ret = []
        for request in request_iterator:
            print(request)
            time.sleep(1)
            ret.append(request.send)
        ret.reverse()
        return response_msg(success = True, msg = ''.join(ret))
    
    def One_to_Many(self, request: send_msg, context: grpc.ServicerContext) -> Iterable[response_msg]:
        print(request)
        for data in request.send:
            time.sleep(1)
            yield response_msg(success = True, msg = str(ord(data)))
    
    def Many_to_Many(self, request_iterator: Iterable[send_msg], context: grpc.ServicerContext) -> Iterable[response_msg]:
        for request in request_iterator:
            print(request)
            time.sleep(1)
            yield response_msg(success = True, msg = str(len(request.send)))

if __name__ == '__main__':
    # the port where the server will be reachable; pick any free port (50051 is just an example)
    linux_port: int = 50051

    # the number of processes to spawn
    num_of_process: int = 3

    # the number of threads each gRPC process will have
    num_of_threads: int = 10

    # the weight of this server relative to the others; a higher weight makes it more likely to be selected
    num_of_weight: int = 1
    
    server = Multiprocessing_gRPC_Load_Balancer_Server(linux_port, num_of_process, num_of_threads, num_of_weight)
    
    # the class that will handle the service methods; pass the class itself, not an instance
    grpc_service_cls = Test_Server

    # the generated function that registers the servicer with a gRPC server
    add_service_to_server = add_Test_StreamerServicer_to_server

    # whether to block until the server is cancelled, or to keep running more code while the server is up; defaults to True
    ## Important: if you set this to False, you must keep the process alive yourself and call server.close() when you want to stop.
    ## If you don't, the subprocesses will keep consuming resources on your machine or server.
    block_the_code: bool = True

    # *args: positional arguments passed to the class once it is running in each child process
    args_for_class = []

    # **kwargs: keyword arguments passed to the class once it is running in each child process
    kwargs_for_the_class = {}
    
    server.start(grpc_service_cls, add_service_to_server, block_the_code, *args_for_class, **kwargs_for_the_class)
    
    ### This is just one way to block; use whatever you prefer, but remember to close the server.
    if not block_the_code:
        from time import sleep
        try:
            while True:
                sleep(86400)  # 86400 seconds == 1 day
        except KeyboardInterrupt:
            server.close()  # safely stops and joins every thread and subprocess created

Client

from typing import List, Union

from proto_test.Test_pb2 import send_msg
from proto_test.Test_pb2_grpc import Test_StreamerStub

from Multiprocessing_gRPC_Load_Balancer import search_servers

class Test_Client:
    def __init__(self, servers: List[str]) -> None:
        self.servers = servers
    
    def one_to_one(self, data: str) -> List[Union[bool, int]]:
        with search_servers(self.servers) as channel:
            test_stub = Test_StreamerStub(channel)
            response = test_stub.One_to_One(send_msg(send = data))
            ret = [response.success, int(response.msg)]
        return ret
    
    def many_to_one(self, data: List[str]) -> List[Union[bool, str]]:
        with search_servers(self.servers) as channel:
            test_stub = Test_StreamerStub(channel)
            response = test_stub.Many_to_One(iter([send_msg(send = x) for x in data]))
            ret = [response.success, response.msg]
        return ret
    
    def one_to_many(self, data: str) -> List[List[Union[bool, int]]]:
        ret = []
        with search_servers(self.servers) as channel:
            test_stub = Test_StreamerStub(channel)
            for response in test_stub.One_to_Many(send_msg(send = data)):
                ret.append([response.success, int(response.msg)])
        return ret
    
    def many_to_many(self, data: List[str]) -> List[List[Union[bool, int]]]:
        ret = []
        with search_servers(self.servers) as channel:
            test_stub = Test_StreamerStub(channel)
            for response in test_stub.Many_to_Many(iter([send_msg(send = x) for x in data])):
                ret.append([response.success, int(response.msg)])
        return ret

# just give the class a list of host:port addresses of the Linux servers
# the client will automatically pick the server with the least demand and the highest weight
client = Test_Client(['linux_ip1:linux_port1', 'linux_ip2:linux_port2', 'linux_ip3:linux_port3'])

# one data send, one data receive
client.one_to_one('Test')

# multiple data send, one data receive
client.many_to_one([x for x in 'Test'])

# one data send, multiple data receive
client.one_to_many('Test')

# multiple data send, multiple data receive; each entry holds the success flag and the length of the sent text
client.many_to_many(['x'*x for x in range(10)])
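
As a rough sketch of how the weight argument could be used across machines (hostnames, ports, and numbers are made up; constructor arguments are positional, in the order shown in the server example: port, processes, threads, weight):

from Multiprocessing_gRPC_Load_Balancer import Multiprocessing_gRPC_Load_Balancer_Server

# on a larger machine: more processes and threads, weight 3, so clients prefer it
server_big = Multiprocessing_gRPC_Load_Balancer_Server(50051, 6, 20, 3)

# on a smaller machine: weight 1
server_small = Multiprocessing_gRPC_Load_Balancer_Server(50051, 3, 10, 1)

# the client just lists both hosts; search_servers picks by demand and weight
client = Test_Client(['big_host:50051', 'small_host:50051'])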

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

multiprocessing_grpc_load_balancer-0.0.2.tar.gz

Built Distribution

Multiprocessing_gRPC_Load_Balancer-0.0.2-py3-none-any.whl

File details

Details for the file multiprocessing_grpc_load_balancer-0.0.2.tar.gz.

File metadata

File hashes

Hashes for multiprocessing_grpc_load_balancer-0.0.2.tar.gz
Algorithm    Hash digest
SHA256       e93cf5a112be54988b7e2d8558abf55b9a9fb891b731d7df3600a2df885eb3f7
MD5          3672767b5108e169781fe9b9b39240d8
BLAKE2b-256  0b26150cb936d15ec6a41e6331d44f9936494ccbc1f1b25e016221831e353b68


File details

Details for the file Multiprocessing_gRPC_Load_Balancer-0.0.2-py3-none-any.whl.

File metadata

File hashes

Hashes for Multiprocessing_gRPC_Load_Balancer-0.0.2-py3-none-any.whl
Algorithm    Hash digest
SHA256       ff5a7a7f71c81335a549afd70a2370580df52642790c2846bf58318e55ca362d
MD5          f651a37d1da8497e59a300309e711129
BLAKE2b-256  bfa8c8edbf1dbc02df61d60b00fdd64689b7050dca02de7c3e0c57e0a9b5b317

