跳转到主要内容

大规模整理无结构化AI数据

项目描述

PyPI Python Version Codecov Tests

DataChain是一个现代Python数据帧库,专为人工智能设计。它旨在将您的无结构化数据组织成数据集,并在本地机器上对其进行大规模整理。DataChain不抽象或隐藏AI模型和API调用,而是帮助将它们集成到后现代数据堆栈中。

关键特性

📂 以存储作为事实来源。
  • 从S3、GCP、Azure和本地文件系统处理非结构化数据,无需冗余副本。

  • 多模态数据支持:图像、视频、文本、PDF、JSON、CSV、parquet。

  • 将文件和元数据合并成持久化、有版本的列式数据集。

🐍 Python友好的数据处理管道。
  • 操作Python对象及其字段。

  • 内置并行化和离内存计算,无需SQL或Spark。

🧠 数据丰富和处理。
  • 使用本地AI模型和LLM API生成元数据。

  • 根据元数据过滤、连接和分组。通过向量嵌入进行搜索。

  • 将数据集传递到Pytorch和Tensorflow,或将它们导出回存储。

🚀 效率。
  • 并行化、离内存工作负载和数据缓存。

  • 在Python对象字段上进行向量操作:求和、计数、平均值等。

  • 优化向量搜索。

快速入门

$ pip install datachain

使用JSON元数据选择文件

存储由猫和狗的图片(dog.1048.jpgcat.1009.jpg)组成,使用‘json-pairs’格式标注真实标签和模型推断,每个图片都有一个匹配的JSON文件,如cat.1009.json

{
    "class": "cat", "id": "1009", "num_annotators": 8,
    "inference": {"class": "dog", "confidence": 0.68}
}

仅使用JSON元数据下载“高置信度猫”推断图像的示例

from datachain import Column, DataChain

meta = DataChain.from_json("gs://datachain-demo/dogs-and-cats/*json", object_name="meta")
images = DataChain.from_storage("gs://datachain-demo/dogs-and-cats/*jpg")

images_id = images.map(id=lambda file: file.path.split('.')[-2])
annotated = images_id.merge(meta, on="id", right_on="meta.id")

likely_cats = annotated.filter((Column("meta.inference.confidence") > 0.93) \
                               & (Column("meta.inference.class_") == "cat"))
likely_cats.export_files("high-confidence-cats/", signal="file")

使用本地AI模型进行数据整理

使用< cite>transformers库通过简单情感模型进行批量推断

pip install transformers

以下代码从云端下载文件,并对每个文件应用用户定义的函数。然后,所有检测到积极情绪的文件都被复制到本地目录。

from transformers import pipeline
from datachain import DataChain, Column

classifier = pipeline("sentiment-analysis", device="cpu",
                model="distilbert/distilbert-base-uncased-finetuned-sst-2-english")

def is_positive_dialogue_ending(file) -> bool:
    dialogue_ending = file.read()[-512:]
    return classifier(dialogue_ending)[0]["label"] == "POSITIVE"

chain = (
   DataChain.from_storage("gs://datachain-demo/chatbot-KiT/",
                          object_name="file", type="text")
   .settings(parallel=8, cache=True)
   .map(is_positive=is_positive_dialogue_ending)
   .save("file_response")
)

positive_chain = chain.filter(Column("is_positive") == True)
positive_chain.export_files("./output")

print(f"{positive_chain.count()} files were exported")

导出13个文件

$ ls output/datachain-demo/chatbot-KiT/
15.txt 20.txt 24.txt 27.txt 28.txt 29.txt 33.txt 37.txt 38.txt 43.txt ...
$ ls output/datachain-demo/chatbot-KiT/ | wc -l
13

LLM判断聊天机器人

LLM可以作为通用分类器工作。在下面的示例中,我们使用Mistral提供的免费API来评估公开可用的聊天机器人对话。请从https://console.mistral.ai获取免费Mistral API密钥

$ pip install mistralai (Requires version >=1.0.0)
$ export MISTRAL_API_KEY=_your_key_

DataChain可以并行化API调用;免费的Mistral层支持同时最多4个请求。

from mistralai import Mistral
from datachain import File, DataChain, Column

PROMPT = "Was this dialog successful? Answer in a single word: Success or Failure."

def eval_dialogue(file: File) -> bool:
     client = Mistral()
     response = client.chat.complete(
         model="open-mixtral-8x22b",
         messages=[{"role": "system", "content": PROMPT},
                   {"role": "user", "content": file.read()}])
     result = response.choices[0].message.content
     return result.lower().startswith("success")

chain = (
   DataChain.from_storage("gs://datachain-demo/chatbot-KiT/", object_name="file")
   .settings(parallel=4, cache=True)
   .map(is_success=eval_dialogue)
   .save("mistral_files")
)

successful_chain = chain.filter(Column("is_success") == True)
successful_chain.export_files("./output_mistral")

print(f"{successful_chain.count()} files were exported")

按照上述说明,Mistral模型认为31/50个文件包含成功的对话

$ ls output_mistral/datachain-demo/chatbot-KiT/
1.txt  15.txt 18.txt 2.txt  22.txt 25.txt 28.txt 33.txt 37.txt 4.txt  41.txt ...
$ ls output_mistral/datachain-demo/chatbot-KiT/ | wc -l
31

序列化Python-对象

LLM响应可能包含对分析有价值的信息,例如使用的标记数量或模型性能参数。

而不是从Mistral响应数据结构(ChatCompletionResponse类)中提取此信息,DataChain可以将整个LLM响应序列化到内部数据库

from mistralai import Mistral
from mistralai.models import ChatCompletionResponse
from datachain import File, DataChain, Column

PROMPT = "Was this dialog successful? Answer in a single word: Success or Failure."

def eval_dialog(file: File) -> ChatCompletionResponse:
     client = MistralClient()
     return client.chat(
         model="open-mixtral-8x22b",
         messages=[{"role": "system", "content": PROMPT},
                   {"role": "user", "content": file.read()}])

chain = (
   DataChain.from_storage("gs://datachain-demo/chatbot-KiT/", object_name="file")
   .settings(parallel=4, cache=True)
   .map(response=eval_dialog)
   .map(status=lambda response: response.choices[0].message.content.lower()[:7])
   .save("response")
)

chain.select("file.name", "status", "response.usage").show(5)

success_rate = chain.filter(Column("status") == "success").count() / chain.count()
print(f"{100*success_rate:.1f}% dialogs were successful")

输出

     file   status      response     response          response
     name                  usage        usage             usage
                   prompt_tokens total_tokens completion_tokens
0   1.txt  success           547          548                 1
1  10.txt  failure          3576         3578                 2
2  11.txt  failure           626          628                 2
3  12.txt  failure          1144         1182                38
4  13.txt  success          1100         1101                 1

[Limited by 5 rows]
64.0% dialogs were successful

迭代Python数据结构

在前面的示例中,数据集被保存在嵌入的数据库中(工作目录的.datachain文件夹中的SQLite)。这些数据集已自动进行版本控制,可以使用DataChain.from_dataset(“dataset_name”)访问。

以下是如何检索已保存的数据集并遍历对象的示例

chain = DataChain.from_dataset("response")

# Iterating one-by-one: support out-of-memory workflow
for file, response in chain.limit(5).collect("file", "response"):
    # verify the collected Python objects
    assert isinstance(response, ChatCompletionResponse)

    status = response.choices[0].message.content[:7]
    tokens = response.usage.total_tokens
    print(f"{file.get_uri()}: {status}, file size: {file.size}, tokens: {tokens}")

输出

gs://datachain-demo/chatbot-KiT/1.txt: Success, file size: 1776, tokens: 548
gs://datachain-demo/chatbot-KiT/10.txt: Failure, file size: 11576, tokens: 3578
gs://datachain-demo/chatbot-KiT/11.txt: Failure, file size: 2045, tokens: 628
gs://datachain-demo/chatbot-KiT/12.txt: Failure, file size: 3833, tokens: 1207
gs://datachain-demo/chatbot-KiT/13.txt: Success, file size: 3657, tokens: 1101

在Python对象上执行向量分析

某些操作可以在不反序列化的DB中运行。例如,让我们计算使用LLM API的总成本,假设Mistral调用成本为每100万输入标记2美元,每100万输出标记6美元

chain = DataChain.from_dataset("mistral_dataset")

cost = chain.sum("response.usage.prompt_tokens")*0.000002 \
           + chain.sum("response.usage.completion_tokens")*0.000006
print(f"Spent ${cost:.2f} on {chain.count()} calls")

输出

Spent $0.08 on 50 calls

PyTorch数据加载器

链结果可以导出或直接传递给PyTorch数据加载器。例如,如果我们对基于文件名后缀传递图像和标签感兴趣,以下代码将做到这一点

from torch.utils.data import DataLoader
from transformers import CLIPProcessor

from datachain import C, DataChain

processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

chain = (
    DataChain.from_storage("gs://datachain-demo/dogs-and-cats/", type="image")
    .map(label=lambda name: name.split(".")[0], params=["file.name"])
    .select("file", "label").to_pytorch(
        transform=processor.image_processor,
        tokenizer=processor.tokenizer,
    )
)
loader = DataLoader(chain, batch_size=1)

教程

贡献

欢迎贡献。想了解更多,请参阅贡献指南

社区和支持

项目详情


下载文件

下载适用于您平台的应用程序。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分发

datachain-0.5.0.tar.gz(1.5 MB 查看哈希值

上传时间:

构建分发

datachain-0.5.0-py3-none-any.whl(206.5 kB 查看哈希值

上传时间: Python 3

由以下支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面