No project description provided
Project description
Async scheduler
OOP async scheduler
Usage
import asyncio
from datetime import timedelta
from async_scheduler_object import AsyncScheduler, PeriodicEvent
class AgePeriodicEvent(PeriodicEvent):
def __init__(self, start: int) -> None:
self._age = start
async def run(self) -> None:
print("Age", self._age)
self._age += 1
class CatsPeriodicEvent(PeriodicEvent):
def __init__(self, start: int) -> None:
self._cats_count = start
async def run(self) -> None:
print("Cats", self._cats_count)
self._cats_count *= 1
async def main() -> None:
scheduler_1 = AsyncScheduler(
events=[AgePeriodicEvent(start=1)],
interval=timedelta(seconds=1),
)
scheduler_2 = AsyncScheduler(
events=[AgePeriodicEvent(start=10), CatsPeriodicEvent(start=20)],
interval=timedelta(seconds=0.5),
)
await scheduler_1.start()
await scheduler_2.start()
await asyncio.sleep(10)
await scheduler_1.stop()
await scheduler_2.stop()
asyncio.run(main())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Close
Hashes for async_scheduler_object-1.0.0.tar.gz
Algorithm | Hash digest | |
---|---|---|
SHA256 | 8da7b92f8d1da48654708a71aa2870185e528ae7fe4ca71c739fe14d6d204cc5 |
|
MD5 | c269e14a8a4d6ba750608ca2c1399205 |
|
BLAKE2b-256 | a01b63dbaee43b1f7b27db30c12e667ba248afe6b8c6bf537452bb9af16b0e02 |
Close
Hashes for async_scheduler_object-1.0.0-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 9a8482a58c06926b072774e958d5a750273f873158da8726f5c1a6b985bfb3de |
|
MD5 | 19b209036ce4ad4d6759baa7746fd701 |
|
BLAKE2b-256 | f0e7997371e72e60805a6bbb62ee8d062c8858a5ecb1771618c4d40ba4a8bab6 |