Skip to main content
Join the official 2020 Python Developers Survey. Start the survey!

基于异步IO的Redis缓存操作工具

Project description

red-helper

Redis 缓存工具 red-cache 的 asyncio 版本。

安装

$ pip install red-helper

示例

初始化

import red_helper

# Build a helper from a Redis URL, passing db=0.
red_helper.new("redis://redis:6379", db=0)

或:

from red_helper import RedHelper
from aredis import StrictRedis

# Create the aredis client first, then wrap it in a RedHelper.
# NOTE(review): the empty mapping is presumably a placeholder for the
# client's connection keyword arguments -- confirm against aredis.
redis = StrictRedis(**{})
helper = RedHelper(redis)

一般操作

import red_helper
import asyncio

# Module-level helper used by the coroutine below; built from a Redis URL with db=0.
helper = red_helper.new("redis://redis", db=0)


async def simple_operations():
    """Set a key through ``helper``, read it back and print it, then delete it."""
    key = "hello"
    # Store the value. ``ex=180`` is forwarded to ``helper.set``
    # (presumably an expiry in seconds -- confirm against the library).
    await helper.set(key, "world", ex=180)
    # Look the key up, supplying a default_value for the lookup.
    value = await helper.get(key, default_value="WORLD!")
    print(value)
    # Remove the key again.
    await helper.delete(key)


if __name__ == '__main__':
    # asyncio.run() creates a fresh event loop, runs the coroutine to
    # completion and closes the loop. Calling asyncio.get_event_loop() with
    # no running loop is deprecated and fails on recent Python versions.
    asyncio.run(simple_operations())

Hash

import asyncio

import red_helper

# Module-level helper built from a Redis URL with db=0.
helper = red_helper.new("redis://redis", db=0)
# Handle obtained from the helper for the name "red::hash"
# (presumably wraps the Redis hash stored under that key -- confirm).
hs = helper.red_hash("red::hash")


async def simple_operations():
    """Set a field through ``hs``, read it back and print it, then delete it."""
    field = "hello"
    # Store the value. ``ex=180`` is forwarded to ``hs.set``
    # (presumably an expiry in seconds -- confirm against the library).
    await hs.set(field, "world", ex=180)
    # Look the field up, supplying a default_value for the lookup.
    value = await hs.get(field, default_value="WORLD!")
    print(value)
    # Remove the field again.
    await hs.delete(field)


if __name__ == '__main__':
    # asyncio.run() creates a fresh event loop, runs the coroutine to
    # completion and closes the loop. Calling asyncio.get_event_loop() with
    # no running loop is deprecated and fails on recent Python versions.
    asyncio.run(simple_operations())

缓存

import asyncio

import red_helper

# Module-level helper whose cache_it / remove_it decorators are applied below.
helper = red_helper.new("redis://redis", db=0)


# Cache the function's return value.
# The lambda builds the cache key from asset_id; ttl=180 is passed to
# cache_it (presumably a lifetime in seconds -- confirm against the library).
@helper.cache_it(lambda asset_id: "asset::cache:key:{}".format(asset_id), ttl=180)
async def read_data(asset_id: int) -> dict:
    """Pause for 0.1 s, then return the mapping ``{0: 0, 1: 1, ..., 9: 9}``."""
    await asyncio.sleep(0.1)
    return {n: n for n in range(10)}


# Remove the cached entry.
# The lambda builds the cache key; by_return=True is passed to remove_it
# (presumably the key is derived from the return value -- confirm).
@helper.remove_it(lambda asset_id: "asset::cache:key:{}".format(asset_id), by_return=True)
async def update_date(asset_id: int) -> int:
    """Pause for 0.1 s, then return ``asset_id`` unchanged."""
    delay = 0.1
    await asyncio.sleep(delay)
    return asset_id


async def main():
    """Call ``read_data`` and then ``update_date`` with asset id 10."""
    asset_id = 10
    await read_data(asset_id)
    await update_date(asset_id)


if __name__ == '__main__':
    # asyncio.run() creates a fresh event loop, runs the coroutine to
    # completion and closes the loop. Calling asyncio.get_event_loop() with
    # no running loop is deprecated and fails on recent Python versions.
    asyncio.run(main())

基于HASH的缓存

import asyncio

import red_helper

# Module-level helper built from a Redis URL with db=0.
helper = red_helper.new("redis://redis", db=0)
# Handle obtained from the helper for the name "red::hash"; its cache_it /
# remove_it decorators are applied below.
hs = helper.red_hash("red::hash")


# Cache the function's return value.
# The lambda builds the cache key from asset_id; ttl=180 is passed to
# cache_it (presumably a lifetime in seconds -- confirm against the library).
@hs.cache_it(lambda asset_id: "asset::cache:key:{}".format(asset_id), ttl=180)
async def read_data(asset_id: int) -> dict:
    """Pause for 0.1 s, then return the mapping ``{0: 0, 1: 1, ..., 9: 9}``."""
    await asyncio.sleep(0.1)
    return {n: n for n in range(10)}


# Remove the cached entry.
# The lambda builds the cache key; by_return=True is passed to remove_it
# (presumably the key is derived from the return value -- confirm).
@hs.remove_it(lambda asset_id: "asset::cache:key:{}".format(asset_id), by_return=True)
async def update_date(asset_id: int) -> int:
    """Pause for 0.1 s, then return ``asset_id`` unchanged."""
    delay = 0.1
    await asyncio.sleep(delay)
    return asset_id


async def main():
    """Call ``read_data`` and then ``update_date`` with asset id 10."""
    asset_id = 10
    await read_data(asset_id)
    await update_date(asset_id)


if __name__ == '__main__':
    # asyncio.run() creates a fresh event loop, runs the coroutine to
    # completion and closes the loop. Calling asyncio.get_event_loop() with
    # no running loop is deprecated and fails on recent Python versions.
    asyncio.run(main())

author:Memory_Leak

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Files for red-helper, version 0.1.1
Filename, size File type Python version Upload date Hashes
Filename, size red_helper-0.1.1-py3-none-any.whl (7.3 kB) File type Wheel Python version py3 Upload date Hashes View

Supported by

Pingdom Pingdom Monitoring Google Google Object Storage and Download Analytics Sentry Sentry Error logging AWS AWS Cloud computing DataDog DataDog Monitoring Fastly Fastly CDN DigiCert DigiCert EV certificate StatusPage StatusPage Status page