hurry.workflow是一个简单的流程系统。它可以用于为Zope Toolkit应用程序实现具有状态的多版本工作流。
项目描述
hurry.workflow
一个简单但相当巧妙的Zope 3工作流系统。
变更日志
3.0.2 (2018-01-16)
更新pypi描述的渲染。
3.0.1 (2018-01-16)
修复一个棕色纸袋版本。
3.0.0 (2018-01-16)
更新依赖,不再依赖ZODB 3。
支持python3.4、python3.5、python3.6,以及python2.7。
0.13.1 (2013-01-17)
使异常也显示更详细的消息。
0.13 (2013-01-17)
NoTransitionAvailableError 获得了 source 和 destination 属性,指示哪个转换不可用。
AmbiguousTransitionError 也获得了 source 和 destination 属性,指示哪个转换是模糊的。
InvalidTransitionError 获得了 source 属性,指示尝试无效转换的源状态。
更新后的 bootstrap.py
0.12 (2012-02-10)
将 WorkflowInfo 类上的 info() 和 state() 函数转换为类方法,因为它们在其他情况下不太有用。
fireTransitionToward 已经接受了一个 check_security=False 参数,但它不会允许用户没有权限的转换被触发,因为转换甚至根本找不到。现在它工作了。
0.11 (2010-04-16)
在 WorkflowState 中只调用一次 DoIAnnotations(self.context)。
IWorkflowVersions 实现现在是可选的。
添加了对多个工作流的支持。
0.10 (2009-11-19)
已迁移到 svn.zope.org 进行开发。
添加了 buildout.cfg 和 bootstrap.py
最小化了依赖项。请注意,Workflow 不再继承自 Persistent 和 zope.container.contained.Contained。如果您需要持久化工作流,您需要在您的代码中继承此功能。这破坏了向后兼容性,因为持久化工作流需要重新初始化。
0.9.2.1 (2007-08-15)
哎呀,0.9.2 中的补丁实际上并没有应用。现在已修复。
0.9.2 (2007-08-15)
zope.security 的更改破坏了 hurry.workflow 中的导入。
localUtility 指令现在是过时的,所以不要再使用它了。
0.9.1 (2006-09-22)
第一个 cheesehop 发布。
0.9 (2006-06-15)
从 hurry 包中分离出来,形成 hurry.workflow
eggification 工作
Zope 3.3 兼容性工作
0.8 (2006-05-01)
首次公开发布。
详细文档
Hurry 工作流
Hurry 工作流系统是一个“因为着急所以要自己动手”的框架。
基本工作流
让我们首先创建一个可以进入工作流的内容对象
>>> from zope.interface import implementer, Attribute >>> from zope.annotation.interfaces import IAttributeAnnotatable >>> class IDocument(IAttributeAnnotatable): ... title = Attribute('Title') >>> @implementer(IDocument) ... class Document(object): ... def __init__(self, title): ... self.title = title
如您所见,这样的内容对象必须提供 IAnnotatable,因为这将用于存储工作流状态。系统使用 IWorkflowState 适配器来获取和设置对象的工怍流状态
>>> from hurry.workflow import interfaces >>> document = Document('Foo') >>> state = interfaces.IWorkflowState(document) >>> print(state.getState()) None
可以使用 IWorkflowState 适配器直接为对象设置状态
>>> state.setState('foo') >>> state.getState() 'foo'
但是,让我们再次将其设置为 None,这样我们就可以从干净的状态开始这个文档
>>> state.setState(None)
不建议使用 setState() 自己做这件事:通常我们会让工作流系统负责状态转换和初始状态的设置。
现在让我们定义一个简单的从“a”到“b”的工作流转换。它需要一个条件,该条件在转换被允许发生之前必须返回 True
>>> def NullCondition(wf, context): ... return True
以及一个在转换发生时执行的动作
>>> def NullAction(wf, context): ... pass
现在让我们构建一个转换
>>> from hurry.workflow import workflow >>> transition = workflow.Transition( ... transition_id='a_to_b', ... title='A to B', ... source='a', ... destination='b', ... condition=NullCondition, ... action=NullAction, ... trigger=interfaces.MANUAL)
转换触发器可以是 MANUAL、AUTOMATIC 或 SYSTEM。MANUAL 表示需要用户操作来触发转换。AUTOMATIC 转换会自动触发。SYSTEM 是由系统直接触发的工作流转换,而不是由用户直接触发。
我们还将引入一个初始转换,将对象移动到工作流中(例如,在它创建后立即进行)
>>> init_transition = workflow.Transition( ... transition_id='to_a', ... title='Create A', ... source=None, ... destination='a')
以及一个最终转换,当对象再次从工作流中移出时(例如,在它删除前进行)
>>> final_transition = workflow.Transition( ... transition_id='finalize', ... title='Delete', ... source='b', ... destination=None)
现在让我们将这些转换放入一个工作流实用工具中
>>> wf = workflow.Workflow([transition, init_transition, final_transition]) >>> from zope import component >>> component.provideUtility(wf, interfaces.IWorkflow)
如果我们需要,我们可以使用 get_transition 从工作流中获取转换
>>> wf.getTransition('a', 'a_to_b') is transition True
如果我们尝试获取一个不存在的转换,我们会得到一个错误
>>> wf.getTransition('b', 'a_to_b') Traceback (most recent call last): ... hurry.workflow.interfaces.InvalidTransitionError: source: "b" >>> from hurry.workflow.interfaces import InvalidTransitionError >>> try: ... wf.getTransition('b', 'a_to_b') ... except InvalidTransitionError as e_: ... e = e_ >>> e.source 'b'
工作流转换会导致事件被触发;我们将添加一个简单的处理程序,以便我们可以检查是否成功触发了事件
>>> events = [] >>> def transition_handler(event): ... events.append(event) >>> component.provideHandler( ... transition_handler, ... [interfaces.IWorkflowTransitionEvent])
为了获取从对象到其他可能的状态的转换,以及触发转换和设置初始状态,我们使用IWorkflowInfo适配器
>>> info = interfaces.IWorkflowInfo(document)
我们将通过触发'to_a'转换来初始化工作流
>>> info.fireTransition('to_a')
这应该已经触发了一个事件
>>> events[-1].transition.transition_id 'to_a' >>> events[-1].source is None True >>> events[-1].destination 'a'
只定义了一个转换到工作流状态'b'的转换
>>> info.getManualTransitionIds() ['a_to_b']
我们也可以通过询问哪些手动(或系统)转换可以带我们到达所需的工作流状态来获取这个信息
>>> info.getFireableTransitionIdsToward('b') ['a_to_b']
由于这是一个手动触发的转换,我们可以触发这个转换
>>> info.fireTransition('a_to_b')
现在工作流状态应该是'b'了
>>> state.getState() 'b'
我们检查事件是否确实已经触发
>>> events[-1].transition.transition_id 'a_to_b' >>> events[-1].source 'a' >>> events[-1].destination 'b'
我们还将在这里尝试fireTransitionToward,因此我们将工作流偷偷地再次恢复到状态'a'并尝试它
>>> state.setState('a')
尝试通过一个我们无法到达的转换
>>> info.fireTransitionToward('c') Traceback (most recent call last): ... hurry.workflow.interfaces.NoTransitionAvailableError: source: "a" destination: "c"
这个错误提供了一些信息,说明尝试了哪个转换
>>> from hurry.workflow.interfaces import NoTransitionAvailableError >>> try: ... info.fireTransitionToward('c') ... except NoTransitionAvailableError as e_: ... e = e_ >>> e.source 'a' >>> e.destination 'c'
现在再次转到'b'状态
>>> info.fireTransitionToward('b') >>> state.getState() 'b'
最后,在忘记我们的文档之前,我们最终确定工作流
>>> info.fireTransition('finalize') >>> state.getState() is None True
我们还有另一个触发的事件
>>> events[-1].transition.transition_id 'finalize' >>> events[-1].source 'b' >>> events[-1].destination is None True
多个工作流
我们之前已将一个工作流注册为未命名的实用工具。您也可以将工作流注册为命名实用工具,为网站提供多个工作流。
让我们创建一个发票文档
>>> class IInvoiceDocument(IDocument): ... title = Attribute('Title') >>> @implementer(IInvoiceDocument) ... class InvoiceDocument(object): ... def __init__(self, title, amount): ... self.title = title ... self.amount = amount
我们定义我们的工作流
>>> invoice_init = workflow.Transition( ... transition_id='init_invoice', ... title='Invoice Received', ... source=None, ... destination='received') >>> >>> invoice_paid = workflow.Transition( ... transition_id='invoice_paid', ... title='Invoice Paid', ... source='received', ... destination='paid') >>> invoice_wf = workflow.Workflow( [ invoice_init, invoice_paid ] )
我们注册一个新的工作流实用工具,名为“invoice”的WorkflowState和WorkflowInfo适配器
>>> from hurry.workflow import workflow >>> from zope.annotation import interfaces as annotation_interfaces >>> component.provideUtility(invoice_wf, interfaces.IWorkflow, name="invoice") >>> class InvoiceWorkflowInfo(workflow.WorkflowInfo): ... name="invoice" >>> component.provideAdapter( ... InvoiceWorkflowInfo, ... (annotation_interfaces.IAnnotatable,), ... interfaces.IWorkflowInfo, ... name="invoice") >>> class InvoiceWorkflowState(workflow.WorkflowState): ... state_key = "invoice.state" ... id_key = "invoice.id" >>> component.provideAdapter( ... InvoiceWorkflowState, ... (annotation_interfaces.IAnnotatable,), ... interfaces.IWorkflowState, ... name="invoice")
现在我们可以利用工作流了
>>> invoice = InvoiceDocument('abc', 22) >>> info = component.getAdapter(invoice, interfaces.IWorkflowInfo, name="invoice") >>> info.fireTransition('init_invoice') >>> state = component.getAdapter(invoice, interfaces.IWorkflowState, name="invoice") >>> state.getState() 'received' >>> info.fireTransition('invoice_paid') >>> state.getState() 'paid'
为了更容易地获取特定上下文对象的state和info适配器,工作流信息对象上有两个便利函数。信息对象“知道”要查找哪个工作流实用工具,因为它们是按名称关联的
>>> info_ = InvoiceWorkflowInfo.info(invoice) >>> interfaces.IWorkflowInfo.providedBy(info_) True >>> state_ = InvoiceWorkflowInfo.state(invoice) >>> interfaces.IWorkflowState.providedBy(state_) True >>> state.getState() is state_.getState() True
当然,这个文档始终有默认的未命名工作流
>>> info = interfaces.IWorkflowInfo(invoice) >>> info.fireTransition('to_a') >>> state = interfaces.IWorkflowState(invoice) >>> state.getState() 'a'
多版本工作流
现在让我们考虑一个更复杂的场景,其中有一个文档的多个版本。在任何时候,一个文档都可以有一个未发布的版本和一个已发布的版本。还可以有一个已关闭的版本和任意数量的存档版本
>>> UNPUBLISHED = 'unpublished' >>> PUBLISHED = 'published' >>> CLOSED = 'closed' >>> ARCHIVED = 'archived'
让我们从一个简单的初始转换开始
>>> init_transition = workflow.Transition( ... transition_id='init', ... title='Initialize', ... source=None, ... destination=UNPUBLISHED)
当未发布的版本发布时,任何之前发布的版本都会变成已关闭的版本。为了完成这个二级状态转换,我们将使用系统的内置版本控制能力,即使用'fireTransitionsForVersions'方法,它可以用来触发文档其他版本的转换
>>> def PublishAction(wf, context): ... wf.fireTransitionForVersions(PUBLISHED, 'close')
现在让我们构建转换
>>> publish_transition = workflow.Transition( ... transition_id='publish', ... title='Publish', ... source=UNPUBLISHED, ... destination=PUBLISHED, ... condition=NullCondition, ... action=PublishAction, ... trigger=interfaces.MANUAL, ... order=1)
接下来,我们将定义从已发布到已关闭的转换,这意味着我们想要存档之前关闭的内容
>>> def CloseAction(wf, context): ... wf.fireTransitionForVersions(CLOSED, 'archive') >>> close_transition = workflow.Transition( ... transition_id='close', ... title='Close', ... source=PUBLISHED, ... destination=CLOSED, ... condition=NullCondition, ... action=CloseAction, ... trigger=interfaces.MANUAL, ... order=2)
注意,当使用fireTransitionsForVersions将状态从已发布转换为已关闭时,CloseAction也将自动执行。这意味着发布文档会导致之前关闭的文档存档。
如果有一个已发布的版本但没有未发布的版本,我们可以创建已发布版本的副本,并将其作为未发布的版本
>>> def CanCopyCondition(wf, context): ... return not wf.hasVersion(UNPUBLISHED)
由于我们实际上正在创建一个新的内容对象,该操作应返回具有新状态的新创建的对象
>>> def CopyAction(wf, context): ... return Document('copy of %s' % context.title) >>> copy_transition = workflow.Transition( ... transition_id='copy', ... title='Copy', ... source=PUBLISHED, ... destination=UNPUBLISHED, ... condition=CanCopyCondition, ... action=CopyAction, ... trigger=interfaces.MANUAL, ... order=3)
一个非常类似的转换也适用于已关闭的版本。如果没有未发布的版本和已发布的版本,我们可以从已关闭版本创建一个新副本
>>> def CanCopyCondition(wf, context): ... return (not wf.hasVersion(UNPUBLISHED) and ... not wf.hasVersion(PUBLISHED)) >>> copy_closed_transition = workflow.Transition( ... transition_id='copy_closed', ... title='Copy', ... source=CLOSED, ... destination=UNPUBLISHED, ... condition=CanCopyCondition, ... action=CopyAction, ... trigger=interfaces.MANUAL, ... order=4)
最后,让我们构建存档转换
>>> archive_transition = workflow.Transition( ... transition_id='archive', ... title='Archive', ... source=CLOSED, ... destination=ARCHIVED, ... condition=NullCondition, ... action=NullAction, ... trigger=interfaces.MANUAL, ... order=5)
现在让我们构建并提供一个工作流实用工具
>>> wf = workflow.Workflow([init_transition, ... publish_transition, close_transition, ... copy_transition, copy_closed_transition, ... archive_transition]) >>> component.provideUtility(wf, interfaces.IWorkflow)
让我们获取工作流_versions实用工具,我们可以用它来跟踪版本并生成一个新的唯一ID
>>> workflow_versions = component.getUtility(interfaces.IWorkflowVersions)
让我们从一个文档开始
>>> document = Document('bar') >>> info = interfaces.IWorkflowInfo(document) >>> info.fireTransition('init')
我们需要文档ID,以便稍后创建新版本时进行比较
>>> state = interfaces.IWorkflowState(document) >>> document_id = state.getId()
将其添加到工作流版本容器中,以便我们可以找到它。注意,我们在这里使用的是私有API;这可以实施为将其添加到一个文件夹或任何其他方式,只要getVersions()稍后可以工作
>>> workflow_versions.addVersion(document) # private API
同时清除先前记录的事件
>>> del events[:]
我们可以发布它
>>> info.getManualTransitionIds() ['publish']
所以让我们这么做
>>> info.fireTransition('publish') >>> state.getState() 'published'
最后的事件应该是'publish'转换
>>> events[-1].transition.transition_id 'publish'
现在我们可以关闭或创建一个新的副本。请注意,名称是按照转换的顺序排序的
>>> info.getManualTransitionIds() ['close', 'copy']
让我们关闭它
>>> info.fireTransition('close') >>> state.getState() 'closed'
我们现在将创建一个新的副本进行编辑
>>> info.getManualTransitionIds() ['copy_closed', 'archive'] >>> document2 = info.fireTransition('copy_closed') >>> workflow_versions.addVersion(document2) # private API to track it >>> document2.title 'copy of bar' >>> state = interfaces.IWorkflowState(document2) >>> state.getState() 'unpublished' >>> state.getId() == document_id True
原始版本仍然以原始状态存在
>>> interfaces.IWorkflowState(document).getState() 'closed'
让我们也详细检查最后的事件
>>> event = events[-1] >>> event.transition.transition_id 'copy_closed' >>> event.old_object == document True >>> event.object == document2 True
现在我们将发布新版本
>>> info = interfaces.IWorkflowInfo(document2) >>> info.getManualTransitionIds() ['publish'] >>> info.fireTransition('publish') >>> interfaces.IWorkflowState(document2).getState() 'published'
原始版本仍然处于关闭状态
>>> interfaces.IWorkflowState(document).getState() 'closed'
现在让我们在这个之后发布另一个副本
>>> document3 = info.fireTransition('copy') >>> workflow_versions.addVersion(document3) >>> interfaces.IWorkflowInfo(document3).fireTransition('publish')
这个副本现在已经发布
>>> interfaces.IWorkflowState(document3).getState() 'published'
之前发布的版本现在已经关闭
>>> interfaces.IWorkflowState(document2).getState() 'closed'
请注意,由于条件限制,无法从已关闭的版本复制,因为仍然存在一个已发布的版本
>>> interfaces.IWorkflowInfo(document2).getManualTransitionIds() ['archive']
与此同时,之前关闭的原始版本现在已存档
>>> interfaces.IWorkflowState(document).getState() 'archived'
自动转换
现在让我们尝试一个基于自动和基于时间的流程转换。我们将设置一个非常简单的流程,在“未发布”和“发布”之间,并将“发布”转换设置为基于时间的。
为了模拟时间,我们有时刻
>>> time_moment = 0
只有当时间_时刻大于3时,我们才会发布
>>> def TimeCondition(wf, context): ... return time_moment > 3
使用这个条件设置转换;请注意,这个转换是自动的,也就是说,不需要由人类触发
>>> publish_transition = workflow.Transition( ... transition_id='publish', ... title='Publish', ... source=UNPUBLISHED, ... destination=PUBLISHED, ... condition=TimeCondition, ... action=NullAction, ... trigger=interfaces.AUTOMATIC)
使用这个转换设置流程,并重用我们之前定义的初始转换
>>> wf = workflow.Workflow([init_transition, publish_transition]) >>> component.provideUtility(wf, interfaces.IWorkflow)
清除所有版本;这是一个仅用于演示目的的私有API
>>> workflow_versions.clear()
现在创建一个文档
>>> document = Document('bar') >>> info = interfaces.IWorkflowInfo(document) >>> info.fireTransition('init')
再次私有;在你的代码中,你可以使用目录或任何你喜欢的其他方式
>>> workflow_versions.addVersion(document)
由于这个转换是自动的,我们应该看到它这样
>>> interfaces.IWorkflowInfo(document).getAutomaticTransitionIds() ['publish']
现在触发任何自动转换
>>> workflow_versions.fireAutomatic()
由于我们仍然在时间时刻0,所以没有发生任何事情
>>> state = interfaces.IWorkflowState(document) >>> state.getState() 'unpublished'
我们改变时间时刻超过3
>>> time_moment = 4
现在再次触发任何自动转换
>>> workflow_versions.fireAutomatic()
转换已触发,因此状态将是“发布”
>>> state.getState() 'published'
系统转换
现在让我们尝试系统转换。这个转换不应该显示为手动或自动
>>> publish_transition = workflow.Transition( ... transition_id='publish', ... title='Publish', ... source=UNPUBLISHED, ... destination=PUBLISHED, ... trigger=interfaces.SYSTEM)
使用这个转换设置流程,并重用我们之前定义的初始转换
>>> wf = workflow.Workflow([init_transition, publish_transition]) >>> component.provideUtility(wf, interfaces.IWorkflow)
清除所有版本;这是一个仅用于演示目的的私有API
>>> workflow_versions.clear()
现在创建一个文档
>>> document = Document('bar') >>> info = interfaces.IWorkflowInfo(document) >>> info.fireTransition('init')
再次私有;在你的代码中,你可以使用目录或任何你喜欢的其他方式
>>> workflow_versions.addVersion(document)
我们应该将其视为系统转换
>>> info.getSystemTransitionIds() ['publish']
但不是自动的也不是手动的
>>> info.getAutomaticTransitionIds() [] >>> info.getManualTransitionIds() []
这个转换可以被触发
>>> info.fireTransition('publish') >>> interfaces.IWorkflowState(document).getState() 'published'
多个转换
从源状态到目标状态可以有多个转换,例如一个自动和一个手动转换。
让我们设置一个具有两个手动转换和一个单一生成转换的流程
>>> publish_1_transition = workflow.Transition( ... transition_id='publish 1', ... title='Publish 1', ... source=UNPUBLISHED, ... destination=PUBLISHED, ... condition=NullCondition, ... action=NullAction, ... trigger=interfaces.MANUAL) >>> publish_2_transition = workflow.Transition( ... transition_id='publish 2', ... title='Publish 2', ... source=UNPUBLISHED, ... destination=PUBLISHED, ... condition=NullCondition, ... action=NullAction, ... trigger=interfaces.MANUAL) >>> publish_auto_transition = workflow.Transition( ... transition_id='publish auto', ... title='Publish Auto', ... source=UNPUBLISHED, ... destination=PUBLISHED, ... condition=TimeCondition, ... action=NullAction, ... trigger=interfaces.AUTOMATIC)
清除所有版本;这是一个仅用于演示目的的私有API
>>> workflow_versions.clear()
由于我们再次使用时间条件,让我们确保时间再次为0,以便发布_auto_transition不会触发
>>> time_moment = 0
现在使用这些转换设置流程,加上我们的init_transition
>>> wf = workflow.Workflow([init_transition, ... publish_1_transition, publish_2_transition, ... publish_auto_transition]) >>> component.provideUtility(wf, interfaces.IWorkflow)
现在创建一个文档
>>> document = Document('bar') >>> info = interfaces.IWorkflowInfo(document) >>> info.fireTransition('init')
我们应该有两个手动转换
>>> sorted(interfaces.IWorkflowInfo(document).getManualTransitionIds()) ['publish 1', 'publish 2']
并且只有一个自动转换
>>> interfaces.IWorkflowInfo(document).getAutomaticTransitionIds() ['publish auto']
使用权限保护转换
转换可以用权限(应该用权限)来保护,这样不是每个人都可以执行它们。
让我们设置一个具有权限的权限的流程
>>> publish_transition = workflow.Transition( ... transition_id='publish', ... title='Publish', ... source=UNPUBLISHED, ... destination=PUBLISHED, ... condition=NullCondition, ... action=NullAction, ... trigger=interfaces.MANUAL, ... permission="zope.ManageContent")
快速设置文档的工作流程状态
>>> workflow_versions.clear() >>> wf = workflow.Workflow([init_transition, publish_transition]) >>> component.provideUtility(wf, interfaces.IWorkflow) >>> document = Document('bar') >>> info = interfaces.IWorkflowInfo(document) >>> info.fireTransition('init')
让我们设置安全上下文
>>> from zope.security.interfaces import Unauthorized >>> from zope.security.management import newInteraction, endInteraction >>> class Principal: ... def __init__(self, id): ... self.id = id ... self.groups = [] >>> class Participation: ... interaction = None ... def __init__(self, principal): ... self.principal = principal >>> endInteraction() # XXX argh, apparently one is already active? >>> newInteraction(Participation(Principal('bob')))
由于我们没有访问权限,我们不应该在可能的转换列表中看到这个权限
>>> info.getManualTransitionIds() []
现在让我们尝试触发转换。它应该因为未授权而失败
>>> try: ... info.fireTransition('publish') ... except Unauthorized: ... print("Got unauthorized") Got unauthorized
也不允许fireTransitionToward
>>> info.fireTransitionToward(PUBLISHED) Traceback (most recent call last): ... hurry.workflow.interfaces.NoTransitionAvailableError: source: "unpublished" destination: "published"
在这种情况下,由于用户没有正确的权限,转换甚至不可用。
然而,系统用户允许这样做
>>> from zope.security.management import system_user >>> endInteraction() >>> newInteraction(Participation(system_user)) >>> info.fireTransition('publish')
并且没有问题地完成了。
还有一种特殊的方法可以通过将check_security设置为False传递给fireTransition来实现它
>>> endInteraction() >>> newInteraction(Participation(Principal('bob'))) >>> interfaces.IWorkflowState(document).setState(UNPUBLISHED) >>> info.fireTransition('publish', check_security=False)
这也适用于fireTransitionToward
>>> interfaces.IWorkflowState(document).setState(UNPUBLISHED) >>> info.fireTransitionToward(PUBLISHED, check_security=False)
转换期间的副作用
有时我们希望某些操作在触发 WorkflowTransitionEvent 事件之前执行,但在创建(潜在的)新版本对象之后。如果一个对象在同一个请求中作为工作流转换被编辑,编辑应该在创建潜在的新版本之后进行,否则将编辑旧的而不是新的版本。
然而,如果类似历史记录日志器这样的东西钩入 IWorkflowTransitionEvent,它将在编辑之前获得关于新副本的信息。为了允许在创建新副本和触发事件之间进行编辑,可以在触发转换时传递一个副作用函数。
执行顺序如下
触发转换本身,创建新版本
在新的版本上执行副作用函数
触发 IWorkflowTransitionEvent
让我们设置一个非常简单的工作流
>>> foo_transition = workflow.Transition( ... transition_id='foo', ... title='Foo', ... source=UNPUBLISHED, ... destination=PUBLISHED, ... condition=NullCondition, ... action=CopyAction, ... trigger=interfaces.MANUAL)
快速设置文档的工作流程状态
>>> workflow_versions.clear() >>> wf = workflow.Workflow([init_transition, foo_transition]) >>> component.provideUtility(wf, interfaces.IWorkflow) >>> document = Document('bar') >>> events = [] >>> info = interfaces.IWorkflowInfo(document) >>> info.fireTransition('init')
现在让我们设置一个副作用
>>> def side_effect(context): ... context.title = context.title + '!'
现在触发转换,带有副作用
>>> new_version = info.fireTransition('foo', side_effect=side_effect)
新版本的标题现在应该在末尾有一个感叹号!
>>> new_version.title[-1] == '!' True
但旧版本没有
>>> document.title[-1] == '!' False
我们之前设置的事件列表应包含两个事件
>>> len(events) 2 >>> events[1].object.title[-1] == '!' True
模糊的转换
让我们设置一个有两个从 a 到 b 的等效转换的情况
>>> transition1 = workflow.Transition( ... transition_id='a_to_b1', ... title='A to B', ... source='a', ... destination='b', ... condition=NullCondition, ... action=NullAction, ... trigger=interfaces.MANUAL) >>> transition2 = workflow.Transition( ... transition_id='a_to_b2', ... title='A to B', ... source='a', ... destination='b', ... condition=NullCondition, ... action=NullAction, ... trigger=interfaces.MANUAL) >>> wf = workflow.Workflow([transition1, transition2]) >>> from zope import component >>> component.provideUtility(wf, interfaces.IWorkflow) >>> info = interfaces.IWorkflowInfo(document) >>> state = interfaces.IWorkflowState(document) >>> state.setState('a')
fireTransitionToward 是模糊的,因为有两个可能的转换
>>> info.fireTransitionToward('b') Traceback (most recent call last): ... hurry.workflow.interfaces.AmbiguousTransitionError: source: "a" destination: "b" >>> from hurry.workflow.interfaces import AmbiguousTransitionError >>> try: ... info.fireTransitionToward('b') ... except AmbiguousTransitionError as e_: ... e = e_ >>> e.source 'a' >>> e.destination 'b'
下载
项目详情
hurry.workflow-3.0.2.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 3fcaee702a133403a4337c39ed6938e37a6130a45c5a8c52b2bcec58b4bc9e6f |
|
MD5 | 4556874091eecdbd846fa16e549f19d3 |
|
BLAKE2b-256 | 4c1ab1ac73ae32200d7dcd964a95b1722d51f60d7473f858c0bd4d83b4451d87 |