跳转到主要内容

数据链

项目描述

PyPI Python Version Codecov Tests License

AI 🔗 数据链

数据链是一个开源的Python数据处理库,用于大规模处理非结构化AI数据。

它支持批量LLM API调用和本地语言和视觉AI模型推理,这些操作可以在多个样本上并行运行,形成类似表的数据集。这些数据集可以保存、版本控制,并直接发送到PyTorch和TensorFlow进行训练。数据链采用严格的Pydantic数据结构,促进更好的数据处理实践,并支持数据库中通常发现的向量化分析操作。

数据链填补了dataframe库、数据仓库和基于Python的多模态AI应用之间的空白。我们的主要用例包括大规模数据整理、LLM分析和验证、批量图像分割和姿态检测、GenAI数据对齐等。

$ pip install data-chain

基本操作

数据链通过组合整理操作构建。

例如,可以指示它从云中读取文件,将它们映射到现代AI服务并返回一个Python对象,并行化API调用,将结果保存为数据集,并导出列

import os
import data_chain as dc

from anthropic.types.message import Message
ClaudeModel = dc.pydantic_to_feature(Message)
PROMPT = "summarize this book in less than 200 words"
service = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
source = "gs://datachain-demo/mybooks/"

chain = dc.DataChain(source)                          \
              .filter(File.name.glob("*.txt"))        \
              .settings(parallel=4)                   \
              .map(                                   \
                             claude = lambda file:                                         \
                                       ClaudeModel(**service.messages.create(                        \
                                         model="claude-3-haiku-20240307",         \
                                         system=PROMPT,                           \
                                         messages=[{"role": "user",               \
                                                    "content": file.get_value()}] \
                                                               ),  \
                                                       ).model_dump()  \
                                                       )               \
                                                       .save("mydataset")

dc.DataChain("mydataset").export("./", "claude.response") # export summaries

数据集持久化

在上面的例子中,链解析为保存的数据集“mydataset”。DataChain数据集是不可变的且带版本号的。可以将其保存的数据集版本用作数据源

ds = dc.DataChain("mydataset", version = 1)

请注意,DataChain将文件样本表示为指向其相应存储位置的指针。这意味着新创建的数据集版本不会在存储中复制文件,存储仍然是原始样本的唯一真相来源

矢量化分析

由于数据集在内部表示为表格,分析查询可以矢量化

rate = ds.filter(chain.response == "Success").count() / chain.count() # ??
print(f"API class success rate: {100*rate:.2f}%")
>> 74.68%

price_input = 0.25
price_output = 1.25
price=(ds.sum(C.claude.usage.input_tokens)*price_input \
       + ds.sum(C.claude.usage.output_tokens)*price_output)/1_000_000
print(f"Cost of API calls: ${price:.2f}")
>> Cost of API calls: $1.42

导入元数据

AI数据通常与元数据(标注、类别等)一起出现。DataChain理解多种元数据格式,可以将存储中的数据样本与外部元数据(例如CSV列)连接起来,形成一个单一的数据集

from dc import parse_csv

files = dc.DataChain("gs://datachain-demo/myimages/")
metadata = dc.DataChain("gs://datachain-demo/myimagesmetadata.csv") \
               .gen(meta=parse_csv)  # TBD, also dependent on dropping file
dataset = chain1.merge(chain2, on = "file.name", right_on="name"])

print(dataset.select("file.name", "class", "prob").limit(5).to_pandas())
....
....
....
....
....

嵌套标注(如JSON)可以展开成行和列,以最适合应用的方式。例如,MS COCO数据集包含JSON标注,详细说明了分割。为了构建一个包含所有COCO图像中所有分割对象的数据库集

image_files = dc.DataChain("gs://datachain-demo/coco/images/")
image_meta  = dc.DataChain("gs://datachain-demo/coco.json")  \
               .gen(meta=parse_json, key="images")       # list of images
images = image_files.merge(image_meta, on = "file.name", right_on="file_name")
objects_meta = dc.DataChain("gs://datachain-demo/coco.json") \
               .gen(meta=parse_json, key="annotations")  # annotated objects

objects = image.full_merge(objects_meta, on = "id", right_on = "image_id")

生成元数据

数据整理的典型步骤是从数据样本中创建特征以供将来选择。DataChain将新创建的元数据表示为列,这使得创建新特征并基于它们进行筛选变得容易

from fashion_clip.fashion_clip import FashionCLIP
from sqlalchemy import JSON
from tabulate import tabulate

from datachain.lib.param import Image
from datachain.query import C, DatasetQuery, udf


@udf(
    params=(Image(),),
    output={"fclip": JSON},
    method="fashion_clip",
    batch=10,
)
class MyFashionClip:
    def __init__(self):
        self.fclip = FashionCLIP("fashion-clip")

    def fashion_clip(self, inputs):
        embeddings = self.fclip.encode_images(
            [input[0] for input in inputs], batch_size=1
        )
        return [(json.dumps(emb),) for emb in embeddings.tolist()]

chain = dc.DataChain("gs://datachain-demo/zalando/images/").filter(
        C.name.glob("*.jpg")
    ).limit(5).add_signals(MyFashionClip).save("zalando_hd_emb")

test_image = "cs://datachain-demo/zalando/test/banner.jpg"
test_embedding = MyFashionClip.fashion_clip.encode_images(Image(test_image))

best_matches = chain.filter(similarity_search(test_embeding)).limit(5)

print best_matches.to_result()

Delta更新

DataChain能够执行“Delta更新”-也就是说,仅批处理新添加的数据样本。例如,让我们将一些图像复制到本地文件夹,并运行一个链来使用HuggingFace提供的本地字幕模型生成字幕

> mkdir demo-images/
> datachain cp gs://datachain-demo/images/ /tmp/demo-images
import torch

from datachain.lib.hf_image_to_text import LLaVAdescribe
from datachain.query import C, DatasetQuery

source = "/tmp/demo-images"

if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

if __name__ == "__main__":
    results = (
        DatasetQuery(
            source,
            anon=True,
        )
        .filter(C.name.glob("*.jpg"))
        .add_signals(
            LLaVAdescribe(
                device=device,
                model=model,
            ),
            parallel=False,
        )
        .save("annotated-images")
    )

现在让我们向同一个文件夹添加更多图像

> datachain cp gs://datachain-demo/extra-images/ /tmp/demo-images

并仅对Delta计算更新

processed = dc.DataChain("annotated-images")
delta = dc.dataChain("/tmp/demo-images").subtract(processed)

将数据传递到训练

数据集可以导出为CSV或webdataset格式。然而,将数据传递到训练的更好方法是将其包装到PyTorch类中,让库在底层处理文件下载和缓存

ds = dc.DataChain("gs://datachain-demo/name-labeled/images/")
               .filter(C.name.glob("*.jpg"))
               .map(lambda name: (name[:3],), output={"label": str}, parallel=4)
    )

train_loader = DataLoader(
        ds.to_pytorch(
            ImageReader(),
            LabelReader("label", classes=CLASSES),
            transform=transform,
        ),
        batch_size=16,
        parallel=2,
    )

💻  更多示例

  • 在不重新分片Webdataset文件的情况下整理图像以训练自定义CLIP模型

  • 批量转换和索引图像以创建可搜索的商品目录

  • 在规模上评估LLM应用

  • 评估LLM检索策略

  • 批处理中的Delta更新

贡献

欢迎贡献。要了解更多信息,请参阅贡献指南

许可证

Apache 2.0许可证下分发,DataChain是免费和开源软件。

问题

如果您遇到任何问题,请提交问题,并附上详细描述。

项目详情


发布历史 发布通知 | RSS源

下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。

源分布

dvcx-0.95.0.tar.gz (427.9 kB 查看散列值)

上传时间

构建分布

dvcx-0.95.0-py3-none-any.whl (189.5 kB 查看散列值)

上传时间 Python 3

由以下支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面