
hurry.workflow is a simple workflow system. It can be used to implement stateful multi-version workflows for Zope Toolkit applications.

**************
hurry.workflow
**************

A simple but quite nifty workflow system for Zope 3.

======================
hurry.workflow changes
======================

0.13 (2013-01-17)
=================

* ``NoTransitionAvailableError`` gained a ``source`` and ``destination``
attribute indicating what transition wasn't available.

* ``AmbiguousTransitionError`` also gained a ``source`` and ``destination``
attribute indicating what transition was ambiguous.

* ``InvalidTransitionError`` gained a ``source`` attribute indicating
the source state of the attempted invalid transition.

* Newer ``bootstrap.py``

0.12 (2012-02-10)
=================

* Make the info() and state() functions on the WorkflowInfo class into
classmethods as they are not of much use otherwise.

* ``fireTransitionToward`` already accepted a ``check_security=False``
argument, but it still refused to fire a transition that the user
lacked the permission for, because the transition was never found
in the first place. Now it works.

0.11 (2010-04-16)
=================

* Do IAnnotations(self.context) only once in WorkflowState.

* An IWorkflowVersions implementation is now optional.

* Added multiple workflows support.

0.10 (2009-11-19)
=================

* Moved to svn.zope.org for development.

* Added a buildout.cfg, bootstrap.py

* Minimized dependencies. Note that ``Workflow`` does not inherit from
``Persistent`` and ``zope.container.contained.Contained``
anymore. If you need persistent workflow, you need to subclass this
in your own code. This breaks backwards compatibility, as persistent
workflows would need to be re-initialized.

0.9.2.1 (2007-08-15)
====================

Bug fixes
---------

* Oops, the patches in 0.9.2 were not actually applied. Fixed them
now.

0.9.2 (2007-08-15)
==================

Bug fixes
---------

* zope.security changes broke imports in hurry.workflow.

* localUtility directive is now deprecated, so don't use it anymore.

0.9.1 (2006-09-22)
==================

Feature changes
---------------

* First cheeseshop release.

0.9 (2006-06-15)
================

Feature changes
---------------

* separate out from hurry package into hurry.workflow

* eggification work

* Zope 3.3 compatibility work

0.8 (2006-05-01)
================

Feature changes
---------------

Initial public release.

Detailed Documentation
**********************

Hurry Workflow
==============

The hurry workflow system is a "roll my own because I'm in a hurry"
framework.

Basic workflow
--------------

Let's first make a content object that can go into a workflow::

>>> from zope.interface import implements, Attribute

>>> from zope.annotation.interfaces import IAttributeAnnotatable
>>> class IDocument(IAttributeAnnotatable):
...     title = Attribute('Title')
>>> class Document(object):
...     implements(IDocument)
...     def __init__(self, title):
...         self.title = title

As you can see, such a content object must provide IAnnotatable, as
this is used to store the workflow state. The system uses the
IWorkflowState adapter to get and set an object's workflow state::

>>> from hurry.workflow import interfaces
>>> document = Document('Foo')
>>> state = interfaces.IWorkflowState(document)
>>> print state.getState()
None

The state can be set directly for an object using the IWorkflowState
adapter as well::

>>> state.setState('foo')
>>> state.getState()
'foo'

But let's set it back to None again, so we can start again in a
pristine state for this document::

>>> state.setState(None)

It's not recommended to use setState() to do this ourselves, though:
usually we'll let the workflow system take care of state transitions
and the setting of the initial state.
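
As a rough mental model of the storage described above, the following
standalone sketch keeps the workflow state in a per-object annotations
dictionary. It is an illustration only, not hurry.workflow's
implementation: ``FakeDocument``, ``SketchState`` and the
``'sketch.state'`` key are hypothetical names invented for this example,
and a plain dict stands in for the real annotations.

```python
class FakeDocument(object):
    """Hypothetical content object; a dict stands in for annotations."""
    def __init__(self, title):
        self.title = title
        self.annotations = {}


class SketchState(object):
    """Hypothetical adapter that reads and writes state in the annotations."""
    state_key = 'sketch.state'  # assumed key, for illustration only

    def __init__(self, context):
        self.context = context

    def getState(self):
        # An object that never entered a workflow has no state: None.
        return self.context.annotations.get(self.state_key)

    def setState(self, state):
        self.context.annotations[self.state_key] = state


doc = FakeDocument('Foo')
state = SketchState(doc)
initial = state.getState()   # nothing stored yet
state.setState('foo')
current = state.getState()
```

The point of the sketch is that the content object itself stays unaware
of the workflow; the state lives beside it and is reached through a
small wrapper.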

Now let's define a simple workflow transition from 'a' to 'b'. It
needs a condition which must return True before the transition is
allowed to occur::

>>> def NullCondition(wf, context):
...     return True

and an action that takes place when the transition is taken::

>>> def NullAction(wf, context):
...     pass

Now let's construct a transition::

>>> from hurry.workflow import workflow
>>> transition = workflow.Transition(
... transition_id='a_to_b',
... title='A to B',
... source='a',
... destination='b',
... condition=NullCondition,
... action=NullAction,
... trigger=interfaces.MANUAL)

The transition trigger is either MANUAL, AUTOMATIC or SYSTEM. MANUAL
indicates user action is needed to fire the transition. AUTOMATIC
transitions fire automatically. SYSTEM is a workflow transition
directly fired by the system, and not directly by the user.
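
To make the roles of the condition and the action concrete, here is a
minimal standalone sketch of firing a transition: check the source
state, evaluate the condition, run the action, and hand back the
destination. ``SketchTransition`` and ``fire`` are hypothetical names,
``None`` is passed where the real system would pass a workflow info
object, and the sketch ignores triggers, security and events.

```python
class SketchTransition(object):
    """Hypothetical record holding the fields used in the example above."""
    def __init__(self, transition_id, source, destination,
                 condition=None, action=None):
        self.transition_id = transition_id
        self.source = source
        self.destination = destination
        # Default to "always allowed" and "do nothing".
        self.condition = condition or (lambda wf, context: True)
        self.action = action or (lambda wf, context: None)


def fire(transition, current_state, context):
    """Return the new state, or raise ValueError if firing is not allowed."""
    if current_state != transition.source:
        raise ValueError('wrong source state: %r' % (current_state,))
    if not transition.condition(None, context):
        raise ValueError('condition failed')
    transition.action(None, context)   # the action runs before the state changes
    return transition.destination


log = []
a_to_b = SketchTransition(
    'a_to_b', 'a', 'b',
    action=lambda wf, context: log.append('acted on %s' % context))
new_state = fire(a_to_b, 'a', 'doc')
```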

We also will introduce an initial transition, that moves an object
into the workflow (for instance just after it is created)::

>>> init_transition = workflow.Transition(
... transition_id='to_a',
... title='Create A',
... source=None,
... destination='a')

And a final transition, when the object moves out of the workflow again
(for instance just before it is deleted)::

>>> final_transition = workflow.Transition(
... transition_id='finalize',
... title='Delete',
... source='b',
... destination=None)

Now let's put the transitions in a workflow utility::

>>> wf = workflow.Workflow([transition, init_transition, final_transition])
>>> from zope import component
>>> component.provideUtility(wf, interfaces.IWorkflow)

We can get the transition from the workflow using ``getTransition``
should we need it::

>>> wf.getTransition('a', 'a_to_b') is transition
True

If we try to get a transition that doesn't exist, we get an error::

>>> from hurry.workflow.interfaces import InvalidTransitionError
>>> try:
...     wf.getTransition('b', 'a_to_b')
... except InvalidTransitionError, e:
...     pass
>>> e.source
'b'
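
The lookup shown above takes both a source state and a transition id.
One way to picture that, as a standalone sketch under assumed names
(``build_index``, ``get_transition`` and ``SketchInvalidTransition`` are
hypothetical, not part of hurry.workflow), is a two-level dictionary
whose error remembers the source state that was asked for:

```python
class SketchInvalidTransition(Exception):
    """Hypothetical error that remembers the source state it was raised for."""
    def __init__(self, source):
        Exception.__init__(self, source)
        self.source = source


def build_index(transitions):
    """Map source state -> {transition_id: destination}."""
    index = {}
    for transition_id, source, destination in transitions:
        index.setdefault(source, {})[transition_id] = destination
    return index


def get_transition(index, source, transition_id):
    """Return the destination, or raise if the pair is unknown."""
    try:
        return index[source][transition_id]
    except KeyError:
        raise SketchInvalidTransition(source)


index = build_index([('to_a', None, 'a'),
                     ('a_to_b', 'a', 'b'),
                     ('finalize', 'b', None)])
found = get_transition(index, 'a', 'a_to_b')
try:
    get_transition(index, 'b', 'a_to_b')
except SketchInvalidTransition as e:
    failed_source = e.source
```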

Workflow transitions cause events to be fired; we will put in a simple
handler so we can check whether things were successfully fired::

>>> events = []
>>> def transition_handler(event):
...     events.append(event)
>>> component.provideHandler(
... transition_handler,
... [interfaces.IWorkflowTransitionEvent])

To get what transitions to other states are possible from an object,
as well as to fire transitions and set initial state, we use the
IWorkflowInfo adapter::

>>> info = interfaces.IWorkflowInfo(document)

We'll initialize the workflow by firing the 'to_a' transition::

>>> info.fireTransition('to_a')

This should've fired an event::

>>> events[-1].transition.transition_id
'to_a'
>>> events[-1].source is None
True
>>> events[-1].destination
'a'

There's only a single transition defined to workflow state 'b'::

>>> info.getManualTransitionIds()
['a_to_b']

We can also get this by asking which manual (or system) transition
exists that brings us to the desired workflow state::

>>> info.getFireableTransitionIdsToward('b')
['a_to_b']

Since this is a manually triggered transition, we can fire this
transition::

>>> info.fireTransition('a_to_b')

The workflow state should now be 'b'::

>>> state.getState()
'b'

We check that the event indeed got fired::

>>> events[-1].transition.transition_id
'a_to_b'
>>> events[-1].source
'a'
>>> events[-1].destination
'b'

We will also try ``fireTransitionToward`` here, so we sneak the
workflow back to state 'a' and try that::

>>> state.setState('a')

Try going through a transition we cannot reach first::

>>> info.fireTransitionToward('c')
Traceback (most recent call last):
...
NoTransitionAvailableError

This error carries information about which transition was attempted::

>>> from hurry.workflow.interfaces import NoTransitionAvailableError
>>> try:
...     info.fireTransitionToward('c')
... except NoTransitionAvailableError, e:
...     pass
>>> e.source
'a'
>>> e.destination
'c'

Now go to 'b' again::

>>> info.fireTransitionToward('b')
>>> state.getState()
'b'

Finally, before forgetting about our document, we finalize the workflow::

>>> info.fireTransition('finalize')
>>> state.getState() is None
True

And we have another event that was fired::

>>> events[-1].transition.transition_id
'finalize'
>>> events[-1].source
'b'
>>> events[-1].destination is None
True


Multiple workflows
------------------

We have previously registered a workflow as an unnamed utility.
You can also register a workflow as a named utility to provide
several workflows for a site.

Let's create an invoice document::

>>> class IInvoiceDocument(IDocument):
...     title = Attribute('Title')

>>> class InvoiceDocument(object):
...     implements(IInvoiceDocument)
...     def __init__(self, title, amount):
...         self.title = title
...         self.amount = amount

We define our workflow::

>>> invoice_init = workflow.Transition(
... transition_id='init_invoice',
... title='Invoice Received',
... source=None,
... destination='received')
>>>
>>> invoice_paid = workflow.Transition(
... transition_id='invoice_paid',
... title='Invoice Paid',
... source='received',
... destination='paid')

>>> invoice_wf = workflow.Workflow( [ invoice_init, invoice_paid ] )

We register a new workflow utility, WorkflowState and WorkflowInfo adapters, all
named "invoice"::

>>> from hurry.workflow import workflow
>>> from zope.annotation import interfaces as annotation_interfaces
>>> component.provideUtility(invoice_wf, interfaces.IWorkflow, name="invoice")
>>> class InvoiceWorkflowInfo(workflow.WorkflowInfo):
...     name="invoice"
>>> component.provideAdapter(
... InvoiceWorkflowInfo,
... (annotation_interfaces.IAnnotatable,),
... interfaces.IWorkflowInfo,
... name="invoice")
>>> class InvoiceWorkflowState(workflow.WorkflowState):
...     state_key = "invoice.state"
...     id_key = "invoice.id"
>>> component.provideAdapter(
... InvoiceWorkflowState,
... (annotation_interfaces.IAnnotatable,),
... interfaces.IWorkflowState,
... name="invoice")

Now we can utilize the workflow::

>>> invoice = InvoiceDocument('abc', 22)

>>> info = component.getAdapter(invoice, interfaces.IWorkflowInfo, name="invoice")
>>> info.fireTransition('init_invoice')
>>> state = component.getAdapter(invoice, interfaces.IWorkflowState, name="invoice")
>>> state.getState()
'received'
>>> info.fireTransition('invoice_paid')
>>> state.getState()
'paid'

To make it easier to get the state and info adapters for a particular context
object, there are two convenience functions on the workflow info object. The
info object "knows" what workflow utility to look for, as they are associated
by name::

>>> info_ = InvoiceWorkflowInfo.info(invoice)
>>> interfaces.IWorkflowInfo.providedBy(info_)
True

>>> state_ = InvoiceWorkflowInfo.state(invoice)
>>> interfaces.IWorkflowState.providedBy(state_)
True
>>> state.getState() is state_.getState()
True

Of course, this document still has the default unnamed workflow as well::

>>> info = interfaces.IWorkflowInfo(invoice)
>>> info.fireTransition('to_a')
>>> state = interfaces.IWorkflowState(invoice)
>>> state.getState()
'a'
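
The reason both workflows can coexist on one object is that each state
adapter stores its value under its own key, as the ``state_key``
override above suggests. The standalone sketch below illustrates that
idea only; ``NamedState`` and its key scheme are hypothetical and do not
reproduce hurry.workflow's actual keys.

```python
class NamedState(object):
    """Hypothetical state holder; each workflow name gets its own key."""
    def __init__(self, annotations, name=''):
        self.annotations = annotations
        # Assumed key scheme: one key per workflow name.
        self.key = 'sketch.state.%s' % name

    def getState(self):
        return self.annotations.get(self.key)

    def setState(self, state):
        self.annotations[self.key] = state


annotations = {}                        # shared by both workflows
default_state = NamedState(annotations)
invoice_state = NamedState(annotations, name='invoice')

invoice_state.setState('paid')
default_state.setState('a')
```

Because the keys differ, advancing the invoice workflow never disturbs
the state of the default workflow, and vice versa.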

Multi-version workflow
----------------------

Now let's go for a more complicated scenario where we have multiple
versions of a document. At any one time a document can have an
UNPUBLISHED version and a PUBLISHED version. There can also be a
CLOSED version and any number of ARCHIVED versions::

>>> UNPUBLISHED = 'unpublished'
>>> PUBLISHED = 'published'
>>> CLOSED = 'closed'
>>> ARCHIVED = 'archived'

Let's start with a simple initial transition::

>>> init_transition = workflow.Transition(
... transition_id='init',
... title='Initialize',
... source=None,
... destination=UNPUBLISHED)

When the unpublished version is published, any previously published
version is made to be the CLOSED version. To accomplish this secondary
state transition, we'll use the system's built-in versioning ability
with the ``fireTransitionForVersions`` method, which can be used to
fire transitions of other versions of the document::

>>> def PublishAction(wf, context):
...     wf.fireTransitionForVersions(PUBLISHED, 'close')

Now let's build the transition::

>>> publish_transition = workflow.Transition(
... transition_id='publish',
... title='Publish',
... source=UNPUBLISHED,
... destination=PUBLISHED,
... condition=NullCondition,
... action=PublishAction,
... trigger=interfaces.MANUAL,
... order=1)

Next, we'll define a transition from PUBLISHED to CLOSED, which means
we want to archive whatever was closed before::

>>> def CloseAction(wf, context):
...     wf.fireTransitionForVersions(CLOSED, 'archive')
>>> close_transition = workflow.Transition(
... transition_id='close',
... title='Close',
... source=PUBLISHED,
... destination=CLOSED,
... condition=NullCondition,
... action=CloseAction,
... trigger=interfaces.MANUAL,
... order=2)

Note that CloseAction will also be executed automatically whenever
state is transitioned from PUBLISHED to CLOSED using
``fireTransitionForVersions``. This means that publishing a document
results in the previously closed document being archived.

If there is a PUBLISHED but no UNPUBLISHED version, we can make a new
copy of the PUBLISHED version and make that the UNPUBLISHED version::

>>> def CanCopyCondition(wf, context):
...     return not wf.hasVersion(UNPUBLISHED)

Since we are actually creating a new content object, the action should
return the newly created object with the new state::

>>> def CopyAction(wf, context):
...     return Document('copy of %s' % context.title)

>>> copy_transition = workflow.Transition(
... transition_id='copy',
... title='Copy',
... source=PUBLISHED,
... destination=UNPUBLISHED,
... condition=CanCopyCondition,
... action=CopyAction,
... trigger=interfaces.MANUAL,
... order=3)

A very similar transition applies to the closed version. If we have
no UNPUBLISHED version and no PUBLISHED version, we can make a new copy
from the CLOSED version::

>>> def CanCopyCondition(wf, context):
...     return (not wf.hasVersion(UNPUBLISHED) and
...             not wf.hasVersion(PUBLISHED))

>>> copy_closed_transition = workflow.Transition(
... transition_id='copy_closed',
... title='Copy',
... source=CLOSED,
... destination=UNPUBLISHED,
... condition=CanCopyCondition,
... action=CopyAction,
... trigger=interfaces.MANUAL,
... order=4)

Finally let's build the archiving transition::

>>> archive_transition = workflow.Transition(
... transition_id='archive',
... title='Archive',
... source=CLOSED,
... destination=ARCHIVED,
... condition=NullCondition,
... action=NullAction,
... trigger=interfaces.MANUAL,
... order=5)

Now let's build and provide the workflow utility::

>>> wf = workflow.Workflow([init_transition,
... publish_transition, close_transition,
... copy_transition, copy_closed_transition,
... archive_transition])

>>> component.provideUtility(wf, interfaces.IWorkflow)

Let's get the workflow_versions utility which we can use to track
versions and come up with a new unique id::

>>> workflow_versions = component.getUtility(interfaces.IWorkflowVersions)

And let's start with a document::

>>> document = Document('bar')
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')

We need the document id to compare later when we create a new version::

>>> state = interfaces.IWorkflowState(document)
>>> document_id = state.getId()

Let's add it to the workflow versions container so we can find it. Note
that we're using a private API here; this could be implemented as adding
it to a folder or any other way, as long as getVersions() works later::

>>> workflow_versions.addVersion(document) # private API

Also clear out previously recorded events::

>>> del events[:]

We can publish it::

>>> info.getManualTransitionIds()
['publish']

So let's do that::

>>> info.fireTransition('publish')
>>> state.getState()
'published'

The last event should be the 'publish' transition::

>>> events[-1].transition.transition_id
'publish'

And now we can either close or create a new copy of it. Note that the
names are sorted using the order of the transitions::

>>> info.getManualTransitionIds()
['close', 'copy']
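
The ordering mentioned above can be pictured as a plain sort on the
``order`` value of each candidate transition. The sketch below is
standalone and deliberately ignores conditions, triggers and
permissions; the tuple layout and the ``manual_ids`` helper are
assumptions made for this example.

```python
# (transition_id, source, order) triples mirroring the transitions above.
transitions = [
    ('copy', 'published', 3),
    ('publish', 'unpublished', 1),
    ('archive', 'closed', 5),
    ('close', 'published', 2),
    ('copy_closed', 'closed', 4),
]


def manual_ids(state):
    """Ids of transitions leaving ``state``, sorted by their order value."""
    candidates = [t for t in transitions if t[1] == state]
    candidates.sort(key=lambda t: t[2])
    return [t[0] for t in candidates]


from_published = manual_ids('published')
from_closed = manual_ids('closed')
```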

Let's close it::

>>> info.fireTransition('close')
>>> state.getState()
'closed'

We're going to create a new copy for editing now::

>>> info.getManualTransitionIds()
['copy_closed', 'archive']
>>> document2 = info.fireTransition('copy_closed')
>>> workflow_versions.addVersion(document2) # private API to track it
>>> document2.title
'copy of bar'
>>> state = interfaces.IWorkflowState(document2)
>>> state.getState()
'unpublished'
>>> state.getId() == document_id
True

The original version is still there in its original state::

>>> interfaces.IWorkflowState(document).getState()
'closed'

Let's also check the last event in some detail::

>>> event = events[-1]
>>> event.transition.transition_id
'copy_closed'
>>> event.old_object == document
True
>>> event.object == document2
True

Now we are going to publish the new version::

>>> info = interfaces.IWorkflowInfo(document2)
>>> info.getManualTransitionIds()
['publish']
>>> info.fireTransition('publish')
>>> interfaces.IWorkflowState(document2).getState()
'published'

The original is still closed::

>>> interfaces.IWorkflowState(document).getState()
'closed'

Now let's publish another copy after this::

>>> document3 = info.fireTransition('copy')
>>> workflow_versions.addVersion(document3)
>>> interfaces.IWorkflowInfo(document3).fireTransition('publish')

This copy is now published::

>>> interfaces.IWorkflowState(document3).getState()
'published'

And the previously published version is now closed::

>>> interfaces.IWorkflowState(document2).getState()
'closed'

Note that due to the condition, it's not possible to copy from the
closed version, as there is a published version still remaining::

>>> interfaces.IWorkflowInfo(document2).getManualTransitionIds()
['archive']

Meanwhile, the original version, previously closed, is now archived::

>>> interfaces.IWorkflowState(document).getState()
'archived'
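
The cascade just observed (publishing closes the previously published
version, which in turn archives the previously closed one) can be
reproduced in a few lines of plain Python. This is a standalone sketch
under assumed names: ``Version``, ``fire_for_versions``,
``archive_closed`` and ``publish`` are hypothetical helpers and do not
call hurry.workflow.

```python
class Version(object):
    """Hypothetical stand-in for one version of a document."""
    def __init__(self, name, state):
        self.name = name
        self.state = state


def fire_for_versions(versions, state, new_state, action=None):
    """Move every version currently in ``state`` to ``new_state``."""
    matching = [v for v in versions if v.state == state]
    for version in matching:
        if action is not None:
            action(versions)      # the action runs before the state changes
        version.state = new_state


def archive_closed(versions):
    fire_for_versions(versions, 'closed', 'archived')


def publish(versions, version):
    # Publishing closes the previously published version; closing it
    # archives whatever was closed before.
    fire_for_versions(versions, 'published', 'closed', action=archive_closed)
    version.state = 'published'


first = Version('first', 'closed')
second = Version('second', 'published')
third = Version('third', 'unpublished')
publish([first, second, third], third)
states = [(v.name, v.state) for v in (first, second, third)]
```

Running the action before the state change matters here: the old closed
version is archived first, so there is never more than one closed
version at a time.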

Automatic transitions
---------------------

Now let's try a workflow transition that is automatic and time-based.
We'll set up a very simple workflow between 'unpublished' and
'published', and have the 'published' transition be time-based.

To simulate time, we have moments::

>>> time_moment = 0

We will only publish if time_moment is greater than 3::

>>> def TimeCondition(wf, context):
...     return time_moment > 3

Set up the transition using this condition; note that this one is
automatic, i.e. it doesn't have to be triggered by humans::

>>> publish_transition = workflow.Transition(
... transition_id='publish',
... title='Publish',
... source=UNPUBLISHED,
... destination=PUBLISHED,
... condition=TimeCondition,
... action=NullAction,
... trigger=interfaces.AUTOMATIC)

Set up the workflow using this transition, and reusing the
init transition we defined before::

>>> wf = workflow.Workflow([init_transition, publish_transition])
>>> component.provideUtility(wf, interfaces.IWorkflow)

Clear out all versions; this is a private API we just use for
demonstration purposes::

>>> workflow_versions.clear()

Now create a document::

>>> document = Document('bar')
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')

Private again; do this with the catalog or any way you prefer in your
own code::

>>> workflow_versions.addVersion(document)

Since this transition is automatic, we should see it like this::

>>> interfaces.IWorkflowInfo(document).getAutomaticTransitionIds()
['publish']

Now let's fire any automatic transitions::

>>> workflow_versions.fireAutomatic()

Nothing should have happened as we are still at time moment 0::

>>> state = interfaces.IWorkflowState(document)
>>> state.getState()
'unpublished'

We change the time moment past 3::

>>> time_moment = 4

Now fire any automatic transitions again::

>>> workflow_versions.fireAutomatic()

The transition has fired, so the state will be 'published'::

>>> state.getState()
'published'
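
In other words, firing automatic transitions amounts to sweeping over
the tracked versions and firing whatever automatic transition has a
condition that currently holds. A minimal standalone sketch of that
sweep follows; ``Clock``, ``time_condition`` and ``fire_automatic`` are
hypothetical names, and dictionaries stand in for documents.

```python
class Clock(object):
    """Hypothetical mutable clock standing in for ``time_moment``."""
    moment = 0


def time_condition(clock):
    return clock.moment > 3


def fire_automatic(documents, clock):
    """Publish every unpublished document whose condition now holds."""
    for document in documents:
        if document['state'] == 'unpublished' and time_condition(clock):
            document['state'] = 'published'


clock = Clock()
documents = [{'title': 'bar', 'state': 'unpublished'}]

fire_automatic(documents, clock)
state_before = documents[0]['state']   # condition not met yet

clock.moment = 4
fire_automatic(documents, clock)
state_after = documents[0]['state']    # condition met, transition fired
```

In a real application something has to call the sweep periodically, for
instance a scheduled job; the sketch leaves that part out.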

System transitions
------------------

Let's try system transitions now. This transition shouldn't show up
as manual nor as automatic::

>>> publish_transition = workflow.Transition(
... transition_id='publish',
... title='Publish',
... source=UNPUBLISHED,
... destination=PUBLISHED,
... trigger=interfaces.SYSTEM)

Set up the workflow using this transition, and reusing the
init transition we defined before::

>>> wf = workflow.Workflow([init_transition, publish_transition])
>>> component.provideUtility(wf, interfaces.IWorkflow)

Clear out all versions; this is a private API we just use for
demonstration purposes::

>>> workflow_versions.clear()

Now create a document::

>>> document = Document('bar')
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')

Private again; do this with the catalog or any way you prefer in your
own code::

>>> workflow_versions.addVersion(document)

We should see it as a system transition::

>>> info.getSystemTransitionIds()
['publish']

but not as automatic nor manual::

>>> info.getAutomaticTransitionIds()
[]
>>> info.getManualTransitionIds()
[]

This transition can be fired::

>>> info.fireTransition('publish')
>>> interfaces.IWorkflowState(document).getState()
'published'
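
The three listing methods used above differ only in which trigger they
filter on. As a standalone sketch (the constant values, the transition
ids ``review`` and ``expire``, and the ``ids_by_trigger`` helper are all
made up for illustration and are not the library's):

```python
# Placeholder trigger values; the real constants live in
# hurry.workflow.interfaces and may have different values.
MANUAL, AUTOMATIC, SYSTEM = 'manual', 'automatic', 'system'

# (transition_id, trigger) pairs for a hypothetical workflow.
transitions = [
    ('review', MANUAL),
    ('expire', AUTOMATIC),
    ('publish', SYSTEM),
]


def ids_by_trigger(trigger):
    """Ids of the transitions that use the given trigger."""
    return [tid for tid, trig in transitions if trig == trigger]


system_ids = ids_by_trigger(SYSTEM)
manual_ids = ids_by_trigger(MANUAL)
automatic_ids = ids_by_trigger(AUTOMATIC)
```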

Multiple transitions
--------------------

It's possible to have multiple transitions from the source state to
the target state, for instance an automatic and a manual one.

Let's set up a workflow with two manual transitions and a single
automatic transition between two states::

>>> publish_1_transition = workflow.Transition(
... transition_id='publish 1',
... title='Publish 1',
... source=UNPUBLISHED,
... destination=PUBLISHED,
... condition=NullCondition,
... action=NullAction,
... trigger=interfaces.MANUAL)

>>> publish_2_transition = workflow.Transition(
... transition_id='publish 2',
... title='Publish 2',
... source=UNPUBLISHED,
... destination=PUBLISHED,
... condition=NullCondition,
... action=NullAction,
... trigger=interfaces.MANUAL)

>>> publish_auto_transition = workflow.Transition(
... transition_id='publish auto',
... title='Publish Auto',
... source=UNPUBLISHED,
... destination=PUBLISHED,
... condition=TimeCondition,
... action=NullAction,
... trigger=interfaces.AUTOMATIC)

Clear out all versions; this is a private API we just use for
demonstration purposes::

>>> workflow_versions.clear()

Since we're using the time condition again, let's make sure
time is at 0 again so that the publish_auto_transition doesn't fire::

>>> time_moment = 0

Now set up the workflow using these transitions, plus our
init_transition::

>>> wf = workflow.Workflow([init_transition,
... publish_1_transition, publish_2_transition,
... publish_auto_transition])
>>> component.provideUtility(wf, interfaces.IWorkflow)

Now create a document::

>>> document = Document('bar')
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')

We should have two manual transitions::

>>> sorted(interfaces.IWorkflowInfo(document).getManualTransitionIds())
['publish 1', 'publish 2']

And a single automatic transition::

>>> interfaces.IWorkflowInfo(document).getAutomaticTransitionIds()
['publish auto']

Protecting transitions with permissions
---------------------------------------

Transitions can be (and should be) protected with a permission, so
that not everybody can execute them.

Let's set up a workflow with a transition that has a permission::

>>> publish_transition = workflow.Transition(
... transition_id='publish',
... title='Publish',
... source=UNPUBLISHED,
... destination=PUBLISHED,
... condition=NullCondition,
... action=NullAction,
... trigger=interfaces.MANUAL,
... permission="zope.ManageContent")

Quickly set up the workflow state again for a document::

>>> workflow_versions.clear()
>>> wf = workflow.Workflow([init_transition, publish_transition])
>>> component.provideUtility(wf, interfaces.IWorkflow)
>>> document = Document('bar')
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')

Let's set up the security context::

>>> from zope.security.interfaces import Unauthorized
>>> from zope.security.management import newInteraction, endInteraction
>>> class Principal:
...     def __init__(self, id):
...         self.id = id
...         self.groups = []
>>> class Participation:
...     interaction = None
...     def __init__(self, principal):
...         self.principal = principal
>>> endInteraction() # XXX argh, apparently one is already active?
>>> newInteraction(Participation(Principal('bob')))

We shouldn't see this transition appear in our list of possible
transitions, as we do not have access::

>>> info.getManualTransitionIds()
[]

Now let's try firing the transition. It should fail with Unauthorized::

>>> try:
...     info.fireTransition('publish')
... except Unauthorized:
...     print "Got unauthorized"
Got unauthorized

It's also not allowed for ``fireTransitionToward``::

>>> info.fireTransitionToward(PUBLISHED)
Traceback (most recent call last):
...
NoTransitionAvailableError

In this case, the transition isn't even available because the user
doesn't have the right permission.

The system user is however allowed to do it::

>>> from zope.security.management import system_user
>>> endInteraction()
>>> newInteraction(Participation(system_user))
>>> info.fireTransition('publish')

And this goes off without a problem.

There is also a special way to make it happen, by passing
``check_security=False`` to ``fireTransition``::

>>> endInteraction()
>>> newInteraction(Participation(Principal('bob')))
>>> interfaces.IWorkflowState(document).setState(UNPUBLISHED)
>>> info.fireTransition('publish', check_security=False)

This also works with fireTransitionToward::

>>> interfaces.IWorkflowState(document).setState(UNPUBLISHED)
>>> info.fireTransitionToward(PUBLISHED, check_security=False)
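
The behaviour in this section boils down to a guarded check that the
caller may switch off. The following standalone sketch shows that shape
only: ``SketchUnauthorized`` and ``fire`` are hypothetical, a set of
granted permission names stands in for the Zope security machinery, and
a dictionary stands in for the transition.

```python
class SketchUnauthorized(Exception):
    """Hypothetical stand-in for an authorization failure."""


def fire(transition, granted_permissions, check_security=True):
    """Return the destination state if the caller may fire the transition."""
    permission = transition.get('permission')
    if check_security and permission is not None:
        if permission not in granted_permissions:
            raise SketchUnauthorized(transition['id'])
    return transition['destination']


publish = {'id': 'publish', 'destination': 'published',
           'permission': 'zope.ManageContent'}

try:
    fire(publish, granted_permissions=set())
    outcome = 'fired'
except SketchUnauthorized:
    outcome = 'unauthorized'

# Skipping the check lets the same caller fire the transition.
bypassed = fire(publish, granted_permissions=set(), check_security=False)
```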


Side effects during transitions
-------------------------------

Sometimes we would like something to get executed *before* the
WorkflowTransitionEvent is fired, but after a (potential) new version
of the object has been created. If an object is edited during the
same request as a workflow transition, the editing should take place
after a potential new version has been created, otherwise the old, not
the new, version will be edited.

If something like a history logger hooks into IWorkflowTransitionEvent
however, it would get information about the new copy *before* the
editing took place. To allow an editing to take place between the
creation of the new copy and the firing of the event, a side effect
function can be passed along when a transition is fired.

The sequence of execution then is:

* firing of transition itself, creating a new version

* executing the side effect function on the new version

* firing the IWorkflowTransitionEvent
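
The three steps above can be sketched in plain Python as follows. This
is a standalone illustration of the ordering, not the library's code:
``fire_with_side_effect`` is a hypothetical helper, dictionaries stand
in for documents, and simple callables stand in for event subscribers.

```python
def fire_with_side_effect(document, action, side_effect, handlers):
    """Run the three steps in the order listed above."""
    new_version = action(document)        # 1. transition creates a new version
    if side_effect is not None:
        side_effect(new_version)          # 2. edit the new version
    for handler in handlers:
        handler(new_version)              # 3. notify subscribers last
    return new_version


seen_titles = []
old = {'title': 'bar'}
new = fire_with_side_effect(
    old,
    action=lambda doc: {'title': 'copy of %s' % doc['title']},
    side_effect=lambda doc: doc.update(title=doc['title'] + '!'),
    handlers=[lambda doc: seen_titles.append(doc['title'])])
```

Because the handlers run last, a subscriber such as a history logger
sees the already edited title, while the old version is left untouched.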

Let's set up a very simple workflow::

>>> foo_transition = workflow.Transition(
... transition_id='foo',
... title='Foo',
... source=UNPUBLISHED,
... destination=PUBLISHED,
... condition=NullCondition,
... action=CopyAction,
... trigger=interfaces.MANUAL)

Quickly set up the workflow state again for a document::

>>> workflow_versions.clear()
>>> wf = workflow.Workflow([init_transition, foo_transition])
>>> component.provideUtility(wf, interfaces.IWorkflow)
>>> document = Document('bar')
>>> events = []
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')

Now let's set up a side effect::

>>> def side_effect(context):
...     context.title = context.title + '!'

Now fire the transition, with a side effect::

>>> new_version = info.fireTransition('foo', side_effect=side_effect)

The title of the new version should now have a ! at the end::

>>> new_version.title[-1] == '!'
True

But the old version doesn't::

>>> document.title[-1] == '!'
False

The events list we set up before should contain two events::

>>> len(events)
2
>>> events[1].object.title[-1] == '!'
True

Ambiguous transitions
---------------------

Let's set up a situation where there are two equivalent transitions from
``a`` to ``b``::

>>> transition1 = workflow.Transition(
... transition_id='a_to_b1',
... title='A to B',
... source='a',
... destination='b',
... condition=NullCondition,
... action=NullAction,
... trigger=interfaces.MANUAL)

>>> transition2 = workflow.Transition(
... transition_id='a_to_b2',
... title='A to B',
... source='a',
... destination='b',
... condition=NullCondition,
... action=NullAction,
... trigger=interfaces.MANUAL)


>>> wf = workflow.Workflow([transition1, transition2])
>>> from zope import component
>>> component.provideUtility(wf, interfaces.IWorkflow)
>>> info = interfaces.IWorkflowInfo(document)
>>> state = interfaces.IWorkflowState(document)
>>> state.setState('a')

``fireTransitionToward`` is ambiguous as two transitions are possible::

>>> from hurry.workflow.interfaces import AmbiguousTransitionError
>>> try:
...     info.fireTransitionToward('b')
... except AmbiguousTransitionError, e:
...     pass
>>> e.source
'a'
>>> e.destination
'b'
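
Taken together with the earlier ``NoTransitionAvailableError`` example,
resolving a destination to a transition has three outcomes: no match,
exactly one match, or several. A standalone sketch of that decision,
using hypothetical names (``TowardError``, ``NoneAvailable``,
``Ambiguous``, ``pick_transition``) rather than the library's classes:

```python
class TowardError(Exception):
    """Hypothetical base error carrying both ends of the attempted move."""
    def __init__(self, source, destination):
        Exception.__init__(self, source, destination)
        self.source = source
        self.destination = destination


class NoneAvailable(TowardError):
    """No transition leads from source to destination."""


class Ambiguous(TowardError):
    """More than one transition leads from source to destination."""


def pick_transition(transitions, source, destination):
    """Return the single transition id leading from source to destination."""
    matches = [tid for tid, src, dst in transitions
               if src == source and dst == destination]
    if not matches:
        raise NoneAvailable(source, destination)
    if len(matches) > 1:
        raise Ambiguous(source, destination)
    return matches[0]


transitions = [('a_to_b1', 'a', 'b'),
               ('a_to_b2', 'a', 'b'),
               ('b_to_c', 'b', 'c')]
only = pick_transition(transitions, 'b', 'c')
try:
    pick_transition(transitions, 'a', 'b')
except Ambiguous as e:
    ambiguous_pair = (e.source, e.destination)
try:
    pick_transition(transitions, 'a', 'c')
except NoneAvailable as e:
    missing_pair = (e.source, e.destination)
```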

