跳转到主要内容

Django的简单Postgres队列

项目描述

Travis CI Status Code Coverage https://badge.fury.io/py/django-tpq.svg

Django的简单Postgres队列

这是一个集成了tpq的Django应用程序。此应用程序提供了基本的消息队列功能以及高级的future实现。

消息队列

要实现消息队列,首先必须创建一个从抽象模型BaseQueue派生的模型。完成后,为该模型创建一个迁移,然后编辑迁移以添加一些额外的数据库对象(tpq为您这样做,但您必须从迁移中调用它)。然后迁移并使用您的队列。

from django_tpq.main.models import BaseQueue

class MyQueue(BaseQueue):
    pass
$ python manage.py makemigrations

现在编辑迁移,并添加RunPython步骤,就像处理futures的初始迁移一样。您还需要在forward函数中自定义模型名称。

$ python manage.py migrate
from myapp.models import MyQueue

MyQueue.objects.enqueue({'field': 'value'})
message = MyQueue.objects.dequeue()

Future

以以上为基础,提供了一个简单的Future实现。首先,您必须将您希望异步调用的任何函数注册为future。然后调用该future。您可以等待或轮询结果。

import time

from django_tpq.futures.decorators import future


@future()
def long_running_function(*args):
    time.sleep(100)

# You can execute the future without waiting for a result. This returns
# immediately and your future runs within another process (fire and forget).
long_running_function.async('argument_1')

# Or you can poll for the results (or check after you do some other work).
f = long_running_function.async('argument_1')

while True:
    try:
        r = f.result()
    except Exception:
        # Exceptions are re-raised.
        LOGGER.exception('Future failed', exc_info)
        break
    else:
        break
    time.sleep(10)

print(r)

# Or optionally, you can block waiting for the result.
f = long_running_function.call('argument_1')

try:
    r = f.result(wait=0)
except Exception:
    # Exceptions are re-raised.
    LOGGER.exception('Future failed', exc_info)

print(r)

函数调用通过消息队列进行分发。参数被序列化,因此您可以发送任何可序列化的Python对象。结果通过您配置的缓存进行交付。默认情况下使用default缓存,但您可以使用FUTURES_RESULT_CACHE设置提供您想要用于结果的Django缓存的替代名称。结果默认TTL为60分钟,但您可以使用FUTURES_RESULT_TTL设置进行调整。

* 注意,如果您使用非常短的TTL并在它过期后开始轮询,您将永远看不到结果。此外,如果您使用wait,您将无限期地等待。

Future通过使用Django管理命令启动的守护程序执行。

$ python manage.py futures_executor --help
usage: manage.py futures_executor [-h] [--version] [-v {0,1,2,3}]
                                  [--settings SETTINGS]
                                  [--pythonpath PYTHONPATH] [--traceback]
                                  [--no-color] [--queue_name QUEUE_NAME]
                                  [--once] [--wait WAIT]

Daemon to execute futures.

optional arguments:
  -h, --help            show this help message and exit
  --version             show program's version number and exit
  -v {0,1,2,3}, --verbosity {0,1,2,3}
                        Verbosity level; 0=minimal output, 1=normal output,
                        2=verbose output, 3=very verbose output
  --settings SETTINGS   The Python path to a settings module, e.g.
                        "myproject.settings.main". If this isn't provided, the
                        DJANGO_SETTINGS_MODULE environment variable will be
                        used.
  --pythonpath PYTHONPATH
                        A directory to add to the Python path, e.g.
                        "/home/djangoprojects/myproject".
  --traceback           Raise on CommandError exceptions
  --no-color            Don't colorize the command output.
  --queue_name QUEUE_NAME
                        The queue to monitor. default: futures.FutureQueue
  --once                Run one, then exit.
  --wait WAIT           Wait time. Useful with --once.

一些Future统计数据也存储在您的Postgres数据库中,用于报告目的。

from django_tpq.futures.models import FutureStat

FutureStat.objects.all()

FutureStat 模型具有以下字段。

  • name - 未来的 python 模块.function。

  • running - 当前正在执行此类未来的数量。

  • total - 已执行此类未来的总数。

  • failed - 导致异常的未来的数量。

  • last_seen - 最近的未来执行的时间戳。

  • first_seen - 最早的未来执行的的时间戳。

作为一个模型,您可以使用 Django ORM 以任何您认为合适的方式报告这些字段。

项目详情


下载文件

下载您平台对应的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。

源分布

django_tpq-1.9.tar.gz (3.7 kB 查看散列)

上传时间

由以下支持