跳转到主要内容

使用相同接口读取GCS和本地路径,是tensorflow.io.gfile的克隆

项目描述

blobfile

这是TensorFlow的gfile的独立克隆,支持本地路径、Google Cloud Storage路径(gs://<bucket>)和Azure Blob路径(https://<account>.blob.core.windows.net/<container>/)。

主要功能是BlobFile,是GFile的替代品。还有一些额外的函数,如basenamedirnamejoin,它们的功能与它们的os.path同名函数基本相同,但它们也支持GCS路径和Azure Storage路径。

安装

pip install blobfile

用法

import blobfile as bf

with bf.BlobFile("gs://my-bucket-name/cats", "wb") as w:
    w.write(b"meow!")

以下是函数

  • BlobFile - 与open()类似,但也可以与远程路径一起使用,数据可以从远程文件流式传输。它接受以下参数
    • streaming:
      • streaming的默认值是True,当mode"r", "rb"时;当mode"w", "wb", "a", "ab"时为False
      • streaming=True:
        • 读取时不下载整个远程文件。
        • 写入是直接写入远程文件,但只以几个MB大小的块进行。调用flush()不会导致提前写入。
        • 追加未实现。
      • streaming=False:
        • 读取是在构造函数期间将远程文件下载到本地文件中。
        • 写入是在close()或销毁期间上传文件。
        • 追加是在构造期间下载文件,并在close()时上传。
    • buffer_size:缓冲区字节数,这可能会使读取更有效。
    • cache_dir:用于缓存读取文件的目录,仅在 streaming=Falsemode"r", "rb" 之一时有效。您负责清理缓存目录。

一些功能受到现有的 os.pathshutil 函数的启发

  • copy - 将文件从一个路径复制到另一个路径,这将在一个相同的 blob 存储服务上的两个远程路径之间进行远程复制
  • exists - 如果文件或目录存在则返回 True
  • glob/scanglob - 以生成器返回匹配 glob 样式模式的文件。当与 blob 存储一起使用时,Globs 可能具有 令人惊讶的性能特性。模式不支持字符范围。
  • isdir - 如果路径是目录则返回 True
  • listdir/scandir - 以生成器列出目录内容
  • makedirs - 确保目录及其所有父目录存在
  • remove - 删除文件
  • rmdir - 删除空目录
  • rmtree - 删除目录树
  • stat - 获取文件的尺寸和修改时间
  • walk - 使用生成器遍历目录树,生成 (dirpath, dirnames, filenames) 元组
  • basename - 获取路径的最后一部分
  • dirname - 获取路径(除最后一部分外)
  • join - 将两个或多个路径连接起来,在各个组件之间插入目录分隔符

还有一些额外的功能

  • get_url - 返回一个路径的 URL(可用于不需要任何身份验证的 HTTP 客户端)以及该 URL 的过期时间(或 None)
  • md5 - 获取路径的 md5 哈希值,对于 GCS 来说通常很快,但对于其他后端来说可能很慢。在 Azure 上,如果文件的 md5 值被计算并且缺失在文件中,文件将被更新为计算出的 md5 值。
  • set_mtime - 设置文件的修改时间戳
  • configure - 设置 blobfile 的全局配置选项
    • log_callback=_default_log_fn:一个日志回调函数 log(msg: string),用于替代打印到 stdout
    • connection_pool_max_size=32:每个主机连接池的最大大小
    • max_connection_pool_count=10:每个主机的最大连接池数量
    • azure_write_chunk_size=4 * 2 ** 20:写入 Azure 存储块的大小,最大可设置为 100MB。这决定了请求重试的单位以及最大文件大小,该大小为 50,000 * azure_write_chunk_size
    • retry_log_threshold=0:设置一个重试次数阈值,超过此阈值将记录失败到日志回调函数

身份验证

Google Cloud Storage

以下方法将按顺序尝试

  1. 检查环境变量 GOOGLE_APPLICATION_CREDENTIALS 以查找 JSON 格式服务账户凭证的路径。
  2. 检查“应用程序默认凭证”。要设置应用程序默认凭证,请运行 gcloud auth application-default login
  3. 检查 GCE 元数据服务器(如果运行在 GCE 上)并从该服务获取凭证。

Azure Blobs

以下方法将按顺序尝试

  1. 检查环境变量 AZURE_STORAGE_KEY 以查找 Azure 存储帐户密钥(这些是每个存储帐户的共享密钥,如 https://docs.microsoft.com/en-us/azure/storage/common/storage-account-keys-manage 中所述)
  2. 检查环境变量 AZURE_APPLICATION_CREDENTIALS,它应指向由命令 az ad sp create-for-rbac --name <name> 生成的服务主体的 JSON 凭证
  3. 检查环境变量 AZURE_CLIENT_IDAZURE_CLIENT_SECRETAZURE_TENANT_ID,这些对应于上一步中描述的服务主体,但无需 JSON 文件。
  4. 检查环境变量 AZURE_STORAGE_CONNECTION_STRING 以查找 Azure 存储连接字符串
  5. 如果可以找到,请使用来自az命令行工具的凭据。

如果使用凭据访问失败,将尝试匿名访问。blobfile支持对标记为公共的容器进行公共访问,但不支持单个blob。

路径

对于Google Cloud Storage和Azure Blobs,目录实际上并不存在。这些存储系统将文件存储在单个扁平列表中。"/"分隔符只是文件名的一部分,在这些系统中不需要调用os.mkdir的等效函数。

为了使本地行为与远程存储系统保持一致,在以写入模式打开文件时,将自动创建缺失的本地目录。

本地

这些只是当前机器上的正常路径,例如/root/hello.txt

Google Cloud Storage

GCS路径的格式为gs://<bucket>/<blob>,您无法对gs://本身执行任何操作。

Azure Blobs

Azure Blobs URL的格式为https://<account>.blob.core.windows.net/<container>/<blob>。您可以在层次结构中向上到https://<account>.blob.core.windows.net/<container>/,但blobfile无法对https://<account>.blob.core.windows.net/执行任何操作。

错误

  • Error - 库特定异常的基类
  • RequestFailure(Error) - 请求永久失败,具有message:strrequest:Requestresponse:urllib3.HTTPResponse属性。
  • RestartableStreamingWriteFailure(RequestFailure) - 流式写入永久失败,需要从流的开头重新开始。
  • ConcurrentWriteFailure(RequestFailure) - 写入失败是因为另一个进程同时正在写入同一文件。
  • 以下通用异常在某些函数中被抛出,以使行为类似于原始版本:FileNotFoundErrorFileExistsErrorIsADirectoryErrorNotADirectoryErrorOSErrorValueErrorio.UnsupportedOperation

日志记录

blobfile将不断重试暂时性错误,直到成功或遇到永久性错误(这将引发异常)。为了使诊断停滞更容易,blobfile将在重试请求时进行记录。

为了路由这些日志行,使用configure(log_callback=<fn>)设置一个回调函数,该函数将在需要打印日志行时被调用。默认回调将使用前缀blobfile:将输出打印到标准输出。

blobfile不使用python logging模块,但它使用其他使用该模块的库。因此,如果您配置了python logging模块,您可能需要更改设置以调整日志记录行为

  • urllib3: logging.getLogger("urllib3").setLevel(logging.ERROR)
  • filelock: logging.getLogger("filelock").setLevel(logging.ERROR)

安全性

除了BlobFile实例不是线程安全的外(一次只能有一个线程拥有一个BlobFile实例),库应该是线程安全和进程安全的。

并发写入者

Google Cloud Storage支持对同一blob的多个写入者,并且最后完成的一个应该获胜。然而,在大量并发写入者的情况下,服务将返回429或503错误,大多数写入者将停滞。在这种情况下,请写入不同的blob。

Azure Blobs不支持对同一blob的多个写入者。根据BlobFile当前的配置,最后开始写入的写入者将获胜。其他写入者将收到ConcurrentWriteFailure。此外,如果文件大小很大,所有写入者都可能失败。在这种情况下,您可以写入一个临时blob(具有随机文件名),将其复制到最终位置,然后删除原始文件。复制将在容器内进行,因此应该很快。

示例

写入和读取文件

import blobfile as bf

with bf.BlobFile("gs://my-bucket/file.name", "wb") as f:
    f.write(b"meow")

print("exists:", bf.exists("gs://my-bucket/file.name"))

print("contents:", bf.BlobFile("gs://my-bucket/file.name", "rb").read())

参见并行示例

变更

查看 变更

项目详情


下载文件

下载适用于您平台的应用程序。如果您不确定选择哪个,请了解更多关于安装包的信息。

源代码发行版

blobfile-gdb-fork-0.17.3.tar.gz (53.8 kB 查看哈希值)

上传时间 源代码

支持者