Kinto客户端
项目描述
kinto-http 是用于与 Kinto 服务器交互的 Python 库。
还有一个类似的 JavaScript 客户端。
安装
使用 pip
$ pip install kinto-http
使用方法
以下是 API 提供的概述
import kinto_http
client = kinto_http.Client(server_url="http://localhost:8888/v1",
auth=('alexis', 'p4ssw0rd'))
records = client.get_records(bucket='default', collection='todos')
for i, record in enumerate(records):
record['title'] = 'Todo {}'.format(i)
client.update_record(data=record)
实例化客户端
传递的 auth 参数是一个 requests 认证策略。
默认情况下,一个简单的元组将变成一个 Basic Auth 认证请求头,可以用于使用 Kinto 账户 认证用户。
import kinto_http
auth = ('alexis', 'p4ssw0rd')
client = kinto_http.Client(server_url='http://localhost:8888/v1',
auth=auth)
也可以传递一个桶 ID 和/或 集合 ID,将它们设置为客户端操作参数的默认值。
client = Client(bucket="payments", collection="receipts", auth=auth)
创建客户端后,还可以复制现有的客户端并覆盖一些关键参数。
client2 = client.clone(collection="orders")
还提供了一个异步客户端。它具有与同步客户端相同的所有端点,除了批处理操作。
from kinto_http import AsyncClient
auth = ('alexis', 'p4ssw0rd')
client = AsyncClient(server_url='http://localhost:8888/v1', auth=auth)
info = await client.server_info()
assert 'schema' in info['capabilities'], "Server doesn't support schema validation."
使用Bearer访问令牌进行认证(OpenID)
import kinto_http
client = kinto_http.Client(auth=kinto_http.BearerTokenAuth("XYPJTNsFKV2"))
默认情况下,授权头前缀为Bearer。如果header_type在服务器上进行了自定义,客户端必须指定预期的类型:kinto_http.BearerTokenAuth("XYPJTNsFKV2", type="Bearer+OIDC")
自定义头
可以在客户端构造函数中指定自定义头,并将它们发送到每个请求
import kinto_http
client = kinto_http.Client(server_url="http://server/v1", headers={
"Allow-Access": "CDN",
"User-Agent": "blocklist-updater"
})
获取服务器信息
可以使用server_info()方法获取服务器信息
from kinto_http import Client
client = Client(server_url='http://localhost:8888/v1')
info = client.server_info()
assert 'schema' in info['capabilities'], "Server doesn't support schema validation."
桶操作
get_bucket(id=None, **kwargs):检索单个桶
get_buckets(**kwargs):检索所有可读桶
create_bucket(id=None, data=None, **kwargs):创建一个桶
update_bucket(id=None, data=None, **kwargs):创建或替换现有桶
patch_bucket(id=None, changes=None, **kwargs):修改现有桶的一些字段
delete_bucket(id=None, **kwargs):删除一个桶及其下的所有内容
delete_buckets(**kwargs):删除所有可写桶
组操作
get_group(id=None, bucket=None, **kwargs):检索单个组
get_groups(bucket=None, **kwargs):检索所有可读组
create_group(id=None, data=None, bucket=None, **kwargs):创建一个组
update_group(id=None, data=None, bucket=None, **kwargs):创建或替换现有组
patch_group(id=None, changes=None, bucket=None, **kwargs):修改现有组的一些字段
delete_group(id=None, bucket=None, **kwargs):删除一个组及其下的所有内容
delete_groups(bucket=None, **kwargs):删除所有可写组
集合
get_collection(id=None, bucket=None, **kwargs):检索单个集合
get_collections(bucket=None, **kwargs):检索所有可读集合
create_collection(id=None, data=None, bucket=None, **kwargs):创建一个集合
update_collection(id=None, data=None, bucket=None, **kwargs):创建或替换现有集合
patch_collection(id=None, changes=None, bucket=None, **kwargs):修改现有集合的一些字段
delete_collection(id=None, bucket=None, **kwargs):删除一个集合及其下的所有内容
delete_collections(bucket=None, **kwargs):删除所有可写集合
记录
get_record(id=None, bucket=None, collection=None, **kwargs):检索单个记录
get_records(bucket=None, collection=None, **kwargs):检索所有可读记录
get_paginated_records(bucket=None, collection=None, **kwargs):记录的分页列表
get_records_timestamp(bucket=None, collection=None, **kwargs):返回此集合的记录时间戳
create_record(id=None, data=None, bucket=None, collection=None, **kwargs):创建一个记录
update_record(id=None, data=None, bucket=None, collection=None, **kwargs): 创建或替换现有记录
patch_record(id=None, changes=None, bucket=None, collection=None, **kwargs): 修改现有记录的一些字段
delete_record(id=None, bucket=None, collection=None, **kwargs): 删除记录及其所有内容
delete_records(bucket=None, collection=None, **kwargs): 删除所有可写记录
权限
可以通过将permissions传递给create_*()、patch_*()或update_*()方法来指定或修改对象的权限
client.create_record(data={'foo': 'bar'},
permissions={'read': ['group:groupid']})
record = client.get_record('123', collection='todos', bucket='alexis')
record['permissions']['write'].append('leplatrem')
client.update_record(data=record)
获取或创建
在某些情况下,你可能只想在不存在时创建bucket、collection、group或record。为此,你可以在create_*()方法中传递if_not_exists=True
client.create_bucket(id='blog', if_not_exists=True) client.create_collection(id='articles', bucket='blog', if_not_exists=True)
如果存在则删除
在某些情况下,你可能只想在存在时删除bucket、collection、group或record。为此,你可以在delete_*方法中传递if_exists=True
client.delete_bucket(id='bucket', if_exists=True)
修补操作
.patch_*()操作接收一个changes参数。
from kinto_http.patch_type import BasicPatch, MergePatch, JSONPatch
client.patch_record(id='abc', changes=BasicPatch({'over': 'write'}))
client.patch_record(id='todo', changes=MergePatch({'assignee': 'bob'}))
client.patch_record(id='receipts', changes=JSONPatch([
{'op': 'add', 'path': '/data/members/0', 'value': 'ldap:user@corp.com'}
]))
并发控制
create_*()、patch_*()和update_*()方法接受一个safe参数(默认:True)。
如果为True,客户端将确保对象在创建时不存在,或者在从我们获取以来服务器端未被修改。时间戳将隐式地从传递的data对象的last_modified字段读取,或从if_match参数显式获取。
批量操作
而不是为每个操作发出请求,可以将多个操作批量处理在一个请求中(仅限同步客户端)。
将batch()方法作为Python上下文管理器(with)使用
with client.batch() as batch:
for idx in range(0, 100):
batch.update_record(data={'id': idx})
通过使用在批量上下文关闭后可用的results()方法从批量操作中读取数据。
with client.batch() as batch:
batch.get_record('r1')
batch.get_record('r2')
batch.get_record('r3')
r1, r2, r3 = batch.results()
错误
失败的操作将引发一个具有request和response属性的KintoException。
try:
client.create_group(id="friends")
except kinto_http.KintoException as e:
if e.response and e.response.status_code == 403:
print("Not allowed!")
请求超时
可以在客户端构造函数中指定以秒为单位的超时值
client = KintoClient(server_url="...", timeout=5)
为了区分连接超时和读取超时,请使用一个元组
client = KintoClient(server_url="...", timeout=(3.05, 27))
对于无限超时,请使用None
client = KintoClient(server_url="...", timeout=None)
请参阅底层requests库的超时文档
错误重试
当服务器被限制(在高负载或维护中)时,它可以返回错误响应。
因此,客户端可以重试发送相同的请求,直到成功为止。为此,在客户端指定重试次数
client = Client(server_url='http://localhost:8888/v1',
auth=credentials,
retry=10)
Kinto协议允许服务器定义重试之间的持续时间(以秒为单位)。在客户端中强制此值(但不推荐)是可能的
client = Client(server_url='http://localhost:8888/v1',
auth=credentials,
retry=10,
retry_after=5)
分页
当服务器响应分页时,客户端将下载每一页并透明地合并它们。
get_paginated_records()方法返回一个生成器,将产生每一页
for page in client.get_paginated_records():
records = page["data"]
可以指定每页要检索的项目数限制
records = client.get_records(_limit=10)
为了检索每页有限数量的可用页面,您可以指定页数
records = client.get_records(_limit=10, pages=float('inf')) # Infinity
历史记录
如果启用了内置的历史插件,则可以检索更改历史记录
# Get the complete history of a bucket
changes = client.get_history(bucket='default')
# and optionally use filters
hist = client.get_history(bucket='default', _limit=2, _sort='-last_modified', _since='1533762576015')
hist = client.get_history(bucket='default', resource_name='collection')
使用以下方式也可以清除存储桶的历史记录
client.purge_history(bucket='default')
端点URL
get_endpoint()方法返回服务器上的端点URL
client = Client(server_url='http://localhost:8888/v1',
auth=('token', 'your-token'),
bucket="payments",
collection="receipts")
print(client.get_endpoint("record",
id="c6894b2c-1856-11e6-9415-3c970ede22b0"))
# '/buckets/payments/collections/receipts/records/c6894b2c-1856-11e6-9415-3c970ede22b0'
处理日期和时间对象
除了JSON支持的数据类型外,kinto-http.py还支持原生Python日期和日期时间对象。
如果负载包含日期或日期时间对象,kinto-http.py将将其编码为ISO格式的字符串。
请注意,此转换是单向的。在读取记录时,如果字符串包含ISO格式的字符串,kinto-http.py不会将其转换为原生Python日期或日期时间对象。
如果您知道某个字段将是日期时间,您可以考虑自行编码以更明确地表示它是一个字符串,供Kinto使用。
命令行脚本
为了使脚本具有共同的参数和选项,提供了一些工具以简化客户端从命令行参数的配置和初始化。
import argparse
import logging
from kinto_http import cli_utils
logger = logging.getLogger(__name__)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Download records")
cli_utils.set_parser_server_options(parser)
args = parser.parse_args()
cli_utils.setup_logger(logger, args)
logger.debug("Instantiate Kinto client.")
client = cli_utils.create_client_from_args(args)
logger.info("Fetch records.")
records = client.get_records()
logger.warn("{} records.".format(len(records)))
脚本现在接受基本选项
$ python example.py --help usage: example.py [-h] [-s SERVER] [-a AUTH] [-b BUCKET] [-c COLLECTION] [-v] [-q] [-D] Download records optional arguments: -h, --help show this help message and exit -s SERVER, --server SERVER The location of the remote server (with prefix) -a AUTH, --auth AUTH BasicAuth credentials: `token:my-secret` or Authorization header: `Bearer token` -b BUCKET, --bucket BUCKET Bucket name. -c COLLECTION, --collection COLLECTION Collection name. --retry RETRY Number of retries when a request fails --retry-after RETRY_AFTER Delay in seconds between retries when requests fail (default: provided by server) -v, --verbose Show all messages. -q, --quiet Show only critical errors. -D, --debug Show all messages, including debug messages.
开发
请参阅贡献文档
项目详情
下载文件
下载您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。
源分布
构建分布
kinto_http-11.2.0.tar.gz的散列
算法 | 散列摘要 | |
---|---|---|
SHA256 | cfb7b206aa9ba7b4b299caac58416648d26512cb9b96f504b7441cc290ff1da2 |
|
MD5 | f83f1766021479d85e0aa6c913378b40 |
|
BLAKE2b-256 | c5ed90ad2ad7445e7bd404e2164a35cf3a8626e2687c0f2a4733b7fe4e4a1d27 |