Trigger delayed Cloud Tasks from FastAPI
FastAPI Cloud Tasks
GCP's Cloud Tasks + FastAPI = Replacement for Celery's async delayed tasks.
GCP's Cloud Scheduler + FastAPI = Replacement for Celery beat.
FastAPI Cloud Tasks + Cloud Run = Autoscaled delayed tasks.
Concept
Cloud Tasks allows us to schedule an HTTP request in the future.
FastAPI makes us define a complete schema and params for an HTTP endpoint.
Cloud Scheduler allows us to schedule recurring HTTP requests in the future.
FastAPI Cloud Tasks works by putting the three together:
- It adds a .delay method to existing routes on FastAPI.
  - When this method is called, it schedules a request with Cloud Tasks.
  - The task worker is a regular FastAPI server which gets called by Cloud Tasks.
- It adds a .scheduler method to existing routes on FastAPI.
  - When this method is called, it schedules a recurring job with Cloud Scheduler.
If we host the task worker on Cloud Run, we get autoscaling workers.
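The `.delay` idea described above can be sketched without any cloud dependency: instead of running the route, the call is turned into a description of the HTTP request that a queue would later send to the worker. This is only an illustration. The names `DelayedCall` and `PendingRequest` and the URL `https://worker.example.com` are hypothetical and are not part of FastAPI Cloud Tasks, which builds a Cloud Tasks request rather than a plain dataclass.

```python
import json
from dataclasses import dataclass


@dataclass
class PendingRequest:
    """Plain description of an HTTP request to be sent later (hypothetical)."""

    method: str
    url: str
    body: bytes


class DelayedCall:
    """Wraps a route so that .delay() describes a request instead of running it."""

    def __init__(self, base_url: str, path_template: str):
        self.base_url = base_url.rstrip("/")
        self.path_template = path_template

    def delay(self, body: dict, **path_params: str) -> PendingRequest:
        # Fill path parameters such as {restaurant} into the route path.
        path = self.path_template.format(**path_params)
        return PendingRequest(
            method="POST",
            url=self.base_url + path,
            body=json.dumps(body).encode("utf-8"),
        )


make_dinner = DelayedCall("https://worker.example.com/", "/{restaurant}/make_dinner")
request = make_dinner.delay({"ingredients": ["Pav", "Bhaji"]}, restaurant="Taj")
print(request.url)  # https://worker.example.com/Taj/make_dinner
```

Because the worker only ever sees an ordinary HTTP request, it can stay a regular FastAPI server, as the list above points out.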
Pseudocode
In practice, this is what it looks like:
delayed_router = APIRouter(route_class=DelayedRouteBuilder(...))
scheduled_router = APIRouter(route_class=ScheduledRouteBuilder(...))

class Recipe(BaseModel):
    ingredients: List[str]

@delayed_router.post("/{restaurant}/make_dinner")
async def make_dinner(restaurant: str, recipe: Recipe):
    # Do a ton of work here.
    ...

@scheduled_router.post("/home_cook")
async def home_cook(recipe: Recipe):
    # Make my own food
    ...

app.include_router(delayed_router)
app.include_router(scheduled_router)

# If you want to make your own breakfast every morning at 7 AM IST.
home_cook.scheduler(
    name="test-home-cook-at-7AM-IST",
    schedule="0 7 * * *",
    time_zone="Asia/Kolkata",
).schedule(recipe=Recipe(ingredients=["Milk", "Cereal"]))
Now we can trigger the task with:
make_dinner.delay(restaurant="Taj", recipe=Recipe(ingredients=["Pav","Bhaji"]))
If we want to trigger the task 30 minutes later:
make_dinner.options(countdown=1800).delay(...)
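A countdown is relative, while a task on a Cloud Tasks queue carries an absolute `schedule_time`. The conversion can be sketched with the standard library alone. The helper name `schedule_time_from_countdown` is hypothetical, and the library may compute this differently.

```python
from datetime import datetime, timedelta, timezone


def schedule_time_from_countdown(countdown: int, now: datetime) -> datetime:
    """Return the absolute UTC time that lies `countdown` seconds after `now`."""
    if countdown < 0:
        raise ValueError("countdown must not be negative")
    return now.astimezone(timezone.utc) + timedelta(seconds=countdown)


now = datetime(2022, 1, 1, 12, 0, tzinfo=timezone.utc)
run_at = schedule_time_from_countdown(1800, now)
print(run_at.isoformat())  # 2022-01-01T12:30:00+00:00
```

Passing `now` explicitly keeps the function deterministic, which makes it easy to test.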
Running
Local
Pre-requisites:
- Create a task queue and copy the project id, location and queue name.
- Install and ensure that ngrok works.
We will need an API endpoint to give to Cloud Tasks, so let us fire up ngrok locally:
ngrok http 8000
You'll see something like this
Forwarding http://feda-49-207-221-153.ngrok.io -> http://localhost:8000
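# complete file: examples/simple/main.py
import logging

from fastapi import FastAPI
from fastapi.routing import APIRouter
from pydantic import BaseModel

# Import paths below are assumed from the package layout; check examples/simple/main.py.
from fastapi_cloud_tasks import DelayedRouteBuilder
from fastapi_cloud_tasks.utils import queue_path

# Assumed setup: reuse uvicorn's logger so the message shows up in the server logs.
logger = logging.getLogger("uvicorn")

# First we construct our DelayedRoute class with all relevant settings
# This can be done once across the entire project
DelayedRoute = DelayedRouteBuilder(
    base_url="http://feda-49-207-221-153.ngrok.io",
    queue_path=queue_path(
        project="gcp-project-id",
        location="asia-south1",
        queue="test-queue",
    ),
)

delayed_router = APIRouter(route_class=DelayedRoute, prefix="/tasks")

class Payload(BaseModel):
    message: str

@delayed_router.post("/hello")
async def hello(p: Payload = Payload(message="Default")):
    logger.warning(f"Hello task ran with payload: {p.message}")

# Define our app and add trigger to it.
app = FastAPI()

@app.get("/trigger")
async def trigger():
    # Trigger the task
    hello.delay(p=Payload(message="Triggered task"))
    return {"message": "Hello task triggered"}

app.include_router(delayed_router)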
Start the task runner on port 8000 so that it is accessible from Cloud Tasks:
uvicorn main:app --reload --port 8000
In another terminal, trigger the task with curl
curl http://localhost:8000/trigger
Check the logs on the server; you should see:
WARNING: Hello task ran with payload: Triggered task
Note: You can read the complete working source code of the above example in examples/simple/main.py
In the real world, you'd run the task runner and the code that triggers tasks as separate processes.
Cloud Run
Running on Cloud Run with authentication requires us to supply an OIDC token. To do that, we can use a hook.
Pre-requisites:
- Create a task queue. Copy the project id, location and queue name.
- Deploy the worker as a service on Cloud Run and copy its URL.
- Create a service account in Cloud IAM and add the Cloud Run Invoker role to it.
We'll only show the parts of the example above that need to change.
# URL of the Cloud Run service
base_url = "https://hello-randomchars-el.a.run.app"

DelayedRoute = DelayedRouteBuilder(
    base_url=base_url,
    # Task queue, same as above.
    queue_path=queue_path(...),
    # oidc_delayed_hook is the OIDC hook for delayed routes listed in the Hooks section.
    pre_create_hook=oidc_delayed_hook(
        token=tasks_v2.OidcToken(
            # Service account that you created
            service_account_email="fastapi-cloud-tasks@gcp-project-id.iam.gserviceaccount.com",
            audience=base_url,
        ),
    ),
)
Check the fleshed out example at examples/full/tasks.py
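Conceptually, a hook is a function that receives the outgoing request and returns a modified one. The sketch below shows that shape on plain dictionaries that mimic the JSON form of a Cloud Tasks HTTP task (`httpRequest.oidcToken` with `serviceAccountEmail` and `audience`). It is an assumption-laden illustration: `add_oidc_token` is a hypothetical helper, and the library's hooks work on `tasks_v2` request objects rather than on dictionaries.

```python
import copy


def add_oidc_token(service_account_email: str, audience: str):
    """Build a hook that attaches an OIDC token block to a task's HTTP request."""

    def hook(create_task_request: dict) -> dict:
        # Work on a copy so the caller's dictionary is left untouched.
        updated = copy.deepcopy(create_task_request)
        updated["task"]["httpRequest"]["oidcToken"] = {
            "serviceAccountEmail": service_account_email,
            "audience": audience,
        }
        return updated

    return hook


base_url = "https://hello-randomchars-el.a.run.app"
hook = add_oidc_token(
    service_account_email="fastapi-cloud-tasks@gcp-project-id.iam.gserviceaccount.com",
    audience=base_url,
)
original = {"task": {"httpRequest": {"httpMethod": "POST", "url": base_url + "/tasks/hello"}}}
signed = hook(original)
print(signed["task"]["httpRequest"]["oidcToken"]["audience"])
```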
Configuration
DelayedRouteBuilder
Usage:
DelayedRoute = DelayedRouteBuilder(...)
delayed_router = APIRouter(route_class=DelayedRoute)

@delayed_router.get("/simple_task")
def mySimpleTask():
    return {}
- base_url - The URL of your worker FastAPI service.
- queue_path - Full path of the Cloud Tasks queue. (Hint: use the util function queue_path)
- task_create_timeout - How long to wait before giving up on creating a Cloud Task.
- pre_create_hook - If you need to edit the CreateTaskRequest before sending it to Cloud Tasks (e.g. auth for Cloud Run), you can do that with this hook. See the Hooks section below for more.
- client - If you need to override the Cloud Tasks client, pass the client here (e.g. changing credentials, transport, etc.).
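For reference, a full Cloud Tasks queue path has the form `projects/<project>/locations/<location>/queues/<queue>`. The sketch below formats such a path from its parts. `build_queue_path` is a hypothetical stand-in written for this illustration; it is not the library's `queue_path` helper, which may be implemented differently.

```python
def build_queue_path(project: str, location: str, queue: str) -> str:
    """Format a fully qualified Cloud Tasks queue resource name."""
    for label, value in (("project", project), ("location", location), ("queue", queue)):
        # Reject empty parts and parts that would break the path structure.
        if not value or "/" in value:
            raise ValueError(f"invalid {label}: {value!r}")
    return f"projects/{project}/locations/{location}/queues/{queue}"


path = build_queue_path("gcp-project-id", "asia-south1", "test-queue")
print(path)  # projects/gcp-project-id/locations/asia-south1/queues/test-queue
```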
Task level default options
Usage:
@task_router.get("/simple_task")
@task_default_options(...)
def mySimpleTask():
    return {}
All options from above can be passed as kwargs to the decorator.
Additional options:
- countdown - Seconds in the future to schedule the task.
- task_id - Named task id for deduplication. (One task id will only be queued once.)
Eg:
# Trigger after 5 minutes
@task_router.get("/simple_task")
@task_default_options(countdown=300)
def mySimpleTask():
    return {}
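Since a given task id is queued only once, one way to deduplicate identical work is to derive the id from the payload itself. The sketch below is a hedged illustration of that idea: the helper `task_id_for` and the hashing scheme are assumptions, not something the library provides. The result uses only letters, digits, hyphens and underscores, which is what Cloud Tasks accepts in a task id.

```python
import hashlib
import json


def task_id_for(route_name: str, payload: dict) -> str:
    """Derive a stable task id from the route name and its payload."""
    # sort_keys makes the JSON text independent of dictionary ordering.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"{route_name}-{digest[:32]}"


first = task_id_for("make_dinner", {"restaurant": "Taj", "ingredients": ["Pav", "Bhaji"]})
second = task_id_for("make_dinner", {"ingredients": ["Pav", "Bhaji"], "restaurant": "Taj"})
print(first == second)  # True
```

The resulting string could then be passed as `task_id`, so that a second call with the same payload maps to the same task.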
Delayer Options
Usage:
mySimpleTask.options(...).delay()
All options from above can be overridden per call (including DelayedRouteBuilder options like base_url) with kwargs to the options function before calling delay.
Example:
# Trigger after 2 minutes
mySimpleTask.options(countdown=120).delay()
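The precedence described above (per-call options over task-level defaults over builder options) can be pictured as a layered lookup. The following sketch uses `collections.ChainMap` on hypothetical option dictionaries; it illustrates the ordering only and is not how the library stores its options.

```python
from collections import ChainMap

# Hypothetical option layers, from least to most specific.
builder_options = {"base_url": "https://worker.example.com", "task_create_timeout": 10.0}
task_defaults = {"countdown": 300}
call_options = {"countdown": 120}

# ChainMap looks keys up left to right, so the most specific layer goes first.
effective = dict(ChainMap(call_options, task_defaults, builder_options))
print(effective["countdown"])  # 120
```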
ScheduledRouteBuilder
Usage:
ScheduledRoute = ScheduledRouteBuilder(...)
scheduled_router = APIRouter(route_class=ScheduledRoute)

@scheduled_router.get("/simple_scheduled_task")
def mySimpleScheduledTask():
    return {}

mySimpleScheduledTask.scheduler(name="simple_scheduled_task", schedule="* * * * *").schedule()
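The `schedule` argument is a five-field unix-cron expression: minute, hour, day of month, month and day of week. As a reading aid, the sketch below splits an expression into named fields. `describe_cron` is a hypothetical helper for illustration only; it does not validate field values and is not part of the library.

```python
from typing import Dict

CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(schedule: str) -> Dict[str, str]:
    """Split a five-field unix-cron expression into named fields."""
    parts = schedule.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {schedule!r}")
    return dict(zip(CRON_FIELDS, parts))


every_minute = describe_cron("* * * * *")
daily_at_seven = describe_cron("0 7 * * *")
print(daily_at_seven["hour"], daily_at_seven["minute"])  # 7 0
```

So `"* * * * *"` runs every minute, while `"0 7 * * *"` runs once a day at 07:00 in the job's time zone.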
Hooks
We might need to override things in the task being sent to Cloud Tasks. The pre_create_hook allows us to do that.
Some hooks are included in the library:
- oidc_delayed_hook / oidc_scheduled_hook - Used to pass an OIDC token (for Cloud Run etc).
- deadline_delayed_hook / deadline_scheduled_hook - Used to change the timeout for the worker of a task. (PS: this deadline is decided by the sender to the queue and not the worker.)
- chained_hook - If you need to chain multiple hooks together, you can do that with chained_hook(hook1, hook2).
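Chaining hooks amounts to function composition: each hook receives the result of the previous one. The sketch below shows this on plain dictionaries. It is a minimal illustration, not the library's `chained_hook`; the names `chain_hooks`, `set_deadline` and `set_audience`, and the dictionary keys, are hypothetical.

```python
from functools import reduce
from typing import Callable

Hook = Callable[[dict], dict]


def chain_hooks(*hooks: Hook) -> Hook:
    """Combine hooks into one that applies them left to right."""

    def chained(request: dict) -> dict:
        return reduce(lambda current, hook: hook(current), hooks, request)

    return chained


def set_deadline(request: dict) -> dict:
    # Hypothetical hook: record a dispatch deadline in seconds.
    return {**request, "dispatch_deadline": 600}


def set_audience(request: dict) -> dict:
    # Hypothetical hook: record an OIDC audience.
    return {**request, "audience": "https://worker.example.com"}


combined = chain_hooks(set_deadline, set_audience)
result = combined({"url": "https://worker.example.com/tasks/hello"})
print(result)
```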
Future work
- Ensure queue exists.
- Make helper features for the worker's side. Eg:
  - Easier access to current retry count.
  - API Exceptions to make GCP back-off.
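Until such helpers exist, a worker can read the retry count itself: Cloud Tasks attaches an `X-CloudTasks-TaskRetryCount` header to the requests it sends to HTTP targets. A minimal sketch, assuming the headers are available as a plain mapping (the function name `retry_count` is hypothetical):

```python
from typing import Mapping


def retry_count(headers: Mapping[str, str]) -> int:
    """Read the retry count that Cloud Tasks attaches to a task request."""
    # HTTP header names are case-insensitive, so normalise before the lookup.
    lowered = {name.lower(): value for name, value in headers.items()}
    return int(lowered.get("x-cloudtasks-taskretrycount", "0"))


print(retry_count({"X-CloudTasks-TaskRetryCount": "3"}))  # 3
```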
Hashes for fastapi-cloud-tasks-0.1.0rc1.tar.gz
Algorithm | Hash digest
---|---
SHA256 | 4db763e2088ad686a3f907908bd6f9c4d79088ecde0c1cab69ea881f66324f44
MD5 | 5ade26498dd29c75dc3f0046ddca5c8c
BLAKE2b-256 | d4b72b6ccc337736b3914ba30653b03bb518d3c587ef32a50828dc893ddf2b2d

Hashes for fastapi_cloud_tasks-0.1.0rc1-py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | 2d1117d73a1ee465e8387925b41f5353b103ca5b7f0fab07f3ea47799d493d14
MD5 | d651332406fef27e21fb4e80178df29b
BLAKE2b-256 | 99bbb0196a225bfe0fe781c447eeceebfc8080aa3929351cf8f548f7024e4125