Skip to main content

A tool for having a sequential execution of plugins

Project description

Introduction

Sometime in the development or implementation we encounter things to be processed by different specialization. For example, we receive a CSV file that is intended from importing to a table. We found out that the CSV file is not formatted to what the table can consume. Thus, we need to do two thing:

  1. Transform or create a CSV file to meet the table requirement based on the CSV file at hand.

  2. Wrap the contents of the transformed and newly created CSV to create the SQL statements that can be readily run against a database.

As we can see above, these then can be created by two different individuals one that is specialized in manipulating CSV file and knowledge in generating SQL scripts. Addition, these two thing must be run in sequence and not on the other way around.

This is the idea around the conveyor that the work is done in sequence and can use some tools.

Installation

The installation is as easy as downloading the conveyor-<version>-py3-none-any.whl package and run the following command in the download folder in the command line:

Command 1. Using pip module for installation If you have the wheel package

python -m pip install mg_conveyor-<version>-py3-none-any.whl

Or simply

python -m pip install mg_conveyor

Using the conveyor

Conveyor is not intended be used as a standalone command but a least a plugin must be made available to it to process.

The ISequencialPlugin Interface

As its name suggests it is executed based on sequence. This plugin needs the following methods to be implemented:

Methods to be implemented

def sequence(self)
def process(self, identity, context, *args, **kwargs)

The sequence method must return an integer that will dictate sequence on when it must be executed.

The process method is the entrypoint for each plugin and this is where our plugin logic must reside. This method has two important parameters namely identity and context. Currently the identity is the name of the plugin (i.e. defined in the yapsy configuration file that is particularly the Name attribute) and the context contains some information about the plugin. Also this method must return one of the predefined status.

ISequencialPlugin Return Status

Status

Description

STATUS_SUCCESS

Finished successfully.

STATUS_FAILED

Error encountered also abort the execution of conveyor

STATUS_ABORT

Abort the execution of conveyor.

STATUS_RETRY

The plugin is requesting an interactive retry.

STATUS_CONFIRM_RETRY

The user responded Y to the interactive retry.

STATUS_CONFIRM_ABORT

The user responded N to the interactive retry.

STATUS_SKIPPED

Skipped plugin

STATUS_STARTED

Plugin is currently started.

Example 1. Simple Plugin Implementation

import mg_conveyor as conveyor
class PluginOne(conveyor.ISequencialPlugin):
    def sequence(self):
        return 100
    def process(self, identity, context, *args, **kwargs):
        print("Hello Worlds.")
        return conveyor.STATUS_SUCCESS

Using the context Parameter

The context parameter contains some information about the plugin like its status (i.e. CONTEXT_KEY_STATUS) and its instance (i.e. CONTEXT_KEY_PLUGIN_OBJECT). This parameter is just a simple dictionary. By default each plugin has its own context inside the context with the key as its identity (i.e. context[identity]). Thus, to get the plugin object of the current plugin we should access it like the following snippet inside the process method.

pluginObj = context[identity][CONTEXT_KEY_PLUGIN_OBJECT]

Pre-defined Context Keys per identity

Status

Description

CONTEXT_KEY_STATUS

Contains the numerical equivalent of the ISequencialPlugin Return Status.

CONTEXT_KEY_PLUGIN_OBJECT

The instance of the plugin associated with the identity.

The context has another purpose that it can hold custom plugin information. To add a new entry to the context it is better to add it under the identity key. This can be done by the snippet below (i.e. inside the process method):

plugin_context = context[identity]
plugin_context['item1']='This is item 1'
plugin_context['item2']='This is item 2'

More over this context is shared by all the plugins loaded by the conveyor. Therefore, we can use the context to pass information between plugins. For example the result of the first plugin can be stored in the context and that item will be processed by the following plugin.

Yapsy Configuration File

Once the plugin is ready we must create a configuration file. Since we are using yapsy the sample configuration file is as follows:

[Core]
Name = Hello World
Module = helloworld

[Documentation]
Author = Ronaldo Webb
Version = 0.1
Website = http://www.ronella.xyz
Description = Test only

See yapsy-plugin for more information about the content of the configuration file.

The IUtilityPlugin Interface

This plugin is the one that can be shared by all ISequencialPlugin implementations. Thus, we can think of the IUtilityPlugin implementation as a tool that the ISequencialPlugin (i.e. activing as a worker) can use to do its job properly. This tool is always available in the context and only has one method to be implemented.

Method to be implemented

def use(self, *args, **kwargs)

The use method must contain the implmentation on what the ISequencialPlugin implementaion demands it to do according to what it’s purpose as a tool. It can return any type therefore the a proper documentation of the return type is necessary.

Built-in Properties

The IUtilityPlugin has built-in properties as we can see in the following table:

Property

Description

identity

Contains the identity of the plugin (i.e. the Name attribute in yapsy configuration file.)

context

The context shared by all the plugins.

localContext

This is equals to context[identity].

Acquiring IUtilityPlugin from context

To acquire an instance of the IUtilityPlugin we must know its identity (i.e. Name attribute in yapsy configuraiton file.). Once we have the instance we can call it’s use method.

Example 2: Acquiring IUtilityPlugin

If have Utility as the identity then in the process method of the ISequencialPlugin implementation we can retrieve its instance like in the snippet below:

def process(self, identity, context, *args, **kwargs):

    utilObj = context['Utility'][conveyor.CONTEXT_KEY_PLUGIN_OBJECT].use()

Starting The Conveyor

Once the plugin is working fine and the configuration file were created, we can now create an implmenetation to start the conveyor. The first thing that we must do is to instantiate the Conveyor class then load the plugins and finally start it. See the sample snippet below:

Example 3: Starting the Conveyor

conv = conveyor.Conveyor()
conv.loadPlugins(["plugins"])
conv.start()

Appendix

Complete Hello World Implementation

plugins/helloworld_plugin.py

import mg_conveyor as conveyor
class PluginOne(conveyor.ISequencialPlugin):
    def sequence(self):
        return 100
    def process(self, identity, context, *args, **kwargs):
        print("Hello World.")
        return conveyor.STATUS_SUCCESS

plugins/helloworld.yapsy-plugin

[Core]
Name = Hello World
Module = helloworld_plugin

[Documentation]
Author = Ronaldo Webb
Version = 0.1
Website = http://www.ronella.xyz
Description = Test only

helloworld.py

import mg_conveyor as conveyor

if __name__ == '__main__':

    conv=conveyor.Conveyor()
    conv.loadPlugins(["plugins"])
    conv.start()

Sample Context Usage

plugins/plugin1.py

import mg_conveyor as conveyor
class Plugin(conveyor.ISequencialPlugin):
    def sequence(self):
        return 100

    def process(self, identity, context, *args, **kwargs):
        plugin_context = context[identity]

        plugin_context['Message'] = "From plugin 1"

        return conveyor.STATUS_SUCCESS

plugins/plugin1.yapsy-plugin

[Core]
Name = Plugin1
Module = plugin1

[Documentation]
Author = Ronaldo Webb
Version = 0.1
Website = http://www.ronella.xyz
Description = Test only

plugins/plugin2.py

import mg_conveyor as conveyor
class Plugin(conveyor.ISequencialPlugin):
    def sequence(self):
        return 200

    def process(self, identity, context, *args, **kwargs):

        #Plugin1 is the Name found in plugin1.yapsy-plugin
        plugin1_context = context['Plugin1']


        print("I'm in plugin 2...")
        print("Message from plugin 1: " + plugin1_context['Message'])
        return conveyor.STATUS_SUCCESS

plugins/plugin2.yapsy-plugin

[Core]
Name = Plugin2
Module = plugin2

[Documentation]
Author = Ronaldo Webb
Version = 0.1
Website = http://www.ronella.xyz
Description = Test only

context_usage.py

import mg_conveyor as conveyor

if __name__ == '__main__':

    conv=conveyor.Conveyor()
    conv.loadPlugins(["plugins"])
    conv.start()

Passing Parameter to the Process Method

plugins/parameter.py

import mg_conveyor as conveyor
class PluginOne(conveyor.ISequencialPlugin):

    def sequence(self):
        return 100

    def process(self, identity, context, *args, **kwargs):
        print("Argument 1: " + args[0])
        print("Arguemnt 2: " + args[1])
        print("Parameter 1: " + kwargs['param1'])
        print("Parameter 2: " + kwargs['param2'])

        return conveyor.STATUS_SUCCESS

plugins/parameter.yapsy-plugin

[Core]
Name = Parameter Passing
Module = parameter

[Documentation]
Author = Ronaldo Webb
Version = 0.1
Website = http://www.ronella.xyz
Description = Test only

process_parameter.py

import mg_conveyor as conveyor

if __name__ == '__main__':

    conv=conveyor.Conveyor()
    conv.loadPlugins(["plugins"])
    conv.start("arg0"
        , "arg1"
        , param1="This is parameter 1"
        , param2="This is parameter 2")

Basic Sample for using IUtilityPlugin

utility.py

import mg_conveyor as conveyor

class Utility(conveyor.IUtilityPlugin):

    def __init__(self):
        super().__init__()

    def use(self, *args, **kwargs):

        self.localContext['Message'] = "I'm in Utility plugin"
        return "This is a test."

utility.yapsy-plugin

[Core]
Name = Utility
Module = utility

[Documentation]
Author = Ronaldo Webb
Version = 0.1
Website = http://www.ronella.xyz
Description = Test only

sequence.py

import mg_conveyor as conveyor

class PluginOne(conveyor.ISequencialPlugin):
    def sequence(self):
        return 100

    def process(self, identity, context, *args, **kwargs):

        utilCtx = context['Utility']
        utilOutput = utilCtx[conveyor.CONTEXT_KEY_PLUGIN_OBJECT].use()

        if 'util_message' in kwargs:
            kwargs['util_message'](utilCtx['Message'])

        if 'util_output' in kwargs:
            kwargs['util_output'](utilOutput)

        return conveyor.STATUS_SUCCESS

sequence.yapsy-plugin

[Core]
Name = Sequence
Module = sequence

[Documentation]
Author = Ronaldo Webb
Version = 0.1
Website = http://www.ronella.xyz
Description = Test only

utility_usage.py

import mg_conveyor as conveyor

if __name__ == '__main__':

    conv=conveyor.Conveyor()
    conv.loadPlugins(["plugins"])
    conv.start()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

mg_conveyor-1.0.0b-py3-none-any.whl (12.2 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page