Alluxio Fsspec provides an implementation of the fsspec filesystem specification for Alluxio.

Alluxio FileSystem

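This quickstart shows how to connect to Alluxio through the fsspec interface. For more background on what to expect, read the blog post "Accelerate data loading in large scale ML training with Ray and Alluxio".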

Dependencies

A running Alluxio server with ETCD membership service

Alluxio version >= 309

Launch the Alluxio cluster with the example configuration

# only one master, one worker are running in this example
alluxio.master.hostname=localhost
alluxio.worker.hostname=localhost

# Critical properties for this example
# UFS address (e.g., the src of data to cache), change it to your bucket
alluxio.dora.client.ufs.root=s3://example_bucket/datasets/
# storage dir
alluxio.worker.page.store.dirs=/tmp/page_ufs
# size of storage dir
alluxio.worker.page.store.sizes=10GB
# use etcd to keep consistent hashing ring
alluxio.worker.membership.manager.type=ETCD
# default etcd endpoint
alluxio.etcd.endpoints=http://localhost:2379
# number of vnodes per worker on the ring
alluxio.user.consistent.hash.virtual.node.count.per.worker=5

# Other optional settings, good to have
alluxio.job.batch.size=200
alluxio.master.journal.type=NOOP
alluxio.master.scheduler.initial.wait.time=10s
alluxio.network.netty.heartbeat.timeout=5min
alluxio.underfs.io.threads=50
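
The configuration above keeps workers on a consistent hashing ring with 5 virtual nodes per worker. As a rough illustration of that idea only, the following sketch builds a toy ring in plain Python. The hash function, the `HashRing` class, and the worker names are assumptions made for this example and do not reflect Alluxio's actual implementation.

```python
import bisect
import hashlib


def _hash(key: str) -> int:
    """Map a string to a ring position (MD5 is an illustrative choice)."""
    return int.from_bytes(hashlib.md5(key.encode("utf-8")).digest()[:8], "big")


class HashRing:
    """Toy consistent-hash ring with virtual nodes (hypothetical helper)."""

    def __init__(self, workers, vnodes_per_worker=5):
        # Each worker is placed on the ring several times, once per virtual node.
        self._ring = sorted(
            (_hash(f"{worker}#{i}"), worker)
            for worker in workers
            for i in range(vnodes_per_worker)
        )
        self._positions = [pos for pos, _ in self._ring]

    def worker_for(self, path: str) -> str:
        # Pick the first virtual node clockwise from the key, wrapping around.
        idx = bisect.bisect_right(self._positions, _hash(path)) % len(self._ring)
        return self._ring[idx][1]


ring = HashRing(["worker-a", "worker-b", "worker-c"], vnodes_per_worker=5)
owner = ring.worker_for("s3://example_bucket/datasets/example.csv")
```

With this layout, removing one worker only reassigns the paths that worker owned; every other path keeps its owner, which is the property that makes a shared ring useful for caching.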

Python Dependencies

Python 3.8, 3.9, or 3.10; ray >= 2.8.2; fsspec released after 2023.6
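
The version constraints above can be checked before installing anything. The sketch below is one possible way to do that with the standard library only; `python_supported` and `meets_minimum` are hypothetical helper names, and the comparison handles plain dotted numeric versions only (pre-release suffixes are ignored).

```python
import sys

# Python minor versions listed as supported in this quickstart.
SUPPORTED_PYTHON = {(3, 8), (3, 9), (3, 10)}


def python_supported(version_info=sys.version_info) -> bool:
    """Return True if the interpreter's major.minor version is in the list."""
    return tuple(version_info[:2]) in SUPPORTED_PYTHON


def meets_minimum(installed: str, minimum: str) -> bool:
    """Compare dotted numeric versions such as '2.8.2' component by component."""

    def parse(version: str):
        # Non-numeric components (e.g. 'rc1' suffixes) are skipped in this sketch.
        return tuple(int(part) for part in version.split(".") if part.isdigit())

    return parse(installed) >= parse(minimum)
```

For example, `meets_minimum("2023.10.0", "2023.6")` is true for an fsspec release from October 2023.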

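Install the fsspec implementation for the underlying data storage

Alluxio fsspec acts as a cache on top of an existing connection to the underlying data lake storage, so the fsspec implementation for that storage must be installed. In the Alluxio configuration example, Amazon S3 is the underlying data lake storage from which the dataset is read.

To connect to an existing underlying storage, there are two requirements

  • Install the fsspec implementation for the underlying storage
  • Set up credentials for the underlying data lake storage

Example: with S3 as the underlying data lake storage, install the third-party S3 fsspec implementation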

pip install s3fs
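
For the second requirement, credentials, one common setup for S3 is the standard AWS environment variables. The sketch below only reports which of those variables are unset. It assumes environment variables are the credential source; a shared credentials file or an instance role would not be detected. `missing_s3_credentials` is a hypothetical helper name.

```python
import os

# Standard AWS environment variable names; other credential sources
# (shared credentials file, instance role) are not checked by this sketch.
REQUIRED_ENV = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def missing_s3_credentials(env=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_ENV if not env.get(name)]
```

An empty list from `missing_s3_credentials()` suggests the environment-based credentials are in place.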

Install alluxiofs

Directly install the latest published alluxiofs

pip install alluxiofs

[Optional] Install from source code

git clone git@github.com:fsspec/alluxiofs.git
cd alluxiofs && python3 setup.py bdist_wheel && \
     pip3 install dist/alluxiofs-<alluxiofs_version>-py3-none-any.whl

Running a Hello World Example

Load the dataset

Use the Alluxio CLI load command to load the dataset

bin/alluxio job load --path s3://example_bucket/datasets/ --submit

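This triggers an asynchronous load job identified by a job ID. You can wait for the load to finish or check its progress with the following command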

bin/alluxio job load --path s3://example_bucket/datasets/ --progress
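
When the load is driven from a script, the two commands above can be assembled in Python. This is a minimal sketch that uses only the `--path`, `--submit`, and `--progress` flags shown in this quickstart; `load_command` and `run_load` are hypothetical helpers, and `run_load` assumes it is executed from the Alluxio home directory.

```python
import subprocess


def load_command(path, action, alluxio_bin="bin/alluxio"):
    """Build the argument list for submitting a load job or checking progress."""
    if action not in ("submit", "progress"):
        raise ValueError(f"unsupported action: {action!r}")
    return [alluxio_bin, "job", "load", "--path", path, f"--{action}"]


def run_load(path, action):
    """Run the command and return its standard output as text."""
    result = subprocess.run(
        load_command(path, action), capture_output=True, text=True, check=True
    )
    return result.stdout
```

Calling `run_load("s3://example_bucket/datasets/", "submit")` once and then `run_load(..., "progress")` periodically mirrors the manual workflow.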

Create an AlluxioFS (backed by S3)

Create the Alluxio filesystem with data backed by S3

import fsspec
from alluxiofs import AlluxioFileSystem

# Register Alluxio to fsspec
fsspec.register_implementation("alluxiofs", AlluxioFileSystem, clobber=True)

# Create Alluxio filesystem
alluxio_fs = fsspec.filesystem("alluxiofs", etcd_hosts="localhost", etcd_port=2379, target_protocol="s3")
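
The `etcd_hosts` and `etcd_port` values above match the `alluxio.etcd.endpoints` property in the example configuration. As a sketch, they could be derived from the properties text instead of being hard-coded. `parse_properties` and `etcd_connection_args` are hypothetical helpers, and only the first endpoint is used if several are listed.

```python
from urllib.parse import urlparse


def parse_properties(text):
    """Parse 'key=value' lines, skipping blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def etcd_connection_args(props):
    """Extract host and port from the first configured etcd endpoint."""
    endpoint = props["alluxio.etcd.endpoints"].split(",")[0].strip()
    parsed = urlparse(endpoint)
    return {"etcd_hosts": parsed.hostname, "etcd_port": parsed.port}


sample = """
# default etcd endpoint
alluxio.etcd.endpoints=http://localhost:2379
alluxio.worker.membership.manager.type=ETCD
"""
args = etcd_connection_args(parse_properties(sample))
```

The resulting dictionary can then be unpacked into the call, for example `fsspec.filesystem("alluxiofs", target_protocol="s3", **args)`.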

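Run Alluxio FileSystem operations

See the fsspec examples and the alluxiofs examples for similar usage. Note that read operations succeed only if the parent folder has already been loaded into Alluxio.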

# list files
contents = alluxio_fs.ls("s3://apc999/datasets/nyc-taxi-csv/green-tripdata/", detail=True)

# Read files
with alluxio_fs.open("s3://apc999/datasets/nyc-taxi-csv/green-tripdata/green_tripdata_2021-01.csv", "rb") as f:
    data = f.read()
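
Because reads succeed only for data under a loaded folder, a quick string check against the loaded path can catch mismatched bucket names early. The sketch below compares paths only and does not query Alluxio; `is_under_root` is a hypothetical helper.

```python
def is_under_root(path, loaded_root):
    """Return True if path equals loaded_root or lies beneath it."""
    root = loaded_root.rstrip("/")
    # The trailing slash prevents 'datasets-old' from matching 'datasets'.
    return path.rstrip("/") == root or path.startswith(root + "/")
```

For example, `is_under_root("s3://example_bucket/datasets/example.csv", "s3://example_bucket/datasets/")` is true, while a path in a different bucket is not.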

Running examples with Ray

import fsspec
import ray
from alluxiofs import AlluxioFileSystem

# Register the Alluxio fsspec implementation
fsspec.register_implementation("alluxiofs", AlluxioFileSystem, clobber=True)
alluxio_fs = fsspec.filesystem(
  "alluxiofs", etcd_hosts="localhost", target_protocol="s3"
)

# Pass the initialized Alluxio filesystem to Ray and read the NYC taxi ride data set
ds = ray.data.read_csv("s3://example_bucket/datasets/example.csv", filesystem=alluxio_fs)

# Get a count of the number of records in the single CSV file
ds.count()

# Display the schema derived from the CSV file header record
ds.schema()

# Display the first record (the CSV header row is consumed as the schema)
ds.take(1)

# Display the first two records
ds.take(2)

# Read multiple CSV files:
ds2 = ray.data.read_csv("s3://apc999/datasets/csv_dir/", filesystem=alluxio_fs)

# Get a count of the number of records in the twelve CSV files
ds2.count()

# End of Python example

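Enable the alluxiocommon enhancement module

The alluxiocommon package is a native enhancement module for alluxiofs, built on PyO3 Rust bindings. Currently it speeds up large reads (reads that span multiple pages in Alluxio) by issuing multi-threaded requests to Alluxio.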
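
To illustrate the general idea of a multi-threaded, multi-page read, the sketch below splits a byte range at page boundaries and fetches the pieces in parallel. It is written in plain Python against an in-memory buffer and is not the alluxiocommon implementation; `page_ranges`, `parallel_read`, the page size, and the `read_range` callback are all assumptions for this example.

```python
from concurrent.futures import ThreadPoolExecutor


def page_ranges(offset, length, page_size):
    """Split [offset, offset + length) into (start, end) pieces at page boundaries."""
    end = offset + length
    ranges = []
    start = offset
    while start < end:
        page_end = min((start // page_size + 1) * page_size, end)
        ranges.append((start, page_end))
        start = page_end
    return ranges


def parallel_read(read_range, offset, length, page_size, max_workers=4):
    """Fetch each page-aligned piece on a thread pool and join them in order."""
    ranges = page_ranges(offset, length, page_size)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map yields results in input order, so the pieces line up.
        chunks = pool.map(lambda r: read_range(*r), ranges)
        return b"".join(chunks)


# Stand-in for a remote file: an in-memory buffer read by byte range.
blob = bytes(range(256)) * 4


def read_range(start, end):
    return blob[start:end]


result = parallel_read(read_range, 100, 500, page_size=128)
```

Here the 500-byte read starting at offset 100 is served as five requests, one per 128-byte page it touches.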

To enable it, first install the alluxiocommon package

pip install alluxiocommon

When creating the Alluxio fsspec filesystem, pass an additional option flag

alluxio_options = {"alluxio.common.extension.enable" : "True"}
alluxio_fs = fsspec.filesystem(
  "alluxiofs", etcd_hosts="localhost", target_protocol="s3",
  options=alluxio_options
)

Running examples with Pyarrow

import fsspec
from alluxiofs import AlluxioFileSystem

# Register the Alluxio fsspec implementation
fsspec.register_implementation("alluxiofs", AlluxioFileSystem, clobber=True)
alluxio_fs = fsspec.filesystem(
  "alluxiofs", etcd_hosts="localhost", target_protocol="s3"
)

# Example 1
# Pass the initialized Alluxio filesystem to Pyarrow and read the data set from the example parquet file
import pyarrow.dataset as ds
dataset = ds.dataset("s3://example_bucket/datasets/example.parquet", filesystem=alluxio_fs)

# Get a count of the number of records in the parquet file
dataset.count_rows()

# Display the schema read from the parquet file metadata
dataset.schema

# Display the first record
dataset.take([0])

# Example 2
# Create a python-based PyArrow filesystem using FSSpecHandler
from pyarrow.fs import PyFileSystem, FSSpecHandler
py_fs = PyFileSystem(FSSpecHandler(alluxio_fs))

# Read the data by using the Pyarrow filesystem interface
with py_fs.open_input_file("s3://example_bucket/datasets/example.parquet") as f:
    alluxio_file_data = f.read()

# End of Python example

