Library for executing batches of data processing sequentially or asynchronously in Python 3
Project description
Batchflows for Python 3
This tool will help you create and process a lot of data in an organized manner. You can create batches of processing synchronously and asynchronously.
remember it's in BETA :D
Get Started
from batchflows.Batch import Batch, Step
#First extend Step class and implement method execute
class SaveValueStep(Step):
    """Step that stores one fixed value in the batch context under a given key."""

    def __init__(self, value_name, value):
        # No step name is passed to Step here. A name only becomes required
        # (and must be unique) when the batch uses a ContextManager.
        super().__init__()
        self.value, self.value_name = value, value_name

    def execute(self, _context):
        """Write the stored value into ``_context``.

        ``_context`` is a dict shared between steps, used to hand values to
        later steps. If a ContextManager is used to save progress, keep the
        stored values primitive.
        """
        key, payload = self.value_name, self.value
        _context[key] = payload
#creating a second step just to make the explanation richer
class SumCalculatorStep(Step):
    """Step that adds up several context values and stores the total.

    The total is written to the context under this step's ``name``, so pass
    a ``name`` to make the result retrievable under a predictable key.
    """

    def __init__(self, attrs, name=None):
        """Create the step.

        attrs: iterable of context keys whose values are summed.
        name: optional step name, forwarded to ``Step``; when omitted the
            base-class default is kept, exactly as before this parameter
            existed.
        """
        # Forward the name only when one was given, so existing callers that
        # construct the step without a name see unchanged behavior.
        if name is None:
            super().__init__()
        else:
            super().__init__(name=name)
        self.attrs = attrs

    def execute(self, _context):
        """Sum ``_context[attr]`` for every attr and store it under ``self.name``.

        Raises KeyError if ``_context`` is a plain dict and one of the keys is
        missing.
        """
        calc = 0.0
        for attr in self.attrs:
            calc += _context[attr]
        # NOTE(review): self.name is presumably set by Step.__init__ - confirm.
        _context[self.name] = calc
# Build the batch: two steps that save values, then one that sums them.
batch = Batch()
first_value_step = SaveValueStep('value01', 1)
batch.add_step(first_value_step)
second_value_step = SaveValueStep('value02', 4)
batch.add_step(second_value_step)
sum_step = SumCalculatorStep(['value01', 'value02', 'other_value'])
batch.add_step(sum_step)
# You can seed the context with something useful before starting the batch.
batch.add_to_context('other_value', 5)
# Then execute your batch and be happy ;)
batch.execute()
Let's try running some parallel code
from batchflows.Batch import Batch, Step, ParallelFlows
import time
class SomeStep(Step):
    """Demo step that counts from 1 to 10, pausing one second after each number."""

    def execute(self, _context):
        # Count to 10 slowly: print the number, then sleep for a second.
        for c in range(1, 11):
            print(c)
            time.sleep(1)
# Create the first ParallelFlows group; the steps added to it run in parallel.
lazy_counter = ParallelFlows('LazySteps01')
for step_name in ('lazy01', 'lazy02'):
    lazy_counter.add_step(SomeStep(step_name))

# A second group with its own pair of parallel steps.
lazy_counter2 = ParallelFlows('LazySteps02')
for step_name in ('lazy03', 'lazy04'):
    lazy_counter2.add_step(SomeStep(step_name))

batch = Batch()
for flow in (lazy_counter, lazy_counter2):
    batch.add_step(flow)

# batchflows waits for each step to finish before executing the next one.
# Here lazy_counter is called first and runs "lazy01" and "lazy02" in parallel.
# Only when both of those steps finish does the batch start lazy_counter2.
batch.execute()
ContextManager
Starting with version 1.0.1-beta, you can use FileContextManager. This feature saves batch progress to a file, which lets you resume the process from the step that failed with a runtime error. You can also extend the ContextManager class and create your own way of saving progress.
from batchflows.Batch import Batch, Step, ParallelFlows
from batchflows.contextmanager.LocalContextManager import FileContextManager
class DownloadVideoStep(Step):
    """Step meant to download a video from ``file_url`` (body is a stub)."""

    def __init__(self, name, file_url):
        # When a ContextManager is used, every step needs a unique name.
        # required=True (the default is False) marks this step as required:
        # if the process is resumed, the step runs again even when it had
        # already completed successfully.
        step_options = {'name': name, 'required': True}
        super().__init__(**step_options)
        self.file_url = file_url

    def execute(self, _context):
        """Download the file; placeholder with no implementation yet."""
        return None
class AddWaterMarkStep(Step):
    """Placeholder step for adding a watermark; adds nothing to Step yet."""
class UploadFile(Step):
    """Placeholder step for uploading a file; adds nothing to Step yet."""
# Persist batch progress to a file so a failed run can be resumed.
cm = FileContextManager('path\\to\\storage\\file.name')
batch = Batch(context_manager=cm)
batch.add_step(DownloadVideoStep('downloadVideo', 'https://somevideo.url/example'))
# Add the remaining steps directly. Wrapping them in DownloadVideoStep(...)
# would fail because that constructor requires both a name and a file_url.
# With a ContextManager each step needs its own unique name.
batch.add_step(AddWaterMarkStep('addWaterMark'))
batch.add_step(UploadFile('uploadFile'))
batch.execute()
Next features:
- AsyncFlow thread pool size
- AsyncFlow timeout
- Remote Flow
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
batchflows-1.0.1b0.tar.gz
(8.0 kB
view hashes)
Built Distribution
Close
Hashes for batchflows-1.0.1b0-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 6712dbac62d4f969326e3ca09063d8be03d3df51d9c970e9c9cf07691e47897f |
|
MD5 | f39a843b389f9ba6a0e698c9e426b88c |
|
BLAKE2b-256 | ebc2b34dc7140b3faa4fc60d15039332fe24e75b66db160cc4e3c4a6c48e8b4f |