ipc-worker: Inter-Process Communication. Multi-process workers communicate through shared memory or a message queue (MQ).
Project description
ipc-worker: Inter-Process Communication. Multi-process workers communicate through shared memory or a message queue (MQ).
Supports a shared-memory process worker (Python >= 3.8 on Linux) and an MQ process worker (Python >= 3.6).
# -*- coding: utf-8 -*-
# @Time : 2021/11/23 9:35
'''
Shared-memory demo.
Recommended system: Linux with Python >= 3.8.
Recommended OS: Linux
Recommended Python: 3.8
Running on Windows is not recommended; it reports the following error:
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
'''
import multiprocessing
import os
import signal
from ipc_worker import logger
from ipc_worker.ipc_shm_loader import IPC_shm,SHM_process_worker
class My_worker(SHM_process_worker):
    """Demo worker for the shared-memory backend.

    Every dict request gets the key ``'b'`` set to ``200`` and is returned
    to the caller; other request types are returned untouched.
    """

    def __init__(self, config, *args, **kwargs):
        """Keep the user config and forward everything else to the base class.

        Args:
            config: user-defined settings; stored as ``self.config`` and
                otherwise unused by this demo.
            *args: forwarded unchanged to ``SHM_process_worker.__init__``.
            **kwargs: forwarded unchanged to ``SHM_process_worker.__init__``.
        """
        super().__init__(*args, **kwargs)
        # Define the attribute up front so run_end()/run_once() never hit an
        # AttributeError if they are reached before run_begin() has run.
        self.handle = None
        # config info, use it as you see fit
        logger.info('Process id {}, group name {} ,shm name {}'.format(self._idx,self._group_name,self._shm_name))
        logger.info(config)
        self.config = config

    def run_begin(self):
        """Hook triggered when the worker process begins."""
        logger.info('worker pid {}...'.format(os.getpid()))
        # Placeholder for a per-process resource (model, connection, ...).
        self.handle = None

    def run_end(self):
        """Hook triggered when the worker process ends."""
        if self.handle is not None:
            # Release the per-process resource here.
            pass

    def run_once(self, request_data):
        """Process one request; triggered for every piece of data that is put.

        Args:
            request_data: the payload submitted by the producer.

        Returns:
            The same ``request_data`` object; dicts are modified in place
            (``request_data['b'] = 200``).
        """
        if isinstance(request_data, dict):
            request_data['b'] = 200
        if self.handle is not None:
            # Do the real work with the per-process resource here.
            pass
        return request_data
if __name__ == '__main__':
    # Demo entry point: start the shared-memory worker pool, push ten
    # requests through it, then wait until the pool exits or Ctrl-C arrives.
    config = {
        "anything": "anything",
        "aa": 100,
    }
    evt_quit = multiprocessing.Manager().Event()
    # NOTE(review): evt_quit is set on shutdown below but is never handed to
    # IPC_shm in this demo (the ZMQ demo passes evt_quit=evt_quit) - confirm
    # whether IPC_shm should receive it as well.

    # group_name is the shared-memory group name; it must be unique.
    # manager is an agent process that acts as a load balancer.
    # worker is the process that does the real work.
    instance = IPC_shm(
        CLS_worker=My_worker,
        worker_args=(config,),  # must be tuple
        worker_num=10,  # number of worker processes
        manager_num=2,  # number of agent processes
        group_name='serving_shm',  # share memory name
        shm_size=1 * 1024 * 1024,  # share memory size
        queue_size=20,  # recv queue size
        is_log_time=False,  # whether to log compute time
        daemon=False,
    )
    instance.start()

    # Demo: produce and consume messages; in real use this could sit behind
    # an HTTP handler.
    for _ in range(10):
        print('produce message')
        data = {"a": 100}
        request_id = instance.put(data)
        data = instance.get(request_id)
        print('get process result',data)

    def signal_handler(signum, frame):
        """On SIGINT: flag shutdown, stop the pool, then unwind join()."""
        evt_quit.set()
        instance.terminate()
        raise KeyboardInterrupt

    signal.signal(signal.SIGINT, signal_handler)
    try:
        instance.join()
    except KeyboardInterrupt:
        # Raised by signal_handler, which has already terminated the pool.
        # KeyboardInterrupt is not an Exception subclass, so it needs its own
        # clause; without it Ctrl-C ends the demo with an uncaught traceback.
        pass
    except Exception:
        evt_quit.set()
        instance.terminate()
    # evt_quit is intentionally not deleted: signal_handler stays registered
    # and still reads this module-level name.
# -*- coding: utf-8 -*-
# @Time : 2021/11/29 15:06
# @Author : tk
import multiprocessing
import os
import signal
import torch
from ipc_worker import logger
from ipc_worker.ipc_zmq_loader import IPC_zmq, ZMQ_process_worker
class My_worker(ZMQ_process_worker):
    """Demo worker for the ZeroMQ backend.

    Every dict request gets the key ``'b'`` set to ``200`` and is returned
    to the caller; other request types are returned untouched.
    """

    def __init__(self, config, *args, **kwargs):
        """Keep the user config and forward everything else to the base class.

        Args:
            config: user-defined settings; stored as ``self.config`` and
                otherwise unused by this demo.
            *args: forwarded unchanged to ``ZMQ_process_worker.__init__``.
            **kwargs: forwarded unchanged to ``ZMQ_process_worker.__init__``.
        """
        super().__init__(*args, **kwargs)
        # Define the attribute up front so run_end()/run_once() never hit an
        # AttributeError if they are reached before run_begin() has run.
        self.handle = None
        # config info, use it as you see fit
        logger.info('Process id {}, group name {} , identity {}'.format(self._idx, self._group_name, self._identity))
        logger.info(config)
        self.config = config

    def run_begin(self):
        """Hook triggered when the worker process begins."""
        logger.info('worker pid {}...'.format(os.getpid()))
        # Placeholder for a per-process resource (model, connection, ...).
        self.handle = None

    def run_end(self):
        """Hook triggered when the worker process ends."""
        if self.handle is not None:
            # Release the per-process resource here.
            pass

    def run_once(self, request_data):
        """Process one request; triggered for every piece of data that is put.

        Args:
            request_data: the payload submitted by the producer.

        Returns:
            The same ``request_data`` object; dicts are modified in place
            (``request_data['b'] = 200``).
        """
        # Only query the current device when CUDA is usable: on a CPU-only
        # host torch.cuda.current_device() raises instead of returning.
        if torch.cuda.is_available():
            print(torch.cuda.device_count(), torch.cuda.current_device())
        else:
            logger.info('CUDA is not available in worker pid {}'.format(os.getpid()))
        if isinstance(request_data, dict):
            request_data['b'] = 200
        if self.handle is not None:
            # Do the real work with the per-process resource here.
            pass
        return request_data
if __name__ == '__main__':
    # ZeroMQ demo. Depends on pyzmq (pip install pyzmq).
    # Tested on Python >= 3.6.
    #
    # NOTE(review): the original kept the following line commented out;
    # presumably it is needed when workers use CUDA - confirm before enabling.
    # torch.multiprocessing.set_start_method('spawn', force=True)

    tmp_dir = './tmp'
    # exist_ok avoids the check-then-create race of exists() + mkdir().
    os.makedirs(tmp_dir, exist_ok=True)
    # presumably read by ipc_worker to decide where its ZeroMQ socket files
    # go - verify against the library.
    os.environ['ZEROMQ_SOCK_TMP_DIR'] = tmp_dir

    config = {
        "anything": "anything",
        "aa": 100,
    }
    evt_quit = multiprocessing.Manager().Event()

    # group_name is the group name of this worker pool; it must be unique.
    # worker is the process that does the real work.
    instance = IPC_zmq(
        CLS_worker=My_worker,
        worker_args=(config,),  # must be tuple
        worker_num=10,  # number of worker processes
        group_name='serving_zmq',  # group name
        evt_quit=evt_quit,
        queue_size=20,  # recv queue size
        is_log_time=True,  # whether to log compute time
        daemon=False,
    )
    instance.start()

    # Demo: produce and consume messages; in real use this could sit behind
    # an HTTP handler.
    for _ in range(10):
        data = {"a": 100}
        request_id = instance.put(data)
        data = instance.get(request_id)
        print('get process result',request_id,data)

    def signal_handler(signum, frame):
        """On SIGINT: flag shutdown, stop the pool, then unwind join()."""
        evt_quit.set()
        instance.terminate()
        raise KeyboardInterrupt

    signal.signal(signal.SIGINT, signal_handler)
    try:
        instance.join()
    except KeyboardInterrupt:
        # Raised by signal_handler, which has already terminated the pool.
        # KeyboardInterrupt is not an Exception subclass, so it needs its own
        # clause; without it Ctrl-C ends the demo with an uncaught traceback.
        pass
    except Exception:
        evt_quit.set()
        instance.terminate()
    # evt_quit is intentionally not deleted: signal_handler stays registered
    # and still reads this module-level name.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
No source distribution files available for this release. See tutorial on generating distribution archives.
Built Distribution
File details
Details for the file ipc_worker-0.1.7-py3-none-any.whl.
File metadata
- Download URL: ipc_worker-0.1.7-py3-none-any.whl
- Upload date:
- Size: 19.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.5.0 importlib_metadata/4.8.2 pkginfo/1.7.1 requests/2.28.2 requests-toolbelt/0.9.1 tqdm/4.64.0 CPython/3.8.9
File hashes
Algorithm | Hash digest |
---|---|
SHA256 | 923f950de1be3b63fded3ea96d8af2e85a2eb41dd3259e27abdb7b9432ccedd6 |
MD5 | 8d6007f280eb06238354d261755d484e |
BLAKE2b-256 | 6ddd56bf499d33df269ae1d4be5d7fd163090f59d553ce02387ff933db7787aa |