
A new version of the async retrying package.

Project description

info:

Simple retrying for asyncio


Installation

pip install aio_retrying

Usage

import asyncio

import aiohttp

from Utils.async_retrying import retry


class Aio:
    def __init__(self):
        self.session = aiohttp.ClientSession()

    async def __aenter__(self):
        return self

    @retry(attempts=3, delay=1, fallback="daoji")
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()
        print("exit")

    async def main(self):
        for i in range(3):
            assert self.session.closed is False
            await asyncio.sleep(1)
            print(f"task - {i}")


async def second_aio():
    aio = Aio()
    # task = asyncio.create_task(aio.main())
    # task.add_done_callback(lambda _: asyncio.create_task(aio.__aexit__(None, None, None)))
    ret = await aio.__aexit__(None, None, None)
    print(f"Result: {ret}")


async def first_aio():
    await second_aio()
    while True:
        # print("first")
        await asyncio.sleep(3)


def main():
    asyncio.run(first_aio())


if __name__ == "__main__":
    main()

Python 3.8+ is required
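
The usage example above passes attempts, delay and fallback to retry without showing what they do. The following is a minimal sketch of how such a decorator could behave, not the package's actual source. It assumes that delay is a number of seconds slept between attempts and that fallback is the value returned once every attempt has failed. The helpers always_fails and flaky, and the calls and state variables, are hypothetical names used only for illustration.

```python
import asyncio
import functools


def retry(attempts=3, delay=0.0, fallback=None):
    """Hypothetical stand-in for the package's decorator (assumed semantics)."""

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    # Sleep between attempts, but not after the final one.
                    if attempt < attempts:
                        await asyncio.sleep(delay)
            # Assumption: once all attempts fail, the fallback value is returned.
            return fallback

        return wrapper

    return decorator


calls = []          # records each invocation of always_fails
state = {"n": 0}    # counts invocations of flaky


@retry(attempts=3, delay=0.01, fallback="daoji")
async def always_fails():
    calls.append(1)
    raise RuntimeError("boom")


@retry(attempts=3, delay=0.01, fallback="daoji")
async def flaky():
    # Fails on the first call, succeeds on the second.
    state["n"] += 1
    if state["n"] < 2:
        raise RuntimeError("transient")
    return "ok"


if __name__ == "__main__":
    print(asyncio.run(always_fails()))  # fallback after three failed attempts
    print(asyncio.run(flaky()))         # result of the first successful attempt
```

Under these assumptions, a coroutine that succeeds on its first call (as __aexit__ does in the usage example) returns its own result, so the fallback value is never seen.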


Download files

Source Distribution

aio_retrying-1.0.3.tar.gz (3.3 kB)

Built Distribution

aio_retrying-1.0.3-py3-none-any.whl (3.7 kB)
