A wrapper of luigi that makes it easy to define tasks.
gokart
A wrapper of the data pipeline library "luigi".
Getting Started
Run pip install gokart to install the latest version from PyPI. Documentation for the latest release is hosted on readthedocs.
How to Use
Please use gokart.TaskOnKart instead of luigi.Task to define your tasks.
Basic Task with gokart.TaskOnKart
import gokart

class BasicTask(gokart.TaskOnKart):
    def requires(self):
        return TaskA()

    def output(self):
        # please use TaskOnKart.make_target to make Target.
        return self.make_target('basic_task.csv')

    def run(self):
        # load data which TaskA output
        texts = self.load()
        # do something with texts, and make results.
        # save results with the file path {self.workspace_directory}/basic_task_{unique_id}.csv
        self.dump(results)
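The comment in run refers to an output path of the form {self.workspace_directory}/basic_task_{unique_id}.csv. As a rough illustration of how such a path can be assembled, the sketch below hashes a task name and its parameters and inserts the result before the file extension. The helper names make_unique_id and build_output_path, the ./resources directory, and the use of MD5 over a sorted parameter list are assumptions made for this example; they are not taken from gokart's implementation.

```python
import hashlib
import os


def make_unique_id(task_name: str, params: dict) -> str:
    """Hypothetical helper: hash the task name and its sorted parameters."""
    key = task_name + str(sorted(params.items()))
    return hashlib.md5(key.encode("utf-8")).hexdigest()


def build_output_path(workspace_directory: str, relative_path: str, unique_id: str) -> str:
    """Hypothetical helper: insert the unique id before the file extension."""
    stem, ext = os.path.splitext(relative_path)
    return os.path.join(workspace_directory, f"{stem}_{unique_id}{ext}")


# The same task name and parameters always yield the same id, so the path is stable.
uid = make_unique_id("BasicTask", {"param": 1})
path = build_output_path("./resources", "basic_task.csv", uid)
print(path)
```

Because the id depends on the parameters, two runs with different parameters would write to different files under this scheme.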
Details of base functions
Make Target with TaskOnKart
TaskOnKart.make_target determines the Target type from the extension of the path passed to it. The following extensions are supported.
- pkl
- txt
- csv
- tsv
- gz
- json
- xml
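To make the extension-based selection concrete, here is a minimal sketch of how a path could be checked against the list above. The function detect_extension and the lowercasing of the extension are assumptions for illustration only; gokart's own logic may differ.

```python
import os

# Extensions listed in this README.
SUPPORTED_EXTENSIONS = {"pkl", "txt", "csv", "tsv", "gz", "json", "xml"}


def detect_extension(path: str) -> str:
    """Hypothetical helper: return the extension that would select the Target type."""
    # splitext keeps only the last suffix, so 'archive.tar.gz' gives 'gz'.
    extension = os.path.splitext(path)[1].lstrip(".").lower()
    if extension not in SUPPORTED_EXTENSIONS:
        raise ValueError(f"unsupported extension: {extension!r}")
    return extension


print(detect_extension("basic_task.csv"))
```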
Make Target for models that generate multiple files when saved
TaskOnKart.make_model_target and TaskOnKart.dump are designed to save and load models such as gensim.models.Word2Vec.
class TrainWord2Vec(gokart.TaskOnKart):
    def output(self):
        # please use 'zip'.
        return self.make_model_target(
            'model.zip',
            save_function=gensim.models.Word2Vec.save,
            load_function=gensim.models.Word2Vec.load)

    def run(self):
        # make word2vec
        self.dump(word2vec)
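The reason for a zip target is that some save functions write several files next to each other. The sketch below shows one way to bundle them: save into a temporary directory, archive the whole directory, and reverse the steps on load. All names here (dump_model_as_zip, load_model_from_zip, the toy save and load functions, and the inner file name model.bin) are hypothetical and use only the standard library; this is not a description of gokart's internals.

```python
import os
import shutil
import tempfile


def dump_model_as_zip(model, zip_path, save_function, file_name="model.bin"):
    """Hypothetical helper: save into a temp dir, then archive the whole dir."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        save_function(model, os.path.join(tmp_dir, file_name))
        base_name, _ = os.path.splitext(zip_path)
        shutil.make_archive(base_name, "zip", root_dir=tmp_dir)


def load_model_from_zip(zip_path, load_function, file_name="model.bin"):
    """Hypothetical helper: unpack the archive, then call the load function."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        shutil.unpack_archive(zip_path, extract_dir=tmp_dir, format="zip")
        return load_function(os.path.join(tmp_dir, file_name))


def save_toy_model(model, path):
    # Writes two files, mimicking a model that spills into a side file.
    with open(path, "w") as f:
        f.write(model["name"])
    with open(path + ".vectors", "w") as f:
        f.write(",".join(str(v) for v in model["vectors"]))


def load_toy_model(path):
    with open(path) as f:
        name = f.read()
    with open(path + ".vectors") as f:
        vectors = [float(v) for v in f.read().split(",")]
    return {"name": name, "vectors": vectors}


# Round trip: both files travel together inside a single zip archive.
with tempfile.TemporaryDirectory() as work_dir:
    zip_path = os.path.join(work_dir, "model.zip")
    dump_model_as_zip({"name": "toy", "vectors": [0.1, 0.2]}, zip_path, save_toy_model)
    restored = load_model_from_zip(zip_path, load_toy_model)
```

The save and load functions take the same (model, path) and (path) shapes as the save_function and load_function arguments in the example above, which is why a single archive can stand in for the model's files.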
Load input data
Pattern 1: Load input data individually.
def requires(self):
    return dict(data=LoadItemData(), model=LoadModel())

def run(self):
    # pass a key in the dictionary `self.requires()`
    data = self.load('data')
    model = self.load('model')
Pattern 2: Load input data at once
def run(self):
    input_data = self.load()
    # The above line is equivalent to the following:
    # input_data = dict(data=self.load('data'), model=self.load('model'))
Load input data as pd.DataFrame
def requires(self):
    return LoadDataFrame()

def run(self):
    data = self.load_data_frame(required_columns={'id', 'name'})
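The required_columns argument names the columns the task expects to find in the loaded data. As a hedged sketch of what such a check can look like, the function below compares a set of expected names against the columns actually present. The helper check_required_columns and its error type are assumptions for illustration; the exact behavior of load_data_frame is not stated in this README.

```python
def check_required_columns(columns, required_columns):
    """Hypothetical helper: fail early when an expected column is missing."""
    missing = set(required_columns) - set(columns)
    if missing:
        raise ValueError(f"missing required columns: {sorted(missing)}")


# Column names as they might appear in the loaded data (illustrative values).
check_required_columns(columns=["id", "name", "price"], required_columns={"id", "name"})
```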
Advanced
Using task lock
Requirements
You need to install redis (https://redis.io/topics/quickstart) to use this feature.
Description
Task lock is implemented to prevent task cache collisions. (Without it, a cache collision may occur when the same task with the same parameters is run in parallel by different applications.)
- Set up a redis server somewhere accessible from your gokart/luigi jobs. The following command runs redis on your localhost.
  $ redis-server
- Set the redis server hostname and port number as parameters of gokart.TaskOnKart. You can set them by adding the --redis-host=[your-redis-localhost] --redis-port=[redis-port-number] options to your gokart Python script, e.g.
  python main.py sample.SomeTask --local-scheduler --redis-host=localhost --redis-port=6379
  Alternatively, you may set the parameters in a config file.
  [TaskOnKart]
  redis_host=localhost
  redis_port=6379
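The config file form places each key on its own line under a [TaskOnKart] section. The sketch below parses such a fragment with Python's standard configparser to show the layout and the resulting value types. It only illustrates the file structure; how luigi or gokart read their configuration is not shown here, and the variable names are chosen for this example.

```python
import configparser

# INI-style fragment with one key per line under the section header.
CONFIG_TEXT = """
[TaskOnKart]
redis_host=localhost
redis_port=6379
"""

parser = configparser.ConfigParser()
parser.read_string(CONFIG_TEXT)

redis_host = parser.get("TaskOnKart", "redis_host")
redis_port = parser.getint("TaskOnKart", "redis_port")  # parsed as an integer
print(redis_host, redis_port)
```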