
A multi-stage distributed compute tracker and job manager.


DistCompute Client Library




(Figure: LAION-5B workflow diagram)

Client library for the tracker that previously powered LAION's distributed compute network, which filtered Common Crawl with CLIP to produce the LAION-400M and LAION-5B datasets. The code has since been repurposed as a general-purpose, multi-stage distributed compute tracker and job manager, with added support for a frontend web dashboard, user leaderboards, and up to 5 sequential stages of workers for each job.

Prerequisites

  • Python >= 3.7
  • A live tracker server with a public URL/IP and a populated jobs database.

Installation

You can install the distcompute-client library using the following command:

pip install distcompute-client

Once installed, you can import the module and connect to a tracker:

import distcompute_client as dc

client = dc.init(url="https://tracker.example.com/", stage="a")
print(client.display_name)
print(client.project_name)

# hematin-hanking-71
# LAION-5B

Methods

distcompute_client.init(url: str, stage: str, nickname: str = "anonymous", verbose: bool = True) -> Client

Creates and returns a new client instance.

  • url: the public URL / IP address of the hosted tracker server
  • stage: the stage for this worker's specific task (a/b/c/d/e)
    • For example, for creating a web-scale dataset, you could use stage "a" for scraping web content, "b" for downloading scraped content, "c" for filtering downloaded content, etc.
    • The output data created from each stage of the cycle is the input given to the next stage's workers.
    • For a simple linear workflow (input -> worker -> output), only stage "a" should be enabled in the tracker.
  • nickname: provides a machine/user/company-level identifier for this client (default: "anonymous")
    • E.g. "John Doe", "AWS Pod 3" or "LAION".
    • This feature was used during the creation of LAION-400M to credit contributors by name, alongside their contributions, on the leaderboard, but it can serve any general purpose, such as monitoring workers distributed across different pods on AWS.
  • verbose: enable console messages (default: True)
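To make the stage chaining concrete, here is a toy, in-process model of how each stage's outputs become the next stage's jobs. It does not talk to a tracker and is not the library's implementation; run_pipeline and the per-stage worker functions are hypothetical stand-ins for the tracker and your worker scripts.

```python
from typing import Callable, Dict, List, Union

JobData = Union[str, list, dict]
STAGES = ["a", "b", "c", "d", "e"]  # up to 5 sequential stages


def run_pipeline(initial_jobs: List[JobData],
                 workers: Dict[str, Callable[[JobData], JobData]]) -> List[JobData]:
    """Toy, in-process model of stage chaining (no tracker involved).

    `workers` maps a stage letter to a function standing in for that stage's
    worker script. Enabled stages must be contiguous, starting from "a".
    """
    enabled = [stage for stage in STAGES if stage in workers]
    if not enabled or enabled != STAGES[:len(enabled)]:
        raise ValueError("stages must be enabled in order, starting from 'a'")

    jobs = list(initial_jobs)  # stage "a" jobs come from the tracker's database
    for stage in enabled:
        # Each job's output becomes one job for the next enabled stage.
        jobs = [workers[stage](job) for job in jobs]
    return jobs  # outputs of the last stage: these jobs are now closed
```

Passing only an "a" worker models the linear workflow described above, where each job's output is final.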

Client Reference

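import time

import distcompute_client as dc
from distcompute_client.errors import ZeroJobError

client = dc.init(
    url="https://example.com/",
    stage="b",
    nickname="Cluster 2 on AWS",
    verbose=True
)

while client.is_alive():
    # Wait for new jobs to appear.
    if client.job_count() == 0:
        time.sleep(30)
        continue

    try:
        client.new_job()
    except ZeroJobError:
        # Another worker claimed the last open job first; wait and retry.
        time.sleep(30)
        continue

    job_data = client.job  # Could be a str/list/dict, depending on what is set by the tracker for stage A, or worker scripts for later stages.
    job_id = client.job_id

    # Placeholder processing: replace with your own stage's work.
    items = job_data if isinstance(job_data, list) else [job_data]
    data_is_valid = True
    for i, item in enumerate(items, start=1):
        # ... process item, setting data_is_valid = False if it is unusable
        client.log(f"Analysed {i} / {len(items)} images")  # Updates the worker's progress on the server

    # Report the input data as invalid to the tracker, then look for a new job.
    if not data_is_valid:
        client.flag_invalid_data()
        continue

    # This becomes input for workers operating at the next stage, "c".
    output = {"file": f"s3://job_{job_id}.tar", "total_scraped": len(items)}

    client.complete_job(output)

# Disconnect from tracker.
client.bye()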

Client.job_count() -> int

Returns the number of open jobs at the same stage as the client.

  • Note: as jobs are created dynamically, there may be periods when your workers have no open jobs to fulfil. You can use Client.job_count() to detect these periods of inactivity.
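A worker that should idle through such periods can poll job_count() until work appears. This is a minimal sketch, not part of the library: wait_for_jobs is a hypothetical helper that only relies on the is_alive() and job_count() methods documented here, and the injectable sleep/clock arguments exist so it can be exercised without a tracker.

```python
import time
from typing import Callable, Optional


def wait_for_jobs(client, poll_interval: float = 30.0,
                  timeout: Optional[float] = None,
                  sleep: Callable[[float], None] = time.sleep,
                  clock: Callable[[], float] = time.monotonic) -> bool:
    """Block until client.job_count() reports at least one open job.

    Returns True once jobs are available, or False if the client is no
    longer alive or `timeout` seconds have elapsed.
    """
    deadline = None if timeout is None else clock() + timeout
    while client.is_alive():
        if client.job_count() > 0:
            return True
        if deadline is not None and clock() >= deadline:
            return False
        sleep(poll_interval)  # nothing to do yet; check again later
    return False
```

Using time.monotonic for the deadline keeps the timeout unaffected by system clock changes.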

Client.new_job() -> None

Retrieves a new job from the tracker, storing its data as instance attributes (see below).

  • Raises distcompute_client.errors.ZeroJobError if there are no open jobs to fulfil.
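Because another worker can claim the last open job between a job_count() check and the call to new_job(), it is worth handling ZeroJobError rather than treating it as fatal. A minimal sketch: try_new_job is a hypothetical helper, and the exception class is passed in so the snippet stays self-contained; in a real worker you would pass distcompute_client.errors.ZeroJobError.

```python
from typing import Type


def try_new_job(client, zero_job_error: Type[Exception]) -> bool:
    """Try to claim a job; return False instead of raising if none are open.

    Pass the library's ZeroJobError class as `zero_job_error`, e.g.
    `from distcompute_client.errors import ZeroJobError`.
    """
    try:
        client.new_job()
    except zero_job_error:
        # Another worker may have taken the last open job since we checked.
        return False
    return True
```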

Client.complete_job(data: Union[str, list, dict]) -> None

Marks the current job as done on the server and submits data to be passed as input to workers at the next stage of the project workflow. If there are no stages remaining, the job is closed.

  • data (required): data to be passed as input to workers at the next stage of the project workflow; those workers receive it as client.job.
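Since data must be a str, list or dict, a worker may want to validate its output before submitting it. This is a minimal sketch with a hypothetical check_payload helper; it additionally assumes the payload is exchanged with the tracker as JSON, which the reference above does not state, so the json.dumps check is a precaution rather than a documented requirement.

```python
import json
from typing import Union

Payload = Union[str, list, dict]


def check_payload(data: Payload) -> Payload:
    """Return `data` unchanged, or raise TypeError if it is not a
    str/list/dict or cannot be serialised to JSON."""
    if not isinstance(data, (str, list, dict)):
        raise TypeError(f"expected str, list or dict, got {type(data).__name__}")
    try:
        json.dumps(data)  # assumption: the tracker exchanges job data as JSON
    except (TypeError, ValueError) as exc:
        raise TypeError(f"payload is not JSON-serialisable: {exc}") from exc
    return data
```

In a worker this would wrap the submission, e.g. client.complete_job(check_payload(output)).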

Client.log(progress: str) -> None

Logs the string progress to the server, so that the status of each worker can be viewed on the tracker's dashboard.

  • progress (required): The string detailing the progress, e.g. "12 / 100 (12%)"
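A small helper can produce progress strings in the "12 / 100 (12%)" style shown above. The format_progress name is hypothetical; the percentage is rounded down, and an empty job reports 0% instead of dividing by zero.

```python
def format_progress(done: int, total: int) -> str:
    """Format progress as "done / total (percent%)", rounding down."""
    percent = (100 * done // total) if total else 0
    return f"{done} / {total} ({percent}%)"
```

For example, client.log(format_progress(12, 100)) sends "12 / 100 (12%)".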

Client.is_alive() -> bool

Returns True if the worker is still connected to the tracker, and has not timed out, otherwise returns False.

Client.flag_invalid_data() -> None

Reports the input data (client.job) produced by the previous stage's worker as invalid. If this happens repeatedly, the job is re-opened for workers at the previous stage.
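The re-open rule can be pictured as a per-job flag counter. This is a toy model for illustration only: the reference does not say how many flags trigger a re-open or how the tracker counts them, so the threshold of 3, FlagCounter and previous_stage are all hypothetical.

```python
from collections import Counter
from typing import Optional

STAGES = "abcde"


def previous_stage(stage: str) -> Optional[str]:
    """Return the stage before `stage`, or None for stage "a" (fed by the tracker)."""
    index = STAGES.index(stage)
    return STAGES[index - 1] if index > 0 else None


class FlagCounter:
    """Toy model: count invalid-data flags per job and signal when a job
    should be re-opened for the previous stage.

    The default threshold of 3 is a hypothetical value, not the tracker's.
    """

    def __init__(self, threshold: int = 3):
        self.threshold = threshold
        self.flags: Counter = Counter()

    def flag(self, job_id: int) -> bool:
        """Record one flag; return True when the threshold is reached."""
        self.flags[job_id] += 1
        if self.flags[job_id] >= self.threshold:
            del self.flags[job_id]  # start counting afresh after re-opening
            return True
        return False
```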

Client.bye() -> None

Removes the worker instance from the server, re-opening any pending jobs.
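Because bye() re-opens any pending jobs, it is worth calling even when a worker exits with an error. A minimal sketch using contextlib; the connected helper is hypothetical, and whether calling bye() on a client that has already timed out is harmless is not documented here, so treat that as an assumption.

```python
from contextlib import contextmanager


@contextmanager
def connected(client):
    """Yield `client` and always call client.bye() on exit, so pending
    jobs are re-opened even if the worker raises an exception."""
    try:
        yield client
    finally:
        client.bye()
```

A worker would then wrap its main loop in with connected(dc.init(url=..., stage="b")) as client:, leaving the explicit client.bye() call to the context manager.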

Client Variables

Client.job: Union[str, list, dict]

Input data to be processed by the worker. Could be a str/list/dict, depending on what is set by the tracker for stage A, or worker scripts for later stages.
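Since client.job may be a str, list or dict, a worker script often normalises it before processing. A minimal sketch with a hypothetical job_items helper; the convention that a list is processed item by item while a single str or dict counts as one item is an assumption, not something the tracker enforces.

```python
from typing import Any, List, Union


def job_items(job: Union[str, list, dict]) -> List[Any]:
    """Return the job data as a list of work items.

    Hypothetical convention: a list is taken item by item, while a single
    str or dict is treated as one item.
    """
    if isinstance(job, list):
        return list(job)
    if isinstance(job, (str, dict)):
        return [job]
    raise TypeError(f"unexpected job type: {type(job).__name__}")
```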

Client.job_id: int

The job ID set by the tracker, as an incrementing integer (job #1 = 1). Useful when naming/storing files related to each job.
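One way to use the job ID for naming output files is sketched below. job_filename is hypothetical; the stage letter is included in case the same job ID is seen at several stages (the reference does not say whether it is), and zero-padding keeps files in job order under lexicographic listings such as S3's.

```python
def job_filename(job_id: int, stage: str, ext: str = "tar") -> str:
    """Build a file name such as "job_000042_b.tar" from the tracker's job ID."""
    if job_id < 1:
        raise ValueError("job IDs start at 1")
    return f"job_{job_id:06d}_{stage}.{ext}"
```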

Client.display_name: str

The display name for this worker on the tracker.

Client.project: str

The project name defined by the tracker, e.g. "LAION-5B".

Client.stage_name: str

The name of this client's stage in the project workflow, as defined by the tracker, e.g. "CPU" or "Scraper".



Download files


Source Distribution

distcompute-client-1.0.0.tar.gz (7.7 kB)

Uploaded Source

Built Distribution

distcompute_client-1.0.0-py3-none-any.whl (9.9 kB)

Uploaded Python 3

File details

Details for the file distcompute-client-1.0.0.tar.gz.

File metadata

  • Download URL: distcompute-client-1.0.0.tar.gz
  • Upload date:
  • Size: 7.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.8.16

File hashes

Hashes for distcompute-client-1.0.0.tar.gz
Algorithm Hash digest
SHA256 006ac11f7cfa0b4287260f783ac5b5210d7afab219b7161d968701412a0e73a3
MD5 5542d4529a52c088a67efce1a8ce1b51
BLAKE2b-256 555b133c2f5dd001b35fdd462f6f627b59b5658e23bae2d255f0c9addd414c39


File details

Details for the file distcompute_client-1.0.0-py3-none-any.whl.


File hashes

Hashes for distcompute_client-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 e249073a1c10c28236953d6b0994888d552cf57aef0b2ce248103be31b974ea3
MD5 eae61fc03d84e01e784b908f8791b733
BLAKE2b-256 bff4270655c7fbbc5671a1d4d81e2964cc0df79c2187420291b91b4c95da370e

