My library of frequently used tools.

utils - my frequently used utility classes

1. mongo

  1. MongoClient class: call get_client() on an instance to obtain a connection;
  2. bulk_upsert method: high-performance insert/update operations for MongoDB databases deployed as a single node;
  3. TimeObjectId class: call generate() to produce a time-based MongoDB ObjectId; the argument format is YYYY-MM-DD or YYYY-MM-DD HH:MM:SS;

Example:

import math
import os
from multiprocessing.pool import Pool

from mongo import MongoClient, bulk_upsert


class Dmo:
    @staticmethod
    def multiprocessing_insert():
        # Generate the raw data to be processed
        all_data = []
        for num in range(1, 10000):
            all_data.append({"_id": num, "name": "chenjp"})
        ori_data_count = len(all_data)

        # Split the raw data into one chunk per CPU core
        sub_list_len = int(math.ceil(len(all_data) / float(os.cpu_count())))
        chunks_data = [all_data[i:i + sub_list_len] for i in range(0, len(all_data), sub_list_len)]

        deal_data_pool = Pool(processes=os.cpu_count())
        multiprocessing_deal_data_results = []
        for sub_list in chunks_data:
            multiprocessing_deal_data_results.append(
                deal_data_pool.apply_async(Dmo.multiprocessing_data, args=(sub_list, "insert"))
            )
        deal_data_pool.close()
        deal_data_pool.join()

        # Merge each process's results and collect any error messages
        data_count_list = []
        err_list = []
        for res in multiprocessing_deal_data_results:
            sub_process_result = res.get()
            if sub_process_result[1]:
                err_list.append(sub_process_result[1])
            data_count_list.append(sub_process_result[0])

        # Total the number of records handled by each process
        deal_count = sum(data_count_list)

        # Return the raw data count, the processed count, and the list of errors
        return ori_data_count, deal_count, err_list

    @staticmethod
    def multiprocessing_data(data_list, opera_type):
        url = "mongodb://xxx:xxx@192.168.0.88:49102/?authSource=xxx&authMechanism=SCRAM-SHA-1"
        ca = "/Users/yingsf/.ssh/ca/mongo/gwola/ca.crt"
        client = "/Users/yingsf/.ssh/ca/mongo/gwola/gwola_client.pem"
        db_client = MongoClient(url, client_cert=client, ca_cert=ca).get_client()
        return bulk_upsert(db_client, "demo_bulk_test", data_list, opera_type)


if __name__ == "__main__":
    try:
        ori_count, deal_count, errs = Dmo.multiprocessing_insert()
        print(ori_count, deal_count, errs)
    except Exception as err:
        print(err)
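
The example above exercises bulk_upsert; TimeObjectId is independent of it. A minimal sketch based only on the description above, assuming generate() can be called on the class itself (if it is an instance method, instantiate first):

from mongo import TimeObjectId

# Per the description above, generate() accepts "YYYY-MM-DD" or
# "YYYY-MM-DD HH:MM:SS" and returns a time-based MongoDB ObjectId.
oid_day = TimeObjectId.generate("2024-01-01")
oid_second = TimeObjectId.generate("2024-01-01 08:30:00")
print(oid_day, oid_second)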

2. list

  1. chunks_list method: splits a list by the given criterion, either into a set number of chunks or into chunks of a set length (see the sketch after this list);
  2. compare_dict_in_list method: compares whether the dicts in two lists are equal (equal keys);
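
A minimal sketch of both helpers; the import path follows the mongo example above, and the second argument of chunks_list is an assumption, since only the behavior is described here:

from list import chunks_list, compare_dict_in_list

data = [{"a": 1}, {"a": 2}, {"a": 3}, {"a": 4}, {"a": 5}]

# Split the list into chunks; whether this argument means "number of
# chunks" or "chunk length" depends on the actual signature.
print(chunks_list(data, 2))

# Compare the dicts in two lists for equality (by key).
print(compare_dict_in_list(data, list(data)))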

3. dataframe

Disables scientific-notation display for pandas DataFrames. To use it, first import the function, then run pd.options.display.float_format = pdFloatFormat; DataFrames will then display floats in plain decimal notation.
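
A minimal sketch of the usage just described; the import path is an assumption, patterned after the other module names in this README:

import pandas as pd

from dataframe import pdFloatFormat

# Route DataFrame float formatting through pdFloatFormat to turn off
# scientific notation.
pd.options.display.float_format = pdFloatFormat

df = pd.DataFrame({"value": [0.00000123, 98765432.1]})
print(df)  # floats now print as plain decimals rather than 1.23e-06 style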


4. pid

Manages PID files on disk.
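
The module's functions are not documented here, so the following is a hypothetical illustration, using only the standard library, of the kind of PID-file handling this refers to:

import os

PID_FILE = "/tmp/demo.pid"  # hypothetical path

# Record the current process id so other tools can find or signal this process.
with open(PID_FILE, "w") as f:
    f.write(str(os.getpid()))

# Later, possibly from another process: read the pid back.
with open(PID_FILE) as f:
    print(int(f.read().strip()))

os.remove(PID_FILE)  # remove the pid file on shutdown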


5. job

Example:

import time

from job import JobUtils, BackgroundScheduler
from mongo import MongoClient

JOB_FOLDER = "/Users/yingsf/Mycode.localized/Github/utils/demo/jobs"
# Dot-separated path from the top-level directory to the job folder; the top level depends on where you run from, so here it is demo, not utils
JOB_PACKAGE_PREFIX = "demo.jobs"
ALL_JOBS = JobUtils.get_all_jobs_from_job_folder(JOB_FOLDER, JOB_PACKAGE_PREFIX)


def default_job():
    """ 添加指定文件夹下的所有job, 全部使用默认配置
    """
    scheduler = BackgroundScheduler().scheduler
    for job in ALL_JOBS:
        scheduler.add_job(job.method, **job.run_args)
    scheduler.start()
    while True:
        time.sleep(10)


def custom_parameters_job():
    """ 自定义scheduler参数和job参数
    """
    job_config = {
        "jobstores": "memory",
        "executors": "process",
        "timezone": "Asia/Harbin",
        "max_workers": 10,
        "coalesce": False,
        "max_instances": "20",
        "replace_existing": False,
        "misfire_grace_time": 60 * 2,
        "daemonic": False
    }
    scheduler = BackgroundScheduler(**job_config).scheduler
    for job in ALL_JOBS:
        # job2 runs in the thread pool, overriding the scheduler's default process-pool executor
        if job.run_args["id"] == "job2_test":
            job.run_args["executor"] = "thread"
            job.run_args["misfire_grace_time"] = 60 * 10
            job.run_args["replace_existing"] = True
        scheduler.add_job(job.method, **job.run_args)
    scheduler.start()
    while True:
        time.sleep(10)


def mongo_job():
    """ 使用MongoDB作为后端存储的job
    """
    url = "mongodb://xxx:xxx@192.168.0.88:49102/?authSource=xxx&authMechanism=SCRAM-SHA-1"
    ca = "/Users/yingsf/.ssh/ca/mongo/gwola/ca.crt"
    client = "/Users/yingsf/.ssh/ca/mongo/gwola/gwola_client.pem"
    db_client = MongoClient(url, client_cert=client, ca_cert=ca).get_client()
    scheduler = BackgroundScheduler(
        jobstores="mongo",
        database_name="warehouse",
        collection_name="job_demo_mongo_store",
        mongo_client=db_client,
    ).scheduler
    for job in ALL_JOBS:
        scheduler.add_job(job.method, executor="process", replace_existing=True, misfire_grace_time=60*10, **job.run_args)
    scheduler.start()
    while True:
        time.sleep(10)


if __name__ == "__main__":
    try:
        mongo_job()
    except Exception as err:
        print(err)

6. sqlites

Supports sqlite databases encrypted with SQLCipher version 4.
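
The module's API is not documented here; the names below are hypothetical, patterned after MongoClient.get_client() in the mongo section:

from sqlites import SqliteClient  # hypothetical class name

# Open a SQLCipher version 4 encrypted database; path and key are placeholders.
conn = SqliteClient("/path/to/encrypted.db", key="xxx").get_client()
print(conn.execute("SELECT name FROM sqlite_master").fetchall())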
