跳转到主要内容

Fluentd事件收集器的Python日志处理程序

项目描述

许多Web/移动应用程序会生成大量事件日志(例如登录、登出、购买、关注等)。分析这些事件日志对于改进服务非常有价值。然而,挑战在于轻松且可靠地收集这些日志。

FluentdFluent Bit 通过以下方式解决了这个问题:易于安装、占用空间小、插件、可靠的缓冲、日志转发等。

fluent-logger-python 是一个Python库,用于记录Python应用程序的事件。

要求

  • Python 3.7+

  • msgpack

  • 重要:版本0.8.0是最后一个支持Python 2.6、3.2和3.3的版本

  • 重要:版本0.9.6是最后一个支持Python 2.7和3.4的版本

  • 重要:版本0.10.0是最后一个支持Python 3.5和3.6的版本

安装

此库作为“fluent-logger”Python包分发。请执行以下命令进行安装。

$ pip install fluent-logger

配置

必须以tcp源配置启动Fluentd守护进程

<source>
  type forward
  port 24224
</source>

为了快速测试您的设置,请添加一个将日志记录到stdout的匹配器

<match app.**>
  type stdout
</match>

用法

FluentSender 接口

sender.FluentSender 是Fluentd的结构化事件日志记录器。

默认情况下,记录器假设本地启动了fluentd守护进程。您也可以通过传递选项来指定远程记录器。

from fluent import sender

# for local fluent
logger = sender.FluentSender('app')

# for remote fluent
logger = sender.FluentSender('app', host='host', port=24224)

要发送事件,请使用您的事件调用 emit 方法。以下示例将事件发送到fluentd,带有标签“app.follow”和属性“from”和“to”。

# Use current time
logger.emit('follow', {'from': 'userA', 'to': 'userB'})

# Specify optional time
cur_time = int(time.time())
logger.emit_with_time('follow', cur_time, {'from': 'userA', 'to':'userB'})

要发送具有纳秒级时间戳的事件(Fluent 0.14及更高版本),请在 FluentSender 上指定 nanosecond_precision

# Use nanosecond
logger = sender.FluentSender('app', nanosecond_precision=True)
logger.emit('follow', {'from': 'userA', 'to': 'userB'})
logger.emit_with_time('follow', time.time(), {'from': 'userA', 'to': 'userB'})

您可以通过 emit 的返回值检测错误。如果在 emit 中发生错误,emit 返回 False,并使用 last_error 方法获取错误对象。

if not logger.emit('follow', {'from': 'userA', 'to': 'userB'}):
    print(logger.last_error)
    logger.clear_last_error() # clear stored error after handled errors

如果您想关闭客户端,请调用 close() 方法。

logger.close()

事件驱动接口

此API是 sender.FluentSender 的包装器。

首先,您需要调用 sender.setup() 来创建全局 sender.FluentSender 记录器实例。此调用只需要调用一次,例如在应用程序开始时。

事件驱动API的初始化代码如下

from fluent import sender

# for local fluent
sender.setup('app')

# for remote fluent
sender.setup('app', host='host', port=24224)

然后,请创建事件如下。这将事件发送到fluentd,带有标签“app.follow”和属性“from”和“to”。

from fluent import event

# send event to fluentd, with 'app.follow' tag
event.Event('follow', {
  'from': 'userA',
  'to':   'userB'
})

event.Event 有一个限制,不能返回成功/失败结果。

事件驱动接口的其他方法。

sender.get_global_sender # get instance of global sender
sender.close # Call FluentSender#close

处理缓冲区溢出

您可以在连接失败的情况下注入自己的自定义进程来处理事件中的缓冲区溢出。这将减轻数据丢失,而不仅仅是简单地丢弃数据。

import msgpack
from io import BytesIO

def overflow_handler(pendings):
    unpacker = msgpack.Unpacker(BytesIO(pendings))
    for unpacked in unpacker:
        print(unpacked)

logger = sender.FluentSender('app', host='host', port=24224, buffer_overflow_handler=overflow_handler)

您应该在处理程序中处理任何异常。fluent-logger忽略来自 buffer_overflow_handler 的异常。

当在 close() 期间存在挂起的事件时,也会调用此处理程序。

Python logging.Handler 接口

此客户端库还具有用于Python日志模块的 FluentHandler 类。

import logging
from fluent import handler

custom_format = {
  'host': '%(hostname)s',
  'where': '%(module)s.%(funcName)s',
  'type': '%(levelname)s',
  'stack_trace': '%(exc_text)s'
}

logging.basicConfig(level=logging.INFO)
l = logging.getLogger('fluent.test')
h = handler.FluentHandler('app.follow', host='host', port=24224, buffer_overflow_handler=overflow_handler)
formatter = handler.FluentRecordFormatter(custom_format)
h.setFormatter(formatter)
l.addHandler(h)
l.info({
  'from': 'userA',
  'to': 'userB'
})
l.info('{"from": "userC", "to": "userD"}')
l.info("This log entry will be logged with the additional key: 'message'.")

您还可以通过 logging.config.dictConfig 自定义格式化程序

import logging.config
import yaml

with open('logging.yaml') as fd:
    conf = yaml.load(fd)

logging.config.dictConfig(conf['logging'])

您可以在连接失败的情况下注入自己的自定义进程来处理事件中的缓冲区溢出。这将减轻数据丢失,而不仅仅是简单地丢弃数据。

import msgpack
from io import BytesIO

def overflow_handler(pendings):
    unpacker = msgpack.Unpacker(BytesIO(pendings))
    for unpacked in unpacker:
        print(unpacked)

一个示例配置 logging.yaml 如下

logging:
    version: 1

    formatters:
      brief:
        format: '%(message)s'
      default:
        format: '%(asctime)s %(levelname)-8s %(name)-15s %(message)s'
        datefmt: '%Y-%m-%d %H:%M:%S'
      fluent_fmt:
        '()': fluent.handler.FluentRecordFormatter
        format:
          level: '%(levelname)s'
          hostname: '%(hostname)s'
          where: '%(module)s.%(funcName)s'

    handlers:
        console:
            class : logging.StreamHandler
            level: DEBUG
            formatter: default
            stream: ext://sys.stdout
        fluent:
            class: fluent.handler.FluentHandler
            host: localhost
            port: 24224
            tag: test.logging
            buffer_overflow_handler: overflow_handler
            formatter: fluent_fmt
            level: DEBUG
        none:
            class: logging.NullHandler

    loggers:
        amqp:
            handlers: [none]
            propagate: False
        conf:
            handlers: [none]
            propagate: False
        '': # root logger
            handlers: [console, fluent]
            level: DEBUG
            propagate: False

异步通信

除了常规接口之外,还包括由sender.FluentSender提供的基于事件的接口和由handler.FluentHandler提供的Python日志接口。在asyncsenderasynchandler中分别有对应的异步版本。这些版本使用单独的线程来处理与远程fluentd服务器的通信。这样,在记录事件时,库的客户端不会被阻塞,也不会因为fluentd服务器不可达而超时。同时,它也不会因为网络开销而变慢。

asyncsenderasynchandler的接口与senderhandler的接口完全相同,所以只需从不同的模块导入即可。

例如,对于基于事件的接口

from fluent import asyncsender as sender

# for local fluent
sender.setup('app')

# for remote fluent
sender.setup('app', host='host', port=24224)

# do your work
...

# IMPORTANT: before program termination, close the sender
sender.close()

或对于Python日志接口

import logging
from fluent import asynchandler as handler

custom_format = {
  'host': '%(hostname)s',
  'where': '%(module)s.%(funcName)s',
  'type': '%(levelname)s',
  'stack_trace': '%(exc_text)s'
}

logging.basicConfig(level=logging.INFO)
l = logging.getLogger('fluent.test')
h = handler.FluentHandler('app.follow', host='host', port=24224, buffer_overflow_handler=overflow_handler)
formatter = handler.FluentRecordFormatter(custom_format)
h.setFormatter(formatter)
l.addHandler(h)
l.info({
  'from': 'userA',
  'to': 'userB'
})
l.info('{"from": "userC", "to": "userD"}')
l.info("This log entry will be logged with the additional key: 'message'.")

...

# IMPORTANT: before program termination, close the handler
h.close()

注意:请注意,在程序结束时关闭发送器或处理程序非常重要。这将确保通信线程终止并正确连接。否则,程序不会退出,会等待线程,除非强制终止。

循环队列模式

在某些应用程序中,确保在任何情况下都不会阻塞日志记录过程特别重要,即使在日志记录速度超过发送线程可以处理的速度(backpressure)时也是如此。在这种情况下,可以通过在asyncsender.FluentSenderasynchandler.FluentHandlerqueue_circular参数中传递True来启用循环队列模式。这样做,即使队列已满,进行日志记录的线程也不会阻塞,新的事件将通过丢弃最旧的事件添加到队列中。

警告:将queue_circular设置为True会在队列完全填满时导致事件丢失!请确保不会发生这种情况,或者这对您的应用程序是可以接受的。

测试

可以使用pytest进行测试。

$ pytest tests

发布

$ # Download dist.zip for release from GitHub Action artifact.
$ unzip -d dist dist.zip
$ pipx twine upload dist/*

贡献者

这些人贡献的补丁。

许可

Apache License,版本2.0

项目详情


下载文件

下载您平台上的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分布

fluent_logger-0.11.1.tar.gz (58.6 kB 查看哈希值)

上传时间

构建分布

fluent_logger-0.11.1-py3-none-any.whl (12.6 kB 查看哈希值)

上传于 Python 3

支持