import json
import logging
import sys
from datetime import datetime

import pandas as pd
import requests

from item.common import config, paths

log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
log.addHandler(logging.StreamHandler(sys.stdout))


class APIError(Exception):
    """Error message returned by OpenKAPSARC."""

    pass


class Dataset:
    """Thin wrapper around the JSON description of one OpenKAPSARC dataset.

    Parameters
    ----------
    data : dict
        Decoded JSON description of a dataset. It must contain at least the keys
        read by the properties of this class, under ``data["dataset"]``.
    """

    def __init__(self, data):
        self.data = data

    @property
    def id(self):
        """Human-readable dataset identifier (``dataset_id``)."""
        return self.data["dataset"]["dataset_id"]

    @property
    def uid(self):
        """Unique dataset identifier (``dataset_uid``)."""
        return self.data["dataset"]["dataset_uid"]

    @property
    def records_count(self):
        """Number of records in the dataset, according to its metadata."""
        return self.data["dataset"]["metas"]["default"]["records_count"]

    @property
    def data_processed(self):
        """Time at which the dataset's data were last processed.

        Returns
        -------
        datetime.datetime
            Timezone-aware if the metadata string carries an offset.
        """
        value = self.data["dataset"]["metas"]["default"]["data_processed"]
        # datetime.fromisoformat() only accepts the "Z" UTC designator from
        # Python 3.11 on; spell it as an explicit offset for older versions.
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        return datetime.fromisoformat(value)

    def __str__(self):
        return f"<Dataset {self.uid}: '{self.id}'>"
class OpenKAPSARC:
    """Wrapper for the OpenKAPSARC data API.

    See https://datasource.kapsarc.org/api/v2/console

    Parameters
    ----------
    server : str, optional
        Address of the server, e.g. `http://example.com:8888`.
    api_key : str, optional
        API key, sent as the ``apikey`` query parameter. If not given, the
        ``api_key`` entry of ``config`` is used (or :obj:`None`).
    """

    # NOTE(review): not referenced in this class; presumably a "no limit"
    # sentinel for callers.
    ALL = sys.maxsize

    # NOTE(review): not referenced in this class; presumably the API's
    # per-request row limit.
    max = {"rows": 1000}

    server = "https://datasource.kapsarc.org/api/v2"

    # Alternate values include 'opendatasoft', which includes all public data
    # sets hosted by the software provider used by KAPSARC
    source = "catalog"

    #: Size (bytes) of the chunks in which CSV exports are streamed to disk.
    chunk_size = 64 * 1024

    def __init__(self, server=None, api_key=None):
        self.server = server or self.server
        self.api_key = api_key or config.get("api_key", None)

    def _modify_params(self, params):
        """Add the API key to the query *params* in place, unless already set."""
        params.setdefault("apikey", self.api_key)

    def endpoint(self, name, *args, params=None, **kwargs):
        """Call the API endpoint *name* with any additional *args*.

        Parameters
        ----------
        name : str
            First URL path component after :attr:`server` and :attr:`source`.
        *args
            Further URL path components; falsy values (e.g. :obj:`None`) are
            skipped.
        params : dict, optional
            Query parameters. The mapping is copied, never modified.
        **kwargs
            Passed to :func:`requests.get`.

        Returns
        -------
        object
            The decoded body, if the response has a JSON content type;
            otherwise the :class:`requests.Response` itself.

        Raises
        ------
        requests.HTTPError
            If the server returns an error status.
        ValueError
            If a JSON response body cannot be decoded.
        """
        # Work on a copy: neither the caller's dict nor a shared default may be
        # mutated by _modify_params().
        params = dict(params or {})
        self._modify_params(params)

        # Construct the URL
        url_parts = [self.server, self.source, name] + [a for a in args if a]

        # Make the request
        r = requests.get("/".join(url_parts), params=params, **kwargs)
        log.debug(r.url)
        r.raise_for_status()

        content_type = r.headers.get("content-type", "")
        if "application/json" in content_type:
            # Response in JSON
            try:
                return r.json()
            except ValueError:
                # json.JSONDecodeError, and requests' own JSONDecodeError, are
                # both subclasses of ValueError
                log.error(r.content)
                raise
        else:
            log.debug(content_type)
            return r

    def datasets(self, dataset_id=None, *args, params=None, kw=None, **kwargs):
        """Return information about one dataset, or a list of datasets.

        Parameters
        ----------
        dataset_id : str, optional
            If given, return information about this dataset only.
        *args
            Further URL path components, passed to :meth:`endpoint`.
        params : dict, optional
            Query parameters. The mapping is copied, never modified.
        kw : str, optional
            Keyword filter, sent as ``where=keyword LIKE '<kw>'``.
        **kwargs
            Passed to :meth:`endpoint`.

        Returns
        -------
        Dataset or list of Dataset
            A single :class:`Dataset` if *dataset_id* is given. Otherwise, one
            per entry of the response's ``datasets``; this may be fewer than
            the ``total_count`` that is logged.

        Raises
        ------
        ValueError
            If both *kw* and a ``where`` entry in *params* are given.
        """
        params = dict(params or {})
        if kw:
            if "where" in params:
                raise ValueError("either give kw= or params={'where': …}")
            # NOTE(review): kw is interpolated verbatim, so a quote character
            # in kw will break the query expression.
            params["where"] = f"keyword LIKE '{kw}'"

        result = self.endpoint("datasets", dataset_id, *args, params=params, **kwargs)

        if dataset_id:
            return Dataset(result)

        log.info(
            "{} results; retrieved {}".format(
                result["total_count"], len(result["datasets"])
            )
        )
        return [Dataset(ds_json) for ds_json in result["datasets"]]

    @staticmethod
    def _cache_is_valid(ds, cache_path):
        """Return :obj:`True` if the cached CSV at *cache_path* is current for *ds*."""
        # Compare timezone-aware values. astimezone() interprets a naive
        # datetime as local time, as the old naive comparison did. An aware
        # data_processed (e.g. "+00:00") is now converted, instead of having
        # its offset discarded.
        cache_time = datetime.fromtimestamp(cache_path.stat().st_mtime).astimezone()
        if cache_time < ds.data_processed.astimezone():
            log.info("…is outdated → remove")
            return False

        # Data rows = lines minus the header line. Read as bytes so the count
        # does not depend on the locale's default text encoding.
        # NOTE(review): quoted fields containing newlines would be over-counted.
        with open(cache_path, "rb") as f:
            cache_records = sum(1 for _ in f) - 1
        if cache_records < ds.records_count:
            log.info(
                f"...has fewer records ({cache_records}) than "
                f"source ({ds.records_count}) -> remove"
            )
            return False

        return True

    def table(self, dataset_id, cache=True, **kwargs):
        """Return data from dataset *dataset_id*.

        Currently only the latest data on the master branch is returned.

        The CSV export is cached in ``paths["historical"]``, in a file named
        after the dataset's UID. If *cache* is true, the cached file is reused
        when it is newer than the dataset's ``data_processed`` time and holds at
        least ``records_count`` data rows. Otherwise the export is downloaded
        again.

        Parameters
        ----------
        dataset_id : str
            Dataset to retrieve.
        cache : bool, optional
            If :obj:`False`, always download the data.
        **kwargs
            Passed to :meth:`endpoint`, and from there to :func:`requests.get`.

        Returns
        -------
        :class:`pandas.DataFrame`
        """
        # Make another request to get dataset information
        ds = self.datasets(dataset_id)

        # Cache path
        cache_path = (paths["historical"] / ds.uid).with_suffix(".csv")
        log.info(f"Cache path {cache_path}")

        if cache and cache_path.exists():
            if self._cache_is_valid(ds, cache_path):
                log.info("…is current; reading from file")
                return pd.read_csv(cache_path, sep=";")
            cache_path.unlink()

        # Stream the export into a temporary sibling file, then move it into
        # place, so an interrupted download never leaves a truncated cache file.
        cache_path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = cache_path.with_name(cache_path.name + ".part")
        kwargs["stream"] = True
        args = ["datasets", dataset_id, "exports", "csv"]
        try:
            with self.endpoint(*args, **kwargs) as response, open(tmp_path, "wb") as f:
                # iter_content() yields single bytes by default; use large chunks
                for chunk in response.iter_content(chunk_size=self.chunk_size):
                    f.write(chunk)
            tmp_path.replace(cache_path)
        finally:
            # Only exists here if the download failed before the move
            if tmp_path.exists():
                tmp_path.unlink()

        # Parse and return
        return pd.read_csv(cache_path, sep=";")