import errno
import os
import pickle
from collections.abc import Mapping
from importlib import import_module
from os import makedirs
from os.path import join
from typing import TYPE_CHECKING
import pandas as pd
import pycountry
import sdmx.model.common
import xarray as xr
import yaml
from item.common import log, paths
from item.model.common import as_xarray, concat_versions, select, tidy, to_wide
from item.model.dimensions import INDEX, load_template
from item.util import metadata_repo_file
from . import structure
if TYPE_CHECKING:
from sdmx.model.common import Codelist
from .common import ModelInfo
__all__ = [
"concat_versions",
"coverage",
"get_model_info",
"load_model_data",
"load_model_scenarios",
"make_regions_csv",
"make_regions_yaml",
"select",
"squash_scenarios",
"to_wide",
]
# Versions of the database
VERSIONS = [1, 2]
#: Information about the models.
MODELS: dict[str, "ModelInfo"] = {}
#: List of submodule names containing :class:`.ModelInfo` instances.
SUBMODULES = [
"bp",
"eia",
"eppa5",
"gcam",
"get",
"itf",
"message",
"momo",
"roadmap",
"shell",
"statoil",
]
[docs]
def coverage(models):
"""Display some basic data coverage information."""
log("Checking data coverage.\n")
# Accumulate a list of xr.DataArrays to later concatenate@
result = []
# Load the list of requested quantities
qty = load_template(paths["model data"])
# Find True/not-null values and sum to get the number of requested
# quantities for each variable
req = qty.notnull().sum(["Mode", "Technology", "Fuel"]).to_array(name="Requested")
log("Quantities requested in reporting template: %d\n", req.sum())
result.append((req, "Requested"))
# Iterate through models
for name in sorted(models.keys()):
if name == "itf" or name == "exxonmobil" or name == "roadmap":
# Skip due to a data issue
continue
log("Loading data for %s" % name)
# Load model data
df = pd.read_csv(os.path.join(paths["model data"], "model", name, "data.csv"))
log(df.head())
# Convert to an xr.Dataset, then count non-null values. We consider a
# series populated if it has a data value for *any* scenario, region
# and year.
counts = (
as_xarray(df)
.notnull()
.any(["Scenario", "Region", "Year"])
.sum(["Mode", "Technology", "Fuel"])
.to_array()
)
result.append((counts, name))
# Make two separate lists of the DataArrays and labels
data, labels = zip(*result)
# Combine to a single Dataset
df = (
xr.concat(data, pd.Index(labels, name="model"))
.fillna(0)
.to_dataframe()
.unstack("model")
)
# Compute some totals
df.columns = df.columns.droplevel(0)
df["# of models"] = (df.loc[:, "bp":] > 0).sum(axis="columns")
df.loc["Total", :] = df.sum(axis="rows")
df = df.astype(int)
log(df)
df.to_csv(os.path.join(paths["model data"], "output", "coverage.csv"))
def get_model_info(name: str, version: int) -> "ModelInfo":
load_models_info()
try:
model_info = MODELS[name]
if version in model_info.versions:
return model_info
else:
raise ValueError(
f"model {name!r} not present in database version {version}"
)
except KeyError:
raise ValueError(f"Model {name!r} not among {MODELS.keys()}")
def get_model_names(version: int = VERSIONS[-1]) -> list[str]:
"""Return the names of all models in *version*."""
load_models_info()
return [m.id for m in MODELS.values() if version in m.versions]
def process_raw(version, models):
"""Process raw data submissions.
Data for MODELS are imported from the raw data directory.
"""
# Process arguments
models = models if len(models) else get_model_names(version)
log("Processing raw data for: {}".format(" ".join(models)))
class _csv_model:
def import_data(self, data_path, metadata_path):
return pd.read_csv(data_path), None
for name in models:
try:
info = get_model_info(name, version)
except KeyError:
log(" unknown model '%s', skipping" % name)
continue
if info["format"] == "csv":
model = _csv_model()
elif info["format"] is None:
log(" model '{}' needs no import".format(name))
continue
else:
model = import_module("item.model.%s" % name)
_process_raw(name, model, version, info)
def _process_raw(name, model, version, info):
log("Processing raw data for {}".format(name))
# Path to raw data: this hold the contents of the Dropbox folder
# 'ITEM2/Scenario_data_for_comparison/Data_submission_1/Raw_data'
raw_data = join(
paths["model raw"], str(version), "{}.{}".format(name, info["format"])
)
metadata = join(paths["data"], "model", name)
log(" raw data: {}\n metadata: {}".format(raw_data, metadata))
# Load the data
data, notes = model.import_data(raw_data, metadata)
# Put columns in a canonical order
data = tidy(data)
# Log some diagnostic information
iy = list(set(data.columns) - set(INDEX))
log(" %d non-zero values beginning %s", data.loc[:, iy].notnull().sum().sum(), iy)
# Create a subdirectory under item2-data/model, if it does not already
# exist
model_dir = join(paths["model processed"], str(version), name)
makedirs(model_dir, exist_ok=True)
# TODO log the last-changed date of the file used for import, or a
# checksum
# Write data
data.to_csv(
join(paths["model processed"], str(version), "%s.csv" % name), index=False
)
# Write the region list for this model
pd.Series(data["region"].unique(), name="region").to_csv(
join(model_dir, "region.csv"), index=False
)
# Write the model comments
try:
notes.to_csv(join(model_dir, "note.csv"), index=False)
except AttributeError:
# notes == None; no comments provided for this data set
pass
[docs]
def load_model_data(
version, skip_cache=False, cache=True, fmt=pd.DataFrame, options=[]
):
"""Load model database"""
# Check arguments
version = int(version)
try:
path = paths["models-%d" % version]
except KeyError:
raise ValueError("invalid model database version: %s" % version)
if fmt not in [pd.DataFrame, xr.DataArray, xr.Dataset]:
raise ValueError("unknown return format: %s" % fmt)
# Path for cached data
cache_path = os.path.join(paths["cache"], "model-%d.pkl" % version)
data = None
# Read data from cache
if not skip_cache:
try:
with open(cache_path, "rb") as f:
data = pickle.load(f)
except OSError as e:
if e.errno == errno.ENOENT: # No such file or directory
pass
# Read data from file
if data is None:
data = tidy(pd.read_csv(path))
# Convert to long format, drop empty rows
data = pd.melt(data, id_vars=INDEX, var_name="year").dropna(subset=["value"])
# Cache the result
if cache:
with open(cache_path, "wb") as f:
pickle.dump(data, f)
# Optional additional processing
if "squash scenarios" in options:
data = squash_scenarios(data, version)
options.remove("squash scenarios")
if len(options):
raise ValueError
if fmt in [xr.Dataset, xr.DataArray]:
# Convert to an xarray format
return as_xarray(data, version, fmt)
else:
# return as-is
return data
def load_models_info() -> None:
"""Load the models metadata into the MODELS global."""
global MODELS
if len(MODELS) > 0:
# Already loaded
return
for id_ in SUBMODULES:
module = import_module(f"{__name__}.{id_}")
MODELS[id_] = getattr(module, "INFO")
def load_model_regions(name: str, version: int) -> "Codelist":
"""Load regions.yaml for model *name* in database *version*.
Returns a dictionary where:
- Keys are codes or names of model regions.
- Values are dictionaries with the keys:
- description (optional): a longer name or description of the region
- countries: a list of ISO 3166 alpha-3 codes for countries in the region.
"""
# IDEA load from either regions-1.yaml or regions-2.yaml
try:
get_model_info(name, version)
except Exception:
if name.lower() == "item":
# Use an empty path in the join() call below; this causes the
# overall regions.yaml to be loaded
return structure.get_cl_region()
else:
raise
else:
with open(metadata_repo_file("model", name, "regions.yaml")) as f:
return regions_yaml_to_codelist(yaml.safe_load(f))
def regions_yaml_to_codelist(data: Mapping) -> "Codelist":
"""Convert contents of a :file:`regions.yaml` to an SDMX Codelist."""
cl: "Codelist" = sdmx.model.common.Codelist(id="CL_REGION")
for id_, region_data in data.items():
code = cl.setdefault(id=id_)
# Add children, 1 for each member of the "countries:" key
for child_id in region_data["countries"]:
code.append_child(cl.setdefault(id=child_id))
return cl
[docs]
def load_model_scenarios(name, version):
"""Load scenarios.yaml for model *name* in database *version*.
Returns a dictionay where:
- Keys are codes or names of scenarios.
- Values are dictionaries with the key:
- ``category``: either 'reference' or 'policy'.
"""
# Don't do anything with the return value; just check arguments
get_model_info(name, version)
with open(metadata_repo_file("model", name, "scenarios.yaml")) as f:
return yaml.safe_load(f)[version]
[docs]
def make_regions_csv(out_file, models=None, compare=None):
"""Produce a CSV *out_file* with a country→region map for *models*.
The table is created by parsing the regions.yaml files in the iTEM model database
metadata. It is indexed by ISO 3166 (alpha-3) codes, and has one column for each
model in *models* (if no models are specified, all models are included).
If *compare* is given, the table has entries only where the generated value and
"""
version = VERSIONS[-1] # Version 2 only
models = models or get_model_names(version)
def _load(name):
def _invert(codelist) -> dict[str, str]:
result = {}
for region in codelist:
result.update({c.id: region.id for c in region.child})
return result
return pd.Series(
_invert(load_model_regions(name, version)),
name=name if len(name) else "item",
)
result = pd.concat([_load(model) for model in ["item"] + models], axis=1)
def _get_name(row):
error = None
try:
name = pycountry.countries.get(alpha_3=row.name).name
except AttributeError:
try:
name = pycountry.historic_countries.get(alpha_3=row.name).name
error = "historical"
except AttributeError:
name = ""
error = "nonexistent"
finally:
print(
"{} ISO 3166 code '{}' in models: {}".format(
error, row.name, ", ".join(row.dropna().index)
)
)
return name
result["name"] = result.apply(_get_name, axis=1)
if compare is not None:
other = pd.read_csv(compare)
other.columns = map(str.lower, other.columns)
other.set_index("iso", inplace=True)
other.index = map(str.upper, other.index)
result = result.where(result.ne(other))
with open(out_file, "w") as f:
result.to_csv(f)
[docs]
def make_regions_yaml(in_file, country, region, out_file):
"""Convert a country→region map from CSV *in_file* to YAML *out_file*.
*country* and *region* are columns in *in_file* with country codes and
region names, respectively.
"""
data = pd.read_csv(in_file)[[region, country]].sort_values([region, country])
data[country] = data[country].apply(str.upper)
result = {}
for region, group in data.groupby(region):
result[region] = dict(description="", countries=list(group[country]))
with open(out_file, "w") as f:
yaml.dump(result, f, default_flow_style=False)
[docs]
def squash_scenarios(data, version):
"""Replace the per-model scenario names with scenario categories.
*data* is a pd.DataFrame. *version* is the version of the iTEM model
database.
"""
# Construct the map from model metadata
scenarios_map = {}
for model in get_model_names(version):
for s, info in load_model_scenarios(model, version).items():
scenarios_map[s] = info["category"]
return data.replace({"scenario": scenarios_map})