import logging
from collections import ChainMap
from datetime import datetime
from functools import lru_cache
from typing import TYPE_CHECKING, List, cast
import numpy as np
import sdmx.message as msg
import sdmx.urn
from sdmx import Client
from sdmx.model import common as m
from sdmx.model import v21
from sdmx.model.v21 import (
DataKey,
DataKeySet,
DataSet,
MeasureDimension,
MemberSelection,
MemberValue,
Observation,
PrimaryMeasure,
)
from item.structure import base
if TYPE_CHECKING:
import sdmx.model.common
import sdmx.model.v21
# Module-level logger, named after this module so that records can be filtered by
# the importing application.
log = logging.getLogger(__name__)
def _get_anno(obj, id):
    """Return the evaluated value of the annotation `id` on `obj`, or :obj:`None`.

    Wrapper around :meth:`AnnotableArtefact.get_annotation`. Like
    :func:`_pop_anno`, but the annotation is left in place on `obj`.

    The default localization of the annotation text is passed to :func:`eval`, and
    the resulting Python object is returned. :class:`KeyError` raised at any point
    (including by the lookup of a missing annotation) is converted to a return
    value of :obj:`None`.
    """
    try:
        annotation = obj.get_annotation(id=id)
        text = annotation.text.localized_default()
        # NOTE(review): eval() executes the annotation text as Python code. This is
        # only safe if annotations always originate from trusted, in-package data;
        # confirm, and consider ast.literal_eval() if only literals are stored.
        return eval(text)
    except KeyError:
        return None
def _pop_anno(obj, id):
    """Remove the annotation `id` from `obj` and return its evaluated value.

    Wrapper around :meth:`AnnotableArtefact.pop_annotation`; described by the
    original author as the inverse of :func:`_annotate` (not visible in this part
    of the module).

    The default localization of the annotation text is passed to :func:`eval`, and
    the resulting Python object is returned. :class:`KeyError` raised at any point
    (including by the lookup of a missing annotation) is converted to a return
    value of :obj:`None`.
    """
    try:
        annotation = obj.pop_annotation(id=id)
        text = annotation.text.localized_default()
        # NOTE(review): eval() executes the annotation text as Python code. This is
        # only safe if annotations always originate from trusted, in-package data;
        # confirm, and consider ast.literal_eval() if only literals are stored.
        return eval(text)
    except KeyError:
        return None
[docs]
@lru_cache()
def get_cdc():
    """Retrieve the ``CROSS_DOMAIN_CONCEPTS`` from the SDMX Global Registry.

    A concept scheme query is sent through an :class:`sdmx.Client` for the "SGR"
    source, and the scheme with the matching ID is taken from the response. The
    result is memoized with :func:`functools.lru_cache`, so the query is made at
    most once per process.
    """
    scheme_id = "CROSS_DOMAIN_CONCEPTS"
    response = Client("SGR").conceptscheme(scheme_id)
    return response.concept_scheme[scheme_id]
[docs]
@lru_cache()
def generate() -> msg.StructureMessage:
    """Return the SDMX data structures for iTEM data.

    A new :class:`~sdmx.message.StructureMessage` is assembled from the objects
    defined in :mod:`item.structure.base`:

    1. the agency scheme ``AS_ITEM``;
    2. every concept scheme in ``CONCEPT_SCHEMES``;
    3. one code list ``CL_{id}`` per entry of ``CODELISTS``;
    4. every data structure definition in ``DATA_STRUCTURES``, after it is
       populated by :func:`prepare_dsd`;
    5. every content constraint in ``CONSTRAINTS``. A constraint whose ID matches
       the ID of a DSD is attached to that DSD and its annotations are converted
       by :func:`dks_from_anno`, :func:`cr_from_anno` and
       :func:`merge_general_constraints`; other constraints are only stored.

    Finally, the maintainer, version and ``is_external_reference`` attributes are
    set on every object added to the message.

    The result is memoized with :func:`functools.lru_cache`; repeated calls return
    the same message object.
    """
    item_agency = base.AS_ITEM["iTEM"]

    sm = msg.StructureMessage(
        header=msg.Header(sender=item_agency, prepared=datetime.now())
    )

    # Add the AgencyScheme containing iTEM
    sm.organisation_scheme[base.AS_ITEM.id] = base.AS_ITEM

    # Add concept schemes
    for cs in base.CONCEPT_SCHEMES:
        sm.concept_scheme[cs.id] = cs

        # Ensure concepts are associated to their parent scheme
        for item in cs:
            item.parent = item.parent or cs

    # Process and add code lists
    for cl_id, codes in base.CODELISTS.items():
        # Create a code list object
        cl: "sdmx.model.common.Codelist" = m.Codelist(id=f"CL_{cl_id}")

        # Add each code and any children
        # TODO move this upstream to sdmx1
        for c in codes:
            cl.append(c)
            cl.extend(c.child)

        # Add to the message
        sm.codelist[cl.id] = cl

    # Process and add data structure definitions
    for dsd in base.DATA_STRUCTURES:
        prepare_dsd(dsd, sm)

        # Add to the message
        sm.structure[dsd.id] = dsd

    # Process and add content constraints
    for cc in base.CONSTRAINTS:
        # Add the constraint to the message
        sm.constraint[cc.id] = cc

        # Look up the object that is constrained: the DSD with the same ID
        try:
            dsd = sm.structure[cc.id]
        except KeyError:
            # No DSD shares this constraint's ID. Report the constraint itself; at
            # this point `dsd` would be a stale value left over from the loop above
            # or from a previous iteration.
            log.info(f"No data structure for {repr(cc)}")
            continue

        # Update the constraint with a reference to the DSD
        cc.content.add(dsd)

        # Convert annotations into DataKeySet and CubeRegion objects, associated with
        # the DSDs
        dks_from_anno(cc, dsd)
        cr_from_anno(cc, dsd)

        # Update the constraint using applicable CubeRegions from GENERAL0, GENERAL1,
        # etc.
        merge_general_constraints(cc, dsd, sm)

    # Add MaintainableArtefact properties to all objects
    for kind in (
        "codelist",
        "concept_scheme",
        "constraint",
        "organisation_scheme",
        "structure",
    ):
        for obj in getattr(sm, kind).values():
            obj.maintainer = item_agency
            obj.version = base.VERSION
            obj.is_external_reference = False

    return sm
[docs]
def make_iamc_variable_cl(
    sm: msg.StructureMessage,
    source: str,
    *,
    format_id=None,
    locale: str = "en",
    use_constraint: bool = True,
) -> "sdmx.model.v21.Codelist":
    """Create a code list for the IAMC 'VARIABLE' concept/dimension.

    The new code list has the ID ``CL_IAMC_VARIABLE_{source}`` and contains one code
    for every key yielded by ``iter_keys()`` on the data structure `source`. Each
    code ID joins, with "|", the localized name of the measure concept and the
    value IDs of the key for every dimension except REF_AREA (→ IAMC 'REGION') and
    TIME_PERIOD (→ IAMC 'YEAR'). The code list is added to `sm` and returned.

    As a side effect, the URN of the source data structure is (re)set.

    Parameters
    ----------
    sm :
        Structure message. It is read for the data structure `source`, the concept
        scheme "CS_TRANSPORT_MEASURE" and (optionally) constraints.
    source :
        ID of the data structure in `sm`; also the ID of the measure concept in
        "CS_TRANSPORT_MEASURE".
    format_id :
        Callback to format the code ID. Default: :class:`str`.
    locale :
        Locale (~language), such as "en", for formatting names.
    use_constraint :
        if :any:`True`, the default, retrieve and use a ContentConstraint from `sm`
        that applies to the source data structure. The constraint is only used if
        it has at least one data content region.
    """
    # Fall back to `str` when no formatting callback is given
    make_id = str if format_id is None else format_id

    # Data structure from which keys are generated
    dsd: "sdmx.model.v21.DataStructureDefinition" = sm.structure[source]

    # Ensure a URN
    # FIXME Move upstream to where the dsd is created
    dsd.urn = sdmx.urn.make(dsd)

    # Locate a constraint that affects this DSD
    # FIXME this duplicates code in merge_dsd(); deduplicate
    cc = None
    if use_constraint:
        matching = [c for c in sm.constraint.values() if dsd in c.content]
        assert len(matching) <= 1
        if matching and len(matching[0].data_content_region):
            cc = matching[0]

    # All dimensions that contribute to the variable name
    dims = [d for d in dsd.dimensions if d.id not in ("REF_AREA", "TIME_PERIOD")]

    # Identify the measure quantity based on the structure ID
    # FIXME Set this association within the DSD itself, and retrieve from there
    c_measure = sm.concept_scheme["CS_TRANSPORT_MEASURE"][source]

    # The new code list records the URN of the structure it was derived from
    urn_anno = v21.Annotation(id="original-ds-urn", text=dsd.urn)
    cl_new: "m.Codelist" = m.Codelist(
        id=f"CL_IAMC_VARIABLE_{source}", annotations=[urn_anno]
    )

    # The localized name of the measure concept is the first part of each variable
    var = f"{c_measure.name.localizations[locale]}"

    for key in dsd.iter_keys(constraint=cc):
        # Pieces of the code ID: measure name, then one value ID per dimension
        id_parts = [var]
        id_parts.extend(key[d].value.id for d in dims)

        # Dimension → value ID, preserved in an annotation on the code
        key_parts = {d: kv.value.id for d, kv in key.values.items() if d in dims}

        new_id = make_id("|".join(id_parts))

        # Create a Code with the new ID and the annotation
        kv_anno = v21.Annotation(id="original-key-values", text=repr(key_parts))
        cl_new.setdefault(id=new_id, annotations=[kv_anno])

    # Store in the existing StructureMessage
    sm.add(cl_new)

    return cl_new
[docs]
def merge_dsd(
    sm: msg.StructureMessage,
    target: str,
    others: List[str],
    fill_value: str = "_Z",
) -> "sdmx.model.v21.DataSet":
    """‘Merge’ 2 or more data structure definitions.

    A data set structured by the DSD `target` is filled with one observation, with
    value NaN, for every key yielded by ``iter_keys()`` on each DSD in `others`.
    Each key is extended with a VARIABLE value equal to the ID of the originating
    DSD, and with the code `fill_value` for every dimension of `target` that the
    originating DSD lacks.

    Parameters
    ----------
    sm :
        Structure message containing the `target` and `others` data structures and
        any constraints on them.
    target :
        ID of the data structure that describes the returned data set.
    others :
        IDs of the data structures whose keys are merged.
    fill_value :
        ID of the code used for dimensions missing from a merged data structure.
    """
    dsd_target = sm.structure[target]

    # Temporary DataSet that accumulates the keys
    ds: "sdmx.model.v21.DataSet" = DataSet(structured_by=dsd_target)

    # Number of observations before the current DSD was processed
    n_before = 0

    for dsd_id in others:
        dsd = sm.structure[dsd_id]

        # Locate a constraint that affects this DSD
        matching = [c for c in sm.constraint.values() if dsd in c.content]
        assert len(matching) <= 1
        cc = None
        if matching and len(matching[0].data_content_region):
            cc = matching[0]

        # Partial key: the VARIABLE dimension identifies the originating DSD
        base_key = m.Key(VARIABLE=dsd_id, described_by=dsd_target.dimensions)

        # Fill dimensions that are in the target but not in this DSD
        for dim in dsd_target.dimensions:
            is_set = dim.id in base_key.values
            if not (is_set or dim.id in dsd.dimensions):
                base_key[dim.id] = dim.local_representation.enumerated[fill_value]

        # One observation per possible key in `dsd`, each combined with `base_key`
        ds.add_obs(
            Observation(dimension=(base_key + key).order(), value=np.nan)
            for key in dsd.iter_keys(constraint=cc)
        )

        log.info(f"{repr(dsd)}: {len(ds.obs) - n_before} keys")
        n_before = len(ds.obs)

    # Summary, including the first few keys
    preview = "\n".join(repr(o.dimension) for o in ds.obs[:5])
    log.info(f"Total keys: {len(ds.obs)}\n" + preview)

    return ds
[docs]
def prepare_dsd(
    dsd: "sdmx.model.v21.DataStructureDefinition", sm: msg.StructureMessage
):
    """Populate data structures within `dsd`.

    The "_dimensions" annotation, if any, is removed from `dsd` and split on
    whitespace to give a list of concept IDs; "REF_AREA" and "TIME_PERIOD" are
    always appended. One dimension is added to `dsd` per ID:

    - "VARIABLE" becomes a :class:`MeasureDimension` enumerated by the concept
      scheme "CS_TRANSPORT_MEASURE" in `sm`.
    - Any other ID becomes a :class:`Dimension` referring to the concept with that
      ID, and represented by the code list ``CL_{id}`` in `sm` if one exists.

    A primary measure is then added, using the concept whose ID matches the DSD ID
    or else "OBS_VALUE", and the dimensions are assigned an order.

    Raises
    ------
    KeyError
        if a dimension ID other than "VARIABLE" is not found in any of the concept
        schemes searched.
    """
    # Layered lookup of concepts by ID: the two iTEM schemes first, then the
    # CROSS_DOMAIN_CONCEPTS scheme from the SDMX Global Registry
    dsd_concepts = ChainMap(
        sm.concept_scheme["TRANSPORT"].items,
        sm.concept_scheme["MODELING"].items,
        get_cdc(),
    )

    try:
        # Consume the annotation and split it into a list of dimension IDs
        dim_ids = _pop_anno(dsd, "_dimensions").split()
    except AttributeError:
        # No annotation → no DSD-specific dimensions
        dim_ids = []

    # Common dimensions come last
    dim_ids = [*dim_ids, "REF_AREA", "TIME_PERIOD"]

    for order, concept_id in enumerate(dim_ids):
        # Look in each of the three concept schemes
        concept = dsd_concepts.get(concept_id)

        if concept_id == "VARIABLE":
            # NB name and description are not attributes of Component; these would
            #    need to be stored on a Concept instead
            measure_concepts = sm.concept_scheme["CS_TRANSPORT_MEASURE"]
            dsd.dimensions.append(
                MeasureDimension(
                    id="VARIABLE",
                    local_representation=m.Representation(enumerated=measure_concepts),
                )
            )
            continue

        if concept is None:
            raise KeyError(concept_id)

        # Ordinary dimension that refers to the concept
        d: m.DimensionComponent = m.Dimension(
            id=concept_id, concept_identity=concept, order=order
        )

        try:
            # Represent the dimension by the corresponding code list, if any
            d.local_representation = m.Representation(
                enumerated=sm.codelist[f"CL_{concept_id}"]
            )
        except KeyError:
            pass  # No iTEM codelist for this concept

        dsd.dimensions.append(d)

    # Primary measure: the concept with ID matching the DSD, or OBS_VALUE as backup
    concept = dsd_concepts.get(dsd.id) or dsd_concepts.get("OBS_VALUE")
    assert concept is not None
    dsd.measures.append(PrimaryMeasure(id=concept.id, concept_identity=concept))

    # Assign order to the dimensions
    dsd.dimensions.assign_order()
[docs]
def cr_from(
    info: dict, dsd: "sdmx.model.common.BaseDataStructureDefinition"
) -> m.CubeRegion:
    """Create a :class:`.CubeRegion` from a simple :class:`dict` of `info`.

    Parameters
    ----------
    info :
        Mapping from dimension IDs to strings of whitespace-separated values. If
        the first token of a string is "!", the remaining values are *excluded*
        rather than included. The optional key "included" (default :obj:`True`)
        gives the ``included`` attribute of the cube region itself. `info` is not
        modified.
    dsd :
        Data structure in which the dimension IDs are looked up.

    Raises
    ------
    IndexError
        if the string for a dimension is empty or contains only whitespace.
    """
    # Read "included" without popping it, so the caller's dict is left intact
    cr = m.CubeRegion(included=info.get("included", True))

    for dim_id, values in info.items():
        if dim_id == "included":
            # Not a dimension; already handled above
            continue

        dim = cast(m.Dimension, dsd.dimensions.get(dim_id))

        tokens = values.split()

        # A leading "!" token negates the selection
        included = tokens[0] != "!"
        if not included:
            tokens = tokens[1:]

        cr.member[dim] = MemberSelection(
            included=included,
            values_for=dim,
            values=[MemberValue(value=token) for token in tokens],
        )

    return cr
[docs]
def cr_from_anno(
    obj: "sdmx.model.v21.ContentConstraint",
    dsd: "sdmx.model.common.BaseDataStructureDefinition",
) -> None:
    """Convert an annotation on `obj` into :class:`.CubeRegion` constraints.

    The "_data_content_region" annotation is removed from `obj`. Each of its
    entries is converted with :func:`cr_from` and appended to
    ``obj.data_content_region``. If there is no such annotation, `obj` is not
    changed.
    """
    region_infos = _pop_anno(obj, "_data_content_region")

    if region_infos is not None:
        for region_info in region_infos:
            region = cr_from(region_info, dsd)
            obj.data_content_region.append(region)
[docs]
def dks_from_anno(
    obj: "sdmx.model.v21.ContentConstraint",
    dsd: "sdmx.model.common.BaseDataStructureDefinition",
) -> None:
    """Convert an annotation on `obj` into a :class:`.DataKeySet` constraint.

    The "_data_content_keys" annotation is removed from `obj`; it is expected to
    map dimension IDs to iterables of values. One included :class:`.DataKey` with a
    single component value is created per (dimension, value) pair, and the
    resulting key set is stored as ``obj.data_content_keys``. If there is no such
    annotation, `obj` is not changed.
    """
    key_info = _pop_anno(obj, "_data_content_keys")
    if key_info is None:
        return

    key_set = DataKeySet(included=True, keys=[])

    for dim_id, dim_values in key_info.items():
        # Resolve the dimension once for all of its values
        dim = dsd.dimensions.get(dim_id)

        for dim_value in dim_values:
            component_value = m.ComponentValue(value_for=dim, value=dim_value)
            data_key = DataKey(key_value={dim: component_value}, included=True)
            key_set.keys.append(data_key)

    obj.data_content_keys = key_set
[docs]
def merge_general_constraints(
    cc: "sdmx.model.v21.ContentConstraint",
    dsd: "sdmx.model.common.BaseDataStructureDefinition",
    sm: msg.StructureMessage,
) -> None:
    """Merge general constraints from `sm` into `cc` if relevant to `dsd`.

    Every constraint in `sm` whose ID starts with "GENERAL" is inspected. Each
    entry of its "_data_content_region" annotation is converted with
    :func:`cr_from` and appended to ``cc.data_content_region``, provided that the
    dimensions it mentions are a strict subset of the dimensions of `dsd`. The
    annotation is read without being removed, so it can be reused for other DSDs.
    A general constraint that lacks the annotation is skipped.
    """
    # IDs of the dimensions of `dsd`; invariant across all regions examined
    dsd_dim_ids = {dim.id for dim in dsd.dimensions}

    for other_cc in sm.constraint.values():
        if not other_cc.id.startswith("GENERAL"):
            continue

        # _get_anno() returns None if the annotation is missing; treat this as "no
        # regions" instead of failing while iterating over None
        region_infos = _get_anno(other_cc, "_data_content_region") or []

        for info in region_infos:
            # Dimensions mentioned by this region; "included" is a flag, not a
            # dimension
            region_dim_ids = set(info) - {"included"}

            # NOTE(review): `<` is a *strict* subset test, so a region that mentions
            # every dimension of `dsd` is skipped; confirm that `<=` is not intended.
            if not region_dim_ids < dsd_dim_ids:
                continue

            cc.data_content_region.append(cr_from(info, dsd))