Skip to main content

Remote-backed workspace sync for Temporal activities

Project description

temporal-workdir

Shared working directories for Temporal activities across distributed workers.

The Problem

Temporal activities that read and write files break when you run multiple workers — each worker has its own disk. An activity that writes output.csv on Worker A won't find it when retried on Worker B.

temporal-workdir solves this by syncing a local directory with remote storage (GCS, S3, local filesystem, etc.) before and after each activity execution.

Install

pip install temporal-workdir

# Add your cloud storage backend:
pip install gcsfs    # Google Cloud Storage
pip install s3fs     # Amazon S3
pip install adlfs    # Azure Blob Storage

Quick Start

Context Manager

Use Workspace anywhere you need shared file state between activities:

import json
from temporal_workdir import Workspace

# Pull remote files → work locally → push changes back
async with Workspace("gs://my-bucket/jobs/job-123") as ws:
    config = json.loads((ws.path / "config.json").read_text())
    (ws.path / "result.csv").write_text("id,score\n1,0.95")
# Clean exit → changes uploaded automatically
# Exception → nothing uploaded, remote state unchanged

Activity Decorator

The @workspace decorator handles pull/push around your activity. Template variables are resolved from activity.info():

from temporalio import activity
from temporal_workdir import workspace, get_workspace_path

@workspace("gs://my-bucket/{workflow_id}/{activity_type}")
@activity.defn
async def process_data(order_id: str) -> str:
    ws = get_workspace_path()  # local Path to synced directory
    (ws / "orders.txt").write_text(order_id)
    return "done"

Available template variables: {workflow_id}, {activity_id}, {activity_type}, {task_queue}.

Custom Template Variables

Use key_fn to add your own template variables from the activity arguments:

@workspace(
    "gs://my-bucket/{workflow_id}/users/{user_id}",
    key_fn=lambda user_id, **_: {"user_id": user_id},
)
@activity.defn
async def process_user(user_id: str) -> None:
    ws = get_workspace_path()
    # Each user gets their own workspace
    ...

Read-Only Access

Pull without pushing back — useful for reading shared state:

async with Workspace("gs://my-bucket/shared/config", read_only=True) as ws:
    config = json.loads((ws.path / "settings.json").read_text())
    # No upload on exit, even if you modify files

Managing Workspaces

from temporal_workdir import list_workspace_names, delete_workspace

# List workspace names under a prefix
names = list_workspace_names("gs://my-bucket/jobs")
# → ["job-001", "job-002", "job-003"]

# Delete a workspace
delete_workspace("gs://my-bucket/jobs/job-001")  # → True if deleted

How It Works

Activity starts
  ↓
Pull: download {url}.tar.gz → unpack to temp dir
  ↓
Execute: your code reads/writes files at ws.path
  ↓
Push: pack temp dir → upload {url}.tar.gz
  ↓
Cleanup: delete temp dir
  • First run (no archive exists): starts with an empty directory
  • Exception during execution: no push — remote state stays unchanged
  • Empty workspace after execution: remote archive is deleted

Storage Backends

Uses fsspec for storage. Any fsspec-compatible backend works:

Scheme Backend Package
gs:// Google Cloud Storage gcsfs
s3:// Amazon S3 s3fs
az:// Azure Blob Storage adlfs
file:// Local filesystem
memory:// In-memory (testing)

Pass backend-specific options as keyword arguments:

Workspace("gs://bucket/key", project="my-gcp-project", token="cloud")

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

temporal_workdir-0.7.0.tar.gz (10.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

temporal_workdir-0.7.0-py3-none-any.whl (9.7 kB view details)

Uploaded Python 3

File details

Details for the file temporal_workdir-0.7.0.tar.gz.

File metadata

  • Download URL: temporal_workdir-0.7.0.tar.gz
  • Upload date:
  • Size: 10.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for temporal_workdir-0.7.0.tar.gz
Algorithm Hash digest
SHA256 7fed08732824aa2cf8a60dbca4b0c0af5e0368eb3a14d078e9a9709efc48996f
MD5 0c7294585b02e377c302765f8932e9b9
BLAKE2b-256 7e7fad6937570d35586392b073a77d324e1ae667bf45c0bc0139cf669baf525c

See more details on using hashes here.

Provenance

The following attestation bundles were made for temporal_workdir-0.7.0.tar.gz:

Publisher: release.yml on saeedseyfi/temporal-workdir

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file temporal_workdir-0.7.0-py3-none-any.whl.

File metadata

File hashes

Hashes for temporal_workdir-0.7.0-py3-none-any.whl
Algorithm Hash digest
SHA256 6542c487e70b10b25baddad9c9cb266744f1c2d8ae22fea5a22a942ecdb03c90
MD5 fc51783596f3fe3dcd59fb720c86105d
BLAKE2b-256 7bcd809aea5443eaed1de4f85ae5269c391a7583766195f4eb265dd73169a2a4

See more details on using hashes here.

Provenance

The following attestation bundles were made for temporal_workdir-0.7.0-py3-none-any.whl:

Publisher: release.yml on saeedseyfi/temporal-workdir

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page