跳转到主要内容

作业队列

项目描述

Mature License: LGPL-3 OCA/queue Translate me on Weblate Try me on Runboat

此插件为Odoo添加了集成的作业队列。

它允许延迟异步执行的方法调用。

作业由Jobrunner在后台执行,在自己的事务中。

示例

from odoo import models, fields, api

class MyModel(models.Model):
   _name = 'my.model'

   def my_method(self, a, k=None):
       _logger.info('executed with a: %s and k: %s', a, k)


class MyOtherModel(models.Model):
    _name = 'my.other.model'

    def button_do_stuff(self):
        self.env['my.model'].with_delay().my_method('a', k=2)

在上面的代码片段中,当我们调用button_do_stuff时,一个捕获方法及其参数的作业将被延迟。它将在Jobrunner有可用桶时执行,如果没有其他作业正在运行,这可能是瞬间的。

功能

  • 作业视图,作业存储在PostgreSQL中

  • Jobrunner:执行作业,由于PostgreSQL的NOTIFY而非常高效

  • 通道:为根通道及其子通道提供容量,并在此通道中隔离作业。例如,可以限制重作业一次执行,而轻作业则四次执行。

  • 重试:通过引发异常类型来重试作业的能力

  • 重试模式:前三次尝试,10秒后重试,接下来的五次尝试,1分钟后重试,……

  • 作业属性:优先级,预计到达时间(ETA),自定义描述,重试次数

  • 相关操作:在作业视图中链接操作,例如打开由作业关心的记录

目录

安装

请确保拥有 requests 库。

配置

  • 使用环境变量和命令行

    • 调整环境变量(可选)

      • ODOO_QUEUE_JOB_CHANNELS=root:4 或任何其他通道配置。默认为 root:1

      • 如果未设置 xmlrpc_portODOO_QUEUE_JOB_PORT=8069

    • 使用 --load=web,queue_job--workers 大于 1 来启动 Odoo。 [1]

  • 使用 Odoo 配置文件

[options]
(...)
workers = 6
server_wide_modules = web,queue_job

(...)
[queue_job]
channels = root:2
  • 通过检查 Odoo 日志文件来确认运行器是否正确启动

...INFO...queue_job.jobrunner.runner: starting
...INFO...queue_job.jobrunner.runner: initializing database connections
...INFO...queue_job.jobrunner.runner: queue job runner ready for db <dbname>
...INFO...queue_job.jobrunner.runner: database connections ready
  • 创建作业(例如使用 base_import_async)并观察它们立即并行启动。

  • 提示:要启用队列作业的调试日志记录,使用 --log-handler=odoo.addons.queue_job:DEBUG

使用

要使用此模块,您需要

  1. 转到 作业队列 菜单

开发者

延迟作业

将作业加入队列的一种快速方法是使用记录或模型上的 with_delay()

def button_done(self):
    self.with_delay().print_confirmation_document(self.state)
    self.write({"state": "done"})
    return True

在此,方法 print_confirmation_document() 将作为作业异步执行。 with_delay() 可以接受多个参数来更精确地定义作业的执行方式(优先级等)。

传递给被延迟方法的所有参数都存储在作业中,并在异步执行时传递给该方法,包括 self,因此在作业执行期间保持当前记录(警告:上下文不保留)。

可以在作业之间表达依赖关系。要启动作业图,请使用记录或模型上的 delayable()。以下与 with_delay() 等效,但使用长形式

def button_done(self):
    delayable = self.delayable()
    delayable.print_confirmation_document(self.state)
    delayable.delay()
    self.write({"state": "done"})
    return True

Delayable 对象的方法返回自身,因此它可以用作构建器模式,在某些情况下允许动态构建作业

def button_generate_simple_with_delayable(self):
    self.ensure_one()
    # Introduction of a delayable object, using a builder pattern
    # allowing to chain jobs or set properties. The delay() method
    # on the delayable object actually stores the delayable objects
    # in the queue_job table
    (
        self.delayable()
        .generate_thumbnail((50, 50))
        .set(priority=30)
        .set(description=_("generate xxx"))
        .delay()
    )

定义依赖关系的最简单方法是使用 Delayable 上的 .on_done(job)

def button_chain_done(self):
    self.ensure_one()
    job1 = self.browse(1).delayable().generate_thumbnail((50, 50))
    job2 = self.browse(1).delayable().generate_thumbnail((50, 50))
    job3 = self.browse(1).delayable().generate_thumbnail((50, 50))
    # job 3 is executed when job 2 is done which is executed when job 1 is done
    job1.on_done(job2.on_done(job3)).delay()

可以使用 chain()group() 基本操作将 Delayable 连接起来,形成更复杂的图。链表示按顺序执行的一系列作业,组表示可以并行执行的作业。使用 chain() 的效果与使用多个嵌套 on_done() 相同,但更易读。两者可以组合形成图,例如,我们可以将 [A] 作业分组,这将阻止另一个 [B] 作业分组。只有当 [A] 作业组中的所有作业都执行完毕时,[B] 作业组的作业才会执行。代码如下

from odoo.addons.queue_job.delay import group, chain

def button_done(self):
    group_a = group(self.delayable().method_foo(), self.delayable().method_bar())
    group_b = group(self.delayable().method_baz(1), self.delayable().method_baz(2))
    chain(group_a, group_b).delay()
    self.write({"state": "done"})
    return True

当作业图中发生失败时,依赖于失败作业的作业执行停止。它们将保持 wait_dependencies 状态,直到它们的“父”作业成功。这可以通过两种方式发生:父作业重试并在第二次尝试中成功,或者用户手动将父作业“设置为完成”。在这两种情况下,依赖关系得到解决,图将继续处理。或者,用户可以取消失败作业及其所有依赖作业。在所有情况下,图中的其他作业将继续执行,这些作业不依赖于失败的作业。

注意:delay()必须在图顶部的可延迟项、链或组上调用。在上面的示例中,如果它在group_a上调用,那么group_b将永远不会被延迟(但会显示警告)。

作业队列选项

  • 优先级:默认为10,越接近0,执行速度越快

  • eta:作业到达的估计时间。在此日期/时间之前不会执行

  • max_retries:默认为5,在放弃并将作业状态设置为‘失败’之前的最大重试次数。0的值表示无限重试。

  • 描述:作业的人类描述。如果没有设置,则描述将从函数文档或方法名称计算得出

  • channel:用于处理函数的通道的完整名称。如果指定,则覆盖函数上定义的通道

  • identity_key:唯一标识作业的密钥,如果指定并且尚未运行具有相同键的作业,则不会创建新作业

配置作业的默认选项

在早期版本中,可以使用@job装饰器配置作业。现在已弃用,它们可以使用可选的queue.job.functionqueue.job.channel XML记录进行配置。

通道示例

<record id="channel_sale" model="queue.job.channel">
    <field name="name">sale</field>
    <field name="parent_id" ref="queue_job.channel_root" />
</record>

作业函数示例

<record id="job_function_sale_order_action_done" model="queue.job.function">
    <field name="model_id" ref="sale.model_sale_order" />
    <field name="method">action_done</field>
    <field name="channel_id" ref="channel_sale" />
    <field name="related_action" eval='{"func_name": "custom_related_action"}' />
    <field name="retry_pattern" eval="{1: 60, 2: 180, 3: 10, 5: 300}" />
</record>

name的一般形式是:<model.name>.method

通道、相关动作和重试模式选项是可选的,它们在下面有说明。

在编写模块时,如果有2个或更多模块添加了具有相同名称(和通道的父项)的作业函数或通道,它们将在相同的记录中合并,即使它们有不同的xmlids。在卸载时,当所有使用它的模块都卸载时,合并的记录将被删除。

作业函数:模型

如果函数在抽象模型中定义,则不能写入<field name="model_id" ref="xml_id_of_the_abstract_model"</field>,但必须为从抽象模型继承的每个模型定义一个函数。

作业函数:通道

作业将延迟的通道。默认通道是root

作业函数:相关动作

“相关动作”作为作业视图上的按钮出现。该按钮将执行定义的动作。

默认情况下,它将打开与作业相关的记录的视图(当只有一个记录时为表单视图,有多个记录时为列表视图)。在许多情况下,默认的相关动作就足够了,不需要定制,但可以通过在作业函数上提供一个字典来定制。

{
    "enable": False,
    "func_name": "related_action_partner",
    "kwargs": {"name": "Partner"},
}
  • enable:当False时,按钮没有效果(默认:True

  • func_namequeue.job上返回动作的方法的名称

  • kwargs:传递给相关动作方法的额外参数

相关动作代码示例

class QueueJob(models.Model):
    _inherit = 'queue.job'

    def related_action_partner(self, name):
        self.ensure_one()
        model = self.model_name
        partner = self.records
        action = {
            'name': name,
            'type': 'ir.actions.act_window',
            'res_model': model,
            'view_type': 'form',
            'view_mode': 'form',
            'res_id': partner.id,
        }
        return action

作业函数:重试模式

当作业因可重试的错误类型失败时,它会自动稍后重试。默认情况下,重试总是10分钟后。

可以在作业函数上配置重试模式。模式表示“从X次尝试开始,推迟Y秒”。它表示为一个字典,其中键是尝试次数,值是推迟秒数的整数。

{
    1: 10,
    5: 20,
    10: 30,
    15: 300,
}

基于此配置,我们可以知道

  • 前5次重试推迟10秒后

  • 第5到第10次重试推迟20秒后

  • 第10到第15次重试推迟30秒后

  • 所有后续重试推迟5分钟后

作业上下文

作业记录集的上下文,或传递给作业的任何记录集,将根据允许列表转移到作业中。

为了向后兼容,默认的允许列表为空。允许列表可以在Base._job_prepare_context_before_enqueue_keys中进行自定义。

示例

class Base(models.AbstractModel):

    _inherit = "base"

    @api.model
    def _job_prepare_context_before_enqueue_keys(self):
        """Keys to keep in context of stored jobs

        Empty by default for backward compatibility.
        """
        return ("tz", "lang", "allowed_company_ids", "force_company", "active_test")

跳过正在运行的Odoo上的作业

当你在开发(例如:连接器模块)时,你可能想绕过队列作业并立即运行你的代码。

要做到这一点,你可以在你的环境中设置QUEUE_JOB__NO_DELAY=1

跳过测试中的作业

当编写与作业相关的方法的测试时,处理延迟的记录集总是很棘手。为了使你的测试生活更轻松,你可以在上下文中设置queue_job__no_delay=True

提示:你可以在测试用例级别这样做

@classmethod
def setUpClass(cls):
    super().setUpClass()
    cls.env = cls.env(context=dict(
        cls.env.context,
        queue_job__no_delay=True,  # no jobs thanks
    ))

然后,所有测试都同步执行作业方法,不会延迟任何作业。

测试

断言已入队的作业

测试作业的推荐方式,而不是直接和同步地运行它们,是将测试分成两部分

  • 一个测试中作业被模拟(使用trap_jobs()捕获作业,并且测试只验证作业已使用预期的参数进行延迟)

  • 另一个测试仅同步调用作业的方法,以验证此方法的正确行为

这样做意味着你可以证明作业在运行时将正确地入队,并确保你的代码在测试和生产环境中具有不同的行为(因为同步运行作业可能会有不同的行为,因为它们位于同一事务中/方法中间)。此外,它还提供了对在调用作业方法时(这次是第二种测试类型中的同步调用)想要传递的参数的更多控制,并使测试更小。

在入队作业上运行此类断言的最佳方式是使用odoo.addons.queue_job.tests.common.trap_jobs()

在这个上下文管理器内部,作业不是添加到数据库的队列中,而是推入一个内存列表。上下文管理器然后提供有用的辅助工具来验证作业是否已使用预期的参数入队。它甚至可以同步运行其列表中的作业!详细信息请参阅odoo.addons.queue_job.tests.common.JobsTester

一个非常小的示例(更多详细信息请参阅tests/common.py

# code
def my_job_method(self, name, count):
    self.write({"name": " ".join([name] * count)

def method_to_test(self):
    count = self.env["other.model"].search_count([])
    self.with_delay(priority=15).my_job_method("Hi!", count=count)
    return count

# tests
from odoo.addons.queue_job.tests.common import trap_jobs

# first test only check the expected behavior of the method and the proper
# enqueuing of jobs
def test_method_to_test(self):
    with trap_jobs() as trap:
        result = self.env["model"].method_to_test()
        expected_count = 12

        trap.assert_jobs_count(1, only=self.env["model"].my_job_method)
        trap.assert_enqueued_job(
            self.env["model"].my_job_method,
            args=("Hi!",),
            kwargs=dict(count=expected_count),
            properties=dict(priority=15)
        )
        self.assertEqual(result, expected_count)


 # second test to validate the behavior of the job unitarily
 def test_my_job_method(self):
     record = self.env["model"].browse(1)
     record.my_job_method("Hi!", count=12)
     self.assertEqual(record.name, "Hi! Hi! Hi! Hi! Hi! Hi! Hi! Hi! Hi! Hi! Hi! Hi!")

如果你愿意,你仍然可以在单个测试中测试整个流程,通过在测试中调用jobs_tester.perform_enqueued_jobs()

def test_method_to_test(self):
    with trap_jobs() as trap:
        result = self.env["model"].method_to_test()
        expected_count = 12

        trap.assert_jobs_count(1, only=self.env["model"].my_job_method)
        trap.assert_enqueued_job(
            self.env["model"].my_job_method,
            args=("Hi!",),
            kwargs=dict(count=expected_count),
            properties=dict(priority=15)
        )
        self.assertEqual(result, expected_count)

        trap.perform_enqueued_jobs()

        record = self.env["model"].browse(1)
        record.my_job_method("Hi!", count=12)
        self.assertEqual(record.name, "Hi! Hi! Hi! Hi! Hi! Hi! Hi! Hi! Hi! Hi! Hi! Hi!")

在运行Odoo时同步执行作业

当你在开发(例如:连接器模块)时,你可能想绕过队列作业并立即运行你的代码。

要做到这一点,你可以在你的环境中设置QUEUE_JOB__NO_DELAY=1

在测试中同步执行作业

你应该使用trap_jobs,但如果你出于任何原因无法使用它,并且仍然需要在测试中同步执行作业方法,你可以通过在上下文中设置queue_job__no_delay=True来实现。

提示:你可以在测试用例级别这样做

@classmethod
def setUpClass(cls):
    super().setUpClass()
    cls.env = cls.env(context=dict(
        cls.env.context,
        queue_job__no_delay=True,  # no jobs thanks
    ))

然后,所有测试都同步执行作业方法,不会延迟任何作业。

在测试中,你必须像这样静音记录器

@mute_logger(‘odoo.addons.queue_job.models.base’)

技巧与窍门

  • 幂等性 (https://www.restapitutorial.com/lessons/idempotency.html):queue_job应该是幂等的,这样它们可以多次重试而不会对数据产生影响。

  • 作业应在非常开始时测试其相关性:作业将被执行的时间是未知的,这是设计上的。因此,作业的第一个任务应该是检查相关的作业在执行时是否仍然相关。

模式

随着时间的推移,出现了两种主要模式

  1. 对于向用户公开的数据,模型应存储数据,并且模型应是作业的创建者。作业对用户隐藏

  2. 对于不向用户暴露的技术数据,通常可以直接创建工作,将数据作为参数传递给工作,而无需中间模型。

已知问题/路线图

  • 在创建新的数据库或在现有数据库上安装 queue_job 之后,Odoo 必须重新启动,以便运行器能够检测到它。

  • 当 Odoo 正常关闭时,它会等待运行中的工作完成。然而,当 Odoo 服务器崩溃或被强制停止时,运行中的工作会被中断,而运行器根本无法知道它们已被中止。在这种情况下,工作可能仍然处于 startedenqueued 状态,直到 Odoo 服务器停止。由于运行器无法知道它们是否实际上正在运行,并且无法确定是否可以安全地重新启动工作,因此它不会尝试自动重新启动它们。因此,这些陈旧的工作会填满运行队列,并阻止其他工作启动。因此,您必须手动重新排队,无论是从工作视图,还是通过在启动 Odoo 之前运行以下 SQL 语句。

update queue_job set state='pending' where state in ('started', 'enqueued')

变更日志

下一步

  • [ADD] 将 jobrunner 作为工作进程运行,而不是主进程中的线程(当使用 –workers > 0 运行时)

  • [REF] @job@related_action 已弃用,任何方法都可以延迟,并使用 queue.job.function 记录进行配置

  • [MIGRATION] 从 13.0 分支到 rev. e24ff4b

问题追踪器

在 GitHub Issues 上跟踪错误。[a href="https://github.com/OCA/queue/issues" rel="nofollow">GitHub Issues。如果遇到问题,请检查是否已报告您的问题。如果您是第一个发现它的人,请通过提供详细和受欢迎的 反馈 帮助我们将其摧毁。

请不要直接联系贡献者以获取支持或技术问题的帮助。

鸣谢

作者

  • Camptocamp

  • ACSONE SA/NV

贡献者

维护者

此模块由 OCA 维护。

Odoo Community Association

OCA,即 Odoo 社区协会,是一个非营利组织,其使命是支持 Odoo 功能的协作开发并促进其广泛使用。

当前 维护者

guewen

此模块是 GitHub 上 OCA/queue 项目的组成部分。

欢迎您做出贡献。有关如何操作的更多信息,请访问 https://odoo-community.org/page/Contribute

项目详细信息


发布历史 发布通知 | RSS 源

下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源代码分发

本版本没有可用的源代码分发文件。请参阅生成分发存档的教程。

构建分发

odoo14_addon_queue_job-14.0.3.8.0-py3-none-any.whl (320.6 kB 查看散列值)

上传时间 Python 3

由...

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面