跳转到主要内容

AWS Lambda事件处理程序管理器

项目描述

Domovoi 是AWS Chalice的扩展,用于处理除了通过API Gateway发起的HTTP请求之外的AWS Lambda 事件源。Domovoi 允许您轻松配置和部署Lambda函数,通过ALB、定时或在响应诸如SNSSQS消息、S3事件或自定义状态机转换等事件时,为HTTP请求提供服务。

import json, boto3, domovoi

app = domovoi.Domovoi()

# Compared to API Gateway, ALB increases the response timeout from 30s to 900s, but reduces the payload
# limit from 10MB to 1MB. It also does not try to negotiate on the Accept/Content-Type headers.
@app.alb_target()
def serve(event, context):
    return dict(statusCode=200,
                statusDescription="200 OK",
                isBase64Encoded=False,
                headers={"Content-Type": "application/json"},
                body=json.dumps({"hello": "world"}))

@app.scheduled_function("cron(0 18 ? * MON-FRI *)")
def foo(event, context):
    context.log("foo invoked at 06:00pm (UTC) every Mon-Fri")
    return dict(result=True)

@app.scheduled_function("rate(1 minute)")
def bar(event, context):
    context.log("bar invoked once a minute")
    boto3.resource("sns").create_topic(Name="bartender").publish(Message=json.dumps({"beer": 1}))
    return dict(result="Work work work")

@app.sns_topic_subscriber("bartender")
def tend(event, context):
    message = json.loads(event["Records"][0]["Sns"]["Message"])
    context.log(dict(beer="Quadrupel", quantity=message["beer"]))

# SQS messages are deleted upon successful exit, requeued otherwise.
# See https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
@app.sqs_queue_subscriber("my_queue", batch_size=64)
def process_queue_messages(event, context):
    message = json.loads(event["Records"][0]["body"])
    message_attributes = event["Records"][0]["messageAttributes"]
    # You can colocate a state machine definition with an SQS handler to launch a SFN driven lambda from SQS.
    return app.state_machine.start_execution(**message)["executionArn"]

@app.cloudwatch_event_handler(source=["aws.ecs"])
def monitor_ecs_events(event, context):
    message = json.loads(event["Records"][0]["Sns"]["Message"])
    context.log("Got an event from ECS: {}".format(message))

@app.s3_event_handler(bucket="myS3bucket", events=["s3:ObjectCreated:*"], prefix="foo", suffix=".bar")
def monitor_s3(event, context):
    context.log("Got an event from S3: {}".format(event))

# Set use_sns=False, use_sqs=False to subscribe your Lambda directly to S3 events without forwarding them through an SNS-SQS bridge.
# That approach has fewer moving parts, but you can only subscribe one Lambda function to events in a given S3 bucket.
@app.s3_event_handler(bucket="myS3bucket", events=["s3:ObjectCreated:*"], prefix="foo", suffix=".bar", use_sns=False, use_sqs=False)
def monitor_s3(event, context):
    context.log("Got an event from S3: {}".format(event))

# DynamoDB event format: https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html
@app.dynamodb_stream_handler(table_name="MyDynamoTable", batch_size=200)
def handle_dynamodb_stream(event, context):
    context.log("Got {} events from DynamoDB".format(len(event["Records"])))
    context.log("First event: {}".format(event["Records"][0]["dynamodb"]))

# Use the following command to log a CloudWatch Logs message that will trigger this handler:
# python -c'import watchtower as w, logging as l; L=l.getLogger(); L.addHandler(w.CloudWatchLogHandler()); L.error(dict(x=8))'
# See http://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html for the filter pattern syntax
@app.cloudwatch_logs_sub_filter_handler(log_group_name="watchtower", filter_pattern="{$.x = 8}")
def monitor_cloudwatch_logs(event, context):
    print("Got a CWL subscription filter event:", event)

# See http://docs.aws.amazon.com/step-functions/latest/dg/concepts-amazon-states-language.html
# See the "AWS Step Functions state machines" section below for a complete example of setting up a state machine.
@app.step_function_task(state_name="Worker", state_machine_definition=state_machine)
def worker(event, context):
    return {"result": event["input"] + 1, "my_state": context.stepfunctions_task_name}

安装

pip install domovoi

用法

首次设置

domovoi new-project
  • 使用上面的示例编辑app.py中的Domovoi应用程序入口点。

  • my_project/.chalice/policy-dev.json中编辑您的Lambda函数的IAM策略,以添加它需要的任何权限。

  • 部署事件处理程序

    domovoi deploy

要将文件阶段到部署包中,使用项目中的domovoilib目录,在该目录中您会使用Chalice中的chalicelib。例如,my_project/domovoilib/rds_cert.pem变为/var/task/domovoilib/rds_cert.pem,您的函数在/var/task/app.py中执行,工作目录为/var/task。有关如何设置Chalice配置的更多信息,请参阅Chalice文档

支持的事件类型

请参阅支持的事件源,了解可用于触发Lambda函数的事件源概述。Domovoi支持以下事件源:

可能未来支持的事件源

  • Kinesis流事件

  • SES(电子邮件)事件

AWS Step Functions状态机

Domovoi支持与AWS Step Functions的AWS Lambda集成。可以使用StartExecution方法或API Gateway Step Functions集成启动状态机。

请参阅domovoi/examples目录,了解使用状态机的Domovoi app.py应用程序示例,包括在即将达到执行时间限制时重启Lambda的循环,以及将工作分配给多个Lambda的线程池模式。

当创建由Step Functions状态机驱动的Domovoi守护程序Lambda时,状态机假定与Lambda相同的IAM角色。要允许状态机调用Lambda,请编辑IAM策略(在您的应用程序目录下的.chalice/policy-dev.json),以包括允许对所有资源执行“lambda:InvokeFunction”操作的语句,或Lambda本身的ARN。

配置

ALB

要使用带有@alb_target(prefix="...")装饰器的Lambda作为ALB目标,您应该在AWS账户中预先配置以下资源:

  • 例如example.com的Route 53托管DNS区域,具有指向它的域名(example.com

  • 您DNS区域内的DNS名称的有效(已验证/签发)ACM证书,例如domovoi.example.com

配置这些后,将文件.chalice/config.json中的alb_acm_cert_dns_name配置键设置为您的DNS名称。例如

{
  "app_name": "my_app",
  ...
  "alb_acm_cert_dns_name": "domovoi.example.com"
}

Domovoi将自动创建、管理和链接您的Route 53区域中的ALB和DNS记录。

死信队列

要使您的Lambda函数能够将失败的调用通知转发到死信队列,请将文件.chalice/config.json中的配置键dead_letter_queue_target_arn设置为目标DLQ ARN。例如

{
  "app_name": "my_app",
  ...
  "dead_letter_queue_target_arn": "arn:aws:sns:us-east-1:123456789012:my-dlq"
}

您可能需要更新Lambda IAM策略(.chalice/policy-dev.json),以授予Lambda对SNS或SQS的访问权限。

并发预留

对于账户中有多个Lambda的高量Lambda调用,您可能需要设置每个函数并发限制,以划分整体并发配额,并防止一组Lambda过载另一组。在Domovoi中,您可以通过设置文件.chalice/config.json中的配置键reserved_concurrent_executions来实现这一点,以设置所需的并发预留。例如

{
  "app_name": "my_app",
  ...
  "reserved_concurrent_executions": 500
}

许可

根据Apache许可证第2版的条款进行许可。

https://travis-ci.org/kislyuk/domovoi.png https://codecov.io/github/kislyuk/domovoi/coverage.svg?branch=master https://img.shields.io/pypi/v/domovoi.svg https://img.shields.io/pypi/l/domovoi.svg https://readthedocs.org/projects/domovoi/badge/?version=latest

项目详细信息


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。

源分布

domovoi-2.0.2.tar.gz (28.7 kB 查看散列值)

上传时间

构建分布

domovoi-2.0.2-py2.py3-none-any.whl (31.3 kB 查看散列值)

上传时间 Python 2 Python 3

由以下支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面