Skip to main content

library for executing batches of data processing sequentially or asynchronously to python 3

Project description

Batchflows for Python 3

This tool will help you create and process a lot of data in an organized manner. You can create batches of processing synchronously and asynchronously.

remember it's in BETA :D

Get Started

import logging

from batchflows.Batch import Batch
from batchflows.Step import Step

#First extend Step class and implement method execute
class SaveValueStep(Step):
    def __init__(self, value_name, value):
        #Remember name is required if you want use remote steps
        super().__init__()
        self.value_name = value_name
        self.value = value

    # "_context" is a dict you can use to store values that will be used in other steps.
    def execute(self, _context):
        #do what u have to do here!
        _context[self.value_name] = self.value

#creating a second step just to make the explanation richer
class SumCalculatorStep(Step):
    def __init__(self, attrs):
        super().__init__()
        self.attrs = attrs

    def execute(self, _context):
        calc = 0.0
        for attr in self.attrs:
            calc += _context[attr]

        _context['sum'] = calc

#Here we create our batch!
batch = Batch()
batch.add_step(SaveValueStep('value01', 1))
batch.add_step(SaveValueStep('value02', 4))
batch.add_step(SumCalculatorStep(['value01', 'value02', 'other_value']))

#You can add something useful to your steps before starting bath!
batch.context['other_value'] = 5

#than execute your batch and be happy ;)
batch.execute()

logging.info(batch.context)

Let's try run some parallel code

import logging
import time

from batchflows.Batch import Batch
from batchflows.Step import Step, ParallelFlows


class SomeStep(Step):
    def execute(self, _context):
        #count to 10 slowly
        c = 0
        while c < 10:
            c += 1
            print(c)
            time.sleep(1)

#Create your AsyncFlow
lazy_counter = ParallelFlows('LazySteps01')
#add steps so they run in parallel
lazy_counter.add_step(SomeStep('lazy01'))
lazy_counter.add_step(SomeStep('lazy02'))

lazy_counter2 = ParallelFlows('LazySteps02')
lazy_counter2.add_step(SomeStep('lazy03'))
lazy_counter2.add_step(SomeStep('lazy04'))

batch = Batch()
batch.add_step(lazy_counter)
batch.add_step(lazy_counter2)

#batchfllows will wait for each step to finish before executing the next one.
#In this example lazy_counter will be called first and execute steps "lazy01" and "lazy02" in parallel.
#Only when both steps finish ,the batch will star lazy_counter2
batch.execute()

FileContextmanager and RemoteStep

You can extend RemoteStep class and make your code to run a remote batch. Unfortunately the basic context does not allow remote steps to be performed without customization. To solve this problem we have FileContextManager

let's start by creating our main batch

context_manager = FileContextManager(
    filepath='/tmp', #where u set where you want batch create status file
                     #you can put a disc that all your machines share
    is_remote_step=False, #default false. You saying here this is the main batch
    process_id='123ABC', #default random uuid. You can specify an id for the process. 
                         #This field is important for remote batches to be able to write the status file correctly.
    process_name='batch name' #default random uuid. For the main batch this field has no importance, but for your remote batch
                              #you need put the same name of remote-step
    )

batch = Batch(context_manager=context_manager)
batch.add_step(RemoteBatchStep(
                name='do-a-barrel-roll',
                timeout=10 #in seconds
            ))

batch.execute()

now let's create our remote batch

context_manager = FileContextManager(
    filepath='/tmp',
    is_remote_step=True,
    process_id='123ABC', #exactly the same a main batch
    process_name='do-a-barrel-roll'#exactly the same name as step
    )

batch = Batch(context_manager=context_manager)
batch.add_step(...) # lot of cool things
batch.add_step(...) # lot of cool things
batch.add_step(...) # lot of cool things

batch.execute()

customize your ContextManager

You can also extend the ContextManager class and create your way to run remote code.

class MyContextManager(ABCContextManager):
    def __init__(self):
        self.context = dict()
        self.steps = []

    #override this method to teach your customization how to identify if remote execution is ready
    def is_remote_step_done(self, name: str):
        raise NotImplementedError()

    #override this method if you want to execute some code after batch conclude
    def upon_completion(self, success: bool, error: str = None):
        pass

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

batchflows-2.0.0b0.tar.gz (8.0 kB view hashes)

Uploaded Source

Built Distribution

batchflows-2.0.0b0-py3-none-any.whl (13.8 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page