This library is designed for asynchronous execution of scheduled tasks.
Project description
AIOBGJOBS DOCS
En - docs
This library is designed for asynchronous execution of scheduled tasks.
Simple example:
import asyncio
import datetime
from aiobgjobs.dispatcher import BgDispatcher
from aiobgjobs.types import Every, Repeats
bg_dp = BgDispatcher()
@bg_dp.handler_job(
every=Every.seconds(15),
count_repeats=3
)
async def simple_func_every_second():
print('Test')
@bg_dp.handler_job(
every=Every.weekdays.monday(hour=11, minute=40),
count_repeats=Repeats.infinity
)
async def simple_func_infinity_monday():
print('Test2')
@bg_dp.handler_job(
every=Every.minutes(2),
count_repeats=Repeats.infinity
)
async def simple_func_every_2_minutes():
print('Test3')
@bg_dp.handler_job(
count_repeats=Repeats.one,
datetime_start=datetime.timedelta(minutes=2.0)
)
async def simple_func_delta_to_start():
print('Test4')
async def main():
await asyncio.create_task(bg_dp.start(relax=.3))
if __name__ == '__main__':
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
print('Goodbye!')
Example without using decorators:
import asyncio
import datetime
from aiobgjobs.dispatcher import BgDispatcher
from aiobgjobs.handlers import Handler
from aiobgjobs.jobs import Job
from aiobgjobs.types import Every, Repeats
bg_dp = BgDispatcher()
async def simple_func_every_seconds():
print('Test')
async def simple_func_infinity_monday():
print('Test2')
async def simple_func_every_2_minutes():
print('Test3')
async def simple_func_delta_to_start():
print('Test4')
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_every_seconds,
name='Job - 1',
kwargs=None
),
count_repeats=3,
every=Every.seconds(15)
)
)
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_infinity_monday,
name='Job - 2',
kwargs=None
),
count_repeats=Repeats.infinity,
every=Every.weekdays.monday()
)
)
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_every_2_minutes,
name='Job - 3',
kwargs=None
),
count_repeats=Repeats.infinity,
every=Every.minutes(2)
)
)
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_delta_to_start,
name='Job - 4',
kwargs=None
),
count_repeats=Repeats.one,
datetime_start=datetime.timedelta(minutes=2)
)
)
async def main():
await asyncio.create_task(bg_dp.start(relax=.3))
if __name__ == '__main__':
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
print('Goodbye!')
Ru - docs
Эта библиотека предназначена для асинхронного выполнения задач по расписанию.
Простой пример:
import asyncio
import datetime
from aiobgjobs.dispatcher import BgDispatcher
from aiobgjobs.types import Every, Repeats
bg_dp = BgDispatcher()
@bg_dp.handler_job(
every=Every.seconds(15),
count_repeats=3
)
async def simple_func_every_second():
print('Test')
@bg_dp.handler_job(
every=Every.weekdays.monday(hour=11, minute=40),
count_repeats=Repeats.infinity
)
async def simple_func_infinity_monday():
print('Test2')
@bg_dp.handler_job(
every=Every.minutes(2),
count_repeats=Repeats.infinity
)
async def simple_func_every_2_minutes():
print('Test3')
@bg_dp.handler_job(
count_repeats=Repeats.one,
datetime_start=datetime.timedelta(minutes=2.0)
)
async def simple_func_delta_to_start():
print('Test4')
async def main():
await asyncio.create_task(bg_dp.start(relax=.3))
if __name__ == '__main__':
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
print('Goodbye!')
Пример без использования декораторов:
import asyncio
import datetime
from aiobgjobs.dispatcher import BgDispatcher
from aiobgjobs.handlers import Handler
from aiobgjobs.jobs import Job
from aiobgjobs.types import Every, Repeats
bg_dp = BgDispatcher()
async def simple_func_every_seconds():
print('Test')
async def simple_func_infinity_monday():
print('Test2')
async def simple_func_every_2_minutes():
print('Test3')
async def simple_func_delta_to_start():
print('Test4')
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_every_seconds,
name='Job - 1',
kwargs=None
),
count_repeats=3,
every=Every.seconds(15)
)
)
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_infinity_monday,
name='Job - 2',
kwargs=None
),
count_repeats=Repeats.infinity,
every=Every.weekdays.monday()
)
)
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_every_2_minutes,
name='Job - 3',
kwargs=None
),
count_repeats=Repeats.infinity,
every=Every.minutes(2)
)
)
bg_dp.register_handler(
Handler(
job=Job(
func=simple_func_delta_to_start,
name='Job - 4',
kwargs=None
),
count_repeats=Repeats.one,
datetime_start=datetime.timedelta(minutes=2)
)
)
async def main():
await asyncio.create_task(bg_dp.start(relax=.3))
if __name__ == '__main__':
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
print('Goodbye!')
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
aiobgjobs-0.1.2.tar.gz
(6.8 kB
view details)
Built Distribution
File details
Details for the file aiobgjobs-0.1.2.tar.gz
.
File metadata
- Download URL: aiobgjobs-0.1.2.tar.gz
- Upload date:
- Size: 6.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.2.2 CPython/3.10.8 Darwin/20.6.0
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 9b7ef2271f5ec640faeedb7bfc496ce7ff2fc65ce196bb05769668bddc1a93ca |
|
MD5 | aa18441cbf61462a76ac0eeb3bbdbe55 |
|
BLAKE2b-256 | 3df0cd6603706e3f46dd63bd0f1f9e57f2ba0b7370eb577be66a2e2732b2a352 |
File details
Details for the file aiobgjobs-0.1.2-py3-none-any.whl
.
File metadata
- Download URL: aiobgjobs-0.1.2-py3-none-any.whl
- Upload date:
- Size: 7.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.2.2 CPython/3.10.8 Darwin/20.6.0
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 6a450378bc8d0b9c138734e2e5d4e0299fb353571532f21ab6f12442ae688f75 |
|
MD5 | 95a8c71e25c59db825720c8c7a17cd02 |
|
BLAKE2b-256 | 81f4b7d488d91a3cf7916db85262e5f5baa64bf8cb49fc5a85fe61d52111f79b |