跳转到主要内容

HuggingFace库用于处理和筛选大量网络数据

项目描述

DataTrove

DataTrove是一个用于在非常大规模上处理、过滤和去重文本数据的库。它提供了一套预构建的常用处理块,并具有一个框架,可以轻松添加自定义功能。

数据宝库处理管道是平台无关的,可以在本地或Slurm集群上直接运行。其(相对)低内存使用量和多步骤设计使其非常适合大型工作负载,例如处理大型语言模型的训练数据。

本地、远程和其他文件系统通过fsspec支持。

目录

安装

pip install datatrove[FLAVOUR]

可用的版本(例如,[processing,s3])

  • all安装所有内容:pip install datatrove[all]
  • io依赖项,用于读取warc/arc/wet文件和arrow/parquet格式:pip install datatrove[io]
  • processing依赖项,用于文本提取、过滤和分词:pip install datatrove[processing]
  • s3支持s3:pip install datatrove[s3]
  • cli用于命令行工具:pip install datatrove[cli]

快速入门示例

您可以查看以下示例

术语

  • pipeline:要执行的加工步骤列表(读取数据、过滤、写入磁盘等)
  • executor:在给定的执行环境中运行特定的管道(Slurm、多核机器等)
  • job:在给定的执行器上执行管道
  • task:一个job由多个task组成,这些用于并行执行,通常是通过让每个task处理数据的一个shard来实现的。Datatrove会跟踪哪些任务已完成,并且当您重新启动时,只有未完成的任务会运行。
  • file:单个输入文件(.json、.csv等)。

[!TIP] 注意,每个文件将由一个单独的task处理。Datatrove不会自动将文件分割成多个部分,因此为了完全并行化,您应该有多个中等大小的文件,而不是单个大文件。

  • shard:一组输入数据(通常是多个file的一组),将被分配给特定的task。每个task将处理从完整输入文件列表中不同非重叠的shard的数据
  • worker:执行单个任务的计算资源,例如,如果您有50个CPU核心,则可以使用workers=50运行LocalPipelineExecutor,以同时执行50个tasks(每个CPU一个)。一旦worker完成一个task,它将开始处理另一个等待的task

[!TIP] 您的tasks数量控制了您可以并行化的程度以及每个单独的处理单元将花费多少时间。如果您有少量任务(并且因此每个任务都必须处理大量文件),并且它们失败,您将不得不从头开始,而如果您有更多的小任务,每个失败的任务重新运行所需的时间将大大减少。

[!CAUTION] 如果您的tasks > files,某些任务将不会处理任何数据,因此通常没有必要将tasks设置为一个比`files`大的数字。

示例

在具有workers的机器上运行处理10000filesjob,有100个CPU核心。如果我们选择使用1000个任务,每个任务将处理10个文件的shard。`workers=100`意味着我们可以同时处理100个任务。

管道

数据宝库文档

每个管道块以datatrove中的Document格式处理数据

  • text每个样本的实际文本内容
  • id该样本的唯一ID(字符串)
  • metadata一个字典,其中可以存储任何附加信息

管道块类型

每个管道块接受一个Document生成器作为输入,并返回另一个Document生成器。

  • readers从不同格式读取数据并产生Document
  • writersDocument以不同格式保存到磁盘/云中
  • extractors从原始格式(如网页HTML)中提取文本内容
  • filters根据特定规则/标准过滤掉(删除)一些Document
  • stats收集数据集统计信息的块
  • tokens将数据分词或计数单词的块
  • dedup用于去重的块

完整管道

管道定义为管道块的列表。例如,以下管道将从磁盘读取数据,随机过滤掉一些文档,并将它们写回磁盘

from datatrove.pipeline.readers import CSVReader
from datatrove.pipeline.filters import SamplerFilter
from datatrove.pipeline.writers import JsonlWriter

pipeline = [
    CSVReader(
        data_folder="/my/input/path"
    ),
    SamplerFilter(rate=0.5),
    JsonlWriter(
        output_folder="/my/output/path"
    )
]

执行器

管道是平台无关的,这意味着相同的管道可以在不同的执行环境中无缝运行,而无需对其步骤进行任何更改。每个环境都有自己的PipelineExecutor。

所有执行器共有的某些选项

  • pipeline应运行的管道步骤的列表
  • logging_dir应保存日志文件、统计信息和更多内容的文件夹。不要为不同的管道/作业重复使用文件夹,因为这将覆盖您的统计数据、日志和完成情况。
  • skip_completed(布尔值,默认为True)datatrove跟踪完成的任务,以便在您重新启动作业时可以跳过。将此设置为False以禁用此行为
  • randomize_start_duration(整数,默认为0)延迟每个任务开始的最大秒数,以防止所有任务同时启动并可能过载系统。

调用执行器的run方法以执行其管道。

[!提示] Datatrove通过在${logging_dir}/completions文件夹中创建一个标记(一个空文件)来跟踪哪些任务成功完成。一旦作业完成,如果其中一些任务失败了,您可以简单地重新启动完全相同的执行器,datatrove将检查并只运行之前未完成的任务。

[!注意] 如果因为一些任务失败而重新启动管道,请不要更改任务的总数,因为这会影响输入文件/分片的分配。

LocalPipelineExecutor

此执行器将在本地机器上启动一个管道。选项

  • tasks 要运行的总体任务数
  • workers 同时运行的任务数。如果设置为-1,则没有限制。任何大于1的值将使用多进程来执行任务。
  • start_method 用于创建多进程Pool的方法。如果workers设置为1,则忽略。
示例执行器
from datatrove.executor import LocalPipelineExecutor
executor = LocalPipelineExecutor(
    pipeline=[
        ...
    ],
    logging_dir="logs/",
    tasks=10,
    workers=5
)
executor.run()
多节点并行处理

您可以通过使用local_taskslocal_rank_offset来使用不同的节点/机器处理总任务的不同部分。对于每个节点/实例/机器,启动时使用以下选项

  • tasks 要在所有机器上执行的总任务数。此值必须在每台机器上相同,否则输入文件分配可能会重叠! 示例:500
  • local_tasks 在这台特定机器上将要执行的总任务数。请注意,您可以为每台机器使用不同的值。示例:100
  • local_rank_offset 在这台机器上要执行的第一个任务的排名。如果您是在启动作业的第三台机器上,并且前两台机器分别运行了250和150个作业,则当前机器的此值为400

要获取最终的合并统计信息,您必须手动在包含所有机器的统计信息的路径上调用merge_stats脚本。

SlurmPipelineExecutor

此执行器将在slurm集群上启动一个管道,使用slurm作业数组来分组和管理任务。选项

  • tasks 要运行的总任务数。必需
  • time slurm时间限制字符串。必需
  • partition slurm分区。必需
  • workers 同时运行的任务数。如果设置为-1,则没有限制。Slurm将同时运行workers个任务。(默认:-1
  • job_name slurm作业名称(默认:"data_processing")
  • depends 另一个SlurmPipelineExecutor实例,它将成为此管道的依赖项(当前管道仅在依赖的管道成功完成后才开始执行)
  • sbatch_args 包含您想要传递给sbatch的任何其他参数的字典
  • slurm_logs_folder 保存slurm日志文件的位置。如果使用本地路径作为logging_dir,它们将保存在logging_dir/slurm_logs中。否则,它们将保存在当前目录的子目录中。
其他选项
  • cpus_per_task 每个任务分配的CPU数(默认:1
  • qos slurm QoS(默认:"normal")
  • mem_per_cpu_gb 每个CPU的内存,单位为GB(默认:2)
  • env_command 如果需要,激活Python环境的自定义命令
  • condaenv 要激活的conda环境
  • venv_path 要激活的Python环境路径
  • max_array_size$ scontrol show config中的MaxArraySize值。如果任务数量超过此数字,它将分成多个数组作业(默认:1001)
  • max_array_launch_parallel 如果由于max_array_size而需要多个作业,是是否一次(并行)启动它们,还是顺序启动(默认:False
  • stagger_max_array_jobs 当 max_array_launch_parallel 为 True 时,此参数决定了在启动每个并行作业之间等待多少秒(默认:0
  • run_on_dependency_fail 即使依赖的作业失败,也启动执行(默认:False
  • randomize_start 在约 3 分钟的窗口内随机化作业中每个任务的启动时间。当例如大量访问 s3 桶时很有用。(默认:False
示例执行器
from datatrove.executor import SlurmPipelineExecutor
executor1 = SlurmPipelineExecutor(
    pipeline=[
        ...
    ],
    job_name="my_cool_job1",
    logging_dir="logs/job1",
    tasks=500,
    workers=100,  # omit to run all at once
    time="10:00:00",  # 10 hours
    partition="hopper-cpu"
)
executor2 = SlurmPipelineExecutor(
    pipeline=[
        ...
    ],
    job_name="my_cool_job2",
    logging_dir="logs/job2",
    tasks=1,
    time="5:00:00",  # 5 hours
    partition="hopper-cpu",
    depends=executor1  # this pipeline will only be launched after executor1 successfully completes
)
# executor1.run()
executor2.run() # this will actually launch executor1, as it is a dependency, so no need to launch it explicitly

日志记录

对于具有 logging_dir mylogspath/exp1 的管道,将创建以下目录结构

请参阅目录结构
└── mylogspath/exp1
    │── executor.json ⟵ json dump of the executor options and pipeline steps
    │── launch_script.slurm ⟵ the slurm config created and used to launch this job (if running on slurm)
    │── executor.pik ⟵ the slurm config created and used to launch this job (if running on slurm)
    │── ranks_to_run.json ⟵ list of tasks that are being run
    │── logs/
    │   └──[task_00000.log, task_00001.log, task_00002.log, ...] ⟵ individual logging files for each task
    │── completions/
    │   └──[00004, 00007, 00204, ...] ⟵ empty files marking a task as completed. Using when relaunching/resuming a job (only unfinished tasks will be run)
    │── stats/
    │   └──[00000.json, 00001.json, 00002.json, ...] ⟵ individual stats for each task (number of samples processed, filtered, removed, etc)
    └── stats.json ⟵ global stats from all tasks

颜色化

日志消息支持颜色化。默认情况下,颜色化将自动检测控制台消息并禁用日志文件(日志/task_XXXXX.log)。要显式启用或禁用颜色化,可以设置以下环境变量

  • DATATROVE_COLORIZE_LOGS "1" 向控制台日志消息添加 ANSI 颜色,"0" 禁用颜色化。
  • DATATROVE_COLORIZE_LOG_FILES 设置为 "1" 向保存到 logs/task_XXXXX.log 的日志消息添加 ANSI 颜色。

数据文件夹/路径

Datatrove 通过 fsspec 支持广泛的输入/输出源。

有几种方式可以提供 datatrove 块的路径(对于 input_folderlogging_dirdata_folder 等参数)

  • str:最简单的方法是传递单个字符串。例如:/home/user/mydirs3://mybucket/myinputdatahf://datasets/allenai/c4/en/

  • (str, fsspec filesystem instance):一个字符串路径和一个完全初始化的文件系统对象。例如:("s3://mybucket/myinputdata", S3FileSystem(client_kwargs={"endpoint_url": endpoint_uri}))

  • (str, dict):一个字符串路径和一个用于初始化 fs 的选项字典。例如(与上一行等效):("s3://mybucket/myinputdata", {"client_kwargs": {"endpoint_url": endpoint_uri}})

  • DataFolder:您可以直接初始化一个 DataFolder 对象并将其作为参数传递

在内部,这些参数组合由 get_datafolder 解析。

实用指南

读取数据

通常,管道将以一个 Reader 块开始。大多数读者需要一个 data_folder 参数——包含要读取的数据的文件夹的路径。

这些文件将被分配到每个任务。如果您有 N 个任务,则 rank 为 i(基于 0)的任务将处理文件 i、i+N、i+2N、i+3N、...

内部,每个读者读取数据并将其转换为字典,然后创建一个 Document 对象。

一些大多数读者都通用的选项

  • text_key 包含每个样本文本内容的字典键。默认:text
  • id_key 包含每个样本 id 的字典键。默认:id
  • default_metadata 一个字典,用于添加任何默认元数据值(例如它们的来源等)
  • recursive 是否在 data_folder 的子目录中递归查找文件
  • glob_pattern 使用此字段匹配特定文件。例如,glob_pattern="*/warc/*.warc.gz" 将匹配 data_folder 的每个子目录中的 warc/ 文件夹上的具有 .warc.gz 扩展名的文件
  • adapter 此函数接受从读者获得的原始字典,并返回具有 Document 字段名称的字典。如果您想覆盖此函数(_default_adapter),则可以这样做。
  • limit 只读取一定数量的样本。对于测试/调试很有用

提取文本

您可以使用 extractors 从原始 HTML 中提取文本内容。在 datatrove 中,最常用的提取器是 Trafilatura,它使用 trafilatura 库。

过滤数据

过滤器是任何数据处理管道中最重要的模块之一。Datatrove的过滤器模块接收一个Document对象,并返回一个布尔值(True保留文档,False移除文档)。被移除的样本不会继续到下一个管道阶段。您还可以通过将一个Writer传递给exclusion_writer参数,将移除的样本保存到磁盘。

保存数据

处理完数据后,您可能希望将其保存到某个地方。为此,您可以使用writer。Writers需要一个output_folder(数据应保存的路径)。您可以选择要使用的compression(默认:gzip)以及保存每个文件的名称。对于output_filename,会应用以下参数的模板

  • ${rank}将被当前任务的rank替换。注意,如果此标签不存在,不同的任务可能会尝试写入同一位置
  • ${id}将被样本id替换
  • 元数据:任何其他${tag}都将替换为相应的document.metadata['tag']

以下是一个根据其lang元数据字段对样本进行语言分离的示例

JsonlWriter(
    f"{MAIN_OUTPUT_PATH}/non_english/",
    output_filename="${language}/" + DUMP + "/${rank}.jsonl.gz",  # folder structure: language/dump/file
)

数据去重

有关去重示例,请参阅minhash_deduplication.pysentence_deduplication.pyexact_substrings.py

摘要统计

要获取您数据的摘要统计信息,您可以使用Stats模块。这些模块以分布式方式收集数据集的数据概要,提供了一个简单的方法。这是一个两步过程,您首先

  1. 对于每个分片,遍历文档并将统计信息收集到以下分组中summary(所有文档计为“summary”键)、fqdn(完全限定域名分组)、suffix(url路径的最后部分分组)或histogram(基于值的分组)。
  2. 将来自不同分片的统计信息合并到一个文件中。有关更多详细信息,请参阅summary_stats.py

每个结果统计信息都保存到一个单独的文件中,其结构如下:output_folder/{fqdn,suffix,summary,histogram}/{stat_name}/metric.json

每个这样的文件都是一个MetricStatsDict对象,您可以使用以下方式轻松加载它

from datatrove.pipeline.stats.summary_stats import MetricStatsDict
import json
stats = MetricStatsDict.from_dict(json.load(open("fqdn/length/metric.json")))

# E.g for total length of nytimes.com docs
stats["nytimes.com"].total

# Or for mean of cnn.com docs
stats["cnn.com"].mean

以下统计信息可用

  • contamination_stats.py:`word_contamination_{words[0]}:文档中单词污染的频率。
  • doc_stats.py:`length`:文档长度,`white_space_ratio`:空白字符比率,`non_alpha_digit_ratio`:非字母数字字符比率,`digit_ratio`:数字比率,`uppercase_ratio`:大写字母比率,`elipsis_ratio`:省略号字符比率,`punctuation_ratio`:标点符号比率
  • lang_stats.py:`fasttext_{language}`:使用fastText的文档语言
  • line_stats.py:`n_lines`:每篇文档的行数,`avg_line_length`:每篇文档的平均行长度,`long_line_ratio_words`:超过k个字符的行的比率,`short_line_ratio_chars`:超过k个字符的行的比率,`bullet_point_lines_ratio`:项目符号行的比率,`line_duplicates`:重复行的比率,`line_char_duplicates`:重复行中字符的比率
  • paragraph_stats.py:`n_paragraphs`:段落数,`avg_paragraph_length`:平均段落长度,`short_paragraph_ratio_{chars}`:短段落比率(< < chars>字符),`long_paragraph_ratio_{chars}`:长段落比率(> chars)
  • perplexity_stats.py:`ccnet_perplexity_{model_dataset}_{language}`:使用CCNet模型在{model}上在{dataset}中用{language}的文档的困惑度
  • sentence_stats.py: n_sentences: 句子数量, avg_sentence_length: 平均句子长度, short_sentence_ratio_{chars}: 短句比例(<{chars}个字符以内), long_sentence_ratio_{chars}: 长句比例(>{chars}个字符)
  • token_stats.py: token_count: 文档中的标记数量
  • word_stats.py: n_words: 文档中的单词数量, avg_word_length: 文档中单词的平均长度, avg_words_per_line: 文档中每行的平均单词数, short_word_ratio_{chars}: 短词比例(少于{chars}个字符的单词),stop_word_ratio: 停用词比例, long_word_ratio_{chars}: 长词比例(超过{chars}个字符的单词),type_token_ratio: 唯一单词数/标记数, capitalized_word_ratio: 大写单词比例, uppercase_word_ratio: 大写字母比例

自定义块

简单数据

您可以直接将Document的可迭代对象作为管道块传递,如下所示

from datatrove.data import Document
from datatrove.pipeline.filters import SamplerFilter
from datatrove.pipeline.writers import JsonlWriter

pipeline = [
    [
        Document(text="some data", id="0"),
        Document(text="some more data", id="1"),
        Document(text="even more data", id="2"),
    ],
    SamplerFilter(rate=0.5),
    JsonlWriter(
        output_folder="/my/output/path"
    )
]

请注意,然而,此可迭代对象不会被分片(如果您启动超过1个任务,它们都将获得整个可迭代对象)。这对于小型工作负载/测试通常很有用。

自定义函数

对于简单的处理,您只需传递一个具有以下签名的自定义函数

from datatrove.data import DocumentsPipeline

def uppercase_everything(data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
    """
        `data` is a generator of Document. You must also return a generator of Document (yield)
        You can optionally use `rank` and `world_size` for sharding
    """
    for document in data:
        document.text = document.text.upper()
        yield document

pipeline = [
    ...,
    uppercase_everything,
    ...
]

[!TIP] 由于导入,您可能遇到一些pickling问题。如果发生这种情况,只需将所需的所有导入移动到函数体内。

自定义块

您还可以定义一个继承自PipelineStep或其子类的完整块

from datatrove.pipeline.base import PipelineStep
from datatrove.data import DocumentsPipeline
from datatrove.io import DataFolderLike, get_datafolder


class UppercaserBlock(PipelineStep):
    def __init__(self, some_folder: DataFolderLike, some_param: int = 5):
        super().__init__()
        # you can take whatever parameters you need and save them here
        self.some_param = some_param
        # to load datafolders use get_datafolder()
        self.some_folder = get_datafolder(some_folder)

    def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
        # you could also load data from the `some_folder`:
        for filepath in self.some_folder.get_shard(rank, world_size): # it also accepts a glob pattern, among other things
            with self.some_folder.open(filepath, "rt") as f:
                # do something
                ...
                yield doc

        #
        # OR process data from previous blocks (`data`)
        #

        for doc in data:
            with self.track_time():
                # you can wrap the main processing code in `track_time` to know how much each document took to process
                nr_uppercase_letters = sum(map(lambda c: c.isupper(), doc.text))
                # you can also keep track of stats per document using stat_update
                self.stat_update("og_upper_letters", value=nr_uppercase_letters)
                doc.text = doc.text.upper()
            # make sure you keep the yield outside the track_time block, or it will affect the time calculation
            yield doc

        #
        # OR save data to disk
        #

        with self.some_folder.open("myoutput", "wt") as f:
            for doc in data:
                f.write(doc...)
pipeline = [
    ...,
    UppercaserBlock("somepath"),
    ...
]

您还可以从BaseExtractorBaseFilterBaseReader/BaseDiskReaderDiskWriter继承。

贡献

git clone git@github.com:huggingface/datatrove.git && cd datatrove
pip install -e ".[dev]"

安装pre-commit代码风格钩子

pre-commit install

运行测试

pytest -sv ./tests/

引用

@misc{penedo2024datatrove,
  author = {Penedo, Guilherme and Kydlíček, Hynek and Cappelli, Alessandro and Sasko, Mario and Wolf, Thomas},
  title = {DataTrove: large scale data processing},
  year = {2024},
  publisher = {GitHub},
  journal = {GitHub repository},
  url = {https://github.com/huggingface/datatrove}
}

项目详情


下载文件

下载适用于您的平台的文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。

源分布

datatrove-0.3.0.tar.gz (16.6 MB 查看散列)

上传时间:

构建分布

datatrove-0.3.0-py3-none-any.whl (16.7 MB 查看散列)

上传时间: Python 3

由以下支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面