跳转到主要内容

EventStore的快速、易用客户端

项目描述

Photon-pump是一个快速、用户友好的Eventstore客户端。

它强调模块化设计,背后是一个为人类编写的接口。

安装

Photon pump可在cheese shop上获得。

pip install photon-pump

您需要安装lib-protobuf 3.2.0或更高版本。

文档可在Read the docs上找到。

基本用法

与连接一起工作

通常您将通过~photonpump.Client类与光泵交互。该~photonpump.Client是一个全双工客户端,可以并行处理多个请求和响应。建议每个应用程序创建一个连接。

首先您需要创建一个连接

>>> from photonpump import connect
>>>
>>> async with connect() as c:
>>>     await c.ping()

photonpump.connect函数返回一个异步上下文管理器,以便在您完成时自动关闭连接。或者,您可以创建一个客户端并自行管理其生命周期。

>>> from photonpump import connect
>>>
>>> client = connect()
>>> await client.connect()
>>> await client.ping()
>>> await client.close()

读取和写入单个事件

连接可用于读取和写入事件。您可以使用~photonpump.Client.publish_event方法发布单个事件

>>> # When publishing events, you must provide the stream name.
>>> stream = 'ponies'
>>> event_type = 'PonyJumped'
>>>
>>> result = await conn.publish_event(stream, event_type, body={
>>>     'Pony': 'Derpy Hooves',
>>>     'Height': 10,
>>>     'Distance': 13
>>>     })

如果我们知道事件的编号和发布事件的流,可以使用补充的 ~photonpump.Client.get_event 方法获取单个事件。

>>> event_number = result.last_event_number
>>> event = await conn.get_event(stream, event_number)

假设您的事件以json格式发布,可以使用 ~photonpump.messages.Event.json 方法加载正文。

async def write_an_event():
    async with photonpump.connect() as conn:
        await conn.publish_event('pony_stream', 'pony.jumped', body={
            'name': 'Applejack',
            'height_m': 0.6
        })


async def read_an_event(conn):
    event = await conn.get_event('pony_stream', 1)
    print(event)


async def write_two_events(conn):
    await conn.publish('pony_stream', [
        NewEvent('pony.jumped', body={
            'name': 'Rainbow Colossus',
            'height_m': 0.6
        },
        NewEvent('pony.jumped', body={
            'name': 'Sunshine Carnivore',
            'height_m': 1.12
        })
    ])


async def read_two_events(conn):
    events = await conn.get('pony_stream', max_count=2, from_event=0)
    print(events[0])


async def stneve_owt_daer(conn):
    events = await conn.get('pony_stream', direction=StreamDirection.backward, max_count=2)
    print(events[0])


async def ticker(delay):
    while True:
        yield NewEvent('tick', body{ 'tick': i})
        i += 1
        await asyncio.sleep(delay)


async def write_an_infinite_number_of_events(conn):
    await conn.publish('ticker_stream', ticker(1000))


async def read_an_infinite_number_of_events(conn):
    async for event in conn.iter('ticker_stream'):
        print(event)


>>> data = event.json()
>>> assert data['Pony'] == 'Derpy Hooves'

批量读写

我们可以使用 ~photonpump.Client.get~photonpump.Client.publish 方法来读取和写入多个事件,这些方法是我们的 ~photonpump.Client 的。 photonpump.message.NewEvent 函数用于构建事件的一个辅助函数。

>>> stream = 'more_ponies'
>>> events = [
>>>     NewEvent('PonyJumped',
>>>              data={
>>>                 'Pony': 'Peculiar Hooves',
>>>                 'Height': 9,
>>>                 'Distance': 13
>>>              }),
>>>     NewEvent('PonyJumped',
>>>              data={
>>>                 'Pony': 'Sparkly Hooves',
>>>                 'Height': 12,
>>>                 'Distance': 12
>>>              }),
>>>     NewEvent('PonyJumped',
>>>              data={
>>>                 'Pony': 'Sparkly Hooves',
>>>                 'Height': 11,
>>>                 'Distance': 14
>>>              })]
>>>
>>> await conn.publish(stream, events)

通过设置 from_event_numbermax_count 参数,我们可以按片段从流中获取事件。我们可以从流的开始或结束处读取事件。

>>> import StreamDirection from photonpump.messages
>>>
>>> all_events = await conn.get(stream)
>>> assert len(all_events) == 3
>>>
>>> first_event = await conn.get(stream, max_count=1)[0].json()
>>> assert first_event['Pony'] == 'Peculiar Hooves'
>>>
>>> second_event = await conn.get(stream, max_count=1, from_event_number=1)[0].json()
>>> assert second_event['Pony'] == 'Sparkly Hooves'
>>>
>>> reversed_events = await conn.get(stream, direction=StreamDirection.backward)
>>> assert len(reversed_events) == 3
>>> assert reversed_events[2] == first_event

使用异步生成器读取

我们可以通过使用 ~photonpump.Client.getfrom_event_number 参数手动翻阅流,但使用返回异步生成器的 ~photonpump.Client.iter 方法更简单。默认情况下,iter 将从流的开始读取到结束,然后停止。与 get 一样,您可以通过设置 ~photon.messages.StreamDirection 或使用 from_event 来控制结果。

>>> async for event in conn.iter(stream):
>>>     print (event)

这扩展到了异步推导式。

>>> async def feet_to_metres(jumps):
>>>    async for jump in jumps:
>>>         data = jump.json()
>>>         data['Height'] = data * 0.3048
>>>         data['Distance'] = data * 0.3048
>>>         yield data
>>>
>>> jumps = (event async for event in conn.iter('ponies')
>>>             if event.type == 'PonyJumped')
>>> async for jump in feet_to_metres(jumps):
>>>     print (event)

持久订阅

有时我们希望持续监视一个流并在发生新事件时接收通知。Eventstore支持这种用例的易失性和持久订阅。

持久订阅将它的状态存储在服务器上。当您的应用程序重新启动时,您可以再次连接到订阅并继续上次离开的地方。多个客户端可以连接到同一个持久订阅来支持竞争消费者场景。为了支持这些功能,持久订阅必须在Eventstore集群的主节点上运行。

首先,我们需要 创建订阅 <photonpump.connection.Client.create_subscription>

>>> async def create_subscription(subscription_name, stream_name, conn):
>>>     await conn.create_subscription(subscription_name, stream_name)

一旦我们有了订阅,我们就可以 连接到它 <photonpump.connection.Client.connect_subscription> 开始接收事件。持久订阅公开一个 events 属性,它像一个异步迭代器。

>>> async def read_events_from_subscription(subscription_name, stream_name, conn):
>>>     subscription = await conn.connect_subscription(subscription_name, stream_name)
>>>     async for event in subscription.events:
>>>         print(event)
>>>         await subscription.ack(event)

Eventstore会一次将每个事件发送给一个消费者。当您处理完事件后,您必须确认收到。Eventstore会重发未确认的消息。

易失性订阅

在易失性订阅中,状态由客户端存储。当您的应用程序重新启动时,您必须重新订阅到流。Eventstore不支持对易失性订阅的竞争消费者。易失性订阅可以在集群的任何节点上运行。

易失性订阅不支持事件确认。

>>> async def subscribe_to_stream(stream, conn):
>>>     subscription = await conn.subscribe_to(stream)
>>>     async for event in subscription.events:
>>>         print(event)

高可用性场景

Eventstore支持高可用性集群部署拓扑。在这种情况下,Eventstore运行一个主节点和多个从节点。一些操作,尤其是持久订阅和投影,只由主节点处理。为了连接到高可用性集群并自动找到主节点,photonpump支持集群发现。

集群发现会查询eventstore的gossip来找到活动的主节点。您可以提供集群中匹配机的IP地址,或解析到集群成员的DNS名称,photonpump将发现其他成员。

>>> async def connect_to_cluster(hostname_or_ip, port=2113):
>>>     with connect(discovery_host=hostname_or_ip, discovery_port=2113) as c:
>>>         await c.ping()

如果您提供了 hostdiscovery_host,photonpump将优先发现。

调试

如果您想逐步调试使用photonpump的代码,了解Event Store的TCP API(photonpump使用的)使用“心跳”来确保连接不会留下是很有帮助的。这意味着如果您坐在调试器(例如pdb)提示符上——因此每次运行事件循环数十秒——您会发现您被断开连接。为了防止这种情况,您可以将它运行为Event Store的心跳超时设置为高值——例如,使用一个 Dockerfile 如这个

开发

我们使用 make 来管理常见的开发任务。请检查 Makefile 以获取所有可用选项。最重要的命令包括

make init

安装 requirement.txt(你需要一个虚拟环境)

make eventstore_docker

在 docker 中启动 eventstore

make all_tests

运行你虚拟环境中的所有测试(需要运行 eventstore 实例,localhost:1113)

make tox

针对所有支持的 Python 版本运行测试

## [0.7.2] - 2019-01-29 修复:当连接断开时,迭代器将重启到最后处理的事件编号。重构:MessageReader 在标题中返回 TcpCommand 而不是 int。任务:删除了未使用的依赖。

## [0.7.1] - 2019-01-29 修复:当重新创建连接时,易失性订阅无法重启。

## [0.7.0] - 2019-01-29 修复:易失性订阅无法为投影提供所有事件。这是由于链接事件和原始事件之间的混淆造成的。

### 破坏性变更
  • Event.original_event 现在是 Event.received_event,因为原始名称不够明确。

  • Event.event_number 现在等于 received_event.event_number 的值,而不是链接事件的值。

## [0.6.0.1] - 2019-01-03 添加了使用 Travis 和 Versioneer 的自动部署到 pypi。

## [0.6.0] - 2018-12-21 在 subscribe_to 方法中添加了批量大小参数。

## [0.6.0-alpha-5] - 2018-11-09 修复:CreatePersistentSubscription 命令在成功后从未清理。

## [0.6.0-alpha-4] - 2018-10-05 修复:我们现在可以正确处理已删除的消息。

## [0.6.0-alpha-2] - 2018-09-17 发现现在支持“选择器”来控制我们从八卦中选择节点的方式。

## [0.6.0-alpha-1] - 2018-09-14 添加了对追捕订阅的支持。

## [0.5] - 2018-04-27 ### 破坏性变更 - 删除了 ConnectionContextManager 类。- “Connection” 类现在是 “Client”,并作为其自身的上下文管理器。- 完全重写了连接模块。- PersistentSubscriptions 在创建流迭代器时不再使用 maxsize 参数。这是针对 https://github.com/madedotcom/photon-pump/issues/49 的解决方案。

## [0.4] - 2018-04-27 ### 修复 - 添加了集群发现以支持高可用性场景。

## [0.3] - 2018-04-11 ### 修复 - iter 正确支持反向迭代流。- 破坏性变更 - published_event 交换了类型和流的顺序。

版本历史:https://github.com/madedotcom/photon-pump/compare/v0.7.1..v0.7.2 [0.7.1]: https://github.com/madedotcom/photon-pump/compare/v0.7.0..v0.7.1 [0.7.0]: https://github.com/madedotcom/photon-pump/compare/v0.6.0.1..v0.7.0 [0.6.0.1]: https://github.com/madedotcom/photon-pump/compare/v0.6.0..v0.6.0.1 [0.6.0]: https://github.com/madedotcom/photon-pump/compare/v0.6.0-alpha-5..v0.6.0 [0.6.0-alpha-5]: https://github.com/madedotcom/photon-pump/compare/v0.6.0-alpha-4..v0.6.0-alpha-5 [0.6.0-alpha-4]: https://github.com/madedotcom/photon-pump/compare/v0.6.0-alpha-2..v0.6.0-alpha-4 [0.6.0-alpha-2]: https://github.com/madedotcom/photon-pump/compare/v0.6.0-alpha-1..v0.6.0-alpha-2 [0.6.0-alpha-1]: https://github.com/madedotcom/photon-pump/compare/v0.5.0..v0.6.0-alpha-1 [0.5]: https://github.com/madedotcom/photon-pump/compare/v0.4.0..v0.5.0 [0.4]: https://github.com/madedotcom/photon-pump/compare/v0.3.0..v0.4.0 [0.3]: https://github.com/madedotcom/photon-pump/compare/v0.2.5..v0.3 [0.2.5]: https://github.com/madedotcom/photon-pump/compare/v0.2.4..v0.2.5

项目详情


下载文件

下载适合您平台文件的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分布

photon-pump-0.9.5.2.tar.gz (56.8 kB 查看哈希值)

上传时间

支持