平时常用的http server|mysql|腾讯cos|微信公众号等库的集合
Project description
http服务开发经常用到的库集合(持续更新,第一版写的简单)
1.包含内容
dbmysql mysql数据库权限控制,可用于前端直接执行mysql
http_server 基于aiohttp的http服务器
tencent_cos 腾讯对象存储
token_mgr 用于管理token
wx_gzh 用于处理微信公众号开发获取授权信息
2.使用文档
安装
pip install l0n0lhttputils
使用方法,以微信登录验证为例
import logging
import asyncio
import requests
from l0n0lhttputils.config_file import yaml_file
from l0n0lhttputils.http_server import http_server, web
from l0n0lhttputils.dbmysql import dbmysql
from l0n0lhttputils.token_mgr import token_mgr, check_token
g_config = yaml_file("config.yaml")
g_config.load_config()
db = dbmysql(
g_config.get("mysql_host"),
g_config.get("mysql_port"),
g_config.get("mysql_user"),
g_config.get("mysql_password"),
g_config.get("mysql_db")
)
# 用户csrf token
g_token_mgr: token_mgr = token_mgr()
g_token_mgr.token_timestamp = g_config.get("token_expire_time")
g_server = http_server(g_config.get("listen_host"),
g_config.get("listen_port"))
session_keys = {}
@g_server.route("post", "/{prefix}/login")
async def login(request: web.Request):
"""用户登录
@code:微信提供的code
"""
# 获取用户的微信code
request_data = await request.json()
# 向微信验证用户的code是否正确,并获取用户的openid
check_result = requests.get("https://api.weixin.qq.com/sns/jscode2session", params={
'appid': config.get("appid"),
"secret": config.get("secret_key"),
"js_code": request_data['code'],
"grant_type": 'authorization_code'
})
# 判断验证时网络是否正常
if check_result.status_code != 200:
return web.json_response({"errMsg": "向微信检查登录状态错误"})
# 获取微信的验证结果,openid
json_value = check_result.json()
openid = json_value['openid']
# 生成本次会话的token,防止csrf攻击
token = g_token_mgr.gen_token(openid)
# 保存会话key
session_keys[openid] = json_value['session_key']
# 检测是否有该用户,没有则创建该用户
db.post("insert into `users` (`openid`) values (%s) on duplicate key update `openid` = values(`openid`)",
[openid])
# 验证成功,把会话token返回给用户
return web.json_response({
"errMsg": "OK",
"token": token,
})
@routes.post("/{prefix}/user_data")
@check_token(g_token_mgr) # check_token 会检测用户http请求中的
async def user_data(request: web.Request):
"""
获取用户基本数据
"""
data = db.get("select * from `users` where `openid` = %s",
[request.openid])
if not data:
return web.json_response({
"errMsg": "服务器错误"
})
# 验证成功,把会话token返回给用户
return web.json_response({
"errMsg": "OK",
"data": data,
})
def update_test(cost):
print(cost)
async def update_test_async(cost):
print(cost)
async def main():
#####################设置log#################################
logging.basicConfig(filename="testserver.log", level=logging.INFO)
####################向主循环添加自己的更新函数#################
g_server.update_elapse = 0.1 # 设置主循环更新时间(默认0.1)
g_server.add_update_func(update_test)
g_server.add_async_update_func(update_test_async)
#############################################################
await g_server.common_run()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
l0n0lhttputils-1.0.5.tar.gz
(19.6 kB
view details)
Built Distribution
File details
Details for the file l0n0lhttputils-1.0.5.tar.gz
.
File metadata
- Download URL: l0n0lhttputils-1.0.5.tar.gz
- Upload date:
- Size: 19.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.3.0 pkginfo/1.7.0 requests/2.25.1 setuptools/52.0.0.post20210125 requests-toolbelt/0.9.1 tqdm/4.56.0 CPython/3.8.3
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 261fdb0623b7677f1d269878e942e77f74a97352255b4ab3e1e0d6721b991e21 |
|
MD5 | 6a1d9b298b4f08aca0538bfe6755b46d |
|
BLAKE2b-256 | c60627c71076bf2031a2dc79ef69fdbf86d943185026b1e26cb786c967d193ff |
File details
Details for the file l0n0lhttputils-1.0.5-py3-none-any.whl
.
File metadata
- Download URL: l0n0lhttputils-1.0.5-py3-none-any.whl
- Upload date:
- Size: 33.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.3.0 pkginfo/1.7.0 requests/2.25.1 setuptools/52.0.0.post20210125 requests-toolbelt/0.9.1 tqdm/4.56.0 CPython/3.8.3
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 13b165a94fa4c05c3115156c82cf274ef963f8340c70610bcae813ce26960527 |
|
MD5 | ce407cd52d5d181506a13453da547bf4 |
|
BLAKE2b-256 | e35d46fe2f00547b78984e3c9dce83d0db7e24f09816c756dc25429548b0a2ab |