
atdata-lambda

Modal serverless processing backend for atdata datasets.

Fan out sample-level transformations across sharded WebDataset tar files using Modal for compute and Cloudflare R2 for storage. One shard = one worker — no job queue required.

Installation

pip install atdata-lambda

Requires Python 3.12+.

Quickstart

import modal
from atdata_lambda import processor, Dispatcher

app = modal.App("my-pipeline")

# ImageSample and EmbeddingSample are user-defined Packable sample types,
# and `model` is assumed to be loaded inside the worker image; both are
# elided here for brevity.
@processor(app, gpu="T4", memory=8192)
def embed_images(sample: ImageSample) -> EmbeddingSample:
    embedding = model.encode(sample.image)
    return EmbeddingSample(embedding=embedding, label=sample.label)

dispatcher = Dispatcher(embed_images)
result = dispatcher.run(
    input_prefix="data/raw/train/",
    output_prefix="data/embedded/train/",
)
print(result.summary())
# Job a1b2c3d4e5f6: SUCCESS — 128/128 shards completed, 0 failed
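
Under the hood, the dispatcher lists the `.tar` shards under `input_prefix` and maps one worker per shard. A minimal, self-contained sketch of that enumeration step (the real `list_shards` works against an R2 bucket listing via boto3; here a plain key list stands in):

```python
def enumerate_shards(keys, prefix):
    """Filter object keys down to the .tar shards under a prefix.

    Illustrative stand-in for atdata_lambda.list_shards, which applies
    the same filtering to a boto3 listing of the R2 bucket (assumption).
    """
    return sorted(k for k in keys if k.startswith(prefix) and k.endswith(".tar"))

keys = [
    "data/raw/train/shard-000000.tar",
    "data/raw/train/shard-000001.tar",
    "data/raw/train/_index.json",      # non-shard object, skipped
    "data/raw/val/shard-000000.tar",   # different prefix, skipped
]
print(enumerate_shards(keys, "data/raw/train/"))
# ['data/raw/train/shard-000000.tar', 'data/raw/train/shard-000001.tar']
```

Each key returned here becomes one Modal worker invocation; there is no shared queue, so shard ordering and count fully determine the fan-out.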

R2 Configuration

Credentials are read from environment variables (typically injected via Modal Secrets):

Variable                Description
R2_ENDPOINT_URL         Cloudflare R2 S3-compatible endpoint
R2_ACCESS_KEY_ID        R2 access key
R2_SECRET_ACCESS_KEY    R2 secret key
R2_BUCKET               Target bucket name
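
For illustration, the environment-variable lookup amounts to something like the following sketch (the class name and `from_env` constructor are stand-ins, not the library's actual `R2Config` implementation in `_config.py`):

```python
import os
from dataclasses import dataclass

@dataclass
class R2ConfigSketch:
    """Illustrative stand-in for atdata_lambda.R2Config."""
    endpoint_url: str
    access_key_id: str
    secret_access_key: str
    bucket: str

    @classmethod
    def from_env(cls):
        # Raises KeyError if any required variable is missing,
        # which surfaces misconfigured Modal Secrets early.
        return cls(
            endpoint_url=os.environ["R2_ENDPOINT_URL"],
            access_key_id=os.environ["R2_ACCESS_KEY_ID"],
            secret_access_key=os.environ["R2_SECRET_ACCESS_KEY"],
            bucket=os.environ["R2_BUCKET"],
        )
```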

Or pass an R2Config explicitly:

from atdata_lambda import R2Config

config = R2Config(
    endpoint_url="https://acct.r2.cloudflarestorage.com",
    access_key_id="...",
    secret_access_key="...",
    bucket="my-bucket",
)
dispatcher = Dispatcher(embed_images, r2_config=config)

HTTP Endpoint

Expose a Modal web endpoint for event-driven dispatch (e.g. R2 bucket notifications):

from atdata_lambda import create_dispatch_endpoint

create_dispatch_endpoint(app, dispatcher)

This registers a POST endpoint accepting {"input_prefix": "...", "output_prefix": "..."}.
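
Triggering the endpoint from a client is then a plain JSON POST. A sketch using only the standard library (the endpoint URL is a placeholder; Modal prints the real URL when the app is deployed):

```python
import json
from urllib import request

# Placeholder URL: substitute the web endpoint URL Modal prints on deploy.
ENDPOINT = "https://my-workspace--my-pipeline-dispatch.modal.run"

payload = {
    "input_prefix": "data/raw/train/",
    "output_prefix": "data/embedded/train/",
}
req = request.Request(
    ENDPOINT,
    data=json.dumps(payload).encode(),
    headers={"Content-Type": "application/json"},
    method="POST",
)
# request.urlopen(req)  # fires the dispatch; commented out in this sketch
```

The same payload shape works for R2 bucket notifications routed through a small adapter that extracts the prefixes from the event.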

Architecture

Client (notebook / CLI / R2 event)
     │
     ▼
  Dispatcher ── list_shards(prefix) ──→ R2
     │
     │  .map()
     ▼
  Modal workers (1 per shard)
     │  read shard → apply @processor fn → write output shard
     │  write .manifest.json
     ▼
  R2 (output shards + manifests)
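
The worker stage above can be sketched end to end with the standard library: read members from an input tar, apply a transform, write an output tar, and build a manifest dict alongside it. This is a conceptual stand-in, not the real `_shard_worker.py` (which handles WebDataset sample grouping and R2 I/O), and the manifest fields here are illustrative, not the library's actual `.manifest.json` schema:

```python
import io
import json
import tarfile

def process_shard_sketch(input_tar: bytes, transform) -> tuple[bytes, dict]:
    """Apply `transform` to each member of a tar shard.

    Returns the output shard bytes plus a manifest dict recording
    what was processed.
    """
    out_buf = io.BytesIO()
    manifest = {"samples": [], "count": 0}
    with tarfile.open(fileobj=io.BytesIO(input_tar)) as src, \
         tarfile.open(fileobj=out_buf, mode="w") as dst:
        for member in src.getmembers():
            data = src.extractfile(member).read()
            new_data = transform(data)
            info = tarfile.TarInfo(name=member.name)
            info.size = len(new_data)
            dst.addfile(info, io.BytesIO(new_data))
            manifest["samples"].append(member.name)
            manifest["count"] += 1
    return out_buf.getvalue(), manifest

# Build a tiny in-memory shard and run the sketch.
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w") as tar:
    for name, payload in [("0.txt", b"hello"), ("1.txt", b"world")]:
        info = tarfile.TarInfo(name=name)
        info.size = len(payload)
        tar.addfile(info, io.BytesIO(payload))

shard, manifest = process_shard_sketch(buf.getvalue(), bytes.upper)
print(json.dumps(manifest))
# {"samples": ["0.txt", "1.txt"], "count": 2}
```

Because each worker touches exactly one input shard and one output shard, a failed shard can be retried independently without coordinating with its siblings.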

Modules

Module             Responsibility
_processor.py      @processor decorator — wraps user functions, extracts Packable type info, registers with Modal
_dispatcher.py     Dispatcher — enumerates shards, fans out via .map(), collects JobResult
_shard_worker.py   process_shard — runs inside each Modal worker, reads/writes tar shards
_manifest.py       ManifestBuilder — per-shard .manifest.json generation alongside output
_config.py         R2Config — R2 credentials, list_shards for shard enumeration via boto3

Public API

from atdata_lambda import (
    processor,           # decorator
    Dispatcher,          # fan-out orchestrator
    JobResult,           # aggregate result
    R2Config,            # R2 connection config
    ShardIndex,          # shard listing result
    list_shards,         # enumerate .tar files at prefix
    ManifestBuilder,     # incremental manifest construction
    ShardManifest,       # manifest dataclass
    write_manifest,      # upload manifest to R2
    ShardResult,         # per-shard processing result
    ProcessorError,      # invalid processor configuration
    create_dispatch_endpoint,  # HTTP trigger registration
)

Development

uv sync                          # install dependencies
uv run pytest                    # run tests
uv run ruff check .              # lint
uv run ruff format .             # format

License

See LICENSE.
