跳转到主要内容

Apache Airflow使用的抽象层,用于处理DKIST数据。

项目描述

概述

dkist-processing-core软件包提供了一个抽象层,位于dkist数据处理代码、支持它的工作流引擎(Airflow)和日志基础设施之间。通过向Airflow提供抽象层,实现了版本控制系统。

Core, Common, and Instrument Brick Diagram

以下是实现抽象的4个主要实体,下面将进行描述。

任务:任务定义了工作流中某一步骤所使用的数据处理管道的接口。通过遵守此接口(即子类化),数据处理管道可以保持对任务最终运行方式的无知。任务还实现了一些基于其运行的基础设施的全局方法(例如,应用程序性能监控基础设施)。

节点:节点的工作是将任务转换为可以实例化该任务的代码。任务的实例化可能因目标环境而异,例如,对于Airflow的BashOperator虚拟环境,与笔记本的纯Python相比。

工作流:工作流定义了数据处理管道用于将任务链在一起以形成有向图的接口。工作流通过提供任何包装样板代码、任务排序和选择适当的节点实例化来将此图转换为工作流引擎格式。

构建工具:构建工具是顶层,旨在简化在处理管道构建过程中同时处理多个工作流的转换过程。

使用方法

工作流和任务是客户端库使用的核心对象。任务用作基类,子类至少必须实现run方法。工作流用于为任务指定执行顺序和流程名称。

from dkist_processing_core import TaskBase
from dkist_processing_core import Workflow

# Task definitions
class MyTask1(TaskBase):
    def run(self):
        print("Running MyTask1")


class MyTask2(TaskBase):
    def run(self):
        print("Running MyTask2")

# Workflow definition
# MyTask1 -> MyTask2
w = Workflow(process_category="My", process_name="Workflow", workflow_package=__package__, workflow_version="dev")
w.add_node(MyTask1, upstreams=None)
w.add_node(MyTask2, upstreams=MyTask1)

使用dkist-processing-core与Airflow进行数据处理涉及一个项目结构和构建过程,该过程将代码工件部署到PyPI,并将工作流工件压缩包部署到artifactory。

Build Artifacts Diagram

客户端dkist数据处理库应使用dkist-processing-test作为示例来实现结构和构建管道。客户端仓库的构建管道可以利用build_utils进行测试和导出。

对于Airflow,结果部署将具有所有版本化的工作流工件可供调度程序使用,以及所有版本化的代码工件可供工作者执行任务。

Airflow Deployment Diagram

构建

dkist-processing-core使用bitbucket-pipelines进行构建。

部署

dkist-processing-core部署到PyPI

环境变量

变量

描述

类型

默认值

BUILD_VERSION

仅构建/导出管道。这是将被附加到所有工件上的值,代表它们的唯一版本。

STR

dev

MESH_CONFIG

提供dkistdc云网状配置。具体为消息代理的位置。

JSON

{}

ISB_USERNAME

消息代理用户名

STR

guest

ISB_PASSWORD

消息代理密码

STR

guest

ISB_EXCHANGE

发布消息的消息代理交换名称

STR

master.direct.x

ISB_QUEUE_TYPE

运输消息的消息代理队列类型

STR

classic

ELASTIC_APM_SERVICE_NAME

由Elastic应用性能监控使用的服务名称

STR

ELASTIC_APM_OTHER_OPTIONS

Elastic应用性能监控客户端配置的字典

STR

{}

ELASTIC_APM_ENABLED

禁用/启用Elastic应用性能监控客户端调用标志,如果不连接到APM服务器则非常频繁。

BOOL

FALSE

BUILD_VERSION

管道的版本。构建时,它将进入工作流或dag名称。

STR

dev

开发

测试执行的前提是本地机器上运行着rabbitmq和docker实例。对于RabbitMQ,测试将使用默认的guest/guest凭据,以及主机IP 127.0.0.1和端口5672连接到代理。根据系统设置,docker的设置可能有所不同,但测试将使用docker守护程序的默认unix套接字。

要本地运行测试,请克隆存储库,并使用测试额外内容以可编辑模式安装包。

git clone git@bitbucket.org:dkistdc/dkist-processing-core.git
cd dkist-processing-core
pre-commit install
pip install -e .[test]
# RabbitMQ and Docker needs to be running
pytest -v --cov dkist_processing_core

变更日志

当你对这个存储库进行任何更改时,它都必须伴随着一个变更日志文件。这个存储库的变更日志使用towncrier包。下一个版本的变更日志条目作为单独的文件(每个更改一个)添加到changelog/目录中。

编写变更日志条目

伴随更改的变更日志条目应添加到changelog/目录。该目录中文件的名称遵循特定模板

<PULL REQUEST NUMBER>.<TYPE>[.<COUNTER>].rst

字段具有以下含义

  • <PULL REQUEST NUMBER>:这是拉取请求的编号,因此人们可以从变更日志条目跳转到BitBucket上的diff。

  • <TYPE>:这是更改的类型,必须是以下描述的值之一。

  • <COUNTER>:这是一个可选字段,如果你对同一种类型的更改进行了多次更改,你可以在后续更改中附加一个计数器,例如,对于同一PR中的两个bugfix更改,可以是100.bugfix.rst100.bugfix.1.rst

可能的类型列表定义在 pyproject.toml 的 towncrier 部分,类型包括:

  • feature:这是一个新增的代码功能。

  • bugfix:这是一个修复错误的变更。

  • doc:这是一个文档变更。

  • removal:这是一个弃用或移除公共 API 的变更。

  • misc:任何不适合其他地方的微小变更,例如对包基础设施的变更。

发布时渲染 Changelog

当你准备标记一个发布版本时,首先必须运行 towncrier 来渲染变更日志。以下是该步骤:

  • 使用您要标记的版本号运行 towncrier build –version vx.y.z

  • 同意 towncrier 删除片段。

  • 添加并提交您的更改。

  • 标记发布版本。

注意:如果您忘记为标记的发布版本添加 Changelog 条目(无论是手动还是使用 towncrier 自动添加),那么 Bitbucket 管道将失败。要使用相同的标签,您必须在本地及其远程分支上删除它。

# First, actually update the CHANGELOG and commit the update
git commit

# Delete tags
git tag -d vWHATEVER.THE.VERSION
git push --delete origin vWHATEVER.THE.VERSION

# Re-tag with the same version
git tag vWHATEVER.THE.VERSION
git push --tags origin main

项目详情


发布历史 发布通知 | RSS 源

下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。

源分发

dkist-processing-core-4.2.1.tar.gz (387.9 kB 查看哈希值)

上传时间

支持者