米库是PgQ到RabbitMQ的中继
项目描述
这是一个从PgQ到RabbitMQ的中继。米库是一个将消息发布到RabbitMQ的PgQ消费者。此外,它还包括一个内置的审计系统,可以用来确认所有PgQ事件是否都被RabbitMQ接收。
米库这个名字来源于寓言《聪明兔子和象》中的兔子。
安装
米库可在Python包索引上找到,并且可以通过pip进行安装
pip install mikkoo
一旦您设置了Skytools,您可能想安装mikkoo.sql中包含的可选实用函数,以便更轻松地使用。
您可以使用curl和psql的组合来完成此操作
curl -L https://github.com/gmr/mikkoo/blob/main/mikkoo.sql | psql
这将在一个名为mikkoo的模式中安装多个存储过程和一个审计表。查看DDL以了解每个函数是什么以及如何使用它。
PgQ设置
将 pgq 安装到您的数据库中并创建队列
# CREATE EXTENSION pgq; CREATE EXTENSION # SELECT pgq.create_queue('test'); create_queue -------------- 1 (1 row)
确保 pgqd 正在运行。
PgQ 事件到 AMQP 映射
将事件插入 PgQ 队列时,应使用以下字段映射的 pgq.insert_event/7 函数
PgQ 事件 |
AMQP |
---|---|
ev_type |
路由键 |
ev_data |
消息正文 |
ev_extra1 |
交换机 |
ev_extra2 |
内容类型属性 |
ev_extra3 |
AMQP 属性 [1] |
ev_extra4 |
报头 [2] |
ev_time |
报头.timetamp |
ev_txid |
报头.txid |
在 mikkoo.sql 文件中有一个便利的架构,它添加了创建正确格式化 mikkoo 事件在 PgQ 中的存储过程。此外,还有审计功能,允许创建已发送到 PgQ 的事件的审计日志。
AMQP 消息属性
以下表定义了在插入事件时可以设置在 ev_extra3 字段中的 JSON blob 中可用的字段。
属性 |
PgSQL 类型 |
---|---|
app_id |
text |
content_encoding |
text |
content_type |
text |
correlation_id |
text |
delivery_mode |
int2 |
expiration |
text |
message_id |
text |
headers |
text/json [3] |
timestamp |
int4 |
type |
text |
priority |
int4 |
user_id |
text |
headers 如果指定,应发送到键/值 JSON blob
分配给提供给 ev_extra3 的 JSON blob 中的值将优先于 Mikkoo 在处理时自动创建的 app_id、content_type、correlation_id、headers 和 timestamp 值。
从 1.0 版本开始,Mikkoo 将自动添加四个 AMQP 报头属性值。这些值不会覆盖在 ev_extra4 中指定的具有相同名称的任何值。即使这些值在 sequence 值是一个动态生成的 ID,旨在提供模糊的分布式排序信息。 timestamp 值是 ev_time 字段的 ISO-8601 表示,当事件添加到 PgQ 时创建。 txid 值是事件的 PgQ 事务 ID。这些值被添加以帮助提供一定程度的确定性排序。 origin 值是 Mikkoo 运行的服务器的主机名。
事件插入示例
以下示例插入一个 JSON blob 消息正文 {"foo": "bar"},该正文将被发布到 RabbitMQ 中的 postgres 交换机,使用 test.routing-key 路由键。内容类型在 ev_extra2 中指定,AMQP 的 type 消息属性在 ev_extra3 中指定。
# SELECT pgq.insert_event('test', 'test.routing-key', '{"foo": "bar"}', 'postgres', 'application/json', '{"type": "example"}', '');
insert_event
--------------
4
(1 row)
当此消息被 RabbitMQ 接收时,它将有以下消息正文
{"foo": "bar"}
并且它将有类似以下的消息属性
属性 |
示例值 |
||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|
app_id |
mikkoo |
||||||||||
content_type |
application/json |
||||||||||
correlation_id |
0ad6b212-4c84-4eb0-8782-9a44bdfe949f |
||||||||||
headers |
|
||||||||||
timestamp |
1449600290 |
||||||||||
type |
示例 |
配置
Mikkoo配置文件使用YAML进行标记,并允许处理一个或多个PgQ队列。
如果您有Sentry或Sentry账户,当raven客户端库已安装时,Application/sentry_dsn设置将开启Sentry异常日志记录。
队列在Application/workers部分按名称进行配置。以下示例配置了两个工作进程以处理名为invoices的队列。每个工作进程使用默认凭证连接到本地PostgreSQL和RabbitMQ实例。
Application:
workers:
invoices:
postgres_url: postgresql://localhost:5432/postgres
rabbitmq:
host: localhost
port: 5671
vhost: /
ssl_options:
protocol: 2
confirm: false
队列配置选项
以下表格详细说明了每个队列可用的配置选项
密钥 |
描述 |
---|---|
confirm |
启用/禁用RabbitMQ发布者确认。默认:True |
consumer_name |
覆盖默认PgQ消费者名称。默认:mikkoo |
max_failures |
在丢弃事件之前的最大失败次数。默认:10 |
postgresql_url |
连接到PostgreSQL的URL |
rabbitmq |
连接到RabbitMQ的连接参数数据结构 |
retry_delay |
PgQ发出失败事件前的秒数。默认:10 |
unregister |
在关闭时从PgQ注销消费者。默认:True |
wait_duration |
在检查队列后的最后空结果之前要等待多长时间。默认:10 |
rabbitmq 属性
属性 |
描述 |
---|---|
host |
RabbitMQ服务器的主机名或IP地址(字符串) |
port |
RabbitMQ服务器的端口(整数) |
vhost |
要连接的虚拟主机(字符串) |
username |
连接时使用的用户名(字符串) |
password |
要使用的密码(字符串) |
ssl_options |
可选:SSL连接套接字的SSL选项 |
heartbeat_interval |
可选:AMQP心跳间隔(整数)默认:300秒 |
ssl_options 属性
属性 |
描述 |
|
---|---|---|
ca_certs |
CA证书的串联列表文件路径(字符串) |
|
ca_path |
PEM格式CA证书的目录路径(字符串) |
|
ca_data |
PEM编码的CA证书(字符串) |
|
protocol |
ssl PROTOCOL_* 枚举整数值。默认:2 对应于枚举 PROTOCOL_TLS(整数) |
|
certfile |
PEM格式证书文件的文件路径(字符串) |
|
keyfile |
证书私钥的文件路径(字符串) |
|
password |
解密keyfile私钥的密码(字符串) |
|
ciphers |
OpenSSL加密列表格式中可用的加密套件集合(字符串) |
示例配置
以下是一个完整配置文件的示例
Application:
poll_interval: 10
sentry_dsn: [YOUR SENTRY DSN]
statsd:
enabled: true
host: localhost
port: 8125
workers:
test:
confirm: False
consumer_name: my_consumer
max_failures: 5
postgres_url: postgresql://localhost:5432/postgres
rabbitmq:
host: localhost
port: 5671
username: guest
password: guest
ssl_options:
protocol: 2
retry_delay: 5
unregister: False
wait_duration: 5
Daemon:
user: mikkoo
pidfile: /var/run/mikkoo
Logging:
version: 1
formatters:
verbose:
format: '%(levelname) -10s %(asctime)s %(process)-6d %(processName) -20s %(name) -18s: %(message)s'
datefmt: '%Y-%m-%d %H:%M:%S'
handlers:
console:
class: logging.StreamHandler
formatter: verbose
debug_only: True
loggers:
helper:
handlers: [console]
level: INFO
propagate: true
mikkoo:
handlers: [console]
level: INFO
propagate: true
pika:
handlers: [console]
level: ERROR
propagate: true
queries:
handlers: [console]
level: ERROR
propagate: true
tornado:
handlers: [console]
level: ERROR
propagate: true
root:
handlers: [console]
level: CRITICAL
propagate: true
disable_existing_loggers: true
incremental: false
运行Mikkoo
创建类似于上面的Mikkoo配置文件后,只需运行mikkoo应用程序并提供配置文件的路径
mikkoo -c mikkoo.yml
除非您使用-f 前台CLI开关,否则应用程序将尝试守护进程化。
使用--help可以调用Mikkoo的CLI帮助,并产生以下输出
$ mikkoo -h
usage: mikkoo [-h] [-c CONFIG] [-f]
Mikkoo is a PgQ to RabbitMQ Relay
optional arguments:
-h, --help show this help message and exit
-c CONFIG, --config CONFIG
Path to the configuration file
-f, --foreground Run the application interactively
项目详细信息
mikkoo-2.2.2.tar.gz的散列值
算法 | 散列摘要 | |
---|---|---|
SHA256 | 5b43b5f5b4c9c77718ee237fea150e77bb912497d0de231837b5a01d902d52dd |
|
MD5 | ff79a96fd548ab7889642b2212c42166 |
|
BLAKE2b-256 | 12fe600806543be20164f440cfab2498c1cc2efe08c67bf6554891add2001b42 |