HESTIA's set of file converters

Hestia schema converter common base

This library lets you create a converter that can translate one Pydantic object from one schema to another.

It is used as a common base to create schema converters for life cycle inventory (LCI) software.

This repo includes code:

  • to do LCI flow mappings: exchanging a term from one LCI nomenclature to an equivalent term in another nomenclature
  • to describe how the fields of a schema or nested schemas of 2 LCI schemas relate to each other
  • to convert one pydantic schema to another
  • a demo converter that can convert from the HESTIA schema to the openLCA schema.

Setup

  1. Install the library and the converters you want to use:
pip install hestia-earth-converters
pip install "hestia-earth-converters[simapro]"
  2. Convert a HESTIA ImpactAssessment to SimaPro format:
hestia-convert --output-folder samples --input-format HESTIA --output-format SimaPro --hestia-impact-id africanAubergineFruit-cote-divoire-2010-2025-20250427

The converted file will be stored under the samples directory.

Flow mapping.

Given an input HESTIA "term" / "flow" such as:

{
  "id": "GADM-COL",
  "name": "Colombia",
  "termType": "region",
  "type": "Term"
}

we can use the code:

from RosettaFlow import FlowMap

term_map_obj = FlowMap(PATH_TO_MAPPING_FILES)
candidates = term_map_obj.map_flow({"id": "GADM-COL",
                                    "name": "Colombia",
                                    "termType": "region",
                                    "type": "Term"}, target_nomenclature="openLCA")

to get a list of known equivalent "openLCA" location flows:

print(candidates)
[CandidateFlow(MatchCondition='=', ConversionFactor=1.0, FlowName='Colombia', FlowUUID='ab6c0400-6660-3ef2-919d-512b21dce9ab', FlowContext='Locations', Unit='LOCATION', Mapper='hestia', Verifier='hestia', LastUpdated='2025-05-29')]

or

import pprint

for c in candidates:
    pprint.pprint(dict(c))
{'ConversionFactor': 1.0,
 'FlowContext': 'Locations',
 'FlowName': 'Colombia',
 'FlowUUID': 'ab6c0400-6660-3ef2-919d-512b21dce9ab',
 'LastUpdated': '2025-05-29',
 'Mapper': 'hestia',
 'MatchCondition': '=',
 'Unit': 'LOCATION',
 'Verifier': 'hestia'}

The mappings are stored in a standardised csv file format defined by the GLAD project and described in FlowMapping.md in the UNEP-Economy-Division GLAD repository. This format is compatible with the USEPA format described in the USEPA FlowMapping.md.

Please use the template file FlowMapping.csv when creating new mappings.

The FlowMap class contains functions to search flow mappings and validate entries in csv files, as well as helper functions to create new mappings and select the "best" candidate for a given situation.

Symmetry of flow maps.

By default, term_map_obj.map_flow() searches for reverse "=" and "~" mappings (right to left in the csv file) if it cannot find a mapping by "SourceUUID" (left to right in the csv file). This can be disabled with the check_reverse=False parameter. Any ConversionFactor returned from a reverse mapping is inverted: 1/original_conversion_factor
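The reverse lookup described above can be illustrated with a small standalone sketch. It does not use the FlowMap class; MappingRow and lookup() are hypothetical names invented for this example, and the real implementation may differ.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class MappingRow:
    """Hypothetical stand-in for one row of a flow-mapping csv file."""
    source_uuid: str
    target_uuid: str
    match_condition: str      # "=", "~", ">" or "<"
    conversion_factor: float  # target amount = source amount * factor


def lookup(rows: List[MappingRow], uuid: str,
           check_reverse: bool = True) -> Optional[MappingRow]:
    # Forward search: match on the source (left-hand) column.
    for row in rows:
        if row.source_uuid == uuid:
            return row
    if not check_reverse:
        return None
    # Reverse search: only "=" and "~" rows are read right to left.
    for row in rows:
        if row.target_uuid == uuid and row.match_condition in ("=", "~"):
            # Swap the two sides and invert the conversion ratio.
            return MappingRow(row.target_uuid, row.source_uuid,
                              row.match_condition, 1 / row.conversion_factor)
    return None


rows = [MappingRow("kg-flow", "g-flow", "=", 1000.0)]
forward = lookup(rows, "kg-flow")
reverse = lookup(rows, "g-flow")
print(forward.conversion_factor)  # 1000.0
print(reverse.conversion_factor)  # 0.001
```

With check_reverse=False the second call would return None, because "g-flow" only appears on the target side of the table.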

TODO:

  • Add support for correctly handling "a superset of", "a subset of" and "a proxy for", aka >, < and ~
  • Add support for daisy-chaining mappings by recursively checking mappings to other nomenclatures when no direct mapping exists.
  • Update pick_best_match(): add sort by LastUpdated date, trusted/preferred Mapper, trusted Verifier, closest relevant TargetFlowContext, prioritise = over ~
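As a rough illustration of the last item, the sketch below shows one possible ordering: "=" before "~", then newest LastUpdated first. It is not the existing pick_best_match() implementation; Candidate and rank_candidates() are hypothetical names, and only two of the listed criteria are covered.

```python
from dataclasses import dataclass
from datetime import date
from typing import List


@dataclass
class Candidate:
    """Hypothetical subset of the CandidateFlow fields shown earlier."""
    FlowName: str
    MatchCondition: str
    LastUpdated: str  # ISO date string, e.g. "2025-05-29"


def rank_candidates(candidates: List[Candidate]) -> List[Candidate]:
    # Exact matches ("=") sort before proxies ("~"); anything else goes last.
    condition_rank = {"=": 0, "~": 1}
    return sorted(
        candidates,
        key=lambda c: (
            condition_rank.get(c.MatchCondition, 2),
            # Negate the ordinal so that newer entries come first.
            -date.fromisoformat(c.LastUpdated).toordinal(),
        ),
    )


ranked = rank_candidates([
    Candidate("old exact", "=", "2024-01-01"),
    Candidate("proxy", "~", "2025-06-01"),
    Candidate("new exact", "=", "2025-05-29"),
])
print([c.FlowName for c in ranked])  # ['new exact', 'old exact', 'proxy']
```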

Pydantic object converter.

Originally based on pymapme https://github.com/funnydman/pymapme by author funnydman and heavily modified by the Hestia team.

Basics

Given 2 pydantic models, the Converter class can convert common fields from one to the other with no configuration:

from pydantic import BaseModel


class ModelA(BaseModel):
    some_field_one: str = None
    name: str = None


class ModelB(BaseModel):
    some_field_two: str = None
    name: str = None


from Converter import Converter

converter_obj = Converter()

instance_of_model_a = ModelA(**{"some_field_one": "something", "name": "bob"})
instance_of_model_b = converter_obj.transmute(source_model_obj=instance_of_model_a,
                                              destination_model=ModelB)

print(f"Type of 'instance_of_model_b' is : {type(instance_of_model_b)}")
print(f"Data in 'instance_of_model_b': {instance_of_model_b.model_dump()}")
Type of 'instance_of_model_b' is : <class 'ModelB'>
Data in 'instance_of_model_b': {'some_field_two': None, 'name': 'bob'}

Mapping fields between 2 schemas

If 2 schemas have fields with the same information, but different field names, you can map them using:

converter_obj.register_model_map(source_model_type=ModelA,
                                 destination_model_type=ModelB,
                                 map_field_dict={
                                     "some_field_two": "some_field_one"
                                 })

instance_of_model_a = ModelA(**{"some_field_one": "something", "name": "bob"})
instance_of_model_b = converter_obj.transmute(source_model_obj=instance_of_model_a,
                                              destination_model=ModelB)

print(f"Data in 'instance_of_model_b': {instance_of_model_b.model_dump()}")
Data in 'instance_of_model_b': {'some_field_two': 'something', 'name': 'bob'}

Registered field maps between models are symmetrical, so converting back from ModelB to ModelA will use the reverse mapping.
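A minimal sketch of how such a reverse mapping could be derived from a map_field_dict. This is a standalone illustration rather than the Converter's actual code, and reverse_field_map() is a hypothetical helper. It only swaps plain field names, since callables and dotted paths have no obvious inverse.

```python
from typing import Callable, Dict, Union

FieldMap = Dict[str, Union[str, Callable]]


def reverse_field_map(field_map: FieldMap) -> Dict[str, str]:
    """Invert a {destination_field: source_field} map."""
    reversed_map = {}
    for destination_field, source in field_map.items():
        # Skip custom functions and nested "a.b" paths: they cannot be
        # swapped automatically.
        if isinstance(source, str) and "." not in source:
            reversed_map[source] = destination_field
    return reversed_map


a_to_b = {
    "some_field_two": "some_field_one",
    "length_in_m": lambda source_model, **kwargs: None,
    "nested_copy": "some_data.field_in_c_two",
}
b_to_a = reverse_field_map(a_to_b)
print(b_to_a)  # {'some_field_one': 'some_field_two'}
```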

Mapping deeper nested fields

If a model has a field that contains a nested schema, you can use a . to map to a nested field:

class ModelC(BaseModel):
    field_in_c_one: int = None
    field_in_c_two: str = None


class ModelA(BaseModel):
    some_field_one: str = None
    name: str = None
    some_data: ModelC = None


converter_obj.register_model_map(source_model_type=ModelA,
                                 destination_model_type=ModelB,
                                 map_field_dict={
                                     "some_field_two": "some_data.field_in_c_two"
                                 })

instance_of_model_a = ModelA(**{"some_field_one": "something",
                                "name": "bob",
                                "some_data": {
                                    "field_in_c_one": 4,
                                    "field_in_c_two": "Some nested string"}
                                })
instance_of_model_b = converter_obj.transmute(source_model_obj=instance_of_model_a,
                                              destination_model=ModelB)
print(f"Data in 'instance_of_model_b': {instance_of_model_b.model_dump()}")
Data in 'instance_of_model_b': {'some_field_two': 'Some nested string', 'name': 'bob'}

Currently, mappings to nested fields are not symmetrical.
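Resolving a dotted path such as "some_data.field_in_c_two" can be sketched as a chain of attribute lookups. The helper below is a standalone illustration with a hypothetical name (resolve_path); it uses SimpleNamespace objects in place of pydantic models and is not taken from the Converter class.

```python
from functools import reduce
from types import SimpleNamespace
from typing import Any


def resolve_path(obj: Any, dotted_path: str, default: Any = None) -> Any:
    """Follow 'a.b.c' through nested attributes, returning default on a gap."""
    def step(current: Any, name: str) -> Any:
        # Stop descending as soon as an intermediate value is missing.
        if current is None:
            return None
        return getattr(current, name, None)

    value = reduce(step, dotted_path.split("."), obj)
    return default if value is None else value


model_a = SimpleNamespace(
    name="bob",
    some_data=SimpleNamespace(field_in_c_one=4,
                              field_in_c_two="Some nested string"),
)
print(resolve_path(model_a, "some_data.field_in_c_two"))          # Some nested string
print(resolve_path(model_a, "some_data.missing", default="n/a"))  # n/a
```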

TODO

Using custom functions for each field

Sometimes the contents of one field must be transformed when moving to a new schema. To do this you can specify a custom function:

from pydantic import BaseModel

class ModelA(BaseModel):
    length_in_km: int = None


class ModelB(BaseModel):
    length_in_m: float = None

def _convert_km_to_m(source_model:ModelA, **kwargs)-> float:
    return source_model.length_in_km * 1000

converter_obj.register_model_map(source_model_type=ModelA,
                                 destination_model_type=ModelB,
                                 map_field_dict={
                                     "length_in_m": _convert_km_to_m,
                                 })

instance_of_model_a = ModelA(**{"length_in_km": 2})
instance_of_model_b = converter_obj.transmute(source_model_obj=instance_of_model_a,
                                              destination_model=ModelB)

print(f"Data in 'instance_of_model_b': {instance_of_model_b.model_dump()}")
Data in 'instance_of_model_b': {'length_in_m': 2000.0}

Note the use of **kwargs in _convert_km_to_m(). Several keyword arguments are made available to custom functions, such as:

  • 'field_name': the name of the model field
  • 'default': the default object the converter saves when no data is found
  • 'model_data': a dict containing the model data extracted so far

To keep the mapping symmetrical, you need to also map the opposite equivalent function in reverse:

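def _convert_m_to_km(source_model: ModelB, **kwargs) -> float:
    return source_model.length_in_m / 1000


converter_obj.register_model_map(source_model_type=ModelB,
                                 destination_model_type=ModelA,
                                 map_field_dict={
                                     "length_in_km": _convert_m_to_km,
                                 })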

or

converter_obj.register_model_map(source_model_type=ModelB,
                                 destination_model_type=ModelA,
                                 map_field_dict={
                                     "length_in_km": lambda source_model, field_name, default, model_data: source_model.length_in_m / 1000,
                                 })

Automatic re-use of defined mappings.

Once a mapping between 2 pydantic models has been defined, it will automatically be used if encountered when converting a different pydantic model that uses that model in a subfield:

from pydantic import BaseModel, Field


class HestiaTerm(BaseModel):
    type: str = Field(default="Term")
    id: str = None


class HestiaIndicator(BaseModel):
    type: str = Field(default="Indicator")
    term: HestiaTerm
    value: float


class OpenLcaFlow(BaseModel):
    id: str = None


class OpenLcaExchange(BaseModel):
    flow: OpenLcaFlow = Field(default=None)
    amount: float = Field(default=None)


def _convert_Hestia_Term_to_openLCA_flow_ref(source_model: HestiaTerm, **kwargs) -> OpenLcaFlow:
    candidate_mapped_flows = term_map_obj.map_flow(source_model.model_dump())
    best_candidate = candidate_mapped_flows[0]
    return OpenLcaFlow(id=best_candidate.FlowUUID)


converter_obj.register_model_map(source_model_type=HestiaTerm,
                                 destination_model_type=OpenLcaFlow,
                                 map_function=_convert_Hestia_Term_to_openLCA_flow_ref)

converter_obj.register_model_map(source_model_type=HestiaIndicator,
                                 destination_model_type=OpenLcaExchange,
                                 map_field_dict={
                                     "flow": "term",
                                     "amount": "value"
                                 })

instance_of_hestia_indicator = HestiaIndicator(**{
    "type": "Indicator",
    "term": {
        "type": "Term",
        "id": "nh3ToAirInputsProduction"
    },
    "value": 3.4
})

instance_of_openLca_exchange = converter_obj.transmute(source_model_obj=instance_of_hestia_indicator,
                                                       destination_model=OpenLcaExchange)
print(f"Data in 'instance_of_openLca_exchange': {instance_of_openLca_exchange.model_dump()}")
Data in 'instance_of_openLca_exchange': {'flow': {'id': '87883a4e-1e3e-4c9d-90c0-f1bea36f8014'}, 'amount': 3.4}

Re-use of mappings in a list

Once a mapping is defined, it will also be used if a field is a list containing the destination subschema:

    from typing import List


    class HestiaImpactAssessment(BaseModel):
        emissionsResourceUse: List[HestiaIndicator] = Field(None)

    class OpenLcaProcess(BaseModel):
        exchanges: List[OpenLcaExchange] = Field(None)

    converter_obj.register_model_map(source_model_type=HestiaImpactAssessment,
                                     destination_model_type=OpenLcaProcess,
                                     map_field_dict={
                                         "exchanges": "emissionsResourceUse",
                                     })

    instance_of_hestia_impact_assessment = HestiaImpactAssessment(**{
        "emissionsResourceUse": [
            {
                "type": "Indicator",
                "term": {
                    "type": "Term",
                    "id": "nh3ToAirInputsProduction"
                },
                "value": 3.4
            }
        ]
    })

    instance_of_openLca_process = converter_obj.transmute(source_model_obj=instance_of_hestia_impact_assessment,
                                                          destination_model=OpenLcaProcess)
    print(f"Data in 'instance_of_openLca_process': {instance_of_openLca_process.model_dump()}")
Data in 'instance_of_openLca_process': {'exchanges': [{'flow': {'id': '87883a4e-1e3e-4c9d-90c0-f1bea36f8014'}, 'amount': 3.4}]}
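One way to picture this re-use is a registry keyed on (source type, destination type) pairs that is consulted for nested fields and for each list item. The sketch below is a simplified standalone illustration using dataclasses: register(), convert() and convert_list() are hypothetical names, the flow id is a placeholder, and the real Converter discovers nested fields automatically rather than through explicit calls.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Tuple


@dataclass
class Term:
    id: str


@dataclass
class Indicator:
    term: Term
    value: float


@dataclass
class Flow:
    id: str


@dataclass
class Exchange:
    flow: Flow
    amount: float


# (source type, destination type) -> conversion function
REGISTRY: Dict[Tuple[type, type], Callable[[Any], Any]] = {}


def register(source_type: type, destination_type: type,
             function: Callable[[Any], Any]) -> None:
    REGISTRY[(source_type, destination_type)] = function


def convert(value: Any, destination_type: type) -> Any:
    # Look up the function registered for this exact pair of types.
    return REGISTRY[(type(value), destination_type)](value)


def convert_list(values: List[Any], destination_type: type) -> List[Any]:
    # The same registered mapping is applied to every item of the list.
    return [convert(v, destination_type) for v in values]


register(Term, Flow, lambda term: Flow(id=f"flow-for-{term.id}"))
register(Indicator, Exchange,
         lambda ind: Exchange(flow=convert(ind.term, Flow), amount=ind.value))

indicators = [Indicator(Term("nh3ToAirInputsProduction"), 3.4)]
exchanges = convert_list(indicators, Exchange)
print(exchanges)
```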

Too generic schemas.

Some schemas allow storing information in ways that are so general they require different parsing / conversion policies depending on the situation. To avoid having to build custom functions made up of long if/elif/else statements, you can add new pydantic models that subclass the original pydantic implementation and help you map each situation to a separate function:

class HestiaIndicator(BaseModel):
    type: str = Field(default="Indicator")
    term: HestiaTerm
    value: float
    some_field_that_affects_how_this_schema_should_be_converted: bool = False


class SpecialCaseHestiaIndicator(HestiaIndicator):
    pass

    class Config:
        revalidate_instances = "subclass-instances"


normal_instance_of_openLca_exchange = converter_obj.transmute(source_model_obj=instance_of_hestia_indicator,
                                                              destination_model=OpenLcaExchange)
print(f"Data in 'normal_instance_of_openLca_exchange': {normal_instance_of_openLca_exchange.model_dump()}")

other_instance_of_hestia_indicator = HestiaIndicator(**{
    "type": "Indicator",
    "term": {
        "type": "Term",
        "id": "nh3ToAirInputsProduction"
    },
    "value": 3.4,
    "some_field_that_affects_how_this_schema_should_be_converted": True
})

if other_instance_of_hestia_indicator.some_field_that_affects_how_this_schema_should_be_converted == True:
    # This turns the HestiaIndicator instance into a SpecialCaseHestiaIndicator
    special_instance = SpecialCaseHestiaIndicator.model_validate(other_instance_of_hestia_indicator)

converter_obj.register_model_map(source_model_type=SpecialCaseHestiaIndicator,
                                 destination_model_type=OpenLcaExchange,
                                 map_field_dict={
                                     "flow": "term",
                                     "amount": _custom_function_values_in_scientific_notation
                                 })

# or using a custom function
converter_obj.register_model_map(source_model_type=SpecialCaseHestiaIndicator,
                                 destination_model_type=OpenLcaExchange,
                                 map_function=_custom_function_to_handle_special_case_hestia_indicators)

special_case_of_openLca_exchange = converter_obj.transmute(source_model_obj=special_instance,
                                                           destination_model=OpenLcaExchange)

print(f"Data in 'special_case_of_openLca_exchange': {special_case_of_openLca_exchange.model_dump()}")

The setting revalidate_instances = "subclass-instances" means that the line:

special_instance = SpecialCaseHestiaIndicator.model_validate(other_instance_of_hestia_indicator)

will return a copy of other_instance_of_hestia_indicator, but of type SpecialCaseHestiaIndicator, which inherits all the fields of HestiaIndicator.
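The underlying idea, dispatching on the exact type so that a field-less subclass can carry its own conversion function, can be sketched without pydantic. The example below is a standalone illustration using dataclasses; Indicator, SpecialIndicator, promote() and convert() are hypothetical names.

```python
from dataclasses import asdict, dataclass
from typing import Any, Callable, Dict


@dataclass
class Indicator:
    value: float
    needs_special_handling: bool = False


@dataclass
class SpecialIndicator(Indicator):
    """Same fields as Indicator; exists only to get its own converter."""


def to_amount(indicator: Indicator) -> float:
    return indicator.value


def to_scientific(indicator: Indicator) -> str:
    return f"{indicator.value:.2e}"


# One function per exact source type, instead of one if/elif/else chain.
CONVERTERS: Dict[type, Callable[[Any], Any]] = {
    Indicator: to_amount,
    SpecialIndicator: to_scientific,
}


def promote(indicator: Indicator) -> Indicator:
    # Re-create the instance as the subclass when the flag is set.
    if indicator.needs_special_handling:
        return SpecialIndicator(**asdict(indicator))
    return indicator


def convert(indicator: Indicator) -> Any:
    return CONVERTERS[type(indicator)](indicator)


print(convert(promote(Indicator(3.4))))        # 3.4
print(convert(promote(Indicator(3.4, True))))  # 3.40e+00
```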

Many to one

TODO

One to many

TODO

Edge cases

If you need to edit multiple fields at once, or need to add data that depends on already processed fields, you can set a function to run at the end of a schema conversion using the _always_run_ key of map_field_dict:

converter_obj.register_model_map(source_model_type=HestiaImpactAssessment,
                                 destination_model_type=OpenLcaProcess,
                                 map_field_dict={
                                     "exchanges": "emissionsResourceUse",
                                     "_always_run_": _convert_product_and_move_to_exchanges
                                 })

As an example, _convert_product_and_move_to_exchanges runs after the conversion and adds a new entry to the "exchanges" of the resulting OpenLcaProcess:

class HestiaImpactAssessment(BaseModel):
    emissionsResourceUse: List[HestiaIndicator] = Field(None)
    product: HestiaIndicator = Field(None)


def _convert_product_and_move_to_exchanges(model_data: dict,
                                           source_model: HestiaImpactAssessment = None,
                                           destination_model_type: OpenLcaExchange = None,
                                           context:dict = None) -> dict:
    """
    This function takes a HESTIA "product" from an impact assessment, converts it to an openLCA exchange and places it in the list of exchanges of an openLCA Process.
    """
    product_exchange = converter_obj.transmute(source_model_obj=source_model.product,
                                               destination_model=OpenLcaExchange)

    model_data['exchanges'].append(product_exchange)

    return model_data

_convert_product_and_move_to_exchanges is given a dict model_data containing the destination model data created so far, the source object source_model, the type of the destination model destination_model_type, as well as a context dict. You can pass values into the context dict using:

instance_of_openLca_process = converter_obj.transmute(source_model_obj=instance_of_hestia_impact_assessment,
                                                      destination_model=OpenLcaProcess,
                                                      context={"Foo": "Bar"})
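The order of operations (plain field mappings first, then the _always_run_ hook with access to the context) can be sketched as follows. This is a standalone illustration using plain dicts instead of pydantic models; build_model_data() and move_product_to_exchanges() are hypothetical names and the "unit" context key is invented for the example.

```python
from typing import Any, Dict, Optional

ALWAYS_RUN = "_always_run_"


def build_model_data(source: Dict[str, Any],
                     field_map: Dict[str, Any],
                     context: Optional[dict] = None) -> Dict[str, Any]:
    # First pass: copy the plain field-to-field mappings.
    model_data = {dest: source.get(src)
                  for dest, src in field_map.items()
                  if dest != ALWAYS_RUN}
    # Second pass: run the hook once every other field has been filled in.
    hook = field_map.get(ALWAYS_RUN)
    if hook is not None:
        model_data = hook(model_data, source_model=source,
                          context=context or {})
    return model_data


def move_product_to_exchanges(model_data: dict,
                              source_model: Optional[dict] = None,
                              context: Optional[dict] = None) -> dict:
    # Guard against a missing or None list before appending.
    exchanges = list(model_data.get("exchanges") or [])
    exchanges.append({"flow": source_model["product"],
                      "unit": context.get("unit", "kg")})
    model_data["exchanges"] = exchanges
    return model_data


result = build_model_data(
    {"emissionsResourceUse": [{"flow": "nh3"}], "product": "aubergine"},
    {"exchanges": "emissionsResourceUse",
     ALWAYS_RUN: move_product_to_exchanges},
    context={"unit": "t"},
)
print(result)
```

The guard on the "exchanges" list matters whenever the destination field defaults to None, as it would be unsafe to call append() on it directly.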

Custom mapping implementations

Instead of defining a map_field_dict, it may be easier in some cases to implement your own function to handle the entire conversion between 2 schemas. This lets you build small custom code to deal with pairs of sub-schemas, while using the other converter features to handle the more general tasks.

def _convert_hestia_unit_to_openLCA(source_model: HestiaUnit,
                                    destination_model_type=OpenLcaUnit,
                                    context: dict = None
                                    ) -> OpenLcaUnit:
    # ... your code here
    return OpenLcaUnit(name="kg", id="20aadc24-a391-41cf-b340-3e4529f44bde")


# Register the function only after it has been defined.
converter_obj.register_model_map(source_model_type=HestiaUnit,
                                 destination_model_type=OpenLcaUnit,
                                 map_function=_convert_hestia_unit_to_openLCA)

Todo:

  • Add support for "alias" fields
  • Test that all possible field annotations can be read, e.g. Optional[List[Union[Unicorn, Magic, bool]]]
  • subclass openlca_schema package
  • ci/cd automate generate hestia pydantic schema from official repo
  • hestia to openlca converter
  • Add sub-git

Sample Pydantic schemas in this repository:

HESTIA pydantic schema

This repo contains a pydantic implementation of the HESTIA schema. It is autogenerated using datamodel-codegen from the official schema definition files (both YAML and JSON Schema definitions) in https://gitlab.com/hestia-earth/hestia-schema/ , which remains the only canonical source for the HESTIA schema. Minor changes were added to build the proof-of-concept HESTIA to openLCA converter.

Todo:

  • import validations from hestia_earth.validation
  • add autogeneration script

OpenLCA pydantic schema

This repo contains a pydantic implementation of the openLCA schema. In the background it uses the official canonical openLCA schema package olca-schema, reusing and subclassing the original classes when possible. Minor changes were added, such as making some fields more specific. For example:

location fields in olca classes now use the Location sub-schema

    location: Optional[Location] = Field(None)

instead of the more generic Ref schema

    location: Optional[Ref] = Field(None)

that was a parent of the Location class and too general.

Sample converters.

Hestia to OpenLCA sample converter

src/Hestia_OpenLCA_Converter contains a proof-of-concept schema converter that partially converts from the HESTIA schema to the openLCA schema.
