跳转到主要内容

适用于Python的Microsoft Communication JobRouter客户端库

项目描述

Azure Communication JobRouter Package客户端库,适用于Python

此包包含适用于Azure Communication Services for JobRouter的Python SDK。有关Azure Communication Services的更多信息,请参阅此处

源代码 | 包(Pypi) | 产品文档

免责声明

Azure SDK Python包对Python 2.7的支持已于2022年1月1日结束。有关更多信息,请参阅https://github.com/Azure/azure-sdk-for-python/issues/20691

入门

先决条件

您需要一个Azure订阅和一个通信服务资源才能使用此包。

安装包

使用pip安装Azure Communication JobRouter客户端库的Python版本

pip install azure-communication-jobrouter

关键概念

作业

作业表示需要路由到可用的Worker的工作单元。在呼叫中心环境中,这可能是一个传入的电话或聊天。

Worker

Worker代表处理作业的资源。每个Worker都会注册一个或多个队列以接收作业。在呼叫中心环境中,这可能是一个代表。

队列

队列代表一个按顺序等待由Worker处理的作业列表。Worker将注册队列以从中接收工作。在呼叫中心环境中,这可能是一个电话队列。

通道

通道表示按某种类型对作业进行分组。当Worker注册接收工作时,他们还必须指定可以处理哪些通道的工作,以及他们可以同时处理多少个。在呼叫中心环境中,这可能包括电话呼叫聊天

报价

当JobRouter确定匹配时,它会向Worker扩展报价以处理特定的作业,这种通知通常通过EventGrid发送。Worker可以使用JobRouter API接受或拒绝报价,或者根据分配策略上配置的生存时间过期。在呼叫中心环境中,这可能是一个代表的电话响铃。

分配策略

分配策略代表一组配置,用于控制队列中的作业如何分配给已注册该队列的Worker。此配置包括报价在过期前的有效期和分配模式,后者定义在有多名可用Worker时选择Worker的顺序。

分配模式

有三种模式:

  • 轮询:Worker按Id排序,选择上一个获得报价的Worker之后的下一个Worker。
  • 最长空闲:最长时间未处理作业的Worker。
  • 最佳Worker:您可以指定一个表达式来比较两个Worker,以确定选择哪一个。

标签

您可以将标签附加到Worker、作业和队列上。这些是键值对,可以是stringnumberboolean数据类型。在呼叫中心环境中,这可能是一个特定Worker的技能水平或团队或地理位置。

标签选择器

标签选择器可以附加到作业上,以针对队列中服务的子集的Worker。在呼叫中心环境中,这可能是一个条件,即代表必须对特定产品的知识有最低水平。

分类策略

分类策略可以使用规则引擎动态选择队列、确定作业优先级并将Worker标签选择器附加到作业上。

异常策略

异常策略根据触发器控制作业的行为,并执行所需操作。异常策略附加到队列,因此可以控制队列中作业的行为。

示例

客户端初始化

要初始化短信客户端,可以使用连接字符串进行实例化。或者,您也可以使用DefaultAzureCredential通过Active Directory身份验证。

from azure.communication.jobrouter import (
    JobRouterClient,
    JobRouterAdministrationClient
)

connection_string = "endpoint=ENDPOINT;accessKey=KEY"
router_client = JobRouterClient.from_connection_string(conn_str = connection_string)
router_admin_client = JobRouterAdministrationClient.from_connection_string(conn_str = connection_string)

分配策略

在我们可以创建队列之前,我们需要一个分配策略。

from azure.communication.jobrouter.models import (
    LongestIdleMode,
    DistributionPolicy
)

distribution_policy: DistributionPolicy = DistributionPolicy(
    offer_expires_after_seconds = 24 * 60 * 60,
    mode = LongestIdleMode(
        min_concurrent_offers = 1,
        max_concurrent_offers = 1
    )
)

distribution_policy: DistributionPolicy = router_admin_client.upsert_distribution_policy(
    "distribution-policy-1",
    distribution_policy
)

队列

接下来,我们可以创建队列。

from azure.communication.jobrouter.models import (
    RouterQueue
)

queue: RouterQueue = RouterQueue(
    distribution_policy_id = "distribution-policy-1"
)

queue: RouterQueue = router_admin_client.upsert_queue(
    "queue-1",
    queue
)

作业

现在,我们可以直接将任务提交到该队列,工作选择器要求工作者的标签 Some-Skill 大于 10。

from azure.communication.jobrouter.models import (
    RouterJob,
    RouterWorkerSelector,
    LabelOperator
)

router_job: RouterJob = RouterJob(
    channel_id = "my-channel",
    queue_id = "queue-1",
    channel_reference = "12345",
    priority = 1,
    requested_worker_selectors = [
        RouterWorkerSelector(key = "Some-Skill", label_operator = LabelOperator.EQUAL, value = 10)
    ]
)

job: RouterJob = router_client.upsert_job(
    "jobId-1",
    router_job
)

Worker

现在,我们注册一个工作节点以从该队列接收工作,标签为 Some-Skill 等于 11。

from azure.communication.jobrouter.models import (
    RouterWorker,
    RouterChannel
)

router_worker: RouterWorker = RouterWorker(
    capacity = 1,
    queues = [
        "queue-1"
    ],
    labels = {
        "Some-Skill": 11
    },
    channels = [
        RouterChannel(channel_id = "my-channel", capacity_cost_per_job = 1)
    ],
    available_for_offers = True
)

worker = router_client.upsert_worker(
    "worker-1",
    router_worker
)

报价

我们应该从我们的 事件网格订阅 中收到一个 RouterWorkerOfferIssued

有几个不同的 Azure 服务充当 事件处理器。对于此场景,我们将假设使用 Webhook 进行事件交付。 了解有关 Webhook 事件交付的更多信息

一旦事件被发送到事件处理器,我们就可以将 JSON 有效负载反序列化为事件列表。

# Parse the JSON payload into a list of events
from azure.eventgrid import EventGridEvent
import json

## deserialize payload into a list of typed Events
events = [EventGridEvent.from_json(json.loads(msg)) for msg in payload]
offer_id = ""
for event in events:
    if event.event_type == "Microsoft.Communication.RouterWorkerOfferIssued":
        offer_id = event.data.offer_id
    else:
        continue

然而,我们也可以等待几秒钟,然后直接使用 JobRouter API 查询工作节点,看看是否有提供。

from azure.communication.jobrouter.models import (
    RouterWorker,
)

router_worker: RouterWorker = router_client.get_worker(worker_id = "worker-1")

for offer in router_worker.offers:
    print(f"Worker {router_worker.id} has an active offer for job {offer.job_id}")

接受提供

一旦工作节点收到提供,它可以选择两个可能的行为:接受或拒绝。我们将接受提供。

from azure.communication.jobrouter.models import (
    RouterJobOffer,
    AcceptJobOfferResult,
    RouterJobStatus
)

# fetching the offer id
job_offer: RouterJobOffer = [offer for offer in router_worker.offers if offer.job_id == "jobId-1"][0]
offer_id = job_offer.offer_id

# accepting the offer sent to `worker-1`
accept_job_offer_result: AcceptJobOfferResult = router_client.accept_job_offer(
    worker_id = "worker-1",
    offer_id = offer_id
)

print(f"Offer: {job_offer.offer_id} sent to worker: {router_worker.id} has been accepted")
print(f"Job has been assigned to worker: {router_worker.id} with assignment: {accept_job_offer_result.assignment_id}")

# verify job assignment is populated when querying job
updated_job = router_client.get_job(job_id = "jobId-1")
print(f"Job assignment has been successful: {updated_job.job_status == RouterJobStatus.Assigned and accept_job_offer_result.assignment_id in updated_job.assignments}")

完成任务

一旦工作节点完成工作,工作节点必须将任务标记为 完成

import datetime
from azure.communication.jobrouter.models import (
    CompleteJobOptions
)
complete_job_result = router_client.complete_job(
    "jobId-1",
    accept_job_offer_result.assignment_id,
    CompleteJobOptions(
        note = f"Job has been completed by {router_worker.id} at {datetime.datetime.utcnow()}"
    )
)

print(f"Job has been successfully completed.")

关闭任务

在任务完成后,工作节点可以在关闭任务并最终释放其容量以接受更多传入任务之前执行任务总结操作。

from azure.communication.jobrouter.models import (
    RouterJob,
    RouterJobStatus,
    CloseJobOptions,
)

close_job_result = router_client.close_job(
    "jobId-1",
    accept_job_offer_result.assignment_id,
    CloseJobOptions(
        note = f"Job has been closed by {router_worker.id} at {datetime.datetime.utcnow()}"
    )
)

print(f"Job has been successfully closed.")

update_job: RouterJob = router_client.get_job(job_id = "jobId-1")
print(f"Updated job status: {update_job.job_status == RouterJobStatus.CLOSED}")
import time
from datetime import datetime, timedelta
from azure.communication.jobrouter.models import (
    RouterJob,
    RouterJobStatus,
    CloseJobOptions,
)

close_job_in_future_result = router_client.close_job(
    "jobId-1",
    accept_job_offer_result.assignment_id,
    CloseJobOptions(
        note = f"Job has been closed by {router_worker.id} at {datetime.utcnow()}",
        close_at = datetime.utcnow() + timedelta(seconds = 2)
    )
)

print(f"Job has been marked to close")
time.sleep(secs = 2)
update_job: RouterJob = router_client.get_job(job_id = "jobId-1")
print(f"Updated job status: {update_job.job_status == RouterJobStatus.CLOSED}")

故障排除

遇到问题?本节应包含有关如何处理的详细信息。

下一步

更多示例代码

请查看 示例 目录,以获取如何使用此库的详细示例。

提供反馈

如果您遇到任何错误或有建议,请在项目的 问题 部分提交问题。

贡献

本项目欢迎贡献和建议。大多数贡献需要您同意贡献者许可协议(CLA),声明您有权并且实际上确实授予我们使用您贡献的权利。有关详细信息,请访问 cla.microsoft.com

本项目已采用 Microsoft 开源行为准则。有关更多信息,请参阅 行为准则常见问题解答 或通过 opencode@microsoft.com 联系以提出任何额外的问题或评论。

项目详情


下载文件

下载适用于您的平台文件。如果您不确定选择哪个,请了解有关 安装软件包 的更多信息。

源分发

azure-communication-jobrouter-1.0.0.tar.gz (145.1 kB 查看哈希值)

上传时间

构建分发

azure_communication_jobrouter-1.0.0-py3-none-any.whl (114.0 kB 查看哈希值)

上传时间 Python 3

由以下支持