HuggingFace库用于处理和筛选大量网络数据
项目描述
DataTrove
DataTrove是一个用于在非常大规模上处理、过滤和去重文本数据的库。它提供了一套预构建的常用处理块,并具有一个框架,可以轻松添加自定义功能。
数据宝库处理管道是平台无关的,可以在本地或Slurm集群上直接运行。其(相对)低内存使用量和多步骤设计使其非常适合大型工作负载,例如处理大型语言模型的训练数据。
本地、远程和其他文件系统通过fsspec支持。
目录
安装
pip install datatrove[FLAVOUR]
可用的版本(例如,[processing,s3])
all
安装所有内容:pip install datatrove[all]
io
依赖项,用于读取warc/arc/wet
文件和arrow/parquet格式:pip install datatrove[io]
processing
依赖项,用于文本提取、过滤和分词:pip install datatrove[processing]
s3
支持s3:pip install datatrove[s3]
cli
用于命令行工具:pip install datatrove[cli]
快速入门示例
您可以查看以下示例
- fineweb.py完整重现FineWeb数据集
- process_common_crawl_dump.py完整管道,用于读取Commoncrawl warc文件,提取其文本内容,过滤并保存结果数据到s3。在Slurm上运行
- tokenize_c4.py直接从huggingface的hub读取数据,使用
gpt2
分词器对C4数据集的英文部分进行分词 - minhash_deduplication.py完整管道,用于运行文本数据的minhash去重
- sentence_deduplication.py示例,用于运行句子级别的精确去重
- exact_substrings.py示例,用于运行ExactSubstr(需要此存储库)
术语
pipeline
:要执行的加工步骤列表(读取数据、过滤、写入磁盘等)executor
:在给定的执行环境中运行特定的管道(Slurm、多核机器等)job
:在给定的执行器上执行管道task
:一个job
由多个task
组成,这些用于并行执行,通常是通过让每个task
处理数据的一个shard
来实现的。Datatrove会跟踪哪些任务已完成,并且当您重新启动时,只有未完成的任务会运行。file
:单个输入文件(.json、.csv等)。
[!TIP] 注意,每个文件将由一个单独的
task
处理。Datatrove不会自动将文件分割成多个部分,因此为了完全并行化,您应该有多个中等大小的文件,而不是单个大文件。
shard
:一组输入数据(通常是多个file
的一组),将被分配给特定的task
。每个task
将处理从完整输入文件列表中不同非重叠的shard
的数据worker
:执行单个任务的计算资源,例如,如果您有50个CPU核心,则可以使用workers=50
运行LocalPipelineExecutor,以同时执行50个tasks
(每个CPU一个)。一旦worker
完成一个task
,它将开始处理另一个等待的task
[!TIP] 您的
tasks
数量控制了您可以并行化的程度以及每个单独的处理单元将花费多少时间。如果您有少量任务(并且因此每个任务都必须处理大量文件),并且它们失败,您将不得不从头开始,而如果您有更多的小任务,每个失败的任务重新运行所需的时间将大大减少。
[!CAUTION] 如果您的
tasks
>files
,某些任务将不会处理任何数据,因此通常没有必要将tasks
设置为一个比`files`大的数字。
示例
在具有workers
的机器上运行处理10000
个files
的job
,有100
个CPU核心。如果我们选择使用1000
个任务,每个任务将处理10个文件的shard
。`workers=100`意味着我们可以同时处理100
个任务。
管道
数据宝库文档
每个管道块以datatrove中的Document
格式处理数据
text
每个样本的实际文本内容id
该样本的唯一ID(字符串)metadata
一个字典,其中可以存储任何附加信息
管道块类型
每个管道块接受一个Document
生成器作为输入,并返回另一个Document
生成器。
- readers从不同格式读取数据并产生
Document
- writers将
Document
以不同格式保存到磁盘/云中 - extractors从原始格式(如网页HTML)中提取文本内容
- filters根据特定规则/标准过滤掉(删除)一些
Document
- stats收集数据集统计信息的块
- tokens将数据分词或计数单词的块
- dedup用于去重的块
完整管道
管道定义为管道块的列表。例如,以下管道将从磁盘读取数据,随机过滤掉一些文档,并将它们写回磁盘
from datatrove.pipeline.readers import CSVReader
from datatrove.pipeline.filters import SamplerFilter
from datatrove.pipeline.writers import JsonlWriter
pipeline = [
CSVReader(
data_folder="/my/input/path"
),
SamplerFilter(rate=0.5),
JsonlWriter(
output_folder="/my/output/path"
)
]
执行器
管道是平台无关的,这意味着相同的管道可以在不同的执行环境中无缝运行,而无需对其步骤进行任何更改。每个环境都有自己的PipelineExecutor。
所有执行器共有的某些选项
pipeline
应运行的管道步骤的列表logging_dir
应保存日志文件、统计信息和更多内容的文件夹。不要为不同的管道/作业重复使用文件夹,因为这将覆盖您的统计数据、日志和完成情况。skip_completed
(布尔值,默认为True
)datatrove跟踪完成的任务,以便在您重新启动作业时可以跳过。将此设置为False
以禁用此行为randomize_start_duration
(整数,默认为0
)延迟每个任务开始的最大秒数,以防止所有任务同时启动并可能过载系统。
调用执行器的run
方法以执行其管道。
[!提示] Datatrove通过在
${logging_dir}/completions
文件夹中创建一个标记(一个空文件)来跟踪哪些任务成功完成。一旦作业完成,如果其中一些任务失败了,您可以简单地重新启动完全相同的执行器,datatrove将检查并只运行之前未完成的任务。
[!注意] 如果因为一些任务失败而重新启动管道,请不要更改任务的总数,因为这会影响输入文件/分片的分配。
LocalPipelineExecutor
此执行器将在本地机器上启动一个管道。选项
tasks
要运行的总体任务数workers
同时运行的任务数。如果设置为-1
,则没有限制。任何大于1
的值将使用多进程来执行任务。start_method
用于创建多进程Pool的方法。如果workers
设置为1,则忽略。
示例执行器
from datatrove.executor import LocalPipelineExecutor
executor = LocalPipelineExecutor(
pipeline=[
...
],
logging_dir="logs/",
tasks=10,
workers=5
)
executor.run()
多节点并行处理
您可以通过使用local_tasks
和local_rank_offset
来使用不同的节点/机器处理总任务的不同部分。对于每个节点/实例/机器,启动时使用以下选项
tasks
要在所有机器上执行的总任务数。此值必须在每台机器上相同,否则输入文件分配可能会重叠! 示例:500local_tasks
在这台特定机器上将要执行的总任务数。请注意,您可以为每台机器使用不同的值。示例:100local_rank_offset
在这台机器上要执行的第一个任务的排名。如果您是在启动作业的第三台机器上,并且前两台机器分别运行了250和150个作业,则当前机器的此值为400
。
要获取最终的合并统计信息,您必须手动在包含所有机器的统计信息的路径上调用merge_stats
脚本。
SlurmPipelineExecutor
此执行器将在slurm集群上启动一个管道,使用slurm作业数组来分组和管理任务。选项
tasks
要运行的总任务数。必需time
slurm时间限制字符串。必需partition
slurm分区。必需workers
同时运行的任务数。如果设置为-1
,则没有限制。Slurm将同时运行workers
个任务。(默认:-1
)job_name
slurm作业名称(默认:"data_processing")depends
另一个SlurmPipelineExecutor实例,它将成为此管道的依赖项(当前管道仅在依赖的管道成功完成后才开始执行)sbatch_args
包含您想要传递给sbatch的任何其他参数的字典slurm_logs_folder
保存slurm日志文件的位置。如果使用本地路径作为logging_dir
,它们将保存在logging_dir/slurm_logs
中。否则,它们将保存在当前目录的子目录中。
其他选项
cpus_per_task
每个任务分配的CPU数(默认:1
)qos
slurm QoS(默认:"normal")mem_per_cpu_gb
每个CPU的内存,单位为GB(默认:2)env_command
如果需要,激活Python环境的自定义命令condaenv
要激活的conda环境venv_path
要激活的Python环境路径max_array_size
在$ scontrol show config
中的MaxArraySize值。如果任务数量超过此数字,它将分成多个数组作业(默认:1001)max_array_launch_parallel
如果由于max_array_size而需要多个作业,是是否一次(并行)启动它们,还是顺序启动(默认:False
)stagger_max_array_jobs
当 max_array_launch_parallel 为 True 时,此参数决定了在启动每个并行作业之间等待多少秒(默认:0
)run_on_dependency_fail
即使依赖的作业失败,也启动执行(默认:False
)randomize_start
在约 3 分钟的窗口内随机化作业中每个任务的启动时间。当例如大量访问 s3 桶时很有用。(默认:False
)
示例执行器
from datatrove.executor import SlurmPipelineExecutor
executor1 = SlurmPipelineExecutor(
pipeline=[
...
],
job_name="my_cool_job1",
logging_dir="logs/job1",
tasks=500,
workers=100, # omit to run all at once
time="10:00:00", # 10 hours
partition="hopper-cpu"
)
executor2 = SlurmPipelineExecutor(
pipeline=[
...
],
job_name="my_cool_job2",
logging_dir="logs/job2",
tasks=1,
time="5:00:00", # 5 hours
partition="hopper-cpu",
depends=executor1 # this pipeline will only be launched after executor1 successfully completes
)
# executor1.run()
executor2.run() # this will actually launch executor1, as it is a dependency, so no need to launch it explicitly
日志记录
对于具有 logging_dir
mylogspath/exp1 的管道,将创建以下目录结构
请参阅目录结构
└── mylogspath/exp1
│── executor.json ⟵ json dump of the executor options and pipeline steps
│── launch_script.slurm ⟵ the slurm config created and used to launch this job (if running on slurm)
│── executor.pik ⟵ the slurm config created and used to launch this job (if running on slurm)
│── ranks_to_run.json ⟵ list of tasks that are being run
│── logs/
│ └──[task_00000.log, task_00001.log, task_00002.log, ...] ⟵ individual logging files for each task
│── completions/
│ └──[00004, 00007, 00204, ...] ⟵ empty files marking a task as completed. Using when relaunching/resuming a job (only unfinished tasks will be run)
│── stats/
│ └──[00000.json, 00001.json, 00002.json, ...] ⟵ individual stats for each task (number of samples processed, filtered, removed, etc)
└── stats.json ⟵ global stats from all tasks
颜色化
日志消息支持颜色化。默认情况下,颜色化将自动检测控制台消息并禁用日志文件(日志/task_XXXXX.log)。要显式启用或禁用颜色化,可以设置以下环境变量
DATATROVE_COLORIZE_LOGS
"1" 向控制台日志消息添加 ANSI 颜色,"0" 禁用颜色化。DATATROVE_COLORIZE_LOG_FILES
设置为 "1" 向保存到 logs/task_XXXXX.log 的日志消息添加 ANSI 颜色。
数据文件夹/路径
Datatrove 通过 fsspec 支持广泛的输入/输出源。
有几种方式可以提供 datatrove 块的路径(对于 input_folder
、logging_dir
、data_folder
等参数)
-
str
:最简单的方法是传递单个字符串。例如:/home/user/mydir
、s3://mybucket/myinputdata
、hf://datasets/allenai/c4/en/
-
(str, fsspec filesystem instance)
:一个字符串路径和一个完全初始化的文件系统对象。例如:("s3://mybucket/myinputdata", S3FileSystem(client_kwargs={"endpoint_url": endpoint_uri}))
-
(str, dict)
:一个字符串路径和一个用于初始化 fs 的选项字典。例如(与上一行等效):("s3://mybucket/myinputdata", {"client_kwargs": {"endpoint_url": endpoint_uri}})
-
DataFolder
:您可以直接初始化一个 DataFolder 对象并将其作为参数传递
在内部,这些参数组合由 get_datafolder
解析。
实用指南
读取数据
通常,管道将以一个 Reader 块开始。大多数读者需要一个 data_folder
参数——包含要读取的数据的文件夹的路径。
这些文件将被分配到每个任务。如果您有 N
个任务,则 rank 为 i
(基于 0)的任务将处理文件 i、i+N、i+2N、i+3N、...
。
内部,每个读者读取数据并将其转换为字典,然后创建一个 Document
对象。
一些大多数读者都通用的选项
text_key
包含每个样本文本内容的字典键。默认:text
id_key
包含每个样本 id 的字典键。默认:id
default_metadata
一个字典,用于添加任何默认元数据值(例如它们的来源等)recursive
是否在data_folder
的子目录中递归查找文件glob_pattern
使用此字段匹配特定文件。例如,glob_pattern="*/warc/*.warc.gz"
将匹配data_folder
的每个子目录中的warc/
文件夹上的具有.warc.gz
扩展名的文件adapter
此函数接受从读者获得的原始字典,并返回具有Document
字段名称的字典。如果您想覆盖此函数(_default_adapter),则可以这样做。limit
只读取一定数量的样本。对于测试/调试很有用
提取文本
您可以使用 extractors 从原始 HTML 中提取文本内容。在 datatrove 中,最常用的提取器是 Trafilatura,它使用 trafilatura 库。
过滤数据
过滤器是任何数据处理管道中最重要的模块之一。Datatrove的过滤器模块接收一个Document
对象,并返回一个布尔值(True
保留文档,False
移除文档)。被移除的样本不会继续到下一个管道阶段。您还可以通过将一个Writer传递给exclusion_writer
参数,将移除的样本保存到磁盘。
保存数据
处理完数据后,您可能希望将其保存到某个地方。为此,您可以使用writer。Writers需要一个output_folder
(数据应保存的路径)。您可以选择要使用的compression
(默认:gzip
)以及保存每个文件的名称。对于output_filename
,会应用以下参数的模板
${rank}
将被当前任务的rank替换。注意,如果此标签不存在,不同的任务可能会尝试写入同一位置${id}
将被样本id替换- 元数据:任何其他
${tag}
都将替换为相应的document.metadata['tag']
值
以下是一个根据其lang
元数据字段对样本进行语言分离的示例
JsonlWriter(
f"{MAIN_OUTPUT_PATH}/non_english/",
output_filename="${language}/" + DUMP + "/${rank}.jsonl.gz", # folder structure: language/dump/file
)
数据去重
有关去重示例,请参阅minhash_deduplication.py、sentence_deduplication.py和exact_substrings.py。
摘要统计
要获取您数据的摘要统计信息,您可以使用Stats模块。这些模块以分布式方式收集数据集的数据概要,提供了一个简单的方法。这是一个两步过程,您首先
- 对于每个分片,遍历文档并将统计信息收集到以下分组中
summary
(所有文档计为“summary”键)、fqdn
(完全限定域名分组)、suffix
(url路径的最后部分分组)或histogram
(基于值的分组)。 - 将来自不同分片的统计信息合并到一个文件中。有关更多详细信息,请参阅summary_stats.py。
每个结果统计信息都保存到一个单独的文件中,其结构如下:output_folder/{fqdn,suffix,summary,histogram}/{stat_name}/metric.json
每个这样的文件都是一个MetricStatsDict
对象,您可以使用以下方式轻松加载它
from datatrove.pipeline.stats.summary_stats import MetricStatsDict
import json
stats = MetricStatsDict.from_dict(json.load(open("fqdn/length/metric.json")))
# E.g for total length of nytimes.com docs
stats["nytimes.com"].total
# Or for mean of cnn.com docs
stats["cnn.com"].mean
以下统计信息可用
contamination_stats.py
:`word_contamination_{words[0]}:文档中单词污染的频率。doc_stats.py
:`length`:文档长度,`white_space_ratio`:空白字符比率,`non_alpha_digit_ratio`:非字母数字字符比率,`digit_ratio`:数字比率,`uppercase_ratio`:大写字母比率,`elipsis_ratio`:省略号字符比率,`punctuation_ratio`:标点符号比率lang_stats.py
:`fasttext_{language}`:使用fastText的文档语言line_stats.py
:`n_lines`:每篇文档的行数,`avg_line_length`:每篇文档的平均行长度,`long_line_ratio_words`:超过k个字符的行的比率,`short_line_ratio_chars`:超过k个字符的行的比率,`bullet_point_lines_ratio`:项目符号行的比率,`line_duplicates`:重复行的比率,`line_char_duplicates`:重复行中字符的比率paragraph_stats.py
:`n_paragraphs`:段落数,`avg_paragraph_length`:平均段落长度,`short_paragraph_ratio_{chars}`:短段落比率(< < chars>字符),`long_paragraph_ratio_{chars}`:长段落比率(> chars)perplexity_stats.py
:`ccnet_perplexity_{model_dataset}_{language}`:使用CCNet模型在{model}上在{dataset}中用{language}的文档的困惑度sentence_stats.py
:n_sentences
: 句子数量,avg_sentence_length
: 平均句子长度,short_sentence_ratio_{chars}
: 短句比例(<{chars}个字符以内),long_sentence_ratio_{chars}
: 长句比例(>{chars}个字符)token_stats.py
:token_count
: 文档中的标记数量word_stats.py
:n_words
: 文档中的单词数量,avg_word_length
: 文档中单词的平均长度,avg_words_per_line
: 文档中每行的平均单词数,short_word_ratio_{chars}
: 短词比例(少于{chars}个字符的单词),stop_word_ratio
: 停用词比例,long_word_ratio_{chars}
: 长词比例(超过{chars}个字符的单词),type_token_ratio
: 唯一单词数/标记数,capitalized_word_ratio
: 大写单词比例,uppercase_word_ratio
: 大写字母比例
自定义块
简单数据
您可以直接将Document
的可迭代对象作为管道块传递,如下所示
from datatrove.data import Document
from datatrove.pipeline.filters import SamplerFilter
from datatrove.pipeline.writers import JsonlWriter
pipeline = [
[
Document(text="some data", id="0"),
Document(text="some more data", id="1"),
Document(text="even more data", id="2"),
],
SamplerFilter(rate=0.5),
JsonlWriter(
output_folder="/my/output/path"
)
]
请注意,然而,此可迭代对象不会被分片(如果您启动超过1个任务,它们都将获得整个可迭代对象)。这对于小型工作负载/测试通常很有用。
自定义函数
对于简单的处理,您只需传递一个具有以下签名的自定义函数
from datatrove.data import DocumentsPipeline
def uppercase_everything(data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
"""
`data` is a generator of Document. You must also return a generator of Document (yield)
You can optionally use `rank` and `world_size` for sharding
"""
for document in data:
document.text = document.text.upper()
yield document
pipeline = [
...,
uppercase_everything,
...
]
[!TIP] 由于导入,您可能遇到一些pickling问题。如果发生这种情况,只需将所需的所有导入移动到函数体内。
自定义块
您还可以定义一个继承自PipelineStep
或其子类的完整块
from datatrove.pipeline.base import PipelineStep
from datatrove.data import DocumentsPipeline
from datatrove.io import DataFolderLike, get_datafolder
class UppercaserBlock(PipelineStep):
def __init__(self, some_folder: DataFolderLike, some_param: int = 5):
super().__init__()
# you can take whatever parameters you need and save them here
self.some_param = some_param
# to load datafolders use get_datafolder()
self.some_folder = get_datafolder(some_folder)
def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
# you could also load data from the `some_folder`:
for filepath in self.some_folder.get_shard(rank, world_size): # it also accepts a glob pattern, among other things
with self.some_folder.open(filepath, "rt") as f:
# do something
...
yield doc
#
# OR process data from previous blocks (`data`)
#
for doc in data:
with self.track_time():
# you can wrap the main processing code in `track_time` to know how much each document took to process
nr_uppercase_letters = sum(map(lambda c: c.isupper(), doc.text))
# you can also keep track of stats per document using stat_update
self.stat_update("og_upper_letters", value=nr_uppercase_letters)
doc.text = doc.text.upper()
# make sure you keep the yield outside the track_time block, or it will affect the time calculation
yield doc
#
# OR save data to disk
#
with self.some_folder.open("myoutput", "wt") as f:
for doc in data:
f.write(doc...)
pipeline = [
...,
UppercaserBlock("somepath"),
...
]
您还可以从BaseExtractor
、BaseFilter
、BaseReader
/BaseDiskReader
或DiskWriter
继承。
贡献
git clone git@github.com:huggingface/datatrove.git && cd datatrove
pip install -e ".[dev]"
安装pre-commit代码风格钩子
pre-commit install
运行测试
pytest -sv ./tests/
引用
@misc{penedo2024datatrove,
author = {Penedo, Guilherme and Kydlíček, Hynek and Cappelli, Alessandro and Sasko, Mario and Wolf, Thomas},
title = {DataTrove: large scale data processing},
year = {2024},
publisher = {GitHub},
journal = {GitHub repository},
url = {https://github.com/huggingface/datatrove}
}
项目详情
下载文件
下载适用于您的平台的文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。
源分布
构建分布
datatrove-0.3.0.tar.gz的散列
算法 | 散列摘要 | |
---|---|---|
SHA256 | 9d59da521465f38f38ee726d165227802aef02f64c893f1094c81ff5eb61187b |
|
MD5 | bed1c6d6d14d1e2a3d0ffa571b247302 |
|
BLAKE2b-256 | 337acd701ad207aba79f4672d3f7d89588c02627b0849d738c4b2e476802ef54 |
datatrove-0.3.0-py3-none-any.whl的散列
算法 | 散列摘要 | |
---|---|---|
SHA256 | e0606cdf799fa7adecb18fa9a643ded49a20c16911a569bc7d2d103df6e86a34 |
|
MD5 | ee4c0d09e1325e3cfb86e75c91bdbc41 |
|
BLAKE2b-256 | 7ed82a29e01529d4fdee5bddb38a637dbbc0704a1a94af12a0d82a98d6e39794 |