Skip to main content

Example repository demonstrating integration of Temporal workflows with FastAPI.

Project description

logo

Example repository demonstrating integration of Temporal workflows with FastAPI


Temporal Python

Installation

pip install gadfastemporal

Usage

import contextlib
import contextvars
import typing
import uuid

from fastapi import FastAPI
from pydantic import BaseModel

from gadfastemporal import Temporal
from gadfastemporal import Workflow
from gadfastemporal import converters
from gadfastemporal import decorators
from gadfastemporal import interceptors

contextvar = contextvars.ContextVar("contextvar", default=None)

class Settings(BaseModel):
    TEMPORAL_HOST: str = "localhost:7233"
    TEMPORAL_NAMESPACE: str = "default"
    TEMPORAL_QUEUE: str = "my-queue"


settings = Settings()


@decorators.workflow
class SimpleWorkflow(Workflow):
    @staticmethod
    def id(_: dict) -> str:
        return str(uuid.uuid4())

    @staticmethod
    def activities() -> typing.List[typing.Callable]:
        return [SimpleWorkflow.process]

    @decorators.run
    async def run(self, command: dict) -> str:
        return await temporal.activity.sync(self.process, command)

    @staticmethod
    @decorators.activity
    async def process(data: dict) -> str:
        return f"Processed: {data}"


temporal = Temporal(
    host=settings.TEMPORAL_HOST,
    namespace=settings.TEMPORAL_NAMESPACE,
    task_queue=settings.TEMPORAL_QUEUE,
    workflows=[SimpleWorkflow],
    worker_interceptors=[interceptors.SentryInterceptor()],
    client_interceptors=[interceptors.ContextPropagationInterceptor(contexts=[contextvar])],
    data_converter=converters.pydantic,
    search_attributes=[contextvar],
)

@contextlib.asynccontextmanager
async def lifespan(_: FastAPI):
    await temporal.start()
    yield
    await temporal.shutdown()


app = FastAPI(lifespan=lifespan)

@app.post("/run")
async def run():
    data = dict(x=1)
    contextvar.set({"Traceparent": "12345"})
    return await temporal.workflow.sync(SimpleWorkflow, data)

Up containers

docker compose -f .compose/docker-compose.yml up -d --build

Create search-attribute

temporal operator search-attribute create --name Traceparent --type Keyword

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

gadfastemporal-0.0.1.tar.gz (10.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

gadfastemporal-0.0.1-py3-none-any.whl (14.2 kB view details)

Uploaded Python 3

File details

Details for the file gadfastemporal-0.0.1.tar.gz.

File metadata

  • Download URL: gadfastemporal-0.0.1.tar.gz
  • Upload date:
  • Size: 10.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.0.0 CPython/3.12.3

File hashes

Hashes for gadfastemporal-0.0.1.tar.gz
Algorithm Hash digest
SHA256 6cf463554b8657670ac17c1c88056c7d06ff3bcf7f94bc89721871ac2796db14
MD5 27c6ab2bb5b08853b19d1727b43dda54
BLAKE2b-256 900f3c46be56069e2987d26d91d6a6dbe37962ce5c764bf5e652fb955d74bd3a

See more details on using hashes here.

File details

Details for the file gadfastemporal-0.0.1-py3-none-any.whl.

File metadata

  • Download URL: gadfastemporal-0.0.1-py3-none-any.whl
  • Upload date:
  • Size: 14.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.0.0 CPython/3.12.3

File hashes

Hashes for gadfastemporal-0.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 f5fc54436cc436f4c8a05e086f46e0a14832db81db52452126923e2ff6e25a57
MD5 b24cb6217902137320328235e56373a2
BLAKE2b-256 3f03106d19095bdffc60c8f4053c221ef5885cde9bf1e034edabe9203fdae99a

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page