Distributed deep learning for Hugging Face Transformers on Spark
Overview
Welcome to Sparkformers, where we offer distributed training of Transformers models on Spark!
Motivation / Purpose
Sparkformers is derived from Elephas; however, with Hugging Face removing support for TensorFlow, I decided to spin some of that logic off into its own project and rework the paradigm to support the PyTorch backend. The purpose of this project is to serve as an experimental backend for distributed training that may be more developer-ergonomic than other solutions such as Ray.
Examples
Note that all examples are also available in the examples directory.
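The examples assume Sparkformers is installed from PyPI and that a working Spark environment is available (PySpark may need to be installed separately if your cluster or local environment does not already provide it):

pip install sparkformers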
Autoregressive (Causal) Language Model Training and Inference
from datasets import load_dataset
from sklearn.model_selection import train_test_split
from sparkformers.sparkformer import SparkFormer
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
)
import torch

batch_size = 20
epochs = 10

dataset = load_dataset("ag_news")
x = dataset["train"]["text"]
x_train, x_test = train_test_split(x, test_size=0.2)

model_name = "sshleifer/tiny-gpt2"
model = AutoModelForCausalLM.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.pad_token = tokenizer.eos_token

tokenizer_kwargs = {
    "max_length": 15,
    "padding": True,
    "truncation": True,
    "padding_side": "left",
}

sparkformer_model = SparkFormer(
    model=model,
    tokenizer=tokenizer,
    loader=AutoModelForCausalLM,
    optimizer_fn=lambda params: torch.optim.AdamW(params, lr=5e-5),
    loss_fn=lambda: torch.nn.CrossEntropyLoss(),
    tokenizer_kwargs=tokenizer_kwargs,
)

# perform distributed training
sparkformer_model.train(x_train, epochs=epochs, batch_size=batch_size)

# perform distributed generation
generations = sparkformer_model.generate(
    x_test, max_new_tokens=10, num_return_sequences=1
)

# decode the generated texts
generated_texts = [
    tokenizer.decode(output, skip_special_tokens=True) for output in generations
]
for i, text in enumerate(generated_texts):
    print(f"Original text {i}: {x_test[i]}")
    print(f"Generated text {i}: {text}")
Sequence Classification
from datasets import load_dataset
from sklearn.model_selection import train_test_split
from torch import softmax
from sparkformers.sparkformer import SparkFormer
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
)
import numpy as np
import torch

batch_size = 16
epochs = 20

dataset = load_dataset("ag_news")
x = dataset["train"]["text"][:2000]  # ty: ignore[possibly-unbound-implicit-call]
y = dataset["train"]["label"][:2000]  # ty: ignore[possibly-unbound-implicit-call]
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.1)

model_name = "prajjwal1/bert-tiny"
model = AutoModelForSequenceClassification.from_pretrained(
    model_name,
    num_labels=len(np.unique(y)),
    problem_type="single_label_classification",
)
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer_kwargs = {"padding": True, "truncation": True, "max_length": 512}

sparkformer_model = SparkFormer(
    model=model,
    tokenizer=tokenizer,
    loader=AutoModelForSequenceClassification,
    optimizer_fn=lambda params: torch.optim.AdamW(params, lr=2e-4),
    tokenizer_kwargs=tokenizer_kwargs,
    num_workers=2,
)

# perform distributed training
sparkformer_model.train(x_train, y_train, epochs=epochs, batch_size=batch_size)

# perform distributed inference
predictions = sparkformer_model.predict(x_test)
for i, pred in enumerate(predictions[:10]):
    probs = softmax(torch.tensor(pred), dim=-1)
    print(f"Example {i}: probs={probs.numpy()}, predicted={probs.argmax().item()}")

# review the predicted labels
print([int(np.argmax(pred)) for pred in predictions])
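As a quick follow-up (standard scikit-learn usage, not part of the Sparkformers API), the per-example outputs returned by predict can be reduced to class IDs and scored against the held-out labels from the split above:

from sklearn.metrics import accuracy_score

# reduce each row of model outputs to a predicted class ID, then compare with the held-out labels
predicted_labels = [int(np.argmax(pred)) for pred in predictions]
print("test accuracy:", accuracy_score(y_test, predicted_labels))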
Token Classification (NER)
from sklearn.model_selection import train_test_split
from sparkformers.sparkformer import SparkFormer
from transformers import (
    AutoTokenizer,
    AutoModelForTokenClassification,
)
from datasets import load_dataset
import numpy as np
import torch

batch_size = 5
epochs = 1

model_name = "hf-internal-testing/tiny-bert-for-token-classification"
model = AutoModelForTokenClassification.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)


def tokenize_and_align_labels(examples):
    tokenized_inputs = tokenizer(
        examples["tokens"], truncation=True, is_split_into_words=True
    )
    labels = []
    for i, label in enumerate(examples["ner_tags"]):
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        previous_word_idx = None
        label_ids = []
        for word_idx in word_ids:
            if word_idx is None:
                label_ids.append(-100)
            elif word_idx != previous_word_idx:
                label_ids.append(label[word_idx])
            else:
                label_ids.append(-100)
            previous_word_idx = word_idx
        labels.append(label_ids)
    tokenized_inputs["labels"] = labels
    return tokenized_inputs


dataset = load_dataset("conll2003", split="train[:5%]", trust_remote_code=True)
dataset = dataset.map(tokenize_and_align_labels, batched=True)
x = dataset["tokens"]  # ty: ignore[possibly-unbound-implicit-call]
y = dataset["labels"]  # ty: ignore[possibly-unbound-implicit-call]
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.1)

tokenizer_kwargs = {
    "padding": True,
    "truncation": True,
    "is_split_into_words": True,
}

sparkformer_model = SparkFormer(
    model=model,
    tokenizer=tokenizer,
    loader=AutoModelForTokenClassification,
    optimizer_fn=lambda params: torch.optim.AdamW(params, lr=5e-5),
    tokenizer_kwargs=tokenizer_kwargs,
    num_workers=2,
)

sparkformer_model.train(x_train, y_train, epochs=epochs, batch_size=batch_size)

inputs = tokenizer(x_test, **tokenizer_kwargs)
distributed_preds = sparkformer_model(**inputs)
print([int(np.argmax(x)) for x in np.squeeze(distributed_preds)])
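If human-readable entity tags are preferred over raw label IDs, the id2label mapping that transformers stores on the model config can be applied to the same argmax output. A minimal sketch, reusing model and distributed_preds from the example above (for this tiny test checkpoint the mapping may just contain generic LABEL_N names):

# map each predicted label ID to its string tag using the model's config
id2label = model.config.id2label
predicted_tags = [id2label[int(np.argmax(logits))] for logits in np.squeeze(distributed_preds)]
print(predicted_tags)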
TODO
- Validate that both GPU and CPU are supported (Elephas supports both; we just need to confirm that the Torch API is being used correctly)
- Add support for distributed training of other model types (e.g., image classification, object detection, etc.)
- Add support for distributed training of custom models
- Consider simplifying the API further (e.g., a builder pattern, or accepting the model string directly and pushing the loader logic inside the SparkFormer class); a hypothetical sketch of such an API follows this list
- Support TensorFlow/Keras models for completeness (potentially taking a similar approach to transformers, where each class is prefixed with TF; it would essentially mean copying the old logic from Elephas)
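For illustration, a simplified builder-style API along the lines of the first item above might look roughly like the sketch below. Everything here is hypothetical (the from_pretrained constructor and the task argument are made up for discussion) and is not part of the current SparkFormer API:

# Hypothetical future API, shown for discussion only -- not implemented today
sparkformer_model = SparkFormer.from_pretrained(
    "prajjwal1/bert-tiny",           # model string; the Auto* loader would be resolved internally
    task="sequence-classification",  # hypothetical hint replacing the explicit loader argument
    optimizer_fn=lambda params: torch.optim.AdamW(params, lr=2e-4),
    num_workers=2,
)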
💡 Interested in contributing? Check out the Local Development & Contributions Guide.
File details
Details for the file sparkformers-0.0.3.tar.gz.
File metadata
- Download URL: sparkformers-0.0.3.tar.gz
- Upload date:
- Size: 117.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.5.21
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | ebf89a0e937fcc76d9c18ded4a77cbade555906adaa93fdf04e46fe55ead59e0 |
| MD5 | 5e1cfc08d2319a1b3b2773ed59222247 |
| BLAKE2b-256 | 1715842f70e1bb4bcf046844c404710997f4b04035d1fa3be9fc1c295b8ccfd8 |
File details
Details for the file sparkformers-0.0.3-py3-none-any.whl.
File metadata
- Download URL: sparkformers-0.0.3-py3-none-any.whl
- Upload date:
- Size: 9.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.5.21
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 6368f5bb0fefe76aba3b86439416a4ca5d79f5a7b7e0bdf768a2b1b783aedc5b |
| MD5 | 585474da49c8cfc6d9c1553adfeae183 |
| BLAKE2b-256 | ca62fd6d5f5223dedbb3e7ede43d0ead3b27eb2effd80a4235888ce3783c353b |