大规模整理无结构化AI数据
项目描述
DataChain是一个现代Python数据帧库,专为人工智能设计。它旨在将您的无结构化数据组织成数据集,并在本地机器上对其进行大规模整理。DataChain不抽象或隐藏AI模型和API调用,而是帮助将它们集成到后现代数据堆栈中。
关键特性
- 📂 以存储作为事实来源。
从S3、GCP、Azure和本地文件系统处理非结构化数据,无需冗余副本。
多模态数据支持:图像、视频、文本、PDF、JSON、CSV、parquet。
将文件和元数据合并成持久化、有版本的列式数据集。
- 🐍 Python友好的数据处理管道。
操作Python对象及其字段。
内置并行化和离内存计算,无需SQL或Spark。
- 🧠 数据丰富和处理。
使用本地AI模型和LLM API生成元数据。
根据元数据过滤、连接和分组。通过向量嵌入进行搜索。
将数据集传递到Pytorch和Tensorflow,或将它们导出回存储。
- 🚀 效率。
并行化、离内存工作负载和数据缓存。
在Python对象字段上进行向量操作:求和、计数、平均值等。
优化向量搜索。
快速入门
$ pip install datachain
使用JSON元数据选择文件
存储由猫和狗的图片(dog.1048.jpg、cat.1009.jpg)组成,使用‘json-pairs’格式标注真实标签和模型推断,每个图片都有一个匹配的JSON文件,如cat.1009.json
{
"class": "cat", "id": "1009", "num_annotators": 8,
"inference": {"class": "dog", "confidence": 0.68}
}
仅使用JSON元数据下载“高置信度猫”推断图像的示例
from datachain import Column, DataChain
meta = DataChain.from_json("gs://datachain-demo/dogs-and-cats/*json", object_name="meta")
images = DataChain.from_storage("gs://datachain-demo/dogs-and-cats/*jpg")
images_id = images.map(id=lambda file: file.path.split('.')[-2])
annotated = images_id.merge(meta, on="id", right_on="meta.id")
likely_cats = annotated.filter((Column("meta.inference.confidence") > 0.93) \
& (Column("meta.inference.class_") == "cat"))
likely_cats.export_files("high-confidence-cats/", signal="file")
使用本地AI模型进行数据整理
使用< cite>transformers库通过简单情感模型进行批量推断
pip install transformers
以下代码从云端下载文件,并对每个文件应用用户定义的函数。然后,所有检测到积极情绪的文件都被复制到本地目录。
from transformers import pipeline
from datachain import DataChain, Column
classifier = pipeline("sentiment-analysis", device="cpu",
model="distilbert/distilbert-base-uncased-finetuned-sst-2-english")
def is_positive_dialogue_ending(file) -> bool:
dialogue_ending = file.read()[-512:]
return classifier(dialogue_ending)[0]["label"] == "POSITIVE"
chain = (
DataChain.from_storage("gs://datachain-demo/chatbot-KiT/",
object_name="file", type="text")
.settings(parallel=8, cache=True)
.map(is_positive=is_positive_dialogue_ending)
.save("file_response")
)
positive_chain = chain.filter(Column("is_positive") == True)
positive_chain.export_files("./output")
print(f"{positive_chain.count()} files were exported")
导出13个文件
$ ls output/datachain-demo/chatbot-KiT/
15.txt 20.txt 24.txt 27.txt 28.txt 29.txt 33.txt 37.txt 38.txt 43.txt ...
$ ls output/datachain-demo/chatbot-KiT/ | wc -l
13
LLM判断聊天机器人
LLM可以作为通用分类器工作。在下面的示例中,我们使用Mistral提供的免费API来评估公开可用的聊天机器人对话。请从https://console.mistral.ai获取免费Mistral API密钥
$ pip install mistralai (Requires version >=1.0.0)
$ export MISTRAL_API_KEY=_your_key_
DataChain可以并行化API调用;免费的Mistral层支持同时最多4个请求。
from mistralai import Mistral
from datachain import File, DataChain, Column
PROMPT = "Was this dialog successful? Answer in a single word: Success or Failure."
def eval_dialogue(file: File) -> bool:
client = Mistral()
response = client.chat.complete(
model="open-mixtral-8x22b",
messages=[{"role": "system", "content": PROMPT},
{"role": "user", "content": file.read()}])
result = response.choices[0].message.content
return result.lower().startswith("success")
chain = (
DataChain.from_storage("gs://datachain-demo/chatbot-KiT/", object_name="file")
.settings(parallel=4, cache=True)
.map(is_success=eval_dialogue)
.save("mistral_files")
)
successful_chain = chain.filter(Column("is_success") == True)
successful_chain.export_files("./output_mistral")
print(f"{successful_chain.count()} files were exported")
按照上述说明,Mistral模型认为31/50个文件包含成功的对话
$ ls output_mistral/datachain-demo/chatbot-KiT/
1.txt 15.txt 18.txt 2.txt 22.txt 25.txt 28.txt 33.txt 37.txt 4.txt 41.txt ...
$ ls output_mistral/datachain-demo/chatbot-KiT/ | wc -l
31
序列化Python-对象
LLM响应可能包含对分析有价值的信息,例如使用的标记数量或模型性能参数。
而不是从Mistral响应数据结构(ChatCompletionResponse类)中提取此信息,DataChain可以将整个LLM响应序列化到内部数据库
from mistralai import Mistral
from mistralai.models import ChatCompletionResponse
from datachain import File, DataChain, Column
PROMPT = "Was this dialog successful? Answer in a single word: Success or Failure."
def eval_dialog(file: File) -> ChatCompletionResponse:
client = MistralClient()
return client.chat(
model="open-mixtral-8x22b",
messages=[{"role": "system", "content": PROMPT},
{"role": "user", "content": file.read()}])
chain = (
DataChain.from_storage("gs://datachain-demo/chatbot-KiT/", object_name="file")
.settings(parallel=4, cache=True)
.map(response=eval_dialog)
.map(status=lambda response: response.choices[0].message.content.lower()[:7])
.save("response")
)
chain.select("file.name", "status", "response.usage").show(5)
success_rate = chain.filter(Column("status") == "success").count() / chain.count()
print(f"{100*success_rate:.1f}% dialogs were successful")
输出
file status response response response
name usage usage usage
prompt_tokens total_tokens completion_tokens
0 1.txt success 547 548 1
1 10.txt failure 3576 3578 2
2 11.txt failure 626 628 2
3 12.txt failure 1144 1182 38
4 13.txt success 1100 1101 1
[Limited by 5 rows]
64.0% dialogs were successful
迭代Python数据结构
在前面的示例中,数据集被保存在嵌入的数据库中(工作目录的.datachain文件夹中的SQLite)。这些数据集已自动进行版本控制,可以使用DataChain.from_dataset(“dataset_name”)访问。
以下是如何检索已保存的数据集并遍历对象的示例
chain = DataChain.from_dataset("response")
# Iterating one-by-one: support out-of-memory workflow
for file, response in chain.limit(5).collect("file", "response"):
# verify the collected Python objects
assert isinstance(response, ChatCompletionResponse)
status = response.choices[0].message.content[:7]
tokens = response.usage.total_tokens
print(f"{file.get_uri()}: {status}, file size: {file.size}, tokens: {tokens}")
输出
gs://datachain-demo/chatbot-KiT/1.txt: Success, file size: 1776, tokens: 548
gs://datachain-demo/chatbot-KiT/10.txt: Failure, file size: 11576, tokens: 3578
gs://datachain-demo/chatbot-KiT/11.txt: Failure, file size: 2045, tokens: 628
gs://datachain-demo/chatbot-KiT/12.txt: Failure, file size: 3833, tokens: 1207
gs://datachain-demo/chatbot-KiT/13.txt: Success, file size: 3657, tokens: 1101
在Python对象上执行向量分析
某些操作可以在不反序列化的DB中运行。例如,让我们计算使用LLM API的总成本,假设Mistral调用成本为每100万输入标记2美元,每100万输出标记6美元
chain = DataChain.from_dataset("mistral_dataset")
cost = chain.sum("response.usage.prompt_tokens")*0.000002 \
+ chain.sum("response.usage.completion_tokens")*0.000006
print(f"Spent ${cost:.2f} on {chain.count()} calls")
输出
Spent $0.08 on 50 calls
PyTorch数据加载器
链结果可以导出或直接传递给PyTorch数据加载器。例如,如果我们对基于文件名后缀传递图像和标签感兴趣,以下代码将做到这一点
from torch.utils.data import DataLoader
from transformers import CLIPProcessor
from datachain import C, DataChain
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
chain = (
DataChain.from_storage("gs://datachain-demo/dogs-and-cats/", type="image")
.map(label=lambda name: name.split(".")[0], params=["file.name"])
.select("file", "label").to_pytorch(
transform=processor.image_processor,
tokenizer=processor.tokenizer,
)
)
loader = DataLoader(chain, batch_size=1)
教程
贡献
欢迎贡献。想了解更多,请参阅贡献指南。
社区和支持
项目详情
下载文件
下载适用于您平台的应用程序。如果您不确定选择哪个,请了解更多关于安装包的信息。