import json
import logging
import sys
from datetime import datetime
import pandas as pd
import requests
import requests_cache
from item.common import config, paths
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
log.addHandler(logging.StreamHandler(sys.stdout))
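# Transparently cache HTTP responses; with the default backend,
# requests_cache stores them in a local "item.sqlite" database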
requests_cache.install_cache("item")
class APIError(Exception):
"""Error message returned by OpenKAPSARC."""
pass
class Dataset:
def __init__(self, data):
self.data = data
@property
def id(self):
return self.data["dataset"]["dataset_id"]
@property
def uid(self):
return self.data["dataset"]["dataset_uid"]
@property
def records_count(self):
return self.data["dataset"]["metas"]["default"]["records_count"]
@property
def data_processed(self):
return datetime.fromisoformat(
self.data["dataset"]["metas"]["default"]["data_processed"]
)
def __str__(self):
return f"<Dataset {self.uid}: '{self.id}'>"
class OpenKAPSARC:
"""Wrapper for the OpenKAPSARC data API.
See https://datasource.kapsarc.org/api/v2/console
Parameters
----------
server : str, optional
Address of the server, e.g. `http://example.com:8888`.
"""
ALL = sys.maxsize
max = {"rows": 1000}
server = "https://datasource.kapsarc.org/api/v2"
# Alternate values include 'opendatasoft', which includes all public data
# sets hosted by the software provider used by KAPSARC
source = "catalog"
def __init__(self, server=None, api_key=None):
self.server = server or self.server
self.api_key = api_key or config.get("api_key", None)
def _modify_params(self, params):
params.setdefault("apikey", self.api_key)
    def endpoint(self, name, *args, params=None, **kwargs):
        """Call the API endpoint *name* with any additional *args*."""
        # Construct the URL. Copy *params* so that the caller's dict is not
        # modified and no mutable default argument is shared between calls.
        params = dict(params or {})
        self._modify_params(params)
        args = list(filter(None, args))
        url_parts = [self.server, self.source, name] + args
# Make the request
r = requests.get("/".join(url_parts), params=params, **kwargs)
log.debug(r.url)
r.raise_for_status()
if "application/json" in r.headers["content-type"]:
# Response in JSON
try:
return r.json()
            except json.JSONDecodeError:
log.error(r.content)
raise
else:
            log.debug(r.headers.get("content-type"))
return r
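    # Example (a sketch; the row limit is arbitrary):
    #
    #     ok = OpenKAPSARC()
    #     result = ok.endpoint("datasets", params={"rows": 5})
    #
    # …issues GET <server>/catalog/datasets?rows=5[&apikey=…] and returns
    # the decoded JSON.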
    def datasets(self, dataset_id=None, *args, params=None, kw=None, **kwargs):
        """Return a :class:`Dataset` for *dataset_id*, or a list of all datasets."""
        params = dict(params or {})
        if kw:
            if "where" in params:
                raise ValueError("either give kw= or params={'where': …}")
            params["where"] = f"keyword LIKE '{kw}'"
result = self.endpoint("datasets", dataset_id, *args, params=params, **kwargs)
if dataset_id:
return Dataset(result)
else:
total_count = result["total_count"]
            log.info(
                f"{total_count} results; retrieved {len(result['datasets'])}"
            )
return [Dataset(ds_json) for ds_json in result["datasets"]]
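    # Example (a sketch; the keyword is illustrative):
    #
    #     matches = ok.datasets(kw="transport")
    #
    # …retrieves all datasets whose metadata matches the filter
    # keyword LIKE 'transport'.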
    def table(self, dataset_id, cache=True, **kwargs):
        """Return data from dataset *dataset_id*.

        Currently only the latest data on the master branch is returned.

        Returns
        -------
        :class:`pandas.DataFrame`
        """
# Make another request to get dataset information
ds = self.datasets(dataset_id)
# Cache path
cache_path = (paths["historical"] / ds.uid).with_suffix(".csv")
cache_is_valid = False
log.info(f"Cache path {cache_path}")
if cache and cache_path.exists():
cache_is_valid = True
# Check cache time
cache_time = datetime.fromtimestamp(cache_path.stat().st_mtime)
if cache_time < ds.data_processed.replace(tzinfo=None):
cache_is_valid = False
log.info("…is outdated → remove")
if cache_is_valid:
                # Check cache length, subtracting 1 line for the CSV header
                with open(cache_path) as f:
                    cache_records = sum(1 for _ in f) - 1
if cache_records < ds.records_count:
cache_is_valid = False
                    log.info(
                        f"…has fewer records ({cache_records}) than "
                        f"source ({ds.records_count}) → remove"
                    )
if not cache_is_valid:
cache_path.unlink()
else:
log.info("…is current; reading from file")
return pd.read_csv(cache_path, sep=";")
# Stream data
kwargs["stream"] = True
args = ["datasets", dataset_id, "exports", "csv"]
with self.endpoint(*args, **kwargs) as response:
# Write content to file
with open(cache_path, "wb") as cache:
                for chunk in response.iter_content(chunk_size=8192):  # 8 kB chunks
cache.write(chunk)
# Parse and return
return pd.read_csv(cache_path, sep=";")
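# Minimal end-to-end usage sketch (the dataset ID below is hypothetical):
#
#     ok = OpenKAPSARC()
#     df = ok.table("example-dataset-id")
#     print(df.head())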