Fluentd事件收集器的Python日志处理器
项目描述
警告:这是一个https://github.com/fluent/fluent-logger-python项目的分支,用于与asyncio一起工作。
许多Web/移动应用程序会生成大量事件日志(例如登录、登出、购买、关注等)。分析这些事件日志对于改进服务可能非常有价值。然而,挑战在于轻松且可靠地收集这些日志。
Fluentd通过以下方式解决了这个问题:易于安装、占用空间小、插件、可靠的缓冲、日志转发等。
aioluent是一个Python库,用于记录Python应用程序的事件。
需求
Python 3.5或更高版本
msgpack-python
安装
此库作为“aiofluent”python包分发。请执行以下命令进行安装。
$ pip install aiofluent
配置
必须使用tcp源配置启动Fluentd守护进程
<source> type forward port 24224 </source>
为了快速测试您的设置,请添加一个记录到stdout的匹配器
<match app.**> type stdout </match>
用法
FluentSender接口
sender.FluentSender是Fluentd的结构化事件记录器。
默认情况下,记录器假定本地启动了fluentd守护进程。您也可以通过传递选项来指定远程记录器。
from aiofluent import sender
# for local fluent
logger = sender.FluentSender('app')
# for remote fluent
logger = sender.FluentSender('app', host='host', port=24224)
要发送事件,请使用您的事件调用emit方法。以下示例将事件发送到fluentd,带有标签‘app.follow’和属性‘from’和‘to’。
# Use current time
logger.emit('follow', {'from': 'userA', 'to': 'userB'})
# Specify optional time
cur_time = int(time.time())
logger.emit_with_time('follow', cur_time, {'from': 'userA', 'to':'userB'})
您可以通过emit的返回值来检测错误。如果在emit过程中发生错误,emit将返回False,并通过last_error方法获取错误对象。
if not logger.emit('follow', {'from': 'userA', 'to': 'userB'}):
print(logger.last_error)
logger.clear_last_error() # clear stored error after handled errors
如果您想关闭客户端,请调用close()方法。
logger.close()
基于事件的接口
此API是sender.FluentSender的包装器。
首先,您需要调用sender.setup()来创建全局的sender.FluentSender日志实例。此调用只需在应用程序开始时调用一次。
以下是基于事件的API的初始化代码
from aiofluent import sender
# for local fluent
sender.setup('app')
# for remote fluent
sender.setup('app', host='host', port=24224)
然后,请创建以下事件。这将带有标签‘app.follow’和属性‘from’和‘to’的事件发送到fluentd。
from aiofluent import event
# send event to fluentd, with 'app.follow' tag
event.Event('follow', {
'from': 'userA',
'to': 'userB'
})
event.Event有一个限制,不能返回成功/失败结果。
其他基于事件的接口方法。
sender.get_global_sender # get instance of global sender
sender.close # Call FluentSender#close
缓冲区溢出处理器
您可以在连接失败的情况下注入自己的自定义进程来处理缓冲区溢出。这将减少数据丢失,而不是简单地丢弃数据。
import msgpack
from io import BytesIO
def handler(pendings):
unpacker = msgpack.Unpacker(BytesIO(pendings))
for unpacked in unpacker:
print(unpacked)
logger = sender.FluentSender('app', host='host', port=24224, buffer_overflow_handler=handler)
您应该在处理程序中处理任何异常。aiofluent忽略来自buffer_overflow_handler的异常。
在调用close()时存在挂起事件时,也将调用此处理程序。
Python日志处理程序接口
此客户端库还具有用于Python日志模块的FluentHandler类。
import logging
from aiofluent import handler
custom_format = {
'host': '%(hostname)s',
'where': '%(module)s.%(funcName)s',
'type': '%(levelname)s',
'stack_trace': '%(exc_text)s'
}
logging.basicConfig(level=logging.INFO)
l = logging.getLogger('fluent.test')
h = handler.FluentHandler('app.follow', host='host', port=24224)
formatter = handler.FluentRecordFormatter(custom_format)
h.setFormatter(formatter)
l.addHandler(h)
l.info({
'from': 'userA',
'to': 'userB'
})
l.info('{"from": "userC", "to": "userD"}')
l.info("This log entry will be logged with the additional key: 'message'.")
您还可以通过logging.config.dictConfig自定义格式化程序。
import logging.config
import yaml
with open('logging.yaml') as fd:
conf = yaml.load(fd)
logging.config.dictConfig(conf['logging'])
以下是一个示例配置logging.yaml:
logging:
version: 1
formatters:
brief:
format: '%(message)s'
default:
format: '%(asctime)s %(levelname)-8s %(name)-15s %(message)s'
datefmt: '%Y-%m-%d %H:%M:%S'
fluent_fmt:
'()': fluent.handler.FluentRecordFormatter
format:
level: '%(levelname)s'
hostname: '%(hostname)s'
where: '%(module)s.%(funcName)s'
handlers:
console:
class : logging.StreamHandler
level: DEBUG
formatter: default
stream: ext://sys.stdout
fluent:
class: fluent.handler.FluentHandler
host: localhost
port: 24224
tag: test.logging
formatter: fluent_fmt
level: DEBUG
none:
class: logging.NullHandler
loggers:
amqp:
handlers: [none]
propagate: False
conf:
handlers: [none]
propagate: False
'': # root logger
handlers: [console, fluent]
level: DEBUG
propagate: False
许可证
Apache License,版本2.0
1.2.9 (2020-10-22)
每30秒只记录错误
1.2.8 (2020-05-15)
处理TypeError格式化日志数据
1.2.7 (2020-03-09)
修复仓库位置
1.2.6 (2020-01-06)
改进错误日志[vangheem]
1.2.5 (2019-12-19)
处理事件循环关闭错误[vangheem]
1.2.4 (2019-12-19)
增加最大队列大小
1.2.3 (2019-04-01)
修复发布
1.2.2 (2019-04-01)
默认为纳秒精度[davidonna]
1.2.1 (2018-10-31)
添加对纳秒精度时间戳的支持[davidonna]
1.2.0 (2018-06-14)
维护所有日志的异步队列[vangheem]
1.1.4 (2018-05-29)
处理取消任务/清理时的RuntimeError[vangheem]
1.1.3 (2018-02-15)
锁定调用发送者的close方法[vangheem]
增加默认超时[vangheem]
1.1.2 (2018-02-07)
锁定整个方法[vangheem]
1.1.1 (2018-02-07)
在获取连接对象时使用锁[vangheem]
1.1.0 (2018-01-25)
转移到使用异步连接基础设施而不是套接字[vangheem]
1.0.8 (2018-01-04)
始终关闭缓冲区数据[vangheem]
1.0.7 (2018-01-04)
处理处理日志队列时的错误[vangheem]
1.0.6 (2017-11-14)
防止日志队列过大[vangheem]
1.0.5 (2017-10-17)
修复发布以包含CHANGELOG.rst文件[vangheem]
1.0.4 (2017-10-10)
修复推送初始记录
1.0.3 (2017-10-10)
处理在事件循环开始之前完成日志记录时的RuntimeError[vangheem]
1.0.2 (2017-10-09)
修复以使正常日志调用异步[vangheem]
1.0.1 (2017-07-03)
初始发布