用Python编写的cumulus任务的处理器库
项目描述
cumulus-message-adapter-python
什么是Cumulus?
Cumulus是NASA未来地球科学数据流的云数据摄取、归档、分发和管理原型。
什么是Cumulus消息适配器?
Cumulus消息适配器是一个库,它将Cumulus协议中的传入消息适配到更易于Cumulus任务消费的格式,调用任务,然后将响应适配回Cumulus消息协议以发送给下一个任务。
安装
pip install cumulus-message-adapter-python
任务定义
为了使用Cumulus消息适配器,你需要在你的任务模块中创建两个方法:一个处理器函数和一个业务逻辑函数。
处理器函数是一个标准的Lambda处理器函数,它接受两个参数(由AWS指定):event
和context
。
业务逻辑函数是任务的实际工作所在。它应接受两个参数:event
和context
。
event
对象包含两个键
input
- 任务的输入,通常是消息的payload
,在运行时产生config
- 任务的配置,其中任何模板变量都已解析
context
参数是AWS传递的标准Lambda上下文。
业务逻辑函数的返回值将被放置在结果的Cumulus消息的payload
中。
输入、配置和返回值的期望都由任务定义,并且应得到良好的文档记录。任务应仔细考虑其输入和返回值,因为破坏性更改可能会对整个工作流程中的任务产生级联效应。配置更改的影响略小,但必须通知使用此任务的人员。
Cumulus消息适配器接口
Cumulus Message适配器为Python提供了一个方法:`run_cumulus_task`。它接受四个参数
-
task_function
- 包含您业务逻辑的函数(如上所述) -
cumulus_message
- 由Lambda传递的事件,应该是一个Cumulus消息,或者是一个CMA封装的消息(见Cumulus工作流程文档){ "cma": { "event": "<cumulus message object>", "SomeCMAConfigKey": "CMA configuration object>" } }
-
context
- Lambda上下文 -
schemas
- 可选:一个字典,具有`input`、`config`和`output`属性。每个属性都应设置为对应JSON模式文件的文件路径。此字典的所有三个属性都是可选的。如果省略,消息适配器将在`/<task_root>/schemas/<schema_type>.json
`中查找,如果找不到,则忽略。 -
taskargs
- 可选。为`task_function`提供的附加关键字参数
示例
使用此包的`run_cumulus_task`函数作为其他函数包装器的简单示例
>>> from run_cumulus_task import run_cumulus_task
# simple task that returns the event
>>> def task(event, context):
... return event
# handler that is provided to aws lambda
>>> def handler(event, context):
... return run_cumulus_task(task, event, context)
有关完整示例,请参阅示例文件夹。
创建部署包
使用此库的任务只是标准的AWS Lambda任务。有关创建发布包的信息,请参阅创建发布包。
Cumulus部署中的使用
有关如何在Cumulus部署中利用此包的文档,请参阅Cumulus工作流程文档。
开发
依赖关系安装
$ pip install -r requirements-dev.txt
$ pip install -r requirements.txt
使用CumulusLogger
进行日志记录
在此包中包含的cumulus_logger
包含一个用于Cumulus日志记录格式的标准化的日志记录类CumulusLogger
。提供了用于记录错误、致命、警告、调试、信息和跟踪的方法。
导入CumulusLogger
类
>>> from cumulus_logger import CumulusLogger
在任务定义内实例化记录器(名称和级别是可选的)
>>> import logging
>>> logger = CumulusLogger("event_name", logging.ERROR)
使用不同级别的日志记录方法
>>> logger.trace('<your message>')
>>> logger.debug('<your message>')
>>> logger.info('<your message>')
>>> logger.warn('<your message>')
>>> logger.error('<your message>')
>>> logger.fatal('<your message>')
它还可以像Python Logger一样接受额外的非关键字和关键字参数。
msg
是消息格式化字符串,args
和kwargs
是字符串格式化的参数。
如果kwargs
中的exc_info
不是False
,则将exc_info
或sys.exc_info()
中的异常信息添加到消息中。
>>> logger.debug(msg, *args, **kwargs)
示例用法
>>> import os
>>> import sys
>>> from run_cumulus_task import run_cumulus_task
>>> from cumulus_logger import CumulusLogger
# instantiate CumulusLogger
>>> logger = CumulusLogger()
>>> def task(event, context):
... logger.info('task executed')
...
... # log error when an exception is caught
... logger.error("task formatted message {} exc_info ", "bar", exc_info=True)
...
... # return the output of the task
... return { "example": "output" }
>>> def handler(event, context):
... # make sure event & context metadata is set in the logger
... logger.setMetadata(event, context)
... return run_cumulus_task(task, event, context)
运行测试
运行测试需要localstack。
测试只需要运行S3的localstack,可以使用以下命令启动
$ SERVICES=s3 localstack start
然后您可以使用以下nosetests命令检查测试是否通过
$ CUMULUS_ENV=testing nose2
代码风格检查
$ pylint run_cumulus_task.py
为什么?
这种方法有几个主要优点
- 它明确防止任务假设`meta`和`cumulus_meta`等内部拥有的数据结构,这些数据结构可能在未来的更新中损坏。为了访问这些结构中的字段,任务必须在工作流程配置中显式传递数据。
- 它提供了对各种数据结构的更清晰的归属。操作员拥有`meta`。Cumulus拥有`cumulus_meta`。任务定义它们自己的`config`、`input`和`output`格式。
- Cumulus Message Adapter极大地简化了运行未明确为Cumulus创建的Lambda函数。
- 这种方法极大地简化了任务的测试,因为任务不需要设置复杂的结构来模拟消息协议,只需测试其业务功能。
许可证
项目详情
cumulus_message_adapter_python-2.3.0.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | ff55768fae327db5aad1239b2186685f4085a2d6a6adb5ed29cda32f79b43abc |
|
MD5 | c0db7428eaff56f1b5630c0df373c1c0 |
|
BLAKE2b-256 | 08560f6be3b511b630e25cbd2214f15dab17e46439ee52118a4577a9fbce4b5a |