Commit ba00aec8 authored by Lukas Leufen

refactored some parts in the single station data handler to be more flexible

parent c63cff8a
Pipeline #51117 passed with stages
in 7 minutes and 25 seconds
__author__ = 'Lukas Leufen'
__date__ = '2020-11-05'
from mlair.data_handler.data_handler_single_station import DataHandlerSingleStation
from mlair.configuration import path_config
import logging
import os
import pandas as pd
import xarray as xr
class DataHandlerMixedSampling(DataHandlerSingleStation):
    """
    Single-station data handler with a more flexible load/download chain.

    Overrides the sample setup and data loading of DataHandlerSingleStation:
    data is read from local disk when available and downloaded otherwise.
    """

    def setup_samples(self):
        """
        Setup samples. This method prepares and creates samples X, and labels Y.
        """
        # Load raw data (from disk or freshly downloaded), then interpolate gaps,
        # split into inputs/targets, optionally transform, and build the samples.
        self.load_data()
        self.interpolate(dim=self.time_dim, method=self.interpolation_method, limit=self.interpolation_limit)
        self.set_inputs_and_targets()
        if self.do_transformation is True:
            self.call_transform()
        self.make_samples()

    def load_data(self):
        """
        Load data from disk; on a FileNotFoundError download the data and retry.

        NOTE(review): if download_data() does not create the expected local files,
        read_data_from_disk() raises FileNotFoundError again and this method
        recurses without bound -- TODO confirm whether a retry limit is needed.
        """
        try:
            self.read_data_from_disk()
        except FileNotFoundError:
            # NOTE(review): download_data() is called without arguments here but
            # with (file_name, meta_file) inside read_data_from_disk -- verify the
            # inherited download_data signature supports both call forms.
            self.download_data()
            self.load_data()

    def read_data_from_disk(self, source_name=""):
        """
        Load data and meta data either from local disk (preferred) or download new data by using a custom download method.

        Data is either downloaded, if no local data is available or parameter overwrite_local_data is true. In both
        cases, downloaded data is only stored locally if store_data_locally is not disabled. If this parameter is not
        set, it is assumed, that data should be saved locally.

        :param source_name: optional label used only in log messages (e.g. the data source's name)
        """
        # An empty source name stays empty; otherwise prefix it for readable logs.
        source_name = source_name if len(source_name) == 0 else f" from {source_name}"
        path_config.check_path_and_create(self.path)
        file_name = self._set_file_name()
        meta_file = self._set_meta_file_name()
        if self.overwrite_local_data is True:
            # Forced refresh: remove any cached files and download again.
            logging.debug(f"overwrite_local_data is true, therefore reload {file_name}{source_name}")
            if os.path.exists(file_name):
                os.remove(file_name)
            if os.path.exists(meta_file):
                os.remove(meta_file)
            data, self.meta = self.download_data(file_name, meta_file)
            logging.debug(f"loaded new data{source_name}")
        else:
            try:
                logging.debug(f"try to load local data from: {file_name}")
                data = xr.open_dataarray(file_name)
                self.meta = pd.read_csv(meta_file, index_col=0)
                # Raises FileNotFoundError on a meta mismatch to trigger a fresh download.
                self.check_station_meta()
                logging.debug("loading finished")
            except FileNotFoundError as e:
                logging.debug(e)
                logging.debug(f"load new data{source_name}")
                data, self.meta = self.download_data(file_name, meta_file)
                logging.debug("loading finished")
        # create slices and check for negative concentration.
        data = self._slice_prep(data)
        self._data = self.check_for_negative_concentrations(data)
......@@ -52,7 +52,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
min_length: int = 0, start=None, end=None, variables=None, **kwargs):
super().__init__() # path, station, statistics_per_var, transformation, **kwargs)
self.station = helpers.to_list(station)
self.path = os.path.abspath(data_path)
self.path = os.path.abspath(data_path) # ToDo: data_path could be a dict or list?
self.statistics_per_var = statistics_per_var
self.do_transformation = transformation is not None
self.input_data, self.target_data = self.setup_transformation(transformation)
......@@ -141,7 +141,8 @@ class DataHandlerSingleStation(AbstractDataHandler):
"""
Setup samples. This method prepares and creates samples X, and labels Y.
"""
self.load_data()
self.load_data(self.station, self.statistics_per_var, self.sampling, self.station_type, self.network,
self.store_data_locally)
self.interpolate(dim=self.time_dim, method=self.interpolation_method, limit=self.interpolation_limit)
self.set_inputs_and_targets()
if self.do_transformation is True:
......@@ -160,7 +161,8 @@ class DataHandlerSingleStation(AbstractDataHandler):
self.make_observation(self.target_dim, self.target_var, self.time_dim)
self.remove_nan(self.time_dim)
def read_data_from_disk(self, source_name=""):
def load_data(self, station, statistics_per_var, sampling, station_type=None, network=None,
store_data_locally=False):
"""
Load data and meta data either from local disk (preferred) or download new data by using a custom download method.
......@@ -168,35 +170,41 @@ class DataHandlerSingleStation(AbstractDataHandler):
cases, downloaded data is only stored locally if store_data_locally is not disabled. If this parameter is not
set, it is assumed, that data should be saved locally.
"""
source_name = source_name if len(source_name) == 0 else f" from {source_name}"
check_path_and_create(self.path)
file_name = self._set_file_name()
meta_file = self._set_meta_file_name()
file_name = self._set_file_name(self.path, station, statistics_per_var)
meta_file = self._set_meta_file_name(self.path, station, statistics_per_var)
if self.overwrite_local_data is True:
logging.debug(f"overwrite_local_data is true, therefore reload {file_name}{source_name}")
logging.debug(f"overwrite_local_data is true, therefore reload {file_name}")
if os.path.exists(file_name):
os.remove(file_name)
if os.path.exists(meta_file):
os.remove(meta_file)
data, self.meta = self.download_data(file_name, meta_file)
logging.debug(f"loaded new data{source_name}")
data, self.meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling,
station_type=station_type, network=network,
store_data_locally=store_data_locally)
logging.debug(f"loaded new data")
else:
try:
logging.debug(f"try to load local data from: {file_name}")
data = xr.open_dataarray(file_name)
self.meta = pd.read_csv(meta_file, index_col=0)
self.check_station_meta()
self.check_station_meta(station, station_type, network)
logging.debug("loading finished")
except FileNotFoundError as e:
logging.debug(e)
logging.debug(f"load new data{source_name}")
data, self.meta = self.download_data(file_name, meta_file)
logging.debug(f"load new data")
data, self.meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling,
station_type=station_type, network=network,
store_data_locally=store_data_locally)
logging.debug("loading finished")
# create slices and check for negative concentration.
data = self._slice_prep(data)
self._data = self.check_for_negative_concentrations(data)
def download_data_from_join(self, file_name: str, meta_file: str) -> [xr.DataArray, pd.DataFrame]:
@staticmethod
def download_data_from_join(file_name: str, meta_file: str, station, statistics_per_var, sampling,
station_type=None, network=None, store_data_locally=True) -> [xr.DataArray,
pd.DataFrame]:
"""
Download data from TOAR database using the JOIN interface.
......@@ -209,36 +217,36 @@ class DataHandlerSingleStation(AbstractDataHandler):
:return: downloaded data and its meta data
"""
df_all = {}
df, meta = join.download_join(station_name=self.station, stat_var=self.statistics_per_var,
station_type=self.station_type, network_name=self.network, sampling=self.sampling)
df_all[self.station[0]] = df
df, meta = join.download_join(station_name=station, stat_var=statistics_per_var, station_type=station_type,
network_name=network, sampling=sampling)
df_all[station[0]] = df
# convert df_all to xarray
xarr = {k: xr.DataArray(v, dims=['datetime', 'variables']) for k, v in df_all.items()}
xarr = xr.Dataset(xarr).to_array(dim='Stations')
if self.store_data_locally is True:
if store_data_locally is True:
# save locally as nc/csv file
xarr.to_netcdf(path=file_name)
meta.to_csv(meta_file)
return xarr, meta
def download_data(self, *args, **kwargs):
    """
    Download data and meta data.

    Thin dispatch wrapper around download_data_from_join; all positional and
    keyword arguments are forwarded unchanged. (The superseded fixed-argument
    variant that this definition shadowed has been removed.)

    :return: tuple of (data, meta data) as returned by download_data_from_join
    """
    data, meta = self.download_data_from_join(*args, **kwargs)
    return data, meta
def check_station_meta(self, station, station_type, network):
    """
    Search for the entries in meta data and compare the value with the requested values.

    Compares the locally stored meta data against the requested station_type and
    network. Any mismatch raises a FileNotFoundError so that callers fall back to
    downloading fresh data. (The interleaved, superseded variant reading these
    values from instance attributes has been removed.)

    :param station: list of station ids; only the first entry is checked
    :param station_type: requested station type, or None to skip the whole check
    :param network: requested network name, or None to skip this single entry
    :raises FileNotFoundError: if a stored meta value differs from the request
    """
    if station_type is not None:
        check_dict = {"station_type": station_type, "network_name": network}
        for (k, v) in check_dict.items():
            if v is None:
                # unset entries are not validated
                continue
            if self.meta.at[k, station[0]] != v:
                logging.debug(f"meta data does not agree with given request for {k}: {v} (requested) != "
                              f"{self.meta.at[k, station[0]]} (local). Raise FileNotFoundError to trigger new "
                              f"grapping from web.")
                raise FileNotFoundError
......@@ -303,13 +311,15 @@ class DataHandlerSingleStation(AbstractDataHandler):
res.name = index_name
return res
def _set_file_name(self):
all_vars = sorted(self.statistics_per_var.keys())
return os.path.join(self.path, f"{''.join(self.station)}_{'_'.join(all_vars)}.nc")
@staticmethod
def _set_file_name(path, station, statistics_per_var):
all_vars = sorted(statistics_per_var.keys())
return os.path.join(path, f"{''.join(station)}_{'_'.join(all_vars)}.nc")
def _set_meta_file_name(self):
all_vars = sorted(self.statistics_per_var.keys())
return os.path.join(self.path, f"{''.join(self.station)}_{'_'.join(all_vars)}_meta.csv")
@staticmethod
def _set_meta_file_name(path, station, statistics_per_var):
all_vars = sorted(statistics_per_var.keys())
return os.path.join(path, f"{''.join(station)}_{'_'.join(all_vars)}_meta.csv")
def interpolate(self, dim: str, method: str = 'linear', limit: int = None, use_coordinate: Union[bool, str] = True,
**kwargs):
......@@ -490,13 +500,6 @@ class DataHandlerSingleStation(AbstractDataHandler):
else:
raise NotImplementedError("Cannot handle this.")
def load_data(self):
    """
    Load data from disk; on a FileNotFoundError download the data and retry.

    NOTE(review): if download_data() does not create the expected local files,
    read_data_from_disk() keeps raising FileNotFoundError and this method
    recurses without bound -- TODO confirm whether a retry limit is needed.
    """
    try:
        self.read_data_from_disk()
    except FileNotFoundError:
        self.download_data()
        self.load_data()
def transform(self, data_class, dim: Union[str, int] = 0, transform_method: str = 'standardise',
inverse: bool = False, mean=None,
std=None, min=None, max=None) -> None:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment