This library is designed for asynchronous execution of scheduled tasks.
Project description
AIOBGJOBS DOCS
En - docs
This library is designed for asynchronous execution of scheduled tasks.
Simple example:
import asyncio
import datetime

from aiobgjobs.dispatcher import BgDispatcher
from aiobgjobs.types import Every, Repeats

# Dispatcher on which the decorated coroutines below are registered as jobs.
bg_dp = BgDispatcher()


# Scheduled with Every.seconds(15) and limited to 3 repeats.
@bg_dp.handler_job(
    every=Every.seconds(15),
    count_repeats=3
)
async def simple_func_every_second():
    print('Test')


# Scheduled with Every.weekdays.monday(hour=11, minute=40), repeating
# without limit (presumably every Monday at 11:40 - confirm against the
# library's Every implementation).
@bg_dp.handler_job(
    every=Every.weekdays.monday(hour=11, minute=40),
    count_repeats=Repeats.infinity
)
async def simple_func_infinity_monday():
    print('Test2')


# Scheduled with Every.minutes(2), repeating without limit.
@bg_dp.handler_job(
    every=Every.minutes(2),
    count_repeats=Repeats.infinity
)
async def simple_func_every_2_minutes():
    print('Test3')


# Single run; datetime_start is given as a timedelta (presumably a delay
# of two minutes before the first start - confirm against the library).
@bg_dp.handler_job(
    count_repeats=Repeats.one,
    datetime_start=datetime.timedelta(minutes=2.0)
)
async def simple_func_delta_to_start():
    print('Test4')


async def main():
    # Run the dispatcher as a task and wait for it to finish.
    # NOTE(review): `relax` is presumably the dispatcher's pause between
    # checks, in seconds - confirm against BgDispatcher.start.
    await asyncio.create_task(bg_dp.start(relax=.3))


if __name__ == '__main__':
    try:
        # asyncio.run() creates the event loop and closes it on exit, so the
        # loop is not leaked when the program is interrupted.
        asyncio.run(main())
    except KeyboardInterrupt:
        print('Goodbye!')
Example without using decorators:
import asyncio
import datetime

from aiobgjobs.dispatcher import BgDispatcher
from aiobgjobs.handlers import Handler
from aiobgjobs.jobs import Job
from aiobgjobs.types import Every, Repeats

# Dispatcher on which the handlers below are registered explicitly.
bg_dp = BgDispatcher()


async def simple_func_every_seconds():
    print('Test')


async def simple_func_infinity_monday():
    print('Test2')


async def simple_func_every_2_minutes():
    print('Test3')


async def simple_func_delta_to_start():
    print('Test4')


# Job 1: scheduled with Every.seconds(15), limited to 3 repeats.
bg_dp.register_handler(
    Handler(
        job=Job(
            func=simple_func_every_seconds,
            name='Job - 1',
            kwargs=None
        ),
        count_repeats=3,
        every=Every.seconds(15)
    )
)

# Job 2: scheduled with Every.weekdays.monday(), repeating without limit.
bg_dp.register_handler(
    Handler(
        job=Job(
            func=simple_func_infinity_monday,
            name='Job - 2',
            kwargs=None
        ),
        count_repeats=Repeats.infinity,
        every=Every.weekdays.monday()
    )
)

# Job 3: scheduled with Every.minutes(2), repeating without limit.
bg_dp.register_handler(
    Handler(
        job=Job(
            func=simple_func_every_2_minutes,
            name='Job - 3',
            kwargs=None
        ),
        count_repeats=Repeats.infinity,
        every=Every.minutes(2)
    )
)

# Job 4: single run; datetime_start is given as a timedelta (presumably a
# delay of two minutes before the first start - confirm against the library).
bg_dp.register_handler(
    Handler(
        job=Job(
            func=simple_func_delta_to_start,
            name='Job - 4',
            kwargs=None
        ),
        count_repeats=Repeats.one,
        datetime_start=datetime.timedelta(minutes=2)
    )
)


async def main():
    # Run the dispatcher as a task and wait for it to finish.
    # NOTE(review): `relax` is presumably the dispatcher's pause between
    # checks, in seconds - confirm against BgDispatcher.start.
    await asyncio.create_task(bg_dp.start(relax=.3))


if __name__ == '__main__':
    try:
        # asyncio.run() creates the event loop and closes it on exit, so the
        # loop is not leaked when the program is interrupted.
        asyncio.run(main())
    except KeyboardInterrupt:
        print('Goodbye!')
Ru - docs
Эта библиотека предназначена для асинхронного выполнения задач по расписанию.
Простой пример:
import asyncio
import datetime

from aiobgjobs.dispatcher import BgDispatcher
from aiobgjobs.types import Every, Repeats

# Dispatcher on which the decorated coroutines below are registered as jobs.
bg_dp = BgDispatcher()


# Scheduled with Every.seconds(15) and limited to 3 repeats.
@bg_dp.handler_job(
    every=Every.seconds(15),
    count_repeats=3
)
async def simple_func_every_second():
    print('Test')


# Scheduled with Every.weekdays.monday(hour=11, minute=40), repeating
# without limit (presumably every Monday at 11:40 - confirm against the
# library's Every implementation).
@bg_dp.handler_job(
    every=Every.weekdays.monday(hour=11, minute=40),
    count_repeats=Repeats.infinity
)
async def simple_func_infinity_monday():
    print('Test2')


# Scheduled with Every.minutes(2), repeating without limit.
@bg_dp.handler_job(
    every=Every.minutes(2),
    count_repeats=Repeats.infinity
)
async def simple_func_every_2_minutes():
    print('Test3')


# Single run; datetime_start is given as a timedelta (presumably a delay
# of two minutes before the first start - confirm against the library).
@bg_dp.handler_job(
    count_repeats=Repeats.one,
    datetime_start=datetime.timedelta(minutes=2.0)
)
async def simple_func_delta_to_start():
    print('Test4')


async def main():
    # Run the dispatcher as a task and wait for it to finish.
    # NOTE(review): `relax` is presumably the dispatcher's pause between
    # checks, in seconds - confirm against BgDispatcher.start.
    await asyncio.create_task(bg_dp.start(relax=.3))


if __name__ == '__main__':
    try:
        # asyncio.run() creates the event loop and closes it on exit, so the
        # loop is not leaked when the program is interrupted.
        asyncio.run(main())
    except KeyboardInterrupt:
        print('Goodbye!')
Пример без использования декораторов:
import asyncio
import datetime

from aiobgjobs.dispatcher import BgDispatcher
from aiobgjobs.handlers import Handler
from aiobgjobs.jobs import Job
from aiobgjobs.types import Every, Repeats

# Dispatcher on which the handlers below are registered explicitly.
bg_dp = BgDispatcher()


async def simple_func_every_seconds():
    print('Test')


async def simple_func_infinity_monday():
    print('Test2')


async def simple_func_every_2_minutes():
    print('Test3')


async def simple_func_delta_to_start():
    print('Test4')


# Job 1: scheduled with Every.seconds(15), limited to 3 repeats.
bg_dp.register_handler(
    Handler(
        job=Job(
            func=simple_func_every_seconds,
            name='Job - 1',
            kwargs=None
        ),
        count_repeats=3,
        every=Every.seconds(15)
    )
)

# Job 2: scheduled with Every.weekdays.monday(), repeating without limit.
bg_dp.register_handler(
    Handler(
        job=Job(
            func=simple_func_infinity_monday,
            name='Job - 2',
            kwargs=None
        ),
        count_repeats=Repeats.infinity,
        every=Every.weekdays.monday()
    )
)

# Job 3: scheduled with Every.minutes(2), repeating without limit.
bg_dp.register_handler(
    Handler(
        job=Job(
            func=simple_func_every_2_minutes,
            name='Job - 3',
            kwargs=None
        ),
        count_repeats=Repeats.infinity,
        every=Every.minutes(2)
    )
)

# Job 4: single run; datetime_start is given as a timedelta (presumably a
# delay of two minutes before the first start - confirm against the library).
bg_dp.register_handler(
    Handler(
        job=Job(
            func=simple_func_delta_to_start,
            name='Job - 4',
            kwargs=None
        ),
        count_repeats=Repeats.one,
        datetime_start=datetime.timedelta(minutes=2)
    )
)


async def main():
    # Run the dispatcher as a task and wait for it to finish.
    # NOTE(review): `relax` is presumably the dispatcher's pause between
    # checks, in seconds - confirm against BgDispatcher.start.
    await asyncio.create_task(bg_dp.start(relax=.3))


if __name__ == '__main__':
    try:
        # asyncio.run() creates the event loop and closes it on exit, so the
        # loop is not leaked when the program is interrupted.
        asyncio.run(main())
    except KeyboardInterrupt:
        print('Goodbye!')
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
aiobgjobs-0.1.1.tar.gz (6.7 kB, view details)
Built Distribution
File details
Details for the file aiobgjobs-0.1.1.tar.gz.
File metadata
- Download URL: aiobgjobs-0.1.1.tar.gz
- Upload date:
- Size: 6.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.2.2 CPython/3.10.8 Darwin/20.6.0
File hashes
Algorithm | Hash digest
---|---
SHA256 | 0d908931862b40e1ed43b73b1740f68eb5aa94df9e6a10681718f4467fe5345c
MD5 | 81c76a1b269b0102edc6c68eb01f1803
BLAKE2b-256 | 7f3205aee518716b495c2f6cc02c24b9ef199ea369bcced692e0a8f7026cd5ff
File details
Details for the file aiobgjobs-0.1.1-py3-none-any.whl.
File metadata
- Download URL: aiobgjobs-0.1.1-py3-none-any.whl
- Upload date:
- Size: 6.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.2.2 CPython/3.10.8 Darwin/20.6.0
File hashes
Algorithm | Hash digest
---|---
SHA256 | 5bade87c27629c8bc191c98b37ec35fb4ce74285ac5b89965bf4e94a2fb7e855
MD5 | e1747be7a2e08778c601ce561990c4e4
BLAKE2b-256 | 0d2e74241379be83501d7b872e267ca6ab0a4ad2789b9970c1f4fe6ac9aed0a4