A library to create a FastAPI-based OGC API Processes wrapper around existing projects.
Project description
fastprocesses
A library to create a FastAPI-based OGC API Processes wrapper around existing projects. This library simplifies the process of defining and registering processes, making it easy to build and deploy OGC API Processes.
AI helped to create this code.
Version: 0.14.0
Description
fastprocesses is a Python library that provides a simple and efficient way to create OGC API Processes using FastAPI. It allows you to define processes, register them, and expose them through a FastAPI application with minimal effort, following the OGC API Processes 1.0.0 specification.
Features
- OGC API Processes Compliance: Fully implements the OGC API Processes 1.0.0 Core specification
- FastAPI Integration: Leverages FastAPI for building high-performance APIs
- Process Management: Supports both synchronous and asynchronous process execution
- Job Control: Implements job control options (sync-execute, async-execute)
- Output Handling: Supports various output transmission modes (value, reference)
- Result Caching: Built-in Redis-based caching for process results
- Celery Integration: Asynchronous task processing using Celery
- Pydantic Models: Strong type validation for process inputs and outputs
- Logging: Uses
logurufor modern logging with rotation support
fastprocesses uses Celery for async execution of arbitrary processes and result retrieval from a backend like Redis. For deterministic processes, that means processes that return the same results for identical inputs, redis is used as temporary cache. For both, the celery backend and the temporary Redis cache, time to live can be configured.
Architecture
graph TB
subgraph Client
CLI[Client Request]
end
subgraph FastAPI Application
API[OGCProcessesAPI]
Router[API Router]
PM[ProcessManager]
PR[ProcessRegistry]
end
subgraph Redis
RC[Redis Cache]
RR[Redis Registry]
end
subgraph Process
BP[BaseProcess]
SP[SimpleProcess]
end
subgraph Worker
CW[Celery Worker]
CT[CacheResultTask]
end
%% Client interactions
CLI -->|HTTP Request| API
API -->|Route Request| Router
Router -->|Execute Process| PM
%% Process Manager flow
PM -->|Get Process| PR
PM -->|Check Cache| RC
PM -->|Submit Task| CW
PM -->|Get Result| RC
%% Process Registry
PR -->|Store/Retrieve| RR
PR -.->|Registers| SP
SP -->|Inherits| BP
%% Worker flow
CW -->|Execute| SP
CW -->|Cache Result| CT
CT -->|Store| RC
%% Styling
classDef api fill:#f9f,stroke:#333,stroke-width:2px
classDef cache fill:#bbf,stroke:#333,stroke-width:2px
classDef process fill:#bfb,stroke:#333,stroke-width:2px
classDef worker fill:#fbb,stroke:#333,stroke-width:2px
class API,Router api
class RC,RR cache
class BP,SP process
class CW,CT worker
Routes
graph TB
%% Routes
subgraph Routes
RP["GET /processes"]:::route
RPD["GET /processes/{process_id}"]:::route
RE["POST /processes/{process_id}/execution"]:::route
RJ["GET /jobs"]:::route
RJS["GET /jobs/{job_id}"]:::route
RJR["GET /jobs/{job_id}/results"]:::route
end
%% FastAPI Application
subgraph FastAPI Application
PM_get["ProcessManager.get_available_processes"]:::component
PM_get_desc["ProcessManager.get_process_description"]:::component
PM_exec["ProcessManager.execute_process"]:::component
PM_list_jobs["ProcessManager.list_jobs"]:::component
PM_job_status["ProcessManager.get_job_status"]:::component
PM_job_results["ProcessManager.get_job_results"]:::component
PR["ProcessRegistry"]:::component
CT["CacheResultTask"]:::component
CW["Celery Worker"]:::component
%% Integrated Redis Stores
RC["Redis Cache (Temporary Results)"]:::redis
RR["Redis Registry (Process Metadata)"]:::redis
CB["Redis Broker (Celery Tasks)"]:::redis
CR["Redis Backend (Celery Results)"]:::redis
end
%% Routes to ProcessManager
RP -->|List Processes| PM_get
RPD -->|Get Process Description| PM_get_desc
RE -->|Execute Process| PM_exec
RJ -->|List Jobs| PM_list_jobs
RJS -->|Get Job Status| PM_job_status
RJR -->|Get Job Results| PM_job_results
%% ProcessManager to Redis
PM_get -->|Read Process Metadata| RR
PM_get_desc -->|Read Process Metadata| RR
PM_exec -->|Read/Write Temporary Results| RC
PM_exec -->|Submit Task| CB
PM_list_jobs -->|Read Job Metadata| CR
PM_job_status -->|Read Job Metadata| CR
PM_job_results -->|Read Job Results| CR
%% ProcessManager to ProcessRegistry
PM_get -->|Get Processes| PR
PM_get_desc -->|Get Process| PR
%% ProcessRegistry to Redis
PR -->|Store/Retrieve Process Metadata| RR
%% Celery Worker Flow
CB -->|Distribute Tasks| CW
CW -->|Execute Process| CT
CT -->|Write Temporary Results| RC
CW -->|Write Task Results| CR
%% Styling
classDef route fill:#f9f,stroke:#333,stroke-width:2px,color:#000
classDef component fill:#bbf,stroke:#333,stroke-width:2px,color:#000
classDef redis fill:#bfb,stroke:#333,stroke-width:2px,color:#000
Usage
- Define a Process: Create a new process by subclassing
BaseProcessand using the@register_processdecorator.
from fastprocesses.core.base_process import BaseProcess
from fastprocesses.core.models import (
ProcessDescription,
ProcessInput,
ProcessJobControlOptions,
ProcessOutput,
ProcessOutputTransmission,
Schema,
)
from fastprocesses.processes.process_registry import register_process
@register_process("simple_process")
class SimpleProcess(BaseProcess):
# Define process description as a class variable
process_description = ProcessDescription(
id="simple_process",
title="Simple Process",
version="1.0.0",
description="A simple example process",
jobControlOptions=[
ProcessJobControlOptions.SYNC_EXECUTE,
ProcessJobControlOptions.ASYNC_EXECUTE
],
outputTransmission=[
ProcessOutputTransmission.VALUE
],
inputs={
"input_text": ProcessInput(
title="Input Text",
description="Text to process",
schema=Schema(
type="string",
minLength=1,
maxLength=1000
)
)
},
outputs={
"output_text": ProcessOutput(
title="Output Text",
description="Processed text",
schema=Schema(
type="string"
)
)
},
keywords=["text", "processing"],
metadata={
"created": "2024-02-19",
"provider": "Example Organization"
}
)
async def execute(
self,
exec_body: Dict[str, Any],
job_progress_callback: JobProgressCallback
) -> Dict[str, Any]:
input_text = inputs["inputs"]["input_text"]
output_text = input_text.upper()
return {"output_text": output_text}
- Create the FastAPI Application:
import uvicorn
from fastprocesses.api.server import OGCProcessesAPI
app = OGCProcessesAPI(
title="Simple Process API",
version="1.0.0",
description="A simple API for running processes"
).get_app()
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
- Start the Services:
Start Redis (required for caching and Celery):
docker run -d -p 6379:6379 redis
Start the Celery worker:
celery -A fastprocesses.worker.celery_app worker
Start the FastAPI application:
poetry run python examples/run_example.py
- Use the API:
Execute a process (async):
curl -X POST "http://localhost:8000/processes/simple_process/execution" \
-H "Content-Type: application/json" \
-H "Prefer: respond-async" \
-d '{
"inputs": {
"input_text": "hello world"
},
"outputs": {
"lower": {}
}
}'
Execute a process (sync):
curl -X POST "http://localhost:8000/processes/simple_process/execution" \
-H "Content-Type: application/json" \
-H "Prefer: respond-sync" \
-d '{
"inputs": {
"input_text": "hello world"
},
"outputs": {
"lower": {}
}
}'
API Endpoints
GET /: Landing pageGET /conformance: OGC API conformance declarationGET /processes: List available processesGET /processes/{process_id}: Get process descriptionPOST /processes/{process_id}/execution: Execute a processGET /jobs: List all jobsGET /jobs/{job_id}: Get job statusGET /jobs/{job_id}/results: Get job results
Configuration
The library can be configured using environment variables:
RESULT_CACHE_HOST="redis"
RESULT_CACHE_PORT=6379
RESULT_CACHE_DB=1
CELERY_BROKER_HOST="redis"
CELERY_BROKER_PORT=6379
CELERY_BROKER_DB=0
CELERY_RESULTS_TTL_DAYS=365 # job results are stored for this time period
CELERY_TASK_TLIMIT_HARD=900 # seconds
CELERY_TASK_TLIMIT_SOFT=600 # seconds
RESULTS_TEMP_TTL_HOURS=48 # this period determines how long results can be retrieved from cache, when the inputs are exactly the same
Notes:
!IMPORTANT!: Cache hash key is based on original unprocessed inputs always. This ensures consistent caching and cache retrieval which does not depend on arbitrary processed data, which can change when the process is updated or changed!
Version Notes
- 0.14.0: Renamed settings and allowed to add metadata to server app, added a html landing page
- 0.13.0: Validation occurs against schema fragment provided by process description
- 0.12.0: results will be retrieved from cache only if inputs and outputs are the same
- 0.11.0: improved error handling
- 0.10.0: improved cache handling and added cache settings
- 0.9.0: read process description from file, added set execution mode via Prefer-header
- 0.8.0: added retry mechanism to Cache class and allow for separate connections each for Celery and results/jobs Cache
- 0.7.0: added progress callback for job updates and SoftTimeLimit for tasks
- 0.6.0: added paging to processes and jobs, including limit and offset query params
- 0.5.0: Extended Schema model
- 0.4.0: Added full OGC API Processes 1.0.0 Core compliance
- 0.3.0: Added job control and output transmission options
- 0.2.0: Added Redis caching and Celery integration
- 0.1.0: Initial release with basic process support
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file fastprocesses-0.14.0.tar.gz.
File metadata
- Download URL: fastprocesses-0.14.0.tar.gz
- Upload date:
- Size: 26.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.1.1 CPython/3.12.9 Linux/6.8.0-1020-azure
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
df781565ab01529138d61acab2f70f973381fe9fb5bc37704a7fe6f23a6d6bd9
|
|
| MD5 |
7ed34389236607edd5eef1791e7aabaf
|
|
| BLAKE2b-256 |
bb930f3212154788f4529cd41c8fae4b0c3513e8866306236c3c24f1f1c1e2ee
|
File details
Details for the file fastprocesses-0.14.0-py3-none-any.whl.
File metadata
- Download URL: fastprocesses-0.14.0-py3-none-any.whl
- Upload date:
- Size: 30.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.1.1 CPython/3.12.9 Linux/6.8.0-1020-azure
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
387eafdd0acbbc5e9187697ebbe36ac3d9211f0d120f403118bf273a042024ec
|
|
| MD5 |
c84b92bc1097bc3e420e732bcb9b41ec
|
|
| BLAKE2b-256 |
f31599637c49d631e1824fec8bf28ae310805fd96ad00905ba301bfa797c1666
|