Skip to main content

Create web-APIs for long-running tasks

Project description

FastTaskAPI

Create web-APIs for long-running tasks

Call the server and return a job id. Get the result with the job id later.
FastTaskAPI creates threaded jobs and job queues on the fly.
Run services anywhere, be it local, hosted or serverless.

We at SocAIty developed FastTaskAPI to create and deploy our AI services as easy and standardized as possible. Built on-top of FastAPI and runpod you can built high quality endpoints with proven stability.

Table of contents

Introduction

Get started:

Why is this useful?

Creating services for long-running tasks is hard.

  • In AI services inference time makes realtime results difficult. Parallel jobs, and a Job queue is often required. For example as a client you would not like to wait for a server response instead do some work until the server produced the result.
  • Serverless deployments like runpod often DO NOT provide routing functionality. This router works in this conditions.
  • Scaling AI services is hard.
  • Streaming services (for example for generative models) is complicated to setup.
  • Same Syntax for all kind of hosting providers and services.

This package solves these problems, by providing a simple well-known interface to deploy AI services anywhere.
The syntax is oriented by the simplicity of fastapi. Other hazards are taken care of by our router.

What does this do?

  • Jobs, job queues for your service (no code required).
  • Routing functionality: for serverless providers like Runpod
  • Async, sync and streaming functionality.
    • Including progress bars.
  • File support, also for serverless providers like Runpod
    • Simplified sending files to the services with fastSDK
    • One line file response with MediaToolkit including images, audio, video and more.
  • Integration: integrates neatly into the SOCAITY ecosystem for running AI services like python functions with our Client/fastSDK.
  • Monitoring server state.

The code is fast, lightweight, flexible and pure python.

Installation

You can install the package with PIP, or clone the repository.

# install from pypi
pip install fast-task-api
# install from github for the newest version
pip install git+git://github.com/SocAIty/FastTaskAPI

How to use

Create your first service

Use the decorator syntax @app.task_endpoint to add an endpoint. This syntax is similar to fastapi's @app.get syntax.

from fast_task_api import FastTaskAPI, ImageFile

# define the app including your provider (fastapi, runpod..)
app = FastTaskAPI()

# add endpoints to your service
@app.task_endpoint("/predict")
def predict(my_param1: str, my_param2: int = 0):
  return f"my_awesome_prediction {my_param1} {my_param2}"

@app.task_endpoint("/img2img", queue_size=10)
def my_image_manipulator(upload_img: ImageFile):
  img_as_numpy = upload_img.to_np_array() 
  # Do some hard work here...
  # img_as_numpy = img2img(img_as_numpy)
  return ImageFile().from_np_array(img_as_numpy)

# start and run the server
app.start()

If you execute this code you should see the following page under http://localhost:8000/docs.

Jobs and job queues

If you have a long running task, you can use the job queue functionality.

@app.task_endpoint(path="/make_fries", queue_size=100)
def make_fries(job_progress: JobProgress, fries_name: str, amount: int = 1):
    job_progress.set_status(0.1, f"started new fries creation {fries_name}")
    time.sleep(1)
    job_progress.set_status(0.5, f"I am working on it. Lots of work to do {amount}")
    time.sleep(2)
    job_progress.set_status(0.8, "Still working on it. Almost done")
    time.sleep(2)
    return f"Your fries {fries_name} are ready"

What will happen now is:

  • The method will return a "Job" object instead of the result, including a job id. This json is send back to the client.
  • By calling the status endpoint with status?job_id=... one gets the result / status of the job.

Note: in case of "runpod", "serverless" this is not necessary, as the job mechanism is handled by runpod deployment.

Calling the endpoints -> Getting the job result

You can call the endpoints with a simple http request. You can try them out in the browser, with curl or Postman. For more convenience with the socaity package, you can use the endpoints like functions.

Use the endpoints like functions with fastSDK.

With fastSDK, you can use the endpoints like a function. FastSDK will deal with the job id and the status requests in the background. This makes it insanely useful for complex scenarios where you use multiple models and endpoints.

Job status and progress bars

You can provide status updates by changing the values of the job_progress object. If you add a parameter named job_progress to the function we will pass that object to the function. If then a client asks for the status of the task, he will get the messages and the process bar. This is for example in the socaity package used to provide a progress bar.

@app.task_endpoint("/predict", queue_size=10)
def predict(job_progress: JobProgress, my_param1: str, my_param2: int = 0):
  job_progress._message = "I am working on it"
  job_progress._progress = 0.5
  job_progress._message = "Still working on it. Almost done"
  job_progress._progress = 0.8
  return "my_awesome_prediction"

When the return is finished, the job is marked as done and the progress bar is automatically set to 1.

Normal openapi (no-task) endpoints

If you don't want to use the job queue functionality, you can use the @app.endpoint syntax

@app.endpoint("/my_normal_endpoint", methods=["GET", "POST"]):
def my_normal_endpoint(image: str, my_param2: int = 0):
  return f"my_awesome_prediction {my_param1} {my_param2}"

This will return a regular endpoint -> No job_result object with job-id is returned. The method also supports file uploads.

File uploads and files.

The library supports file uploads out of the box. Use the parameter type hints in your method definition to get the file.

from fast_task_api import MediaFile, ImageFile, AudioFile, VideoFile

@app.task_endpoint("/my_upload")
def my_upload(anyfile: MediaFile):
    return anyfile.content

FastTaskAPI supports all file-types of media-toolkit. This includes common file types like: ImageFile, AudioFile and VideoFile.

from fast_task_api import ImageFile, AudioFile, VideoFile

@app.task_endpoint("/my_file_upload")
def my_upload_image(image: ImageFile, audio: AudioFile, video: VideoFile):
    image_as_np_array = np.array(image)

You can call the endpoints, either with bytes or b64 encoded strings.

Sending requests (and files) to the service with FastSDK

FastSDK also supports MediaFiles and for this reason it natively supports file up/downloads. Once you have added the service in FastSDK you can call it like a python function

mysdk = mySDK() # follow the fastSDK tutorial to set up correctly.
task = upload_image(my_imageFile, myAudioFile, myVideoFile) # uploads the data to the service. Retrieves a job back.
result = task.get_result()  # constantly trigger the get_job endpoint in the background until the server finished.

Sending files to the service with httpx / requests

import httpx
with open('my_image_file.png', 'rb') as f:
    image_file_content = f.read()

my_files = {
  "image": ("file_name", image_file_content, 'image/png')
  ...
}
response = httpx.Client().post(url, files=my_files)

Note: In case of runpod you need to convert the file to a b64 encoded string.

Sending file with URLs

One property of media-toolkit is, that it support files from URLs. Thus instead of sending a file directly (as bytes) to the endpoints, you can also send a URL to the file location.

my_files = {
  "image": "https:/my_cloud_storage/my_image_file.png"
  ...
}
response = httpx.Client().post(url, files=my_files)

Deployment of the service with different backends (hosting providers)

Locally

Just run the server by running your script app.start() this will spin-up uvicorn on localhost:port.

Docker

Prerequisite: You have created a python module "yourmodule.server" with the code that starts the server. Then to start the fast-task-api server in docker, add the following command at the end of your Dockerfile.

# Start the fast-task-api server which is instantiated in the module -m yourmodule.server
CMD [ "python", "-m", "yourmodule.server"]

Additional configuration: Backend, deployment type, host, port

You can change the backend (hosting provider) either by setting it in the constructor or by setting the environment variable.

# Options: "fastapi", "runpod"
ENV FTAPI_BACKEND="runpod"
# Options: "localhost", "serverless"
ENV FTAPI_DEPLOYMENT="serverless"

Depending on the environment it is also necessary to specify the host and port.

# allows any IP from the computer to connect to the host
ENV FTAPI_HOST="0.0.0.0"
# allows the docker container to use the port
ARG port=8080
ENV FTAPI_PORT=$port
EXPOSE $port 

Runpod

It is not required to write a handler function anymore. The fast-task-api magic handles it :D Just change the ENV variable and described above. This brings you additional benefits:

  • Same syntax as with fastapi
  • Better file handling ultra easy deploy.

Related projects and its differences

Starlette Background Tasks

The fastapi documentation recommends using starlette background tasks for long-running tasks like sending an e-mail.

  • No common interface / response type.
    • This leads to re-implementing the same functionality over and over again.
    • Creates more overhead in the client and server code.
  • No job progress and monitoring functionality
    • With background tasks clients have no chance to monitor the progress of the job or to know when the job is finished.
  • No job queue:
    • If you don't have a job queue, the server can be overloaded with tasks pretty fast.
    • With socaity you can specify the maximum queue size for a task. If this is exceeded the task is not executed.

This is a good solution for simple tasks, but it does not provide a job queue or job status.

Celery

Celery is a great tool for running jobs in the background on distributed systems. However it comes with several drawbacks:

  • Hard to setup
  • Doesn't run everywhere
  • Overkill for most projects.

Socaity router is lightweight and provides background task functionality abstracted from the developer. This doesn't mean that we don't recommend celery. Indeed, it is planned to integrate celery as possible backend.

Roadmap

  • stabilize runpod deployment
  • streaming support
  • add async functionality for fastapi
  • support other job-queuing systems like celery

Note: THE PACKAGE IS STILL IN DEVELOPMENT!

LEAVE A STAR TO SUPPORT US. ANY BUG REPORT OR CONTRIBUTION IS HIGHLY APPRECIATED.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

fast_task_api-0.0.7.tar.gz (35.1 kB view details)

Uploaded Source

Built Distribution

fast_task_api-0.0.7-py3-none-any.whl (35.5 kB view details)

Uploaded Python 3

File details

Details for the file fast_task_api-0.0.7.tar.gz.

File metadata

  • Download URL: fast_task_api-0.0.7.tar.gz
  • Upload date:
  • Size: 35.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.0.0 CPython/3.10.11

File hashes

Hashes for fast_task_api-0.0.7.tar.gz
Algorithm Hash digest
SHA256 cba59f2c417fb8188627a7b36f3e7b324a8c859ff96a0cefaca97d07b14720d6
MD5 fe7cc13a3ecff3dcb7bee528b68bbd66
BLAKE2b-256 bdb0de044f73e7638761a31d9aba0cc57e8685267743dc369fa2a70b5dc92d83

See more details on using hashes here.

File details

Details for the file fast_task_api-0.0.7-py3-none-any.whl.

File metadata

File hashes

Hashes for fast_task_api-0.0.7-py3-none-any.whl
Algorithm Hash digest
SHA256 220239236114453cb22d18163a963d2a2e2101aec0be8423d4983b1509a9326f
MD5 6d005e8792fe276f5aa246df80231593
BLAKE2b-256 185cdf4ac2b5f37b03e79e18e3c0134e085b881ae8483730a20d5b0865631e8e

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page