AWS Lambda事件处理程序管理器
项目描述
Domovoi 是AWS Chalice的扩展,用于处理除了通过API Gateway发起的HTTP请求之外的AWS Lambda 事件源。Domovoi 允许您轻松配置和部署Lambda函数,通过ALB、定时或在响应诸如SNS或SQS消息、S3事件或自定义状态机转换等事件时,为HTTP请求提供服务。
import json, boto3, domovoi
app = domovoi.Domovoi()
# Compared to API Gateway, ALB increases the response timeout from 30s to 900s, but reduces the payload
# limit from 10MB to 1MB. It also does not try to negotiate on the Accept/Content-Type headers.
@app.alb_target()
def serve(event, context):
return dict(statusCode=200,
statusDescription="200 OK",
isBase64Encoded=False,
headers={"Content-Type": "application/json"},
body=json.dumps({"hello": "world"}))
@app.scheduled_function("cron(0 18 ? * MON-FRI *)")
def foo(event, context):
context.log("foo invoked at 06:00pm (UTC) every Mon-Fri")
return dict(result=True)
@app.scheduled_function("rate(1 minute)")
def bar(event, context):
context.log("bar invoked once a minute")
boto3.resource("sns").create_topic(Name="bartender").publish(Message=json.dumps({"beer": 1}))
return dict(result="Work work work")
@app.sns_topic_subscriber("bartender")
def tend(event, context):
message = json.loads(event["Records"][0]["Sns"]["Message"])
context.log(dict(beer="Quadrupel", quantity=message["beer"]))
# SQS messages are deleted upon successful exit, requeued otherwise.
# See https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
@app.sqs_queue_subscriber("my_queue", batch_size=64)
def process_queue_messages(event, context):
message = json.loads(event["Records"][0]["body"])
message_attributes = event["Records"][0]["messageAttributes"]
# You can colocate a state machine definition with an SQS handler to launch a SFN driven lambda from SQS.
return app.state_machine.start_execution(**message)["executionArn"]
@app.cloudwatch_event_handler(source=["aws.ecs"])
def monitor_ecs_events(event, context):
message = json.loads(event["Records"][0]["Sns"]["Message"])
context.log("Got an event from ECS: {}".format(message))
@app.s3_event_handler(bucket="myS3bucket", events=["s3:ObjectCreated:*"], prefix="foo", suffix=".bar")
def monitor_s3(event, context):
context.log("Got an event from S3: {}".format(event))
# Set use_sns=False, use_sqs=False to subscribe your Lambda directly to S3 events without forwarding them through an SNS-SQS bridge.
# That approach has fewer moving parts, but you can only subscribe one Lambda function to events in a given S3 bucket.
@app.s3_event_handler(bucket="myS3bucket", events=["s3:ObjectCreated:*"], prefix="foo", suffix=".bar", use_sns=False, use_sqs=False)
def monitor_s3(event, context):
context.log("Got an event from S3: {}".format(event))
# DynamoDB event format: https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html
@app.dynamodb_stream_handler(table_name="MyDynamoTable", batch_size=200)
def handle_dynamodb_stream(event, context):
context.log("Got {} events from DynamoDB".format(len(event["Records"])))
context.log("First event: {}".format(event["Records"][0]["dynamodb"]))
# Use the following command to log a CloudWatch Logs message that will trigger this handler:
# python -c'import watchtower as w, logging as l; L=l.getLogger(); L.addHandler(w.CloudWatchLogHandler()); L.error(dict(x=8))'
# See http://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html for the filter pattern syntax
@app.cloudwatch_logs_sub_filter_handler(log_group_name="watchtower", filter_pattern="{$.x = 8}")
def monitor_cloudwatch_logs(event, context):
print("Got a CWL subscription filter event:", event)
# See http://docs.aws.amazon.com/step-functions/latest/dg/concepts-amazon-states-language.html
# See the "AWS Step Functions state machines" section below for a complete example of setting up a state machine.
@app.step_function_task(state_name="Worker", state_machine_definition=state_machine)
def worker(event, context):
return {"result": event["input"] + 1, "my_state": context.stepfunctions_task_name}
安装
pip install domovoi
用法
首次设置
domovoi new-project
使用上面的示例编辑app.py中的Domovoi应用程序入口点。
在my_project/.chalice/policy-dev.json中编辑您的Lambda函数的IAM策略,以添加它需要的任何权限。
部署事件处理程序
domovoi deploy
要将文件阶段到部署包中,使用项目中的domovoilib目录,在该目录中您会使用Chalice中的chalicelib。例如,my_project/domovoilib/rds_cert.pem变为/var/task/domovoilib/rds_cert.pem,您的函数在/var/task/app.py中执行,工作目录为/var/task。有关如何设置Chalice配置的更多信息,请参阅Chalice文档。
支持的事件类型
请参阅支持的事件源,了解可用于触发Lambda函数的事件源概述。Domovoi支持以下事件源:
CloudWatch Events规则目标,包括CloudWatch计划事件(请参阅CloudWatch事件示例,以获取CloudWatch Events支持的事件类型列表)
AWS Step Functions状态机任务
可能未来支持的事件源
Kinesis流事件
SES(电子邮件)事件
AWS Step Functions状态机
Domovoi支持与AWS Step Functions的AWS Lambda集成。可以使用StartExecution方法或API Gateway Step Functions集成启动状态机。
请参阅domovoi/examples目录,了解使用状态机的Domovoi app.py应用程序示例,包括在即将达到执行时间限制时重启Lambda的循环,以及将工作分配给多个Lambda的线程池模式。
当创建由Step Functions状态机驱动的Domovoi守护程序Lambda时,状态机假定与Lambda相同的IAM角色。要允许状态机调用Lambda,请编辑IAM策略(在您的应用程序目录下的.chalice/policy-dev.json),以包括允许对所有资源执行“lambda:InvokeFunction”操作的语句,或Lambda本身的ARN。
配置
ALB
要使用带有@alb_target(prefix="...")装饰器的Lambda作为ALB目标,您应该在AWS账户中预先配置以下资源:
例如example.com的Route 53托管DNS区域,具有指向它的域名(example.com)
您DNS区域内的DNS名称的有效(已验证/签发)ACM证书,例如domovoi.example.com
配置这些后,将文件.chalice/config.json中的alb_acm_cert_dns_name配置键设置为您的DNS名称。例如
{ "app_name": "my_app", ... "alb_acm_cert_dns_name": "domovoi.example.com" }
Domovoi将自动创建、管理和链接您的Route 53区域中的ALB和DNS记录。
死信队列
要使您的Lambda函数能够将失败的调用通知转发到死信队列,请将文件.chalice/config.json中的配置键dead_letter_queue_target_arn设置为目标DLQ ARN。例如
{ "app_name": "my_app", ... "dead_letter_queue_target_arn": "arn:aws:sns:us-east-1:123456789012:my-dlq" }
您可能需要更新Lambda IAM策略(.chalice/policy-dev.json),以授予Lambda对SNS或SQS的访问权限。
并发预留
对于账户中有多个Lambda的高量Lambda调用,您可能需要设置每个函数并发限制,以划分整体并发配额,并防止一组Lambda过载另一组。在Domovoi中,您可以通过设置文件.chalice/config.json中的配置键reserved_concurrent_executions来实现这一点,以设置所需的并发预留。例如
{ "app_name": "my_app", ... "reserved_concurrent_executions": 500 }
链接
错误
请在GitHub上报告错误、问题、功能请求等。
许可
根据Apache许可证第2版的条款进行许可。
项目详细信息
下载文件
下载适合您平台的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。