将netCDF CF DSG文件回放到Kafka主题上
项目描述
ncrereplayer
一个小巧的实用工具,用于加载符合CF DSG规范的netCDF文件,并将其以批量
模式或流
模式回放到Kafka主题上。可选地使用配置参数控制原始文件中的时间戳和timedeltas。
数据格式遵循AVRO schema文件schema.avsc
中的描述。您可以选择将数据序列化为avro
、msgpack
或默认的json
。
为什么?
这个实用工具是为了让我的生活更简单而设计的。它并不打算供许多人使用,但对以下情况非常有用:
- 回放netCDF文件中的数据,其时间和间隔与文件中定义的不同。
- 快速设置数据流进行负载测试。
- 将大型系统的一部分从依赖于静态文件转换为流处理。
- 接受来自合作者的标准化数据格式(netCDF/CF/DSG),并将其流式传输到更大的系统中。
数据格式
最简单的示例
{
"uid": "1",
"time": "2019-04-01T00:00:00Z",
"lat": 30.5,
"lon": -76.5,
}
完整示例
{
"uid": "1",
"gid": null,
"time": "2019-04-01T00:00:00Z",
"lat": 30.5,
"lon": -76.5,
"z": null,
"values": {
"salinity": 30.2,
"temperature": 46.5
},
"meta": ""
}
values
对象是可选的,是一个多类型的AVROmap
。meta
是可选的,是开放的。它的目的是携带元数据来描述values
。我建议使用nco-json
。如果系统需要一些关于数据上下文的信息,例如将流式数据发送到网站,这很有用。YMMV。
配置
此程序使用Click
进行CLI界面。我可能花了50%的时间在CLI界面上摆弄这个实用工具。我不知道我提出的是否是令人难以置信的神奇或是一堆垃圾。它工作。欢迎评论。
$ ncreplay
Usage: ncreplay [OPTIONS] FILENAME COMMAND [ARGS]...
Options:
--brokers TEXT Kafka broker string (comman separated)
[required]
--topic TEXT Kafka topic to send the data to [required]
--packing [json|avro|msgpack] The data packing algorithm to use
--registry TEXT URL to a Schema Registry if avro packing is
requested
--uid TEXT Variable name, global attribute, or value to
use for the uid values
--gid TEXT Variable name, global attribute, or value to
use for the gid values
--meta / --no-meta Include the `nco-json` metadata in each
message?
--help Show this message and exit.
Commands:
batch Batch process a netCDF file in chunks, pausing every [chunk]...
stream Streams each unique timestep in the netCDF file every [delta]...
批量
$ ncreplay /path/to/file.nc batch --help
Usage: ncreplay batch [OPTIONS]
Batch process a netCDF file in chunks, pausing every [chunk] records
for [delta] seconds. Optionally change the [starting] time of the file
and/or change each timedelta using the [factor] and [offset] parameters.
Options:
-s, --starting [%Y-%m-%d|%Y-%m-%dT%H:%M:%S|%Y-%m-%d %H:%M:%S]
-f, --factor FLOAT
-o, --offset FLOAT
-d, --delta FLOAT
-c, --chunk INTEGER
--help Show this message and exit.
流
$ ncreplay /path/to/file.nc stream --help
Usage: ncreplay stream [OPTIONS]
Streams each unqiue timestep in the netCDF file every [delta] seconds.
Optionally you can control the [starting] point of the file and this will
re-calculate all of the timestamps to match the original timedeltas.
Options:
-s, --starting [%Y-%m-%d|%Y-%m-%dT%H:%M:%S|%Y-%m-%d %H:%M:%S]
-d, --delta FLOAT
--help Show this message and exit
开发/测试
目前还没有测试,但您可以使用本存储库中包含的文件来尝试不同的选项。
首先设置Kafka生态系统。
$ docker run -d --net=host \
-e ZK_PORT=50000 \
-e BROKER_PORT=4001 \
-e REGISTRY_PORT=4002 \
-e REST_PORT=4003 \
-e CONNECT_PORT=4004 \
-e WEB_PORT=4005 \
-e RUNTESTS=0 \
-e DISABLE=elastic,hbase \
-e DISABLE_JMX=1 \
-e RUNTESTS=0 \
-e FORWARDLOGS=0 \
-e SAMPLEDATA=0 \
--name ncreplayer-testing \
landoop/fast-data-dev:1.0.1
然后设置一个监听器。
$ docker run -it --rm --net=host \
landoop/fast-data-dev:1.0.1 \
kafka-console-consumer \
--bootstrap-server localhost:4001 \
--topic axds-ncreplayer-data
现在批量或流式传输文件。
# Batch
$ ncreplay tests/data/gda_example.nc batch -d 10 -c 10
# Stream
$ ncreplay tests/data/gda_example.nc stream -d 10
为了测试avro
打包,设置一个自动解包数据的监听器。
$ docker run -it --rm --net=host \
landoop/fast-data-dev:1.0.1 \
kafka-avro-console-consumer \
--bootstrap-server localhost:4001 \
--property schema.registry.url=http://localhost:4002 \
--topic axds-ncreplayer-data
并使用avro
打包。
$ ncreplay --packing avro tests/data/gda_example.nc batch -d 10 -c 10
项目详情
下载文件
下载适用于您的平台的文件。如果您不确定选择哪一个,请了解更多关于安装包的信息。
源分发
ncreplayer-1.0.2.tar.gz (7.9 kB 查看哈希值)
构建分发
ncreplayer-1.0.2-py3-none-any.whl (11.6 kB 查看哈希值)
关闭
ncreplayer-1.0.2.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | b3182e18c2ca7842c32e014b05ae8e38a68cc3a6da0fb8701569364c751b26e6 |
|
MD5 | ee8a5618b8c46d84b1af58940855c37a |
|
BLAKE2b-256 | 0912c9a9f8b5d678a600b96ba80c6dbb210cee049b2f8ccb4fe9127a6167a9cc |
关闭
ncreplayer-1.0.2-py3-none-any.whl的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 4ea8ca92e70d5d83eed1c27045e2ea30d7a6349cb8bf608584b5700b664e6a41 |
|
MD5 | b87b11f1092d6eeaddc92232e5e10c43 |
|
BLAKE2b-256 | 58a22ab3ce44d60b56df37d9f1d3331295e52476c30cd3fbaec3bccbe6527346 |