Skip to main content

python常用底层库封装

Project description

halring

pip源配置方法confluence页面
http://eqops.tc.com/confluence/pages/viewpage.action?pageId=29300773

安装: pip install halring
升级(目前是2.0.x版本叠加):
pip install --upgrade halring
查看虚拟环境包的可升级版本:
pip list --outdated --trusted-host artifactory.test.com
查看当前库版本:
pip freeze | grep halring

Jira 帮助库


from halring.jira_lib.halring_jira import JiraUtil
jira_client = JiraUtil("username", "password", "http://xx.xxx.xx.xx:xxxx")
jira_client.login()  # 登陆成功返回True
# 获取某个issue信息
jira_model = jira_client.issue("key or id")
# 获取所有project
jira_all_project = jira_client.getAllProjects()
# 根据project_id获取某一个project
jira_project_1 = jira_client.getProjectById("project_id")
# 根据project_id获取某一个project
jira_project_2 = jira_client.getProjectByKey("project_key")
# 判断project是否存在
jira_project_3 = jira_client.projectIsExist("project_key")
# 获取指定版本项目的所有版本信息 
jira_project_4 = jira_client.getProjectVerById("project_id")
# 获取所有项目信息,仅仅打印在控制台,无返回值:
jira_client.getProjectsInfo()
# 搜索问题
#:param jql:  自定义jql语句
#:param maxResults: maxResults为一次最大查询数量参数,整型数字
# 1.maxResults可不传,不传此参数默认一次查询200,直到查询所有结束,返回结果;
# 2.传此参数,则使用该值作为一次最大查询数量,直到查询所有结束,返回结果;
# 3.注意 maxResults经本机测试最大不得超过 2147483647,否则直接报错
jira_list = jira_client.search_issues("jql语句", maxResults=200)
# 获取制定jql语句的第十个的前二十条
jira_list_2 = jira_client.search_issues_limit("jql语句", 10, 20) 
# 创建issue
new_issue_model = jira_client.create_issue({
        'project': {'id': '10585'},  # 项目: 门禁开发测试  10585 or 门禁系统 10604
        'summary': f"需求概要001",  # 概要
        'description': '需求描述信息001',
        # 'issuetype': {'id': '10501'},  # issuetype 表 -> 10307 -> 准出问题单 10501-> 开发任务是10501
        'issuetype': {"name": "开发任务"},  # 问题类型
        'priority': {'id': '3'},  # 优先级: 中
        'assignee': {'name': 'pei.xiaodong'},  # 经办人必须存在,一般跟域帐号
        # 'reporter': {'name': 'admin'},
        'duedate': '2023-02-03',   # 到期日 必填字段
        'customfield_11002': [{"name": "1.10.1"}],  # 1.10.1 基线版本必须存在字段
        'customfield_10857': [{"name": "peihl"}],  # 开发者
        'fixVersions':  [{"name": "1.1.1"}]   # 1.1.1 修复版本必须存在字段
})

# 修改某个issue的字段信息
jira_client.update_issue('issue_id', {"assignee": {'name': 'pei.xiaodong'}})
# 根据项目修改经办人
jira_client.updateAssignee("project_id", "value")
# 限定字段查询
jira_client.getIssues('jql', ['field'])
# 查询某项目的所有的角色
jira_client.getProjectRoles('project_id')
# 查询某项目的某角色的信息
jira_client.getProjectRoleByRoleid('projectKey', 'roleid')
# 设置某个项目的角色为某个用户
jira_client.addProjectRoleActor('projectKey','roleid','username')
# 设置某个项目的角色多个用户
jira_client.addProjectRoleActors('projectKey','roleid','username|name2')
# 删除项目中的角色
jira_client.delProjectRoleActor('projectKey','roleid','username')
# 删除指定项目中的组
jira_client.delProjectRoleGroup('projectKey','roleid','username')
# 设置某个项目的角色为某个用户
jira_client.addProjectRoleGroup('projectKey','roleid','groupname')
# 根据用户名获取用户是否存在
jira_client.get_user_is_exist('username')
# 根据用户名获取用户
jira_client.get_group_is_exist('groupname')
# 查找一个项目中的所有的角色的名字
jira_client.search_project_all_rolenames('projectKey')
# 查找一个项目中的所有角色
jira_client.search_project_all_roles('projectKey')
# 查找一个项目中是否包含某个角色
jira_client.search_project_role_by_rolename('username', 'rolename')
# 查找一个项目中的角色是否包含某个用户
jira_client.search_project_role_group('projectKey','rolename','username')
# 为项目角色添加用户
jira_client.add_project_role_actor('projectKey','rolename','actorname')
# 为项目角色添加Group
jira_client.add_project_role_group('projectKey','rolename','groupname')
# 为项目角色删除用户
jira_client.del_project_role_actor('projectKey','rolename','actorname')
# 为项目角色删除组
jira_client.del_project_role_group('projectKey', 'rolename', 'groupname')
# 清除项目中的某个角色的所有人
jira_client.clean_project_role('projectKey', 'rolename')

confluence 帮助类

from confluence.halring_confluence import ConfluenceUtil
# 初始化
conf = ConfluenceUtil('url', 'username', 'password')
# 创建对应confluence页面
conf.create_confluence_page('space_name', 'parent_title', 'title', 'body')
# 清除对应confluence页面
conf.clean_confluence_page('space_name', 'title')
# 根据命名空间追加confluence数据列表
conf.append_confluence_list('space_name', 'title', 'data')
# 根据命名空间追加confluence数据字典
conf.append_confluence_dict('space_name', 'title', 'data')
# 根据命名空间追加confluence数据表格
conf.append_confluence_table('space_name', 'title', 'ordering', 'data')
# 根据命名空间追加confluence图片
conf.append_confluence_image('space_name', 'title', 'image_file')
# 根据JIRA筛选条件追加confluence
conf.append_confluence_jira_filter('space_name', 'title', 'columns')
# 根据命名空间和标题获取页面url地址
conf.delete_confluence_page_by_title('space_name', 'title')
# 根据命名空间和标题导出pdf
conf.export_confluence_page_as_pdf('space_name', 'title', 'export_name')
# 根据命名空间和标题导出word
conf.export_confluence_page_as_word('space_name', 'title', 'export_name')
# 根据url删除confluence
conf.delete_confluence_page_by_url('url')
# 根据url获取页面标题
conf.get_confluence_page_title_by_url('url')
# 获取confluence所有组
conf.get_confluence_all_groups()
# 根据组名获取所有组成员
conf.get_confluence_group_members('group_name')
# 获取所有组成员
conf.get_confluence_all_members()
# 获取某个成员详细信息
conf.get_confluence_user_details_by_username('username')

Jenkins

from halring.jenkins_lib.halring_jenkins import JenkinsUtil
# 初始化
jks = JenkinsUtil('server_ip', 'user', 'password')
# 登陆, 验证帐号密码
jks.jenkins_login()
# 验证job名字师傅存在 
result = jks.check_job_queue_exist('job_name')
# 验证job是否在构建中
build_flag = jks.check_job_in_building('job_name')
# 参数化触发构建job
build_number = jks.trigger_job_with_file_parameter('job_name', 'job_params_dict', 'file_params_dict')
# 参数化触发构建job,简化参数版
build_number_2 = jks.trigger_job('job_name', 'job_params_dict')
# 参数化触发构建job,简化参数版
build_number_3 = jks.build_job('job_name', {'job_params_dict': ''}, timeout=1800, interval=10, max_retry_times=3)
# 参数化触发构建job同名函数
build_number_4 = jks.build_job_with_file_parameter('job_name', 'job_params_dict', 'file_params_dict', timeout=1800, interval=10, max_retry_times=3)
# 当前任务正在编译返回True,否则返回False
build_result = jks.check_job_in_building_with_retry('job_name', retry_times=3)
# 当前任务有队列返回True,否则返回False
build_result_2 = jks.check_job_queue_exist_with_retry('job_name', retry_times=3)
# 获取job构建信息
build_info = jks.jenkins_get_build_info_with_retry('job_name', 'job_build', retry_times=3)
# 获取job自身信息
build_info_2 = jks.jenkins_get_job_info_with_retry('job_name', retry_times=3)
# 获取job自身信息同名方法
build_info_3 = jks.jenkins_get_job_info('job_name')
# 获取job构建信息同名方法
build_info_4 = jks.jenkins_get_build_info('job_name', 'job_build')
# 获取jenkins server所有在队列中等待的任务信息
build_info_5 = jks.jenkins_get_queue_info()
# 获取job构建输出
build_info_6 = jks.jenkins_get_build_console_output('job_name', 'job_build')
# 获取job构建请求URL接口
build_info_7 = jks.jenkins_get_build_console_output_url('job_url')
# 获取jenkins信息
build_info_8 = jks.jenkins_get_build_info_with_waiting('job_name', 'build_number', interval=3, timeout=180)
# 获取不同格式的所有job 1: 返回{"view_name": [job1, job2,...]} 2: 返回[job1, job2,...]
build_info_9 = jks.get_all_jobs_by_views(return_type="1")
# 获取不在view视图里的job
build_info_10 = jks.get_jobs_not_in_views()
# 获取所有视图信息
build_info_11 = jks.get_all_views()

linux 帮助库

from halring.linux.halring_linux import LinuxUtil
linux_auth = ("jenkins-server-1", "xx.xxx.xx.xx", "root", "password")
linux_cli = LinuxUtil(*linux_auth)
# 连接linux服务器
linux_cli.connect_to_host()
# 获取主机空闲的cpu
result = linux_cli.get_linux_cpu_free()
# 检查主机的可联通, cpu、 内存、磁盘空间可用性
result_2 = linux_cli.check_host_cpu_mem_disk_useable()
# 检查linux主机java版本
result_3 = linux_cli.check_host_java_vesion()
# 检查linux类主机python版本
result_4 = linux_cli.check_host_python_version()
# 检查linux主机sftp可联通
result_5 = linux_cli.check_host_sftp_connect()

artifactory

from halring.artifactory_lib.halring_artifactory_lib import ArtifactoryLibUtil
artifactory_auth = ("username", "password")
art = ArtifactoryLibUtil(*artifactory_auth)
# 上传本地文件到制品库
art.artifactory_upload_file("local_path", "artifactory_puth")
# 下载远端文件到本地
art.artifactory_download_file('local_path', 'artifactory_path')
# 下载远端文件到本地同名方法
art.artifactory_download('local_path', 'artifactory_path')
# 上传本地文件夹到远程文件夹
art.artifactory_upload_tree('local_dir', 'artifactory_dir')
# 上传文件到远端文件夹
art.artifactory_upload('local_path', 'artifactory_path')
# 列出artifactory文件路径
art.list_artifactory_path('artifactory_path')
# 判断给出的路径是否目录(未转码)
art.artifactory_path_isdir_unquoted('unquoted_artifactory_path')
# 判断给出的路径是否目录
art.artifactory_path_isdir('artifactory_path')
# 创建制品库路径
art.create_artifactory_dir('artifactory_path')
# 创建本地文件夹
art.create_local_dir("local_path")
# 搜索远端文件夹 
art.artifactory_search_dir('artifactory_path','nr')
# 判断给出的路径是否目录(未转码)
art.artifactory_path_exist_unquoted('unquoted_artifactory_path')
# 判断路径是否存在
art.artifactory_path_exist('artifactory_path')
# 设置artifactory属性
art.artifactory_set_property('artifactory_path', 'key', 'value')
# 移除artifactory属性
art.artifactory_remove_property('artifactory_path', 'key')
# 展示路径上所有属性
art.artifactory_list_properties('artifactory_path')
# 文件剪切
art.artifactory_move('src_path', 'dst_path')
# 文件粘贴
art.artifactory_copy('src_path', 'dst_path')
# 搜索远端文件同名方法
art.artifactory_search_dir('artifactory_path','nr')
# 删除制品
art.artifactory_remove('artifactory_path')
# 使用aql查询指定目录
art.artifactory_query('artifactory_path', {})
# 未转码前制品库路径
art.artifactory_filepath_md5('artifactory_path')
# md5值
art.artifactory_path_md5('artifactory_path')
# 制品信息统计 
art.artifactory_path_stat('artifactory_path')
# 拷贝docker
art.artifactory_promote_docker('src_path', 'dst_path', copy_flag=True)
# docker路径sha256
art.artifactory_promote_docker('src_path', 'dst_path', copy_flag=True)
# docker,json路径的sha256
art.sub_get_file_json_sha256('artifactory_path')
# 修改时间最晚的子路径
art.artifactory_latest_child_path('artifactory_path')

Apollo

from apollo.halring_apollo_opr import ApolloOpr
apollo_model = ApolloOpr("http://xx.xx.xx.xx:xxxx", "username", "password")
# 获取app消息
result = apollo_model.get_apps('appid')
# 获取App的环境,集群信息
response = apollo_model.get_appid_env_clusters('appid')
# 判断应用(appid)的环境(env)是否存在
result_2 = apollo_model.check_appid_env_exist('appid', 'env')
# 判断应用(appid)的环境(env)中的 集群 (cluster)是否存在
result_3 = apollo_model.check_appid_env_cluster_exist('appid', 'env', 'cluster')
# 获取项目下的指定环境集群的Namespace
result_4 = apollo_model.get_appid_env_cluster_namespace('appid', 'env', 'cluster', namespace=None)
# 检查项目下指定环境的存在性
result_5 = apollo_model.check_appid_env_cluster_namespace_exist('appid', 'env', 'cluster', namespace=None)
# 检查appid 命名空间是否存在
result_6 = apollo_model.check_appid_namespace_item_exist('appid', 'env', 'cluster', 'namespace', 'itemkey')
# 获取item信息
result_7 = apollo_model.get_appid_namespace_item('appid', 'env', 'cluster', 'namespace', 'itemkey')
# 添加appid, env, cluster, namespace, itemkey, itemvalue, comment=None
result_8 = apollo_model.add_appid_namespace_item('appid', 'env', 'cluster', 'namespace', 'itemkey', 'itemvalue', comment=None)
# 修改appid, env, cluster, namespace, itemkey, itemvalue, comment=None
result_9 = apollo_model.modify_appid_namespace_item('appid', 'env', 'cluster', 'namespace', 'itemkey', 'itemvalue', comment=None)
# 删除配置集中的配置项
result_10 = apollo_model.delete_appid_namespace_item('appid', 'env', 'cluster', 'namespace', 'itemkey')
# 发布指定项目的配置
result_11 = apollo_model.release_apppid_namespace('appid', 'env', 'cluster', 'namespace', 'release_title', release_comment=None)
# 获取某个已发布的配置集的口
result_12 = apollo_model.get_latest_released_namespace('appid', 'env', 'cluster', 'namepsacename')
# 导入一个字典的内容作为配置集
result_13 = apollo_model.import_appid_namespace('appid', 'env', 'cluster', 'namespacename', 'inputdict', reset=False)
# 导出目标配置集的内容到dict中返回
result_14 = apollo_model.export_appid_namespace('appid', 'env', 'cluster', 'namespacename')
# 导出已发布的目标配置集的内容到dict中返回
result_15 = apollo_model.export_latest_released_appid_namespace('appid', 'env', 'cluster', 'namespacename')

date_time

from date_time.halring_datetime import DatetimeUtil
dt = DatetimeUtil()
# 根据指定datetime格式返回当前日期时间
dt.get_format_curr_datetime('format_string')
# 返回计算指定日期+当前时间,格式为20070312-08:50:04.000
dt.calc_datetime('select_date')
# 功能:格式转换根据时间格式转换指定格式时间字符串为时间戳(1970年开始的秒数)
dt.conv_datetime_to_timestamp('orgi_datetime', orgi_format_string='FORMAT_DATETIME_MYSQL')
# 功能:格式转换根据时间格式转换指定格式时间字符串为时间戳(1970年开始的秒数)
dt.conv_timestamp_to_datetime('orgi_timestamp', new_format_string='FORMAT_DATETIME_MYSQL')
# 功能:指定时间向前或向后方面进行漂移
dt.drift_datetime_mysql('format_string', 'datetime', 'drift_direction', 'drift_second')

excel帮助类

from excel.halring_excel import ExcelUtil

# 初始化传入xlsx路径和sheet的名词
excel_help = ExcelUtil('xlsxPath', sheetname=None)
# 读取excel转换为 [{'列名1': 'value'},{'列名2': 'value2'}] 格式表格 
exc_result = excel_help.read_excel_to_dict()
# 写入excel某一行某一列
excel_help.write_cell('row', 'col', 'value')
# 读取标题行
data_1 = excel_help.readHead()
# 读取某一行
data_2 = excel_help.readRow(10)
# 读取dict字典转换存为excel表格, 格式  {"ID":[1,2,3], "Name":["Tim", "ZhangSan", "LiSi"]}
excel_help.dict_to_excel('data', 'excel_path')

caseconfig帮助类

from halring.caseconfig.halring_parser import CaseConfigParser

# 初始化
config = CaseConfigParser()
# 读取ini文件
config.read("a.ini")
# 获取所有sections
sections = config.sections()
# 获取其中某一个section
options = config.options('section_name')
# 获取某一个section某一个选项的值
value = config.get('section_name', 'option_name')

docx帮助类

from docx_lib.halring_docx import DocxUtil
bool_result = DocxUtil("xxxxx.docx").creatDocx(jsonStr={}, link_dicts={})

Json帮助库

from json_lib.halring_json import JsonUtil
# Json帮助类初始化
json_util = JsonUtil()
# json字符串转为python字典dict
result = json_util.jsonStrToDict('jsonStr')
# python字典dict转为json对象
result_2 = json_util.dictToJsonObject({})
# python字典dict转为文件存储
result_3 = json_util.dictToFile({}, 'filePath')
# json字符串转文件
result_4 = json_util.jsonStrToFile('jsonStr', 'filePath')
# json文件转为json对象
result_5 = json_util.fileToJson('filePath')
# 从json对象或者json字符串中按字段提取值
resukt_6 = json_util.loadJsonField('jsons', 'field')

redis

from  redis_lib.halring_redis  import RedisUtil
# redis客户端初始化
rds_cli = RedisUtil()
# redis连接
rds_cli.redis_connect('host', 'port', 'pwd', 'db')
# 选择频道推送消息
rds_cli.redis_publish('channel', 'msg')
# 选择频道订阅
rds_cli.redis_subscribe('channel')
# 消息队列 订阅
rds_cli.redis_sub_scribe('key')
# 消息队列 发布
rds_cli.redis_send_list_msg('key', 'msg')
# lpush json 数据
rds_cli.redis_send_json_msg('key', 'msg')
# SET 
rds_cli.redis_send_msg('key', 'msg')
# ltrim key
rds_cli.redis_remove_list('key')
# zset pub json
rds_cli.redis_zset_pub('key', 'msg')
# zset json
rds_cli.redis_zset_sub('key')
# zset
rds_cli.redis_zset_commit('key')
# redis_poll key
rds_cli.redis_poll('timeout', 'key')

ftp

from ftp.halring_ftp import FtpUtil
# ftp客户端
ftp_cli = FtpUtil('hostIp', 'userName', 'userPwd')
# ftp连接
ftp_cli.ftputil_connect()
# 判断文件是否存在,不存在或是文件夹返回错误,文件存在返回正确
ftp_cli.ftputil_file_exist('文件的绝对路径', locality='FTPUTIL_LOCALITY')
# 判断文件夹是否存在
ftp_cli.ftputil_direct_exist('path', locality='FTPUTIL_LOCALITY')
# 从本地上传文件到远程服务器
ftp_cli.ftputil_upload('source', 'destination', transway='FTPUTIL_WAY')
# 从远程服务器下载文件值本地
ftp_cli.ftputil_download('source', 'destination', transway='FTPUTIL_WAY')
# 显示路径下文件、子目录以及子目录下的所有文件
ftp_cli.ftputil_list_loop('path', 'files', locality='FTPUTIL_LOCALITY')
# 显示路径下的所有文件,子目录,以及所有子目录下的文件,不包含本身路径
ftp_cli.ftputil_list('path', locality='FTPUTIL_LOCALITY')
# 检查远程服务器的文件是否存在或文件夹下的所有文件
ftp_cli.ftputil_check('path')
# 删除远程服务器路径下的文件或文件夹下的所有文件、子目录及以下文件
ftp_cli.ftputil_delete('path')
# 创建目录
ftp_cli.ftputil_create_dir('path', locality='FTPUTIL_LOCALITY')
# ftp退出
ftp_cli.ftputil_close()

Git

from git.halring_git import GitUtil
# git 客户端
git_cli = GitUtil('token')
# git 克隆远程仓库到本地指定目录
git_cli.git_clone_branch('repository_url', 'local_repository_path', 'branch')
# git 切换分支
git_cli.git_change_branch('repository_url', 'local_repository_path')
# git diff 比较
git_cli.git_diff('local_repository_path', 'COMMIT_ID_1', 'COMMIT_ID_2')
# 本地获取commit id
git_cli.git_get_commit_id('local_repository_path')
# 使用api获取commit id
git_cli.git_get_commit_id_byapi('source', 'destination', port="8001")
# 转换ssh到http
git_cli.convert_ssh2http('repository_url', port=8081)
# 转化http到ssh
git_cli.convert_http2ssh('repository_url', port=8081)

Mysql

from mysql_lib.halring_mysql import MySqlUtil
# Mysql 客户端
mysql_cli = MySqlUtil('host','user','pwd', db='', port=3306)
# Mysql 客户端连接
mysql_cli.db_connect()
# Mysql 执行查询sql语句
mysql_cli.execute_query('query_sql')
# Mysql 执行增删改sql语句
mysql_cli.execute_sql('sql_statement')
# 导出指定数据库的结构及数据
mysql_cli.db_dump_all('db_name', 'outfile')
# 导出指定数据库的结构
mysql_cli.db_dump_struct('db_name', 'outfile')
# 导入指定数据库
mysql_cli.db_import('db_name', 'input_file')
# Mysql 断开连接 
mysql_cli.db_disconnect()

SqlServer

from sqlserver_lib.halring_sqlserver import SqlServerUtil
# SqlServer 客户端
sqlserver_cli = SqlServerUtil('host','port','user', 'password', 'db')
# SqlServer 客户端连接
sqlserver_cli.db_connect()
# SqlServer 执行查询sql语句
sqlserver_cli.query_sql('query_sql')
# SqlServer 执行增删改sql语句
sqlserver_cli.execute_sql('sql_statement')
# SqlServer 断开连接 
sqlserver_cli.db_disconnect()

Regex 正则

from reg.halring_reg import RegUtil
# reg 正则初始化
reg = RegUtil()
# reg 匹配版本号
reg.reg_image_version('version')

Ssh2帮助类

from ssh2_con.halring_ssh2 import Ssh2Util
# ssh2 初始化
ssh2 = Ssh2Util('host', 'username', 'password', port=22)
# ssh2 连接
ssh2.connect()
# 交互式shell的读方法,可维持会话状态,读取管道中的响应数据,直到超时时间内没有收到任何数据,则将之前读到的数据返回
ssh2.read()
# 交互式shell的写方法,可维持会话状态,将执行指令写入管道,发送给远程主机
ssh2.send('cmd', timeout=10)
# 远程执行指令,非交互式,不维持会话状态
ssh2.exec_command('cmd', timeout=5)
# 根据返回的结果,查找期望包含的数据
ssh2.find_expect('cmd')
# 交互式shell的读方法,可维持会话状态,读取管道中的响应数据,
# 按自定义buffersize读取数据并回显,可设置判断指令执行完毕的条件, 是否找到制定关键字
# 直到超时时间内没有收到任何数据且未找到指令执行完毕的条件,则超时并退出
ssh2.read_line(cmd_complete='', timeout=5)
# 判断远端连接是否成功
ssh2.connect_judge()
# ssh2 断开连接
ssh2.disconnect()

String帮助类

from common.halring_string import StringUtil
# StringUtil 帮助类初始化
string_lib = StringUtil()
# string 字符串替换
res = string_lib.string_replace('orig_string', 'selected_string', 'replace_string')
# string 比较
res_1 = string_lib.string_diff('left_string', 'right_string')
# step string content diff , ignore something
res_2 = string_lib.string_diff_step('left_string', 'right_string')
# 计算字符串中中文字符的长度
res_3 = string_lib.step_autolen('insert_step')
# step_null 替换null
res_4 = string_lib.step_null('insert_step')
# string_replace_space 替换[SPACE]
res_5 = string_lib.string_replace_space("insert_string")
# 替换空字符串[EMPTY]
res_6 = string_lib.string_replace_empty('insert_string')
# 替换 b'\x00'.decode()
res_7 = string_lib.string_ignore_hex_zero('insert_string')
# 生成字符串 新字符串 content X content_cnt
res_8 = string_lib.string_generate('content', 'content_cnt')
# 从列表生成字符串
res_9 = string_lib.string_from_list_with_delimiter([], delimiter="|")
# 转换列表为字符串
res_10 = string_lib.string_conv_list_to_str([], 'orgi_list', 'item_prefix', 'item_postfix')
# 替换单引号
res_11 = string_lib.string_ignore_single_quota('input_string')

svn帮助类

from svn.halring_svn import SvnUtil
# svn 
svn = SvnUtil('username', 'password')
# svn获得详细信息
info = svn.svn_info('remote_path')
# svn获取REVISION
res_1 = svn.svn_info_get_revision('remote_path')
# svn获取COMMIT_ID
res_2 = svn.svn_info_get_commit_id('remote_path')
# svn路径判断文件or文件夹
res_3 = svn.svn_info_is_file_or_directory('remote_path')
# svn获取制定文件夹下的所有文件
res_4 = svn.svn_get_filelist_under_directory('remote_path')
# svn导出到本地路径
res_5 = svn.svn_export("remote_path", "local_path")
# svn创建文件夹
res_6 = svn.svn_mkdir('remote_path')
# svn删除
res_7 = svn.svn_delete('remote_path')
# svn_add 上传文件
res_8 = svn.svn_add('remote_path', 'source_path')
# svn_cp 拷贝文件
res_9 = svn.svn_cp('remote_path', 'source_path')
# svn_diff 比较文本
res_10 = svn.svn_diff_text('remote_path', 'source_path')
# 获取路径下的文件总数
res_11 = svn.svn_get_filenums_under_directory('remote_path')

Xml 帮助类

from xml_lib.halring_xml import XmlUtil
# svn 
xml_model = XmlUtil('xml_file_path')
# 返回目标key下的value
xml_model.analysis_key_get_value('root_tag', 'tag_key')
# 输入单层的key ,返回一个字典
xml_model.analysis_key_get_dict('root_tag')

windows os 执行程序 帮助类

from windows_exec.halring_exec import ExecUtil
# 初始化
windows_os_exec = ExecUtil('cmd')
# 阻塞执行windows命令
windows_os_exec.block_execute()
# 非阻塞执行windows命令
windows_os_exec.non_block_execute()

windows or linux 系统操作相关封装

from windows.halring_os import OsUtil
# 初始化
sys_os = OsUtil()
# 判断是否是文件
sys_os.isfile('path')
# 判断是否是路径
sys_os.isdir('path')
# 拷贝文件
sys_os.copyFile('fromPath', 'toPath')
# 全量拷贝文件夹
sys_os.copyDir('srcDir', 'dstDir')
# 默认不拷贝隐藏的文件,此方法和copyDir可以合并,为方便调用单独列出
sys_os.copyDir_ignore_hidden('srcDir', 'dstDir')
# 自定义过滤不需要拷贝的文件夹或文件
sys_os.copyDir_ignore_custom('srcDir', 'dstDir', 'ignore_pattern')
# shutil自带的copytree不是很好用,在已有目标文件夹的时候会报错,建议使用copyDir 系列方法
# 此处 ingore_pattern  暂只支持一个字符窜,如需多个,可直接使用
sys_os.copytree('src', 'tar', 'ingore_pattern')
# 获得文件的md5
sys_os.get_file_md5('file_path')
# 要创建的路径,创建单个目录
sys_os.mkdir('dirName')
# 要创建的路径,创建多级目录
sys_os.mkdirs('dirName')
# 递归删除目录下所有内容,默认不删除最外层目录,靠 isDeleteOuterMostDir判断是否删除最外层文件夹, 不传或False不删除最外层
sys_os.removeDirs('dirName', isDeleteOuterMostDir=None)
# 拼接多个路径
path = ('path', 'path2', 'path3') # 多个路径列表or元组
sys_os.unionPath(*path)

Plink with Putty

from plink.halring_plink import PlinkUtil
# plink 初始化
plink = PlinkUtil('ip', 'user', 'pwd', 'cmd')
# plink_mass_execute
plink.plink_mass_execute('tool_mode')
# plink_execute
plink.plink_execute(input_ip='', input_user='', input_pwd='')
# plink_execute_vms
plink.plink_execute_vms(input_ip='', input_user='', input_pwd='')

RabbitMQ

from halring.rabbitmq_lib.halring_rabbitmq import MQClient, Consumer, Publisher
# rbmq 初始化连接
rbmq = MQClient('host', 'user', 'password', 'virtual_host', port=5672)
# rbmq 连接服务端
conn = rbmq.connect()
# rbmq 连接服务端, 如果connection存在则True反之False
rbmq.is_connect()
# rbmq关闭连接
rbmq.close_connection()
# rbmq初始化消费者 
consumer = Consumer(conn)
# 开始消费队列
consumer.start_consuming('queue')
# 关闭频道
consumer.close_channel()
# rbmq初始化生产者 
publisher = Publisher(conn)
# rbmq发送消息到队列
bool_result = publisher.send_message('message', 'queue', exchange="", durable=True, routing_key="")
# 返回信道channel
publisher.get_channel()
# 关闭channel
publisher.close_channel()

RdsApi 帮助类

from halring.rds_lib.halring_rds_api import RdsApiUtil

# RdsApi 初始化连接
rds_cli = RdsApiUtil('api_url', 'user', 'api_secret', 'rds_id')
# RdsApi 获取PreToken
token = rds_cli.getPreToken('user', 'secretKey')
# RdsApi rsaEncrypt
b64str = rds_cli.rsaEncrypt('message', 'key')
# RdsApi str2key
str_key = rds_cli.str2key
# RdsApi 创建备份 
response = rds_cli.sub_create_backup('db_name', 'backup_type', 'retention_days', 'is_alldbs')
# RdsApi 根据recoverid恢复
response_2 = rds_cli.sub_list_db_all_backup('db_name', 'is_alldbs')
# RdsApi 处理response字典
response_3 = rds_cli.sub_handle_rds_response('request_response')
# RdsApi 创建备份并处理response方法, 对外(公开)
result = rds_cli.create_backup('db_name', backup_type="0", retention_days="7", is_alldbs="0")
# RdsApi 列出所有备份方法, 对外(公开)
result_1 = rds_cli.list_db_all_backup('db_name', is_alldbs="0")
# RdsApi 根据recoverid恢复, 对外(公开)
result_2 = rds_cli.create_recovery('db_name', 'recovery_id', isalldbs="0")

常见错误

# # 常见错误
# key or id 错误:
# jira.exceptions.JIRAError: JiraError HTTP 404
# 字段名错误:
# Error JiraError HTTP 400 url: http://xx.xx.xx.xx:8080/jira/rest/api/2/issue/xxxx text: Field 'customfield_xxxxx' cannot be set. It is not on the appropriate screen, or unknown
# 少传参数
# TypeError: xxxx() missing 1 required positional argument: 'param'
# 函数名或属性错误
# AttributeError: 'xxxxx' object has no attribute 'xxxxx'

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

halrings-2.0.7-py3-none-any.whl (139.2 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page