InfluxDB的Kafka消费者
项目描述
用例
快速入门
为了快速测试,在Kafka和InfluxDB容器内运行kafka-influxdb。启动时自动生成一些示例消息(使用kafkacat)。
Python 2
make docker exec -it kafkainfluxdb python -m kafka_influxdb -c config_example.yaml -s
Python 3
make RUNTIME=py3 docker exec -it kafkainfluxdb python -m kafka_influxdb -c config_example.yaml -s
PyPy 5.x
make RUNTIME=pypy docker exec -it kafkainfluxdb pypy3 -m kafka_influxdb -c config_example.yaml -s --kafka_reader=kafka_influxdb.reader.kafka_python
(注意,这里有一个额外的标志:--kafka_reader=kafka_influxdb.reader.kafka_python。这是因为PyPy与librdkafka的C扩展(即confluent kafka consumer)不兼容。因此我们在这里使用兼容PyPy但速度稍慢的kafka_python库。)
安装
pip install kafka_influxdb kafka_influxdb -c config-example.yaml
性能
基准测试
为了进行快速基准测试,您可以使用以下命令启动完整的kafkacat -> Kafka -> kafka_influxdb -> Influxdb设置:
make
这将立即开始从Kafka读取消息并将它们写入InfluxDB。要查看输出,您可以使用InfluxDB CLI。
docker exec -it docker_influxdb_1 bash # Double check your container name influx use metrics show measurements
支持的格式
您可以为任何输入和输出格式(即使是像Protobuf这样的复杂格式)编写自定义编码器。查看encoder目录中的示例以开始。以下格式得到官方支持
输入格式
mydatacenter.myhost.load.load.shortterm 0.45 1436357630
[{
"values":[
0.6
],
"dstypes":[
"gauge"
],
"dsnames":[
"value"
],
"time":1444745144.824,
"interval":10.000,
"host":"xx.example.internal",
"plugin":"cpu",
"plugin_instance":"1",
"type":"percent",
"type_instance":"system"
}]
输出格式
load_load_shortterm,datacenter=mydatacenter,host=myhost value="0.45" 1436357630
配置
选项 |
描述 |
---|---|
-h, --help |
显示帮助信息并退出 |
--kafka_host KAFKA_HOST |
Kafka消息代理的主机名或IP(默认:localhost) |
--kafka_port KAFKA_PORT |
Kafka消息代理的端口号(默认:9092) |
--kafka_topic KAFKA_TOPIC |
指标的主题(默认:my_topic) |
--kafka_group KAFKA_GROUP |
Kafka消费者组(默认:my_group) |
--kafka_reader KAFKA_READER |
要使用的Kafka客户端库(kafka_python或confluent)(默认:kafka_influxdb.reader.confluent) |
--influxdb_host INFLUXDB_HOST |
InfluxDB的主机名或IP(默认:localhost) |
--influxdb_port INFLUXDB_PORT |
InfluxDB API端口(默认:8086) |
--influxdb_user INFLUXDB_USER |
InfluxDB用户名(默认:root) |
--influxdb_password INFLUXDB_PASSWORD |
InfluxDB密码(默认:root) |
--influxdb_dbname INFLUXDB_DBNAME |
将指标写入的InfluxDB数据库(默认:metrics) |
--influxdb_use_ssl |
为InfluxDB使用SSL连接(默认:False) |
--influxdb_verify_ssl |
在连接之前验证SSL证书(默认:False) |
--influxdb_timeout INFLUXDB_TIMEOUT |
建立与InfluxDB连接的最大秒数(默认:5) |
--influxdb_use_udp |
为InfluxDB使用UDP连接(默认:False) |
--influxdb_retention_policy INFLUXDB_RETENTION_POLICY |
传入指标的数据保留策略(默认:autogen) |
--influxdb_time_precision INFLUXDB_TIME_PRECISION |
传入指标的精度。可以是‘s’,‘m’,‘ms’,‘u’中的一个(默认:s) |
--encoder ENCODER |
将传入的消息转换为字典的输入编码器(默认:collectd_graphite_encoder) |
--buffer_size BUFFER_SIZE |
在刷新到后端之前收集的消息的最大数量(默认:1000) |
-c CONFIGFILE, --configfile CONFIGFILE |
配置文件路径(默认:None) |
-s, --statistics |
显示性能统计信息(默认:True) |
-v,--verbose |
设置详细程度。通过添加 v 来增加详细程度:-v -vv -vvv(默认:0) |
--version |
显示版本 |
与其他工具的比较
为 logstash 提供了 Kafka 输入插件和 InfluxDB 输出插件。它支持 Influxdb 0.9+。我们已经在这个设置中实现了大约 5000 条消息/秒 的消息吞吐量。请查看 docker/logstash/config.conf 中的配置。您可以自行运行基准测试
make RUNTIME=logstash docker exec -it logstash logstash -f config.conf
如果您知道其他可以在此提及的工具,请发送拉取请求。
项目详情
kafka_influxdb-0.9.3.zip 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | ebab0b5861e07d000b24fe7bb3da33c5f2fe6fda4d8d336a267ef4322eee4586 |
|
MD5 | da7d1ed74a6423c6b6126fa56b4329c8 |
|
BLAKE2b-256 | fc55edd83e6fd6c8f172b44a935fdebf777027135976d6e9fac959b007edd417 |
kafka_influxdb-0.9.3-py2.py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 96b5ef523a004886d7552956a76876c31ae911fa6a5b49dcfd3ca9c19d29c5b8 |
|
MD5 | 31f0aa3b2cbb833f8cf1ade934c3e92b |
|
BLAKE2b-256 | 44284101d992d715090b33552a434bf4f03584e7d19913cc125c7ff86cabc7ca |