Skip to main content

whenact decision pipeline

Project description

WhenAct

WhenAct is a module that defines a decision pipeline.

The executing flow looks like:

policy0: [when0] > [action0]
policy1: [when1] > [action10 > action11]
policy2: [when20 > when21] > [action2]

When all when in one policy set is satisfied, than it runs to it's following action. No matter the when in previous policy is satisfied or not, the when in next policy will be checked and executed. When all policies have been checked and executed, this pipeline then finishes.

For example:

policy0: [when0=True] > [action0]
policy1: [when1=False] > X
policy2: [when20=True > when21=False] > X
policy0: [when0=True] > X 
policy1: [when1=True] > [action10 > action11]
policy2: [when20=False > X] > X

Install

pip install whenact

Create WhenAct pipeline

Before create a pipeline, you need to define when and act function first, then put them in the pipeline.

import whenact


@whenact.when
def w1(ctx):
    return True

@whenact.act
def a1(ctx):
    return "done"

pipeline = whenact.create_pipeline(config=[
    [w1, a1]
])

print(pipeline)
# p0: [w1] > [a1]

result = pipeline.run()
assert result == "done"

More complex pipeline can be like this:

import whenact

@whenact.when
def w1(ctx):
    return False

@whenact.when
def w2(ctx):
    return True

@whenact.act
def a1(ctx):
    ctx["action"] = "a1 action"

@whenact.act
def a2(ctx):
    ctx["action"] = "a2 action"

@whenact.act
def a3(ctx):
    ctx["action"] += " with a3"


pipeline = whenact.create_pipeline(config=[
    [w1, a1],
    [w2, a2, a3]
])

print(pipeline)
# p0: [w1] > [a1]
# p1: [w2] > [a2 > a3]

class TestContext(whenact.BaseContext):
    pass

ctx = TestContext()

result = pipeline.run(ctx)
assert result is None
assert ctx["action"] == "a2 action with a3"

There is another way to set a pipeline, additionally, to set policy name.

import whenact

@whenact.when
def w1(ctx):
    return True

@whenact.act
def a1(ctx):
    ctx["r1"] = 1
    return "done"

pipeline = whenact.Pipeline(
    [whenact.Policy(when=[w1], action=[a1], name="my_policy1")]
)
print(pipeline)
# p0: [w1] > [a1]

More examples

More examples can be found in tests

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

whenact-0.0.3.tar.gz (4.8 kB view hashes)

Uploaded Source

Built Distribution

whenact-0.0.3-py3-none-any.whl (5.8 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page