NASA通用坐标网络(GCN)的Kafka客户端
项目描述
Python GCN Kafka客户端
这是通用坐标网络(GCN)的官方Python客户端。它是对confluent-kafka-python的轻量级封装。
安装
运行此命令使用pip安装
pip install gcn-kafka
或运行此命令使用conda安装
conda install -c conda-forge gcn-kafka
使用
创建一个消费者。
from gcn_kafka import Consumer
consumer = Consumer(client_id='fill me in',
client_secret='fill me in')
列出所有主题
print(consumer.list_topics().topics)
订阅主题并接收警报
consumer.subscribe(['gcn.classic.text.FERMI_GBM_FIN_POS',
'gcn.classic.text.LVC_INITIAL'])
while True:
for message in consumer.consume(timeout=1):
if message.error():
print(message.error())
continue
print(message.value())
consume()
函数的timeout
参数,以秒为单位,允许程序在达到现有消息缓冲区末尾时快速退出。这对于只想从流中恢复旧消息的用户很有用。timeout
还将使while True
无限循环可以通过标准的ctrl-c键序列中断,而consume()
忽略此键序列。
测试和开发Kafka集群
GCN有三个Kafka集群:生产、测试和内部开发部署。使用可选的domain
关键字参数选择要连接到的代理。
# Production (default)
consumer = Consumer(client_id='fill me in',
client_secret='fill me in',
domain='gcn.nasa.gov')
# Testing
consumer = Consumer(client_id='fill me in',
client_secret='fill me in',
domain='test.gcn.nasa.gov')
# Development (internal)
consumer = Consumer(client_id='fill me in',
client_secret='fill me in',
domain='dev.gcn.nasa.gov')
常见问题解答
如何跟踪在重启客户端时的最后读取消息?
Apache Kafka消费者客户端的一个关键特性是能够持久跟踪哪些消息已被读取。这使得客户端能够在重启后通过从最早未读消息开始恢复丢失的消息,而不是从流中的下一个可用消息开始。为了启用此功能,您需要使用Consumer类的配置字典参数设置客户端Group ID,并将自动偏移重置选项更改为“earliest”设置。完成这些操作后,每个具有给定Group ID的新客户端都将从最早未读消息开始读取指定的主题。在此过程中,建议关闭自动提交功能,因为它如果在自动提交间隔(默认为5秒)之前客户端崩溃,可能会丢失最后读取的消息。在消息读取后手动提交消息(即存储最后读取消息的状态)是跟踪最后读取消息最稳健的方法。
示例代码
from gcn_kafka import Consumer
config = {'group.id': 'my group name',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False}
consumer = Consumer(config=config,
client_id='fill me in',
client_secret='fill me in',
domain='gcn.nasa.gov')
topics = ['gcn.classic.voevent.FERMI_GBM_SUBTHRESH']
consumer.subscribe(topics)
while True:
for message in consumer.consume(timeout=1):
print(message.value())
consumer.commit(message)
如何从给定流的最早消息开始读取消息?
您可以通过将Group ID设置为空字符串并在Consumer类的配置字典参数中将自动偏移重置选项设置为“earliest”来从流缓冲区中存在的最早消息开始读取给定主题流。此功能允许用户为了测试目的或恢复由于崩溃或网络中断而可能丢失的消息而扫描旧消息。只需记住,流缓冲区的大小是有限的。目前,它们包含过去几天内的消息。
示例代码
from gcn_kafka import Consumer
config = {'auto.offset.reset': 'earliest'}
consumer = Consumer(config=config,
client_id='fill me in',
client_secret='fill me in',
domain='gcn.nasa.gov')
topics = ['gcn.classic.voevent.INTEGRAL_SPIACS']
consumer.subscribe(topics)
while True:
for message in consumer.consume(timeout=1):
print(message.value())
如何搜索给定日期范围内的消息?
要搜索给定日期范围内的消息,您可以使用Consumer类的offsets_for_times()函数获取所需日期范围内的消息偏移量。然后,您可以将起始偏移量分配给Consumer并读取所需数量的消息。在此过程中,请记住流缓冲区的大小是有限的。无法恢复流缓冲区开始之前的消息。GCN流缓冲区目前设置为保留过去几天内的消息。
示例代码
import datetime
from gcn_kafka import Consumer
from confluent_kafka import TopicPartition
consumer = Consumer(client_id='fill me in',
client_secret='fill me in',
domain='gcn.nasa.gov')
# get messages occurring 3 days ago
timestamp1 = int((datetime.datetime.now() - datetime.timedelta(days=3)).timestamp() * 1000)
timestamp2 = timestamp1 + 86400000 # +1 day
topic = 'gcn.classic.voevent.INTEGRAL_SPIACS'
start = consumer.offsets_for_times(
[TopicPartition(topic, 0, timestamp1)])
end = consumer.offsets_for_times(
[TopicPartition(topic, 0, timestamp2)])
consumer.assign(start)
for message in consumer.consume(end[0].offset - start[0].offset, timeout=1):
print(message.value())
已知问题
confluent-kafka-python
如果您使用confluent-kafka-python v2.1.0或v2.1.1与librdkafka v2.1.1一起使用,在订阅不可用的主题时将遇到段错误。
有关问题的更新,请参阅confluent-kafka-python github问题。
项目详情
下载文件
下载适合您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。