跳转到主要内容

Kinto客户端

项目描述

https://github.com/Kinto/kinto-http.py/actions/workflows/test.yml/badge.svg https://img.shields.io/pypi/v/kinto-http.svg

kinto-http 是用于与 Kinto 服务器交互的 Python 库。

还有一个类似的 JavaScript 客户端

安装

使用 pip

$ pip install kinto-http

使用方法

以下是 API 提供的概述

import kinto_http

client = kinto_http.Client(server_url="http://localhost:8888/v1",
                           auth=('alexis', 'p4ssw0rd'))

records = client.get_records(bucket='default', collection='todos')
for i, record in enumerate(records):
    record['title'] = 'Todo {}'.format(i)
    client.update_record(data=record)

实例化客户端

传递的 auth 参数是一个 requests 认证策略。

默认情况下,一个简单的元组将变成一个 Basic Auth 认证请求头,可以用于使用 Kinto 账户 认证用户。

import kinto_http

auth = ('alexis', 'p4ssw0rd')

client = kinto_http.Client(server_url='http://localhost:8888/v1',
                           auth=auth)

也可以传递一个 ID 和/或 集合 ID,将它们设置为客户端操作参数的默认值。

client = Client(bucket="payments", collection="receipts", auth=auth)

创建客户端后,还可以复制现有的客户端并覆盖一些关键参数。

client2 = client.clone(collection="orders")

还提供了一个异步客户端。它具有与同步客户端相同的所有端点,除了批处理操作。

from kinto_http import AsyncClient

auth = ('alexis', 'p4ssw0rd')

client = AsyncClient(server_url='http://localhost:8888/v1', auth=auth)
info = await client.server_info()
assert 'schema' in info['capabilities'], "Server doesn't support schema validation."

使用Bearer访问令牌进行认证(OpenID)

import kinto_http

client = kinto_http.Client(auth=kinto_http.BearerTokenAuth("XYPJTNsFKV2"))

默认情况下,授权头前缀为Bearer。如果header_type在服务器上进行了自定义,客户端必须指定预期的类型:kinto_http.BearerTokenAuth("XYPJTNsFKV2", type="Bearer+OIDC")

自定义头

可以在客户端构造函数中指定自定义头,并将它们发送到每个请求

import kinto_http

client = kinto_http.Client(server_url="http://server/v1", headers={
    "Allow-Access": "CDN",
    "User-Agent": "blocklist-updater"
})

获取服务器信息

可以使用server_info()方法获取服务器信息

from kinto_http import Client

client = Client(server_url='http://localhost:8888/v1')
info = client.server_info()
assert 'schema' in info['capabilities'], "Server doesn't support schema validation."

桶操作

  • get_bucket(id=None, **kwargs):检索单个桶

  • get_buckets(**kwargs):检索所有可读桶

  • create_bucket(id=None, data=None, **kwargs):创建一个桶

  • update_bucket(id=None, data=None, **kwargs):创建或替换现有桶

  • patch_bucket(id=None, changes=None, **kwargs):修改现有桶的一些字段

  • delete_bucket(id=None, **kwargs):删除一个桶及其下的所有内容

  • delete_buckets(**kwargs):删除所有可写桶

组操作

  • get_group(id=None, bucket=None, **kwargs):检索单个组

  • get_groups(bucket=None, **kwargs):检索所有可读组

  • create_group(id=None, data=None, bucket=None, **kwargs):创建一个组

  • update_group(id=None, data=None, bucket=None, **kwargs):创建或替换现有组

  • patch_group(id=None, changes=None, bucket=None, **kwargs):修改现有组的一些字段

  • delete_group(id=None, bucket=None, **kwargs):删除一个组及其下的所有内容

  • delete_groups(bucket=None, **kwargs):删除所有可写组

集合

  • get_collection(id=None, bucket=None, **kwargs):检索单个集合

  • get_collections(bucket=None, **kwargs):检索所有可读集合

  • create_collection(id=None, data=None, bucket=None, **kwargs):创建一个集合

  • update_collection(id=None, data=None, bucket=None, **kwargs):创建或替换现有集合

  • patch_collection(id=None, changes=None, bucket=None, **kwargs):修改现有集合的一些字段

  • delete_collection(id=None, bucket=None, **kwargs):删除一个集合及其下的所有内容

  • delete_collections(bucket=None, **kwargs):删除所有可写集合

记录

  • get_record(id=None, bucket=None, collection=None, **kwargs):检索单个记录

  • get_records(bucket=None, collection=None, **kwargs):检索所有可读记录

  • get_paginated_records(bucket=None, collection=None, **kwargs):记录的分页列表

  • get_records_timestamp(bucket=None, collection=None, **kwargs):返回此集合的记录时间戳

  • create_record(id=None, data=None, bucket=None, collection=None, **kwargs):创建一个记录

  • update_record(id=None, data=None, bucket=None, collection=None, **kwargs): 创建或替换现有记录

  • patch_record(id=None, changes=None, bucket=None, collection=None, **kwargs): 修改现有记录的一些字段

  • delete_record(id=None, bucket=None, collection=None, **kwargs): 删除记录及其所有内容

  • delete_records(bucket=None, collection=None, **kwargs): 删除所有可写记录

权限

可以通过将permissions传递给create_*()patch_*()update_*()方法来指定或修改对象的权限

client.create_record(data={'foo': 'bar'},
                     permissions={'read': ['group:groupid']})


record = client.get_record('123', collection='todos', bucket='alexis')
record['permissions']['write'].append('leplatrem')
client.update_record(data=record)

获取或创建

在某些情况下,你可能只想在不存在时创建bucket、collection、group或record。为此,你可以在create_*()方法中传递if_not_exists=True

client.create_bucket(id='blog', if_not_exists=True)
client.create_collection(id='articles', bucket='blog', if_not_exists=True)

如果存在则删除

在某些情况下,你可能只想在存在时删除bucket、collection、group或record。为此,你可以在delete_*方法中传递if_exists=True

client.delete_bucket(id='bucket', if_exists=True)

修补操作

.patch_*()操作接收一个changes参数。

from kinto_http.patch_type import BasicPatch, MergePatch, JSONPatch


client.patch_record(id='abc', changes=BasicPatch({'over': 'write'}))

client.patch_record(id='todo', changes=MergePatch({'assignee': 'bob'}))

client.patch_record(id='receipts', changes=JSONPatch([
    {'op': 'add', 'path': '/data/members/0', 'value': 'ldap:user@corp.com'}
]))

并发控制

create_*()patch_*()update_*()方法接受一个safe参数(默认:True)。

如果为True,客户端将确保对象在创建时不存在,或者在从我们获取以来服务器端未被修改。时间戳将隐式地从传递的data对象的last_modified字段读取,或从if_match参数显式获取。

批量操作

而不是为每个操作发出请求,可以将多个操作批量处理在一个请求中(仅限同步客户端)。

batch()方法作为Python上下文管理器(with)使用

with client.batch() as batch:
    for idx in range(0, 100):
        batch.update_record(data={'id': idx})

通过使用在批量上下文关闭后可用的results()方法从批量操作中读取数据。

with client.batch() as batch:
    batch.get_record('r1')
    batch.get_record('r2')
    batch.get_record('r3')

r1, r2, r3 = batch.results()

错误

失败的操作将引发一个具有requestresponse属性的KintoException

try:
    client.create_group(id="friends")
except kinto_http.KintoException as e:
    if e.response and e.response.status_code == 403:
        print("Not allowed!")

请求超时

可以在客户端构造函数中指定以秒为单位的超时值

client = KintoClient(server_url="...", timeout=5)

为了区分连接超时和读取超时,请使用一个元组

client = KintoClient(server_url="...", timeout=(3.05, 27))

对于无限超时,请使用None

client = KintoClient(server_url="...", timeout=None)

请参阅底层requests库的超时文档

错误重试

当服务器被限制(在高负载或维护中)时,它可以返回错误响应。

因此,客户端可以重试发送相同的请求,直到成功为止。为此,在客户端指定重试次数

client = Client(server_url='http://localhost:8888/v1',
                auth=credentials,
                retry=10)

Kinto协议允许服务器定义重试之间的持续时间(以秒为单位)。在客户端中强制此值(但不推荐)是可能的

client = Client(server_url='http://localhost:8888/v1',
                auth=credentials,
                retry=10,
                retry_after=5)

分页

当服务器响应分页时,客户端将下载每一页并透明地合并它们。

get_paginated_records()方法返回一个生成器,将产生每一页

for page in client.get_paginated_records():
    records = page["data"]

可以指定每页要检索的项目数限制

records = client.get_records(_limit=10)

为了检索每页有限数量的可用页面,您可以指定页数

records = client.get_records(_limit=10, pages=float('inf'))  # Infinity

历史记录

如果启用了内置的历史插件,则可以检索更改历史记录

# Get the complete history of a bucket
changes = client.get_history(bucket='default')

# and optionally use filters
hist = client.get_history(bucket='default', _limit=2, _sort='-last_modified', _since='1533762576015')
hist = client.get_history(bucket='default', resource_name='collection')

使用以下方式也可以清除存储桶的历史记录

client.purge_history(bucket='default')

端点URL

get_endpoint()方法返回服务器上的端点URL

client = Client(server_url='http://localhost:8888/v1',
                auth=('token', 'your-token'),
                bucket="payments",
                collection="receipts")

print(client.get_endpoint("record",
                          id="c6894b2c-1856-11e6-9415-3c970ede22b0"))

# '/buckets/payments/collections/receipts/records/c6894b2c-1856-11e6-9415-3c970ede22b0'

处理日期和时间对象

除了JSON支持的数据类型外,kinto-http.py还支持原生Python日期和日期时间对象。

如果负载包含日期或日期时间对象,kinto-http.py将将其编码为ISO格式的字符串。

请注意,此转换是单向的。在读取记录时,如果字符串包含ISO格式的字符串,kinto-http.py不会将其转换为原生Python日期或日期时间对象。

如果您知道某个字段将是日期时间,您可以考虑自行编码以更明确地表示它是一个字符串,供Kinto使用。

命令行脚本

为了使脚本具有共同的参数和选项,提供了一些工具以简化客户端从命令行参数的配置和初始化。

import argparse
import logging

from kinto_http import cli_utils

logger = logging.getLogger(__name__)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Download records")
    cli_utils.set_parser_server_options(parser)

    args = parser.parse_args()

    cli_utils.setup_logger(logger, args)

    logger.debug("Instantiate Kinto client.")
    client = cli_utils.create_client_from_args(args)

    logger.info("Fetch records.")
    records = client.get_records()
    logger.warn("{} records.".format(len(records)))

脚本现在接受基本选项

$ python example.py --help

usage: example.py [-h] [-s SERVER] [-a AUTH] [-b BUCKET] [-c COLLECTION] [-v]
                  [-q] [-D]

Download records

optional arguments:
  -h, --help            show this help message and exit
  -s SERVER, --server SERVER
                        The location of the remote server (with prefix)
  -a AUTH, --auth AUTH  BasicAuth credentials: `token:my-secret` or
                        Authorization header: `Bearer token`
  -b BUCKET, --bucket BUCKET
                        Bucket name.
  -c COLLECTION, --collection COLLECTION
                        Collection name.
  --retry RETRY         Number of retries when a request fails
  --retry-after RETRY_AFTER
                        Delay in seconds between retries when requests fail
                        (default: provided by server)
  -v, --verbose         Show all messages.
  -q, --quiet           Show only critical errors.
  -D, --debug           Show all messages, including debug messages.

开发

请参阅贡献文档

项目详情


下载文件

下载您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分布

kinto_http-11.2.0.tar.gz (61.5 kB 查看散列)

上传时间

构建分布

kinto_http-11.2.0-py3-none-any.whl (23.2 kB 查看散列)

上传时间 Python 3

支持