Skip to main content

Python client for VoLCA — Life Cycle Assessment engine

Project description

pyvolca

Python client for VoLCA — Life Cycle Assessment engine over Agribalyse and ecoinvent.

Full guide and tutorials: https://volca.run/docs/guides/pyvolca/ Issues / source: https://github.com/ccomb/volca

Install

pip install pyvolca

Requires Python ≥ 3.10 and a running VoLCA engine. Use Server (below) to run one as a child process, or point Client at any reachable instance.

Quick start

# no-test  — needs a real engine; the snippets below run against a mocked Client.
from volca import Client, Server

with Server(config="volca.toml") as srv:
    c = Client(base_url=srv.base_url, db="agribalyse-3.2", password=srv.password)
    plants = c.search_activities(name="wheat flour, at plant", limit=5)
    chain = c.get_supply_chain(plants[0].process_id, name="at farm")
    score = c.get_impacts(plants[0].process_id, method_id=c.list_methods()[0]["methodId"])

Server reads port and password from the TOML config. The engine self-stops after idle_timeout seconds without traffic (default 5 min).

Examples below assume c is a Client instance — construct it with the snippet above, or against an already-running server: c = Client(base_url="http://localhost:8080", db="agribalyse-3.2", password="…").

Discover what's available

Which databases are loaded? Which LCIA methods can I score against? What classification systems can I filter on?

for db in c.list_databases():
    print(f"  {db.name} [{db.status}]: {db.activity_count} activities")

for m in c.list_methods()[:5]:
    print(f"  {m['methodId']}  {m['name']} [{m['unit']}]")

Other listings: c.list_classifications() returns the classification systems and their values for the current database; c.list_presets() returns named filter presets configured in the engine. Use c.load_database(name) / c.unload_database(name) to manage memory if a database isn't auto-loaded.

Find an activity

Which activity in the database represents the product I want to assess?

plants = c.search_activities(name="wheat flour, at plant", limit=5)
for a in plants:
    print(f"{a.process_id}  {a.name} ({a.location})")

Each Activity carries process_id, name, location, product, product_amount, product_unit. Narrow the query with geo="FR", classification=/classification_value= (ISIC/CPC), or set exact=True for an exact-name match. To search by flow name (technosphere products and biosphere flows) instead of activity name, use c.search_flows(query=...).

Inspect an activity

What goes into making this product? What does it emit? What's its reference unit?

detail = c.get_activity(plants[0].process_id)
for ex in detail.technosphere_inputs:
    print(f"{ex.amount:.4g} {ex.unit} of {ex.flow_name}{ex.target_activity}")

get_activity returns a typed ActivityDetail. Use .inputs / .outputs / .technosphere_inputs to filter the exchanges; each entry is an Exchange — either a TechnosphereExchange (an input or output of an intermediate product) or a BiosphereExchange (resource extracted or pollutant emitted).

Trace the upstream supply chain

What's the full upstream chain — every ingredient, recursively, down to the farm or mine?

chain = c.get_supply_chain(plants[0].process_id, name="at farm", limit=20)
print(f"{chain.filtered_activities} of {chain.total_activities} upstream activities match 'at farm'")
for entry in chain.entries[:5]:
    print(f"  {entry.quantity:.4g} {entry.unit} of {entry.name} ({entry.location})")

For "how exactly does this root reach a specific upstream supplier?", use get_path_to(process_id, target=...) — returns a PathResult of ordered PathSteps root → target with cumulative quantities and step ratios.

Find downstream consumers

Where is this supplier used? Which products depend on it?

result = c.get_consumers(plants[0].process_id, max_depth=2, limit=10)
for cons in result.consumers:
    print(f"  depth={cons.depth}  {cons.name} ({cons.location})")

Returns a ConsumersResponse with consumers, pagination, and (when include_edges=True) the technosphere edges so callers can reconstruct supplier→consumer paths without a second round trip. Pass classification_filters=[...] to restrict to a category.

Compute the life-cycle inventory

What are the cumulative biosphere flows (CO₂, water, methane, …) per functional unit, before applying any characterization method?

inv = c.get_inventory(plants[0].process_id, limit=20)
# inv is a raw dict — see the OpenAPI spec for the full shape.
# Substitutions are accepted: c.get_inventory(pid, substitutions=[...])

The inventory is what every LCIA method runs on top of. If you only need grouped views (by name, location, classification, etc.), reach for c.aggregate(scope="biosphere", group_by=...) instead — same data, summarized.

Compute environmental impacts (LCIA)

What's the carbon footprint of this product? Which emissions dominate the score?

score = c.get_impacts(plants[0].process_id, method_id="EF3.1-climate-change", top_flows=5)
print(f"{score.score:.4g} {score.unit}")
for c_flow in score.top_contributors:
    print(f"  {c_flow.share_pct:.1f}%  {c_flow.flow_name}")

LCIAResult carries the score, unit, optional normalized_score / weighted_score (in Pt), and the top contributing biosphere flows with their share_pct.

Compute every impact category in one go — climate, water, land use, …

batch = c.get_impacts_batch(plants[0].process_id)
for r in batch.results:
    print(f"  {r.category}: {r.score:.4g} {r.unit}")
if batch.single_score is not None:
    print(f"PEF single score: {batch.single_score:.4g} {batch.single_score_unit}")

LCIABatchResult also surfaces formula-based scoring sets (PEF, ECS…) via scoring_results and scoring_indicators, so you can render a per-indicator chart alongside the aggregate single score.

Drill into what drives a single impact

I have a climate-change score. Which biosphere flows account for it? Which upstream activities?

get_impacts(...).top_contributors already returns the top biosphere flows for a single LCIA call. For a deeper or differently-bounded view — and for the activity attribution view — use the standalone drill-down endpoints:

flows = c.get_contributing_flows(
    plants[0].process_id,
    method_id="EF3.1-climate-change",
    limit=10,
)
acts = c.get_contributing_activities(
    plants[0].process_id,
    method_id="EF3.1-climate-change",
    limit=10,
)
# Both return raw dicts — the shape is documented in the OpenAPI spec.

Which characterization factors does a method apply, and to which database flows?

char = c.get_characterization(method_id="EF3.1-climate-change", limit=20)

Useful for sanity-checking method coverage or building custom indicators on top of the engine's mapping.

Aggregate flows by group

What are the top emitting substances? How do flows break down by category, location, or classification?

agg = c.aggregate(
    plants[0].process_id,
    scope="biosphere",
    group_by="name",
    aggregate="sum_quantity",
)
for g in agg.groups[:5]:
    print(f"  {g.quantity:.4g} {g.unit or ''} of {g.key}")

scope selects what to aggregate over: "direct" (just this activity's exchanges), "supply_chain" (cumulative upstream), or "biosphere" (all elementary flows). group_by accepts "name", "flow_id", "unit", "location", "target_name", or "classification.<system>".

Compare two activities

How does variant A differ from variant B? Which inputs change?

from volca import compare_activities

diff = compare_activities(c, plants[0].process_id, plants[1].process_id, scope="direct")
print(f"  matched: {len(diff.matched)}, only-left: {len(diff.left_only)}, only-right: {len(diff.right_only)}")
for row in diff.matched[:3]:
    print(f"    {row.key}: {row.left:.4g}{row.right:.4g}  (Δ={row.delta:+.4g})")

A client-side merge over two aggregate calls. Groups by flow_id (default) so matching is stable across naming variants. Pass scope="supply_chain" to compare cumulative inputs instead of direct exchanges.

Run counterfactuals (substitutions)

What if I used organic wheat instead of conventional? Recycled aluminium instead of virgin? — without reloading the database.

The engine applies a Sherman–Morrison rank-1 update, so substitutions are fast regardless of database size. Works on get_supply_chain, get_inventory, and get_impacts.

subs = [{
    "from": "old-supplier-pid",      # the activity to replace
    "to":   "new-supplier-pid",      # the replacement
    "consumer": "consumer-pid",      # the activity that directly uses the old supplier
}]
score = c.get_impacts(plants[0].process_id, method_id="EF3.1-climate-change", substitutions=subs)

Multiple substitutions chain in one call — the consumer field disambiguates where in the chain each swap applies.

Handle errors

The activity doesn't exist, the engine is down, or the request is malformed — what do I catch?

from volca import VoLCAError

try:
    score = c.get_impacts("nonexistent-pid", method_id="EF3.1-climate-change")
except VoLCAError as e:
    print(f"  failed: {e.status_code}{e.body[:80]}")

VoLCAError.status_code is the HTTP status when the engine returned one; body is the raw response body.

Switch databases

I want to run the same workflow against ecoinvent instead of Agribalyse — without rebuilding the client.

ei = c.use("ecoinvent-3.10")
ei_results = ei.search_activities(name="electricity, high voltage")

Client.use(db_name) returns a new Client targeting a different database while sharing the HTTP session and dispatch table — no spec re-fetch.

Refresh IDE autocomplete after upgrading the engine

I just upgraded the VoLCA server. How do I get my editor to see the new endpoints?

c.refresh_stubs()

Pyvolca dispatches dynamically against the engine's OpenAPI spec, so it ships without .pyi stubs. refresh_stubs() refetches the spec and writes stubs into the installed package directory; restart your language server to pick them up.

API reference

This reference is generated from the installed package. Run python scripts/gen_api_md.py to regenerate.

Classes

Client

HTTP client for the VoLCA REST API.

Usage::

c = Client(db="agribalyse-3.2", password="1234")
plants = c.search_activities(name="at plant")
chain = c.get_supply_chain(plants[0].process_id, name="at farm")

Substitutions can be passed to get_supply_chain, get_inventory, and get_impacts to compute results with a different upstream supplier — fast::

subs = [{"from": old_pid, "to": new_pid, "consumer": consumer_pid}]
result = c.get_impacts(pid, method_id=mid, substitutions=subs)

Constructor: Client(base_url: str = 'http://localhost:8080', db: str = '', password: str = '')

Methods:

  • aggregate(process_id: str, scope: str, *, is_input: bool | None = None, max_depth: int | None = None, filter_name: str | None = None, filter_name_not: list[str] | str | None = None, filter_unit: str | None = None, preset: str | None = None, filter_classification: list[ClassificationFilter] | None = None, filter_target_name: str | None = None, filter_is_reference: bool | None = None, group_by: str | None = None, aggregate: str | None = None) -> AggregateResult — SQL-group-by aggregation over direct exchanges, supply chain, or biosphere flows.
  • call(operation_id: str, **kwargs) -> Any — Escape hatch: call any OpenAPI operation by operationId.
  • get_activity(process_id: str) -> ActivityDetail — Fetch an activity's full detail.
  • get_characterization(method_id: str, *, flow: str | None = None, limit: int | None = None) -> dict — Look up characterization factors for a method matched to database flows.
  • get_consumers(process_id: str, *, name: str | None = None, location: str | None = None, product: str | None = None, preset: str | None = None, classification_filters: list[ClassificationFilter] | None = None, limit: int | None = None, max_depth: int | None = None, include_edges: bool = False) -> ConsumersResponse — Find all activities that transitively consume this supplier.
  • get_contributing_activities(process_id: str, method_id: str, *, collection: str = 'methods', limit: int | None = None) -> dict — Which upstream activities drive a given impact category.
  • get_contributing_flows(process_id: str, method_id: str, *, collection: str = 'methods', limit: int | None = None) -> dict — Which elementary flows drive a given impact category.
  • get_flow_mapping(method_id: str) -> dict — Get the characterization-factor-to-database-flow mapping coverage.
  • get_impacts(process_id: str, method_id: str, *, collection: str = 'methods', top_flows: int | None = None, substitutions: list[dict] | None = None) -> LCIAResult — Compute the LCIA score for a single impact category on an activity.
  • get_impacts_batch(process_id: str, *, collection: str = 'methods', substitutions: list[dict] | None = None) -> LCIABatchResult — Compute LCIA for every impact category in a collection, in one call.
  • get_inputs(process_id: str) -> list[Exchange] — Return the input exchanges of an activity (richer metadata than get_activity).
  • get_inventory(process_id: str, *, flow: str | None = None, limit: int | None = None, substitutions: list[dict] | None = None) -> dict — Compute the life-cycle inventory (cumulative biosphere flows) for an activity.
  • get_outputs(process_id: str) -> list[Exchange] — Return the output exchanges of an activity. See :meth:get_inputs for notes.
  • get_path_to(process_id: str, target: str) -> PathResult — Find the shortest upstream path from process to first activity whose name matches target.
  • get_supply_chain(process_id: str, *, name: str | None = None, location: str | None = None, limit: int | None = None, min_quantity: float | None = None, max_depth: int | None = None, preset: str | None = None, classification_filters: list[ClassificationFilter] | None = None, substitutions: list[dict] | None = None, include_edges: bool | None = None) -> SupplyChain — Get the flat supply chain of an activity.
  • get_tree(process_id: str) -> dict — Fetch the recursive activity tree used by the analysis SPA.
  • get_version() — Return server version info (version, gitHash, gitTag, buildTarget).
  • list_classifications() — List classification systems and their values for the current database.
  • list_databases() — List every database declared in the engine config.
  • list_methods() — List every LCIA method available in the engine.
  • list_presets() — List classification presets configured in this instance.
  • load_database(db_name: str) -> dict — Load a database into memory so it answers queries.
  • refresh_stubs() — Fetch the OpenAPI spec from the server and refresh the dispatch table.
  • search_activities(name: str | None = None, *, geo: str | None = None, product: str | None = None, preset: str | None = None, classification: str | None = None, classification_value: str | None = None, limit: int | None = None, offset: int = 0, exact: bool = False) -> list[Activity] — Search activities in the current database.
  • search_flows(query: str | None = None, *, limit: int | None = None) -> list[dict] — Search flows (technosphere products and biosphere flows) in the current database.
  • unload_database(db_name: str) -> dict — Unload a database from memory to free RAM. The disk copy is kept.
  • use(db_name: str) -> 'Client' — Return a new client targeting a different database (shares session).

Server

Manages the VoLCA server process.

Usage::

with Server(config="volca.toml") as srv:
    client = Client(base_url=srv.base_url, db="agribalyse-3.2", password=srv.password)
    activities = client.search_activities(name="at plant")

Constructor: Server(config: str = 'volca.toml', port: int = 0, binary: str = 'volca')

Properties:

  • base_url — (no docstring)

Methods:

  • is_alive() — Health check — GET /api/v1/db, return True if 200.
  • start(idle_timeout: int = 300, wait_timeout: int = 120) -> None — Spawn the engine process if it is not already serving, and wait until ready.
  • stop() — Stop the server via shutdown endpoint, then terminate process.

Exceptions

VoLCAError

Error from the VoLCA API.

Constructor: VoLCAError(message: str, status_code: int | None = None, body: str = '')

Data types

Activity

Field Type Default
process_id str
name str
location str
product str
product_amount float
product_unit str

ActivityDetail

Typed wrapper around the JSON returned by GET /activity/{pid}.

Use the .inputs / .outputs / .technosphere_inputs convenience properties instead of walking the raw exchanges list.

Field Type Default
process_id str
name str
location str
unit str
description list[str]
classifications dict[str, str]
reference_product str | None
reference_product_amount float | None
reference_product_unit str | None
all_products list[Activity]
exchanges list[Union[TechnosphereExchange, BiosphereExchange]]

ActivityDiff

Result of compare_activities.

Field Type Default
scope str
group_by str
matched list[ActivityDiffRow] list()
left_only list[ActivityDiffRow] list()
right_only list[ActivityDiffRow] list()

ActivityDiffRow

One matched or unmatched flow in an activity comparison.

Field Type Default
key str
left float | None
right float | None
unit str | None

AggregateGroup

One bucket inside an AggregateResult.

Field Type Default
key str
quantity float
count int
unit str | None None
share float | None None

AggregateResult

Result of a Client.aggregate() call.

filtered_total is the sum across all items matching the filters (the top-level number). groups is the per-bucket breakdown when group_by was set; empty otherwise.

Field Type Default
scope str
filtered_total float
filtered_unit str | None
filtered_count int
groups list[AggregateGroup] list()

BiosphereExchange

An exchange with the environment (resource extraction or emission).

Field Type Default
flow_name str
flow_category str
amount float
unit str
is_input bool
comment str | None None
is_biosphere bool True

ClassificationFilter

Filter a supply-chain/consumers query by a classification (system, value, mode).

Matches one classification system entry (e.g. ("Category", "Agricultural\Food", "exact")). Mode is "exact" (case-insensitive equality) or "contains" (substring). Multiple filters are AND-combined by the server.

Field Type Default
system str
value str
mode str 'contains'

ConsumerResult

Activity that consumes a given supplier, with BFS depth.

Field Type Default
process_id str
name str
location str
product str
product_amount float
product_unit str
depth int
classifications dict[str, str] dict()

ConsumersResponse

Reverse supply chain (/consumers) — paginated consumer list plus optional edge set. Mirrors :class:SupplyChain so callers have a uniform {entries, edges} shape in both traversal directions. edges is populated only when include_edges=True.

Field Type Default
consumers list[ConsumerResult]
total int
offset int
limit int
has_more bool
search_time_ms float
edges list[SupplyChainEdge] list()

DatabaseInfo

One entry of :meth:Client.list_databases.

depends_on names the databases this one links against for cross-DB flow resolution — mirrors the dependsOn list surfaced by the relink endpoint. Derived from the engine's declared topology, not runtime state.

Field Type Default
name str
display_name str
status str
path str
load_at_startup bool False
is_uploaded bool False
activity_count int 0
description str | None None
format str | None None
depends_on list[str] list()

FlowContribution

Top contributing elementary flow for an impact category.

Emitted inside LCIAResult.top_contributors.

Field Type Default
flow_name str
contribution float
share_pct float
flow_id str
category str
cf_value float 0.0
compartment str | None None

LCIABatchResult

Batch LCIA: every impact category in a method collection, for one activity.

Returned by :meth:Client.get_impacts_batch. Carries the per-method impact results plus any formula-based scoring sets configured in the engine TOML (PEF, ECS, or any named set).

scoring_indicators gives the per-variable normalized-weighted breakdown of each scoring set — already multiplied by the set's displayMultiplier and expressed in its display unit (see :class:ScoringIndicator). Lets callers render per-indicator charts alongside the aggregate scoring_results.

Field Type Default
results list[LCIAResult]
single_score float | None None
single_score_unit str | None None
norm_weight_set_name str | None None
available_nw_sets list[str] list()
scoring_results dict[str, dict[str, float]] dict()
scoring_units dict[str, str] dict()
scoring_indicators dict[str, dict[str, ScoringIndicator]] dict()

LCIAResult

LCIA score for one impact category on one activity.

Returned directly by :meth:Client.get_impacts, and nested inside :class:LCIABatchResult.results (one entry per impact category).

Field Type Default
method_id str
method_name str
category str
damage_category str
score float
unit str
mapped_flows int
functional_unit str
normalized_score float | None None
weighted_score float | None None
top_contributors list[FlowContribution] list()

PathResult

Shortest upstream path from a root process to a matching activity.

Field Type Default
path list[PathStep]
path_length int
total_ratio float

PathStep

One step in the supply chain path returned by get_path_to.

Note: the /path endpoint emits snake_case JSON directly (built via aeson's object [...] rather than generic ToJSON), so it bypasses the engine's stripLowerPrefix transform.

Field Type Default
process_id str
name str
location str
unit str
cumulative_quantity float
scaling_factor float
local_step_ratio float | None None

ScoringIndicator

One per-variable entry inside LCIABatchResult.scoring_indicators.

value is pre-multiplied by the scoring set's displayMultiplier (configured in the scoring TOML) and expressed in the set's display unit. category names the impact category the variable was resolved from.

Field Type Default
category str
value float

SupplyChain

Field Type Default
root Activity
total_activities int
filtered_activities int
entries list[SupplyChainEntry] list()
edges list[SupplyChainEdge] list()

SupplyChainEdge

from/to are Python keywords, so they're stored under from_id/to_id.

Field Type Default
from_id str
to_id str
amount float

SupplyChainEntry

Field Type Default
process_id str
name str
location str
quantity float
unit str
scaling_factor float
classifications dict[str, str] dict()

TechnosphereExchange

An exchange with another activity (input or output of an intermediate product).

Built from an ExchangeWithUnit envelope: outer fields like flowName/unitName live next to an inner exchange object (the discriminated Exchange sum).

Field Type Default
flow_name str
flow_category str
amount float
unit str
is_input bool
is_reference bool
target_activity str | None
target_location str | None
target_process_id str | None
comment str | None None
is_biosphere bool False

Functions

compare_activities(client: Client, pid_left: str, pid_right: str, *, scope: str = 'direct', group_by: str = 'flow_id', is_input: bool | None = True, **aggregate_kwargs) -> ActivityDiff

Diff two activities by flow_id (default) at the requested scope.

Returns three lists:

  • matched: flows present in both activities (with left, right, delta).
  • left_only: flows present only in the left activity.
  • right_only: flows present only in the right activity.

Default is_input=True restricts the comparison to inputs, which is the common case for "what does this variant consume differently?". Pass is_input=None to include outputs as well.

Type aliases

Exchange

Type alias: Union[TechnosphereExchange, BiosphereExchange].

See also

License

Apache-2.0

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyvolca-0.3.0.tar.gz (51.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pyvolca-0.3.0-py3-none-any.whl (40.8 kB view details)

Uploaded Python 3

File details

Details for the file pyvolca-0.3.0.tar.gz.

File metadata

  • Download URL: pyvolca-0.3.0.tar.gz
  • Upload date:
  • Size: 51.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.5

File hashes

Hashes for pyvolca-0.3.0.tar.gz
Algorithm Hash digest
SHA256 57149aeb3ce2c4ec821e2d6626c41825491cb541358afbdae1d92a5e332fd165
MD5 acf895774c30f0ba1f42897575bd89c2
BLAKE2b-256 6606a4e65897ef8727cf8010cf663a4d2f52081c6dd6cf4bec4e62e77e8b264b

See more details on using hashes here.

File details

Details for the file pyvolca-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: pyvolca-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 40.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.5

File hashes

Hashes for pyvolca-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 f7ae4fb2ee7539d8f8d76b3b9efcbd0b002cf123af821ccdb99efc6d77c27757
MD5 327ae933cc5169252a2a66f87b70d273
BLAKE2b-256 b22f547c226356ecc4135f3f746a00d6ce50e4a3afc36fafb6248f09c2d64312

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page