"""Diagnostics for historical data sets."""
from pathlib import Path
import pandas as pd
from item.historical import fetch_source, source_str
from . import A003
# Quality checks: callables invoked by run_all(); the return value of each is
# written to a CSV file named after the callable
QUALITY = [A003.compute]
# Jinja2 template for the diagnostics index page. It is rendered with a single
# variable, ``groups``: a mapping from a group name to a list of file paths.
# Each group becomes a heading (the name passed through the ``title`` filter)
# followed by a list of links, one per path, relative to the index page.
INDEX_TEMPLATE = """<html><body>
{% for group_name, paths in groups.items() %}
<h1>{{ group_name|title }}</h1>
<ul>
{% for path in paths %}
<li><a href="./{{ path }}">{{ path }}</a></li>
{% endfor %}
</ul>
{% endfor %}
</body></html>
"""
# Template for the header of the report returned by coverage(). ``periods`` is
# only indexed at position 0; the last period is passed separately.
COV_TEXT = """{N_area} areas: {areas}
{N_measures} measures: {measures}
{N_periods} periods: {periods[0]}–{last_period}
Measure-by-area coverage:
"""


def coverage(
    df,
    area="COUNTRY",
    measure="VARIABLE",
    period="TIME_PERIOD",
    value="value",
    status="OBS_STATUS",
):
    """Return information about the coverage of a data set.

    Parameters
    ----------
    df : pandas.DataFrame
        Data to summarize. It must contain the columns named by the other
        arguments.
    area, measure, period : str
        Names of the columns holding the area, measure and time period of each
        row.
    value, status : str
        Names of the columns holding the observation value and the observation
        status. A row counts as an observation if at least one of the two is
        not null; it counts as a value only if `value` is not null.

    Returns
    -------
    str
        A header listing the distinct areas, measures and the range of periods,
        followed by one section per measure with one line per area. Each line
        gives the number of observations, the first and last period of the
        series, the number of gaps (if any) and the number of values. Gaps are
        the number of data-set periods spanned by the series minus its number
        of observations.
    """
    areas = sorted(df[area].unique())
    measures = sorted(df[measure].unique())
    periods = sorted(df[period].unique())

    # An empty data set has no first/last period; substitute empty strings so
    # the header can still be rendered instead of raising IndexError
    chunks = [
        COV_TEXT.format(
            N_area=len(areas),
            # str() so that non-string area labels do not break the join
            areas=" ".join(map(str, areas)),
            N_measures=len(measures),
            measures=measures,
            N_periods=len(periods),
            periods=periods or [""],
            last_period=periods[-1] if periods else "",
        )
    ]

    # Position of each period in the sorted list of all periods, used to
    # measure how many periods a series spans
    position = {p: i for i, p in enumerate(periods)}

    for m, by_measure in df.groupby(measure):
        chunks.append(f"\n{m}\n")

        for a, series in by_measure.groupby(area):
            # Rows that carry a value
            has_value = series[value].notna()
            # Number of observations. Some observations have a status, but no
            # value, so count rows where either one is present. (Taking the
            # larger of the two column counts would miss rows when value-only
            # and status-only rows occur in the same series.)
            n_obs = int((has_value | series[status].notna()).sum())
            n_values = int(has_value.sum())

            # Periods appearing in this series
            present = sorted(series[period].unique())
            first, last = present[0], present[-1]

            # Periods spanned by the series, less those actually observed
            gaps = position[last] + 1 - position[first] - n_obs

            # Assemble line
            line = f" {a}: {n_obs} obs {first}–{last}"
            if gaps:
                line += f" ({gaps} gaps)"
            chunks.append(f"{line}; {n_values} values\n")

    return "".join(chunks)
def run_all(output_path):
    """Run all diagnostics.

    The following files are written to `output_path`, which is created
    (including parents) if it does not exist:

    - one ``.txt`` coverage report per data source with ID 0 through 3, named
      using :func:`source_str`;
    - one ``.csv`` file per quality check in :data:`QUALITY`;
    - ``data.zip``, containing the fetched source data files and the quality
      check outputs;
    - ``index.html``, rendered from :data:`INDEX_TEMPLATE`, linking to all of
      the above.

    Parameters
    ----------
    output_path : str or pathlib.Path
        Directory in which to write the output files.
    """
    # Imports deferred to call time, as in the original code
    from zipfile import ZIP_DEFLATED, ZipFile

    from jinja2 import Template

    from item.historical import process

    output_path = Path(output_path)
    output_path.mkdir(parents=True, exist_ok=True)

    # Paths of files to be added to the archive
    data_files = []

    # File names to link from the index page, by group
    groups = {"Coverage": [], "Quality": []}

    # Coverage
    for source_id in [0, 1, 2, 3]:
        # Output filename
        filename = f"{source_str(source_id)}.txt"
        groups["Coverage"].append(filename)

        # Read source data
        # NOTE(review): fetch_source() is presumably returning a pathlib.Path,
        # since `.name` is used on it below; confirm
        data_files.append(fetch_source(source_id, use_cache=True))
        data = pd.read_csv(data_files[-1])

        # Generate coverage and write to file
        # TODO this doesn't allow for column names other than the defaults to
        #      coverage(), above; generalize
        (output_path / filename).write_text(coverage(data))

    # Quality
    for check in QUALITY:
        # Output filename
        # NOTE(review): for a plain function, __name__ contains no "." so the
        # split is a no-op and every check named "compute" would map to the same
        # file; check.__module__ may have been intended. Confirm before adding
        # further checks.
        filename = f"{check.__name__.split('.')[-1]}.csv"
        groups["Quality"].append(filename)
        data_files.append(output_path / filename)

        # TODO this is specific to A003; generalize
        check(process(3), process(9)).to_csv(data_files[-1])

    # Archive data files. The context manager closes the archive, which is
    # what writes the ZIP central directory; without it the file can be left
    # incomplete.
    with ZipFile(
        output_path / "data.zip", mode="w", compression=ZIP_DEFLATED, compresslevel=9
    ) as zf:
        for path in data_files:
            zf.write(filename=path, arcname=path.name)

    groups["Cached raw source data"] = ["data.zip"]

    # Generate index file
    t = Template(INDEX_TEMPLATE)
    (output_path / "index.html").write_text(t.render(groups=groups))