跳转到主要内容

使用Google Cloud Pub/Sub的简单任务队列

项目描述

Build Status Coverage Status PyPI Version

psq 是使用 Google Cloud Pub/Sub 的简单分布式任务队列的Python示例实现。

psq 需要最少的配置,并依赖于Cloud Pub/Sub提供可扩展和可靠的消息传递。

psq 故意与 rqsimpleq 相似,并从 celery这篇博客文章 中汲取了一些灵感。

安装

通过 pip 安装

pip install psq

先决条件

  • Google开发者控制台上的一个项目。

  • 已在本地上安装的 Google Cloud SDK

  • 您需要在项目中启用云 Pub/Sub API。链接将引导您启用 API。

  • 在运行这些示例之前,您需要运行 gcloud auth,以便透明地处理对 Google Cloud Platform 服务的身份验证。

用法

首先,创建一个任务

def adder(a, b):
    return a + b

然后,创建一个 pubsub 客户端和一个队列

from google.cloud import pubsub_v1
import psq


project = 'your-project-id'

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

q = psq.Queue(publisher, subscriber, project)

现在您可以入队任务

from tasks import adder

q.enqueue(adder)

为了获取任务结果,您必须配置存储

from google.cloud import pubsub_v1
from google.cloud import datastore
import psq


project = 'your-project-id'

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
ds_client = datastore.Client()

q = psq.Queue(
    publisher, subscriber, project,
    storage=psq.DatastoreStorage(ds_client))

配置存储后,您可以获取任务的结果

r = q.enqueue(adder, 5, 6)
r.result() # -> 11

您还可以定义多个队列

fast = psq.Queue(publisher, subscriber, project, 'fast')
slow = psq.Queue(publisher, subscriber, project, 'slow')

注意事项

因为 psqrq 大部分相似,所以关于任务的规则也类似。只要满足以下条件,您可以将任何 Python 函数调用放入队列中:

  • 函数可以被工作进程导入。这意味着函数所在的 __module__ 必须可以被导入。值得注意的是,您不能将声明在 main 模块中的函数入队 - 例如,通过 python 直接运行或在交互式解释器中运行的文件中定义的任务。

  • 函数可以是一个字符串,但必须是工作进程可以导入的函数的绝对可导入路径。否则,任务将失败。

  • 工作进程和排队任务的应用程序必须共享完全相同的源代码。

  • 函数不能依赖于全局上下文,例如全局变量、当前请求等。在队列时将任何需要的上下文传递给工作进程。

交付保证

Pub/sub 保证任务将被发送到工作进程,但 psq 目前不保证任务完成执行或一次且仅一次的语义,尽管它允许您提供自己的机制来做到这一点。这与 Celery 的 默认 配置类似。

可以通过延迟确认支持来提供任务完成保证。使用 Cloud Pub/sub 可以实现延迟确认,但此库目前尚未实现。请参阅 CONTRIBUTING.md

有许多实现一次且仅一次语义的方法,例如分布式锁。这可以在诸如 zookeeperredis 的系统中实现。

运行工作进程

在定义任务的同一目录下执行 psqworker

psqworker.py config.q

psqworker 只操作一个队列。如果您想让服务器监听多个队列,请使用类似 supervisord 的工具来运行多个 psqworker 进程。

广播队列

正常队列将单个任务发送到单个工作进程,将您的任务分散到所有监听同一队列的工作进程。还有广播队列,它们将任务副本发送到 每个 工作进程。这在您希望每个工作进程执行相同任务的情况下很有用,例如在每台服务器上安装或升级软件。

broadcast_q = psq.BroadcastQueue(publisher, subscriber, project)

def restart_apache_task():
    call(["apachectl", "restart"])

broadcast_q.enqueue(restart_apache_task)

广播队列实现了在 Google Compute Engine 上的可靠任务调度 中描述的解决方案。

注意:广播队列目前不支持任何形式的存储,也不支持返回值。

重试

在您的任务中引发 psq.Retry 将导致它被重试。

from psq import Retry

def retry_if_fail(self):
    try:
        r = requests.get('http://some.flaky.service.com')
    except Exception as e:
        logging.error(e)
        raise Retry()

Flask & 其他上下文

您可以将额外的上下文管理器绑定到队列。

app = Flask(__name__)

q = psq.Queue(extra_context=app.app_context)

这将确保上下文在您的任务中可用,这对于数据库连接等很有用。

from flask import current_app

def flasky_task():
    backend = current_app.config['BACKEND']

绕过工作进程进行测试

在进行单元测试时,你绝对不希望启动工作进程,而是立即和同步地执行入队的函数。为此,将< cite>asynchronous=False传递给队列的构造函数(默认为True)。此外,在这种情况下,您不需要提供发布者、订阅者或项目参数,只需将所有这些参数都传递给队列的None即可。

q = psq.Queue(None, None, project=None, asynchronous=False)
r = q.enqueue(adder, 1, 2) # Will be run immediately

改进想法

  • 为广播队列提供某种存储解决方案。

  • Memcache/redis值存储。

  • @task装饰器添加延迟/延迟函数。

  • 任务链/组/和弦。

  • 延迟确认。

  • Gevent工作进程。

  • 对队列的批量支持。

贡献更改

许可

项目详细信息


下载文件

下载您平台上的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分发

psq-0.8.0.tar.gz (17.5 kB 查看哈希值)

上传时间

构建分发

psq-0.8.0-py2.py3-none-any.whl (27.4 kB 查看哈希值)

上传时间 Python 2 Python 3

由以下机构支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面