使用Google Cloud Pub/Sub的简单任务队列
项目描述
psq 是使用 Google Cloud Pub/Sub 的简单分布式任务队列的Python示例实现。
psq 需要最少的配置,并依赖于Cloud Pub/Sub提供可扩展和可靠的消息传递。
psq 故意与 rq 和 simpleq 相似,并从 celery 和 这篇博客文章 中汲取了一些灵感。
安装
通过 pip 安装
pip install psq
先决条件
Google开发者控制台上的一个项目。
已在本地上安装的 Google Cloud SDK。
您需要在项目中启用云 Pub/Sub API。链接将引导您启用 API。
在运行这些示例之前,您需要运行 gcloud auth,以便透明地处理对 Google Cloud Platform 服务的身份验证。
用法
首先,创建一个任务
def adder(a, b):
return a + b
然后,创建一个 pubsub 客户端和一个队列
from google.cloud import pubsub_v1
import psq
project = 'your-project-id'
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
q = psq.Queue(publisher, subscriber, project)
现在您可以入队任务
from tasks import adder
q.enqueue(adder)
为了获取任务结果,您必须配置存储
from google.cloud import pubsub_v1
from google.cloud import datastore
import psq
project = 'your-project-id'
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
ds_client = datastore.Client()
q = psq.Queue(
publisher, subscriber, project,
storage=psq.DatastoreStorage(ds_client))
配置存储后,您可以获取任务的结果
r = q.enqueue(adder, 5, 6)
r.result() # -> 11
您还可以定义多个队列
fast = psq.Queue(publisher, subscriber, project, 'fast')
slow = psq.Queue(publisher, subscriber, project, 'slow')
注意事项
因为 psq 与 rq 大部分相似,所以关于任务的规则也类似。只要满足以下条件,您可以将任何 Python 函数调用放入队列中:
函数可以被工作进程导入。这意味着函数所在的 __module__ 必须可以被导入。值得注意的是,您不能将声明在 main 模块中的函数入队 - 例如,通过 python 直接运行或在交互式解释器中运行的文件中定义的任务。
函数可以是一个字符串,但必须是工作进程可以导入的函数的绝对可导入路径。否则,任务将失败。
工作进程和排队任务的应用程序必须共享完全相同的源代码。
函数不能依赖于全局上下文,例如全局变量、当前请求等。在队列时将任何需要的上下文传递给工作进程。
交付保证
Pub/sub 保证任务将被发送到工作进程,但 psq 目前不保证任务完成执行或一次且仅一次的语义,尽管它允许您提供自己的机制来做到这一点。这与 Celery 的 默认 配置类似。
可以通过延迟确认支持来提供任务完成保证。使用 Cloud Pub/sub 可以实现延迟确认,但此库目前尚未实现。请参阅 CONTRIBUTING.md。
运行工作进程
在定义任务的同一目录下执行 psqworker
psqworker.py config.q
psqworker 只操作一个队列。如果您想让服务器监听多个队列,请使用类似 supervisord 的工具来运行多个 psqworker 进程。
广播队列
正常队列将单个任务发送到单个工作进程,将您的任务分散到所有监听同一队列的工作进程。还有广播队列,它们将任务副本发送到 每个 工作进程。这在您希望每个工作进程执行相同任务的情况下很有用,例如在每台服务器上安装或升级软件。
broadcast_q = psq.BroadcastQueue(publisher, subscriber, project)
def restart_apache_task():
call(["apachectl", "restart"])
broadcast_q.enqueue(restart_apache_task)
广播队列实现了在 Google Compute Engine 上的可靠任务调度 中描述的解决方案。
注意:广播队列目前不支持任何形式的存储,也不支持返回值。
重试
在您的任务中引发 psq.Retry 将导致它被重试。
from psq import Retry
def retry_if_fail(self):
try:
r = requests.get('http://some.flaky.service.com')
except Exception as e:
logging.error(e)
raise Retry()
Flask & 其他上下文
您可以将额外的上下文管理器绑定到队列。
app = Flask(__name__)
q = psq.Queue(extra_context=app.app_context)
这将确保上下文在您的任务中可用,这对于数据库连接等很有用。
from flask import current_app
def flasky_task():
backend = current_app.config['BACKEND']
绕过工作进程进行测试
在进行单元测试时,你绝对不希望启动工作进程,而是立即和同步地执行入队的函数。为此,将< cite>asynchronous=False传递给队列的构造函数(默认为True)。此外,在这种情况下,您不需要提供发布者、订阅者或项目参数,只需将所有这些参数都传递给队列的None即可。
q = psq.Queue(None, None, project=None, asynchronous=False)
r = q.enqueue(adder, 1, 2) # Will be run immediately
改进想法
为广播队列提供某种存储解决方案。
Memcache/redis值存储。
@task装饰器添加延迟/延迟函数。
任务链/组/和弦。
延迟确认。
Gevent工作进程。
对队列的批量支持。
贡献更改
许可
Apache 2.0 - 请参阅LICENSE
项目详细信息
下载文件
下载您平台上的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。
源分发
构建分发
psq-0.8.0.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | c79de0aa7853799cb3dd06fa1b4076511aa9ec4e5db873a95001fc67abe8c381 |
|
MD5 | 2459210a958230f0454719cb1afe6799 |
|
BLAKE2b-256 | 11f4bf27d21679fd1558840c50cb8bc04348992a2d91d7b2be3c81a06f06a6ed |
psq-0.8.0-py2.py3-none-any.whl的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 69608f691ecc5b8f0bfa633cf61d96db209342264d8f7d0833eda297a9cfe03c |
|
MD5 | ad6f72c7cc426e47517352d73ac6f635 |
|
BLAKE2b-256 | 14450a79c3f1e74a333ba8c92f40744d26b4b10af673df89e018c7915dda7474 |