跳转到主要内容

Rejected是一个Python RabbitMQ消费者框架和控制守护进程

项目描述

Rejected是一个AMQP消费者守护进程和消息处理框架。它通过处理与RabbitMQ通信和消费者进程管理所有核心功能,允许快速开发消息处理消费者。

Rejected作为一个主进程运行,具有多个消费者配置,每个配置都在独立的进程中运行。它能够收集消费者进程的统计数据并报告。

Rejected支持Python 2.7和3.4+。

Version Status Coverage License

功能

  • 自动异常处理,包括连接管理和消费者重启

  • 智能消费者类,可以自动解码和解序列化消息体,基于消息头

  • 指标日志记录并提交到statsd和InfluxDB

  • 内置消费者代码分析

  • 在消费者中编写异步代码的能力,允许与外部资源并行通信

文档

https://rejected.readthedocs.io

示例消费者

from rejected import consumer
import logging

LOGGER = logging.getLogger(__name__)


class Test(consumer.Consumer):

    def process(self, message):
        LOGGER.debug('In Test.process: %s' % message.body)

异步消费者

要将消费者设置为异步,您可以使用 Tornado 的 Consumer.prepareConsumer.process 方法,通过装饰器 Tornado 的 @gen.coroutine。异步消费者不允许在同一个进程中并发处理多个消息,但允许您在处理单个消息时,使用异步客户端如 Tornado 的 AsyncHTTPClientQueries PostgreSQL 库,通过协程执行并行任务。

import logging

from rejected import consumer

from tornado import gen
from tornado import httpclient


class AsyncExampleConsumer(consumer.Consumer):

    @gen.coroutine
    def process(self):
        LOGGER.debug('Message: %r', self.body)
        http_client = httpclient.AsyncHTTPClient()
        results = yield [http_client.fetch('http://www.github.com'),
                         http_client.fetch('http://www.reddit.com')]
        LOGGER.info('Length: %r', [len(r.body) for r in results])

示例配置

%YAML 1.2
---
Application:
  poll_interval: 10.0
  stats:
    log: True
    influxdb:
      enabled: True
      scheme: http
      host: localhost
      port: 8086
      user: username
      password: password
      database: dbname
    statsd:
      enabled: True
      host: localhost
      port: 8125
      prefix: applications.rejected
  Connections:
    rabbitmq:
      host: localhost
      port: 5672
      user: guest
      pass: guest
      ssl: False
      vhost: /
      heartbeat_interval: 300
  Consumers:
    example:
      consumer: rejected.example.Consumer
      sentry_dsn: https://[YOUR-SENTRY-DSN]
      connections:
        - name: rabbitmq1
          consume: True
      drop_exchange: dlxname
      qty: 2
      queue: generated_messages
      qos_prefetch: 100
      ack: True
      max_errors: 100
      config:
        foo: True
        bar: baz

Daemon:
  user: rejected
  group: daemon
  pidfile: /var/run/rejected/example.%(pid)s.pid

Logging:
  version: 1
  formatters:
    verbose:
      format: "%(levelname) -10s %(asctime)s %(process)-6d %(processName) -25s %(name) -20s %(funcName) -25s: %(message)s"
      datefmt: "%Y-%m-%d %H:%M:%S"
    verbose_correlation:
      format: "%(levelname) -10s %(asctime)s %(process)-6d %(processName) -25s %(name) -20s %(funcName) -25s: %(message)s {CID %(correlation_id)s}"
      datefmt: "%Y-%m-%d %H:%M:%S"
    syslog:
      format: "%(levelname)s <PID %(process)d:%(processName)s> %(name)s.%(funcName)s: %(message)s"
    syslog_correlation:
      format: "%(levelname)s <PID %(process)d:%(processName)s> %(name)s.%(funcName)s: %(message)s {CID %(correlation_id)s)"
  filters:
    correlation:
      '()': rejected.log.CorrelationFilter
      'exists': True
    no_correlation:
      '()': rejected.log.CorrelationFilter
      'exists': False
  handlers:
    console:
      class: logging.StreamHandler
      formatter: verbose
      debug_only: false
      filters: [no_correlation]
    console_correlation:
      class: logging.StreamHandler
      formatter: verbose_correlation
      debug_only: false
      filters: [correlation]
    syslog:
      class: logging.handlers.SysLogHandler
      facility: daemon
      address: /var/run/syslog
      formatter: syslog
      filters: [no_correlation]
    syslog_correlation:
      class: logging.handlers.SysLogHandler
      facility: daemon
      address: /var/run/syslog
      formatter: syslog
      filters: [correlation]
  loggers:
    helper:
      level: INFO
      propagate: true
      handlers: [console, console_correlation, syslog, syslog_correlation]
    rejected:
      level: INFO
      propagate: true
      handlers: [console, console_correlation, syslog, syslog_correlation]
    tornado:
      level: INFO
      propagate: true
      handlers: [console, console_correlation, syslog, syslog_correlation]
  disable_existing_loggers: true
  incremental: false

版本历史

可在 https://rejected.readthedocs.org/en/latest/history.html 查找

项目详情


发布历史 发布通知 | RSS 源

下载文件

下载适用于您的平台的文件。如果您不确定要选择哪个,请了解更多关于 安装包 的信息。

源分布

rejected-3.23.1.tar.gz (53.7 kB 查看哈希值)

上传时间

构建分布

rejected-3.23.1-py2.py3-none-any.whl (48.2 kB 查看哈希值)

上传时间 Python 2 Python 3

支持