Source code for item.model.common

"""Common code for data input."""

from dataclasses import dataclass, field
from logging import DEBUG
from os.path import join
from typing import Optional

import numpy as np
import pandas as pd
import xarray as xr
import yaml

from item.common import log
from item.model.dimensions import INDEX
from item.util import metadata_repo_file

# Information about the models
# NOTE(review): load() in this module declares only SCENARIOS as global, so it
# does not update this mapping — confirm where (if anywhere) it is populated.
MODELS: dict[str, dict] = {}
# Scenario table: None at import time; load() rebinds it to a pandas.DataFrame
# with the columns "model", "version", "scenario" and "category".
SCENARIOS = None


@dataclass
class ModelInfo:
    """Information about a model.

    This dataclass replaces entries previously in
    :file:`data/model/models.yaml`.

    Only :attr:`id` and :attr:`format` are required; every other field has a
    default (an empty tuple for :attr:`versions`, :obj:`None` otherwise).
    """

    #: Identifier for the model (required).
    id: str
    #: Format label (required). NOTE(review): the set of accepted values is not
    #: visible in this module — confirm against the code that consumes it.
    format: str
    #: Version numbers, as a tuple of integers; empty by default.
    versions: tuple[int, ...] = field(default_factory=tuple)
    #: Optional name for the model.
    name: Optional[str] = None
    #: Optional citation for the model.
    citation: Optional[str] = None
    #: URL for model documentation.
    model_url: Optional[str] = None
    #: Organization name.
    org: Optional[str] = None
    #: Organization contact URL.
    org_url: Optional[str] = None
def as_xarray(data, version, fmt):
    """Convert long-format *data* to one xarray object per variable.

    Parameters
    ----------
    data : pandas.DataFrame
        Must contain the columns named in ``INDEX`` plus "year" and "value".
        The index of *data* is **set in place** to ``INDEX + ["year"]``, so the
        caller's frame is modified. The groupby below requires that the
        resulting index has levels named "variable" and "unit".
    version
        When equal to ``1``, version-specific fixes are applied: the variable
        "intensity_new" is skipped, and "service" in the names
        "ef_co2 (service)" and "intensity_service" is replaced by the last
        three characters of the unit.
    fmt
        When equal to :class:`xarray.Dataset`, the per-variable arrays are
        merged into a single object with :func:`xarray.merge`; for any other
        value the dict of arrays is returned.

    Returns
    -------
    dict or xarray.Dataset
        A mapping of variable name → :class:`xarray.DataArray`, or the merged
        result (see *fmt*). Each array carries its unit in ``attrs["unit"]``
        and any length-1 coordinate as an attribute instead of a dimension.

    Raises
    ------
    Exception
        Anything raised by :meth:`xarray.DataArray.from_series` is re-raised;
        for "non-unique multi-index" errors the duplicated index entries are
        logged at DEBUG level first.
    """
    # Columns to preserve as a multi-index (mutates the caller's frame)
    data.set_index(INDEX + ["year"], inplace=True)

    # variable name → xr.DataArray
    das = {}

    # Iterate over variables. Some variables (intensities) appear twice with
    # different units for freight, passenger
    for key, d in data.groupby(level=["variable", "unit"]):
        variable, unit = key

        log("Variable: {0[0]} [{0[1]}]\n {1} values".format(key, len(d)), level=DEBUG)

        # Version-specific fixes
        # TODO move
        if version == 1:
            if variable == "intensity_new":
                log(" skipping", level=DEBUG)
                continue
            elif variable in ["ef_co2 (service)", "intensity_service"]:
                # Disambiguate the name using the last 3 characters of the unit
                variable = variable.replace("service", unit[-3:])

        # *d* (the data for this variable) has all the MultiIndex levels of
        # *data*; drop the unused ones (requires pandas 0.20)
        d.index = d.index.remove_unused_levels()

        # Convert to xr.DataArray
        try:
            d = xr.DataArray.from_series(d["value"].astype(float))
        except Exception as e:
            # The assignment above did not happen, so *d* is still the
            # DataFrame here and its index can be inspected for duplicates
            if "non-unique multi-index" in str(e):
                log(d.index[d.index.duplicated()].to_series(), level=DEBUG)
            raise

        # Convert unused dimensions for this variable to attributes
        squeeze_dims = []
        for c in d.coords:
            if d.sizes[c] == 1:
                # Dimension 'c' has only one value → convert
                d.attrs[c] = d[c].values[0]
                squeeze_dims.append(c)
        d = d.squeeze(squeeze_dims, drop=True)
        d.name = variable
        d.attrs["unit"] = unit

        # Percentage of cells in the array that hold a non-null value
        fill = float(100 * d.notnull().sum() / np.prod(list(d.sizes.values())))
        log(
            " {:2.0f}% full\n coords: {}\n attrs: {}".format(
                fill, ", ".join(d.coords.keys()), d.attrs
            ),
            level=DEBUG,
        )

        # Keyed by (possibly renamed) variable only: a later group with the
        # same name replaces an earlier one
        das[variable] = d

    result = das

    # The resulting dataset is very sparse
    if fmt == xr.Dataset:
        log("Merging\n sparseness:", level=DEBUG)

        result = xr.merge(das.values())

        # Report, per merged variable, the percentage of non-null cells
        for v in result:
            fill = float(
                100 * result[v].notnull().sum() / np.prod(list(result[v].sizes.values()))
            )
            log(" {:3.0f}% full — {}".format(fill, v), level=DEBUG)

    return result
def concat_versions(dataframes=None):
    """Convert a dict of *dataframes* to a single pd.DataFrame.

    The keys of *dataframes* are saved in a new column 'version'. The input
    frames themselves are left unmodified; the column is added to copies.

    Parameters
    ----------
    dataframes : dict, optional
        Mapping of version → :class:`pandas.DataFrame`. Omitting it is
        equivalent to passing an empty dict.

    Returns
    -------
    pandas.DataFrame
        The frames concatenated in the iteration order of *dataframes*, each
        row labelled with the key of the frame it came from.

    Raises
    ------
    ValueError
        Raised by :func:`pandas.concat` if *dataframes* is empty or omitted.
    """
    if dataframes is None:
        dataframes = {}

    # assign() returns a new frame, so the caller's frames do not silently
    # gain (or have overwritten) a 'version' column
    labelled = [df.assign(version=version) for version, df in dataframes.items()]

    return pd.concat(labelled)
def data_columns(df):
    """Return the columns of *df* that are not in ``INDEX``, sorted.

    If the column labels cannot be sorted (:class:`TypeError`), the sets
    involved are printed to aid debugging and the error is re-raised.
    """
    try:
        non_index = set(df.columns).difference(INDEX)
        return sorted(non_index)
    except TypeError:
        # Show exactly what was being compared before propagating the error
        print(set(df.columns), set(INDEX), set(df.columns) - set(INDEX))
        raise
def drop_empty(df, columns=None):
    """Return *df* without the rows whose data columns are all empty.

    By default the columns considered are those given by
    :func:`data_columns`; pass *columns*, an iterable of column names, to
    check those instead. The number of rows removed is logged.
    """
    n_before = len(df)

    subset = data_columns(df) if columns is None else columns
    kept = df.dropna(subset=subset, how="all")

    log(" dropped %d empty rows" % (n_before - len(kept)))
    return kept
def load():
    """Load model & scenario data.

    Reads :file:`model/model.yaml` from the metadata repository, then, for
    each entry in it, the matching :file:`model/<entry>/scenarios.yaml` if one
    exists. The module-level ``SCENARIOS`` is rebound to a
    :class:`pandas.DataFrame` with one row per (model, version, scenario) and
    the columns "model", "version", "scenario" and "category".
    """
    global SCENARIOS

    # NOTE(review): the parsed model file is held in a local only. The original
    # code bound it to a local named MODELS without declaring it global, so the
    # module-level MODELS was never updated — confirm whether that is intended.
    with open(metadata_repo_file("model", "model.yaml")) as f:
        model_entries = yaml.load(f, Loader=yaml.SafeLoader)

    # Load scenario information
    records = []
    for model in model_entries:
        # Single-argument join() retained from the original code
        path = join(metadata_repo_file("model", model, "scenarios.yaml"))

        try:
            with open(path) as f:
                by_version = yaml.load(f, Loader=yaml.SafeLoader)
        except FileNotFoundError:
            # A model without a scenarios file contributes no rows
            continue

        # Two-level mapping: version → scenario name → scenario info
        records.extend(
            {
                "model": model,
                "version": version,
                "scenario": name,
                "category": info.get("category"),
            }
            for version, by_name in by_version.items()
            for name, info in by_name.items()
        )

    SCENARIOS = pd.DataFrame(records)
def tidy(df):
    """Normalize the column labels of *df*, then order columns and drop empty rows.

    Column labels are renamed **in place** on *df*:

    - labels that are integers once any leading 'X' is removed (e.g. "X2010")
      become :class:`int` years;
    - "Tech" becomes "technology";
    - every other label is lower-cased.

    The returned frame has the ``INDEX`` columns first, followed by the sorted
    data columns, with rows that are empty in every data column removed (see
    :func:`drop_empty`).
    """

    def _rename(colname):
        # Strip the 'X' prefix only for the purpose of the integer test, so
        # that non-year labels starting with 'X' keep their full name
        candidate = colname.lstrip("X") if isinstance(colname, str) else colname
        try:
            return int(candidate)
        except ValueError:
            pass

        if colname == "Tech":
            colname = "Technology"
        return colname.lower()

    df.rename(columns=_rename, inplace=True)

    # DataFrame.reindex_axis() was removed in pandas 1.0; reindex(columns=...)
    # is the equivalent call
    return drop_empty(df.reindex(columns=INDEX + data_columns(df)))
def select(data, *args, **kwargs):
    """Select from *data*.

    Parameters
    ----------
    data : pandas.DataFrame
        Data with one column per dimension used for selection.
    *args
        At most one positional argument. It is used as the value to select on
        the "variable" dimension, unless ``variable=`` is also given as a
        keyword, in which case the keyword wins.
    **kwargs
        Dimension name → value(s) to keep. A single :class:`int` or
        :class:`str` selects that one value; any other iterable selects all of
        its members; :obj:`None` means "do not filter on this dimension".
        "tech" is accepted as an alias for "technology".

    Returns
    -------
    pandas.DataFrame
        A copy of the rows of *data* matching every criterion. If no criteria
        are given, a copy of all of *data*.

    Raises
    ------
    ValueError
        If more than one positional argument is given.
    NotImplementedError
        If *data* is an :class:`xarray.Dataset` or a :class:`dict`.
    """
    # Process arguments
    if len(args) > 1:
        # map(str, ...) so that non-string arguments still produce this
        # ValueError rather than a TypeError from str.join()
        raise ValueError(
            "can't determine dimensions for >1 positional arguments: {}".format(
                " ".join(map(str, args))
            )
        )

    dims = {k: None for k in INDEX}
    for d, v in kwargs.items():
        d = "technology" if d == "tech" else d
        if v is None:
            dims[d] = None
        elif isinstance(v, (int, str)):
            # A scalar selects exactly one value
            dims[d] = {v}
        else:
            dims[d] = set(v)

    if args and dims["variable"] is None:
        dims["variable"] = set(args)

    # Code to this point is generic (doesn't depend on the format of *data*)

    if isinstance(data, (xr.Dataset, dict)):
        # TODO write methods for these other data types
        raise NotImplementedError

    # pandas.DataFrame
    # Construct a boolean mask
    keep = None
    for d, v in dims.items():
        if v is None:
            continue
        mask = data[d].isin(v)
        keep = mask if keep is None else keep & mask

    # No criteria at all: indexing with None would raise KeyError, so return
    # everything
    if keep is None:
        return data.copy()

    # Subset the data and return
    return data[keep].copy()
def to_wide(data, dimension="year"):
    """Pivot the "value" column of *data* to wide format.

    *data* is indexed by ``INDEX`` plus "year"; the index level named by
    *dimension* (by default "year", giving one column per year) is then moved
    to the columns.
    """
    index_levels = INDEX + ["year"]
    values = data.set_index(index_levels)["value"]
    return values.unstack(dimension)