跳转到主要内容

InfluxDB的Kafka消费者

项目描述

Build Status
Coverage Status
Code Climate
PyPi Version
Scrutinizer
Python编写的InfluxDB Kafka消费者。
支持InfluxDB 0.9.x及以上版本。对于InfluxDB 0.8.x的支持,请查看0.3.0标签

用例

Kafka将在高负载期间充当你的指标数据的缓冲。
它还适用于从具有不可靠连接的海外数据中心向监控后端发送指标。

image5

快速入门

为了快速测试,在Kafka和InfluxDB容器内运行kafka-influxdb。启动时自动生成一些示例消息(使用kafkacat)。

Python 2

make
docker exec -it kafkainfluxdb
python -m kafka_influxdb -c config_example.yaml -s

Python 3

make RUNTIME=py3
docker exec -it kafkainfluxdb
python -m kafka_influxdb -c config_example.yaml -s

PyPy 5.x

make RUNTIME=pypy
docker exec -it kafkainfluxdb
pypy3 -m kafka_influxdb -c config_example.yaml -s --kafka_reader=kafka_influxdb.reader.kafka_python

(注意,这里有一个额外的标志:--kafka_reader=kafka_influxdb.reader.kafka_python。这是因为PyPy与librdkafka的C扩展(即confluent kafka consumer)不兼容。因此我们在这里使用兼容PyPy但速度稍慢的kafka_python库。)

安装

pip install kafka_influxdb
kafka_influxdb -c config-example.yaml

性能

以下图表显示了不同Python版本和Kafka消费者插件从Kafka读取的每秒消息数。
这是针对具有10个分区和五个消息代理的Kafka主题进行的测试。
如您所见,在Python 3中使用-O标志进行字节码优化,并结合confluent-kafka读取器(默认设置),可以达到最佳性能。请注意,将数据编码并发送到InfluxDB可能会降低此最大性能,尽管与logstash相比,您仍然应该看到显著的性能提升。

Benchmark results

基准测试

为了进行快速基准测试,您可以使用以下命令启动完整的kafkacat -> Kafka -> kafka_influxdb -> Influxdb设置:

make

这将立即开始从Kafka读取消息并将它们写入InfluxDB。要查看输出,您可以使用InfluxDB CLI。

docker exec -it docker_influxdb_1 bash # Double check your container name
influx
use metrics
show measurements

支持的格式

您可以为任何输入和输出格式(即使是像Protobuf这样的复杂格式)编写自定义编码器。查看encoder目录中的示例以开始。以下格式得到官方支持

输入格式

mydatacenter.myhost.load.load.shortterm 0.45 1436357630
[{
    "values":[
       0.6
    ],
    "dstypes":[
       "gauge"
    ],
    "dsnames":[
       "value"
    ],
    "time":1444745144.824,
    "interval":10.000,
    "host":"xx.example.internal",
    "plugin":"cpu",
    "plugin_instance":"1",
    "type":"percent",
    "type_instance":"system"
 }]

输出格式

load_load_shortterm,datacenter=mydatacenter,host=myhost value="0.45" 1436357630

配置

查看config-example.yaml以了解如何创建配置文件。
您可以从命令行覆盖设置。以下参数被允许

选项

描述

-h, --help

显示帮助信息并退出

--kafka_host KAFKA_HOST

Kafka消息代理的主机名或IP(默认:localhost)

--kafka_port KAFKA_PORT

Kafka消息代理的端口号(默认:9092)

--kafka_topic KAFKA_TOPIC

指标的主题(默认:my_topic)

--kafka_group KAFKA_GROUP

Kafka消费者组(默认:my_group)

--kafka_reader KAFKA_READER

要使用的Kafka客户端库(kafka_python或confluent)(默认:kafka_influxdb.reader.confluent)

--influxdb_host INFLUXDB_HOST

InfluxDB的主机名或IP(默认:localhost)

--influxdb_port INFLUXDB_PORT

InfluxDB API端口(默认:8086)

--influxdb_user INFLUXDB_USER

InfluxDB用户名(默认:root)

--influxdb_password INFLUXDB_PASSWORD

InfluxDB密码(默认:root)

--influxdb_dbname INFLUXDB_DBNAME

将指标写入的InfluxDB数据库(默认:metrics)

--influxdb_use_ssl

为InfluxDB使用SSL连接(默认:False)

--influxdb_verify_ssl

在连接之前验证SSL证书(默认:False)

--influxdb_timeout INFLUXDB_TIMEOUT

建立与InfluxDB连接的最大秒数(默认:5)

--influxdb_use_udp

为InfluxDB使用UDP连接(默认:False)

--influxdb_retention_policy INFLUXDB_RETENTION_POLICY

传入指标的数据保留策略(默认:autogen)

--influxdb_time_precision INFLUXDB_TIME_PRECISION

传入指标的精度。可以是‘s’,‘m’,‘ms’,‘u’中的一个(默认:s)

--encoder ENCODER

将传入的消息转换为字典的输入编码器(默认:collectd_graphite_encoder)

--buffer_size BUFFER_SIZE

在刷新到后端之前收集的消息的最大数量(默认:1000)

-c CONFIGFILE, --configfile CONFIGFILE

配置文件路径(默认:None)

-s, --statistics

显示性能统计信息(默认:True)

-v--verbose

设置详细程度。通过添加 v 来增加详细程度:-v -vv -vvv(默认:0)

--version

显示版本

与其他工具的比较

logstash 提供了 Kafka 输入插件和 InfluxDB 输出插件。它支持 Influxdb 0.9+。我们已经在这个设置中实现了大约 5000 条消息/秒 的消息吞吐量。请查看 docker/logstash/config.conf 中的配置。您可以自行运行基准测试

make RUNTIME=logstash
docker exec -it logstash
logstash -f config.conf

如果您知道其他可以在此提及的工具,请发送拉取请求。

项目详情


下载文件

下载适合您平台的自定义文件。如果您不确定要选择哪个,请了解有关 安装包 的更多信息。

源分布

kafka_influxdb-0.9.3.zip (51.7 kB 查看哈希值

上传时间:

构建分布

kafka_influxdb-0.9.3-py2.py3-none-any.whl (42.9 kB 查看哈希值

上传时间: Python 2 Python 3

由以下机构支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面