Rejected是一个Python RabbitMQ消费者框架和控制守护进程
项目描述
Rejected是一个AMQP消费者守护进程和消息处理框架。它通过处理与RabbitMQ通信和消费者进程管理所有核心功能,允许快速开发消息处理消费者。
Rejected作为一个主进程运行,具有多个消费者配置,每个配置都在独立的进程中运行。它能够收集消费者进程的统计数据并报告。
Rejected支持Python 2.7和3.4+。
功能
自动异常处理,包括连接管理和消费者重启
智能消费者类,可以自动解码和解序列化消息体,基于消息头
指标日志记录并提交到statsd和InfluxDB
内置消费者代码分析
在消费者中编写异步代码的能力,允许与外部资源并行通信
文档
示例消费者
from rejected import consumer
import logging
LOGGER = logging.getLogger(__name__)
class Test(consumer.Consumer):
def process(self, message):
LOGGER.debug('In Test.process: %s' % message.body)
异步消费者
要将消费者设置为异步,您可以使用 Tornado 的 Consumer.prepare 和 Consumer.process 方法,通过装饰器 Tornado 的 @gen.coroutine。异步消费者不允许在同一个进程中并发处理多个消息,但允许您在处理单个消息时,使用异步客户端如 Tornado 的 AsyncHTTPClient 和 Queries PostgreSQL 库,通过协程执行并行任务。
import logging
from rejected import consumer
from tornado import gen
from tornado import httpclient
class AsyncExampleConsumer(consumer.Consumer):
@gen.coroutine
def process(self):
LOGGER.debug('Message: %r', self.body)
http_client = httpclient.AsyncHTTPClient()
results = yield [http_client.fetch('http://www.github.com'),
http_client.fetch('http://www.reddit.com')]
LOGGER.info('Length: %r', [len(r.body) for r in results])
示例配置
%YAML 1.2
---
Application:
poll_interval: 10.0
stats:
log: True
influxdb:
enabled: True
scheme: http
host: localhost
port: 8086
user: username
password: password
database: dbname
statsd:
enabled: True
host: localhost
port: 8125
prefix: applications.rejected
Connections:
rabbitmq:
host: localhost
port: 5672
user: guest
pass: guest
ssl: False
vhost: /
heartbeat_interval: 300
Consumers:
example:
consumer: rejected.example.Consumer
sentry_dsn: https://[YOUR-SENTRY-DSN]
connections:
- name: rabbitmq1
consume: True
drop_exchange: dlxname
qty: 2
queue: generated_messages
qos_prefetch: 100
ack: True
max_errors: 100
config:
foo: True
bar: baz
Daemon:
user: rejected
group: daemon
pidfile: /var/run/rejected/example.%(pid)s.pid
Logging:
version: 1
formatters:
verbose:
format: "%(levelname) -10s %(asctime)s %(process)-6d %(processName) -25s %(name) -20s %(funcName) -25s: %(message)s"
datefmt: "%Y-%m-%d %H:%M:%S"
verbose_correlation:
format: "%(levelname) -10s %(asctime)s %(process)-6d %(processName) -25s %(name) -20s %(funcName) -25s: %(message)s {CID %(correlation_id)s}"
datefmt: "%Y-%m-%d %H:%M:%S"
syslog:
format: "%(levelname)s <PID %(process)d:%(processName)s> %(name)s.%(funcName)s: %(message)s"
syslog_correlation:
format: "%(levelname)s <PID %(process)d:%(processName)s> %(name)s.%(funcName)s: %(message)s {CID %(correlation_id)s)"
filters:
correlation:
'()': rejected.log.CorrelationFilter
'exists': True
no_correlation:
'()': rejected.log.CorrelationFilter
'exists': False
handlers:
console:
class: logging.StreamHandler
formatter: verbose
debug_only: false
filters: [no_correlation]
console_correlation:
class: logging.StreamHandler
formatter: verbose_correlation
debug_only: false
filters: [correlation]
syslog:
class: logging.handlers.SysLogHandler
facility: daemon
address: /var/run/syslog
formatter: syslog
filters: [no_correlation]
syslog_correlation:
class: logging.handlers.SysLogHandler
facility: daemon
address: /var/run/syslog
formatter: syslog
filters: [correlation]
loggers:
helper:
level: INFO
propagate: true
handlers: [console, console_correlation, syslog, syslog_correlation]
rejected:
level: INFO
propagate: true
handlers: [console, console_correlation, syslog, syslog_correlation]
tornado:
level: INFO
propagate: true
handlers: [console, console_correlation, syslog, syslog_correlation]
disable_existing_loggers: true
incremental: false
版本历史
可在 https://rejected.readthedocs.org/en/latest/history.html 查找
项目详情
下载文件
下载适用于您的平台的文件。如果您不确定要选择哪个,请了解更多关于 安装包 的信息。
源分布
rejected-3.23.1.tar.gz (53.7 kB 查看哈希值)
构建分布
rejected-3.23.1-py2.py3-none-any.whl (48.2 kB 查看哈希值)
关闭
rejected-3.23.1.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | d56047a549037463b9a3a64e8c75a3f0e08c3209ece3a0ed53b8e4131b4d3678 |
|
MD5 | 5749a90906c8a1bdd196e034f98d2b2d |
|
BLAKE2b-256 | 6918353da913990d9f60352f8a888a2f1b08c6efd6a0c925ef251faabcb013fc |
关闭
rejected-3.23.1-py2.py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 33aaa373cb492c689444d526360a1e4c9e78d7971e1e65bc60562a5dfc2523fa |
|
MD5 | 45d494207aeabf812a586ce31ab5e9ab |
|
BLAKE2b-256 | 56a17d9ed7c2c1c945726eb0d5c4a1a385b5f8a6a0e215fb0ec62163212190c3 |