import os
from .RDEModel import *
import requests
import pickle
import pandas as pd
# Lookup table from the ``rde_type`` string carried by an API payload to the
# model class whose ``constructor_from_json_obj`` deserializes that payload.
RDE_TYPE_TO_STATIC_CLASS_DEF = {
    RDEType.HR.value: HistoricalRecord,
    # Literal alias kept next to the enum value; presumably some payloads use
    # this spelling -- TODO confirm whether it differs from RDEType.HR.value.
    'hr': HistoricalRecord,
    RDEType.OBS.value: Observation,
    RDEType.POI.value: PointOfInterest,
    RDEType.GEOM.value: Geometry,
    RDEType.DATASET.value: Dataset,
    RDEType.MAP.value: Map,
    RDEType.LAYER.value: Layer,
    RDEType.AREA.value: Area,
}
[docs]
class TimeAtlas:
entity_cache = {}
default_save_cache_filepath = 'rde_entity_cache.pkl'
[docs]
def __init__(self, api_url: str):
self.api_url = api_url
# requests.get(f'{self.api_url}/status').raise_for_status()
if api_url.endswith('/v1/'):
# removing trailing slash as it makes the endpoint construction clearer.
self.api_url = api_url[:-1]
if not self.api_url.endswith('/v1'):
raise ValueError('API URL must end with /v1')
# test health of the API by querying the health endpoint
resp = requests.get(f'{self.api_url}/health')
if resp.status_code != 200:
raise ConnectionError(f'Could not connect to TimeAtlas API at {self.api_url}. Status code: {resp.status_code}')
if os.path.exists(self.default_save_cache_filepath):
with open(self.default_save_cache_filepath, 'rb') as f:
self.entity_cache = pickle.load(f)
else:
self.entity_cache = {}
[docs]
def save_entity_cache_to_file(self, filepath: str = None):
if filepath is None:
filepath = self.default_save_cache_filepath
with open(filepath, 'wb') as f:
pickle.dump(self.entity_cache, f)
[docs]
def get_single_rde_object(self, endpoint: str, uuid: str) -> RDE:
if uuid in self.entity_cache:
return self.entity_cache[uuid]
resp = requests.get(f'{self.api_url}/{endpoint}/{uuid}')
resp.raise_for_status()
data = resp.json()
rde_type = data.get('rde_type')
if rde_type is None and 'properties' in data and 'rde_type' in data['properties']:
# special cases for geometries, as they are GeoJSON Feature objects
rde_type = data['properties']['rde_type']
if rde_type not in RDE_TYPE_TO_STATIC_CLASS_DEF:
raise ValueError(f'Unknown RDE type: {rde_type}')
res = RDE_TYPE_TO_STATIC_CLASS_DEF[rde_type].constructor_from_json_obj(data)
self.entity_cache[uuid] = res
return res
[docs]
def get_all_results_from_endpoint(self, endpoint: str, per_page: int = 1000) -> list[dict]:
# the TimeAtlas API paginates results, so we need to loop until we get all results. 1000 is the maximal amount per page.
results = []
page = 1
while True:
resp = requests.get(f'{self.api_url}/{endpoint}', params={'page': page, 'per_page': per_page}, headers={'Accept': 'application/json'})
resp.raise_for_status()
data = resp.json()
results.extend(data['items'])
if 'next' not in data or data['next'] is None:
break
page += 1
return results
[docs]
def get_dataset(self, dataset_uuid: str) -> Dataset:
# resp = requests.get(f'{self.api_url}/datasets/{dataset_uuid}')
return self.get_single_rde_object('datasets', dataset_uuid)
[docs]
def get_dataset_by_slug(self, slug: str) -> Dataset:
resp = requests.get(f'{self.api_url}/datasets', headers={'Accept': 'application/json'})
data = resp.json()
for dataset in data['items']:
if dataset['slug'] == slug:
ds = Dataset.constructor_from_json_obj(dataset)
self.entity_cache[dataset['uuid']] = ds
return ds
raise ValueError(f'Dataset with slug {slug} not found')
[docs]
def generate_all_hr_from_dataset(self, dataset: Dataset) -> list[HistoricalRecord]:
hr_jsons = self.get_all_results_from_endpoint('hr/search?query=&dataset_slug=' + dataset.slug, per_page=1000)
hrs = [HR.constructor_from_json_obj(hr_json) for hr_json in hr_jsons]
self.entity_cache.update({hr.uuid: hr for hr in hrs})
return hrs
# Waring: very slow. Waiting on a better API endpoint to retrieve all obs for a dataset
[docs]
def generate_obs_from_list_of_hr(self, hr_list: list[HistoricalRecord]) -> list[Observation]:
obs_uuids = set()
for hr in hr_list:
for obs_ref in hr.documents:
match obs_ref:
case str():
obs_uuids.add(obs_ref)
case Obs():
obs_uuids.add(obs_ref.uuid)
obs_list = []
# for obs_uuid in tqdm(obs_uuids, desc='Fetching observations'):
for obs_uuid in obs_uuids:
obs_list.append(self.get_single_rde_object('obs', obs_uuid))
return obs_list
[docs]
def generate_geoms_from_list_of_obs(self, obs_list: list[Observation ]) -> list[Geometry]:
geom_uuids = set()
for obs in obs_list:
for geom_ref in obs.has_geometry:
match geom_ref:
case str():
geom_uuids.add(geom_ref)
case Geometry():
geom_uuids.add(geom_ref.uuid)
geom_list = []
# for geom_uuid in tqdm(geom_uuids, desc='Fetching geometries'):
for geom_uuid in geom_uuids:
geom_list.append(self.get_single_rde_object('geometries', geom_uuid))
return geom_list
[docs]
def generate_pois_from_list_of_obs(self, obs_list: list[Observation]) -> list[PointOfInterest]:
poi_uuids = set()
for obs in obs_list:
poi_ref = obs.has_handle
match poi_ref:
case str():
poi_uuids.add(poi_ref)
case PointOfInterest():
poi_uuids.add(poi_ref.uuid)
poi_list = []
# for poi_uuid in tqdm(poi_uuids, desc='Fetching POIs'):
for poi_uuid in poi_uuids:
poi_list.append(self.get_single_rde_object('poi', poi_uuid))
return poi_list
[docs]
def materialize_all_rde_from_dataset_obj(self, dataset: Dataset) -> list[RDE]:
hrs = self.generate_all_hr_from_dataset(dataset)
obs = self.generate_obs_from_list_of_hr(hrs)
geoms = self.generate_geoms_from_list_of_obs(obs)
pois = self.generate_pois_from_list_of_obs(obs)
for o in obs: o.actualize_references(self.entity_cache)
for h in hrs: h.actualize_observations_references(self.entity_cache)
return hrs + obs + geoms + pois
[docs]
def materialize_all_rde_from_dataset_slug(self, dataset_slug: str) -> list[RDE]:
ds = self.get_dataset_by_slug(dataset_slug)
return self.materialize_all_rde_from_dataset_obj(ds)
[docs]
@staticmethod
def hr_list_to_dataframe(hr_list: list[HistoricalRecord]) -> pd.DataFrame:
hr_dicts = []
for hr in hr_list:
hr_dict = hr.to_dict()
hr_dict['obj'] = hr
hr_dicts.append(hr_dict)
return pd.DataFrame(hr_dicts)