跳转到主要内容

用Python编写的cumulus任务的处理器库

项目描述

cumulus-message-adapter-python

CircleCI PyPI version

什么是Cumulus?

Cumulus是NASA未来地球科学数据流的云数据摄取、归档、分发和管理原型。

阅读Cumulus 文档

什么是Cumulus消息适配器?

Cumulus消息适配器是一个库,它将Cumulus协议中的传入消息适配到更易于Cumulus任务消费的格式,调用任务,然后将响应适配回Cumulus消息协议以发送给下一个任务。

安装

pip install cumulus-message-adapter-python

任务定义

为了使用Cumulus消息适配器,你需要在你的任务模块中创建两个方法:一个处理器函数和一个业务逻辑函数。

处理器函数是一个标准的Lambda处理器函数,它接受两个参数(由AWS指定):eventcontext

业务逻辑函数是任务的实际工作所在。它应接受两个参数:eventcontext

event对象包含两个键

  • input - 任务的输入,通常是消息的payload,在运行时产生
  • config - 任务的配置,其中任何模板变量都已解析

context参数是AWS传递的标准Lambda上下文。

业务逻辑函数的返回值将被放置在结果的Cumulus消息的payload中。

输入、配置和返回值的期望都由任务定义,并且应得到良好的文档记录。任务应仔细考虑其输入和返回值,因为破坏性更改可能会对整个工作流程中的任务产生级联效应。配置更改的影响略小,但必须通知使用此任务的人员。

Cumulus消息适配器接口

Cumulus Message适配器为Python提供了一个方法:`run_cumulus_task`。它接受四个参数

  • task_function - 包含您业务逻辑的函数(如上所述)

  • cumulus_message - 由Lambda传递的事件,应该是一个Cumulus消息,或者是一个CMA封装的消息(见Cumulus工作流程文档

    {
       "cma": {
          "event": "<cumulus message object>",
          "SomeCMAConfigKey": "CMA configuration object>"
       }
    }
    
  • context - Lambda上下文

  • schemas - 可选:一个字典,具有`input`、`config`和`output`属性。每个属性都应设置为对应JSON模式文件的文件路径。此字典的所有三个属性都是可选的。如果省略,消息适配器将在`/<task_root>/schemas/<schema_type>.json`中查找,如果找不到,则忽略。

  • taskargs - 可选。为`task_function`提供的附加关键字参数

示例

使用此包的`run_cumulus_task`函数作为其他函数包装器的简单示例

>>> from run_cumulus_task import run_cumulus_task

# simple task that returns the event
>>> def task(event, context):
...     return event

# handler that is provided to aws lambda
>>> def handler(event, context):
...     return run_cumulus_task(task, event, context)

有关完整示例,请参阅示例文件夹

创建部署包

使用此库的任务只是标准的AWS Lambda任务。有关创建发布包的信息,请参阅创建发布包

Cumulus部署中的使用

有关如何在Cumulus部署中利用此包的文档,请参阅Cumulus工作流程文档

开发

依赖关系安装

$ pip install -r requirements-dev.txt
$ pip install -r requirements.txt

使用CumulusLogger进行日志记录

在此包中包含的cumulus_logger包含一个用于Cumulus日志记录格式的标准化的日志记录类CumulusLogger。提供了用于记录错误、致命、警告、调试、信息和跟踪的方法。

导入CumulusLogger

>>> from cumulus_logger import CumulusLogger

在任务定义内实例化记录器(名称和级别是可选的)

>>> import logging
>>> logger = CumulusLogger("event_name", logging.ERROR)

使用不同级别的日志记录方法

>>> logger.trace('<your message>')

>>> logger.debug('<your message>')

>>> logger.info('<your message>')

>>> logger.warn('<your message>')

>>> logger.error('<your message>')

>>> logger.fatal('<your message>')

它还可以像Python Logger一样接受额外的非关键字和关键字参数。

msg是消息格式化字符串,argskwargs是字符串格式化的参数。

如果kwargs中的exc_info不是False,则将exc_infosys.exc_info()中的异常信息添加到消息中。

>>> logger.debug(msg, *args, **kwargs)

示例用法

>>> import os
>>> import sys

>>> from run_cumulus_task import run_cumulus_task
>>> from cumulus_logger import CumulusLogger

# instantiate CumulusLogger
>>> logger = CumulusLogger()

>>> def task(event, context):
...     logger.info('task executed')
... 
...     # log error when an exception is caught
...     logger.error("task formatted message {} exc_info ", "bar", exc_info=True)
... 
...     # return the output of the task
...     return { "example": "output" }

>>> def handler(event, context):
...     # make sure event & context metadata is set in the logger
...     logger.setMetadata(event, context)
...     return run_cumulus_task(task, event, context)

运行测试

运行测试需要localstack

测试只需要运行S3的localstack,可以使用以下命令启动

$ SERVICES=s3 localstack start

然后您可以使用以下nosetests命令检查测试是否通过

$ CUMULUS_ENV=testing nose2

代码风格检查

$ pylint run_cumulus_task.py

为什么?

这种方法有几个主要优点

  1. 它明确防止任务假设`meta`和`cumulus_meta`等内部拥有的数据结构,这些数据结构可能在未来的更新中损坏。为了访问这些结构中的字段,任务必须在工作流程配置中显式传递数据。
  2. 它提供了对各种数据结构的更清晰的归属。操作员拥有`meta`。Cumulus拥有`cumulus_meta`。任务定义它们自己的`config`、`input`和`output`格式。
  3. Cumulus Message Adapter极大地简化了运行未明确为Cumulus创建的Lambda函数。
  4. 这种方法极大地简化了任务的测试,因为任务不需要设置复杂的结构来模拟消息协议,只需测试其业务功能。

许可证

Apache 2.0

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。

源代码分发

cumulus_message_adapter_python-2.3.0.tar.gz (15.0 kB 查看哈希值)

上传时间 源代码

支持者