跳转到主要内容

将netCDF CF DSG文件回放到Kafka主题上

项目描述

ncrereplayer

Build Status License

一个小巧的实用工具,用于加载符合CF DSG规范的netCDF文件,并将其以批量模式或模式回放到Kafka主题上。可选地使用配置参数控制原始文件中的时间戳和timedeltas。

数据格式遵循AVRO schema文件schema.avsc中的描述。您可以选择将数据序列化为avromsgpack或默认的json

为什么?

这个实用工具是为了让我的生活更简单而设计的。它并不打算供许多人使用,但对以下情况非常有用:

  • 回放netCDF文件中的数据,其时间和间隔与文件中定义的不同。
  • 快速设置数据流进行负载测试。
  • 将大型系统的一部分从依赖于静态文件转换为流处理。
  • 接受来自合作者的标准化数据格式(netCDF/CF/DSG),并将其流式传输到更大的系统中。

数据格式

最简单的示例

{
  "uid": "1",
  "time": "2019-04-01T00:00:00Z",
  "lat": 30.5,
  "lon": -76.5,
}

完整示例

{
  "uid": "1",
  "gid": null,
  "time": "2019-04-01T00:00:00Z",
  "lat": 30.5,
  "lon": -76.5,
  "z": null,
  "values": {
    "salinity": 30.2,
    "temperature":  46.5
  },
  "meta": ""
}
  • values对象是可选的,是一个多类型的AVRO map
  • meta是可选的,是开放的。它的目的是携带元数据来描述values。我建议使用nco-json。如果系统需要一些关于数据上下文的信息,例如将流式数据发送到网站,这很有用。YMMV。

配置

此程序使用Click进行CLI界面。我可能花了50%的时间在CLI界面上摆弄这个实用工具。我不知道我提出的是否是令人难以置信的神奇或是一堆垃圾。它工作。欢迎评论。

$ ncreplay
Usage: ncreplay [OPTIONS] FILENAME COMMAND [ARGS]...

Options:
  --brokers TEXT                 Kafka broker string (comman separated)
                                 [required]
  --topic TEXT                   Kafka topic to send the data to  [required]
  --packing [json|avro|msgpack]  The data packing algorithm to use
  --registry TEXT                URL to a Schema Registry if avro packing is
                                 requested
  --uid TEXT                     Variable name, global attribute, or value to
                                 use for the uid values
  --gid TEXT                     Variable name, global attribute, or value to
                                 use for the gid values
  --meta / --no-meta             Include the `nco-json` metadata in each
                                 message?
  --help                         Show this message and exit.

Commands:
  batch   Batch process a netCDF file in chunks, pausing every [chunk]...
  stream  Streams each unique timestep in the netCDF file every [delta]...

批量

$ ncreplay /path/to/file.nc batch --help
Usage: ncreplay batch [OPTIONS]

  Batch process a netCDF file in chunks, pausing every [chunk] records
  for [delta] seconds. Optionally change the [starting] time of the file
  and/or change each timedelta using the [factor] and [offset] parameters.

Options:
  -s, --starting [%Y-%m-%d|%Y-%m-%dT%H:%M:%S|%Y-%m-%d %H:%M:%S]
  -f, --factor FLOAT
  -o, --offset FLOAT
  -d, --delta FLOAT
  -c, --chunk INTEGER
  --help                          Show this message and exit.

$ ncreplay /path/to/file.nc stream --help
Usage: ncreplay stream [OPTIONS]

  Streams each unqiue timestep in the netCDF file every [delta] seconds.
  Optionally you can control the [starting] point of the file and this will
  re-calculate all of the timestamps to match the original timedeltas.

Options:
  -s, --starting [%Y-%m-%d|%Y-%m-%dT%H:%M:%S|%Y-%m-%d %H:%M:%S]
  -d, --delta FLOAT
  --help                          Show this message and exit

开发/测试

目前还没有测试,但您可以使用本存储库中包含的文件来尝试不同的选项。

首先设置Kafka生态系统。

$ docker run -d --net=host \
    -e ZK_PORT=50000 \
    -e BROKER_PORT=4001 \
    -e REGISTRY_PORT=4002 \
    -e REST_PORT=4003 \
    -e CONNECT_PORT=4004 \
    -e WEB_PORT=4005 \
    -e RUNTESTS=0 \
    -e DISABLE=elastic,hbase \
    -e DISABLE_JMX=1 \
    -e RUNTESTS=0 \
    -e FORWARDLOGS=0 \
    -e SAMPLEDATA=0 \
    --name ncreplayer-testing \
  landoop/fast-data-dev:1.0.1

然后设置一个监听器。

$ docker run -it --rm --net=host \
  landoop/fast-data-dev:1.0.1  \
    kafka-console-consumer \
      --bootstrap-server localhost:4001 \
      --topic axds-ncreplayer-data

现在批量或流式传输文件。

# Batch
$ ncreplay tests/data/gda_example.nc batch -d 10 -c 10

# Stream
$ ncreplay tests/data/gda_example.nc stream -d 10

为了测试avro打包,设置一个自动解包数据的监听器。

$ docker run -it --rm --net=host \
  landoop/fast-data-dev:1.0.1  \
    kafka-avro-console-consumer \
      --bootstrap-server localhost:4001 \
      --property schema.registry.url=http://localhost:4002 \
      --topic axds-ncreplayer-data

并使用avro打包。

$ ncreplay --packing avro tests/data/gda_example.nc batch -d 10 -c 10

项目详情


下载文件

下载适用于您的平台的文件。如果您不确定选择哪一个,请了解更多关于安装包的信息。

源分发

ncreplayer-1.0.2.tar.gz (7.9 kB 查看哈希值)

上传时间

构建分发

ncreplayer-1.0.2-py3-none-any.whl (11.6 kB 查看哈希值)

上传时间 Python 3