Skip to main content

A simple way to synthesize LLM training data.

Project description

BlossomData

中文 | English

GitHub License PyPI - Version Ask DeepWiki

BlossomData是一个专为大模型训练打造的数据处理框架,旨在提供灵活、高效的数据处理解决方案。它内置丰富的算子库,支持不同模态、不同训练阶段的数据处理与合成。能够从单机环境无缝迁移到分布式环境,允许用户灵活扩展自定义算子,从而大幅降低数据处理流程的开发和计算成本。

使用示例

pip install blossom-data
# 从源码安装
# pip install git+https://github.com/Azure99/BlossomData.git

在使用之前,请在config.yaml文件中配置模型服务提供商的API密钥和相关参数。(可参考config.yaml.example

第一个数据合成任务

下面是一个非常实用的示例,仅依赖数学题目和参考答案,即可合成经过验证的长推理中文训练数据。

框架提供了大量内置算子,可以在blossom.op中查看。例如,当原始数据缺失答案时,可以使用ChatDistiller生成回答,然后通过ChatMultiReasoningFilter基于投票方式过滤掉潜在错误样本。

from blossom import *
from blossom.dataframe import *
from blossom.op import *
from blossom.schema import *

# 示例数学指令数据
data = [
    ChatSchema(
        messages=[
            user("Suppose that $wz = 12-8i$, and $|w| = \\sqrt{13}$. What is $|z|$?"),
            assistant("4"),
        ]
    ),
]

# 定义要使用的算子
ops = [
    # 利用非推理模型将指令数据翻译为中文
    ChatTranslator(model="deepseek-chat", target_language="Chinese"),
    # 使用推理模型生成问题的回答,并使用非推理模型验证回答正确性
    ChatVerifyDistiller(
        model="deepseek-reasoner",
        mode=ChatVerifyDistiller.Mode.LLM,
        validation_model="deepseek-chat",
    ),
    # 将reasoning_content合并到content中,以便用于训练
    ChatReasoningContentMerger(),
]

dataset = create_dataset(data)
result = dataset.execute(ops).collect()
print(result)

框架核心概念

Schema

Schema是框架中的基础数据结构,用于表示和处理不同类型的数据。所有Schema都继承自基础Schema类,提供了统一的接口和功能。

# 文本数据
text_data = TextSchema(content="这是一段文本内容")

# 对话数据
chat_data = ChatSchema(
    messages=[
        user("你好"),
        assistant("你好,请问需要什么帮助?")
    ]
)

# 结构化数据
row_data = RowSchema(data={"name": "张三", "age": 30, "score": 95})

# 自定义数据
custom_data = CustomSchema(data=1)

每个 Schema 实例都包含以下通用字段:

  • id: 唯一标识符
  • type: Schema类型标识
  • failed: 处理失败标志
  • failure_reason: 失败原因,当failed=True时记录具体的失败信息
  • metadata: dict类型的附加元数据

DataFrame与Dataset

DataFrame是对数据的抽象表示,提供了对数据进行转换、过滤和聚合的接口。框架支持多种DataFrame实现,包括Local、Spark和Ray,使得同一套代码可以在不同的执行引擎中运行。

Dataset是对DataFrame的高级封装,提供了更加便捷的接口和额外的功能,特别是对算子的支持。Dataset是用户交互的主要接口,隐藏了底层执行引擎的复杂性。

为保证数据的类型和结构一致,框架推荐使用强类型Schema。对于已有数据集,可通过实现自定义DataHandler来定制不同格式数据的加载与保存逻辑。

# 创建数据集
dataset = create_dataset(data, engine=DatasetEngine.LOCAL)
# 或从文件系统加载,并指定执行引擎为Ray,使用默认的数据加载器
dataset = load_dataset(
    "example/data/chat.jsonl",
    engine=DatasetEngine.RAY,
    data_handler=DefaultDataHandler(),
)
dataset = (
    # 对数据重新分区
    dataset.repartition(2)
    # 一对一映射, 取第一轮对话的用户prompt并转换为文本数据
    # .map(lambda x: TextSchema(content=x.messages[0].content))
    # 过滤出语言为英文的对话
    .filter(lambda x: x.metadata["language"] == "en")
    # 排序,按反馈分数降序
    .sort(lambda x: x.metadata["feedback"], ascending=False)
    # 新增元信息,基于对话内容计算总上下文长度
    .add_metadata(
        lambda x: {
            "context_length": sum(len(message.content) for message in x.messages)
        }
    )
    # 最多保留4条数据
    .limit(4)
    # 执行一系列算子
    .execute(
        [
            # 翻译算子,将数据翻译为中文
            ChatTranslator(model="gpt-4o-mini", target_language="Chinese"),
            # 蒸馏算子,将翻译后的数据使用模型重新生成回复,并发为4
            ChatDistiller(model="gpt-4o-mini", parallel=4),
        ]
    )
    # 缓存数据集,避免后续多个分支操作的重复计算
    .cache()
)

# 收集结果(数据量较大时,请使用write_json)
results = dataset.collect()
# 将结果写入文件
dataset.write_json("output.jsonl")

# 聚合操作
agg_result = dataset.aggregate(
    Sum(lambda x: x.metadata["context_length"]),
    Mean(lambda x: x.metadata["context_length"]),
    Count(),
)

# 分组后聚合
grouped_count = dataset.group_by(lambda x: x.metadata["country"]).count().collect()

print(results)
print(agg_result)
print(grouped_count)

Operator

Operator(算子)是数据处理的核心单元,封装了特定的数据处理逻辑。框架提供了三种基本类型的算子:

  1. MapOperator: 一对一映射,对每个元素单独处理
  2. FilterOperator: 过滤操作,决定保留或删除元素
  3. TransformOperator: 多对多映射,可以对多个元素同时操作
from blossom.op import *
from blossom.util import loads_markdown_first_json

# 使用装饰器定义自定义算子,第一个参数为元素
@map_operator()
def uppercase_text(item):
    item.content = item.content.upper()
    return item

@filter_operator()
def filter_short_text(item):
    return len(item.content) > 10

@transform_operator()
def batch_process(items):
    return [item for item in items if "keyword" in item.content]

# 使用上下文调用模型的算子,名称以context开头,第一个参数为上下文,第二个参数为元素
# 传入parallel以指定并发数
@context_map_operator(parallel=4)
def translate_with_model(context, item):
    result = context.chat_completion(
        "gpt-4o-mini", 
        [user(f"Translate to Chinese: {item.content}")]
    )
    return TextSchema(content=result)

# 继承Operator实现自定义算子
# 对于map算子,需要实现process_item方法,通过self.context访问上下文
class SelfQA(MapOperator):
    def process_item(self, item):
        self_qa_prompt = (
            "基于给定的文本,随意生成一个问题以及对应的长答案。\n"
            "你的输出应该是一个json,包含question、answer两个字符串字段,不需要输出任何其他的无关解释。\n"
            f"给定的文本:{item.content}"
        )
        raw_result = self.context.chat_completion("gpt-4o-mini", [user(self_qa_prompt)])
        result = loads_markdown_first_json(raw_result)
        return ChatSchema(
            messages=[
                user(result["question"]),
                assistant(result["answer"]),
            ]
        )

Context

Context(上下文)提供了算子执行所需的环境和资源,包括配置信息和模型提供者的访问。它是算子与外部资源交互的桥梁,使得算子可以访问模型服务、配置参数等。

# 创建上下文
context = Context()

# 访问配置
config = context.get_config()

# 获取模型提供者
provider = context.get_model("gpt-4o-mini")

# 使用模型生成内容
response = context.chat_completion(
    "gpt-4o-mini",
    [user("你好,请问今天天气如何?")]
)

# 生成嵌入向量
embedding = context.embedding("text-embedding-3-small", "这是一段文本")

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

blossom_data-0.4.3.tar.gz (43.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

blossom_data-0.4.3-py3-none-any.whl (66.1 kB view details)

Uploaded Python 3

File details

Details for the file blossom_data-0.4.3.tar.gz.

File metadata

  • Download URL: blossom_data-0.4.3.tar.gz
  • Upload date:
  • Size: 43.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.1 CPython/3.12.4 Linux/6.11.0-26-generic

File hashes

Hashes for blossom_data-0.4.3.tar.gz
Algorithm Hash digest
SHA256 db620772d01891b307a5c978bc7d258d3f8b55fb50d595cc795c233e8637fcb1
MD5 677ea42e046a8f59d15c492cd04daa3f
BLAKE2b-256 f3fd49c353c5c10c4c79926646206f9a55a41f4ec46660311c2df814b76aacf9

See more details on using hashes here.

File details

Details for the file blossom_data-0.4.3-py3-none-any.whl.

File metadata

  • Download URL: blossom_data-0.4.3-py3-none-any.whl
  • Upload date:
  • Size: 66.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.1 CPython/3.12.4 Linux/6.11.0-26-generic

File hashes

Hashes for blossom_data-0.4.3-py3-none-any.whl
Algorithm Hash digest
SHA256 9386be7645a0104f7201c60b9d8d54c4efac572dc56b153c236744acc754475f
MD5 403c5276d51f350dadb8b9408047c44e
BLAKE2b-256 90eadfb6593152ce299f7eaf4f52c843583447421ba561740850c33016ac2e42

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page