Skip to main content

Simple, lightweight task scheduler for Python with async support

Project description

FastScheduler

Simple, lightweight task scheduler for Python with async support, timezone handling, cron expressions, and a beautiful real-time dashboard.

If this saves you time, ⭐️ the repo and open an issue for ideas — I'm actively improving it.

GitHub Stars License: MIT Python 3.10+

FastScheduler Demo

Features

  • 🎯 Simple decorator-based API - Schedule tasks in one line
  • Async/await support - Native async function support
  • 🕐 Timezone support - Schedule jobs in any timezone
  • 📅 Cron expressions - Complex schedules with cron syntax
  • 💾 Persistent state - Survives restarts, handles missed jobs
  • 🗄️ Database support - SQLite, PostgreSQL, MySQL via SQLModel
  • 🎨 FastAPI dashboard - Beautiful real-time monitoring UI
  • 🔄 Automatic retries - Configurable retry with exponential backoff
  • ⏱️ Job timeouts - Kill long-running jobs automatically
  • ⏸️ Pause/Resume - Control jobs without removing them
  • 📋 Dead Letter Queue - Track and debug failed jobs

Installation

# Basic installation
pip install fastscheduler

# With FastAPI dashboard
pip install fastscheduler[fastapi]

# With cron expression support
pip install fastscheduler[cron]

# With database support (SQLite, PostgreSQL, MySQL)
pip install fastscheduler[database]

# All features
pip install fastscheduler[all]

Quick Start

from fastscheduler import FastScheduler

scheduler = FastScheduler(quiet=True)

@scheduler.every(10).seconds
def task():
    print("Task executed")

@scheduler.daily.at("14:30")
async def daily_task():
    print("Daily task at 2:30 PM")

scheduler.start()

Scheduling Options

Interval-based

@scheduler.every(10).seconds
@scheduler.every(5).minutes
@scheduler.every(2).hours
@scheduler.every(1).days

Time-based

@scheduler.daily.at("09:00")              # Daily at 9 AM
@scheduler.hourly.at(":30")               # Every hour at :30
@scheduler.weekly.monday.at("10:00")      # Every Monday at 10 AM
@scheduler.weekly.weekdays.at("09:00")    # Weekdays at 9 AM
@scheduler.weekly.weekends.at("12:00")    # Weekends at noon

Cron Expressions

Requires: pip install fastscheduler[cron]

@scheduler.cron("0 9 * * MON-FRI")        # 9 AM on weekdays
def market_open():
    ...

@scheduler.cron("*/15 * * * *")           # Every 15 minutes
def frequent_check():
    ...

@scheduler.cron("0 0 1 * *")              # First day of each month
def monthly_report():
    ...

One-time Jobs

@scheduler.once(60)                        # Run once after 60 seconds
def delayed_task():
    ...

@scheduler.at("2024-12-25 00:00:00")      # Run at specific datetime
def christmas_task():
    ...

Timezone Support

Schedule jobs in any timezone:

# Using the tz parameter
@scheduler.daily.at("09:00", tz="America/New_York")
def nyc_morning():
    print("Good morning, New York!")

# Using the .tz() method (chainable)
@scheduler.weekly.monday.tz("Europe/London").at("09:00")
def london_standup():
    print("Monday standup")

# With cron expressions
@scheduler.cron("0 9 * * MON-FRI").tz("Asia/Tokyo")
def tokyo_market():
    print("Tokyo market open")

Common timezones: UTC, America/New_York, America/Los_Angeles, Europe/London, Europe/Paris, Asia/Tokyo, Asia/Shanghai, Australia/Sydney

Job Control

Timeouts

Kill jobs that run too long:

@scheduler.every(1).minutes.timeout(30)   # Kill if runs > 30 seconds
def quick_task():
    ...

@scheduler.daily.at("02:00").timeout(3600)  # 1 hour max
def nightly_backup():
    ...

Retries

Configure automatic retries on failure:

@scheduler.every(5).minutes.retries(5)    # Retry up to 5 times
def flaky_api_call():
    ...

Retries use exponential backoff (2s, 4s, 8s, 16s, ...).

Skip Catch-up

Don't run missed jobs after restart:

@scheduler.every(1).hours.no_catch_up()
def hourly_stats():
    ...

Pause, Resume, and Cancel

# Pause a job (stays in queue but won't execute)
scheduler.pause_job("job_0")

# Resume a paused job
scheduler.resume_job("job_0")

# Cancel and remove a job
scheduler.cancel_job("job_0")

# Cancel all jobs with a specific function name
scheduler.cancel_job_by_name("my_task")

FastAPI Integration

Add a beautiful real-time dashboard to your FastAPI app:

from fastapi import FastAPI
from fastscheduler import FastScheduler
from fastscheduler.fastapi_integration import create_scheduler_routes

app = FastAPI()
scheduler = FastScheduler(quiet=True)

# Add dashboard at /scheduler/
app.include_router(create_scheduler_routes(scheduler))

@scheduler.every(30).seconds
def background_task():
    print("Background work")

scheduler.start()

Dashboard Features

Access at http://localhost:8000/scheduler/

FastScheduler Dashboard

  • Real-time updates via Server-Sent Events (SSE)
  • Job table with status indicators, last 5 run results, and countdown timers
  • Quick actions - Run/Pause/Resume/Cancel directly from the UI
  • Execution history tab with filtering and search
  • Dead letter queue tab - view failed jobs with error details
  • Statistics - Success rate, uptime, active jobs count
  • Toast notifications - Alerts for job completions and failures

API Endpoints

Endpoint Method Description
/scheduler/ GET Dashboard UI
/scheduler/api/status GET Scheduler status
/scheduler/api/jobs GET List all jobs
/scheduler/api/jobs/{job_id} GET Get specific job
/scheduler/api/jobs/{job_id}/pause POST Pause a job
/scheduler/api/jobs/{job_id}/resume POST Resume a job
/scheduler/api/jobs/{job_id}/run POST Trigger immediate execution
/scheduler/api/jobs/{job_id}/cancel POST Cancel a job
/scheduler/api/history GET Execution history
/scheduler/api/dead-letters GET Dead letter queue (failed jobs)
/scheduler/api/dead-letters DELETE Clear dead letter queue
/scheduler/events GET SSE event stream

Configuration

scheduler = FastScheduler(
    state_file="scheduler.json",    # Persistence file for JSON backend (default: fastscheduler_state.json)
    storage="json",                 # Storage backend: "json" (default) or "sqlmodel"
    database_url=None,              # Database URL for sqlmodel backend
    quiet=True,                     # Suppress log messages (default: False)
    auto_start=False,               # Start immediately (default: False)
    max_history=5000,               # Max history entries to keep (default: 10000)
    max_workers=20,                 # Concurrent job threads (default: 10)
    history_retention_days=8,       # Delete history older than X days (default: 7)
    max_dead_letters=500,           # Max failed jobs in dead letter queue (default: 500)
)

History Retention

History is automatically cleaned up based on two limits (both are enforced):

  • Count limit: max_history - maximum number of entries
  • Time limit: history_retention_days - maximum age in days

Set history_retention_days=0 to disable time-based cleanup (only count limit applies).

Dead Letter Queue

Failed job executions are automatically stored in a separate dead letter queue for debugging:

# Get failed jobs
dead_letters = scheduler.get_dead_letters(limit=100)

# Clear the queue
scheduler.clear_dead_letters()

The dead letter queue:

  • Stores the last max_dead_letters failed jobs (default: 500)
  • Persists to a separate JSON file (*_dead_letters.json) or database table
  • Includes error messages, timestamps, run counts, and execution times
  • Viewable in the dashboard "Failed" tab

Database Storage

For production workloads requiring transactional integrity and concurrency, use database storage instead of JSON files.

Requires: pip install fastscheduler[database]

SQLite (Recommended for Single-Server)

scheduler = FastScheduler(
    storage="sqlmodel",
    database_url="sqlite:///scheduler.db"
)

PostgreSQL (Recommended for Production)

scheduler = FastScheduler(
    storage="sqlmodel",
    database_url="postgresql://user:password@localhost:5432/mydb"
)

MySQL

scheduler = FastScheduler(
    storage="sqlmodel",
    database_url="mysql://user:password@localhost:3306/mydb"
)

Custom Storage Backend

Implement your own storage by subclassing StorageBackend:

from fastscheduler.storage import StorageBackend

class MyCustomBackend(StorageBackend):
    def save_state(self, jobs, history, statistics, job_counter, scheduler_running):
        # Your implementation
        ...
    
    def load_state(self):
        # Your implementation
        ...
    
    # Implement other required methods...

scheduler = FastScheduler(storage=MyCustomBackend())

Database Tables

When using SQLModel storage, the following tables are created automatically:

Table Purpose
scheduler_jobs Active job definitions
scheduler_history Execution history
scheduler_dead_letters Failed job records
scheduler_metadata Job counter, statistics

Monitoring

Programmatic Access

# Get all jobs
jobs = scheduler.get_jobs()

# Get specific job
job = scheduler.get_job("job_0")

# Get execution history
history = scheduler.get_history(limit=100)
history = scheduler.get_history(func_name="my_task", limit=50)

# Get statistics
stats = scheduler.get_statistics()
# Returns: total_runs, total_failures, uptime, per_job stats

# Print simple status to console
scheduler.print_status()

Context Manager

with FastScheduler(quiet=True) as scheduler:
    @scheduler.every(5).seconds
    def task():
        print("Running")

    # Scheduler starts automatically
    time.sleep(30)
# Scheduler stops automatically on exit

State Persistence

FastScheduler automatically saves state to disk (JSON) or database:

  • Job definitions and schedules
  • Execution history
  • Statistics
  • Job counter (ensures unique IDs across restarts)

On restart, it:

  1. Restores all jobs
  2. Calculates missed executions
  3. Runs catch-up jobs (unless no_catch_up() is set)

Use JSON storage (default) for simple setups, or database storage for production workloads with multiple instances or high reliability requirements. See Database Storage for details.

Examples

Complete Example

import asyncio
import time
from fastscheduler import FastScheduler

scheduler = FastScheduler(quiet=True)

# Simple interval job
@scheduler.every(10).seconds
def heartbeat():
    print(f"[{time.strftime('%H:%M:%S')}] ❤️ Heartbeat")

# Async job with timezone
@scheduler.daily.at("09:00", tz="America/New_York").timeout(60)
async def morning_report():
    print("Generating report...")
    await asyncio.sleep(5)
    print("Report sent!")

# Cron job with retries
@scheduler.cron("*/5 * * * *").retries(3)
def check_api():
    print("Checking API health")

# Weekly job
@scheduler.weekly.monday.at("10:00")
def weekly_standup():
    print("Time for standup!")

# Start scheduler
scheduler.start()

try:
    while True:
        time.sleep(60)
        scheduler.print_status()
except KeyboardInterrupt:
    scheduler.stop()

FastAPI with Lifespan

from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastscheduler import FastScheduler
from fastscheduler.fastapi_integration import create_scheduler_routes

scheduler = FastScheduler(quiet=True)

@asynccontextmanager
async def lifespan(app: FastAPI):
    scheduler.start()
    yield
    scheduler.stop(wait=True)

app = FastAPI(lifespan=lifespan)
app.include_router(create_scheduler_routes(scheduler))

@scheduler.every(30).seconds
def background_job():
    print("Working...")

API Reference

FastScheduler

Method Description
start() Start the scheduler
stop(wait=True, timeout=30) Stop gracefully
get_jobs() List all scheduled jobs
get_job(job_id) Get specific job by ID
get_history(func_name=None, limit=50) Get execution history
get_statistics() Get runtime statistics
get_dead_letters(limit=100) Get dead letter queue (failed jobs)
clear_dead_letters() Clear all dead letter entries
pause_job(job_id) Pause a job
resume_job(job_id) Resume a paused job
run_job_now(job_id) Trigger immediate execution
cancel_job(job_id) Cancel and remove a job
cancel_job_by_name(func_name) Cancel all jobs by function name
print_status() Print status to console

Scheduler Methods

Method Description
every(n).seconds/minutes/hours/days Interval scheduling
daily.at("HH:MM") Daily at specific time
hourly.at(":MM") Hourly at specific minute
weekly.monday/tuesday/.../sunday.at("HH:MM") Weekly scheduling
weekly.weekdays/weekends.at("HH:MM") Weekday/weekend scheduling
cron("expression") Cron expression scheduling
once(seconds) One-time delayed execution
at("YYYY-MM-DD HH:MM:SS") One-time at specific datetime

Chainable Modifiers

Modifier Description
.timeout(seconds) Maximum execution time
.retries(n) Maximum retry attempts
.no_catch_up() Skip missed executions
.tz("timezone") Set timezone for schedule

License

MIT

Contributing

Contributions welcome! Please open an issue or PR on GitHub.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

fastscheduler-0.2.1.tar.gz (31.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

fastscheduler-0.2.1-py3-none-any.whl (36.0 kB view details)

Uploaded Python 3

File details

Details for the file fastscheduler-0.2.1.tar.gz.

File metadata

  • Download URL: fastscheduler-0.2.1.tar.gz
  • Upload date:
  • Size: 31.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.18 {"installer":{"name":"uv","version":"0.9.18","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for fastscheduler-0.2.1.tar.gz
Algorithm Hash digest
SHA256 3ee84c9bc943c65b9493c8d8cacdcf8ea14b22cb57e4637cbc313a61e20ceda9
MD5 049c1318bb5f421140a7c855cffc9b92
BLAKE2b-256 71ed51017b81b28f9d45cd3ce4a2d7274562291ef1ff44eb16ca69a6d3bd2def

See more details on using hashes here.

File details

Details for the file fastscheduler-0.2.1-py3-none-any.whl.

File metadata

  • Download URL: fastscheduler-0.2.1-py3-none-any.whl
  • Upload date:
  • Size: 36.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.18 {"installer":{"name":"uv","version":"0.9.18","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for fastscheduler-0.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 ec6ad97f509227bfd9cdbef1481429732904ef93c319876b738eb697e5ad149c
MD5 94e9a95ede7162ab0401806e06713a22
BLAKE2b-256 2e87dd0a405aa91662cd6579ad3e2f838a0e572b146c885f807ffe65fb59fcb7

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page