标准化 xpublish 托管工具集合
项目描述
xpublish-host
部署 xpublish
实例的工具和标准的集合。
为什么?
随着 ~50 个基于 netCDF 的数据集通过 xpublish
发布,Axiom 需要一种标准的方式来配置每个部署。我们可以创建一个单独的存储库并定义每个单独的 xpublish
部署,我们可以为每个数据集创建单独的存储库,或者我们可以采取中间的方法。我们决定将每个部署的通用部分抽象出来,并将其放入 xpublish-host
中。这防止了诸如身份验证(待定)、日志记录、度量等内容的重复实现,并允许数据工程师专注于数据而不是部署。
目标
- 使用配置文件和环境变量,而不是Python代码,标准化
xpublish
部署的配置(插件、端口、缓存、dask集群等)。 - 在一组核心的
FastAPI
可观察性中间件(度量、监控等)上实现标准化。 - 提供插件,通过配置(
DatasetsConfigPlugin
)定义xpublish
数据集。 - 提供一组通用的
loader
函数,用作DatasetsConfig
的参数,以标准化常见的访问模式(目前支持xarray.open_mfdataset
)。 - 提供预构建的 Docker 镜像,以使用
gunicorn
运行一个具有观点且性能良好的xpublish
实例。
思考
xpublish-host
对您想要通过 xpublish
发布的数据集没有假设,只需提供一个可导入的 Python 函数的路径,该函数返回您想要作为参数传递给 xpublish.Rest
的对象。这允许 xpublish-host
支持除了 xarray.Dataset
以外的数据集,例如 Parquet 文件。
作为对 xpublish-host
的补充,Axiom 维护一个私有仓库,其中包含与 xpublish-host
和 DatasetsConfigPlugin
兼容的服务器和数据集 YAML 文件。在部署时,我们将这些文件挂载到 xpublish-host
容器中,它们代表了启动一个可工作的 xpublish-host
实例所需的唯一自定义。
安装
大多数用户不需要直接将 xpublish_host
作为库安装,而是使用 Docker 镜像来部署一个 xpublish
实例。如果您想在 Python 代码中直接使用 xpublish_host
工具和配置对象,当然可以安装它。
对于 conda
用户,您可以使用
conda install --channel conda-forge xpublish_host
或者,如果您是 pip
用户
pip install xpublish_host
或者,如果您使用 docker
docker run --rm -p 9000:9000 axiom/xpublish-host:latest
一应俱全
主机配置
运行 xpublish-host
实例的顶层配置。
配置使用 Pydantic
BaseSettings 和 GoodConf 进行管理,以从文件中加载配置。
xpublish-host
配置可以通过以下几种方式设置
- 环境变量 - 前缀为
XPUB_
,它们直接映射到pydantic
设置类, - 环境文件 - 从文件加载环境变量。使用
XPUB_ENV_FILES
控制此文件的位置,如果已定义。有关更多信息,请参阅Pydantic
文档, - 配置文件(JSON 和 YAML) - 基于
GoodConf
的配置文件。当使用xpublish_host.app.serve
辅助程序时,可以通过定义XPUB_CONFIG_FILE
来设置此文件。 - Python 参数(仅限 API) - 当将
xpublish-host
作为库使用时,您可以使用每个配置对象的 args/kwargs 来控制您的xpublish
实例。
要了解哪些配置选项可用(直到文档更新),请查看 xpublish_host/config.py
中的实际配置类和 tests/test_config.py
及 tests/utils.py
中的测试。
一个功能齐全的配置如下,其中包含了每个字段的默认值。
# These are passed into the `xpublish.Rest.serve` method to control how the
# server is run. These are ignored if running through `gunicorn` in production mode
# or using the Docker image. See the `CLI` section below for more details.
publish_host: "0.0.0.0"
publish_port: 9000
log_level: debug
# Dask cluster configuration.
# The `args` and `kwargs` arguments are passed directly into the `module`
# Omitting cluster_config or setting to null will not use a cluster.
cluster_config:
module: dask.distributed.LocalCluster
args: []
kwargs:
processes: true
n_workers: 2
threads_per_worker: 1
memory_limit: 1GiB
host: "0.0.0.0"
scheduler_port: 0 # random port
dashboard_address: 0.0.0.0:0 # random port
worker_dashboard_address: 0.0.0.0:0 # random port
# Should xpublish discover and load plugins?
plugins_load_defaults: true
# Define any additional plugins. This is where you can override
# default plugins. These will replace any auto-discovered plugins.
# The keys here (zarr, dconfig) are not important and are not used internally
plugins_config:
zarr:
module: xpublish.plugins.included.zarr.ZarrPlugin
dconfig:
module: xpublish_host.plugins.DatasetsConfigPlugin
kwargs:
# Define all of the datasets to load into the xpublish instance.
datasets_config_file: datasets.yaml
datasets_config:
# The keys here (dataset_id_1) are not important and are not used internally
# but it is good practice to make them equal to the dataset's id field
dataset_id_1:
# The ID is used as the "key" of the dataset in `xpublish.Rest`
# i.e. xpublish.Rest({ [dataset.id]: [loader_function_return] })
id: dataset_id
title: Dataset Title
description: Dataset Description
# Path to an importable python function that returns the dataset you want
# to pass into `xpublish.Rest`
loader: [python module path]
# Arguments passed into the `loader` function
args:
- [loader arg1]
- [loader arg2]
# Keyword arguments passed into the `loader` function. See the `examples`
# directory for more details on how this can be used.
kwargs:
keyword1: 'keyword1'
keyword2: false
# After N seconds, invalidate the dataset and call the `loader` method again
invalidate_after: 10
# If true, defers the initial loading of the dataset until the first request
# for the dataset comes in. Speeds up server load times but slows down the
# first request (per-process) to each dataset
skip_initial_load: true
# Keyword arguments to pass into `xpublish.Rest` as app_kws
# i.e. xpublish.Rest(..., app_kws=app_config)
app_config:
docs_url: /api
openapi_url: /api.json
# Keyword arguments to pass into `xpublish.Rest` as cache_kws
# i.e. xpublish.Rest(..., cache_kws=cache_config)
cache_config:
available_bytes: 1e11
度量
xpublish-host
提供了一个与 prometheus 兼容的度量端点。Docker 镜像支持通过 gunicorn
的多进程度量生成。默认情况下,度量端点在 /metrics
上可用。
默认标签格式 xpublish-host
度量是
[XPUB_METRICS_PREFIX_NAME]_[metric_name]{app_name="[XPUB_METRICS_APP_NAME]",environment="[XPUB_METRICS_ENVIRONMENT]"}
可以使用环境变量配置度量端点
XPUB_METRICS_APP_NAME
(默认:xpublish
)XPUB_METRICS_PREFIX_NAME
(默认:xpublish_host
)XPUB_METRICS_ENDPOINT
(默认:/metrics
)XPUB_METRICS_ENVIRONMENT
(默认:development
)XPUB_METRICS_DISABLE
- 将此设置为任何值以禁用度量端点
健康
健康检查端点位于 /health
,可供各种健康检查器(docker、负载均衡器等)使用。您可以通过将环境变量 XPUB_HEALTH_DISABLE
设置为任何值来禁用健康检查端点。要更改端点,将 XPUB_HEALTH_ENDPOINT
设置为新值,例如 export XPUB_HEALTH_ENDPOINT="/amiworking"
DatasetConfigPlugin
此插件旨在从 DatatsetConfig
对象映射中加载数据集到 xpublish
。它可以从插件参数或 yaml
文件中获取映射目录。
DatasetsConfigPlugin
插件可以接受两个参数
datasets_config: dict[str, DatasetConfig]
datasets_config_file: Path
- 定义上述datasets_config
对象的 YAML 文件路径。
从 xpublish-host
配置文件定义数据集
plugins_config:
dconfig:
module: xpublish_host.plugins.DatasetsConfigPlugin
kwargs:
datasets_config:
simple:
id: static
title: Static
description: Static dataset that is never reloaded
loader: xpublish_host.examples.datasets.simple
或从引用外部数据集配置文件的 xpublish-host
配置文件中定义
# datasets.yaml
datasets_config:
simple:
id: static
title: Static
description: Static dataset that is never reloaded
loader: xpublish_host.examples.datasets.simple
plugins_config:
dconfig:
module: xpublish_host.plugins.DatasetsConfigPlugin
kwargs:
datasets_config_file: datasets.yaml
您还可以混合使用内联配置和基于文件的数据集配置。务必确保每个定义的数据集的 id
字段是唯一的,否则它们将互相覆盖,配置文件定义将具有优先权。
plugins_config:
dconfig:
module: xpublish_host.plugins.DatasetsConfigPlugin
kwargs:
datasets_config_file: datasets.yaml
datasets_config:
simple_again:
id: simple_again
title: Simple
description: Simple Dataset
loader: xpublish_host.examples.datasets.simple
DatasetConfig
DatasetConfig
对象是一种存储关于如何通过 xpublish
发布数据集信息的方法。它支持在请求时动态加载数据集,而不是要求在 xpublish
启动时加载它们。它允许将不改变的静态数据集和您可能希望定期重新加载到 xpublish
实例的动态数据集混合在一起。
loader
参数应该是返回您想通过 xpublish
服务的数据集的 Python 模块路径。当 xpublish
需要加载或重新加载数据集时,将传递 args
和 kwargs
参数到该函数。
以下是一个配置 xpublish
实例的示例,该实例将服务一个在服务器启动时加载一次的 static
数据集和一个在服务器启动时不会重新加载的 dynamic
数据集。它第一次在第一次请求时加载,然后每 10 秒重新加载一次。它不会按计划重新加载,如果数据集在 invalidate_after
秒后未访问,则会在请求时重新加载。
datasets_config:
simple:
id: static
title: Static
description: Static dataset that is never reloaded
loader: xpublish_host.examples.datasets.simple
dynamic:
id: dynamic
title: Dynamic
description: Dynamic dataset re-loaded on request periodically
loader: xpublish_host.examples.datasets.simple
skip_initial_load: true
invalidate_after: 10
您可以运行上述配置文件并查看生成的结果。有两个数据集:static
和 dynamic
。如果您查看日志并不断刷新对 dynamic
数据集的访问,它将每 10 秒重新加载数据集。
$ xpublish-host -c xpublish_host/examples/dynamic.yaml
INFO: Uvicorn running on http://0.0.0.0:9000 (Press CTRL+C to quit)
INFO: 127.0.0.1:42808 - "GET /datasets HTTP/1.1" 200 OK
# The static dataset is already loaded
INFO: 127.0.0.1:41938 - "GET /datasets/static/ HTTP/1.1" 200 OK
# The dynamic dataset is loaded on first access
INFO:xpublish_host.plugins:Loading dataset: dynamic
INFO: 127.0.0.1:41938 - "GET /datasets/dynamic/ HTTP/1.1" 200 OK
# Subsequent access to dynamic before [invalidate_after] seconds uses
# the already loaded dataset
INFO: 127.0.0.1:41938 - "GET /datasets/dynamic/ HTTP/1.1" 200 OK
INFO: 127.0.0.1:41938 - "GET /datasets/dynamic/ HTTP/1.1" 200 OK
INFO: 127.0.0.1:41938 - "GET /datasets/dynamic/ HTTP/1.1" 200 OK
INFO: 127.0.0.1:41938 - "GET /datasets/dynamic/ HTTP/1.1" 200 OK
INFO: 127.0.0.1:41938 - "GET /datasets/dynamic/ HTTP/1.1" 200 OK
# Eventually [invalidate_after] seconds elapses and the dynamic
# dataset is reloaded when the request is made
INFO:xpublish_host.plugins:Loading dataset: dynamic
INFO: 127.0.0.1:41938 - "GET /datasets/dynamic/ HTTP/1.1" 200 OK
INFO: 127.0.0.1:41938 - "GET /datasets/dynamic/ HTTP/1.1" 200 OK
# The static dataset is never reloaded
INFO: 127.0.0.1:41938 - "GET /datasets/static/ HTTP/1.1" 200 OK
# This works when accessing datasets through other plugins as well (i.e. ZarrPlugin)
INFO:xpublish_host.plugins:Loading dataset: dynamic
INFO: 127.0.0.1:48092 - "GET /datasets/dynamic/zarr/.zmetadata HTTP/1.1" 200 OK
INFO: 127.0.0.1:48092 - "GET /datasets/dynamic/zarr/.zmetadata HTTP/1.1" 200 OK
加载器
xpublish_host.loaders.mfdataset.load_mfdataset
一个加载器函数,用于利用 xarray.open_mfdataset
打开 netCDF 文件路径。常见的加载模式已抽象为关键字参数,以尽可能标准化函数。
def load_mfdataset(
root_path: str | Path, # root folder path
file_glob: str, # a file glob to load
open_mfdataset_kwargs: t.Dict = {}, # any kwargs to pass directly to xarray.open_mfdataset
file_limit: int | None = None, # limit the number of files to load, from the end after sorting ASC
skip_head_files: int | None = 0, # skip this number of files from the beginning of the file list
skip_tail_files: int | None = 0, # skip this number of files from the end of the file list
computes: list[str] | None = None, # A list of variable names to call .compute() on to they are evaluated (useful for coordinates)
chunks: dict[str, int] | None = None, # A dictionary of chunks to use for the dataset
axes: dict[str, str] | None = None, # A dictionary of axes mapping using the keys t, x, y, and z
sort_by: dict[str, str] | None = None, # The field to sort the resulting dataset by (usually the time axis)
isel: dict[str, slice] | None = None, # a list of isel slices to take after loading the dataset
sel: dict[str, slice] | None = None, # a list of sel slices to take after loading the dataset
rechunk: bool = False, # if we should re-chunk the data applying all sorting and slicing
attrs_file_idx: int = -1, # the index into the file list to extract metadata from
combine_by_coords: list[str | Path] = None, # a list of files to combine_by_coords with, useful for adding in grid definitions
**kwargs
) -> xr.Dataset:
是的,有很多。一个例子可能会更好。
# Select the last 24 indexes of ocean_time and the first Depth index
# from the last 5 netCDF files found in a directory,
# after sorting by the filename. Drop un-needed variables
# and use a Dask cluster to load the files if one is available.
# Compute the h and mask variables into memory so they are
# not dask arrays, and finally, sort the resulting xarray
# dataset by ocean_time and then Depth.
datasets_config:
sfbofs_latest:
id: sfbofs_latest
title: Last 24 hours of SFBOFS surface data
description: Last 24 hours of SFBOFS surface data
loader: xpublish_host.loaders.mfdataset.load_mfdataset
kwargs:
root_path: data/sfbofs/
file_glob: "**/*.nc"
file_limit: 5
axes:
t: ocean_time
z: Depth
x: Longitude
y: Latitude
computes:
- h
- mask
chunks:
ocean_time: 24
Depth: 1
nx: 277
ny: 165
sort_by:
- ocean_time
- Depth
isel:
Depth: [0, 1, null]
ocean_time: [-24, null, null]
open_mfdataset_kwargs:
parallel: true
drop_variables:
- forecast_reference_time
- forecast_hour
运行
运行 xpublish-host
有两种主要方式,一种适合开发(默认情况下,xpublish
使用 uvicorn.run
),另一种适合生产(xpublish-host
使用 gunicorn
)。有关更多信息,请参阅 Uvicorn
文档。
开发
API
要配置和部署 xpublish
实例,同时从 yaml 文件和环境变量中提取设置,您可以使用 serve
函数。
从文件加载配置
>>> from xpublish_host.app import serve
>>> serve('xpublish_host/examples/example.yaml')
INFO:goodconf:Loading config from xpublish_host/examples/example.yaml
...
INFO: Uvicorn running on http://0.0.0.0:9000 (Press CTRL+C to quit)python
从自定义 .env 文件加载环境变量
>>> import os
>>> os.environ['XPUB_ENV_FILES'] = 'xpublish_host/examples/example.env'
>>> from xpublish_host.app import serve
>>> serve()
INFO:goodconf:No config file specified. Loading with environment variables.
...
INFO: Uvicorn running on http://0.0.0.0:9000 (Press CTRL+C to quit)python
设置加载配置文件的默认位置
>>> import os
>>> os.environ['XPUB_CONFIG_FILE'] = 'xpublish_host/examples/example.yaml'
>>> from xpublish_host.app import serve
>>> serve()
INFO:goodconf:Loading config from xpublish_host/examples/example.yaml
...
INFO: Uvicorn running on http://0.0.0.0:9000 (Press CTRL+C to quit)python
RestConfig
您还可以直接使用 RestConfig
对象通过 API 服务器提供数据集,并根据需要混合配置文件。如果您以这种方式使用 API,而不使用配置文件或环境变量,则最好直接使用 xpublish
API。
from xpublish_host.config import RestConfig, PluginConfig
from xpublish_host.plugins import DatasetsConfigPlugin
pc = PluginConfig(
module=DatasetsConfigPlugin,
kwargs=dict(
datasets_config=dict(
simple=dict(
id='simple',
title='title',
description='description',
loader='xpublish_host.examples.datasets.simple',
),
kwargs=dict(
id='kwargs',
title='title',
description='description',
loader='xpublish_host.examples.datasets.kwargs',
args=('temperature',),
kwargs=dict(
values=[0, 1, 2, 3, 4, 5, 6, 7, 8]
)
)
)
)
)
rc = RestConfig(
load=True,
plugins_config={
'dconfig': pc
}
)
rest = rc.setup() # This returns an `xpublish.Rest` instance
rest.serve(
host='0.0.0.0',
port=9000,
log_level='debug',
)
DatasetConfig
如果您正在提供单个数据集,则在 DatasetConfig
对象上有一个辅助方法 serve
。
from xpublish_host.plugins import DatasetConfig
dc = DatasetConfig(
id='id',
title='title',
description='description',
loader='xpublish_host.examples.datasets.simple',
)
# Keyword arguments are passed into RestConfig and can include all of the
# top level configuration options.
dc.serve(
host='0.0.0.0',
port=9000,
log_level='debug',
)
CLI (dev)
在本地开发或在非生产环境中,您可以使用辅助 CLI 方法运行 xpublish
服务器,并可选择传递配置文件的路径。使用提供的 xpublish-host
命令(通过 setuptools
安装)或 python xpublish_host/app.py
,它们是同一件事!
传递配置文件参数
$ xpublish-host -c xpublish_host/examples/example.yaml
INFO:goodconf:Loading config from xpublish_host/examples/example.yaml
...
INFO: Uvicorn running on http://0.0.0.0:9000 (Press CTRL+C to quit)
从环境变量中拉取配置文件
$ XPUB_CONFIG_FILE=xpublish_host/examples/example.yaml xpublish-host
INFO:goodconf:Loading config from xpublish_host/examples/example.yaml
...
INFO: Uvicorn running on http://0.0.0.0:9000 (Press CTRL+C to quit)
无论如何,xpublish
都将在 9000 端口运行,带有(2)个数据集:simple
和 kwargs
。您可以通过 http://[host]:9000/datasets/
访问实例。
生产环境
为了让 xpublish
与由 gunicorn
和 dask
运行的异步循环和进程友好地协作,有一个自定义的工作类(xpublish_host.app.XpdWorker
)和一个 gunicorn
配置文件(xpublish_host/gunicorn.conf.py
),必须使用。如果您使用提供的 Docker 镜像,它们将自动加载。
当使用 gunicorn
运行时,如果您定义了一个 cluster_config
对象,父进程中将启动一个集群,并将该集群的 scheduler_address 传递给每个工作进程。如果您确实希望每个进程有一个集群,您将必须自行实现并提交一个 PR ;)。更好的与 LocalCluster
集成将是很好的,但目前的实现方式允许如果您在项目范围之外管理 dask
集群,也能使用“自带的”集群配置。
注意:使用 gunicorn
时,只能通过 -b/--bind
参数或配置文件来传递主机和端口配置。如果它们被设置为环境变量,将忽略它们!
CLI(生产环境)
您可以手动运行 gunicorn
(本地)以测试在 Docker 镜像内部运行的情况。
XPUB_CONFIG_FILE=xpublish_host/examples/example.yaml gunicorn xpublish_host.app:app -c xpublish_host/gunicorn.conf.py
如果您希望当通过 gunicorn
运行时,/metrics
端点能够正确工作,您需要为指标创建一个临时目录,并将其作为 PROMETHEUS_MULTIPROC_DIR
目录传递。这由提供的 Docker 镜像自动处理。
mkdir -p /tmp/xpub_metrics
PROMETHEUS_MULTIPROC_DIR=/tmp/xpub_metrics XPUB_CONFIG_FILE=xpublish_host/examples/example.yaml gunicorn xpublish_host.app:app -c xpublish_host/gunicorn.conf.py
无论如何,xpublish
都将在 9000 端口运行,带有(2)个数据集:simple
和 kwargs
。您可以通过 http://[host]:9000/datasets/
访问实例。指标可在 http://[host]:9000/metrics
查找。
Docker
默认情况下,Docker 镜像从 /xpd/config.yaml
加载 xpublish-host
配置文件,从 /xpd/datasets.yaml
加载数据集配置对象,并从 /xpd/.env
加载环境变量文件。您可以通过设置环境变量 XPUB_CONFIG_FILE
、XPUBDC_CONFIG_FILE
和 XPUB_ENV_FILES
分别更改这些文件的位置。
# Using default config path
docker run --rm -p 9000:9000 -v "$(pwd)/xpublish_host/examples/example.yaml:/xpd/config.yaml" axiom/xpublish-host:latest
# Using ENV variables
docker run --rm -p 9000:9000 -e "XPUB_CONFIG_FILE=/xpd/xpublish_host/examples/example.yaml" axiom/xpublish-host:latest
无论如何,xpublish
都将在 9000 端口运行,带有(2)个数据集:simple
和 kwargs
。您可以通过 http://[host]:9000/datasets/
访问实例。
项目详情
下载文件
下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。