跳转到主要内容

使用相同的接口读取GCS、ABS和本地路径,tensorflow.io.gfile的克隆

项目描述

blobfile

这是一个库,提供类似于Python的接口来读取本地和远程文件(仅从blob存储),具有类似于open()的API以及一些os.pathshutil函数。blobfile支持本地路径、Google Cloud Storage路径(gs://<bucket>)和Azure Blob Storage路径(az://<account>/<container>https://<account>.blob.core.windows.net/<container>/)。

主要功能是BlobFile,它允许您打开类似于本地文件的本地和远程文件。还有一些其他功能,如basenamedirnamejoin,它们主要与它们的os.path同名函数做同样的事情,但它们也支持GCS路径和ABS路径。

这个库是受TensorFlow的gfile启发的,但并不完全具有相同的接口。

安装

pip install blobfile

用法

# write a file, then read it back

import blobfile as bf

with bf.BlobFile("gs://my-bucket-name/cats", "wb") as f:
    f.write(b"meow!")

print("exists:", bf.exists("gs://my-bucket-name/cats"))

with bf.BlobFile("gs://my-bucket-name/cats", "rb") as f:
    print("contents:", f.read())

还有一些并行处理多个blob的示例

blobfile中的函数

  • BlobFile - 类似于open(),但也可以与远程路径一起使用,数据可以从远程文件中流式传输。它接受以下参数
    • 流式传输:
      • mode"r", "rb"时,streaming的默认值为True,当mode"w", "wb", "a", "ab"时,默认值为False
      • streaming=True:
        • 读取不会下载整个远程文件。
        • 写入直接写入远程文件,但大小仅为几MB的块。调用 flush() 不会导致提前写入。
        • 未实现追加。
      • streaming=False:
        • 在构造函数期间通过下载远程文件到本地文件进行读取。
        • 通过在 close() 或析构期间上传文件进行写入。
        • 通过在构造期间下载文件并在 close() 时上传文件进行追加。
    • buffer_size:要缓冲的字节数,这可能会使读取更高效。
    • cache_dir:用于读取时缓存文件的目录,仅在 streaming=Falsemode"r", "rb" 时有效。您负责清理缓存目录。
    • file_size:正在打开的文件大小,可以直接指定以避免在打开文件时检查文件大小。虽然这可以避免网络请求,但也意味着您可能在第一次读取不存在而不是在打开时获得错误。仅适用于模式 "r" 和 "rb"。对于本地文件,此有效值将被忽略。

一些是受现有 os.pathshutil 函数启发的

  • copy - 将文件从一个路径复制到另一个路径,这将执行同一blob存储服务上的两个远程路径之间的远程复制
  • exists - 如果文件或目录存在,则返回 True
  • glob/scanglob - 返回匹配glob样式的模式作为生成器。当与blob存储一起使用时,Globs可能会有令人惊讶的性能特征。模式不支持字符范围。
  • isdir - 如果路径是目录,则返回 True
  • listdir/scandir - 以生成器列出目录的内容
  • makedirs - 确保目录及其所有父目录存在
  • remove - 删除文件
  • rmdir - 删除空目录
  • rmtree - 删除目录树
  • stat - 获取文件的大小和修改时间
  • walk - 使用生成器遍历目录树,该生成器生成 (dirpath, dirnames, filenames) 元组
  • basename - 获取路径的最后一部分
  • dirname - 获取除了最后一部分以外的路径
  • join - 将2个或更多路径连接在一起,在组件之间插入目录分隔符

有几个附加功能

  • get_url - 返回路径的url(可用于无需任何身份验证的HTTP客户端)以及该url的过期时间(或None)
  • md5 - 获取路径的md5哈希,对于GCS通常很快,但对于其他后端,这可能很慢。在Azure上,如果计算了文件的md5且文件中缺少该md5,则文件将使用计算出的md5更新。
  • set_mtime - 设置文件的修改时间戳
  • configure - 设置blobfile的全局配置选项
    • log_callback=default_log_fn:一个日志回调函数 log(msg: string),用于代替打印到stdout。如果您使用 parallel=True,您可能希望使用可pickle的日志回调函数。
    • connection_pool_max_size=32:每个主机连接池的最大大小
    • max_connection_pool_count=10:每个主机连接池的最大计数
    • azure_write_chunk_size=8 * 2 ** 20:写入Azure存储blob的块的大小(以字节为单位),可以设置为最大100MB。这决定了请求重试的单位以及最大文件大小,最大文件大小为 50,000 * azure_write_chunk_size
    • google_write_chunk_size=8 * 2 ** 20:写入Google Cloud Storage blob的块的大小(以字节为单位),这仅决定了请求重试的单位。
    • retry_log_threshold=0:设置重试次数阈值,当超过该阈值时将失败记录到日志回调函数
    • retry_common_log_threshold=2:设置重试次数阈值,当超过该阈值时将非常常见的失败记录到日志回调函数
    • connect_timeout=10:等待连接尝试成功到服务器的最大时间(以秒为单位),设置为None表示无限期等待
    • read_timeout=30:等待从服务器读取响应的最大时间(以秒为单位),设置为None表示无限期等待
    • output_az_paths=True:输出az://路径而不是使用https://进行Azure访问
    • use_azure_storage_account_key_fallback=False:对于Azure容器回退到存储账户密钥,启用此选项需要列出您的订阅,并且如果达到订阅列表示例的Azure配额可能会遇到429错误
    • get_http_pool=None:一个函数,返回一个用于请求的urllib3.PoolManager
    • use_streaming_read=False:如果设置为True,则使用单个文件读取而不是分块读取(对于Azure不建议使用)
    • default_buffer_size=io.DEFAULT_BUFFER_SIZE:用于读取文件(以及写入本地文件)的默认缓冲区大小
    • save_access_token_to_disk=True:如果设置为True,将访问令牌保存到磁盘,以便其他进程可以读取访问令牌以避免获取令牌通常所需的一小段时间(如果令牌仍然有效)
    • multiprocessing_start_method="spawn":创建用于并行工作的进程时要使用的启动方法
  • create_context - (与configure具有相同的参数),使用自定义配置创建一个新的blobfile实例而不是修改全局配置

身份验证

Google Cloud Storage

将按以下顺序尝试以下方法

  1. 检查环境变量GOOGLE_APPLICATION_CREDENTIALS以获取服务账户凭证的路径,这些凭证以JSON格式提供。
  2. 检查“应用程序默认凭证”。要设置应用程序默认凭证,请运行gcloud auth application-default login
  3. 检查GCE元数据服务器(如果正在GCE上运行)并从该服务获取凭证。

Azure Blob

将按以下顺序尝试以下方法

  1. 如果设置了AZURE_USE_IDENTITY=1,则使用来自azure-identity包的DefaultAzureCredential来获取令牌。注意:您的应用程序必须安装azure-identity包;blobfile未将其指定为必需依赖项。
  2. 检查环境变量AZURE_STORAGE_KEY以获取Azure存储账户密钥(这些是每个存储账户的共享密钥,如https://docs.microsoft.com/en-us/azure/storage/common/storage-account-keys-manage中所述)
  3. 检查环境变量AZURE_APPLICATION_CREDENTIALS,它应指向由命令az ad sp create-for-rbac --name <name>输出的服务主体的JSON凭证
  4. 检查环境变量AZURE_CLIENT_IDAZURE_CLIENT_SECRETAZURE_TENANT_ID,它们对应于前一步中描述的服务主体,但无需JSON文件。
  5. 检查环境变量AZURE_STORAGE_CONNECTION_STRING以获取Azure存储连接字符串
  6. 如果找到,则使用az命令行工具的凭证。

如果使用凭证访问失败,将尝试匿名访问。blobfile支持标记为公共的容器的公共访问,但不支持单个blob的访问。

路径

对于Google Cloud Storage和Azure Blobs,目录实际上并不存在。这些存储系统将文件存储在一个单一的扁平列表中。"/"分隔符只是文件名的一部分,在这些系统中不需要调用os.mkdir的等效函数。

为了使本地行为与远程存储系统保持一致,在以写入模式打开文件时,将自动创建缺少的本地目录。

本地

这些都是当前机器的常规路径,例如:/root/hello.txt

Google Cloud Storage

GCS路径的格式为gs://<bucket>/<blob>,你无法对gs://本身执行任何操作。

Azure Blob

Azure Blobs URL的格式为az://<account>/<container>https://<account>.blob.core.windows.net/<container>/<blob>。你可以向上移动到层次结构的最高点为az://<account>/<container>/blobfile不能在az://<account>/上执行任何操作。默认的输出格式是https:// URL,但az:// URL也被接受为输入,你可以设置output_az_paths=True以获取输出为az:// URL。

错误

  • Error - 库特定异常的基类
  • RequestFailure(Error) - 请求已永久失败,状态码可以在response_status:int属性中找到,如果有的话,错误代码在error:Optional[str]中。
  • RestartableStreamingWriteFailure(RequestFailure) - 流式写入已永久失败,需要从头开始重启流。
  • ConcurrentWriteFailure(RequestFailure) - 写入失败是因为另一个进程在同一时间正在写入相同的文件。
  • VersionMismatch(RequestFailure) - 写入失败是因为远程文件与用户指定的版本不匹配。
  • 以下通用异常在函数中抛出,以使行为类似于原始版本:FileNotFoundErrorFileExistsErrorIsADirectoryErrorNotADirectoryErrorOSErrorValueErrorio.UnsupportedOperation

日志记录

blobfile会持续重试暂时性错误,直到成功或遇到永久性错误(这将引发异常)。为了使诊断停滞变得更容易,blobfile将在重试请求时记录。

要路由这些日志行,使用configure(log_callback=<fn>)设置一个回调函数,该函数将在应该打印日志行时被调用。默认回调函数以blobfile:前缀将日志打印到stdout。

使用logging模块

如果你使用python的logging模块,可以将blobfile的日志记录到那里

bf.configure(log_callback=logging.getLogger("blobfile").warning)

blobfile默认不使用python的logging模块,但它使用其他使用该模块的库。因此,如果你配置了python的logging模块,你可能需要更改设置以调整日志记录行为

  • urllib3: logging.getLogger("urllib3").setLevel(logging.ERROR)
  • filelock: logging.getLogger("filelock").setLevel(logging.ERROR)

另外,作为一个提示,确保使用一个格式来告诉你记录器的名称

logging.basicConfig(format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", level=logging.WARNING)

这会让你知道哪个包正在生成日志消息。

安全性

库应该是线程安全和fork安全的,以下是一些例外

  • BlobFile实例不是线程安全的(一次只有一个线程应该拥有一个BlobFile实例)
  • bf.configure()的调用不是线程安全的,理想情况下应该在执行任何操作之前发生

并发写入者

Google Cloud Storage支持对同一blob的多个写入者,并且最后一个完成写入的应该获胜。然而,在大量并发写入者的情况下,服务将返回429或503错误,大多数写入者将停滞。在这种情况下,请写入不同的blob。

Azure Blob不支持对同一Blob进行多写入。由于当前BlobFile的配置方式,最后一个开始写入的写者将获胜。其他写者将收到ConcurrentWriteFailure错误。此外,如果文件大小很大并且有足够的并发写者,所有写者可能会失败。在这种情况下,您可以写入一个临时Blob(具有随机文件名),将其复制到最终位置,然后删除原始文件。复制将在一个容器内进行,因此应该很快。

更改

请参阅CHANGES

贡献

为每个云服务提供商创建带有适当凭据的测试桶

发布新版本

  • 更新CHANGES.md
  • 更新blobfile/VERSION
  • rm -rf build dist
  • python -m build .
  • twine upload dist/*
  • 在Github上标记发布

测试

这将自动格式化代码,检查类型,然后运行测试

python testing/run.py

运行单个测试

python testing/run.py -v -s -k test_windowed_file

如果您只想执行这些操作中的一些,请修改testing/run.py。测试相当慢,大约需要7分钟才能运行(即使禁用了大文件测试),并且需要每个云服务提供商的账户。

项目详情


发布历史 发布通知 | RSS源

下载文件

下载适用于您平台的应用程序。如果您不确定选择哪个,请了解有关安装软件包的更多信息。

源分布

blobfile-3.0.0.tar.gz (77.9 kB 查看哈希值)

上传时间

构建分布

blobfile-3.0.0-py3-none-any.whl (75.4 kB 查看哈希值)

上传时间: Python 3

由以下支持