跳转到主要内容

hurry.workflow是一个简单的流程系统。它可以用于为Zope Toolkit应用程序实现具有状态的多版本工作流。

项目描述

hurry.workflow

一个简单但相当巧妙的Zope 3工作流系统。

变更日志

3.0.2 (2018-01-16)

  • 更新pypi描述的渲染。

3.0.1 (2018-01-16)

  • 修复一个棕色纸袋版本。

3.0.0 (2018-01-16)

  • 更新依赖,不再依赖ZODB 3。

  • 支持python3.4、python3.5、python3.6,以及python2.7。

0.13.1 (2013-01-17)

  • 使异常也显示更详细的消息。

0.13 (2013-01-17)

  • NoTransitionAvailableError 获得了 sourcedestination 属性,指示哪个转换不可用。

  • AmbiguousTransitionError 也获得了 sourcedestination 属性,指示哪个转换是模糊的。

  • InvalidTransitionError 获得了 source 属性,指示尝试无效转换的源状态。

  • 更新后的 bootstrap.py

0.12 (2012-02-10)

  • 将 WorkflowInfo 类上的 info() 和 state() 函数转换为类方法,因为它们在其他情况下不太有用。

  • fireTransitionToward 已经接受了一个 check_security=False 参数,但它不会允许用户没有权限的转换被触发,因为转换甚至根本找不到。现在它工作了。

0.11 (2010-04-16)

  • 在 WorkflowState 中只调用一次 DoIAnnotations(self.context)。

  • IWorkflowVersions 实现现在是可选的。

  • 添加了对多个工作流的支持。

0.10 (2009-11-19)

  • 已迁移到 svn.zope.org 进行开发。

  • 添加了 buildout.cfg 和 bootstrap.py

  • 最小化了依赖项。请注意,Workflow 不再继承自 Persistentzope.container.contained.Contained。如果您需要持久化工作流,您需要在您的代码中继承此功能。这破坏了向后兼容性,因为持久化工作流需要重新初始化。

0.9.2.1 (2007-08-15)

  • 哎呀,0.9.2 中的补丁实际上并没有应用。现在已修复。

0.9.2 (2007-08-15)

  • zope.security 的更改破坏了 hurry.workflow 中的导入。

  • localUtility 指令现在是过时的,所以不要再使用它了。

0.9.1 (2006-09-22)

  • 第一个 cheesehop 发布。

0.9 (2006-06-15)

  • 从 hurry 包中分离出来,形成 hurry.workflow

  • eggification 工作

  • Zope 3.3 兼容性工作

0.8 (2006-05-01)

首次公开发布。

详细文档

Hurry 工作流

Hurry 工作流系统是一个“因为着急所以要自己动手”的框架。

基本工作流

让我们首先创建一个可以进入工作流的内容对象

>>> from zope.interface import implementer, Attribute

>>> from zope.annotation.interfaces import IAttributeAnnotatable
>>> class IDocument(IAttributeAnnotatable):
...    title = Attribute('Title')
>>> @implementer(IDocument)
... class Document(object):
...    def __init__(self, title):
...        self.title = title

如您所见,这样的内容对象必须提供 IAnnotatable,因为这将用于存储工作流状态。系统使用 IWorkflowState 适配器来获取和设置对象的工怍流状态

>>> from hurry.workflow import interfaces
>>> document = Document('Foo')
>>> state = interfaces.IWorkflowState(document)
>>> print(state.getState())
None

可以使用 IWorkflowState 适配器直接为对象设置状态

>>> state.setState('foo')
>>> state.getState()
'foo'

但是,让我们再次将其设置为 None,这样我们就可以从干净的状态开始这个文档

>>> state.setState(None)

不建议使用 setState() 自己做这件事:通常我们会让工作流系统负责状态转换和初始状态的设置。

现在让我们定义一个简单的从“a”到“b”的工作流转换。它需要一个条件,该条件在转换被允许发生之前必须返回 True

>>> def NullCondition(wf, context):
...    return True

以及一个在转换发生时执行的动作

>>> def NullAction(wf, context):
...    pass

现在让我们构建一个转换

>>> from hurry.workflow import workflow
>>> transition = workflow.Transition(
...     transition_id='a_to_b',
...     title='A to B',
...     source='a',
...     destination='b',
...     condition=NullCondition,
...     action=NullAction,
...     trigger=interfaces.MANUAL)

转换触发器可以是 MANUAL、AUTOMATIC 或 SYSTEM。MANUAL 表示需要用户操作来触发转换。AUTOMATIC 转换会自动触发。SYSTEM 是由系统直接触发的工作流转换,而不是由用户直接触发。

我们还将引入一个初始转换,将对象移动到工作流中(例如,在它创建后立即进行)

>>> init_transition = workflow.Transition(
...     transition_id='to_a',
...     title='Create A',
...     source=None,
...     destination='a')

以及一个最终转换,当对象再次从工作流中移出时(例如,在它删除前进行)

>>> final_transition = workflow.Transition(
...     transition_id='finalize',
...     title='Delete',
...     source='b',
...     destination=None)

现在让我们将这些转换放入一个工作流实用工具中

>>> wf = workflow.Workflow([transition, init_transition, final_transition])
>>> from zope import component
>>> component.provideUtility(wf, interfaces.IWorkflow)

如果我们需要,我们可以使用 get_transition 从工作流中获取转换

>>> wf.getTransition('a', 'a_to_b') is transition
True

如果我们尝试获取一个不存在的转换,我们会得到一个错误

>>> wf.getTransition('b', 'a_to_b')
Traceback (most recent call last):
  ...
hurry.workflow.interfaces.InvalidTransitionError: source: "b"

>>> from hurry.workflow.interfaces import InvalidTransitionError
>>> try:
...   wf.getTransition('b', 'a_to_b')
... except InvalidTransitionError as e_:
...   e = e_
>>> e.source
'b'

工作流转换会导致事件被触发;我们将添加一个简单的处理程序,以便我们可以检查是否成功触发了事件

>>> events = []
>>> def transition_handler(event):
...     events.append(event)
>>> component.provideHandler(
...     transition_handler,
...     [interfaces.IWorkflowTransitionEvent])

为了获取从对象到其他可能的状态的转换,以及触发转换和设置初始状态,我们使用IWorkflowInfo适配器

>>> info = interfaces.IWorkflowInfo(document)

我们将通过触发'to_a'转换来初始化工作流

>>> info.fireTransition('to_a')

这应该已经触发了一个事件

>>> events[-1].transition.transition_id
'to_a'
>>> events[-1].source is None
True
>>> events[-1].destination
'a'

只定义了一个转换到工作流状态'b'的转换

>>> info.getManualTransitionIds()
['a_to_b']

我们也可以通过询问哪些手动(或系统)转换可以带我们到达所需的工作流状态来获取这个信息

>>> info.getFireableTransitionIdsToward('b')
['a_to_b']

由于这是一个手动触发的转换,我们可以触发这个转换

>>> info.fireTransition('a_to_b')

现在工作流状态应该是'b'了

>>> state.getState()
'b'

我们检查事件是否确实已经触发

>>> events[-1].transition.transition_id
'a_to_b'
>>> events[-1].source
'a'
>>> events[-1].destination
'b'

我们还将在这里尝试fireTransitionToward,因此我们将工作流偷偷地再次恢复到状态'a'并尝试它

>>> state.setState('a')

尝试通过一个我们无法到达的转换

>>> info.fireTransitionToward('c')
Traceback (most recent call last):
...
hurry.workflow.interfaces.NoTransitionAvailableError: source: "a" destination: "c"

这个错误提供了一些信息,说明尝试了哪个转换

>>> from hurry.workflow.interfaces import NoTransitionAvailableError
>>> try:
...   info.fireTransitionToward('c')
... except NoTransitionAvailableError as e_:
...   e = e_
>>> e.source
'a'
>>> e.destination
'c'

现在再次转到'b'状态

>>> info.fireTransitionToward('b')
>>> state.getState()
'b'

最后,在忘记我们的文档之前,我们最终确定工作流

>>> info.fireTransition('finalize')
>>> state.getState() is None
True

我们还有另一个触发的事件

>>> events[-1].transition.transition_id
'finalize'
>>> events[-1].source
'b'
>>> events[-1].destination is None
True

多个工作流

我们之前已将一个工作流注册为未命名的实用工具。您也可以将工作流注册为命名实用工具,为网站提供多个工作流。

让我们创建一个发票文档

>>> class IInvoiceDocument(IDocument):
...    title = Attribute('Title')

>>> @implementer(IInvoiceDocument)
... class InvoiceDocument(object):
...    def __init__(self, title, amount):
...        self.title = title
...        self.amount = amount

我们定义我们的工作流

>>> invoice_init = workflow.Transition(
...     transition_id='init_invoice',
...     title='Invoice Received',
...     source=None,
...     destination='received')
>>>
>>> invoice_paid = workflow.Transition(
...     transition_id='invoice_paid',
...     title='Invoice Paid',
...     source='received',
...     destination='paid')

>>> invoice_wf = workflow.Workflow( [ invoice_init, invoice_paid ] )

我们注册一个新的工作流实用工具,名为“invoice”的WorkflowState和WorkflowInfo适配器

>>> from hurry.workflow import workflow
>>> from zope.annotation import interfaces as annotation_interfaces
>>> component.provideUtility(invoice_wf, interfaces.IWorkflow, name="invoice")
>>> class InvoiceWorkflowInfo(workflow.WorkflowInfo):
...     name="invoice"
>>> component.provideAdapter(
...     InvoiceWorkflowInfo,
...     (annotation_interfaces.IAnnotatable,),
...     interfaces.IWorkflowInfo,
...     name="invoice")
>>> class InvoiceWorkflowState(workflow.WorkflowState):
...     state_key = "invoice.state"
...     id_key  = "invoice.id"
>>> component.provideAdapter(
...     InvoiceWorkflowState,
...     (annotation_interfaces.IAnnotatable,),
...     interfaces.IWorkflowState,
...     name="invoice")

现在我们可以利用工作流了

>>> invoice = InvoiceDocument('abc', 22)

>>> info = component.getAdapter(invoice, interfaces.IWorkflowInfo, name="invoice")
>>> info.fireTransition('init_invoice')
>>> state = component.getAdapter(invoice, interfaces.IWorkflowState, name="invoice")
>>> state.getState()
'received'
>>> info.fireTransition('invoice_paid')
>>> state.getState()
'paid'

为了更容易地获取特定上下文对象的state和info适配器,工作流信息对象上有两个便利函数。信息对象“知道”要查找哪个工作流实用工具,因为它们是按名称关联的

>>> info_ = InvoiceWorkflowInfo.info(invoice)
>>> interfaces.IWorkflowInfo.providedBy(info_)
True

>>> state_ = InvoiceWorkflowInfo.state(invoice)
>>> interfaces.IWorkflowState.providedBy(state_)
True
>>> state.getState() is state_.getState()
True

当然,这个文档始终有默认的未命名工作流

>>> info = interfaces.IWorkflowInfo(invoice)
>>> info.fireTransition('to_a')
>>> state = interfaces.IWorkflowState(invoice)
>>> state.getState()
'a'

多版本工作流

现在让我们考虑一个更复杂的场景,其中有一个文档的多个版本。在任何时候,一个文档都可以有一个未发布的版本和一个已发布的版本。还可以有一个已关闭的版本和任意数量的存档版本

>>> UNPUBLISHED = 'unpublished'
>>> PUBLISHED = 'published'
>>> CLOSED = 'closed'
>>> ARCHIVED = 'archived'

让我们从一个简单的初始转换开始

>>> init_transition = workflow.Transition(
...    transition_id='init',
...    title='Initialize',
...    source=None,
...    destination=UNPUBLISHED)

当未发布的版本发布时,任何之前发布的版本都会变成已关闭的版本。为了完成这个二级状态转换,我们将使用系统的内置版本控制能力,即使用'fireTransitionsForVersions'方法,它可以用来触发文档其他版本的转换

>>> def PublishAction(wf, context):
...    wf.fireTransitionForVersions(PUBLISHED, 'close')

现在让我们构建转换

>>> publish_transition = workflow.Transition(
...    transition_id='publish',
...    title='Publish',
...    source=UNPUBLISHED,
...    destination=PUBLISHED,
...    condition=NullCondition,
...    action=PublishAction,
...    trigger=interfaces.MANUAL,
...    order=1)

接下来,我们将定义从已发布到已关闭的转换,这意味着我们想要存档之前关闭的内容

>>> def CloseAction(wf, context):
...    wf.fireTransitionForVersions(CLOSED, 'archive')
>>> close_transition = workflow.Transition(
...    transition_id='close',
...    title='Close',
...    source=PUBLISHED,
...    destination=CLOSED,
...    condition=NullCondition,
...    action=CloseAction,
...    trigger=interfaces.MANUAL,
...    order=2)

注意,当使用fireTransitionsForVersions将状态从已发布转换为已关闭时,CloseAction也将自动执行。这意味着发布文档会导致之前关闭的文档存档。

如果有一个已发布的版本但没有未发布的版本,我们可以创建已发布版本的副本,并将其作为未发布的版本

>>> def CanCopyCondition(wf, context):
...     return not wf.hasVersion(UNPUBLISHED)

由于我们实际上正在创建一个新的内容对象,该操作应返回具有新状态的新创建的对象

>>> def CopyAction(wf, context):
...     return Document('copy of %s' % context.title)

>>> copy_transition = workflow.Transition(
...     transition_id='copy',
...     title='Copy',
...     source=PUBLISHED,
...     destination=UNPUBLISHED,
...     condition=CanCopyCondition,
...     action=CopyAction,
...     trigger=interfaces.MANUAL,
...     order=3)

一个非常类似的转换也适用于已关闭的版本。如果没有未发布的版本和已发布的版本,我们可以从已关闭版本创建一个新副本

>>> def CanCopyCondition(wf, context):
...     return (not wf.hasVersion(UNPUBLISHED) and
...         not wf.hasVersion(PUBLISHED))

>>> copy_closed_transition = workflow.Transition(
...     transition_id='copy_closed',
...     title='Copy',
...     source=CLOSED,
...     destination=UNPUBLISHED,
...     condition=CanCopyCondition,
...     action=CopyAction,
...     trigger=interfaces.MANUAL,
...     order=4)

最后,让我们构建存档转换

>>> archive_transition = workflow.Transition(
...     transition_id='archive',
...     title='Archive',
...     source=CLOSED,
...     destination=ARCHIVED,
...     condition=NullCondition,
...     action=NullAction,
...     trigger=interfaces.MANUAL,
...     order=5)

现在让我们构建并提供一个工作流实用工具

>>> wf = workflow.Workflow([init_transition,
...                         publish_transition, close_transition,
...                         copy_transition, copy_closed_transition,
...                         archive_transition])

>>> component.provideUtility(wf, interfaces.IWorkflow)

让我们获取工作流_versions实用工具,我们可以用它来跟踪版本并生成一个新的唯一ID

>>> workflow_versions = component.getUtility(interfaces.IWorkflowVersions)

让我们从一个文档开始

>>> document = Document('bar')
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')

我们需要文档ID,以便稍后创建新版本时进行比较

>>> state = interfaces.IWorkflowState(document)
>>> document_id = state.getId()

将其添加到工作流版本容器中,以便我们可以找到它。注意,我们在这里使用的是私有API;这可以实施为将其添加到一个文件夹或任何其他方式,只要getVersions()稍后可以工作

>>> workflow_versions.addVersion(document) # private API

同时清除先前记录的事件

>>> del events[:]

我们可以发布它

>>> info.getManualTransitionIds()
['publish']

所以让我们这么做

>>> info.fireTransition('publish')
>>> state.getState()
'published'

最后的事件应该是'publish'转换

>>> events[-1].transition.transition_id
'publish'

现在我们可以关闭或创建一个新的副本。请注意,名称是按照转换的顺序排序的

>>> info.getManualTransitionIds()
['close', 'copy']

让我们关闭它

>>> info.fireTransition('close')
>>> state.getState()
'closed'

我们现在将创建一个新的副本进行编辑

>>> info.getManualTransitionIds()
['copy_closed', 'archive']
>>> document2 = info.fireTransition('copy_closed')
>>> workflow_versions.addVersion(document2) # private API to track it
>>> document2.title
'copy of bar'
>>> state = interfaces.IWorkflowState(document2)
>>> state.getState()
'unpublished'
>>> state.getId() == document_id
True

原始版本仍然以原始状态存在

>>> interfaces.IWorkflowState(document).getState()
'closed'

让我们也详细检查最后的事件

>>> event = events[-1]
>>> event.transition.transition_id
'copy_closed'
>>> event.old_object == document
True
>>> event.object == document2
True

现在我们将发布新版本

>>> info = interfaces.IWorkflowInfo(document2)
>>> info.getManualTransitionIds()
['publish']
>>> info.fireTransition('publish')
>>> interfaces.IWorkflowState(document2).getState()
'published'

原始版本仍然处于关闭状态

>>> interfaces.IWorkflowState(document).getState()
'closed'

现在让我们在这个之后发布另一个副本

>>> document3 = info.fireTransition('copy')
>>> workflow_versions.addVersion(document3)
>>> interfaces.IWorkflowInfo(document3).fireTransition('publish')

这个副本现在已经发布

>>> interfaces.IWorkflowState(document3).getState()
'published'

之前发布的版本现在已经关闭

>>> interfaces.IWorkflowState(document2).getState()
'closed'

请注意,由于条件限制,无法从已关闭的版本复制,因为仍然存在一个已发布的版本

>>> interfaces.IWorkflowInfo(document2).getManualTransitionIds()
['archive']

与此同时,之前关闭的原始版本现在已存档

>>> interfaces.IWorkflowState(document).getState()
'archived'

自动转换

现在让我们尝试一个基于自动和基于时间的流程转换。我们将设置一个非常简单的流程,在“未发布”和“发布”之间,并将“发布”转换设置为基于时间的。

为了模拟时间,我们有时刻

>>> time_moment = 0

只有当时间_时刻大于3时,我们才会发布

>>> def TimeCondition(wf, context):
...    return time_moment > 3

使用这个条件设置转换;请注意,这个转换是自动的,也就是说,不需要由人类触发

>>> publish_transition = workflow.Transition(
...    transition_id='publish',
...    title='Publish',
...    source=UNPUBLISHED,
...    destination=PUBLISHED,
...    condition=TimeCondition,
...    action=NullAction,
...    trigger=interfaces.AUTOMATIC)

使用这个转换设置流程,并重用我们之前定义的初始转换

>>> wf = workflow.Workflow([init_transition, publish_transition])
>>> component.provideUtility(wf, interfaces.IWorkflow)

清除所有版本;这是一个仅用于演示目的的私有API

>>> workflow_versions.clear()

现在创建一个文档

>>> document = Document('bar')
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')

再次私有;在你的代码中,你可以使用目录或任何你喜欢的其他方式

>>> workflow_versions.addVersion(document)

由于这个转换是自动的,我们应该看到它这样

>>> interfaces.IWorkflowInfo(document).getAutomaticTransitionIds()
['publish']

现在触发任何自动转换

>>> workflow_versions.fireAutomatic()

由于我们仍然在时间时刻0,所以没有发生任何事情

>>> state = interfaces.IWorkflowState(document)
>>> state.getState()
'unpublished'

我们改变时间时刻超过3

>>> time_moment = 4

现在再次触发任何自动转换

>>> workflow_versions.fireAutomatic()

转换已触发,因此状态将是“发布”

>>> state.getState()
'published'

系统转换

现在让我们尝试系统转换。这个转换不应该显示为手动或自动

>>> publish_transition = workflow.Transition(
...    transition_id='publish',
...    title='Publish',
...    source=UNPUBLISHED,
...    destination=PUBLISHED,
...    trigger=interfaces.SYSTEM)

使用这个转换设置流程,并重用我们之前定义的初始转换

>>> wf = workflow.Workflow([init_transition, publish_transition])
>>> component.provideUtility(wf, interfaces.IWorkflow)

清除所有版本;这是一个仅用于演示目的的私有API

>>> workflow_versions.clear()

现在创建一个文档

>>> document = Document('bar')
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')

再次私有;在你的代码中,你可以使用目录或任何你喜欢的其他方式

>>> workflow_versions.addVersion(document)

我们应该将其视为系统转换

>>> info.getSystemTransitionIds()
['publish']

但不是自动的也不是手动的

>>> info.getAutomaticTransitionIds()
[]
>>> info.getManualTransitionIds()
[]

这个转换可以被触发

>>> info.fireTransition('publish')
>>> interfaces.IWorkflowState(document).getState()
'published'

多个转换

从源状态到目标状态可以有多个转换,例如一个自动和一个手动转换。

让我们设置一个具有两个手动转换和一个单一生成转换的流程

>>> publish_1_transition = workflow.Transition(
...    transition_id='publish 1',
...    title='Publish 1',
...    source=UNPUBLISHED,
...    destination=PUBLISHED,
...    condition=NullCondition,
...    action=NullAction,
...    trigger=interfaces.MANUAL)

>>> publish_2_transition = workflow.Transition(
...    transition_id='publish 2',
...    title='Publish 2',
...    source=UNPUBLISHED,
...    destination=PUBLISHED,
...    condition=NullCondition,
...    action=NullAction,
...    trigger=interfaces.MANUAL)

>>> publish_auto_transition = workflow.Transition(
...    transition_id='publish auto',
...    title='Publish Auto',
...    source=UNPUBLISHED,
...    destination=PUBLISHED,
...    condition=TimeCondition,
...    action=NullAction,
...    trigger=interfaces.AUTOMATIC)

清除所有版本;这是一个仅用于演示目的的私有API

>>> workflow_versions.clear()

由于我们再次使用时间条件,让我们确保时间再次为0,以便发布_auto_transition不会触发

>>> time_moment = 0

现在使用这些转换设置流程,加上我们的init_transition

>>> wf = workflow.Workflow([init_transition,
...     publish_1_transition, publish_2_transition,
...     publish_auto_transition])
>>> component.provideUtility(wf, interfaces.IWorkflow)

现在创建一个文档

>>> document = Document('bar')
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')

我们应该有两个手动转换

>>> sorted(interfaces.IWorkflowInfo(document).getManualTransitionIds())
['publish 1', 'publish 2']

并且只有一个自动转换

>>> interfaces.IWorkflowInfo(document).getAutomaticTransitionIds()
['publish auto']

使用权限保护转换

转换可以用权限(应该用权限)来保护,这样不是每个人都可以执行它们。

让我们设置一个具有权限的权限的流程

>>> publish_transition = workflow.Transition(
...    transition_id='publish',
...    title='Publish',
...    source=UNPUBLISHED,
...    destination=PUBLISHED,
...    condition=NullCondition,
...    action=NullAction,
...    trigger=interfaces.MANUAL,
...    permission="zope.ManageContent")

快速设置文档的工作流程状态

>>> workflow_versions.clear()
>>> wf = workflow.Workflow([init_transition, publish_transition])
>>> component.provideUtility(wf, interfaces.IWorkflow)
>>> document = Document('bar')
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')

让我们设置安全上下文

>>> from zope.security.interfaces import Unauthorized
>>> from zope.security.management import newInteraction, endInteraction
>>> class Principal:
...    def __init__(self, id):
...        self.id = id
...        self.groups = []
>>> class Participation:
...   interaction = None
...   def __init__(self, principal):
...      self.principal = principal
>>> endInteraction() # XXX argh, apparently one is already active?
>>> newInteraction(Participation(Principal('bob')))

由于我们没有访问权限,我们不应该在可能的转换列表中看到这个权限

>>> info.getManualTransitionIds()
[]

现在让我们尝试触发转换。它应该因为未授权而失败

>>> try:
...     info.fireTransition('publish')
... except Unauthorized:
...     print("Got unauthorized")
Got unauthorized

也不允许fireTransitionToward

>>> info.fireTransitionToward(PUBLISHED)
Traceback (most recent call last):
   ...
hurry.workflow.interfaces.NoTransitionAvailableError: source: "unpublished" destination: "published"

在这种情况下,由于用户没有正确的权限,转换甚至不可用。

然而,系统用户允许这样做

>>> from zope.security.management import system_user
>>> endInteraction()
>>> newInteraction(Participation(system_user))
>>> info.fireTransition('publish')

并且没有问题地完成了。

还有一种特殊的方法可以通过将check_security设置为False传递给fireTransition来实现它

>>> endInteraction()
>>> newInteraction(Participation(Principal('bob')))
>>> interfaces.IWorkflowState(document).setState(UNPUBLISHED)
>>> info.fireTransition('publish', check_security=False)

这也适用于fireTransitionToward

>>> interfaces.IWorkflowState(document).setState(UNPUBLISHED)
>>> info.fireTransitionToward(PUBLISHED, check_security=False)

转换期间的副作用

有时我们希望某些操作在触发 WorkflowTransitionEvent 事件之前执行,但在创建(潜在的)新版本对象之后。如果一个对象在同一个请求中作为工作流转换被编辑,编辑应该在创建潜在的新版本之后进行,否则将编辑旧的而不是新的版本。

然而,如果类似历史记录日志器这样的东西钩入 IWorkflowTransitionEvent,它将在编辑之前获得关于新副本的信息。为了允许在创建新副本和触发事件之间进行编辑,可以在触发转换时传递一个副作用函数。

执行顺序如下

  • 触发转换本身,创建新版本

  • 在新的版本上执行副作用函数

  • 触发 IWorkflowTransitionEvent

让我们设置一个非常简单的工作流

>>> foo_transition = workflow.Transition(
...    transition_id='foo',
...    title='Foo',
...    source=UNPUBLISHED,
...    destination=PUBLISHED,
...    condition=NullCondition,
...    action=CopyAction,
...    trigger=interfaces.MANUAL)

快速设置文档的工作流程状态

>>> workflow_versions.clear()
>>> wf = workflow.Workflow([init_transition, foo_transition])
>>> component.provideUtility(wf, interfaces.IWorkflow)
>>> document = Document('bar')
>>> events = []
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')

现在让我们设置一个副作用

>>> def side_effect(context):
...    context.title = context.title + '!'

现在触发转换,带有副作用

>>> new_version = info.fireTransition('foo', side_effect=side_effect)

新版本的标题现在应该在末尾有一个感叹号!

>>> new_version.title[-1] == '!'
True

但旧版本没有

>>> document.title[-1] == '!'
False

我们之前设置的事件列表应包含两个事件

>>> len(events)
2
>>> events[1].object.title[-1] == '!'
True

模糊的转换

让我们设置一个有两个从 ab 的等效转换的情况

>>> transition1 = workflow.Transition(
...     transition_id='a_to_b1',
...     title='A to B',
...     source='a',
...     destination='b',
...     condition=NullCondition,
...     action=NullAction,
...     trigger=interfaces.MANUAL)

>>> transition2 = workflow.Transition(
...     transition_id='a_to_b2',
...     title='A to B',
...     source='a',
...     destination='b',
...     condition=NullCondition,
...     action=NullAction,
...     trigger=interfaces.MANUAL)


>>> wf = workflow.Workflow([transition1, transition2])
>>> from zope import component
>>> component.provideUtility(wf, interfaces.IWorkflow)
>>> info = interfaces.IWorkflowInfo(document)
>>> state = interfaces.IWorkflowState(document)
>>> state.setState('a')

fireTransitionToward 是模糊的,因为有两个可能的转换

>>> info.fireTransitionToward('b')
Traceback (most recent call last):
   ...
hurry.workflow.interfaces.AmbiguousTransitionError: source: "a" destination: "b"

>>> from hurry.workflow.interfaces import AmbiguousTransitionError
>>> try:
...   info.fireTransitionToward('b')
... except AmbiguousTransitionError as e_:
...   e = e_
>>> e.source
'a'
>>> e.destination
'b'

下载

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分发

hurry.workflow-3.0.2.tar.gz (34.0 kB 查看哈希值)

上传时间

支持者