Schedule async tasks and manage them using a REST API or WEB UI
Project description
aiocronjob
Schedule and run asyncio
coroutines and manage them from a web interface or programmatically using the rest api.
Requires python >= 3.6
How to install
pip3 install aiocronjob
Usage example
# examples/simple_tasks.py
import asyncio
from aiocronjob import manager, Job
from aiocronjob import run_app
async def first_task():
for i in range(20):
print("first task log", i)
await asyncio.sleep(1)
async def second_task():
for i in range(10):
await asyncio.sleep(1.5)
print("second task log", i)
raise Exception("second task exception")
manager.register(first_task, name="First task", crontab="22 * * * *")
manager.register(second_task, name="Second task", crontab="23 * * * *")
async def on_job_exception(job: Job, exc: BaseException):
print(f"An exception occurred for job {job.name}: {exc}")
async def on_job_cancelled(job: Job):
print(f"{job.name} was cancelled...")
async def on_startup():
print("The app started.")
async def on_shutdown():
print("The app stopped.")
manager.set_on_job_cancelled_callback(on_job_cancelled)
manager.set_on_job_exception_callback(on_job_exception)
manager.set_on_shutdown_callback(on_shutdown)
manager.set_on_startup_callback(on_startup)
if __name__ == "__main__":
run_app()
After running the app, the FastAPI server runs at localhost:5000
.
Web Interface
Open localhost:5000 in your browser:
Rest API
Open localhost:5000/docs for endpoints docs.
curl
example:
$ curl http://0.0.0.0:5000/api/jobs
[
{
"name": "First task",
"next_run_in": "3481.906931",
"last_status": "pending",
"enabled": "True",
"crontab": "22 * * * *",
"created_at": "2020-06-06T10:20:25.118630+00:00",
"started_at": null,
"stopped_at": null
},
{
"name": "Second task",
"next_run_in": "3541.904723",
"last_status": "error",
"enabled": "True",
"crontab": "23 * * * *",
"created_at": "2020-06-06T10:20:25.118661+00:00",
"started_at": "2020-06-06T10:23:00.000906+00:00",
"stopped_at": "2020-06-06T10:23:15.004351+00:00"
}
]
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
aiocronjob-0.2.3.tar.gz
(739.0 kB
view hashes)
Built Distribution
Close
Hashes for aiocronjob-0.2.3-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 1950c650e2ee98b6d3f5ad7fbeb15530916b906a8fa6c3e70a46cddaee85d10f |
|
MD5 | 6009dd153d3b0cdd42c7294e5127c64d |
|
BLAKE2b-256 | 51991418d47bf227e85018c514dfb5317d75a82c280c85f400029c1081c41df1 |