A library for executing batches of data processing sequentially or asynchronously, for Python 3
Batchflows for Python 3
This tool helps you create and process large amounts of data in an organized manner. Batches can run their steps synchronously or asynchronously.
Remember, it's in BETA :D
import logging

from batchflows.Batch import Batch
from batchflows.Step import Step

# Without this, logging.info() below prints nothing at the default WARNING level
logging.basicConfig(level=logging.INFO)


# First, extend the Step class and implement the execute method
class SaveValueStep(Step):
    def __init__(self, value_name, value):
        # Remember: a name is required if you want to use remote steps
        super().__init__()
        self.value_name = value_name
        self.value = value

    # "_context" is a dict you can use to store values that other steps will use
    def execute(self, _context):
        # Do what you have to do here!
        _context[self.value_name] = self.value


# A second step, just to make the explanation richer
class SumCalculatorStep(Step):
    def __init__(self, attrs):
        super().__init__()
        self.attrs = attrs

    def execute(self, _context):
        calc = 0.0

        for attr in self.attrs:
            calc += _context[attr]

        _context['sum'] = calc


# Here we create our batch!
batch = Batch()
batch.add_step(SaveValueStep('value01', 1))
batch.add_step(SaveValueStep('value02', 4))
batch.add_step(SumCalculatorStep(['value01', 'value02', 'other_value']))

# You can add something useful for your steps to the context before starting the batch
batch.context['other_value'] = 5

# Then execute your batch and be happy ;)
batch.execute()

logging.info(batch.context)
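To make the flow of the shared context easier to follow, here is a minimal sketch that mimics the example above without importing batchflows. The `Step` and `Batch` classes below are small stand-ins written only for this illustration (an assumption about the behavior described above, not the library's implementation): steps run in the order they were added, and each one reads and writes the same dict.

```python
class Step:
    """Stand-in for batchflows' Step; only what this sketch needs."""

    def __init__(self, name=None):
        self.name = name

    def execute(self, _context):
        raise NotImplementedError


class Batch:
    """Stand-in for batchflows' Batch: runs steps in order over one dict."""

    def __init__(self):
        self.context = {}
        self.steps = []

    def add_step(self, step):
        self.steps.append(step)

    def execute(self):
        for step in self.steps:
            # Every step receives the same dict, so it sees earlier writes
            step.execute(self.context)


class SaveValueStep(Step):
    def __init__(self, value_name, value):
        super().__init__()
        self.value_name = value_name
        self.value = value

    def execute(self, _context):
        _context[self.value_name] = self.value


class SumCalculatorStep(Step):
    def __init__(self, attrs):
        super().__init__()
        self.attrs = attrs

    def execute(self, _context):
        _context['sum'] = float(sum(_context[attr] for attr in self.attrs))


def run_example():
    batch = Batch()
    batch.add_step(SaveValueStep('value01', 1))
    batch.add_step(SaveValueStep('value02', 4))
    batch.add_step(SumCalculatorStep(['value01', 'value02', 'other_value']))
    batch.context['other_value'] = 5  # seeded before the batch starts
    batch.execute()
    return batch.context
```

With these stand-ins, `run_example()` returns a context holding the two saved values, the seeded `other_value`, and `sum` equal to `10.0` (1 + 4 + 5).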
Let's try running some parallel code
import logging
import time

from batchflows.Batch import Batch
from batchflows.Step import Step, ParallelFlows


class SomeStep(Step):
    def execute(self, _context):
        # Count to 10 slowly
        c = 0
        while c < 10:
            c += 1
            print(c)
            time.sleep(1)


# Create your ParallelFlows
lazy_counter = ParallelFlows('LazySteps01')
# Add steps so they run in parallel
lazy_counter.add_step(SomeStep('lazy01'))
lazy_counter.add_step(SomeStep('lazy02'))

lazy_counter2 = ParallelFlows('LazySteps02')
lazy_counter2.add_step(SomeStep('lazy03'))
lazy_counter2.add_step(SomeStep('lazy04'))

batch = Batch()
batch.add_step(lazy_counter)
batch.add_step(lazy_counter2)

# batchflows waits for each step to finish before executing the next one.
# In this example lazy_counter is called first and executes steps "lazy01" and "lazy02" in parallel.
# Only when both steps finish will the batch start lazy_counter2.
batch.execute()
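The ordering described in the comments above (steps inside one group run concurrently, groups run one after another) can be sketched with the standard library alone. This is an illustration using threads via `concurrent.futures`; whether batchflows itself uses threads or processes is not stated here, and `run_group` and `make_step` are hypothetical helper names.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_group(steps, context):
    """Run every callable in `steps` concurrently and wait for all of them."""
    with ThreadPoolExecutor(max_workers=len(steps)) as pool:
        futures = [pool.submit(step, context) for step in steps]
        for future in futures:
            future.result()  # re-raises any exception raised inside a step


def make_step(name, delay=0.05):
    """Build a toy step that sleeps, then records that it finished."""
    def step(context):
        time.sleep(delay)
        with context['lock']:
            context['finished'].append(name)
    return step


def run_example():
    context = {'finished': [], 'lock': threading.Lock()}
    groups = [
        [make_step('lazy01'), make_step('lazy02')],
        [make_step('lazy03'), make_step('lazy04')],
    ]
    # Groups run sequentially; the steps inside each group run in parallel
    for group in groups:
        run_group(group, context)
    return context['finished']
```

In the list returned by `run_example()`, the first two entries are always `lazy01` and `lazy02` (in either order) and the last two are `lazy03` and `lazy04`, because the second group only starts after the first has fully finished.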
FileContextManager and RemoteStep
You can extend the RemoteStep class to make your code run a remote batch. Unfortunately, the basic context does not allow remote steps to run without customization. To solve this problem we have FileContextManager.
Let's start by creating our main batch:
context_manager = FileContextManager(
    # Where the batch creates its status file.
    # You can point this at a disk that all your machines share.
    filepath='/tmp',
    # Default: False. Here you are saying this is the main batch.
    is_remote_step=False,
    # Default: random uuid. You can specify an id for the process.
    # This field is important for remote batches to be able to write the status file correctly.
    process_id='123ABC',
    # Default: random uuid. For the main batch this field has no importance,
    # but for your remote batch it must match the name of the remote step.
    process_name='batch name'
)

batch = Batch(context_manager=context_manager)

batch.add_step(RemoteBatchStep(
    name='do-a-barrel-roll',
    timeout=10  # in seconds
))

batch.execute()
Now let's create our remote batch:
context_manager = FileContextManager(
    filepath='/tmp',
    is_remote_step=True,
    process_id='123ABC',  # exactly the same as the main batch
    process_name='do-a-barrel-roll'  # exactly the same name as the step
)

batch = Batch(context_manager=context_manager)

batch.add_step(...)  # lots of cool things
batch.add_step(...)  # lots of cool things
batch.add_step(...)  # lots of cool things

batch.execute()
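The two snippets above cooperate through a status file in a shared directory, which is why `process_id` and the step name must match on both sides. The following is a rough sketch of how such a handshake could work, using only the standard library. The file naming scheme, the JSON layout, and the helpers `status_path`, `mark_done` and `wait_for_remote` are all assumptions made for this illustration; they are not FileContextManager's actual format or API.

```python
import json
import time
from pathlib import Path


def status_path(directory, process_id, step_name):
    # Hypothetical naming scheme, chosen only for this sketch
    return Path(directory) / f"{process_id}.{step_name}.status.json"


def mark_done(directory, process_id, step_name, success=True):
    """What the remote side might do when its batch ends."""
    path = status_path(directory, process_id, step_name)
    tmp = path.with_suffix('.tmp')
    tmp.write_text(json.dumps({'done': True, 'success': success}))
    tmp.replace(path)  # rename so readers never see a half-written file


def wait_for_remote(directory, process_id, step_name, timeout, poll=0.05):
    """What the main side might do: poll until the file appears or time runs out."""
    path = status_path(directory, process_id, step_name)
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if path.exists():
            return json.loads(path.read_text())['success']
        time.sleep(poll)
    raise TimeoutError(f"remote step {step_name!r} did not finish in {timeout}s")
```

If the ids or names differ between the two sides, the main batch would look for a file the remote batch never writes and would give up when the timeout expires, which matches the warning in the comments above.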
Customize your ContextManager
You can also extend the ContextManager class and create your own way to run remote code.
class MyContextManager(ABCContextManager):
    def __init__(self):
        self.context = dict()
        self.steps =

    # Override this method to teach your customization how to tell whether a remote execution is done
    def is_remote_step_done(self, name: str):
        raise NotImplementedError()

    # Override this method if you want to run some code after the batch concludes
    def upon_completion(self, success: bool, error: str = None):
        pass
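As one possible shape for such a customization, here is a small self-contained sketch. The `ABCContextManager` below is a stand-in that declares only the two hooks shown above so the example runs without batchflows; the real base class may require more. `InMemoryContextManager` and its `mark_remote_done` helper are hypothetical names invented for this illustration.

```python
from abc import ABC, abstractmethod


class ABCContextManager(ABC):
    """Stand-in with only the two hooks shown above; not the library's class."""

    @abstractmethod
    def is_remote_step_done(self, name: str):
        ...

    def upon_completion(self, success: bool, error: str = None):
        pass


class InMemoryContextManager(ABCContextManager):
    """Tracks finished remote steps in a set (useful mainly for tests)."""

    def __init__(self):
        self.context = dict()
        self.done = set()
        self.result = None

    def mark_remote_done(self, name: str):
        # Hypothetical helper: something outside the batch reports completion
        self.done.add(name)

    def is_remote_step_done(self, name: str):
        return name in self.done

    def upon_completion(self, success: bool, error: str = None):
        # Record the final outcome so callers can inspect it afterwards
        self.result = ('ok', None) if success else ('failed', error)
```

A set in memory only works inside a single process. For real remote execution you would likely swap it for something every machine can reach, such as a shared file, a database row, or a message queue.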