
Wrangle unstructured AI data at scale



DataChain

DataChain is an open-source Python library for processing and curating unstructured data at scale.

🤖 AI-Driven Data Curation: Use local ML models and LLM API calls to enrich your data.

🚀 GenAI Dataset Scale: Handle tens of millions of files or file snippets.

🐍 Python-friendly: Use strictly typed Pydantic objects instead of JSON.

To ensure efficiency, DataChain supports parallel processing, parallel data downloads, and out-of-memory computing, and it excels at optimizing batch operations. While most GenAI tools focus on online applications and real-time processing, DataChain is designed for offline data processing, data curation, and ETL.
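For instance, enrichment results can be modeled as custom Pydantic-typed objects rather than raw JSON. Below is a minimal sketch, assuming the DataModel base class exported by datachain; the DialogRating class and its scoring logic are hypothetical and for illustration only.

from datachain import DataChain, DataModel

# Hypothetical typed result: a Pydantic-style object instead of raw JSON.
class DialogRating(DataModel):
    score: float
    label: str

def rate(file) -> DialogRating:
    text = file.read()
    score = min(len(text) / 10_000, 1.0)  # placeholder scoring logic
    return DialogRating(score=score, label="long" if score > 0.5 else "short")

chain = (
    DataChain.from_storage("gs://datachain-demo/chatbot-KiT/", type="text")
    .settings(parallel=8)   # parallel processing, as described above
    .map(rating=rate)       # each record gets a typed DialogRating
    .save("rated_dialogs")
)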

Typical use cases include computer vision data curation, LLM analytics, and validation.

$ pip install datachain

DataChain FlowChart

Quick Start

Basic evaluation

We will evaluate chatbot dialogs stored as text files in Google Cloud Storage - 50 files in total in this example. The dialogs involve users chatting with a bot while looking for better wireless plans. Our goal is to identify the successful dialogs.

The data used in the examples is publicly available. Please feel free to run this code.

First, we’ll use a simple sentiment analysis model. Please install transformers.

pip install transformers

The code below downloads the files from the cloud and applies the function is_positive_dialogue_ending() to each one. All files whose dialog ends with a positive sentiment are copied to the local directory output/.

from transformers import pipeline
from datachain import DataChain, Column

classifier = pipeline(
    "sentiment-analysis",
    device="cpu",
    model="distilbert/distilbert-base-uncased-finetuned-sst-2-english",
)

def is_positive_dialogue_ending(file) -> bool:
    # Classify the sentiment of the last 512 characters of the dialog.
    dialogue_ending = file.read()[-512:]
    return classifier(dialogue_ending)[0]["label"] == "POSITIVE"

chain = (
    DataChain.from_storage("gs://datachain-demo/chatbot-KiT/",
                           object_name="file", type="text")
    .settings(parallel=8, cache=True)
    .map(is_positive=is_positive_dialogue_ending)
    .save("file_response")
)

positive_chain = chain.filter(Column("is_positive") == True)
positive_chain.export_files("./output")

print(f"{positive_chain.count()} files were exported")

13 files were exported

$ ls output/datachain-demo/chatbot-KiT/
15.txt 20.txt 24.txt 27.txt 28.txt 29.txt 33.txt 37.txt 38.txt 43.txt ...
$ ls output/datachain-demo/chatbot-KiT/ | wc -l
13
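Because the chain was saved as "file_response", the results can be reloaded later without re-running the model. A small sketch reusing the from_dataset and show calls demonstrated later in this README:

from datachain import DataChain

# Reload the saved dataset and preview a few records.
chain = DataChain.from_dataset("file_response")
chain.select("file.name", "is_positive").show(3)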

LLM judging chatbot dialogs

Judging the dialogs with an LLM can be more effective. In this example, we use Mistral with its free API. Please install the package and get a free Mistral API key at https://console.mistral.ai

$ pip install mistralai
$ export MISTRAL_API_KEY=_your_key_

Below is a similar code example, but this time using an LLM to evaluate the dialogs. Note that only four threads are used (parallel=4) due to rate limits of the free LLM service.

from mistralai.client import MistralClient
from mistralai.models.chat_completion import ChatMessage
from datachain import File, DataChain, Column

PROMPT = "Was this dialog successful? Answer in a single word: Success or Failure."

def eval_dialogue(file: File) -> bool:
    # Each worker creates its own client; the API key is read from MISTRAL_API_KEY.
    client = MistralClient()
    response = client.chat(
        model="open-mixtral-8x22b",
        messages=[ChatMessage(role="system", content=PROMPT),
                  ChatMessage(role="user", content=file.read())])
    result = response.choices[0].message.content
    return result.lower().startswith("success")

chain = (
    DataChain.from_storage("gs://datachain-demo/chatbot-KiT/", object_name="file")
    .settings(parallel=4, cache=True)
    .map(is_success=eval_dialogue)
    .save("mistral_files")
)

successful_chain = chain.filter(Column("is_success") == True)
successful_chain.export_files("./output_mistral")

print(f"{successful_chain.count()} files were exported")

With the current prompt, the LLM judged 31 files to be successful dialogs:

$ ls output_mistral/datachain-demo/chatbot-KiT/
1.txt  15.txt 18.txt 2.txt  22.txt 25.txt 28.txt 33.txt 37.txt 4.txt  41.txt ...
$ ls output_mistral/datachain-demo/chatbot-KiT/ | wc -l
31

Serializing Python objects

LLM responses contain valuable information for analytics, such as the number of tokens used and the model name. Preserving this information can be beneficial.

Instead of extracting this information from the Mistral data structure (class ChatCompletionResponse), we serialize the entire Python object to the internal DB.

from mistralai.client import MistralClient
from mistralai.models.chat_completion import ChatMessage, ChatCompletionResponse
from datachain import File, DataChain, Column

PROMPT = "Was this dialog successful? Answer in a single word: Success or Failure."

def eval_dialog(file: File) -> ChatCompletionResponse:
    client = MistralClient()
    # Return the entire response object; DataChain serializes it to the DB.
    return client.chat(
        model="open-mixtral-8x22b",
        messages=[ChatMessage(role="system", content=PROMPT),
                  ChatMessage(role="user", content=file.read())])

chain = (
    DataChain.from_storage("gs://datachain-demo/chatbot-KiT/", object_name="file")
    .settings(parallel=4, cache=True)
    .map(response=eval_dialog)
    .map(status=lambda response: response.choices[0].message.content.lower()[:7])
    .save("response")
)

chain.select("file.name", "status", "response.usage").show(5)

success_rate = chain.filter(Column("status") == "success").count() / chain.count()
print(f"{100*success_rate:.1f}% dialogs were successful")

Output:

     file   status      response     response          response
     name                  usage        usage             usage
                   prompt_tokens total_tokens completion_tokens
0   1.txt  success           547          548                 1
1  10.txt  failure          3576         3578                 2
2  11.txt  failure           626          628                 2
3  12.txt  failure          1144         1182                38
4  13.txt  success          1100         1101                 1

[Limited by 5 rows]
64.0% dialogs were successful
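If you prefer pandas for further analysis, the same selection can be pulled into a DataFrame. A hedged sketch, assuming a to_pandas() conversion is available:

# Assumption: the chain exposes a to_pandas() conversion.
df = chain.select("file.name", "status", "response.usage").to_pandas()
print(df.head())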

Complex Python data structures

In the previous examples, a few datasets were saved in the embedded database (SQLite in the .datachain directory). These datasets are versioned and can be accessed using DataChain.from_dataset("dataset_name").

from mistralai.models.chat_completion import ChatCompletionResponse
from datachain import DataChain

chain = DataChain.from_dataset("response")

# Iterating one-by-one: supports out-of-memory workflows
for file, response in chain.limit(5).collect("file", "response"):
    # You work with Python objects
    assert isinstance(response, ChatCompletionResponse)

    status = response.choices[0].message.content[:7]
    tokens = response.usage.total_tokens
    print(f"{file.get_uri()}: {status}, file size: {file.size}, tokens: {tokens}")

Output:

gs://datachain-demo/chatbot-KiT/1.txt: Success, file size: 1776, tokens: 548
gs://datachain-demo/chatbot-KiT/10.txt: Failure, file size: 11576, tokens: 3578
gs://datachain-demo/chatbot-KiT/11.txt: Failure, file size: 2045, tokens: 628
gs://datachain-demo/chatbot-KiT/12.txt: Failure, file size: 3833, tokens: 1207
gs://datachain-demo/chatbot-KiT/13.txt: Success, file size: 3657, tokens: 1101
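Since saved datasets are versioned, an earlier snapshot can also be loaded explicitly. A minimal sketch, assuming from_dataset accepts an optional version argument:

# Assumption: from_dataset takes an optional version number.
chain_v1 = DataChain.from_dataset("response", version=1)
print(chain_v1.count())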

Vectorized analytics over Python objects

Some operations can be efficiently run inside the DB without deserializing Python objects. Let’s calculate the cost of using LLM APIs in a vectorized way. Mistral calls cost $2 per 1M input tokens and $6 per 1M output tokens:

chain = DataChain.from_dataset("response")

cost = (chain.sum("response.usage.prompt_tokens") * 0.000002
        + chain.sum("response.usage.completion_tokens") * 0.000006)
print(f"Spent ${cost:.2f} on {chain.count()} calls")

Output:

Spent $0.08 on 50 calls
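Other aggregations can run inside the DB the same way. A sketch under the assumption that avg and max aggregates are available alongside the sum used above:

# Assumption: avg()/max() aggregates exist alongside sum().
avg_tokens = chain.avg("response.usage.total_tokens")
max_tokens = chain.max("response.usage.total_tokens")
print(f"avg tokens per call: {avg_tokens:.0f}, max: {max_tokens}")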

PyTorch data loader

Chain results can be exported or passed directly to a PyTorch data loader. For example, to pass images along with labels derived from the file names, the following code will do it:

from torch.utils.data import DataLoader
from transformers import CLIPProcessor

from datachain import C, DataChain

processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

chain = (
    DataChain.from_storage("gs://datachain-demo/dogs-and-cats/", type="image")
    .map(label=lambda name: name.split(".")[0], params=["file.name"])
    .select("file", "label").to_pytorch(
        transform=processor.image_processor,
        tokenizer=processor.tokenizer,
    )
)
loader = DataLoader(chain, batch_size=1)
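The loader then plugs into a standard PyTorch loop. The exact batch structure depends on the selected columns, so this usage sketch keeps the training step generic:

# Iterate batches as in any PyTorch loop; each batch carries the
# transformed image tensors and tokenized labels selected above.
for batch in loader:
    pass  # feed the batch to your model / training step here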

Tutorials

Contributions

Contributions are very welcome. To learn more, see the Contributor Guide.

Community and Support

