Kinesis工具
项目描述
用于AWS Kinesis的各种工具。
安装
pip install kinesisutils
要安装开发版本
pip install git+https://github.com/findhotel/kinesisutils
用法
目前只实现了一个工具:一个从Kinesis流中读取记录的Python生成器。您可以像这样使用它
import json from kinesisutils.kinesisutils import KinesisGenerator # Hit Kinesis with at most 10 requests per second, usin json.loads to # deserialize the Kinesis records (the default). You could deactivate record # deserialization by setting des=None. kg = KinesisGenerator("stream_name", rqs=10, des=json.loads) for rec in kg: print(rec)
默认情况下,生成器将连续从Kinesis中拉取记录60秒。如果您想自定义此超时时间,可以这样做
from kinesisutils.kinesisutils import KinesisGenerator kg = KinesisGenerator("stream_name", timeout=30) for rec in kg: print(rec)
如果您正在使用Kinesis生成器读取通过[日志订阅][logsubs]转发到Kinesis流的Cloudwatch日志事件,您需要在反序列化之前解压缩Cloudwatch记录
import decompress import json kg = KinesisGenerator("stream_name", rqs=10, des=json.loads, preprocess=gzip.decompress) for rec in kg: print(rec)
联系方式
如果您有任何问题、错误报告、建议等,请在本GitHub项目页面上创建一个问题。PR也是受欢迎的。
许可证
本软件根据MIT许可证授权。
查看许可证文件。
© 2017 German Gomez-Herrero, Find Hotel及其他人。