跳转到主要内容

数据集的快速格式。

项目描述

PyPI

Granular

Granular是一种数据集格式,从简单到复杂。每个Granular数据集是袋文件格式中链接文件的集合,这是一个可寻址容器结构。Granular附带高性能数据加载器。

pip install granular

特性

  • 🚀 性能:本地和云上具有高读写吞吐量。
  • 🔎 寻址:通过数据点索引从磁盘快速随机访问。
  • 🎞️ 序列:数据点可以包含可寻址的模态性列表。
  • 🤸 灵活性:用户提供编码器和解码器;示例可用。
  • 👥 分片:将数据集存储到分片中以分割处理工作负载。
  • 🔄 确定性:每个epoch的确定性全局洗牌可恢复。
  • 正确性:具有高代码覆盖率的单元测试套件。

快速入门

import pathlib
import granular
import numpy as np

directory = './dataset'

编写

spec = {
    'foo': 'int',      # integer
    'bar': 'utf8[]',   # *list* of strings
    'baz': 'msgpack',  # packed structure
    'abc': 'jpg',      # image
    'xyz': 'array',    # array
}

with granular.DatasetWriter(directory, spec, granular.encoders) as writer:
  for i in range(10):
    datapoint = {
        'foo': i,
        'bar': ['hello'] * i,
        'baz': {'a': 1},
        'abc': np.zeros((60, 80, 3), np.uint8),
        'xyz': np.arange(0, 1 + i, np.float32),
    }
    writer.append(datapoint)

print(list(directory.glob('*')))
# ['spec.json', 'refs.bag', 'foo.bag', 'bar.bag', 'baz.bag', 'abc.bag', 'xyz.bag']

读取

with granular.DatasetReader(directory, granular.decoders) as reader:
  print(reader.spec)    # {'foo': 'int', 'bar': 'utf8[]', 'baz': 'msgpack', ...}
  print(reader.size)    # Dataset size in bytes
  print(len(reader))    # Number of datapoints

  datapoint = reader[2]
  print(datapoint['foo'])        # 2
  print(datapoint['bar'])        # ['hello', 'hello']
  print(datapoint['abc'].shape)  # (60, 80, 3)

加载

def preproc(datapoint, seed):
  return {'image': datapoint['abc'], 'label': datapoint['foo']}

loader = granular.Loader(
    reader, batch=8, fns=[preproc], shuffle=True, workers=64, seed=0)

print(loader.spec)
# {'image': (np.uint8, (60, 80, 3)), 'label': (np.int64, ())}

dataset = iter(loader)
for _ in range(100):
  batch = next(dataset)
  print(batch['image'].shape)  # (8, 60, 80, 3)

高级

文件系统

通过提供不同的Path实现支持自定义文件系统。例如,在Google Cloud上,您可以使用elements中的Path,该Path针对数据加载吞吐量进行了优化

import elements  # pip install elements

directory = elements.Path('gs://<bucket>/dataset')

reader = granular.DatasetReader(directory, ...)
wrtier = granular.DatasetWriter(directory, ...)

格式

Granular不会对用户施加序列化解决方案。只要提供编码器和解码器函数,任何字符串都可以用作spec中的类型,例如

import msgpack

encoders = {
    'bytes': lambda x: x,
    'utf8': lambda x: x.encode('utf-8'),
    'msgpack': msgpack.packb,
}

decoders = {
    'bytes': lambda x: x,
    'utf8': lambda x: x.decode('utf-8'),
    'msgpack': msgpack.unpackb,
}

formats.py中提供了常见的编码和解码函数示例。这些支持Numpy数组、图像、视频等。它们可以用作granular.encodersgranular.decoders

恢复

数据加载器完全确定性和可恢复,只需要给定的步长和种子整数。为此,请保存由loader.save()返回的状态字典,并在存储检查点时将其传递给loader.load()

state = loader.save()
print(state)  # {'step': 100, 'seed': 0}
loader.load(state)

缓存

检索数据点首先需要从 refs.bag 中读取以找到其他数据包文件的引用,然后从每个模态数据包文件中读取。如果某些模态足够小,可以通过设置 cache_keys 将它们缓存在 RAM 中。通常建议缓存 refs 以及所有小型模态,例如整数标签。

此外,从数据包文件中读取需要两个读取操作。第一个操作查看文件末尾的索引表以定位记录的字节偏移量。第二个操作检索实际记录。通常建议缓存所有数据包文件的索引。这些表格总共占用 8 * len(spec) * len(reader) 字节的 RAM。

reader = granular.DatasetReader(
    directory, decoders,
    cache_index=True,            # Cache index tables of all bag files in memory.
    cache_keys=('refs', 'foo'),  # Fully cache refs.bag and foo.bag in memory.
)

掩码

可以只加载数据点的一部分键的值。为此,除了数据点索引外,还需要提供掩码。这可以减少读取请求,仅针对实际需要的数据包文件。

print(reader.spec)  # {'foo': 'int', 'bar': 'utf8', 'baz': 'array'}

mask = {'foo': True, 'baz': True}
datapoint = reader[index, mask]
print('foo' in datapoint)  # True
print('bar' in datapoint)  # False
print('baz' in datapoint)  # True

序列

每个数据集是一系列数据点。每个数据点是一个字典,键为字符串,值为单个字节值或字节值列表。要使用序列值,请在 spec 中的类型后添加 [] 后缀。

spec = {
    'title': 'utf8',
    'frames': 'jpg[]',
    'captions': 'utf8[]',
    'times': 'int[]',
}

序列字段不仅可以存储可变长度的值,还可以通过掩码允许读取值范围,而无需从磁盘加载整个序列。

available = reader.available(index)
print(available)
# {'title': True, 'frames': range(54), 'captions': range(7), 'times': range(7)}

mask = {
    'title': True,            # Read the title modality
    'frames': range(32, 42),  # Read a range of 10 frames.
    'captions': range(0, 7),  # Read all captions.
    'times': True,            # Another way to read the full list.
}
datapoint = reader[index, mask]
print(len(datapoint['frames']))  # 10

范围通过单个读取操作加载,对应于云基础设施上的单个下载请求。

分片

大型数据集可以作为较小数据集的列表存储,以便于并行处理,通过在单独的进程或机器上单独处理每个较小的数据集来实现。分片长度指定每个分片的数据点数量。一个很好的默认值是将数据点数量设置为每个分片大约为 10 Gb。

# Write into a sharded dataset.
writer = granular.ShardedDatasetWriter(directory, spec, encoders, shardlen=10000)

# Read from a sharded dataset.
reader = granular.ShardedDatasetReader(directory, decoders)

分片数据集的文件结构是每个分片一个文件夹,文件夹名称为分片编号。每个分片本身也是一个数据集,也可以使用非分片 granular.DatasetReader 读取。

$ tree ./directory
.
├── 000000  ├── spec.json
│  ├── refs.bag
│  ├── foo.bag
│  ├── bar.bag
│  └── baz.bag
├── 000001  ├── spec.json
│  ├── refs.bag
│  ├── foo.bag
│  ├── bar.bag
│  └── baz.bag
└── ...

当使用较少的工人数处理具有大量分片的数据集时,指定 shardstartshardstep 以便每个工作器读取和写入其专有的分片子集。

# Write into a sharded dataset.
writer = granular.ShardedDatasetWriter(
    directory, spec, encoders, shardlen=10000,
    shardstart=worker_id,   # Start writing at this shard.
    shardstep=num_workers,  # Afterwards, jump this many shards ahead.
)

# Read from a sharded dataset.
reader = granular.ShardedDatasetReader(
    directory, decoders,
    shardstart=worker_id,   # Start reading at this shard.
    shardstep=num_workers,  # Afterwards, jump this many shards ahead.
)

问题

如果您有任何问题,请 提交一个问题

项目详情


下载文件

下载适用于您的平台的文件。如果您不确定选择哪个,请了解更多关于 安装软件包 的信息。

源分发

granular-0.17.3.tar.gz (20.3 kB 查看散列值)

上传时间

由以下组织支持