This library is designed for asynchronous execution of scheduled tasks.
Project description
AIOBGJOBS DOCS
En - docs
This library is designed for asynchronous execution of scheduled tasks.
Simple example:
import asyncio
import datetime
from aiobgjobs.dispatcher import BgDispatcher
from aiobgjobs.types import Every, Repeats
bg_dp = BgDispatcher()
@bg_dp.handler_job(
every=Every.seconds(15),
count_repeats=3
)
async def simple_func_every_second():
print('Test')
@bg_dp.handler_job(
every=Every.weekdays.monday(hour=11, minute=40),
count_repeats=Repeats.infinity
)
async def simple_func_infinity_monday():
print('Test2')
@bg_dp.handler_job(
every=Every.minutes(2),
count_repeats=Repeats.infinity
)
async def simple_func_every_2_minutes():
print('Test3')
@bg_dp.handler_job(
count_repeats=Repeats.one,
datetime_start=datetime.timedelta(minutes=2.0)
)
async def simple_func_delta_to_start():
print('Test4')
async def main():
await asyncio.create_task(bg_dp.start(relax=.3))
if __name__ == '__main__':
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
print('Goodbye!')
Example without using decorators:
import asyncio
import datetime
from aiobgjobs.dispatcher import BgDispatcher
from aiobgjobs.handlers import Handler
from aiobgjobs.jobs import Job
from aiobgjobs.types import Every, Repeats
bg_dp = BgDispatcher()
async def simple_func_every_seconds():
print('Test')
async def simple_func_infinity_monday():
print('Test2')
async def simple_func_every_2_minutes():
print('Test3')
async def simple_func_delta_to_start():
print('Test4')
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_every_seconds,
name='Job - 1',
kwargs=None
),
count_repeats=3,
every=Every.seconds(15)
)
)
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_infinity_monday,
name='Job - 2',
kwargs=None
),
count_repeats=Repeats.infinity,
every=Every.weekdays.monday()
)
)
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_every_2_minutes,
name='Job - 3',
kwargs=None
),
count_repeats=Repeats.infinity,
every=Every.minutes(2)
)
)
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_delta_to_start,
name='Job - 4',
kwargs=None
),
count_repeats=Repeats.one,
datetime_start=datetime.timedelta(minutes=2)
)
)
async def main():
await asyncio.create_task(bg_dp.start(relax=.3))
if __name__ == '__main__':
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
print('Goodbye!')
Ru - docs
Эта библиотека предназначена для асинхронного выполнения задач по расписанию.
Простой пример:
import asyncio
import datetime
from aiobgjobs.dispatcher import BgDispatcher
from aiobgjobs.types import Every, Repeats
bg_dp = BgDispatcher()
@bg_dp.handler_job(
every=Every.seconds(15),
count_repeats=3
)
async def simple_func_every_second():
print('Test')
@bg_dp.handler_job(
every=Every.weekdays.monday(hour=11, minute=40),
count_repeats=Repeats.infinity
)
async def simple_func_infinity_monday():
print('Test2')
@bg_dp.handler_job(
every=Every.minutes(2),
count_repeats=Repeats.infinity
)
async def simple_func_every_2_minutes():
print('Test3')
@bg_dp.handler_job(
count_repeats=Repeats.one,
datetime_start=datetime.timedelta(minutes=2.0)
)
async def simple_func_delta_to_start():
print('Test4')
async def main():
await asyncio.create_task(bg_dp.start(relax=.3))
if __name__ == '__main__':
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
print('Goodbye!')
Пример без использования декораторов:
import asyncio
import datetime
from aiobgjobs.dispatcher import BgDispatcher
from aiobgjobs.handlers import Handler
from aiobgjobs.jobs import Job
from aiobgjobs.types import Every, Repeats
bg_dp = BgDispatcher()
async def simple_func_every_seconds():
print('Test')
async def simple_func_infinity_monday():
print('Test2')
async def simple_func_every_2_minutes():
print('Test3')
async def simple_func_delta_to_start():
print('Test4')
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_every_seconds,
name='Job - 1',
kwargs=None
),
count_repeats=3,
every=Every.seconds(15)
)
)
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_infinity_monday,
name='Job - 2',
kwargs=None
),
count_repeats=Repeats.infinity,
every=Every.weekdays.monday()
)
)
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_every_2_minutes,
name='Job - 3',
kwargs=None
),
count_repeats=Repeats.infinity,
every=Every.minutes(2)
)
)
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_delta_to_start,
name='Job - 4',
kwargs=None
),
count_repeats=Repeats.one,
datetime_start=datetime.timedelta(minutes=2)
)
)
async def main():
await asyncio.create_task(bg_dp.start(relax=.3))
if __name__ == '__main__':
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
print('Goodbye!')
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
aiobgjobs-0.1.0.tar.gz
(6.7 kB
view details)
Built Distribution
File details
Details for the file aiobgjobs-0.1.0.tar.gz
.
File metadata
- Download URL: aiobgjobs-0.1.0.tar.gz
- Upload date:
- Size: 6.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.2.2 CPython/3.10.8 Darwin/20.6.0
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 5663a6c06088477c85b642d097fa62a7b464afd958ead5ef93c0402f0c9b7094 |
|
MD5 | ebc9d12553834a76dd90346bbb3bf934 |
|
BLAKE2b-256 | fad1d822a4103f256df07b54993eb2b6be79a989f837730ed863e6c3e00ef979 |
File details
Details for the file aiobgjobs-0.1.0-py3-none-any.whl
.
File metadata
- Download URL: aiobgjobs-0.1.0-py3-none-any.whl
- Upload date:
- Size: 6.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.2.2 CPython/3.10.8 Darwin/20.6.0
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | be077e47ef2971c7a1ad305f86424d3489fa3021034b7e23794ab30f8c17f475 |
|
MD5 | 9b54f87b64b74424b1b941d886dc4a2d |
|
BLAKE2b-256 | 9324b140bda8d168524dd81acf14ced3d64ba0d3a305547a5946ed8bb9d26fbd |