
Remote-backed workspace sync for Temporal activities


temporal-workdir

Shared working directories for Temporal activities across distributed workers.

The Problem

Temporal activities that read and write files break when you run multiple workers — each worker has its own disk. An activity that writes output.csv on Worker A won't find it when retried on Worker B.

temporal-workdir solves this by syncing a local directory with remote storage (GCS, S3, local filesystem, etc.) before and after each activity execution.

Install

pip install temporal-workdir

# Add your cloud storage backend:
pip install gcsfs    # Google Cloud Storage
pip install s3fs     # Amazon S3
pip install adlfs    # Azure Blob Storage

Quick Start

Context Manager

Use Workspace anywhere you need shared file state between activities:

import json
from temporal_workdir import Workspace

# Pull remote files → work locally → push changes back
async with Workspace("gs://my-bucket/jobs/job-123") as ws:
    config = json.loads((ws.path / "config.json").read_text())
    (ws.path / "result.csv").write_text("id,score\n1,0.95")
# Clean exit → changes uploaded automatically
# Exception → nothing uploaded, remote state unchanged
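The commit-or-discard behavior above can be sketched as a plain async context manager. This is a stdlib-only illustration, not the library's internals: `sketch_workspace` and the `push` callable are hypothetical stand-ins, and the real pull step is omitted.

```python
import tempfile
from contextlib import asynccontextmanager
from pathlib import Path

@asynccontextmanager
async def sketch_workspace(push):
    # A real implementation would pull the remote archive into this
    # directory first; here we start empty for illustration.
    workdir = Path(tempfile.mkdtemp())
    # If the body raises, the exception surfaces at this yield and the
    # push below never runs -- remote state stays unchanged.
    yield workdir
    await push(workdir)  # reached only on clean exit
```

Because the `await push(...)` sits after the `yield`, an exception in the `async with` body propagates out of the generator before any upload happens, which matches the "nothing uploaded on exception" contract.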

Activity Decorator

The @workspace decorator handles pull/push around your activity. Template variables are resolved from activity.info():

from temporalio import activity
from temporal_workdir import workspace, get_workspace_path

@workspace("gs://my-bucket/{workflow_id}/{activity_type}")
@activity.defn
async def process_data(order_id: str) -> str:
    ws = get_workspace_path()  # local Path to synced directory
    (ws / "orders.txt").write_text(order_id)
    return "done"

Available template variables: {workflow_id}, {activity_id}, {activity_type}, {task_queue}.
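Resolution amounts to filling the URL template from the activity's runtime info. A minimal sketch of that substitution, with field names taken from the list above but the values hand-built for illustration (the real values come from `activity.info()`):

```python
# Hypothetical values standing in for temporalio's activity.info()
info = {
    "workflow_id": "order-wf-42",
    "activity_id": "1",
    "activity_type": "process_data",
    "task_queue": "default",
}

# Each {placeholder} in the workspace URL is filled from the info mapping
url = "gs://my-bucket/{workflow_id}/{activity_type}".format_map(info)
# → "gs://my-bucket/order-wf-42/process_data"
```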

Custom Template Variables

Use key_fn to add your own template variables from the activity arguments:

@workspace(
    "gs://my-bucket/{workflow_id}/users/{user_id}",
    key_fn=lambda user_id, **_: {"user_id": user_id},
)
@activity.defn
async def process_user(user_id: str) -> None:
    ws = get_workspace_path()
    # Each user gets their own workspace
    ...
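Conceptually, the mapping returned by key_fn is merged with the built-in template variables before the URL is formatted. A stdlib-only sketch of that merge, where `resolve` and the variable values are illustrative, not the library's actual code:

```python
def resolve(template, builtin_vars, key_fn, *args, **kwargs):
    # key_fn receives the activity's arguments; its result extends
    # (and may shadow) the built-in variables
    variables = {**builtin_vars, **key_fn(*args, **kwargs)}
    return template.format_map(variables)

url = resolve(
    "gs://my-bucket/{workflow_id}/users/{user_id}",
    {"workflow_id": "wf-1"},                     # built-ins (hand-built here)
    lambda user_id, **_: {"user_id": user_id},   # same key_fn as above
    "alice",                                     # the activity argument
)
# → "gs://my-bucket/wf-1/users/alice"
```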

Read-Only Access

Pull without pushing back — useful for reading shared state:

async with Workspace("gs://my-bucket/shared/config", read_only=True) as ws:
    config = json.loads((ws.path / "settings.json").read_text())
    # No upload on exit, even if you modify files

Managing Workspaces

from temporal_workdir import list_workspace_names, delete_workspace

# List workspace names under a prefix
names = list_workspace_names("gs://my-bucket/jobs")
# → ["job-001", "job-002", "job-003"]

# Delete a workspace
delete_workspace("gs://my-bucket/jobs/job-001")  # → True if deleted

How It Works

Activity starts
  ↓
Pull: download {url}.tar.gz → unpack to temp dir
  ↓
Execute: your code reads/writes files at ws.path
  ↓
Push: pack temp dir → upload {url}.tar.gz
  ↓
Cleanup: delete temp dir
  • First run (no archive exists): starts with an empty directory
  • Exception during execution: no push — remote state stays unchanged
  • Empty workspace after execution: remote archive is deleted
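The pack/unpack half of this cycle can be sketched with the stdlib alone, assuming a local-filesystem archive; `pull` and `push` here are simplified stand-ins for the library's fsspec-based transfer:

```python
import tarfile
import tempfile
from pathlib import Path

def pull(archive: Path) -> Path:
    """Unpack the archive (if any) into a fresh temp dir."""
    workdir = Path(tempfile.mkdtemp())
    if archive.exists():  # first run: no archive yet, start empty
        with tarfile.open(archive, "r:gz") as tar:
            tar.extractall(workdir)
    return workdir

def push(workdir: Path, archive: Path) -> None:
    """Pack the working dir back into the archive."""
    if not any(workdir.iterdir()):  # empty workspace: delete the archive
        archive.unlink(missing_ok=True)
        return
    with tarfile.open(archive, "w:gz") as tar:
        for item in workdir.iterdir():
            tar.add(item, arcname=item.name)
```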

Storage Backends

Uses fsspec for storage. Any fsspec-compatible backend works:

Scheme     Backend               Package
gs://      Google Cloud Storage  gcsfs
s3://      Amazon S3             s3fs
az://      Azure Blob Storage    adlfs
file://    Local filesystem      (built-in)
memory://  In-memory (testing)   (built-in)

Pass backend-specific options as keyword arguments:

Workspace("gs://bucket/key", project="my-gcp-project", token="cloud")
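Backend selection keys off the URL scheme, which standard URL parsing exposes directly. A stdlib-only illustration (the mapping below mirrors the table above; it is not fsspec's actual registry):

```python
from urllib.parse import urlsplit

# Schemes requiring an extra driver package, per the table above
BACKENDS = {"gs": "gcsfs", "s3": "s3fs", "az": "adlfs"}

scheme = urlsplit("gs://my-bucket/jobs/job-123").scheme
package = BACKENDS[scheme]
# scheme → "gs", handled by the gcsfs package
```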

Project details


Download files

Download the file for your platform.

Source Distribution

temporal_workdir-0.7.2.tar.gz (10.4 kB)

Built Distribution

temporal_workdir-0.7.2-py3-none-any.whl (9.7 kB)

File details

Details for the file temporal_workdir-0.7.2.tar.gz.

File metadata

  • Size: 10.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for temporal_workdir-0.7.2.tar.gz
Algorithm Hash digest
SHA256 eab339fc9317921d790dd99a97e68010fd6d52049ad668639412d23dc36c6eb4
MD5 0073fa8e7922a6fa3a05ce3c0b438716
BLAKE2b-256 871395202d2ab7ee00c9ef7d6ad7214566d3b09a99fecdda522aa0ddbfaee991


Provenance

The following attestation bundles were made for temporal_workdir-0.7.2.tar.gz:

Publisher: release.yml on saeedseyfi/temporal-workdir

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file temporal_workdir-0.7.2-py3-none-any.whl.


File hashes

Hashes for temporal_workdir-0.7.2-py3-none-any.whl
Algorithm Hash digest
SHA256 17bf9a9dfe016bcd27c2a96290f27626b9f9e89357a54f7a34e0d8e06ab8ce1e
MD5 96719ee3168b08e0be57ae75336a00c9
BLAKE2b-256 7bf8e2850accf118d14f06569b182ae3e7ee3fdb99bbaa3dcf9206f63082b20e


Provenance

The following attestation bundles were made for temporal_workdir-0.7.2-py3-none-any.whl:

Publisher: release.yml on saeedseyfi/temporal-workdir

