数据链
项目描述
AI 🔗 数据链
数据链是一个开源的Python数据处理库,用于大规模处理非结构化AI数据。
它支持批量LLM API调用和本地语言和视觉AI模型推理,这些操作可以在多个样本上并行运行,形成类似表的数据集。这些数据集可以保存、版本控制,并直接发送到PyTorch和TensorFlow进行训练。数据链采用严格的Pydantic数据结构,促进更好的数据处理实践,并支持数据库中通常发现的向量化分析操作。
数据链填补了dataframe库、数据仓库和基于Python的多模态AI应用之间的空白。我们的主要用例包括大规模数据整理、LLM分析和验证、批量图像分割和姿态检测、GenAI数据对齐等。
$ pip install data-chain
基本操作
数据链通过组合整理操作构建。
例如,可以指示它从云中读取文件,将它们映射到现代AI服务并返回一个Python对象,并行化API调用,将结果保存为数据集,并导出列
import os
import data_chain as dc
from anthropic.types.message import Message
ClaudeModel = dc.pydantic_to_feature(Message)
PROMPT = "summarize this book in less than 200 words"
service = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
source = "gs://datachain-demo/mybooks/"
chain = dc.DataChain(source) \
.filter(File.name.glob("*.txt")) \
.settings(parallel=4) \
.map( \
claude = lambda file: \
ClaudeModel(**service.messages.create( \
model="claude-3-haiku-20240307", \
system=PROMPT, \
messages=[{"role": "user", \
"content": file.get_value()}] \
), \
).model_dump() \
) \
.save("mydataset")
dc.DataChain("mydataset").export("./", "claude.response") # export summaries
数据集持久化
在上面的例子中,链解析为保存的数据集“mydataset”。DataChain数据集是不可变的且带版本号的。可以将其保存的数据集版本用作数据源
ds = dc.DataChain("mydataset", version = 1)
请注意,DataChain将文件样本表示为指向其相应存储位置的指针。这意味着新创建的数据集版本不会在存储中复制文件,存储仍然是原始样本的唯一真相来源
矢量化分析
由于数据集在内部表示为表格,分析查询可以矢量化
rate = ds.filter(chain.response == "Success").count() / chain.count() # ??
print(f"API class success rate: {100*rate:.2f}%")
>> 74.68%
price_input = 0.25
price_output = 1.25
price=(ds.sum(C.claude.usage.input_tokens)*price_input \
+ ds.sum(C.claude.usage.output_tokens)*price_output)/1_000_000
print(f"Cost of API calls: ${price:.2f}")
>> Cost of API calls: $1.42
导入元数据
AI数据通常与元数据(标注、类别等)一起出现。DataChain理解多种元数据格式,可以将存储中的数据样本与外部元数据(例如CSV列)连接起来,形成一个单一的数据集
from dc import parse_csv
files = dc.DataChain("gs://datachain-demo/myimages/")
metadata = dc.DataChain("gs://datachain-demo/myimagesmetadata.csv") \
.gen(meta=parse_csv) # TBD, also dependent on dropping file
dataset = chain1.merge(chain2, on = "file.name", right_on="name"])
print(dataset.select("file.name", "class", "prob").limit(5).to_pandas())
....
....
....
....
....
嵌套标注(如JSON)可以展开成行和列,以最适合应用的方式。例如,MS COCO数据集包含JSON标注,详细说明了分割。为了构建一个包含所有COCO图像中所有分割对象的数据库集
image_files = dc.DataChain("gs://datachain-demo/coco/images/")
image_meta = dc.DataChain("gs://datachain-demo/coco.json") \
.gen(meta=parse_json, key="images") # list of images
images = image_files.merge(image_meta, on = "file.name", right_on="file_name")
objects_meta = dc.DataChain("gs://datachain-demo/coco.json") \
.gen(meta=parse_json, key="annotations") # annotated objects
objects = image.full_merge(objects_meta, on = "id", right_on = "image_id")
生成元数据
数据整理的典型步骤是从数据样本中创建特征以供将来选择。DataChain将新创建的元数据表示为列,这使得创建新特征并基于它们进行筛选变得容易
from fashion_clip.fashion_clip import FashionCLIP
from sqlalchemy import JSON
from tabulate import tabulate
from datachain.lib.param import Image
from datachain.query import C, DatasetQuery, udf
@udf(
params=(Image(),),
output={"fclip": JSON},
method="fashion_clip",
batch=10,
)
class MyFashionClip:
def __init__(self):
self.fclip = FashionCLIP("fashion-clip")
def fashion_clip(self, inputs):
embeddings = self.fclip.encode_images(
[input[0] for input in inputs], batch_size=1
)
return [(json.dumps(emb),) for emb in embeddings.tolist()]
chain = dc.DataChain("gs://datachain-demo/zalando/images/").filter(
C.name.glob("*.jpg")
).limit(5).add_signals(MyFashionClip).save("zalando_hd_emb")
test_image = "cs://datachain-demo/zalando/test/banner.jpg"
test_embedding = MyFashionClip.fashion_clip.encode_images(Image(test_image))
best_matches = chain.filter(similarity_search(test_embeding)).limit(5)
print best_matches.to_result()
Delta更新
DataChain能够执行“Delta更新”-也就是说,仅批处理新添加的数据样本。例如,让我们将一些图像复制到本地文件夹,并运行一个链来使用HuggingFace提供的本地字幕模型生成字幕
> mkdir demo-images/
> datachain cp gs://datachain-demo/images/ /tmp/demo-images
import torch
from datachain.lib.hf_image_to_text import LLaVAdescribe
from datachain.query import C, DatasetQuery
source = "/tmp/demo-images"
if torch.cuda.is_available():
device = "cuda"
else:
device = "cpu"
if __name__ == "__main__":
results = (
DatasetQuery(
source,
anon=True,
)
.filter(C.name.glob("*.jpg"))
.add_signals(
LLaVAdescribe(
device=device,
model=model,
),
parallel=False,
)
.save("annotated-images")
)
现在让我们向同一个文件夹添加更多图像
> datachain cp gs://datachain-demo/extra-images/ /tmp/demo-images
并仅对Delta计算更新
processed = dc.DataChain("annotated-images")
delta = dc.dataChain("/tmp/demo-images").subtract(processed)
将数据传递到训练
数据集可以导出为CSV或webdataset格式。然而,将数据传递到训练的更好方法是将其包装到PyTorch类中,让库在底层处理文件下载和缓存
ds = dc.DataChain("gs://datachain-demo/name-labeled/images/")
.filter(C.name.glob("*.jpg"))
.map(lambda name: (name[:3],), output={"label": str}, parallel=4)
)
train_loader = DataLoader(
ds.to_pytorch(
ImageReader(),
LabelReader("label", classes=CLASSES),
transform=transform,
),
batch_size=16,
parallel=2,
)
💻 更多示例
在不重新分片Webdataset文件的情况下整理图像以训练自定义CLIP模型
批量转换和索引图像以创建可搜索的商品目录
在规模上评估LLM应用
评估LLM检索策略
批处理中的Delta更新
贡献
欢迎贡献。要了解更多信息,请参阅贡献指南。
许可证
在Apache 2.0许可证下分发,DataChain是免费和开源软件。
问题
如果您遇到任何问题,请提交问题,并附上详细描述。
项目详情
下载文件
下载适用于您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。
源分布
构建分布
dvcx-0.95.0.tar.gz的散列值
算法 | 散列摘要 | |
---|---|---|
SHA256 | 2be3a6122cef80a3924d446539d888e1b5fa1795101ba7e95285f3bdefb24298 |
|
MD5 | 1f19f7be401994c2f202e8498fe736cb |
|
BLAKE2b-256 | ad40653d27b5bdc9f65cce013c494eac2b5043588276550c16ab661571f7b805 |
dvcx-0.95.0-py3-none-any.whl的散列值
算法 | 散列摘要 | |
---|---|---|
SHA256 | b4024e1fc2af0efc27d3704e8f744bb19a39d62fd8b2cb340cd32801e4c6484d |
|
MD5 | cc088e46f8442c081ae2398ef2b481a3 |
|
BLAKE2b-256 | e7cc5c7734cc4a1407fd46c182d63f5a243865929aebdf8c1e188e20573f398f |