Temporal.io Python SDK

Reason this release was yanked:

Alpha release no longer valid

Temporal Python SDK

UNDER DEVELOPMENT

The Python SDK is under development. There are no compatibility guarantees nor proper documentation pages at this time.

Usage

Installation

Install the temporalio package from PyPI. If using pip directly, this might look like:

python -m pip install temporalio

Client

A client can be created and used to start a workflow like so:

from temporalio.client import Client

async def main():
    # Create client connected to server at the given address
    client = await Client.connect("http://localhost:7233", namespace="my-namespace")

    # Start a workflow
    handle = await client.start_workflow(
        "my workflow name",
        "some arg",
        id="my-workflow-id",
        task_queue="my-task-queue",
    )

    # Wait for result
    result = await handle.result()
    print(f"Result: {result}")

Some things to note about the above code:

  • A Client does not have an explicit "close"
  • Positional arguments can be passed to start_workflow
  • The handle represents the workflow that was started and can be used for more than just getting the result
  • Since we are just getting the handle and waiting on the result, we could have called client.execute_workflow which does the same thing
  • Clients can have many more options not shown here (e.g. data converters and interceptors)

Data Conversion

Data converters are used to convert raw Temporal payloads to/from actual Python types. A custom data converter of type temporalio.converter.DataConverter can be set via the data_converter client parameter.

The default data converter supports converting multiple types including:

  • None
  • bytes
  • google.protobuf.message.Message - Encoded as JSON, but binary proto from other languages can also be decoded
  • Anything that json.dump supports

As a special case in the default converter, data classes are automatically converted to dictionaries before encoding as JSON. Since Python is a dynamic language, when decoding via json.load, the type is not known at runtime so, for example, a JSON object will be a dict. As a special case, if the parameter type hint is a data class for a JSON payload, it is decoded into an instance of that data class (properly recursing into child data classes).
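
The data class behavior described above can be illustrated with the standard library alone. The following is a minimal sketch, not the SDK's converter: the encode and decode helpers and the Person and Address classes are hypothetical names invented for this example, and only dataclasses and json are used.

```python
import dataclasses
import json
from typing import Any


@dataclasses.dataclass
class Address:
    city: str


@dataclasses.dataclass
class Person:
    name: str
    address: Address


def encode(value: Any) -> str:
    # Data class instances become plain dicts (recursively) before JSON encoding.
    if dataclasses.is_dataclass(value) and not isinstance(value, type):
        value = dataclasses.asdict(value)
    return json.dumps(value)


def _from_json_value(value: Any, type_hint: Any) -> Any:
    # Rebuild a data class from a dict, recursing into fields whose declared
    # type is itself a data class. Anything else is returned unchanged.
    if (
        isinstance(type_hint, type)
        and dataclasses.is_dataclass(type_hint)
        and isinstance(value, dict)
    ):
        field_types = {f.name: f.type for f in dataclasses.fields(type_hint)}
        kwargs = {k: _from_json_value(v, field_types.get(k)) for k, v in value.items()}
        return type_hint(**kwargs)
    return value


def decode(data: str, type_hint: Any = None) -> Any:
    # Without a type hint, a JSON object can only come back as a dict.
    return _from_json_value(json.loads(data), type_hint)
```

Calling decode(payload) on an encoded Person yields a nested dict, while decode(payload, Person) yields a Person whose address field is an Address instance.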

Activities

Activity-only Worker

An activity-only worker can be started like so:

import asyncio
from temporalio.client import Client
from temporalio.worker import Worker

async def say_hello_activity(name: str) -> str:
    return f"Hello, {name}!"


async def main(stop_event: asyncio.Event):
    # Create client connected to server at the given address
    client = await Client.connect("http://localhost:7233", namespace="my-namespace")

    # Run the worker until the event is set
    worker = Worker(client, task_queue="my-task-queue", activities={"say-hello-activity": say_hello_activity})
    async with worker:
        await stop_event.wait()

Some things to note about the above code:

  • This creates/uses the same client that is used for starting workflows
  • The say_hello_activity is async which is the recommended activity type (see "Types of Activities" below)
  • The created worker only runs activities, not workflows
  • Activities are passed as a mapping with the key as a string activity name and the value as a callable
  • While this example accepts a stop event and uses async with, run() and shutdown() may be used instead
  • Workers can have many more options not shown here (e.g. data converters and interceptors)

Types of Activities

Three types of activity callables are accepted, each described below: asynchronous, synchronous multithreaded, and synchronous multiprocess/other. Only positional parameters are allowed in activity callables.

Asynchronous Activities

Asynchronous activities, i.e. functions using async def, are the recommended activity type. When using asynchronous activities no special worker parameters are needed.

Cancellation for asynchronous activities is done via asyncio.Task.cancel. This means that asyncio.CancelledError will be raised (and can be caught, but it is not recommended). An activity must heartbeat to receive cancellation and there are other ways to be notified about cancellation (see "Activity Context" and "Heartbeating and Cancellation" later).
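
What asyncio.Task.cancel looks like from inside a coroutine can be shown without the SDK. In this sketch, long_running is a hypothetical stand-in for an async activity body and run_and_cancel plays the role of whatever cancels the task. It catches asyncio.CancelledError only to clean up, then re-raises it.

```python
import asyncio


async def long_running(log: list) -> None:
    # Hypothetical stand-in for an async activity body (not an SDK function).
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        log.append("cleanup")
        # Re-raise so the task is actually reported as cancelled.
        raise


async def run_and_cancel() -> list:
    log: list = []
    task = asyncio.create_task(long_running(log))
    # Yield once so the task starts and suspends at its first await.
    await asyncio.sleep(0)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log
```

Swallowing the CancelledError instead of re-raising it would make the task appear to finish normally, which is why catching it is discouraged.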

Synchronous Activities

Synchronous activities, i.e. functions that do not have async def, can be used with workers, but the activity_executor worker parameter must be set with a concurrent.futures.Executor instance to use for executing the activities.

Cancellation for synchronous activities is done in the background and the activity must choose to listen for it and react appropriately. An activity must heartbeat to receive cancellation and there are other ways to be notified about cancellation (see "Activity Context" and "Heartbeating and Cancellation" later).

Synchronous Multithreaded Activities

If activity_executor is set to an instance of concurrent.futures.ThreadPoolExecutor then the synchronous activities are considered multithreaded activities. Besides activity_executor, no other worker parameters are required for synchronous multithreaded activities.
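
Cancellation of synchronous code is cooperative: the function has to check for it. The sketch below shows that pattern with a plain concurrent.futures.ThreadPoolExecutor. A threading.Event stands in for the SDK's cancellation signal, and count_until_cancelled and run_in_thread_pool are hypothetical names; a real activity would presumably use is_cancelled() or wait_for_cancelled_sync() from the activity context described later.

```python
import concurrent.futures
import threading


def count_until_cancelled(cancelled: threading.Event, max_steps: int) -> int:
    # Hypothetical synchronous "activity": it polls for cancellation itself.
    steps = 0
    while steps < max_steps and not cancelled.is_set():
        steps += 1
        # Block briefly; returns early if cancellation is signalled.
        cancelled.wait(0.01)
    return steps


def run_in_thread_pool(cancel_immediately: bool) -> int:
    cancelled = threading.Event()
    if cancel_immediately:
        cancelled.set()
    # The same kind of executor would be passed as activity_executor.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(count_until_cancelled, cancelled, 3)
        return future.result()
```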

Synchronous Multiprocess/Other Activities

If activity_executor is set to a concurrent.futures.Executor that is not an instance of concurrent.futures.ThreadPoolExecutor, then the synchronous activities are considered multiprocess/other activities.

These require special primitives for heartbeating and cancellation. The shared_state_manager worker parameter must be set to an instance of temporalio.worker.SharedStateManager. The most common implementation can be created by passing a multiprocessing.managers.SyncManager (i.e. the result of multiprocessing.Manager()) to temporalio.worker.SharedStateManager.create_from_multiprocessing().

Also, all of these activity functions must be "picklable".
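
A quick way to find out whether a callable is picklable is to try pickling it. The helper below, is_picklable, is a hypothetical name and not part of the SDK. It relies only on the standard pickle module, which pickles functions by reference to their importable name.

```python
import math
import pickle


def is_picklable(obj) -> bool:
    # Functions are pickled by qualified name, so lambdas and functions
    # defined inside other functions cannot be pickled.
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
```

For example, is_picklable(math.sqrt) is true, while is_picklable(lambda x: x) is false. In practice this means activity functions for a process-based executor should be defined at module level.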

Activity Context

During activity execution, an implicit activity context is set as a context variable. The context variable itself is not visible, but calls in the temporalio.activity package make use of it. Specifically:

  • in_activity() - Whether an activity context is present
  • info() - Returns the immutable info of the currently running activity
  • heartbeat(*details) - Record a heartbeat
  • is_cancelled() - Whether a cancellation has been requested on this activity
  • wait_for_cancelled() - async call to wait for cancellation request
  • wait_for_cancelled_sync(timeout) - Synchronous blocking call to wait for cancellation request
  • is_worker_shutdown() - Whether the worker has started graceful shutdown
  • wait_for_worker_shutdown() - async call to wait for start of graceful worker shutdown
  • wait_for_worker_shutdown_sync(timeout) - Synchronous blocking call to wait for start of graceful worker shutdown
  • raise_complete_async() - Raise an error that this activity will be completed asynchronously (i.e. after return of the activity function in a separate client call)

With the exception of in_activity(), if any of the functions are called outside of an activity context, an error occurs. Synchronous activities cannot call any of the async functions.
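
The "implicit context variable plus module-level accessor functions" pattern can be sketched with the standard contextvars module. This is an illustration of the pattern only, not the SDK's code: Info, _current, and run_with_context are hypothetical, and the two accessors merely mirror the names in_activity() and info() from the list above.

```python
import contextvars
from dataclasses import dataclass


@dataclass(frozen=True)
class Info:
    # Immutable description of the running activity (hypothetical fields).
    activity_type: str
    attempt: int


# The variable itself stays private; callers only use the functions below.
_current = contextvars.ContextVar("activity_info", default=None)


def in_activity() -> bool:
    return _current.get() is not None


def info() -> Info:
    current = _current.get()
    if current is None:
        raise RuntimeError("Not in activity context")
    return current


def run_with_context(activity_info: Info, fn):
    # Set the variable only for the duration of the call, as a worker might.
    token = _current.set(activity_info)
    try:
        return fn()
    finally:
        _current.reset(token)
```

Outside run_with_context, in_activity() returns False and info() raises, which matches the rule that only in_activity() is safe to call outside an activity.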

Heartbeating and Cancellation

In order for an activity to be notified of cancellation requests, it must invoke temporalio.activity.heartbeat(). It is strongly recommended that all but the fastest executing activities call this function regularly. "Types of Activities" has specifics on cancellation for asynchronous and synchronous activities.

In addition to obtaining cancellation information, heartbeats also support detail data that is persisted on the server for retrieval during activity retry. If an activity calls temporalio.activity.heartbeat(123, 456) and then fails and is retried, temporalio.activity.info().heartbeat_details will return an iterable containing 123 and 456 on the next run.
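
Heartbeat details are typically used to resume work where a failed attempt left off. The sketch below shows that idea with plain Python. process_items is a hypothetical function, and its heartbeat_details and heartbeat parameters stand in for temporalio.activity.info().heartbeat_details and temporalio.activity.heartbeat so the example runs without a server.

```python
from typing import Callable, Iterable, List


def process_items(
    items: List[str],
    heartbeat_details: Iterable[int],
    heartbeat: Callable[[int], None],
    fail_at: int = -1,
) -> List[str]:
    # Resume after the last index recorded by a previous attempt, if any.
    details = list(heartbeat_details)
    start = details[0] + 1 if details else 0
    done = []
    for index in range(start, len(items)):
        if index == fail_at:
            # Simulated failure used only to demonstrate a retry.
            raise RuntimeError("simulated failure")
        done.append(items[index].upper())
        # Record progress so a retry can skip completed items.
        heartbeat(index)
    return done
```

If a first attempt fails at index 2 after heartbeating 0 and 1, a retry that receives the last recorded detail, 1, processes only the remaining item.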

Worker Shutdown

An activity can react to a worker shutdown by calling is_worker_shutdown() or one of the wait_for_worker_shutdown functions.

When the graceful_shutdown_timeout worker parameter is given a datetime.timedelta, the worker notifies activities of the graceful shutdown when shutdown begins. Once that timeout has passed (or if it wasn't set), the worker cancels all outstanding activities.

The shutdown() invocation will wait on all activities to complete, so if a long-running activity does not at least respect cancellation, the shutdown may never complete.
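
The notify, wait, then cancel sequence described in this section can be sketched with asyncio alone. This is an assumption-laden illustration of the sequence, not the worker's implementation: shutdown, cooperative, stubborn, and demo are hypothetical names, and the timeout is a float in seconds rather than a datetime.timedelta.

```python
import asyncio
from typing import List


async def shutdown(tasks, shutdown_event: asyncio.Event, graceful_timeout: float) -> None:
    # 1. Notify running tasks that shutdown has started.
    shutdown_event.set()
    # 2. Give them up to graceful_timeout seconds to finish on their own.
    if tasks and graceful_timeout > 0:
        await asyncio.wait(tasks, timeout=graceful_timeout)
    # 3. Cancel whatever is still running, then wait for everything to end.
    for task in tasks:
        if not task.done():
            task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)


async def cooperative(shutdown_event: asyncio.Event) -> str:
    # Reacts to the shutdown notification and finishes normally.
    await shutdown_event.wait()
    return "finished"


async def stubborn(shutdown_event: asyncio.Event) -> str:
    # Ignores the notification, so it ends only when cancelled.
    await asyncio.sleep(3600)
    return "finished"


async def demo() -> List[str]:
    event = asyncio.Event()
    tasks = [
        asyncio.create_task(cooperative(event)),
        asyncio.create_task(stubborn(event)),
    ]
    await asyncio.sleep(0)  # let both tasks start
    await shutdown(tasks, event, graceful_timeout=0.05)
    return ["cancelled" if t.cancelled() else t.result() for t in tasks]
```

A task that also swallowed the cancellation would keep the final gather waiting indefinitely, which is the hang described above.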

Development

The Python SDK is built to work with Python 3.7 and newer. It is built using SDK Core which is written in Rust.

Local development environment

  • Install the system dependencies:

    • Python >=3.7
    • pipx (only needed for installing the two dependencies below)
    • poetry (install with: pipx install poetry)
    • poe (install with: pipx install poethepoet)
  • Use a local virtual environment (helps IDEs and Windows):

    poetry config virtualenvs.in-project true
    
  • Install the package dependencies (requires Rust):

    poetry install
    
  • Build the project (requires Rust):

    poe build-develop
    
  • Run the tests (requires Go):

    poe test
    
