
aiohttp-s3-client


The simple module for putting and getting objects from Amazon S3 compatible endpoints

Installation

pip install aiohttp-s3-client

Usage

from http import HTTPStatus

from aiohttp import ClientSession
from aiohttp_s3_client import S3Client


async with ClientSession(raise_for_status=True) as session:
    client = S3Client(
        url="http://s3-url",
        session=session,
        access_key_id="key-id",
        secret_access_key="hackme",
        region="us-east-1"
    )

    # Upload str object to bucket "bucket" and key "str"
    async with client.put("bucket/str", "hello, world") as resp:
        assert resp.status == HTTPStatus.OK

    # Upload bytes object to bucket "bucket" and key "bytes"
    async with client.put("bucket/bytes", b"hello, world") as resp:
        assert resp.status == HTTPStatus.OK

    # Upload AsyncIterable to bucket "bucket" and key "iterable"
    async def gen():
        yield b'some bytes'

    async with client.put("bucket/iterable", gen()) as resp:
        assert resp.status == HTTPStatus.OK

    # Upload file to bucket "bucket" and key "file"
    async with client.put_file("bucket/file", "/path_to_file") as resp:
        assert resp.status == HTTPStatus.OK

    # Check object exists using bucket+key
    async with client.head("bucket/key") as resp:
        assert resp.status == HTTPStatus.OK

    # Get object by bucket+key
    async with client.get("bucket/key") as resp:
        data = await resp.read()

    # Make presigned URL
    url = client.presign_url("GET", "bucket/key", expires=60 * 60)

    # Delete object using bucket+key
    async with client.delete("bucket/key") as resp:
        assert resp.status == HTTPStatus.NO_CONTENT

    # List objects by prefix
    async for result, prefixes in client.list_objects_v2("bucket/", prefix="prefix"):
        # result is a list of metadata objects for the objects stored
        # under the prefix; prefixes is a list of common prefixes
        do_work(result, prefixes)
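The AsyncIterable form makes it possible to stream a large payload without loading it into memory at once. A minimal sketch of such a chunked file reader (a hypothetical helper, not part of the library):

```python
import asyncio
from typing import AsyncIterator


async def file_chunks(path: str, chunk_size: int = 64 * 1024) -> AsyncIterator[bytes]:
    # Read the file in fixed-size chunks so only one chunk
    # is held in memory at a time.
    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk
            # Yield control to the event loop between chunks.
            await asyncio.sleep(0)
```

The generator could then be passed as the body, e.g. `client.put("bucket/key", file_chunks("/path/to/file"))`.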

The bucket may be specified as a subdomain or in the object name:

import aiohttp
from aiohttp_s3_client import S3Client


client = S3Client(url="http://bucket.your-s3-host",
                  session=aiohttp.ClientSession())
async with client.put("key", gen()) as resp:
    ...

client = S3Client(url="http://your-s3-host",
                  session=aiohttp.ClientSession())
async with client.put("bucket/key", gen()) as resp:
    ...

client = S3Client(url="http://your-s3-host/bucket",
                  session=aiohttp.ClientSession())
async with client.put("key", gen()) as resp:
    ...

Auth can be specified with keyword arguments or in the URL:

import aiohttp
from aiohttp_s3_client import S3Client

client_credentials_as_kw = S3Client(
    url="http://your-s3-host",
    access_key_id="key_id",
    secret_access_key="access_key",
    session=aiohttp.ClientSession(),
)

client_credentials_in_url = S3Client(
    url="http://key_id:access_key@your-s3-host",
    session=aiohttp.ClientSession(),
)
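Credentials embedded in a URL follow the standard userinfo syntax, which the standard library can parse. Purely for illustration (this is not how the client itself extracts them):

```python
from urllib.parse import urlsplit

parts = urlsplit("http://key_id:access_key@your-s3-host")
# The userinfo component carries the key id and the secret;
# the host is what remains as the endpoint.
access_key_id = parts.username      # "key_id"
secret_access_key = parts.password  # "access_key"
endpoint = parts.hostname           # "your-s3-host"
```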

Credentials

By default, S3Client tries to collect all available credentials: first from keyword arguments such as access_key_id= and secret_access_key=, then from the username and password in the passed url argument, then from environment variables, and finally from the config file.
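This precedence can be pictured as a first-non-empty merge per field. A rough sketch of the idea (illustrative values; not the library's actual resolution code):

```python
from typing import Optional


def resolve(*values: Optional[str]) -> Optional[str]:
    # Earlier sources win; None or empty values fall through
    # to the next source in line.
    for value in values:
        if value:
            return value
    return None


# Hypothetical per-field resolution, highest priority first:
access_key_id = resolve(
    None,             # access_key_id= keyword argument (not set)
    "url-key-id",     # username from the url argument
    None,             # AWS_ACCESS_KEY_ID environment variable (not set)
    "config-key-id",  # config file
)
```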

You can pass credentials explicitly using the aiohttp_s3_client.credentials module.

aiohttp_s3_client.credentials.StaticCredentials

import aiohttp
from aiohttp_s3_client import S3Client
from aiohttp_s3_client.credentials import StaticCredentials

credentials = StaticCredentials(
    access_key_id='aaaa',
    secret_access_key='bbbb',
    region='us-east-1',
)
client = S3Client(
    url="http://your-s3-host",
    session=aiohttp.ClientSession(),
    credentials=credentials,
)

aiohttp_s3_client.credentials.URLCredentials

import aiohttp
from aiohttp_s3_client import S3Client
from aiohttp_s3_client.credentials import URLCredentials

url = "http://key:hack-me@your-s3-host"
credentials = URLCredentials(url, region="us-east-1")
client = S3Client(
    url="http://your-s3-host",
    session=aiohttp.ClientSession(),
    credentials=credentials,
)

aiohttp_s3_client.credentials.EnvironmentCredentials

import aiohttp
from aiohttp_s3_client import S3Client
from aiohttp_s3_client.credentials import EnvironmentCredentials

credentials = EnvironmentCredentials(region="us-east-1")
client = S3Client(
    url="http://your-s3-host",
    session=aiohttp.ClientSession(),
    credentials=credentials,
)

aiohttp_s3_client.credentials.ConfigCredentials

Using the user's config file:

import aiohttp
from aiohttp_s3_client import S3Client
from aiohttp_s3_client.credentials import ConfigCredentials


credentials = ConfigCredentials()   # Uses the ~/.aws/credentials config
client = S3Client(
    url="http://your-s3-host",
    session=aiohttp.ClientSession(),
    credentials=credentials,
)

Using a custom config location:

import aiohttp
from aiohttp_s3_client import S3Client
from aiohttp_s3_client.credentials import ConfigCredentials


credentials = ConfigCredentials("~/.my-custom-aws-credentials")
client = S3Client(
    url="http://your-s3-host",
    session=aiohttp.ClientSession(),
    credentials=credentials,
)

aiohttp_s3_client.credentials.merge_credentials

This function collects all passed credentials instances and returns a new instance containing all non-empty fields. Earlier arguments have higher priority.

import aiohttp
from aiohttp_s3_client import S3Client
from aiohttp_s3_client.credentials import (
    ConfigCredentials, EnvironmentCredentials, merge_credentials
)

credentials = merge_credentials(
    EnvironmentCredentials(),
    ConfigCredentials(),
)
client = S3Client(
    url="http://your-s3-host",
    session=aiohttp.ClientSession(),
    credentials=credentials,
)

aiohttp_s3_client.credentials.MetadataCredentials

Tries to get credentials from the metadata service:

import aiohttp
from aiohttp_s3_client import S3Client
from aiohttp_s3_client.credentials import MetadataCredentials

credentials = MetadataCredentials()

# start refreshing credentials from the metadata server
await credentials.start()
client = S3Client(
    url="http://your-s3-host",
    session=aiohttp.ClientSession(),
    credentials=credentials,
)

# stop the background refresh
await credentials.stop()
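The start()/stop() pair suggests a background refresh task. The general pattern looks roughly like this (a sketch with a stubbed fetch coroutine, not the library's code):

```python
import asyncio


class RefreshingCredentials:
    def __init__(self, fetch, interval: float = 60.0):
        self._fetch = fetch        # coroutine returning fresh credentials
        self._interval = interval  # seconds between refreshes
        self.current = None
        self._task = None

    async def start(self):
        # Fetch once eagerly, then keep refreshing in the background.
        self.current = await self._fetch()
        self._task = asyncio.create_task(self._refresh_loop())

    async def _refresh_loop(self):
        while True:
            await asyncio.sleep(self._interval)
            self.current = await self._fetch()

    async def stop(self):
        # Cancel the background task and swallow the cancellation.
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass
```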

Multipart upload

For uploading large files, multipart uploading can be used. It allows you to upload parts of a file to S3 asynchronously. S3Client handles retries of part uploads and calculates the hash of each part for integrity checking.

import aiohttp
from aiohttp_s3_client import S3Client


client = S3Client(url="http://your-s3-host", session=aiohttp.ClientSession())
await client.put_file_multipart(
    "test/bigfile.csv",
    headers={
        "Content-Type": "text/csv",
    },
    workers_count=8,
)
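How a file gets cut into parts can be sketched with simple arithmetic; S3-style multipart uploads require every part except the last to be at least 5 MiB (the part size below is an illustrative choice, not necessarily the client's default):

```python
PART_SIZE = 5 * 1024 * 1024  # 5 MiB, the S3 minimum for non-final parts


def part_ranges(total_size: int, part_size: int = PART_SIZE):
    # Yield (part_number, offset, length) for each part; part
    # numbers are 1-based, as in the S3 multipart API.
    number = 1
    for offset in range(0, total_size, part_size):
        yield number, offset, min(part_size, total_size - offset)
        number += 1
```

Each range can then be read and uploaded by an independent worker, which is what makes the upload parallelizable.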

Parallel download to file

S3 supports GET requests with the Range header. It's possible to download objects in parallel with multiple connections for speedup. S3Client handles retries of partial requests and makes sure the file has not changed during the download (using the ETag header). If your system supports the pwrite syscall (Linux, macOS, etc.), it will be used to write simultaneously to a single file. Otherwise, each worker writes to its own file, and the files are concatenated after the download finishes.

import aiohttp
from aiohttp_s3_client import S3Client


client = S3Client(url="http://your-s3-host", session=aiohttp.ClientSession())

await client.get_file_parallel(
    "dump/bigfile.csv",
    "/home/user/bigfile.csv",
    workers_count=8,
)
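The pwrite path described above can be illustrated with the standard library: each worker writes its range at an absolute offset into one preallocated file, without sharing a file position (a simplified single-threaded sketch, not the client's implementation):

```python
import os


def write_range(fd: int, offset: int, data: bytes) -> None:
    # os.pwrite writes at an absolute offset without moving the
    # file descriptor's shared position, so concurrent workers
    # can target the same file without interfering.
    os.pwrite(fd, data, offset)


def assemble(path: str, chunks: dict[int, bytes], total: int) -> None:
    # chunks maps byte offset -> downloaded range body.
    fd = os.open(path, os.O_CREAT | os.O_WRONLY)
    try:
        os.truncate(fd, total)  # preallocate the final size
        for offset, data in chunks.items():
            write_range(fd, offset, data)
    finally:
        os.close(fd)
```

On systems without pwrite, the fallback the text describes is per-worker temporary files concatenated at the end.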
