用于流式传输大文件(S3、HDFS、GCS、Azure Blob存储、gzip、bz2...)的实用工具
项目描述
是什么?
smart_open 是一个Python 3库,用于从/到S3、GCS、Azure Blob存储、HDFS、WebHDFS、HTTP、HTTPS、SFTP或本地文件系统等存储中高效流式传输非常大的文件。它支持多种不同格式的透明、即时(解)压缩。
smart_open 是 Python 内置的 open() 函数的替代品:它可以完成 open 能做到的所有事情(100% 兼容,尽可能使用原生 open),并且在此基础上提供了许多巧妙的额外功能。
Python 2.7 已不再受支持。如果您需要 Python 2.7,请使用 smart_open 1.10.1,这是支持 Python 2 的最后一个版本。
为什么?
处理大型远程文件,例如使用亚马逊的 boto3 Python 库,是一件头疼的事情。boto3 的 Object.upload_fileobj() 和 Object.download_fileobj() 方法需要编写许多样板代码才能成功使用,例如构造文件对象包装器。而 smart_open 可以让您免于这些烦恼。它基于 boto3 和其他远程存储库构建,但提供了一个 简洁统一的 Pythonic API。结果是您编写的代码更少,错误也更少。
如何实现?
smart_open 经过良好测试,具有完善的文档,并且具有简单的 Pythonic API。
>>> from smart_open import open
>>>
>>> # stream lines from an S3 object
>>> for line in open('s3://commoncrawl/robots.txt'):
... print(repr(line))
... break
'User-Agent: *\n'
>>> # stream from/to compressed files, with transparent (de)compression:
>>> for line in open('smart_open/tests/test_data/1984.txt.gz', encoding='utf-8'):
... print(repr(line))
'It was a bright cold day in April, and the clocks were striking thirteen.\n'
'Winston Smith, his chin nuzzled into his breast in an effort to escape the vile\n'
'wind, slipped quickly through the glass doors of Victory Mansions, though not\n'
'quickly enough to prevent a swirl of gritty dust from entering along with him.\n'
>>> # can use context managers too:
>>> with open('smart_open/tests/test_data/1984.txt.gz') as fin:
... with open('smart_open/tests/test_data/1984.txt.bz2', 'w') as fout:
... for line in fin:
... fout.write(line)
74
80
78
79
>>> # can use any IOBase operations, like seek
>>> with open('s3://commoncrawl/robots.txt', 'rb') as fin:
... for line in fin:
... print(repr(line.decode('utf-8')))
... break
... offset = fin.seek(0) # seek to the beginning
... print(fin.read(4))
'User-Agent: *\n'
b'User'
>>> # stream from HTTP
>>> for line in open('http://example.com/index.html'):
... print(repr(line))
... break
'<!doctype html>\n'
smart_open 接受的其他 URL 示例
s3://my_bucket/my_key s3://my_key:my_secret@my_bucket/my_key s3://my_key:my_secret@my_server:my_port@my_bucket/my_key gs://my_bucket/my_blob azure://my_bucket/my_blob hdfs:///path/file hdfs://path/file webhdfs://host:port/path/file ./local/path/file ~/local/path/file local/path/file ./local/path/file.gz file:///home/user/file file:///home/user/file.bz2 [ssh|scp|sftp]://username@host//path/file [ssh|scp|sftp]://username@host/path/file [ssh|scp|sftp]://username:password@host/path/file
文档
安装
smart_open 支持广泛的存储解决方案,包括 AWS S3、Google Cloud 和 Azure。每个解决方案都有自己的依赖项。默认情况下,smart_open 不安装任何依赖项,以保持安装包的大小较小。您可以使用以下命令明确安装这些依赖项:
pip install smart_open[azure] # Install Azure deps pip install smart_open[gcs] # Install GCS deps pip install smart_open[s3] # Install S3 deps
或者,如果您不介意安装大量的第三方库,您可以使用以下命令安装所有依赖项:
pip install smart_open[all]
请注意,此选项会显著增加安装大小,例如超过 100MB。
如果您是从 smart_open 的 2.x 及以下版本升级,请参阅 迁移指南。
内置帮助
有关 API 的详细信息,请参阅在线帮助
help('smart_open')
或点击 这里 在您的浏览器中查看帮助。
更多示例
为了简化,下面的示例假设您已经安装了所有依赖项,即您已经做了以下操作:
pip install smart_open[all]
>>> import os, boto3
>>> from smart_open import open
>>>
>>> # stream content *into* S3 (write mode) using a custom session
>>> session = boto3.Session(
... aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
... aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
... )
>>> url = 's3://smart-open-py37-benchmark-results/test.txt'
>>> with open(url, 'wb', transport_params={'client': session.client('s3')}) as fout:
... bytes_written = fout.write(b'hello world!')
... print(bytes_written)
12
# stream from HDFS
for line in open('hdfs://user/hadoop/my_file.txt', encoding='utf8'):
print(line)
# stream from WebHDFS
for line in open('webhdfs://host:port/user/hadoop/my_file.txt'):
print(line)
# stream content *into* HDFS (write mode):
with open('hdfs://host:port/user/hadoop/my_file.txt', 'wb') as fout:
fout.write(b'hello world')
# stream content *into* WebHDFS (write mode):
with open('webhdfs://host:port/user/hadoop/my_file.txt', 'wb') as fout:
fout.write(b'hello world')
# stream from a completely custom s3 server, like s3proxy:
for line in open('s3u://user:secret@host:port@mybucket/mykey.txt'):
print(line)
# Stream to Digital Ocean Spaces bucket providing credentials from boto3 profile
session = boto3.Session(profile_name='digitalocean')
client = session.client('s3', endpoint_url='https://ams3.digitaloceanspaces.com')
transport_params = {'client': client}
with open('s3://bucket/key.txt', 'wb', transport_params=transport_params) as fout:
fout.write(b'here we stand')
# stream from GCS
for line in open('gs://my_bucket/my_file.txt'):
print(line)
# stream content *into* GCS (write mode):
with open('gs://my_bucket/my_file.txt', 'wb') as fout:
fout.write(b'hello world')
# stream from Azure Blob Storage
connect_str = os.environ['AZURE_STORAGE_CONNECTION_STRING']
transport_params = {
'client': azure.storage.blob.BlobServiceClient.from_connection_string(connect_str),
}
for line in open('azure://mycontainer/myfile.txt', transport_params=transport_params):
print(line)
# stream content *into* Azure Blob Storage (write mode):
connect_str = os.environ['AZURE_STORAGE_CONNECTION_STRING']
transport_params = {
'client': azure.storage.blob.BlobServiceClient.from_connection_string(connect_str),
}
with open('azure://mycontainer/my_file.txt', 'wb', transport_params=transport_params) as fout:
fout.write(b'hello world')
压缩处理
顶级 compression 参数控制读取和写入时的压缩/解压缩行为。此参数支持以下值:
infer_from_extension(默认行为)
disable
.gz
.bz2
默认情况下,smart_open 根据文件扩展名确定要使用的压缩算法。
>>> from smart_open import open, register_compressor
>>> with open('smart_open/tests/test_data/1984.txt.gz') as fin:
... print(fin.read(32))
It was a bright cold day in Apri
要禁用压缩:
>>> from smart_open import open, register_compressor
>>> with open('smart_open/tests/test_data/1984.txt.gz', 'rb', compression='disable') as fin:
... print(fin.read(32))
b'\x1f\x8b\x08\x08\x85F\x94\\\x00\x031984.txt\x005\x8f=r\xc3@\x08\x85{\x9d\xe2\x1d@'
要显式指定算法(例如,对于非标准文件扩展名):
>>> from smart_open import open, register_compressor
>>> with open('smart_open/tests/test_data/1984.txt.gzip', compression='.gz') as fin:
... print(fin.read(32))
It was a bright cold day in Apri
您还可以轻松添加对其他文件扩展名和压缩格式的支持。例如,要打开 xz 压缩的文件:
>>> import lzma, os
>>> from smart_open import open, register_compressor
>>> def _handle_xz(file_obj, mode):
... return lzma.LZMAFile(filename=file_obj, mode=mode, format=lzma.FORMAT_XZ)
>>> register_compressor('.xz', _handle_xz)
>>> with open('smart_open/tests/test_data/1984.txt.xz') as fin:
... print(fin.read(32))
It was a bright cold day in Apri
lzma 是 Python 3.3 及以上版本的标准库中的内容。对于 2.7,请使用 backports.lzma。
传输特定选项
smart_open 默认支持广泛的传输选项,包括:
S3
HTTP、HTTPS(只读)
SSH、SCP 和 SFTP
WebHDFS
GCS
Azure Blob Storage
每个选项都涉及设置其自身的一组参数。例如,访问S3时,您通常需要设置认证,如API密钥或配置文件名称。"smart_open"的"open"函数接受一个关键字参数"transport_params",该参数接受传输层的附加参数。以下是使用此参数的一些示例
>>> import boto3
>>> fin = open('s3://commoncrawl/robots.txt', transport_params=dict(client=boto3.client('s3')))
>>> fin = open('s3://commoncrawl/robots.txt', transport_params=dict(buffer_size=1024))
有关每个传输选项支持的完整关键字参数列表,请参阅文档
help('smart_open.open')
S3凭证
"smart_open"使用"boto3"库与S3通信。"boto3"有几种机制来确定要使用的凭证。默认情况下,"smart_open"将推迟到"boto3",让后者处理凭证。有几种方法可以覆盖此行为。
第一种方法是向"open"函数传递一个"boto3.Client"对象作为传输参数。您可以在构建客户端会话时自定义凭证。"smart_open"将随后在与其他S3通信时使用该会话。
session = boto3.Session(
aws_access_key_id=ACCESS_KEY,
aws_secret_access_key=SECRET_KEY,
aws_session_token=SESSION_TOKEN,
)
client = session.client('s3', endpoint_url=..., config=...)
fin = open('s3://bucket/key', transport_params={'client': client})
您的第二种选项是在S3 URL中指定凭证
fin = open('s3://aws_access_key_id:aws_secret_access_key@bucket/key', ...)
重要:上述两种方法是互斥的。如果您传递了AWS客户端并且URL包含凭证,"smart_open"将忽略后者。
重要:"smart_open"忽略了旧"boto"库中的配置文件。将旧"boto"设置迁移到"boto3",以便与"smart_open"一起使用。
S3高级使用
可以通过"client_kwargs"传输参数将附加关键字参数传播给"smart_open"底层使用的boto3方法。
例如,要上传具有元数据、ACL、存储类的blob,可以将这些关键字参数传递给"create_multipart_upload"(文档)。
kwargs = {'Metadata': {'version': 2}, 'ACL': 'authenticated-read', 'StorageClass': 'STANDARD_IA'}
fout = open('s3://bucket/key', 'wb', transport_params={'client_kwargs': {'S3.Client.create_multipart_upload': kwargs}})
遍历S3存储桶的内容
由于遍历S3存储桶中的所有(或选择的)密钥是一个非常常见的操作,因此还有一个额外的函数"smart_open.s3.iter_bucket()",它可以有效地执行此操作,**并行处理存储桶密钥**(使用多进程)
>>> from smart_open import s3
>>> # we use workers=1 for reproducibility; you should use as many workers as you have cores
>>> bucket = 'silo-open-data'
>>> prefix = 'Official/annual/monthly_rain/'
>>> for key, content in s3.iter_bucket(bucket, prefix=prefix, accept_key=lambda key: '/201' in key, workers=1, key_limit=3):
... print(key, round(len(content) / 2**20))
Official/annual/monthly_rain/2010.monthly_rain.nc 13
Official/annual/monthly_rain/2011.monthly_rain.nc 13
Official/annual/monthly_rain/2012.monthly_rain.nc 13
GCS凭证
"smart_open"使用"google-cloud-storage"库与GCS通信。"google-cloud-storage"底层使用"google-cloud"包来处理认证。有几种选项可以提供凭证。默认情况下,"smart_open"将推迟到"google-cloud-storage",并让它处理凭证。
要覆盖此行为,请将"google.cloud.storage.Client"对象作为传输参数传递给"open"函数。您可以在构建客户端时自定义凭证。"smart_open"将随后在与其他GCS通信时使用该客户端。以下示例的说明,请参阅Google的GCS认证指南设置服务帐户的GCS认证。
import os
from google.cloud.storage import Client
service_account_path = os.environ['GOOGLE_APPLICATION_CREDENTIALS']
client = Client.from_service_account_json(service_account_path)
fin = open('gs://gcp-public-data-landsat/index.csv.gz', transport_params=dict(client=client))
如果您需要更多凭证选项,您可以创建一个显式的"google.auth.credentials.Credentials"对象并将其传递给客户端。要创建用于以下示例的API令牌,请参阅GCS认证指南。
import os
from google.auth.credentials import Credentials
from google.cloud.storage import Client
token = os.environ['GOOGLE_API_TOKEN']
credentials = Credentials(token=token)
client = Client(credentials=credentials)
fin = open('gs://gcp-public-data-landsat/index.csv.gz', transport_params={'client': client})
GCS高级使用
额外的关键字参数可以传递给 GCS 的 open 方法(文档),该方法由内部使用的 smart_open 通过 blob_open_kwargs 传输参数实现。
此外,在读取模式下,还可以将关键字参数传递给 GCS 的 get_blob 方法(文档),使用 get_blob_kwargs 传输参数。
在上传之前,可以设置额外的 blob 属性(文档),只要它们不是只读的,使用 blob_properties 传输参数。
open_kwargs = {'predefined_acl': 'authenticated-read'}
properties = {'metadata': {'version': 2}, 'storage_class': 'COLDLINE'}
fout = open('gs://bucket/key', 'wb', transport_params={'blob_open_kwargs': open_kwargs, 'blob_properties': properties})
Azure 凭据
smart_open 使用 azure-storage-blob 库与 Azure Blob Storage 通信。默认情况下,smart_open 将委托给 azure-storage-blob 并让其处理凭据。
Azure Blob Storage 没有推断凭据的方法,因此,需要将 azure.storage.blob.BlobServiceClient 对象作为传输参数传递给 open 函数。您可以在构造客户端时自定义凭据。然后,smart_open 将使用该客户端进行通信。以下示例中,请参考 Azure 的设置认证指南。
import os
from azure.storage.blob import BlobServiceClient
azure_storage_connection_string = os.environ['AZURE_STORAGE_CONNECTION_STRING']
client = BlobServiceClient.from_connection_string(azure_storage_connection_string)
fin = open('azure://my_container/my_blob.txt', transport_params={'client': client})
如果您需要更多的凭据选项,请参阅Azure 存储认证指南。
Azure 高级使用
额外的关键字参数可以传递给 commit_block_list 方法(文档),该方法由 smart_open 在上传时内部使用,使用 blob_kwargs 传输参数。
kwargs = {'metadata': {'version': 2}}
fout = open('azure://container/key', 'wb', transport_params={'blob_kwargs': kwargs})
替换 pathlib.Path.open
smart_open.open 也可以与 Path 对象一起使用。内置的 Path.open() 无法从压缩文件中读取文本,因此使用 patch_pathlib 将其替换为 smart_open.open()。这在处理压缩文件时可能很有用。
>>> from pathlib import Path
>>> from smart_open.smart_open_lib import patch_pathlib
>>>
>>> _ = patch_pathlib() # replace `Path.open` with `smart_open.open`
>>>
>>> path = Path("smart_open/tests/test_data/crime-and-punishment.txt.gz")
>>>
>>> with path.open("r") as infile:
... print(infile.readline()[:41])
В начале июля, в чрезвычайно жаркое время
如何...
请参阅此文档。
扩展 smart_open
请参阅此文档。
测试 smart_open
smart_open 附带了一套完整的单元测试。在运行测试套件之前,请安装测试依赖项
pip install -e .[test]
现在,您可以运行单元测试
pytest smart_open
这些测试也随着 Travis CI 在每次提交推送和拉取请求时自动运行。
项目详情
下载文件
下载适用于您平台文件。如果您不确定选择哪个,请了解有关安装包的更多信息。
源分发
构建分发
smart_open-7.0.5.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | d3672003b1dbc85e2013e4983b88eb9a5ccfd389b0d4e5015f39a9ee5620ec18 |
|
MD5 | e26329aca66b36264db8eea68a5933cf |
|
BLAKE2b-256 | a3d81481294b2d110b805c0f5d23ef34158b7d5d4283633c0d34c69ea89bb76b |
smart_open-7.0.5-py3-none-any.whl的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 8523ed805c12dff3eaa50e9c903a6cb0ae78800626631c5fe7ea073439847b89 |
|
MD5 | f0d2ab2c2aa4cebbfe04258dfd9e8c7d |
|
BLAKE2b-256 | 06bc706838af28a542458bffe74a5d0772ca7f207b5495cd9fccfce61ef71f2a |
评论、错误报告
smart_open 位于 Github。您可以在那里提交问题或拉取请求。欢迎建议、拉取请求和改进!
smart_open 是在 MIT 许可下发布的开源软件。版权(c)2015-现在 Radim Řehůřek。