Building blocks for creating a pipeline for Fairspace

Project description

Fairspace Pipeline

License: Apache 2.0

This repository contains the Fairspace Pipeline, a package that can be used as a base to create custom ETL pipelines for Fairspace.

Installation

Requires Python 3.12 or newer.

Installing from PyPI

python -m pip install fairspace-pipeline

Installing from source

git clone https://github.com/thehyve/fairspace-pipeline.git
cd fairspace-pipeline
python -m build
python -m pip install .

Usage

Fairspace Pipeline cannot be used as a standalone package. It requires a custom implementation of several of its components and specific configuration. The sections below describe the steps needed to create a fully functioning custom pipeline from the reusable components and interface implementations.

Configuration

Create a .env file with the following environment variables:

# Keycloak
KEYCLOAK_CLIENT_ID="" # e.g. KEYCLOAK_CLIENT_ID="workspace-client"
KEYCLOAK_CLIENT_SECRET="" # e.g. KEYCLOAK_CLIENT_SECRET="********"
KEYCLOAK_USERNAME="" # e.g. KEYCLOAK_USERNAME="organisation-admin"
KEYCLOAK_PASSWORD="" # e.g. KEYCLOAK_PASSWORD="fairspace123"
KEYCLOAK_SERVER_URL="" # e.g. KEYCLOAK_SERVER_URL="https://my-keycloak-instance.com"
KEYCLOAK_REALM="" # e.g. KEYCLOAK_REALM="fairspace"

# Fairspace
FAIRSPACE_URL="" # e.g. FAIRSPACE_URL="https://my-fairspace-instance.com"

# Amazon S3
IS_AWS_S3=False # e.g. IS_AWS_S3=True if using AWS S3 bucket for source data
AWS_SOURCE_BUCKET_NAME="" # e.g. AWS_SOURCE_BUCKET_NAME="fairspace-metadata-source"
AWS_OUTPUT_BUCKET_NAME="" # e.g. AWS_OUTPUT_BUCKET_NAME="fairspace-metadata-output"

# Pipeline settings
TAXONOMIES_TTL_PATH="config/custom_taxonomies.ttl"
SOURCE_STUDIES_PATHS='' # e.g. '["./test_data/test_study_input1", "./test_data/test_study_input2"]'
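SOURCE_STUDIES_PREFIXES='' # optional, read by get_config() below; JSON list of strings, e.g. SOURCE_STUDIES_PREFIXES='["study_"]' (illustrative value)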
OUTPUT_DATA_PATH="" # e.g. OUTPUT_DATA_PATH="./test_data/.output_data"

Next, implement a function that creates a FairspacePipelineConfig object from these environment variables:

import json
import os
import pathlib

from fairspace_pipeline.pipeline import FairspacePipelineConfig


def get_config() -> FairspacePipelineConfig:
    config = FairspacePipelineConfig()
    config.source_study_prefixes = json.loads(str(os.environ.get('SOURCE_STUDIES_PREFIXES', '[""]')))
    config.output_data_directory = os.environ.get('OUTPUT_DATA_PATH',
                                                  os.path.join(os.getcwd(), 'test_data/.output_data'))
    config.is_aws_s3 = os.getenv("IS_AWS_S3", 'False').lower() in ('true', '1', 't')
    if config.is_aws_s3:
        config.source_study_directories = json.loads(os.environ.get('SOURCE_STUDIES_PATHS'))
    else:
        config.source_study_directories = [os.path.join(os.getcwd(), val) for val in json.loads(
            os.environ.get('SOURCE_STUDIES_PATHS', '["test_data/source_data"]'))]
    config.source_bucket_name = os.environ.get('AWS_SOURCE_BUCKET_NAME')
    config.output_bucket_name = os.environ.get('AWS_OUTPUT_BUCKET_NAME')
    config.taxonomies_directory = os.environ.get(
        'TAXONOMIES_TTL_PATH',
        os.path.join(pathlib.Path(__file__).parent.absolute(), "config", "taxonomies.ttl")
    )
    return config

Taxonomies

Implement a custom taxonomies graph class that extends the TaxonomiesGraph class. The class needs to:

  • include all custom taxonomies as class attributes,
  • define the TAXONOMY_PREFIX to be used in the URIs of custom taxonomy terms.

Sample implementation:

from fairspace_pipeline.graph.taxonomy_graph import TaxonomiesGraph

TAXONOMY_PREFIX = "https://fairspace.com/custom_ontology#"  # Prefix for custom taxonomies

class CustomTaxonomiesGraph(TaxonomiesGraph):
    def __init__(self, taxonomies_dir):
        super().__init__(taxonomies_dir, TAXONOMY_PREFIX)
        self.countries = self.query_taxonomy(taxonomy_name='Country')
        self.file_types = self.query_taxonomy(taxonomy_name='FileType')
        self.species = self.query_taxonomy(taxonomy_name='Species')
        self.gender = self.query_taxonomy(taxonomy_name='Gender')
        self.study_status = self.query_taxonomy(taxonomy_name='StudyStatus')
        self.study_phases = self.query_taxonomy(taxonomy_name='StudyPhase')
        self.sample_treatments = self.query_taxonomy(taxonomy_name='Treatment')

Mapping

Implement a custom graph class that extends the FairspaceGraph class. It is the module that maps the source data to RDF graphs specific to the custom data model.

The class needs to implement the create_graph() and get_file_suffix_processing_order() methods of the FairspaceGraph interface.

Sample implementation:

from rdflib import Graph, Literal, Namespace
from rdflib.namespace import DCTERMS, RDF, RDFS

from fairspace_pipeline.graph.fairspace_graph import FairspaceGraph

CUSTOM_NS = Namespace("https://custom.com/ontology#")

# Example file suffixes used to recognise the input files (adjust to your data)
STUDY_MANIFEST_FILE_SUFFIX = "_study.json"
SAMPLE_INFORMATION_SUFFIX = "_sample_information.json"

class CustomGraph(FairspaceGraph):
    def __init__(self, taxonomies_graph: CustomTaxonomiesGraph):
        super().__init__(taxonomies_graph)

    def map_study(self, study_dict: dict) -> Study:
        # Implement custom study processing logic
        pass

    def map_sample(self, sample_dict: dict) -> Sample:
        # Implement custom sample processing logic
        pass

    def map_subject(self, subject_dict: dict) -> Subject:
        # Implement custom subject processing logic
        pass

    def create_study_graph(self, data: dict, prefix: str) -> Graph:
        graph = self.create_new_graph()
        studies = self.map_studies(data, prefix)
        for study in studies:
            graph.add((study.uri, RDF.type, CUSTOM_NS.Study))
            if study.label is not None:
                graph.add((study.uri, RDFS.label, Literal(study.label)))
            if study.identifier is not None:
                graph.add((study.uri, DCTERMS.identifier, Literal(study.identifier)))
            ...
        return graph

    def create_sample_graph(self, data: dict, prefix: str) -> Graph:
        graph = self.create_new_graph()
        samples = self.map_sample(data, prefix)
        for sample in samples:
            ...
        return graph
            
    def create_subject_graph(self, data: dict, prefix: str) -> Graph:
        graph = self.create_new_graph()
        subjects = self.map_subject(data, prefix)
        for subject in subjects:
            ...
        return graph
            
    def create_graph(self, file_path: str, data: dict, prefix: str) -> Graph:
        if str(file_path).endswith(STUDY_MANIFEST_FILE_SUFFIX):
            return self.create_study_graph(data, prefix)
        if str(file_path).endswith(SAMPLE_INFORMATION_SUFFIX):
            return self.create_sample_graph(data, prefix)
        ...
        return self.create_subjects_data_files_graph(data, prefix)

    # Define the order of processing files by file suffix in the source study directory
    # e.g. ['_study.json', '_sample_information.json']
    def get_file_suffix_processing_order(self) -> list[str]:
        return [STUDY_MANIFEST_FILE_SUFFIX, SAMPLE_INFORMATION_SUFFIX]

Pipeline runner

Define the main function to run the pipeline:

import argparse

from dotenv import load_dotenv

from fairspace_pipeline.pipeline import FairspacePipeline, FairspacePipelineConfig

def main():
    load_dotenv()

    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--init", action="store_true")
    parser.add_argument("-p", "--process", action="store_true")
    parser.add_argument("-u", "--upload", action="store_true")
    parser.add_argument("-r", "--reindex", action="store_true")
    parser.add_argument("-d", "--delete", action="store_true")
    parser.add_argument("-c", "--compact", action="store_true")
    parser.add_argument("-ms", "--maintenance_status", action="store_true")
    args = parser.parse_args()

    config = get_config()

    taxonomies_graph = CustomTaxonomiesGraph(config.taxonomies_directory)
    graph = CustomGraph(taxonomies_graph)

    pipeline = FairspacePipeline(config, graph)
    pipeline.run(args.init, args.process, args.upload, args.delete, args.reindex, args.compact, args.maintenance_status)


if __name__ == '__main__':
    main()

Optional implementation

If custom processing needs to be applied, implement a custom IO handler class that extends the IOHandler class, following the examples of the LocalIOHandler class and the S3IOHandler class.
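As an illustration only, a skeleton of such a class might look like the sketch below. The import path and the overridden method names are assumptions made for this example; check the IOHandler base class and the LocalIOHandler and S3IOHandler implementations in the package for the actual interface to override.

# Hypothetical sketch of a custom IO handler; the import path and the method
# names are assumptions -- mirror the real IOHandler interface as implemented
# by LocalIOHandler and S3IOHandler.
from fairspace_pipeline.io_handler import IOHandler  # assumed module path


class CustomIOHandler(IOHandler):
    def read_source_file(self, path: str) -> dict:
        # Apply custom decoding or pre-processing of a source file here
        ...

    def write_output_file(self, path: str, content: str) -> None:
        # Apply custom post-processing before persisting the transformed data here
        ...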

Running the complete pipeline

When all the components are implemented, the pipeline can be run with the following command:

python main.py --init --process --upload --compact

Include only the options that you need:

  • --init or -i - prepare required roles for the configured user and upload taxonomies
  • --process or -p - read the input source data, transform it to the Fairspace model and save it to the configured output directory
  • --upload or -u - upload the transformed data into Fairspace
  • --reindex or -r - reindex Fairspace views database
  • --compact or -c - compact Fairspace Jena database to reduce the size
  • --maintenance_status or -ms - get the maintenance status to see if reindexing or compacting is in progress
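For example, to trigger only a reindex of the views database and then check whether it has finished:

python main.py --reindex
python main.py --maintenance_status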

Download files


Source Distribution

fairspace_pipeline-0.1.0.tar.gz (18.6 kB)

Built Distribution

fairspace_pipeline-0.1.0-py3-none-any.whl (19.1 kB)

File details

Details for the file fairspace_pipeline-0.1.0.tar.gz.

File metadata

  • Download URL: fairspace_pipeline-0.1.0.tar.gz
  • Upload date:
  • Size: 18.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for fairspace_pipeline-0.1.0.tar.gz
Algorithm Hash digest
SHA256 dd67d3201e1c9e0f47324fbe00665db6c11b42628d51fa9a128f116059b5508f
MD5 a825704b5cbf988a5e9f73f07a0f0118
BLAKE2b-256 da785b2fa1622ee89837d7d4d12d68bae08dbfa1f49d19ed25f71daad76d9921


Provenance

The following attestation bundles were made for fairspace_pipeline-0.1.0.tar.gz:

Publisher: publish.yml on thehyve/fairspace-pipeline


File details

Details for the file fairspace_pipeline-0.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for fairspace_pipeline-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 5008db12e9bf04746206b103fcd1422ffc6198ba55234ba4499175a66bbcc0e7
MD5 379a0dda2da11f7990e0eadb8bda96f5
BLAKE2b-256 ca09e27b064fd2df4fed8c71a927337676dd56bbc0453a6a9391d23ae2ee37c8


Provenance

The following attestation bundles were made for fairspace_pipeline-0.1.0-py3-none-any.whl:

Publisher: publish.yml on thehyve/fairspace-pipeline

