跳转到主要内容

一个不断监视接口事件并将它们分别转发的系统

项目描述

一个通过事件总线不断监视接口事件并将它们分别通过的事件总线转发的系统

安装

pip install pop-beacon

使用

beacon主要是一个适用于更大项目的应用程序合并组件。然而,它包含一个有用的测试脚本。这个测试脚本将监听所有信标并打印出它们接收到的所有事件。监听器将遍历内部信标队列并打印它们发布的所有内容。可以通过–output指定格式化打印数据的输出器。

beacon_test --output json

测试

使用测试库本地安装beacon

$ git clone git@gitlab.com:saltstack/pop/beacon.git
$ pip install -e beacon -r requirements-test.txt

如果您已通过系统包管理器安装了rabbitmq-server二进制文件,则不会跳过pika测试。使用默认参数启动本地rabbitmq-server

sudo rabbitmq-server

使用pytest运行测试

$ pytest beacon/tests

ACCT PROFILES

信标 将读取使用 acct 系统加密的凭据。要使用此系统,创建一个 yaml 文件,其中包含明文凭据以及连接到各种信标插件所需的信息。例如,要连接到 rabbitmq 服务器或任何 amqp 实现,请在您的 acct 凭据文件中创建一个配置文件,指定“pika” acct 插件

credentials.yml

pika:
  profile_name:
    host: localhost
    port: 5672
    username: XXXXXXXXXXXX
    password: XXXXXXXXXXXX
    beacon_channels:
      - channel1
      - channel2

接下来,使用 acct 命令使用 fernet 算法加密此文件

$ acct encrypt credentials.yml
YeckEnWEGOjBDVxxytw13AsdLgquzhCtFHOs7kDsna8=

现在可以将 acct 信息存储在环境变量中

$ export ACCT_FILE = $PWD/credentials.yml.fernet
$ export ACCT_KEY = "YeckEnWEGOjBDVxxytw13AsdLgquzhCtFHOs7kDsna8="

它们也可以在命令行中使用

$ beacon_test --acct-file=credentials.yml.fernet --acct-key="YeckEnWEGOjBDVxxytw13AsdLgquzhCtFHOs7kDsna8="

集成

您自己的应用程序可以扩展 acct 的命令行界面,以使用 –acct-file–acct-key 选项来使用信标

my_project/conf.py

CLI_CONFIG = {
    "acct_file": {"source": "acct", "os": "ACCT_FILE"},
    "acct_key": {"source": "acct", "os": "ACCT_KEY"},
    "beacon_profiles": {"source": "beacon"},
}

在您自己的项目中,您可以垂直合并 beacon 并使用您自己的信标插件扩展它

my_project/conf.py

DYNE = {
    "acct": ["acct"],
    "beacon": ["beacon"],
    "my_project": ["my_project"],
}

创建目录 my_project/beacon 并将您的信标插件添加到其中。

信标插件需要一个名为“listen”的函数,它是一个异步生成器。

my_project/beacon/my_plugin.py

from typing import AsyncGenerator


async def listen(hub) -> AsyncGenerator:
    async for event in my_queue:
        yield event

如果您的信标插件需要登录凭据,则“listen”函数可以可选地具有一个 ctx 参数。如果加密的 acct 文件中包含指定您的插件的配置文件,则 ctx 参数将由 acctevbus 自动填充。

my_project/beacon/my_plugin.py

from typing import AsyncGenerator


async def listen(hub, ctx) -> AsyncGenerator:
    if not ctx.connected:
        return

    # Many message queues have named channels that can be specified
    # Create a listener for every channel on this connection
    # A listener is another function that returns an async generator
    channel_listeners = [
        await ctx.connection.channel_listener(channel) for channel in ctx.channels
    ]
    # Use hub.pop.loop.as_yielded to combine all the channel async generators into a single async generator
    generator = hub.pop.loop.as_yielded(channel_listeners)

    # Listen for events as they come from any of the channels
    async for event in generator:
        yield event

创建目录 my_project/acct/beacon 并将您的 acct 插件添加到其中。 acct 插件需要实现一个 gather 函数,该函数从 hub.acct.PROFILES 读取适当的信息,并将其转换为 hub.acct.SUB_PROFILES 中的处理过的配置文件信息。这种处理可以包括打开到远程服务器的连接等操作。

my_project/acct/beacon/my_plugin.py

async def gather(hub):
    """
    Get [my_plugin] profiles from an encrypted file

    Example:

    .. code-block:: yaml

        my_plugin:
          profile_name:
            host: localhost
            port: 12345
            username: XXXXXXXXXXXX
            password: XXXXXXXXXXXX
            beacon_channels:
              - channel1
              - channel2
    """
    sub_profiles = {}
    for profile, ctx in hub.acct.PROFILES.get("my_plugin", {}).items():
        # Create a connection through [some_library] for each of the profiles
        sub_profiles[profile] = {
            "connected": False,
            "connection": await some_library.connect(**ctx),
            "channels": ctx.pop("beacon_channels", []),
        }
    # Return these to be automatically processed by acct and injected into the `ctx` parameter of appropriate beacon publish calls.
    return sub_profiles

将信标启动代码添加到您项目的初始化器中

my_project/my_project/init.py

def __init__(hub):
    # Horizontally merge the beacon dynamic namespace into your project
    hub.pop.sub.add(dyne_name="beacon")


def cli(hub):
    # Load the config from beacon onto hub.OPT
    hub.pop.config.load(["my_project", "beacon", "evbus", "acct"], cli="my_project")

    # Create the asyncio loop
    hub.pop.loop.create()

    # Create the beacon coroutine
    coro = hub.beacon.init.start(
        format_plugin=hub.OPT.beacon.format,
        acct_file=hub.OPT.acct.acct_file,
        acct_key=hub.OPT.acct.acct_key,
        beacon_profiles=hub.OPT.beacon.beacon_profiles,
    )

    # Start the main beacon listener
    hub.pop.Loop.run_until_complete(coro)

项目详情


下载文件

下载适用于您的平台文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。

源分布

pop-beacon-3.0.0.tar.gz (14.2 kB 查看哈希值)

上传时间

构建分布

pop_beacon-3.0.0-py3-none-any.whl (15.4 kB 查看哈希值)

上传时间 Python 3

由以下支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面