Temporal.io Python SDK
Reason this release was yanked:
Alpha release no longer valid
Temporal Python SDK
UNDER DEVELOPMENT
The Python SDK is under development. There are no compatibility guarantees nor proper documentation pages at this time.
Usage
Installation
Install the temporalio package from PyPI. If using pip directly, this might look like:
python -m pip install temporalio
Client
A client can be created and used to start a workflow like so:
from temporalio.client import Client

async def main():
    # Create client connected to server at the given address
    client = await Client.connect("http://localhost:7233", namespace="my-namespace")
    # Start a workflow
    handle = await client.start_workflow("my workflow name", "some arg", id="my-workflow-id", task_queue="my-task-queue")
    # Wait for result
    result = await handle.result()
    print(f"Result: {result}")
Some things to note about the above code:
- A Client does not have an explicit "close"
- Positional arguments can be passed to start_workflow
- The handle represents the workflow that was started and can be used for more than just getting the result
- Since we are just getting the handle and waiting on the result, we could have called client.execute_workflow which does the same thing
- Clients can have many more options not shown here (e.g. data converters and interceptors)
Data Conversion
Data converters are used to convert raw Temporal payloads to/from actual Python types. A custom data converter of type temporalio.converter.DataConverter can be set via the data_converter client parameter.
The default data converter supports converting multiple types including:
- None
- bytes
- google.protobuf.message.Message - As JSON when encoding, but has ability to decode binary proto from other languages
- Anything that json.dump supports
As a special case in the default converter, data classes are automatically converted to dictionaries before encoding as JSON. Since Python is a dynamic language, when decoding via json.load, the type is not known at runtime so, for example, a JSON object will be a dict. As a special case, if the parameter type hint is a data class for a JSON payload, it is decoded into an instance of that data class (properly recursing into child data classes).
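The data class behavior described above can be illustrated with the standard library alone. This is a minimal sketch, not the SDK's converter: the Person and Address classes and the from_dict helper are hypothetical names introduced for illustration, and the real converter may differ in detail.

```python
import dataclasses
import json
from dataclasses import dataclass
from typing import Any


@dataclass
class Address:
    city: str


@dataclass
class Person:
    name: str
    address: Address


def from_dict(cls: Any, data: Any) -> Any:
    """Rebuild a data class from a decoded JSON object, recursing into
    fields whose declared type is itself a data class (hypothetical helper)."""
    if not (dataclasses.is_dataclass(cls) and isinstance(data, dict)):
        return data
    kwargs = {
        field.name: from_dict(field.type, data[field.name])
        for field in dataclasses.fields(cls)
    }
    return cls(**kwargs)


person = Person(name="Ada", address=Address(city="London"))

# Encoding: the data class becomes a plain dict before JSON encoding
encoded = json.dumps(dataclasses.asdict(person))

# Decoding without a type hint yields a dict
untyped = json.loads(encoded)

# Decoding with a data class type hint rebuilds the instance
typed = from_dict(Person, untyped)
```

Here untyped is an ordinary dict, while typed compares equal to the original person, including the nested Address.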
Activities
Activity-only Worker
An activity-only worker can be started like so:
import asyncio
import logging
from temporalio.client import Client
from temporalio.worker import Worker

async def say_hello_activity(name: str) -> str:
    return f"Hello, {name}!"

async def main(stop_event: asyncio.Event):
    # Create client connected to server at the given address
    client = await Client.connect("http://localhost:7233", namespace="my-namespace")
    # Run the worker until the event is set
    worker = Worker(client, task_queue="my-task-queue", activities={"say-hello-activity": say_hello_activity})
    async with worker:
        await stop_event.wait()
Some things to note about the above code:
- This creates/uses the same client that is used for starting workflows
- The say_hello_activity is async which is the recommended activity type (see "Types of Activities" below)
- The created worker only runs activities, not workflows
- Activities are passed as a mapping with the key as a string activity name and the value as a callable
- While this example accepts a stop event and uses async with, run() and shutdown() may be used instead
- Workers can have many more options not shown here (e.g. data converters and interceptors)
Types of Activities
There are 3 types of activity callables accepted and described below: asynchronous, synchronous multithreaded, and synchronous multiprocess/other. Only positional parameters are allowed in activity callables.
Asynchronous Activities
Asynchronous activities, i.e. functions using async def, are the recommended activity type. When using asynchronous activities no special worker parameters are needed.
Cancellation for asynchronous activities is done via asyncio.Task.cancel. This means that asyncio.CancelledError will be raised (and can be caught, but it is not recommended). An activity must heartbeat to receive cancellation and there are other ways to be notified about cancellation (see "Activity Context" and "Heartbeating and Cancellation" later).
Synchronous Activities
Synchronous activities, i.e. functions that do not have async def, can be used with workers, but the activity_executor worker parameter must be set with a concurrent.futures.Executor instance to use for executing the activities.
Cancellation for synchronous activities is done in the background and the activity must choose to listen for it and react appropriately. An activity must heartbeat to receive cancellation and there are other ways to be notified about cancellation (see "Activity Context" and "Heartbeating and Cancellation" later).
Synchronous Multithreaded Activities
If activity_executor is set to an instance of concurrent.futures.ThreadPoolExecutor then the synchronous activities are considered multithreaded activities. Besides activity_executor, no other worker parameters are required for synchronous multithreaded activities.
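To make "the activity must choose to listen for it" concrete, here is a standard-library sketch of a synchronous function running in a ThreadPoolExecutor and checking a cancellation flag between steps. The threading.Event is a stand-in assumption for the SDK's cancellation signal (in a real activity the check would go through the temporalio.activity functions), and counting_activity is a hypothetical name.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def counting_activity(cancelled: threading.Event, limit: int) -> int:
    """Synchronous function that checks a cancellation flag between steps."""
    steps = 0
    while steps < limit:
        if cancelled.is_set():
            break
        steps += 1
        # Waiting on the event doubles as a short, interruptible sleep
        cancelled.wait(0.01)
    return steps


cancel_flag = threading.Event()
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(counting_activity, cancel_flag, 1000)
    # Cancellation is requested from outside; the function must notice it
    cancel_flag.set()
    completed_steps = future.result(timeout=5)
```

Nothing interrupts the thread from outside: the function returns early only because it polls the flag, which is why a synchronous activity that never checks for cancellation cannot be stopped.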
Synchronous Multiprocess/Other Activities
Synchronous activities, i.e. functions that do not have async def, can be used with workers, but the activity_executor worker parameter must be set with a concurrent.futures.Executor instance to use for executing the activities. If this is not set to an instance of concurrent.futures.ThreadPoolExecutor then the synchronous activities are considered multiprocess/other activities.
These require special primitives for heartbeating and cancellation. The shared_state_manager worker parameter must be set to an instance of temporalio.worker.SharedStateManager. The most common implementation can be created by passing a multiprocessing.managers.SyncManager (i.e. result of multiprocessing.Manager()) to temporalio.worker.SharedStateManager.create_from_multiprocessing().
Also, all of these activity functions must be "picklable".
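A quick way to see what "picklable" means in practice is to try pickling the callable. The sketch below uses only the standard library; is_picklable is a hypothetical helper, and json.dumps merely stands in for any importable module-level function.

```python
import json
import pickle
from typing import Any


def is_picklable(fn: Any) -> bool:
    """Return True if the callable survives pickle.dumps (hypothetical helper)."""
    try:
        pickle.dumps(fn)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


# Importable module-level functions are pickled by reference and work
importable_ok = is_picklable(json.dumps)

# Lambdas cannot be looked up by name again, so pickling them fails
lambda_ok = is_picklable(lambda name: f"Hello, {name}!")
```

In practice this means activity functions for a process-based executor should be defined at module level with def, not as lambdas or functions nested inside other functions.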
Activity Context
During activity execution, an implicit activity context is set as a context variable. The context variable itself is not visible, but calls in the temporalio.activity package make use of it. Specifically:
- in_activity() - Whether an activity context is present
- info() - Returns the immutable info of the currently running activity
- heartbeat(*details) - Record a heartbeat
- is_cancelled() - Whether a cancellation has been requested on this activity
- wait_for_cancelled() - async call to wait for cancellation request
- wait_for_cancelled_sync(timeout) - Synchronous blocking call to wait for cancellation request
- is_worker_shutdown() - Whether the worker has started graceful shutdown
- wait_for_worker_shutdown() - async call to wait for start of graceful worker shutdown
- wait_for_worker_shutdown_sync(timeout) - Synchronous blocking call to wait for start of graceful worker shutdown
- raise_complete_async() - Raise an error that this activity will be completed asynchronously (i.e. after return of the activity function in a separate client call)
With the exception of in_activity(), if any of the functions are called outside of an activity context, an error occurs. Synchronous activities cannot call any of the async functions.
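The general pattern of a hidden context variable behind module-level functions can be sketched with the standard contextvars module. This is an illustration of the technique only, not the SDK's implementation: the Info fields, the run_activity helper, and the error type are all assumptions.

```python
import contextvars
from dataclasses import dataclass


@dataclass(frozen=True)
class Info:
    activity_type: str


# Module-private context variable; callers only see the helper functions
_current = contextvars.ContextVar("current_activity", default=None)


def in_activity() -> bool:
    return _current.get() is not None


def info() -> Info:
    current = _current.get()
    if current is None:
        raise RuntimeError("Not in activity context")
    return current


def run_activity(activity_info: Info, fn):
    """Set the context for the duration of the call, then restore it."""
    token = _current.set(activity_info)
    try:
        return fn()
    finally:
        _current.reset(token)


def my_activity() -> str:
    # No context object is passed in; it is looked up implicitly
    return f"running {info().activity_type}"


outside = in_activity()
result = run_activity(Info(activity_type="say-hello-activity"), my_activity)
```

Outside of run_activity, in_activity() returns False and info() raises, mirroring the rule that only in_activity() is safe to call without an activity context.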
Heartbeating and Cancellation
In order for an activity to be notified of cancellation requests, it must invoke temporalio.activity.heartbeat(). It is strongly recommended that all but the fastest executing activities call this function regularly. "Types of Activities" has specifics on cancellation for asynchronous and synchronous activities.
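For the asynchronous case, the mechanics of asyncio.Task.cancel can be seen without a server. The following standard-library sketch uses hypothetical function names, with asyncio.sleep standing in for real work; it shows CancelledError arriving inside the coroutine, being used for cleanup, and then being re-raised so the cancellation is not swallowed.

```python
import asyncio

events = []


async def long_running_activity() -> str:
    try:
        # Stand-in for real work that awaits regularly
        await asyncio.sleep(60)
        return "done"
    except asyncio.CancelledError:
        # Clean up, then re-raise so the cancellation still takes effect
        events.append("cleanup")
        raise


async def run_and_cancel() -> bool:
    task = asyncio.ensure_future(long_running_activity())
    # Give the task a chance to start before cancelling it
    await asyncio.sleep(0.01)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()


was_cancelled = asyncio.run(run_and_cancel())
```

Catching the error only to clean up and re-raise keeps the task in the cancelled state, which is the safer reading of "can be caught, but it is not recommended".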
In addition to obtaining cancellation information, heartbeats also support detail data that is persisted on the server for retrieval during activity retry. If an activity calls temporalio.activity.heartbeat(123, 456) and then fails and is retried, temporalio.activity.info().heartbeat_details will return an iterable containing 123 and 456 on the next run.
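Heartbeat details are typically used to resume work after a retry. The sketch below simulates that idea without the SDK: the heartbeat callback and the heartbeat_details argument are passed in as plain parameters (an assumption made to keep the example runnable), standing in for temporalio.activity.heartbeat() and temporalio.activity.info().heartbeat_details. The process_items function and its fail_at parameter are hypothetical.

```python
from typing import Callable, Iterable, List


def process_items(
    items: List[str],
    heartbeat_details: Iterable[int],
    heartbeat: Callable[[int], None],
    fail_at: int = -1,
) -> List[str]:
    """Process items, resuming from the index in the last heartbeat details."""
    details = list(heartbeat_details)
    start = details[0] if details else 0
    processed = []
    for index in range(start, len(items)):
        if index == fail_at:
            raise RuntimeError("simulated failure")
        processed.append(items[index].upper())
        # Record the next index to process so a retry can skip finished work
        heartbeat(index + 1)
    return processed


recorded: List[int] = []
items = ["a", "b", "c", "d"]

# First attempt fails at index 2 after heartbeating progress for "a" and "b"
try:
    process_items(items, [], recorded.append, fail_at=2)
except RuntimeError:
    pass

# The retry receives the last recorded details and only handles "c" and "d"
retry_result = process_items(items, recorded[-1:], recorded.append)
```

Storing the next index in the details, instead of the processed data itself, keeps the heartbeat payload small.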
Worker Shutdown
An activity can react to a worker shutdown by using is_worker_shutdown or one of the wait_for_worker_shutdown functions.
When the graceful_shutdown_timeout worker parameter is given a datetime.timedelta, on shutdown the worker will notify activities of the graceful shutdown. Once that timeout has passed (or if it wasn't set), the worker will perform cancellation of all outstanding activities.
The shutdown() invocation will wait on all activities to complete, so if a long-running activity does not at least respect cancellation, the shutdown may never complete.
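The notify, wait, then cancel sequence described above can be modelled with plain asyncio. This is a sketch of the sequence only, under the assumption that an asyncio.Event plays the role of the graceful shutdown notification; it is not how the worker is implemented, and all names here are hypothetical.

```python
import asyncio
from typing import List


async def cooperative(shutdown: asyncio.Event, log: List[str]) -> None:
    # Reacts to the graceful shutdown notification and finishes on its own
    await shutdown.wait()
    log.append("cooperative finished")


async def stubborn(log: List[str]) -> None:
    # Ignores the notification and only stops when cancelled
    try:
        await asyncio.sleep(60)
    except asyncio.CancelledError:
        log.append("stubborn cancelled")
        raise


async def shutdown_worker(graceful_timeout: float) -> List[str]:
    log: List[str] = []
    shutdown = asyncio.Event()
    tasks = [
        asyncio.ensure_future(cooperative(shutdown, log)),
        asyncio.ensure_future(stubborn(log)),
    ]
    await asyncio.sleep(0)  # let both tasks start

    # 1. Notify activities that graceful shutdown has begun
    shutdown.set()
    # 2. Give them up to graceful_timeout seconds to finish
    _, pending = await asyncio.wait(tasks, timeout=graceful_timeout)
    # 3. Cancel whatever is still running, then wait for everything to end
    for task in pending:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return log


shutdown_log = asyncio.run(shutdown_worker(graceful_timeout=0.05))
```

The final gather corresponds to shutdown() waiting on all activities: a task that swallowed the cancellation and kept running would block it indefinitely.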
Development
The Python SDK is built to work with Python 3.7 and newer. It is built using SDK Core which is written in Rust.
Local development environment
- Install the system dependencies:
- Use a local virtual environment (helps IDEs and Windows):
  poetry config virtualenvs.in-project true
- Install the package dependencies (requires Rust):
  poetry install
- Build the project (requires Rust):
  poe build-develop
- Run the tests (requires Go):
  poe test