nucliadb-telemetry 5.2.1.post1708
pip install nucliadb-telemetry
最新版本
发布时间:
NucliaDB遥测库Python进程
导航
未验证详情
这些详情尚未通过PyPI验证项目链接
元数据
- 许可证: GNU Affero通用公共许可证v3或更高版本(AGPLv3+)(MIT)
- 作者: nucliadb 作者
- 需求: Python >=3.9
-
提供额外功能:
all
,fastapi
,grpc
,nats
,otel
分类器
项目描述
NucliaDB Telemetry
与Open Telemetry兼容的插件,用于在FastAPI、Nats和GRPC上使用Asyncio传播traceid。
环境变量
JAEGER_ENABLED = True
JAEGER_HOST = "127.0.0.1"
JAEGER_PORT = server.port
在FastAPI上,您应该添加
tracer_provider = get_telemetry("HTTP_SERVICE")
app = FastAPI(title="Test API") # type: ignore
if not tracer_provider.initialized:
await init_telemetry(tracer_provider)
set_global_textmap(B3MultiFormat())
FastAPIInstrumentor.instrument_app(app, tracer_provider=tracer_provider)
..
await init_telemetry(tracer_provider) # To start asyncio task
..
在GRPC服务器上,您应该添加
tracer_provider = get_telemetry("GRPC_SERVER_SERVICE")
telemetry_grpc = GRPCTelemetry("GRPC_CLIENT_SERVICE", tracer_provider)
if not tracer_provider.initialized:
await init_telemetry(tracer_provider)
set_global_textmap(B3MultiFormat())
server = telemetry_grpc.init_server()
helloworld_pb2_grpc.add_GreeterServicer_to_server(SERVICER, server)
..
await init_telemetry(tracer_provider) # To start asyncio task
..
在GRPC客户端上,您应该添加
tracer_provider = get_telemetry("GRPC_CLIENT_SERVICE")
telemetry_grpc = GRPCTelemetry("GRPC_CLIENT_SERVICE", tracer_provider)
if not tracer_provider.initialized:
await init_telemetry(tracer_provider)
set_global_textmap(B3MultiFormat())
channel = telemetry_grpc.init_client(f"localhost:{grpc_service}")
stub = helloworld_pb2_grpc.GreeterStub(channel)
..
await init_telemetry(tracer_provider) # To start asyncio task
..
在Nats jetstream推送订阅者上,您应该添加
nc = await nats.connect(servers=[self.natsd])
js = self.nc.jetstream()
tracer_provider = get_telemetry("NATS_SERVICE")
if not tracer_provider.initialized:
await init_telemetry(tracer_provider)
set_global_textmap(B3MultiFormat())
jsotel = JetStreamContextTelemetry(
js, "NATS_SERVICE", tracer_provider
)
subscription = await jsotel.subscribe(
subject="testing.telemetry",
stream="testing",
cb=handler,
)
在Nats发布者上,您应该添加
nc = await nats.connect(servers=[self.natsd])
js = self.nc.jetstream()
tracer_provider = get_telemetry("NATS_SERVICE")
if not tracer_provider.initialized:
await init_telemetry(tracer_provider)
set_global_textmap(B3MultiFormat())
jsotel = JetStreamContextTelemetry(
js, "NATS_SERVICE", tracer_provider
)
await jsotel.publish("testing.telemetry", request.name.encode())
在Nats jetstream拉取订阅上,如果您只想获取一条消息并退出或获取多条消息,可以使用不同的模式。对于仅获取一条消息
nc = await nats.connect(servers=[self.natsd])
js = self.nc.jetstream()
tracer_provider = get_telemetry("NATS_SERVICE")
if not tracer_provider.initialized:
await init_telemetry(tracer_provider)
set_global_textmap(B3MultiFormat())
jsotel = JetStreamContextTelemetry(
js, "NATS_SERVICE", tracer_provider
)
# You can use either pull_subscribe or pull_subscribe_bind
subscription = await jsotel.pull_subscribe(
subject="testing.telemetry",
durable="consumer_name"
stream="testing",
)
async def callback(message):
# Do something with your message
# and optionally return something
return True
try:
result = await jsotel.pull_one(subscription, callback)
except errors.TimeoutError
pass
对于多条消息,只需将其包裹在循环中。
while True:
try:
result = await jsotel.pull_one(subscription, callback)
except errors.TimeoutError
pass
在Nats客户端(不包括Jestream!)的发布者中,您应该添加:
nc = await nats.connect(servers=[self.natsd])
js = self.nc.jetstream()
tracer_provider = get_telemetry("NATS_SERVICE")
if not tracer_provider.initialized:
await init_telemetry(tracer_provider)
set_global_textmap(B3MultiFormat())
ncotel = NatsClientTelemetry(
nc, "NATS_SERVICE", tracer_provider
)
await ncotel.publish("testing.telemetry", request.name.encode())
在Nats客户端(不包括Jestream!)的订阅者中,您应该添加:
nc = await nats.connect(servers=[self.natsd])
js = self.nc.jetstream()
tracer_provider = get_telemetry("NATS_SERVICE")
if not tracer_provider.initialized:
await init_telemetry(tracer_provider)
set_global_textmap(B3MultiFormat())
ncotel = NatsClientContextTelemetry(
js, "NATS_SERVICE", tracer_provider
)
subscription = await ncotel.subscribe(
subject="testing.telemetry",
queue="queue_nname",
cb=handler,
)
在Nats客户端(不包括Jestream!)的请求中,您应该添加:
nc = await nats.connect(servers=[self.natsd])
js = self.nc.jetstream()
tracer_provider = get_telemetry("NATS_SERVICE")
if not tracer_provider.initialized:
await init_telemetry(tracer_provider)
set_global_textmap(B3MultiFormat())
ncotel = NatsClientTelemetry(
nc, "NATS_SERVICE", tracer_provider
)
response = await ncotel.request("testing.telemetry", request.name.encode())
要处理另一侧的响应,您可以使用与纯Nats客户端订阅者相同的模式,只需在处理完毕后在处理程序中添加msg.respond()
即可。
项目详情
未验证详情
这些详情尚未通过PyPI验证项目链接
元数据
- 许可证: GNU Affero通用公共许可证v3或更高版本(AGPLv3+)(MIT)
- 作者: nucliadb 作者
- 需求: Python >=3.9
-
提供额外功能:
all
,fastapi
,grpc
,nats
,otel
分类器
关闭
哈希值 for nucliadb_telemetry-5.2.1.post1708-py3-none-any.whl
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 946163fd62de9eed03d889be02f83b0ef783f1a0d97176d96576c49b31eac695 |
|
MD5 | b6e1e32ad8fb80e9016f55f2d2932ef6 |
|
BLAKE2b-256 | 56bf3dd0ae04f5a21ae2d2d22d1304aec30433dfff347768c96fa2cd6c2f6688 |