跳转到主要内容

米库是PgQ到RabbitMQ的中继

项目描述

这是一个从PgQRabbitMQ的中继。米库是一个将消息发布到RabbitMQ的PgQ消费者。此外,它还包括一个内置的审计系统,可以用来确认所有PgQ事件是否都被RabbitMQ接收。

米库这个名字来源于寓言《聪明兔子和象》中的兔子。

Version Downloads Status

安装

米库可在Python包索引上找到,并且可以通过pip进行安装

pip install mikkoo

一旦您设置了Skytools,您可能想安装mikkoo.sql中包含的可选实用函数,以便更轻松地使用。

您可以使用curl和psql的组合来完成此操作

curl -L https://github.com/gmr/mikkoo/blob/main/mikkoo.sql | psql

这将在一个名为mikkoo的模式中安装多个存储过程和一个审计表。查看DDL以了解每个函数是什么以及如何使用它。

PgQ设置

  1. pgq 安装到您的数据库中并创建队列

    # CREATE EXTENSION pgq;
    CREATE EXTENSION
    # SELECT pgq.create_queue('test');
    create_queue
    --------------
                1
    (1 row)
  2. 确保 pgqd 正在运行。

PgQ 事件到 AMQP 映射

将事件插入 PgQ 队列时,应使用以下字段映射的 pgq.insert_event/7 函数

PgQ 事件

AMQP

ev_type

路由键

ev_data

消息正文

ev_extra1

交换机

ev_extra2

内容类型属性

ev_extra3

AMQP 属性 [1]

ev_extra4

报头 [2]

ev_time

报头.timetamp

ev_txid

报头.txid

mikkoo.sql 文件中有一个便利的架构,它添加了创建正确格式化 mikkoo 事件在 PgQ 中的存储过程。此外,还有审计功能,允许创建已发送到 PgQ 的事件的审计日志。

AMQP 消息属性

以下表定义了在插入事件时可以设置在 ev_extra3 字段中的 JSON blob 中可用的字段。

属性

PgSQL 类型

app_id

text

content_encoding

text

content_type

text

correlation_id

text

delivery_mode

int2

expiration

text

message_id

text

headers

text/json [3]

timestamp

int4

type

text

priority

int4

user_id

text

分配给提供给 ev_extra3 的 JSON blob 中的值将优先于 Mikkoo 在处理时自动创建的 app_idcontent_typecorrelation_idheaderstimestamp 值。

从 1.0 版本开始,Mikkoo 将自动添加四个 AMQP 报头属性值。这些值不会覆盖在 ev_extra4 中指定的具有相同名称的任何值。即使这些值在 sequence 值是一个动态生成的 ID,旨在提供模糊的分布式排序信息。 timestamp 值是 ev_time 字段的 ISO-8601 表示,当事件添加到 PgQ 时创建。 txid 值是事件的 PgQ 事务 ID。这些值被添加以帮助提供一定程度的确定性排序。 origin 值是 Mikkoo 运行的服务器的主机名。

事件插入示例

以下示例插入一个 JSON blob 消息正文 {"foo": "bar"},该正文将被发布到 RabbitMQ 中的 postgres 交换机,使用 test.routing-key 路由键。内容类型在 ev_extra2 中指定,AMQP 的 type 消息属性在 ev_extra3 中指定。

# SELECT pgq.insert_event('test', 'test.routing-key', '{"foo": "bar"}', 'postgres', 'application/json', '{"type": "example"}', '');
 insert_event
--------------
            4
(1 row)

当此消息被 RabbitMQ 接收时,它将有以下消息正文

{"foo": "bar"}

并且它将有类似以下的消息属性

属性

示例值

app_id

mikkoo

content_type

application/json

correlation_id

0ad6b212-4c84-4eb0-8782-9a44bdfe949f

headers

密钥

示例值

来源

mikkoo.domain.com

序列号

4539586784185129828

timestamp

2017-02-23 06:17:14.471682-00

交易ID

41356335556

timestamp

1449600290

type

示例

配置

Mikkoo配置文件使用YAML进行标记,并允许处理一个或多个PgQ队列。

如果您有Sentry或Sentry账户,当raven客户端库已安装时,Application/sentry_dsn设置将开启Sentry异常日志记录。

队列在Application/workers部分按名称进行配置。以下示例配置了两个工作进程以处理名为invoices的队列。每个工作进程使用默认凭证连接到本地PostgreSQL和RabbitMQ实例。

Application:
  workers:
     invoices:
       postgres_url: postgresql://localhost:5432/postgres
       rabbitmq:
         host: localhost
         port: 5671
         vhost: /
         ssl_options:
           protocol: 2
       confirm: false

队列配置选项

以下表格详细说明了每个队列可用的配置选项

密钥

描述

confirm

启用/禁用RabbitMQ发布者确认。默认:True

consumer_name

覆盖默认PgQ消费者名称。默认:mikkoo

max_failures

在丢弃事件之前的最大失败次数。默认:10

postgresql_url

连接到PostgreSQL的URL

rabbitmq

连接到RabbitMQ的连接参数数据结构

retry_delay

PgQ发出失败事件前的秒数。默认:10

unregister

在关闭时从PgQ注销消费者。默认:True

wait_duration

在检查队列后的最后空结果之前要等待多长时间。默认:10

rabbitmq 属性

属性

描述

host

RabbitMQ服务器的主机名或IP地址(字符串)

port

RabbitMQ服务器的端口(整数)

vhost

要连接的虚拟主机(字符串)

username

连接时使用的用户名(字符串)

password

要使用的密码(字符串)

ssl_options

可选:SSL连接套接字的SSL选项

heartbeat_interval

可选:AMQP心跳间隔(整数)默认:300秒

ssl_options 属性

属性

描述

ca_certs

CA证书的串联列表文件路径(字符串)

ca_path

PEM格式CA证书的目录路径(字符串)

ca_data

PEM编码的CA证书(字符串)

protocol

ssl PROTOCOL_* 枚举整数值。默认:2 对应于枚举 PROTOCOL_TLS(整数)

certfile

PEM格式证书文件的文件路径(字符串)

keyfile

证书私钥的文件路径(字符串)

password

解密keyfile私钥的密码(字符串)

ciphers

OpenSSL加密列表格式中可用的加密套件集合(字符串)

示例配置

以下是一个完整配置文件的示例

Application:

  poll_interval: 10
  sentry_dsn: [YOUR SENTRY DSN]

  statsd:
    enabled: true
    host: localhost
    port: 8125

  workers:
    test:
      confirm: False
      consumer_name: my_consumer
      max_failures: 5
      postgres_url: postgresql://localhost:5432/postgres
      rabbitmq:
        host: localhost
        port: 5671
        username: guest
        password: guest
        ssl_options:
          protocol: 2
      retry_delay: 5
      unregister: False
      wait_duration: 5

Daemon:
  user: mikkoo
  pidfile: /var/run/mikkoo

Logging:
  version: 1
  formatters:
    verbose:
      format: '%(levelname) -10s %(asctime)s  %(process)-6d %(processName) -20s %(name) -18s: %(message)s'
      datefmt: '%Y-%m-%d %H:%M:%S'
  handlers:
    console:
      class: logging.StreamHandler
      formatter: verbose
      debug_only: True
  loggers:
    helper:
      handlers: [console]
      level: INFO
      propagate: true
    mikkoo:
      handlers: [console]
      level: INFO
      propagate: true
    pika:
      handlers: [console]
      level: ERROR
      propagate: true
    queries:
      handlers: [console]
      level: ERROR
      propagate: true
    tornado:
      handlers: [console]
      level: ERROR
      propagate: true
  root:
    handlers: [console]
    level: CRITICAL
    propagate: true
  disable_existing_loggers: true
  incremental: false

运行Mikkoo

创建类似于上面的Mikkoo配置文件后,只需运行mikkoo应用程序并提供配置文件的路径

mikkoo -c mikkoo.yml

除非您使用-f 前台CLI开关,否则应用程序将尝试守护进程化。

使用--help可以调用Mikkoo的CLI帮助,并产生以下输出

$ mikkoo -h
usage: mikkoo [-h] [-c CONFIG] [-f]

Mikkoo is a PgQ to RabbitMQ Relay

optional arguments:
  -h, --help            show this help message and exit
  -c CONFIG, --config CONFIG
                        Path to the configuration file
  -f, --foreground      Run the application interactively

项目详细信息


下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。

源分发

mikkoo-2.2.2.tar.gz (26.0 kB 查看散列值)

上传时间

支持者