Skip to main content

A simple way to synthesize LLM training data.

Project description

BlossomData

GitHub License PyPI - Version Ask DeepWiki

BlossomData是一个专为大模型训练打造的数据处理框架,旨在提供灵活、高效的数据处理解决方案。它内置丰富的算子库,支持不同模态、不同训练阶段的数据处理与合成。能够从单机环境无缝迁移到分布式环境,允许用户灵活扩展自定义算子,从而大幅降低数据处理流程的开发和计算成本。

使用示例

pip install blossom-data
# 从源码安装
# pip install git+https://github.com/Azure99/BlossomData.git

在使用之前,请在config.yaml文件中配置模型服务提供商的API密钥和相关参数。(可参考config.yaml.example

第一个数据合成任务

下面是一个非常实用的示例,仅依赖数学题目和参考答案,即可合成经过验证的长推理中文训练数据。

框架提供了大量内置算子,可以在blossom.op中查看。例如,当原始数据缺失答案时,可以使用ChatDistiller生成回答,然后通过ChatMultiReasoningFilter基于投票方式过滤掉潜在错误样本。

from blossom import *
from blossom.op import *
from blossom.schema import *

# 示例数学指令数据
data = [
    ChatSchema(
        messages=[
            user("Suppose that $wz = 12-8i$, and $|w| = \\sqrt{13}$. What is $|z|$?"),
            assistant("4"),
        ]
    ),
]

# 定义要使用的算子
ops = [
    # 利用非推理模型将指令数据翻译为中文
    ChatTranslator(model="deepseek-chat", target_language="Chinese"),
    # 使用推理模型生成问题的回答,并使用非推理模型验证回答正确性
    ChatVerifyDistiller(
        model="deepseek-reasoner",
        mode=ChatVerifyDistiller.Mode.LLM,
        validation_model="deepseek-chat",
    ),
    # 将reasoning_content合并到content中,以便用于训练
    ChatReasoningContentMerger(),
]

dataset = create_dataset(data)
result = dataset.execute(ops).collect()
print(result)

框架核心概念

Schema

Schema是框架中的基础数据结构,用于表示和处理不同类型的数据。所有Schema类型都继承自基础Schema类,提供了统一的接口和功能。

# 文本数据
text_data = TextSchema(content="这是一段文本内容")

# 对话数据
chat_data = ChatSchema(
    messages=[
        user("你好"),
        assistant("你好,请问需要什么帮助?")
    ]
)

# 结构化数据
row_data = RowSchema(data={"name": "张三", "age": 30, "score": 95})

# 自定义数据
custom_data = CustomSchema(data=1)

每个 Schema 实例都包含以下通用字段:

  • id: 唯一标识符
  • type: Schema类型标识
  • failed: 处理失败标志
  • metadata: dict类型的附加元数据

DataFrame与Dataset

DataFrame是对数据的抽象表示,提供了对数据进行转换、过滤和聚合的接口。框架支持多种DataFrame实现,包括Local、Spark和Ray,使得同一套代码可以在不同的执行环境中运行。

Dataset是对DataFrame的高级封装,提供了更加便捷的接口和额外的功能,特别是对算子的支持。Dataset是用户交互的主要接口,隐藏了底层执行引擎的复杂性。

# 创建数据集
dataset = create_dataset(data, engine=DatasetEngine.LOCAL)
# 或从文件加载,并指定执行引擎为Spark
dataset = load_dataset("/path/to/data.json", engine=DatasetEngine.SPARK)

# Dataset 提供丰富的数据处理接口
dataset = (
    # 对每个元素应用转换函数
    dataset.map(lambda x: TextSchema(content=x.content.upper()))
    # 过滤数据
    .filter(lambda x: len(x.content) > 10)
    # 批量处理数据
    .transform(lambda items: [item for item in items if "关键词" in item.content])
    # 排序
    .sort(lambda x: len(x.content))
    # 限制数量
    .limit(100)
    # 随机打乱
    .shuffle()
    # 重新分区
    .repartition(8)
    # 执行一系列算子
    .execute([
        TextContentFilter(min_length=100),
        TextTranslator(model="gpt-4o-mini", target_language="Chinese")
    ])
)

# 聚合操作
result = dataset.aggregate(
    Sum(lambda x: x["score"]),
    Mean(lambda x: x["score"]),
    Count()
)

# 分组操作
grouped = dataset.group_by(lambda x: x["category"])
group_stats = grouped.aggregate(Count(), Mean(lambda x: x["score"]))

# 收集结果
results = dataset.collect()
# 写入文件
dataset.write_json("/path/to/output.json")

Operator

Operator(算子)是数据处理的核心单元,封装了特定的数据处理逻辑。框架提供了三种基本类型的算子:

  1. MapOperator: 一对一映射,对每个元素单独处理
  2. FilterOperator: 过滤操作,决定保留或删除元素
  3. TransformOperator: 批量处理,可以对多个元素同时操作
# 使用装饰器定义自定义算子
@map_operator()
def uppercase_text(item):
    if isinstance(item, TextSchema):
        return TextSchema(content=item.content.upper())
    return item

@filter_operator()
def filter_short_text(item):
    if isinstance(item, TextSchema):
        return len(item.content) > 10
    return True

@transform_operator()
def batch_process(items):
    # 批量处理逻辑
    return [item for item in items if "关键词" in item.content]

# 使用上下文调用模型的算子
@context_map_operator(parallel=4)
def translate_with_model(context, item):
    if isinstance(item, TextSchema):
        result = context.chat_completion(
            "gpt-4o-mini", 
            [user(f"Translate to Chinese: {item.content}")]
        )
        return TextSchema(content=result)
    return item

Context

Context(上下文)提供了算子执行所需的环境和资源,包括配置信息和模型提供者的访问。它是算子与外部资源交互的桥梁,使得算子可以访问模型服务、配置参数等。

# 创建上下文
context = Context()

# 访问配置
config = context.get_config()

# 获取模型提供者
provider = context.get_model("gpt-4o-mini")

# 使用模型生成内容
response = context.chat_completion(
    "gpt-4o-mini",
    [user("你好,请问今天天气如何?")]
)

# 生成嵌入向量
embedding = context.embedding("text-embedding-3-small", "这是一段文本")

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

blossom_data-0.4.2.tar.gz (41.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

blossom_data-0.4.2-py3-none-any.whl (64.5 kB view details)

Uploaded Python 3

File details

Details for the file blossom_data-0.4.2.tar.gz.

File metadata

  • Download URL: blossom_data-0.4.2.tar.gz
  • Upload date:
  • Size: 41.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.1 CPython/3.12.4 Linux/6.11.0-25-generic

File hashes

Hashes for blossom_data-0.4.2.tar.gz
Algorithm Hash digest
SHA256 47607efed7b187adee374e97395b6d2ebe4b95191f9d6836a8dba7fafb201059
MD5 80609dca5fd03318601a7886f5ad73b1
BLAKE2b-256 e2fc3eeb83cdda136aa7e1c54074a215e18f52ad2055caee8e8cabe760bca011

See more details on using hashes here.

File details

Details for the file blossom_data-0.4.2-py3-none-any.whl.

File metadata

  • Download URL: blossom_data-0.4.2-py3-none-any.whl
  • Upload date:
  • Size: 64.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.1 CPython/3.12.4 Linux/6.11.0-25-generic

File hashes

Hashes for blossom_data-0.4.2-py3-none-any.whl
Algorithm Hash digest
SHA256 87c4c673ca6ad1de73589f9b3f77579ebae23ce41c2e465d177787757335ee8e
MD5 018c721b469aaa674285a1ef63a8d4cd
BLAKE2b-256 433d9cc0cffabbcc226e257f31ea7b137ca51cd0ed56e8eb5335cbeeefc89950

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page