Simple, lightweight task scheduler for Python with async support
Project description
FastScheduler
Simple, lightweight task scheduler for Python with async support, timezone handling, cron expressions, and a beautiful real-time dashboard.
If this saves you time, ⭐️ the repo and open an issue for ideas — I'm actively improving it.
Features
- 🎯 Simple decorator-based API - Schedule tasks in one line
- ⚡ Async/await support - Native async function support
- 🕐 Timezone support - Schedule jobs in any timezone
- 📅 Cron expressions - Complex schedules with cron syntax
- 💾 Persistent state - Survives restarts, handles missed jobs
- 🎨 FastAPI dashboard - Beautiful real-time monitoring UI
- 🔄 Automatic retries - Configurable retry with exponential backoff
- ⏱️ Job timeouts - Kill long-running jobs automatically
- ⏸️ Pause/Resume - Control jobs without removing them
Installation
# Basic installation
pip install fastscheduler
# With FastAPI dashboard
pip install fastscheduler[fastapi]
# With cron expression support
pip install fastscheduler[cron]
# All features
pip install fastscheduler[all]
Quick Start
from fastscheduler import FastScheduler
scheduler = FastScheduler(quiet=True)
@scheduler.every(10).seconds
def task():
print("Task executed")
@scheduler.daily.at("14:30")
async def daily_task():
print("Daily task at 2:30 PM")
scheduler.start()
Scheduling Options
Interval-based
@scheduler.every(10).seconds
@scheduler.every(5).minutes
@scheduler.every(2).hours
@scheduler.every(1).days
Time-based
@scheduler.daily.at("09:00") # Daily at 9 AM
@scheduler.hourly.at(":30") # Every hour at :30
@scheduler.weekly.monday.at("10:00") # Every Monday at 10 AM
@scheduler.weekly.weekdays.at("09:00") # Weekdays at 9 AM
@scheduler.weekly.weekends.at("12:00") # Weekends at noon
Cron Expressions
Requires: pip install fastscheduler[cron]
@scheduler.cron("0 9 * * MON-FRI") # 9 AM on weekdays
def market_open():
...
@scheduler.cron("*/15 * * * *") # Every 15 minutes
def frequent_check():
...
@scheduler.cron("0 0 1 * *") # First day of each month
def monthly_report():
...
One-time Jobs
@scheduler.once(60) # Run once after 60 seconds
def delayed_task():
...
@scheduler.at("2024-12-25 00:00:00") # Run at specific datetime
def christmas_task():
...
Timezone Support
Schedule jobs in any timezone:
# Using the tz parameter
@scheduler.daily.at("09:00", tz="America/New_York")
def nyc_morning():
print("Good morning, New York!")
# Using the .tz() method (chainable)
@scheduler.weekly.monday.tz("Europe/London").at("09:00")
def london_standup():
print("Monday standup")
# With cron expressions
@scheduler.cron("0 9 * * MON-FRI").tz("Asia/Tokyo")
def tokyo_market():
print("Tokyo market open")
Common timezones: UTC, America/New_York, America/Los_Angeles, Europe/London, Europe/Paris, Asia/Tokyo, Asia/Shanghai, Australia/Sydney
Job Control
Timeouts
Kill jobs that run too long:
@scheduler.every(1).minutes.timeout(30) # Kill if runs > 30 seconds
def quick_task():
...
@scheduler.daily.at("02:00").timeout(3600) # 1 hour max
def nightly_backup():
...
Retries
Configure automatic retries on failure:
@scheduler.every(5).minutes.retries(5) # Retry up to 5 times
def flaky_api_call():
...
Retries use exponential backoff (2s, 4s, 8s, 16s, ...).
Skip Catch-up
Don't run missed jobs after restart:
@scheduler.every(1).hours.no_catch_up()
def hourly_stats():
...
Pause, Resume, and Cancel
# Pause a job (stays in queue but won't execute)
scheduler.pause_job("job_0")
# Resume a paused job
scheduler.resume_job("job_0")
# Cancel and remove a job
scheduler.cancel_job("job_0")
# Cancel all jobs with a specific function name
scheduler.cancel_job_by_name("my_task")
FastAPI Integration
Add a beautiful real-time dashboard to your FastAPI app:
from fastapi import FastAPI
from fastscheduler import FastScheduler
from fastscheduler.fastapi_integration import create_scheduler_routes
app = FastAPI()
scheduler = FastScheduler(quiet=True)
# Add dashboard at /scheduler/
app.include_router(create_scheduler_routes(scheduler))
@scheduler.every(30).seconds
def background_task():
print("Background work")
scheduler.start()
Dashboard Features
Access at http://localhost:8000/scheduler/
- Real-time updates via Server-Sent Events (SSE)
- Job cards with status indicators and countdown timers
- Quick actions - Pause/Resume/Cancel directly from the UI
- Execution history with error logs
- Statistics - Success rate, uptime, jobs per hour
- Filter & search - Find jobs by status or name
- Toast notifications - Alerts for job completions and failures
API Endpoints
| Endpoint | Method | Description |
|---|---|---|
/scheduler/ |
GET | Dashboard UI |
/scheduler/api/status |
GET | Scheduler status |
/scheduler/api/jobs |
GET | List all jobs |
/scheduler/api/jobs/{job_id} |
GET | Get specific job |
/scheduler/api/jobs/{job_id}/pause |
POST | Pause a job |
/scheduler/api/jobs/{job_id}/resume |
POST | Resume a job |
/scheduler/api/jobs/{job_id}/cancel |
POST | Cancel a job |
/scheduler/api/history |
GET | Execution history |
/scheduler/events |
GET | SSE event stream |
Configuration
scheduler = FastScheduler(
state_file="scheduler.json", # Persistence file (default: fastscheduler_state.json)
quiet=True, # Suppress log messages (default: False)
auto_start=False, # Start immediately (default: False)
max_history=5000, # Max history entries to keep (default: 10000)
max_workers=20, # Concurrent job threads (default: 10)
history_retention_days=8, # Delete history older than X days (default: 7)
)
History Retention
History is automatically cleaned up based on two limits (both are enforced):
- Count limit:
max_history- maximum number of entries - Time limit:
history_retention_days- maximum age in days
Set history_retention_days=0 to disable time-based cleanup (only count limit applies).
Monitoring
Programmatic Access
# Get all jobs
jobs = scheduler.get_jobs()
# Get specific job
job = scheduler.get_job("job_0")
# Get execution history
history = scheduler.get_history(limit=100)
history = scheduler.get_history(func_name="my_task", limit=50)
# Get statistics
stats = scheduler.get_statistics()
# Returns: total_runs, total_failures, uptime, per_job stats
# Print simple status to console
scheduler.print_status()
Context Manager
with FastScheduler(quiet=True) as scheduler:
@scheduler.every(5).seconds
def task():
print("Running")
# Scheduler starts automatically
time.sleep(30)
# Scheduler stops automatically on exit
State Persistence
FastScheduler automatically saves state to disk:
- Job definitions and schedules
- Execution history
- Statistics
- Job counter (ensures unique IDs across restarts)
On restart, it:
- Restores all jobs
- Calculates missed executions
- Runs catch-up jobs (unless
no_catch_up()is set)
Examples
Complete Example
import asyncio
import time
from fastscheduler import FastScheduler
scheduler = FastScheduler(quiet=True)
# Simple interval job
@scheduler.every(10).seconds
def heartbeat():
print(f"[{time.strftime('%H:%M:%S')}] ❤️ Heartbeat")
# Async job with timezone
@scheduler.daily.at("09:00", tz="America/New_York").timeout(60)
async def morning_report():
print("Generating report...")
await asyncio.sleep(5)
print("Report sent!")
# Cron job with retries
@scheduler.cron("*/5 * * * *").retries(3)
def check_api():
print("Checking API health")
# Weekly job
@scheduler.weekly.monday.at("10:00")
def weekly_standup():
print("Time for standup!")
# Start scheduler
scheduler.start()
try:
while True:
time.sleep(60)
scheduler.print_status()
except KeyboardInterrupt:
scheduler.stop()
FastAPI with Lifespan
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastscheduler import FastScheduler
from fastscheduler.fastapi_integration import create_scheduler_routes
scheduler = FastScheduler(quiet=True)
@asynccontextmanager
async def lifespan(app: FastAPI):
scheduler.start()
yield
scheduler.stop(wait=True)
app = FastAPI(lifespan=lifespan)
app.include_router(create_scheduler_routes(scheduler))
@scheduler.every(30).seconds
def background_job():
print("Working...")
API Reference
FastScheduler
| Method | Description |
|---|---|
start() |
Start the scheduler |
stop(wait=True, timeout=30) |
Stop gracefully |
get_jobs() |
List all scheduled jobs |
get_job(job_id) |
Get specific job by ID |
get_history(func_name=None, limit=50) |
Get execution history |
get_statistics() |
Get runtime statistics |
pause_job(job_id) |
Pause a job |
resume_job(job_id) |
Resume a paused job |
cancel_job(job_id) |
Cancel and remove a job |
cancel_job_by_name(func_name) |
Cancel all jobs by function name |
print_status() |
Print status to console |
Scheduler Methods
| Method | Description |
|---|---|
every(n).seconds/minutes/hours/days |
Interval scheduling |
daily.at("HH:MM") |
Daily at specific time |
hourly.at(":MM") |
Hourly at specific minute |
weekly.monday/tuesday/.../sunday.at("HH:MM") |
Weekly scheduling |
weekly.weekdays/weekends.at("HH:MM") |
Weekday/weekend scheduling |
cron("expression") |
Cron expression scheduling |
once(seconds) |
One-time delayed execution |
at("YYYY-MM-DD HH:MM:SS") |
One-time at specific datetime |
Chainable Modifiers
| Modifier | Description |
|---|---|
.timeout(seconds) |
Maximum execution time |
.retries(n) |
Maximum retry attempts |
.no_catch_up() |
Skip missed executions |
.tz("timezone") |
Set timezone for schedule |
License
MIT
Contributing
Contributions welcome! Please open an issue or PR on GitHub.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file fastscheduler-0.1.1.tar.gz.
File metadata
- Download URL: fastscheduler-0.1.1.tar.gz
- Upload date:
- Size: 25.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.18 {"installer":{"name":"uv","version":"0.9.18","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ae7e4df71605bdb0578617780f042cc9ed46e70349b583c92bbdc98048c8c655
|
|
| MD5 |
9c1e8669c52feef24f615f4bb101ddfe
|
|
| BLAKE2b-256 |
685541e804fafe2071851ab1a43bcf19f64b62fdffe3f3e81987f9409ed8e478
|
File details
Details for the file fastscheduler-0.1.1-py3-none-any.whl.
File metadata
- Download URL: fastscheduler-0.1.1-py3-none-any.whl
- Upload date:
- Size: 26.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.18 {"installer":{"name":"uv","version":"0.9.18","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
36694769628421e62ebea644a39ce79d63084af876377729f87be77172351cb5
|
|
| MD5 |
94c8ec507899a58c250cd23ee9d80a9f
|
|
| BLAKE2b-256 |
a0346a043841623f8fd24dba4e17bd85d131fbd0b66e82ca15463ce615bc0dab
|