EventStore DB的gRPC客户端
项目描述
esdb-py
EventStoreDB Python gRPC客户端
注意:此项目仍在开发中
完成的功能
- 安全连接
- 基本认证
- 连接字符串解析
- 流- 追加
- 批量追加(v21.10+)
- 删除
- 读取流
- 带流/事件类型过滤器读取所有(v21.10+)
- 追赶订阅
- 墓碑
- 过滤
 
- 持久订阅- 创建
- 读取流
- 带过滤读取所有(v21.10+)
- 更新
- 删除
- 列表
- 信息
- 回复停泊事件
 
- 投影的CRUD操作
- 用户
安装
使用pip
pip install esdb
使用poetry
poetry add esdb
开发
- 
安装poetry 
- 
创建虚拟环境(例如使用pyenv) pyenv install 3.12.0 pyenv virtualenv 3.12.0 esdb-py pyenv local esdb-py 
- 
使用 poetry install安装依赖项
- 
在docker中启动eventstore: make run-esdb
- 
运行测试: pytest tests
使用
查看更多示例,请参阅测试。
连接字符串示例
带有凭据的DNS发现、发现配置、节点偏好和CA文件路径
esdb+discover://admin:changeit@localhost:2111?discoveryInterval=0&maxDiscoverAttempts=3&tlsCafile=certs/ca/ca.crt&nodePreference=follower
单节点不安全连接
esdb://:2111?tls=false
支持的参数
- 发现间隔
- 谣言超时
- 最大发现尝试次数
- 节点偏好
- 保持存活间隔
- 保持存活超时
- tls
- tlsCafile
- tlsVerifyCert
- 默认截止日期
连接字符串可以在这里生成。
发现和节点偏好
from esdb import ESClient
client = ESClient("esdb+discover://admin:changeit@localhost:2111?nodePreference=follower")
连接配置
from esdb import ESClient
# Connect without TLS
client = ESClient("esdb://:2111?tls=false")
# Secure connection with basic auth and keepalive
client = ESClient("esdb://admin:changeit@localhost:2111?tlsCafile=certs/ca/ca.crt&keepAliveInterval=5&keepAliveTimeout=5")
追加、读取、追赶订阅
import asyncio
import datetime
import uuid
from esdb import ESClient
client = ESClient("esdb+discover://admin:changeit@localhost:2111")
stream = f"test-{str(uuid.uuid4())}"
async def streams():
    async with client.connect() as conn:
        # Appending to stream
        for i in range(10):
            append_result = await conn.streams.append(
                stream=stream,
                event_type="test_event",
                data={"i": i, "ts": datetime.datetime.utcnow().isoformat()},
            )
        # Read up to 10 events
        async for result in conn.streams.read(stream=stream, count=10):
            print(result.data)
        # Read up to 10 events, backwards
        async for result in conn.streams.read(stream=stream, count=10, backwards=True):
            print(result.data)
        # Read up to 10 events, starting from 5th event
        async for result in conn.streams.read(stream=stream, count=10, revision=5):
            print(result.data)
        # Read up to 10 events backwards, starting from 5th event
        async for result in conn.streams.read(stream=stream, count=10, backwards=True, revision=5):
            print(result.data)
        # Create a catch-up subscription to a stream
        async for result in conn.streams.read(stream=stream, subscribe=True):
            print(result.data)
asyncio.run(streams())
批量追加
import asyncio
import uuid
from esdb import ESClient
from esdb.streams import Message
async def batch_append():
    # Append multiple events in as a single batch
    # Batch append is not supported on EventStore < v21.10
    stream = str(uuid.uuid4())
    messages: list[Message] = [
        Message(event_type="one", data={"item": 1}),
        Message(event_type="one", data={"item": 2}),
        Message(event_type="one", data={"item": 3}),
        Message(event_type="two", data={"item": 1}),
        Message(event_type="two", data={"item": 2}),
        Message(event_type="two", data={"item": 3}),
    ]
    async with ESClient("esdb+discover://admin:changeit@localhost:2111").connect() as conn:
        response = await conn.streams.batch_append(stream=stream, messages=messages)
        assert response.current_revision == 5
        events = [e async for e in conn.streams.read(stream=stream, count=50)]
        assert len(events) == 6
asyncio.run(batch_append())
带过滤的所有事件的追赶订阅
import uuid
import asyncio
from esdb import ESClient
from esdb.shared import Filter
async def filters():
    async with ESClient("esdb+discover://admin:changeit@localhost:2111").connect() as conn:
        # Append 10 events with the same prefix to random streams
        for i in range(10):
            await conn.streams.append(stream=str(uuid.uuid4()), event_type=f"prefix-{i}", data=b"")
        # subscribe to events from all streams, filtering by event type
        async for event in conn.streams.read_all(
                subscribe=True,  # subscribe will wait for events, use count=<n> to read <n> events and stop
                filter_by=Filter(
                    kind=Filter.Kind.EVENT_TYPE,
                    regex="^prefix-",
                    # Checkpoint only required when subscribe=True, it's not needed when using count=<int>
                    checkpoint_interval_multiplier=1000,
                ),
        ):
            print(event)
asyncio.run(filters())
持久订阅
import asyncio
from esdb import ESClient
from esdb.shared import Filter
from esdb.subscriptions import SubscriptionSettings, NackAction
client = ESClient("esdb+discover://admin:changeit@localhost:2111")
stream = "stream-foo"
group = "group-bar"
async def persistent():
    async with client.connect() as conn:
        # emit some events to the same stream
        for i in range(50):
            await conn.streams.append(stream, "foobar", {"i": i})
        # create a stream subscription
        await conn.subscriptions.create_stream_subscription(
            stream=stream,
            group_name=group,
            settings=SubscriptionSettings(
                max_subscriber_count=50,
                read_batch_size=5,
                live_buffer_size=10,
                history_buffer_size=10,
                consumer_strategy=SubscriptionSettings.ConsumerStrategy.ROUND_ROBIN,
                checkpoint_ms=10000,
            ),
        )
        # create subscription to all events with filtering
        # Only supported on EventStore v21.10+
        await conn.subscriptions.create_all_subscription(
            group_name="subscription_group",
            filter_by=Filter(kind=Filter.Kind.EVENT_TYPE, regex="^some_type$", checkpoint_interval_multiplier=200),
            settings=SubscriptionSettings(
                read_batch_size=50,
                live_buffer_size=100,
                history_buffer_size=100,
                max_retry_count=2,
                checkpoint_ms=20000,
            ),
        )
    # read from a subscription
    async with client.connect() as conn:
        sub = conn.subscriptions.subscribe(stream=stream, group_name=group, buffer_size=5)
        async for event in sub:
            try:
                # do work with event
                print(event)
                await sub.ack([event])
            except Exception as err:
                await sub.nack([event], NackAction.RETRY, reason=str(err))
        # get subscription info
        info = await conn.subscriptions.get_info(group, stream)
        assert info.group_name == group
        # delete subscription
        await conn.subscriptions.delete(group, stream)
        
        # list subscriptions
        subs = await conn.subscriptions.list()
        for sub in subs:
            print(sub.total_items)
asyncio.run(persistent())
          项目详情
下载文件
下载您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。
源分布
         esdb-0.3.5.tar.gz  (63.9 kB 查看哈希值)
      
    构建分布
         esdb-0.3.5-py3-none-any.whl  (80.4 kB 查看哈希值)
      
    
    
       关闭
    
      
        
    
    
  
esdb-0.3.5.tar.gz的哈希值
| 算法 | 哈希摘要 | |
|---|---|---|
| SHA256 | 0311dabd56ce0b42b5c55e13b7085e74276270327e08a6164929cad739366ce6 | |
| MD5 | 1f163120a7d42f6d719d20e04cf8dd95 | |
| BLAKE2b-256 | a83628dc6153021725623c2232aca6785ed31c2bea8a7188ac6746009b1ccc63 | 
    
       关闭
    
      
        
    
    
  
esdb-0.3.5-py3-none-any.whl的哈希值
| 算法 | 哈希摘要 | |
|---|---|---|
| SHA256 | 866fc108d933becc23ea7e20f74e0c1f01337c17fd4705c4381ab2e2d7af0e42 | |
| MD5 | 18ea152edd5a0ee18801343108b4e3f2 | |
| BLAKE2b-256 | 9c3ff72fae47146573ed2006410f9a06ac2da8c7d4ad6b7aa8376302bbc68369 |