This library is designed for asynchronous execution of scheduled tasks.
Project description
AIOBGJOBS DOCS
En - docs
This library is designed for asynchronous execution of scheduled tasks.
Simple example:
import asyncio
import datetime
from aiobgjobs.dispatcher import BgDispatcher
from aiobgjobs.types import Every, Repeats
bg_dp = BgDispatcher()
@bg_dp.handler_job(
every=Every.seconds(15),
count_repeats=3
)
async def simple_func_every_second():
print('Test')
@bg_dp.handler_job(
every=Every.weekdays.monday(hour=11, minute=40),
count_repeats=Repeats.infinity
)
async def simple_func_infinity_monday():
print('Test2')
@bg_dp.handler_job(
every=Every.minutes(2),
count_repeats=Repeats.infinity
)
async def simple_func_every_2_minutes():
print('Test3')
@bg_dp.handler_job(
count_repeats=Repeats.one,
datetime_start=datetime.timedelta(minutes=2.0)
)
async def simple_func_delta_to_start():
print('Test4')
async def main():
await asyncio.create_task(bg_dp.start(relax=.3))
if __name__ == '__main__':
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
print('Goodbye!')
Example without using decorators:
import asyncio
import datetime
from aiobgjobs.dispatcher import BgDispatcher
from aiobgjobs.handlers import Handler
from aiobgjobs.jobs import Job
from aiobgjobs.types import Every, Repeats
bg_dp = BgDispatcher()
async def simple_func_every_seconds():
print('Test')
async def simple_func_infinity_monday():
print('Test2')
async def simple_func_every_2_minutes():
print('Test3')
async def simple_func_delta_to_start():
print('Test4')
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_every_seconds,
name='Job - 1',
kwargs=None
),
count_repeats=3,
every=Every.seconds(15)
)
)
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_infinity_monday,
name='Job - 2',
kwargs=None
),
count_repeats=Repeats.infinity,
every=Every.weekdays.monday()
)
)
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_every_2_minutes,
name='Job - 3',
kwargs=None
),
count_repeats=Repeats.infinity,
every=Every.minutes(2)
)
)
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_delta_to_start,
name='Job - 4',
kwargs=None
),
count_repeats=Repeats.one,
datetime_start=datetime.timedelta(minutes=2)
)
)
async def main():
await asyncio.create_task(bg_dp.start(relax=.3))
if __name__ == '__main__':
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
print('Goodbye!')
Ru - docs
Эта библиотека предназначена для асинхронного выполнения задач по расписанию.
Простой пример:
import asyncio
import datetime
from aiobgjobs.dispatcher import BgDispatcher
from aiobgjobs.types import Every, Repeats
bg_dp = BgDispatcher()
@bg_dp.handler_job(
every=Every.seconds(15),
count_repeats=3
)
async def simple_func_every_second():
print('Test')
@bg_dp.handler_job(
every=Every.weekdays.monday(hour=11, minute=40),
count_repeats=Repeats.infinity
)
async def simple_func_infinity_monday():
print('Test2')
@bg_dp.handler_job(
every=Every.minutes(2),
count_repeats=Repeats.infinity
)
async def simple_func_every_2_minutes():
print('Test3')
@bg_dp.handler_job(
count_repeats=Repeats.one,
datetime_start=datetime.timedelta(minutes=2.0)
)
async def simple_func_delta_to_start():
print('Test4')
async def main():
await asyncio.create_task(bg_dp.start(relax=.3))
if __name__ == '__main__':
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
print('Goodbye!')
Пример без использования декораторов:
import asyncio
import datetime
from aiobgjobs.dispatcher import BgDispatcher
from aiobgjobs.handlers import Handler
from aiobgjobs.jobs import Job
from aiobgjobs.types import Every, Repeats
bg_dp = BgDispatcher()
async def simple_func_every_seconds():
print('Test')
async def simple_func_infinity_monday():
print('Test2')
async def simple_func_every_2_minutes():
print('Test3')
async def simple_func_delta_to_start():
print('Test4')
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_every_seconds,
name='Job - 1',
kwargs=None
),
count_repeats=3,
every=Every.seconds(15)
)
)
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_infinity_monday,
name='Job - 2',
kwargs=None
),
count_repeats=Repeats.infinity,
every=Every.weekdays.monday()
)
)
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_every_2_minutes,
name='Job - 3',
kwargs=None
),
count_repeats=Repeats.infinity,
every=Every.minutes(2)
)
)
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_delta_to_start,
name='Job - 4',
kwargs=None
),
count_repeats=Repeats.one,
datetime_start=datetime.timedelta(minutes=2)
)
)
async def main():
await asyncio.create_task(bg_dp.start(relax=.3))
if __name__ == '__main__':
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
print('Goodbye!')
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
aiobgjobs-0.1.5.tar.gz
(6.8 kB
view details)
Built Distribution
File details
Details for the file aiobgjobs-0.1.5.tar.gz
.
File metadata
- Download URL: aiobgjobs-0.1.5.tar.gz
- Upload date:
- Size: 6.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.2.2 CPython/3.10.8 Darwin/20.6.0
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | d9327c6907ea2153225628dbe660e891782630cffb65eabe4cd4ad868d9c3831 |
|
MD5 | 0f9b4760783dd57d4d65204e68693e1a |
|
BLAKE2b-256 | fac20d404a0d09c2ba110c36b8ac26438ad1632391e1a7b753271f34ad1c4f2f |
File details
Details for the file aiobgjobs-0.1.5-py3-none-any.whl
.
File metadata
- Download URL: aiobgjobs-0.1.5-py3-none-any.whl
- Upload date:
- Size: 7.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.2.2 CPython/3.10.8 Darwin/20.6.0
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | aa9eb75a0352d05a85d64d9c480ee7ed76663940aded2728b616f37bb736b159 |
|
MD5 | 2e364663782ef38186cbf4c57d8e0e7e |
|
BLAKE2b-256 | e2df3ab05bca77f3366b758f1ce33e8ab2a64cd1c327ff56715dbbccaed460fe |