Python的Microsoft Azure Event Grid客户端库
项目描述
Azure Event Grid Python客户端库
Azure Event Grid是一个完全托管的智能事件路由服务,它允许使用发布/订阅模型进行统一的事件消费。
源代码 | 包(PyPI) | 包(Conda) | API参考文档 | 产品文档 | 示例 | 变更日志
免责声明
这是 Azure Event Grid 的 EventGridPublisherClient
和 EventGridConsumerClient
的 GA 版本。 EventGridPublisherClient
支持 Event Grid Basic 和 Event Grid Namespaces 的 send
操作。 EventGridConsumerClient
支持 Event Grid Namespaces 的 receive
、acknowledge
、release
、reject
和 renew_locks
操作。请参阅 示例 获取更多信息。
入门
先决条件
- 使用此包需要 Python 3.8 或更高版本。
- 您必须拥有 Azure 订阅,并且至少有以下之一:
Event Grid 资源
Azure Event Grid Namespaces 支持拉取和推送交付。Azure Event Grid Basic 只支持推送交付。有关这两个资源层的更多信息,请参阅 此处。
注意: Azure Event Grid Namespaces 只支持 Cloud Event v1.0 Schema。
安装包
使用 pip 安装 Azure Event Grid Python 客户端库。
pip install azure-eventgrid
- 需要一个现有的 Event Grid Basic 主题或域,或者 Event Grid Namespaces 主题。您可以使用 Azure Portal 或 Azure CLI 创建资源。
如果您使用 Azure CLI,请使用您自己的唯一名称替换 <resource-group-name>
和 <resource-name>
。
创建 Event Grid Namespaces
az eventgrid namespace create --location <location> --resource-group <resource-group-name> --name <resource-name>
创建 Event Grid Namespaces 主题
az eventgrid namespace create topic --location <location> --resource-group <resource-group-name> --name <resource-name>
验证客户端
为了与 Event Grid 服务交互,您需要创建客户端实例。创建客户端对象需要 端点 和 凭证。
默认创建的 EventGridPublisherClient 与 Event Grid Basic 资源兼容。要创建与 Event Grid Namespaces 兼容的客户端,请在实例化客户端时指定 namespace_topic="YOUR_TOPIC_NAME"
。
# Event Grid Namespace client
client = EventGridPublisherClient(endpoint, credential, namespace_topic=YOUR_TOPIC_NAME)
# Event Grid Basic Client
client = EventGridPublisherClient(endpoint, credential)
EventGridConsumerClient 只支持 Event Grid Namespaces。
# Event Grid Namespace Client
client = EventGridConsumerClient(endpoint, credential, namespace_topic=YOUR_TOPIC_NAME, subscription=YOUR_SUBSCRIPTION_NAME)
使用 Azure Active Directory (AAD)
Azure Event Grid 为请求提供基于身份的认证与 Azure Active Directory (Azure AD) 的集成。使用 Azure AD,您可以使用基于角色的访问控制 (RBAC) 授予权限,允许用户、组或应用程序访问您的 Azure Event Grid 资源。
要使用 TokenCredential
向主题或域发送事件,已认证的身份应分配有 "Event Grid Data Sender" 角色。要从主题事件订阅接收事件,已认证的身份应分配有 "Event Grid Data Receiver" 角色。要向/从主题发送/接收事件,已认证的身份应分配有 "Event Grid Data Contributor" 角色。
有关 RBAC 设置的更多信息,请参阅 此处。
使用 azure-identity
包,您可以在开发和生产环境中无缝授权请求。有关 Azure Active Directory 的更多信息,请参阅 azure-identity
README。
例如,您可以使用 DefaultAzureCredential
构造一个客户端,该客户端将使用 Azure Active Directory 进行认证
from azure.identity import DefaultAzureCredential
from azure.eventgrid import EventGridPublisherClient, EventGridEvent
default_az_credential = DefaultAzureCredential()
endpoint = os.environ["EVENTGRID_TOPIC_ENDPOINT"]
client = EventGridPublisherClient(endpoint, default_az_credential)
查找端点
Event Grid Namespaces
您可以在Azure门户的Event Grid命名空间资源中找到命名空间端点。它看起来像这样:"<event-grid-namespace-name>.<namespace-location>.eventgrid.azure.net"
Event Grid基本
您可以在Azure门户的Event Grid主题资源中找到主题端点。它看起来像这样:"https://<event-grid-topic-name>.<topic-location>.eventgrid.azure.net/api/events"
使用AzureKeyCredential创建客户端
要将访问密钥用作credential
参数,将密钥作为字符串传递给一个AzureKeyCredential实例。
注意:访问密钥可以在Azure门户的Event Grid主题资源的“访问密钥”菜单中找到。您也可以通过Azure CLI或
azure-mgmt-eventgrid
库获取。获取访问密钥的指南请参阅这里。
import os
from azure.eventgrid import EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential
topic_key = os.environ["EVENTGRID_TOPIC_KEY"]
endpoint = os.environ["EVENTGRID_TOPIC_ENDPOINT"]
credential_key = AzureKeyCredential(topic_key)
client = EventGridPublisherClient(endpoint, credential_key)
注意:基本客户端也可以通过SAS签名进行认证,使用
AzureSasCredential
。一个演示此功能的示例,请参阅这里(异步版本)。
注意:可以使用
generate_sas
方法生成共享访问签名。一个演示此功能的示例,请参阅这里。
关键概念
Event Grid Namespaces
命名空间是其他资源的管理容器。它允许将相关资源分组,以便在单个订阅下进行管理。
命名空间主题
命名空间主题是在Event Grid命名空间中创建的主题。客户端将事件发布到HTTP命名空间端点,指定命名空间主题,其中包含逻辑上包含的事件。命名空间主题仅支持CloudEvent v1.0架构。
事件订阅
事件订阅是与单个主题关联的配置资源。
Event Grid基本
主题
主题是Event Grid服务中的一个通道,用于发送事件。主题在创建时确定接受的事件架构。如果将架构类型的事件发送到需要不同架构类型的主题,将引发错误。
域
事件域是用于管理与同一应用程序相关的大量Event Grid主题的管理工具。它们允许您将事件发布到成千上万的主题。域还为您提供对每个主题的授权和认证控制。有关更多信息,请访问事件域概述。
事件架构
事件是完整描述系统发生事件的最低信息量。当创建自定义主题或域时,您必须指定发布事件时将使用的架构。
Event Grid支持多种架构来编码事件。
系统主题
Event Grid中的系统主题代表Azure服务(如Azure存储或Azure事件中心)发布的一个或多个事件。例如,系统主题可以代表为特定存储账户发布的所有blob事件,或者仅代表blob创建和blob删除事件。
发布到 Azure Event Grid 的系统事件的各种事件类型名称可在 azure.eventgrid.SystemEventNames
中找到。要查看可识别的系统主题的完整列表,请访问 系统主题。
有关 Azure Event Grid 中的关键概念的更多信息,请参阅 Azure Event Grid 概念。
EventGridPublisherClient
EventGridPublisherClient
提供向客户端初始化期间指定的资源发送事件数据的操作。
如果您使用的是 Event Grid Basic,无论您的主题或域配置了什么架构,都会使用 EventGridPublisherClient
将事件发布到它。使用 send
方法发布事件。
允许将以下事件格式发送到 Event Grid Basic 资源:
-
强类型 EventGridEvents 的列表或单个实例。
-
序列化 EventGridEvent 对象的字典表示。
-
强类型 CloudEvents 的列表或单个实例。
-
序列化 CloudEvent 对象的字典表示。
-
任何自定义架构的字典表示。
当指定命名空间主题时,允许将以下事件格式发送到 Event Grid 命名空间资源:
- 单个强类型 CloudEvents 的列表。
- 序列化 CloudEvent 对象的字典表示。
请参阅 示例 获取详细的示例。
使用 Azure Arc 在 Kubernetes 上运行 Event Grid
使用 Azure Arc 在 Kubernetes 上运行 Event Grid 是一种服务,允许您在自己的 Kubernetes 集群上运行 Event Grid。此功能通过使用 Azure Arc 启用的 Kubernetes 实现。通过 Azure Arc 启用的 Kubernetes,支持连接到 Azure 的 Kubernetes 集群。一旦连接,您就可以在它上安装 Event Grid。更多关于它的信息请访问 这里。
示例
以下部分提供了几个代码片段,涵盖了 Event Grid 的一些最常见任务,包括
发送 Cloud Event
此示例发布了一个 Cloud 事件。
import os
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent
from azure.eventgrid import EventGridPublisherClient
key = os.environ["EVENTGRID_KEY"]
endpoint = os.environ["EVENTGRID_ENDPOINT"]
topic_name = os.environ["EVENTGRID_TOPIC_NAME"]
event = CloudEvent(
type="Azure.Sdk.Sample",
source="https://egsample.dev/sampleevent",
data={"team": "azure-sdk"}
)
credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential, namespace_topic=topic_name)
client.send(event)
发送多个事件
当向主题或域发送多个事件时,可以以批量的方式发送事件。此示例使用 send 方法发送 CloudEvents 列表。
警告:当一次发送多个事件列表时,逐个迭代和发送每个事件不会产生最佳性能。为了获得最佳性能,强烈建议发送事件列表。
import os
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent
from azure.eventgrid import EventGridPublisherClient
key = os.environ["EVENTGRID_KEY"]
endpoint = os.environ["EVENTGRID_ENDPOINT"]
topic_name = os.environ["EVENTGRID_TOPIC_NAME"]
event0 = CloudEvent(
type="Azure.Sdk.Sample",
source="https://egsample.dev/sampleevent",
data={"team": "azure-sdk"}
)
event1 = CloudEvent(
type="Azure.Sdk.Sample",
source="https://egsample.dev/sampleevent",
data={"team2": "azure-eventgrid"}
)
events = [event0, event1]
credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential, namespace_topic=topic_name)
client.send(events)
从命名空间接收和处理事件
使用 EventGridConsumerClient 的 receive 函数从命名空间事件订阅接收 CloudEvents。然后尝试确认、拒绝、释放或续订锁。
import os
import uuid
import datetime as dt
from azure.core.credentials import AzureKeyCredential
from azure.eventgrid import EventGridConsumerClient
key = os.environ["EVENTGRID_KEY"]
endpoint = os.environ["EVENTGRID_ENDPOINT"]
topic_name = os.environ["EVENTGRID_TOPIC_NAME"]
sub_name = os.environ["EVENTGRID_EVENT_SUBSCRIPTION_NAME"]
credential = AzureKeyCredential(key)
client = EventGridConsumerClient(endpoint, credential, namespace_topic=topic_name, subscription=sub_name)
events = client.receive(max_events=4)
for detail in events.value:
data = detail.event.data
broker_properties = detail.broker_properties
if data == "release":
release_events.append(broker_properties.lock_token)
elif data == "acknowledge":
acknowledge_events.append(broker_properties.lock_token)
else:
reject_events.append(broker_properties.lock_token)
# Renew all Locks
renew_tokens = e.broker_properties.lock_token
renew_result = client.renew_locks(
lock_tokens=renew_tokens,
)
release_result = client.release(
lock_tokens=release_events,
)
ack_result = client.acknowledge(
lock_tokens=acknowledge_events,
)
reject_result = client.reject(
lock_tokens=reject_events,
)
使用 Event Grid 进行分布式跟踪
您可以使用 Python 的 OpenTelemetry,因为它与 azure-core 跟踪集成兼容。
以下是一个使用 OpenTelemetry 跟踪发送 CloudEvent 的示例。
首先,将 OpenTelemetry 设置为 Event Grid 的启用跟踪插件。
from azure.core.settings import settings
from azure.core.tracing.ext.opentelemetry_span import OpenTelemetrySpan
settings.tracing_implementation = OpenTelemetrySpan
从这里开始使用常规 open telemetry。有关详细信息,请参阅 OpenTelemetry。此示例使用简单的控制台导出器导出跟踪。这里可以使用任何导出器,包括 azure-monitor-opentelemetry-exporter
、jaeger
、zipkin
等。
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
from opentelemetry.sdk.trace.export import SimpleSpanProcessor # this requires opentelemetry >= 1.0.0
# Simple console exporter
exporter = ConsoleSpanExporter()
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(exporter)
)
设置 tracer
和 exporter
后,请按照以下示例开始收集使用 EventGridClient
的 send
方法发送 CloudEvent 对象时的跟踪。
import os
from azure.eventgrid import EventGridPublisherClient
from azure.core.messaging import CloudEvent
from azure.core.credentials import AzureKeyCredential
hostname = os.environ['CLOUD_TOPIC_HOSTNAME']
key = AzureKeyCredential(os.environ['CLOUD_ACCESS_KEY'])
cloud_event = CloudEvent(
source = 'demo',
type = 'sdk.demo',
data = {'test': 'hello'},
)
with tracer.start_as_current_span(name="MyApplication"):
client = EventGridPublisherClient(hostname, key)
client.send(cloud_event)
故障排除
- 启用
azure.eventgrid
记录器以收集库的跟踪。
常规
Event Grid 客户端库将引发定义在 Azure Core 中的异常。
日志记录
此库使用标准的 logging 库进行日志记录。HTTP 会话的基本信息(URL、头等)在 INFO 级别进行记录。
可选配置
客户端和操作级别可以传递可选的关键字参数。Azure Core的参考文档描述了重试、日志记录、传输协议等可用的配置。
下一步
以下部分提供了几个代码片段,展示了在Event Grid Python API中使用的常见模式。
更多示例代码
这些代码示例显示了使用Azure Event Grid客户端库的常见场景操作。
其他命名空间Event Grid场景
- 验证客户端:sample_namespace_authentication_async.py
- 发布到命名空间主题:sample_publish_cloud_event_async.py
- 从事件订阅中消费和处理:sample_consume_process_events.py
其他基本Event Grid场景
-
生成共享访问签名:sample_generate_sas.py
-
验证客户端:sample_authentication.py (异步版本)
-
使用SAS将事件发布到主题:sample_publish_events_to_a_topic_using_sas_credential_async.py (异步版本)
-
将Event Grid事件发布到主题:sample_publish_eg_events_to_a_topic.py (异步版本)
-
将Event Grid事件发布到域主题:sample_publish_eg_events_to_a_domain_topic.py (异步版本)
-
发布Cloud Event:sample_publish_events_using_cloud_events_1.0_schema.py (异步版本)
以下示例涵盖了发布和消费EventGridEvents和CloudEvents的dict表示。
-
将EventGridEvent作为dict-like表示发布:sample_publish_eg_event_using_dict.py (异步版本)
-
将CloudEvent作为dict-like表示发布:sample_publish_cloud_event_using_dict.py (异步版本)
-
消费原始cloudevent数据的自定义有效负载:sample_consume_custom_payload.py
更多示例可以在此处找到。
其他文档
有关 Azure Event Grid 的更详细文档,请参阅 docs.microsoft.com 上的 Event Grid 文档。
贡献
此项目欢迎贡献和建议。大多数贡献都需要您同意贡献者许可协议(CLA),声明您有权并且确实授予我们使用您的贡献的权利。有关详细信息,请访问 cla.microsoft.com。
当您提交拉取请求时,CLA-bot 将自动确定您是否需要提供 CLA,并适当地装饰 PR(例如,标签、注释)。只需遵循机器人提供的说明即可。您只需要在整个使用我们的 CLA 的所有存储库中这样做一次。
此项目已采用 Microsoft 开源行为准则。有关更多信息,请参阅 行为准则常见问题解答 或联系 opencode@microsoft.com 以提出任何额外的问题或评论。
项目详情
下载文件
下载适合您平台的项目文件。如果您不确定要选择哪个,请了解有关 安装包 的更多信息。
源分发
构建分发
azure-eventgrid-4.20.0.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | c1edab90769bc4e7a2fb0a2037cf0455754d50af793674c08b3fec2293248c4c |
|
MD5 | c818c7b2e8be6c5aa39a1ae8815014e6 |
|
BLAKE2b-256 | bcc8ec467b14e18be5377acb350828f5ba75cfa8e792d0c19c2eafe4c5a1ab8d |
为 azure_eventgrid-4.20.0-py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | e93c87df46bf16f38d427adf7fd1673c47066f293dfa6f3cbc19036e2b3d6af1 |
|
MD5 | 5170d01f330b7e426e8665546bc9bd09 |
|
BLAKE2b-256 | a2bfa8e8cd88795803c7a593c3655f86131788b0e6b9fcd0860b30d77209a258 |