"""Common code for data input."""
from logging import DEBUG
from os.path import join
from typing import Dict
import numpy as np
import pandas as pd
import xarray as xr
import yaml
from item.common import log, paths
from item.model.dimensions import INDEX
# Information about the models
MODELS: Dict[str, dict] = {}
SCENARIOS = None
def as_xarray(data, version, fmt):
    """Convert *data* to xr.DataArray or xr.Dataset.

    Returns a dict mapping variable name to xr.DataArray; or, if *fmt* is
    xr.Dataset, a single xr.Dataset merging all variables.
    """
# Columns to preserve as a multi-index
data.set_index(INDEX + ["year"], inplace=True)
# variable name → xr.DataArray
das = {}
    # Iterate over variables. Some variables (intensities) appear twice,
    # with different units for freight and for passenger
for key, d in data.groupby(level=["variable", "unit"]):
variable, unit = key
log("Variable: {0[0]} [{0[1]}]\n {1} values".format(key, len(d)), level=DEBUG)
# Version-specific fixes
# TODO move
if version == 1:
if variable == "intensity_new":
log(" skipping", level=DEBUG)
continue
elif variable in ["ef_co2 (service)", "intensity_service"]:
variable = variable.replace("service", unit[-3:])
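                # e.g. 'intensity_service' with a unit ending in 'pkm'
                # becomes 'intensity_pkm' (illustrative; unit[-3:] is the
                # last three characters of the unit string)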
# *d* (the data for this variable) has all the MultiIndex levels of
# *data*; drop the unused ones (requires pandas 0.20)
d.index = d.index.remove_unused_levels()
# Convert to xr.DataArray
try:
d = xr.DataArray.from_series(d["value"].astype(float))
except Exception as e:
if "non-unique multi-index" in str(e):
log(d.index[d.index.duplicated()].to_series(), level=DEBUG)
raise
# Convert unused dimensions for this variable to attributes
squeeze_dims = []
for c in d.coords:
if d.sizes[c] == 1:
# Dimension 'c' has only one value → convert
d.attrs[c] = d[c].values[0]
squeeze_dims.append(c)
d = d.squeeze(squeeze_dims, drop=True)
d.name = variable
d.attrs["unit"] = unit
fill = float(100 * d.notnull().sum() / np.prod(list(d.sizes.values())))
log(
" {:2.0f}% full\n coords: {}\n attrs: {}".format(
fill, ", ".join(d.coords.keys()), d.attrs
),
level=DEBUG,
)
das[variable] = d
result = das
# The resulting dataset is very sparse
if fmt == xr.Dataset:
log("Merging\n sparseness:", level=DEBUG)
result = xr.merge(das.values())
for v in result:
fill = float(
100
* result[v].notnull().sum()
/ np.prod(list(result[v].sizes.values()))
)
log(" {:3.0f}% full — {}".format(fill, v), level=DEBUG)
return result
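
# A minimal usage sketch for as_xarray(), assuming *df* is a hypothetical
# tidy frame containing the INDEX columns plus 'year' and 'value':
#
#     das = as_xarray(df, version=2, fmt=dict)       # dict of DataArray
#     ds = as_xarray(df, version=2, fmt=xr.Dataset)  # merged Dataset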
def concat_versions(dataframes={}):
"""Convert a dict of *dataframes* to a single pd.DataFrame.
The keys of *dataframes* are saved in a new column 'version'.
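
    Example, with two hypothetical single-row frames::

        >>> a = pd.DataFrame({"value": [1.0]})
        >>> b = pd.DataFrame({"value": [2.0]})
        >>> concat_versions({1: a, 2: b})["version"].tolist()
        [1, 2]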
"""
dfs = []
    for version, df in dataframes.items():
        # assign() returns a copy, so the caller's frames are not modified
        dfs.append(df.assign(version=version))
return pd.concat(dfs)
def data_columns(df):
    """Return a sorted list of non-index columns in pandas.DataFrame *df*.
try:
return sorted(set(df.columns) - set(INDEX))
except TypeError:
print(set(df.columns), set(INDEX), set(df.columns) - set(INDEX))
raise
def drop_empty(df, columns=None):
"""Drop rows in *df* where the data columns are all empty.
The number of dropped rows is logged. If *columns*, an iterable of column
names, is given, drop on these columns instead.
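
    Example, with a hypothetical frame whose second row is entirely empty::

        >>> df = pd.DataFrame({"a": [1.0, np.nan], "b": [2.0, np.nan]})
        >>> drop_empty(df, columns=["a", "b"]).shape[0]
        1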
"""
rows = df.shape[0]
if columns is None:
columns = data_columns(df)
df = df.dropna(how="all", subset=columns)
log(" dropped %d empty rows" % (rows - df.shape[0]))
return df
def load():
    """Load model & scenario data."""
    global MODELS, SCENARIOS
    with open(join(paths["data"], "model", "models.yaml")) as f:
        MODELS = yaml.load(f, Loader=yaml.SafeLoader)
# Load scenario information
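    # Each scenarios.yaml is expected to map version → scenario name →
    # attributes, for instance (illustrative contents):
    #
    #   1:
    #     some-scenario:
    #       category: reference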
scenarios = []
for model in MODELS:
fn = join(paths["data"], "model", model, "scenarios.yaml")
try:
with open(fn) as f:
m_s = yaml.load(f, Loader=yaml.SafeLoader)
except FileNotFoundError:
continue
for version, m_s_v in m_s.items():
for name, m_s_v_n in m_s_v.items():
scenarios.append(
{
"model": model,
"version": version,
"scenario": name,
"category": m_s_v_n.get("category"),
}
)
SCENARIOS = pd.DataFrame(scenarios)
def tidy(df):
    """Rename the columns of *df*, then drop empty rows."""
# Rename data columns:
# - remove 'X' preceding years
# - convert years to integers
# - lowercase
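    #
    # e.g. 'X2020' → 2020, 'Tech' → 'technology' (illustrative values)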
def _rename(colname):
try:
if isinstance(colname, str):
colname = colname.lstrip("X")
return int(colname)
except ValueError:
if colname == "Tech":
colname = "Technology"
return colname.lower()
df.rename(columns=_rename, inplace=True)
    return drop_empty(df.reindex(columns=INDEX + data_columns(df)))
def select(data, *args, **kwargs):
    """Return the subset of *data* matching the given dimension values.
# Process arguments
if len(args) > 1:
raise ValueError(
("can't determine dimensions for >1 positional " "arguments: {}").format(
" ".join(args)
)
)
dims = {k: None for k in INDEX}
for d, v in kwargs.items():
d = "technology" if d == "tech" else d
if isinstance(v, (int, str)):
dims[d] = set([v])
elif v is None:
dims[d] = v
else:
dims[d] = set(v)
if len(args) and dims["variable"] is None:
dims["variable"] = set(args)
# Code to this point is generic (doesn't depend on the format of *data*)
    if isinstance(data, (xr.Dataset, dict)):
# TODO write methods for these other data types
raise NotImplementedError
# pandas.DataFrame
# Construct a boolean mask
keep = None
for d, v in dims.items():
if v is None:
continue
elif keep is None:
keep = data[d].isin(v)
else:
keep &= data[d].isin(v)
# Subset the data and return
return data[keep].copy()
def to_wide(data, dimension="year"):
    """Convert *data* to wide format, with one column per value of *dimension*.
return data.set_index(INDEX + ["year"])["value"].unstack(dimension)
load()