
Remote-backed workspace sync for Temporal activities


temporal-workdir

Shared working directories for Temporal activities across distributed workers.

The Problem

Temporal activities that read and write files break when you run multiple workers — each worker has its own disk. An activity that writes output.csv on Worker A won't find it when retried on Worker B.

temporal-workdir solves this by syncing a local directory with remote storage (GCS, S3, local filesystem, etc.) before and after each activity execution.

Install

pip install temporal-workdir

# Add your cloud storage backend:
pip install gcsfs    # Google Cloud Storage
pip install s3fs     # Amazon S3
pip install adlfs    # Azure Blob Storage

Quick Start

Context Manager

Use Workspace anywhere you need shared file state between activities:

import json
from temporal_workdir import Workspace

# Pull remote files → work locally → push changes back
async with Workspace("gs://my-bucket/jobs/job-123") as ws:
    config = json.loads((ws.path / "config.json").read_text())
    (ws.path / "result.csv").write_text("id,score\n1,0.95")
# Clean exit → changes uploaded automatically
# Exception → nothing uploaded, remote state unchanged

Activity Decorator

The @workspace decorator handles pull/push around your activity. Template variables are resolved from activity.info():

from temporalio import activity
from temporal_workdir import workspace, get_workspace_path

@workspace("gs://my-bucket/{workflow_id}/{activity_type}")
@activity.defn
async def process_data(order_id: str) -> str:
    ws = get_workspace_path()  # local Path to synced directory
    (ws / "orders.txt").write_text(order_id)
    return "done"

Available template variables: {workflow_id}, {activity_id}, {activity_type}, {task_queue}.
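The template substitution above can be pictured with plain str.format semantics. This is a minimal sketch, not the library's implementation — the variable values are made-up examples; only the variable names come from the documented set:

```python
# Hypothetical values of the kind activity.info() would provide.
info_vars = {
    "workflow_id": "order-wf-42",
    "activity_id": "1",
    "activity_type": "process_data",
    "task_queue": "default",
}

template = "gs://my-bucket/{workflow_id}/{activity_type}"
url = template.format(**info_vars)
print(url)  # → gs://my-bucket/order-wf-42/process_data
```

Each activity run of a given workflow thus maps to a deterministic storage URL, which is what lets a retry on a different worker find the same files.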

Custom Template Variables

Use key_fn to add your own template variables from the activity arguments:

@workspace(
    "gs://my-bucket/{workflow_id}/users/{user_id}",
    key_fn=lambda user_id, **_: {"user_id": user_id},
)
@activity.defn
async def process_user(user_id: str) -> None:
    ws = get_workspace_path()
    # Each user gets their own workspace
    ...

Read-Only Access

Pull without pushing back — useful for reading shared state:

async with Workspace("gs://my-bucket/shared/config", read_only=True) as ws:
    config = json.loads((ws.path / "settings.json").read_text())
    # No upload on exit, even if you modify files

Managing Workspaces

from temporal_workdir import list_workspace_names, delete_workspace

# List workspace names under a prefix
names = list_workspace_names("gs://my-bucket/jobs")
# → ["job-001", "job-002", "job-003"]

# Delete a workspace
delete_workspace("gs://my-bucket/jobs/job-001")  # → True if deleted
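To make the semantics of these helpers concrete, here is a hedged stdlib-only sketch that emulates them against a local directory, assuming (per "How It Works" below) that each workspace is stored as a single `<name>.tar.gz` archive under the prefix. The function names with a `_local` suffix are illustrative, not part of the library:

```python
import tempfile
from pathlib import Path


def list_workspace_names_local(prefix: Path) -> list[str]:
    """Sketch: list workspace names, assuming one <name>.tar.gz per workspace."""
    return sorted(p.name.removesuffix(".tar.gz") for p in prefix.glob("*.tar.gz"))


def delete_workspace_local(prefix: Path, name: str) -> bool:
    """Sketch: delete a workspace archive; return True if it existed."""
    archive = prefix / f"{name}.tar.gz"
    if archive.exists():
        archive.unlink()
        return True
    return False


with tempfile.TemporaryDirectory() as d:
    prefix = Path(d)
    for name in ("job-001", "job-002"):
        (prefix / f"{name}.tar.gz").write_bytes(b"")

    print(list_workspace_names_local(prefix))        # → ['job-001', 'job-002']
    print(delete_workspace_local(prefix, "job-001"))  # → True
    print(list_workspace_names_local(prefix))        # → ['job-002']
```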

How It Works

Activity starts
  ↓
Pull: download {url}.tar.gz → unpack to temp dir
  ↓
Execute: your code reads/writes files at ws.path
  ↓
Push: pack temp dir → upload {url}.tar.gz
  ↓
Cleanup: delete temp dir
  • First run (no archive exists): starts with an empty directory
  • Exception during execution: no push — remote state stays unchanged
  • Empty workspace after execution: remote archive is deleted
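The lifecycle above can be sketched in a few lines of stdlib Python. This is a simplified illustration, not the library's code: it uses a local path as the "remote", synchronous calls instead of async, and a hypothetical `run_with_workspace` helper:

```python
import tarfile
import tempfile
from pathlib import Path


def run_with_workspace(archive: Path, fn) -> None:
    """Sketch of pull → execute → push against a local 'remote' archive."""
    with tempfile.TemporaryDirectory() as tmp:
        workdir = Path(tmp)
        # Pull: unpack the archive if it exists; first run starts empty.
        if archive.exists():
            with tarfile.open(archive, "r:gz") as tar:
                tar.extractall(workdir)
        # Execute: if fn raises, we never reach the push, so the
        # remote archive stays unchanged.
        fn(workdir)
        # Push: repack, or delete the archive if the workspace is now empty.
        if any(workdir.iterdir()):
            with tarfile.open(archive, "w:gz") as tar:
                for p in workdir.iterdir():
                    tar.add(p, arcname=p.name)
        elif archive.exists():
            archive.unlink()
    # Cleanup: TemporaryDirectory removes the temp dir on exit.


with tempfile.TemporaryDirectory() as remote:
    archive = Path(remote) / "job-123.tar.gz"
    run_with_workspace(archive, lambda ws: (ws / "result.csv").write_text("id\n1"))
    print(archive.exists())  # → True
```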

Storage Backends

Uses fsspec for storage. Any fsspec-compatible backend works:

Scheme      Backend                Package
gs://       Google Cloud Storage   gcsfs
s3://       Amazon S3              s3fs
az://       Azure Blob Storage     adlfs
file://     Local filesystem       (built into fsspec)
memory://   In-memory (testing)    (built into fsspec)

Pass backend-specific options as keyword arguments:

Workspace("gs://bucket/key", project="my-gcp-project", token="cloud")
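A reasonable mental model — an assumption about the internals, not documented behavior — is that the URL scheme picks the filesystem and the extra keyword arguments are forwarded untouched to that backend (e.g. to fsspec.filesystem(scheme, **storage_options)). A stdlib-only sketch of that split, with a hypothetical WorkspaceSketch class:

```python
from urllib.parse import urlsplit


class WorkspaceSketch:
    """Illustrative only: parse the URL, hold backend options for forwarding."""

    def __init__(self, url: str, **storage_options):
        parts = urlsplit(url)
        self.scheme = parts.scheme or "file"
        self.key = f"{parts.netloc}{parts.path}".lstrip("/")
        # A real implementation would pass these to the storage backend,
        # e.g. fsspec.filesystem(self.scheme, **storage_options).
        self.storage_options = storage_options


ws = WorkspaceSketch("gs://bucket/key", project="my-gcp-project", token="cloud")
print(ws.scheme)                      # → gs
print(ws.key)                         # → bucket/key
print(ws.storage_options["project"])  # → my-gcp-project
```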
