Cron, interval, and condition-based auto-scheduling for Google ADK agents — zero patching, full FastAPI integration
adk-task-scheduler
Auto-scheduling for Google ADK agents.
Bake cron, interval, or condition-based triggers directly into your agent definitions so they self-wake — no separate orchestrator, no cloud scheduler, no code changes to ADK.
Why
ADK agents are purely invocation-driven. Out of the box an agent only runs when a user or external system sends a message. adk-task-scheduler adds a self-wake capability that coexists with the standard ad-hoc POST /run interface:
Without this library              With this library
───────────────────────────────   ──────────────────────────────────────────
User/system → POST /run → agent   User/system → POST /run → agent  (unchanged)
                                  APScheduler tick → agent         (new)
How it works
get_fast_api_app (the ADK function that powers adk api_server) accepts a lifespan= parameter. This library builds an APScheduler AsyncIOScheduler lifespan and passes it in — no monkey-patching, no ADK fork required. ADK's own runner_dict handles all ad-hoc calls; the scheduler maintains a separate RunnerPool so the two paths never interfere.
┌──────────────────────────────────────────────────────────────┐
│  FastAPI app (returned by build_scheduled_app)               │
│                                                              │
│  ┌─────────────────────────┐  ┌──────────────────────────┐   │
│  │  ADK runner_dict        │  │  Scheduler RunnerPool    │   │
│  │  POST /run              │  │  cron / interval /       │   │
│  │  POST /run_sse          │  │  condition triggers      │   │
│  │  WebSocket /run_live    │  │                          │   │
│  │  GET/POST /a2a/...      │  │  Separate Runner per     │   │
│  └─────────────────────────┘  │  app_name, no overlap    │   │
│                               └──────────────────────────┘   │
│  ADK lifespan ── scheduler lifespan (composed)               │
└──────────────────────────────────────────────────────────────┘
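The composition of two lifespans can be illustrated with the standard library alone. The sketch below is not the library's implementation; adk_lifespan, scheduler_lifespan, and compose_lifespans are hypothetical names, and the events list only stands in for real startup and shutdown work.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

events = []  # records the order of startup and shutdown steps


@asynccontextmanager
async def adk_lifespan(app):
    # Hypothetical stand-in for whatever ADK does at startup/shutdown.
    events.append("adk:start")
    try:
        yield
    finally:
        events.append("adk:stop")


@asynccontextmanager
async def scheduler_lifespan(app):
    # Hypothetical stand-in for starting and stopping a scheduler.
    events.append("scheduler:start")
    try:
        yield
    finally:
        events.append("scheduler:stop")


def compose_lifespans(*lifespans):
    """Return one lifespan that enters each given lifespan in order."""

    @asynccontextmanager
    async def composed(app):
        async with AsyncExitStack() as stack:
            for lifespan in lifespans:
                await stack.enter_async_context(lifespan(app))
            yield  # the app serves requests while suspended here

    return composed


async def demo():
    lifespan = compose_lifespans(adk_lifespan, scheduler_lifespan)
    async with lifespan(app=None):
        events.append("serving")


asyncio.run(demo())
```

AsyncExitStack unwinds in reverse order, so in this sketch the scheduler stops before the outer lifespan shuts down.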
Installation
pip install adk-task-scheduler
Requirements: Python ≥ 3.10, google-adk ≥ 1.7.0
Quick start
1. Attach a schedule to root_agent
Note: Python's @decorator syntax only applies to def/class statements, not variable assignments. Use with_schedule() (the recommended API) or the scheduled()(agent) call form.
# agents/ticker_agent/agent.py
from google.adk.agents import Agent
from adk_task_scheduler import with_schedule

root_agent = with_schedule(
    Agent(
        name="ticker_agent",
        model="gemini-2.0-flash",
        instruction="""
            You are a periodic monitoring agent.
            When the user message is exactly '__tick__', you are being invoked by
            the scheduler (not a real user). Run your monitoring routine and report
            status. Do NOT ask clarifying questions.
            For any other message, respond normally as a helpful assistant.
        """,
    ),
    interval_seconds=60,
    trigger_text="__tick__",
    on_response=lambda text: print(f"[monitor] {text}"),
)
2. Build the app
# main.py
from adk_task_scheduler import build_scheduled_app

app = build_scheduled_app(
    agents_dir="./agents",
    auto_discover=True,  # picks up with_schedule() agents automatically
    web=False,
    a2a=True,
)

Then start the server from the shell:

uvicorn main:app --host 0.0.0.0 --port 8000
The agent now:
- Responds to ad-hoc POST /run calls (standard ADK, unchanged).
- Self-wakes every 60 seconds and logs its response.
Examples
Cron-triggered daily briefing agent
# agents/briefing_agent/agent.py
import logging

from google.adk.agents import Agent
from adk_task_scheduler import with_schedule

logger = logging.getLogger(__name__)

root_agent = with_schedule(
    Agent(
        name="briefing_agent",
        model="gemini-2.0-flash",
        instruction="""
            You are a morning briefing assistant.
            When the user message is '__morning__', produce a concise daily briefing:
            - Key tasks for the day (you may fabricate plausible examples)
            - Weather summary for London
            - One motivational note
            For other messages, respond normally.
        """,
    ),
    cron="0 8 * * 1-5",  # 08:00 every weekday
    trigger_text="__morning__",
    user_id="briefing-system",
    session_service_uri="sqlite:///./briefing.db",  # persist sessions
    on_response=lambda text: logger.info("Daily briefing:\n%s", text),
    on_error=lambda exc: logger.error("Briefing failed: %s", exc),
)
Condition-triggered market monitor
# agents/market_monitor/agent.py
import logging
from datetime import datetime, time, timezone

from google.adk.agents import Agent
from adk_task_scheduler import with_schedule

logger = logging.getLogger(__name__)


def is_market_open() -> bool:
    """True during NYSE trading hours Mon-Fri 14:30-21:00 UTC."""
    now = datetime.now(timezone.utc)
    if now.weekday() >= 5:  # weekend
        return False
    # Compare full times so the window opens at 14:30, not 14:00.
    return time(14, 30) <= now.time() < time(21, 0)


root_agent = with_schedule(
    Agent(
        name="market_monitor",
        model="gemini-2.0-flash",
        instruction="""
            You are a market monitoring agent.
            When the user message is '__market_check__', analyse current market
            conditions (simulate with plausible data) and flag any anomalies.
            Keep the response under 3 bullet points.
            For other messages respond normally.
        """,
    ),
    condition=is_market_open,  # evaluated every 5 minutes
    condition_poll_interval=300,
    trigger_text="__market_check__",
    max_concurrent_runs=1,  # never overlap
    on_response=lambda text: logger.info("[market] %s", text),
)
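The docstring of is_market_open describes a 14:30 to 21:00 UTC window on weekdays. A condition that reads the clock directly is hard to test, so one option is to split out a pure helper that takes the timestamp as an argument. The sketch below is only an illustration; is_market_open_at and the two constants are hypothetical names, not part of the library.

```python
from datetime import datetime, time, timezone

MARKET_OPEN_UTC = time(14, 30)   # window start, inclusive
MARKET_CLOSE_UTC = time(21, 0)   # window end, exclusive


def is_market_open_at(now: datetime) -> bool:
    """True on Mon-Fri between 14:30 (inclusive) and 21:00 (exclusive) UTC."""
    now = now.astimezone(timezone.utc)  # expects a timezone-aware datetime
    if now.weekday() >= 5:  # Saturday or Sunday
        return False
    return MARKET_OPEN_UTC <= now.time() < MARKET_CLOSE_UTC


def is_market_open() -> bool:
    """Zero-argument wrapper suitable for use as a polled condition."""
    return is_market_open_at(datetime.now(timezone.utc))
```

A fixed UTC window matches New York hours only while the US is on standard time; during daylight saving time the same session runs 13:30 to 20:00 UTC, so a production condition would likely need a timezone-aware check.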
Multi-agent: mix scheduled and ad-hoc in one app
agents/
├── briefing_agent/
│   ├── __init__.py
│   └── agent.py        ← with_schedule(cron="0 8 * * 1-5")
├── market_monitor/
│   ├── __init__.py
│   └── agent.py        ← with_schedule(condition=is_market_open)
└── assistant/
    ├── __init__.py
    └── agent.py        ← plain Agent, no schedule (ad-hoc only)
# main.py
from adk_task_scheduler import build_scheduled_app

app = build_scheduled_app(
    agents_dir="./agents",
    auto_discover=True,  # discovers all three agents
    web=False,
    a2a=True,
    session_service_uri="sqlite:///./sessions.db",
)
auto_discover=True (default) scans agents_dir for any root_agent that
carries a schedule config. Agents without a schedule are registered in ADK's
router as usual and remain fully available for ad-hoc calls.
Explicit ScheduleConfig (wiring outside agent.py)
Useful when the schedule is environment-specific (e.g. different cron for staging vs. production) and you don't want it hard-coded in the agent file:
# main.py
import os

from adk_task_scheduler import ScheduleConfig, build_scheduled_app
from agents.reporter.agent import root_agent  # plain Agent, no schedule

# publish_to_slack and alert_oncall are your own helpers; they are not
# provided by this library and must be defined or imported here.

cfg = ScheduleConfig(
    agent=root_agent,
    cron=os.environ["REPORT_CRON"],  # e.g. "0 6 * * *"
    trigger_text="__generate_report__",
    user_id="report-system",
    session_service_uri=os.environ["DB_URI"],
    on_response=lambda text: publish_to_slack(text),
    on_error=lambda exc: alert_oncall(exc),
    max_concurrent_runs=1,
    misfire_grace_time=120,
)

app = build_scheduled_app(
    agents_dir="./agents",
    schedules=[cfg],
    auto_discover=False,  # opt out of scanning when configs are explicit
    web=False,
    a2a=True,
)
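When the cron expression comes from the environment, a typo surfaces only when the scheduler starts. A small guard can fail earlier with a clearer message. The sketch below is an assumption about how one might do this; load_cron and DEFAULT_REPORT_CRON are hypothetical, and the check only counts fields rather than validating each one.

```python
import os

DEFAULT_REPORT_CRON = "0 6 * * *"  # assumed fallback, taken from the example comment


def load_cron(env=os.environ, var="REPORT_CRON", default=DEFAULT_REPORT_CRON) -> str:
    """Read a 5-field crontab expression from a mapping and normalise spacing."""
    expr = env.get(var, default).strip()
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError(f"{var} must have 5 fields, got {len(fields)}: {expr!r}")
    return " ".join(fields)
```

The result could then be passed as the cron argument in place of the raw os.environ lookup.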
Trigger types
| Parameter | Type | Description |
|---|---|---|
| cron="0 8 * * 1-5" | str | Standard 5-field crontab (APScheduler CronTrigger). |
| interval_seconds=300 | int | Fixed interval. Must be ≥ 1. |
| condition=fn | Callable[[], Any] | Polled every condition_poll_interval seconds (default 60). Agent fires when fn() is truthy. Supports both sync and async callables. |
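The condition trigger's polling behaviour, including support for both sync and async callables, can be sketched with asyncio alone. This is an illustration under assumptions, not the library's code: condition_is_met, poll_condition, and the max_polls cut-off (added so the demo terminates) are all hypothetical.

```python
import asyncio
import inspect
from typing import Any, Callable


async def condition_is_met(condition: Callable[[], Any]) -> bool:
    """Call the condition; await the result if it is awaitable."""
    result = condition()
    if inspect.isawaitable(result):
        result = await result
    return bool(result)


async def poll_condition(condition, fire, poll_interval: float, max_polls: int) -> int:
    """Evaluate the condition max_polls times, firing whenever it is truthy."""
    fired = 0
    for _ in range(max_polls):
        if await condition_is_met(condition):
            await fire()
            fired += 1
        await asyncio.sleep(poll_interval)
    return fired


# Demo: a sync condition driven by canned readings, and an async one.
readings = iter([0, 3, 0, 7])
runs = []


def sync_condition() -> bool:
    return next(readings) > 0


async def async_condition() -> bool:
    return True


async def fire() -> None:
    runs.append("__tick__")  # stands in for invoking the agent


sync_fired = asyncio.run(poll_condition(sync_condition, fire, 0.0, 4))
async_fired = asyncio.run(poll_condition(async_condition, fire, 0.0, 2))
```

Note that a condition which stays truthy fires on every poll in this sketch, which is why the market monitor example pairs it with max_concurrent_runs=1.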
API reference
with_schedule(agent, *, ...) ← recommended
from adk_task_scheduler import with_schedule
root_agent = with_schedule(
    Agent(name="my_agent", ...),
    cron="0 * * * *",
    trigger_text="__tick__",
    session_service_uri="sqlite:///./sessions.db",
    on_response=lambda text: print(text),
    on_error=lambda exc: print(f"Error: {exc}"),
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| cron | str | - | Crontab expression |
| interval_seconds | int | - | Seconds between runs (≥ 1) |
| condition | Callable[[], Any] | - | Condition function |
| condition_poll_interval | int | 60 | How often (s) to evaluate condition |
| trigger_text | str | "__tick__" | Synthetic user message |
| user_id | str | "adk-scheduler" | Session user identity |
| session_service_uri | str | - | SQLAlchemy URI; defaults to in-memory |
| on_response | Callable[[str], Any] | - | Called with the agent's final text |
| on_error | Callable[[Exception], Any] | - | Called on invocation failure |
| max_concurrent_runs | int | 1 | Max overlapping invocations |
| misfire_grace_time | int | 30 | APScheduler misfire grace (seconds) |
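The table does not say what happens to a tick that arrives while max_concurrent_runs invocations are already in flight. One plausible policy, similar in spirit to APScheduler's max_instances job option, is to skip the extra tick. The sketch below illustrates that policy only; RunLimiter is a hypothetical class and the library may queue or handle overlap differently.

```python
import asyncio


class RunLimiter:
    """Skip a run when the allowed number of runs is already active."""

    def __init__(self, max_concurrent_runs: int = 1):
        self._max = max_concurrent_runs
        self._active = 0
        self.skipped = 0

    async def run(self, job):
        # Safe without a lock: asyncio runs this check-and-increment
        # on one thread with no await in between.
        if self._active >= self._max:
            self.skipped += 1
            return None
        self._active += 1
        try:
            return await job()
        finally:
            self._active -= 1


async def slow_job():
    await asyncio.sleep(0.05)  # stands in for a long agent invocation
    return "done"


async def demo():
    limiter = RunLimiter(max_concurrent_runs=1)
    # Three ticks arrive at once; only the first gets to run.
    results = await asyncio.gather(*(limiter.run(slow_job) for _ in range(3)))
    return results, limiter.skipped


results, skipped = asyncio.run(demo())
```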
scheduled(**kwargs) → Callable[[BaseAgent], BaseAgent]
Equivalent to with_schedule but curried:
from adk_task_scheduler import scheduled
root_agent = scheduled(interval_seconds=30)(Agent(...))
All parameters identical to with_schedule.
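The relationship between the direct and curried forms can be shown with stand-ins. FakeAgent, attach_schedule, and curried_schedule below are hypothetical and exist only to illustrate the pattern; they say nothing about how the library stores its schedule metadata.

```python
from dataclasses import dataclass, field
from functools import partial


@dataclass
class FakeAgent:
    """Hypothetical stand-in for an ADK agent object."""
    name: str
    schedule: dict = field(default_factory=dict)


def attach_schedule(agent: FakeAgent, **options) -> FakeAgent:
    """Direct form: record the schedule options on the agent and return it."""
    agent.schedule = dict(options)
    return agent


def curried_schedule(**options):
    """Curried form: capture the options now, supply the agent later."""
    return partial(attach_schedule, **options)


direct = attach_schedule(FakeAgent("a"), interval_seconds=30)
curried = curried_schedule(interval_seconds=30)(FakeAgent("a"))
```

Both calls produce an equivalent agent; the curried form is convenient when the same options are applied to several agents.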
build_scheduled_app(*, agents_dir, ...) → FastAPI
Drop-in for get_fast_api_app. All get_fast_api_app keyword arguments
(a2a, session_service_uri, allow_origins, trace_to_cloud, etc.) are
forwarded verbatim.
| Extra parameter | Type | Default | Description |
|---|---|---|---|
| schedules | list[ScheduleConfig] | [] | Explicit schedule configs |
| auto_discover | bool | True | Scan agents_dir for scheduled agents |
| web | bool | False | Passed to get_fast_api_app |
ScheduleConfig
Dataclass holding all scheduling metadata. Construct directly when you need
environment-specific configuration that shouldn't live in agent.py.
Session strategy
Each scheduled invocation creates a fresh session by default, so runs are
fully isolated. To retain state across ticks (e.g. accumulate context over
time), pass a session_service_uri pointing to a persistent store and manage
session IDs yourself via extra_state or the on_response callback.
ScheduleConfig(
    agent=root_agent,
    interval_seconds=3600,
    session_service_uri="sqlite:///./scheduler.db",
    extra_state={"environment": "production", "tenant": "acme"},
)
Development
git clone https://github.com/STHITAPRAJNAS/adk-task-scheduler.git
cd adk-task-scheduler
python -m venv .venv && source .venv/bin/activate
pip install -e ".[dev]"
# Run tests
pytest
# Lint
ruff check .
Running the example
cd example
pip install -e "..[dev]" # install from parent
uvicorn main:app --reload --log-level info
The ticker_agent will fire every 30 seconds and log its response. You can
also call it ad-hoc:
curl -s -X POST http://localhost:8000/run \
  -H "Content-Type: application/json" \
  -d '{
    "app_name": "ticker_agent",
    "user_id": "me",
    "session_id": "test-1",
    "new_message": {"role": "user", "parts": [{"text": "What time is it?"}]}
  }' | python -m json.tool
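The same ad-hoc call can be made from Python with only the standard library. The sketch below mirrors the curl payload above; build_run_request and send are hypothetical helper names, and send is defined but not called because it needs the example server to be running.

```python
import json
import urllib.request


def build_run_request(base_url, app_name, user_id, session_id, text):
    """Build a POST /run request carrying a single user text message."""
    payload = {
        "app_name": app_name,
        "user_id": user_id,
        "session_id": session_id,
        "new_message": {"role": "user", "parts": [{"text": text}]},
    }
    return urllib.request.Request(
        f"{base_url}/run",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request):
    """Send the request and decode the JSON response (requires a live server)."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)


request = build_run_request(
    "http://localhost:8000", "ticker_agent", "me", "test-1", "What time is it?"
)
```

Calling send(request) while uvicorn is running should return the same JSON that the curl command prints.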
Releasing
This project uses PyPI Trusted Publishing (OIDC) — no API token needed.
Steps:
- Bump the version in pyproject.toml and adk_task_scheduler/__init__.py.
- Add a section to CHANGELOG.md.
- Commit and push to main.
- Tag the release:
git tag v0.2.0
git push origin v0.2.0
- The publish workflow triggers automatically on v*.*.* tags, runs the full test suite, then uploads to PyPI via pypa/gh-action-pypi-publish.
One-time setup (first release only):
Add a Trusted Publisher entry on pypi.org/manage/account/publishing/ pointing to STHITAPRAJNAS/adk-task-scheduler, workflow publish.yml, and environment pypi.
License
Apache 2.0 — see LICENSE.