Schedule async tasks and manage them using a REST API or web UI
Project description
aiocronjob
Schedule and run asyncio coroutines and manage them from a web interface or programmatically using the REST API.
Requires Python >= 3.6
How to install
pip3 install aiocronjob
Usage example
# examples/simple_tasks.py
import asyncio

from aiocronjob import manager, Job
from aiocronjob import run_app


async def first_task():
    for i in range(20):
        print("first task log", i)
        await asyncio.sleep(1)


async def second_task():
    for i in range(10):
        await asyncio.sleep(1.5)
        print("second task log", i)
    raise Exception("second task exception")


manager.register(first_task, name="First task", crontab="22 * * * *")
manager.register(second_task, name="Second task", crontab="23 * * * *")


async def on_job_exception(job: Job, exc: BaseException):
    print(f"An exception occurred for job {job.name}: {exc}")


async def on_job_cancelled(job: Job):
    print(f"{job.name} was cancelled...")


async def on_startup():
    print("The app started.")


async def on_shutdown():
    print("The app stopped.")


manager.set_on_job_cancelled_callback(on_job_cancelled)
manager.set_on_job_exception_callback(on_job_exception)
manager.set_on_shutdown_callback(on_shutdown)
manager.set_on_startup_callback(on_startup)

if __name__ == "__main__":
    run_app()
After running the app, the FastAPI server runs at localhost:5000.
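The crontab strings passed to manager.register in the example, such as "22 * * * *", look like standard five-field cron expressions. Assuming that syntax, the first field is the minute, so "22 * * * *" means minute 22 of every hour. The sketch below illustrates that rule for this minute-only pattern. It is not aiocronjob's own scheduling code; the helper name seconds_until_minute and the sample timestamp are made up for illustration.

```python
from datetime import datetime, timedelta, timezone


def seconds_until_minute(minute: int, now: datetime) -> float:
    """Seconds from `now` until the clock next reads hh:<minute>:00.

    Hypothetical helper: models only crontabs of the form "M * * * *".
    """
    candidate = now.replace(minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # That minute has already passed in this hour; use the next hour.
        candidate += timedelta(hours=1)
    return (candidate - now).total_seconds()


# Sample timestamp chosen for illustration only.
now = datetime(2020, 6, 6, 10, 23, 58, tzinfo=timezone.utc)
print(seconds_until_minute(22, now))  # 3482.0 (next run at 11:22:00)
print(seconds_until_minute(30, now))  # 362.0  (next run at 10:30:00)
```

Full cron expressions also constrain the hour, day, month, and weekday fields; the sketch deliberately ignores them.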
Web Interface
Open localhost:5000 in your browser.
REST API
Open localhost:5000/docs for the endpoint documentation.
curl example:
$ curl http://0.0.0.0:5000/api/jobs
[
  {
    "name": "First task",
    "next_run_in": "3481.906931",
    "last_status": "pending",
    "enabled": "True",
    "crontab": "22 * * * *",
    "created_at": "2020-06-06T10:20:25.118630+00:00",
    "started_at": null,
    "stopped_at": null
  },
  {
    "name": "Second task",
    "next_run_in": "3541.904723",
    "last_status": "error",
    "enabled": "True",
    "crontab": "23 * * * *",
    "created_at": "2020-06-06T10:20:25.118661+00:00",
    "started_at": "2020-06-06T10:23:00.000906+00:00",
    "stopped_at": "2020-06-06T10:23:15.004351+00:00"
  }
]
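The same endpoint can be read from Python. The following is a minimal sketch using only the standard library, assuming the response has the shape shown in the curl output above. Note that next_run_in and enabled arrive as strings there, so the sketch converts them explicitly. The function names fetch_jobs, failed_jobs, and next_run_seconds are hypothetical helpers and are not part of aiocronjob.

```python
import json
from urllib.request import urlopen


def fetch_jobs(base_url="http://localhost:5000"):
    """GET /api/jobs from a running app and decode the JSON list."""
    with urlopen(f"{base_url}/api/jobs") as resp:
        return json.load(resp)


def failed_jobs(jobs):
    """Names of jobs whose last run ended with an error."""
    return [job["name"] for job in jobs if job["last_status"] == "error"]


def next_run_seconds(job):
    """next_run_in is a string in the sample output; convert it to float."""
    return float(job["next_run_in"])


# Trimmed copy of the sample response above, so the helpers can be
# tried without a running server.
sample = json.loads("""
[
  {"name": "First task", "next_run_in": "3481.906931",
   "last_status": "pending", "enabled": "True"},
  {"name": "Second task", "next_run_in": "3541.904723",
   "last_status": "error", "enabled": "True"}
]
""")

print(failed_jobs(sample))          # ['Second task']
print(next_run_seconds(sample[0]))  # 3481.906931
```

With the app running, replace sample with the result of fetch_jobs().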