基于异步IO的Redis缓存操作工具
Project description
red-helper
Redis缓存工具red-cache 的 asyncio 版本.
安装
$ pip install red-helper
示例
初始化
import red_helper
# Create a helper via red_helper.new, passing the Redis URL and db=0.
red_helper.new("redis://redis:6379",db=0)
或:
from red_helper import RedHelper
from aredis import StrictRedis
# Build the aredis client directly (no keyword arguments are passed here),
# then wrap it in a RedHelper.
redis=StrictRedis(**{})
helper=RedHelper(redis)
一般操作
import asyncio

import red_helper

helper = red_helper.new("redis://redis", db=0)


async def simple_operations():
    """Set, read back, and delete the "hello" key through the helper."""
    # Set the key, passing ex=180
    await helper.set("hello", "world", ex=180)
    # Query the key, passing a default value
    print(await helper.get("hello", default_value="WORLD!"))
    # Delete the key
    await helper.delete("hello")


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(simple_operations())
Hash
import asyncio

import red_helper

helper = red_helper.new("redis://redis", db=0)
# Hash wrapper obtained from the helper for the name "red::hash"
hs = helper.red_hash("red::hash")


async def simple_operations():
    """Set, read back, and delete the "hello" field through the hash wrapper."""
    # Set the field, passing ex=180
    await hs.set("hello", "world", ex=180)
    # Query the field, passing a default value
    print(await hs.get("hello", default_value="WORLD!"))
    # Delete the field
    await hs.delete("hello")


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(simple_operations())
缓存
import asyncio

import red_helper

helper = red_helper.new("redis://redis", db=0)


# Cache the function's return value; the lambda builds the cache key
# from asset_id.
@helper.cache_it(lambda asset_id: "asset::cache:key:{}".format(asset_id), ttl=180)
async def read_data(asset_id: int) -> dict:
    """Sleep briefly, then return a dict mapping each of 0..9 to itself."""
    await asyncio.sleep(0.1)
    return dict(zip(range(10), range(10)))


# Remove the cached entry; the lambda builds the same key format as above.
# NOTE(review): by_return=True presumably derives the key from the return
# value -- confirm against the red_helper implementation.
@helper.remove_it(lambda asset_id: "asset::cache:key:{}".format(asset_id), by_return=True)
async def update_date(asset_id: int) -> int:
    """Sleep briefly, then return asset_id unchanged."""
    await asyncio.sleep(0.1)
    return asset_id


async def main():
    """Call read_data and then update_date for asset id 10."""
    await read_data(10)
    await update_date(10)


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())
基于HASH的缓存
import asyncio

import red_helper

helper = red_helper.new("redis://redis", db=0)
# Hash wrapper obtained from the helper for the name "red::hash"
hs = helper.red_hash("red::hash")


# Cache the function's return value through the hash wrapper; the lambda
# builds the cache key from asset_id.
@hs.cache_it(lambda asset_id: "asset::cache:key:{}".format(asset_id), ttl=180)
async def read_data(asset_id: int) -> dict:
    """Sleep briefly, then return a dict mapping each of 0..9 to itself."""
    await asyncio.sleep(0.1)
    return dict(zip(range(10), range(10)))


# Remove the cached entry; the lambda builds the same key format as above.
# NOTE(review): by_return=True presumably derives the key from the return
# value -- confirm against the red_helper implementation.
@hs.remove_it(lambda asset_id: "asset::cache:key:{}".format(asset_id), by_return=True)
async def update_date(asset_id: int) -> int:
    """Sleep briefly, then return asset_id unchanged."""
    await asyncio.sleep(0.1)
    return asset_id


async def main():
    """Call read_data and then update_date for asset id 10."""
    await read_data(10)
    await update_date(10)


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())
author:Memory_Leak
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
No source distribution files available for this release. See tutorial on generating distribution archives.
Built Distribution
Close
Hashes for red_helper-0.1.1-py3-none-any.whl
Algorithm | Hash digest |
---|---|
SHA256 | ee1ca76e32778167b63b0a7033e55fbad927195444c65f35c50dc97f5beedb3a |
MD5 | 2abb05b45d28110680836622468800fe |
BLAKE2b-256 | 2deb9ab4b84f0b5eecd6dcc67b037d2631cdb262a3a54059a8e6e7f1c237d8d8 |