工作队列
项目描述
此插件为Odoo添加了一个集成的工作队列。
它允许异步执行的方法调用延迟执行。
工作由后台的Jobrunner执行,在它们自己的事务中。
示例
from odoo import models, fields, api
class MyModel(models.Model):
_name = 'my.model'
def my_method(self, a, k=None):
_logger.info('executed with a: %s and k: %s', a, k)
class MyOtherModel(models.Model):
_name = 'my.other.model'
def button_do_stuff(self):
self.env['my.model'].with_delay().my_method('a', k=2)
在上面的代码片段中,当我们调用button_do_stuff时,一个将方法及其参数捕获的工作将被延迟。它将在Jobrunner有空闲槽位时执行,如果没有其他工作运行,这可能是一瞬间的事。
功能
工作视图,工作存储在PostgreSQL中
Jobrunner:执行工作,由于PostgreSQL的NOTIFY而非常高效
通道:为根通道及其子通道提供容量,并在其中隔离工作。例如,允许将重工作限制为一次执行,而轻工作则四次执行。
重试:通过引发异常类型来重试工作的能力
重试模式:前三次尝试,10秒后重试,接下来五次尝试,1分钟后重试,……
工作属性:优先级,预计到达时间(ETA),自定义描述,重试次数
相关操作:在工作视图中链接操作,例如打开工作相关的记录
目录
用例/上下文
Odoo 对任务进行同步处理,例如导入产品列表时,它将把每一行视为一个大的任务。“队列作业”功能允许您将大任务分解成许多较小的任务。
假设您需要更改数千个订单的大量数据,您可以一次性完成,这可能会对服务器造成重负载,并可能影响 Odoo 的性能。使用 queue_job,您可以分解工作为作业,并运行数千个作业(每个订单一个作业)。另一个好处是,如果某一行失败,它不会阻止其他作业的处理,因为作业是独立的。此外,您可以安排作业并设置重试次数。
以下是一些社区使用示例
批量发送发票:[account_invoice_mass_sending](https://github.com/OCA/account-invoicing/tree/17.0/account_invoice_mass_sending)
后台导入数据:[base_import_async](https://github.com/OCA/queue/tree/17.0/base_import_async)
后台导出数据:[base_export_async](https://github.com/OCA/queue/tree/17.0/base_export_async)
使用作业生成合同发票:[contract_queue_job](https://github.com/OCA/contract/tree/17.0/contract_queue_job)
使用作业生成合作伙伴发票:[partner_invoicing_mode](https://github.com/OCA/account-invoicing/tree/17.0/partner_invoicing_mode)
使用作业处理销售自动工作流操作:[sale_automatic_workflow_job](https://github.com/OCA/sale-workflow/tree/17.0/sale_automatic_workflow_job)
安装
请确保安装 requests 库。
配置
使用环境变量和命令行
调整环境变量(可选)
ODOO_QUEUE_JOB_CHANNELS=root:4 或任何其他通道配置。默认为 root:1
如果未设置 xmlrpc_port:ODOO_QUEUE_JOB_PORT=8069
以 --load=web,queue_job 和 --workers 大于 1 的方式启动 Odoo。[1]
使用 Odoo 配置文件
[options]
(...)
workers = 6
server_wide_modules = web,queue_job
(...)
[queue_job]
channels = root:2
通过检查 odoo 日志文件来确认运行器是否正确启动
...INFO...queue_job.jobrunner.runner: starting ...INFO...queue_job.jobrunner.runner: initializing database connections ...INFO...queue_job.jobrunner.runner: queue job runner ready for db <dbname> ...INFO...queue_job.jobrunner.runner: database connections ready
创建作业(例如使用 base_import_async)并观察它们立即并行启动
提示:要启用队列作业的调试日志,请使用 --log-handler=odoo.addons.queue_job:DEBUG
使用
要使用此模块,您需要
转到 作业队列 菜单
开发者
延迟工作
为方法入队作业的最快方法是使用记录或模型上的 with_delay()
def button_done(self):
self.with_delay().print_confirmation_document(self.state)
self.write({"state": "done"})
return True
在这里,方法 print_confirmation_document() 将作为作业异步执行。 with_delay() 可以接受多个参数来更精确地定义作业的执行方式(优先级等)。
传递给正在延迟的方法的所有参数都存储在作业中,并在异步执行时传递给方法,包括 self,因此作业执行期间保持当前记录(警告:上下文不会保留)。
可以在作业之间表达依赖关系。要启动作业图,请在记录或模型上使用 delayable()。以下与 with_delay() 相当,但使用长格式
def button_done(self):
delayable = self.delayable()
delayable.print_confirmation_document(self.state)
delayable.delay()
self.write({"state": "done"})
return True
可延迟对象的方法定回自身,因此可以作为构建器模式使用,在某些情况下允许动态构建作业
def button_generate_simple_with_delayable(self):
self.ensure_one()
# Introduction of a delayable object, using a builder pattern
# allowing to chain jobs or set properties. The delay() method
# on the delayable object actually stores the delayable objects
# in the queue_job table
(
self.delayable()
.generate_thumbnail((50, 50))
.set(priority=30)
.set(description=_("generate xxx"))
.delay()
)
定义依赖关系的最简单方法是使用 Delayable 上的 .on_done(job)
def button_chain_done(self):
self.ensure_one()
job1 = self.browse(1).delayable().generate_thumbnail((50, 50))
job2 = self.browse(1).delayable().generate_thumbnail((50, 50))
job3 = self.browse(1).delayable().generate_thumbnail((50, 50))
# job 3 is executed when job 2 is done which is executed when job 1 is done
job1.on_done(job2.on_done(job3)).delay()
可以通过使用 chain() 和 group() 基本操作将可延迟项连接起来,形成更复杂的图。链表示按顺序执行的作业序列,组表示可以并行执行的作业。使用 chain() 的效果与使用多个嵌套的 on_done() 相同,但更易读。两者可以组合形成图,例如,我们可以将作业 [A] 分组,这将阻塞另一个分组 [B] 的作业。只有当组 [A] 中的所有作业都执行完毕时,组 [B] 的作业才会执行。代码如下所示
from odoo.addons.queue_job.delay import group, chain
def button_done(self):
group_a = group(self.delayable().method_foo(), self.delayable().method_bar())
group_b = group(self.delayable().method_baz(1), self.delayable().method_baz(2))
chain(group_a, group_b).delay()
self.write({"state": "done"})
return True
当作业图中发生故障时,依赖失败作业的作业执行将停止。它们将保持在 wait_dependencies 状态,直到其“父”作业成功。这可以通过两种方式发生:父作业重试并成功第二次尝试,或者父作业被用户手动“设置为完成”。在这两种情况下,依赖关系得到解决,图将继续处理。或者,用户可以取消失败的作业及其所有依赖作业。不依赖失败作业的图中的其他作业将继续执行。
注意:必须在图顶部的可延迟项、链或组上调用 delay()。在上面的示例中,如果它在 group_a 上调用,则 group_b 将永远不会延迟(但会显示警告)。
工作入队选项
优先级:默认为10,越接近0,执行越快
eta:作业的到达时间估计。在指定日期/时间之前不会执行
max_retries:默认为5,在放弃并设置作业状态为“失败”之前的最大重试次数。0的值为无限重试。
description:对作业的人类描述。如果没有设置,描述将从函数文档或方法名称计算得出
channel:用于处理函数的完整通道名称。如果指定,它将覆盖函数上定义的通道
identity_key:唯一标识作业的键,如果指定且尚未运行具有相同键的作业,则不会创建新作业
配置工作默认选项
在早期版本中,可以使用 @job 装饰器配置作业。这现在是过时的,它们可以使用可选的 queue.job.function 和 queue.job.channel XML记录进行配置。
通道示例
<record id="channel_sale" model="queue.job.channel">
<field name="name">sale</field>
<field name="parent_id" ref="queue_job.channel_root" />
</record>
作业函数示例
<record id="job_function_sale_order_action_done" model="queue.job.function">
<field name="model_id" ref="sale.model_sale_order" />
<field name="method">action_done</field>
<field name="channel_id" ref="channel_sale" />
<field name="related_action" eval='{"func_name": "custom_related_action"}' />
<field name="retry_pattern" eval="{1: 60, 2: 180, 3: 10, 5: 300}" />
</record>
name 的一般形式为: <model.name>.method。
通道、相关操作和重试模式选项是可选的,它们在下面有文档说明。
在编写模块时,如果有2个或更多模块添加具有相同名称(和父通道)的作业函数或通道,它们将合并到同一个记录中,即使它们的xmlids不同。在卸载时,当所有使用它的模块都卸载时,合并的记录将被删除。
作业函数:模型
如果函数在抽象模型中定义,您不能编写 <field name="model_id" ref="xml_id_of_the_abstract_model"</field>,但必须为从抽象模型继承的每个模型定义一个函数。
作业函数:通道
作业将延迟的通道。默认通道是 root。
作业函数:相关操作
相关操作 在作业视图中显示为按钮。按钮将执行定义的操作。
默认情况下是打开与工作相关的记录视图(只有一个记录时为表单视图,多个记录为列表视图)。在许多情况下,默认的相关操作就足够了,不需要自定义,但可以通过在作业函数上提供字典来自定义。
{
"enable": False,
"func_name": "related_action_partner",
"kwargs": {"name": "Partner"},
}
enable:当设置为False时,按钮没有效果(默认:True)
func_name:在queue.job上返回操作的方法名
kwargs:传递给相关操作方法的其他参数
相关操作代码示例
class QueueJob(models.Model):
_inherit = 'queue.job'
def related_action_partner(self, name):
self.ensure_one()
model = self.model_name
partner = self.records
action = {
'name': name,
'type': 'ir.actions.act_window',
'res_model': model,
'view_type': 'form',
'view_mode': 'form',
'res_id': partner.id,
}
return action
作业函数:重试模式
当作业因可重试的错误类型失败时,它会自动稍后重试。默认情况下,重试总是在10分钟后进行。
可以在作业函数上配置重试模式。模式表示的是“从X次尝试开始,推迟Y秒”。它表示为一个字典,其中键是尝试次数,值是推迟秒数(整数)
{
1: 10,
5: 20,
10: 30,
15: 300,
}
根据此配置,我们可以得出以下结论:
前5次重试推迟10秒后进行
第5至第10次重试推迟20秒后进行
第10至第15次重试推迟30秒后进行
所有后续重试推迟5分钟后进行
作业上下文
作业记录集的上下文,或作业参数中传递的任何记录集,根据允许列表传输到作业。
默认的允许列表是("tz","lang","allowed_company_ids","force_company","active_test")。可以在Base._job_prepare_context_before_enqueue_keys中进行自定义。绕过正在运行的Odoo上的作业
当您在开发时(例如:连接器模块),可能希望绕过队列作业并立即运行您的代码。
为此,您可以在您的环境中设置QUEUE_JOB__NO_DELAY=1。
在测试中绕过作业
在编写与作业相关的方法的测试时,处理延迟记录集总是很棘手。为了使您的测试生活更轻松,您可以在上下文中设置queue_job__no_delay=True。
提示:您可以在测试用例级别这样做
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.env = cls.env(context=dict(
cls.env.context,
queue_job__no_delay=True, # no jobs thanks
))
然后,您的所有测试都会同步执行作业方法,而不延迟任何作业。
测试
断言已排队的作业
测试作业的推荐方法是,而不是直接同步运行它们,将测试分为两部分
一个测试,其中作业被模拟(使用trap_jobs()捕获作业,并且测试只验证作业已根据预期的参数延迟
一个只同步调用作业方法的测试,以验证该方法的正确行为
这样做的意义在于,您可以证明作业将在运行时正确地入队,并确保您的代码在测试和生产环境中具有不同的行为(因为同步运行作业可能具有不同的行为,因为它们在同一个事务中/方法中间)。此外,它提供了更多控制权,可以在调用作业方法时传递您想要的参数(这次是在第二种类型的测试中同步调用),并使测试更小。
在排队的作业上进行此类断言的最佳方法是使用odoo.addons.queue_job.tests.common.trap_jobs()。
一个非常小的示例(更多细节在tests/common.py中)
# code
def my_job_method(self, name, count):
self.write({"name": " ".join([name] * count)
def method_to_test(self):
count = self.env["other.model"].search_count([])
self.with_delay(priority=15).my_job_method("Hi!", count=count)
return count
# tests
from odoo.addons.queue_job.tests.common import trap_jobs
# first test only check the expected behavior of the method and the proper
# enqueuing of jobs
def test_method_to_test(self):
with trap_jobs() as trap:
result = self.env["model"].method_to_test()
expected_count = 12
trap.assert_jobs_count(1, only=self.env["model"].my_job_method)
trap.assert_enqueued_job(
self.env["model"].my_job_method,
args=("Hi!",),
kwargs=dict(count=expected_count),
properties=dict(priority=15)
)
self.assertEqual(result, expected_count)
# second test to validate the behavior of the job unitarily
def test_my_job_method(self):
record = self.env["model"].browse(1)
record.my_job_method("Hi!", count=12)
self.assertEqual(record.name, "Hi! Hi! Hi! Hi! Hi! Hi! Hi! Hi! Hi! Hi! Hi! Hi!")
如果您愿意,您仍然可以在单个测试中测试整个流程,通过在您的测试中调用jobs_tester.perform_enqueued_jobs()。
def test_method_to_test(self):
with trap_jobs() as trap:
result = self.env["model"].method_to_test()
expected_count = 12
trap.assert_jobs_count(1, only=self.env["model"].my_job_method)
trap.assert_enqueued_job(
self.env["model"].my_job_method,
args=("Hi!",),
kwargs=dict(count=expected_count),
properties=dict(priority=15)
)
self.assertEqual(result, expected_count)
trap.perform_enqueued_jobs()
record = self.env["model"].browse(1)
record.my_job_method("Hi!", count=12)
self.assertEqual(record.name, "Hi! Hi! Hi! Hi! Hi! Hi! Hi! Hi! Hi! Hi! Hi! Hi!")
在运行Odoo时同步执行作业
当您在开发时(例如:连接器模块),可能希望绕过队列作业并立即运行您的代码。
为此,您可以在您的环境中设置QUEUE_JOB__NO_DELAY=1。
警告
不要在生产环境中这样做
在测试中同步执行作业
您应该使用 trap_jobs,真的,但如果由于任何原因您不能使用它,并且仍然需要在测试中同步执行作业方法,您可以通过在上下文中设置 queue_job__no_delay=True 来做到这一点。
提示:您可以在测试用例级别这样做
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.env = cls.env(context=dict(
cls.env.context,
queue_job__no_delay=True, # no jobs thanks
))
然后,您的所有测试都会同步执行作业方法,而不延迟任何作业。
在测试中,您需要像这样静音日志
@mute_logger(‘odoo.addons.queue_job.models.base’)
注意
在作业的图表中,queue_job__no_delay 上下文键必须在图表中至少一个作业的 env 中,整个图表才能同步执行
技巧和技巧
幂等性 (https://www.restapitutorial.com/lessons/idempotency.html):queue_job 应该是幂等的,这样它们可以在不影响数据的情况下重试多次。
作业应在开始时测试其相关性:作业将被执行的时刻是未知的。因此,作业的第一个任务应该是检查相关的作业在执行时是否仍然相关。
模式
随着时间的推移,出现了两种主要模式
对于暴露给用户的数据库,模型应该存储数据,并且模型应该是作业的创建者。作业对用户保持隐藏
对于不向用户暴露的技术数据,通常可以直接创建带有作为作业参数传递的数据的作业,而不需要中间模型。
已知问题/路线图
在创建新的数据库或将 queue_job 安装在现有数据库上后,Odoo 必须重新启动,以便运行器检测到它。
当 Odoo 正常关闭时,它会等待正在运行的任务完成。然而,当 Odoo 服务器崩溃或被强制停止时,正在运行的任务会中断,而运行器没有机会知道它们已经被中止。在这种情况下,作业可能会在 Odoo 服务器停止后保持 started 或 enqueued 状态。由于运行器无法知道它们实际上是否在运行,并且无法确定是否可以安全地重新启动作业,因此它不会自动尝试重新启动它们。因此,这些过时的作业会填充运行队列,并阻止其他作业启动。因此,您必须手动重新排队,无论是从作业视图还是通过运行以下 SQL 语句 在启动 Odoo 之前
update queue_job set state='pending' where state in ('started', 'enqueued')
变更日志
接下来
[ADD] 将 jobrunner 作为工作进程而不是主进程中的线程运行(当使用 –workers > 0 运行时)
[REF] @job 和 @related_action 已弃用,任何方法都可以延迟,并使用 queue.job.function 记录进行配置
[迁移] 从 rev. e24ff4b 分支的 13.0
错误跟踪器
错误在 GitHub Issues 上跟踪。如果遇到问题,请检查是否已报告了您的问题。如果您是第一个发现它的人,请通过提供详细且受欢迎的 反馈 来帮助我们将其摧毁。
请不要直接联系贡献者以寻求支持或帮助解决技术问题。
致谢
贡献者
Guewen Baconnier <guewen.baconnier@camptocamp.com>
Stéphane Bidoul <stephane.bidoul@acsone.eu>
Matthieu Dietrich <matthieu.dietrich@camptocamp.com>
Jos De Graeve <Jos.DeGraeve@apertoso.be>
David Lefever <dl@taktik.be>
Laurent Mignon <laurent.mignon@acsone.eu>
Laetitia Gangloff <laetitia.gangloff@acsone.eu>
Cédric Pigeon <cedric.pigeon@acsone.eu>
塔蒂亚娜·德里比娜 <tatiana.deribina@avoin.systems>
苏海尔·贝贾乌伊 <souheil.bejaoui@acsone.eu>
埃里克·安托内斯 <eantones@nuobit.com>
西蒙内·奥西 <simone.orsi@camptocamp.com>
阮明谦 <chien@trobz.com>
维护者
此模块由OCA维护。
OCA,即Odoo社区协会,是一个非营利组织,其使命是支持Odoo功能的协作开发并促进其广泛应用。
当前维护者
此模块是GitHub上的OCA/queue项目的一部分。
欢迎您贡献力量。有关如何贡献的信息,请访问https://odoo-community.org/page/Contribute。
项目详情
哈希值 for odoo_addon_queue_job-17.0.1.0.5.1-py3-none-any.whl
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 0bbcd2d6916076902aedb85d4cc51c9d7d0f35d112eb69034ffc8b7ce25a1d08 |
|
MD5 | 84d63b134f0d2416747a29fffaff466a |
|
BLAKE2b-256 | db7fc1f4edee828baf6159da98ff2dd9afbafbd3b0eb83c4c7d776e687b2557d |