Parallelling Data Processor
Project description
介绍
我们提供了一个可快速处理大量数据的多进程数据处理模块
此模块可以处理list、numpy、pandas等数据,并且可以以tuple的形式输入,以tuple的形式输出
中间的处理过程可以让用户来决定,只要传入一个自定义函数即可
可设置进程数
样例
引入:
from parallel_processor import process_data
# 注意:向进程池传入处理函数时不能使用 lambda(lambda 无法被 pickle 序列化,会报错);如需预先绑定参数,请使用下面导入的 functools.partial
from functools import partial
样例1:分词
def seg(x):
    """Tokenize every text in ``x`` with jieba.

    Iterates ``x`` under a ``tqdm_notebook`` progress bar and returns a list
    that holds, for each item, the list of tokens yielded by ``jieba.cut``.
    """
    return [list(jieba.cut(text)) for text in tqdm_notebook(x)]
# Apply seg to data through process_data, requesting 16 workers; the result replaces data.
data = process_data(data, seg, num_workers=16)
样例2:text2ids
def convert_example(x, f_token, max_seq_len, return_raw_data=False, mask_rate=0, x1=None, avg_split=False):
    """Convert texts (or text pairs) into stacked model-input arrays.

    Every item of ``x`` -- paired with the matching item of ``x1`` when ``x1``
    is given, otherwise with an empty second text -- is handed to
    ``convert_single_example`` when ``mask_rate <= 0`` and to
    ``convert_single_example_with_mlm`` otherwise.

    Args:
        x: Iterable of first texts.
        f_token: Tokenizer object forwarded unchanged to the per-example helper.
        max_seq_len: Sequence length forwarded unchanged to the per-example helper.
        return_raw_data: When true, also return each first text with its
            newlines removed, split into a list of characters.
        mask_rate: Values above 0 select the MLM helper and add the
            ``input_ids_gt`` array to the result.
        x1: Optional iterable of second texts, zipped with ``x``.
        avg_split: Forwarded unchanged to the per-example helper.

    Returns:
        A tuple ``(input_ids, input_mask, segment_ids)`` of ``np.ndarray``,
        followed by the ``input_ids_gt`` array when ``mask_rate > 0``, followed
        by the raw-character array when ``return_raw_data`` is true.
    """
    has_pair = x1 is not None
    # JPY_PARENT_PID in the environment selects the notebook progress bar
    # (presumably it signals a Jupyter kernel -- confirm).
    progress = tqdm_notebook if "JPY_PARENT_PID" in os.environ else tqdm
    rows = progress(zip(x, x1)) if has_pair else progress(x)

    ids_rows, mask_rows, segment_rows, gt_rows, raw_rows = [], [], [], [], []
    for row in rows:
        if has_pair:
            line, line1 = row[0], row[1]
        else:
            line, line1 = row, ''

        if mask_rate <= 0:
            input_ids, input_mask, segment_ids = convert_single_example(
                max_seq_len, f_token, line, line1, avg_split=avg_split)
        else:
            input_ids, input_mask, segment_ids, input_ids_gt = convert_single_example_with_mlm(
                max_seq_len, f_token, line, line1, avg_split=avg_split)

        ids_rows.append(np.array(input_ids))
        mask_rows.append(np.array(input_mask))
        segment_rows.append(np.array(segment_ids))
        if mask_rate > 0:
            gt_rows.append(np.array(input_ids_gt))
        if return_raw_data:
            raw_rows.append(list(line.replace('\n', '')))

    # The three core arrays always come first; optional arrays are appended
    # in the fixed order (ground-truth ids, raw characters).
    outputs = [np.array(ids_rows), np.array(mask_rows), np.array(segment_rows)]
    if mask_rate > 0:
        outputs.append(np.array(gt_rows))
    if return_raw_data:
        outputs.append(np.array(raw_rows))
    return tuple(outputs)
def _process(text, **kwargs):
    """Unpack keyword options and delegate to ``convert_example``.

    Recognised keys (with the default used when a key is absent):
    ``f_token`` (None), ``max_seq_len`` (128), ``return_raw_data`` (False),
    ``mask_rate`` (0) and ``avg_split`` (False).

    When ``text.shape`` has more than one dimension, column 0 is used as the
    first text and column 1 as the second text; otherwise ``text`` is passed
    through as the only text sequence.
    """
    # Positional arguments of convert_example that follow ``x``, in order.
    options = (
        kwargs.get('f_token'),
        kwargs.get('max_seq_len', 128),
        kwargs.get('return_raw_data', False),
        kwargs.get('mask_rate', 0),
    )
    avg_split = kwargs.get('avg_split', False)

    if len(text.shape) <= 1:
        return convert_example(text, *options, avg_split=avg_split)

    first, second = text[:, 0], text[:, 1]
    return convert_example(first, *options, x1=second, avg_split=avg_split)
# Tokenizer built from the vocab.txt file inside bert_model_dir.
vocab_file = os.path.join(bert_model_dir, "vocab.txt")
f_token = FullTokenizer(vocab_file)

# Raw values of the selected text column(s).
text = data[text_cols].values

# Options handed to process_data via func_kwargs (presumably forwarded to
# _process as keyword arguments -- confirm against process_data).
func_kwargs = dict(
    f_token=f_token,
    max_seq_len=max_seq_len,
    return_raw_data=return_raw_data,
    mask_rate=mask_rate,
    avg_split=avg_split,
)
result = process_data(text, _process, num_workers=16, is_tuple_data=False, func_kwargs=func_kwargs)

# The first three entries of the result are used here.
input_ids, input_mask, segment_ids = result[0], result[1], result[2]
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file parallel_processor-0.1.7.tar.gz.
File metadata
- Download URL: parallel_processor-0.1.7.tar.gz
- Upload date:
- Size: 4.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.11.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 1808ca28d0e4a324d3933a1006ead3fcdfa380b5c58410c66108bc8e3dc87f4e |
| MD5 | b10d70f28f4c8d9f4b01c84becc26bbb |
| BLAKE2b-256 | 26c7573883ede92295c8481c979a4e31d5977a7ccf0dec993de6e4c47161f882 |
File details
Details for the file parallel_processor-0.1.7-py3-none-any.whl.
File metadata
- Download URL: parallel_processor-0.1.7-py3-none-any.whl
- Upload date:
- Size: 4.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.11.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 4eeaefa1a2655bade6f7ce951876fd452b8ff681b70c1cb7bb4d80b4c10972b9 |
| MD5 | 93e70aada8b8473b3c32979da1e5944c |
| BLAKE2b-256 | 6e8bde18ca0e0380bb06c646ea12b8b28c805889ce15610fc08e99c841420629 |