python常用底层库封装
Project description
halring
pip源配置方法confluence页面
http://eqops.tc.com/confluence/pages/viewpage.action?pageId=29300773
安装:
pip install halring
升级(目前是2.0.x版本叠加):
pip install --upgrade halring
查看虚拟环境包的可升级版本:
pip list --outdated --trusted-host artifactory.test.com
查看当前库版本:
pip freeze | grep halring
Jira 帮助库
from halring.jira_lib.halring_jira import JiraUtil
jira_client = JiraUtil("username", "password", "http://xx.xxx.xx.xx:xxxx")
jira_client.login() # 登陆成功返回True
# 获取某个issue信息
jira_model = jira_client.issue("key or id")
# 获取所有project
jira_all_project = jira_client.getAllProjects()
# 根据project_id获取某一个project
jira_project_1 = jira_client.getProjectById("project_id")
# 根据project_id获取某一个project
jira_project_2 = jira_client.getProjectByKey("project_key")
# 判断project是否存在
jira_project_3 = jira_client.projectIsExist("project_key")
# 获取指定版本项目的所有版本信息
jira_project_4 = jira_client.getProjectVerById("project_id")
# 获取所有项目信息,仅仅打印在控制台,无返回值:
jira_client.getProjectsInfo()
# 搜索问题
#:param jql: 自定义jql语句
#:param maxResults: maxResults为一次最大查询数量参数,整型数字
# 1.maxResults可不传,不传此参数默认一次查询200,直到查询所有结束,返回结果;
# 2.传此参数,则使用该值作为一次最大查询数量,直到查询所有结束,返回结果;
# 3.注意 maxResults经本机测试最大不得超过 2147483647,否则直接报错
jira_list = jira_client.search_issues("jql语句", maxResults=200)
# 获取制定jql语句的第十个的前二十条
jira_list_2 = jira_client.search_issues_limit("jql语句", 10, 20)
# 创建issue
new_issue_model = jira_client.create_issue({
'project': {'id': '10585'}, # 项目: 门禁开发测试 10585 or 门禁系统 10604
'summary': f"需求概要001", # 概要
'description': '需求描述信息001',
# 'issuetype': {'id': '10501'}, # issuetype 表 -> 10307 -> 准出问题单 10501-> 开发任务是10501
'issuetype': {"name": "开发任务"}, # 问题类型
'priority': {'id': '3'}, # 优先级: 中
'assignee': {'name': 'pei.xiaodong'}, # 经办人必须存在,一般跟域帐号
# 'reporter': {'name': 'admin'},
'duedate': '2023-02-03', # 到期日 必填字段
'customfield_11002': [{"name": "1.10.1"}], # 1.10.1 基线版本必须存在字段
'customfield_10857': [{"name": "peihl"}], # 开发者
'fixVersions': [{"name": "1.1.1"}] # 1.1.1 修复版本必须存在字段
})
# 修改某个issue的字段信息
jira_client.update_issue('issue_id', {"assignee": {'name': 'pei.xiaodong'}})
# 根据项目修改经办人
jira_client.updateAssignee("project_id", "value")
# 限定字段查询
jira_client.getIssues('jql', ['field'])
# 查询某项目的所有的角色
jira_client.getProjectRoles('project_id')
# 查询某项目的某角色的信息
jira_client.getProjectRoleByRoleid('projectKey', 'roleid')
# 设置某个项目的角色为某个用户
jira_client.addProjectRoleActor('projectKey','roleid','username')
# 设置某个项目的角色多个用户
jira_client.addProjectRoleActors('projectKey','roleid','username|name2')
# 删除项目中的角色
jira_client.delProjectRoleActor('projectKey','roleid','username')
# 删除指定项目中的组
jira_client.delProjectRoleGroup('projectKey','roleid','username')
# 设置某个项目的角色为某个用户
jira_client.addProjectRoleGroup('projectKey','roleid','groupname')
# 根据用户名获取用户是否存在
jira_client.get_user_is_exist('username')
# 根据用户名获取用户
jira_client.get_group_is_exist('groupname')
# 查找一个项目中的所有的角色的名字
jira_client.search_project_all_rolenames('projectKey')
# 查找一个项目中的所有角色
jira_client.search_project_all_roles('projectKey')
# 查找一个项目中是否包含某个角色
jira_client.search_project_role_by_rolename('username', 'rolename')
# 查找一个项目中的角色是否包含某个用户
jira_client.search_project_role_group('projectKey','rolename','username')
# 为项目角色添加用户
jira_client.add_project_role_actor('projectKey','rolename','actorname')
# 为项目角色添加Group
jira_client.add_project_role_group('projectKey','rolename','groupname')
# 为项目角色删除用户
jira_client.del_project_role_actor('projectKey','rolename','actorname')
# 为项目角色删除组
jira_client.del_project_role_group('projectKey', 'rolename', 'groupname')
# 清除项目中的某个角色的所有人
jira_client.clean_project_role('projectKey', 'rolename')
confluence 帮助类
from confluence.halring_confluence import ConfluenceUtil
# 初始化
conf = ConfluenceUtil('url', 'username', 'password')
# 创建对应confluence页面
conf.create_confluence_page('space_name', 'parent_title', 'title', 'body')
# 清除对应confluence页面
conf.clean_confluence_page('space_name', 'title')
# 根据命名空间追加confluence数据列表
conf.append_confluence_list('space_name', 'title', 'data')
# 根据命名空间追加confluence数据字典
conf.append_confluence_dict('space_name', 'title', 'data')
# 根据命名空间追加confluence数据表格
conf.append_confluence_table('space_name', 'title', 'ordering', 'data')
# 根据命名空间追加confluence图片
conf.append_confluence_image('space_name', 'title', 'image_file')
# 根据JIRA筛选条件追加confluence
conf.append_confluence_jira_filter('space_name', 'title', 'columns')
# 根据命名空间和标题获取页面url地址
conf.delete_confluence_page_by_title('space_name', 'title')
# 根据命名空间和标题导出pdf
conf.export_confluence_page_as_pdf('space_name', 'title', 'export_name')
# 根据命名空间和标题导出word
conf.export_confluence_page_as_word('space_name', 'title', 'export_name')
# 根据url删除confluence
conf.delete_confluence_page_by_url('url')
# 根据url获取页面标题
conf.get_confluence_page_title_by_url('url')
# 获取confluence所有组
conf.get_confluence_all_groups()
# 根据组名获取所有组成员
conf.get_confluence_group_members('group_name')
# 获取所有组成员
conf.get_confluence_all_members()
# 获取某个成员详细信息
conf.get_confluence_user_details_by_username('username')
Jenkins
from halring.jenkins_lib.halring_jenkins import JenkinsUtil
# 初始化
jks = JenkinsUtil('server_ip', 'user', 'password')
# 登陆, 验证帐号密码
jks.jenkins_login()
# 验证job名字是否存在
result = jks.check_job_queue_exist('job_name')
# 验证job是否在构建中
build_flag = jks.check_job_in_building('job_name')
# 参数化触发构建job
build_number = jks.trigger_job_with_file_parameter('job_name', 'job_params_dict', 'file_params_dict')
# 参数化触发构建job,简化参数版
build_number_2 = jks.trigger_job('job_name', 'job_params_dict')
# 参数化触发构建job,简化参数版
build_number_3 = jks.build_job('job_name', {'job_params_dict': ''}, timeout=1800, interval=10, max_retry_times=3)
# 参数化触发构建job同名函数
build_number_4 = jks.build_job_with_file_parameter('job_name', 'job_params_dict', 'file_params_dict', timeout=1800, interval=10, max_retry_times=3)
# 当前任务正在编译返回True,否则返回False
build_result = jks.check_job_in_building_with_retry('job_name', retry_times=3)
# 当前任务有队列返回True,否则返回False
build_result_2 = jks.check_job_queue_exist_with_retry('job_name', retry_times=3)
# 获取job构建信息
build_info = jks.jenkins_get_build_info_with_retry('job_name', 'job_build', retry_times=3)
# 获取job自身信息
build_info_2 = jks.jenkins_get_job_info_with_retry('job_name', retry_times=3)
# 获取job自身信息同名方法
build_info_3 = jks.jenkins_get_job_info('job_name')
# 获取job构建信息同名方法
build_info_4 = jks.jenkins_get_build_info('job_name', 'job_build')
# 获取jenkins server所有在队列中等待的任务信息
build_info_5 = jks.jenkins_get_queue_info()
# 获取job构建输出
build_info_6 = jks.jenkins_get_build_console_output('job_name', 'job_build')
# 获取job构建请求URL接口
build_info_7 = jks.jenkins_get_build_console_output_url('job_url')
# 获取jenkins信息
build_info_8 = jks.jenkins_get_build_info_with_waiting('job_name', 'build_number', interval=3, timeout=180)
# 获取不同格式的所有job 1: 返回{"view_name": [job1, job2,...]} 2: 返回[job1, job2,...]
build_info_9 = jks.get_all_jobs_by_views(return_type="1")
# 获取不在view视图里的job
build_info_10 = jks.get_jobs_not_in_views()
# 获取所有视图信息
build_info_11 = jks.get_all_views()
linux 帮助库
from halring.linux.halring_linux import LinuxUtil
linux_auth = ("jenkins-server-1", "xx.xxx.xx.xx", "root", "password")
linux_cli = LinuxUtil(*linux_auth)
# 连接linux服务器
linux_cli.connect_to_host()
# 获取主机空闲的cpu
result = linux_cli.get_linux_cpu_free()
# 检查主机的可联通, cpu、 内存、磁盘空间可用性
result_2 = linux_cli.check_host_cpu_mem_disk_useable()
# 检查linux主机java版本
result_3 = linux_cli.check_host_java_vesion()
# 检查linux类主机python版本
result_4 = linux_cli.check_host_python_version()
# 检查linux主机sftp可联通
result_5 = linux_cli.check_host_sftp_connect()
artifactory
from halring.artifactory_lib.halring_artifactory_lib import ArtifactoryLibUtil
artifactory_auth = ("username", "password")
art = ArtifactoryLibUtil(*artifactory_auth)
# 上传本地文件到制品库
art.artifactory_upload_file("local_path", "artifactory_puth")
# 下载远端文件到本地
art.artifactory_download_file('local_path', 'artifactory_path')
# 下载远端文件到本地同名方法
art.artifactory_download('local_path', 'artifactory_path')
# 上传本地文件夹到远程文件夹
art.artifactory_upload_tree('local_dir', 'artifactory_dir')
# 上传文件到远端文件夹
art.artifactory_upload('local_path', 'artifactory_path')
# 列出artifactory文件路径
art.list_artifactory_path('artifactory_path')
# 判断给出的路径是否目录(未转码)
art.artifactory_path_isdir_unquoted('unquoted_artifactory_path')
# 判断给出的路径是否目录
art.artifactory_path_isdir('artifactory_path')
# 创建制品库路径
art.create_artifactory_dir('artifactory_path')
# 创建本地文件夹
art.create_local_dir("local_path")
# 搜索远端文件夹
art.artifactory_search_dir('artifactory_path','nr')
# 判断给出的路径是否目录(未转码)
art.artifactory_path_exist_unquoted('unquoted_artifactory_path')
# 判断路径是否存在
art.artifactory_path_exist('artifactory_path')
# 设置artifactory属性
art.artifactory_set_property('artifactory_path', 'key', 'value')
# 移除artifactory属性
art.artifactory_remove_property('artifactory_path', 'key')
# 展示路径上所有属性
result = art.artifactory_list_properties('artifactory_path')
# 文件剪切
art.artifactory_move('src_path', 'dst_path')
# 文件粘贴
art.artifactory_copy('src_path', 'dst_path')
# 搜索远端文件同名方法
search_result = art.artifactory_search_dir('artifactory_path','nr')
# 删除制品
art.artifactory_remove('artifactory_path')
# 使用aql查询指定目录
query_result = art.artifactory_query('artifactory_path', {})
# 未转码前制品库路径
art.artifactory_filepath_md5('artifactory_path')
# md5值
art.artifactory_path_md5('artifactory_path')
# 制品信息统计
art.artifactory_path_stat('artifactory_path')
# 拷贝docker
art.artifactory_promote_docker('src_path', 'dst_path', copy_flag=True)
# docker路径sha256
art.artifactory_promote_docker('src_path', 'dst_path', copy_flag=True)
# docker,json路径的sha256
art.sub_get_file_json_sha256('artifactory_path')
# 修改时间最晚的子路径
art.artifactory_latest_child_path('artifactory_path')
Apollo
from apollo.halring_apollo_opr import ApolloOpr
apollo_model = ApolloOpr("http://xx.xx.xx.xx:xxxx", "username", "password")
# 获取app消息
result = apollo_model.get_apps('appid')
# 获取App的环境,集群信息
response = apollo_model.get_appid_env_clusters('appid')
# 判断应用(appid)的环境(env)是否存在
result_2 = apollo_model.check_appid_env_exist('appid', 'env')
# 判断应用(appid)的环境(env)中的 集群 (cluster)是否存在
result_3 = apollo_model.check_appid_env_cluster_exist('appid', 'env', 'cluster')
# 获取项目下的指定环境集群的Namespace
result_4 = apollo_model.get_appid_env_cluster_namespace('appid', 'env', 'cluster', namespace=None)
# 检查项目下指定环境的存在性
result_5 = apollo_model.check_appid_env_cluster_namespace_exist('appid', 'env', 'cluster', namespace=None)
# 检查appid 命名空间是否存在
result_6 = apollo_model.check_appid_namespace_item_exist('appid', 'env', 'cluster', 'namespace', 'itemkey')
# 获取item信息
result_7 = apollo_model.get_appid_namespace_item('appid', 'env', 'cluster', 'namespace', 'itemkey')
# 添加appid, env, cluster, namespace, itemkey, itemvalue, comment=None
result_8 = apollo_model.add_appid_namespace_item('appid', 'env', 'cluster', 'namespace', 'itemkey', 'itemvalue', comment=None)
# 修改appid, env, cluster, namespace, itemkey, itemvalue, comment=None
result_9 = apollo_model.modify_appid_namespace_item('appid', 'env', 'cluster', 'namespace', 'itemkey', 'itemvalue', comment=None)
# 删除配置集中的配置项
result_10 = apollo_model.delete_appid_namespace_item('appid', 'env', 'cluster', 'namespace', 'itemkey')
# 发布指定项目的配置
result_11 = apollo_model.release_apppid_namespace('appid', 'env', 'cluster', 'namespace', 'release_title', release_comment=None)
# 获取某个已发布的配置集的口
result_12 = apollo_model.get_latest_released_namespace('appid', 'env', 'cluster', 'namepsacename')
# 导入一个字典的内容作为配置集
result_13 = apollo_model.import_appid_namespace('appid', 'env', 'cluster', 'namespacename', 'inputdict', reset=False)
# 导出目标配置集的内容到dict中返回
result_14 = apollo_model.export_appid_namespace('appid', 'env', 'cluster', 'namespacename')
# 导出已发布的目标配置集的内容到dict中返回
result_15 = apollo_model.export_latest_released_appid_namespace('appid', 'env', 'cluster', 'namespacename')
date_time
from date_time.halring_datetime import DatetimeUtil
dt = DatetimeUtil()
# 根据指定datetime格式返回当前日期时间
dt.get_format_curr_datetime('format_string')
# 返回计算指定日期+当前时间,格式为20070312-08:50:04.000
dt.calc_datetime('select_date')
# 功能:格式转换根据时间格式转换指定格式时间字符串为时间戳(1970年开始的秒数)
dt.conv_datetime_to_timestamp('orgi_datetime', orgi_format_string='FORMAT_DATETIME_MYSQL')
# 功能:格式转换根据时间格式转换指定格式时间字符串为时间戳(1970年开始的秒数)
dt.conv_timestamp_to_datetime('orgi_timestamp', new_format_string='FORMAT_DATETIME_MYSQL')
# 功能:指定时间向前或向后方面进行漂移
dt.drift_datetime_mysql('format_string', 'datetime', 'drift_direction', 'drift_second')
excel帮助类
from excel.halring_excel import ExcelUtil
# 初始化传入xlsx路径和sheet的名词
excel_help = ExcelUtil('xlsxPath', sheetname=None)
# 读取excel转换为 [{'列名1': 'value'},{'列名2': 'value2'}] 格式表格
exc_result = excel_help.read_excel_to_dict()
# 写入excel某一行某一列
excel_help.write_cell('row', 'col', 'value')
# 读取标题行
data_1 = excel_help.readHead()
# 读取某一行
data_2 = excel_help.readRow(10)
# 读取dict字典转换存为excel表格, 格式 {"ID":[1,2,3], "Name":["Tim", "ZhangSan", "LiSi"]}
excel_help.dict_to_excel('data', 'excel_path')
caseconfig帮助类
from halring.caseconfig.halring_parser import IniConfigParser
# 初始化
config = IniConfigParser()
# 读取ini文件
config.read("a.ini")
# 获取所有sections
sections = config.sections()
# 获取其中某一个section
options = config.options('section_name')
# 获取某一个section某一个选项的值
value = config.get('section_name', 'option_name')
docx帮助类
from docx_lib.halring_docx import DocxUtil
bool_result = DocxUtil("xxxxx.docx").creatDocx(jsonStr={}, link_dicts={})
Json帮助库
from json_lib.halring_json import JsonUtil
# Json帮助类初始化
json_util = JsonUtil()
# json字符串转为python字典dict
result = json_util.jsonStrToDict('jsonStr')
# python字典dict转为json对象
result_2 = json_util.dictToJsonObject({})
# python字典dict转为文件存储
result_3 = json_util.dictToFile({}, 'filePath')
# json字符串转文件
result_4 = json_util.jsonStrToFile('jsonStr', 'filePath')
# json文件转为json对象
result_5 = json_util.fileToJson('filePath')
# 从json对象或者json字符串中按字段提取值
resukt_6 = json_util.loadJsonField('jsons', 'field')
redis
from redis_lib.halring_redis import RedisUtil
# redis客户端初始化
rds_cli = RedisUtil()
# redis连接
rds_cli.redis_connect('host', 'port', 'pwd', 'db')
# 选择频道推送消息
rds_cli.redis_publish('channel', 'msg')
# 选择频道订阅
rds_cli.redis_subscribe('channel')
# 消息队列 订阅
rds_cli.redis_sub_scribe('key')
# 消息队列 发布
rds_cli.redis_send_list_msg('key', 'msg')
# lpush json 数据
rds_cli.redis_send_json_msg('key', 'msg')
# SET
rds_cli.redis_send_msg('key', 'msg')
# ltrim key
rds_cli.redis_remove_list('key')
# zset pub json
rds_cli.redis_zset_pub('key', 'msg')
# zset json
rds_cli.redis_zset_sub('key')
# zset
rds_cli.redis_zset_commit('key')
# redis_poll key
rds_cli.redis_poll('timeout', 'key')
ftp
from ftp.halring_ftp import FtpUtil
# ftp客户端
ftp_cli = FtpUtil('hostIp', 'userName', 'userPwd')
# ftp连接
ftp_cli.ftputil_connect()
# 判断文件是否存在,不存在或是文件夹返回错误,文件存在返回正确
ftp_cli.ftputil_file_exist('文件的绝对路径', locality='FTPUTIL_LOCALITY')
# 判断文件夹是否存在
ftp_cli.ftputil_direct_exist('path', locality='FTPUTIL_LOCALITY')
# 从本地上传文件到远程服务器
ftp_cli.ftputil_upload('source', 'destination', transway='FTPUTIL_WAY')
# 从远程服务器下载文件值本地
ftp_cli.ftputil_download('source', 'destination', transway='FTPUTIL_WAY')
# 显示路径下文件、子目录以及子目录下的所有文件
ftp_cli.ftputil_list_loop('path', 'files', locality='FTPUTIL_LOCALITY')
# 显示路径下的所有文件,子目录,以及所有子目录下的文件,不包含本身路径
ftp_cli.ftputil_list('path', locality='FTPUTIL_LOCALITY')
# 检查远程服务器的文件是否存在或文件夹下的所有文件
ftp_cli.ftputil_check('path')
# 删除远程服务器路径下的文件或文件夹下的所有文件、子目录及以下文件
ftp_cli.ftputil_delete('path')
# 创建目录
ftp_cli.ftputil_create_dir('path', locality='FTPUTIL_LOCALITY')
# ftp退出
ftp_cli.ftputil_close()
Git
from git.halring_git import GitUtil
# git 客户端
git_cli = GitUtil('token')
# git 克隆远程仓库到本地指定目录
git_cli.git_clone_branch('repository_url', 'local_repository_path', 'branch')
# git 切换分支
git_cli.git_change_branch('repository_url', 'local_repository_path')
# git diff 比较
git_cli.git_diff('local_repository_path', 'COMMIT_ID_1', 'COMMIT_ID_2')
# 本地获取commit id
git_cli.git_get_commit_id('local_repository_path')
# 使用api获取commit id
git_cli.git_get_commit_id_byapi('source', 'destination', port="8001")
# 转换ssh到http
git_cli.convert_ssh2http('repository_url', port=8081)
# 转化http到ssh
git_cli.convert_http2ssh('repository_url', port=8081)
Mysql
from mysql_lib.halring_mysql import MySqlUtil
# Mysql 客户端
mysql_cli = MySqlUtil('host','user','pwd', db='', port=3306)
# Mysql 客户端连接
mysql_cli.db_connect()
# Mysql 执行查询sql语句
mysql_cli.execute_query('query_sql')
# Mysql 执行增删改sql语句
mysql_cli.execute_sql('sql_statement')
# 导出指定数据库的结构及数据
mysql_cli.db_dump_all('db_name', 'outfile')
# 导出指定数据库的结构
mysql_cli.db_dump_struct('db_name', 'outfile')
# 导入指定数据库
mysql_cli.db_import('db_name', 'input_file')
# Mysql 断开连接
mysql_cli.db_disconnect()
SqlServer
from sqlserver_lib.halring_sqlserver import SqlServerUtil
# SqlServer 客户端
sqlserver_cli = SqlServerUtil('host','port','user', 'password', 'db')
# SqlServer 客户端连接
sqlserver_cli.db_connect()
# SqlServer 执行查询sql语句
sqlserver_cli.query_sql('query_sql')
# SqlServer 执行增删改sql语句
sqlserver_cli.execute_sql('sql_statement')
# SqlServer 断开连接
sqlserver_cli.db_disconnect()
Regex 正则
from reg.halring_reg import RegUtil
# reg 正则初始化
reg = RegUtil()
# reg 匹配版本号
reg.reg_image_version('version')
Ssh2帮助类
from ssh2_con.halring_ssh2 import Ssh2Util
# ssh2 初始化
ssh2 = Ssh2Util('host', 'username', 'password', port=22)
# ssh2 连接
ssh2.connect()
# 交互式shell的读方法,可维持会话状态,读取管道中的响应数据,直到超时时间内没有收到任何数据,则将之前读到的数据返回
ssh2.read()
# 交互式shell的写方法,可维持会话状态,将执行指令写入管道,发送给远程主机
ssh2.send('cmd', timeout=10)
# 远程执行指令,非交互式,不维持会话状态
ssh2.exec_command('cmd', timeout=5)
# 根据返回的结果,查找期望包含的数据
ssh2.find_expect('cmd')
# 交互式shell的读方法,可维持会话状态,读取管道中的响应数据,
# 按自定义buffersize读取数据并回显,可设置判断指令执行完毕的条件, 是否找到制定关键字
# 直到超时时间内没有收到任何数据且未找到指令执行完毕的条件,则超时并退出
ssh2.read_line(cmd_complete='', timeout=5)
# 判断远端连接是否成功
ssh2.connect_judge()
# ssh2 断开连接
ssh2.disconnect()
String帮助类
from common.halring_string import StringUtil
# StringUtil 帮助类初始化
string_lib = StringUtil()
# string 字符串替换
res = string_lib.string_replace('orig_string', 'selected_string', 'replace_string')
# string 比较
res_1 = string_lib.string_diff('left_string', 'right_string')
# step string content diff , ignore something
res_2 = string_lib.string_diff_step('left_string', 'right_string')
# 计算字符串中中文字符的长度
res_3 = string_lib.step_autolen('insert_step')
# step_null 替换null
res_4 = string_lib.step_null('insert_step')
# string_replace_space 替换[SPACE]
res_5 = string_lib.string_replace_space("insert_string")
# 替换空字符串[EMPTY]
res_6 = string_lib.string_replace_empty('insert_string')
# 替换 b'\x00'.decode()
res_7 = string_lib.string_ignore_hex_zero('insert_string')
# 生成字符串 新字符串 content X content_cnt
res_8 = string_lib.string_generate('content', 'content_cnt')
# 从列表生成字符串
res_9 = string_lib.string_from_list_with_delimiter([], delimiter="|")
# 转换列表为字符串
res_10 = string_lib.string_conv_list_to_str([], 'orgi_list', 'item_prefix', 'item_postfix')
# 替换单引号
res_11 = string_lib.string_ignore_single_quota('input_string')
svn帮助类
from svn.halring_svn import SvnUtil
# svn
svn = SvnUtil('username', 'password')
# svn获得详细信息
info = svn.svn_info('remote_path')
# svn获取REVISION
res_1 = svn.svn_info_get_revision('remote_path')
# svn获取COMMIT_ID
res_2 = svn.svn_info_get_commit_id('remote_path')
# svn路径判断文件or文件夹
res_3 = svn.svn_info_is_file_or_directory('remote_path')
# svn获取制定文件夹下的所有文件
res_4 = svn.svn_get_filelist_under_directory('remote_path')
# svn导出到本地路径
res_5 = svn.svn_export("remote_path", "local_path")
# svn创建文件夹
res_6 = svn.svn_mkdir('remote_path')
# svn删除
res_7 = svn.svn_delete('remote_path')
# svn_add 上传文件
res_8 = svn.svn_add('remote_path', 'source_path')
# svn_cp 拷贝文件
res_9 = svn.svn_cp('remote_path', 'source_path')
# svn_diff 比较文本
res_10 = svn.svn_diff_text('remote_path', 'source_path')
# 获取路径下的文件总数
res_11 = svn.svn_get_filenums_under_directory('remote_path')
Xml 帮助类
from xml_lib.halring_xml import XmlUtil
# svn
xml_model = XmlUtil('xml_file_path')
# 返回目标key下的value
xml_model.analysis_key_get_value('root_tag', 'tag_key')
# 输入单层的key ,返回一个字典
xml_model.analysis_key_get_dict('root_tag')
windows os 执行程序 帮助类
from windows_exec.halring_exec import ExecUtil
# 初始化
windows_os_exec = ExecUtil('cmd')
# 阻塞执行windows命令
windows_os_exec.block_execute()
# 非阻塞执行windows命令
windows_os_exec.non_block_execute()
windows or linux 系统操作相关封装
from windows.halring_os import OsUtil
# 初始化
sys_os = OsUtil()
# 判断是否是文件
sys_os.isfile('path')
# 判断是否是路径
sys_os.isdir('path')
# 拷贝文件
sys_os.copyFile('fromPath', 'toPath')
# 全量拷贝文件夹
sys_os.copyDir('srcDir', 'dstDir')
# 默认不拷贝隐藏的文件,此方法和copyDir可以合并,为方便调用单独列出
sys_os.copyDir_ignore_hidden('srcDir', 'dstDir')
# 自定义过滤不需要拷贝的文件夹或文件
sys_os.copyDir_ignore_custom('srcDir', 'dstDir', 'ignore_pattern')
# shutil自带的copytree不是很好用,在已有目标文件夹的时候会报错,建议使用copyDir 系列方法
# 此处 ingore_pattern 暂只支持一个字符窜,如需多个,可直接使用
sys_os.copytree('src', 'tar', 'ingore_pattern')
# 获得文件的md5
sys_os.get_file_md5('file_path')
# 要创建的路径,创建单个目录
sys_os.mkdir('dirName')
# 要创建的路径,创建多级目录
sys_os.mkdirs('dirName')
# 递归删除目录下所有内容,默认不删除最外层目录,靠 isDeleteOuterMostDir判断是否删除最外层文件夹, 不传或False不删除最外层
sys_os.removeDirs('dirName', isDeleteOuterMostDir=None)
# 拼接多个路径
path = ('path', 'path2', 'path3') # 多个路径列表or元组
sys_os.unionPath(*path)
Plink with Putty
from plink.halring_plink import PlinkUtil
# plink 初始化
plink = PlinkUtil('ip', 'user', 'pwd', 'cmd')
# plink_mass_execute
plink.plink_mass_execute('tool_mode')
# plink_execute
plink.plink_execute(input_ip='', input_user='', input_pwd='')
# plink_execute_vms
plink.plink_execute_vms(input_ip='', input_user='', input_pwd='')
RabbitMQ
from halring.rabbitmq_lib.halring_rabbitmq import MQClient, Consumer, Publisher
# rbmq 初始化连接
rbmq = MQClient('host', 'user', 'password', 'virtual_host', port=5672)
# rbmq 连接服务端
conn = rbmq.connect()
# rbmq 连接服务端, 如果connection存在则True反之False
rbmq.is_connect()
# rbmq关闭连接
rbmq.close_connection()
# rbmq初始化消费者
consumer = Consumer(conn)
# 开始消费队列
consumer.start_consuming('queue')
# 关闭频道
consumer.close_channel()
# rbmq初始化生产者
publisher = Publisher(conn)
# rbmq发送消息到队列
bool_result = publisher.send_message('message', 'queue', exchange="", durable=True, routing_key="")
# 返回信道channel
publisher.get_channel()
# 关闭channel
publisher.close_channel()
RdsApi 帮助类
from halring.rds_lib.halring_rds_api import RdsApiUtil
# RdsApi 初始化连接
rds_cli = RdsApiUtil('api_url', 'user', 'api_secret', 'rds_id')
# RdsApi 获取PreToken
token = rds_cli.getPreToken('user', 'secretKey')
# RdsApi rsaEncrypt
b64str = rds_cli.rsaEncrypt('message', 'key')
# RdsApi str2key
str_key = rds_cli.str2key
# RdsApi 创建备份
response = rds_cli.sub_create_backup('db_name', 'backup_type', 'retention_days', 'is_alldbs')
# RdsApi 根据recoverid恢复
response_2 = rds_cli.sub_list_db_all_backup('db_name', 'is_alldbs')
# RdsApi 处理response字典
response_3 = rds_cli.sub_handle_rds_response('request_response')
# RdsApi 创建备份并处理response方法, 对外(公开)
result = rds_cli.create_backup('db_name', backup_type="0", retention_days="7", is_alldbs="0")
# RdsApi 列出所有备份方法, 对外(公开)
result_1 = rds_cli.list_db_all_backup('db_name', is_alldbs="0")
# RdsApi 根据recoverid恢复, 对外(公开)
result_2 = rds_cli.create_recovery('db_name', 'recovery_id', isalldbs="0")
pdf 相关 帮助类
from halring.pdf_lib.halring_pdf import PdfUtil
# pdfutil 初始化
pdf_util = PdfUtil()
# markdown to pdf
pdf_util.markdown_to_pdf(markdonw_file='README.md', output_pdf='test.pdf')
常见错误
# # 常见错误
# key or id 错误:
# jira.exceptions.JIRAError: JiraError HTTP 404
# 字段名错误:
# Error JiraError HTTP 400 url: http://xx.xx.xx.xx:8080/jira/rest/api/2/issue/xxxx text: Field 'customfield_xxxxx' cannot be set. It is not on the appropriate screen, or unknown
# 少传参数
# TypeError: xxxx() missing 1 required positional argument: 'param'
# 函数名或属性错误
# AttributeError: 'xxxxx' object has no attribute 'xxxxx'
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
halring-2.2.6.tar.gz
(125.7 kB
view details)
Built Distribution
halring-2.2.6-py3-none-any.whl
(144.0 kB
view details)
File details
Details for the file halring-2.2.6.tar.gz
.
File metadata
- Download URL: halring-2.2.6.tar.gz
- Upload date:
- Size: 125.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.8.0 pkginfo/1.9.6 readme-renderer/37.3 requests/2.30.0 requests-toolbelt/1.0.0 urllib3/1.26.0 tqdm/4.65.0 importlib-metadata/6.6.0 keyring/23.13.1 rfc3986/2.0.0 colorama/0.4.6 CPython/3.7.7
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 0596664efbf2813e7b7c8c881477f3a573479be5267fd05272fcd98d00371e42 |
|
MD5 | e2e91d57a14bf5d7b7e4e5c1d066ee95 |
|
BLAKE2b-256 | 0c2339d094de455ec0a699315bc5f21eeaa1b19373fcf4dd6710b65e32ed87b9 |
File details
Details for the file halring-2.2.6-py3-none-any.whl
.
File metadata
- Download URL: halring-2.2.6-py3-none-any.whl
- Upload date:
- Size: 144.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.8.0 pkginfo/1.9.6 readme-renderer/37.3 requests/2.30.0 requests-toolbelt/1.0.0 urllib3/1.26.0 tqdm/4.65.0 importlib-metadata/6.6.0 keyring/23.13.1 rfc3986/2.0.0 colorama/0.4.6 CPython/3.7.7
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 0414804c424e6ceb34c4351560294c9cfe99b253b874033dbb123518b4c6f543 |
|
MD5 | 9035c6f1ec2c88fbac59915a099319cc |
|
BLAKE2b-256 | af6ba25af00bf77f3b93a413315b00ce16b507ca15e6b739e298ad9eeeaa19d5 |