Distributed deep learning for Hugging Face Transformers on Spark

Overview

Welcome to Sparkformers, where we offer distributed training of Transformers models on Spark!

Motivation / Purpose

Sparkformers is derived from Elephas. With Hugging Face removing support for TensorFlow, I decided to spin some of that logic off into its own project and rework the paradigm to support the Torch backend. The purpose of this project is to serve as an experimental backend for distributed training that may be more ergonomic for developers than other solutions such as Ray.

Examples

Note that all examples are also available in the examples directory.

Autoregressive (Causal) Language Model Training and Inference

from datasets import load_dataset
from sklearn.model_selection import train_test_split
from sparkformers.sparkformer import SparkFormer
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
)
import torch

batch_size = 20
epochs = 10

dataset = load_dataset("ag_news")
x = dataset["train"]["text"]

x_train, x_test = train_test_split(x, test_size=0.2)

model_name = "sshleifer/tiny-gpt2"

model = AutoModelForCausalLM.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.pad_token = tokenizer.eos_token
tokenizer_kwargs = {
    "max_length": 15,
    "padding": True,
    "truncation": True,
    "padding_side": "left",
}

sparkformer_model = SparkFormer(
    model=model,
    tokenizer=tokenizer,
    loader=AutoModelForCausalLM,
    optimizer_fn=lambda params: torch.optim.AdamW(params, lr=5e-5),
    loss_fn=lambda: torch.nn.CrossEntropyLoss(),
    tokenizer_kwargs=tokenizer_kwargs
)

# perform distributed training
sparkformer_model.train(x_train, epochs=epochs, batch_size=batch_size)

# perform distributed generation
generations = sparkformer_model.generate(
    x_test, max_new_tokens=10, num_return_sequences=1
)
# decode the generated texts
generated_texts = [
    tokenizer.decode(output, skip_special_tokens=True) for output in generations
]

for i, text in enumerate(generated_texts):
    print(f"Original text {i}: {x_test[i]}")
    print(f"Generated text {i}: {text}")
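The tokenizer settings in this example pad on the left and reuse the EOS token as the pad token. A decoder-only model continues from the last position of each row, so left padding keeps the real tokens next to the newly generated ones. The following is a minimal pure-Python sketch of that layout; `left_pad` and the token ids are illustrative stand-ins, not part of Sparkformers or Transformers.

```python
def left_pad(sequences, pad_id, max_length):
    """Truncate each sequence to max_length, then pad it on the left."""
    padded, masks = [], []
    for seq in sequences:
        seq = list(seq)[:max_length]          # keep the first max_length tokens
        n_pad = max_length - len(seq)
        padded.append([pad_id] * n_pad + seq)  # padding goes in front
        masks.append([0] * n_pad + [1] * len(seq))
    return padded, masks


# Hypothetical token ids; 0 stands in for the EOS/pad token id.
batch = [[11, 12, 13], [21, 22, 23, 24, 25, 26]]
input_ids, attention_mask = left_pad(batch, pad_id=0, max_length=5)

print(input_ids)
print(attention_mask)
```

With this layout, every row ends in a real token, which is the position generation continues from.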

Sequence Classification

from sklearn.datasets import fetch_20newsgroups
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sparkformers.sparkformer import SparkFormer
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
)
import numpy as np
import torch

batch_size = 20
epochs = 10

newsgroups = fetch_20newsgroups(subset="train")
x = newsgroups.data
y = newsgroups.target

encoder = LabelEncoder()
y_encoded = encoder.fit_transform(y)

x_train, x_test, y_train, y_test = train_test_split(x, y_encoded, test_size=0.5)

model_name = "albert-base-v2"

model = AutoModelForSequenceClassification.from_pretrained(
    model_name, num_labels=len(np.unique(y_encoded))
)
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer_kwargs = {"padding": True, "truncation": True}

sparkformer_model = SparkFormer(
    model=model,
    tokenizer=tokenizer,
    loader=AutoModelForSequenceClassification,
    optimizer_fn=lambda params: torch.optim.AdamW(params, lr=5e-5),
    loss_fn=lambda: torch.nn.CrossEntropyLoss(),
    tokenizer_kwargs=tokenizer_kwargs,
    num_workers=2,
)

# perform distributed training
sparkformer_model.train(x_train, y_train, epochs=epochs, batch_size=batch_size)

# perform distributed prediction
predictions = sparkformer_model.predict(x_test)

# review the predicted labels
print([np.argmax(pred) for pred in predictions])
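To go one step past printing the predicted labels, the sketch below scores them against held-out labels. It assumes, as the `argmax` line above implies, that each prediction is one vector of class scores per input. The helper names and the sample values are hypothetical and only stand in for `predictions` and `y_test`.

```python
def argmax(scores):
    """Return the index of the largest score (first one wins on ties)."""
    return max(range(len(scores)), key=lambda i: scores[i])


def scores_to_labels(predictions):
    """Pick the highest-scoring class index for each prediction."""
    return [argmax(pred) for pred in predictions]


def accuracy(predicted, expected):
    """Fraction of positions where the predicted label matches."""
    matches = sum(int(p == e) for p, e in zip(predicted, expected))
    return matches / len(expected)


# Stand-in values; in the example above these would be
# `predictions` and `y_test`.
fake_predictions = [[0.1, 2.0, -1.0], [3.0, 0.2, 0.1], [0.0, 0.1, 0.5]]
fake_y_test = [1, 0, 1]

predicted_labels = scores_to_labels(fake_predictions)
score = accuracy(predicted_labels, fake_y_test)
print(predicted_labels, score)
```

If needed, `encoder.inverse_transform(predicted_labels)` maps the integer ids back to the original label values.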

Token Classification (NER)

from sklearn.model_selection import train_test_split
from sparkformers.sparkformer import SparkFormer
from transformers import (
    AutoTokenizer,
    AutoModelForTokenClassification,
)
from datasets import load_dataset
import numpy as np
import torch

batch_size = 5
epochs = 2
model_name = "hf-internal-testing/tiny-bert-for-token-classification"

model = AutoModelForTokenClassification.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)

def tokenize_and_align_labels(examples):
    tokenized_inputs = tokenizer(
        examples["tokens"], truncation=True, is_split_into_words=True
    )
    labels = []
    for i, label in enumerate(examples["ner_tags"]):
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        previous_word_idx = None
        label_ids = []
        for word_idx in word_ids:
            if word_idx is None:
                label_ids.append(-100)
            elif word_idx != previous_word_idx:
                label_ids.append(label[word_idx])
            else:
                label_ids.append(-100)
            previous_word_idx = word_idx
        labels.append(label_ids)
    tokenized_inputs["labels"] = labels
    return tokenized_inputs

dataset = load_dataset("conll2003", split="train[:5%]", trust_remote_code=True)
dataset = dataset.map(tokenize_and_align_labels, batched=True)

x = dataset["tokens"]
y = dataset["labels"]

x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2)

tokenizer_kwargs = {
    "padding": True,
    "truncation": True,
    "is_split_into_words": True,
}

sparkformer_model = SparkFormer(
    model=model,
    tokenizer=tokenizer,
    loader=AutoModelForTokenClassification,
    optimizer_fn=lambda params: torch.optim.AdamW(params, lr=5e-5),
    loss_fn=lambda: torch.nn.CrossEntropyLoss(),
    tokenizer_kwargs=tokenizer_kwargs,
    num_workers=2,
)

sparkformer_model.train(x_train, y_train, epochs=epochs, batch_size=batch_size)

# tokenize the held-out examples and run distributed inference
inputs = tokenizer(x_test, **tokenizer_kwargs, return_tensors="pt")
distributed_preds = sparkformer_model(**inputs)
print([int(np.argmax(pred)) for pred in np.squeeze(distributed_preds)])
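The alignment rule inside `tokenize_and_align_labels` can be easier to follow on a single hand-written example. The sketch below applies the same rule to a hypothetical list of word ids: special tokens and non-first sub-tokens receive -100, which is the default `ignore_index` of `torch.nn.CrossEntropyLoss`. The `align_labels` helper and the sample ids are illustrative only.

```python
IGNORE_INDEX = -100  # default ignore_index of torch.nn.CrossEntropyLoss


def align_labels(word_ids, word_labels):
    """Label the first sub-token of each word; ignore everything else."""
    aligned = []
    previous = None
    for word_id in word_ids:
        if word_id is None or word_id == previous:
            # special token, or a continuation of the previous word
            aligned.append(IGNORE_INDEX)
        else:
            aligned.append(word_labels[word_id])
        previous = word_id
    return aligned


# Hypothetical tokenization of three words into
# [CLS] Hug ##ging Face rocks [SEP]
word_ids = [None, 0, 0, 1, 2, None]
word_labels = [3, 4, 0]  # one tag id per word

aligned = align_labels(word_ids, word_labels)
print(aligned)
```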

TODO

  • Validate that both GPU and CPU are supported (Elephas supports both; what remains is validating that the Torch API is being used correctly)
  • Add support for distributed training of other model types (e.g., image classification, object detection, etc.)
  • Add support for distributed training of custom models
  • Consider simplifying the API further (e.g., a builder pattern, or accepting the model string and moving the loader logic inside the SparkFormer class)
  • Support TensorFlow/Keras models for completeness (potentially taking a similar approach to transformers, where each class is prefixed with TF; this would essentially mean copying the old logic from Elephas)

💡 Interested in contributing? Check out the Local Development & Contributions Guide.
