My library of frequently used tools.
utils - my everyday utility classes

1. mongo
- MongoClient class: call its get_client() to obtain a connection.
- bulk_upsert function: high-performance insert or update operations against a single-node MongoDB deployment.
- TimeObjectId class: call its generate() to build a time-based MongoDB ObjectId; the argument format is YYYY-MM-DD or YYYY-MM-DD HH:MM:SS (see the short sketch after the example below).
Example:
import math
import os
from multiprocessing.pool import Pool

from mongo import MongoClient, bulk_upsert


class Dmo:
    @staticmethod
    def multiprocessing_insert():
        # Build the raw data to process
        all_data = []
        for num in range(1, 10000):
            all_data.append({"_id": num, "name": "chenjp"})
        ori_data_count = len(all_data)
        # Split the raw data into roughly one chunk per CPU core
        sub_list_len = int(math.ceil(len(all_data) / float(os.cpu_count())))
        chunks_data = [all_data[i:i + sub_list_len] for i in range(0, len(all_data), sub_list_len)]
        deal_data_pool = Pool(processes=os.cpu_count())
        multiprocessing_deal_data_results = []
        for sub_list in chunks_data:
            multiprocessing_deal_data_results.append(
                deal_data_pool.apply_async(Dmo.multiprocessing_data, args=(sub_list, "insert"))
            )
        deal_data_pool.close()
        deal_data_pool.join()
        # Merge the per-process results and collect per-process errors
        data_count_list = []
        err_list = []
        for res in multiprocessing_deal_data_results:
            sub_process_result = res.get()
            if sub_process_result[1]:
                err_list.append(sub_process_result[1])
            data_count_list.append(sub_process_result[0])
        # Sum the number of documents each process handled
        deal_count = 0
        for sub_data in data_count_list:
            deal_count += sub_data
        # Return the raw data count, the processed count, and the error list
        return ori_data_count, deal_count, err_list

    @staticmethod
    def multiprocessing_data(data_list, opera_type):
        url = "mongodb://xxx:xxx@192.168.0.88:49102/?authSource=xxx&authMechanism=SCRAM-SHA-1"
        ca = "/Users/yingsf/.ssh/ca/mongo/gwola/ca.crt"
        client = "/Users/yingsf/.ssh/ca/mongo/gwola/gwola_client.pem"
        db_client = MongoClient(url, client_cert=client, ca_cert=ca).get_client()
        return bulk_upsert(db_client, "demo_bulk_test", data_list, opera_type)


if __name__ == "__main__":
    try:
        total, handled, errors = Dmo.multiprocessing_insert()
        print(total, handled, errors)
    except Exception as err:
        print(err)
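The example above covers MongoClient and bulk_upsert but not TimeObjectId. Here is a minimal sketch of the generate() call described at the top of this section; the import path mirrors the example's "from mongo import ..." style, and calling generate() at the class level is an assumption:

from mongo import TimeObjectId  # import path assumed to match the example above

# The two input formats are the documented ones; treating generate() as a
# class-level call (rather than requiring an instance) is an assumption.
oid_day = TimeObjectId.generate("2024-01-01")
oid_full = TimeObjectId.generate("2024-01-01 08:30:00")
print(oid_day, oid_full)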
2. list
- chunks_list function: splits a list into chunks, either by number of chunks or by chunk length.
- compare_dict_in_list function: checks whether the dicts in two lists are equal (matched by key). Both behaviors are illustrated in the sketch below.
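The library's exact signatures aren't documented here, so the following is only a plain-Python illustration of the two splitting modes and the key-wise dict comparison, not the real chunks_list / compare_dict_in_list API:

import math

def split_by_length(data, length):
    # Fixed-size chunks; the last chunk may be shorter.
    return [data[i:i + length] for i in range(0, len(data), length)]

def split_by_count(data, count):
    # A fixed number of near-equal chunks.
    return split_by_length(data, int(math.ceil(len(data) / float(count))))

def dicts_equal_by_key(list_a, list_b, key):
    # Pair the dicts from both lists on `key`, then compare the pairs.
    index_a = {d[key]: d for d in list_a}
    index_b = {d[key]: d for d in list_b}
    return index_a == index_b

assert split_by_length(list(range(7)), 3) == [[0, 1, 2], [3, 4, 5], [6]]
assert len(split_by_count(list(range(10)), 4)) == 4
assert dicts_equal_by_key([{"_id": 1, "v": "a"}], [{"_id": 1, "v": "a"}], "_id")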
3. dataframe
Disables scientific-notation display for pandas DataFrames. To use it, first import the pdFloatFormat function, then set pd.options.display.float_format = pdFloatFormat; the DataFrame will print floats in plain decimal form from then on, as sketched below.
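A short sketch of that setting in action; the module that pdFloatFormat is imported from is an assumption:

import pandas as pd
from dataframe import pdFloatFormat  # import path is an assumption

# Route all float rendering through pdFloatFormat, as described above.
pd.options.display.float_format = pdFloatFormat
df = pd.DataFrame({"value": [1.5e-08, 2.3e10]})
print(df)  # floats now print in plain decimal form instead of 1.5e-08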
4. pid
Utilities for working with PID files on disk; the general pattern is sketched below.
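The module's own function names aren't documented here, so this is only a generic sketch of the PID-file pattern it covers, with a hypothetical file path:

import os

PID_FILE = "/tmp/demo.pid"  # hypothetical path for illustration

def write_pid_file():
    # Record the current process id so other tools can find this process.
    with open(PID_FILE, "w") as f:
        f.write(str(os.getpid()))

def read_pid_file():
    # Return the recorded pid, or None if the file is missing or malformed.
    try:
        with open(PID_FILE) as f:
            return int(f.read().strip())
    except (FileNotFoundError, ValueError):
        return None

def remove_pid_file():
    if os.path.exists(PID_FILE):
        os.remove(PID_FILE)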
5. job
Example:
import time

from job import JobUtils, BackgroundScheduler
from mongo import MongoClient

JOB_FOLDER = "/Users/yingsf/Mycode.localized/Github/utils/demo/jobs"
# Dot-separated path string from the top-level package down to the job folder.
# The top-level package depends on where you run from, so here it is demo, not utils.
JOB_PACKAGE_PREFIX = "demo.jobs"
ALL_JOBS = JobUtils.get_all_jobs_from_job_folder(JOB_FOLDER, JOB_PACKAGE_PREFIX)


def default_job():
    """Add every job found in the job folder, all with default settings."""
    scheduler = BackgroundScheduler().scheduler
    for job in ALL_JOBS:
        scheduler.add_job(job.method, **job.run_args)
    scheduler.start()
    while True:
        time.sleep(10)


def custom_parameters_job():
    """Customize both the scheduler parameters and per-job parameters."""
    job_config = {
        "jobstores": "memory",
        "executors": "process",
        "timezone": "Asia/Harbin",
        "max_workers": 10,
        "coalesce": False,
        "max_instances": 20,
        "replace_existing": False,
        "misfire_grace_time": 60 * 2,
        "daemonic": False
    }
    scheduler = BackgroundScheduler(**job_config).scheduler
    for job in ALL_JOBS:
        # job2_test overrides the scheduler defaults: run it on the thread
        # executor with a longer misfire grace time, replacing any existing job.
        if job.run_args["id"] == "job2_test":
            job.run_args["executor"] = "thread"
            job.run_args["misfire_grace_time"] = 60 * 10
            job.run_args["replace_existing"] = True
        # run_args already carries the per-job overrides set above; repeating
        # executor/replace_existing/misfire_grace_time as explicit keywords here
        # would raise a duplicate-keyword TypeError for job2_test.
        scheduler.add_job(job.method, **job.run_args)
    scheduler.start()
    while True:
        time.sleep(10)


def mongo_job():
    """Run jobs with MongoDB as the backing job store."""
    url = "mongodb://xxx:xxx@192.168.0.88:49102/?authSource=xxx&authMechanism=SCRAM-SHA-1"
    ca = "/Users/yingsf/.ssh/ca/mongo/gwola/ca.crt"
    client = "/Users/yingsf/.ssh/ca/mongo/gwola/gwola_client.pem"
    db_client = MongoClient(url, client_cert=client, ca_cert=ca).get_client()
    scheduler = BackgroundScheduler(jobstores="mongo", database_name="warehouse",
                                    collection_name="job_demo_mongo_store",
                                    mongo_client=db_client).scheduler
    for job in ALL_JOBS:
        scheduler.add_job(job.method, executor="process", replace_existing=True,
                          misfire_grace_time=60 * 10, **job.run_args)
    scheduler.start()
    while True:
        time.sleep(10)


if __name__ == "__main__":
    try:
        mongo_job()
    except Exception as err:
        print(err)
6. sqlites
Works with SQLite databases encrypted with SQLCipher version 4; the underlying access pattern is sketched below.
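The module's own wrapper API isn't documented here, so this only sketches the SQLCipher-4 access pattern it supports, using the separately installed sqlcipher3 driver:

import sqlcipher3  # assumes the sqlcipher3 (or sqlcipher3-binary) package is installed

conn = sqlcipher3.connect("encrypted.db")
conn.execute("PRAGMA key = 'passphrase'")        # unlock with the database key
conn.execute("PRAGMA cipher_compatibility = 4")  # pin the SQLCipher v4 file format
print(conn.execute("SELECT count(*) FROM sqlite_master").fetchone())
conn.close()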
Download files
Source Distribution: ysfutils-1.0.19.tar.gz (34.6 kB)
Built Distribution: ysfutils-1.0.19-py3-none-any.whl (34.2 kB)
File details
Details for the file ysfutils-1.0.19.tar.gz.

File metadata
- Download URL: ysfutils-1.0.19.tar.gz
- Upload date:
- Size: 34.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.0 CPython/3.12.5

File hashes
Algorithm | Hash digest
---|---
SHA256 | 0fb4ed87801a55f18c7371b248026f1d0c814433640db0a43a24293f3a4cd7ce
MD5 | 3531c00eaaa342719ea36af1f92b19ce
BLAKE2b-256 | 2e4ecd6ea3bcd197c21608ee2e139fc31960c9c8b23ca9409015e530eeaa3e38
Details for the file ysfutils-1.0.19-py3-none-any.whl.

File metadata
- Download URL: ysfutils-1.0.19-py3-none-any.whl
- Upload date:
- Size: 34.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.0 CPython/3.12.5

File hashes
Algorithm | Hash digest
---|---
SHA256 | 9c3e6734257b7f101c71eff08fb3a28c0db8a34e04f04ce5410be3d66f5c0c12
MD5 | dfffee0553475efd3126bc5b83d350b0
BLAKE2b-256 | c2637342a81d1196bbfe308f4a3979020c96edab2ce0d534f7a3d8d16013521d