
A library for executing batches of data processing synchronously or asynchronously, for Python 3

Project description

Batchflows for Python 3

This tool helps you create and process large amounts of data in an organized manner. You can build batches of steps that run synchronously or asynchronously.

Remember, it's still in BETA :D

Get Started

from batchflows.Batch import Batch, Step

# First, extend the Step class and implement the execute method
class SaveValueStep(Step):
    def __init__(self, value_name, value):
        # Remember: a unique name is required if you use a ContextManager
        super().__init__()
        self.value_name = value_name
        self.value = value

    # "_context" is a dict you can use to store values that will be used in other steps.
    # If you intend to use a ContextManager (see below) to save progress, I recommend that you use only primitive values.
    def execute(self, _context):
        # do what you have to do here!
        _context[self.value_name] = self.value

# Create a second step just to make the explanation richer
class SumCalculatorStep(Step):
    def __init__(self, attrs):
        super().__init__()
        self.attrs = attrs

    def execute(self, _context):
        calc = 0.0
        for attr in self.attrs:
            calc += _context[attr]

        # self.name is provided by the Step base class
        _context[self.name] = calc

#Here we create our batch!
batch = Batch()
batch.add_step(SaveValueStep('value01', 1))
batch.add_step(SaveValueStep('value02', 4))
batch.add_step(SumCalculatorStep(['value01', 'value02', 'other_value']))

# You can add something useful to your steps before starting the batch!
batch.add_to_context('other_value', 5)

# then execute your batch and be happy ;)
batch.execute()
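The core idea above is that every step receives the same `_context` dict and can read values written by earlier steps. As a plain-Python illustration of that pattern (this is a sketch of the concept, not batchflows internals; the helper names are made up):

```python
# Each "step" is a callable that receives the shared context dict.
def save_value(name, value):
    def step(_context):
        _context[name] = value
    return step

def sum_values(attrs, result_name):
    def step(_context):
        _context[result_name] = sum(_context[a] for a in attrs)
    return step

context = {'other_value': 5}  # pre-seeded, like add_to_context
steps = [
    save_value('value01', 1),
    save_value('value02', 4),
    sum_values(['value01', 'value02', 'other_value'], 'total'),
]

# Sequential execution, like Batch.execute with synchronous steps
for step in steps:
    step(context)

print(context['total'])  # prints 10
```

Because the context is just a dict, the order in which steps are added matters: a step can only read keys that earlier steps (or `add_to_context`) have already written.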

Let's try running some parallel code

from batchflows.Batch import Batch, Step, ParallelFlows
import time


class SomeStep(Step):
    def execute(self, _context):
        #count to 10 slowly
        c = 0
        while c < 10:
            c += 1
            print(c)
            time.sleep(1)

# Create your ParallelFlows
lazy_counter = ParallelFlows('LazySteps01')
#add steps so they run in parallel
lazy_counter.add_step(SomeStep('lazy01'))
lazy_counter.add_step(SomeStep('lazy02'))

lazy_counter2 = ParallelFlows('LazySteps02')
lazy_counter2.add_step(SomeStep('lazy03'))
lazy_counter2.add_step(SomeStep('lazy04'))

batch = Batch()
batch.add_step(lazy_counter)
batch.add_step(lazy_counter2)

# Batchflows will wait for each step to finish before executing the next one.
# In this example, lazy_counter is called first and executes steps "lazy01" and "lazy02" in parallel.
# Only when both steps finish will the batch start lazy_counter2.
batch.execute()
batch.execute()
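The scheduling described above ("run a group in parallel, wait for all of it, then start the next group") can be sketched with the standard library. This is only an illustration of the behaviour under the hood, assuming thread-based concurrency; it is not how batchflows is actually implemented:

```python
from concurrent.futures import ThreadPoolExecutor

def run_parallel_group(steps, context):
    # All steps in the group run concurrently; we wait for every one to
    # finish before returning, just as the batch waits before starting
    # the next group.
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(step, context) for step in steps]
        for f in futures:
            f.result()  # re-raises any exception from the step

results = []
group1 = [lambda ctx: results.append('lazy01'), lambda ctx: results.append('lazy02')]
group2 = [lambda ctx: results.append('lazy03'), lambda ctx: results.append('lazy04')]

context = {}
for group in (group1, group2):  # groups themselves run sequentially
    run_parallel_group(group, context)

print(sorted(results))  # prints ['lazy01', 'lazy02', 'lazy03', 'lazy04']
```

Note that within a group the finish order is nondeterministic, but both members of group1 always complete before anything in group2 starts.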

ContextManager

Starting with version 1.0.1-beta, you can use FileContextManager, which saves batch progress to a file so you can resume the process from the step that raised a runtime error. You can also extend the ContextManager class and implement your own way of saving progress.

from batchflows.Batch import Batch, Step, ParallelFlows
from batchflows.contextmanager.LocalContextManager import FileContextManager


class DownloadVideoStep(Step):
    def __init__(self, name, file_url):
        # Remember: if you are using a ContextManager, name is required and must be unique.
        # required (default=False) tells the batch that this step is required,
        # so when the process is resumed, this step runs again even if it already executed successfully.
        super().__init__(name=name, required=True)
        self.file_url = file_url

    def execute(self, _context):
        # download file
        pass

class AddWaterMarkStep(Step):
    # do something
    pass


class UploadFile(Step):
    # do something
    pass


cm = FileContextManager('path\\to\\storage\\file.name')
batch = Batch(context_manager=cm)

batch.add_step(DownloadVideoStep('downloadVideo', 'https://somevideo.url/example'))
batch.add_step(AddWaterMarkStep(name='add_watermark'))
batch.add_step(UploadFile(name='upload_file'))


batch.execute()
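The resume behaviour described above (skip steps that already succeeded, but always re-run `required` ones) can be sketched in plain Python with a JSON progress file. This is an illustration of the concept only, assuming a made-up `run_with_resume` helper; FileContextManager's actual file format and API may differ:

```python
import json
import os
import tempfile

def run_with_resume(steps, progress_path):
    # steps: list of (name, required, callable) tuples
    done = set()
    if os.path.exists(progress_path):
        with open(progress_path) as f:
            done = set(json.load(f))
    context = {}
    for name, required, fn in steps:
        if name in done and not required:
            continue  # already finished in a previous run
        fn(context)
        done.add(name)
        with open(progress_path, 'w') as f:  # persist progress after each step
            json.dump(sorted(done), f)
    return context

calls = []
steps = [('download', False, lambda ctx: calls.append('download')),
         ('upload',   True,  lambda ctx: calls.append('upload'))]

path = os.path.join(tempfile.mkdtemp(), 'progress.json')
run_with_resume(steps, path)  # first run: both steps execute
run_with_resume(steps, path)  # resumed run: 'download' is skipped, 'upload' reruns
print(calls)                  # prints ['download', 'upload', 'upload']
```

Writing the progress file after every step is what makes recovery possible: if a later step raises, the file still records everything that completed before the failure.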

Next features:

  • AsyncFlow thread pool size
  • AsyncFlow timeout
  • Remote Flow


Source Distribution

batchflows-1.0.1b0.tar.gz (8.0 kB)

Built Distribution

batchflows-1.0.1b0-py3-none-any.whl (12.8 kB)
