FastTaskAPI
Create web-APIs for long-running tasks
Call the server and receive a job id. Retrieve the result with that job id later.
FastTaskAPI creates threaded jobs and job queues on the fly.
Run services anywhere, be it local, hosted or serverless.
We at SocAIty developed FastTaskAPI to create and deploy our AI services as easily and in as standardized a way as possible. Built on top of FastAPI and Runpod, it lets you build high-quality endpoints with proven stability.
Table of contents
Introduction
- Why is this useful?: Why you should use this package.
- What does this do?: An overview of the package's features.
Get started:
- Installation: How to install the package.
- First steps: Create your first service with FastTaskAPI.
- Jobs and job queues: How to use the job-queue functionality.
- File uploads and files: How to work with file uploads and files.
- Backends and deploying a service: Deploy serverless, for example with Runpod.
Why is this useful?
Creating services for long-running tasks is hard.
- In AI services, inference time makes realtime responses difficult. Parallel jobs and a job queue are often required: as a client you don't want to wait for the server response, but rather do other work until the server has produced the result.
- Serverless deployments like Runpod often DO NOT provide routing functionality. This router works under those conditions.
- Scaling AI services is hard.
- Streaming services (for example for generative models) are complicated to set up.
- The same syntax for all kinds of hosting providers and services.
This package solves these problems by providing a simple, well-known interface to deploy AI services anywhere.
The syntax is oriented on the simplicity of FastAPI; the other hazards are taken care of by our router.
What does this do?
- Jobs and job queues for your service (no extra code required).
- Routing functionality for serverless providers like Runpod.
- Async, sync and streaming functionality.
- Including progress bars.
- File support, also for serverless providers like Runpod.
- Simplified file sending to the services with fastSDK.
- One-line file responses with media-toolkit, including images, audio, video and more.
- Integration: fits neatly into the SOCAITY ecosystem for running AI services like Python functions with our client/fastSDK.
- Monitoring of the server state.
The code is fast, lightweight, flexible and pure Python.
Installation
You can install the package with PIP, or clone the repository.
```bash
# install from pypi
pip install fast-task-api
# install from github for the newest version
pip install git+https://github.com/SocAIty/FastTaskAPI
```
How to use
Create your first service
Use the decorator syntax @app.task_endpoint to add an endpoint. The syntax is similar to FastAPI's @app.get.
```python
from fast_task_api import FastTaskAPI, ImageFile

# define the app, including your provider (fastapi, runpod, ...)
app = FastTaskAPI()

# add endpoints to your service
@app.task_endpoint("/predict")
def predict(my_param1: str, my_param2: int = 0):
    return f"my_awesome_prediction {my_param1} {my_param2}"

@app.task_endpoint("/img2img", queue_size=10)
def my_image_manipulator(upload_img: ImageFile):
    img_as_numpy = upload_img.to_np_array()
    # do some hard work here, e.g.:
    # img_as_numpy = img2img(img_as_numpy)
    return ImageFile().from_np_array(img_as_numpy)

# start and run the server
app.start()
If you execute this code, you should see the interactive documentation page under http://localhost:8000/docs.
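To verify it works, you can also call the endpoint directly. A minimal sketch with httpx, assuming the task endpoints accept POST requests with query parameters (as plain FastAPI endpoints would):
```python
import httpx

# submit a task; the server answers with a job object instead of the final result
response = httpx.post(
    "http://localhost:8000/predict",
    params={"my_param1": "hello", "my_param2": 1},
)
print(response.json())  # should contain the job id to poll with
```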
Jobs and job queues
If you have a long running task, you can use the job queue functionality.
```python
import time
from fast_task_api import JobProgress  # job-progress helper provided by fast-task-api

@app.task_endpoint(path="/make_fries", queue_size=100)
def make_fries(job_progress: JobProgress, fries_name: str, amount: int = 1):
    job_progress.set_status(0.1, f"started new fries creation {fries_name}")
    time.sleep(1)
    job_progress.set_status(0.5, f"I am working on it. Lots of work to do {amount}")
    time.sleep(2)
    job_progress.set_status(0.8, "Still working on it. Almost done")
    time.sleep(2)
    return f"Your fries {fries_name} are ready"
```
What happens now:
- The method returns a "Job" object (including a job id) instead of the result. This JSON is sent back to the client.
- By calling the status endpoint with status?job_id=... the client gets the status / result of the job.
Note: in the "runpod" / "serverless" case this is not necessary, as the job mechanism is handled by the Runpod deployment.
Calling the endpoints -> Getting the job result
You can call the endpoints with a simple HTTP request and try them out in the browser, with curl or with Postman. For more convenience, you can use the endpoints like functions with the socaity package.
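As a sketch, a simple polling loop with httpx could look like this; the exact field names of the job JSON ("job_id", "status", "result") are assumptions, check your server's actual response:
```python
import time
import httpx

# submit the task: the response is the job JSON, not the result itself
job = httpx.post(
    "http://localhost:8000/make_fries",
    params={"fries_name": "classic", "amount": 2},
).json()

# poll the status endpoint until the job has finished
while True:
    status = httpx.get(
        "http://localhost:8000/status",
        params={"job_id": job["job_id"]},  # field name "job_id" is an assumption
    ).json()
    if status.get("status") == "finished":  # status value is an assumption
        print(status.get("result"))
        break
    time.sleep(0.5)
```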
Use the endpoints like functions with fastSDK.
With fastSDK you can use the endpoints like functions; fastSDK deals with the job id and the status requests in the background. This makes it extremely useful for complex scenarios where you combine multiple models and endpoints.
Job status and progress bars
You can provide status updates by changing the values of the job_progress object. If you add a parameter named job_progress to the function, we pass that object in automatically. When a client then asks for the status of the task, it receives the messages and the progress value. The socaity package, for example, uses this to display a progress bar.
```python
@app.task_endpoint("/predict", queue_size=10)
def predict(job_progress: JobProgress, my_param1: str, my_param2: int = 0):
    job_progress._message = "I am working on it"
    job_progress._progress = 0.5
    # ... do some work ...
    job_progress._message = "Still working on it. Almost done"
    job_progress._progress = 0.8
    # ... finish up ...
    return "my_awesome_prediction"
```
When the function returns, the job is marked as done and the progress is automatically set to 1.
Normal openapi (no-task) endpoints
If you don't want the job-queue functionality, use the @app.endpoint syntax:
```python
@app.endpoint("/my_normal_endpoint", methods=["GET", "POST"])
def my_normal_endpoint(my_param1: str, my_param2: int = 0):
    return f"my_awesome_prediction {my_param1} {my_param2}"
```
This creates a regular endpoint: no job_result object with a job id is returned. The method also supports file uploads.
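Calling it is a plain request; for example (assuming the server runs locally):
```python
import httpx

# the response body is the result itself: no job id, no polling
response = httpx.get(
    "http://localhost:8000/my_normal_endpoint",
    params={"my_param1": "hello", "my_param2": 1},
)
print(response.text)
```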
File uploads and files
The library supports file uploads out of the box. Use the parameter type hints in your method definition to get the file.
```python
from fast_task_api import MediaFile, ImageFile, AudioFile, VideoFile

@app.task_endpoint("/my_upload")
def my_upload(anyfile: MediaFile):
    return anyfile.content
```
FastTaskAPI supports all file types of media-toolkit. This includes the common ImageFile, AudioFile and VideoFile.
```python
import numpy as np
from fast_task_api import ImageFile, AudioFile, VideoFile

@app.task_endpoint("/my_file_upload")
def my_upload_image(image: ImageFile, audio: AudioFile, video: VideoFile):
    image_as_np_array = np.array(image)
    # ... process the files ...
```
You can call the endpoints either with raw bytes or with base64-encoded strings.
Sending requests (and files) to the service with FastSDK
FastSDK also supports media-toolkit files and therefore natively supports file up- and downloads. Once you have added the service in fastSDK you can call it like a Python function:
```python
mysdk = mySDK()  # follow the fastSDK tutorial to set this up correctly
task = mysdk.upload_image(my_imageFile, myAudioFile, myVideoFile)  # uploads the data to the service and returns a job
result = task.get_result()  # polls the status endpoint in the background until the server has finished
```
Sending files to the service with httpx / requests
```python
import httpx

with open('my_image_file.png', 'rb') as f:
    image_file_content = f.read()

my_files = {
    "image": ("file_name", image_file_content, 'image/png'),
    # ... add more files here if needed
}
url = "http://localhost:8000/my_file_upload"  # your service endpoint
response = httpx.Client().post(url, files=my_files)
```
Note: in the case of Runpod you need to convert the file to a base64-encoded string.
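A minimal sketch of that conversion using only the standard library; how exactly the serverless endpoint expects the payload (field names, JSON vs. form) is an assumption, the snippet only shows the base64 encoding itself:
```python
import base64
import httpx

with open('my_image_file.png', 'rb') as f:
    image_b64 = base64.b64encode(f.read()).decode('utf-8')

# send the base64 string as a plain field instead of a multipart file upload
response = httpx.Client().post(url, json={"image": image_b64})
```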
Sending files with URLs
One property of media-toolkit is that it supports files from URLs. Thus, instead of sending a file directly (as bytes) to the endpoints, you can also send a URL to the file location.
```python
my_files = {
    "image": "https://my_cloud_storage/my_image_file.png",
    # ... add more files here if needed
}
response = httpx.Client().post(url, files=my_files)
```
Using a cloud storage provider for file up and download
You can specify a cloud storage provider. Then, instead of sending the file directly back to the client, the file is uploaded to your storage provider and a download link is returned.
Docker & Deploying the service with different backends (hosting providers)
Locally
Just run the server by running your script.
Docker
Prerequisite: you have created a Python module "yourmodule.server" containing the code that starts the server. To start the fast-task-api server in Docker, add the following command at the end of your Dockerfile.
```dockerfile
# Start the fast-task-api server which is instantiated in the module yourmodule.server
CMD ["python", "-m", "yourmodule.server"]
```
Additional configuration: Backend, deployment type, host, port
You can change the backend (hosting provider) either by setting it in the constructor or by setting the environment variable.
```dockerfile
# Options: "fastapi", "runpod"
ENV FTAPI_BACKEND="runpod"
# Options: "localhost", "serverless"
ENV FTAPI_DEPLOYMENT="serverless"
```
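The backend can presumably also be passed in code when constructing the app; the argument name below is an assumption, check the package's documentation for the exact constructor signature:
```python
# hypothetical constructor argument; the ENV variables above are the documented way
app = FastTaskAPI(backend="runpod")
```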
Depending on the environment it is also necessary to specify the host and port.
```dockerfile
# bind to all interfaces so that connections from outside the container are accepted
ENV FTAPI_HOST="0.0.0.0"
# expose the port the container listens on
ARG port=8080
ENV FTAPI_PORT=$port
EXPOSE $port
```
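Putting these pieces together, a minimal Dockerfile could look like the following sketch; the base image, dependency installation and module name are placeholders for your own setup:
```dockerfile
FROM python:3.10-slim

WORKDIR /app
COPY . .
RUN pip install fast-task-api

# backend / deployment configuration as described above
ENV FTAPI_BACKEND="fastapi"
ENV FTAPI_DEPLOYMENT="localhost"
ENV FTAPI_HOST="0.0.0.0"
ARG port=8080
ENV FTAPI_PORT=$port
EXPOSE $port

# start the server module (replace yourmodule.server with your own)
CMD ["python", "-m", "yourmodule.server"]
```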
Runpod
It is no longer required to write a handler function; the fast-task-api magic handles it :D Just change the ENV variables as described above. This brings additional benefits:
- Same syntax as with FastAPI
- Better file handling
- Ultra easy deployment
Related projects and their differences
Starlette Background Tasks
The FastAPI documentation recommends using Starlette background tasks for long-running tasks like sending an e-mail. This works, but has drawbacks:
- No common interface / response type.
  - This leads to re-implementing the same functionality over and over again.
  - It creates more overhead in client and server code.
- No job progress and monitoring functionality.
  - With background tasks, clients have no way to monitor the progress of a job or to know when it is finished.
- No job queue:
  - Without a job queue, the server can be overloaded with tasks pretty fast.
  - With FastTaskAPI you can specify the maximum queue size for a task; if it is exceeded, the task is not executed.
Background tasks are a good solution for simple tasks, but they provide neither a job queue nor a job status.
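For comparison, a plain FastAPI background task looks like this; the client gets no job id, no status endpoint and no queue limit:
```python
from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def send_email(address: str):
    ...  # the long-running work happens here

@app.post("/notify")
def notify(address: str, background_tasks: BackgroundTasks):
    # fire-and-forget: the client cannot monitor progress or fetch a result
    background_tasks.add_task(send_email, address)
    return {"message": "notification scheduled"}
```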
Celery
Celery is a great tool for running jobs in the background on distributed systems. However, it comes with several drawbacks:
- Hard to set up
- Doesn't run everywhere
- Overkill for most projects
FastTaskAPI is lightweight and provides background-task functionality abstracted away from the developer. This doesn't mean we advise against Celery; indeed, it is planned to integrate Celery as a possible backend.
Roadmap
- stabilize runpod deployment
- streaming support
- add async functionality for fastapi
- support other job-queuing systems like celery
Note: THE PACKAGE IS STILL IN DEVELOPMENT!
LEAVE A STAR TO SUPPORT US. ANY BUG REPORT OR CONTRIBUTION IS HIGHLY APPRECIATED.