跳转到主要内容

Celery的缺失测试库。

项目描述

Celery Python库提供的缺失测试库。graveolens可以帮助你完成以下任务:

  • 在不触及你的代理的情况下为Celery任务调用提供结果。

  • 确保你知道调用的是哪些任务。

  • 轻松断言任务调用的参数。

  • 在Celery中使用send_task时轻松处理结果。

Celery的双名法是Apium graveolens

返回结果

from my_app.celery import app
import graveolens

def test_my_task():
    with graveolens.activate() as celery:
        celery.add('my_app.task', {'done': True, 'status': 'OK'})

        result = app.send_task('my_app.task', 'test', id=3)

        # The result is an EagerResult from Celery.
        assert result.get() == {'done': True, 'status': 'OK'}

        # You can also check ALL the calls that Celery received.
        assert len(celery.calls) == 1
        assert celery.calls[0].name == 'http://twitter.com/api/1/foobar'
        assert celery.calls[0].args == ('test', )
        assert celery.calls[0].kwargs == {'id': 3}

断言Celery调用

默认情况下,如果添加了未使用的结果,则在上下文管理器退出时将引发一个AssertionError,例如:

import graveolens

def test_my_task():
    with graveolens.activate() as celery:
        celery.add('my_app.task')

    # Assertion will be raised here because 'my_app.task' is never called.

这可以通过使用assert_all_tasks_called标志来配置。

此外,如果没有为Celery任务设置结果而调用任务,则将引发graveolens.NotMockedTask

from my_app.celery import app
import graveolens

def test_my_task():
    with graveolens.activate() as celery:
        try:
            result = app.send_task('my_app.task', 'test', id=3)
        except graveolens.NotMockedTask:
            # Exception will be raised since my_app.task has no result.
            pass

项目详情


下载文件

下载适用于您的平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分布

此版本中没有可用的源分布文件。请参阅生成分发存档的教程。

构建分布

graveolens-0.1.4-py2.py3-none-any.whl (4.4 kB 查看哈希值)

上传时间 Python 2 Python 3

支持