An Etcd client built with a pure asyncio gRPC library
etcetra
Pure Python asyncio Etcd client.
Installation
pip install etcetra
API Documentation
Refer to the API reference.
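Basic usage
All Etcd operations managed by etcetra can be executed through EtcdClient. An EtcdClient instance is a wrapper that holds the connection information for an Etcd channel. The instance is reusable, because the actual gRPC channel connection is only established when you initiate a connection call (see below).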
from etcetra import EtcdClient, HostPortPair
etcd = EtcdClient(HostPortPair('127.0.0.1', 2379))
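As noted above, the actual connection to the Etcd gRPC channel is established when you call EtcdClient.connect(). This call returns an async context manager that manages an EtcdCommunicator instance.
# Run inside a coroutine (or an asyncio-aware REPL)
async with etcd.connect() as communicator:
    await communicator.put('testkey', 'testvalue')
    value = await communicator.get('testkey')
    print(value)  # testvalue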
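The snippet above uses `async with` at the top level, which only works inside a coroutine or an asyncio-aware REPL. A minimal sketch of wrapping the same round trip in a coroutine follows. The name `roundtrip` is hypothetical and not part of etcetra; the sketch assumes only what the snippet above shows, namely that `connect()` returns an async context manager yielding an object with awaitable `put` and `get`.

```python
async def roundtrip(etcd, key: str, value: str):
    """Write `value` under `key`, then read it back over one connection.

    `etcd` is expected to behave like the EtcdClient shown above.
    This helper is illustrative only.
    """
    async with etcd.connect() as communicator:
        await communicator.put(key, value)
        return await communicator.get(key)
```

With an Etcd server reachable at 127.0.0.1:2379, this could be driven from a script as `asyncio.run(roundtrip(EtcdClient(HostPortPair('127.0.0.1', 2379)), 'testkey', 'testvalue'))`.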
EtcdCommunicator.get_prefix(prefix) returns a dict containing every key-value pair whose key starts with the given prefix.
async with etcd.connect() as communicator:
    await communicator.put('/testdir', 'root')
    await communicator.put('/testdir/1', '1')
    await communicator.put('/testdir/2', '2')
    await communicator.put('/testdir/2/3', '3')
    test_dir = await communicator.get_prefix('/testdir')
    print(test_dir)  # {'/testdir': 'root', '/testdir/1': '1', '/testdir/2': '2', '/testdir/2/3': '3'}
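At the Etcd API level, a prefix read is a range request over the half-open interval [prefix, range_end), where range_end is the prefix with its last byte incremented. The sketch below computes that upper bound. `prefix_range_end` is a hypothetical helper, and whether etcetra derives the bound in exactly this way internally is an assumption.

```python
def prefix_range_end(prefix: bytes) -> bytes:
    """Return the smallest key that sorts after every key starting with `prefix`.

    Follows the convention of Etcd range requests: drop trailing 0xff bytes,
    then increment the last remaining byte. If nothing remains, a single zero
    byte is returned, which Etcd's API treats as "no upper bound".
    """
    end = bytearray(prefix)
    while end:
        if end[-1] < 0xFF:
            end[-1] += 1
            return bytes(end)
        end.pop()  # 0xff cannot be incremented; carry into the previous byte
    return b"\x00"


print(prefix_range_end(b"/testdir"))  # b'/testdis'
```

Every key in the example above, including '/testdir/2/3', sorts between b'/testdir' and b'/testdis', which is why a single prefix read returns all four entries.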
Operating with Etcd locks
Just like EtcdClient.connect(), you can easily use Etcd locks by calling EtcdClient.with_lock(lock_name, timeout=None).
import asyncio

async def first():
    async with etcd.with_lock('foolock') as communicator:
        value = await communicator.get('testkey')
        print('first:', value, end=' | ')

async def second():
    await asyncio.sleep(0.1)
    async with etcd.with_lock('foolock') as communicator:
        value = await communicator.get('testkey')
        print('second:', value)

async with etcd.connect() as communicator:
    await communicator.put('testkey', 'testvalue')
await asyncio.gather(first(), second())  # first: testvalue | second: testvalue
Passing the timeout parameter to EtcdClient.with_lock() sets a time limit on acquiring the lock.
async def first():
    async with etcd.with_lock('foolock') as communicator:
        value = await communicator.get('testkey')
        print('first:', value)
        await asyncio.sleep(10)  # hold the lock longer than second() is willing to wait

async def second():
    await asyncio.sleep(0.1)
    async with etcd.with_lock('foolock', timeout=5) as communicator:
        value = await communicator.get('testkey')
        print('second:', value)

async with etcd.connect() as communicator:
    await communicator.put('testkey', 'testvalue')
await asyncio.gather(first(), second())  # prints "first: testvalue", then second() raises asyncio.TimeoutError
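Since a timed-out acquisition surfaces as asyncio.TimeoutError, callers will usually want to handle it. A minimal sketch follows; `get_with_lock_or_none` is a hypothetical helper, and it relies only on the behaviour described above.

```python
import asyncio


async def get_with_lock_or_none(etcd, lock_name: str, key: str, timeout: float):
    """Read `key` while holding `lock_name`; return None if the lock times out.

    Hypothetical helper. Assumes with_lock(..., timeout=...) raises
    asyncio.TimeoutError when the lock is not acquired in time.
    """
    try:
        async with etcd.with_lock(lock_name, timeout=timeout) as communicator:
            return await communicator.get(key)
    except asyncio.TimeoutError:
        # Caveat: a TimeoutError raised inside the body is swallowed here too.
        return None
```

Returning None keeps the call site short, at the cost of not distinguishing a timeout from whatever `get` returns for a missing key; raising a dedicated exception is the alternative when that distinction matters.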
Passing the ttl parameter to EtcdClient.with_lock() forces the lock to be released after the given number of seconds.
async def first():
    async with etcd.with_lock('foolock', ttl=5) as communicator:
        await asyncio.sleep(10)

await first()

# In another file
import time

async def second():
    start = time.time()
    async with etcd.with_lock('foolock', ttl=5) as communicator:
        print(f'acquired lock after {time.time() - start} seconds')

await second()  # acquired lock after 4.756163120269775 seconds
Watch
You can watch for changes to a key by calling EtcdCommunicator.watch(key).
async def watch():
    async with etcd.connect() as communicator:
        async for event in communicator.watch('testkey'):
            print(event.event, event.value)

async def update():
    await asyncio.sleep(0.1)
    async with etcd.connect() as communicator:
        await communicator.put('testkey', '1')
        await communicator.put('testkey', '2')
        await communicator.put('testkey', '3')
        await communicator.put('testkey', '4')
        await communicator.put('testkey', '5')

await asyncio.gather(watch(), update())
# WatchEventType.PUT 1
# WatchEventType.PUT 2
# WatchEventType.PUT 3
# WatchEventType.PUT 4
# WatchEventType.PUT 5
Changes to all keys sharing a prefix can also be watched with EtcdCommunicator.watch_prefix(key_prefix).
async def watch():
    async with etcd.connect() as communicator:
        async for event in communicator.watch_prefix('/testdir'):
            print(event.event, event.key, event.value)

async def update():
    await asyncio.sleep(0.1)
    async with etcd.connect() as communicator:
        await communicator.put('/testdir', '1')
        await communicator.put('/testdir/foo', '2')
        await communicator.put('/testdir/bar', '3')
        await communicator.put('/testdir/foo/baz', '4')

await asyncio.gather(watch(), update())
# WatchEventType.PUT /testdir 1
# WatchEventType.PUT /testdir/foo 2
# WatchEventType.PUT /testdir/bar 3
# WatchEventType.PUT /testdir/foo/baz 4
Transaction
You can run an Etcd transaction by calling EtcdCommunicator.txn_compare(compares, txn_builder).
Constructing compares
A compare operation is built by comparing a CompareKey instance against a value with Python's built-in comparison operators (==, !=, >, <).
from etcetra import CompareKey

compares = [
    CompareKey('cmpkey1').value == 'foo',
    CompareKey('cmpkey2').value > 'bar',
]
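To see how a comparison expression can produce a compare object rather than a boolean, here is a toy sketch using Python's rich comparison methods. All `Toy*` names are hypothetical and this is not etcetra's implementation; it only illustrates the general operator-overloading technique.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class ToyCompare:
    key: str
    target: str  # which attribute of the key is compared, e.g. "value"
    op: str      # "==", "!=", ">" or "<"
    operand: Any


class _ToyTarget:
    """Turns Python comparison operators into ToyCompare records."""

    def __init__(self, key: str, target: str) -> None:
        self._key = key
        self._target = target

    def __eq__(self, other: Any) -> ToyCompare:  # type: ignore[override]
        return ToyCompare(self._key, self._target, "==", other)

    def __ne__(self, other: Any) -> ToyCompare:  # type: ignore[override]
        return ToyCompare(self._key, self._target, "!=", other)

    def __gt__(self, other: Any) -> ToyCompare:
        return ToyCompare(self._key, self._target, ">", other)

    def __lt__(self, other: Any) -> ToyCompare:
        return ToyCompare(self._key, self._target, "<", other)


class ToyCompareKey:
    def __init__(self, key: str) -> None:
        self.key = key

    @property
    def value(self) -> _ToyTarget:
        return _ToyTarget(self.key, "value")


compares = [
    ToyCompareKey("cmpkey1").value == "foo",
    ToyCompareKey("cmpkey2").value > "bar",
]
print(compares[0])
# ToyCompare(key='cmpkey1', target='value', op='==', operand='foo')
```

The point of the pattern is that the expression is recorded rather than evaluated locally, so the comparison can be sent to the server and checked there as part of the transaction.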
Executing transaction calls
async with etcd.connect() as communicator:
    await communicator.put('cmpkey1', 'foo')
    await communicator.put('cmpkey2', 'baz')
    await communicator.put('successkey', 'asdf')

    # Both compares hold, so the operations queued on `success` run.
    def _txn(success, failure):
        success.get('successkey')

    values = await communicator.txn_compare(compares, _txn)
    print(values)  # ['asdf']

compares = [
    CompareKey('cmpkey1').value == 'foo',
    CompareKey('cmpkey2').value < 'bar',
]
async with etcd.connect() as communicator:
    await communicator.put('failurekey', 'asdf')

    # 'baz' < 'bar' is false, so the operations queued on `failure` run.
    def _txn(success, failure):
        failure.get('failurekey')

    values = await communicator.txn_compare(compares, _txn)
    print(values)  # ['asdf']
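The compare-then-branch behaviour of an Etcd transaction can be modelled in a few lines over a plain dict. This is a toy model for intuition only: `toy_txn` and its tuple-based compare format are hypothetical, and treating a missing key as a failed compare is a simplifying assumption. The stored values are changed from the examples above so the two branches are distinguishable.

```python
import operator
from typing import Dict, List, Optional, Sequence, Tuple

Compare = Tuple[str, str, str]  # (key, operator symbol, expected value)

_OPS = {"==": operator.eq, "!=": operator.ne, ">": operator.gt, "<": operator.lt}


def toy_txn(
    store: Dict[str, str],
    compares: Sequence[Compare],
    success_keys: Sequence[str],
    failure_keys: Sequence[str],
) -> List[Optional[str]]:
    """Read `success_keys` if every compare holds, otherwise read `failure_keys`."""
    all_hold = all(
        key in store and _OPS[op](store[key], expected)
        for key, op, expected in compares
    )
    branch = success_keys if all_hold else failure_keys
    return [store.get(key) for key in branch]


store = {
    "cmpkey1": "foo",
    "cmpkey2": "baz",
    "successkey": "from-success",
    "failurekey": "from-failure",
}

print(toy_txn(store, [("cmpkey1", "==", "foo"), ("cmpkey2", ">", "bar")],
              ["successkey"], ["failurekey"]))  # ['from-success']
print(toy_txn(store, [("cmpkey1", "==", "foo"), ("cmpkey2", "<", "bar")],
              ["successkey"], ["failurekey"]))  # ['from-failure']
```

With an empty list of compares, `all(...)` is true and the success branch always runs.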
If you do not need compare conditions for the transaction, you can use EtcdCommunicator.txn(txn_builder), which is shorthand for EtcdCommunicator.txn_compare([], lambda success, failure: txn_builder(success)).
async with etcd.connect() as communicator:
    def _txn(action):
        action.get('cmpkey1')
        action.get('cmpkey2')

    values = await communicator.txn(_txn)
    print(values)  # ['foo', 'baz']
Contributing
Compiling Protobuf
$ scripts/compile_protobuf.py <target Etcd version>
Generating documentation
$ cd docs
$ make markdown
$ mv _build/markdown/index.md references.md