跳转到主要内容

Taskcluster的Python客户端

项目描述

Taskcluster Python客户端

Download License

一个用于Python的Taskcluster客户端库。

此库是Python中Taskcluster的完整接口。它为所有Taskcluster API方法提供同步和异步接口。

用法

有关使用Taskcluster客户端的一般指南,请参阅 调用Taskcluster API

设置

在调用API端点之前,您需要创建一个客户端实例。每个服务都有一个类,例如QueueAuth。每个类都接受以下描述的相同选项。请注意,仅需要rootUrl,通常不配置除credentials之外的其他选项。

对于每个服务,都有同步和异步变体。在taskcluster下(例如,taskcluster.Queue)的类以同步方式操作。在taskcluster.aio下(例如,taskcluster.aio.Queue)的类是异步的。

身份验证选项

以下是一个Index客户端的简单设置

import taskcluster
index = taskcluster.Index({
  'rootUrl': 'https://tc.example.com',
  'credentials': {'clientId': 'id', 'accessToken': 'accessToken'},
})

必须提供rootUrl选项,因为它指定了API请求应发送到的Taskcluster部署。只有当请求需要认证时才需要凭证——许多Taskcluster API方法不需要认证。

在大多数情况下,应在标准环境变量中提供根URL和Taskcluster凭证。使用taskcluster.optionsFromEnvironment()自动读取这些变量。

auth = taskcluster.Auth(taskcluster.optionsFromEnvironment())

请注意,此函数不尊重TASKCLUSTER_PROXY_URL。要在任务内部使用Taskcluster代理

auth = taskcluster.Auth({'rootUrl': os.environ['TASKCLUSTER_PROXY_URL']})

授权范围

如果您想代表具有比您更小范围的一组范围的三方执行请求,您可以在authorizedScopes选项中指定您的请求应允许使用的范围,具体请参阅此处

opts = taskcluster.optionsFromEnvironment()
opts['authorizedScopes'] = ['queue:create-task:highest:my-provisioner/my-worker-type']
queue = taskcluster.Queue(opts)

其他选项

在构建客户端对象时,可以接受以下附加选项:

  • signedUrlExpiration - buildSignedUrl方法的expiration参数的默认值
  • maxRetries - 重新尝试失败请求的最大次数

调用API方法

API方法作为相应客户端对象上的方法提供。对于同步客户端,这些是同步方法,对于异步客户端,它们是异步方法;两种情况下调用约定都是相同的。

方法有四种调用约定

client.method(v1, v1, payload)
client.method(payload, k1=v1, k2=v2)
client.method(payload=payload, query=query, params={k1: v1, k2: v2})
client.method(v1, v2, payload=payload, query=query)

在这里,v1v2是URL参数(命名为k1k2),payload是请求有效载荷,query是查询参数的字典。

例如,为了以查询字符串参数调用API方法

await queue.listTaskGroup('JzTGxwxhQ76_Tt1dxkaG5g',
  query={'continuationToken': previousResponse.get('continuationToken')})

生成URL

通常需要生成API方法的URL而实际上并不调用该方法。为此,请使用buildUrl或对于需要认证的API方法,使用buildSignedUrl

import taskcluster

index = taskcluster.Index(taskcluster.optionsFromEnvironment())
print(index.buildUrl('findTask', 'builds.v1.latest'))
secrets = taskcluster.Secrets(taskcluster.optionsFromEnvironment())
print(secret.buildSignedUrl('get', 'my-secret'))

请注意,签名URL是有时间限制的;可以使用客户端构造函数中的signedUrlExpiration选项或使用buildSignedUrl中的expiration关键字参数设置过期时间,两者都以秒为单位。

生成临时凭证

如果您有非临时任务集群凭证,可以按照以下方式生成一组临时凭证。请注意,凭证不能超过31天,并且您只能通过撤销用于颁发它们的凭证来撤销它们(这需要最多一个小时)。

调用者不需要对开始或过期时间应用任何时钟漂移调整——这是由认证服务直接处理的。

import datetime

start = datetime.datetime.now()
expiry = start + datetime.timedelta(0,60)
scopes = ['ScopeA', 'ScopeB']
name = 'foo'

credentials = taskcluster.createTemporaryCredentials(
    # issuing clientId
    clientId,
    # issuing accessToken
    accessToken,
    # Validity of temporary credentials starts here, in timestamp
    start,
    # Expiration of temporary credentials, in timestamp
    expiry,
    # Scopes to grant the temporary credentials
    scopes,
    # credential name (optional)
    name
)

您不能使用临时凭证颁发新的临时凭证。您必须有auth:create-client:<name>来创建一个命名的临时凭证,但无论您的范围如何,都可以创建未命名的临时凭证。

处理时间戳

许多任务集群API需要ISO 8601时间戳偏移到未来作为提供过期时间、最后期限等方式。这些可以使用datetime.datetime.isoformat()轻松创建,但是,将datetime.datetime对象偏移到未来可能会相当容易出错且繁琐。因此,该库为此目的提供了两个实用函数。

dateObject = taskcluster.fromNow("2 days 3 hours 1 minute")
  # -> datetime.datetime(2017, 1, 21, 17, 8, 1, 607929)
dateString = taskcluster.fromNowJSON("2 days 3 hours 1 minute")
  # -> '2017-01-21T17:09:23.240178Z'

默认情况下,它将日期时间偏移到未来,如果偏移字符串以减号(-)开头,则日期对象将偏移到过去。这在某些边缘情况下很有用。

dateObject = taskcluster.fromNow("- 1 year 2 months 3 weeks 5 seconds");
  # -> datetime.datetime(2015, 10, 30, 18, 16, 50, 931161)

偏移字符串忽略空格,并且不区分大小写。它还可以选择性地以加号+(如果没有加号则为减号)为前缀,任何+前缀都将被忽略。然而,偏移字符串中的条目必须从高到低给出,即2年1天。此外,还可以使用各种缩写,如下所示。

  years,    year,   yr,   y
  months,   month,  mo
  weeks,    week,         w
  days,     day,          d
  hours,    hour,         h
  minutes,  minute, min
  seconds,  second, sec,  s

fromNow方法还可以接受一个作为第二个参数的日期,相对于该日期为相对日期。这在将任务到期与任务截止日期偏移或执行类似操作时很有用。此参数也可以作为关键字参数dateObj传递。

dateObject1 = taskcluster.fromNow("2 days 3 hours");
dateObject2 = taskcluster.fromNow("1 year", dateObject1);
taskcluster.fromNow("1 year", dateObj=dateObject1);
  # -> datetime.datetime(2018, 1, 21, 17, 59, 0, 328934)

生成SlugIDs

要生成slugIds(Taskcluster的客户生成唯一ID),请使用taskcluster.slugId(),它将在每次调用时返回一个唯一的slugId。

在某些情况下,能够从名称到slugIds创建映射很有用,并且能够多次生成相同的slugId。函数taskcluster.stableSlugId()返回一个可调用的对象,它正好执行此操作。

gen = taskcluster.stableSlugId()
sometask = gen('sometask')
assert gen('sometask') == sometask  # same input generates same output
assert gen('sometask') != gen('othertask')

gen2 = taskcluster.stableSlugId()
sometask2 = gen('sometask')
assert sometask2 != sometask  # but different slugId generators produce
                              # different output

范围分析

函数scopeMatch(assumedScopes, requiredScopeSets)确定一组所需范围是否由假设范围满足,考虑*-扩展。这在做出范围满足的本地决策时很有用,但请注意,assumed_scopes必须是展开的范围,因为此函数无法执行扩展。

它接受一个假设范围列表和一个要求范围集列表(析取范式),并检查是否有任何所需范围集被满足。

示例

requiredScopeSets = [
    ["scopeA", "scopeB"],
    ["scopeC:*"]
]
assert scopesMatch(['scopeA', 'scopeB'], requiredScopeSets)
assert scopesMatch(['scopeC:xyz'], requiredScopeSets)
assert not scopesMatch(['scopeA'], requiredScopeSets)
assert not scopesMatch(['scopeC'], requiredScopeSets)

分页

许多Taskcluster API方法都是分页的。Python客户端有两种简单处理分页的方式。第一种是在你的代码中实现分页。

import taskcluster
queue = taskcluster.Queue({'rootUrl': 'https://tc.example.com'})
i = 0
tasks = 0
outcome = queue.listTaskGroup('JzTGxwxhQ76_Tt1dxkaG5g')
while outcome.get('continuationToken'):
    print('Response %d gave us %d more tasks' % (i, len(outcome['tasks'])))
    if outcome.get('continuationToken'):
        outcome = queue.listTaskGroup('JzTGxwxhQ76_Tt1dxkaG5g', query={'continuationToken': outcome.get('continuationToken')})
    i += 1
    tasks += len(outcome.get('tasks', []))
print('Task Group %s has %d tasks' % (outcome['taskGroupId'], tasks))

还有一个实验性功能,支持同步客户端的内置自动分页。此功能允许将回调作为关键字参数'paginationHandler'传递。此函数将只传递API方法响应体作为其唯一位置参数。

此内置分页示例显示了如何构建任务列表并对其进行计数。

import taskcluster
queue = taskcluster.Queue({'rootUrl': 'https://tc.example.com'})

responses = []

def handle_page(y):
    print("%d tasks fetched" % len(y.get('tasks', [])))
    responses.append(y)

queue.listTaskGroup('JzTGxwxhQ76_Tt1dxkaG5g', paginationHandler=handle_page)

tasks = 0
for response in responses:
    tasks += len(response.get('tasks', []))

print("%d requests fetch %d tasks" % (len(responses), tasks))

Pulse事件

此库可以根据每个服务提供的交换定义生成Pulse消息的交换模式。这是通过实例化一个<service>Events类并调用一个名为事件的方法来完成的。主题交换方法的选项可以是单个字典参数或关键字参数的形式。只能使用其中一种形式。

from taskcluster import client
qEvt = client.QueueEvents({rootUrl: 'https://tc.example.com'})
# The following calls are equivalent
print(qEvt.taskCompleted({'taskId': 'atask'}))
print(qEvt.taskCompleted(taskId='atask'))

请注意,客户端库不提供与Pulse服务器接口的支持。

日志记录

日志记录在taskcluster/__init__.py中设置。如果设置了特殊环境变量DEBUG_TASKCLUSTER_CLIENT,则__init__.py模块将设置其日志记录器级别的logging模块为logging.DEBUG,如果没有现有处理程序,则添加一个logging.StreamHandler()实例。这是为了帮助那些不希望麻烦配置python日志记录模块但想要调试消息的人。

上传和下载对象

对象服务提供了一个API,用于可靠地上传和下载大对象。此库提供了方便的方法来实现这些API的客户部分,提供了经过良好测试和健壮的上传和下载功能。这些方法将与对象服务协商适当的方法并执行所需的步骤来传输数据。

所有方法都提供同步和异步版本,API相同,只是除了async/await关键字。

在任何情况下,您都需要提供一个配置了适当凭据的操作Object实例。

注意:存在一个用于上传 s3 艺术品的辅助函数,taskcluster.helper.upload_artifact,但它已被弃用,因为它只支持 s3 艺术品类型。

上传

要上传,可以使用以下任何一个

  • await taskcluster.aio.upload.uploadFromBuf(projectId=.., name=.., contentType=.., contentLength=.., uploadId=.., expires=.., maxRetries=.., objectService=.., data=..) - 异步从字节缓冲区上传数据。
  • await taskcluster.aio.upload.uploadFromFile(projectId=.., name=.., contentType=.., contentLength=.., uploadId=.., expires=.., maxRetries=.., objectService=.., file=..) - 异步从标准 Python 文件上传数据。注意,即使在异步上下文中,这也可能是你想要的选项
  • await taskcluster.aio.upload(projectId=.., name=.., contentType=.., contentLength=.., expires=.., uploadId=.., maxRetries=.., objectService=.., readerFactory=..) - 异步从异步读取器工厂上传数据。
  • taskcluster.upload.uploadFromBuf(projectId=.., name=.., contentType=.., contentLength=.., expires=.., uploadId=.., maxRetries=.., objectService=.., data=..) - 从字节缓冲区上传数据。
  • taskcluster.upload.uploadFromFile(projectId=.., name=.., contentType=.., contentLength=.., expires=.., uploadId=.., maxRetries=.., objectService=.., file=..) - 从标准 Python 文件上传数据。
  • taskcluster.upload(projectId=.., name=.., contentType=.., contentLength=.., expires=.., uploadId=.., maxRetries=.., objectService=.., readerFactory=..) - 从同步读取器工厂上传数据。

“读取器”是一个具有 read(max_size=-1) 方法的对象,它读取并返回一个 1 .. max_size 字节的块,或者在 EOF 时返回一个空字符串。对于异步函数是异步的,对于其余的是同步的。“读取器工厂”是一个异步可调用对象,它返回一个新鲜的读取器,准备好读取对象的第一字节。在重试上传时,可能会多次调用读取器工厂。

可以省略 uploadId 参数,在这种情况下将生成新的 slugId。

下载

要下载,可以使用以下任何一个

  • await taskcluster.aio.download.downloadToBuf(name=.., maxRetries=.., objectService=..) - 异步将对象下载到内存缓冲区,返回一个元组 (buffer, content-type)。如果文件大于可用内存,则将崩溃。
  • await taskcluster.aio.download.downloadToFile(name=.., maxRetries=.., objectService=.., file=..) - 异步将对象下载到标准 Python 文件,返回内容类型。
  • await taskcluster.aio.download.download(name=.., maxRetries=.., objectService=.., writerFactory=..) - 异步将对象下载到异步写入器工厂,返回内容类型。
  • taskcluster.download.downloadToBuf(name=.., maxRetries=.., objectService=..) - 将对象下载到内存缓冲区,返回一个元组 (buffer, content-type)。如果文件大于可用内存,则将崩溃。
  • taskcluster.download.downloadToFile(name=.., maxRetries=.., objectService=.., file=..) - 将对象下载到标准 Python 文件,返回内容类型。
  • taskcluster.download.download(name=.., maxRetries=.., objectService=.., writerFactory=..) - 将对象下载到同步写入器工厂,返回内容类型。

“写入器”是一个具有 write(data) 方法的对象,它写入给定的数据,对于异步函数是异步的,对于其余的是同步的。“写入器工厂”是一个可调用对象(再次是异步或同步),它返回一个新鲜的写入器,准备好写入对象的第一字节。在重试上传时,可能会多次调用写入器工厂。

艺术品下载

可以使用与上面类似的函数从队列服务下载艺术品。这些函数支持队列的所有存储类型,对于 error 艺术品会引发错误。在每种情况下,如果省略了 runId,则将使用最近的运行。

  • await taskcluster.aio.download.downloadArtifactToBuf(taskId=.., runId=.., name=.., maxRetries=.., queueService=..) - 异步将对象下载到内存缓冲区,返回一个元组 (buffer, content-type)。如果文件大于可用内存,则将崩溃。
  • await taskcluster.aio.download.downloadArtifactToFile(taskId=.., runId=.., name=.., maxRetries=.., queueService=.., file=..) - 异步将对象下载到标准Python文件中,返回内容类型。
  • await taskcluster.aio.download.downloadArtifact(taskId=.., runId=.., name=.., maxRetries=.., queueService=.., writerFactory=..) - 异步将对象下载到异步写入工厂中,返回内容类型。
  • taskcluster.download.downloadArtifactToBuf(taskId=.., runId=.., name=.., maxRetries=.., queueService=..) - 将对象下载到内存缓冲区中,返回一个元组(缓冲区,内容类型)。如果文件大于可用内存,这将崩溃。
  • taskcluster.download.downloadArtifactToFile(taskId=.., runId=.., name=.., maxRetries=.., queueService=.., file=..) - 将对象下载到标准Python文件中,返回内容类型。
  • taskcluster.download.downloadArtifact(taskId=.., runId=.., name=.., maxRetries=.., queueService=.., writerFactory=..) - 将对象下载到同步写入工厂中,返回内容类型。

集成助手

Python Taskcluster客户端有一个名为taskcluster.helper的模块,其中包含一些实用工具,可以轻松地在项目中的多个服务之间共享认证选项。

通常,使用此库的项目将面临不同的用例和认证选项

  • 没有Taskcluster访问权限的新贡献者无需认证
  • 通过开发者的计算机上的环境变量使用特定的客户端凭证
  • 在任务运行时使用Taskcluster代理

共享认证

taskcluster.helper.TaskclusterConfig类旨在在项目中的一个地方实例化,通常在顶级模块中。然后,通过项目中的不同部分访问这个单例,每当需要Taskcluster服务时。

以下是一个示例用法

  1. project/__init__.py中,此时没有调用Taskcluster
from taskcluster.helper import Taskcluster config

tc = TaskclusterConfig('https://community-tc.services.mozilla.com')
  1. project/boot.py中,我们使用提供的凭证、环境变量或taskcluster代理(按此顺序)在Taskcuster上进行认证
from project import tc

tc.auth(client_id='XXX', access_token='YYY')
  1. 此时,您可以从代码的任何位置使用认证包装器加载任何服务
from project import tc

def sync_usage():
    queue = tc.get_service('queue')
    queue.ping()

async def async_usage():
    hooks = tc.get_service('hooks', use_async=True)  # Asynchronous service class
    await hooks.ping()

支持的环境变量有

  • TASKCLUSTER_ROOT_URL用于指定您的Taskcluster实例的基础URL。您可以使用该变量或使用基础URL实例化TaskclusterConfig
  • TASKCLUSTER_CLIENT_IDTASKCLUSTER_ACCESS_TOKEN用于指定您的客户端凭证,而不是将它们提供给TaskclusterConfig.auth
  • TASKCLUSTER_PROXY_URL用于指定在任务中访问Taskcluster时使用的代理地址。未指定时默认为http://taskcluster

有关Taskcluster环境变量的更多详细信息,请参阅此处文档

跨多个认证加载机密信息

另一个可用的实用工具是taskcluster.helper.load_secrets,它允许您使用认证的taskcluster.Secrets实例(使用TaskclusterConfig.get_service或同步类直接)检索机密。

此实用工具加载机密,但允许您

  1. 通过在机密中使用键前缀共享机密跨多个项目
  2. 检查机密中是否存在某些必需的键
  3. 提供一些默认值
  4. 提供本地机密源而不是使用Taskcluster服务(对于本地开发或与贡献者共享机密很有用)

假设您在一个名为project/foo/prod-config的Taskcluster实例上有一个机密,它由后端和一些任务使用。以下是其内容

common:
  environment: production
  remote_log: https://log.xx.com/payload

backend:
  bugzilla_token: XXXX

task:
  backend_url: https://backend.foo.mozilla.com

在您的后端中,您会这样做

from taskcluster import Secrets
from taskcluster.helper import load_secrets

prod_config = load_secrets(
  Secrets({...}),
  'project/foo/prod-config',

  # We only need the common & backend parts
  prefixes=['common', 'backend'],

  # We absolutely need a bugzilla token to run
  required=['bugzilla_token'],

  # Let's provide some default value for the environment
  existing={
    'environment': 'dev',
  }
)
  # -> prod_config == {
  #     "environment": "production"
  #     "remote_log": "https://log.xx.com/payload",
  #     "bugzilla_token": "XXXX",
  #   }

在您的任务中,您可以使用上面提到的TaskclusterConfig(该类具有使用认证的Secrets服务自动的快捷方式)执行以下操作

from project import tc

prod_config = tc.load_secrets(
  'project/foo/prod-config',

  # We only need the common & bot parts
  prefixes=['common', 'bot'],

  # Let's provide some default value for the environment and backend_url
  existing={
    'environment': 'dev',
    'backend_url': 'http://localhost:8000',
  }
)
  # -> prod_config == {
  #     "environment": "production"
  #     "remote_log": "https://log.xx.com/payload",
  #     "backend_url": "https://backend.foo.mozilla.com",
  #   }

要提供本地机密值,您首先需要将这些值加载为字典(通常通过读取您选择的本地文件格式:YAML、JSON等...),并将字典提供给load_secrets,使用local_secrets参数

import os
import yaml

from taskcluster import Secrets
from taskcluster.helper import load_secrets

local_path = 'path/to/file.yml'

prod_config = load_secrets(
  Secrets({...}),
  'project/foo/prod-config',

  # We support an optional local file to provide some configuration without reaching Taskcluster
  local_secrets=yaml.safe_load(open(local_path)) if os.path.exists(local_path) else None,
)

兼容性

这个库与Taskcluster本身是同版本的。也就是说,版本为x.y.z的客户端包含对应于Taskcluster版本x.y.z的API方法。Taskcluster小心翼翼地保持API兼容性,并在主版本内保证它。这意味着任何版本为x.*的客户端都将与任何版本为x.*的Taskcluster服务兼容,并且很可能与其他许多主要版本的Taskcluster服务兼容。任何不兼容性都将在变更日志中记录。

项目详情


发布历史 发布通知 | RSS订阅

下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源代码分发

taskcluster-72.0.1.tar.gz (127.5 kB 查看哈希值)

上传时间: 源代码

构建分发版本

taskcluster-72.0.1-py3-none-any.whl (145.9 kB 查看哈希值)

上传时间: Python 3

支持者