import logging
from collections import ChainMap
from datetime import datetime
from functools import lru_cache
from typing import List, cast
import numpy as np
import sdmx.message as msg
import sdmx.model as m
from sdmx import Client
from item.structure import base
log = logging.getLogger(__name__)
def _get_anno(obj, id):
    """Return the evaluated text of annotation `id` on `obj`, without removing it.

    Wrapper around :meth:`AnnotableArtefact.get_annotation`. Behaves like
    :func:`_pop_anno`, except that the annotation stays attached to `obj`.

    The default localization of the annotation text is evaluated as a Python
    expression and the resulting object is returned. If any step raises
    :class:`KeyError` (presumably because `obj` has no annotation with this ID),
    :obj:`None` is returned instead.
    """
    # NOTE(review): eval() executes whatever expression the annotation text holds.
    # Only use this on annotations written by trusted code in this package.
    try:
        annotation = obj.get_annotation(id=id)
        text = annotation.text.localized_default()
        result = eval(text)
    except KeyError:
        result = None
    return result
def _pop_anno(obj, id):
    """Remove annotation `id` from `obj` and return its evaluated text.

    Wrapper around :meth:`AnnotableArtefact.pop_annotation`. Inverse of
    :func:`_annotate`.

    The default localization of the annotation text is evaluated as a Python
    expression and the resulting object is returned. If any step raises
    :class:`KeyError` (presumably because `obj` has no annotation with this ID),
    :obj:`None` is returned instead.
    """
    # NOTE(review): eval() executes whatever expression the annotation text holds.
    # Only use this on annotations written by trusted code in this package.
    try:
        annotation = obj.pop_annotation(id=id)
        text = annotation.text.localized_default()
        result = eval(text)
    except KeyError:
        result = None
    return result
@lru_cache()
def get_cdc():
    """Retrieve the ``CROSS_DOMAIN_CONCEPTS`` from the SDMX Global Registry.

    The concept scheme is requested through a ``Client("SGR")``. Because the
    function is wrapped in :func:`functools.lru_cache`, the request is made on the
    first call only; subsequent calls return the same cached object.
    """
    scheme_id = "CROSS_DOMAIN_CONCEPTS"
    response = Client("SGR").conceptscheme(scheme_id)
    return response.concept_scheme[scheme_id]
@lru_cache()
def generate() -> msg.StructureMessage:
    """Return the SDMX data structures for iTEM data.

    The message is assembled from the objects declared in
    :mod:`item.structure.base`:

    1. the agency scheme ``base.AS_ITEM``;
    2. every concept scheme in ``base.CONCEPT_SCHEMES``;
    3. one ``CL_{id}`` code list per entry of ``base.CODELISTS``;
    4. every data structure definition in ``base.DATA_STRUCTURES``, after
       :func:`prepare_dsd` has populated it;
    5. every content constraint in ``base.CONSTRAINTS``. A constraint whose ID
       matches the ID of a DSD is attached to that DSD and its annotations are
       converted to key sets / cube regions.

    Finally, maintainer, version and ``is_external_reference`` are set on all
    objects in the message.

    The result is cached with :func:`functools.lru_cache`, so repeated calls return
    the same message object.
    """
    item_agency = base.AS_ITEM.items["iTEM"]

    sm = msg.StructureMessage(
        prepared=datetime.now(), header=msg.Header(sender=item_agency)
    )

    # Add the AgencyScheme containing iTEM
    sm.organisation_scheme[base.AS_ITEM.id] = base.AS_ITEM

    # Add concept schemes
    for cs in base.CONCEPT_SCHEMES:
        sm.concept_scheme[cs.id] = cs

        # Ensure concepts are associated to their parent scheme
        for item in cs:
            item.parent = item.parent or cs

    # Process and add code lists
    for cl_id, codes in base.CODELISTS.items():
        # Create a code list object
        cl = m.Codelist(id=f"CL_{cl_id}")

        # Add each code and any children
        # TODO move this upstream to sdmx1
        for c in codes:
            cl.append(c)
            cl.extend(c.child)

        # Add to the message
        sm.codelist[cl.id] = cl

    # Process and add data structure definitions
    for dsd in base.DATA_STRUCTURES:
        prepare_dsd(dsd, sm)

        # Add to the message
        sm.structure[dsd.id] = dsd

    # Process and add content constraints
    for cc in base.CONSTRAINTS:
        # Add the constraint to the message
        sm.constraint[cc.id] = cc

        # Look up the object that is constrained: the DSD sharing the constraint's ID
        try:
            dsd = sm.structure[cc.id]
        except KeyError:
            # No DSD with this ID (e.g. for the GENERAL* constraints that are merged
            # into others below). Report the constraint itself: `dsd` would be a
            # stale value left over from an earlier loop or iteration.
            log.info(f"No data structure for {cc!r}")
            continue

        # Update the constraint with a reference to the DSD
        cc.content.add(dsd)

        # Convert annotations into DataKeySet and CubeRegion objects, associated with
        # the DSDs
        dks_from_anno(cc, dsd)
        cr_from_anno(cc, dsd)

        # Update the constraint using applicable CubeRegions from GENERAL0, GENERAL1,
        # etc.
        merge_general_constraints(cc, dsd, sm)

    # Add MaintainableArtefact properties to all objects
    for kind in (
        "codelist",
        "concept_scheme",
        "constraint",
        "organisation_scheme",
        "structure",
    ):
        for obj in getattr(sm, kind).values():
            obj.maintainer = item_agency
            obj.version = base.VERSION
            obj.is_external_reference = False

    return sm
def merge_dsd(
    sm: msg.StructureMessage,
    target: str,
    others: List[str],
    fill_value: str = "_Z",
) -> m.DataSet:
    """‘Merge’ 2 or more data structure definitions.

    A temporary data set structured by the `target` DSD is filled with one
    observation (with value NaN) for every key that is possible in each of the
    `others` DSDs.

    Parameters
    ----------
    sm :
        Message in which the DSDs and any content constraints are looked up.
    target :
        ID of the DSD that structures the returned data set.
    others :
        IDs of the DSDs whose keys are merged. Each ID is also used as the value of
        the ``VARIABLE`` dimension for the keys of that DSD.
    fill_value :
        ID of the code used for dimensions that appear in `target` but not in the
        DSD being merged. It is looked up in the enumeration (code list) that
        represents the dimension.

    Returns
    -------
    DataSet
        The temporary data set holding one NaN-valued observation per key.
    """
    dsd_target = sm.structure[target]

    # Create a temporary DataSet
    ds = m.DataSet(structured_by=dsd_target)

    # Count of keys
    count = 0

    for dsd_id in others:
        # Retrieve the DSD
        dsd = sm.structure[dsd_id]

        # Retrieve a constraint that affects this DSD; it is only used if it has at
        # least one cube region
        ccs = [cc for cc in sm.constraint.values() if dsd in cc.content]
        assert len(ccs) <= 1
        cc = ccs[0] if ccs and len(ccs[0].data_content_region) else None

        # Key for the VARIABLE dimension
        base_key = m.Key(VARIABLE=dsd_id, described_by=dsd_target.dimensions)

        # Add KeyValues for other dimensions included in the target but not in this DSD
        for dim in dsd_target.dimensions:
            if dim.id in base_key.values or dim.id in dsd.dimensions:
                continue
            base_key[dim.id] = dim.local_representation.enumerated[fill_value]

        # Iterate over the possible keys in `dsd`; combine each with `base_key` and
        # store it as an observation. `np.nan` is used because the `np.NaN` alias no
        # longer exists in NumPy 2.0 and later.
        ds.add_obs(
            m.Observation(dimension=(base_key + key).order(), value=np.nan)
            for key in dsd.iter_keys(constraint=cc)
        )

        log.info(f"{repr(dsd)}: {len(ds.obs) - count} keys")
        count = len(ds.obs)

    log.info(
        f"Total keys: {len(ds.obs)}\n"
        + "\n".join(map(lambda o: repr(o.dimension), ds.obs[:5]))
    )

    return ds
def prepare_dsd(dsd: m.DataStructureDefinition, sm: msg.StructureMessage):
    """Populate data structures within `dsd`.

    Dimensions are created from the IDs listed in the ``_dimensions`` annotation of
    `dsd` (which is removed), followed by the common ``REF_AREA`` and
    ``TIME_PERIOD`` dimensions. A primary measure is then added and the dimensions
    are assigned their order.

    Raises
    ------
    KeyError
        If a dimension ID other than ``VARIABLE`` matches no known concept.
    """
    # Layered lookup of concepts: TRANSPORT first, then MODELING, and finally the
    # CROSS_DOMAIN_CONCEPTS scheme retrieved from the SDMX Global Registry
    concepts = ChainMap(
        sm.concept_scheme["TRANSPORT"].items,
        sm.concept_scheme["MODELING"].items,
        get_cdc(),
    )

    try:
        # The annotation holds a whitespace-separated list of dimension IDs
        dim_ids = _pop_anno(dsd, "_dimensions").split()
    except AttributeError:
        # No annotation, thus no DSD-specific dimensions
        dim_ids = []

    # Common dimensions always come last
    dim_ids = [*dim_ids, "REF_AREA", "TIME_PERIOD"]

    for position, concept_id in enumerate(dim_ids):
        # Locate the corresponding concept in one of the three concept schemes
        concept = concepts.get(concept_id)

        dimension: m.DimensionComponent
        if concept_id == "VARIABLE":
            # Special case: a measure dimension enumerated by a concept scheme
            dimension = m.MeasureDimension(
                id="VARIABLE",
                name="Variable",
                description="Reference to a concept from CL_TRANSPORT_MEASURES.",
                local_representation=m.Representation(
                    enumerated=sm.concept_scheme["TRANSPORT_MEASURE"]
                ),
            )
        elif concept is None:
            raise KeyError(concept_id)
        else:
            # Ordinary dimension that refers to the concept
            dimension = m.Dimension(
                id=concept_id,
                name=concept.name,
                concept_identity=concept,
                order=position,
            )

            try:
                # Represent the dimension by the matching code list, if there is one
                dimension.local_representation = m.Representation(
                    enumerated=sm.codelist[f"CL_{concept_id}"]
                )
            except KeyError:
                pass  # No iTEM codelist for this concept

        dsd.dimensions.append(dimension)

    # Primary measure: the concept with the same ID as the DSD, else OBS_VALUE
    measure_concept = concepts.get(dsd.id) or concepts.get("OBS_VALUE")
    assert measure_concept is not None
    dsd.measures.append(
        m.PrimaryMeasure(
            id=measure_concept.id,
            name=measure_concept.name,
            concept_identity=measure_concept,
        )
    )

    # Assign order to the dimensions
    dsd.dimensions.assign_order()
def cr_from(info: dict, dsd: m.DataStructureDefinition) -> m.CubeRegion:
    """Create a :class:`.CubeRegion` from a simple :class:`dict` of `info`.

    Parameters
    ----------
    info :
        Description of the region. The optional key ``"included"`` (default
        :obj:`True`) sets whether the region as a whole is included. Every other key
        is the ID of a dimension of `dsd`, and its value is a string of
        whitespace-separated member values. If the first token of the string is
        ``"!"``, the remaining values are excluded instead of included. `info` is
        not modified.
    dsd :
        Data structure definition in which the dimensions are looked up.

    Returns
    -------
    CubeRegion
        With one :class:`.MemberSelection` per dimension named in `info`.

    Raises
    ------
    IndexError
        If the value string for a dimension is empty or only whitespace.
    """
    # Read "included" without popping it, so the caller's dict is left untouched
    cr = m.CubeRegion(included=info.get("included", True))

    for dim_id, values in info.items():
        if dim_id == "included":
            continue  # Region-level flag handled above; not a dimension

        dim = cast(m.Dimension, dsd.dimensions.get(dim_id))

        tokens = values.split()
        # A leading "!" token negates the selection and is not itself a value
        included = tokens[0] != "!"
        if not included:
            tokens = tokens[1:]

        cr.member[dim] = m.MemberSelection(
            included=included,
            values_for=dim,
            values=[m.MemberValue(value=value) for value in tokens],
        )

    return cr
def cr_from_anno(obj: m.ContentConstraint, dsd: m.DataStructureDefinition) -> None:
    """Convert an annotation on `obj` into a :class:`.CubeRegion` constraint.

    The ``_data_content_region`` annotation is removed from `obj`. Each of its
    entries is converted with :func:`cr_from` and appended to
    ``obj.data_content_region``. Nothing happens if the annotation is absent.
    """
    regions = _pop_anno(obj, "_data_content_region")

    if regions is not None:
        for region_info in regions:
            obj.data_content_region.append(cr_from(region_info, dsd))
def dks_from_anno(obj: m.ContentConstraint, dsd: m.DataStructureDefinition) -> None:
    """Convert an annotation on `obj` into a :class:`.DataKeySet` constraint.

    The ``_data_content_keys`` annotation is removed from `obj`. It is read as a
    mapping from dimension ID to an iterable of values; one included
    :class:`.DataKey` is created for each (dimension, value) pair. The resulting key
    set is stored as ``obj.data_content_keys``. Nothing happens if the annotation is
    absent.
    """
    spec = _pop_anno(obj, "_data_content_keys")
    if spec is None:
        return

    key_set = m.DataKeySet(included=True, keys=[])

    for dim_id, values in spec.items():
        dim = dsd.dimensions.get(dim_id)
        # One single-dimension key per value
        key_set.keys.extend(
            m.DataKey(
                key_value={dim: m.ComponentValue(value_for=dim, value=v)},
                included=True,
            )
            for v in values
        )

    obj.data_content_keys = key_set
def merge_general_constraints(
    cc: m.ContentConstraint,
    dsd: m.DataStructureDefinition,
    sm: msg.StructureMessage,
) -> None:
    """Merge general constraints from `sm` into `cc` if relevant to `dsd`.

    Every constraint in `sm` whose ID starts with ``GENERAL`` is inspected. Each
    entry of its ``_data_content_region`` annotation is considered relevant if all
    the dimensions it mentions (its keys, apart from ``"included"``) are dimensions
    of `dsd`. Relevant entries are converted with :func:`cr_from` and appended to
    ``cc.data_content_region``. The annotations on the general constraints are left
    in place.
    """
    # IDs of the dimensions of `dsd`; invariant, so computed once
    dsd_dim_ids = {dim.id for dim in dsd.dimensions}

    for other_cc in sm.constraint.values():
        if not other_cc.id.startswith("GENERAL"):
            continue

        # _get_anno() gives None if the annotation is missing; treat as no regions
        for info in _get_anno(other_cc, "_data_content_region") or []:
            # Use a subset test rather than a proper-subset test, so a region that
            # names exactly the dimensions of `dsd` is also applied
            if not (set(info) - {"included"}) <= dsd_dim_ids:
                continue

            cc.data_content_region.append(cr_from(info, dsd))