跳转到主要内容

Python gRPC Client for EventStoreDB

项目描述

Python gRPC Client for EventStoreDB

这个 Python包EventStoreDB 数据库提供了多线程和asyncio Python客户端。

以下详细描述了多线程的 EventStoreDBClient。请向下滚动以获取有关 AsyncEventStoreDBClient 的信息。

这些客户端是由EventStoreDB团队合作开发和维护的,并得到Event Store Ltd的官方支持。尽管EventStoreDB gRPC API的并非所有方面都得到了实现,但许多最有用的功能都以易于使用的界面呈现。

这些客户端已测试与EventStoreDB LTS版本22.10和23.10以及候选版本24.2和24.6兼容,包括和不包括SSL/TLS,包括单服务器和集群模式,以及Python版本3.8、3.9、3.10、3.11和3.12。

测试套件具有100%的行和分支覆盖率。代码使用mypy严格检查类型注解。代码使用black和isort进行格式化,并使用flake8进行检查。在开发过程中使用Poetry进行包管理,用于构建和发布到 PyPI

有关用法示例,请参阅 eventsourcing-eventstoredb 包。

摘要

可以从esdbclient包导入EventStoreDBClient类。

EventStoreDBClient类最常用的三个方法是

  • append_to_stream()此方法可以用于在特定的“流”中记录新事件。这在执行修改聚合的应用程序中的命令时很有用。这个方法在记录事件时是“原子”的,即要么全部记录,要么不记录。

  • get_stream()此方法可以用于检索“流”中记录的所有事件。这在从记录的事件中重建聚合,在执行创建新事件的应用程序中的命令之前很有用。

  • subscribe_to_all()此方法可以用于接收数据库中记录的所有事件。这在事件处理组件中很有用,因为它支持以“恰好一次”语义处理事件。

以下示例使用在本地端口2113运行的“不安全”EventStoreDB服务器。

import uuid

from esdbclient import EventStoreDBClient, NewEvent, StreamState


# Construct EventStoreDBClient with an EventStoreDB URI. The
# connection string URI specifies that the client should
# connect to an "insecure" server running on port 2113.

client = EventStoreDBClient(
    uri="esdb://localhost:2113?Tls=false"
)


# Generate new events. Typically, domain events of different
# types are generated in a domain model, and then serialized
# into NewEvent objects. An aggregate ID may be used as the
# name of a stream in EventStoreDB.

stream_name1 = str(uuid.uuid4())
event1 = NewEvent(
    type='OrderCreated',
    data=b'{"order_number": "123456"}'
)
event2 = NewEvent(
    type='OrderSubmitted',
    data=b'{}'
)
event3 = NewEvent(
    type='OrderCancelled',
    data=b'{}'
)


# Append new events to a new stream. The value returned
# from the append_to_stream() method is the overall
# "commit position" in the database of the last new event
# recorded by this operation. The returned "commit position"
# may be used in a user interface to poll an eventually
# consistent event-processing component until it can
# present an up-to-date materialized view. New events are
# each allocated a "stream position", which is the next
# available position in the stream, starting from 0.

commit_position1 = client.append_to_stream(
    stream_name=stream_name1,
    current_version=StreamState.NO_STREAM,
    events=[event1, event2],
)

# Append events to an existing stream. The "current version"
# is the "stream position" of the last recorded event in a
# stream. We have recorded two new events, so the "current
# version" is 1. The exception 'WrongCurrentVersion' will be
# raised if an incorrect value is given.

commit_position2 = client.append_to_stream(
    stream_name=stream_name1,
    current_version=1,
    events=[event3],
)

# - allocated commit positions increase monotonically
assert commit_position2 > commit_position1


# Get events recorded in a stream. This method returns
# a sequence of recorded event objects. The recorded
# event objects may be deserialized to domain event
# objects of different types and used to reconstruct
# an aggregate in a domain model.

recorded_events = client.get_stream(
    stream_name=stream_name1
)

# - stream 'stream_name1' now has three events
assert len(recorded_events) == 3

# - allocated stream positions are zero-based and gapless
assert recorded_events[0].stream_position == 0
assert recorded_events[1].stream_position == 1
assert recorded_events[2].stream_position == 2

# - event attribute values are recorded faithfully
assert recorded_events[0].type == "OrderCreated"
assert recorded_events[0].data == b'{"order_number": "123456"}'
assert recorded_events[0].id == event1.id

assert recorded_events[1].type == "OrderSubmitted"
assert recorded_events[1].data == b'{}'
assert recorded_events[1].id == event2.id

assert recorded_events[2].type == "OrderCancelled"
assert recorded_events[2].data == b'{}'
assert recorded_events[2].id == event3.id


# Start a catch-up subscription from last recorded position.
# This method returns a "catch-up subscription" object,
# which can be iterated over to obtain recorded events.
# The iterator will not stop when there are no more recorded
# events to be returned, but instead will block, and then continue
# when further events are recorded. It can be used as a context
# manager so that the underlying streaming gRPC call to the database
# can be cancelled cleanly in case of any error.

received_events = []
with client.subscribe_to_all(commit_position=0) as subscription:

    # Iterate over the catch-up subscription. Process each recorded
    # event in turn. Within an atomic database transaction, record
    # the event's "commit position" along with any new state generated
    # by processing the event. Use the component's last recorded commit
    # position when restarting the catch-up subscription.

    for event in subscription:
        received_events.append(event)

        if event.commit_position == commit_position2:
            # Break so we can continue with the example.
            break


# - events are received in the order they were recorded
assert received_events[-3].type == "OrderCreated"
assert received_events[-3].data == b'{"order_number": "123456"}'
assert received_events[-3].id == event1.id

assert received_events[-2].type == "OrderSubmitted"
assert received_events[-2].data == b'{}'
assert received_events[-2].id == event2.id

assert received_events[-1].type == "OrderCancelled"
assert received_events[-1].data == b'{}'
assert received_events[-1].id == event3.id


# Close the client's gRPC connection.

client.close()

安装包

建议将Python包安装到Python虚拟环境中。

从PyPI

您可以使用pip直接从Python包索引安装此包。

$ pip install esdbclient

使用Poetry

您可以使用Poetry将其添加到pyproject.toml文件并安装。

$ poetry add esdbclient

EventStoreDB服务器

EventStoreDB服务器可以使用官方Docker容器镜像在本地运行。

运行容器

对于开发,您可以使用以下命令运行一个“安全”的EventStoreDB服务器。

$ docker run -d --name eventstoredb-secure -it -p 2113:2113 --env "HOME=/tmp" docker.eventstore.com/eventstore-ce/eventstoredb-ce:23.10.0-jammy --dev

正如我们将看到的,您的客户端需要将EventStoreDB连接字符串URI作为其uri构造函数参数的值。此“安全”EventStoreDB服务器的连接字符串将是

esdb://admin:changeit@localhost:2113

要连接到“安全”服务器,通常需要在连接字符串中包含“用户名”和“密码”,以便服务器可以验证客户端。对于EventStoreDB,默认用户名是“admin”,默认密码是“changeit”。

当连接到“安全”服务器时,您可能还需要提供SSL/TLS证书作为root_certificates构造函数参数的值。如果服务器证书是公开签名的,则证书授权机构的根证书可能已本地安装,并由grpc包从默认位置获取。客户端使用根SSL/TLS证书对服务器进行认证。对于开发,您可以使用用于创建服务器证书的自签证书授权机构的SSL/TLS证书。或者,当使用单个节点集群时,您可以直接使用服务器证书本身,以下Python代码可以获取服务器证书。

import ssl

server_certificate = ssl.get_server_certificate(addr=('localhost', 2113))

或者,您可以使用以下命令启动一个“不安全”的服务器。

$ docker run -d --name eventstoredb-insecure -it -p 2113:2113 docker.eventstore.com/eventstore-ce/eventstoredb-ce:23.10.0-jammy --insecure

此“不安全”服务器的连接字符串URI为

esdb://localhost:2113?Tls=false

正如我们将看到的,当连接到“不安全”的服务器时,在连接字符串中不需要包含“用户名”和“密码”。如果您这样做,客户端将忽略这些值,以确保它们不会通过不安全的通道发送。

请注意,“不安全”的连接字符串使用包含字段值Tls=false的查询字符串。此字段的默认值是true

停止容器

要停止并删除“安全”容器,请使用以下Docker命令。

$ docker stop eventstoredb-secure
$ docker rm eventstoredb-secure

要停止并删除“不安全”容器,请使用以下Docker命令。

$ docker stop eventstoredb-insecure
$ docker rm eventstoredb-insecure

EventStoreDB客户端

此EventStoreDB客户端在esdbclient包中实现,具有EventStoreDBClient类。

导入类

可以从esdbclient包导入EventStoreDBClient类。

from esdbclient import EventStoreDBClient

构建客户端

EventStoreDBClient类有一个必需的构造函数参数uri和三个可选的构造函数参数root_certificatesprivate_keycertificate_chain

期望uri参数是一个符合标准EventStoreDB“esdb”或“esdb+discover”URI方案的EventStoreDB连接字符串URI。

客户端必须配置为创建到“安全”服务器的“安全”连接,或者作为替代,创建到“不安全”服务器的“不安全”连接。默认情况下,客户端将尝试创建“安全”连接。因此,当连接到“不安全”服务器时,连接字符串必须指定客户端应尝试通过使用URI查询字符串字段值Tls=false来建立“不安全”连接。

可选的root_certificates参数可以是包含PEM编码的SSL/TLS证书的Python str或Python bytes对象,并用于向客户端认证服务器。当连接到“不安全”服务时,将忽略此参数的值。当连接到“安全”服务器时,可能需要设置此参数。通常,此参数的值将是负责生成EventStoreDB服务器使用的证书的证书授权机构的公共证书。如果证书授权机构证书已本地安装,Python grpc库可以从默认位置获取它们,则在这种情况下不需要设置此值。或者,对于开发,您可以使用服务器的证书本身。此参数的值直接传递给grpc.ssl_channel_credentials()

可选的 private_keycertificate_chain 参数都可以是 Python 的 strbytes 对象。这些参数可以用来验证客户端对服务器的身份。当连接到一个运行 EventStoreDB 商业版并启用了用户证书插件的“安全”服务器时,必须提供这些参数的正确值。private_key 的值应该是 X.509 用户证书的私钥,格式为 PEM。certificate_chain 的值应该是 X.509 用户证书本身,格式为 PEM。这些参数的值将直接传递给 grpc.ssl_channel_credentials()。当连接到“非安全”服务时,这些参数的值将被忽略。请注意,向客户端提供用户证书和私钥的另一种方法是使用连接字符串 URI 查询字符串中的 UserCertFileUserKeyFile 字段值(见下文)。如果指定了 UserCertFile 字段值,则忽略 certificate_chain 参数。如果指定了 UserKeyFile 字段值,则忽略 public_key 参数。

以下示例中,uriroot_certificates 构造函数参数值来自操作系统环境。

import os

client = EventStoreDBClient(
    uri=os.getenv("ESDB_URI"),
    root_certificates=os.getenv("ESDB_ROOT_CERTIFICATES"),
)

连接字符串

EventStoreDB 连接字符串是一个符合以下两种可能的方案的 URI:要么是 "esdb" 方案,要么是 "esdb+discover" 方案。

以下描述了 EventStoreDB URI 方案的语法和语义。语法使用 EBNF 定义。

两种方案

"esdb" URI 方案可以定义为以下方式。

esdb-uri = "esdb://" , [ user-info , "@" ] , grpc-target, { "," , grpc-target } , [ "?" , query-string ] ;

在 "esdb" URI 方案中,在可选的用户信息字符串之后,必须至少有一个 gRPC 目标。如果有多个 gRPC 目标,它们必须用逗号 "," 分隔。

每个 gRPC 目标应指定一个 EventStoreDB gRPC 服务器套接字,所有这些都位于同一个 EventStoreDB 集群中,通过指定用冒号 ":" 分隔的主机和端口号。主机可以是可解析为 IP 地址的域名,也可以是 IP 地址。

grpc-target = ( hostname | ip-address ) , ":" , port-number ;

如果只有一个 gRPC 目标,客户端将尝试连接到此服务器,并在记录和检索事件时使用此连接。

如果有两个或更多 gRPC 目标,客户端将依次尝试连接到每个 Gossip API,并从中获取有关集群的信息,直到获得集群信息。然后客户端根据连接字符串 URI 中指定的“节点偏好”选择集群成员。然后客户端将关闭其连接并连接到所选节点,而不使用 'round robin' 负载均衡策略。如果“节点偏好”是“leader”,并且连接到一个领导者后,如果领导者成为 follower,客户端将重新连接到新的领导者。

"esdb+discover" URI 方案可以定义为以下方式。

esdb-discover-uri = "esdb+discover://" , [ user-info, "@" ] , cluster-domainname, [ ":" , port-number ] , [ "?" , query-string ] ;

在 "esdb+discover" URI 方案中,在可选的用户信息字符串之后,应该有一个域名,该域名标识一个 EventStoreDB 服务器集群。集群中的各个节点应通过 DNS 'A' 记录声明。

客户端将使用gRPC库的“轮询”负载均衡策略,通过DNS "A"记录发现的地址调用Gossip API。从Gossip API获取EventStoreDB集群的信息。然后客户端根据“节点偏好”选项选择集群的一个成员。客户端将关闭其连接并连接到所选节点,而不使用“轮询”负载均衡策略。如果“节点偏好”为“领导者”,并在连接到领导者后,如果领导者变为跟随者,客户端将重新连接到新的领导者。

用户信息字符串

在“esdb”和“esdb+discover”两种方案中,URI可能包含用户信息字符串。如果它存在于URI中,用户信息字符串必须用“@”字符与URI的其余部分分开。用户信息字符串必须包含用户名和密码,用“:”字符分隔。

user-info = username , ":" , password ;

客户端在每次对“安全”服务器的gRPC调用中,通过“基本认证”授权头发送用户信息。服务器使用此授权头对客户端进行身份验证。Python gRPC库不允许将调用凭证传递给“不安全”服务器。

查询字符串

在“esdb”和“esdb+discover”两种方案中,可选的查询字符串必须是使用“&”字符分隔的一个或多个字段值参数。

query-string = field-value, { "&", field-value } ;

每个字段值参数必须是支持的字段之一,以及适当的值,用“=”字符分隔。

field-value = ( "Tls", "=" , "true" | "false" )
            | ( "TlsVerifyCert", "=" , "true" | "false" )
            | ( "ConnectionName", "=" , string )
            | ( "NodePreference", "=" , "leader" | "follower" | "readonlyreplica" | "random" )
            | ( "DefaultDeadline", "=" , integer )
            | ( "GossipTimeout", "=" , integer )
            | ( "MaxDiscoverAttempts", "=" , integer )
            | ( "DiscoveryInterval", "=" , integer )
            | ( "KeepAliveInterval", "=" , integer )
            | ( "KeepAliveTimeout", "=" , integer ) ;
            | ( "UserCertFile", "=" , string ) ;
            | ( "UserKeyFile", "=" , string ) ;

下表描述了此客户端支持的查询字符串字段值。

字段 描述
Tls "true", "false"(默认:"true") 如果为“true”,客户端将创建一个“安全”的gRPC通道。如果为“false”,客户端将创建一个“不安全”的gRPC通道。这必须与服务器配置相匹配。
TlsVerifyCert "true", "false"(默认:"true") 此值目前被忽略。
ConnectionName 字符串(默认:自动生成的版本4 UUID) 在每次调用中发送到调用元数据,以标识客户端到集群。
NodePreference "leader","follower","readonlyreplica","random"(默认:"leader") 客户端偏好的节点状态。客户端将根据从Gossip API接收到的集群信息选择一个节点。
DefaultDeadline 整数(默认:None) 客户端“写入”方法(如append_to_stream())的timeout参数的默认值(以秒为单位)。
GossipTimeout 整数(默认:5) gossip读取方法(如read_gossip())的timeout参数的默认值(以秒为单位)。
MaxDiscoverAttempts 整数(默认:10) 在连接或重新连接集群成员时读取gossip的尝试次数。
DiscoveryInterval 整数(默认:100) 在gossip重试之间等待的时间(以毫秒为单位)。
KeepAliveInterval 整数(默认:None) "grpc.keepalive_ms" gRPC通道选项的值(以毫秒为单位)。
KeepAliveTimeout 整数(默认:None) "grpc.keepalive_timeout_ms" gRPC通道选项的值(以毫秒为单位)。
UserCertFile 字符串(默认:None) 包含PEM格式的X.509用户证书的文件的绝对文件系统路径。
UserKeyFile 字符串(默认:None) 包含PEM格式的X.509用户证书私钥的文件的绝对文件系统路径。

请注意,客户端对字段和值的区分大小写不敏感。如果查询字符串中字段重复,则查询字符串将无错误地解析。然而,客户端使用的连接选项将使用第一个字段的值。查询字符串中具有相同字段名的所有其他字段值将被忽略。没有值的字段也将被忽略。

如果客户端的节点偏好是“从节点”且集群中没有从节点,则客户端将引发异常。同样,如果客户端的节点偏好是“只读副本”且集群中没有只读副本节点,则客户端也将引发异常。

gRPC通道选项“grpc.max_receive_message_length”自动配置为值17 * 1024 * 1024。此值不能进行配置。

示例

以下是一些EventStoreDB连接字符串URI的示例。

以下URI将导致客户端向gRPC目标'localhost:2113'建立“非安全”连接。因为客户端的节点偏好是“从节点”,在从节点上可调用的方法应该成功完成,需要领导者的方法将引发NodeIsNotLeader异常。

esdb://127.0.0.1:2113?Tls=false&NodePreference=follower

以下URI将导致客户端向gRPC目标'localhost:2113'建立“非安全”连接。因为客户端的节点偏好是“领导者”,如果此节点不是领导者,则所有方法将引发NodeIsNotLeader异常。

esdb://127.0.0.1:2113?Tls=false&NodePreference=leader

以下URI将导致客户端以用户名'admin'和密码'changeit'作为默认调用凭据,向gRPC目标'localhost:2113'建立“安全”连接。因为客户端的节点偏好是“领导者”,默认情况下,如果此节点不是领导者,则所有方法将引发NodeIsNotLeader异常。

esdb://admin:changeit@localhost:2113

以下URI将导致客户端首先从'localhost:2111''localhost:2112''localhost:2113'中获取集群信息,然后从集群信息中选择领导者节点并重新连接到领导者。如果“领导者”节点变为“从节点”且另一个节点变为“领导者”,则客户端将重新连接到新的领导者。

esdb://admin:changeit@localhost:2111,localhost:2112,localhost:2113?NodePreference=leader

以下URI将导致客户端首先从'localhost:2111''localhost:2112''localhost:2113'中获取集群信息,然后从集群信息中选择一个从节点并重新连接到该从节点。请注意,如果“从节点”变为“领导者”,客户端将不会重新连接到从节点——这种行为可能在客户端和服务器未来的版本中得到实现。

esdb://admin:changeit@localhost:2111,localhost:2112,localhost:2113?NodePreference=follower

以下URI将导致客户端首先从DNS 'A'记录中的地址获取集群信息,然后连接到一个“领导者”节点。在调用EventStore API "写入"方法时,客户端将使用默认的超时时间5秒。

esdb+discover://admin:changeit@cluster1.example.com?DefaultDeadline=5

以下URI将导致客户端首先从DNS 'A'记录中的地址获取集群信息,然后连接到一个“领导者”节点。它将配置具有“保持活动间隔”和“保持活动超时”的gRPC连接。

esdb+discover://admin:changeit@cluster1.example.com?KeepAliveInterval=10000&KeepAliveTimeout=10000

事件对象

此包定义了一个NewEvent类和一个RecordedEvent类。在将事件写入数据库时应使用NewEvent类。在从数据库读取事件时使用RecordedEvent类。

新事件

在将事件写入 EventStoreDB 数据库时,应使用 NewEvent 类。在调用 append_to_stream() 之前,需要构建新的事件对象。

NewEvent 类是一个冻结的 Python 数据类。它有两个必需的构造函数参数(typedata)和三个可选的构造函数参数(metadatacontent_typeid)。

必需的 type 参数是一个 Python str,用于描述正在记录的领域事件的类型。

必需的 data 参数是一个 Python bytes 对象,用于表示正在记录的领域事件的序列化数据。

可选的 metadata 参数是一个 Python bytes 对象,用于指示将被记录的事件的任何元数据。默认值是一个空的 bytes 对象。

可选的 content_type 参数是一个 Python str,用于指示正在记录的数据的类型。默认值是 'application/json',表示 data 使用 JSON 进行序列化。此参数的另一个值是更通用的指示 'application/octet-stream'

可选的 id 参数是一个 Python UUID 对象,用于指定将被记录的事件的唯一 ID。如果没有提供值,将生成一个新的版本 4 UUID。

new_event1 = NewEvent(
    type='OrderCreated',
    data=b'{"name": "Greg"}',
)
assert new_event1.type == 'OrderCreated'
assert new_event1.data == b'{"name": "Greg"}'
assert new_event1.metadata == b''
assert new_event1.content_type == 'application/json'
assert isinstance(new_event1.id, uuid.UUID)

event_id = uuid.uuid4()
new_event2 = NewEvent(
    type='ImageCreated',
    data=b'01010101010101',
    metadata=b'{"a": 1}',
    content_type='application/octet-stream',
    id=event_id,
)
assert new_event2.type == 'ImageCreated'
assert new_event2.data == b'01010101010101'
assert new_event2.metadata == b'{"a": 1}'
assert new_event2.content_type == 'application/octet-stream'
assert new_event2.id == event_id

记录的事件

RecordedEvent 类用于从 EventStoreDB 数据库中读取事件。客户端将从所有返回已记录事件的操作返回此类型的事件对象,例如 get_stream()subscribe_to_all()read_subscription_to_all()。不需要构建记录事件对象。

NewEvent 类似,RecordedEvent 类也是一个冻结的 Python 数据类。它具有 NewEvent 所有的属性(typedatametadatacontent_typeid),这些属性来源于已记录的事件,还有一些额外的属性来源于事件的记录(stream_namestream_positioncommit_positionrecorded_at)。它还有一个 link 属性,除非记录的事件是“链接事件”且已被“解析”为链接事件,否则该属性为 None。它还有一个 retry_count 属性,当从持久订阅中接收记录事件时,其值为整数,否则 retry_count 的值为 None

type 属性是一个 Python str,用于指示已记录事件的类型。

data 属性是一个 Python bytes 对象,用于指示已记录事件的的数据。

metadata 属性是一个 Python bytes 对象,用于指示已记录事件的元数据。

content_type 属性是一个 Python str,用于指示为事件记录的数据的类型。通常为 'application/json',表示数据可以解析为 JSON。或者,它为 'application/octet-stream'

id 属性是一个 Python UUID 对象,用于指示已记录事件的唯一 ID。

stream_name 属性是一个 Python str,用于指示记录事件时的流名称。

stream_position 属性是一个 Python int,用于指示记录事件时的流中的位置。

在 EventStoreDB 中,“流位置”是一个表示记录事件在流中位置的整数。每个记录事件都在流中的一个位置上被记录。每个流位置只被一个记录事件占用。新事件在下一个未被占用的位置上被记录。所有序列的流位置都是基于零且无间隙的。

commit_position 属性是一个 Python int,用于指示记录事件时的数据库位置。

在EventStoreDB中,"提交位置"是一个整数,表示记录在数据库中的事件的位置。每个记录的事件都会在数据库中记录一个位置。每个提交位置只被一个记录的事件占用。提交位置从零开始,随着新事件的记录而单调递增。但是,与流位置不同,连续提交位置的序列并不是无缝的。实际上,连续记录事件的提交位置之间通常存在较大的差异。

请注意,在EventStoreDB 21.10中,从read_stream()获得的所有RecordedEvent对象的commit_position都是None,而从read_all()获得的则具有记录事件的实际提交位置。这一变化发生在版本22.10中,使得从get_stream()read_all()获取的事件对象都具有实际的提交位置。仅为此原因,RecordedEvent类的commit_position属性被注解为类型Optional[int]

recorded_at属性是一个Python datetime,用于指示数据库记录事件的日期和时间。

link属性是一个可选的RecordedEvent,它携带有关“链接事件”的信息,该事件已被“解析”为链接事件。这允许在解析链接事件时访问链接事件属性,例如在确认或否定确认链接事件时访问正确的事件ID。当resolve_links参数为True且正在回放暂停事件时(从具有'park'动作的持久订阅接收的事件将创建一个链接事件,当暂停事件回放时,它们将作为解析事件接收)。ack_id属性有助于获取在确认或否定确认来自持久订阅的事件时使用的正确事件ID。

retry_count是一个Python int,用于指示持久订阅尝试将事件发送给消费者的次数。

from dataclasses import dataclass
from datetime import datetime

@dataclass(frozen=True)
class RecordedEvent:
    """
    Encapsulates event data that has been recorded in EventStoreDB.
    """

    type: str
    data: bytes
    metadata: bytes
    content_type: str
    id: UUID
    stream_name: str
    stream_position: int
    commit_position: Optional[int]
    recorded_at: Optional[datetime] = None
    link: Optional["RecordedEvent"] = None
    retry_count: Optional[int] = None

    @property
    def ack_id(self) -> UUID:
        if self.link is not None:
            return self.link.id
        else:
            return self.id

    @property
    def is_system_event(self) -> bool:
        return self.type.startswith("$")

    @property
    def is_link_event(self) -> bool:
        return self.type == "$>"

    @property
    def is_resolved_event(self) -> bool:
        return self.link is not None

    @property
    def is_checkpoint(self) -> bool:
        return False

属性ack_id可用于在读取持久订阅时获取正确的用于ack()nack()事件的事件ID。返回值是link属性的id属性的值,如果link不是None,否则是id属性的值。

属性is_system_event指示事件是否是“系统事件”。系统事件的type值以'$'开头。

属性is_link_event指示事件是否是“链接事件”。链接事件的type值为'$>'

属性is_resolve_event指示事件是否已从“链接事件”解析出来。如果link不是None,则返回值是True

属性is_checkpointFalse。这可以用来识别在接收事件时include_checkpoints=True返回的Checkpoint实例。

在EventStoreDB中,“流”是一系列具有相同“流名称”的记录事件。数据库中通常会有许多流,每个流都有许多记录的事件。每个记录的事件在其流中有一个位置(即“流位置”),以及在数据库中的一个位置(即“提交位置”)。流位置从零开始,是无缝的。提交位置也是从零开始,但不是无缝的。

可以使用append_to_stream()get_stream()read_all()方法在数据库中读取和记录。

追加事件

需要领导者

append_to_stream()方法可以原子性地记录一系列新事件。如果操作成功,它将返回序列中最后一个已记录事件的提交位置。

此方法有三个必需的参数,即stream_namecurrent_versionevents

必需的 stream_name 参数是一个 Python str,它唯一标识了一个要附加一系列事件的流。

必需的 current_version 参数应为一个 Python int,表示流中最后记录的事件的流位置,或者在流尚不存在或已被删除的情况下为 StreamState.NO_STREAM。流位置是从零开始的且无缝的,因此如果流中有两个事件,则 current_version 应为 1。如果给出了不正确的值,此方法将引发 WrongCurrentVersion 异常。这种行为旨在在记录新事件时提供并发控制。任何流的正确 current_version 值可以通过调用 get_current_version() 来获取。然而,典型的做法是从记录的事件中重新构建一个聚合,这样聚合的版本就是最后记录事件的流位置,然后让聚合生成新的事件,然后将聚合的当前版本作为在附加新聚合事件时 current_version 参数的值。这确保了记录的聚合事件的连续性,因为如果记录新事件时遇到 WrongCurrentVersion 异常,可以重新尝试使用新生成的聚合执行生成新聚合事件的操作。可以通过将 current_version 参数的值设置为常量 StreamState.ANY 来完全禁用这种控制行为。更具有选择性的是,可以通过将 current_version 参数的值设置为常量 StreamState.EXISTS 来禁用现有流的此行为。

必需的 events 参数应为一个新事件对象的序列。应使用 NewEvent 类来构造新事件对象。append_to_stream() 操作是原子的,因此所有或没有任何新事件将被记录。使用 EventStoreDB 无法以原子方式在多个流中记录新事件。

此方法有一个可选的 timeout 参数,它是一个 Python float,用于设置 gRPC 操作完成的最大持续时间(以秒为单位)。

此方法有一个可选的 credentials 参数,可以用来覆盖从连接字符串 URI 派生的调用凭据。

以下示例中,将一个新的事件 event1 附加到一个新的流中。该流尚不存在,因此 current_versionStreamState.NO_STREAM

# Construct a new event object.
event1 = NewEvent(type='OrderCreated', data=b'{}')

# Define a new stream name.
stream_name1 = str(uuid.uuid4())

# Append the new events to the new stream.
commit_position1 = client.append_to_stream(
    stream_name=stream_name1,
    current_version=StreamState.NO_STREAM,
    events=[event1],
)

以下示例中,将两个后续事件附加到一个现有流中。该流有一个记录的事件,因此 current_version0

event2 = NewEvent(type='OrderUpdated', data=b'{}')
event3 = NewEvent(type='OrderDeleted', data=b'{}')

commit_position2 = client.append_to_stream(
    stream_name=stream_name1,
    current_version=0,
    events=[event2, event3],
)

返回的值 commit_position1commit_position2 是记录序列中最后事件的数据库提交位置。也就是说,commit_position1event1 的提交位置,而 commit_position2event3 的提交位置。

以这种方式返回的提交位置可以被用户界面用于轮询下游组件,直到它处理完所有新记录的事件。例如,考虑一个用户界面命令,该命令导致记录新事件,以及一个从这些事件中最终一致地更新的下游组件的可视化视图。如果新事件尚未被处理,视图可能会过时。为了不显示过时的视图,用户界面可以轮询下游组件,直到它处理了新记录的事件,然后向用户显示最新的视图。

幂等附加操作

对于 NewEvent 对象的 id 值,append_to_stream() 方法是“幂等”的。也就是说,如果以具有与流中已记录的 id 值相等的 id 值的事件调用 append_to_stream(),则该方法调用将成功返回,返回最后一个新事件的提交位置,而不会对数据库进行任何更改。

这是因为有时在调用 append_to_stream() 时,新事件可能会成功记录,但在方法调用成功返回给调用者之前,可能会发生某些不好的事情。在这种情况下,我们无法确定事件是否实际上已记录,因此我们可能希望重试。

如果事件实际上已成功记录,则重试的方法调用成功返回很方便,并且不会抛出异常(当 current_versionStreamState.NO_STREAM 或整数值时),也不会创建更多的事件记录(当 current_versionStreamState.ANYStreamState.EXISTS 时),就像如果 append_to_stream() 方法不是幂等的情况一样。

如果方法调用最初失败并且新事件实际上未记录,则在重试方法调用时记录新事件并使方法调用成功返回是有意义的。如果未禁用并发控制,即如果 current versionStreamState.NO_STREAM 或整数值,并且在重试方法调用时引发 WrongCurrentVersion 异常,则可以假设最初的方法调用实际上没有成功记录事件,并且在此期间其他人已记录了后续事件。在这种情况下,生成新事件的程序命令可能需要再次执行。应用程序的用户可能需要有机会决定是否仍然希望执行其原始意图,通过显示带有最新记录状态的适当错误来实现。在已禁用并发控制的情况下,通过使用 current_version 的值为 StreamState.ANYStreamState.EXISTS,重试未成功返回的方法调用将更简单地尝试确保记录新事件,而不管它们的最终流位置如何。在任何情况下,当方法调用成功返回时,我们都可以确信事件已被记录。

下面的示例显示了再次以事件 event2event3 调用 append_to_stream() 方法,并且 current_version=0。我们可以看到,重复调用 append_to_stream() 会成功返回,而不会引发 WrongCurrentVersion 异常,就像如果 append_to_stream() 操作不是幂等的情况一样。

# Retry appending event3.
commit_position_retry = client.append_to_stream(
    stream_name=stream_name1,
    current_version=0,
    events=[event2, event3],
)

我们可以看到返回的提交位置与上述相同。

assert commit_position_retry == commit_position2

下面的示例显示了再次以事件 event2event3 调用 append_to_stream() 方法,并且 current_version=StreamState.ANY

# Retry appending event3.
commit_position_retry = client.append_to_stream(
    stream_name=stream_name1,
    current_version=0,
    events=[event2, event3],
)

我们可以看到返回的提交位置与上述相同。

assert commit_position_retry == commit_position2

通过调用 get_stream(),我们还可以看到流未改变。也就是说,流中仍然只有三个事件。

events = client.get_stream(
    stream_name=stream_name1
)

assert len(events) == 3

这种幂等行为依赖于 NewEvent 类的 id 属性。默认情况下,当构造 NewEvent 实例时,将分配一个新且唯一的版本-4 UUID。要在构造 NewEvent 对象时设置 NewEventid 值,可以使用可选的 id 构造函数参数。

读取流事件

可以使用 read_stream() 方法获取已附加到流的的事件。此方法返回一个“读取响应”对象。

“读取响应”对象是 Python 迭代器。可以通过遍历“读取响应”对象来获取记录的事件。记录的事件在迭代过程中从服务器流式传输到客户端。迭代将在没有更多记录事件要返回时自动停止。也可以通过在“读取响应”对象上调用 stop() 方法来停止事件的流式传输和迭代器。

可以使用 get_stream() 方法获取已附加到流中的事件。此方法返回一个包含已记录事件对象的 Python tuple。已记录的事件对象是 RecordedEvent 类的实例。它调用 read_stream() 并将 "read response" 迭代器传递给 Python tuple,以确保在方法返回之前流操作完成。

read_stream()get_stream() 方法有一个必需的参数,即 stream_name

必需的 stream_name 参数是一个 Python str,用于唯一标识将从中返回记录事件的流。

read_stream()get_stream() 方法还有六个可选参数,分别是 stream_positionbackwardsresolve_linkslimittimeoutcredentials

可选的 stream_position 参数是一个 Python int,可以用来指示从哪个位置开始读取流中的数据。默认值是 None。当从流的特定位置读取时,该位置的事件将被包括,无论是向前还是向后读取。

可选的 backwards 参数是一个 Python bool。默认值是 False,这意味着流将被正向读取,以便事件按记录顺序返回。如果 backwardsTrue,则事件将按相反顺序返回。

如果 backwardsFalsestream_positionNone,则流的事件将按记录顺序返回,从第一个记录的事件开始。如果 backwardsTruestream_positionNone,则流的事件将按相反顺序返回,从最后一个记录的事件开始。

可选的 resolve_links 参数是一个 Python bool。默认值是 False,这意味着任何事件链接将不会被解析,因此返回的事件可能代表事件链接。如果 resolve_linksTrue,则任何事件链接将被解析,因此将返回链接事件而不是事件链接。

可选的 limit 参数是一个 Python int,它限制了将返回的事件数量。默认值是 sys.maxint

可选的 timeout 参数是一个 Python float,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。

可选的 credentials 参数可以用来覆盖从连接字符串 URI 派生的调用凭据。可以通过调用客户端方法的 construct_call_credentials() 来构建此参数的适当值。

下面的示例显示了默认行为,即从第一个记录的事件到最后的记录事件,正向返回流中所有记录的事件。

events = client.get_stream(
    stream_name=stream_name1
)

assert len(events) == 3
assert events[0] == event1
assert events[1] == event2
assert events[2] == event3

下面的示例显示了如何使用 stream_position 参数从特定的流位置读取流到流的末尾。流位置是从零开始的,因此 stream_position=1 对应于在流中记录的第二个事件,在这种情况下是 event2

events = client.get_stream(
    stream_name=stream_name1,
    stream_position=1,
)

assert len(events) == 2
assert events[0] == event2
assert events[1] == event3

下面的示例显示了如何使用 backwards 参数反向读取流。

events = client.get_stream(
    stream_name=stream_name1,
    backwards=True,
)

assert len(events) == 3
assert events[0] == event3
assert events[1] == event2
assert events[2] == event1

下面的示例显示了如何使用 limit 参数读取有限数量的事件。

events = client.get_stream(
    stream_name=stream_name1,
    limit=2,
)

assert len(events) == 2
assert events[0] == event1
assert events[1] == event2

如果指定的流从未存在或已被删除,read_stream()get_stream() 方法将引发一个 NotFound 异常。

from esdbclient.exceptions import NotFound


try:
    client.get_stream('does-not-exist')
except NotFound:
    pass  # The stream does not exist.
else:
    raise Exception("Shouldn't get here")

请注意,get_stream() 方法被 @autoreconnect@retrygrpc 装饰器装饰,而 read_stream() 方法则没有。这意味着在调用 get_stream() 方法时,所有由于连接问题导致的错误都将被重试和重连装饰器捕获,但在调用 read_stream() 时则不会。read_stream() 方法没有这样的装饰器,因为流仅在迭代“读取响应”时开始,这意味着该方法在流开始之前就返回了,因此没有机会让任何装饰器捕获任何连接问题。

出于同样的原因,当流不存在时,read_stream() 不会抛出 NotFound 异常,直到开始迭代“读取响应”对象。

如果您正在读取一个非常大的流,那么您可能更喜欢调用 read_stream(),并在从服务器流式传输事件的同时开始迭代记录的事件,而不是等待所有事件都积累在内存中。

获取当前版本

get_current_version() 方法是一个便捷方法,它实际上使用 backwards=Truelimit=1 调用 get_stream()。此方法返回流中最后一条记录事件的 stream_position 属性的值。如果流不存在,则返回值是 StreamState.NO_STREAM。当向流添加事件时,返回值是 current_version 的正确值,当删除或弃用流时也是如此。

此方法有一个必需的参数,stream_name

必需的 stream_name 参数是 Python str,它唯一标识一个流,将从中返回流位置。

此方法有一个可选的 timeout 参数,它是一个 Python float,用于设置 gRPC 操作完成的最大持续时间(以秒为单位)。

此方法有一个可选的 credentials 参数,可以用来覆盖从连接字符串 URI 派生的调用凭据。

以下示例中,获取了 stream_name1 的最后流位置。由于已向 stream_name1 添加了三个事件,并且因为流中的位置是以零为基础且无间隙的,所以当前版本是 2

current_version = client.get_current_version(
    stream_name=stream_name1
)

assert current_version == 2

如果一个流从未存在或已被删除,则返回值是 StreamState.NO_STREAM,这是向新流添加第一个事件时以及向已删除的流添加事件时 current_version 参数的正确值。

current_version = client.get_current_version(
    stream_name='does-not-exist'
)

assert current_version is StreamState.NO_STREAM

如何使用 EventStoreDB 实现快照

快照可以提高那些本应从非常长的流中重建的聚合的性能。然而,通常建议设计具有有限生命周期的聚合,从而具有相对较短的流,避免需要快照。本“如何操作”部分旨在仅展示如何使用此 Python 客户端通过 EventStoreDB 实现聚合的快照。

事件源聚合通常通过为每个记录的事件调用一个突变函数来从记录的事件重建,从初始状态 None 发展到聚合的当前状态。函数 get_aggregate() 展示了如何实现这一点。聚合 ID 用作流名称。如果找不到聚合流,则引发 AggregateNotFound 异常。

class AggregateNotFound(Exception):
    """Raised when an aggregate is not found."""


def get_aggregate(aggregate_id, mutator_func):
    stream_name = aggregate_id

    # Get recorded events.
    try:
        events = client.get_stream(
            stream_name=stream_name,
            stream_position=None
        )
    except NotFound as e:
        raise AggregateNotFound(aggregate_id) from e
    else:
        # Reconstruct aggregate from recorded events.
        aggregate = None
        for event in events:
            aggregate = mutator_func(aggregate, event)
        return aggregate

可以通过将聚合的当前状态作为新事件记录来实现聚合的快照。

如果一个聚合对象具有与用于重建聚合的最后事件的流位置相对应的版本号,并且此版本号记录在快照元数据中,那么可以使用此版本号选择快照之后记录的事件。然后可以从最后一个快照和任何后续事件中重建聚合,而不必重新播放整个历史。

我们将使用一个单独的流来存储聚合的快照,该流以记录其事件的流名称命名。快照流名称将由在聚合的流名称前加上 'snapshot-$' 来构造。

SNAPSHOT_STREAM_NAME_PREFIX = 'snapshot-$'

def make_snapshot_stream_name(stream_name):
    return f'{SNAPSHOT_STREAM_NAME_PREFIX}{stream_name}'


def remove_snapshot_stream_prefix(snapshot_stream_name):
    assert snapshot_stream_name.startswith(SNAPSHOT_STREAM_NAME_PREFIX)
    return snapshot_stream_name[len(SNAPSHOT_STREAM_NAME_PREFIX):]

现在,让我们重新定义 get_aggregate() 函数,以便它查找快照事件,然后选择后续的聚合事件,然后为每个记录的事件调用突变函数。

请注意,聚合事件是从序列化聚合事件的流中读取的,而快照是从为序列化聚合快照的单独流中读取的。我们将使用JSON来序列化和反序列化事件数据。

import json


def get_aggregate(aggregate_id, mutator_func):
    stream_name = aggregate_id
    recorded_events = []

    # Look for a snapshot.
    try:
        snapshots = client.get_stream(
            stream_name=make_snapshot_stream_name(stream_name),
            backwards=True,
            limit=1
        )
    except NotFound:
        stream_position = None
    else:
        assert len(snapshots) == 1
        snapshot = snapshots[0]
        stream_position = deserialize(snapshot.metadata)['version'] + 1
        recorded_events.append(snapshot)

    # Get subsequent events.
    try:
        events = client.get_stream(
            stream_name=stream_name,
            stream_position=stream_position
        )
    except NotFound as e:
        raise AggregateNotFound(aggregate_id) from e
    else:
        recorded_events += events

    # Reconstruct aggregate from recorded events.
    aggregate = None
    for event in recorded_events:
        aggregate = mutator_func(aggregate, event)

    return aggregate


def serialize(d):
    return json.dumps(d).encode('utf8')


def deserialize(s):
    return json.loads(s.decode('utf8'))

为了展示如何使用get_aggregate(),让我们定义一个具有nametricks属性的Dog聚合类。属性idversion将指示聚合对象的ID和版本号。属性is_from_snapshot在此处仅用于演示当聚合对象使用快照重构时。

from dataclasses import dataclass


@dataclass(frozen=True)
class Aggregate:
    id: str
    version: int
    is_from_snapshot: bool


@dataclass(frozen=True)
class Dog(Aggregate):
    name: str
    tricks: list

让我们还定义一个突变函数mutate_dog(),该函数根据各种不同类型的事件('DogRegistered''DogLearnedTrick''Snapshot')来演化Dog聚合的状态。

def mutate_dog(dog, event):
    data = deserialize(event.data)
    if event.type == 'DogRegistered':
        return Dog(
            id=event.stream_name,
            version=event.stream_position,
            is_from_snapshot=False,
            name=data['name'],
            tricks=[],
        )
    elif event.type == 'DogLearnedTrick':
        assert event.stream_position == dog.version + 1
        assert event.stream_name == dog.id, (event.stream_name, dog.id)
        return Dog(
            id=dog.id,
            version=event.stream_position,
            is_from_snapshot=dog.is_from_snapshot,
            name=dog.name,
            tricks=dog.tricks + [data['trick']],
        )
    elif event.type == 'Snapshot':
        return Dog(
            id=remove_snapshot_stream_prefix(event.stream_name),
            version=deserialize(event.metadata)['version'],
            is_from_snapshot=True,
            name=data['name'],
            tricks=data['tricks'],
        )
    else:
        raise Exception(f"Unknown event type: {event.type}")

为了方便,我们还定义了一个get_dog()函数,它将get_aggregate()mutate_dog()函数作为其mutator_func参数的值一起调用。

def get_dog(dog_id):
    return get_aggregate(
        aggregate_id=dog_id,
        mutator_func=mutate_dog,
    )

我们还可以定义一些“命令”函数,这些函数将新事件追加到数据库中。函数register_dog()追加一个DogRegistered事件。函数record_trick_learned()追加一个DogLearnedTrick事件。函数snapshot_dog()追加一个Snapshot事件。请注意,函数record_trick_learned()snapshot_dog()使用get_dog()

请注意,DogRegisteredDogLearnedTrick事件被追加到聚合事件的流中,而Snapshot事件被追加到为聚合快照的单独流中。

def register_dog(name):
    dog_id = str(uuid.uuid4())
    event = NewEvent(
        type='DogRegistered',
        data=serialize({'name': name}),
    )
    client.append_to_stream(
        stream_name=dog_id,
        current_version=StreamState.NO_STREAM,
        events=event,
    )
    return dog_id


def record_trick_learned(dog_id, trick):
    dog = get_dog(dog_id)
    event = NewEvent(
        type='DogLearnedTrick',
        data=serialize({'trick': trick}),
    )
    client.append_to_stream(
        stream_name=dog_id,
        current_version=dog.version,
        events=event,
    )


def snapshot_dog(dog_id):
    dog = get_dog(dog_id)
    event = NewEvent(
        type='Snapshot',
        data=serialize({'name': dog.name, 'tricks': dog.tricks}),
        metadata=serialize({'version': dog.version}),
    )
    client.append_to_stream(
        stream_name=make_snapshot_stream_name(dog_id),
        current_version=StreamState.ANY,
        events=event,
    )

我们可以调用register_dog()来注册一只新狗。

# Register a new dog.
dog_id = register_dog('Fido')

dog = get_dog(dog_id)
assert dog.name == 'Fido'
assert dog.tricks == []
assert dog.version == 0
assert dog.is_from_snapshot is False

我们可以调用record_trick_learned()来记录已经学会了一些技巧。

# Record that 'Fido' learned a new trick.
record_trick_learned(dog_id, trick='roll over')

dog = get_dog(dog_id)
assert dog.name == 'Fido'
assert dog.tricks == ['roll over']
assert dog.version == 1
assert dog.is_from_snapshot is False


# Record that 'Fido' learned another new trick.
record_trick_learned(dog_id, trick='fetch ball')

dog = get_dog(dog_id)
assert dog.name == 'Fido'
assert dog.tricks == ['roll over', 'fetch ball']
assert dog.version == 2
assert dog.is_from_snapshot is False

我们可以调用snapshot_dog()来记录Dog聚合的当前状态快照。在调用snapshot_dog()之后,get_dog()函数将返回一个使用Snapshot事件构建的Dog对象。

# Snapshot 'Fido'.
snapshot_dog(dog_id)

dog = get_dog(dog_id)
assert dog.name == 'Fido'
assert dog.tricks == ['roll over', 'fetch ball']
assert dog.version == 2
assert dog.is_from_snapshot is True

我们可以继续使用快照在调用record_trick_learned()和直接调用get_dog()时演化Dog聚合的状态。

record_trick_learned(dog_id, trick='sit')

dog = get_dog(dog_id)
assert dog.name == 'Fido'
assert dog.tricks == ['roll over', 'fetch ball', 'sit']
assert dog.version == 3
assert dog.is_from_snapshot is True

我们可以从is_from_snapshot属性中看到,Dog对象确实是从快照中重建的。

快照可以在固定版本号间隔、固定时间段、特定类型事件后、事件追加后或作为后台进程创建。

读取所有事件

可以使用read_all()方法获取数据库中按记录顺序记录的所有事件。此方法返回一个“读取响应”对象,就像read_stream()

“读取响应”是一个迭代器,而不是一个序列。可以通过迭代“读取响应”对象来获取记录的事件。记录的事件在迭代过程中从服务器流到客户端。迭代将自动停止,当没有更多记录的事件要返回时。可以通过在“读取响应”对象上调用stop()方法来停止事件的流和迭代器。记录的事件对象是RecordedEvent类的实例。

此方法有九个可选参数,分别为commit_positionbackwardsresolve_linksfilter_excludefilter_includefilter_by_stream_namelimittimeoutcredentials

可选的commit_position参数是一个Python int,可以用来指定一个读取的提交位置。默认值commit_positionNone。请注意,如果指定了提交位置,它必须在数据库中实际存在的提交位置。当向前读取时,根据过滤器,提交位置的事件可能包括在内。当向后读取时,提交位置的事件将不包括在内。

可选的 backwards 参数是一个 Python bool 类型。默认的 backwards 值为 False,表示返回的事件按照记录顺序排列;如果 backwardsTrue,则返回的事件顺序相反。

如果 backwardsFalsecommit_positionNone,数据库中的事件将按照记录顺序返回,从第一个记录的事件开始。这是 read_all() 的默认行为。如果 backwardsTruecommit_positionNone,数据库中的事件将按照相反顺序返回,从最后一个记录的事件开始。

可选的 resolve_links 参数是一个 Python bool。默认值是 False,这意味着任何事件链接将不会被解析,因此返回的事件可能代表事件链接。如果 resolve_linksTrue,则任何事件链接将被解析,因此将返回链接事件而不是事件链接。

可选的 filter_exclude 参数是一个正则表达式序列,用于指定应该返回哪些记录事件。如果设置了非空序列的 filter_include,则此参数将被忽略。默认值匹配 EventStoreDB 的 "系统事件" 事件类型,因此系统事件通常不会被包括。有关筛选表达式的更多信息,请参阅下面的注释部分。

可选的 filter_include 参数是一个正则表达式序列,用于指定应该返回哪些记录事件。默认情况下,此参数是一个空元组。如果设置了非空序列,则忽略 filter_exclude 参数。

可选的 filter_by_stream_name 参数是一个表示是否将筛选应用于事件类型或流名称的 Python bool。默认值为 False,因此筛选应用于记录事件的类型字符串。

可选的 limit 参数是一个整数,用于限制返回的事件数量。默认值为 sys.maxint

可选的 timeout 参数是一个 Python float,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。

可选的 credentials 参数可以用来覆盖从连接字符串 URI 派生的调用凭证。

事件筛选是在 EventStoreDB 服务器上进行的。在筛选之后,在服务器上应用 limit 参数。

以下示例显示了如何按记录顺序获取数据库中迄今为止记录的所有事件。我们可以看到 stream_name1event1event2event3)的三个事件被包括在内,以及其他事件。

# Read all events (creates a streaming gRPC call).
read_response = client.read_all()

# Convert the iterator into a sequence of recorded events.
events = tuple(read_response)
assert len(events) > 3  # more than three

# Convert the sequence of recorded events into a set of event IDs.
event_ids = set(e.id for e in events)
assert event1.id in event_ids
assert event2.id in event_ids
assert event3.id in event_ids

以下示例显示了如何从特定的提交位置读取数据库中记录的所有事件,在这种情况下是 commit_position1。当从特定的提交位置正向读取时,将包括该位置的事件。 commit_position1 的值是我们附加 event1 时的位置。因此,event1 是我们将收到的第一个记录事件,event2 是第二个,event3 是第三个。

# Read all events forwards from a commit position.
read_response = client.read_all(
    commit_position=commit_position1
)

# Step through the "read response" iterator.
assert next(read_response) == event1
assert next(read_response) == event2
assert next(read_response) == event3

# Stop the iterator.
read_response.stop()

以下示例显示了如何以相反的顺序读取数据库中记录的所有事件。我们可以看到,我们收到的第一个事件是最后记录的事件:有关快照和快照的章节中关于 Dog 聚合的事件。

# Read all events backwards from the end.
read_response = client.read_all(
    backwards=True
)

# Step through the "read response" iterator.
assert next(read_response).type == "DogLearnedTrick"
assert next(read_response).type == "Snapshot"
assert next(read_response).type == "DogLearnedTrick"
assert next(read_response).type == "DogLearnedTrick"
assert next(read_response).type == "DogRegistered"

# Stop the iterator.
read_response.stop()

以下示例显示了如何从特定的提交位置正向读取有限数量的事件。

events = tuple(
    client.read_all(
        commit_position=commit_position1,
        limit=1,
    )
)

assert len(events) == 1
assert events[0] == event1

以下示例显示了如何从数据库末尾反向读取有限数量的记录事件。在这种情况下,限制为 1,因此我们收到最后一个记录的事件。

events = tuple(
    client.read_all(
        backwards=True,
        limit=1,
    )
)

assert len(events) == 1

assert events[0].type == 'DogLearnedTrick'
assert deserialize(events[0].data)['trick'] == 'sit'

请注意,与 read_stream() 方法一样,read_all() 方法没有使用重试和重连装饰器,因为从服务器流式传输记录事件仅在开始迭代 "读取响应" 时开始,这意味着该方法在流式传输开始之前就返回了,因此没有机会让任何装饰器捕获任何连接问题。

获取提交位置

可以使用 get_commit_position() 方法获取数据库中最后一条记录事件的提交位置。它只是调用 read_all() 方法,传入 backwards=Truelimit=1,并返回最后一条记录事件的 commit_position 属性的值。

commit_position = client.get_commit_position()

此方法有五个可选参数,分别是 filter_excludefilter_includefilter_by_stream_nametimeoutcredentials。这些值会传递给 read_all()

可选的 filter_excludefilter_includefilter_by_stream_name 参数与 read_all() 方法中的用法相同。

可选的 timeout 参数是一个 Python float,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。

可选的 credentials 参数可以用来覆盖从连接字符串 URI 派生的调用凭证。

此方法可能用于测量下游组件处理所有记录事件的进度,通过比较当前提交位置与下游组件中最后成功处理的事件的记录提交位置。在这种情况下,filter_excludefilter_includefilter_by_stream_name 参数的值应与下游组件获取记录事件时使用的值相等。

获取流元数据

get_stream_metadata() 方法返回流元数据及其版本。

此方法有一个必需参数,即 stream_name,它是一个 Python str,用于唯一标识将获取流元数据的流。

此方法有一个可选的 timeout 参数,它是一个 Python float,用于设置 gRPC 操作完成的最大持续时间(以秒为单位)。

此方法有一个可选的 credentials 参数,可以用来覆盖从连接字符串 URI 派生的调用凭据。

以下示例中,获取了 stream_name1 的元数据。

metadata, metadata_version = client.get_stream_metadata(stream_name=stream_name1)

返回的 metadata 值是一个 Python dict。返回的 metadata_version 值可以是 int 类型,表示流存在时的版本,也可以是 StreamState.NO_STREAM,表示流不存在且未设置元数据。这些值可以用作 set_stream_metadata() 的参数。

设置流元数据

需要领导者

set_stream_metadata() 方法用于设置流的元数据。在向流追加事件之前可以设置流元数据。

此方法有一个必需参数,即 stream_name,它是一个 Python str,用于唯一标识将设置流元数据的流。

此方法有一个可选的 timeout 参数,它是一个 Python float,用于设置 gRPC 操作完成的最大持续时间(以秒为单位)。

此方法有一个可选的 credentials 参数,可以用来覆盖从连接字符串 URI 派生的调用凭据。

以下示例中,设置了 stream_name1 的元数据。

metadata["foo"] = "bar"

client.set_stream_metadata(
    stream_name=stream_name1,
    metadata=metadata,
    current_version=metadata_version,
)

current_version 参数应该是从 get_stream_metadata() 获取的流元数据当前版本。

有关流元数据的更多信息,请参阅 EventStoreDB 文档。

删除流

需要领导者

可以使用 delete_stream() 方法“删除”流。

此方法有两个必需参数,即 stream_namecurrent_version

必需的 stream_name 参数是一个 Python str,它唯一标识了一个要附加一系列事件的流。

必需的 current_version 参数预期为一个 Python int,表示流中最后一条记录事件的流位置。

此方法有一个可选的 timeout 参数,它是一个 Python float,用于设置 gRPC 操作完成的最大持续时间(以秒为单位)。

此方法有一个可选的 credentials 参数,可以用来覆盖从连接字符串 URI 派生的调用凭据。

以下示例中,删除了 stream_name1

commit_position = client.delete_stream(stream_name=stream_name1, current_version=2)

删除流后,仍然可以追加新事件。从已删除的流中读取将只返回在它被删除后追加的事件。

墓碑流

需要领导者

可以使用 tombstone_stream() 方法“墓碑”流。

此方法有两个必需参数,即 stream_namecurrent_version

必需的 stream_name 参数是一个 Python str,它唯一标识了一个要附加一系列事件的流。

必需的 current_version 参数预期为一个 Python int,表示流中最后一条记录事件的流位置。

此方法有一个可选的 timeout 参数,它是一个 Python float,用于设置 gRPC 操作完成的最大持续时间(以秒为单位)。

此方法有一个可选的 credentials 参数,可以用来覆盖从连接字符串 URI 派生的调用凭据。

以下示例中,stream_name1 被墓碑。

commit_position = client.tombstone_stream(stream_name=stream_name1, current_version=2)

在墓碑一个流之后,无法再追加新事件。

追捕订阅

“追捕”订阅可用于接收已记录的事件以及随后记录的事件。追捕订阅可用于处理记录事件的组件,该组件使用“精确一次”语义。

subscribe_to_all() 方法启动一个追赶订阅,可以接收数据库中的所有事件。subscribe_to_stream() 方法启动一个追赶订阅,可以接收特定流的事件。这两种方法都返回一个“追赶订阅”对象,该对象是 Python 迭代器。可以通过迭代获取已记录的事件。以这种方式获取的记录事件对象是 RecordedEvent 类的实例。

在将“追赶订阅”对象返回给调用者之前,客户端将首先从服务器获得一个“确认”响应,这允许客户端检测 gRPC 连接和流式 gRPC 调用是否正常工作。因此,subscribe_to_all()subscribe_to_stream() 方法都非常有用地装饰了重连和重试装饰器。然而,一旦方法返回,装饰器就会退出,并且在迭代订阅对象时由于连接问题而引发的任何异常都必须由您的代码处理。

当没有更多事件要返回时,"追赶订阅"迭代器不会自动停止,但迭代将阻塞,直到数据库中随后记录新事件。随后记录的任何事件都将立即流式传输到客户端,然后迭代将继续。可以通过在“追赶订阅”对象上调用 stop() 方法来停止事件流,从而停止迭代。

订阅所有事件

subscribe_to_all() 方法可以用来启动一个追赶订阅,从中可以按记录顺序获取数据库中记录的所有事件。该方法返回一个“追赶订阅”迭代器。

此方法还有十个可选参数:commit_positionfrom_endresolve_linksfilter_excludefilter_includefilter_by_stream_nameinclude_checkpointsinclude_caught_uptimeoutcredentials

commit_position 可选参数指定一个提交位置。commit_position 的默认值是 None,这意味着追赶订阅将从数据库中的第一个记录事件开始。如果提供了提交位置,它必须与数据库中实际存在的提交位置匹配。只有在该位置之后记录的事件才会被获取。

from_end 可选参数指定追赶订阅是否从数据库中的最后一个记录事件开始。默认情况下,此参数为 False。如果 from_endTrue,则只获取在订阅启动之后记录的事件。如果 commit_position 不是 None,则忽略此参数。

可选的 resolve_links 参数是一个 Python bool。默认值是 False,这意味着任何事件链接将不会被解析,因此返回的事件可能代表事件链接。如果 resolve_linksTrue,则任何事件链接将被解析,因此将返回链接事件而不是事件链接。

可选的 filter_exclude 参数是一个正则表达式序列,用于指定应该返回哪些记录事件。如果设置了非空序列的 filter_include,则此参数将被忽略。默认值匹配 EventStoreDB 的 "系统事件" 事件类型,因此系统事件通常不会被包括。有关筛选表达式的更多信息,请参阅下面的注释部分。

可选的 filter_include 参数是一个正则表达式序列,用于指定应该返回哪些记录事件。默认情况下,此参数是一个空元组。如果设置了非空序列,则忽略 filter_exclude 参数。

可选的 filter_by_stream_name 参数是一个表示是否将筛选应用于事件类型或流名称的 Python bool。默认值为 False,因此筛选应用于记录事件的类型字符串。

include_checkpoints 可选参数是一个 Python bool,表示在接收记录事件时是否应包含“检查点”消息。检查点具有 commit_position 值,事件处理组件可以使用该值来更新其记录的提交位置值,这样在大量事件被过滤时,订阅者不必在事件处理组件重新启动时从相同的老位置开始。

include_caught_up 可选参数是一个 Python bool,表示在接收记录事件时是否应包含“已赶上”消息。默认值 include_caught_upFalse

可选的 timeout 参数是一个 Python float,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。

可选的 credentials 参数可以用来覆盖从连接字符串 URI 派生的调用凭证。

下面的示例显示了如何从数据库中的第一个记录事件开始启动一个追赶订阅。

# Subscribe from the first recorded event in the database.
catchup_subscription = client.subscribe_to_all()

下面的示例显示,追赶订阅不会自动停止,但在收到最后一个记录事件时会阻塞,然后在随后记录事件时继续。

from time import sleep
from threading import Thread


# Append a new event to a new stream.
stream_name2 = str(uuid.uuid4())
event4 = NewEvent(type='OrderCreated', data=b'{}')

client.append_to_stream(
    stream_name=stream_name2,
    current_version=StreamState.NO_STREAM,
    events=[event4],
)


# Receive events from the catch-up subscription in a different thread.
received_events = []

def receive_events():
    for event in catchup_subscription:
        received_events.append(event)


def wait_for_event(event):
    for _ in range(100):
        for received in reversed(received_events):
            if event == received:
                return
        else:
            sleep(0.1)
    else:
        raise AssertionError("Event wasn't received")


thread = Thread(target=receive_events, daemon=True)
thread.start()

# Wait to receive event4.
wait_for_event(event4)

# Append another event whilst the subscription is running.
event5 = NewEvent(type='OrderUpdated', data=b'{}')
client.append_to_stream(
    stream_name=stream_name2,
    current_version=0,
    events=[event5],
)

# Wait for the subscription to block.
wait_for_event(event5)

# Stop the subscription.
catchup_subscription.stop()
thread.join()

以下示例展示了如何订阅特定提交位置之后记录的事件,在本例中是从上面接收到的最后一个记录事件的提交位置开始。然后,在重新启动订阅之前,又记录了一个事件。当订阅运行时,还记录了另外三个事件。这四个事件按照记录的顺序接收。

# Append another event.
event6 = NewEvent(type='OrderDeleted', data=b'{}')
client.append_to_stream(
    stream_name=stream_name2,
    current_version=1,
    events=[event6],
)

# Restart subscribing to all events after the
# commit position of the last received event.
catchup_subscription = client.subscribe_to_all(
    commit_position=received_events[-1].commit_position
)

thread = Thread(target=receive_events, daemon=True)
thread.start()

# Wait for event6.
wait_for_event(event6)

# Append three more events to a new stream.
stream_name3 = str(uuid.uuid4())
event7 = NewEvent(type='OrderCreated', data=b'{}')
event8 = NewEvent(type='OrderUpdated', data=b'{}')
event9 = NewEvent(type='OrderDeleted', data=b'{}')

client.append_to_stream(
    stream_name=stream_name3,
    current_version=StreamState.NO_STREAM,
    events=[event7, event8, event9],
)

# Wait for events 7, 8 and 9.
wait_for_event(event7)
wait_for_event(event8)
wait_for_event(event9)

# Stop the subscription.
catchup_subscription.stop()
thread.join()

当调用订阅对象的 stop() 方法时,追赶订阅调用即结束。当它超出作用域时,或使用 Python 的 del 关键字显式地从内存中删除时,会自动发生。

订阅流事件

可以使用 subscribe_to_stream() 方法从单个流中获取记录的事件开始追赶订阅。此方法返回一个“追赶订阅”迭代器。

此方法有一个必需的 stream_name 参数,它指定了将接收记录事件的流名称。

此方法还有六个可选参数,分别是 stream_positionfrom_endresolve_linksinclude_caught_uptimeoutcredentials

可选的 stream_position 参数指定了一个位置,从该位置开始订阅。默认值 stream_positionNone,这意味着将按记录顺序获取流中记录的所有事件,除非将 from_end 设置为 True。如果提供了流位置,则仅获取该位置之后记录的事件。

可选的 from_end 参数指定订阅将从流中的最后一个位置开始。默认值 from_endFalse。如果 from_endTrue,则仅获取订阅创建之后记录的事件。如果设置了 stream_position,则忽略此参数。

可选的 resolve_links 参数是一个 Python bool。默认值是 False,这意味着任何事件链接将不会被解析,因此返回的事件可能代表事件链接。如果 resolve_linksTrue,则任何事件链接将被解析,因此将返回链接事件而不是事件链接。

include_caught_up 可选参数是一个 Python bool,表示在接收记录事件时是否应包含“已赶上”消息。默认值 include_caught_upFalse

可选的 timeout 参数是一个 Python float,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。

可选的 credentials 参数可以用来覆盖从连接字符串 URI 派生的调用凭证。

以下示例展示了如何从流中的第一个记录事件开始启动追赶订阅。

# Subscribe from the start of 'stream2'.
subscription = client.subscribe_to_stream(stream_name=stream_name2)

以下示例展示了如何从特定的流位置启动追赶订阅。

# Subscribe to stream2, from the second recorded event.
subscription = client.subscribe_to_stream(
    stream_name=stream_name2,
    stream_position=1,
)

如何实现恰好一次的事件处理

下游组件接收并处理记录事件的提交位置对于下游组件来说非常有用,这样在恢复处理时可以确定最后处理的事件的提交位置。

最后一个记录的提交位置可以用来指定恢复处理时订阅的提交位置。由于此提交位置将代表下游组件中最后一个成功处理的事件的位置,因此通常希望获取此位置之后的事件,因为那是尚未处理的事件。因此,当使用 EventStoreDB 中的追赶订阅从特定提交位置订阅事件时,将不会将指定提交位置上记录的事件包含在接收到的记录事件序列中。

在使用追击订阅时,为了在下游组件中实现记录事件的“精确一次”处理,应该将记录事件的提交位置与处理记录事件的结果原子性地、唯一地记录下来,例如在实现最终一致性的CQRS时,可以在与物化视图相同的数据库中,或者在与下游分析、报告或归档应用相同的数据库中。通过将记录事件的提交位置与处理记录事件产生的新状态原子性地记录,可以避免记录事件消费过程中的“双重写入”。通过唯一记录提交位置,新状态不会重复记录,因此下游组件的记录状态对于任何记录事件只会更新一次。通过使用最大的记录提交位置来恢复追击订阅,所有记录的事件最终都会被处理。将“最多一次”条件与“至少一次”条件相结合,得到“精确一次”条件。

在记录事件消费过程中的“双重写入”的危险在于,如果记录事件在事务中成功处理并记录新状态,提交位置在另一个事务中记录,可能发生一种情况而不是另一种情况。如果新状态被记录但位置丢失,然后处理停止并恢复,记录的事件可能会被处理两次。另一方面,如果提交位置被记录但新状态丢失,记录的事件可能实际上根本未处理。通过处理事件超过一次,或者未能处理事件,下游组件的记录状态可能会不准确,或者可能是不一致的,甚至可能是灾难性的。这些后果在您的情况下可能重要也可能不重要。但有时不一致性可能会停止处理,直到问题解决。您可以通过在相同的原子事务中原子性地记录记录事件的提交位置以及处理该事件产生的新状态来避免事件消费中的“双重写入”。通过使提交位置的记录唯一,当存在冲突时,事务将回滚,您将防止任何重复处理记录事件的成果被提交。

从追击订阅接收到的记录事件不能返回确认给EventStoreDB服务器。然而,确认事件是“持久订阅”的一个方面。希望依赖于将事件确认到上游组件是双重写入的例子。

持久订阅

在EventStoreDB中,“持久”订阅与追击订阅类似,因为在没有更多记录事件可接收时,读取持久订阅将会阻塞,然后在新事件随后记录时继续。

持久订阅可以通过一组使用支持的“消费者策略”之一操作的消费者组来消费。

持久订阅的显著不同之处在于服务器会跟踪消费者的进度。因此,持久订阅的消费者需要在记录事件成功处理时“确认”,否则“否定确认”已接收但未成功处理的记录事件。

所有这些都意味着对于持久订阅,需要考虑“创建”、“读取”、“更新”、“删除”、“确认”和“否定确认”操作。

虽然持久订阅有一些优点,特别是记录事件由一组消费者并发处理,但通过在服务器中跟踪已处理事件的提交序列中的位置,事件消费中的“双重写入”问题就会产生。一组持久订阅消费者对记录事件的处理可靠性将依赖于它们对重复消息的幂等处理,以及它们对顺序错乱交付的弹性。

创建对所有事件的订阅

需要领导者

可以使用 create_subscription_to_all() 方法创建对所有数据库中所有流记录事件的“持久订阅”。

此方法需要一个必需的 group_name 参数,它是订阅的“消费者组”名称。

此方法有十九个可选参数:from_endcommit_positionresolve_linksfilter_excludefilter_includefilter_by_stream_nameconsumer_strategymessage_timeoutmax_retry_countmin_checkpoint_countmax_checkpoint_countcheckpoint_aftermax_subscriber_countlive_buffer_sizeread_batch_sizehistory_buffer_sizeextra_statisticstimeoutcredentials

可选的 from_end 参数可以用来指定订阅的消费者组应该只接收在订阅创建后记录的事件。

或者,可选的 commit_position 参数可以用来指定一个提交位置,从该位置开始,订阅的消费者组应该接收事件。请注意,在指定提交位置记录的事件可能包含在消费者组接收到的记录事件中。

如果没有指定 from_endcommit_position,订阅的消费者组可能会接收到数据库中所有记录的事件。

可选的 resolve_links 参数是一个 Python bool。默认值是 False,这意味着任何事件链接将不会被解析,因此返回的事件可能代表事件链接。如果 resolve_linksTrue,则任何事件链接将被解析,因此将返回链接事件而不是事件链接。

可选的 filter_exclude 参数是一个正则表达式序列,用于指定应该返回哪些记录事件。如果设置了非空序列的 filter_include,则此参数将被忽略。默认值匹配 EventStoreDB 的 "系统事件" 事件类型,因此系统事件通常不会被包括。有关筛选表达式的更多信息,请参阅下面的注释部分。

可选的 filter_include 参数是一个正则表达式序列,用于指定应该返回哪些记录事件。默认情况下,此参数是一个空元组。如果设置了非空序列,则忽略 filter_exclude 参数。

可选的 filter_by_stream_name 参数是一个表示是否将筛选应用于事件类型或流名称的 Python bool。默认值为 False,因此筛选应用于记录事件的类型字符串。

可选的 consumer_strategy 参数是一个 Python str,它定义了此持久订阅的消费者策略。此参数的值可以是 'DispatchToSingle''RoundRobin''Pinned''PinnedByCorrelation'。默认值是 'DispatchToSingle'

可选的 message_timeout 参数是一个 Python float,它设置从服务器发送记录事件到持久订阅消费者,直到服务器收到“确认”(ack)或“否定确认”(nack)的最大持续时间(以秒为单位),之后服务器将重试发送事件。默认值 message_timeout30.0

可选的 max_retry_count 参数是一个 Python int,它设置服务器重试发送事件的次数。默认值 max_retry_count10

可选的 min_checkpoint_count 参数是一个 Python int,它设置服务器在记录确认(acks)之前必须收到的最小确认(acks)数量。默认值 min_checkpoint_count10

可选的 max_checkpoint_count 参数是一个 Python int,它设置服务器在记录确认(acks)之前必须收到的最大确认(acks)数量。默认值 max_checkpoint_count1000

可选的 checkpoint_after 参数是一个 Python float,它设置记录确认(acks)之间的最大持续时间(以秒为单位)。默认值 checkpoint_after2.0

可选的 max_subscriber_count 参数是一个 Python int,它设置持久订阅的并发读取者的最大数量。超出此数量后,尝试读取持久订阅将引发 MaximumSubscriptionsReached 错误。

可选的 live_buffer_size 参数是一个 Python int,它设置存储新记录事件的缓冲区大小(内存中)。默认值 live_buffer_size 为 500。

可选的 read_batch_size 参数是一个 Python int,它设置在追赶时从磁盘读取的记录事件的数量。默认值 read_batch_size 为 200。

可选的history_buffer_size参数是一个Python int,用于设置在追及时缓存的记录事件数量。默认值history_buffer_size为500。

可选的extra_statistics参数是一个Python bool,用于启用此订阅的额外统计跟踪。默认值extra_statisticsFalse

可选的 timeout 参数是一个 Python float,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。

可选的 credentials 参数可以用来覆盖从连接字符串 URI 派生的调用凭证。

方法create_subscription_to_all()不返回值。通过调用read_subscription_to_all()方法来获取记录事件。

以下示例中,创建了一个持久订阅,从数据库中的第一个记录的非系统事件开始操作。

# Create a persistent subscription.
group_name1 = f"group-{uuid.uuid4()}"
client.create_subscription_to_all(group_name=group_name1)

读取所有订阅

需要领导者

read_subscription_to_all()方法可以被一组消费者使用,以接收使用create_subscription_to_all()方法创建的持久订阅中的记录事件。

此方法有一个必需的group_name参数,是调用create_subscription_to_all()时指定的订阅消费者“组”的名称。

此方法有一个可选的 timeout 参数,它是一个 Python float,用于设置 gRPC 操作完成的最大持续时间(以秒为单位)。

此方法有一个可选的 credentials 参数,可以用来覆盖从连接字符串 URI 派生的调用凭据。

此方法返回一个PersistentSubscription对象,它是一个迭代器,提供RecordedEvent对象。它还具有ack()nack()stop()方法。

subscription = client.read_subscription_to_all(group_name=group_name1)

ack()方法应由消费者使用以“确认”已从服务器接收并成功处理记录事件。这将防止该记录事件被同一组中的另一个消费者接收。ack()有一个item参数,可以是RecordedEventUUID。如果您传入一个RecordedEvent,则将使用其ack_id属性的值向服务器确认事件。如果您传入UUID,则使用被确认的RecordedEventack_id值,如果该事件已从链接事件中解析出来(这可以在持久订阅设置resolve_linksTrue时发生,也可以在无论resolve_links设置如何重放已停放的事件时发生)。

以下示例遍历订阅对象,并使用接收到的RecordedEvent对象调用ack()。当接收到event9时,调用订阅的stop()方法,停止迭代,以便我们可以继续下面的示例。

received_events = []

for event in subscription:
    received_events.append(event)

    # Acknowledge the received event.
    subscription.ack(event)

    # Stop when 'event9' has been received.
    if event == event9:
        subscription.stop()

nack()应由消费者使用以向服务器“否定确认”它已接收但未成功处理记录事件。与ack()一样,nack()方法有一个item参数,其工作方式相同。使用记录事件或其ack_id属性。nack()方法还有一个action参数,它应该是一个Python str:可以是'unknown''park''retry''skip''stop'

stop()方法可以用来停止gRPC流操作。

如何编写持久订阅消费者

持久订阅的读取可以封装在一个“消费者”中,该消费者在接收到记录事件时调用一个“策略”函数,如果策略函数正常返回,则自动调用ack(),如果它引发异常,则调用nack(),可能重试事件一定次数,然后停放事件。

以下简单示例展示了如何实现这一点。我们可以看到,在最终停放event5之前,已确认了'event9'。

RecordedEvent已被重试的次数通过其retry_count属性表示。

acked_events = {}
nacked_events = {}


class ExampleConsumer:
    def __init__(self, subscription, max_retry_count, final_action):
        self.subscription = subscription
        self.max_retry_count = max_retry_count
        self.final_action = final_action
        self.error = None

    def run(self):
        with self.subscription:
            for event in self.subscription:
                try:
                    self.policy(event)
                except Exception:
                    if event.retry_count < self.max_retry_count:
                        action = "retry"
                    else:
                        action = self.final_action
                    self.subscription.nack(event, action)
                    self.after_nack(event, action)
                else:
                    self.subscription.ack(event)
                    self.after_ack(event)

    def stop(self):
        self.subscription.stop()

    def policy(self, event):
        # Raise an exception when we see "event5".
        if event == event5:
            raise Exception()

    def after_ack(self, event):
        # Track retry count of acked events.
        acked_events[event.id] = event.retry_count

    def after_nack(self, event, action):
        # Track retry count of nacked events.
        nacked_events[event.id] = event.retry_count

        if action == self.final_action:
            # Stop the consumer, so we can continue with the examples.
            self.stop()


# Create subscription.
group_name = f"group-{uuid.uuid4()}"
client.create_subscription_to_all(group_name, commit_position=commit_position1)

# Read subscription.
subscription = client.read_subscription_to_all(group_name)

# Construct consumer.
consumer = ExampleConsumer(
    subscription=subscription,
    max_retry_count=5,
    final_action="park",
)

# Run consumer.
consumer.run()

# Check 'event5' was nacked and never acked.
assert event5.id in nacked_events
assert event5.id not in acked_events
assert nacked_events[event5.id] == 5

# Check 'event9' was acked and never nacked.
assert event9.id in acked_events
assert event9.id not in nacked_events

更新所有订阅

需要领导者

可以使用 update_subscription_to_all() 方法来更新一个“持久订阅”。请注意,无法调整过滤选项和消费者策略。

此方法需要一个必需的 group_name 参数,它是订阅的“消费者组”名称。

此方法还有十六个可选参数,分别是:from_endcommit_positionresolve_linksconsumer_strategymessage_timeoutmax_retry_countmin_checkpoint_countmax_checkpoint_countcheckpoint_aftermax_subscriber_countlive_buffer_sizeread_batch_sizehistory_buffer_sizeextra_statisticstimeoutcredentials

可选参数 from_endcommit_positionresolve_linksconsumer_strategymessage_timeoutmax_retry_countmin_checkpoint_countmax_checkpoint_countcheckpoint_aftermax_subscriber_countlive_buffer_sizeread_batch_sizehistory_buffer_sizeextra_statistics 可以用来调整之前调用 create_subscription_to_all()update_subscription_to_all() 时设置的值。如果在 update_subscription_to_all() 的调用中没有提到这些参数,则持久订阅的相应设置将保持不变。

可选的 timeout 参数是一个 Python float,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。

可选的 credentials 参数可以用来覆盖从连接字符串 URI 派生的调用凭证。

update_subscription_to_all() 方法不返回任何值。

以下示例中,将持久订阅更新为从数据库末尾开始运行。

# Create a persistent subscription.
client.update_subscription_to_all(group_name=group_name1, from_end=True)

创建订阅到流

需要领导者

可以使用 create_subscription_to_stream() 方法创建对流的持久订阅。

此方法有两个必需参数,分别是 group_namestream_namegroup_name 参数命名了将接收此订阅事件的消费者组。stream_name 参数指定订阅将跟踪哪个流。这两个参数的值都应预期为 Python str 对象。

此方法还有十六个可选参数,分别是:stream_positionfrom_endresolve_linksconsumer_strategymessage_timeoutmax_retry_countmin_checkpoint_countmax_checkpoint_countcheckpoint_aftermax_subscriber_countlive_buffer_sizeread_batch_sizehistory_buffer_sizeextra_statisticstimeoutcredentials

stream_position 参数指定一个订阅位置,从该位置订阅流。当读取订阅时,将接收到此订阅位置的记录事件。

from_end 参数是一个 Python bool。默认情况下,此参数的值为 False。如果将此参数设置为 True,则从订阅中读取将只接收订阅创建之后记录的事件。也就是说,它不包括当前流位置。

可选的 resolve_links 参数是一个 Python bool。默认值是 False,这意味着任何事件链接将不会被解析,因此返回的事件可能代表事件链接。如果 resolve_linksTrue,则任何事件链接将被解析,因此将返回链接事件而不是事件链接。

可选的 consumer_strategy 参数是一个 Python str,它定义了此持久订阅的消费者策略。此参数的值可以是 'DispatchToSingle''RoundRobin''Pinned''PinnedByCorrelation'。默认值是 'DispatchToSingle'

可选的 message_timeout 参数是一个 Python float,它设置从服务器发送记录事件到持久订阅消费者,直到服务器收到“确认”(ack)或“否定确认”(nack)的最大持续时间(以秒为单位),之后服务器将重试发送事件。默认值 message_timeout30.0

可选的 max_retry_count 参数是一个 Python int,它设置服务器重试发送事件的次数。默认值 max_retry_count10

可选的 min_checkpoint_count 参数是一个 Python int,它设置服务器在记录确认(acks)之前必须收到的最小确认(acks)数量。默认值 min_checkpoint_count10

可选的 max_checkpoint_count 参数是一个 Python int,它设置服务器在记录确认(acks)之前必须收到的最大确认(acks)数量。默认值 max_checkpoint_count1000

可选的 checkpoint_after 参数是一个 Python float,它设置记录确认(acks)之间的最大持续时间(以秒为单位)。默认值 checkpoint_after2.0

可选的 max_subscriber_count 参数是一个 Python int,它设置持久订阅的并发读取者的最大数量。超出此数量后,尝试读取持久订阅将引发 MaximumSubscriptionsReached 错误。

可选的 live_buffer_size 参数是一个 Python int,它设置存储新记录事件的缓冲区大小(内存中)。默认值 live_buffer_size 为 500。

可选的 read_batch_size 参数是一个 Python int,它设置在追赶时从磁盘读取的记录事件的数量。默认值 read_batch_size 为 200。

可选的history_buffer_size参数是一个Python int,用于设置在追及时缓存的记录事件数量。默认值history_buffer_size为500。

可选的extra_statistics参数是一个Python bool,用于启用此订阅的额外统计跟踪。默认值extra_statisticsFalse

可选的 timeout 参数是一个 Python float,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。

可选的 credentials 参数可以用来覆盖从连接字符串 URI 派生的调用凭证。

此方法不返回任何值。可以通过调用 read_subscription_to_stream() 来接收事件。

以下示例创建了一个从流开始处的持久流订阅。

# Create a persistent stream subscription from start of the stream.
group_name2 = f"group-{uuid.uuid4()}"
client.create_subscription_to_stream(
    group_name=group_name2,
    stream_name=stream_name2,
)

读取订阅到流

需要领导者

可以使用 read_subscription_to_stream() 方法读取对流的持久订阅。

此方法有两个必需参数,分别是 group_namestream_name,它们应与调用 create_subscription_to_stream() 时使用的参数值匹配。

此方法有一个可选的 timeout 参数,它是一个 Python float,用于设置 gRPC 操作完成的最大持续时间(以秒为单位)。

此方法有一个可选的 credentials 参数,可以用来覆盖从连接字符串 URI 派生的调用凭据。

此方法返回一个 PersistentSubscription 对象,它是一个迭代器,提供 RecordedEvent 对象,并且还具有 ack()nack()stop() 方法。

subscription = client.read_subscription_to_stream(
    group_name=group_name2,
    stream_name=stream_name2,
)

以下示例遍历订阅对象,并调用 ack()。当接收到 event6 后,调用订阅的 stop() 方法,停止迭代,以便我们可以继续下面的示例。

events = []
for event in subscription:
    events.append(event)

    # Acknowledge the received event.
    subscription.ack(event)

    # Stop when 'event6' has been received.
    if event == event6:
        subscription.stop()

我们可以检查在上述示例中是否收到了所有附加到 stream_name2 的事件。

assert len(events) == 3
assert events[0] == event4
assert events[1] == event5
assert events[2] == event6

更新订阅到流

需要领导者

可以使用 update_subscription_to_stream() 方法更新对流的持久订阅。请注意,无法调整消费者策略。

此方法有一个必需的 group_name 参数,它是订阅的“组”消费者的名称,以及一个必需的 stream_name 参数,它是流的名称。

此方法还具有十六个可选参数,包括from_endstream_positionresolve_linksconsumer_strategymessage_timeoutmax_retry_countmax_subscriber_countlive_buffer_sizeread_batch_sizehistory_buffer_sizeextra_statisticsmin_checkpoint_countmax_checkpoint_countcheckpoint_aftertimeoutcredentials

可选参数from_endstream_positionresolve_linksconsumer_strategymessage_timeoutmax_retry_countmin_checkpoint_countmax_checkpoint_countcheckpoint_aftermax_subscriber_countlive_buffer_sizeread_batch_sizehistory_buffer_sizeextra_statistics可用于调整之前调用create_subscription_to_stream()update_subscription_to_stream()时设置的值。如果在update_subscription_to_stream()的调用中没有提及这些参数中的任何一个,则持久订阅的相应设置将保持不变。

可选的 timeout 参数是一个 Python float,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。

可选的 credentials 参数可以用来覆盖从连接字符串 URI 派生的调用凭证。

update_subscription_to_stream()方法不返回任何值。

以下示例展示了如何将持久订阅更新为从流的末尾开始运行。

# Create a persistent subscription.
client.update_subscription_to_stream(
    group_name=group_name2,
    stream_name=stream_name2,
    from_end=True,
)

重放已停泊事件

需要领导者

replay_parked_events()方法可用于在读取持久订阅时“重放”已被“停泊”(使用操作'park'进行负面确认)的事件。停泊的事件将被消费者再次接收到。

此方法需要一个必需的group_name参数和一个可选的stream_name参数。这些参数的值应与调用create_subscription_to_all()create_subscription_to_stream()时使用的值匹配。

此方法有一个可选的 timeout 参数,它是一个 Python float,用于设置 gRPC 操作完成的最大持续时间(以秒为单位)。

此方法有一个可选的 credentials 参数,可以用来覆盖从连接字符串 URI 派生的调用凭据。

以下示例展示了为组group_name1重放已停泊事件。

client.replay_parked_events(
    group_name=group_name1,
)

以下示例展示了为组group_name2重放已停泊事件。

client.replay_parked_events(
    group_name=group_name2,
    stream_name=stream_name2,
)

获取订阅信息

需要领导者

get_subscription_info()方法可用于获取持久订阅的信息。

此方法需要一个必需的group_name参数和一个可选的stream_name参数,这些参数应与调用create_subscription_to_all()create_subscription_to_stream()时使用的参数值匹配。

此方法有一个可选的 timeout 参数,它是一个 Python float,用于设置 gRPC 操作完成的最大持续时间(以秒为单位)。

此方法有一个可选的 credentials 参数,可以用来覆盖从连接字符串 URI 派生的调用凭据。

以下示例获取了通过调用create_subscription_to_all()创建的持久订阅group_name1的信息。

subscription_info = client.get_subscription_info(
    group_name=group_name1,
)

以下示例获取了通过调用create_subscription_to_stream()stream_name2上创建的持久订阅group_name2的信息。

subscription_info = client.get_subscription_info(
    group_name=group_name2,
    stream_name=stream_name2,
)

返回值是一个SubscriptionInfo对象。

列出订阅

需要领导者

list_subscriptions()方法可用于获取所有现有持久订阅的信息,包括“对所有订阅”和“对流的订阅”。

此方法有一个可选的 timeout 参数,它是一个 Python float,用于设置 gRPC 操作完成的最大持续时间(以秒为单位)。

此方法有一个可选的 credentials 参数,可以用来覆盖从连接字符串 URI 派生的调用凭据。

以下示例列出了所有现有的持久订阅。

subscriptions = client.list_subscriptions()

返回值是一个SubscriptionInfo对象列表。

列出对流的订阅

需要领导者

list_subscriptions_to_stream()方法可用于获取对流的全部持久订阅的信息。

此方法有一个必需的参数,stream_name

此方法有一个可选的 timeout 参数,它是一个 Python float,用于设置 gRPC 操作完成的最大持续时间(以秒为单位)。

此方法有一个可选的 credentials 参数,可以用来覆盖从连接字符串 URI 派生的调用凭据。

subscriptions = client.list_subscriptions_to_stream(
    stream_name=stream_name2,
)

返回值是一个SubscriptionInfo对象列表。

删除订阅

需要领导者

delete_subscription()方法可用于删除持久订阅。

此方法需要一个必需的group_name参数和一个可选的stream_name参数,这些参数应与调用create_subscription_to_all()create_subscription_to_stream()时使用的参数值匹配。

此方法有一个可选的 timeout 参数,它是一个 Python float,用于设置 gRPC 操作完成的最大持续时间(以秒为单位)。

此方法有一个可选的 credentials 参数,可以用来覆盖从连接字符串 URI 派生的调用凭据。

以下示例删除了通过调用create_subscription_to_all()创建的持久订阅group_name1

client.delete_subscription(
    group_name=group_name1,
)

以下示例删除了通过调用create_subscription_to_stream()stream_name2上创建的持久订阅group_name2

client.delete_subscription(
    group_name=group_name2,
    stream_name=stream_name2,
)

投影

有关EventStoreDB中投影的更多信息,请参阅EventStoreDB文档

创建投影

需要领导者

create_projection()方法可用于创建“连续”投影。

此方法有两个必需参数,即namequery

此必需的name参数是一个Python str,用于指定投影的名称。

这个必需的 query 参数是一个 Python str,它定义了投影将执行的操作。

此方法还有四个可选参数,分别是 emit_enabledtrack_emitted_streamstimeoutcredentials

可选的 emit_enabled 参数是一个 Python bool,它指定了投影是否能够发出事件。如果指定了 True 值,则投影能够发出事件;否则,投影不能发出事件。默认值 emit_enabledFalse

请注意,如果您的投影查询中包含对 emit() 的调用,则 emit_enabled 必须为 True,否则投影将无法运行。

可选的 track_emitted_streams 参数是一个 Python bool,它指定了投影是否跟踪其发出的流。如果指定了 True 值,则投影将跟踪其发出的流;否则,投影不会跟踪其发出的流。默认值 track_emitted_streamsFalse

跟踪发出流的目的是,它们在删除投影时可以可选地被删除(有关详细信息,请参阅 delete_projection() 方法)。

请注意,如果将 track_emitted_streams 设置为 True,则还必须将 emit_enabled 设置为 True,否则此方法将引发错误。

可选的 timeout 参数是一个 Python float,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。

可选的 credentials 参数可以用来覆盖从连接字符串 URI 派生的调用凭证。

以下示例中创建了一个投影,它处理附加到 stream_name2 的事件。投影的“状态”被初始化为具有“计数”,该计数在每次事件发生时增加一次。

projection_name = str(uuid.uuid4())

projection_query = """fromStream('%s')
.when({
  $init: function(){
    return {
      count: 0
    };
  },
  OrderCreated: function(s,e){
    s.count += 1;
  },
  OrderUpdated: function(s,e){
    s.count += 1;
  },
  OrderDeleted: function(s,e){
    s.count += 1;
  }
})
.outputState()
"""  % stream_name2

client.create_projection(
    name=projection_name,
    query=projection_query,
)

请注意,outputState() 调用是可选的,它将投影的状态持久化到“结果”流中。如果调用 outputState(),则立即将表示投影状态的的事件写入“结果”流。

对于名为 projection_name 的投影,“结果”流的默认名称为 $projections-{projection_name}-result。可以使用该流名称使用 get_stream()read_stream()subscribe_to_stream()create_subscription_to_stream()read_subscription_to_stream() 方法读取和订阅“结果”流。

如果您的投影没有调用 outputState(),则您将无法读取或订阅“结果”流,但您仍然可以使用 get_projection_state() 方法获取投影“状态”。

记录在“结果”流中的事件“类型”字符串为 'Result'。您可能希望在读取或订阅数据库中记录的“所有”事件(使用 read_all()subscribe_to_all() 等)时将其包含在 filter_exclude 参数中按类型过滤事件。

此外,无论何时,投影的状态都会定期记录在一个“状态”流中,并且投影还会写入一个“检查点”流。在删除投影时,“状态”流、“检查点”流以及所有已“跟踪”的“发出”流(由于 track_emitted_streams 参数被设置为 True)可以可选地被删除。有关详细信息,请参阅 delete_projection()

与“结果”和“发出”流不同,“状态”流和“检查点”流不能由用户读取或订阅,或通过 EventStoreDB Web 界面的“流浏览器”视图查看。

获取投影状态

需要领导者

可以使用 get_projection_state() 方法获取投影的“状态”。

此方法有一个必需的 name 参数,它是一个 Python str,指定了投影的名称。

此方法还有两个可选参数,分别是 timeoutcredentials

可选的 timeout 参数是一个 Python float,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。

可选的 credentials 参数可以用来覆盖从连接字符串 URI 派生的调用凭证。

以下示例中,在睡眠 1 秒以允许投影处理所有记录的事件之后,获取了投影“状态”。我们可以看到投影已处理了三个事件。

sleep(1)  # allow time for projection to process recorded events

projection_state = client.get_projection_state(name=projection_name)

assert projection_state.value == {'count': 3}

获取投影统计

需要领导者

可以使用 get_projection_statistics() 方法来获取投影统计信息。

此方法有一个必需的 name 参数,它是一个 Python str,指定了投影的名称。

此方法还有两个可选参数,分别是 timeoutcredentials

可选的 timeout 参数是一个 Python float,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。

可选的 credentials 参数可以用来覆盖从连接字符串 URI 派生的调用凭证。

此方法返回一个表示命名投影的 ProjectionStatistics 对象。

statistics = client.get_projection_statistics(name=projection_name)

返回一个 ProjectionStatistics 对象。此对象属性的值代表投影的进度。

更新投影

需要领导者

可以使用 update_projection() 方法来更新投影。

此方法有两个必需参数,即namequery

必需的 name 参数是一个 Python str,它指定要更新的投影的名称。

必需的 query 参数是一个 Python str,它定义了投影将执行的操作。

此方法还有三个可选参数,emit_enabledtimeoutcredentials

可选的 emit_enabled 参数是一个 Python bool,它指定投影是否能够发出事件。如果指定了 True 值,则投影将能够发出事件。如果指定了 False 值,则投影将不能发出事件。默认值 emit_enabledFalse

请注意,如果您的投影查询中包含对 emit() 的调用,则 emit_enabled 必须为 True,否则投影将无法运行。

请注意,无法通过 gRPC API 更新 track_emitted_streams

可选的 timeout 参数是一个 Python float,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。

可选的 credentials 参数可以用来覆盖从连接字符串 URI 派生的调用凭证。

client.update_projection(name=projection_name, query=projection_query)

启用投影

需要领导者

可以使用 enable_projection() 方法启用(启动运行)先前已禁用(停止)的投影。

此方法有一个必需的 name 参数,它是一个指定要启用的投影名称的 Python str

此方法还有两个可选参数,分别是 timeoutcredentials

可选的 timeout 参数是一个 Python float,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。

可选的 credentials 参数可以用来覆盖从连接字符串 URI 派生的调用凭证。

client.enable_projection(name=projection_name)

禁用投影

需要领导者

可以使用 disable_projection() 方法禁用(停止运行)投影。

此方法有一个必需的 name 参数,它是一个指定要禁用的投影名称的 Python str

此方法还有两个可选参数,timeoutcredentials

可选的 timeout 参数是一个 Python float,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。

可选的 credentials 参数可以用来覆盖从连接字符串 URI 派生的调用凭证。

client.disable_projection(name=projection_name)

重置投影

需要领导者

可以使用 reset_projection() 方法重置投影。

此方法有一个必需的 name 参数,它是一个指定要重置的投影名称的 Python str

此方法还有两个可选参数,timeoutcredentials

可选的 timeout 参数是一个 Python float,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。

可选的 credentials 参数可以用来覆盖从连接字符串 URI 派生的调用凭证。

client.reset_projection(name=projection_name)

请注意,必须先禁用投影才能重置。

删除投影

需要领导者

可以使用 delete_projection() 方法删除投影。

此方法有一个必需的 name 参数,它是一个指定要删除的投影名称的 Python str

此方法还有五个可选参数,delete_emitted_streamsdelete_state_streamdelete_checkpoint_streamtimeoutcredentials

可选的 delete_emitted_streams 参数是一个 Python bool,它指定将删除所有已跟踪的“已发出”流。为了删除已发出的流,它们必须已被跟踪(参见 create_projection() 方法的 track_emitted_streams 参数)。

可选的 delete_state_stream 参数是一个 Python bool,它指定应该删除投影的“状态”流。与“结果”流类似,“状态”流偶尔将事件写入其中,同时将事件写入“检查点”流,而不是像调用 outputState() 一样立即将事件写入“结果”流。

可选的 delete_checkpoint_stream 参数是一个 Python bool,它指定应该删除投影的“检查点”流。

可选的 timeout 参数是一个 Python float,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。

可选的 credentials 参数可以用来覆盖从连接字符串 URI 派生的调用凭证。

client.delete_projection(name=projection_name)

请注意,必须先禁用投影才能删除。

重启投影子系统

需要领导者

可以使用 restart_projections_subsystem() 方法重启投影子系统。

此方法还有两个可选参数,分别是 timeoutcredentials

可选的 timeout 参数是一个 Python float,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。

可选的 credentials 参数可以用来覆盖从连接字符串 URI 派生的调用凭证。

client.restart_projections_subsystem()

调用凭据

默认调用凭据由客户端从连接字符串 URI 的用户信息部分推导而来。

上述描述的许多客户端方法都有一个可选的credentials参数,可以用来为单个方法调用设置调用凭证,以覆盖从连接字符串URI派生的凭证。

调用凭证通过“基本认证”授权头发送到“安全”服务器。此授权头由服务器用于验证客户端。授权头不会发送到“不安全”服务器。

构建调用凭证

客户端方法construct_call_credentials()可以用来从一个用户名和密码中构建调用凭证对象。

call_credentials = client.construct_call_credentials(
    username='admin', password='changeit'
)

调用凭证对象可以用作其他客户端方法中credentials参数的值。

连接

重连

reconnect()方法可以用来手动将客户端重连到合适的EventStoreDB节点。此方法使用与读取集群节点状态相同的程序,然后根据在客户端构建时在连接字符串URI中指定的客户端节点偏好连接到合适的节点。此方法是线程安全的,即在多个线程同时调用时,只会发生一次重连。并发尝试重连将阻塞,直到客户端成功重连,然后它们都将正常返回。

client.reconnect()

手动重连可能需要的情况之一是,出于性能原因,客户端的节点偏好是连接到集群中的跟随者节点,并且在集群领导者选举之后,跟随者成为领导者。在这种情况下,重新连接到跟随者节点目前超出了此客户端的功能,但这种行为可能在未来的版本中得到实现。

由于@autoreconnect装饰器,在许多情况下,重连将自动发生。

关闭

close()方法可以用来干净地关闭客户端的gRPC连接。

client.close()

Asyncio客户端

esdbclient包还提供了一个用于EventStoreDB的异步I/O gRPC Python客户端。它在功能上与多线程客户端等效。它使用grpc.aio包和asyncio模块,而不是grpcthreading

它支持“esdb”和“esdb+discover”连接字符串URI方案,并且可以连接到“安全”和“不安全”的EventStoreDB服务器。

可以使用AsyncEventStoreDBClient类来构建异步I/O gRPC Python客户端的实例。它可以从esdbclient导入。构建客户端后,应该调用async方法connect()

异步客户端具有与多线程EventStoreDBClient完全相同的方法。这些方法被定义为async def方法,因此对这些方法的调用将返回Python“awaitables”,必须等待才能获得方法的返回值。方法具有相同的行为、相同的参数和相同的或等效的返回值。这些方法还用重连和重试装饰器进行了装饰,在遇到连接问题或服务器错误时,会选择性地重连和重试。

当等待时,方法read_all()read_stream()返回一个AsyncReadResponse对象。方法subscribe_to_all()subscribe_to_stream()返回一个AsyncCatchupSubscription对象。方法read_subscription_to_all()read_subscription_to_stream()返回一个AsyncPersistentSubscription对象。这些对象是asyncio迭代器,您可以使用Python的async for语法遍历它们以获取RecordedEvent对象。它们也是asyncio上下文管理器,支持async with语法。它们还有一个stop()方法,可以用来以积极取消流式gRPC调用到服务器的方式终止迭代器。当用作上下文管理器时,将在上下文管理器退出时调用stop()方法。

方法 read_subscription_to_all()read_subscription_to_stream() 返回 AsyncPersistentSubscription 类的实例,该类具有异步方法 ack()nack(),这些方法与 PersistentSubscription 上的方法工作方式相同,支持对从持久订阅接收到的记录事件的确认和否定确认。详情请参见上方。

概要

以下示例演示了异步 append_to_stream()get_stream()subscribe_to_all() 方法。这些是编写事件源应用程序最有用的方法,允许记录新的聚合事件,获取聚合的记录事件以便重构聚合,以及以“恰好一次”语义传播和处理应用程序的状态。

import asyncio

from esdbclient import AsyncEventStoreDBClient


async def demonstrate_async_client():

    # Construct client.
    client = AsyncEventStoreDBClient(
        uri=os.getenv("ESDB_URI"),
        root_certificates=os.getenv("ESDB_ROOT_CERTIFICATES"),
    )

    # Connect to EventStoreDB.
    await client.connect()

    # Append events.
    stream_name = str(uuid.uuid4())
    event1 = NewEvent("OrderCreated", data=b'{}')
    event2 = NewEvent("OrderUpdated", data=b'{}')
    event3 = NewEvent("OrderDeleted", data=b'{}')

    commit_position = await client.append_to_stream(
        stream_name=stream_name,
        current_version=StreamState.NO_STREAM,
        events=[event1, event2, event3]
    )

    # Get stream events.
    recorded = await client.get_stream(stream_name)
    assert len(recorded) == 3
    assert recorded[0] == event1
    assert recorded[1] == event2
    assert recorded[2] == event3


    # Subscribe to all events.
    received = []
    async with await client.subscribe_to_all(commit_position=0) as subscription:
        async for event in subscription:
            received.append(event)
            if event.commit_position == commit_position:
                break
    assert received[-3] == event1
    assert received[-2] == event2
    assert received[-1] == event3


    # Close the client.
    await client.close()


# Run the demo.
asyncio.run(
    demonstrate_async_client()
)

FastAPI 示例

以下示例展示了如何使用 AsyncEventStoreDBClientFastAPI

from contextlib import asynccontextmanager

from fastapi import FastAPI

from esdbclient import AsyncEventStoreDBClient


client: AsyncEventStoreDBClient


@asynccontextmanager
async def lifespan(_: FastAPI):
    # Construct the client.
    global client
    client = AsyncEventStoreDBClient(
        uri="esdb+discover://localhost:2113?Tls=false",
    )
    await client.connect()

    yield

    # Close the client.
    await client.close()


app = FastAPI(lifespan=lifespan)


@app.get("/commit_position")
async def commit_position():
    commit_position = await client.get_commit_position()
    return {"commit_position": commit_position}

如果将此代码放入名为 fastapi_example.py 的文件中,然后运行命令 uvicorn fastapi_example:app --host 0.0.0.0 --port 80,那么当浏览器指向 http://localhost/commit_position 时,FastAPI 应用程序将返回类似 {"commit_position":628917} 的内容。使用 Ctrl-c 退出进程。

说明

正则表达式过滤器

read_all()subscribe_to_all()create_subscription_to_all()get_commit_position() 方法有 filter_excludefilter_include 参数。本节提供了一些关于这些参数值的更多详细信息。

首先要注意的是,这些参数的值应该是正则表达式的序列。

请注意,客户端将它们作为括号中的备选方案连接在一起,形成一个更大的正则表达式,该正则表达式锚定在要匹配的字符串的开始和结束处。因此,不需要包含 '^''$' 锚定断言。

如果您想匹配子字符串,请使用通配符,例如 '.*Snapshot' 以匹配所有以 'Snapshot' 结尾的字符串,或 'Order.*' 以匹配所有以 'Order' 开头的字符串。

EventStoreDB 生成的系统事件的 type 字符串以 $ 符号开头。在操作持久订阅时生成的持久订阅事件具有以 PersistentConfig 开头的 type 字符串。

例如,要匹配 EventStoreDB 系统事件的类型,请使用正则表达式字符串 r'\$.+'。请注意,常量 ESDB_SYSTEM_EVENTS_REGEX 设置为此值。您可以从 esdbclient 导入此常量,并在构建更长的正则表达式序列时使用它。

类似地,要匹配 EventStoreDB 持久订阅事件的类型,请使用正则表达式 r'PersistentConfig\d+'。常量 ESDB_PERSISTENT_CONFIG_EVENTS_REGEX 设置为此值。您可以从 esdbclient 导入此常量,并在构建更长的正则表达式序列时使用它。

常量 DEFAULT_EXCLUDE_FILTER 是一个正则表达式序列,其中包含 ESDB_SYSTEM_EVENTS_REGEXESDB_PERSISTENT_CONFIG_EVENTS_REGEX。它用作 filter_exclude 的默认值,以便默认排除 EventStoreDB 内部生成的事件。

在所有具有 filter_exclude 参数的方法中,参数的默认值是常量 DEFAULT_EXCLUDE_FILTER,该常量设计用于匹配(因此排除)"系统"和"持久订阅配置"事件类型,否则将包含在内。

此值可以扩展。例如,如果您想排除系统事件、持久订阅事件以及类型以 'Snapshot' 结尾的事件,则可以使用 DEFAULT_EXCLUDE_FILTER + ['.*Snapshot'] 作为 filter_exclude 参数。

filter_includefilter_exclude 参数设计成相互对立的效果,因此提供给 filter_include 的字符串序列将返回那些如果使用相同的参数值与 filter_exclude 配合将不被包括的事件。反之亦然,因此提供给 filter_exclude 的字符串序列将返回那些如果使用相同的参数值与 filter_include 配合将不会被包括的事件。

重连和重试方法装饰器

请注意,几乎所有客户端方法都装饰了 @autoreconnect@retrygrpc 装饰器。

@autoreconnect 装饰器会在连接的服务不可用或客户端的 gRPC 通道意外关闭时,连接到集群中的合适节点。当调用需要领导者的方法,并且客户端的节点首选连接到领导者,但客户端连接到的节点不再是领导者时,客户端也会重连。在这种情况下,客户端将重连到当前领导者。重连后,将重试失败的操作。

@retrygrpc 装饰器会选择性重试由于超时、网络错误或服务器错误而失败的 gRPC 操作。它不会重试由于不良请求而肯定失败的操作。

请注意,未由重连和重试装饰器行为覆盖的方面与返回迭代器的相关方法有关。例如,考虑从 read_all() 方法返回的“读取响应”迭代器。在遍历“读取响应”之前,read_all() 方法将已返回,因此方法装饰器已退出。因此,如果在遍历“读取响应”时发生连接问题,则无法触发 read_all() 方法上的任何装饰器的重连。

对于“追捕订阅”对象,服务器会发送一个初始“确认”响应,由客户端接收并检查。因此,当调用 subscribe_to_all()subscribe_to_stream() 时,如果服务器不可用,或者通道以某种方式已关闭,或者请求因其他原因失败,则客户端将重连并重试。然而,如果在成功返回的“追捕订阅”对象迭代期间抛出异常,则需要重新启动追捕订阅。类似地,当读取持久性订阅时,如果在成功接收到的响应遍历期间出现连接问题,则消费者需要重新启动。

仪表化

仪表化是指修改软件以便对其进行分析的行为。仪表化帮助企业揭示用户在软件或平台中频繁遇到错误或减速的区域或功能。

仪表化帮助您了解软件系统的内部状态。仪表化应用程序通过收集指标、事件、日志和跟踪等数据来测量在响应活动请求时正在进行的代码。

仪表化提供了对应用程序的即时可见性,通常使用图表和图形来展示“内部发生的事情”。

此包支持使用 OpenTelemetry 仪表化 EventStoreDB 客户端。

OpenTelemetry

OpenTelemetry 项目提供了一套 API、SDK 和工具,用于仪表化、生成、收集和导出遥测数据,可以帮助您分析软件的性能和行为。它是供应商中立的,100% 免费和开源,并得到了可观测性领域行业领袖的采用和支持。

本包为EventStoreDBClientAsyncEventStoreDBClient客户端提供了OpenTelemetry仪表化工具。这些仪表化工具依赖于各种OpenTelemetry Python包,您需要安装它们,最好使用本项目的"opentelemetry"包扩展来确保版本兼容性。

例如,您可以使用pip安装"opentelemetry"包扩展。

$ pip install esdbclient[opentelemetry]

或者您可以使用Poetry将其添加到您的pyproject.toml文件中并安装它。

$ poetry add esdbclient[opentelemetry]

然后您可以使用OpenTelemetry仪表化工具EventStoreDBClientInstrumentorEventStoreDBClient进行仪表化。

from esdbclient.instrumentation.opentelemetry import EventStoreDBClientInstrumentor

# Activate instrumentation.
EventStoreDBClientInstrumentor().instrument()

# Deactivate instrumentation.
EventStoreDBClientInstrumentor().uninstrument()

您还可以使用OpenTelemetry仪表化工具AsyncEventStoreDBClientInstrumentorAsyncEventStoreDBClient进行仪表化。

from esdbclient.instrumentation.opentelemetry import AsyncEventStoreDBClientInstrumentor

# Activate instrumentation.
AsyncEventStoreDBClientInstrumentor().instrument()

# Deactivate instrumentation.
AsyncEventStoreDBClientInstrumentor().uninstrument()

仪表化工具使用全局的OpenTelemetry "tracer provider",您需要初始化它以导出遥测数据。

例如,要将数据导出到控制台,您需要安装Python包opentelemetry-sdk,并使用以下方式使用类TracerProviderBatchSpanProcessorConsoleSpanExporter

from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.trace import set_tracer_provider

resource = Resource.create(
    attributes={
        SERVICE_NAME: "eventstoredb",
    }
)
provider = TracerProvider(resource=resource)
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
set_tracer_provider(provider)

或者要导出到OpenTelemetry兼容的数据收集器,例如Jaeger,您需要安装Python包opentelemetry-exporter-otlp-proto-http,然后使用来自opentelemetry.exporter.otlp.proto.http.trace_exporter模块的OTLPSpanExporter类,并使用适当的endpoint参数为您的收集器。

from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import set_tracer_provider

resource = Resource.create(
    attributes={
        SERVICE_NAME: "eventstoredb",
    }
)
provider = TracerProvider(resource=resource)
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4318/v1/traces")))
set_tracer_provider(provider)

您可以通过运行以下命令在本地启动Jaeger。

$ docker run -d -p 4318:4318 -p 16686:16686 --name jaeger jaegertracing/all-in-one:latest

然后您可以导航到http://localhost:16686来访问Jaeger UI。并且遥测数据可以通过OpenTelemetry tracer provider发送到http://localhost:4318/v1/traces

此时,仪表化的方法是append_to_stream()subscribe_to_stream()subscribe_to_all()read_subscription_to_stream()read_subscription_to_all()

append_to_stream()方法通过使用"producer" span kind来封装方法调用进行仪表化。它还将span上下文信息添加到新事件元数据中,以便消费者可以关联"producer" span和"consumer" span。

订阅方法通过仪表化响应迭代器进行仪表化,为每个接收到的记录事件创建一个"consumer" span。它从记录的事件元数据中提取span上下文信息,并通过将"consumer" span作为"producer" span的子span关联到"producer" span来将"consumer" span与"producer" span关联起来。

社区

贡献者

安装Poetry

首先要检查您是否已安装Poetry。

$ poetry --version

如果没有,请安装Poetry

$ curl -sSL https://install.python-poetry.org | python3 -

确保Poetry的bin目录在您的PATH环境变量中。

但无论如何,请确保您知道poetry可执行文件的路径。Poetry安装程序会告诉您其安装位置以及如何配置您的shell。

请参阅Poetry文档以获取有关使用Poetry的指导。

为PyCharm用户设置

您可以使用PyCharm轻松获取项目文件(菜单“Git > Clone...”)。然后PyCharm通常会提示您打开项目。

在新窗口中打开项目。PyCharm通常会提示您创建新的虚拟环境。

为项目创建一个新的Poetry虚拟环境。如果PyCharm不知道您的poetry可执行文件的位置,请将路径设置在“新建Poetry环境”表单中的“Poetry可执行文件”标签的输入字段中。在“新建Poetry环境”表单中,您还将有机会选择虚拟环境将使用的Python可执行文件。

PyCharm将为您的项目创建一个新的Poetry虚拟环境,使用特定的Python版本,并按照项目的poetry.lock文件安装项目依赖包。

您可以添加不同版本的Python的Poetry环境,并使用PyCharm的“Python解释器”设置在它们之间切换。如果您想使用未安装的Python版本,可以使用您喜欢的包管理器,或者直接从Python网站下载Python的最新版本安装程序。

项目依赖包安装完成后,您应该能够在PyCharm中运行测试(在tests文件夹上右键单击,选择“运行”选项)。

由于pytest与PyCharm的调试器和覆盖率工具之间存在冲突,您可能需要在测试运行器模板中添加--no-cov选项。或者,只需使用Python标准库的unittest模块。

您还应该在PyCharm中打开一个终端窗口,并从命令行运行项目的Makefile命令(见下文)。

命令行设置

使用Git或合适的替代工具获取项目文件。

在终端应用程序中,将当前工作目录更改为项目文件的根目录。此文件夹中应有Makefile。

使用Makefile创建项目的新Poetry虚拟环境,并使用以下命令将项目依赖包安装到其中。

$ make install-packages

还可以将项目以“可编辑模式”安装。

$ make install

请注意,以这种方式创建虚拟环境后,如果尝试在PyCharm中打开项目并将其配置为使用此虚拟环境作为“现有的Poetry环境”,PyCharm有时会出现一些问题(不清楚原因),可能存在问题。如果遇到此类问题,可以通过删除虚拟环境并使用PyCharm创建Poetry虚拟环境来解决问题(见上文)。

项目Makefile命令

可以使用以下命令启动EventStoreDB。

$ make start-eventstoredb

可以使用以下命令运行测试(需要EventStoreDB正在运行)。

$ make test

可以使用以下命令停止EventStoreDB。

$ make stop-eventstoredb

可以使用以下命令检查代码格式。

$ make lint

可以使用以下命令重新格式化代码。

$ make fmt

测试应位于./tests中。待测试的代码应在./esdbclient中。

pyproject.toml中编辑包依赖项。使用以下命令更新已安装的包(以及poetry.lock文件)。

$ make update-packages

项目详情


发布历史 发布通知 | RSS源

下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分发

esdbclient-1.1.1.tar.gz (162.6 kB 查看哈希值)

上传时间

构建分发

esdbclient-1.1.1-py3-none-any.whl (144.8 kB 查看哈希值)

上传时间 Python 3

由以下支持