Django的简单Postgres队列
项目描述
Django的简单Postgres队列
这是一个集成了tpq的Django应用程序。此应用程序提供了基本的消息队列功能以及高级的future实现。
消息队列
要实现消息队列,首先必须创建一个从抽象模型BaseQueue派生的模型。完成后,为该模型创建一个迁移,然后编辑迁移以添加一些额外的数据库对象(tpq为您这样做,但您必须从迁移中调用它)。然后迁移并使用您的队列。
from django_tpq.main.models import BaseQueue
class MyQueue(BaseQueue):
pass
$ python manage.py makemigrations
现在编辑迁移,并添加RunPython步骤,就像处理futures的初始迁移一样。您还需要在forward函数中自定义模型名称。
$ python manage.py migrate
from myapp.models import MyQueue
MyQueue.objects.enqueue({'field': 'value'})
message = MyQueue.objects.dequeue()
Future
以以上为基础,提供了一个简单的Future实现。首先,您必须将您希望异步调用的任何函数注册为future。然后调用该future。您可以等待或轮询结果。
import time
from django_tpq.futures.decorators import future
@future()
def long_running_function(*args):
time.sleep(100)
# You can execute the future without waiting for a result. This returns
# immediately and your future runs within another process (fire and forget).
long_running_function.async('argument_1')
# Or you can poll for the results (or check after you do some other work).
f = long_running_function.async('argument_1')
while True:
try:
r = f.result()
except Exception:
# Exceptions are re-raised.
LOGGER.exception('Future failed', exc_info)
break
else:
break
time.sleep(10)
print(r)
# Or optionally, you can block waiting for the result.
f = long_running_function.call('argument_1')
try:
r = f.result(wait=0)
except Exception:
# Exceptions are re-raised.
LOGGER.exception('Future failed', exc_info)
print(r)
函数调用通过消息队列进行分发。参数被序列化,因此您可以发送任何可序列化的Python对象。结果通过您配置的缓存进行交付。默认情况下使用default缓存,但您可以使用FUTURES_RESULT_CACHE设置提供您想要用于结果的Django缓存的替代名称。结果默认TTL为60分钟,但您可以使用FUTURES_RESULT_TTL设置进行调整。
* 注意,如果您使用非常短的TTL并在它过期后开始轮询,您将永远看不到结果。此外,如果您使用wait,您将无限期地等待。
Future通过使用Django管理命令启动的守护程序执行。
$ python manage.py futures_executor --help usage: manage.py futures_executor [-h] [--version] [-v {0,1,2,3}] [--settings SETTINGS] [--pythonpath PYTHONPATH] [--traceback] [--no-color] [--queue_name QUEUE_NAME] [--once] [--wait WAIT] Daemon to execute futures. optional arguments: -h, --help show this help message and exit --version show program's version number and exit -v {0,1,2,3}, --verbosity {0,1,2,3} Verbosity level; 0=minimal output, 1=normal output, 2=verbose output, 3=very verbose output --settings SETTINGS The Python path to a settings module, e.g. "myproject.settings.main". If this isn't provided, the DJANGO_SETTINGS_MODULE environment variable will be used. --pythonpath PYTHONPATH A directory to add to the Python path, e.g. "/home/djangoprojects/myproject". --traceback Raise on CommandError exceptions --no-color Don't colorize the command output. --queue_name QUEUE_NAME The queue to monitor. default: futures.FutureQueue --once Run one, then exit. --wait WAIT Wait time. Useful with --once.
一些Future统计数据也存储在您的Postgres数据库中,用于报告目的。
from django_tpq.futures.models import FutureStat
FutureStat.objects.all()
FutureStat 模型具有以下字段。
name - 未来的 python 模块.function。
running - 当前正在执行此类未来的数量。
total - 已执行此类未来的总数。
failed - 导致异常的未来的数量。
last_seen - 最近的未来执行的时间戳。
first_seen - 最早的未来执行的的时间戳。
作为一个模型,您可以使用 Django ORM 以任何您认为合适的方式报告这些字段。
项目详情
django_tpq-1.9.tar.gz 的散列
算法 | 散列摘要 | |
---|---|---|
SHA256 | 7c6985ff1a12c2373d403678d5fe4b603bc9267cdbcdc89bbf8e8fabc2642227 |
|
MD5 | c53e01b3c561d9175ee83af68c265db9 |
|
BLAKE2b-256 | 63bb019783c009bb4a46d2f1a2494d00867d33edd9b6c7d06d6caa663886d171 |