跳转到主要内容

EventStore DB的gRPC客户端

项目描述

esdb-py

PyPI version codecov

EventStoreDB Python gRPC客户端

注意:此项目仍在开发中

完成的功能

  • 安全连接
  • 基本认证
  • 连接字符串解析
    • 追加
    • 批量追加(v21.10+)
    • 删除
    • 读取流
    • 带流/事件类型过滤器读取所有(v21.10+)
    • 追赶订阅
    • 墓碑
    • 过滤
  • 持久订阅
    • 创建
    • 读取流
    • 带过滤读取所有(v21.10+)
    • 更新
    • 删除
    • 列表
    • 信息
    • 回复停泊事件
  • 投影的CRUD操作
  • 用户

安装

使用pip

pip install esdb

使用poetry

poetry add esdb

开发

  1. 安装poetry

  2. 创建虚拟环境(例如使用pyenv)

    pyenv install 3.12.0
    pyenv virtualenv 3.12.0 esdb-py
    pyenv local esdb-py
    
  3. 使用poetry install安装依赖项

  4. 在docker中启动eventstore: make run-esdb

  5. 运行测试: pytest tests

使用

查看更多示例,请参阅测试

连接字符串示例

带有凭据的DNS发现、发现配置、节点偏好和CA文件路径

esdb+discover://admin:changeit@localhost:2111?discoveryInterval=0&maxDiscoverAttempts=3&tlsCafile=certs/ca/ca.crt&nodePreference=follower

单节点不安全连接

esdb://localhost:2111?tls=false

支持的参数

  • 发现间隔
  • 谣言超时
  • 最大发现尝试次数
  • 节点偏好
  • 保持存活间隔
  • 保持存活超时
  • tls
  • tlsCafile
  • tlsVerifyCert
  • 默认截止日期

连接字符串可以在这里生成。

发现和节点偏好

from esdb import ESClient

client = ESClient("esdb+discover://admin:changeit@localhost:2111?nodePreference=follower")

连接配置

from esdb import ESClient

# Connect without TLS
client = ESClient("esdb://localhost:2111?tls=false")

# Secure connection with basic auth and keepalive
client = ESClient("esdb://admin:changeit@localhost:2111?tlsCafile=certs/ca/ca.crt&keepAliveInterval=5&keepAliveTimeout=5")

追加、读取、追赶订阅

import asyncio
import datetime
import uuid

from esdb import ESClient


client = ESClient("esdb+discover://admin:changeit@localhost:2111")
stream = f"test-{str(uuid.uuid4())}"


async def streams():
    async with client.connect() as conn:
        # Appending to stream
        for i in range(10):
            append_result = await conn.streams.append(
                stream=stream,
                event_type="test_event",
                data={"i": i, "ts": datetime.datetime.utcnow().isoformat()},
            )

        # Read up to 10 events
        async for result in conn.streams.read(stream=stream, count=10):
            print(result.data)

        # Read up to 10 events, backwards
        async for result in conn.streams.read(stream=stream, count=10, backwards=True):
            print(result.data)

        # Read up to 10 events, starting from 5th event
        async for result in conn.streams.read(stream=stream, count=10, revision=5):
            print(result.data)

        # Read up to 10 events backwards, starting from 5th event
        async for result in conn.streams.read(stream=stream, count=10, backwards=True, revision=5):
            print(result.data)

        # Create a catch-up subscription to a stream
        async for result in conn.streams.read(stream=stream, subscribe=True):
            print(result.data)


asyncio.run(streams())

批量追加

import asyncio
import uuid

from esdb import ESClient
from esdb.streams import Message


async def batch_append():
    # Append multiple events in as a single batch
    # Batch append is not supported on EventStore < v21.10
    stream = str(uuid.uuid4())
    messages: list[Message] = [
        Message(event_type="one", data={"item": 1}),
        Message(event_type="one", data={"item": 2}),
        Message(event_type="one", data={"item": 3}),
        Message(event_type="two", data={"item": 1}),
        Message(event_type="two", data={"item": 2}),
        Message(event_type="two", data={"item": 3}),
    ]
    async with ESClient("esdb+discover://admin:changeit@localhost:2111").connect() as conn:
        response = await conn.streams.batch_append(stream=stream, messages=messages)
        assert response.current_revision == 5
        events = [e async for e in conn.streams.read(stream=stream, count=50)]
        assert len(events) == 6


asyncio.run(batch_append())

带过滤的所有事件的追赶订阅

import uuid
import asyncio

from esdb import ESClient
from esdb.shared import Filter


async def filters():
    async with ESClient("esdb+discover://admin:changeit@localhost:2111").connect() as conn:
        # Append 10 events with the same prefix to random streams
        for i in range(10):
            await conn.streams.append(stream=str(uuid.uuid4()), event_type=f"prefix-{i}", data=b"")
        # subscribe to events from all streams, filtering by event type
        async for event in conn.streams.read_all(
                subscribe=True,  # subscribe will wait for events, use count=<n> to read <n> events and stop
                filter_by=Filter(
                    kind=Filter.Kind.EVENT_TYPE,
                    regex="^prefix-",
                    # Checkpoint only required when subscribe=True, it's not needed when using count=<int>
                    checkpoint_interval_multiplier=1000,
                ),
        ):
            print(event)


asyncio.run(filters())

持久订阅

import asyncio
from esdb import ESClient
from esdb.shared import Filter
from esdb.subscriptions import SubscriptionSettings, NackAction

client = ESClient("esdb+discover://admin:changeit@localhost:2111")

stream = "stream-foo"
group = "group-bar"


async def persistent():
    async with client.connect() as conn:
        # emit some events to the same stream
        for i in range(50):
            await conn.streams.append(stream, "foobar", {"i": i})

        # create a stream subscription
        await conn.subscriptions.create_stream_subscription(
            stream=stream,
            group_name=group,
            settings=SubscriptionSettings(
                max_subscriber_count=50,
                read_batch_size=5,
                live_buffer_size=10,
                history_buffer_size=10,
                consumer_strategy=SubscriptionSettings.ConsumerStrategy.ROUND_ROBIN,
                checkpoint_ms=10000,
            ),
        )

        # create subscription to all events with filtering
        # Only supported on EventStore v21.10+
        await conn.subscriptions.create_all_subscription(
            group_name="subscription_group",
            filter_by=Filter(kind=Filter.Kind.EVENT_TYPE, regex="^some_type$", checkpoint_interval_multiplier=200),
            settings=SubscriptionSettings(
                read_batch_size=50,
                live_buffer_size=100,
                history_buffer_size=100,
                max_retry_count=2,
                checkpoint_ms=20000,
            ),
        )

    # read from a subscription
    async with client.connect() as conn:
        sub = conn.subscriptions.subscribe(stream=stream, group_name=group, buffer_size=5)
        async for event in sub:
            try:
                # do work with event
                print(event)
                await sub.ack([event])
            except Exception as err:
                await sub.nack([event], NackAction.RETRY, reason=str(err))

        # get subscription info
        info = await conn.subscriptions.get_info(group, stream)
        assert info.group_name == group

        # delete subscription
        await conn.subscriptions.delete(group, stream)
        
        # list subscriptions
        subs = await conn.subscriptions.list()
        for sub in subs:
            print(sub.total_items)


asyncio.run(persistent())

项目详情


下载文件

下载您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分布

esdb-0.3.5.tar.gz (63.9 kB 查看哈希值)

上传时间

构建分布

esdb-0.3.5-py3-none-any.whl (80.4 kB 查看哈希值)

上传时间 Python 3

由以下支持