
atdata-lambda

Modal serverless processing backend for atdata datasets.

Fan out sample-level transformations across sharded WebDataset tar files using Modal for compute and Cloudflare R2 for storage. One shard = one worker — no job queue required.

Installation

pip install atdata-lambda

Requires Python 3.12+.

Quickstart

import modal
from atdata_lambda import processor, Dispatcher

app = modal.App("my-pipeline")

# ImageSample and EmbeddingSample are your own Packable sample types;
# `model` is assumed to be loaded elsewhere (e.g. in the Modal image).
@processor(app, gpu="T4", memory=8192)
def embed_images(sample: ImageSample) -> EmbeddingSample:
    embedding = model.encode(sample.image)
    return EmbeddingSample(embedding=embedding, label=sample.label)

dispatcher = Dispatcher(embed_images)
result = dispatcher.run(
    input_prefix="data/raw/train/",
    output_prefix="data/embedded/train/",
)
print(result.summary())
# Job a1b2c3d4e5f6: SUCCESS — 128/128 shards completed, 0 failed

R2 Configuration

Credentials are read from environment variables (typically injected via Modal Secrets):

Variable               Description
--------------------   ------------------------------------
R2_ENDPOINT_URL        Cloudflare R2 S3-compatible endpoint
R2_ACCESS_KEY_ID       R2 access key
R2_SECRET_ACCESS_KEY   R2 secret key
R2_BUCKET              Target bucket name
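A minimal sketch of reading these variables, with a plain dataclass standing in for the library's R2Config (the class and loader here are illustrative, not the actual implementation):

```python
import os
from dataclasses import dataclass

@dataclass
class R2Settings:
    """Hypothetical stand-in for atdata_lambda.R2Config."""
    endpoint_url: str
    access_key_id: str
    secret_access_key: str
    bucket: str

def load_from_env() -> R2Settings:
    # os.environ[...] raises KeyError early if a credential is missing
    return R2Settings(
        endpoint_url=os.environ["R2_ENDPOINT_URL"],
        access_key_id=os.environ["R2_ACCESS_KEY_ID"],
        secret_access_key=os.environ["R2_SECRET_ACCESS_KEY"],
        bucket=os.environ["R2_BUCKET"],
    )

# Demo values; in production these come from Modal Secrets.
os.environ["R2_ENDPOINT_URL"] = "https://acct.r2.cloudflarestorage.com"
os.environ["R2_ACCESS_KEY_ID"] = "key"
os.environ["R2_SECRET_ACCESS_KEY"] = "secret"
os.environ["R2_BUCKET"] = "my-bucket"
print(load_from_env().bucket)  # → my-bucket
```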

Or pass an R2Config explicitly:

from atdata_lambda import R2Config

config = R2Config(
    endpoint_url="https://acct.r2.cloudflarestorage.com",
    access_key_id="...",
    secret_access_key="...",
    bucket="my-bucket",
)
dispatcher = Dispatcher(embed_images, r2_config=config)

HTTP Endpoint

Expose a Modal web endpoint for event-driven dispatch (e.g. R2 bucket notifications):

from atdata_lambda import create_dispatch_endpoint

create_dispatch_endpoint(app, dispatcher)

This registers a POST endpoint accepting {"input_prefix": "...", "output_prefix": "..."}.
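For illustration, validating such a payload before dispatch might look like this (hypothetical handler logic, stdlib only; the actual endpoint's validation may differ):

```python
import json

REQUIRED = ("input_prefix", "output_prefix")

def parse_dispatch_request(raw: bytes) -> dict:
    """Parse and validate a dispatch payload of the shape the endpoint accepts."""
    body = json.loads(raw)
    missing = [k for k in REQUIRED if k not in body]
    if missing:
        raise ValueError(f"missing fields: {missing}")
    return body

req = parse_dispatch_request(
    b'{"input_prefix": "data/raw/train/", "output_prefix": "data/embedded/train/"}'
)
print(req["input_prefix"])  # → data/raw/train/
```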

Architecture

Client (notebook / CLI / R2 event)
     │
     ▼
  Dispatcher ── list_shards(prefix) ──→ R2
     │
     │  .map()
     ▼
  Modal workers (1 per shard)
     │  read shard → apply @processor fn → write output shard
     │  write .manifest.json
     ▼
  R2 (output shards + manifests)
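The fan-out shape above can be sketched locally with a thread pool standing in for Modal's .map() (illustrative only; the real workers read and write tar shards on R2, and list_shards queries the bucket via boto3):

```python
from concurrent.futures import ThreadPoolExecutor

def list_tar_shards(keys, prefix):
    """Stand-in for shard enumeration: sorted .tar keys under a prefix."""
    return sorted(k for k in keys if k.startswith(prefix) and k.endswith(".tar"))

def process_shard(shard):
    """Stand-in worker: one shard in, one result record out."""
    return {"shard": shard, "status": "ok"}

bucket_keys = [f"data/raw/train/shard-{i:06d}.tar" for i in range(4)]
bucket_keys.append("data/raw/train/_index.json")  # non-shard object, skipped

shards = list_tar_shards(bucket_keys, "data/raw/train/")
with ThreadPoolExecutor() as pool:  # one task per shard, like .map()
    results = list(pool.map(process_shard, shards))

print(sum(r["status"] == "ok" for r in results))  # → 4
```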

Modules

Module            Responsibility
----------------  --------------------------------------------------------------
_processor.py     @processor decorator — wraps user functions, extracts Packable type info, registers with Modal
_dispatcher.py    Dispatcher — enumerates shards, fans out via .map(), collects JobResult
_shard_worker.py  process_shard — runs inside each Modal worker, reads/writes tar shards
_manifest.py      ManifestBuilder — per-shard .manifest.json generation alongside output
_config.py        R2Config — R2 credentials, list_shards for shard enumeration via boto3
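As a rough illustration of the manifest step, a per-shard record can be built incrementally and serialized next to the output shard. The field names below are assumptions for the sketch, not the library's actual schema:

```python
import json

class SimpleManifestBuilder:
    """Toy stand-in for ManifestBuilder: count samples, then emit JSON."""

    def __init__(self, shard: str):
        self.shard = shard
        self.samples = 0
        self.errors = []

    def add_sample(self) -> None:
        self.samples += 1

    def to_json(self) -> str:
        return json.dumps(
            {"shard": self.shard, "samples": self.samples, "errors": self.errors}
        )

builder = SimpleManifestBuilder("data/embedded/train/shard-000000.tar")
for _ in range(3):
    builder.add_sample()
print(builder.to_json())
```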

Public API

from atdata_lambda import (
    processor,           # decorator
    Dispatcher,          # fan-out orchestrator
    JobResult,           # aggregate result
    R2Config,            # R2 connection config
    ShardIndex,          # shard listing result
    list_shards,         # enumerate .tar files at prefix
    ManifestBuilder,     # incremental manifest construction
    ShardManifest,       # manifest dataclass
    write_manifest,      # upload manifest to R2
    ShardResult,         # per-shard processing result
    ProcessorError,      # invalid processor configuration
    create_dispatch_endpoint,  # HTTP trigger registration
)

Development

uv sync                          # install dependencies
uv run pytest                    # run tests
uv run ruff check .              # lint
uv run ruff format .             # format

License

See LICENSE.
