Nameko gRPC 扩展
项目描述
nameko-grpc
这是用于 nameko 微服务的 gRPC 服务器和客户端的原型实现。
已实现并测试了四种请求-响应模式
- 单一-单一
- 单一-流
- 流-单一
- 流-流
每个模式都支持异步调用。
支持Python 3.4+。
安装
$ pip install nameko-grpc
示例
服务器
示例 Nameko 服务,可以响应 gRPC 请求
from example_pb2 import ExampleReply
from example_pb2_grpc import exampleStub
from nameko_grpc.entrypoint import Grpc
grpc = Grpc.implementing(exampleStub)
class ExampleService:
name = "example"
@grpc
def unary_unary(self, request, context):
message = request.value * (request.multiplier or 1)
return ExampleReply(message=message)
@grpc
def unary_stream(self, request, context):
message = request.value * (request.multiplier or 1)
yield ExampleReply(message=message, seqno=1)
yield ExampleReply(message=message, seqno=2)
@grpc
def stream_unary(self, request, context):
messages = []
for req in request:
message = req.value * (req.multiplier or 1)
messages.append(message)
return ExampleReply(message=",".join(messages))
@grpc
def stream_stream(self, request, context):
for index, req in enumerate(request):
message = req.value * (req.multiplier or 1)
yield ExampleReply(message=message, seqno=index + 1)
客户端
示例 Nameko 服务,可以发起 gRPC 请求
from example_pb2 import ExampleReply
from example_pb2_grpc import exampleStub
from nameko.rpc import rpc
from nameko_grpc.dependency_provider import GrpcProxy
class ClientService:
name = "client"
example_grpc = GrpcProxy("//127.0.0.1", exampleStub)
@rpc
def method(self):
responses = self.example_grpc.unary_stream(ExampleRequest(value="A"))
for response in responses:
print(response.message)
示例独立客户端,可以与或无需 Eventlet 使用
from example_pb2 import ExampleReply
from example_pb2_grpc import exampleStub
from nameko_grpc.client import Client
with Client("//127.0.0.1", exampleStub) as client:
responses = client.unary_stream(ExampleRequest(value="A"))
for response in responses:
print(response.message)
Protobuf
上述示例的 protobuf 如下
syntax = "proto3";
package nameko;
service example {
rpc unary_unary (ExampleRequest) returns (ExampleReply) {}
rpc unary_stream (ExampleRequest) returns (stream ExampleReply) {}
rpc stream_unary (stream ExampleRequest) returns (ExampleReply) {}
rpc stream_stream (stream ExampleRequest) returns (stream ExampleReply) {}
}
message ExampleRequest {
string value = 1;
int32 multiplier = 2;
}
message ExampleReply {
string message = 1;
int32 seqno = 2;
}
样式
本仓库中的示例 protobuf 使用 snake_case
对方法名称进行命名,遵循 Nameko 规范,而不是 gRPC 的 CamelCase
。这不是强制性的--装饰方法名称仅匹配 protobuf 中定义的方法;同样适用于服务名称。
上下文和元数据
就其实现而言,服务方法的 context
参数具有与标准 Python 实现相同的 API
context.invocation_metadata()
返回调用客户端提供的任何元数据。context.send_initial_metadata()
可用于向响应头添加元数据。context.set_trailing_metadata()
可用于向响应尾部添加元数据。
独立客户端和依赖提供者都允许使用metadata
关键字参数提供元数据。它们接受一个(name, value)
元组的列表,就像标准Python客户端一样。二进制值必须是base64编码的,并使用后缀为"-bin"的标题名,如标准Python客户端。
gRPC请求元数据被添加到Nameko工作器上下文的“上下文数据”中,因此可供其他Nameko扩展使用。
依赖提供者客户端将Nameko工作器上下文数据作为元数据添加到所有gRPC请求中。这使得Nameko调用ID堆栈可以被填充和传播,以及其他任何上下文数据。
压缩
服务器和客户端都支持压缩。默认情况下,提供deflate
和gzip
算法,并将包含在客户端请求和服务器响应的grpc-accept-encoding
标题中。
服务器尊重它能够接受的任何压缩算法,并优先使用与请求相同的算法来编码响应。
在创建客户端时指定默认压缩算法,并且可以使用compression
关键字参数按调用指定。
client = Client(default_compression="deflate", ...)
client.unary_unary(ExampleRequest(value="foo"), compression="gzip") # use gzip instead
不支持压缩级别。
gRPC规范允许服务器使用与请求不同的算法进行响应,或者完全不进行压缩。这目前在标准的Python gRPC实现和nameko-grpc中都不支持。
错误
客户端
客户端通过GrpcError
异常类实例引发gRPC错误。一个GrpcError
封装了描述错误的code
和message
字符串。这些作为gRPC规范定义的grpc-status
和grpc-message
标题进行传输。
此外,GrpcError
有一个可以包含有关错误额外信息的status
属性,它可以持有google.rpc.status.Status
protobuf消息。这与官方Python gRPC库中的grpc_status
包类似,实际上那个包与nameko-grpc客户端兼容。(待测试)
在grpc-status-details-bin
尾部标题中接收到的google.rpc.status.Status
消息。
服务器
如果服务方法引发异常,生成的错误默认具有以下属性
* `code`: grpc.StatusCode.UNKNOWN
* `message`: "Exception calling application: <stringified exception>"
* `status`: `google.rpc.Status` protobuf message
google.rpc.Status
消息再次封装了code
和message
,并带有包含异常堆栈跟踪的details
属性,该属性作为google.rpc.error_details.DebugInfo
消息。
您可以通过以下两种方式自定义服务器返回的错误
- 使用
context
对象显式设置代码、消息和尾部标题。这基本上是官方Python gRPC库的做法
class Service:
...
@grpc
def stream_error_via_context(self, request, context):
for index, item in enumerate(...):
if index > MAX_TOKENS:
context.set_code(StatusCode.RESOURCE_EXHAUSTED)
context.set_message("Out of tokens!")
context.set_trailing_metadata([
("grpc-status-details-bin", make_grpc_status(...))
])
break
yield Reply(...)
- 直接返回一个
GrpcError
from nameko_grpc.errors import GrpcError, StatusCode
class Service:
...
@grpc
def stream_grpc_error(self, request, context):
for index, item in enumerate(...):
if index > MAX_TOKENS:
raise GrpcError(
code=StatusCode.RESOURCE_EXHAUSTED,
message="Out of tokens!",
status=make_grpc_status(...)
)
yield Reply(...)
- 注册一个错误处理器,将给定的异常类型映射到从引发异常生成
GrpcError
实例的函数
from nameko_grpc.errors import register_handler, GrpcError, StatusCode
class NoMoreTokens(Exception):
pass
def handle_no_more_tokens(exc, code=None, message=None):
return GrpcError(
code=StatusCode.RESOURCE_EXHAUSTED,
message=str(exc),
details=make_grpc_status(...)
)
register_handler(NoMoreTokens, handle_no_more_tokens)
class Service:
...
@grpc
def grpc_error_from_exception(self, request, context):
for index, item in enumerate(...):
if index > MAX_TOKENS:
raise NoMoreTokens("Out of tokens!")
yield Reply(...)
最后一种方法在您想映射异常而无需在服务方法中包裹try/except的情况下很有用,或者当没有机会这样做时(例如,当服务方法上的装饰器引发异常时)。
超时
客户端和服务器都支持超时,如果在请求时间内RPC未完成,则会引发DEADLINE_EXCEEDED
。客户端在请求启动时开始计时,服务器在收到请求时开始计时。
截止日期计算为当前时间加上超时值。
在客户端,使用调用方法的timeout
关键字参数以秒指定超时值
client = Client(...)
client.unary_unary(ExampleRequest(value="foo"), timeout=0.1) # 100 ms timeout
没有默认值,因为没有适用于所有用例的有意义的值,但建议始终设置截止日期。
测试
大多数测试是对gRPC服务器/客户端与Nameko服务器/客户端的每一种排列组合进行的。这大致展示了两种实现之间的等效性。这些测试被标记为“equivalence” pytest标记。
此外,我们还运行来自官方gRPC仓库的互操作性测试,这些测试用于验证语言实现之间的兼容性。Nameko gRPC实现支持官方Python gRPC实现的所有功能。这些测试被标记为“interop” pytest标记。
test/spec
目录包含各种测试中使用的protobufs和服务器实现。
运行测试
克隆或下载仓库,并确保已安装开发依赖项
$ pip install nameko-grpc[dev]
然后运行测试
$ pytest test
互操作性测试需要docker。它们使用https://hub.docker.com/r/nameko/nameko-grpc-interop中的镜像,该镜像包含预构建的C++互操作性客户端。要运行所有测试,但不包括互操作性测试
$ pytest test -m "not interop"
实现说明
gRPC建立在HTTP2之上,因此nameko-grpc在很大程度上依赖于hyper-h2库。H2是HTTP2协议的有限状态机实现,其文档非常好。当你熟悉h2时,nameko-grpc中的代码更容易理解。
nameko-grpc中的大部分繁重工作都是由ConnectionManager
的子类(服务器或客户端)完成的。一个ConnectionManager
处理一个HTTP2连接,并实现该连接上每个HTTP2事件的处理器(例如request_received
或stream_ended
)。参见
nameko_grpc/client.py::ClientConnectionManager
nameko_grpc/entrypoint.py::ServerConnectionManager
nameko_grpc/connection.py::ConnectionManager
下一个最重要的模块是nameko_grpc/streams.py
。此模块包含SendStream
和ReceiveStream
类,分别表示正在发送或接收的HTTP2流。一个ReceiveStream
从ConnectionManager
接收字节数据,并将其解析为gRPC消息流。一个SendStream
执行相反的操作,将gRPC消息编码为可以发送到HTTP2连接的字节。
@grpc
入口点是正常的Nameko入口点,当发出适当的请求时执行服务方法。入口点处理封装请求的ReceiveStream
对象和接受响应的SendStream
对象。流由共享的GrpcServer
管理,它接受传入的连接,并将每个连接包装在ServerConnectionManager
中。
独立的客户端是围绕ClientConnectionManager
的小型包装器。客户端简单地创建一个套接字连接,并将其交给连接管理器。当在客户端调用方法时,连接管理器启动适当的请求。该请求的标头描述了调用的方法、编码、消息类型等。所有这些逻辑都封装在Method
类中。
gRPC DependencyProvider是正常的Nameko DependencyProvider,它也是一个围绕ClientConnectionManager
的小型包装器。它以完全相同的方式运行。
等效性测试说明
为了展示nameko-grpc实现与标准gRPC实现之间的等效性,所有标记为equivalence
的测试都针对以下每一组进行
- gRPC标准服务器(Python实现)或
- Nameko服务器
和
- gRPC标准客户端(Python实现)或
- Nameko独立客户端或
- Nameko DependencyProvider客户端
Nameko使用Eventlet进行并发,这与标准gRPC服务器和客户端不兼容。因此,必须单独运行它们,并 somehow进行通信,以便对标准实现的性能进行断言。
运行进程外客户端和服务器脚本的脚本可以在test/grpc_indirect_client.py
和test/grpc_indirect_server.py
中找到
通信使用ZeroMQ完成。相关的逻辑包含在test/helpers.py
文件中的RemoteClientTransport
和Command
类中,以及test/conftest.py
中的start_grpc_client
和start_grpc_server
配置。
未来,这种安排将使我们能够对不同(功能更完整的)标准gRPC实现运行等效性测试。
项目详情
下载文件
下载适合您平台的应用程序文件。如果您不确定选择哪个,请了解更多关于安装包的信息。
源代码分发
构建分发
nameko-grpc-1.3.0.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 42fb9162343c13b68cbaea0f9d0501062148ca32b3a83ce9f82b04eff52ce080 |
|
MD5 | 2cac829883413a05e36d3eb81b1f4c1c |
|
BLAKE2b-256 | 7edc520d3982fc2cba9f07f9235355ef1c3d8094f24601183fb1c71447699bfd |
nameko_grpc-1.3.0-py2.py3-none-any.whl的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 7511b892369a240d4da59de99a5db555531d96fe61dd7d8d7debfecb579621ca |
|
MD5 | 1dd600d86990cee573348d1b20354bc5 |
|
BLAKE2b-256 | 3228272b8017ce8e13967938aac554731b98878ab62e36ff6be6743beba6987d |