Fluentd事件收集器的Python日志处理程序
项目描述
许多Web/移动应用程序会生成大量事件日志(例如登录、登出、购买、关注等)。分析这些事件日志对于改进服务非常有价值。然而,挑战在于轻松且可靠地收集这些日志。
Fluentd 和 Fluent Bit 通过以下方式解决了这个问题:易于安装、占用空间小、插件、可靠的缓冲、日志转发等。
fluent-logger-python 是一个Python库,用于记录Python应用程序的事件。
要求
Python 3.7+
msgpack
重要:版本0.8.0是最后一个支持Python 2.6、3.2和3.3的版本
重要:版本0.9.6是最后一个支持Python 2.7和3.4的版本
重要:版本0.10.0是最后一个支持Python 3.5和3.6的版本
安装
此库作为“fluent-logger”Python包分发。请执行以下命令进行安装。
$ pip install fluent-logger
配置
必须以tcp源配置启动Fluentd守护进程
<source> type forward port 24224 </source>
为了快速测试您的设置,请添加一个将日志记录到stdout的匹配器
<match app.**> type stdout </match>
用法
FluentSender 接口
sender.FluentSender 是Fluentd的结构化事件日志记录器。
默认情况下,记录器假设本地启动了fluentd守护进程。您也可以通过传递选项来指定远程记录器。
from fluent import sender
# for local fluent
logger = sender.FluentSender('app')
# for remote fluent
logger = sender.FluentSender('app', host='host', port=24224)
要发送事件,请使用您的事件调用 emit 方法。以下示例将事件发送到fluentd,带有标签“app.follow”和属性“from”和“to”。
# Use current time
logger.emit('follow', {'from': 'userA', 'to': 'userB'})
# Specify optional time
cur_time = int(time.time())
logger.emit_with_time('follow', cur_time, {'from': 'userA', 'to':'userB'})
要发送具有纳秒级时间戳的事件(Fluent 0.14及更高版本),请在 FluentSender 上指定 nanosecond_precision。
# Use nanosecond
logger = sender.FluentSender('app', nanosecond_precision=True)
logger.emit('follow', {'from': 'userA', 'to': 'userB'})
logger.emit_with_time('follow', time.time(), {'from': 'userA', 'to': 'userB'})
您可以通过 emit 的返回值检测错误。如果在 emit 中发生错误,emit 返回 False,并使用 last_error 方法获取错误对象。
if not logger.emit('follow', {'from': 'userA', 'to': 'userB'}):
print(logger.last_error)
logger.clear_last_error() # clear stored error after handled errors
如果您想关闭客户端,请调用 close() 方法。
logger.close()
事件驱动接口
此API是 sender.FluentSender 的包装器。
首先,您需要调用 sender.setup() 来创建全局 sender.FluentSender 记录器实例。此调用只需要调用一次,例如在应用程序开始时。
事件驱动API的初始化代码如下
from fluent import sender
# for local fluent
sender.setup('app')
# for remote fluent
sender.setup('app', host='host', port=24224)
然后,请创建事件如下。这将事件发送到fluentd,带有标签“app.follow”和属性“from”和“to”。
from fluent import event
# send event to fluentd, with 'app.follow' tag
event.Event('follow', {
'from': 'userA',
'to': 'userB'
})
event.Event 有一个限制,不能返回成功/失败结果。
事件驱动接口的其他方法。
sender.get_global_sender # get instance of global sender
sender.close # Call FluentSender#close
处理缓冲区溢出
您可以在连接失败的情况下注入自己的自定义进程来处理事件中的缓冲区溢出。这将减轻数据丢失,而不仅仅是简单地丢弃数据。
import msgpack
from io import BytesIO
def overflow_handler(pendings):
unpacker = msgpack.Unpacker(BytesIO(pendings))
for unpacked in unpacker:
print(unpacked)
logger = sender.FluentSender('app', host='host', port=24224, buffer_overflow_handler=overflow_handler)
您应该在处理程序中处理任何异常。fluent-logger忽略来自 buffer_overflow_handler 的异常。
当在 close() 期间存在挂起的事件时,也会调用此处理程序。
Python logging.Handler 接口
此客户端库还具有用于Python日志模块的 FluentHandler 类。
import logging
from fluent import handler
custom_format = {
'host': '%(hostname)s',
'where': '%(module)s.%(funcName)s',
'type': '%(levelname)s',
'stack_trace': '%(exc_text)s'
}
logging.basicConfig(level=logging.INFO)
l = logging.getLogger('fluent.test')
h = handler.FluentHandler('app.follow', host='host', port=24224, buffer_overflow_handler=overflow_handler)
formatter = handler.FluentRecordFormatter(custom_format)
h.setFormatter(formatter)
l.addHandler(h)
l.info({
'from': 'userA',
'to': 'userB'
})
l.info('{"from": "userC", "to": "userD"}')
l.info("This log entry will be logged with the additional key: 'message'.")
您还可以通过 logging.config.dictConfig 自定义格式化程序
import logging.config
import yaml
with open('logging.yaml') as fd:
conf = yaml.load(fd)
logging.config.dictConfig(conf['logging'])
您可以在连接失败的情况下注入自己的自定义进程来处理事件中的缓冲区溢出。这将减轻数据丢失,而不仅仅是简单地丢弃数据。
import msgpack
from io import BytesIO
def overflow_handler(pendings):
unpacker = msgpack.Unpacker(BytesIO(pendings))
for unpacked in unpacker:
print(unpacked)
一个示例配置 logging.yaml 如下
logging:
version: 1
formatters:
brief:
format: '%(message)s'
default:
format: '%(asctime)s %(levelname)-8s %(name)-15s %(message)s'
datefmt: '%Y-%m-%d %H:%M:%S'
fluent_fmt:
'()': fluent.handler.FluentRecordFormatter
format:
level: '%(levelname)s'
hostname: '%(hostname)s'
where: '%(module)s.%(funcName)s'
handlers:
console:
class : logging.StreamHandler
level: DEBUG
formatter: default
stream: ext://sys.stdout
fluent:
class: fluent.handler.FluentHandler
host: localhost
port: 24224
tag: test.logging
buffer_overflow_handler: overflow_handler
formatter: fluent_fmt
level: DEBUG
none:
class: logging.NullHandler
loggers:
amqp:
handlers: [none]
propagate: False
conf:
handlers: [none]
propagate: False
'': # root logger
handlers: [console, fluent]
level: DEBUG
propagate: False
异步通信
除了常规接口之外,还包括由sender.FluentSender提供的基于事件的接口和由handler.FluentHandler提供的Python日志接口。在asyncsender和asynchandler中分别有对应的异步版本。这些版本使用单独的线程来处理与远程fluentd服务器的通信。这样,在记录事件时,库的客户端不会被阻塞,也不会因为fluentd服务器不可达而超时。同时,它也不会因为网络开销而变慢。
asyncsender和asynchandler的接口与sender和handler的接口完全相同,所以只需从不同的模块导入即可。
例如,对于基于事件的接口
from fluent import asyncsender as sender
# for local fluent
sender.setup('app')
# for remote fluent
sender.setup('app', host='host', port=24224)
# do your work
...
# IMPORTANT: before program termination, close the sender
sender.close()
或对于Python日志接口
import logging
from fluent import asynchandler as handler
custom_format = {
'host': '%(hostname)s',
'where': '%(module)s.%(funcName)s',
'type': '%(levelname)s',
'stack_trace': '%(exc_text)s'
}
logging.basicConfig(level=logging.INFO)
l = logging.getLogger('fluent.test')
h = handler.FluentHandler('app.follow', host='host', port=24224, buffer_overflow_handler=overflow_handler)
formatter = handler.FluentRecordFormatter(custom_format)
h.setFormatter(formatter)
l.addHandler(h)
l.info({
'from': 'userA',
'to': 'userB'
})
l.info('{"from": "userC", "to": "userD"}')
l.info("This log entry will be logged with the additional key: 'message'.")
...
# IMPORTANT: before program termination, close the handler
h.close()
注意:请注意,在程序结束时关闭发送器或处理程序非常重要。这将确保通信线程终止并正确连接。否则,程序不会退出,会等待线程,除非强制终止。
循环队列模式
在某些应用程序中,确保在任何情况下都不会阻塞日志记录过程特别重要,即使在日志记录速度超过发送线程可以处理的速度(backpressure)时也是如此。在这种情况下,可以通过在asyncsender.FluentSender或asynchandler.FluentHandler的queue_circular
参数中传递True
来启用循环队列
模式。这样做,即使队列已满,进行日志记录的线程也不会阻塞,新的事件将通过丢弃最旧的事件添加到队列中。
警告:将queue_circular
设置为True
会在队列完全填满时导致事件丢失!请确保不会发生这种情况,或者这对您的应用程序是可以接受的。
测试
可以使用pytest进行测试。
$ pytest tests
发布
$ # Download dist.zip for release from GitHub Action artifact.
$ unzip -d dist dist.zip
$ pipx twine upload dist/*
贡献者
由这些人贡献的补丁。
许可
Apache License,版本2.0
项目详情
下载文件
下载您平台上的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。
源分布
构建分布
fluent_logger-0.11.1.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 6727525ba08671b758e3ac222f36fa3345bc1a77b81e7ddbc045ced68f44cd77 |
|
MD5 | 2f152925fe42cad792f9c68b6ff68c29 |
|
BLAKE2b-256 | 80c328b1522cc6b17a8b6bd9551e4a81c111004cf48eaf4fd6a7c644cf3dc3e8 |
fluent_logger-0.11.1-py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 1601c4b929a93b4aef03310573387ac808cf4463cdeaf619c39b257da34c0c0c |
|
MD5 | 2410ce2c0bc145578c0b64d7d6b38500 |
|
BLAKE2b-256 | b0824faf651e942897790b1bbba156e61ab4d84a7378e8f64d0e6e2d8538b0d5 |