跳转到主要内容

使用纯asyncio gRPC库构建的Etcd客户端

项目描述

etcetra

纯Python asyncio Etcd客户端。

安装

pip install etcetra

API文档

参考 此处

基本用法

etcetra管理的所有etcd操作都可以通过EtcdClient执行。 EtcdClient实例是一个包装器,它包含连接到Etcd通道的连接信息。此实例是可重复使用的,因为实际的gRPC通道连接将在您发起连接调用时建立(见下文)。

from etcetra import EtcdClient, HostPortPair
etcd = EtcdClient(HostPortPair('127.0.0.1', 2379))

如上所述,与Etcd的gRPC通道的实际连接将在您调用EtcdClient.connect()时建立。此调用返回异步上下文管理器,该管理器管理EtcdCommunicator实例。

async with etcd.connect() as communicator:
    await communicator.put('testkey', 'testvalue')
    value = await communicator.get('testkey')
    print(value)  # testvalue

EtcdCommunicator.get_prefix(prefix)将返回一个包含给定键前缀的所有键值对的字典。

async with etcd.connect() as communicator:
    await communicator.put('/testdir', 'root')
    await communicator.put('/testdir/1', '1')
    await communicator.put('/testdir/2', '2')
    await communicator.put('/testdir/2/3', '3')
    test_dir = await communicator.get_prefix('/testdir')
    print(test_dir)  # {'/testdir': 'root', '/testdir/1': '1', '/testdir/2': '2', '/testdir/2/3': '3'}

操作Etcd锁

就像EtcdClient.connect()一样,您可以通过调用EtcdClient.with_lock(lock_name, timeout=None)轻松使用etcd锁。

async def first():
    async with etcd.with_lock('foolock') as communicator:
        value = await communicator.get('testkey')
        print('first:', value, end=' | ')

async def second():
    await asyncio.sleep(0.1)
    async with etcd.with_lock('foolock') as communicator:
        value = await communicator.get('testkey')
        print('second:', value)

async with etcd.connect() as communicator:
    await communicator.put('testkey', 'testvalue')
await asyncio.gather(first(), second())  # first: testvalue | second: testvalue

timeout参数添加到EtcdClient.with_lock()调用将为获取锁的过程添加超时。

async def first():
    async with etcd.with_lock('foolock') as communicator:
        value = await communicator.get('testkey')
        print('first:', value)
        await asyncio.sleep(10)

async def second():
    await asyncio.sleep(0.1)
    async with etcd.with_lock('foolock', timeout=5) as communicator:
        value = await communicator.get('testkey')
        print('second:', value)

async with etcd.connect() as communicator:
    await communicator.put('testkey', 'testvalue')
await asyncio.gather(first(), second())  # asyncio.TimeoutError followed by first: testvalue output

ttl参数添加到EtcdClient.with_lock()调用将强制在给定秒数后释放锁。

async def first():
    async with etcd.with_lock('foolock', ttl=5) as communicator:
        await asyncio.sleep(10)

await first()

# on other file

import time

async def second():
    start = time.time()
    async with etcd.with_lock('foolock', ttl=5) as communicator:
        print(f'acquired lock after {time.time() - start} seconds')

await second()  # acquired lock after 4.756163120269775 seconds

观察

您可以通过调用EtcdCommunicator.watch(key)观察键的变化。

async def watch():
    async with etcd.connect() as communicator:
        async for event in communicator.watch('testkey'):
            print(event.event, event.value)

async def update():
    await asyncio.sleep(0.1)
    async with etcd.connect() as communicator:
        await communicator.put('testkey', '1')
        await communicator.put('testkey', '2')
        await communicator.put('testkey', '3')
        await communicator.put('testkey', '4')
        await communicator.put('testkey', '5')

await asyncio.gather(watch(), update())
# WatchEventType.PUT 1
# WatchEventType.PUT 2
# WatchEventType.PUT 3
# WatchEventType.PUT 4
# WatchEventType.PUT 5

也可以通过EtcdCommunicator.watch_prefix(key_prefix)观察具有特定前缀的键的变化。

async def watch():
    async with etcd.connect() as communicator:
        async for event in communicator.watch_prefix('/testdir'):
            print(event.event, event.key, event.value)

async def update():
    await asyncio.sleep(0.1)
    async with etcd.connect() as communicator:
        await communicator.put('/testdir', '1')
        await communicator.put('/testdir/foo', '2')
        await communicator.put('/testdir/bar', '3')
        await communicator.put('/testdir/foo/baz', '4')

await asyncio.gather(watch(), update())
# WatchEventType.PUT /testdir 1
# WatchEventType.PUT /testdir/foo 2
# WatchEventType.PUT /testdir/bar 3
# WatchEventType.PUT /testdir/foo/baz 4

事务

您可以通过调用EtcdCommunicator.txn_compare(compares, txn_builder)运行etcd事务。

构建比较

构建比较操作可以通过使用 Python 的内置比较运算符(==!=><)将 CompareKey 实例与值进行比较来完成。

from etcetra import CompareKey
compares = [
    CompareKey('cmpkey1').value == 'foo',
    CompareKey('cmpkey2').value > 'bar',
]

执行事务调用

async with etcd.connect() with communicator:
    await communicator.put('cmpkey1', 'foo')
    await communicator.put('cmpkey2', 'baz')
    await communicator.put('successkey', 'asdf')

    def _txn(success, failure):
        success.get('successkey')

    values = await communicator.txn_compare(compares, _txn)
    print(values)  # ['asdf']
compares = [
    CompareKey('cmpkey1').value == 'foo',
    CompareKey('cmpkey2').value < 'bar',
]
async with etcd.connect() with communicator:
    await communicator.put('failurekey', 'asdf')

    def _txn(success, failure):
        failure.get('failurekey')

    values = await communicator.txn_compare(compares, _txn)
    print(values)  # ['asdf']

如果您不需要事务比较条件,可以使用 EtcdCommunicator.txn(txn_builder),它是 EtcdCommunicator.txn_compare([], lambda success, failure: txn_builder(success)) 的简写。

async with etcd.connect() with communicator:
    def _txn(action):
        action.get('cmpkey1')
        action.get('cmpkey2')

    values = await communicator.txn(_txn)
    print(values)  # ['foo', 'baz']

贡献

编译 Protobuf

$ scripts/compile_protobuf.py <target Etcd version>

生成文档

$ cd docs
$ make markdown
$ mv _build/markdown/index.mf references.md

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分布

etcetra-0.1.19.tar.gz (52.4 kB 查看哈希值)

上传时间

构建分布

etcetra-0.1.19-py3-none-any.whl (62.2 kB 查看哈希值)

上传时间 Python 3

由以下机构支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面