跳转到主要内容

基于多进程的AWS Kinesis生产者与消费者库,低级别

项目描述

https://img.shields.io/travis/NerdWalletOSS/kinesis-python.svg https://img.shields.io/codecov/c/github/NerdWalletOSS/kinesis-python.svg Latest PyPI version

官方Kinesis Python库要求使用Amazon的“MultiLangDaemon”,这是一个Java可执行文件,通过管道在STDIN/STDOUT上操作消息。

ಠ_ಠ

从维护角度来看,从KPL责任团队的角度来看,希望有一个客户端库的单个实现是有意义的,但要求安装JRE并考虑到由Java和Python消耗的流的开销,对于在无Java环境中工作的团队来说并不理想。

这是一个纯Python实现的Kinesis生产者和消费者类,利用Python的multiprocessing模块为每个分片生成一个进程,然后通过Queue将消息发送回主进程。它仅依赖于boto3(AWS SDK)、offspring(子进程实现)和six(py2/py3兼容性)。

它还包括一个DynamoDB状态后端,允许多实例消费多个分片,并存储检查点数据,以便在重新启动或崩溃后从上次离开的地方恢复。

概述

所有功能都封装在两个类中:KinesisConsumer和KinesisProducer

消费者

消费者通过在每个分片中启动一个进程并实现Python迭代器协议来工作。

from kinesis.consumer import KinesisConsumer

consumer = KinesisConsumer(stream_name='my-stream')
for message in consumer:
    print "Received message: {0}".format(message)

从每个分片进程接收到的消息通过Python队列传递回主进程,在队列中进行处理。消息不是严格有序的,但这不是本实现的特性,而是Kinesis的特性。

锁定、检查点和多实例消费

当部署具有多个实例的应用程序时,可以使用DynamoDB来协调哪个实例负责哪个分片,因为每个实例处理所有记录是不理想的。

无论是否有多个节点,在处理记录时也期望检查流,以便在消费者重启时能够从上次停止的地方继续。

利用DynamoDB的“状态”后端允许消费者协调哪个节点负责哪些分片以及我们在流中当前读取的位置。

from kinesis.consumer import KinesisConsumer
from kinesis.state import DynamoDB

consumer = KinesisConsumer(stream_name='my-stream', state=DynamoDB(table_name='my-kinesis-state'))
for message in consumer:
    print "Received message: {0}".format(message)

DynamoDB表必须已经存在,并且必须有一个类型为S(字符串)的HASH键为shard

生产者

生产者通过启动一个用于累积和发布到流的单个进程来工作。

from kinesis.producer import KinesisProducer

producer = KinesisProducer(stream_name='my-stream')
producer.put('Hello World from Python')

默认情况下,累积缓冲时间为500毫秒,或最大记录大小为1MB,以先到者为准。您可以通过在实例化生产者时指定以秒为单位的buffer_time关键字参数来更改缓冲时间。例如,如果您更关注预算而不是性能,可以在60秒的时间内累积。

producer = KinesisProducer(stream_name='my-stream', buffer_time=60)

后台进程会采取措施确保在关闭时通过信号处理程序和python atexit模块将任何累积的消息刷新到流中,但这并不是完全持久的。如果您向生产者进程发送kill -9,则任何累积的消息都会丢失。

AWS权限

默认情况下,生产者、消费者和状态类都使用默认的boto3凭证链。如果您想更改它,您可以实例化自己的boto3.Session对象,并通过KinesisProducerKinesisConsumerDynamoDB构造函数的boto3_session关键字参数将其传递进去。

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。

源分布

kinesis-python-0.2.1.tar.gz (11.7 kB 查看散列)

上传时间

构建分布

kinesis_python-0.2.1-py3-none-any.whl (10.7 kB 查看散列)

上传时间 Python 3

由以下支持