Python gRPC Client for EventStoreDB
项目描述
Python gRPC Client for EventStoreDB
这个 Python包 为 EventStoreDB 数据库提供了多线程和asyncio Python客户端。
以下详细描述了多线程的 EventStoreDBClient
。请向下滚动以获取有关 AsyncEventStoreDBClient
的信息。
这些客户端是由EventStoreDB团队合作开发和维护的,并得到Event Store Ltd的官方支持。尽管EventStoreDB gRPC API的并非所有方面都得到了实现,但许多最有用的功能都以易于使用的界面呈现。
这些客户端已测试与EventStoreDB LTS版本22.10和23.10以及候选版本24.2和24.6兼容,包括和不包括SSL/TLS,包括单服务器和集群模式,以及Python版本3.8、3.9、3.10、3.11和3.12。
测试套件具有100%的行和分支覆盖率。代码使用mypy严格检查类型注解。代码使用black和isort进行格式化,并使用flake8进行检查。在开发过程中使用Poetry进行包管理,用于构建和发布到 PyPI。
有关用法示例,请参阅 eventsourcing-eventstoredb 包。
摘要
可以从esdbclient
包导入EventStoreDBClient
类。
EventStoreDBClient
类最常用的三个方法是
-
append_to_stream()
此方法可以用于在特定的“流”中记录新事件。这在执行修改聚合的应用程序中的命令时很有用。这个方法在记录事件时是“原子”的,即要么全部记录,要么不记录。 -
get_stream()
此方法可以用于检索“流”中记录的所有事件。这在从记录的事件中重建聚合,在执行创建新事件的应用程序中的命令之前很有用。 -
subscribe_to_all()
此方法可以用于接收数据库中记录的所有事件。这在事件处理组件中很有用,因为它支持以“恰好一次”语义处理事件。
以下示例使用在本地端口2113运行的“不安全”EventStoreDB服务器。
import uuid
from esdbclient import EventStoreDBClient, NewEvent, StreamState
# Construct EventStoreDBClient with an EventStoreDB URI. The
# connection string URI specifies that the client should
# connect to an "insecure" server running on port 2113.
client = EventStoreDBClient(
uri="esdb://localhost:2113?Tls=false"
)
# Generate new events. Typically, domain events of different
# types are generated in a domain model, and then serialized
# into NewEvent objects. An aggregate ID may be used as the
# name of a stream in EventStoreDB.
stream_name1 = str(uuid.uuid4())
event1 = NewEvent(
type='OrderCreated',
data=b'{"order_number": "123456"}'
)
event2 = NewEvent(
type='OrderSubmitted',
data=b'{}'
)
event3 = NewEvent(
type='OrderCancelled',
data=b'{}'
)
# Append new events to a new stream. The value returned
# from the append_to_stream() method is the overall
# "commit position" in the database of the last new event
# recorded by this operation. The returned "commit position"
# may be used in a user interface to poll an eventually
# consistent event-processing component until it can
# present an up-to-date materialized view. New events are
# each allocated a "stream position", which is the next
# available position in the stream, starting from 0.
commit_position1 = client.append_to_stream(
stream_name=stream_name1,
current_version=StreamState.NO_STREAM,
events=[event1, event2],
)
# Append events to an existing stream. The "current version"
# is the "stream position" of the last recorded event in a
# stream. We have recorded two new events, so the "current
# version" is 1. The exception 'WrongCurrentVersion' will be
# raised if an incorrect value is given.
commit_position2 = client.append_to_stream(
stream_name=stream_name1,
current_version=1,
events=[event3],
)
# - allocated commit positions increase monotonically
assert commit_position2 > commit_position1
# Get events recorded in a stream. This method returns
# a sequence of recorded event objects. The recorded
# event objects may be deserialized to domain event
# objects of different types and used to reconstruct
# an aggregate in a domain model.
recorded_events = client.get_stream(
stream_name=stream_name1
)
# - stream 'stream_name1' now has three events
assert len(recorded_events) == 3
# - allocated stream positions are zero-based and gapless
assert recorded_events[0].stream_position == 0
assert recorded_events[1].stream_position == 1
assert recorded_events[2].stream_position == 2
# - event attribute values are recorded faithfully
assert recorded_events[0].type == "OrderCreated"
assert recorded_events[0].data == b'{"order_number": "123456"}'
assert recorded_events[0].id == event1.id
assert recorded_events[1].type == "OrderSubmitted"
assert recorded_events[1].data == b'{}'
assert recorded_events[1].id == event2.id
assert recorded_events[2].type == "OrderCancelled"
assert recorded_events[2].data == b'{}'
assert recorded_events[2].id == event3.id
# Start a catch-up subscription from last recorded position.
# This method returns a "catch-up subscription" object,
# which can be iterated over to obtain recorded events.
# The iterator will not stop when there are no more recorded
# events to be returned, but instead will block, and then continue
# when further events are recorded. It can be used as a context
# manager so that the underlying streaming gRPC call to the database
# can be cancelled cleanly in case of any error.
received_events = []
with client.subscribe_to_all(commit_position=0) as subscription:
# Iterate over the catch-up subscription. Process each recorded
# event in turn. Within an atomic database transaction, record
# the event's "commit position" along with any new state generated
# by processing the event. Use the component's last recorded commit
# position when restarting the catch-up subscription.
for event in subscription:
received_events.append(event)
if event.commit_position == commit_position2:
# Break so we can continue with the example.
break
# - events are received in the order they were recorded
assert received_events[-3].type == "OrderCreated"
assert received_events[-3].data == b'{"order_number": "123456"}'
assert received_events[-3].id == event1.id
assert received_events[-2].type == "OrderSubmitted"
assert received_events[-2].data == b'{}'
assert received_events[-2].id == event2.id
assert received_events[-1].type == "OrderCancelled"
assert received_events[-1].data == b'{}'
assert received_events[-1].id == event3.id
# Close the client's gRPC connection.
client.close()
安装包
建议将Python包安装到Python虚拟环境中。
从PyPI
您可以使用pip直接从Python包索引安装此包。
$ pip install esdbclient
使用Poetry
您可以使用Poetry将其添加到pyproject.toml文件并安装。
$ poetry add esdbclient
EventStoreDB服务器
EventStoreDB服务器可以使用官方Docker容器镜像在本地运行。
运行容器
对于开发,您可以使用以下命令运行一个“安全”的EventStoreDB服务器。
$ docker run -d --name eventstoredb-secure -it -p 2113:2113 --env "HOME=/tmp" docker.eventstore.com/eventstore-ce/eventstoredb-ce:23.10.0-jammy --dev
正如我们将看到的,您的客户端需要将EventStoreDB连接字符串URI作为其uri
构造函数参数的值。此“安全”EventStoreDB服务器的连接字符串将是
esdb://admin:changeit@localhost:2113
要连接到“安全”服务器,通常需要在连接字符串中包含“用户名”和“密码”,以便服务器可以验证客户端。对于EventStoreDB,默认用户名是“admin”,默认密码是“changeit”。
当连接到“安全”服务器时,您可能还需要提供SSL/TLS证书作为root_certificates
构造函数参数的值。如果服务器证书是公开签名的,则证书授权机构的根证书可能已本地安装,并由grpc包从默认位置获取。客户端使用根SSL/TLS证书对服务器进行认证。对于开发,您可以使用用于创建服务器证书的自签证书授权机构的SSL/TLS证书。或者,当使用单个节点集群时,您可以直接使用服务器证书本身,以下Python代码可以获取服务器证书。
import ssl
server_certificate = ssl.get_server_certificate(addr=('localhost', 2113))
或者,您可以使用以下命令启动一个“不安全”的服务器。
$ docker run -d --name eventstoredb-insecure -it -p 2113:2113 docker.eventstore.com/eventstore-ce/eventstoredb-ce:23.10.0-jammy --insecure
此“不安全”服务器的连接字符串URI为
esdb://localhost:2113?Tls=false
正如我们将看到的,当连接到“不安全”的服务器时,在连接字符串中不需要包含“用户名”和“密码”。如果您这样做,客户端将忽略这些值,以确保它们不会通过不安全的通道发送。
请注意,“不安全”的连接字符串使用包含字段值Tls=false
的查询字符串。此字段的默认值是true
。
停止容器
要停止并删除“安全”容器,请使用以下Docker命令。
$ docker stop eventstoredb-secure
$ docker rm eventstoredb-secure
要停止并删除“不安全”容器,请使用以下Docker命令。
$ docker stop eventstoredb-insecure
$ docker rm eventstoredb-insecure
EventStoreDB客户端
此EventStoreDB客户端在esdbclient
包中实现,具有EventStoreDBClient
类。
导入类
可以从esdbclient
包导入EventStoreDBClient
类。
from esdbclient import EventStoreDBClient
构建客户端
EventStoreDBClient
类有一个必需的构造函数参数uri
和三个可选的构造函数参数root_certificates
、private_key
和certificate_chain
。
期望uri
参数是一个符合标准EventStoreDB“esdb”或“esdb+discover”URI方案的EventStoreDB连接字符串URI。
客户端必须配置为创建到“安全”服务器的“安全”连接,或者作为替代,创建到“不安全”服务器的“不安全”连接。默认情况下,客户端将尝试创建“安全”连接。因此,当连接到“不安全”服务器时,连接字符串必须指定客户端应尝试通过使用URI查询字符串字段值Tls=false
来建立“不安全”连接。
可选的root_certificates
参数可以是包含PEM编码的SSL/TLS证书的Python str
或Python bytes
对象,并用于向客户端认证服务器。当连接到“不安全”服务时,将忽略此参数的值。当连接到“安全”服务器时,可能需要设置此参数。通常,此参数的值将是负责生成EventStoreDB服务器使用的证书的证书授权机构的公共证书。如果证书授权机构证书已本地安装,Python grpc库可以从默认位置获取它们,则在这种情况下不需要设置此值。或者,对于开发,您可以使用服务器的证书本身。此参数的值直接传递给grpc.ssl_channel_credentials()
。
可选的 private_key
和 certificate_chain
参数都可以是 Python 的 str
或 bytes
对象。这些参数可以用来验证客户端对服务器的身份。当连接到一个运行 EventStoreDB 商业版并启用了用户证书插件的“安全”服务器时,必须提供这些参数的正确值。private_key
的值应该是 X.509 用户证书的私钥,格式为 PEM。certificate_chain
的值应该是 X.509 用户证书本身,格式为 PEM。这些参数的值将直接传递给 grpc.ssl_channel_credentials()
。当连接到“非安全”服务时,这些参数的值将被忽略。请注意,向客户端提供用户证书和私钥的另一种方法是使用连接字符串 URI 查询字符串中的 UserCertFile
和 UserKeyFile
字段值(见下文)。如果指定了 UserCertFile
字段值,则忽略 certificate_chain
参数。如果指定了 UserKeyFile
字段值,则忽略 public_key
参数。
以下示例中,uri
和 root_certificates
构造函数参数值来自操作系统环境。
import os
client = EventStoreDBClient(
uri=os.getenv("ESDB_URI"),
root_certificates=os.getenv("ESDB_ROOT_CERTIFICATES"),
)
连接字符串
EventStoreDB 连接字符串是一个符合以下两种可能的方案的 URI:要么是 "esdb" 方案,要么是 "esdb+discover" 方案。
以下描述了 EventStoreDB URI 方案的语法和语义。语法使用 EBNF 定义。
两种方案
"esdb" URI 方案可以定义为以下方式。
esdb-uri = "esdb://" , [ user-info , "@" ] , grpc-target, { "," , grpc-target } , [ "?" , query-string ] ;
在 "esdb" URI 方案中,在可选的用户信息字符串之后,必须至少有一个 gRPC 目标。如果有多个 gRPC 目标,它们必须用逗号 "," 分隔。
每个 gRPC 目标应指定一个 EventStoreDB gRPC 服务器套接字,所有这些都位于同一个 EventStoreDB 集群中,通过指定用冒号 ":" 分隔的主机和端口号。主机可以是可解析为 IP 地址的域名,也可以是 IP 地址。
grpc-target = ( hostname | ip-address ) , ":" , port-number ;
如果只有一个 gRPC 目标,客户端将尝试连接到此服务器,并在记录和检索事件时使用此连接。
如果有两个或更多 gRPC 目标,客户端将依次尝试连接到每个 Gossip API,并从中获取有关集群的信息,直到获得集群信息。然后客户端根据连接字符串 URI 中指定的“节点偏好”选择集群成员。然后客户端将关闭其连接并连接到所选节点,而不使用 'round robin' 负载均衡策略。如果“节点偏好”是“leader”,并且连接到一个领导者后,如果领导者成为 follower,客户端将重新连接到新的领导者。
"esdb+discover" URI 方案可以定义为以下方式。
esdb-discover-uri = "esdb+discover://" , [ user-info, "@" ] , cluster-domainname, [ ":" , port-number ] , [ "?" , query-string ] ;
在 "esdb+discover" URI 方案中,在可选的用户信息字符串之后,应该有一个域名,该域名标识一个 EventStoreDB 服务器集群。集群中的各个节点应通过 DNS 'A' 记录声明。
客户端将使用gRPC库的“轮询”负载均衡策略,通过DNS "A"记录发现的地址调用Gossip API。从Gossip API获取EventStoreDB集群的信息。然后客户端根据“节点偏好”选项选择集群的一个成员。客户端将关闭其连接并连接到所选节点,而不使用“轮询”负载均衡策略。如果“节点偏好”为“领导者”,并在连接到领导者后,如果领导者变为跟随者,客户端将重新连接到新的领导者。
用户信息字符串
在“esdb”和“esdb+discover”两种方案中,URI可能包含用户信息字符串。如果它存在于URI中,用户信息字符串必须用“@”字符与URI的其余部分分开。用户信息字符串必须包含用户名和密码,用“:”字符分隔。
user-info = username , ":" , password ;
客户端在每次对“安全”服务器的gRPC调用中,通过“基本认证”授权头发送用户信息。服务器使用此授权头对客户端进行身份验证。Python gRPC库不允许将调用凭证传递给“不安全”服务器。
查询字符串
在“esdb”和“esdb+discover”两种方案中,可选的查询字符串必须是使用“&”字符分隔的一个或多个字段值参数。
query-string = field-value, { "&", field-value } ;
每个字段值参数必须是支持的字段之一,以及适当的值,用“=”字符分隔。
field-value = ( "Tls", "=" , "true" | "false" )
| ( "TlsVerifyCert", "=" , "true" | "false" )
| ( "ConnectionName", "=" , string )
| ( "NodePreference", "=" , "leader" | "follower" | "readonlyreplica" | "random" )
| ( "DefaultDeadline", "=" , integer )
| ( "GossipTimeout", "=" , integer )
| ( "MaxDiscoverAttempts", "=" , integer )
| ( "DiscoveryInterval", "=" , integer )
| ( "KeepAliveInterval", "=" , integer )
| ( "KeepAliveTimeout", "=" , integer ) ;
| ( "UserCertFile", "=" , string ) ;
| ( "UserKeyFile", "=" , string ) ;
下表描述了此客户端支持的查询字符串字段值。
字段 | 值 | 描述 |
---|---|---|
Tls | "true", "false"(默认:"true") | 如果为“true”,客户端将创建一个“安全”的gRPC通道。如果为“false”,客户端将创建一个“不安全”的gRPC通道。这必须与服务器配置相匹配。 |
TlsVerifyCert | "true", "false"(默认:"true") | 此值目前被忽略。 |
ConnectionName | 字符串(默认:自动生成的版本4 UUID) | 在每次调用中发送到调用元数据,以标识客户端到集群。 |
NodePreference | "leader","follower","readonlyreplica","random"(默认:"leader") | 客户端偏好的节点状态。客户端将根据从Gossip API接收到的集群信息选择一个节点。 |
DefaultDeadline | 整数(默认:None) | 客户端“写入”方法(如append_to_stream())的timeout参数的默认值(以秒为单位)。 |
GossipTimeout | 整数(默认:5) | gossip读取方法(如read_gossip())的timeout参数的默认值(以秒为单位)。 |
MaxDiscoverAttempts | 整数(默认:10) | 在连接或重新连接集群成员时读取gossip的尝试次数。 |
DiscoveryInterval | 整数(默认:100) | 在gossip重试之间等待的时间(以毫秒为单位)。 |
KeepAliveInterval | 整数(默认:None) | "grpc.keepalive_ms" gRPC通道选项的值(以毫秒为单位)。 |
KeepAliveTimeout | 整数(默认:None) | "grpc.keepalive_timeout_ms" gRPC通道选项的值(以毫秒为单位)。 |
UserCertFile | 字符串(默认:None) | 包含PEM格式的X.509用户证书的文件的绝对文件系统路径。 |
UserKeyFile | 字符串(默认:None) | 包含PEM格式的X.509用户证书私钥的文件的绝对文件系统路径。 |
请注意,客户端对字段和值的区分大小写不敏感。如果查询字符串中字段重复,则查询字符串将无错误地解析。然而,客户端使用的连接选项将使用第一个字段的值。查询字符串中具有相同字段名的所有其他字段值将被忽略。没有值的字段也将被忽略。
如果客户端的节点偏好是“从节点”且集群中没有从节点,则客户端将引发异常。同样,如果客户端的节点偏好是“只读副本”且集群中没有只读副本节点,则客户端也将引发异常。
gRPC通道选项“grpc.max_receive_message_length”自动配置为值17 * 1024 * 1024
。此值不能进行配置。
示例
以下是一些EventStoreDB连接字符串URI的示例。
以下URI将导致客户端向gRPC目标'localhost:2113'
建立“非安全”连接。因为客户端的节点偏好是“从节点”,在从节点上可调用的方法应该成功完成,需要领导者的方法将引发NodeIsNotLeader
异常。
esdb://127.0.0.1:2113?Tls=false&NodePreference=follower
以下URI将导致客户端向gRPC目标'localhost:2113'
建立“非安全”连接。因为客户端的节点偏好是“领导者”,如果此节点不是领导者,则所有方法将引发NodeIsNotLeader
异常。
esdb://127.0.0.1:2113?Tls=false&NodePreference=leader
以下URI将导致客户端以用户名'admin'
和密码'changeit'
作为默认调用凭据,向gRPC目标'localhost:2113'
建立“安全”连接。因为客户端的节点偏好是“领导者”,默认情况下,如果此节点不是领导者,则所有方法将引发NodeIsNotLeader
异常。
esdb://admin:changeit@localhost:2113
以下URI将导致客户端首先从'localhost:2111'
、'localhost:2112'
或'localhost:2113'
中获取集群信息,然后从集群信息中选择领导者节点并重新连接到领导者。如果“领导者”节点变为“从节点”且另一个节点变为“领导者”,则客户端将重新连接到新的领导者。
esdb://admin:changeit@localhost:2111,localhost:2112,localhost:2113?NodePreference=leader
以下URI将导致客户端首先从'localhost:2111'
、'localhost:2112'
或'localhost:2113'
中获取集群信息,然后从集群信息中选择一个从节点并重新连接到该从节点。请注意,如果“从节点”变为“领导者”,客户端将不会重新连接到从节点——这种行为可能在客户端和服务器未来的版本中得到实现。
esdb://admin:changeit@localhost:2111,localhost:2112,localhost:2113?NodePreference=follower
以下URI将导致客户端首先从DNS 'A'记录中的地址获取集群信息,然后连接到一个“领导者”节点。在调用EventStore API "写入"方法时,客户端将使用默认的超时时间5秒。
esdb+discover://admin:changeit@cluster1.example.com?DefaultDeadline=5
以下URI将导致客户端首先从DNS 'A'记录中的地址获取集群信息,然后连接到一个“领导者”节点。它将配置具有“保持活动间隔”和“保持活动超时”的gRPC连接。
esdb+discover://admin:changeit@cluster1.example.com?KeepAliveInterval=10000&KeepAliveTimeout=10000
事件对象
此包定义了一个NewEvent
类和一个RecordedEvent
类。在将事件写入数据库时应使用NewEvent
类。在从数据库读取事件时使用RecordedEvent
类。
新事件
在将事件写入 EventStoreDB 数据库时,应使用 NewEvent
类。在调用 append_to_stream()
之前,需要构建新的事件对象。
NewEvent
类是一个冻结的 Python 数据类。它有两个必需的构造函数参数(type
和 data
)和三个可选的构造函数参数(metadata
、content_type
和 id
)。
必需的 type
参数是一个 Python str
,用于描述正在记录的领域事件的类型。
必需的 data
参数是一个 Python bytes
对象,用于表示正在记录的领域事件的序列化数据。
可选的 metadata
参数是一个 Python bytes
对象,用于指示将被记录的事件的任何元数据。默认值是一个空的 bytes
对象。
可选的 content_type
参数是一个 Python str
,用于指示正在记录的数据的类型。默认值是 'application/json'
,表示 data
使用 JSON 进行序列化。此参数的另一个值是更通用的指示 'application/octet-stream'
。
可选的 id
参数是一个 Python UUID
对象,用于指定将被记录的事件的唯一 ID。如果没有提供值,将生成一个新的版本 4 UUID。
new_event1 = NewEvent(
type='OrderCreated',
data=b'{"name": "Greg"}',
)
assert new_event1.type == 'OrderCreated'
assert new_event1.data == b'{"name": "Greg"}'
assert new_event1.metadata == b''
assert new_event1.content_type == 'application/json'
assert isinstance(new_event1.id, uuid.UUID)
event_id = uuid.uuid4()
new_event2 = NewEvent(
type='ImageCreated',
data=b'01010101010101',
metadata=b'{"a": 1}',
content_type='application/octet-stream',
id=event_id,
)
assert new_event2.type == 'ImageCreated'
assert new_event2.data == b'01010101010101'
assert new_event2.metadata == b'{"a": 1}'
assert new_event2.content_type == 'application/octet-stream'
assert new_event2.id == event_id
记录的事件
RecordedEvent
类用于从 EventStoreDB 数据库中读取事件。客户端将从所有返回已记录事件的操作返回此类型的事件对象,例如 get_stream()
、subscribe_to_all()
和 read_subscription_to_all()
。不需要构建记录事件对象。
与 NewEvent
类似,RecordedEvent
类也是一个冻结的 Python 数据类。它具有 NewEvent
所有的属性(type
、data
、metadata
、content_type
、id
),这些属性来源于已记录的事件,还有一些额外的属性来源于事件的记录(stream_name
、stream_position
、commit_position
、recorded_at
)。它还有一个 link
属性,除非记录的事件是“链接事件”且已被“解析”为链接事件,否则该属性为 None
。它还有一个 retry_count
属性,当从持久订阅中接收记录事件时,其值为整数,否则 retry_count
的值为 None
。
type
属性是一个 Python str
,用于指示已记录事件的类型。
data
属性是一个 Python bytes
对象,用于指示已记录事件的的数据。
metadata
属性是一个 Python bytes
对象,用于指示已记录事件的元数据。
content_type
属性是一个 Python str
,用于指示为事件记录的数据的类型。通常为 'application/json'
,表示数据可以解析为 JSON。或者,它为 'application/octet-stream'
。
id
属性是一个 Python UUID
对象,用于指示已记录事件的唯一 ID。
stream_name
属性是一个 Python str
,用于指示记录事件时的流名称。
stream_position
属性是一个 Python int
,用于指示记录事件时的流中的位置。
在 EventStoreDB 中,“流位置”是一个表示记录事件在流中位置的整数。每个记录事件都在流中的一个位置上被记录。每个流位置只被一个记录事件占用。新事件在下一个未被占用的位置上被记录。所有序列的流位置都是基于零且无间隙的。
commit_position
属性是一个 Python int
,用于指示记录事件时的数据库位置。
在EventStoreDB中,"提交位置"是一个整数,表示记录在数据库中的事件的位置。每个记录的事件都会在数据库中记录一个位置。每个提交位置只被一个记录的事件占用。提交位置从零开始,随着新事件的记录而单调递增。但是,与流位置不同,连续提交位置的序列并不是无缝的。实际上,连续记录事件的提交位置之间通常存在较大的差异。
请注意,在EventStoreDB 21.10中,从read_stream()
获得的所有RecordedEvent
对象的commit_position
都是None
,而从read_all()
获得的则具有记录事件的实际提交位置。这一变化发生在版本22.10中,使得从get_stream()
和read_all()
获取的事件对象都具有实际的提交位置。仅为此原因,RecordedEvent
类的commit_position
属性被注解为类型Optional[int]
。
recorded_at
属性是一个Python datetime
,用于指示数据库记录事件的日期和时间。
link
属性是一个可选的RecordedEvent
,它携带有关“链接事件”的信息,该事件已被“解析”为链接事件。这允许在解析链接事件时访问链接事件属性,例如在确认或否定确认链接事件时访问正确的事件ID。当resolve_links
参数为True
且正在回放暂停事件时(从具有'park'
动作的持久订阅接收的事件将创建一个链接事件,当暂停事件回放时,它们将作为解析事件接收)。ack_id
属性有助于获取在确认或否定确认来自持久订阅的事件时使用的正确事件ID。
retry_count
是一个Python int
,用于指示持久订阅尝试将事件发送给消费者的次数。
from dataclasses import dataclass
from datetime import datetime
@dataclass(frozen=True)
class RecordedEvent:
"""
Encapsulates event data that has been recorded in EventStoreDB.
"""
type: str
data: bytes
metadata: bytes
content_type: str
id: UUID
stream_name: str
stream_position: int
commit_position: Optional[int]
recorded_at: Optional[datetime] = None
link: Optional["RecordedEvent"] = None
retry_count: Optional[int] = None
@property
def ack_id(self) -> UUID:
if self.link is not None:
return self.link.id
else:
return self.id
@property
def is_system_event(self) -> bool:
return self.type.startswith("$")
@property
def is_link_event(self) -> bool:
return self.type == "$>"
@property
def is_resolved_event(self) -> bool:
return self.link is not None
@property
def is_checkpoint(self) -> bool:
return False
属性ack_id
可用于在读取持久订阅时获取正确的用于ack()
或nack()
事件的事件ID。返回值是link
属性的id
属性的值,如果link
不是None
,否则是id
属性的值。
属性is_system_event
指示事件是否是“系统事件”。系统事件的type
值以'$'
开头。
属性is_link_event
指示事件是否是“链接事件”。链接事件的type
值为'$>'
。
属性is_resolve_event
指示事件是否已从“链接事件”解析出来。如果link
不是None
,则返回值是True
。
属性is_checkpoint
是False
。这可以用来识别在接收事件时include_checkpoints=True
返回的Checkpoint
实例。
流
在EventStoreDB中,“流”是一系列具有相同“流名称”的记录事件。数据库中通常会有许多流,每个流都有许多记录的事件。每个记录的事件在其流中有一个位置(即“流位置”),以及在数据库中的一个位置(即“提交位置”)。流位置从零开始,是无缝的。提交位置也是从零开始,但不是无缝的。
可以使用append_to_stream()
、get_stream()
和read_all()
方法在数据库中读取和记录。
追加事件
需要领导者
append_to_stream()
方法可以原子性地记录一系列新事件。如果操作成功,它将返回序列中最后一个已记录事件的提交位置。
此方法有三个必需的参数,即stream_name
、current_version
和events
。
必需的 stream_name
参数是一个 Python str
,它唯一标识了一个要附加一系列事件的流。
必需的 current_version
参数应为一个 Python int
,表示流中最后记录的事件的流位置,或者在流尚不存在或已被删除的情况下为 StreamState.NO_STREAM
。流位置是从零开始的且无缝的,因此如果流中有两个事件,则 current_version
应为 1。如果给出了不正确的值,此方法将引发 WrongCurrentVersion
异常。这种行为旨在在记录新事件时提供并发控制。任何流的正确 current_version
值可以通过调用 get_current_version()
来获取。然而,典型的做法是从记录的事件中重新构建一个聚合,这样聚合的版本就是最后记录事件的流位置,然后让聚合生成新的事件,然后将聚合的当前版本作为在附加新聚合事件时 current_version
参数的值。这确保了记录的聚合事件的连续性,因为如果记录新事件时遇到 WrongCurrentVersion
异常,可以重新尝试使用新生成的聚合执行生成新聚合事件的操作。可以通过将 current_version
参数的值设置为常量 StreamState.ANY
来完全禁用这种控制行为。更具有选择性的是,可以通过将 current_version
参数的值设置为常量 StreamState.EXISTS
来禁用现有流的此行为。
必需的 events
参数应为一个新事件对象的序列。应使用 NewEvent
类来构造新事件对象。append_to_stream()
操作是原子的,因此所有或没有任何新事件将被记录。使用 EventStoreDB 无法以原子方式在多个流中记录新事件。
此方法有一个可选的 timeout
参数,它是一个 Python float
,用于设置 gRPC 操作完成的最大持续时间(以秒为单位)。
此方法有一个可选的 credentials
参数,可以用来覆盖从连接字符串 URI 派生的调用凭据。
以下示例中,将一个新的事件 event1
附加到一个新的流中。该流尚不存在,因此 current_version
是 StreamState.NO_STREAM
。
# Construct a new event object.
event1 = NewEvent(type='OrderCreated', data=b'{}')
# Define a new stream name.
stream_name1 = str(uuid.uuid4())
# Append the new events to the new stream.
commit_position1 = client.append_to_stream(
stream_name=stream_name1,
current_version=StreamState.NO_STREAM,
events=[event1],
)
以下示例中,将两个后续事件附加到一个现有流中。该流有一个记录的事件,因此 current_version
是 0
。
event2 = NewEvent(type='OrderUpdated', data=b'{}')
event3 = NewEvent(type='OrderDeleted', data=b'{}')
commit_position2 = client.append_to_stream(
stream_name=stream_name1,
current_version=0,
events=[event2, event3],
)
返回的值 commit_position1
和 commit_position2
是记录序列中最后事件的数据库提交位置。也就是说,commit_position1
是 event1
的提交位置,而 commit_position2
是 event3
的提交位置。
以这种方式返回的提交位置可以被用户界面用于轮询下游组件,直到它处理完所有新记录的事件。例如,考虑一个用户界面命令,该命令导致记录新事件,以及一个从这些事件中最终一致地更新的下游组件的可视化视图。如果新事件尚未被处理,视图可能会过时。为了不显示过时的视图,用户界面可以轮询下游组件,直到它处理了新记录的事件,然后向用户显示最新的视图。
幂等附加操作
对于 NewEvent
对象的 id
值,append_to_stream()
方法是“幂等”的。也就是说,如果以具有与流中已记录的 id
值相等的 id
值的事件调用 append_to_stream()
,则该方法调用将成功返回,返回最后一个新事件的提交位置,而不会对数据库进行任何更改。
这是因为有时在调用 append_to_stream()
时,新事件可能会成功记录,但在方法调用成功返回给调用者之前,可能会发生某些不好的事情。在这种情况下,我们无法确定事件是否实际上已记录,因此我们可能希望重试。
如果事件实际上已成功记录,则重试的方法调用成功返回很方便,并且不会抛出异常(当 current_version
是 StreamState.NO_STREAM
或整数值时),也不会创建更多的事件记录(当 current_version
是 StreamState.ANY
或 StreamState.EXISTS
时),就像如果 append_to_stream()
方法不是幂等的情况一样。
如果方法调用最初失败并且新事件实际上未记录,则在重试方法调用时记录新事件并使方法调用成功返回是有意义的。如果未禁用并发控制,即如果 current version
是 StreamState.NO_STREAM
或整数值,并且在重试方法调用时引发 WrongCurrentVersion
异常,则可以假设最初的方法调用实际上没有成功记录事件,并且在此期间其他人已记录了后续事件。在这种情况下,生成新事件的程序命令可能需要再次执行。应用程序的用户可能需要有机会决定是否仍然希望执行其原始意图,通过显示带有最新记录状态的适当错误来实现。在已禁用并发控制的情况下,通过使用 current_version
的值为 StreamState.ANY
或 StreamState.EXISTS
,重试未成功返回的方法调用将更简单地尝试确保记录新事件,而不管它们的最终流位置如何。在任何情况下,当方法调用成功返回时,我们都可以确信事件已被记录。
下面的示例显示了再次以事件 event2
和 event3
调用 append_to_stream()
方法,并且 current_version=0
。我们可以看到,重复调用 append_to_stream()
会成功返回,而不会引发 WrongCurrentVersion
异常,就像如果 append_to_stream()
操作不是幂等的情况一样。
# Retry appending event3.
commit_position_retry = client.append_to_stream(
stream_name=stream_name1,
current_version=0,
events=[event2, event3],
)
我们可以看到返回的提交位置与上述相同。
assert commit_position_retry == commit_position2
下面的示例显示了再次以事件 event2
和 event3
调用 append_to_stream()
方法,并且 current_version=StreamState.ANY
。
# Retry appending event3.
commit_position_retry = client.append_to_stream(
stream_name=stream_name1,
current_version=0,
events=[event2, event3],
)
我们可以看到返回的提交位置与上述相同。
assert commit_position_retry == commit_position2
通过调用 get_stream()
,我们还可以看到流未改变。也就是说,流中仍然只有三个事件。
events = client.get_stream(
stream_name=stream_name1
)
assert len(events) == 3
这种幂等行为依赖于 NewEvent
类的 id
属性。默认情况下,当构造 NewEvent
实例时,将分配一个新且唯一的版本-4 UUID。要在构造 NewEvent
对象时设置 NewEvent
的 id
值,可以使用可选的 id
构造函数参数。
读取流事件
可以使用 read_stream()
方法获取已附加到流的的事件。此方法返回一个“读取响应”对象。
“读取响应”对象是 Python 迭代器。可以通过遍历“读取响应”对象来获取记录的事件。记录的事件在迭代过程中从服务器流式传输到客户端。迭代将在没有更多记录事件要返回时自动停止。也可以通过在“读取响应”对象上调用 stop()
方法来停止事件的流式传输和迭代器。
可以使用 get_stream()
方法获取已附加到流中的事件。此方法返回一个包含已记录事件对象的 Python tuple
。已记录的事件对象是 RecordedEvent
类的实例。它调用 read_stream()
并将 "read response" 迭代器传递给 Python tuple
,以确保在方法返回之前流操作完成。
read_stream()
和 get_stream()
方法有一个必需的参数,即 stream_name
。
必需的 stream_name
参数是一个 Python str
,用于唯一标识将从中返回记录事件的流。
read_stream()
和 get_stream()
方法还有六个可选参数,分别是 stream_position
、backwards
、resolve_links
、limit
、timeout
和 credentials
。
可选的 stream_position
参数是一个 Python int
,可以用来指示从哪个位置开始读取流中的数据。默认值是 None
。当从流的特定位置读取时,该位置的事件将被包括,无论是向前还是向后读取。
可选的 backwards
参数是一个 Python bool
。默认值是 False
,这意味着流将被正向读取,以便事件按记录顺序返回。如果 backwards
为 True
,则事件将按相反顺序返回。
如果 backwards
是 False
且 stream_position
是 None
,则流的事件将按记录顺序返回,从第一个记录的事件开始。如果 backwards
是 True
且 stream_position
是 None
,则流的事件将按相反顺序返回,从最后一个记录的事件开始。
可选的 resolve_links
参数是一个 Python bool
。默认值是 False
,这意味着任何事件链接将不会被解析,因此返回的事件可能代表事件链接。如果 resolve_links
是 True
,则任何事件链接将被解析,因此将返回链接事件而不是事件链接。
可选的 limit
参数是一个 Python int
,它限制了将返回的事件数量。默认值是 sys.maxint
。
可选的 timeout
参数是一个 Python float
,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。
可选的 credentials
参数可以用来覆盖从连接字符串 URI 派生的调用凭据。可以通过调用客户端方法的 construct_call_credentials()
来构建此参数的适当值。
下面的示例显示了默认行为,即从第一个记录的事件到最后的记录事件,正向返回流中所有记录的事件。
events = client.get_stream(
stream_name=stream_name1
)
assert len(events) == 3
assert events[0] == event1
assert events[1] == event2
assert events[2] == event3
下面的示例显示了如何使用 stream_position
参数从特定的流位置读取流到流的末尾。流位置是从零开始的,因此 stream_position=1
对应于在流中记录的第二个事件,在这种情况下是 event2
。
events = client.get_stream(
stream_name=stream_name1,
stream_position=1,
)
assert len(events) == 2
assert events[0] == event2
assert events[1] == event3
下面的示例显示了如何使用 backwards
参数反向读取流。
events = client.get_stream(
stream_name=stream_name1,
backwards=True,
)
assert len(events) == 3
assert events[0] == event3
assert events[1] == event2
assert events[2] == event1
下面的示例显示了如何使用 limit
参数读取有限数量的事件。
events = client.get_stream(
stream_name=stream_name1,
limit=2,
)
assert len(events) == 2
assert events[0] == event1
assert events[1] == event2
如果指定的流从未存在或已被删除,read_stream()
和 get_stream()
方法将引发一个 NotFound
异常。
from esdbclient.exceptions import NotFound
try:
client.get_stream('does-not-exist')
except NotFound:
pass # The stream does not exist.
else:
raise Exception("Shouldn't get here")
请注意,get_stream()
方法被 @autoreconnect
和 @retrygrpc
装饰器装饰,而 read_stream()
方法则没有。这意味着在调用 get_stream()
方法时,所有由于连接问题导致的错误都将被重试和重连装饰器捕获,但在调用 read_stream()
时则不会。read_stream()
方法没有这样的装饰器,因为流仅在迭代“读取响应”时开始,这意味着该方法在流开始之前就返回了,因此没有机会让任何装饰器捕获任何连接问题。
出于同样的原因,当流不存在时,read_stream()
不会抛出 NotFound
异常,直到开始迭代“读取响应”对象。
如果您正在读取一个非常大的流,那么您可能更喜欢调用 read_stream()
,并在从服务器流式传输事件的同时开始迭代记录的事件,而不是等待所有事件都积累在内存中。
获取当前版本
get_current_version()
方法是一个便捷方法,它实际上使用 backwards=True
和 limit=1
调用 get_stream()
。此方法返回流中最后一条记录事件的 stream_position
属性的值。如果流不存在,则返回值是 StreamState.NO_STREAM
。当向流添加事件时,返回值是 current_version
的正确值,当删除或弃用流时也是如此。
此方法有一个必需的参数,stream_name
。
必需的 stream_name
参数是 Python str
,它唯一标识一个流,将从中返回流位置。
此方法有一个可选的 timeout
参数,它是一个 Python float
,用于设置 gRPC 操作完成的最大持续时间(以秒为单位)。
此方法有一个可选的 credentials
参数,可以用来覆盖从连接字符串 URI 派生的调用凭据。
以下示例中,获取了 stream_name1
的最后流位置。由于已向 stream_name1
添加了三个事件,并且因为流中的位置是以零为基础且无间隙的,所以当前版本是 2
。
current_version = client.get_current_version(
stream_name=stream_name1
)
assert current_version == 2
如果一个流从未存在或已被删除,则返回值是 StreamState.NO_STREAM
,这是向新流添加第一个事件时以及向已删除的流添加事件时 current_version
参数的正确值。
current_version = client.get_current_version(
stream_name='does-not-exist'
)
assert current_version is StreamState.NO_STREAM
如何使用 EventStoreDB 实现快照
快照可以提高那些本应从非常长的流中重建的聚合的性能。然而,通常建议设计具有有限生命周期的聚合,从而具有相对较短的流,避免需要快照。本“如何操作”部分旨在仅展示如何使用此 Python 客户端通过 EventStoreDB 实现聚合的快照。
事件源聚合通常通过为每个记录的事件调用一个突变函数来从记录的事件重建,从初始状态 None
发展到聚合的当前状态。函数 get_aggregate()
展示了如何实现这一点。聚合 ID 用作流名称。如果找不到聚合流,则引发 AggregateNotFound
异常。
class AggregateNotFound(Exception):
"""Raised when an aggregate is not found."""
def get_aggregate(aggregate_id, mutator_func):
stream_name = aggregate_id
# Get recorded events.
try:
events = client.get_stream(
stream_name=stream_name,
stream_position=None
)
except NotFound as e:
raise AggregateNotFound(aggregate_id) from e
else:
# Reconstruct aggregate from recorded events.
aggregate = None
for event in events:
aggregate = mutator_func(aggregate, event)
return aggregate
可以通过将聚合的当前状态作为新事件记录来实现聚合的快照。
如果一个聚合对象具有与用于重建聚合的最后事件的流位置相对应的版本号,并且此版本号记录在快照元数据中,那么可以使用此版本号选择快照之后记录的事件。然后可以从最后一个快照和任何后续事件中重建聚合,而不必重新播放整个历史。
我们将使用一个单独的流来存储聚合的快照,该流以记录其事件的流名称命名。快照流名称将由在聚合的流名称前加上 'snapshot-$'
来构造。
SNAPSHOT_STREAM_NAME_PREFIX = 'snapshot-$'
def make_snapshot_stream_name(stream_name):
return f'{SNAPSHOT_STREAM_NAME_PREFIX}{stream_name}'
def remove_snapshot_stream_prefix(snapshot_stream_name):
assert snapshot_stream_name.startswith(SNAPSHOT_STREAM_NAME_PREFIX)
return snapshot_stream_name[len(SNAPSHOT_STREAM_NAME_PREFIX):]
现在,让我们重新定义 get_aggregate()
函数,以便它查找快照事件,然后选择后续的聚合事件,然后为每个记录的事件调用突变函数。
请注意,聚合事件是从序列化聚合事件的流中读取的,而快照是从为序列化聚合快照的单独流中读取的。我们将使用JSON来序列化和反序列化事件数据。
import json
def get_aggregate(aggregate_id, mutator_func):
stream_name = aggregate_id
recorded_events = []
# Look for a snapshot.
try:
snapshots = client.get_stream(
stream_name=make_snapshot_stream_name(stream_name),
backwards=True,
limit=1
)
except NotFound:
stream_position = None
else:
assert len(snapshots) == 1
snapshot = snapshots[0]
stream_position = deserialize(snapshot.metadata)['version'] + 1
recorded_events.append(snapshot)
# Get subsequent events.
try:
events = client.get_stream(
stream_name=stream_name,
stream_position=stream_position
)
except NotFound as e:
raise AggregateNotFound(aggregate_id) from e
else:
recorded_events += events
# Reconstruct aggregate from recorded events.
aggregate = None
for event in recorded_events:
aggregate = mutator_func(aggregate, event)
return aggregate
def serialize(d):
return json.dumps(d).encode('utf8')
def deserialize(s):
return json.loads(s.decode('utf8'))
为了展示如何使用get_aggregate()
,让我们定义一个具有name
和tricks
属性的Dog
聚合类。属性id
和version
将指示聚合对象的ID和版本号。属性is_from_snapshot
在此处仅用于演示当聚合对象使用快照重构时。
from dataclasses import dataclass
@dataclass(frozen=True)
class Aggregate:
id: str
version: int
is_from_snapshot: bool
@dataclass(frozen=True)
class Dog(Aggregate):
name: str
tricks: list
让我们还定义一个突变函数mutate_dog()
,该函数根据各种不同类型的事件('DogRegistered'
、'DogLearnedTrick'
和'Snapshot'
)来演化Dog
聚合的状态。
def mutate_dog(dog, event):
data = deserialize(event.data)
if event.type == 'DogRegistered':
return Dog(
id=event.stream_name,
version=event.stream_position,
is_from_snapshot=False,
name=data['name'],
tricks=[],
)
elif event.type == 'DogLearnedTrick':
assert event.stream_position == dog.version + 1
assert event.stream_name == dog.id, (event.stream_name, dog.id)
return Dog(
id=dog.id,
version=event.stream_position,
is_from_snapshot=dog.is_from_snapshot,
name=dog.name,
tricks=dog.tricks + [data['trick']],
)
elif event.type == 'Snapshot':
return Dog(
id=remove_snapshot_stream_prefix(event.stream_name),
version=deserialize(event.metadata)['version'],
is_from_snapshot=True,
name=data['name'],
tricks=data['tricks'],
)
else:
raise Exception(f"Unknown event type: {event.type}")
为了方便,我们还定义了一个get_dog()
函数,它将get_aggregate()
与mutate_dog()
函数作为其mutator_func
参数的值一起调用。
def get_dog(dog_id):
return get_aggregate(
aggregate_id=dog_id,
mutator_func=mutate_dog,
)
我们还可以定义一些“命令”函数,这些函数将新事件追加到数据库中。函数register_dog()
追加一个DogRegistered
事件。函数record_trick_learned()
追加一个DogLearnedTrick
事件。函数snapshot_dog()
追加一个Snapshot
事件。请注意,函数record_trick_learned()
和snapshot_dog()
使用get_dog()
。
请注意,DogRegistered
和DogLearnedTrick
事件被追加到聚合事件的流中,而Snapshot
事件被追加到为聚合快照的单独流中。
def register_dog(name):
dog_id = str(uuid.uuid4())
event = NewEvent(
type='DogRegistered',
data=serialize({'name': name}),
)
client.append_to_stream(
stream_name=dog_id,
current_version=StreamState.NO_STREAM,
events=event,
)
return dog_id
def record_trick_learned(dog_id, trick):
dog = get_dog(dog_id)
event = NewEvent(
type='DogLearnedTrick',
data=serialize({'trick': trick}),
)
client.append_to_stream(
stream_name=dog_id,
current_version=dog.version,
events=event,
)
def snapshot_dog(dog_id):
dog = get_dog(dog_id)
event = NewEvent(
type='Snapshot',
data=serialize({'name': dog.name, 'tricks': dog.tricks}),
metadata=serialize({'version': dog.version}),
)
client.append_to_stream(
stream_name=make_snapshot_stream_name(dog_id),
current_version=StreamState.ANY,
events=event,
)
我们可以调用register_dog()
来注册一只新狗。
# Register a new dog.
dog_id = register_dog('Fido')
dog = get_dog(dog_id)
assert dog.name == 'Fido'
assert dog.tricks == []
assert dog.version == 0
assert dog.is_from_snapshot is False
我们可以调用record_trick_learned()
来记录已经学会了一些技巧。
# Record that 'Fido' learned a new trick.
record_trick_learned(dog_id, trick='roll over')
dog = get_dog(dog_id)
assert dog.name == 'Fido'
assert dog.tricks == ['roll over']
assert dog.version == 1
assert dog.is_from_snapshot is False
# Record that 'Fido' learned another new trick.
record_trick_learned(dog_id, trick='fetch ball')
dog = get_dog(dog_id)
assert dog.name == 'Fido'
assert dog.tricks == ['roll over', 'fetch ball']
assert dog.version == 2
assert dog.is_from_snapshot is False
我们可以调用snapshot_dog()
来记录Dog
聚合的当前状态快照。在调用snapshot_dog()
之后,get_dog()
函数将返回一个使用Snapshot
事件构建的Dog
对象。
# Snapshot 'Fido'.
snapshot_dog(dog_id)
dog = get_dog(dog_id)
assert dog.name == 'Fido'
assert dog.tricks == ['roll over', 'fetch ball']
assert dog.version == 2
assert dog.is_from_snapshot is True
我们可以继续使用快照在调用record_trick_learned()
和直接调用get_dog()
时演化Dog
聚合的状态。
record_trick_learned(dog_id, trick='sit')
dog = get_dog(dog_id)
assert dog.name == 'Fido'
assert dog.tricks == ['roll over', 'fetch ball', 'sit']
assert dog.version == 3
assert dog.is_from_snapshot is True
我们可以从is_from_snapshot
属性中看到,Dog
对象确实是从快照中重建的。
快照可以在固定版本号间隔、固定时间段、特定类型事件后、事件追加后或作为后台进程创建。
读取所有事件
可以使用read_all()
方法获取数据库中按记录顺序记录的所有事件。此方法返回一个“读取响应”对象,就像read_stream()
。
“读取响应”是一个迭代器,而不是一个序列。可以通过迭代“读取响应”对象来获取记录的事件。记录的事件在迭代过程中从服务器流到客户端。迭代将自动停止,当没有更多记录的事件要返回时。可以通过在“读取响应”对象上调用stop()
方法来停止事件的流和迭代器。记录的事件对象是RecordedEvent
类的实例。
此方法有九个可选参数,分别为commit_position
、backwards
、resolve_links
、filter_exclude
、filter_include
、filter_by_stream_name
、limit
、timeout
和credentials
。
可选的commit_position
参数是一个Python int
,可以用来指定一个读取的提交位置。默认值commit_position
是None
。请注意,如果指定了提交位置,它必须在数据库中实际存在的提交位置。当向前读取时,根据过滤器,提交位置的事件可能包括在内。当向后读取时,提交位置的事件将不包括在内。
可选的 backwards
参数是一个 Python bool
类型。默认的 backwards
值为 False
,表示返回的事件按照记录顺序排列;如果 backwards
为 True
,则返回的事件顺序相反。
如果 backwards
为 False
且 commit_position
为 None
,数据库中的事件将按照记录顺序返回,从第一个记录的事件开始。这是 read_all()
的默认行为。如果 backwards
为 True
且 commit_position
为 None
,数据库中的事件将按照相反顺序返回,从最后一个记录的事件开始。
可选的 resolve_links
参数是一个 Python bool
。默认值是 False
,这意味着任何事件链接将不会被解析,因此返回的事件可能代表事件链接。如果 resolve_links
是 True
,则任何事件链接将被解析,因此将返回链接事件而不是事件链接。
可选的 filter_exclude
参数是一个正则表达式序列,用于指定应该返回哪些记录事件。如果设置了非空序列的 filter_include
,则此参数将被忽略。默认值匹配 EventStoreDB 的 "系统事件" 事件类型,因此系统事件通常不会被包括。有关筛选表达式的更多信息,请参阅下面的注释部分。
可选的 filter_include
参数是一个正则表达式序列,用于指定应该返回哪些记录事件。默认情况下,此参数是一个空元组。如果设置了非空序列,则忽略 filter_exclude
参数。
可选的 filter_by_stream_name
参数是一个表示是否将筛选应用于事件类型或流名称的 Python bool
。默认值为 False
,因此筛选应用于记录事件的类型字符串。
可选的 limit
参数是一个整数,用于限制返回的事件数量。默认值为 sys.maxint
。
可选的 timeout
参数是一个 Python float
,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。
可选的 credentials
参数可以用来覆盖从连接字符串 URI 派生的调用凭证。
事件筛选是在 EventStoreDB 服务器上进行的。在筛选之后,在服务器上应用 limit
参数。
以下示例显示了如何按记录顺序获取数据库中迄今为止记录的所有事件。我们可以看到 stream_name1
(event1
、event2
和 event3
)的三个事件被包括在内,以及其他事件。
# Read all events (creates a streaming gRPC call).
read_response = client.read_all()
# Convert the iterator into a sequence of recorded events.
events = tuple(read_response)
assert len(events) > 3 # more than three
# Convert the sequence of recorded events into a set of event IDs.
event_ids = set(e.id for e in events)
assert event1.id in event_ids
assert event2.id in event_ids
assert event3.id in event_ids
以下示例显示了如何从特定的提交位置读取数据库中记录的所有事件,在这种情况下是 commit_position1
。当从特定的提交位置正向读取时,将包括该位置的事件。 commit_position1
的值是我们附加 event1
时的位置。因此,event1
是我们将收到的第一个记录事件,event2
是第二个,event3
是第三个。
# Read all events forwards from a commit position.
read_response = client.read_all(
commit_position=commit_position1
)
# Step through the "read response" iterator.
assert next(read_response) == event1
assert next(read_response) == event2
assert next(read_response) == event3
# Stop the iterator.
read_response.stop()
以下示例显示了如何以相反的顺序读取数据库中记录的所有事件。我们可以看到,我们收到的第一个事件是最后记录的事件:有关快照和快照的章节中关于 Dog
聚合的事件。
# Read all events backwards from the end.
read_response = client.read_all(
backwards=True
)
# Step through the "read response" iterator.
assert next(read_response).type == "DogLearnedTrick"
assert next(read_response).type == "Snapshot"
assert next(read_response).type == "DogLearnedTrick"
assert next(read_response).type == "DogLearnedTrick"
assert next(read_response).type == "DogRegistered"
# Stop the iterator.
read_response.stop()
以下示例显示了如何从特定的提交位置正向读取有限数量的事件。
events = tuple(
client.read_all(
commit_position=commit_position1,
limit=1,
)
)
assert len(events) == 1
assert events[0] == event1
以下示例显示了如何从数据库末尾反向读取有限数量的记录事件。在这种情况下,限制为 1,因此我们收到最后一个记录的事件。
events = tuple(
client.read_all(
backwards=True,
limit=1,
)
)
assert len(events) == 1
assert events[0].type == 'DogLearnedTrick'
assert deserialize(events[0].data)['trick'] == 'sit'
请注意,与 read_stream()
方法一样,read_all()
方法没有使用重试和重连装饰器,因为从服务器流式传输记录事件仅在开始迭代 "读取响应" 时开始,这意味着该方法在流式传输开始之前就返回了,因此没有机会让任何装饰器捕获任何连接问题。
获取提交位置
可以使用 get_commit_position()
方法获取数据库中最后一条记录事件的提交位置。它只是调用 read_all()
方法,传入 backwards=True
和 limit=1
,并返回最后一条记录事件的 commit_position
属性的值。
commit_position = client.get_commit_position()
此方法有五个可选参数,分别是 filter_exclude
、filter_include
、filter_by_stream_name
、timeout
和 credentials
。这些值会传递给 read_all()
。
可选的 filter_exclude
、filter_include
和 filter_by_stream_name
参数与 read_all()
方法中的用法相同。
可选的 timeout
参数是一个 Python float
,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。
可选的 credentials
参数可以用来覆盖从连接字符串 URI 派生的调用凭证。
此方法可能用于测量下游组件处理所有记录事件的进度,通过比较当前提交位置与下游组件中最后成功处理的事件的记录提交位置。在这种情况下,filter_exclude
、filter_include
和 filter_by_stream_name
参数的值应与下游组件获取记录事件时使用的值相等。
获取流元数据
get_stream_metadata()
方法返回流元数据及其版本。
此方法有一个必需参数,即 stream_name
,它是一个 Python str
,用于唯一标识将获取流元数据的流。
此方法有一个可选的 timeout
参数,它是一个 Python float
,用于设置 gRPC 操作完成的最大持续时间(以秒为单位)。
此方法有一个可选的 credentials
参数,可以用来覆盖从连接字符串 URI 派生的调用凭据。
以下示例中,获取了 stream_name1
的元数据。
metadata, metadata_version = client.get_stream_metadata(stream_name=stream_name1)
返回的 metadata
值是一个 Python dict
。返回的 metadata_version
值可以是 int
类型,表示流存在时的版本,也可以是 StreamState.NO_STREAM
,表示流不存在且未设置元数据。这些值可以用作 set_stream_metadata()
的参数。
设置流元数据
需要领导者
set_stream_metadata()
方法用于设置流的元数据。在向流追加事件之前可以设置流元数据。
此方法有一个必需参数,即 stream_name
,它是一个 Python str
,用于唯一标识将设置流元数据的流。
此方法有一个可选的 timeout
参数,它是一个 Python float
,用于设置 gRPC 操作完成的最大持续时间(以秒为单位)。
此方法有一个可选的 credentials
参数,可以用来覆盖从连接字符串 URI 派生的调用凭据。
以下示例中,设置了 stream_name1
的元数据。
metadata["foo"] = "bar"
client.set_stream_metadata(
stream_name=stream_name1,
metadata=metadata,
current_version=metadata_version,
)
current_version
参数应该是从 get_stream_metadata()
获取的流元数据当前版本。
有关流元数据的更多信息,请参阅 EventStoreDB 文档。
删除流
需要领导者
可以使用 delete_stream()
方法“删除”流。
此方法有两个必需参数,即 stream_name
和 current_version
。
必需的 stream_name
参数是一个 Python str
,它唯一标识了一个要附加一系列事件的流。
必需的 current_version
参数预期为一个 Python int
,表示流中最后一条记录事件的流位置。
此方法有一个可选的 timeout
参数,它是一个 Python float
,用于设置 gRPC 操作完成的最大持续时间(以秒为单位)。
此方法有一个可选的 credentials
参数,可以用来覆盖从连接字符串 URI 派生的调用凭据。
以下示例中,删除了 stream_name1
。
commit_position = client.delete_stream(stream_name=stream_name1, current_version=2)
删除流后,仍然可以追加新事件。从已删除的流中读取将只返回在它被删除后追加的事件。
墓碑流
需要领导者
可以使用 tombstone_stream()
方法“墓碑”流。
此方法有两个必需参数,即 stream_name
和 current_version
。
必需的 stream_name
参数是一个 Python str
,它唯一标识了一个要附加一系列事件的流。
必需的 current_version
参数预期为一个 Python int
,表示流中最后一条记录事件的流位置。
此方法有一个可选的 timeout
参数,它是一个 Python float
,用于设置 gRPC 操作完成的最大持续时间(以秒为单位)。
此方法有一个可选的 credentials
参数,可以用来覆盖从连接字符串 URI 派生的调用凭据。
以下示例中,stream_name1
被墓碑。
commit_position = client.tombstone_stream(stream_name=stream_name1, current_version=2)
在墓碑一个流之后,无法再追加新事件。
追捕订阅
“追捕”订阅可用于接收已记录的事件以及随后记录的事件。追捕订阅可用于处理记录事件的组件,该组件使用“精确一次”语义。
subscribe_to_all()
方法启动一个追赶订阅,可以接收数据库中的所有事件。subscribe_to_stream()
方法启动一个追赶订阅,可以接收特定流的事件。这两种方法都返回一个“追赶订阅”对象,该对象是 Python 迭代器。可以通过迭代获取已记录的事件。以这种方式获取的记录事件对象是 RecordedEvent
类的实例。
在将“追赶订阅”对象返回给调用者之前,客户端将首先从服务器获得一个“确认”响应,这允许客户端检测 gRPC 连接和流式 gRPC 调用是否正常工作。因此,subscribe_to_all()
和 subscribe_to_stream()
方法都非常有用地装饰了重连和重试装饰器。然而,一旦方法返回,装饰器就会退出,并且在迭代订阅对象时由于连接问题而引发的任何异常都必须由您的代码处理。
当没有更多事件要返回时,"追赶订阅"迭代器不会自动停止,但迭代将阻塞,直到数据库中随后记录新事件。随后记录的任何事件都将立即流式传输到客户端,然后迭代将继续。可以通过在“追赶订阅”对象上调用 stop()
方法来停止事件流,从而停止迭代。
订阅所有事件
subscribe_to_all()
方法可以用来启动一个追赶订阅,从中可以按记录顺序获取数据库中记录的所有事件。该方法返回一个“追赶订阅”迭代器。
此方法还有十个可选参数:commit_position
、from_end
、resolve_links
、filter_exclude
、filter_include
、filter_by_stream_name
、include_checkpoints
、include_caught_up
、timeout
和 credentials
。
commit_position
可选参数指定一个提交位置。commit_position
的默认值是 None
,这意味着追赶订阅将从数据库中的第一个记录事件开始。如果提供了提交位置,它必须与数据库中实际存在的提交位置匹配。只有在该位置之后记录的事件才会被获取。
from_end
可选参数指定追赶订阅是否从数据库中的最后一个记录事件开始。默认情况下,此参数为 False
。如果 from_end
为 True
,则只获取在订阅启动之后记录的事件。如果 commit_position
不是 None
,则忽略此参数。
可选的 resolve_links
参数是一个 Python bool
。默认值是 False
,这意味着任何事件链接将不会被解析,因此返回的事件可能代表事件链接。如果 resolve_links
是 True
,则任何事件链接将被解析,因此将返回链接事件而不是事件链接。
可选的 filter_exclude
参数是一个正则表达式序列,用于指定应该返回哪些记录事件。如果设置了非空序列的 filter_include
,则此参数将被忽略。默认值匹配 EventStoreDB 的 "系统事件" 事件类型,因此系统事件通常不会被包括。有关筛选表达式的更多信息,请参阅下面的注释部分。
可选的 filter_include
参数是一个正则表达式序列,用于指定应该返回哪些记录事件。默认情况下,此参数是一个空元组。如果设置了非空序列,则忽略 filter_exclude
参数。
可选的 filter_by_stream_name
参数是一个表示是否将筛选应用于事件类型或流名称的 Python bool
。默认值为 False
,因此筛选应用于记录事件的类型字符串。
include_checkpoints
可选参数是一个 Python bool
,表示在接收记录事件时是否应包含“检查点”消息。检查点具有 commit_position
值,事件处理组件可以使用该值来更新其记录的提交位置值,这样在大量事件被过滤时,订阅者不必在事件处理组件重新启动时从相同的老位置开始。
include_caught_up
可选参数是一个 Python bool
,表示在接收记录事件时是否应包含“已赶上”消息。默认值 include_caught_up
是 False
。
可选的 timeout
参数是一个 Python float
,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。
可选的 credentials
参数可以用来覆盖从连接字符串 URI 派生的调用凭证。
下面的示例显示了如何从数据库中的第一个记录事件开始启动一个追赶订阅。
# Subscribe from the first recorded event in the database.
catchup_subscription = client.subscribe_to_all()
下面的示例显示,追赶订阅不会自动停止,但在收到最后一个记录事件时会阻塞,然后在随后记录事件时继续。
from time import sleep
from threading import Thread
# Append a new event to a new stream.
stream_name2 = str(uuid.uuid4())
event4 = NewEvent(type='OrderCreated', data=b'{}')
client.append_to_stream(
stream_name=stream_name2,
current_version=StreamState.NO_STREAM,
events=[event4],
)
# Receive events from the catch-up subscription in a different thread.
received_events = []
def receive_events():
for event in catchup_subscription:
received_events.append(event)
def wait_for_event(event):
for _ in range(100):
for received in reversed(received_events):
if event == received:
return
else:
sleep(0.1)
else:
raise AssertionError("Event wasn't received")
thread = Thread(target=receive_events, daemon=True)
thread.start()
# Wait to receive event4.
wait_for_event(event4)
# Append another event whilst the subscription is running.
event5 = NewEvent(type='OrderUpdated', data=b'{}')
client.append_to_stream(
stream_name=stream_name2,
current_version=0,
events=[event5],
)
# Wait for the subscription to block.
wait_for_event(event5)
# Stop the subscription.
catchup_subscription.stop()
thread.join()
以下示例展示了如何订阅特定提交位置之后记录的事件,在本例中是从上面接收到的最后一个记录事件的提交位置开始。然后,在重新启动订阅之前,又记录了一个事件。当订阅运行时,还记录了另外三个事件。这四个事件按照记录的顺序接收。
# Append another event.
event6 = NewEvent(type='OrderDeleted', data=b'{}')
client.append_to_stream(
stream_name=stream_name2,
current_version=1,
events=[event6],
)
# Restart subscribing to all events after the
# commit position of the last received event.
catchup_subscription = client.subscribe_to_all(
commit_position=received_events[-1].commit_position
)
thread = Thread(target=receive_events, daemon=True)
thread.start()
# Wait for event6.
wait_for_event(event6)
# Append three more events to a new stream.
stream_name3 = str(uuid.uuid4())
event7 = NewEvent(type='OrderCreated', data=b'{}')
event8 = NewEvent(type='OrderUpdated', data=b'{}')
event9 = NewEvent(type='OrderDeleted', data=b'{}')
client.append_to_stream(
stream_name=stream_name3,
current_version=StreamState.NO_STREAM,
events=[event7, event8, event9],
)
# Wait for events 7, 8 and 9.
wait_for_event(event7)
wait_for_event(event8)
wait_for_event(event9)
# Stop the subscription.
catchup_subscription.stop()
thread.join()
当调用订阅对象的 stop()
方法时,追赶订阅调用即结束。当它超出作用域时,或使用 Python 的 del
关键字显式地从内存中删除时,会自动发生。
订阅流事件
可以使用 subscribe_to_stream()
方法从单个流中获取记录的事件开始追赶订阅。此方法返回一个“追赶订阅”迭代器。
此方法有一个必需的 stream_name
参数,它指定了将接收记录事件的流名称。
此方法还有六个可选参数,分别是 stream_position
、from_end
、resolve_links
、include_caught_up
、timeout
和 credentials
。
可选的 stream_position
参数指定了一个位置,从该位置开始订阅。默认值 stream_position
为 None
,这意味着将按记录顺序获取流中记录的所有事件,除非将 from_end
设置为 True
。如果提供了流位置,则仅获取该位置之后记录的事件。
可选的 from_end
参数指定订阅将从流中的最后一个位置开始。默认值 from_end
为 False
。如果 from_end
为 True
,则仅获取订阅创建之后记录的事件。如果设置了 stream_position
,则忽略此参数。
可选的 resolve_links
参数是一个 Python bool
。默认值是 False
,这意味着任何事件链接将不会被解析,因此返回的事件可能代表事件链接。如果 resolve_links
是 True
,则任何事件链接将被解析,因此将返回链接事件而不是事件链接。
include_caught_up
可选参数是一个 Python bool
,表示在接收记录事件时是否应包含“已赶上”消息。默认值 include_caught_up
是 False
。
可选的 timeout
参数是一个 Python float
,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。
可选的 credentials
参数可以用来覆盖从连接字符串 URI 派生的调用凭证。
以下示例展示了如何从流中的第一个记录事件开始启动追赶订阅。
# Subscribe from the start of 'stream2'.
subscription = client.subscribe_to_stream(stream_name=stream_name2)
以下示例展示了如何从特定的流位置启动追赶订阅。
# Subscribe to stream2, from the second recorded event.
subscription = client.subscribe_to_stream(
stream_name=stream_name2,
stream_position=1,
)
如何实现恰好一次的事件处理
下游组件接收并处理记录事件的提交位置对于下游组件来说非常有用,这样在恢复处理时可以确定最后处理的事件的提交位置。
最后一个记录的提交位置可以用来指定恢复处理时订阅的提交位置。由于此提交位置将代表下游组件中最后一个成功处理的事件的位置,因此通常希望获取此位置之后的事件,因为那是尚未处理的事件。因此,当使用 EventStoreDB 中的追赶订阅从特定提交位置订阅事件时,将不会将指定提交位置上记录的事件包含在接收到的记录事件序列中。
在使用追击订阅时,为了在下游组件中实现记录事件的“精确一次”处理,应该将记录事件的提交位置与处理记录事件的结果原子性地、唯一地记录下来,例如在实现最终一致性的CQRS时,可以在与物化视图相同的数据库中,或者在与下游分析、报告或归档应用相同的数据库中。通过将记录事件的提交位置与处理记录事件产生的新状态原子性地记录,可以避免记录事件消费过程中的“双重写入”。通过唯一记录提交位置,新状态不会重复记录,因此下游组件的记录状态对于任何记录事件只会更新一次。通过使用最大的记录提交位置来恢复追击订阅,所有记录的事件最终都会被处理。将“最多一次”条件与“至少一次”条件相结合,得到“精确一次”条件。
在记录事件消费过程中的“双重写入”的危险在于,如果记录事件在事务中成功处理并记录新状态,提交位置在另一个事务中记录,可能发生一种情况而不是另一种情况。如果新状态被记录但位置丢失,然后处理停止并恢复,记录的事件可能会被处理两次。另一方面,如果提交位置被记录但新状态丢失,记录的事件可能实际上根本未处理。通过处理事件超过一次,或者未能处理事件,下游组件的记录状态可能会不准确,或者可能是不一致的,甚至可能是灾难性的。这些后果在您的情况下可能重要也可能不重要。但有时不一致性可能会停止处理,直到问题解决。您可以通过在相同的原子事务中原子性地记录记录事件的提交位置以及处理该事件产生的新状态来避免事件消费中的“双重写入”。通过使提交位置的记录唯一,当存在冲突时,事务将回滚,您将防止任何重复处理记录事件的成果被提交。
从追击订阅接收到的记录事件不能返回确认给EventStoreDB服务器。然而,确认事件是“持久订阅”的一个方面。希望依赖于将事件确认到上游组件是双重写入的例子。
持久订阅
在EventStoreDB中,“持久”订阅与追击订阅类似,因为在没有更多记录事件可接收时,读取持久订阅将会阻塞,然后在新事件随后记录时继续。
持久订阅可以通过一组使用支持的“消费者策略”之一操作的消费者组来消费。
持久订阅的显著不同之处在于服务器会跟踪消费者的进度。因此,持久订阅的消费者需要在记录事件成功处理时“确认”,否则“否定确认”已接收但未成功处理的记录事件。
所有这些都意味着对于持久订阅,需要考虑“创建”、“读取”、“更新”、“删除”、“确认”和“否定确认”操作。
虽然持久订阅有一些优点,特别是记录事件由一组消费者并发处理,但通过在服务器中跟踪已处理事件的提交序列中的位置,事件消费中的“双重写入”问题就会产生。一组持久订阅消费者对记录事件的处理可靠性将依赖于它们对重复消息的幂等处理,以及它们对顺序错乱交付的弹性。
创建对所有事件的订阅
需要领导者
可以使用 create_subscription_to_all()
方法创建对所有数据库中所有流记录事件的“持久订阅”。
此方法需要一个必需的 group_name
参数,它是订阅的“消费者组”名称。
此方法有十九个可选参数:from_end
、commit_position
、resolve_links
、filter_exclude
、filter_include
、filter_by_stream_name
、consumer_strategy
、message_timeout
、max_retry_count
、min_checkpoint_count
、max_checkpoint_count
、checkpoint_after
、max_subscriber_count
、live_buffer_size
、read_batch_size
、history_buffer_size
、extra_statistics
、timeout
和 credentials
。
可选的 from_end
参数可以用来指定订阅的消费者组应该只接收在订阅创建后记录的事件。
或者,可选的 commit_position
参数可以用来指定一个提交位置,从该位置开始,订阅的消费者组应该接收事件。请注意,在指定提交位置记录的事件可能包含在消费者组接收到的记录事件中。
如果没有指定 from_end
或 commit_position
,订阅的消费者组可能会接收到数据库中所有记录的事件。
可选的 resolve_links
参数是一个 Python bool
。默认值是 False
,这意味着任何事件链接将不会被解析,因此返回的事件可能代表事件链接。如果 resolve_links
是 True
,则任何事件链接将被解析,因此将返回链接事件而不是事件链接。
可选的 filter_exclude
参数是一个正则表达式序列,用于指定应该返回哪些记录事件。如果设置了非空序列的 filter_include
,则此参数将被忽略。默认值匹配 EventStoreDB 的 "系统事件" 事件类型,因此系统事件通常不会被包括。有关筛选表达式的更多信息,请参阅下面的注释部分。
可选的 filter_include
参数是一个正则表达式序列,用于指定应该返回哪些记录事件。默认情况下,此参数是一个空元组。如果设置了非空序列,则忽略 filter_exclude
参数。
可选的 filter_by_stream_name
参数是一个表示是否将筛选应用于事件类型或流名称的 Python bool
。默认值为 False
,因此筛选应用于记录事件的类型字符串。
可选的 consumer_strategy
参数是一个 Python str
,它定义了此持久订阅的消费者策略。此参数的值可以是 'DispatchToSingle'
、'RoundRobin'
、'Pinned'
或 'PinnedByCorrelation'
。默认值是 'DispatchToSingle'
。
可选的 message_timeout
参数是一个 Python float
,它设置从服务器发送记录事件到持久订阅消费者,直到服务器收到“确认”(ack)或“否定确认”(nack)的最大持续时间(以秒为单位),之后服务器将重试发送事件。默认值 message_timeout
为 30.0
。
可选的 max_retry_count
参数是一个 Python int
,它设置服务器重试发送事件的次数。默认值 max_retry_count
为 10
。
可选的 min_checkpoint_count
参数是一个 Python int
,它设置服务器在记录确认(acks)之前必须收到的最小确认(acks)数量。默认值 min_checkpoint_count
为 10
。
可选的 max_checkpoint_count
参数是一个 Python int
,它设置服务器在记录确认(acks)之前必须收到的最大确认(acks)数量。默认值 max_checkpoint_count
为 1000
。
可选的 checkpoint_after
参数是一个 Python float
,它设置记录确认(acks)之间的最大持续时间(以秒为单位)。默认值 checkpoint_after
为 2.0
。
可选的 max_subscriber_count
参数是一个 Python int
,它设置持久订阅的并发读取者的最大数量。超出此数量后,尝试读取持久订阅将引发 MaximumSubscriptionsReached
错误。
可选的 live_buffer_size
参数是一个 Python int
,它设置存储新记录事件的缓冲区大小(内存中)。默认值 live_buffer_size
为 500。
可选的 read_batch_size
参数是一个 Python int
,它设置在追赶时从磁盘读取的记录事件的数量。默认值 read_batch_size
为 200。
可选的history_buffer_size
参数是一个Python int
,用于设置在追及时缓存的记录事件数量。默认值history_buffer_size
为500。
可选的extra_statistics
参数是一个Python bool
,用于启用此订阅的额外统计跟踪。默认值extra_statistics
为False
。
可选的 timeout
参数是一个 Python float
,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。
可选的 credentials
参数可以用来覆盖从连接字符串 URI 派生的调用凭证。
方法create_subscription_to_all()
不返回值。通过调用read_subscription_to_all()
方法来获取记录事件。
以下示例中,创建了一个持久订阅,从数据库中的第一个记录的非系统事件开始操作。
# Create a persistent subscription.
group_name1 = f"group-{uuid.uuid4()}"
client.create_subscription_to_all(group_name=group_name1)
读取所有订阅
需要领导者
read_subscription_to_all()
方法可以被一组消费者使用,以接收使用create_subscription_to_all()
方法创建的持久订阅中的记录事件。
此方法有一个必需的group_name
参数,是调用create_subscription_to_all()
时指定的订阅消费者“组”的名称。
此方法有一个可选的 timeout
参数,它是一个 Python float
,用于设置 gRPC 操作完成的最大持续时间(以秒为单位)。
此方法有一个可选的 credentials
参数,可以用来覆盖从连接字符串 URI 派生的调用凭据。
此方法返回一个PersistentSubscription
对象,它是一个迭代器,提供RecordedEvent
对象。它还具有ack()
、nack()
和stop()
方法。
subscription = client.read_subscription_to_all(group_name=group_name1)
ack()
方法应由消费者使用以“确认”已从服务器接收并成功处理记录事件。这将防止该记录事件被同一组中的另一个消费者接收。ack()
有一个item
参数,可以是RecordedEvent
或UUID
。如果您传入一个RecordedEvent
,则将使用其ack_id
属性的值向服务器确认事件。如果您传入UUID,则使用被确认的RecordedEvent
的ack_id
值,如果该事件已从链接事件中解析出来(这可以在持久订阅设置resolve_links
为True
时发生,也可以在无论resolve_links
设置如何重放已停放的事件时发生)。
以下示例遍历订阅对象,并使用接收到的RecordedEvent
对象调用ack()
。当接收到event9
时,调用订阅的stop()
方法,停止迭代,以便我们可以继续下面的示例。
received_events = []
for event in subscription:
received_events.append(event)
# Acknowledge the received event.
subscription.ack(event)
# Stop when 'event9' has been received.
if event == event9:
subscription.stop()
nack()
应由消费者使用以向服务器“否定确认”它已接收但未成功处理记录事件。与ack()
一样,nack()
方法有一个item
参数,其工作方式相同。使用记录事件或其ack_id
属性。nack()
方法还有一个action
参数,它应该是一个Python str
:可以是'unknown'
、'park'
、'retry'
、'skip'
或'stop'
。
stop()
方法可以用来停止gRPC流操作。
如何编写持久订阅消费者
持久订阅的读取可以封装在一个“消费者”中,该消费者在接收到记录事件时调用一个“策略”函数,如果策略函数正常返回,则自动调用ack()
,如果它引发异常,则调用nack()
,可能重试事件一定次数,然后停放事件。
以下简单示例展示了如何实现这一点。我们可以看到,在最终停放event5
之前,已确认了'event9'。
RecordedEvent
已被重试的次数通过其retry_count
属性表示。
acked_events = {}
nacked_events = {}
class ExampleConsumer:
def __init__(self, subscription, max_retry_count, final_action):
self.subscription = subscription
self.max_retry_count = max_retry_count
self.final_action = final_action
self.error = None
def run(self):
with self.subscription:
for event in self.subscription:
try:
self.policy(event)
except Exception:
if event.retry_count < self.max_retry_count:
action = "retry"
else:
action = self.final_action
self.subscription.nack(event, action)
self.after_nack(event, action)
else:
self.subscription.ack(event)
self.after_ack(event)
def stop(self):
self.subscription.stop()
def policy(self, event):
# Raise an exception when we see "event5".
if event == event5:
raise Exception()
def after_ack(self, event):
# Track retry count of acked events.
acked_events[event.id] = event.retry_count
def after_nack(self, event, action):
# Track retry count of nacked events.
nacked_events[event.id] = event.retry_count
if action == self.final_action:
# Stop the consumer, so we can continue with the examples.
self.stop()
# Create subscription.
group_name = f"group-{uuid.uuid4()}"
client.create_subscription_to_all(group_name, commit_position=commit_position1)
# Read subscription.
subscription = client.read_subscription_to_all(group_name)
# Construct consumer.
consumer = ExampleConsumer(
subscription=subscription,
max_retry_count=5,
final_action="park",
)
# Run consumer.
consumer.run()
# Check 'event5' was nacked and never acked.
assert event5.id in nacked_events
assert event5.id not in acked_events
assert nacked_events[event5.id] == 5
# Check 'event9' was acked and never nacked.
assert event9.id in acked_events
assert event9.id not in nacked_events
更新所有订阅
需要领导者
可以使用 update_subscription_to_all()
方法来更新一个“持久订阅”。请注意,无法调整过滤选项和消费者策略。
此方法需要一个必需的 group_name
参数,它是订阅的“消费者组”名称。
此方法还有十六个可选参数,分别是:from_end
、commit_position
、resolve_links
、consumer_strategy
、message_timeout
、max_retry_count
、min_checkpoint_count
、max_checkpoint_count
、checkpoint_after
、max_subscriber_count
、live_buffer_size
、read_batch_size
、history_buffer_size
、extra_statistics
、timeout
和 credentials
。
可选参数 from_end
、commit_position
、resolve_links
、consumer_strategy
、message_timeout
、max_retry_count
、min_checkpoint_count
、max_checkpoint_count
、checkpoint_after
、max_subscriber_count
、live_buffer_size
、read_batch_size
、history_buffer_size
和 extra_statistics
可以用来调整之前调用 create_subscription_to_all()
和 update_subscription_to_all()
时设置的值。如果在 update_subscription_to_all()
的调用中没有提到这些参数,则持久订阅的相应设置将保持不变。
可选的 timeout
参数是一个 Python float
,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。
可选的 credentials
参数可以用来覆盖从连接字符串 URI 派生的调用凭证。
update_subscription_to_all()
方法不返回任何值。
以下示例中,将持久订阅更新为从数据库末尾开始运行。
# Create a persistent subscription.
client.update_subscription_to_all(group_name=group_name1, from_end=True)
创建订阅到流
需要领导者
可以使用 create_subscription_to_stream()
方法创建对流的持久订阅。
此方法有两个必需参数,分别是 group_name
和 stream_name
。group_name
参数命名了将接收此订阅事件的消费者组。stream_name
参数指定订阅将跟踪哪个流。这两个参数的值都应预期为 Python str
对象。
此方法还有十六个可选参数,分别是:stream_position
、from_end
、resolve_links
、consumer_strategy
、message_timeout
、max_retry_count
、min_checkpoint_count
、max_checkpoint_count
、checkpoint_after
、max_subscriber_count
、live_buffer_size
、read_batch_size
、history_buffer_size
、extra_statistics
、timeout
和 credentials
。
stream_position
参数指定一个订阅位置,从该位置订阅流。当读取订阅时,将接收到此订阅位置的记录事件。
from_end
参数是一个 Python bool
。默认情况下,此参数的值为 False
。如果将此参数设置为 True
,则从订阅中读取将只接收订阅创建之后记录的事件。也就是说,它不包括当前流位置。
可选的 resolve_links
参数是一个 Python bool
。默认值是 False
,这意味着任何事件链接将不会被解析,因此返回的事件可能代表事件链接。如果 resolve_links
是 True
,则任何事件链接将被解析,因此将返回链接事件而不是事件链接。
可选的 consumer_strategy
参数是一个 Python str
,它定义了此持久订阅的消费者策略。此参数的值可以是 'DispatchToSingle'
、'RoundRobin'
、'Pinned'
或 'PinnedByCorrelation'
。默认值是 'DispatchToSingle'
。
可选的 message_timeout
参数是一个 Python float
,它设置从服务器发送记录事件到持久订阅消费者,直到服务器收到“确认”(ack)或“否定确认”(nack)的最大持续时间(以秒为单位),之后服务器将重试发送事件。默认值 message_timeout
为 30.0
。
可选的 max_retry_count
参数是一个 Python int
,它设置服务器重试发送事件的次数。默认值 max_retry_count
为 10
。
可选的 min_checkpoint_count
参数是一个 Python int
,它设置服务器在记录确认(acks)之前必须收到的最小确认(acks)数量。默认值 min_checkpoint_count
为 10
。
可选的 max_checkpoint_count
参数是一个 Python int
,它设置服务器在记录确认(acks)之前必须收到的最大确认(acks)数量。默认值 max_checkpoint_count
为 1000
。
可选的 checkpoint_after
参数是一个 Python float
,它设置记录确认(acks)之间的最大持续时间(以秒为单位)。默认值 checkpoint_after
为 2.0
。
可选的 max_subscriber_count
参数是一个 Python int
,它设置持久订阅的并发读取者的最大数量。超出此数量后,尝试读取持久订阅将引发 MaximumSubscriptionsReached
错误。
可选的 live_buffer_size
参数是一个 Python int
,它设置存储新记录事件的缓冲区大小(内存中)。默认值 live_buffer_size
为 500。
可选的 read_batch_size
参数是一个 Python int
,它设置在追赶时从磁盘读取的记录事件的数量。默认值 read_batch_size
为 200。
可选的history_buffer_size
参数是一个Python int
,用于设置在追及时缓存的记录事件数量。默认值history_buffer_size
为500。
可选的extra_statistics
参数是一个Python bool
,用于启用此订阅的额外统计跟踪。默认值extra_statistics
为False
。
可选的 timeout
参数是一个 Python float
,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。
可选的 credentials
参数可以用来覆盖从连接字符串 URI 派生的调用凭证。
此方法不返回任何值。可以通过调用 read_subscription_to_stream()
来接收事件。
以下示例创建了一个从流开始处的持久流订阅。
# Create a persistent stream subscription from start of the stream.
group_name2 = f"group-{uuid.uuid4()}"
client.create_subscription_to_stream(
group_name=group_name2,
stream_name=stream_name2,
)
读取订阅到流
需要领导者
可以使用 read_subscription_to_stream()
方法读取对流的持久订阅。
此方法有两个必需参数,分别是 group_name
和 stream_name
,它们应与调用 create_subscription_to_stream()
时使用的参数值匹配。
此方法有一个可选的 timeout
参数,它是一个 Python float
,用于设置 gRPC 操作完成的最大持续时间(以秒为单位)。
此方法有一个可选的 credentials
参数,可以用来覆盖从连接字符串 URI 派生的调用凭据。
此方法返回一个 PersistentSubscription
对象,它是一个迭代器,提供 RecordedEvent
对象,并且还具有 ack()
、nack()
和 stop()
方法。
subscription = client.read_subscription_to_stream(
group_name=group_name2,
stream_name=stream_name2,
)
以下示例遍历订阅对象,并调用 ack()
。当接收到 event6
后,调用订阅的 stop()
方法,停止迭代,以便我们可以继续下面的示例。
events = []
for event in subscription:
events.append(event)
# Acknowledge the received event.
subscription.ack(event)
# Stop when 'event6' has been received.
if event == event6:
subscription.stop()
我们可以检查在上述示例中是否收到了所有附加到 stream_name2
的事件。
assert len(events) == 3
assert events[0] == event4
assert events[1] == event5
assert events[2] == event6
更新订阅到流
需要领导者
可以使用 update_subscription_to_stream()
方法更新对流的持久订阅。请注意,无法调整消费者策略。
此方法有一个必需的 group_name
参数,它是订阅的“组”消费者的名称,以及一个必需的 stream_name
参数,它是流的名称。
此方法还具有十六个可选参数,包括from_end
、stream_position
、resolve_links
、consumer_strategy
、message_timeout
、max_retry_count
、max_subscriber_count
、live_buffer_size
、read_batch_size
、history_buffer_size
、extra_statistics
、min_checkpoint_count
、max_checkpoint_count
、checkpoint_after
、timeout
和credentials
。
可选参数from_end
、stream_position
、resolve_links
、consumer_strategy
、message_timeout
、max_retry_count
、min_checkpoint_count
、max_checkpoint_count
、checkpoint_after
、max_subscriber_count
、live_buffer_size
、read_batch_size
、history_buffer_size
和extra_statistics
可用于调整之前调用create_subscription_to_stream()
和update_subscription_to_stream()
时设置的值。如果在update_subscription_to_stream()
的调用中没有提及这些参数中的任何一个,则持久订阅的相应设置将保持不变。
可选的 timeout
参数是一个 Python float
,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。
可选的 credentials
参数可以用来覆盖从连接字符串 URI 派生的调用凭证。
update_subscription_to_stream()
方法不返回任何值。
以下示例展示了如何将持久订阅更新为从流的末尾开始运行。
# Create a persistent subscription.
client.update_subscription_to_stream(
group_name=group_name2,
stream_name=stream_name2,
from_end=True,
)
重放已停泊事件
需要领导者
replay_parked_events()
方法可用于在读取持久订阅时“重放”已被“停泊”(使用操作'park'
进行负面确认)的事件。停泊的事件将被消费者再次接收到。
此方法需要一个必需的group_name
参数和一个可选的stream_name
参数。这些参数的值应与调用create_subscription_to_all()
或create_subscription_to_stream()
时使用的值匹配。
此方法有一个可选的 timeout
参数,它是一个 Python float
,用于设置 gRPC 操作完成的最大持续时间(以秒为单位)。
此方法有一个可选的 credentials
参数,可以用来覆盖从连接字符串 URI 派生的调用凭据。
以下示例展示了为组group_name1
重放已停泊事件。
client.replay_parked_events(
group_name=group_name1,
)
以下示例展示了为组group_name2
重放已停泊事件。
client.replay_parked_events(
group_name=group_name2,
stream_name=stream_name2,
)
获取订阅信息
需要领导者
get_subscription_info()
方法可用于获取持久订阅的信息。
此方法需要一个必需的group_name
参数和一个可选的stream_name
参数,这些参数应与调用create_subscription_to_all()
或create_subscription_to_stream()
时使用的参数值匹配。
此方法有一个可选的 timeout
参数,它是一个 Python float
,用于设置 gRPC 操作完成的最大持续时间(以秒为单位)。
此方法有一个可选的 credentials
参数,可以用来覆盖从连接字符串 URI 派生的调用凭据。
以下示例获取了通过调用create_subscription_to_all()
创建的持久订阅group_name1
的信息。
subscription_info = client.get_subscription_info(
group_name=group_name1,
)
以下示例获取了通过调用create_subscription_to_stream()
在stream_name2
上创建的持久订阅group_name2
的信息。
subscription_info = client.get_subscription_info(
group_name=group_name2,
stream_name=stream_name2,
)
返回值是一个SubscriptionInfo
对象。
列出订阅
需要领导者
list_subscriptions()
方法可用于获取所有现有持久订阅的信息,包括“对所有订阅”和“对流的订阅”。
此方法有一个可选的 timeout
参数,它是一个 Python float
,用于设置 gRPC 操作完成的最大持续时间(以秒为单位)。
此方法有一个可选的 credentials
参数,可以用来覆盖从连接字符串 URI 派生的调用凭据。
以下示例列出了所有现有的持久订阅。
subscriptions = client.list_subscriptions()
返回值是一个SubscriptionInfo
对象列表。
列出对流的订阅
需要领导者
list_subscriptions_to_stream()
方法可用于获取对流的全部持久订阅的信息。
此方法有一个必需的参数,stream_name
。
此方法有一个可选的 timeout
参数,它是一个 Python float
,用于设置 gRPC 操作完成的最大持续时间(以秒为单位)。
此方法有一个可选的 credentials
参数,可以用来覆盖从连接字符串 URI 派生的调用凭据。
subscriptions = client.list_subscriptions_to_stream(
stream_name=stream_name2,
)
返回值是一个SubscriptionInfo
对象列表。
删除订阅
需要领导者
delete_subscription()
方法可用于删除持久订阅。
此方法需要一个必需的group_name
参数和一个可选的stream_name
参数,这些参数应与调用create_subscription_to_all()
或create_subscription_to_stream()
时使用的参数值匹配。
此方法有一个可选的 timeout
参数,它是一个 Python float
,用于设置 gRPC 操作完成的最大持续时间(以秒为单位)。
此方法有一个可选的 credentials
参数,可以用来覆盖从连接字符串 URI 派生的调用凭据。
以下示例删除了通过调用create_subscription_to_all()
创建的持久订阅group_name1
。
client.delete_subscription(
group_name=group_name1,
)
以下示例删除了通过调用create_subscription_to_stream()
在stream_name2
上创建的持久订阅group_name2
。
client.delete_subscription(
group_name=group_name2,
stream_name=stream_name2,
)
投影
有关EventStoreDB中投影的更多信息,请参阅EventStoreDB文档。
创建投影
需要领导者
create_projection()
方法可用于创建“连续”投影。
此方法有两个必需参数,即name
和query
。
此必需的name
参数是一个Python str
,用于指定投影的名称。
这个必需的 query
参数是一个 Python str
,它定义了投影将执行的操作。
此方法还有四个可选参数,分别是 emit_enabled
、track_emitted_streams
、timeout
和 credentials
。
可选的 emit_enabled
参数是一个 Python bool
,它指定了投影是否能够发出事件。如果指定了 True
值,则投影能够发出事件;否则,投影不能发出事件。默认值 emit_enabled
为 False
。
请注意,如果您的投影查询中包含对 emit()
的调用,则 emit_enabled
必须为 True
,否则投影将无法运行。
可选的 track_emitted_streams
参数是一个 Python bool
,它指定了投影是否跟踪其发出的流。如果指定了 True
值,则投影将跟踪其发出的流;否则,投影不会跟踪其发出的流。默认值 track_emitted_streams
为 False
。
跟踪发出流的目的是,它们在删除投影时可以可选地被删除(有关详细信息,请参阅 delete_projection()
方法)。
请注意,如果将 track_emitted_streams
设置为 True
,则还必须将 emit_enabled
设置为 True
,否则此方法将引发错误。
可选的 timeout
参数是一个 Python float
,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。
可选的 credentials
参数可以用来覆盖从连接字符串 URI 派生的调用凭证。
以下示例中创建了一个投影,它处理附加到 stream_name2
的事件。投影的“状态”被初始化为具有“计数”,该计数在每次事件发生时增加一次。
projection_name = str(uuid.uuid4())
projection_query = """fromStream('%s')
.when({
$init: function(){
return {
count: 0
};
},
OrderCreated: function(s,e){
s.count += 1;
},
OrderUpdated: function(s,e){
s.count += 1;
},
OrderDeleted: function(s,e){
s.count += 1;
}
})
.outputState()
""" % stream_name2
client.create_projection(
name=projection_name,
query=projection_query,
)
请注意,outputState()
调用是可选的,它将投影的状态持久化到“结果”流中。如果调用 outputState()
,则立即将表示投影状态的的事件写入“结果”流。
对于名为 projection_name
的投影,“结果”流的默认名称为 $projections-{projection_name}-result
。可以使用该流名称使用 get_stream()
、read_stream()
、subscribe_to_stream()
、create_subscription_to_stream()
和 read_subscription_to_stream()
方法读取和订阅“结果”流。
如果您的投影没有调用 outputState()
,则您将无法读取或订阅“结果”流,但您仍然可以使用 get_projection_state()
方法获取投影“状态”。
记录在“结果”流中的事件“类型”字符串为 'Result'
。您可能希望在读取或订阅数据库中记录的“所有”事件(使用 read_all()
、subscribe_to_all()
等)时将其包含在 filter_exclude
参数中按类型过滤事件。
此外,无论何时,投影的状态都会定期记录在一个“状态”流中,并且投影还会写入一个“检查点”流。在删除投影时,“状态”流、“检查点”流以及所有已“跟踪”的“发出”流(由于 track_emitted_streams
参数被设置为 True
)可以可选地被删除。有关详细信息,请参阅 delete_projection()
。
与“结果”和“发出”流不同,“状态”流和“检查点”流不能由用户读取或订阅,或通过 EventStoreDB Web 界面的“流浏览器”视图查看。
获取投影状态
需要领导者
可以使用 get_projection_state()
方法获取投影的“状态”。
此方法有一个必需的 name
参数,它是一个 Python str
,指定了投影的名称。
此方法还有两个可选参数,分别是 timeout
和 credentials
。
可选的 timeout
参数是一个 Python float
,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。
可选的 credentials
参数可以用来覆盖从连接字符串 URI 派生的调用凭证。
以下示例中,在睡眠 1 秒以允许投影处理所有记录的事件之后,获取了投影“状态”。我们可以看到投影已处理了三个事件。
sleep(1) # allow time for projection to process recorded events
projection_state = client.get_projection_state(name=projection_name)
assert projection_state.value == {'count': 3}
获取投影统计
需要领导者
可以使用 get_projection_statistics()
方法来获取投影统计信息。
此方法有一个必需的 name
参数,它是一个 Python str
,指定了投影的名称。
此方法还有两个可选参数,分别是 timeout
和 credentials
。
可选的 timeout
参数是一个 Python float
,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。
可选的 credentials
参数可以用来覆盖从连接字符串 URI 派生的调用凭证。
此方法返回一个表示命名投影的 ProjectionStatistics
对象。
statistics = client.get_projection_statistics(name=projection_name)
返回一个 ProjectionStatistics
对象。此对象属性的值代表投影的进度。
更新投影
需要领导者
可以使用 update_projection()
方法来更新投影。
此方法有两个必需参数,即name
和query
。
必需的 name
参数是一个 Python str
,它指定要更新的投影的名称。
必需的 query
参数是一个 Python str
,它定义了投影将执行的操作。
此方法还有三个可选参数,emit_enabled
、timeout
和 credentials
。
可选的 emit_enabled
参数是一个 Python bool
,它指定投影是否能够发出事件。如果指定了 True
值,则投影将能够发出事件。如果指定了 False
值,则投影将不能发出事件。默认值 emit_enabled
为 False
。
请注意,如果您的投影查询中包含对 emit()
的调用,则 emit_enabled
必须为 True
,否则投影将无法运行。
请注意,无法通过 gRPC API 更新 track_emitted_streams
。
可选的 timeout
参数是一个 Python float
,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。
可选的 credentials
参数可以用来覆盖从连接字符串 URI 派生的调用凭证。
client.update_projection(name=projection_name, query=projection_query)
启用投影
需要领导者
可以使用 enable_projection()
方法启用(启动运行)先前已禁用(停止)的投影。
此方法有一个必需的 name
参数,它是一个指定要启用的投影名称的 Python str
。
此方法还有两个可选参数,分别是 timeout
和 credentials
。
可选的 timeout
参数是一个 Python float
,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。
可选的 credentials
参数可以用来覆盖从连接字符串 URI 派生的调用凭证。
client.enable_projection(name=projection_name)
禁用投影
需要领导者
可以使用 disable_projection()
方法禁用(停止运行)投影。
此方法有一个必需的 name
参数,它是一个指定要禁用的投影名称的 Python str
。
此方法还有两个可选参数,timeout
和 credentials
。
可选的 timeout
参数是一个 Python float
,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。
可选的 credentials
参数可以用来覆盖从连接字符串 URI 派生的调用凭证。
client.disable_projection(name=projection_name)
重置投影
需要领导者
可以使用 reset_projection()
方法重置投影。
此方法有一个必需的 name
参数,它是一个指定要重置的投影名称的 Python str
。
此方法还有两个可选参数,timeout
和 credentials
。
可选的 timeout
参数是一个 Python float
,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。
可选的 credentials
参数可以用来覆盖从连接字符串 URI 派生的调用凭证。
client.reset_projection(name=projection_name)
请注意,必须先禁用投影才能重置。
删除投影
需要领导者
可以使用 delete_projection()
方法删除投影。
此方法有一个必需的 name
参数,它是一个指定要删除的投影名称的 Python str
。
此方法还有五个可选参数,delete_emitted_streams
、delete_state_stream
、delete_checkpoint_stream
、timeout
和 credentials
。
可选的 delete_emitted_streams
参数是一个 Python bool
,它指定将删除所有已跟踪的“已发出”流。为了删除已发出的流,它们必须已被跟踪(参见 create_projection()
方法的 track_emitted_streams
参数)。
可选的 delete_state_stream
参数是一个 Python bool
,它指定应该删除投影的“状态”流。与“结果”流类似,“状态”流偶尔将事件写入其中,同时将事件写入“检查点”流,而不是像调用 outputState()
一样立即将事件写入“结果”流。
可选的 delete_checkpoint_stream
参数是一个 Python bool
,它指定应该删除投影的“检查点”流。
可选的 timeout
参数是一个 Python float
,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。
可选的 credentials
参数可以用来覆盖从连接字符串 URI 派生的调用凭证。
client.delete_projection(name=projection_name)
请注意,必须先禁用投影才能删除。
重启投影子系统
需要领导者
可以使用 restart_projections_subsystem()
方法重启投影子系统。
此方法还有两个可选参数,分别是 timeout
和 credentials
。
可选的 timeout
参数是一个 Python float
,它设置 gRPC 操作完成的最大持续时间(以秒为单位)。
可选的 credentials
参数可以用来覆盖从连接字符串 URI 派生的调用凭证。
client.restart_projections_subsystem()
调用凭据
默认调用凭据由客户端从连接字符串 URI 的用户信息部分推导而来。
上述描述的许多客户端方法都有一个可选的credentials
参数,可以用来为单个方法调用设置调用凭证,以覆盖从连接字符串URI派生的凭证。
调用凭证通过“基本认证”授权头发送到“安全”服务器。此授权头由服务器用于验证客户端。授权头不会发送到“不安全”服务器。
构建调用凭证
客户端方法construct_call_credentials()
可以用来从一个用户名和密码中构建调用凭证对象。
call_credentials = client.construct_call_credentials(
username='admin', password='changeit'
)
调用凭证对象可以用作其他客户端方法中credentials
参数的值。
连接
重连
reconnect()
方法可以用来手动将客户端重连到合适的EventStoreDB节点。此方法使用与读取集群节点状态相同的程序,然后根据在客户端构建时在连接字符串URI中指定的客户端节点偏好连接到合适的节点。此方法是线程安全的,即在多个线程同时调用时,只会发生一次重连。并发尝试重连将阻塞,直到客户端成功重连,然后它们都将正常返回。
client.reconnect()
手动重连可能需要的情况之一是,出于性能原因,客户端的节点偏好是连接到集群中的跟随者节点,并且在集群领导者选举之后,跟随者成为领导者。在这种情况下,重新连接到跟随者节点目前超出了此客户端的功能,但这种行为可能在未来的版本中得到实现。
由于@autoreconnect
装饰器,在许多情况下,重连将自动发生。
关闭
close()
方法可以用来干净地关闭客户端的gRPC连接。
client.close()
Asyncio客户端
esdbclient
包还提供了一个用于EventStoreDB的异步I/O gRPC Python客户端。它在功能上与多线程客户端等效。它使用grpc.aio
包和asyncio
模块,而不是grpc
和threading
。
它支持“esdb”和“esdb+discover”连接字符串URI方案,并且可以连接到“安全”和“不安全”的EventStoreDB服务器。
可以使用AsyncEventStoreDBClient
类来构建异步I/O gRPC Python客户端的实例。它可以从esdbclient
导入。构建客户端后,应该调用async方法connect()
。
异步客户端具有与多线程EventStoreDBClient
完全相同的方法。这些方法被定义为async def
方法,因此对这些方法的调用将返回Python“awaitables”,必须等待才能获得方法的返回值。方法具有相同的行为、相同的参数和相同的或等效的返回值。这些方法还用重连和重试装饰器进行了装饰,在遇到连接问题或服务器错误时,会选择性地重连和重试。
当等待时,方法read_all()
和read_stream()
返回一个AsyncReadResponse
对象。方法subscribe_to_all()
和subscribe_to_stream()
返回一个AsyncCatchupSubscription
对象。方法read_subscription_to_all()
和read_subscription_to_stream()
返回一个AsyncPersistentSubscription
对象。这些对象是asyncio迭代器,您可以使用Python的async for
语法遍历它们以获取RecordedEvent
对象。它们也是asyncio上下文管理器,支持async with
语法。它们还有一个stop()
方法,可以用来以积极取消流式gRPC调用到服务器的方式终止迭代器。当用作上下文管理器时,将在上下文管理器退出时调用stop()
方法。
方法 read_subscription_to_all()
和 read_subscription_to_stream()
返回 AsyncPersistentSubscription
类的实例,该类具有异步方法 ack()
和 nack()
,这些方法与 PersistentSubscription
上的方法工作方式相同,支持对从持久订阅接收到的记录事件的确认和否定确认。详情请参见上方。
概要
以下示例演示了异步 append_to_stream()
、get_stream()
和 subscribe_to_all()
方法。这些是编写事件源应用程序最有用的方法,允许记录新的聚合事件,获取聚合的记录事件以便重构聚合,以及以“恰好一次”语义传播和处理应用程序的状态。
import asyncio
from esdbclient import AsyncEventStoreDBClient
async def demonstrate_async_client():
# Construct client.
client = AsyncEventStoreDBClient(
uri=os.getenv("ESDB_URI"),
root_certificates=os.getenv("ESDB_ROOT_CERTIFICATES"),
)
# Connect to EventStoreDB.
await client.connect()
# Append events.
stream_name = str(uuid.uuid4())
event1 = NewEvent("OrderCreated", data=b'{}')
event2 = NewEvent("OrderUpdated", data=b'{}')
event3 = NewEvent("OrderDeleted", data=b'{}')
commit_position = await client.append_to_stream(
stream_name=stream_name,
current_version=StreamState.NO_STREAM,
events=[event1, event2, event3]
)
# Get stream events.
recorded = await client.get_stream(stream_name)
assert len(recorded) == 3
assert recorded[0] == event1
assert recorded[1] == event2
assert recorded[2] == event3
# Subscribe to all events.
received = []
async with await client.subscribe_to_all(commit_position=0) as subscription:
async for event in subscription:
received.append(event)
if event.commit_position == commit_position:
break
assert received[-3] == event1
assert received[-2] == event2
assert received[-1] == event3
# Close the client.
await client.close()
# Run the demo.
asyncio.run(
demonstrate_async_client()
)
FastAPI 示例
以下示例展示了如何使用 AsyncEventStoreDBClient
与 FastAPI。
from contextlib import asynccontextmanager
from fastapi import FastAPI
from esdbclient import AsyncEventStoreDBClient
client: AsyncEventStoreDBClient
@asynccontextmanager
async def lifespan(_: FastAPI):
# Construct the client.
global client
client = AsyncEventStoreDBClient(
uri="esdb+discover://localhost:2113?Tls=false",
)
await client.connect()
yield
# Close the client.
await client.close()
app = FastAPI(lifespan=lifespan)
@app.get("/commit_position")
async def commit_position():
commit_position = await client.get_commit_position()
return {"commit_position": commit_position}
如果将此代码放入名为 fastapi_example.py
的文件中,然后运行命令 uvicorn fastapi_example:app --host 0.0.0.0 --port 80
,那么当浏览器指向 http://localhost/commit_position
时,FastAPI 应用程序将返回类似 {"commit_position":628917}
的内容。使用 Ctrl-c 退出进程。
说明
正则表达式过滤器
read_all()
、subscribe_to_all()
、create_subscription_to_all()
和 get_commit_position()
方法有 filter_exclude
和 filter_include
参数。本节提供了一些关于这些参数值的更多详细信息。
首先要注意的是,这些参数的值应该是正则表达式的序列。
请注意,客户端将它们作为括号中的备选方案连接在一起,形成一个更大的正则表达式,该正则表达式锚定在要匹配的字符串的开始和结束处。因此,不需要包含 '^'
和 '$'
锚定断言。
如果您想匹配子字符串,请使用通配符,例如 '.*Snapshot'
以匹配所有以 'Snapshot'
结尾的字符串,或 'Order.*'
以匹配所有以 'Order'
开头的字符串。
EventStoreDB 生成的系统事件的 type
字符串以 $
符号开头。在操作持久订阅时生成的持久订阅事件具有以 PersistentConfig
开头的 type
字符串。
例如,要匹配 EventStoreDB 系统事件的类型,请使用正则表达式字符串 r'\$.+'
。请注意,常量 ESDB_SYSTEM_EVENTS_REGEX
设置为此值。您可以从 esdbclient
导入此常量,并在构建更长的正则表达式序列时使用它。
类似地,要匹配 EventStoreDB 持久订阅事件的类型,请使用正则表达式 r'PersistentConfig\d+'
。常量 ESDB_PERSISTENT_CONFIG_EVENTS_REGEX
设置为此值。您可以从 esdbclient
导入此常量,并在构建更长的正则表达式序列时使用它。
常量 DEFAULT_EXCLUDE_FILTER
是一个正则表达式序列,其中包含 ESDB_SYSTEM_EVENTS_REGEX
和 ESDB_PERSISTENT_CONFIG_EVENTS_REGEX
。它用作 filter_exclude
的默认值,以便默认排除 EventStoreDB 内部生成的事件。
在所有具有 filter_exclude
参数的方法中,参数的默认值是常量 DEFAULT_EXCLUDE_FILTER
,该常量设计用于匹配(因此排除)"系统"和"持久订阅配置"事件类型,否则将包含在内。
此值可以扩展。例如,如果您想排除系统事件、持久订阅事件以及类型以 'Snapshot'
结尾的事件,则可以使用 DEFAULT_EXCLUDE_FILTER + ['.*Snapshot']
作为 filter_exclude
参数。
filter_include
和 filter_exclude
参数设计成相互对立的效果,因此提供给 filter_include
的字符串序列将返回那些如果使用相同的参数值与 filter_exclude
配合将不被包括的事件。反之亦然,因此提供给 filter_exclude
的字符串序列将返回那些如果使用相同的参数值与 filter_include
配合将不会被包括的事件。
重连和重试方法装饰器
请注意,几乎所有客户端方法都装饰了 @autoreconnect
和 @retrygrpc
装饰器。
@autoreconnect
装饰器会在连接的服务不可用或客户端的 gRPC 通道意外关闭时,连接到集群中的合适节点。当调用需要领导者的方法,并且客户端的节点首选连接到领导者,但客户端连接到的节点不再是领导者时,客户端也会重连。在这种情况下,客户端将重连到当前领导者。重连后,将重试失败的操作。
@retrygrpc
装饰器会选择性重试由于超时、网络错误或服务器错误而失败的 gRPC 操作。它不会重试由于不良请求而肯定失败的操作。
请注意,未由重连和重试装饰器行为覆盖的方面与返回迭代器的相关方法有关。例如,考虑从 read_all()
方法返回的“读取响应”迭代器。在遍历“读取响应”之前,read_all()
方法将已返回,因此方法装饰器已退出。因此,如果在遍历“读取响应”时发生连接问题,则无法触发 read_all()
方法上的任何装饰器的重连。
对于“追捕订阅”对象,服务器会发送一个初始“确认”响应,由客户端接收并检查。因此,当调用 subscribe_to_all()
或 subscribe_to_stream()
时,如果服务器不可用,或者通道以某种方式已关闭,或者请求因其他原因失败,则客户端将重连并重试。然而,如果在成功返回的“追捕订阅”对象迭代期间抛出异常,则需要重新启动追捕订阅。类似地,当读取持久性订阅时,如果在成功接收到的响应遍历期间出现连接问题,则消费者需要重新启动。
仪表化
仪表化是指修改软件以便对其进行分析的行为。仪表化帮助企业揭示用户在软件或平台中频繁遇到错误或减速的区域或功能。
仪表化帮助您了解软件系统的内部状态。仪表化应用程序通过收集指标、事件、日志和跟踪等数据来测量在响应活动请求时正在进行的代码。
仪表化提供了对应用程序的即时可见性,通常使用图表和图形来展示“内部发生的事情”。
此包支持使用 OpenTelemetry 仪表化 EventStoreDB 客户端。
OpenTelemetry
OpenTelemetry 项目提供了一套 API、SDK 和工具,用于仪表化、生成、收集和导出遥测数据,可以帮助您分析软件的性能和行为。它是供应商中立的,100% 免费和开源,并得到了可观测性领域行业领袖的采用和支持。
本包为EventStoreDBClient
和AsyncEventStoreDBClient
客户端提供了OpenTelemetry仪表化工具。这些仪表化工具依赖于各种OpenTelemetry Python包,您需要安装它们,最好使用本项目的"opentelemetry"包扩展来确保版本兼容性。
例如,您可以使用pip安装"opentelemetry"包扩展。
$ pip install esdbclient[opentelemetry]
或者您可以使用Poetry将其添加到您的pyproject.toml文件中并安装它。
$ poetry add esdbclient[opentelemetry]
然后您可以使用OpenTelemetry仪表化工具EventStoreDBClientInstrumentor
对EventStoreDBClient
进行仪表化。
from esdbclient.instrumentation.opentelemetry import EventStoreDBClientInstrumentor
# Activate instrumentation.
EventStoreDBClientInstrumentor().instrument()
# Deactivate instrumentation.
EventStoreDBClientInstrumentor().uninstrument()
您还可以使用OpenTelemetry仪表化工具AsyncEventStoreDBClientInstrumentor
对AsyncEventStoreDBClient
进行仪表化。
from esdbclient.instrumentation.opentelemetry import AsyncEventStoreDBClientInstrumentor
# Activate instrumentation.
AsyncEventStoreDBClientInstrumentor().instrument()
# Deactivate instrumentation.
AsyncEventStoreDBClientInstrumentor().uninstrument()
仪表化工具使用全局的OpenTelemetry "tracer provider",您需要初始化它以导出遥测数据。
例如,要将数据导出到控制台,您需要安装Python包opentelemetry-sdk
,并使用以下方式使用类TracerProvider
、BatchSpanProcessor
和ConsoleSpanExporter
。
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.trace import set_tracer_provider
resource = Resource.create(
attributes={
SERVICE_NAME: "eventstoredb",
}
)
provider = TracerProvider(resource=resource)
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
set_tracer_provider(provider)
或者要导出到OpenTelemetry兼容的数据收集器,例如Jaeger,您需要安装Python包opentelemetry-exporter-otlp-proto-http
,然后使用来自opentelemetry.exporter.otlp.proto.http.trace_exporter
模块的OTLPSpanExporter
类,并使用适当的endpoint
参数为您的收集器。
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import set_tracer_provider
resource = Resource.create(
attributes={
SERVICE_NAME: "eventstoredb",
}
)
provider = TracerProvider(resource=resource)
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4318/v1/traces")))
set_tracer_provider(provider)
您可以通过运行以下命令在本地启动Jaeger。
$ docker run -d -p 4318:4318 -p 16686:16686 --name jaeger jaegertracing/all-in-one:latest
然后您可以导航到http://localhost:16686
来访问Jaeger UI。并且遥测数据可以通过OpenTelemetry tracer provider发送到http://localhost:4318/v1/traces
。
此时,仪表化的方法是append_to_stream()
、subscribe_to_stream()
、subscribe_to_all()
、read_subscription_to_stream()
和read_subscription_to_all()
。
append_to_stream()
方法通过使用"producer" span kind来封装方法调用进行仪表化。它还将span上下文信息添加到新事件元数据中,以便消费者可以关联"producer" span和"consumer" span。
订阅方法通过仪表化响应迭代器进行仪表化,为每个接收到的记录事件创建一个"consumer" span。它从记录的事件元数据中提取span上下文信息,并通过将"consumer" span作为"producer" span的子span关联到"producer" span来将"consumer" span与"producer" span关联起来。
社区
贡献者
安装Poetry
首先要检查您是否已安装Poetry。
$ poetry --version
如果没有,请安装Poetry。
$ curl -sSL https://install.python-poetry.org | python3 -
确保Poetry的bin目录在您的PATH
环境变量中。
但无论如何,请确保您知道poetry
可执行文件的路径。Poetry安装程序会告诉您其安装位置以及如何配置您的shell。
请参阅Poetry文档以获取有关使用Poetry的指导。
为PyCharm用户设置
您可以使用PyCharm轻松获取项目文件(菜单“Git > Clone...”)。然后PyCharm通常会提示您打开项目。
在新窗口中打开项目。PyCharm通常会提示您创建新的虚拟环境。
为项目创建一个新的Poetry虚拟环境。如果PyCharm不知道您的poetry
可执行文件的位置,请将路径设置在“新建Poetry环境”表单中的“Poetry可执行文件”标签的输入字段中。在“新建Poetry环境”表单中,您还将有机会选择虚拟环境将使用的Python可执行文件。
PyCharm将为您的项目创建一个新的Poetry虚拟环境,使用特定的Python版本,并按照项目的poetry.lock
文件安装项目依赖包。
您可以添加不同版本的Python的Poetry环境,并使用PyCharm的“Python解释器”设置在它们之间切换。如果您想使用未安装的Python版本,可以使用您喜欢的包管理器,或者直接从Python网站下载Python的最新版本安装程序。
项目依赖包安装完成后,您应该能够在PyCharm中运行测试(在tests
文件夹上右键单击,选择“运行”选项)。
由于pytest与PyCharm的调试器和覆盖率工具之间存在冲突,您可能需要在测试运行器模板中添加--no-cov
选项。或者,只需使用Python标准库的unittest
模块。
您还应该在PyCharm中打开一个终端窗口,并从命令行运行项目的Makefile命令(见下文)。
命令行设置
使用Git或合适的替代工具获取项目文件。
在终端应用程序中,将当前工作目录更改为项目文件的根目录。此文件夹中应有Makefile。
使用Makefile创建项目的新Poetry虚拟环境,并使用以下命令将项目依赖包安装到其中。
$ make install-packages
还可以将项目以“可编辑模式”安装。
$ make install
请注意,以这种方式创建虚拟环境后,如果尝试在PyCharm中打开项目并将其配置为使用此虚拟环境作为“现有的Poetry环境”,PyCharm有时会出现一些问题(不清楚原因),可能存在问题。如果遇到此类问题,可以通过删除虚拟环境并使用PyCharm创建Poetry虚拟环境来解决问题(见上文)。
项目Makefile命令
可以使用以下命令启动EventStoreDB。
$ make start-eventstoredb
可以使用以下命令运行测试(需要EventStoreDB正在运行)。
$ make test
可以使用以下命令停止EventStoreDB。
$ make stop-eventstoredb
可以使用以下命令检查代码格式。
$ make lint
可以使用以下命令重新格式化代码。
$ make fmt
测试应位于./tests
中。待测试的代码应在./esdbclient
中。
在pyproject.toml
中编辑包依赖项。使用以下命令更新已安装的包(以及poetry.lock
文件)。
$ make update-packages
项目详情
下载文件
下载适用于您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。
源分发
构建分发
esdbclient-1.1.1.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 213fab246ae185752797bb34e1444770bc408e618aa58d6a14772d8d8f88f09f |
|
MD5 | 6f1cfb2eea3df2f9cd54865be76c4528 |
|
BLAKE2b-256 | e83827512437b7bebe830d4569e30f75fff25b98f52967ca3ea238ce30837a69 |
esdbclient-1.1.1-py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | db2d5abc008dff55f54820bab2f9dfd07228fb587deb5cbac7bc5bae24aaed2f |
|
MD5 | a3d743948d07ecd80beace9ec179df22 |
|
BLAKE2b-256 | d21a5d16608adef5240a8ac43dcfc794ab44fda9b8048e33c2f1bf916a9249d8 |