
HESTIA's set of file converters

This project has been archived by its maintainers. No new releases are expected.

Project description

Hestia schema converter common base

This library lets you create a converter that translates a Pydantic object from one schema to another.

It is used as a common base for creating schema converters for life cycle inventory (LCI) software.

This repo includes:

  • code to perform LCI flow mappings: exchanging a term from one LCI nomenclature for an equivalent term in another nomenclature
  • code to describe how the fields of a schema, or nested schemas, of 2 LCI schemas relate to each other
  • code to convert one pydantic schema to another
  • a demo converter that can convert between the HESTIA schema and the OpenLCA schema

Setup

This repo uses other repositories as submodules. For these repositories to be included, you must clone with:

git clone --recurse-submodules https://gitlab.com/hestia-earth/hestia-convert-base

This will pull in the required submodule repositories.

Tools

scripts/openlca/open_lca_process_to_hestia_converter.py

This tool lets you convert an openLCA result (exported as a JSON-LD zip file) to a HESTIA Impact Assessment.

scripts/openlca/open_lca_process_to_hestia_converter.py --help
Usage: open_lca_process_to_hestia_converter.py [OPTIONS] INPUT_ZIP_FILE

  Program that converts a openLCA results zip file to a Hestia Impact
  assessment.

  Export your results from openLCA in "openLCA > JSON-LD" format.

Options:
  --output TEXT
  --mapping_files_directory TEXT  optional location of flowmap files
  -v, --verbose                   Enables verbose mode.
  -d, --debug_file                Outputs conversion logs to debug file.
  --filter_by_name TEXT           Optional list of names to filter results on.
                                  Must be in quotes. Can be used multiple
                                  times.
  --help                          Show this message and exit.

To work, all the elementary flows listed in the Result file must have an equivalent HESTIA glossary term listed in a flowmap file. The tool loads flowmaps from the FlowMaps directory by default, or you can specify your own with --mapping_files_directory. Typically, your first task will be to add a flowmap entry for the main reference product listed in each process of your Result file. See ecoinvent_3_11_products.csv for a typical example.
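The exact column layout is defined by the GLAD template; always start from FlowMapping.csv in this repo for the authoritative header. Purely as an illustration, with hypothetical names, UUIDs, and assumed GLAD-style column names, a reference-product mapping row might look like:

```csv
SourceListName,SourceFlowName,SourceFlowUUID,MatchCondition,ConversionFactor,TargetListName,TargetFlowName,TargetFlowUUID,Mapper,Verifier,LastUpdated
ecoinvent,"apple production, Rest-of-World",00000000-0000-0000-0000-000000000000,=,1.0,HESTIA,Apple fruit,appleFruit,hestia,hestia,2025-05-29
```

The column names above are an assumption based on the GLAD flow-mapping format and may differ from the template in this repository.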

Example:

scripts/openlca/open_lca_process_to_hestia_converter.py "openLCA_exported_result.zip" --output /output_files -vvv --filter_by_name "market for Apples - RoW" --filter_by_name "market for Oranges - FR"

Flow mapping

Given an input HESTIA "term" / "flow" such as:

{
  "id": "GADM-COL",
  "name": "Colombia",
  "termType": "region",
  "type": "Term"
}

we can use the code:

from RosettaFlow import FlowMap

term_map_obj = FlowMap(PATH_TO_MAPPING_FILES)
candidates = term_map_obj.map_flow({"id": "GADM-COL",
                                    "name": "Colombia",
                                    "termType": "region",
                                    "type": "Term"}, target_nomenclature="openLCA")

to get a list of known equivalent "openLCA" location flows:

print(candidates)
[CandidateFlow(MatchCondition='=', ConversionFactor=1.0, FlowName='Colombia', FlowUUID='ab6c0400-6660-3ef2-919d-512b21dce9ab', FlowContext='Locations', Unit='LOCATION', Mapper='hestia', Verifier='hestia', LastUpdated='2025-05-29')]

or

import pprint

for c in candidates:
    pprint.pprint(dict(c))
{'ConversionFactor': 1.0,
 'FlowContext': 'Locations',
 'FlowName': 'Colombia',
 'FlowUUID': 'ab6c0400-6660-3ef2-919d-512b21dce9ab',
 'LastUpdated': '2025-05-29',
 'Mapper': 'hestia',
 'MatchCondition': '=',
 'Unit': 'LOCATION',
 'Verifier': 'hestia'}

The mappings are stored in a standardised CSV file format defined by the GLAD project (UNEP Economy Division); see the FlowMapping.md in the GLAD repository for details. This format is compatible with the USEPA format described in the USEPA FlowMapping.md.

Please use the template file FlowMapping.csv when creating new mappings.

The FlowMap class contains functions to search flow mappings and validate entries in CSV files, as well as helper functions to create new mappings and select the "best" candidate for a given situation.

Symmetry of flow maps

By default, term_map_obj.map_flow() searches for reverse "=" and "~" mappings (right to left in the csv file) if it cannot find a match in "SourceUUID" (left to right in the csv file). This can be disabled with the check_reverse=False parameter. Any ConversionFactor returned from a reverse match is inverted: 1/original_conversion_factor.
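The inversion behaviour can be illustrated with a small self-contained sketch, using plain dicts to stand in for flowmap rows; this is not the library's actual implementation, and all names here are illustrative:

```python
# A single forward mapping row: 1 source unit == 2.5 target units.
forward_rows = [
    {"SourceFlowUUID": "uuid-a", "TargetFlowUUID": "uuid-b",
     "MatchCondition": "=", "ConversionFactor": 2.5},
]

def map_flow(source_uuid, rows, check_reverse=True):
    # Try the normal left-to-right direction first.
    for row in rows:
        if row["SourceFlowUUID"] == source_uuid:
            return {"FlowUUID": row["TargetFlowUUID"],
                    "ConversionFactor": row["ConversionFactor"]}
    # Fall back to reverse "=" / "~" matches, inverting the factor.
    if check_reverse:
        for row in rows:
            if row["TargetFlowUUID"] == source_uuid and row["MatchCondition"] in ("=", "~"):
                return {"FlowUUID": row["SourceFlowUUID"],
                        "ConversionFactor": 1 / row["ConversionFactor"]}
    return None

print(map_flow("uuid-b", forward_rows))
# {'FlowUUID': 'uuid-a', 'ConversionFactor': 0.4}
```

With check_reverse=False, the reverse lookup above returns None and only left-to-right matches are found.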

TODO:

  • Add support for correctly handling "superset of", "subset of", and "proxy for" relations, aka >, <, and ~
  • Add support for daisy-chaining mappings by recursively checking mappings to other nomenclatures when no direct mapping exists.
  • Update pick_best_match(): add sorting by LastUpdated date, trusted/preferred Mapper, trusted Verifier, closest relevant TargetFlowContext, and prioritise = over ~

Pydantic object converter

Originally based on pymapme (https://github.com/funnydman/pymapme) by funnydman, and heavily modified by the HESTIA team.

Basics

Given 2 pydantic models, the Converter class can convert common fields from one to the other with no configuration:

from pydantic import BaseModel


class ModelA(BaseModel):
    some_field_one: str = None
    name: str = None


class ModelB(BaseModel):
    some_field_two: str = None
    name: str = None


from Converter import Converter

converter_obj = Converter()

instance_of_model_a = ModelA(**{"some_field_one": "something", "name": "bob"})
instance_of_model_b = converter_obj.transmute(source_model_obj=instance_of_model_a,
                                              destination_model=ModelB)

print(f"Type of 'instance_of_model_b' is : {type(instance_of_model_b)}")
print(f"Data in 'instance_of_model_b': {instance_of_model_b.model_dump()}")
Type of 'instance_of_model_b' is : <class 'ModelB'>
Data in 'instance_of_model_b': {'some_field_two': None, 'name': 'bob'}

Mapping fields between 2 schemas

If 2 schemas have fields with the same information, but different field names, you can map them using:

converter_obj.register_model_map(source_model_type=ModelA,
                                 destination_model_type=ModelB,
                                 map_field_dict={
                                     "some_field_two": "some_field_one"
                                 })

instance_of_model_a = ModelA(**{"some_field_one": "something", "name": "bob"})
instance_of_model_b = converter_obj.transmute(source_model_obj=instance_of_model_a,
                                              destination_model=ModelB)

print(f"Data in 'instance_of_model_b': {instance_of_model_b.model_dump()}")
Data in 'instance_of_model_b': {'some_field_two': 'something', 'name': 'bob'}

Registered field maps between models are symmetrical, so converting back from ModelB to ModelA will use the reverse mapping.
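Conceptually (this is not the library's internal code), deriving the reverse direction from a registered map_field_dict amounts to swapping keys and values:

```python
# A plain field map as registered for the ModelA -> ModelB direction:
# destination field name -> source field name.
forward_map = {"some_field_two": "some_field_one"}

# The ModelB -> ModelA direction swaps each pair.
reverse_map = {source: destination for destination, source in forward_map.items()}

print(reverse_map)  # {'some_field_one': 'some_field_two'}
```

Note that only plain string-to-string entries can be inverted this way; custom functions and nested dotted paths cannot be reversed automatically.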

Mapping deeper nested fields

If a model has a field that contains a nested schema, you can use a . to map to a nested field:

class ModelC(BaseModel):
    field_in_c_one: int = None
    field_in_c_two: str = None


class ModelA(BaseModel):
    some_field_one: str = None
    name: str = None
    some_data: ModelC = None


converter_obj.register_model_map(source_model_type=ModelA,
                                 destination_model_type=ModelB,
                                 map_field_dict={
                                     "some_field_two": "some_data.field_in_c_two"
                                 })

instance_of_model_a = ModelA(**{"some_field_one": "something",
                                "name": "bob",
                                "some_data": {
                                    "field_in_c_one": 4,
                                    "field_in_c_two": "Some nested string"}
                                })
instance_of_model_b = converter_obj.transmute(source_model_obj=instance_of_model_a,
                                              destination_model=ModelB)
print(f"Data in 'instance_of_model_b': {instance_of_model_b.model_dump()}")
Data in 'instance_of_model_b': {'some_field_two': 'Some nested string', 'name': 'bob'}
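One way a dotted path such as "some_data.field_in_c_two" can be resolved is by walking the attribute chain one segment at a time. A minimal self-contained sketch of the idea (not the library's actual implementation; class and function names here are illustrative):

```python
# Stand-ins for a model with a nested sub-schema.
class Inner:
    field_in_c_two = "Some nested string"

class Outer:
    some_data = Inner()

def resolve(obj, dotted_path, default=None):
    """Follow each '.'-separated attribute; return default if any step is missing."""
    for part in dotted_path.split("."):
        obj = getattr(obj, part, None)
        if obj is None:
            return default
    return obj

print(resolve(Outer(), "some_data.field_in_c_two"))  # Some nested string
```

A missing segment (e.g. "some_data.missing") falls back to the supplied default instead of raising.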

Currently, mappings to nested fields are not symmetrical.

TODO

Using custom functions for each field

Sometimes the contents of one field must be transformed when moving to a new schema. To do this you can specify a custom function:

from pydantic import BaseModel

class ModelA(BaseModel):
    length_in_km: int = None


class ModelB(BaseModel):
    length_in_m: float = None

def _convert_km_to_m(source_model: ModelA, **kwargs) -> float:
    return source_model.length_in_km * 1000

converter_obj.register_model_map(source_model_type=ModelA,
                                 destination_model_type=ModelB,
                                 map_field_dict={
                                     "length_in_m": _convert_km_to_m,
                                 })

instance_of_model_a = ModelA(**{"length_in_km": 2})
instance_of_model_b = converter_obj.transmute(source_model_obj=instance_of_model_a,
                                              destination_model=ModelB)

print(f"Data in 'instance_of_model_b': {instance_of_model_b.model_dump()}")
Data in 'instance_of_model_b': {'length_in_m': 2000.0}

Note the use of **kwargs in _convert_km_to_m(). Several additional arguments are made available to custom functions:

  • field_name: the name of the destination model field
  • default: the default object the converter saves when no data is found
  • model_data: a dict containing the model data extracted so far

To keep the mapping symmetrical, you also need to register the equivalent inverse function for the reverse direction:

converter_obj.register_model_map(source_model_type=ModelB,
                                 destination_model_type=ModelA,
                                 map_field_dict={
                                     "length_in_km": _convert_m_to_km,
                                 })

or

converter_obj.register_model_map(source_model_type=ModelB,
                                 destination_model_type=ModelA,
                                 map_field_dict={
                                     "length_in_km": lambda source_model, field_name, default, model_data: source_model.length_in_m / 1000,
                                 })

Automatic re-use of defined mappings

Once a mapping between 2 pydantic models has been defined, it will automatically be used whenever that model is encountered as a subfield while converting a different pydantic model:

from pydantic import BaseModel, Field


class HestiaTerm(BaseModel):
    type: str = Field(default="Term")
    id: str = None


class HestiaIndicator(BaseModel):
    type: str = Field(default="Indicator")
    term: HestiaTerm
    value: float


class OpenLcaFlow(BaseModel):
    id: str = None


class OpenLcaExchange(BaseModel):
    flow: OpenLcaFlow = Field(default=None)
    amount: float = Field(default=None)


def _convert_Hestia_Term_to_openLCA_flow_ref(source_model: HestiaTerm, **kwargs) -> OpenLcaFlow:
    candidate_mapped_flows = term_map_obj.map_flow(source_model.model_dump())
    best_candidate = candidate_mapped_flows[0]
    return OpenLcaFlow(id=best_candidate.FlowUUID)


converter_obj.register_model_map(source_model_type=HestiaTerm,
                                 destination_model_type=OpenLcaFlow,
                                 map_function=_convert_Hestia_Term_to_openLCA_flow_ref)

converter_obj.register_model_map(source_model_type=HestiaIndicator,
                                 destination_model_type=OpenLcaExchange,
                                 map_field_dict={
                                     "flow": "term",
                                     "amount": "value"
                                 })

instance_of_hestia_indicator = HestiaIndicator(**{
    "type": "Indicator",
    "term": {
        "type": "Term",
        "id": "nh3ToAirInputsProduction"
    },
    "value": 3.4
})

instance_of_openLca_exchange = converter_obj.transmute(source_model_obj=instance_of_hestia_indicator,
                                                       destination_model=OpenLcaExchange)
print(f"Data in 'instance_of_openLca_exchange': {instance_of_openLca_exchange.model_dump()}")
Data in 'instance_of_openLca_exchange': {'flow': {'id': '87883a4e-1e3e-4c9d-90c0-f1bea36f8014'}, 'amount': 3.4}

Re-use of mappings in a list

Once a mapping is defined, it will also be used if a field is a list containing the destination subschema:

    class HestiaImpactAssessment(BaseModel):
        emissionsResourceUse: List[HestiaIndicator] = Field(None)

    class OpenLcaProcess(BaseModel):
        exchanges: List[OpenLcaExchange] = Field(None)

    converter_obj.register_model_map(source_model_type=HestiaImpactAssessment,
                                     destination_model_type=OpenLcaProcess,
                                     map_field_dict={
                                         "exchanges": "emissionsResourceUse",
                                     })

    instance_of_hestia_impact_assessment = HestiaImpactAssessment(**{
        "emissionsResourceUse": [
            {
                "type": "Indicator",
                "term": {
                    "type": "Term",
                    "id": "nh3ToAirInputsProduction"
                },
                "value": 3.4
            }
        ]
    })

    instance_of_openLca_process = converter_obj.transmute(source_model_obj=instance_of_hestia_impact_assessment,
                                                          destination_model=OpenLcaProcess)
    print(f"Data in 'instance_of_openLca_process': {instance_of_openLca_process.model_dump()}")
Data in 'instance_of_openLca_process': {'exchanges': [{'flow': {'id': '87883a4e-1e3e-4c9d-90c0-f1bea36f8014'}, 'amount': 3.4}]}

Overly generic schemas

Some schemas allow storing information in ways so general that they require different parsing / conversion policies depending on the situation. To avoid having to build custom functions made up of long if/elif/else chains, you can add new pydantic models to the original pydantic implementation that help you map each situation to a separate function:

class HestiaIndicator(BaseModel):
    type: str = Field(default="Indicator")
    term: HestiaTerm
    value: float
    some_field_that_affects_how_this_schema_should_be_converted: bool = False


class SpecialCaseHestiaIndicator(HestiaIndicator):
    pass

    class Config:
        revalidate_instances = "subclass-instances"


normal_instance_of_openLca_exchange = converter_obj.transmute(source_model_obj=instance_of_hestia_indicator,
                                                              destination_model=OpenLcaExchange)
print(f"Data in 'normal_instance_of_openLca_exchange': {normal_instance_of_openLca_exchange.model_dump()}")

other_instance_of_hestia_indicator = HestiaIndicator(**{
    "type": "Indicator",
    "term": {
        "type": "Term",
        "id": "nh3ToAirInputsProduction"
    },
    "value": 3.4,
    "some_field_that_affects_how_this_schema_should_be_converted": True
})

if other_instance_of_hestia_indicator.some_field_that_affects_how_this_schema_should_be_converted:
    # This turns the HestiaIndicator instance into a SpecialCaseHestiaIndicator
    special_instance = SpecialCaseHestiaIndicator.model_validate(other_instance_of_hestia_indicator)

converter_obj.register_model_map(source_model_type=SpecialCaseHestiaIndicator,
                                 destination_model_type=OpenLcaExchange,
                                 map_field_dict={
                                     "flow": "term",
                                     "amount": _custom_function_values_in_scientific_notation
                                 })

# or using a custom function
converter_obj.register_model_map(source_model_type=SpecialCaseHestiaIndicator,
                                 destination_model_type=OpenLcaExchange,
                                 map_function=_custom_function_to_handle_special_case_hestia_indicators)

special_case_of_openLca_exchange = converter_obj.transmute(source_model_obj=special_instance,
                                                           destination_model=OpenLcaExchange)

print(f"Data in 'special_case_of_openLca_exchange': {special_case_of_openLca_exchange.model_dump()}")

The setting revalidate_instances = "subclass-instances" means that the line:

special_instance = SpecialCaseHestiaIndicator.model_validate(other_instance_of_hestia_indicator)

will return a copy of other_instance_of_hestia_indicator, but of type SpecialCaseHestiaIndicator, which inherits all the fields of HestiaIndicator.
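The overall pattern here is type-keyed dispatch: the converter looks up conversion logic by the runtime type of the source object, so revalidating an instance into a subclass selects that subclass's registered mapping. A minimal standalone sketch of the idea, with plain classes and hypothetical names (not the Converter class's actual code):

```python
# Stand-ins for HestiaIndicator and SpecialCaseHestiaIndicator.
class Indicator:
    pass

class SpecialIndicator(Indicator):
    pass

# One conversion function registered per source type.
registry = {
    Indicator: lambda obj: "normal conversion",
    SpecialIndicator: lambda obj: "special-case conversion",
}

def transmute(obj):
    # Look up by type(obj), not isinstance, so a subclass
    # instance picks its own entry rather than the parent's.
    return registry[type(obj)](obj)

print(transmute(Indicator()))         # normal conversion
print(transmute(SpecialIndicator()))  # special-case conversion
```

This is why converting an instance into the subclass type (via model_validate) is enough to route it through a different mapping.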

Many to one

TODO

One to many

TODO

Edge cases

If you need to edit multiple fields at once, or need to add data that depends on already-processed fields, you can set a function to run at the end of a schema conversion using the _always_run_ field:

converter_obj.register_model_map(source_model_type=HestiaImpactAssessment,
                                 destination_model_type=OpenLcaProcess,
                                 map_field_dict={
                                     "exchanges": "emissionsResourceUse",
                                     "_always_run_": _convert_product_and_move_to_exchanges
                                 })

As an example, _convert_product_and_move_to_exchanges runs after the conversion and adds a new entry to the "exchanges" of the resulting OpenLcaProcess:

class HestiaImpactAssessment(BaseModel):
    emissionsResourceUse: List[HestiaIndicator] = Field(None)
    product: HestiaIndicator = Field(None)


def _convert_product_and_move_to_exchanges(model_data: dict,
                                           source_model: HestiaImpactAssessment = None,
                                           destination_model_type: OpenLcaExchange = None,
                                           context: dict = None) -> dict:
    """
    Take a HESTIA "product" from an impact assessment, convert it to an
    openLCA exchange, and place it in the list of exchanges of an openLCA Process.
    """
    product_exchange = converter_obj.transmute(source_model_obj=source_model.product,
                                               destination_model=OpenLcaExchange)

    model_data['exchanges'].append(product_exchange)

    return model_data

The _convert_product_and_move_to_exchanges function is given a dict model_data containing the destination model data created so far, the source object source_model, the type of the destination model destination_model_type, as well as a context dict. You can pass values into the context dict using:

instance_of_openLca_process = converter_obj.transmute(source_model_obj=instance_of_hestia_impact_assessment,
                                                      destination_model=OpenLcaProcess,
                                                      context={"Foo": "Bar"})

Custom mapping implementations

Instead of defining a map_field_dict, it may be easier in some cases to implement your own function to handle the entire conversion between 2 schemas. This lets you write small pieces of custom code to deal with pairs of sub-schemas, while using the other converter features to handle the more general tasks.

def _convert_hestia_unit_to_openLCA(source_model: HestiaUnit,
                                    destination_model_type=OpenLcaUnit,
                                    context: dict = None
                                    ) -> OpenLcaUnit:
    # ... your code here
    return OpenLcaUnit(name="kg", id="20aadc24-a391-41cf-b340-3e4529f44bde")


converter_obj.register_model_map(source_model_type=HestiaUnit,
                                 destination_model_type=OpenLcaUnit,
                                 map_function=_convert_hestia_unit_to_openLCA)

Todo:

  • Add support for "alias" fields
  • Test that all possible field annotations can be read, e.g. Optional[List[Union[Unicorn, Magic, bool]]]
  • Subclass the openlca_schema package
  • CI/CD: automate generating the hestia pydantic schema from the official repo
  • HESTIA to openLCA converter
  • Add sub-git

Sample Pydantic schemas in this repository:

HESTIA pydantic schema

This repo contains a pydantic implementation of the HESTIA schema. It is autogenerated using datamodel-codegen by reading the official schema definition files in https://gitlab.com/hestia-earth/hestia-schema/ (both YAML and JSON Schema definitions). https://gitlab.com/hestia-earth/hestia-schema/ remains the only canonical source for the HESTIA schema. Minor changes were added to build the proof-of-concept HESTIA to openLCA converter.

Todo:

  • import validations from hestia_earth.validation
  • add autogeneration script

OpenLCA pydantic schema

This repo contains a pydantic implementation of the openLCA schema. In the background it uses the official canonical openLCA schema package olca-schema, reusing and subclassing the original classes where possible. Minor changes were added, such as making some fields more specific. For example:

location fields in olca classes now use the Location sub-schema

    location: Optional[Location] = Field(None)

instead of the more generic Ref schema

    location: Optional[Ref] = Field(None)

which is a parent of the Location class and is too general.

Sample converters

Hestia to OpenLCA sample converter

src/Hestia_OpenLCA_Converter contains a proof-of-concept schema converter that partially converts from the HESTIA schema to the openLCA schema.

