Example repository demonstrating integration of Temporal workflows with FastAPI.
Project description
Example repository demonstrating integration of Temporal workflows with FastAPI
Temporal Python
Installation
pip install gadfastemporal
Usage
import contextlib
import contextvars
import typing
import uuid
from fastapi import FastAPI
from pydantic import BaseModel
from gadfastemporal import Temporal
from gadfastemporal import Workflow
from gadfastemporal import converters
from gadfastemporal import decorators
from gadfastemporal import interceptors
contextvar = contextvars.ContextVar("contextvar", default=None)
class Settings(BaseModel):
TEMPORAL_HOST: str = "localhost:7233"
TEMPORAL_NAMESPACE: str = "default"
TEMPORAL_QUEUE: str = "my-queue"
settings = Settings()
@decorators.workflow
class SimpleWorkflow(Workflow):
@staticmethod
def id(_: dict) -> str:
return str(uuid.uuid4())
@staticmethod
def activities() -> typing.List[typing.Callable]:
return [SimpleWorkflow.process]
@decorators.run
async def run(self, command: dict) -> str:
return await temporal.activity.sync(self.process, command)
@staticmethod
@decorators.activity
async def process(data: dict) -> str:
return f"Processed: {data}"
temporal = Temporal(
host=settings.TEMPORAL_HOST,
namespace=settings.TEMPORAL_NAMESPACE,
task_queue=settings.TEMPORAL_QUEUE,
workflows=[SimpleWorkflow],
worker_interceptors=[interceptors.SentryInterceptor()],
client_interceptors=[interceptors.ContextPropagationInterceptor(contexts=[contextvar])],
data_converter=converters.pydantic,
search_attributes=[contextvar],
)
@contextlib.asynccontextmanager
async def lifespan(_: FastAPI):
await temporal.start()
yield
await temporal.shutdown()
app = FastAPI(lifespan=lifespan)
@app.post("/run")
async def run():
data = dict(x=1)
contextvar.set({"Traceparent": "12345"})
return await temporal.workflow.sync(SimpleWorkflow, data)
Up containers
docker compose -f .compose/docker-compose.yml up -d --build
Create search-attribute
temporal operator search-attribute create --name Traceparent --type Keyword
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
gadfastemporal-0.0.1.tar.gz
(10.0 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file gadfastemporal-0.0.1.tar.gz.
File metadata
- Download URL: gadfastemporal-0.0.1.tar.gz
- Upload date:
- Size: 10.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.0.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
6cf463554b8657670ac17c1c88056c7d06ff3bcf7f94bc89721871ac2796db14
|
|
| MD5 |
27c6ab2bb5b08853b19d1727b43dda54
|
|
| BLAKE2b-256 |
900f3c46be56069e2987d26d91d6a6dbe37962ce5c764bf5e652fb955d74bd3a
|
File details
Details for the file gadfastemporal-0.0.1-py3-none-any.whl.
File metadata
- Download URL: gadfastemporal-0.0.1-py3-none-any.whl
- Upload date:
- Size: 14.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.0.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f5fc54436cc436f4c8a05e086f46e0a14832db81db52452126923e2ff6e25a57
|
|
| MD5 |
b24cb6217902137320328235e56373a2
|
|
| BLAKE2b-256 |
3f03106d19095bdffc60c8f4053c221ef5885cde9bf1e034edabe9203fdae99a
|