跳转到主要内容

用于流式传输大文件的工具(S3、HDFS、GCS、Azure Blob Storage、gzip、bz2...)

项目描述

License GHA Coveralls Downloads

免责声明

这是一个RaRe-Technologies/smart_open的分支!

此分支仅用于Braingeneers组织内的项目。

这些更改最初位于davidparks21/smart_open

是什么?

smart_open 是一个 Python 3 库,用于从/向 S3、GCS、Azure Blob Storage、HDFS、WebHDFS、HTTP、HTTPS、SFTP 或本地文件系统等存储进行高效的大文件流操作。它支持对各种不同格式的透明、即时(解压/压缩)。

smart_open 是 Python 内置 open() 的直接替代品:它可以完成 open 可以做到的所有事情(100% 兼容,尽可能回退到原生 open),并且还提供了许多额外的便捷功能。

Python 2.7 已不再受支持。如果您需要 Python 2.7,请使用 smart_open 1.10.1,这是支持 Python 2 的最后一个版本。

为什么?

与大型远程文件一起工作,例如使用 Amazon 的 boto3 Python 库,是一件头疼的事情。boto3 的 Object.upload_fileobj()Object.download_fileobj() 方法需要使用容易出错的基础模板才能成功,例如构建类似文件对象的包装器。smart_open 会保护您免受这些问题的困扰。它建立在 boto3 和其他远程存储库的基础上,但提供了一个 干净的统一 Pythonic API。结果是您要编写的代码更少,要犯的错误也更少。

如何?

smart_open 经过良好的测试,有良好的文档,并且具有简单的 Pythonic API。

>>> from smart_open import open
>>>
>>> # stream lines from an S3 object
>>> for line in open('s3://commoncrawl/robots.txt'):
...    print(repr(line))
...    break
'User-Agent: *\n'

>>> # stream from/to compressed files, with transparent (de)compression:
>>> for line in open('smart_open/tests/test_data/1984.txt.gz', encoding='utf-8'):
...    print(repr(line))
'It was a bright cold day in April, and the clocks were striking thirteen.\n'
'Winston Smith, his chin nuzzled into his breast in an effort to escape the vile\n'
'wind, slipped quickly through the glass doors of Victory Mansions, though not\n'
'quickly enough to prevent a swirl of gritty dust from entering along with him.\n'

>>> # can use context managers too:
>>> with open('smart_open/tests/test_data/1984.txt.gz') as fin:
...    with open('smart_open/tests/test_data/1984.txt.bz2', 'w') as fout:
...        for line in fin:
...           fout.write(line)
74
80
78
79

>>> # can use any IOBase operations, like seek
>>> with open('s3://commoncrawl/robots.txt', 'rb') as fin:
...     for line in fin:
...         print(repr(line.decode('utf-8')))
...         break
...     offset = fin.seek(0)  # seek to the beginning
...     print(fin.read(4))
'User-Agent: *\n'
b'User'

>>> # stream from HTTP
>>> for line in open('http://example.com/index.html'):
...     print(repr(line))
...     break
'<!doctype html>\n'

smart_open 接受的其他 URL 示例

s3://my_bucket/my_key
s3://my_key:my_secret@my_bucket/my_key
s3://my_key:my_secret@my_server:my_port@my_bucket/my_key
gs://my_bucket/my_blob
azure://my_bucket/my_blob
hdfs:///path/file
hdfs://path/file
webhdfs://host:port/path/file
./local/path/file
~/local/path/file
local/path/file
./local/path/file.gz
file:///home/user/file
file:///home/user/file.bz2
[ssh|scp|sftp]://username@host//path/file
[ssh|scp|sftp]://username@host/path/file
[ssh|scp|sftp]://username:password@host/path/file

文档

安装

smart_open 支持广泛的存储解决方案,包括 AWS S3、Google Cloud 和 Azure。每个解决方案都有自己的依赖项。默认情况下,smart_open 不安装任何依赖项,以保持安装包的大小小。您可以使用以下命令显式安装这些依赖项:

pip install smart_open[azure] # Install Azure deps
pip install smart_open[gcs] # Install GCS deps
pip install smart_open[s3] # Install S3 deps

或者,如果您不介意安装大量第三方库,您可以使用以下命令安装所有依赖项:

pip install smart_open[all]

请注意,此选项会显著增加安装包的大小,例如超过 100MB。

如果您是从 smart_open 2.x 及以下版本升级,请查看 迁移指南

内置帮助

有关详细的 API 信息,请参阅在线帮助

help('smart_open')

或单击 此处 在浏览器中查看帮助。

更多示例

为了简化,以下示例假设您已安装所有依赖项,即您已经执行了以下操作:

pip install smart_open[all]
>>> import os, boto3
>>>
>>> # stream content *into* S3 (write mode) using a custom session
>>> session = boto3.Session(
...     aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
...     aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
... )
>>> url = 's3://smart-open-py37-benchmark-results/test.txt'
>>> with open(url, 'wb', transport_params={'client': session.client('s3')}) as fout:
...     bytes_written = fout.write(b'hello world!')
...     print(bytes_written)
12
# stream from HDFS
for line in open('hdfs://user/hadoop/my_file.txt', encoding='utf8'):
    print(line)

# stream from WebHDFS
for line in open('webhdfs://host:port/user/hadoop/my_file.txt'):
    print(line)

# stream content *into* HDFS (write mode):
with open('hdfs://host:port/user/hadoop/my_file.txt', 'wb') as fout:
    fout.write(b'hello world')

# stream content *into* WebHDFS (write mode):
with open('webhdfs://host:port/user/hadoop/my_file.txt', 'wb') as fout:
    fout.write(b'hello world')

# stream from a completely custom s3 server, like s3proxy:
for line in open('s3u://user:secret@host:port@mybucket/mykey.txt'):
    print(line)

# Stream to Digital Ocean Spaces bucket providing credentials from boto3 profile
session = boto3.Session(profile_name='digitalocean')
client = session.client('s3', endpoint_url='https://ams3.digitaloceanspaces.com')
transport_params = {'client': client}
with open('s3://bucket/key.txt', 'wb', transport_params=transport_params) as fout:
    fout.write(b'here we stand')

# stream from GCS
for line in open('gs://my_bucket/my_file.txt'):
    print(line)

# stream content *into* GCS (write mode):
with open('gs://my_bucket/my_file.txt', 'wb') as fout:
    fout.write(b'hello world')

# stream from Azure Blob Storage
connect_str = os.environ['AZURE_STORAGE_CONNECTION_STRING']
transport_params = {
    'client': azure.storage.blob.BlobServiceClient.from_connection_string(connect_str),
}
for line in open('azure://mycontainer/myfile.txt', transport_params=transport_params):
    print(line)

# stream content *into* Azure Blob Storage (write mode):
connect_str = os.environ['AZURE_STORAGE_CONNECTION_STRING']
transport_params = {
    'client': azure.storage.blob.BlobServiceClient.from_connection_string(connect_str),
}
with open('azure://mycontainer/my_file.txt', 'wb', transport_params=transport_params) as fout:
    fout.write(b'hello world')

压缩处理

顶级 compression 参数控制读取和写入时的压缩/解压缩行为。此参数支持的值包括

  • infer_from_extension(默认行为)

  • disable

  • .gz

  • .bz2

默认情况下,smart_open 根据文件扩展名确定要使用的压缩算法。

>>> from smart_open import open, register_compressor
>>> with open('smart_open/tests/test_data/1984.txt.gz') as fin:
...     print(fin.read(32))
It was a bright cold day in Apri

您可以通过覆盖此行为来禁用压缩,或显式指定要使用的算法。要禁用压缩:

>>> from smart_open import open, register_compressor
>>> with open('smart_open/tests/test_data/1984.txt.gz', 'rb', compression='disable') as fin:
...     print(fin.read(32))
b'\x1f\x8b\x08\x08\x85F\x94\\\x00\x031984.txt\x005\x8f=r\xc3@\x08\x85{\x9d\xe2\x1d@'

要显式指定算法(例如,对于非标准文件扩展名):

>>> from smart_open import open, register_compressor
>>> with open('smart_open/tests/test_data/1984.txt.gzip', compression='.gz') as fin:
...     print(fin.read(32))
It was a bright cold day in Apri

您还可以轻松添加对其他文件扩展名和压缩格式的支持。例如,要打开 xz 压缩文件:

>>> import lzma, os
>>> from smart_open import open, register_compressor

>>> def _handle_xz(file_obj, mode):
...      return lzma.LZMAFile(filename=file_obj, mode=mode, format=lzma.FORMAT_XZ)

>>> register_compressor('.xz', _handle_xz)

>>> with open('smart_open/tests/test_data/1984.txt.xz') as fin:
...     print(fin.read(32))
It was a bright cold day in Apri

lzma 是 Python 3.3 及以上版本的标准库中的内容。对于 2.7,请使用 backports.lzma

特定传输选项

smart_open 默认支持广泛的传输选项,包括

  • S3

  • HTTP、HTTPS(只读)

  • SSH、SCP 和 SFTP

  • WebHDFS

  • GCS

  • Azure Blob Storage

每个选项都涉及设置自己的一组参数。例如,要访问S3,通常需要设置身份验证,如API密钥或配置文件名称。"smart_open" 的 "open" 函数接受一个关键字参数 "transport_params",它接受传输层的附加参数。以下是使用此参数的一些示例。

>>> import boto3
>>> fin = open('s3://commoncrawl/robots.txt', transport_params=dict(client=boto3.client('s3')))
>>> fin = open('s3://commoncrawl/robots.txt', transport_params=dict(buffer_size=1024))

有关每个传输选项支持的完整关键字参数列表,请参阅文档。

help('smart_open.open')

S3凭据

"smart_open" 使用 "boto3" 库与S3通信。 "boto3" 有几种 机制 来确定要使用的凭据。默认情况下,"smart_open" 将推迟到 "boto3",让后者处理凭据。有几种方法可以覆盖这种行为。

第一种方法是向 "open" 函数传递作为传输参数的 "boto3.Client" 对象。您可以在构造客户端会话时自定义凭据。"smart_open" 然后将在与S3通信时使用该会话。

session = boto3.Session(
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
    aws_session_token=SESSION_TOKEN,
)
client = session.client('s3', endpoint_url=..., config=...)
fin = open('s3://bucket/key', transport_params=dict(client=client))

您的第二个选项是在S3 URL本身中指定凭据

fin = open('s3://aws_access_key_id:aws_secret_access_key@bucket/key', ...)

重要:上述两种方法是 互斥的。如果您传递了AWS客户端并且URL包含凭据,"smart_open" 将忽略后者。

重要:"smart_open" 忽略来自旧版 "boto" 库的配置文件。"smart_open" 将旧版 "boto" 设置迁移到 "boto3" 以使用它们。

遍历S3存储桶的内容

由于遍历S3存储桶中所有(或所选)键是一个非常常见的操作,因此还有一个额外的函数 "smart_open.s3.iter_bucket()" 来高效地执行此操作,并行处理存储桶键(使用多进程)

>>> from smart_open import s3
>>> # we use workers=1 for reproducibility; you should use as many workers as you have cores
>>> bucket = 'silo-open-data'
>>> prefix = 'Official/annual/monthly_rain/'
>>> for key, content in s3.iter_bucket(bucket, prefix=prefix, accept_key=lambda key: '/201' in key, workers=1, key_limit=3):
...     print(key, round(len(content) / 2**20))
Official/annual/monthly_rain/2010.monthly_rain.nc 13
Official/annual/monthly_rain/2011.monthly_rain.nc 13
Official/annual/monthly_rain/2012.monthly_rain.nc 13

GCS凭据

"smart_open" 使用 "google-cloud-storage" 库与GCS通信。"google-cloud-storage" 在内部使用 "google-cloud" 包来处理身份验证。有几种 选项 来提供凭据。默认情况下,"smart_open" 将推迟到 "google-cloud-storage",让其处理凭据。

要覆盖此行为,请将 "google.cloud.storage.Client" 对象作为传输参数传递给 "open" 函数。您可以在构造客户端时自定义凭据。"smart_open" 然后将在与GCS通信时使用该客户端。以下面的示例为例,请参阅 Google的指南 以设置GCS服务帐户身份验证。

import os
from google.cloud.storage import Client
service_account_path = os.environ['GOOGLE_APPLICATION_CREDENTIALS']
client = Client.from_service_account_json(service_account_path)
fin = open('gs://gcp-public-data-landsat/index.csv.gz', transport_params=dict(client=client))

如果您需要更多凭据选项,可以创建显式的 "google.auth.credentials.Credentials" 对象并将其传递给客户端。要创建用于以下示例的API令牌,请参阅 GCS身份验证指南

import os
from google.auth.credentials import Credentials
from google.cloud.storage import Client
token = os.environ['GOOGLE_API_TOKEN']
credentials = Credentials(token=token)
client = Client(credentials=credentials)
fin = open('gs://gcp-public-data-landsat/index.csv.gz', transport_params=dict(client=client))

Azure凭据

"smart_open" 使用 "azure-storage-blob" 库与Azure Blob Storage通信。默认情况下,"smart_open" 将推迟到 "azure-storage-blob",让其处理凭据。

由于 Azure Blob Storage 没有推断凭证的方式,因此需要将 azure.storage.blob.BlobServiceClient 对象作为传输参数传递给 open 函数。您可以在构建客户端时 自定义凭证。然后 smart_open 将使用该客户端进行通信。为了遵循下面的示例,请参考 Azure 的设置认证指南

import os
from azure.storage.blob import BlobServiceClient
azure_storage_connection_string = os.environ['AZURE_STORAGE_CONNECTION_STRING']
client = BlobServiceClient.from_connection_string(azure_storage_connection_string)
fin = open('azure://my_container/my_blob.txt', transport_params=dict(client=client))

如果您需要更多的凭证选项,请参阅 Azure Storage 认证指南

pathlib.Path.open 的替代品

smart_open.open 也可以与 Path 对象一起使用。内置的 Path.open() 无法从压缩文件中读取文本,因此可以使用 patch_pathlibsmart_open.open() 替换它。这对于处理压缩文件等情况很有帮助。

>>> from pathlib import Path
>>> from smart_open.smart_open_lib import patch_pathlib
>>>
>>> _ = patch_pathlib()  # replace `Path.open` with `smart_open.open`
>>>
>>> path = Path("smart_open/tests/test_data/crime-and-punishment.txt.gz")
>>>
>>> with path.open("r") as infile:
...     print(infile.readline()[:41])
В начале июля, в чрезвычайно жаркое время

我该如何…?

请参阅 此文档

扩展 smart_open

请参阅 此文档

测试 smart_open

smart_open 随带一套完整的单元测试。在您能够运行测试套件之前,请安装测试依赖项

pip install -e .[test]

现在,您可以运行单元测试

pytest smart_open

测试还会在每次提交推送和拉取请求时通过 Travis CI 自动运行。

评论、错误报告

smart_open 存在于 Github 上。您可以在那里提交问题或拉取请求。欢迎建议、拉取请求和改进!


smart_open 是在 MIT 许可证 下发布的开源软件。版权 (c) 2015-现在 Radim Řehůřek

项目详情


下载文件

下载您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分发

braingeneers_smart_open-2023.10.6.tar.gz (61.0 kB 查看哈希)

上传时间

构建分发

braingeneers_smart_open-2023.10.6-py3-none-any.whl (59.9 kB 查看哈希)

上传时间 Python 3

由以下支持