跳转到主要内容

标准化 xpublish 托管工具集合

项目描述

xpublish-host

部署 xpublish 实例的工具和标准的集合。

为什么?

随着 ~50 个基于 netCDF 的数据集通过 xpublish 发布,Axiom 需要一种标准的方式来配置每个部署。我们可以创建一个单独的存储库并定义每个单独的 xpublish 部署,我们可以为每个数据集创建单独的存储库,或者我们可以采取中间的方法。我们决定将每个部署的通用部分抽象出来,并将其放入 xpublish-host 中。这防止了诸如身份验证(待定)、日志记录、度量等内容的重复实现,并允许数据工程师专注于数据而不是部署。

目标

  • 使用配置文件和环境变量,而不是Python代码,标准化 xpublish 部署的配置(插件、端口、缓存、dask集群等)。
  • 在一组核心的 FastAPI 可观察性中间件(度量、监控等)上实现标准化。
  • 提供插件,通过配置(DatasetsConfigPlugin)定义 xpublish 数据集。
  • 提供一组通用的 loader 函数,用作 DatasetsConfig 的参数,以标准化常见的访问模式(目前支持 xarray.open_mfdataset)。
  • 提供预构建的 Docker 镜像,以使用 gunicorn 运行一个具有观点且性能良好的 xpublish 实例。

思考

xpublish-host 对您想要通过 xpublish 发布的数据集没有假设,只需提供一个可导入的 Python 函数的路径,该函数返回您想要作为参数传递给 xpublish.Rest 的对象。这允许 xpublish-host 支持除了 xarray.Dataset 以外的数据集,例如 Parquet 文件。

作为对 xpublish-host 的补充,Axiom 维护一个私有仓库,其中包含与 xpublish-hostDatasetsConfigPlugin 兼容的服务器和数据集 YAML 文件。在部署时,我们将这些文件挂载到 xpublish-host 容器中,它们代表了启动一个可工作的 xpublish-host 实例所需的唯一自定义。

安装

大多数用户不需要直接将 xpublish_host 作为库安装,而是使用 Docker 镜像来部署一个 xpublish 实例。如果您想在 Python 代码中直接使用 xpublish_host 工具和配置对象,当然可以安装它。

对于 conda 用户,您可以使用

conda install --channel conda-forge xpublish_host

或者,如果您是 pip 用户

pip install xpublish_host

或者,如果您使用 docker

docker run --rm -p 9000:9000 axiom/xpublish-host:latest

一应俱全

主机配置

运行 xpublish-host 实例的顶层配置。

配置使用 Pydantic BaseSettingsGoodConf 进行管理,以从文件中加载配置。

xpublish-host 配置可以通过以下几种方式设置

  • 环境变量 - 前缀为 XPUB_,它们直接映射到 pydantic 设置类,
  • 环境文件 - 从文件加载环境变量。使用 XPUB_ENV_FILES 控制此文件的位置,如果已定义。有关更多信息,请参阅 Pydantic 文档
  • 配置文件(JSON 和 YAML) - 基于 GoodConf 的配置文件。当使用 xpublish_host.app.serve 辅助程序时,可以通过定义 XPUB_CONFIG_FILE 来设置此文件。
  • Python 参数(仅限 API) - 当将 xpublish-host 作为库使用时,您可以使用每个配置对象的 args/kwargs 来控制您的 xpublish 实例。

要了解哪些配置选项可用(直到文档更新),请查看 xpublish_host/config.py 中的实际配置类和 tests/test_config.pytests/utils.py 中的测试。

一个功能齐全的配置如下,其中包含了每个字段的默认值。

# These are passed into the `xpublish.Rest.serve` method to control how the
# server is run. These are ignored if running through `gunicorn` in production mode
# or using the Docker image. See the `CLI` section below for more details.
publish_host: "0.0.0.0"
publish_port: 9000
log_level: debug

# Dask cluster configuration.
# The `args` and `kwargs` arguments are passed directly into the `module`
# Omitting cluster_config or setting to null will not use a cluster.
cluster_config:
  module: dask.distributed.LocalCluster
  args: []
  kwargs:
    processes: true
    n_workers: 2
    threads_per_worker: 1
    memory_limit: 1GiB
    host: "0.0.0.0"
    scheduler_port: 0  # random port
    dashboard_address: 0.0.0.0:0  # random port
    worker_dashboard_address: 0.0.0.0:0  # random port

# Should xpublish discover and load plugins?
plugins_load_defaults: true

# Define any additional plugins. This is where you can override
# default plugins. These will replace any auto-discovered plugins.
# The keys here (zarr, dconfig) are not important and are not used internally
plugins_config:

  zarr:
    module: xpublish.plugins.included.zarr.ZarrPlugin

  dconfig:
    module: xpublish_host.plugins.DatasetsConfigPlugin
    kwargs:
      # Define all of the datasets to load into the xpublish instance.
      datasets_config_file: datasets.yaml
      datasets_config:
        # The keys here (dataset_id_1) are not important and are not used internally
        # but it is good practice to make them equal to the dataset's id field
        dataset_id_1:
          # The ID is used as the "key" of the dataset in `xpublish.Rest`
          # i.e. xpublish.Rest({ [dataset.id]: [loader_function_return] })
          id: dataset_id
          title: Dataset Title
          description: Dataset Description
          # Path to an importable python function that returns the dataset you want
          # to pass into `xpublish.Rest`
          loader: [python module path]
          # Arguments passed into the `loader` function
          args:
            - [loader arg1]
            - [loader arg2]
          # Keyword arguments passed into the `loader` function. See the `examples`
          # directory for more details on how this can be used.
          kwargs:
            keyword1: 'keyword1'
            keyword2: false
          # After N seconds, invalidate the dataset and call the `loader` method again
          invalidate_after: 10
          # If true, defers the initial loading of the dataset until the first request
          # for the dataset comes in. Speeds up server load times but slows down the
          # first request (per-process) to each dataset
          skip_initial_load: true

# Keyword arguments to pass into `xpublish.Rest` as app_kws
# i.e. xpublish.Rest(..., app_kws=app_config)
app_config:
  docs_url: /api
  openapi_url: /api.json

# Keyword arguments to pass into `xpublish.Rest` as cache_kws
# i.e. xpublish.Rest(..., cache_kws=cache_config)
cache_config:
  available_bytes: 1e11

度量

xpublish-host 提供了一个与 prometheus 兼容的度量端点。Docker 镜像支持通过 gunicorn 的多进程度量生成。默认情况下,度量端点在 /metrics 上可用。

默认标签格式 xpublish-host 度量是

[XPUB_METRICS_PREFIX_NAME]_[metric_name]{app_name="[XPUB_METRICS_APP_NAME]",environment="[XPUB_METRICS_ENVIRONMENT]"}

可以使用环境变量配置度量端点

  • XPUB_METRICS_APP_NAME(默认:xpublish
  • XPUB_METRICS_PREFIX_NAME(默认:xpublish_host
  • XPUB_METRICS_ENDPOINT(默认:/metrics
  • XPUB_METRICS_ENVIRONMENT(默认:development
  • XPUB_METRICS_DISABLE - 将此设置为任何值以禁用度量端点

健康

健康检查端点位于 /health,可供各种健康检查器(docker、负载均衡器等)使用。您可以通过将环境变量 XPUB_HEALTH_DISABLE 设置为任何值来禁用健康检查端点。要更改端点,将 XPUB_HEALTH_ENDPOINT 设置为新值,例如 export XPUB_HEALTH_ENDPOINT="/amiworking"

DatasetConfigPlugin

此插件旨在从 DatatsetConfig 对象映射中加载数据集到 xpublish。它可以从插件参数或 yaml 文件中获取映射目录。

DatasetsConfigPlugin 插件可以接受两个参数

  • datasets_config: dict[str, DatasetConfig]
  • datasets_config_file: Path - 定义上述 datasets_config 对象的 YAML 文件路径。

xpublish-host 配置文件定义数据集

plugins_config:
  dconfig:
    module: xpublish_host.plugins.DatasetsConfigPlugin
    kwargs:
      datasets_config:
        simple:
          id: static
          title: Static
          description: Static dataset that is never reloaded
          loader: xpublish_host.examples.datasets.simple

或从引用外部数据集配置文件的 xpublish-host 配置文件中定义

# datasets.yaml
datasets_config:
  simple:
    id: static
    title: Static
    description: Static dataset that is never reloaded
    loader: xpublish_host.examples.datasets.simple
plugins_config:
  dconfig:
    module: xpublish_host.plugins.DatasetsConfigPlugin
    kwargs:
      datasets_config_file: datasets.yaml

您还可以混合使用内联配置和基于文件的数据集配置。务必确保每个定义的数据集的 id 字段是唯一的,否则它们将互相覆盖,配置文件定义将具有优先权。

plugins_config:
  dconfig:
    module: xpublish_host.plugins.DatasetsConfigPlugin
    kwargs:
      datasets_config_file: datasets.yaml
      datasets_config:
        simple_again:
          id: simple_again
          title: Simple
          description: Simple Dataset
          loader: xpublish_host.examples.datasets.simple

DatasetConfig

DatasetConfig 对象是一种存储关于如何通过 xpublish 发布数据集信息的方法。它支持在请求时动态加载数据集,而不是要求在 xpublish 启动时加载它们。它允许将不改变的静态数据集和您可能希望定期重新加载到 xpublish 实例的动态数据集混合在一起。

loader 参数应该是返回您想通过 xpublish 服务的数据集的 Python 模块路径。当 xpublish 需要加载或重新加载数据集时,将传递 argskwargs 参数到该函数。

以下是一个配置 xpublish 实例的示例,该实例将服务一个在服务器启动时加载一次的 static 数据集和一个在服务器启动时不会重新加载的 dynamic 数据集。它第一次在第一次请求时加载,然后每 10 秒重新加载一次。它不会按计划重新加载,如果数据集在 invalidate_after 秒后未访问,则会在请求时重新加载。

datasets_config:

  simple:
    id: static
    title: Static
    description: Static dataset that is never reloaded
    loader: xpublish_host.examples.datasets.simple

  dynamic:
    id: dynamic
    title: Dynamic
    description: Dynamic dataset re-loaded on request periodically
    loader: xpublish_host.examples.datasets.simple
    skip_initial_load: true
    invalidate_after: 10

您可以运行上述配置文件并查看生成的结果。有两个数据集:staticdynamic。如果您查看日志并不断刷新对 dynamic 数据集的访问,它将每 10 秒重新加载数据集。

$ xpublish-host -c xpublish_host/examples/dynamic.yaml

INFO:     Uvicorn running on http://0.0.0.0:9000 (Press CTRL+C to quit)
INFO:     127.0.0.1:42808 - "GET /datasets HTTP/1.1" 200 OK
# The static dataset is already loaded
INFO:     127.0.0.1:41938 - "GET /datasets/static/ HTTP/1.1" 200 OK
# The dynamic dataset is loaded on first access
INFO:xpublish_host.plugins:Loading dataset: dynamic
INFO:     127.0.0.1:41938 - "GET /datasets/dynamic/ HTTP/1.1" 200 OK
# Subsequent access to dynamic before [invalidate_after] seconds uses
# the already loaded dataset
INFO:     127.0.0.1:41938 - "GET /datasets/dynamic/ HTTP/1.1" 200 OK
INFO:     127.0.0.1:41938 - "GET /datasets/dynamic/ HTTP/1.1" 200 OK
INFO:     127.0.0.1:41938 - "GET /datasets/dynamic/ HTTP/1.1" 200 OK
INFO:     127.0.0.1:41938 - "GET /datasets/dynamic/ HTTP/1.1" 200 OK
INFO:     127.0.0.1:41938 - "GET /datasets/dynamic/ HTTP/1.1" 200 OK
# Eventually [invalidate_after] seconds elapses and the dynamic
# dataset is reloaded when the request is made
INFO:xpublish_host.plugins:Loading dataset: dynamic
INFO:     127.0.0.1:41938 - "GET /datasets/dynamic/ HTTP/1.1" 200 OK
INFO:     127.0.0.1:41938 - "GET /datasets/dynamic/ HTTP/1.1" 200 OK
# The static dataset is never reloaded
INFO:     127.0.0.1:41938 - "GET /datasets/static/ HTTP/1.1" 200 OK
# This works when accessing datasets through other plugins as well (i.e. ZarrPlugin)
INFO:xpublish_host.plugins:Loading dataset: dynamic
INFO:     127.0.0.1:48092 - "GET /datasets/dynamic/zarr/.zmetadata HTTP/1.1" 200 OK
INFO:     127.0.0.1:48092 - "GET /datasets/dynamic/zarr/.zmetadata HTTP/1.1" 200 OK

加载器

xpublish_host.loaders.mfdataset.load_mfdataset

一个加载器函数,用于利用 xarray.open_mfdataset 打开 netCDF 文件路径。常见的加载模式已抽象为关键字参数,以尽可能标准化函数。

def load_mfdataset(
    root_path: str | Path,  # root folder path
    file_glob: str,  # a file glob to load
    open_mfdataset_kwargs: t.Dict = {},  #  any kwargs to pass directly to xarray.open_mfdataset
    file_limit: int | None = None,  # limit the number of files to load, from the end after sorting ASC
    skip_head_files: int | None = 0,  # skip this number of files from the beginning of the file list
    skip_tail_files: int | None = 0,  # skip this number of files from the end of the file list
    computes: list[str] | None = None,  # A list of variable names to call .compute() on to they are evaluated (useful for coordinates)
    chunks: dict[str, int] | None = None,  # A dictionary of chunks to use for the dataset
    axes: dict[str, str] | None = None,  # A dictionary of axes mapping using the keys t, x, y, and z
    sort_by: dict[str, str] | None = None,  # The field to sort the resulting dataset by (usually the time axis)
    isel: dict[str, slice] | None = None,  # a list of isel slices to take after loading the dataset
    sel: dict[str, slice] | None = None,  # a list of sel slices to take after loading the dataset
    rechunk: bool = False,  # if we should re-chunk the data applying all sorting and slicing
    attrs_file_idx: int = -1,  # the index into the file list to extract metadata from
    combine_by_coords: list[str | Path] = None,  # a list of files to combine_by_coords with, useful for adding in grid definitions
    **kwargs
) -> xr.Dataset:

是的,有很多。一个例子可能会更好。

# Select the last 24 indexes of ocean_time and the first Depth index
# from the last 5 netCDF files found in a directory,
# after sorting by the filename. Drop un-needed variables
# and use a Dask cluster to load the files if one is available.
# Compute the h and mask variables into memory so they are
# not dask arrays, and finally, sort the resulting xarray
# dataset by ocean_time and then Depth.
datasets_config:
  sfbofs_latest:
    id: sfbofs_latest
    title: Last 24 hours of SFBOFS surface data
    description: Last 24 hours of SFBOFS surface data
    loader: xpublish_host.loaders.mfdataset.load_mfdataset
    kwargs:
      root_path: data/sfbofs/
      file_glob: "**/*.nc"
      file_limit: 5
      axes:
        t: ocean_time
        z: Depth
        x: Longitude
        y: Latitude
      computes:
        - h
        - mask
      chunks:
        ocean_time: 24
        Depth: 1
        nx: 277
        ny: 165
      sort_by:
        - ocean_time
        - Depth
      isel:
        Depth: [0, 1, null]
        ocean_time: [-24, null, null]
      open_mfdataset_kwargs:
        parallel: true
        drop_variables:
          - forecast_reference_time
          - forecast_hour

运行

运行 xpublish-host 有两种主要方式,一种适合开发(默认情况下,xpublish 使用 uvicorn.run),另一种适合生产(xpublish-host 使用 gunicorn)。有关更多信息,请参阅 Uvicorn 文档

开发

API

要配置和部署 xpublish 实例,同时从 yaml 文件和环境变量中提取设置,您可以使用 serve 函数。

从文件加载配置

>>> from xpublish_host.app import serve
>>> serve('xpublish_host/examples/example.yaml')

INFO:goodconf:Loading config from xpublish_host/examples/example.yaml
...
INFO:     Uvicorn running on http://0.0.0.0:9000 (Press CTRL+C to quit)python

从自定义 .env 文件加载环境变量

>>> import os
>>> os.environ['XPUB_ENV_FILES'] = 'xpublish_host/examples/example.env'
>>> from xpublish_host.app import serve
>>> serve()

INFO:goodconf:No config file specified. Loading with environment variables.
...
INFO:     Uvicorn running on http://0.0.0.0:9000 (Press CTRL+C to quit)python

设置加载配置文件的默认位置

>>> import os
>>> os.environ['XPUB_CONFIG_FILE'] = 'xpublish_host/examples/example.yaml'
>>> from xpublish_host.app import serve
>>> serve()

INFO:goodconf:Loading config from xpublish_host/examples/example.yaml
...
INFO:     Uvicorn running on http://0.0.0.0:9000 (Press CTRL+C to quit)python
RestConfig

您还可以直接使用 RestConfig 对象通过 API 服务器提供数据集,并根据需要混合配置文件。如果您以这种方式使用 API,而不使用配置文件或环境变量,则最好直接使用 xpublish API。

from xpublish_host.config import RestConfig, PluginConfig
from xpublish_host.plugins import DatasetsConfigPlugin

pc = PluginConfig(
    module=DatasetsConfigPlugin,
    kwargs=dict(
        datasets_config=dict(
            simple=dict(
                id='simple',
                title='title',
                description='description',
                loader='xpublish_host.examples.datasets.simple',
            ),
            kwargs=dict(
                id='kwargs',
                title='title',
                description='description',
                loader='xpublish_host.examples.datasets.kwargs',
                args=('temperature',),
                kwargs=dict(
                  values=[0, 1, 2, 3, 4, 5, 6, 7, 8]
                )
            )
        )
    )
)

rc = RestConfig(
    load=True,
    plugins_config={
        'dconfig': pc
    }
)

rest = rc.setup()  # This returns an `xpublish.Rest` instance
rest.serve(
    host='0.0.0.0',
    port=9000,
    log_level='debug',
)
DatasetConfig

如果您正在提供单个数据集,则在 DatasetConfig 对象上有一个辅助方法 serve

from xpublish_host.plugins import DatasetConfig
dc = DatasetConfig(
    id='id',
    title='title',
    description='description',
    loader='xpublish_host.examples.datasets.simple',
)

# Keyword arguments are passed into RestConfig and can include all of the
# top level configuration options.
dc.serve(
    host='0.0.0.0',
    port=9000,
    log_level='debug',
)

CLI (dev)

在本地开发或在非生产环境中,您可以使用辅助 CLI 方法运行 xpublish 服务器,并可选择传递配置文件的路径。使用提供的 xpublish-host 命令(通过 setuptools 安装)或 python xpublish_host/app.py,它们是同一件事!

传递配置文件参数

$ xpublish-host -c xpublish_host/examples/example.yaml

INFO:goodconf:Loading config from xpublish_host/examples/example.yaml
...
INFO:     Uvicorn running on http://0.0.0.0:9000 (Press CTRL+C to quit)

从环境变量中拉取配置文件

$ XPUB_CONFIG_FILE=xpublish_host/examples/example.yaml xpublish-host

INFO:goodconf:Loading config from xpublish_host/examples/example.yaml
...
INFO:     Uvicorn running on http://0.0.0.0:9000 (Press CTRL+C to quit)

无论如何,xpublish 都将在 9000 端口运行,带有(2)个数据集:simplekwargs。您可以通过 http://[host]:9000/datasets/ 访问实例。

生产环境

为了让 xpublish 与由 gunicorndask 运行的异步循环和进程友好地协作,有一个自定义的工作类(xpublish_host.app.XpdWorker)和一个 gunicorn 配置文件(xpublish_host/gunicorn.conf.py),必须使用。如果您使用提供的 Docker 镜像,它们将自动加载。

当使用 gunicorn 运行时,如果您定义了一个 cluster_config 对象,父进程中将启动一个集群,并将该集群的 scheduler_address 传递给每个工作进程。如果您确实希望每个进程有一个集群,您将必须自行实现并提交一个 PR ;)。更好的与 LocalCluster 集成将是很好的,但目前的实现方式允许如果您在项目范围之外管理 dask 集群,也能使用“自带的”集群配置。

注意:使用 gunicorn 时,只能通过 -b/--bind 参数或配置文件来传递主机和端口配置。如果它们被设置为环境变量,将忽略它们!

CLI(生产环境)

您可以手动运行 gunicorn(本地)以测试在 Docker 镜像内部运行的情况。

XPUB_CONFIG_FILE=xpublish_host/examples/example.yaml gunicorn xpublish_host.app:app -c xpublish_host/gunicorn.conf.py

如果您希望当通过 gunicorn 运行时,/metrics 端点能够正确工作,您需要为指标创建一个临时目录,并将其作为 PROMETHEUS_MULTIPROC_DIR 目录传递。这由提供的 Docker 镜像自动处理。

mkdir -p /tmp/xpub_metrics
PROMETHEUS_MULTIPROC_DIR=/tmp/xpub_metrics XPUB_CONFIG_FILE=xpublish_host/examples/example.yaml gunicorn xpublish_host.app:app -c xpublish_host/gunicorn.conf.py

无论如何,xpublish 都将在 9000 端口运行,带有(2)个数据集:simplekwargs。您可以通过 http://[host]:9000/datasets/ 访问实例。指标可在 http://[host]:9000/metrics 查找。

Docker

默认情况下,Docker 镜像从 /xpd/config.yaml 加载 xpublish-host 配置文件,从 /xpd/datasets.yaml 加载数据集配置对象,并从 /xpd/.env 加载环境变量文件。您可以通过设置环境变量 XPUB_CONFIG_FILEXPUBDC_CONFIG_FILEXPUB_ENV_FILES 分别更改这些文件的位置。

# Using default config path
docker run --rm -p 9000:9000 -v "$(pwd)/xpublish_host/examples/example.yaml:/xpd/config.yaml" axiom/xpublish-host:latest

# Using ENV variables
docker run --rm -p 9000:9000 -e "XPUB_CONFIG_FILE=/xpd/xpublish_host/examples/example.yaml" axiom/xpublish-host:latest

无论如何,xpublish 都将在 9000 端口运行,带有(2)个数据集:simplekwargs。您可以通过 http://[host]:9000/datasets/ 访问实例。

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分发

xpublish-host-1.1.5.tar.gz (21.0 kB 查看哈希值)

上传时间

构建分发

xpublish_host-1.1.5-py3-none-any.whl (24.8 kB 查看哈希值)

上传时间 Python 3

由以下支持