跳转到主要内容

Nameko gRPC 扩展

项目描述

nameko-grpc

这是用于 nameko 微服务的 gRPC 服务器和客户端的原型实现。

已实现并测试了四种请求-响应模式

  1. 单一-单一
  2. 单一-流
  3. 流-单一
  4. 流-流

每个模式都支持异步调用。

支持Python 3.4+。

安装

$ pip install nameko-grpc

示例

服务器

示例 Nameko 服务,可以响应 gRPC 请求

from example_pb2 import ExampleReply
from example_pb2_grpc import exampleStub

from nameko_grpc.entrypoint import Grpc

grpc = Grpc.implementing(exampleStub)


class ExampleService:
    name = "example"

    @grpc
    def unary_unary(self, request, context):
        message = request.value * (request.multiplier or 1)
        return ExampleReply(message=message)

    @grpc
    def unary_stream(self, request, context):
        message = request.value * (request.multiplier or 1)
        yield ExampleReply(message=message, seqno=1)
        yield ExampleReply(message=message, seqno=2)

    @grpc
    def stream_unary(self, request, context):
        messages = []
        for req in request:
            message = req.value * (req.multiplier or 1)
            messages.append(message)

        return ExampleReply(message=",".join(messages))

    @grpc
    def stream_stream(self, request, context):
        for index, req in enumerate(request):
            message = req.value * (req.multiplier or 1)
            yield ExampleReply(message=message, seqno=index + 1)

客户端

示例 Nameko 服务,可以发起 gRPC 请求

from example_pb2 import ExampleReply
from example_pb2_grpc import exampleStub

from nameko.rpc import rpc

from nameko_grpc.dependency_provider import GrpcProxy


class ClientService:
    name = "client"

    example_grpc = GrpcProxy("//127.0.0.1", exampleStub)

    @rpc
    def method(self):
        responses = self.example_grpc.unary_stream(ExampleRequest(value="A"))
        for response in responses:
            print(response.message)

示例独立客户端,可以与或无需 Eventlet 使用

from example_pb2 import ExampleReply
from example_pb2_grpc import exampleStub

from nameko_grpc.client import Client

with Client("//127.0.0.1", exampleStub) as client:
    responses = client.unary_stream(ExampleRequest(value="A"))
    for response in responses:
        print(response.message)

Protobuf

上述示例的 protobuf 如下

syntax = "proto3";

package nameko;

service example {
  rpc unary_unary (ExampleRequest) returns (ExampleReply) {}
  rpc unary_stream (ExampleRequest) returns (stream ExampleReply) {}
  rpc stream_unary (stream ExampleRequest) returns (ExampleReply) {}
  rpc stream_stream (stream ExampleRequest) returns (stream ExampleReply) {}
}

message ExampleRequest {
  string value = 1;
  int32 multiplier = 2;
}


message ExampleReply {
  string message = 1;
  int32 seqno = 2;
}

样式

本仓库中的示例 protobuf 使用 snake_case 对方法名称进行命名,遵循 Nameko 规范,而不是 gRPC 的 CamelCase。这不是强制性的--装饰方法名称仅匹配 protobuf 中定义的方法;同样适用于服务名称。

上下文和元数据

就其实现而言,服务方法的 context 参数具有与标准 Python 实现相同的 API

  • context.invocation_metadata() 返回调用客户端提供的任何元数据。
  • context.send_initial_metadata() 可用于向响应头添加元数据。
  • context.set_trailing_metadata() 可用于向响应尾部添加元数据。

独立客户端和依赖提供者都允许使用metadata关键字参数提供元数据。它们接受一个(name, value)元组的列表,就像标准Python客户端一样。二进制值必须是base64编码的,并使用后缀为"-bin"的标题名,如标准Python客户端。

gRPC请求元数据被添加到Nameko工作器上下文的“上下文数据”中,因此可供其他Nameko扩展使用。

依赖提供者客户端将Nameko工作器上下文数据作为元数据添加到所有gRPC请求中。这使得Nameko调用ID堆栈可以被填充和传播,以及其他任何上下文数据。

压缩

服务器和客户端都支持压缩。默认情况下,提供deflategzip算法,并将包含在客户端请求和服务器响应的grpc-accept-encoding标题中。

服务器尊重它能够接受的任何压缩算法,并优先使用与请求相同的算法来编码响应。

在创建客户端时指定默认压缩算法,并且可以使用compression关键字参数按调用指定。

client = Client(default_compression="deflate", ...)
client.unary_unary(ExampleRequest(value="foo"), compression="gzip")  # use gzip instead

不支持压缩级别。

gRPC规范允许服务器使用与请求不同的算法进行响应,或者完全不进行压缩。这目前在标准的Python gRPC实现和nameko-grpc中都不支持。

错误

客户端

客户端通过GrpcError异常类实例引发gRPC错误。一个GrpcError封装了描述错误的codemessage字符串。这些作为gRPC规范定义的grpc-statusgrpc-message标题进行传输。

此外,GrpcError有一个可以包含有关错误额外信息的status属性,它可以持有google.rpc.status.Status protobuf消息。这与官方Python gRPC库中的grpc_status包类似,实际上那个包与nameko-grpc客户端兼容。(待测试)

grpc-status-details-bin尾部标题中接收到的google.rpc.status.Status消息。

服务器

如果服务方法引发异常,生成的错误默认具有以下属性

* `code`: grpc.StatusCode.UNKNOWN
* `message`: "Exception calling application: <stringified exception>"
* `status`: `google.rpc.Status` protobuf message

google.rpc.Status消息再次封装了codemessage,并带有包含异常堆栈跟踪的details属性,该属性作为google.rpc.error_details.DebugInfo消息。

您可以通过以下两种方式自定义服务器返回的错误

  1. 使用context对象显式设置代码、消息和尾部标题。这基本上是官方Python gRPC库的做法
class Service:

    ...

    @grpc
    def stream_error_via_context(self, request, context):
        for index, item in enumerate(...):
            if index > MAX_TOKENS:
                context.set_code(StatusCode.RESOURCE_EXHAUSTED)
                context.set_message("Out of tokens!")
                context.set_trailing_metadata([
                    ("grpc-status-details-bin", make_grpc_status(...))
                ])
                break
            yield Reply(...)
  1. 直接返回一个GrpcError
from nameko_grpc.errors import GrpcError, StatusCode


class Service:

    ...

    @grpc
    def stream_grpc_error(self, request, context):
        for index, item in enumerate(...):
            if index > MAX_TOKENS:
                raise GrpcError(
                    code=StatusCode.RESOURCE_EXHAUSTED,
                    message="Out of tokens!",
                    status=make_grpc_status(...)
                )
            yield Reply(...)
  1. 注册一个错误处理器,将给定的异常类型映射到从引发异常生成GrpcError实例的函数
from nameko_grpc.errors import register_handler, GrpcError, StatusCode


class NoMoreTokens(Exception):
    pass


def handle_no_more_tokens(exc, code=None, message=None):
    return GrpcError(
        code=StatusCode.RESOURCE_EXHAUSTED,
        message=str(exc),
        details=make_grpc_status(...)
    )


register_handler(NoMoreTokens, handle_no_more_tokens)

class Service:

    ...

    @grpc
    def grpc_error_from_exception(self, request, context):
        for index, item in enumerate(...):
            if index > MAX_TOKENS:
                raise NoMoreTokens("Out of tokens!")
            yield Reply(...)

最后一种方法在您想映射异常而无需在服务方法中包裹try/except的情况下很有用,或者当没有机会这样做时(例如,当服务方法上的装饰器引发异常时)。

超时

客户端和服务器都支持超时,如果在请求时间内RPC未完成,则会引发DEADLINE_EXCEEDED。客户端在请求启动时开始计时,服务器在收到请求时开始计时。

截止日期计算为当前时间加上超时值。

在客户端,使用调用方法的timeout关键字参数以秒指定超时值

client = Client(...)
client.unary_unary(ExampleRequest(value="foo"), timeout=0.1)  # 100 ms timeout

没有默认值,因为没有适用于所有用例的有意义的值,但建议始终设置截止日期。

测试

大多数测试是对gRPC服务器/客户端与Nameko服务器/客户端的每一种排列组合进行的。这大致展示了两种实现之间的等效性。这些测试被标记为“equivalence” pytest标记。

此外,我们还运行来自官方gRPC仓库的互操作性测试,这些测试用于验证语言实现之间的兼容性。Nameko gRPC实现支持官方Python gRPC实现的所有功能。这些测试被标记为“interop” pytest标记。

test/spec目录包含各种测试中使用的protobufs和服务器实现。

运行测试

克隆或下载仓库,并确保已安装开发依赖项

$ pip install nameko-grpc[dev]

然后运行测试

$ pytest test

互操作性测试需要docker。它们使用https://hub.docker.com/r/nameko/nameko-grpc-interop中的镜像,该镜像包含预构建的C++互操作性客户端。要运行所有测试,但不包括互操作性测试

$ pytest test -m "not interop"

实现说明

gRPC建立在HTTP2之上,因此nameko-grpc在很大程度上依赖于hyper-h2库。H2是HTTP2协议的有限状态机实现,其文档非常好。当你熟悉h2时,nameko-grpc中的代码更容易理解。

nameko-grpc中的大部分繁重工作都是由ConnectionManager的子类(服务器或客户端)完成的。一个ConnectionManager处理一个HTTP2连接,并实现该连接上每个HTTP2事件的处理器(例如request_receivedstream_ended)。参见

  • nameko_grpc/client.py::ClientConnectionManager
  • nameko_grpc/entrypoint.py::ServerConnectionManager
  • nameko_grpc/connection.py::ConnectionManager

下一个最重要的模块是nameko_grpc/streams.py。此模块包含SendStreamReceiveStream类,分别表示正在发送或接收的HTTP2流。一个ReceiveStreamConnectionManager接收字节数据,并将其解析为gRPC消息流。一个SendStream执行相反的操作,将gRPC消息编码为可以发送到HTTP2连接的字节。

@grpc入口点是正常的Nameko入口点,当发出适当的请求时执行服务方法。入口点处理封装请求的ReceiveStream对象和接受响应的SendStream对象。流由共享的GrpcServer管理,它接受传入的连接,并将每个连接包装在ServerConnectionManager中。

独立的客户端是围绕ClientConnectionManager的小型包装器。客户端简单地创建一个套接字连接,并将其交给连接管理器。当在客户端调用方法时,连接管理器启动适当的请求。该请求的标头描述了调用的方法、编码、消息类型等。所有这些逻辑都封装在Method类中。

gRPC DependencyProvider是正常的Nameko DependencyProvider,它也是一个围绕ClientConnectionManager的小型包装器。它以完全相同的方式运行。

等效性测试说明

为了展示nameko-grpc实现与标准gRPC实现之间的等效性,所有标记为equivalence的测试都针对以下每一组进行

  • gRPC标准服务器(Python实现)或
  • Nameko服务器

  • gRPC标准客户端(Python实现)或
  • Nameko独立客户端或
  • Nameko DependencyProvider客户端

Nameko使用Eventlet进行并发,这与标准gRPC服务器和客户端不兼容。因此,必须单独运行它们,并 somehow进行通信,以便对标准实现的性能进行断言。

运行进程外客户端和服务器脚本的脚本可以在test/grpc_indirect_client.pytest/grpc_indirect_server.py中找到

通信使用ZeroMQ完成。相关的逻辑包含在test/helpers.py文件中的RemoteClientTransportCommand类中,以及test/conftest.py中的start_grpc_clientstart_grpc_server配置。

未来,这种安排将使我们能够对不同(功能更完整的)标准gRPC实现运行等效性测试。

项目详情


下载文件

下载适合您平台的应用程序文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源代码分发

nameko-grpc-1.3.0.tar.gz (60.9 kB 查看哈希值)

上传时间 源代码

构建分发

nameko_grpc-1.3.0-py2.py3-none-any.whl (32.3 kB 查看哈希值)

上传时间 Python 2 Python 3

由以下机构支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面