跳转到主要内容

基于预测器的通用异步消息通道

项目描述

message-channel

PyPI PyPI - License PyPI - Python Version Test

此库提供了一个消息通道对象,它从消息集合中减去特定的消息。它类似于SQL或ReactiveX的group by,但适用于异步读取器。

安装

pip install python-message-channel

使用方法

例如,假设您有一个字符串流,其中消息以ab、... e等前缀开头,并且您希望为以bd等前缀开头的消息分割子通道,如下所示。

=============================================
---------------------------------> a:foo
--------------------+
--------------------|------------> c:foo
--------------------|------------> d:foo
--------------------|------------> e:foo
====================|========================
channel             |
                   =|========================
                    +------------> b:foo
                   ==========================
                   subchannel `m.startswith('b')`

此库是处理此类情况的工具。首先,从一个流读取器创建一个Channel实例,然后您可以通过channel.recv()方法接收消息。在本例中,我们使用asyncio.Queue作为流。

import asyncio

from message_channel import Channel

async def main():
    # Create original stream
    stream = asyncio.Queue()

    # Create stream reader
    async def reader():
        return await stream.get()

    # Create stream channel
    async with Channel(reader) as channel:
        stream.put_nowait('a:foo')
        stream.put_nowait('b:foo')
        stream.put_nowait('c:foo')
        stream.put_nowait('d:foo')
        stream.put_nowait('e:foo')
        assert (await channel.recv()) == 'a:foo'
        assert (await channel.recv()) == 'b:foo'
        assert (await channel.recv()) == 'c:foo'
        assert (await channel.recv()) == 'd:foo'
        assert (await channel.recv()) == 'e:foo'


if __name__ == '__main__':
    asyncio.run(main())

您可以通过一个预测器使用channel.split()方法分割通道。

    async with Channel(reader) as channel:
        def predicator(m):
            return m.startswith('b:')

        async with channel.split(predicator) as sub:
            stream.put_nowait('a:foo')
            stream.put_nowait('b:foo')
            stream.put_nowait('c:foo')
            stream.put_nowait('d:foo')
            stream.put_nowait('e:foo')
            # sub receive messages starts from 'b:'
            assert (await sub.recv()) == 'b:foo'
            # channel (original) receive messages other than above
            assert (await channel.recv()) == 'a:foo'
            assert (await channel.recv()) == 'c:foo'
            assert (await channel.recv()) == 'd:foo'
            assert (await channel.recv()) == 'e:foo'

API文档

https://fixpoint.github.io/python-message-channel/

pdoc提供支持。

许可证

MIT许可证下分发

项目详情


下载文件

下载您平台对应的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源代码分发

python-message-channel-0.2.3.tar.gz (5.5 kB 查看哈希值)

上传时间 源代码

构建分发

python_message_channel-0.2.3-py3-none-any.whl (5.7 kB 查看哈希值)

上传时间 Python 3

由以下机构支持

AWSAWS 云计算和安全赞助商 DatadogDatadog 监控 FastlyFastly CDN GoogleGoogle 下载分析 MicrosoftMicrosoft PSF赞助商 PingdomPingdom 监控 SentrySentry 错误日志 StatusPageStatusPage 状态页面