跳转到主要内容

Fluentd事件收集器的Python日志处理器

项目描述

警告:这是一个https://github.com/fluent/fluent-logger-python项目的分支,用于与asyncio一起工作。

许多Web/移动应用程序会生成大量事件日志(例如登录、登出、购买、关注等)。分析这些事件日志对于改进服务可能非常有价值。然而,挑战在于轻松且可靠地收集这些日志。

Fluentd通过以下方式解决了这个问题:易于安装、占用空间小、插件、可靠的缓冲、日志转发等。

aioluent是一个Python库,用于记录Python应用程序的事件。

需求

  • Python 3.5或更高版本

  • msgpack-python

安装

此库作为“aiofluent”python包分发。请执行以下命令进行安装。

$ pip install aiofluent

配置

必须使用tcp源配置启动Fluentd守护进程

<source>
  type forward
  port 24224
</source>

为了快速测试您的设置,请添加一个记录到stdout的匹配器

<match app.**>
  type stdout
</match>

用法

FluentSender接口

sender.FluentSender是Fluentd的结构化事件记录器。

默认情况下,记录器假定本地启动了fluentd守护进程。您也可以通过传递选项来指定远程记录器。

from aiofluent import sender

# for local fluent
logger = sender.FluentSender('app')

# for remote fluent
logger = sender.FluentSender('app', host='host', port=24224)

要发送事件,请使用您的事件调用emit方法。以下示例将事件发送到fluentd,带有标签‘app.follow’和属性‘from’和‘to’。

# Use current time
logger.emit('follow', {'from': 'userA', 'to': 'userB'})

# Specify optional time
cur_time = int(time.time())
logger.emit_with_time('follow', cur_time, {'from': 'userA', 'to':'userB'})

您可以通过emit的返回值来检测错误。如果在emit过程中发生错误,emit将返回False,并通过last_error方法获取错误对象。

if not logger.emit('follow', {'from': 'userA', 'to': 'userB'}):
    print(logger.last_error)
    logger.clear_last_error() # clear stored error after handled errors

如果您想关闭客户端,请调用close()方法。

logger.close()

基于事件的接口

此API是sender.FluentSender的包装器。

首先,您需要调用sender.setup()来创建全局的sender.FluentSender日志实例。此调用只需在应用程序开始时调用一次。

以下是基于事件的API的初始化代码

from aiofluent import sender

# for local fluent
sender.setup('app')

# for remote fluent
sender.setup('app', host='host', port=24224)

然后,请创建以下事件。这将带有标签‘app.follow’和属性‘from’和‘to’的事件发送到fluentd。

from aiofluent import event

# send event to fluentd, with 'app.follow' tag
event.Event('follow', {
  'from': 'userA',
  'to':   'userB'
})

event.Event有一个限制,不能返回成功/失败结果。

其他基于事件的接口方法。

sender.get_global_sender # get instance of global sender
sender.close # Call FluentSender#close

缓冲区溢出处理器

您可以在连接失败的情况下注入自己的自定义进程来处理缓冲区溢出。这将减少数据丢失,而不是简单地丢弃数据。

import msgpack
from io import BytesIO

def handler(pendings):
    unpacker = msgpack.Unpacker(BytesIO(pendings))
    for unpacked in unpacker:
        print(unpacked)

logger = sender.FluentSender('app', host='host', port=24224, buffer_overflow_handler=handler)

您应该在处理程序中处理任何异常。aiofluent忽略来自buffer_overflow_handler的异常。

在调用close()时存在挂起事件时,也将调用此处理程序。

Python日志处理程序接口

此客户端库还具有用于Python日志模块的FluentHandler类。

import logging
from aiofluent import handler

custom_format = {
  'host': '%(hostname)s',
  'where': '%(module)s.%(funcName)s',
  'type': '%(levelname)s',
  'stack_trace': '%(exc_text)s'
}

logging.basicConfig(level=logging.INFO)
l = logging.getLogger('fluent.test')
h = handler.FluentHandler('app.follow', host='host', port=24224)
formatter = handler.FluentRecordFormatter(custom_format)
h.setFormatter(formatter)
l.addHandler(h)
l.info({
  'from': 'userA',
  'to': 'userB'
})
l.info('{"from": "userC", "to": "userD"}')
l.info("This log entry will be logged with the additional key: 'message'.")

您还可以通过logging.config.dictConfig自定义格式化程序。

import logging.config
import yaml

with open('logging.yaml') as fd:
    conf = yaml.load(fd)

logging.config.dictConfig(conf['logging'])

以下是一个示例配置logging.yaml:

logging:
    version: 1

    formatters:
      brief:
        format: '%(message)s'
      default:
        format: '%(asctime)s %(levelname)-8s %(name)-15s %(message)s'
        datefmt: '%Y-%m-%d %H:%M:%S'
      fluent_fmt:
        '()': fluent.handler.FluentRecordFormatter
        format:
          level: '%(levelname)s'
          hostname: '%(hostname)s'
          where: '%(module)s.%(funcName)s'

    handlers:
        console:
            class : logging.StreamHandler
            level: DEBUG
            formatter: default
            stream: ext://sys.stdout
        fluent:
            class: fluent.handler.FluentHandler
            host: localhost
            port: 24224
            tag: test.logging
            formatter: fluent_fmt
            level: DEBUG
        none:
            class: logging.NullHandler

    loggers:
        amqp:
            handlers: [none]
            propagate: False
        conf:
            handlers: [none]
            propagate: False
        '': # root logger
            handlers: [console, fluent]
            level: DEBUG
            propagate: False

许可证

Apache License,版本2.0

1.2.9 (2020-10-22)

  • 每30秒只记录错误

1.2.8 (2020-05-15)

  • 处理TypeError格式化日志数据

1.2.7 (2020-03-09)

  • 修复仓库位置

1.2.6 (2020-01-06)

  • 改进错误日志[vangheem]

1.2.5 (2019-12-19)

  • 处理事件循环关闭错误[vangheem]

1.2.4 (2019-12-19)

  • 增加最大队列大小

1.2.3 (2019-04-01)

  • 修复发布

1.2.2 (2019-04-01)

  • 默认为纳秒精度[davidonna]

1.2.1 (2018-10-31)

  • 添加对纳秒精度时间戳的支持[davidonna]

1.2.0 (2018-06-14)

  • 维护所有日志的异步队列[vangheem]

1.1.4 (2018-05-29)

  • 处理取消任务/清理时的RuntimeError[vangheem]

1.1.3 (2018-02-15)

  • 锁定调用发送者的close方法[vangheem]

  • 增加默认超时[vangheem]

1.1.2 (2018-02-07)

  • 锁定整个方法[vangheem]

1.1.1 (2018-02-07)

  • 在获取连接对象时使用锁[vangheem]

1.1.0 (2018-01-25)

  • 转移到使用异步连接基础设施而不是套接字[vangheem]

1.0.8 (2018-01-04)

  • 始终关闭缓冲区数据[vangheem]

1.0.7 (2018-01-04)

  • 处理处理日志队列时的错误[vangheem]

1.0.6 (2017-11-14)

  • 防止日志队列过大[vangheem]

1.0.5 (2017-10-17)

  • 修复发布以包含CHANGELOG.rst文件[vangheem]

1.0.4 (2017-10-10)

  • 修复推送初始记录

1.0.3 (2017-10-10)

  • 处理在事件循环开始之前完成日志记录时的RuntimeError[vangheem]

1.0.2 (2017-10-09)

  • 修复以使正常日志调用异步[vangheem]

1.0.1 (2017-07-03)

  • 初始发布

项目详情


下载文件

下载适合您平台的文件。如果您不确定要选择哪个,请了解更多关于安装包的信息。

源分布

aiofluent-1.2.9.tar.gz (14.3 kB 查看哈希值)

上传于 源码

由...