
Building blocks for creating a pipeline for Fairspace

Project description

Fairspace Pipeline


This repository contains the Fairspace Pipeline, a package that can be used as a base to create custom ETL pipelines for Fairspace.

Installation

Requires Python 3.12 or newer.

Installing from PyPI

python -m pip install fairspace-pipeline

Installing from sources

git clone https://github.com/thehyve/fairspace-pipeline.git
cd fairspace-pipeline
python -m build
python -m pip install .

Usage

Fairspace Pipeline cannot be used as a standalone package. It requires a custom implementation of several of its interfaces, plus specific configuration. The sections below describe the necessary steps to create a fully functioning custom pipeline from the reusable components and interface implementations.

Configuration

Create a .env file with the following environment variables:

# Keycloak
KEYCLOAK_CLIENT_ID="" # e.g. KEYCLOAK_CLIENT_ID="workspace-client"
KEYCLOAK_CLIENT_SECRET="" # e.g. KEYCLOAK_CLIENT_SECRET="********"
KEYCLOAK_USERNAME="" # e.g. KEYCLOAK_USERNAME="organisation-admin"
KEYCLOAK_PASSWORD="" # e.g. KEYCLOAK_PASSWORD="fairspace123"
KEYCLOAK_SERVER_URL="" # e.g. KEYCLOAK_SERVER_URL="https://my-keycloak-instance.com"
KEYCLOAK_REALM="" # e.g. KEYCLOAK_REALM="fairspace"

# Fairspace
FAIRSPACE_URL="" # e.g. FAIRSPACE_URL="https://my-fairspace-instance.com"

# Amazon S3
IS_AWS_S3=False # e.g. IS_AWS_S3=True if using AWS S3 bucket for source data
AWS_SOURCE_BUCKET_NAME="" # e.g. AWS_SOURCE_BUCKET_NAME="fairspace-metadata-source"
AWS_OUTPUT_BUCKET_NAME="" # e.g. AWS_OUTPUT_BUCKET_NAME="fairspace-metadata-output"

# Pipeline settings
TAXONOMIES_TTL_PATH="config/custom_taxonomies.ttl"
SOURCE_STUDIES_PATHS='' # e.g. '["./test_data/test_study_input1", "./test_data/test_study_input2"]'
SOURCE_STUDIES_PREFIXES='' # optional, e.g. '["study1_", "study2_"]'; defaults to '[""]'
OUTPUT_DATA_PATH="" # e.g. OUTPUT_DATA_PATH="./test_data/.output_data"

Next, implement a function that creates a FairspacePipelineConfig object from the environment variables:

import json
import os
import pathlib

from fairspace_pipeline.pipeline import FairspacePipelineConfig


def get_config() -> FairspacePipelineConfig:
    config = FairspacePipelineConfig()
    config.source_study_prefixes = json.loads(str(os.environ.get('SOURCE_STUDIES_PREFIXES', '[""]')))
    config.output_data_directory = os.environ.get('OUTPUT_DATA_PATH',
                                                  os.path.join(os.getcwd(), 'test_data/.output_data'))
    config.is_aws_s3 = os.getenv("IS_AWS_S3", 'False').lower() in ('true', '1', 't')
    if config.is_aws_s3:
        config.source_study_directories = json.loads(os.environ.get('SOURCE_STUDIES_PATHS'))
    else:
        config.source_study_directories = [os.path.join(os.getcwd(), val) for val in json.loads(
            os.environ.get('SOURCE_STUDIES_PATHS', '["test_data/source_data"]'))]
    config.source_bucket_name = os.environ.get('AWS_SOURCE_BUCKET_NAME')
    config.output_bucket_name = os.environ.get('AWS_OUTPUT_BUCKET_NAME')
    config.taxonomies_directory = os.environ.get(
        'TAXONOMIES_TTL_PATH',
        os.path.join(pathlib.Path(__file__).parent.absolute(), "config", "taxonomies.ttl")
    )
    return config

Taxonomies

Implement a custom taxonomies graph class that extends the TaxonomiesGraph class. The class needs to:

  • include all custom taxonomies as class attributes,
  • define the TAXONOMY_PREFIX to be used in custom taxonomy terms URIs.

Sample implementation:

from fairspace_pipeline.graph.taxonomy_graph import TaxonomiesGraph

TAXONOMY_PREFIX = "https://fairspace.com/custom_ontology#"  # Prefix for custom taxonomies

class CustomTaxonomiesGraph(TaxonomiesGraph):
    def __init__(self, taxonomies_dir):
        super().__init__(taxonomies_dir, TAXONOMY_PREFIX)
        self.countries = self.query_taxonomy(taxonomy_name='Country')
        self.file_types = self.query_taxonomy(taxonomy_name='FileType')
        self.species = self.query_taxonomy(taxonomy_name='Species')
        self.gender = self.query_taxonomy(taxonomy_name='Gender')
        self.study_status = self.query_taxonomy(taxonomy_name='StudyStatus')
        self.study_phases = self.query_taxonomy(taxonomy_name='StudyPhase')
        self.sample_treatments = self.query_taxonomy(taxonomy_name='Treatment')
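
As a minimal usage sketch, the graph can be instantiated with the directory containing the taxonomies TTL file; each attribute then holds the terms queried from one taxonomy:

# Directory containing custom_taxonomies.ttl (see TAXONOMIES_TTL_PATH above)
taxonomies_graph = CustomTaxonomiesGraph("config")
# Terms of the 'Country' taxonomy, as queried in __init__ above
print(taxonomies_graph.countries)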

Mapping

Implement a custom graph class that extends the FairspaceGraph class. It is a module for mapping source data to RDF graphs specific to the custom data model.

The class needs to implement create_graph() and get_file_suffix_processing_order() methods of the FairspaceGraph interface.

Sample implementation:

from rdflib import Graph, Literal, Namespace
from rdflib.namespace import DCTERMS, RDF, RDFS

from fairspace_pipeline.graph.fairspace_graph import FairspaceGraph

CUSTOM_NS = Namespace("https://custom.com/ontology#")

class CustomGraph(FairspaceGraph):
    def __init__(self, taxonomies_graph: CustomTaxonomiesGraph):
        super().__init__(taxonomies_graph)

    # Study, Sample and Subject are your own data classes (not shown here)
    def map_studies(self, data: dict, prefix: str) -> list[Study]:
        # Implement custom study processing logic
        pass

    def map_samples(self, data: dict, prefix: str) -> list[Sample]:
        # Implement custom sample processing logic
        pass

    def map_subjects(self, data: dict, prefix: str) -> list[Subject]:
        # Implement custom subject processing logic
        pass

    def create_study_graph(self, data: dict, prefix: str) -> Graph:
        graph = self.create_new_graph()
        studies = self.map_studies(data, prefix)
        for study in studies:
            graph.add((study.uri, RDF.type, CUSTOM_NS.Study))
            if study.label is not None:
                graph.add((study.uri, RDFS.label, Literal(study.label)))
            if study.identifier is not None:
                graph.add((study.uri, DCTERMS.identifier, Literal(study.identifier)))
            ...
        return graph

    def create_sample_graph(self, data: dict, prefix: str) -> Graph:
        graph = self.create_new_graph()
        samples = self.map_samples(data, prefix)
        for sample in samples:
            ...
        return graph
            
    def create_subject_graph(self, data: dict, prefix: str) -> Graph:
        graph = self.create_new_graph()
        subjects = self.map_subjects(data, prefix)
        for subject in subjects:
            ...
        return graph
            
    def create_graph(self, file_path: str, data: dict, prefix: str) -> Graph:
        if str(file_path).endswith(STUDY_MANIFEST_FILE_SUFFIX):
            return self.create_study_graph(data, prefix)
        if str(file_path).endswith(SAMPLE_INFORMATION_SUFFIX):
            return self.create_sample_graph(data, prefix)
        ...
        return self.create_subject_graph(data, prefix)

    # Define the order of processing files by file suffix in the source study directory
    # e.g. ['_study.json', '_sample_information.json']
    def get_file_suffix_processing_order(self) -> list[str]:
        return [STUDY_MANIFEST_FILE_SUFFIX, SAMPLE_INFORMATION_SUFFIX]
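
The file suffix constants used above are ordinary strings matching the naming convention of your source files; the values below are illustrative, mirroring the example in the comment:

STUDY_MANIFEST_FILE_SUFFIX = "_study.json"
SAMPLE_INFORMATION_SUFFIX = "_sample_information.json"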

IO Handler

Implement a custom IO handler class that extends the IOHandler class, following the examples of the LocalIOHandler and S3IOHandler classes.

Sample implementation (extending local handler):

import logging
import os
from typing import override

log = logging.getLogger(__name__)

class CustomIOHandler(LocalIOHandler):
    def __init__(self, output_data_directory: str, fairspace_graph: CustomGraph):
        super().__init__(output_data_directory, fairspace_graph)
        ...

    def process_directory(self, directory: str):
        ...

    @override
    def send_to_api(self, api: FairspaceApi, source_directories: list[str]):
        for directory, subdirectories, files in os.walk(self.output_data_directory):
            for file in files:
                full_path = os.path.join(directory, file)
                try:
                    api.upload_metadata_by_path(full_path)
                except Exception as e:
                    log.error(f"Error uploading file {full_path}")
                    log.error(e)
        log.info("Done uploading all metadata to Fairspace.")

    @override
    def transform_data(self, source_directories: list[str], source_prefixes=None):
        for source_directory in source_directories:
            self.process_directory(source_directory)
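
As a minimal usage sketch (reusing the config object and the CustomGraph instance constructed as in the Pipeline runner section below):

handler = CustomIOHandler(config.output_data_directory, graph)
# Transform all configured source studies into the output directory
handler.transform_data(config.source_study_directories)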

Pipeline runner

Define the main function to run the pipeline:

import argparse

from dotenv import load_dotenv

from fairspace_pipeline.pipeline import FairspacePipeline, FairspacePipelineConfig

def main():
    load_dotenv()

    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--init", action="store_true")
    parser.add_argument("-p", "--process", action="store_true")
    parser.add_argument("-u", "--upload", action="store_true")
    parser.add_argument("-r", "--reindex", action="store_true")
    parser.add_argument("-d", "--delete", action="store_true")
    parser.add_argument("-c", "--compact", action="store_true")
    parser.add_argument("-ms", "--maintenance_status", action="store_true")
    args = parser.parse_args()

    config = get_config()

    taxonomies_graph = CustomTaxonomiesGraph(config.taxonomies_directory)
    graph = CustomGraph(taxonomies_graph)

    pipeline = FairspacePipeline(config, graph)
    pipeline.run(args.init, args.process, args.upload, args.delete, args.reindex, args.compact, args.maintenance_status)


if __name__ == '__main__':
    main()

Running the complete pipeline

When all the components are implemented, the pipeline can be run with the following command:

python main.py --init --process --upload --compact

Include only the options that you need:

  • --init or -i - prepare the required roles for the configured user and upload the taxonomies
  • --process or -p - read the input source data, transform it to the Fairspace model, and save it to the configured output directory
  • --upload or -u - upload the transformed data into Fairspace
  • --reindex or -r - reindex the Fairspace views database
  • --compact or -c - compact the Fairspace Jena database to reduce its size
  • --maintenance_status or -ms - get the maintenance status to check whether reindexing or compacting is in progress
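
For example, a typical development loop could first transform the data locally, inspect the output, and only then upload and reindex (assuming the main.py entry point above):

python main.py --process
python main.py --upload --reindex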

Download files


Source Distribution

fairspace_pipeline-0.1.1.tar.gz (19.0 kB)


Built Distribution

fairspace_pipeline-0.1.1-py3-none-any.whl (19.4 kB)


File details

Details for the file fairspace_pipeline-0.1.1.tar.gz.

File metadata

  • Download URL: fairspace_pipeline-0.1.1.tar.gz
  • Size: 19.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for fairspace_pipeline-0.1.1.tar.gz:

  • SHA256: 41bc99fc2421f884af9f72e5395470027efc4023e809874e272749bfc9022779
  • MD5: 992a9b14b5c34259fef733f609f09b2e
  • BLAKE2b-256: ffd3503b9f6e1c66f7783aa188cc27b1091e1817c58d4bb6442181b20f4d560b


Provenance

The following attestation bundles were made for fairspace_pipeline-0.1.1.tar.gz:

Publisher: publish.yml on thehyve/fairspace-pipeline


File details

Details for the file fairspace_pipeline-0.1.1-py3-none-any.whl.


File hashes

Hashes for fairspace_pipeline-0.1.1-py3-none-any.whl:

  • SHA256: 0a2b911811253fb1fbc097c1b6150f1d183e24c24ee056442b014423d2cf1f23
  • MD5: dcc7cf0e7900827b2c5df385709c700d
  • BLAKE2b-256: 817a1b956267e129764df617f457e36113906e8318f6318616759916a38c9c3c


Provenance

The following attestation bundles were made for fairspace_pipeline-0.1.1-py3-none-any.whl:

Publisher: publish.yml on thehyve/fairspace-pipeline
