Skip to main content

hurry.workflow is a simple workflow system. It can be used to implement stateful multi-version workflows for Zope Toolkit applications.

Project description

hurry.workflow

A simple but quite nifty workflow system for Zope 3.

CHANGES

3.0.2 (2018-01-16)

  • Update rendering of pypi description.

3.0.1 (2018-01-16)

  • Fix up a brown paper bag release.

3.0.0 (2018-01-16)

  • Update dependencies not to rely on ZODB 3 anymore.

  • Support python3.4, python3.5, python3.6 in addition to python2.7.

0.13.1 (2013-01-17)

  • Make the exceptions also display more informative messages.

0.13 (2013-01-17)

  • NoTransitionAvailableError gained a source and destination attribute indicating what transition wasn’t available.

  • AmbiguousTransitionError also gained a source and destination attribute indicating what transition was ambiguous.

  • InvalidTransitionError gained a source attribute indicating the source state of the attempted invalid transition.

  • Newer bootstrap.py

0.12 (2012-02-10)

  • Make the info() and state() functions on the WorkflowInfo class into classmethods as they are not of much use otherwise.

  • fireTransitionToward already accepted a check_security=False argument, but it would not allow a transition that a user didn’t have the permission for to be fired after all, because the transition wouldn’t even be found in the first place. Now it works.

0.11 (2010-04-16)

  • Do IAnnotations(self.context) only once in WorkflowState.

  • An IWorkflowVersions implementation is now optional.

  • Added multiple workflows support.

0.10 (2009-11-19)

  • Moved to svn.zope.org for development.

  • Added a buildout.cfg, bootstrap.py

  • Minimized dependencies. Note that Workflow does not inherit from Persistent and zope.container.contained.Contained anymore. If you need persistent workflow, you need to subclass this in your own code. This breaks backwards compatibility, as persistent workflows would need to be re-initialized.

0.9.2.1 (2007-08-15)

  • Oops, the patches in 0.9.2 were not actually applied. Fixed them now.

0.9.2 (2007-08-15)

  • zope.security changes broke imports in hurry.workflow.

  • localUtility directive is now deprecated, so don’t use it anymore.

0.9.1 (2006-09-22)

  • first cheesehop release.

0.9 (2006-06-15)

  • separate out from hurry package into hurry.workflow

  • eggification work

  • Zope 3.3 compatibility work

0.8 (2006-05-01)

Initial public release.

Detailed Documentation

Hurry Workflow

The hurry workflow system is a “roll my own because I’m in a hurry” framework.

Basic workflow

Let’s first make a content object that can go into a workflow:

>>> from zope.interface import implementer, Attribute

>>> from zope.annotation.interfaces import IAttributeAnnotatable
>>> class IDocument(IAttributeAnnotatable):
...    title = Attribute('Title')
>>> @implementer(IDocument)
... class Document(object):
...    def __init__(self, title):
...        self.title = title

As you can see, such a content object must provide IAnnotatable, as this is used to store the workflow state. The system uses the IWorkflowState adapter to get and set an object’s workflow state:

>>> from hurry.workflow import interfaces
>>> document = Document('Foo')
>>> state = interfaces.IWorkflowState(document)
>>> print(state.getState())
None

The state can be set directly for an object using the IWorkflowState adapter as well:

>>> state.setState('foo')
>>> state.getState()
'foo'

But let’s set it back to None again, so we can start again in a pristine state for this document:

>>> state.setState(None)

It’s not recommended use setState() do this ourselves, though: usually we’ll let the workflow system take care of state transitions and the setting of the initial state.

Now let’s define a simple workflow transition from ‘a’ to ‘b’. It needs a condition which must return True before the transition is allowed to occur:

>>> def NullCondition(wf, context):
...    return True

and an action that takes place when the transition is taken:

>>> def NullAction(wf, context):
...    pass

Now let’s construct a transition:

>>> from hurry.workflow import workflow
>>> transition = workflow.Transition(
...     transition_id='a_to_b',
...     title='A to B',
...     source='a',
...     destination='b',
...     condition=NullCondition,
...     action=NullAction,
...     trigger=interfaces.MANUAL)

The transition trigger is either MANUAL, AUTOMATIC or SYSTEM. MANUAL indicates user action is needed to fire the transition. AUTOMATIC transitions fire automatically. SYSTEM is a workflow transition directly fired by the system, and not directly by the user.

We also will introduce an initial transition, that moves an object into the workflow (for instance just after it is created):

>>> init_transition = workflow.Transition(
...     transition_id='to_a',
...     title='Create A',
...     source=None,
...     destination='a')

And a final transition, when the object moves out of the workflow again (for instance just before it is deleted):

>>> final_transition = workflow.Transition(
...     transition_id='finalize',
...     title='Delete',
...     source='b',
...     destination=None)

Now let’s put the transitions in an workflow utility:

>>> wf = workflow.Workflow([transition, init_transition, final_transition])
>>> from zope import component
>>> component.provideUtility(wf, interfaces.IWorkflow)

We can get the transition from the workflow using get_transition should we need it:

>>> wf.getTransition('a', 'a_to_b') is transition
True

If we try to get a transition that doesn’t exist, we get an error:

>>> wf.getTransition('b', 'a_to_b')
Traceback (most recent call last):
  ...
hurry.workflow.interfaces.InvalidTransitionError: source: "b"

>>> from hurry.workflow.interfaces import InvalidTransitionError
>>> try:
...   wf.getTransition('b', 'a_to_b')
... except InvalidTransitionError as e_:
...   e = e_
>>> e.source
'b'

Workflow transitions cause events to be fired; we will put in a simple handler so we can check whether things were successfully fired:

>>> events = []
>>> def transition_handler(event):
...     events.append(event)
>>> component.provideHandler(
...     transition_handler,
...     [interfaces.IWorkflowTransitionEvent])

To get what transitions to other states are possible from an object, as well as to fire transitions and set initial state, we use the IWorkflowInfo adapter:

>>> info = interfaces.IWorkflowInfo(document)

We’ll initialize the workflow by firing the ‘to_a’ transition:

>>> info.fireTransition('to_a')

This should’ve fired an event:

>>> events[-1].transition.transition_id
'to_a'
>>> events[-1].source is None
True
>>> events[-1].destination
'a'

There’s only a single transition defined to workflow state ‘b’:

>>> info.getManualTransitionIds()
['a_to_b']

We can also get this by asking which manual (or system) transition exists that brings us to the desired workflow state:

>>> info.getFireableTransitionIdsToward('b')
['a_to_b']

Since this is a manually triggered transition, we can fire this transition:

>>> info.fireTransition('a_to_b')

The workflow state should now be ‘b’:

>>> state.getState()
'b'

We check that the event indeed got fired:

>>> events[-1].transition.transition_id
'a_to_b'
>>> events[-1].source
'a'
>>> events[-1].destination
'b'

We will also try fireTransitionToward here, so we sneak back the workflow to state ‘a’ again and try that:

>>> state.setState('a')

Try going through a transition we cannot reach first:

>>> info.fireTransitionToward('c')
Traceback (most recent call last):
...
hurry.workflow.interfaces.NoTransitionAvailableError: source: "a" destination: "c"

This error has some information available of what transition was attempted:

>>> from hurry.workflow.interfaces import NoTransitionAvailableError
>>> try:
...   info.fireTransitionToward('c')
... except NoTransitionAvailableError as e_:
...   e = e_
>>> e.source
'a'
>>> e.destination
'c'

Now go to ‘b’ again:

>>> info.fireTransitionToward('b')
>>> state.getState()
'b'

Finally, before forgetting about our document, we finalize the workflow:

>>> info.fireTransition('finalize')
>>> state.getState() is None
True

And we have another event that was fired:

>>> events[-1].transition.transition_id
'finalize'
>>> events[-1].source
'b'
>>> events[-1].destination is None
True

Multiple workflows

We have previously registered a workflow as a unnamed utility. You can also register a workflow as a named utility to provide several workflows for a site.

Let’s create a, invoice document:

>>> class IInvoiceDocument(IDocument):
...    title = Attribute('Title')

>>> @implementer(IInvoiceDocument)
... class InvoiceDocument(object):
...    def __init__(self, title, amount):
...        self.title = title
...        self.amount = amount

We define our workflow:

>>> invoice_init = workflow.Transition(
...     transition_id='init_invoice',
...     title='Invoice Received',
...     source=None,
...     destination='received')
>>>
>>> invoice_paid = workflow.Transition(
...     transition_id='invoice_paid',
...     title='Invoice Paid',
...     source='received',
...     destination='paid')

>>> invoice_wf = workflow.Workflow( [ invoice_init, invoice_paid ] )

We register a new workflow utility, WorkflowState and WorkflowInfo adapters, all named “invoice”:

>>> from hurry.workflow import workflow
>>> from zope.annotation import interfaces as annotation_interfaces
>>> component.provideUtility(invoice_wf, interfaces.IWorkflow, name="invoice")
>>> class InvoiceWorkflowInfo(workflow.WorkflowInfo):
...     name="invoice"
>>> component.provideAdapter(
...     InvoiceWorkflowInfo,
...     (annotation_interfaces.IAnnotatable,),
...     interfaces.IWorkflowInfo,
...     name="invoice")
>>> class InvoiceWorkflowState(workflow.WorkflowState):
...     state_key = "invoice.state"
...     id_key  = "invoice.id"
>>> component.provideAdapter(
...     InvoiceWorkflowState,
...     (annotation_interfaces.IAnnotatable,),
...     interfaces.IWorkflowState,
...     name="invoice")

Now we can utilize the workflow:

>>> invoice = InvoiceDocument('abc', 22)

>>> info = component.getAdapter(invoice, interfaces.IWorkflowInfo, name="invoice")
>>> info.fireTransition('init_invoice')
>>> state = component.getAdapter(invoice, interfaces.IWorkflowState, name="invoice")
>>> state.getState()
'received'
>>> info.fireTransition('invoice_paid')
>>> state.getState()
'paid'

To make it easier to get the state and info adapters for a particular context object, there are two convenience functions on the workflow info object. The info object “knows” what workflow utility to look for, as they are associated by name:

>>> info_ = InvoiceWorkflowInfo.info(invoice)
>>> interfaces.IWorkflowInfo.providedBy(info_)
True

>>> state_ = InvoiceWorkflowInfo.state(invoice)
>>> interfaces.IWorkflowState.providedBy(state_)
True
>>> state.getState() is state_.getState()
True

Of course, this document always have the default unnamed workflow:

>>> info = interfaces.IWorkflowInfo(invoice)
>>> info.fireTransition('to_a')
>>> state = interfaces.IWorkflowState(invoice)
>>> state.getState()
'a'

Multi-version workflow

Now let’s go for a more complicated scenario where have multiple versions of a document. At any one time a document can have an UNPUBLISHED version and a PUBLISHED version. There can also be a CLOSED version and any number of ARCHIVED versions:

>>> UNPUBLISHED = 'unpublished'
>>> PUBLISHED = 'published'
>>> CLOSED = 'closed'
>>> ARCHIVED = 'archived'

Let’s start with a simple initial transition:

>>> init_transition = workflow.Transition(
...    transition_id='init',
...    title='Initialize',
...    source=None,
...    destination=UNPUBLISHED)

When the unpublished version is published, any previously published version is made to be the CLOSED version. To accomplish this secondary state transition, we’ll use the system’s built-in versioning ability with the ‘fireTransitionsForVersions’ method, which can be used to fire transitions of other versions of the document:

>>> def PublishAction(wf, context):
...    wf.fireTransitionForVersions(PUBLISHED, 'close')

Now let’s build the transition:

>>> publish_transition = workflow.Transition(
...    transition_id='publish',
...    title='Publish',
...    source=UNPUBLISHED,
...    destination=PUBLISHED,
...    condition=NullCondition,
...    action=PublishAction,
...    trigger=interfaces.MANUAL,
...    order=1)

Next, we’ll define a transition from PUBLISHED to CLOSED, which means we want to archive whatever was closed before:

>>> def CloseAction(wf, context):
...    wf.fireTransitionForVersions(CLOSED, 'archive')
>>> close_transition = workflow.Transition(
...    transition_id='close',
...    title='Close',
...    source=PUBLISHED,
...    destination=CLOSED,
...    condition=NullCondition,
...    action=CloseAction,
...    trigger=interfaces.MANUAL,
...    order=2)

Note that CloseAction will also be executed automatically whenever state is transitioned from PUBLISHED to CLOSED using fireTransitionsForVersions. This means that publishing a document results in the previously closed document being archived.

If there is a PUBLISHED but no UNPUBLISHED version, we can make a new copy of the PUBLISHED version and make that the UNPUBLISHED version:

>>> def CanCopyCondition(wf, context):
...     return not wf.hasVersion(UNPUBLISHED)

Since we are actually creating a new content object, the action should return the newly created object with the new state:

>>> def CopyAction(wf, context):
...     return Document('copy of %s' % context.title)

>>> copy_transition = workflow.Transition(
...     transition_id='copy',
...     title='Copy',
...     source=PUBLISHED,
...     destination=UNPUBLISHED,
...     condition=CanCopyCondition,
...     action=CopyAction,
...     trigger=interfaces.MANUAL,
...     order=3)

A very similar transition applies to the closed version. If we have no UNPUBLISHED version and no PUBLISHED version, we can make a new copy from the CLOSED version:

>>> def CanCopyCondition(wf, context):
...     return (not wf.hasVersion(UNPUBLISHED) and
...         not wf.hasVersion(PUBLISHED))

>>> copy_closed_transition = workflow.Transition(
...     transition_id='copy_closed',
...     title='Copy',
...     source=CLOSED,
...     destination=UNPUBLISHED,
...     condition=CanCopyCondition,
...     action=CopyAction,
...     trigger=interfaces.MANUAL,
...     order=4)

Finally let’s build the archiving transition:

>>> archive_transition = workflow.Transition(
...     transition_id='archive',
...     title='Archive',
...     source=CLOSED,
...     destination=ARCHIVED,
...     condition=NullCondition,
...     action=NullAction,
...     trigger=interfaces.MANUAL,
...     order=5)

Now let’s build and provide the workflow utility:

>>> wf = workflow.Workflow([init_transition,
...                         publish_transition, close_transition,
...                         copy_transition, copy_closed_transition,
...                         archive_transition])

>>> component.provideUtility(wf, interfaces.IWorkflow)

Let’s get the workflow_versions utility which we can use to track versions and come up with a new unique id:

>>> workflow_versions = component.getUtility(interfaces.IWorkflowVersions)

And let’s start with a document:

>>> document = Document('bar')
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')

We need the document id to compare later when we create a new version:

>>> state = interfaces.IWorkflowState(document)
>>> document_id = state.getId()

Let’s add it to the workflow versions container so we can find it. Note that we’re using a private API here; this could be implemented as adding it to a folder or any other way, as long as getVersions() works later:

>>> workflow_versions.addVersion(document) # private API

Also clear out previously recorded events:

>>> del events[:]

We can publish it:

>>> info.getManualTransitionIds()
['publish']

So let’s do that:

>>> info.fireTransition('publish')
>>> state.getState()
'published'

The last event should be the ‘publish’ transition:

>>> events[-1].transition.transition_id
'publish'

And now we can either close or create a new copy of it. Note that the names are sorted using the order of the transitions:

>>> info.getManualTransitionIds()
['close', 'copy']

Let’s close it:

>>> info.fireTransition('close')
>>> state.getState()
'closed'

We’re going to create a new copy for editing now:

>>> info.getManualTransitionIds()
['copy_closed', 'archive']
>>> document2 = info.fireTransition('copy_closed')
>>> workflow_versions.addVersion(document2) # private API to track it
>>> document2.title
'copy of bar'
>>> state = interfaces.IWorkflowState(document2)
>>> state.getState()
'unpublished'
>>> state.getId() == document_id
True

The original version is still there in its original state:

>>> interfaces.IWorkflowState(document).getState()
'closed'

Let’s also check the last event in some detail:

>>> event = events[-1]
>>> event.transition.transition_id
'copy_closed'
>>> event.old_object == document
True
>>> event.object == document2
True

Now we are going to publish the new version:

>>> info = interfaces.IWorkflowInfo(document2)
>>> info.getManualTransitionIds()
['publish']
>>> info.fireTransition('publish')
>>> interfaces.IWorkflowState(document2).getState()
'published'

The original is still closed:

>>> interfaces.IWorkflowState(document).getState()
'closed'

Now let’s publish another copy after this:

>>> document3 = info.fireTransition('copy')
>>> workflow_versions.addVersion(document3)
>>> interfaces.IWorkflowInfo(document3).fireTransition('publish')

This copy is now published:

>>> interfaces.IWorkflowState(document3).getState()
'published'

And the previously published version is now closed:

>>> interfaces.IWorkflowState(document2).getState()
'closed'

Note that due to the condition, it’s not possible to copy from the closed version, as there is a published version still remaining:

>>> interfaces.IWorkflowInfo(document2).getManualTransitionIds()
['archive']

Meanwhile, the original version, previously closed, is now archived:

>>> interfaces.IWorkflowState(document).getState()
'archived'

Automatic transitions

Now let’s try a workflow transition that is automatic and time-based. We’ll set up a very simple workflow between ‘unpublished’ and ‘published’, and have the ‘published’ transition be time-based.

To simulate time, we have moments:

>>> time_moment = 0

We will only publish if time_moment is greater than 3:

>>> def TimeCondition(wf, context):
...    return time_moment > 3

Set up the transition using this condition; note that this one is automatic, i.e. it doesn’t have to be triggered by humans:

>>> publish_transition = workflow.Transition(
...    transition_id='publish',
...    title='Publish',
...    source=UNPUBLISHED,
...    destination=PUBLISHED,
...    condition=TimeCondition,
...    action=NullAction,
...    trigger=interfaces.AUTOMATIC)

Set up the workflow using this transition, and reusing the init transition we defined before:

>>> wf = workflow.Workflow([init_transition, publish_transition])
>>> component.provideUtility(wf, interfaces.IWorkflow)

Clear out all versions; this is an private API we just use for demonstration purposes:

>>> workflow_versions.clear()

Now create a document:

>>> document = Document('bar')
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')

Private again; do this with the catalog or any way you prefer in your own code:

>>> workflow_versions.addVersion(document)

Since this transition is automatic, we should see it like this:

>>> interfaces.IWorkflowInfo(document).getAutomaticTransitionIds()
['publish']

Now fire let’s any automatic transitions:

>>> workflow_versions.fireAutomatic()

Nothing should have happened as we are still at time moment 0:

>>> state = interfaces.IWorkflowState(document)
>>> state.getState()
'unpublished'

We change the time moment past 3:

>>> time_moment = 4

Now fire any automatic transitions again:

>>> workflow_versions.fireAutomatic()

The transition has fired, so the state will be ‘published’:

>>> state.getState()
'published'

System transitions

Let’s try system transitions now. This transition shouldn’t show up as manual nor as automatic:

>>> publish_transition = workflow.Transition(
...    transition_id='publish',
...    title='Publish',
...    source=UNPUBLISHED,
...    destination=PUBLISHED,
...    trigger=interfaces.SYSTEM)

Set up the workflow using this transition, and reusing the init transition we defined before:

>>> wf = workflow.Workflow([init_transition, publish_transition])
>>> component.provideUtility(wf, interfaces.IWorkflow)

Clear out all versions; this is an private API we just use for demonstration purposes:

>>> workflow_versions.clear()

Now create a document:

>>> document = Document('bar')
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')

Private again; do this with the catalog or any way you prefer in your own code:

>>> workflow_versions.addVersion(document)

We should see it as a system transition:

>>> info.getSystemTransitionIds()
['publish']

but not as automatic nor manual:

>>> info.getAutomaticTransitionIds()
[]
>>> info.getManualTransitionIds()
[]

This transition can be fired:

>>> info.fireTransition('publish')
>>> interfaces.IWorkflowState(document).getState()
'published'

Multiple transitions

It’s possible to have multiple transitions from the source state to the target state, for instance an automatic and a manual one.

Let’s set up a workflow with two manual transitions and a single automatic transitions between two states:

>>> publish_1_transition = workflow.Transition(
...    transition_id='publish 1',
...    title='Publish 1',
...    source=UNPUBLISHED,
...    destination=PUBLISHED,
...    condition=NullCondition,
...    action=NullAction,
...    trigger=interfaces.MANUAL)

>>> publish_2_transition = workflow.Transition(
...    transition_id='publish 2',
...    title='Publish 2',
...    source=UNPUBLISHED,
...    destination=PUBLISHED,
...    condition=NullCondition,
...    action=NullAction,
...    trigger=interfaces.MANUAL)

>>> publish_auto_transition = workflow.Transition(
...    transition_id='publish auto',
...    title='Publish Auto',
...    source=UNPUBLISHED,
...    destination=PUBLISHED,
...    condition=TimeCondition,
...    action=NullAction,
...    trigger=interfaces.AUTOMATIC)

Clear out all versions; this is an private API we just use for demonstration purposes:

>>> workflow_versions.clear()

Since we’re using the time condition again, let’s make sure time is at 0 again so that the publish_auto_transition doesn’t fire:

>>> time_moment = 0

Now set up the workflow using these transitions, plus our init_transition:

>>> wf = workflow.Workflow([init_transition,
...     publish_1_transition, publish_2_transition,
...     publish_auto_transition])
>>> component.provideUtility(wf, interfaces.IWorkflow)

Now create a document:

>>> document = Document('bar')
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')

We should have two manual transitions:

>>> sorted(interfaces.IWorkflowInfo(document).getManualTransitionIds())
['publish 1', 'publish 2']

And a single automatic transition:

>>> interfaces.IWorkflowInfo(document).getAutomaticTransitionIds()
['publish auto']

Protecting transitions with permissions

Transitions can be (and should be) protected with a permission, so that not everybody can execute them.

Let’s set up a workflow with a permission that has a permission:

>>> publish_transition = workflow.Transition(
...    transition_id='publish',
...    title='Publish',
...    source=UNPUBLISHED,
...    destination=PUBLISHED,
...    condition=NullCondition,
...    action=NullAction,
...    trigger=interfaces.MANUAL,
...    permission="zope.ManageContent")

Quickly set up the workflow state again for a document:

>>> workflow_versions.clear()
>>> wf = workflow.Workflow([init_transition, publish_transition])
>>> component.provideUtility(wf, interfaces.IWorkflow)
>>> document = Document('bar')
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')

Let’s set up the security context:

>>> from zope.security.interfaces import Unauthorized
>>> from zope.security.management import newInteraction, endInteraction
>>> class Principal:
...    def __init__(self, id):
...        self.id = id
...        self.groups = []
>>> class Participation:
...   interaction = None
...   def __init__(self, principal):
...      self.principal = principal
>>> endInteraction() # XXX argh, apparently one is already active?
>>> newInteraction(Participation(Principal('bob')))

We shouldn’t see this permission appear in our list of possible transitions, as we do not have access:

>>> info.getManualTransitionIds()
[]

Now let’s try firing the transition. It should fail with Unauthorized:

>>> try:
...     info.fireTransition('publish')
... except Unauthorized:
...     print("Got unauthorized")
Got unauthorized

It’s also not allowed for fireTransitionToward:

>>> info.fireTransitionToward(PUBLISHED)
Traceback (most recent call last):
   ...
hurry.workflow.interfaces.NoTransitionAvailableError: source: "unpublished" destination: "published"

In this case, the transition even’t even available because the user doesn’t have the right permission.

The system user is however allowed to do it:

>>> from zope.security.management import system_user
>>> endInteraction()
>>> newInteraction(Participation(system_user))
>>> info.fireTransition('publish')

And this goes off without a problem.

There is also a special way to make it happen by passing check_security is False to fireTransition:

>>> endInteraction()
>>> newInteraction(Participation(Principal('bob')))
>>> interfaces.IWorkflowState(document).setState(UNPUBLISHED)
>>> info.fireTransition('publish', check_security=False)

This also works with fireTransitionToward:

>>> interfaces.IWorkflowState(document).setState(UNPUBLISHED)
>>> info.fireTransitionToward(PUBLISHED, check_security=False)

Side effects during transitions

Sometimes we would like something to get executed before the WorkflowTransitionEvent is fired, but after a (potential) new version of the object has been created. If an object is edited during the same request as a workflow transition, the editing should take place after a potential new version has been created, otherwise the old, not the new, version will be edited.

If something like a history logger hooks into IWorkflowTransitionEvent however, it would get information about the new copy before the editing took place. To allow an editing to take place between the creation of the new copy and the firing of the event, a side effect function can be passed along when a transition is fired.

The sequence of execution then is:

  • firing of transition itself, creating a new version

  • executing the side effect function on the new version

  • firing the IWorkflowTransitionEvent

Let’s set up a very simple workflow:

>>> foo_transition = workflow.Transition(
...    transition_id='foo',
...    title='Foo',
...    source=UNPUBLISHED,
...    destination=PUBLISHED,
...    condition=NullCondition,
...    action=CopyAction,
...    trigger=interfaces.MANUAL)

Quickly set up the workflow state again for a document:

>>> workflow_versions.clear()
>>> wf = workflow.Workflow([init_transition, foo_transition])
>>> component.provideUtility(wf, interfaces.IWorkflow)
>>> document = Document('bar')
>>> events = []
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')

Now let’s set up a side effect:

>>> def side_effect(context):
...    context.title = context.title + '!'

Now fire the transition, with a side effect:

>>> new_version = info.fireTransition('foo', side_effect=side_effect)

The title of the new version should now have a ! at the end:

>>> new_version.title[-1] == '!'
True

But the old version doesn’t:

>>> document.title[-1] == '!'
False

The events list we set up before should contain two events:

>>> len(events)
2
>>> events[1].object.title[-1] == '!'
True

Ambiguous transitions

Let’s set up a situation where there are two equivalent transitions from a to b:

>>> transition1 = workflow.Transition(
...     transition_id='a_to_b1',
...     title='A to B',
...     source='a',
...     destination='b',
...     condition=NullCondition,
...     action=NullAction,
...     trigger=interfaces.MANUAL)

>>> transition2 = workflow.Transition(
...     transition_id='a_to_b2',
...     title='A to B',
...     source='a',
...     destination='b',
...     condition=NullCondition,
...     action=NullAction,
...     trigger=interfaces.MANUAL)


>>> wf = workflow.Workflow([transition1, transition2])
>>> from zope import component
>>> component.provideUtility(wf, interfaces.IWorkflow)
>>> info = interfaces.IWorkflowInfo(document)
>>> state = interfaces.IWorkflowState(document)
>>> state.setState('a')

fireTransitionToward is ambiguous as two transitions are possible:

>>> info.fireTransitionToward('b')
Traceback (most recent call last):
   ...
hurry.workflow.interfaces.AmbiguousTransitionError: source: "a" destination: "b"

>>> from hurry.workflow.interfaces import AmbiguousTransitionError
>>> try:
...   info.fireTransitionToward('b')
... except AmbiguousTransitionError as e_:
...   e = e_
>>> e.source
'a'
>>> e.destination
'b'

Download

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

hurry.workflow-3.0.2.tar.gz (34.0 kB view details)

Uploaded Source

File details

Details for the file hurry.workflow-3.0.2.tar.gz.

File metadata

File hashes

Hashes for hurry.workflow-3.0.2.tar.gz
Algorithm Hash digest
SHA256 3fcaee702a133403a4337c39ed6938e37a6130a45c5a8c52b2bcec58b4bc9e6f
MD5 4556874091eecdbd846fa16e549f19d3
BLAKE2b-256 4c1ab1ac73ae32200d7dcd964a95b1722d51f60d7473f858c0bd4d83b4451d87

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page