EventStore的快速、易用客户端
项目描述
Photon-pump是一个快速、用户友好的Eventstore客户端。
它强调模块化设计,背后是一个为人类编写的接口。
安装
Photon pump可在cheese shop上获得。
pip install photon-pump
您需要安装lib-protobuf 3.2.0或更高版本。
文档可在Read the docs上找到。
基本用法
与连接一起工作
通常您将通过~photonpump.Client类与光泵交互。该~photonpump.Client是一个全双工客户端,可以并行处理多个请求和响应。建议每个应用程序创建一个连接。
首先您需要创建一个连接
>>> from photonpump import connect >>> >>> async with connect() as c: >>> await c.ping()
photonpump.connect函数返回一个异步上下文管理器,以便在您完成时自动关闭连接。或者,您可以创建一个客户端并自行管理其生命周期。
>>> from photonpump import connect >>> >>> client = connect() >>> await client.connect() >>> await client.ping() >>> await client.close()
读取和写入单个事件
连接可用于读取和写入事件。您可以使用~photonpump.Client.publish_event方法发布单个事件
>>> # When publishing events, you must provide the stream name. >>> stream = 'ponies' >>> event_type = 'PonyJumped' >>> >>> result = await conn.publish_event(stream, event_type, body={ >>> 'Pony': 'Derpy Hooves', >>> 'Height': 10, >>> 'Distance': 13 >>> })
如果我们知道事件的编号和发布事件的流,可以使用补充的 ~photonpump.Client.get_event 方法获取单个事件。
>>> event_number = result.last_event_number >>> event = await conn.get_event(stream, event_number)
假设您的事件以json格式发布,可以使用 ~photonpump.messages.Event.json 方法加载正文。
async def write_an_event():
async with photonpump.connect() as conn:
await conn.publish_event('pony_stream', 'pony.jumped', body={
'name': 'Applejack',
'height_m': 0.6
})
async def read_an_event(conn):
event = await conn.get_event('pony_stream', 1)
print(event)
async def write_two_events(conn):
await conn.publish('pony_stream', [
NewEvent('pony.jumped', body={
'name': 'Rainbow Colossus',
'height_m': 0.6
},
NewEvent('pony.jumped', body={
'name': 'Sunshine Carnivore',
'height_m': 1.12
})
])
async def read_two_events(conn):
events = await conn.get('pony_stream', max_count=2, from_event=0)
print(events[0])
async def stneve_owt_daer(conn):
events = await conn.get('pony_stream', direction=StreamDirection.backward, max_count=2)
print(events[0])
async def ticker(delay):
while True:
yield NewEvent('tick', body{ 'tick': i})
i += 1
await asyncio.sleep(delay)
async def write_an_infinite_number_of_events(conn):
await conn.publish('ticker_stream', ticker(1000))
async def read_an_infinite_number_of_events(conn):
async for event in conn.iter('ticker_stream'):
print(event)
>>> data = event.json()
>>> assert data['Pony'] == 'Derpy Hooves'
批量读写
我们可以使用 ~photonpump.Client.get 和 ~photonpump.Client.publish 方法来读取和写入多个事件,这些方法是我们的 ~photonpump.Client 的。 photonpump.message.NewEvent 函数用于构建事件的一个辅助函数。
>>> stream = 'more_ponies' >>> events = [ >>> NewEvent('PonyJumped', >>> data={ >>> 'Pony': 'Peculiar Hooves', >>> 'Height': 9, >>> 'Distance': 13 >>> }), >>> NewEvent('PonyJumped', >>> data={ >>> 'Pony': 'Sparkly Hooves', >>> 'Height': 12, >>> 'Distance': 12 >>> }), >>> NewEvent('PonyJumped', >>> data={ >>> 'Pony': 'Sparkly Hooves', >>> 'Height': 11, >>> 'Distance': 14 >>> })] >>> >>> await conn.publish(stream, events)
通过设置 from_event_number 和 max_count 参数,我们可以按片段从流中获取事件。我们可以从流的开始或结束处读取事件。
>>> import StreamDirection from photonpump.messages >>> >>> all_events = await conn.get(stream) >>> assert len(all_events) == 3 >>> >>> first_event = await conn.get(stream, max_count=1)[0].json() >>> assert first_event['Pony'] == 'Peculiar Hooves' >>> >>> second_event = await conn.get(stream, max_count=1, from_event_number=1)[0].json() >>> assert second_event['Pony'] == 'Sparkly Hooves' >>> >>> reversed_events = await conn.get(stream, direction=StreamDirection.backward) >>> assert len(reversed_events) == 3 >>> assert reversed_events[2] == first_event
使用异步生成器读取
我们可以通过使用 ~photonpump.Client.get 的 from_event_number 参数手动翻阅流,但使用返回异步生成器的 ~photonpump.Client.iter 方法更简单。默认情况下,iter 将从流的开始读取到结束,然后停止。与 get 一样,您可以通过设置 ~photon.messages.StreamDirection 或使用 from_event 来控制结果。
>>> async for event in conn.iter(stream): >>> print (event)
这扩展到了异步推导式。
>>> async def feet_to_metres(jumps): >>> async for jump in jumps: >>> data = jump.json() >>> data['Height'] = data * 0.3048 >>> data['Distance'] = data * 0.3048 >>> yield data >>> >>> jumps = (event async for event in conn.iter('ponies') >>> if event.type == 'PonyJumped') >>> async for jump in feet_to_metres(jumps): >>> print (event)
持久订阅
有时我们希望持续监视一个流并在发生新事件时接收通知。Eventstore支持这种用例的易失性和持久订阅。
持久订阅将它的状态存储在服务器上。当您的应用程序重新启动时,您可以再次连接到订阅并继续上次离开的地方。多个客户端可以连接到同一个持久订阅来支持竞争消费者场景。为了支持这些功能,持久订阅必须在Eventstore集群的主节点上运行。
首先,我们需要 创建订阅 <photonpump.connection.Client.create_subscription>。
>>> async def create_subscription(subscription_name, stream_name, conn): >>> await conn.create_subscription(subscription_name, stream_name)
一旦我们有了订阅,我们就可以 连接到它 <photonpump.connection.Client.connect_subscription> 开始接收事件。持久订阅公开一个 events 属性,它像一个异步迭代器。
>>> async def read_events_from_subscription(subscription_name, stream_name, conn): >>> subscription = await conn.connect_subscription(subscription_name, stream_name) >>> async for event in subscription.events: >>> print(event) >>> await subscription.ack(event)
Eventstore会一次将每个事件发送给一个消费者。当您处理完事件后,您必须确认收到。Eventstore会重发未确认的消息。
易失性订阅
在易失性订阅中,状态由客户端存储。当您的应用程序重新启动时,您必须重新订阅到流。Eventstore不支持对易失性订阅的竞争消费者。易失性订阅可以在集群的任何节点上运行。
易失性订阅不支持事件确认。
>>> async def subscribe_to_stream(stream, conn): >>> subscription = await conn.subscribe_to(stream) >>> async for event in subscription.events: >>> print(event)
高可用性场景
Eventstore支持高可用性集群部署拓扑。在这种情况下,Eventstore运行一个主节点和多个从节点。一些操作,尤其是持久订阅和投影,只由主节点处理。为了连接到高可用性集群并自动找到主节点,photonpump支持集群发现。
集群发现会查询eventstore的gossip来找到活动的主节点。您可以提供集群中匹配机的IP地址,或解析到集群成员的DNS名称,photonpump将发现其他成员。
>>> async def connect_to_cluster(hostname_or_ip, port=2113): >>> with connect(discovery_host=hostname_or_ip, discovery_port=2113) as c: >>> await c.ping()
如果您提供了 host 和 discovery_host,photonpump将优先发现。
调试
如果您想逐步调试使用photonpump的代码,了解Event Store的TCP API(photonpump使用的)使用“心跳”来确保连接不会留下是很有帮助的。这意味着如果您坐在调试器(例如pdb)提示符上——因此每次运行事件循环数十秒——您会发现您被断开连接。为了防止这种情况,您可以将它运行为Event Store的心跳超时设置为高值——例如,使用一个 Dockerfile 如这个。
开发
我们使用 make
来管理常见的开发任务。请检查 Makefile 以获取所有可用选项。最重要的命令包括
make init
安装 requirement.txt(你需要一个虚拟环境)
make eventstore_docker
在 docker 中启动 eventstore
make all_tests
运行你虚拟环境中的所有测试(需要运行 eventstore 实例,localhost:1113)
make tox
针对所有支持的 Python 版本运行测试
## [0.7.2] - 2019-01-29 修复:当连接断开时,迭代器将重启到最后处理的事件编号。重构:MessageReader 在标题中返回 TcpCommand 而不是 int。任务:删除了未使用的依赖。
## [0.7.1] - 2019-01-29 修复:当重新创建连接时,易失性订阅无法重启。
## [0.7.0] - 2019-01-29 修复:易失性订阅无法为投影提供所有事件。这是由于链接事件和原始事件之间的混淆造成的。
- ### 破坏性变更
Event.original_event 现在是 Event.received_event,因为原始名称不够明确。
Event.event_number 现在等于 received_event.event_number 的值,而不是链接事件的值。
## [0.6.0.1] - 2019-01-03 添加了使用 Travis 和 Versioneer 的自动部署到 pypi。
## [0.6.0] - 2018-12-21 在 subscribe_to 方法中添加了批量大小参数。
## [0.6.0-alpha-5] - 2018-11-09 修复:CreatePersistentSubscription 命令在成功后从未清理。
## [0.6.0-alpha-4] - 2018-10-05 修复:我们现在可以正确处理已删除的消息。
## [0.6.0-alpha-2] - 2018-09-17 发现现在支持“选择器”来控制我们从八卦中选择节点的方式。
## [0.6.0-alpha-1] - 2018-09-14 添加了对追捕订阅的支持。
## [0.5] - 2018-04-27 ### 破坏性变更 - 删除了 ConnectionContextManager 类。- “Connection” 类现在是 “Client”,并作为其自身的上下文管理器。- 完全重写了连接模块。- PersistentSubscriptions 在创建流迭代器时不再使用 maxsize 参数。这是针对 https://github.com/madedotcom/photon-pump/issues/49 的解决方案。
## [0.4] - 2018-04-27 ### 修复 - 添加了集群发现以支持高可用性场景。
## [0.3] - 2018-04-11 ### 修复 - iter 正确支持反向迭代流。- 破坏性变更 - published_event 交换了类型和流的顺序。
版本历史:https://github.com/madedotcom/photon-pump/compare/v0.7.1..v0.7.2 [0.7.1]: https://github.com/madedotcom/photon-pump/compare/v0.7.0..v0.7.1 [0.7.0]: https://github.com/madedotcom/photon-pump/compare/v0.6.0.1..v0.7.0 [0.6.0.1]: https://github.com/madedotcom/photon-pump/compare/v0.6.0..v0.6.0.1 [0.6.0]: https://github.com/madedotcom/photon-pump/compare/v0.6.0-alpha-5..v0.6.0 [0.6.0-alpha-5]: https://github.com/madedotcom/photon-pump/compare/v0.6.0-alpha-4..v0.6.0-alpha-5 [0.6.0-alpha-4]: https://github.com/madedotcom/photon-pump/compare/v0.6.0-alpha-2..v0.6.0-alpha-4 [0.6.0-alpha-2]: https://github.com/madedotcom/photon-pump/compare/v0.6.0-alpha-1..v0.6.0-alpha-2 [0.6.0-alpha-1]: https://github.com/madedotcom/photon-pump/compare/v0.5.0..v0.6.0-alpha-1 [0.5]: https://github.com/madedotcom/photon-pump/compare/v0.4.0..v0.5.0 [0.4]: https://github.com/madedotcom/photon-pump/compare/v0.3.0..v0.4.0 [0.3]: https://github.com/madedotcom/photon-pump/compare/v0.2.5..v0.3 [0.2.5]: https://github.com/madedotcom/photon-pump/compare/v0.2.4..v0.2.5