A python library for creating memoized data and code for neuroimaging pipelines
Project description
memori
A python library for creating memoized data and code for neuroimaging pipelines
Table of Contents
Installation
To install, use pip
:
pip install memori
Usage
memori
uses a directed acyclic graph (DAG) approach to constructing pipelines.
Nodes of the the graph represent a "logical unit of processing" (up to the user
to define) that can be encomposed in a function. The edges of the
graph transfers data between these nodes to create a pipeline.
To represent this memori
employs the use of the Stage
and Pipeline
objects.
The Stage
object
A Stage
is a wrapper around a python function and is the conceptual equivalent
of a node of our graph. A Stage
object can take input/output from/to other Stage
objects, but can also be run in isolation. Here is an example of a Stage
wrapped
around a python function:
# our example function
def test_function(a, b, c):
# Do some stuff
d = a + b
e = b + c
# and return stuff
return d, e
We can wrap this function in a Stage
object and run it:
from memori import Stage
# any values a function returns need to be labeled with the `stage_outputs` parameter
my_test_stage = Stage(test_function, stage_outputs=["d", "e"])
# we can run this stage with the run method and store the results
result = my_test_stage.run(1, 2, 3)
# result will return a dictionary containing: {"d": 3, "e": 5}
# running it again with different parameters
result = my_test_stage.run(2, 3, 4)
# result will return a dictionary containing: {"d": 5, "e": 7}
Now lets write a 2nd function that can take input from our test_function
. Note that
the input arguments for this function should match the key names of the stage outputs
for the test_function
.
# new test function with input arguments matching previous stage
# function stage_output names
def test_function2(d, e):
return d + e
# and wrap this in a Stage
my_test_stage2 = Stage(test_function2, stage_outputs=["f"])
# to run this we just merely need to **results (kwarg unpacking) to pass information
# from my_test_stage to my_test_stage2
result2 = my_test_stage2.run(**results)
# result2 will return a dictionary containing: {"f": 12}
# or running the entire pipeline from the beginning
result2 = my_test_stage2.run(**my_test_stage.run(1, 2, 3))
# result2 will return a dictionary containing: {"f": 8}
# The previous two lines is the equivalent to running
test_function2(**test_function(1, 2, 3))
We can create static values in our Stage
object that ignores inputs from other stages
that are passed into the run
method.
# Stage will take the same params as test_function
# and use them as static values
my_test_stage = Stage(
test_function,
stage_outputs=["d", "e"],
a=1,
b=2,
c=3
)
# when we run the stage, we will see that it does not change with the input (2, 3, 4)
result = my_test_stage.run(2, 3, 4)
# result will return a dictionary containing: {"d": 3, "e": 5}
# if static values weren't used this should return {"d": 5, "e": 7}
Now we know how to wrap the functions we write into a Stage
object, but what benefit
does this provide? The main feature of memori
is to memoize
the inputs to each
stage and recall the outputs if they are the same. This can enable long running
functions to be skipped if the results are going to be the same!
# To enable memoization feature, we need to add the hash_output
# parameter when constructing a Stage object. hash_output is
# just some directory to where the memoization files can be
# written to.
my_test_stage = Stage(test_function, stage_output=["d", "e"], hash_output="/test/directory")
# run the stage
my_test_stage.run(1, 2, 3)
This will write 3 files: test_function.inputs
, test_function.stage
, and
test_function.outputs
at the location: /test/directory
These 3 files record the important states of the Stage for memoization, after it has
been run.
The .stage
file contains information about the function that was run.
It contains some rudimentary static analysis to check whether and code
wrapped by a Stage has changed in a way that will affect the result. If it has
detected this, it will rerun the stage. Note that this file contains binary data
is mostly non-human readable (unlike the .inputs
and .outputs
files).
The .inputs
and .outputs
files contain information about the inputs and outputs of the stage. These files are simply JSON files and upon opening them in a text editor you should see the following:
test_function.inputs
{
"a": 1,
"b": 2,
"c": 3
}
test_function.outputs
{
"d": 3,
"e": 5
}
memori
checks the .inputs
file on each run to determine if the stage needs to be run (assuming it has also passed the .stage
file check). If the stage is skipped, the .outputs
file is used to load the results into the stage.
By default, memori
uses the name of the function as the name for the hash files. If you
would like to use a different name for these files, you can set the name of the Stage object with
the stage_name
parameter in the constructor:
# Stage with a custom stage name
Stage(...
stage_name="my_stage_name"
...)
When passing path/file strings between Stage
objects, memori
has a special behavior: if it
determines the string to be a valid file on the disk, it will hash it with the SHA256
algorithm. For files, this gives memoization results that can reflect changes in data integrity:
# now we specify the input and output to be files on the disk
file0 = "/Some/file/path"
file1 = "/Some/second/file/path"
# define our simple test_function that outputs a file path
def test_function3(f0):
# always return file1
return file1
# Now we wrap it in a stage
my_test_stage3 = Stage(test_function3, stage_outputs=["file1"], hash_output="/test/directory")
# and run the stage with file0 as the input
results3 = my_test_stage3.run(file0)
Now if you examine the test_function3.inputs
and test_function3.outputs
you will see the following:
test_function3.inputs
{
"file0": {
"file": "/Some/file/path",
"hash": "f0e4c2f76c58916ec258f246851bea091d14d4247a2fc3e18694461b1816e13b"
}
}
test_function3.outputs
{
"file1": {
"file": "/Some/second/file/path",
"hash": "f91c3b6b3ec826aca3dfaf46d47a32cc627d2ba92e2d63d945fbd98b87b2b002"
}
}
As shown above memori
replaces a valid file path with a dictionary entry containing the "file"
and "hash"
keys. Valid files are compared by hash values rather than path/filename ensuring data integrity.
NOTE: Since
"file"
and"hash"
are keywords used to hash valid files. These are reserved keywords that should NOT be used when returning an output from a stage using a dictionary. Doing so could lead to catastrophic results!
CAUTION:
memori
uses JSON to memoize and pass information betweenStage
objects. This means that the inputs/outputs of your function MUST be JSON serializable or you will get a serialization error. You can also get data conversion effects if you don't use the proper data types. For example, python always converts a Tuple to a List when serializing a dictionary to JSON. This will lead to hash check fail each time you run the Stage! Since whenever memori loads the stage output data from the.outputs
file, the Tuple in the code will never match against list it was converted to in the JSON. So take care to use only JSON compatible data types (This means None, integers, floats, strings, bools, lists, and dictionaries are the only valid input/output data types inmemori
).For data that is not JSON serializable, the typical workaround is to save it to a file and pass the file location between the
Stage
objects. This also allows you to take advantage of the SHA256 file hashing features ofmemori
.
The Pipeline
object
What happens when you have more complex pipelines? Maybe you have a Stage
that needs to provide input to two different Stage
objects.
This is where the Pipeline
object comes in. A Pipeline
is a collection of Stage
objects with their input/output connections defined. A Pipeline
object represents
the conceptual DAG that was mentioned above.
from memori import Stage, Pipeline
# create some stages (see the last section on Stages for details)
stage0 = Stage(some params go here...)
stage1 = Stage(some params go here...)
stage2 = Stage(some params go here...)
stage3 = Stage(some params go here...)
# Now we create a Pipeline object, a pipeline takes a definition list during construction
# the definition list is a list of tuples specifying the connection between stages
#
# The "start" keyword is a special instruction that the Pipeline object can read
# it specifies that a particular stage has not precedent Stage and should be a Stage
# that is run first in the Pipeline.
p = Pipeline([
("start", stage0), # stage0 takes no input from other stages, so it should run first
(stage0, stage1), # stage0 passes it's output to stage1
(stage0, stage2), # and also to stage2
((stage1, stage2), stage3) # stage3 needs inputs from stage1 and stage2, so we use a
# special tuple-in-tuple so that it can get outputs from both
# NOTE: if stage1 and stage2 have stage_outputs with the same
# name, the last stage (right-most) stage will have precedence
# for it's output
])
# we can run the Pipeline with the run method, and get it's result
result = p.run(some input parameters here...)
Running the pipeline has the effect of invoking the run method
of each Stage
object individually, and passing the result of the stage onto the
next stage as defined by the Pipeline
definition passed in during Pipeline
initialization.
Stage Aliases and Complex Pipelines
When building a complicated pipleine, sometimes the functions that you write
will have input argument names that are different from the stage_output
names
that you have defined in a Stage
. Consider the following example:
def test_function(a, b):
return a + b
def test_function2(c):
# this might represent some complicated processing
c += 1
return c
def test_function3(d):
# this might be another function with some more complocated processing
d += 2
return d
Now let's say I want to pass the result of test_function
to both test_function2
and
test_function3
. This presents an issue because test_function2
and test_function3
have
different input argument names. So if I define the stage_output
of the wrapped test_function
to be stage_outputs=["c"]
this won't work for test_function3
and if I define it to be
stage_outputs=["d"]
it won't work for test_function2
.
One way of solving this issue would be to rewrite the test_function2
and test_function3
functions to have the same argument name, this may not always be possible (particularly when
wrapping a function call from a third-party library). Another option would be to wrap the
call of either test_function2
or test_function3
to take in the same input. For example:
# this is necessary hashing external function calls
# more about the hashable wrapper in the next section
from memori import hashable
# we wrap the call of test_function3
def test_function3_mod(c):
return hashable(test_function3)(c)
Now when we create the Stage
for each function, test_function2
and test_function3_mod
now have the same input argument names and can take in input from test_function
.
While this solution works (and indeed this was how it used to be done), memori
provides a more
convienent solution through Stage aliases. Aliases can map the name of one of the stage outputs to
another name. When creating a Stage
object, you can define this through the aliases
parameter.
# We wrap test_function in a Stage, and specify an alias from d -> c
test_stage = Stage(test_function, stage_outputs=["c"], aliases=["d": "c"])
# Now I can construct stages around test_function2 and test_function3 without
# writing extra code
test_stage2 = Stage(test_function2, stage_outputs=["e"])
test_stage3 = Stage(test_function3, stage_outputs=["f"])
# now definte the pipeline
my_pipeline = Pipeline(
[
("start", test_stage),
(test_stage, test_stage2),
(test_stage, test_stage3), # because we mapped d -> c, memori know where to pass the result to
]
)
Stage aliases reduces the need for extra boilerplate code, and adding on an extra
stage the feeds from test_stage
is as simple as adding another alias.
Hashing external functions
In the last section, we saw the use of the hashable wrapper when trying to wrap a function call in another function. But what does it actually do? Consider the following example:
def test_function(a, b)
c = a + b
d = test_function2(c)
return d
def test_function2(c)
return c + 1
stage0 = Stage(test_function, stage_outputs=["d"], hash_output="test")
result = stage0.run(1, 2)
# this will return the result {"d": 4}
Now, what if we change the code of test_function to:
# change up test_function!
def test_function(a, b)
c = a + b + 1
d = test_function2(c)
return d
Rebuilding the stage on this function and invoking the run
method it will cause the
.stage
hash to mismatch (since the function signature is different with the added
+ 1
in the code), and the function will rerun instead of loading from cache
(this should return the result {"d": 5}
).
So the function hashing feature of memori works! but what happens when we modify
test_function2
and rerun our stage.
# will memori see this change?
def test_function2(c):
return c + 2
Rerunning the stage with the updated test_function2
, you will see that after invoking
run
, the Stage
object simply loads the result from the .output
file and ignores
the difference in the updated test_function2
(this will still return {"d": 5}
rather
than {"d": 6}
.
This occurs because memori
function hashing only occurs one call deep. Meaning that
only the instructions of the wrapped callable are the only thing that is hashed. Function calls inside a function are simply recorded as constants, meaning that only
the name test_function2
is memoized, not the actual instructions!
To correct this issue, memori
provides the hashable
wrapper. This wrapper marks
a function so that memori knows to try and hash it.
# wrap test_funtion2 in hashable
def test_function(a, b)
c = a + b + 1
d = hashable(test_function2)(c)
return d
Alternatively, you can add the hashable wrapper a decorator.
# this is the same as calling hashable(test_function2)
# but makes everything transparent
@hashable
def test_function2(c)
return c + 1
This allows you to simply call test_function2
without worrying about calling
the hashable wrapper each time.
Path Management
memori
also provides a path management utility called PathManager
. It
is useful for manipulating file paths as well as suffixes and extensions.
If is derived from a Path
object from the pathlib library, and so can use any of the
parent methods as well.
Here are a few useful examples:
from memori import PathManager as PathMan
# a string to a path I want PathManager to manage
my_file_path_pm = PathMan("/my/path/to/a/file.ext.ext2")
# get only the file prefix
prefix = my_file_path_pm.get_prefix()
# prefix contains "file"
# get the path and file prefix
path_and_prefix = my_file_path_pm.get_path_and_prefix()
# path_and_prefix contains "/my/path/to/a/file"
# change path of the file, keeping the filename the same
repathed = my_file_path_pm.repath("/new/path")
# repathed contains "/new/path/file.ext.ext2"
# append a suffix (following the BIDS standard, suffixes should always have _)
suffixed = my_file_path_pm.append_suffix("_newsuffix")
# suffixed contains "/my/path/to/a/file_newsuffix.ext.ext2"
# replace last suffix
replaced = suffixed.replace_suffix("_newsuffix2")
# replaced contains "/my/path/to/a/file_newsuffix2.ext.ext2"
# delete last suffix
deleted = replaced.delete_suffix()
# deleted contains "/my/path/to/a/file.ext.ext2"
# methods can be chained together
chained = my_file_path_pm.repath("/new").append_suffix("_test").get_path_and_prefix()
# chained contains /new/file_test
# return as a string
my_file_path = my_file_path_pm.path
# /new/file_test
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file memori-0.3.1.tar.gz
.
File metadata
- Download URL: memori-0.3.1.tar.gz
- Upload date:
- Size: 31.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.1 CPython/3.9.15
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 8a8931a2162dc8998d4d157dd13f5798f80085046b426925ecb5f5d251b7bb12 |
|
MD5 | 590781cc0c8f65ea4824edebcc49cc01 |
|
BLAKE2b-256 | 799f0b51be5e7976cf5e94c41cb75142c3666b178d81f8c595e5cce2d24cc68c |
File details
Details for the file memori-0.3.1-py3-none-any.whl
.
File metadata
- Download URL: memori-0.3.1-py3-none-any.whl
- Upload date:
- Size: 27.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.1 CPython/3.9.15
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 488f508b9ec75ddc2d7cf06016a53266439cb994589ffb8ae136b7932f624552 |
|
MD5 | 44c26a4cad66e88f43ecc39245c5aaf1 |
|
BLAKE2b-256 | e0609f97e2954cf878cfa8b3e53340cf3c7c174c408a629a036abfff4d41343f |