Source code for item.model

import errno
import os
import pickle
from importlib import import_module
from os import makedirs
from os.path import join
from typing import Dict

import pandas as pd
import pycountry
import xarray as xr
import yaml

from item.common import log, paths
from item.model.common import as_xarray, concat_versions, select, tidy, to_wide
from item.model.dimensions import INDEX, load_template

__all__ = [
    "concat_versions",
    "coverage",
    "get_model_info",
    "load_model_data",
    "load_model_scenarios",
    "make_regions_csv",
    "make_regions_yaml",
    "select",
    "squash_scenarios",
    "to_wide",
]


# Versions of the database
VERSIONS = [1, 2]

# Information about the models
MODELS: Dict[str, dict] = {}
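
# Note: the structure of models.yaml entries is implied by the accessors
# below (info["versions"], info["format"]); a hedged sketch, with the model
# name and values illustrative only:
#
#   somemodel:
#     versions: [1, 2]  # database versions that include this model's data
#     format: csv       # raw-data file extension; null if no import needed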


def coverage(models):
    """Display some basic data coverage information."""
    log("Checking data coverage.\n")

    # Accumulate a list of xr.DataArrays to later concatenate
    result = []

    # Load the list of requested quantities
    qty = load_template(paths["model data"])

    # Find True/not-null values and sum to get the number of requested
    # quantities for each variable
    req = qty.notnull().sum(["Mode", "Technology", "Fuel"]).to_array(name="Requested")
    log("Quantities requested in reporting template: %d\n", req.sum())
    result.append((req, "Requested"))

    # Iterate through models
    for name in sorted(models.keys()):
        if name in ("itf", "exxonmobil", "roadmap"):
            # Skip due to a data issue
            continue

        log("Loading data for %s" % name)

        # Load model data
        df = pd.read_csv(os.path.join(paths["model data"], "model", name, "data.csv"))

        log(df.head())

        # Convert to an xr.Dataset, then count non-null values. We consider a
        # series populated if it has a data value for *any* scenario, region
        # and year.
        counts = (
            as_xarray(df)
            .notnull()
            .any(["Scenario", "Region", "Year"])
            .sum(["Mode", "Technology", "Fuel"])
            .to_array()
        )
        result.append((counts, name))

    # Make two separate lists of the DataArrays and labels
    data, labels = zip(*result)

    # Combine to a single Dataset
    df = (
        xr.concat(data, pd.Index(labels, name="model"))
        .fillna(0)
        .to_dataframe()
        .unstack("model")
    )

    # Compute some totals
    df.columns = df.columns.droplevel(0)
    df["# of models"] = (df.loc[:, "bp":] > 0).sum(axis="columns")
    df.loc["Total", :] = df.sum(axis="rows")
    df = df.astype(int)

    log(df)

    df.to_csv(os.path.join(paths["model data"], "output", "coverage.csv"))
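
# A hedged invocation sketch for coverage(): it assumes the metadata have
# been loaded (MODELS is the dict defined above) and that processed data.csv
# files exist under paths["model data"]:
#
#   load_models_info()
#   coverage(MODELS)  # logs the coverage table and writes output/coverage.csv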


def get_model_info(name, version):
    """Return the metadata for model *name* in database *version*."""
    load_models_info()

    try:
        model_info = MODELS[name]
        if version in model_info["versions"]:
            return model_info
        else:
            raise ValueError(
                "model '{}' not present in database version {}".format(name, version)
            )
    except KeyError:
        raise ValueError(f"Model {repr(name)} not among {MODELS.keys()}")


def get_model_names(version=VERSIONS[-1]):
    """Return the names of all models in *version*."""
    load_models_info()

    result = []
    for name, info in MODELS.items():
        if version in info["versions"]:
            result.append(name)

    return result


def process_raw(version, models):
    """Process raw data submissions.

    Data for MODELS are imported from the raw data directory.
    """
    # Process arguments
    models = models if len(models) else get_model_names(version)
    log("Processing raw data for: {}".format(" ".join(models)))

    class _csv_model:
        def import_data(self, data_path, metadata_path):
            return pd.read_csv(data_path), None

    for name in models:
        try:
            info = get_model_info(name, version)
        except ValueError:
            # get_model_info() raises ValueError (not KeyError) for an
            # unknown model
            log(" unknown model '%s', skipping" % name)
            continue

        if info["format"] == "csv":
            model = _csv_model()
        elif info["format"] is None:
            log(" model '{}' needs no import".format(name))
            continue
        else:
            model = import_module("item.model.%s" % name)

        _process_raw(name, model, version, info)


def _process_raw(name, model, version, info):
    log("Processing raw data for {}".format(name))

    # Path to raw data: this holds the contents of the Dropbox folder
    # 'ITEM2/Scenario_data_for_comparison/Data_submission_1/Raw_data'
    raw_data = join(
        paths["model raw"], str(version), "{}.{}".format(name, info["format"])
    )
    metadata = join(paths["data"], "model", name)
    log(" raw data: {}\n metadata: {}".format(raw_data, metadata))

    # Load the data
    data, notes = model.import_data(raw_data, metadata)

    # Put columns in a canonical order
    data = tidy(data)

    # Log some diagnostic information
    iy = list(set(data.columns) - set(INDEX))
    log(" %d non-zero values beginning %s", data.loc[:, iy].notnull().sum().sum(), iy)

    # Create a subdirectory under item2-data/model, if it does not already
    # exist
    model_dir = join(paths["model processed"], str(version), name)
    makedirs(model_dir, exist_ok=True)

    # TODO log the last-changed date of the file used for import, or a
    # checksum

    # Write data
    data.to_csv(
        join(paths["model processed"], str(version), "%s.csv" % name), index=False
    )

    # Write the region list for this model
    pd.Series(data["region"].unique(), name="region").to_csv(
        join(model_dir, "region.csv"), index=False
    )

    # Write the model comments
    try:
        notes.to_csv(join(model_dir, "note.csv"), index=False)
    except AttributeError:
        # notes is None; no comments were provided for this data set
        pass
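
# Hedged usage sketches for the accessors above ("bp" appears elsewhere in
# this module; other names and return values are illustrative only):
#
#   get_model_names(2)        # e.g. ["bp", ...] — models in database version 2
#   get_model_info("bp", 2)   # the metadata dict, or ValueError if absent
#   process_raw(2, [])        # an empty list means: process all models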


def load_model_data(
    version, skip_cache=False, cache=True, fmt=pd.DataFrame, options=None
):
    """Load model database."""
    # Check arguments
    version = int(version)
    try:
        path = paths["models-%d" % version]
    except KeyError:
        raise ValueError("invalid model database version: %s" % version)

    if fmt not in [pd.DataFrame, xr.DataArray, xr.Dataset]:
        raise ValueError("unknown return format: %s" % fmt)

    # Copy *options* so that the caller's list is not mutated below
    options = list(options) if options else []

    # Path for cached data
    cache_path = os.path.join(paths["cache"], "model-%d.pkl" % version)
    data = None

    # Read data from cache
    if not skip_cache:
        try:
            with open(cache_path, "rb") as f:
                data = pickle.load(f)
        except OSError as e:
            if e.errno == errno.ENOENT:  # No such file or directory
                pass

    # Read data from file
    if data is None:
        data = tidy(pd.read_csv(path))

        # Convert to long format, drop empty rows
        data = pd.melt(data, id_vars=INDEX, var_name="year").dropna(subset=["value"])

        # Cache the result
        if cache:
            with open(cache_path, "wb") as f:
                pickle.dump(data, f)

    # Optional additional processing
    if "squash scenarios" in options:
        data = squash_scenarios(data, version)
        options.remove("squash scenarios")

    if len(options):
        raise ValueError("unrecognized options: %s" % options)

    if fmt in [xr.Dataset, xr.DataArray]:
        # Convert to an xarray format
        return as_xarray(data, version, fmt)
    else:
        # Return as-is
        return data
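
# A hedged example of loading the database (assumes paths["models-2"] and
# paths["cache"] are configured):
#
#   df = load_model_data(2)                   # long-format pd.DataFrame
#   ds = load_model_data(2, fmt=xr.Dataset)   # same data as xarray
#   df = load_model_data(2, options=["squash scenarios"])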


def load_models_info():
    """Load the models metadata into the MODELS global."""
    global MODELS

    if len(MODELS) > 0:
        # Already loaded
        return

    with open(join(paths["data"], "model", "models.yaml")) as f:
        MODELS = yaml.safe_load(f)


def load_model_regions(name, version):
    """Load regions.yaml for model *name* in database *version*.

    Returns a dictionary where:

    - Keys are codes or names of model regions.
    - Values are dictionaries with the keys:

      - description (optional): a longer name or description of the region
      - countries: a list of ISO 3166 alpha-3 codes for countries in the
        region.
    """
    # IDEA load from either regions-1.yaml or regions-2.yaml

    try:
        get_model_info(name, version)
    except Exception:
        if name.lower() == "item":
            # Use an empty path in the join() call below; this causes the
            # overall regions.yaml to be loaded
            name = ""
        else:
            raise

    with open(join(paths["data"], "model", name, "regions.yaml")) as f:
        return yaml.safe_load(f)
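
# Per the docstring above, a regions.yaml file looks roughly like this
# (the region name, description, and codes are illustrative):
#
#   North America:
#     description: Canada, Mexico and the United States
#     countries: [CAN, MEX, USA]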


def load_model_scenarios(name, version):
    """Load scenarios.yaml for model *name* in database *version*.

    Returns a dictionary where:

    - Keys are codes or names of scenarios.
    - Values are dictionaries with the key:

      - ``category``: either 'reference' or 'policy'.
    """
    # Don't do anything with the return value; just check arguments
    get_model_info(name, version)

    with open(join(paths["data"], "model", name, "scenarios.yaml")) as f:
        return yaml.safe_load(f)[version]
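
# The trailing [version] lookup above implies scenarios.yaml is keyed first
# by database version; a hedged sketch, with an illustrative scenario name:
#
#   2:
#     MyScenario:
#       category: reference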


def make_regions_csv(out_file, models=None, compare=None):
    """Produce a CSV *out_file* with a country→region map for *models*.

    The table is created by parsing the regions.yaml files in the iTEM model
    database metadata. It is indexed by ISO 3166 (alpha-3) codes, and has one
    column for each model in *models* (if no models are specified, all models
    are included).

    If *compare* is given, the table has entries only where the generated
    value and the value in *compare* differ.
    """
    version = VERSIONS[-1]  # Version 2 only
    models = models or get_model_names(version)

    def _load(name):
        def _invert(data):
            result = {}
            for k, v in data.items():
                result.update({c: k for c in v["countries"]})
            return result

        return pd.Series(
            _invert(load_model_regions(name, version)),
            name=name if len(name) else "item",
        )

    result = pd.concat([_load(model) for model in ["item"] + models], axis=1)

    def _get_name(row):
        error = None
        try:
            name = pycountry.countries.get(alpha_3=row.name).name
        except AttributeError:
            try:
                name = pycountry.historic_countries.get(alpha_3=row.name).name
                error = "historical"
            except AttributeError:
                name = ""
                error = "nonexistent"
        finally:
            print(
                "{} ISO 3166 code '{}' in models: {}".format(
                    error, row.name, ", ".join(row.dropna().index)
                )
            )
        return name

    result["name"] = result.apply(_get_name, axis=1)

    if compare is not None:
        other = pd.read_csv(compare)
        other.columns = map(str.lower, other.columns)
        other.set_index("iso", inplace=True)
        other.index = map(str.upper, other.index)
        result = result.where(result.ne(other))

    with open(out_file, "w") as f:
        result.to_csv(f)
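
# Illustrative calls (the file names are hypothetical):
#
#   make_regions_csv("regions.csv")                    # all models
#   make_regions_csv("diff.csv", compare="other.csv")  # only differing entries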


def make_regions_yaml(in_file, country, region, out_file):
    """Convert a country→region map from CSV *in_file* to YAML *out_file*.

    *country* and *region* are columns in *in_file* with country codes and
    region names, respectively.
    """
    data = pd.read_csv(in_file)[[region, country]].sort_values([region, country])
    data[country] = data[country].apply(str.upper)

    result = {}
    # Use a distinct loop variable so the *region* argument is not shadowed
    for region_name, group in data.groupby(region):
        result[region_name] = dict(description="", countries=list(group[country]))

    with open(out_file, "w") as f:
        yaml.dump(result, f, default_flow_style=False)
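
# Illustrative call (the file and column names are hypothetical; *country* and
# *region* must match columns in the input CSV):
#
#   make_regions_yaml("country_map.csv", "iso", "region", "regions.yaml")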


def squash_scenarios(data, version):
    """Replace the per-model scenario names with scenario categories.

    *data* is a pd.DataFrame. *version* is the version of the iTEM model
    database.
    """
    # Construct the map from model metadata
    scenarios_map = {}
    for model in get_model_names(version):
        for s, info in load_model_scenarios(model, version).items():
            scenarios_map[s] = info["category"]

    return data.replace({"scenario": scenarios_map})
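
# A hedged standalone example (load_model_data() above applies this same
# function when called with options=["squash scenarios"]):
#
#   data = load_model_data(2)
#   data = squash_scenarios(data, 2)  # scenario names -> 'reference'/'policy'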