A library for executing batches of data-processing steps sequentially or asynchronously in Python 3

Project description

Batchflows for Python 3

This tool helps you create and process large amounts of data in an organized manner. You can build processing batches that run synchronously or asynchronously.

Remember, it's still in BETA :D

Get Started

import logging

from batchflows.Batch import Batch
from batchflows.Step import Step

# First, extend the Step class and implement the execute method
class SaveValueStep(Step):
    def __init__(self, value_name, value):
        # Remember: a name is required if you want to use remote steps
        super().__init__()
        self.value_name = value_name
        self.value = value

    # "_context" is a dict you can use to store values that will be used in other steps.
    def execute(self, _context):
        # do whatever you need to do here!
        _context[self.value_name] = self.value

# Create a second step just to make the example richer
class SumCalculatorStep(Step):
    def __init__(self, attrs):
        super().__init__()
        self.attrs = attrs

    def execute(self, _context):
        calc = 0.0
        for attr in self.attrs:
            calc += _context[attr]

        _context['sum'] = calc

#Here we create our batch!
batch = Batch()
batch.add_step(SaveValueStep('value01', 1))
batch.add_step(SaveValueStep('value02', 4))
batch.add_step(SumCalculatorStep(['value01', 'value02', 'other_value']))

# You can add useful values to the context before starting the batch!
batch.context['other_value'] = 5

# then execute your batch and be happy ;)
batch.execute()

logging.info(batch.context)
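# batch.context should now hold value01=1, value02=4, other_value=5 and sum=10.0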

Let's try running some parallel code

import logging
import time

from batchflows.Batch import Batch
from batchflows.Step import Step, ParallelFlows


class SomeStep(Step):
    def execute(self, _context):
        #count to 10 slowly
        c = 0
        while c < 10:
            c += 1
            print(c)
            time.sleep(1)

# Create your ParallelFlows
lazy_counter = ParallelFlows('LazySteps01')
#add steps so they run in parallel
lazy_counter.add_step(SomeStep('lazy01'))
lazy_counter.add_step(SomeStep('lazy02'))

lazy_counter2 = ParallelFlows('LazySteps02')
lazy_counter2.add_step(SomeStep('lazy03'))
lazy_counter2.add_step(SomeStep('lazy04'))

batch = Batch()
batch.add_step(lazy_counter)
batch.add_step(lazy_counter2)

# Batchflows will wait for each step to finish before executing the next one.
# In this example, lazy_counter is called first and executes steps "lazy01" and "lazy02" in parallel.
# Only when both steps finish will the batch start lazy_counter2.
batch.execute()
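
Since the two steps inside each ParallelFlows run concurrently, each group takes roughly 10 seconds (each SomeStep sleeps one second per count), so the whole batch finishes in roughly 20 seconds instead of the ~40 seconds a fully sequential run would need.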

FileContextManager and RemoteStep

You can extend the RemoteStep class to make your code trigger a remote batch. Unfortunately, the basic context does not support remote steps without customization. To solve this problem we provide FileContextManager.

Let's start by creating our main batch:

context_manager = FileContextManager(
    filepath='/tmp', # where you want the batch to create its status file;
                     # point this at a disk that all your machines share
    is_remote_step=False, # default False. Indicates that this is the main batch
    process_id='123ABC', # default: random uuid. You can specify an id for the process.
                         # This field is important so remote batches can write the status file correctly.
    process_name='batch name' # default: random uuid. For the main batch this field does not matter,
                              # but a remote batch must use the same name as its remote step
    )

batch = Batch(context_manager=context_manager)
batch.add_step(RemoteBatchStep(
                name='do-a-barrel-roll',
                timeout=10 #in seconds
            ))

batch.execute()

Now let's create our remote batch:

context_manager = FileContextManager(
    filepath='/tmp',
    is_remote_step=True,
    process_id='123ABC', # exactly the same as the main batch
    process_name='do-a-barrel-roll' # exactly the same name as the remote step
    )

batch = Batch(context_manager=context_manager)
batch.add_step(...) # lot of cool things
batch.add_step(...) # lot of cool things
batch.add_step(...) # lot of cool things

batch.execute()
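
Because both context managers share the same filepath and process_id, the remote batch can write its status file where the main batch expects it; the main batch's 'do-a-barrel-roll' step then waits (up to its timeout) for that status to appear.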

Customize your ContextManager

You can also extend the ABCContextManager class and create your own way of running remote code.

class MyContextManager(ABCContextManager):
    def __init__(self):
        self.context = dict()
        self.steps = []

    # Override this method to tell your implementation how to detect that a remote step has finished
    def is_remote_step_done(self, name: str):
        raise NotImplementedError()

    # Override this method if you want to run some code after the batch concludes
    def upon_completion(self, success: bool, error: str = None):
        pass
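
As a minimal sketch (assuming ABCContextManager only requires the hooks shown above, and using a marker-file convention that is purely illustrative, not the library's own), a custom context manager that coordinates through a shared directory could look like this:

import os

class SharedDirContextManager(ABCContextManager):
    def __init__(self, shared_dir: str):
        self.context = dict()
        self.steps = []
        self.shared_dir = shared_dir

    def is_remote_step_done(self, name: str):
        # Illustrative convention: the remote side creates "<name>.done"
        # in the shared directory when it finishes.
        return os.path.exists(os.path.join(self.shared_dir, f'{name}.done'))

    def upon_completion(self, success: bool, error: str = None):
        # Record the final outcome so other processes can inspect it.
        status = 'success' if success else f'failed: {error}'
        with open(os.path.join(self.shared_dir, 'batch.status'), 'w') as f:
            f.write(status)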

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

batchflows-2.0.0b0.tar.gz (8.0 kB)

Uploaded Source

Built Distribution

batchflows-2.0.0b0-py3-none-any.whl (13.8 kB)

Uploaded Python 3

File details

Details for the file batchflows-2.0.0b0.tar.gz.

File metadata

  • Download URL: batchflows-2.0.0b0.tar.gz
  • Upload date:
  • Size: 8.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.22.0 setuptools/42.0.2 requests-toolbelt/0.9.1 tqdm/4.35.0 CPython/3.7.4

File hashes

Hashes for batchflows-2.0.0b0.tar.gz

  • SHA256: 178f6c1275e87bbb9d3aa57bf735d64999fc4a4478bd642e15c0c07531f5ee53
  • MD5: 1e6e0729c754b4fc5874f4c34917e86f
  • BLAKE2b-256: d09ba4c478abc5a0535a4a11c192b327b5abce76125b3d32262ca0cb76b4d58b

See more details on using hashes here.

File details

Details for the file batchflows-2.0.0b0-py3-none-any.whl.

File metadata

  • Download URL: batchflows-2.0.0b0-py3-none-any.whl
  • Upload date:
  • Size: 13.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.22.0 setuptools/42.0.2 requests-toolbelt/0.9.1 tqdm/4.35.0 CPython/3.7.4

File hashes

Hashes for batchflows-2.0.0b0-py3-none-any.whl

  • SHA256: c86068f3f2d946d6ae244bd3ebf864199c10e1f00af90f6602e658389457adc9
  • MD5: e93dc60d341235abcc8cfa0d6cf45df8
  • BLAKE2b-256: db2f1857b4538e06e24fdbe8c76cdd500db633478006200fc0f86b7596c851f0

See more details on using hashes here.
