Remote-backed workspace sync for Temporal activities

temporal-workdir

Shared working directories for Temporal activities across distributed workers.

The Problem

Temporal activities that read and write local files break when you run multiple workers, because each worker has its own disk. An activity that writes output.csv on Worker A won't find that file when it is retried on Worker B.

temporal-workdir solves this by syncing a local directory with remote storage (GCS, S3, local filesystem, etc.) before and after each activity execution.

Install

pip install temporal-workdir

# Add your cloud storage backend:
pip install gcsfs    # Google Cloud Storage
pip install s3fs     # Amazon S3
pip install adlfs    # Azure Blob Storage

Quick Start

Context Manager

Use Workspace anywhere you need shared file state between activities:

import asyncio
import json
from temporal_workdir import Workspace

async def main() -> None:
    # Pull remote files → work locally → push changes back
    async with Workspace("gs://my-bucket/jobs/job-123") as ws:
        config = json.loads((ws.path / "config.json").read_text())
        (ws.path / "result.csv").write_text("id,score\n1,0.95")
    # Clean exit → changes uploaded automatically
    # Exception → nothing uploaded, remote state unchanged

# Outside an activity, "async with" needs a running event loop
asyncio.run(main())

Activity Decorator

The @workspace decorator handles pull/push around your activity. Template variables are resolved from activity.info():

from temporalio import activity
from temporal_workdir import workspace, get_workspace_path

@workspace("gs://my-bucket/{workflow_id}/{activity_type}")
@activity.defn
async def process_data(order_id: str) -> str:
    ws = get_workspace_path()  # local Path to synced directory
    (ws / "orders.txt").write_text(order_id)
    return "done"

Available template variables: {workflow_id}, {activity_id}, {activity_type}, {task_queue}.
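
To make the template mechanism concrete, here is a minimal sketch of how the placeholders could be filled from activity metadata. It is not the library's implementation: resolve_template and the info dictionary are hypothetical stand-ins for whatever temporal-workdir does with activity.info(), and the error handling is an assumption.

```python
from typing import Mapping

# The four placeholder names listed above.
BUILTIN_VARIABLES = ("workflow_id", "activity_id", "activity_type", "task_queue")


def resolve_template(template: str, info: Mapping[str, str]) -> str:
    """Fill {placeholders} in a workspace URL template (hypothetical helper)."""
    values = {name: info[name] for name in BUILTIN_VARIABLES if name in info}
    try:
        return template.format(**values)
    except KeyError as exc:
        # Assumption: an unknown placeholder is reported as an error.
        raise ValueError(f"unknown template variable: {exc.args[0]}") from None


# Stand-in for the fields that activity.info() would provide at runtime.
info = {
    "workflow_id": "order-42",
    "activity_id": "1",
    "activity_type": "process_data",
    "task_queue": "default",
}
url = resolve_template("gs://my-bucket/{workflow_id}/{activity_type}", info)
print(url)  # gs://my-bucket/order-42/process_data
```

Because {workflow_id} is part of the URL, every workflow run in this sketch gets its own remote location, while retries of the same activity resolve to the same one.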

Custom Template Variables

Use key_fn to add your own template variables from the activity arguments:

@workspace(
    "gs://my-bucket/{workflow_id}/users/{user_id}",
    key_fn=lambda user_id, **_: {"user_id": user_id},
)
@activity.defn
async def process_user(user_id: str) -> None:
    ws = get_workspace_path()
    # Each user gets their own workspace
    ...
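
The interaction between key_fn and the built-in variables can be pictured with the sketch below. It assumes that key_fn is called with the same arguments as the activity and that its result is merged over the built-in values; build_variables is a hypothetical helper, not part of temporal-workdir.

```python
from typing import Any, Callable, Mapping, Optional


def build_variables(
    builtin: Mapping[str, str],
    key_fn: Optional[Callable[..., Mapping[str, str]]],
    *args: Any,
    **kwargs: Any,
) -> dict:
    """Merge built-in variables with those returned by key_fn (hypothetical)."""
    variables = dict(builtin)
    if key_fn is not None:
        # Assumption: key_fn receives the activity's own arguments.
        variables.update(key_fn(*args, **kwargs))
    return variables


# Same shape as the key_fn in the example above.
key_fn = lambda user_id, **_: {"user_id": user_id}

variables = build_variables({"workflow_id": "wf-7"}, key_fn, "alice")
url = "gs://my-bucket/{workflow_id}/users/{user_id}".format(**variables)
print(url)  # gs://my-bucket/wf-7/users/alice
```

The **_ in the lambda lets it ignore any keyword arguments it does not need, so the same key_fn keeps working if the activity later gains extra keyword parameters.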

Read-Only Access

Pull without pushing back — useful for reading shared state:

async with Workspace("gs://my-bucket/shared/config", read_only=True) as ws:
    config = json.loads((ws.path / "settings.json").read_text())
    # No upload on exit, even if you modify files

Managing Workspaces

from temporal_workdir import list_workspace_names, delete_workspace

# List workspace names under a prefix
names = list_workspace_names("gs://my-bucket/jobs")
# → ["job-001", "job-002", "job-003"]

# Delete a workspace
delete_workspace("gs://my-bucket/jobs/job-001")  # → True if deleted
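
Since each workspace is stored as a single {url}.tar.gz archive (see How It Works), listing and deleting can be pictured as plain file operations on those archives. The sketch below imitates that against a local directory using only the standard library. The list_names and delete helpers are hypothetical, and returning False for a missing archive is an assumption.

```python
import tempfile
from pathlib import Path

ARCHIVE_SUFFIX = ".tar.gz"


def list_names(prefix: Path) -> list[str]:
    """Return workspace names for archives stored directly under prefix."""
    return sorted(
        entry.name[: -len(ARCHIVE_SUFFIX)]
        for entry in prefix.iterdir()
        if entry.is_file() and entry.name.endswith(ARCHIVE_SUFFIX)
    )


def delete(prefix: Path, name: str) -> bool:
    """Delete one archive; return True only if something was removed."""
    archive = prefix / f"{name}{ARCHIVE_SUFFIX}"
    if not archive.exists():
        return False
    archive.unlink()
    return True


with tempfile.TemporaryDirectory() as tmp:
    jobs = Path(tmp)
    # Create two empty placeholder archives to list and delete.
    for name in ("job-001", "job-002"):
        (jobs / f"{name}{ARCHIVE_SUFFIX}").write_bytes(b"")
    names_before = list_names(jobs)
    deleted = delete(jobs, "job-001")
    deleted_again = delete(jobs, "job-001")
    names_after = list_names(jobs)

print(names_before, deleted, deleted_again, names_after)
```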

How It Works

Activity starts
  ↓
Pull: download {url}.tar.gz → unpack to temp dir
  ↓
Execute: your code reads/writes files at ws.path
  ↓
Push: pack temp dir → upload {url}.tar.gz
  ↓
Cleanup: delete temp dir

  • First run (no archive exists): starts with an empty directory
  • Exception during execution: no push, so remote state stays unchanged
  • Empty workspace after execution: remote archive is deleted
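
The lifecycle above can be approximated with the standard library alone. The following is a sketch under stated assumptions, not the library's code: sketch_workspace is a hypothetical name, a local file stands in for the remote archive, and tarfile replaces whatever packing temporal-workdir actually uses.

```python
import asyncio
import shutil
import tarfile
import tempfile
from contextlib import asynccontextmanager
from pathlib import Path


@asynccontextmanager
async def sketch_workspace(archive: Path, read_only: bool = False):
    """Pull, yield a local directory, then push on clean exit (hypothetical)."""
    local = Path(tempfile.mkdtemp())
    try:
        # Pull: unpack the archive if one exists, otherwise start empty.
        if archive.exists():
            with tarfile.open(archive, "r:gz") as tar:
                tar.extractall(local)
        yield local
        # Reached only on a clean exit; an exception skips the push.
        if not read_only:
            if any(local.iterdir()):
                with tarfile.open(archive, "w:gz") as tar:
                    for item in local.iterdir():
                        tar.add(item, arcname=item.name)
            else:
                # Empty workspace: remove the stored archive.
                archive.unlink(missing_ok=True)
    finally:
        # Cleanup: always delete the temp dir.
        shutil.rmtree(local, ignore_errors=True)


async def demo(root: Path) -> dict:
    archive = root / "job-123.tar.gz"
    results = {}

    # First run: no archive yet, so the directory starts empty.
    async with sketch_workspace(archive) as ws:
        results["first_run_empty"] = not any(ws.iterdir())
        (ws / "result.csv").write_text("id,score\n1,0.95")

    # A failing run leaves the stored archive untouched.
    try:
        async with sketch_workspace(archive) as ws:
            (ws / "result.csv").write_text("corrupted")
            raise RuntimeError("activity failed")
    except RuntimeError:
        pass

    # Read-only access sees the content from the last successful push.
    async with sketch_workspace(archive, read_only=True) as ws:
        results["content"] = (ws / "result.csv").read_text()

    # Emptying the workspace deletes the archive.
    async with sketch_workspace(archive) as ws:
        (ws / "result.csv").unlink()
    results["archive_exists"] = archive.exists()
    return results


with tempfile.TemporaryDirectory() as tmp:
    results = asyncio.run(demo(Path(tmp)))
print(results)
```

Placing the push after the yield, rather than in the finally clause, is what gives the "exception means no upload" behavior in this sketch: an exception raised inside the async with block jumps straight to cleanup.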

Storage Backends

Uses fsspec for storage. Any fsspec-compatible backend works:

Scheme      Backend                 Package
gs://       Google Cloud Storage    gcsfs
s3://       Amazon S3               s3fs
az://       Azure Blob Storage      adlfs
file://     Local filesystem        (built into fsspec)
memory://   In-memory (testing)     (built into fsspec)
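
A missing backend package usually surfaces only when the first remote call is made, so it can help to check the URL scheme up front. The helper below is a hypothetical convenience built from the table above; it is not part of temporal-workdir or fsspec.

```python
from typing import Optional
from urllib.parse import urlsplit

# Scheme to extra package, copied from the table above (None = nothing extra).
BACKEND_PACKAGES = {
    "gs": "gcsfs",
    "s3": "s3fs",
    "az": "adlfs",
    "file": None,
    "memory": None,
}


def required_package(url: str) -> Optional[str]:
    """Return the extra package a workspace URL needs (hypothetical helper)."""
    scheme = urlsplit(url).scheme
    if scheme not in BACKEND_PACKAGES:
        raise ValueError(f"scheme {scheme!r} is not in the table")
    return BACKEND_PACKAGES[scheme]


print(required_package("gs://my-bucket/jobs/job-123"))  # gcsfs
print(required_package("memory://jobs/job-123"))        # None
```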

Pass backend-specific options as keyword arguments:

Workspace("gs://bucket/key", project="my-gcp-project", token="cloud")

Source Distribution

temporal_workdir-0.6.0.tar.gz (10.3 kB)

Uploaded Source

Built Distribution

temporal_workdir-0.6.0-py3-none-any.whl (9.6 kB)

Uploaded Python 3

File details

Details for the file temporal_workdir-0.6.0.tar.gz.

File metadata

  • Download URL: temporal_workdir-0.6.0.tar.gz
  • Size: 10.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for temporal_workdir-0.6.0.tar.gz
Algorithm     Hash digest
SHA256        c90ee0c5786af3a51730fadd5844a65789c95691c81762374881c44bb2a75710
MD5           1ce0b3da2f0a21666fce6c9f55103116
BLAKE2b-256   5ec778249098d8807328aa9ce6d155e53e0c1885c7c0303cdb8f74090bfd9b83

Provenance

The following attestation bundles were made for temporal_workdir-0.6.0.tar.gz:

Publisher: release.yml on saeedseyfi/temporal-workdir

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file temporal_workdir-0.6.0-py3-none-any.whl.

File hashes

Hashes for temporal_workdir-0.6.0-py3-none-any.whl
Algorithm     Hash digest
SHA256        c811c93ebac5443b33633075b1903e6e762a9dacbbb0e17636ac76230bc9324d
MD5           d8950d9772e01a5c44ce70d48dd0d4fe
BLAKE2b-256   9849f67dbc7b288ebe48b4b4e3cfffccbc7475785095d8e46b4edfcb4c76581b

Provenance

The following attestation bundles were made for temporal_workdir-0.6.0-py3-none-any.whl:

Publisher: release.yml on saeedseyfi/temporal-workdir
