Commit b06bcacc authored by lukas leufen's avatar lukas leufen

introduced two new concepts DataClass and TransformationClass to set transformation for inputs and targets independently

introduced two new concepts DataClass and TransformationClass to set transformation for inputs and targets independently
parent 7c403660
Pipeline #50816 passed with stages
in 7 minutes and 22 seconds
__author__ = "Lukas Leufen"
__date__ = '2020-06-25'
from mlair.helpers.statistics import TransformationClass
DEFAULT_STATIONS = ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087']
DEFAULT_VAR_ALL_DICT = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values',
......@@ -13,8 +14,7 @@ DEFAULT_START = "1997-01-01"
DEFAULT_END = "2017-12-31"
# DEFAULT_TRANSFORMATION = {"scope": "data", "method": "standardise", "mean": "estimate"}
DEFAULT_TRANSFORMATION = {"scope": "data", "method": "standardise"}
DEFAULT_TRANSFORMATION = TransformationClass(inputs_method="standardise", targets_method="standardise")
DEFAULT_HPC_LOGIN_LIST = ["ju", "hdfmll"] # ju[wels} #hdfmll(ogin)
DEFAULT_HPC_HOST_LIST = ["jw", "hdfmlc"] # first part of node names for Juwels (jw[comp], hdfmlc(ompute).
......@@ -3,6 +3,7 @@
__author__ = 'Lukas Leufen, Felix Kleinert'
__date__ = '2020-07-20'
import copy
import datetime as dt
import logging
import os
......@@ -53,7 +54,8 @@ class DataHandlerSingleStation(AbstractDataHandler):
self.station = helpers.to_list(station)
self.path = os.path.abspath(data_path)
self.statistics_per_var = statistics_per_var
self.transformation = self.setup_transformation(transformation)
self.do_transformation = transformation is not None
self.input_data, self.target_data = self.setup_transformation(transformation)
self.station_type = station_type = network
......@@ -74,20 +76,13 @@ class DataHandlerSingleStation(AbstractDataHandler):
self.end = end
# internal xr.DataArray = None
self._data: xr.DataArray = None # loaded raw data
self.meta = None
self.variables = list(statistics_per_var.keys()) if variables is None else variables
self.history = None
self.label = None
self.observation = None
# internal for transformation
self.mean = None
self.std = None
self.max = None
self.min = None
self._transform_method = None
# create samples
......@@ -100,7 +95,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
def shape(self):
return, self.get_X().shape, self.get_Y().shape
return self._data.shape, self.get_X().shape, self.get_Y().shape
def __repr__(self):
return f"StationPrep(station={self.station}, data_path='{self.path}', " \
......@@ -109,24 +104,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
f"sampling='{self.sampling}', target_dim='{self.target_dim}', target_var='{self.target_var}', " \
f"time_dim='{self.time_dim}', window_history_size={self.window_history_size}, " \
f"window_lead_time={self.window_lead_time}, interpolation_limit={self.interpolation_limit}, " \
f"interpolation_method='{self.interpolation_method}', overwrite_local_data={self.overwrite_local_data}, " \
def _print_transformation_as_string(self):
str_name = ''
if self.transformation is None:
str_name = f'{None}'
for k, v in self.transformation.items():
if v is not None:
v_pr = f"xr.DataArray.from_dict({v.to_dict()})"
except AttributeError:
v_pr = f"'{v}'"
str_name += f"'{k}':{v_pr}, "
str_name = f"{{{str_name}}}"
return str_name
f"interpolation_method='{self.interpolation_method}', overwrite_local_data={self.overwrite_local_data})"
def get_transposed_history(self) -> xr.DataArray:
"""Return history.
......@@ -153,18 +131,10 @@ class DataHandlerSingleStation(AbstractDataHandler):
return coords.rename(index={"station_lon": "lon", "station_lat": "lat"}).to_dict()[str(self)]
def call_transform(self, inverse=False):
self.transform(dim=self.time_dim, method=self.transformation["method"],
mean=self.transformation['mean'], std=self.transformation["std"],
min_val=self.transformation["min"], max_val=self.transformation["max"],
def set_transformation(self, transformation: dict):
if self._transform_method is not None:
self.transformation = self.setup_transformation(transformation)
kwargs = helpers.remove_items(self.input_data.as_dict(), ["data"])
self.transform(self.input_data, dim=self.time_dim, inverse=inverse, **kwargs)
kwargs = helpers.remove_items(self.target_data.as_dict(), ["data"])
self.transform(self.target_data, dim=self.time_dim, inverse=inverse, **kwargs)
def setup_samples(self):
......@@ -173,10 +143,17 @@ class DataHandlerSingleStation(AbstractDataHandler):
self.interpolate(dim=self.time_dim, method=self.interpolation_method, limit=self.interpolation_limit)
if self.transformation is not None:
if self.do_transformation:
def set_inputs_and_targets(self):
inputs = self._data.sel({self.target_dim: helpers.to_list(self.variables)})
targets = self._data.sel({self.target_dim: self.target_var}) = inputs = targets
def make_samples(self):
self.make_history_window(self.target_dim, self.window_history_size, self.time_dim)
self.make_labels(self.target_dim, self.target_var, self.time_dim, self.window_lead_time)
......@@ -217,7 +194,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
logging.debug("loading finished")
# create slices and check for negative concentration.
data = self._slice_prep(data) = self.check_for_negative_concentrations(data)
self._data = self.check_for_negative_concentrations(data)
def download_data_from_join(self, file_name: str, meta_file: str) -> [xr.DataArray, pd.DataFrame]:
......@@ -372,8 +349,8 @@ class DataHandlerSingleStation(AbstractDataHandler):
:return: xarray.DataArray
""" =, method=method, limit=limit, use_coordinate=use_coordinate,
self._data = self._data.interpolate_na(dim=dim, method=method, limit=limit, use_coordinate=use_coordinate,
def make_history_window(self, dim_name_of_inputs: str, window: int, dim_name_of_shift: str) -> None:
......@@ -390,7 +367,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
:param dim_name_of_shift: Dimension along shift will be applied
window = -abs(window)
data ={dim_name_of_inputs: self.variables})
data =
self.history = self.shift(data, dim_name_of_shift, window)
def make_labels(self, dim_name_of_target: str, target_var: str_or_list, dim_name_of_shift: str,
......@@ -407,7 +384,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
:param window: lead time of label
window = abs(window)
data ={dim_name_of_target: target_var})
data =
self.label = self.shift(data, dim_name_of_shift, window)
def make_observation(self, dim_name_of_target: str, target_var: str_or_list, dim_name_of_shift: str) -> None:
......@@ -420,7 +397,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
:param target_var: Name of observation variable(s) in 'dimension'
:param dim_name_of_shift: Name of dimension on which xarray.DataArray.shift will be applied
data ={dim_name_of_target: target_var})
data =
self.observation = self.shift(data, dim_name_of_shift, 0)
def remove_nan(self, dim: str) -> None:
......@@ -495,7 +472,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
return data
def setup_transformation(transformation: Dict):
def setup_transformation(transformation: statistics.TransformationClass):
Set up transformation by extracting all relevant information.
......@@ -561,23 +538,13 @@ class DataHandlerSingleStation(AbstractDataHandler):
if transformation is None:
elif not isinstance(transformation, dict):
raise TypeError(f"`transformation' must be either `None' or dict like e.g. `{{'method': 'standardise'}},"
f" but transformation is of type {type(transformation)}.")
transformation = transformation.copy()
method = transformation.get("method", None)
mean = transformation.get("mean", None)
std = transformation.get("std", None)
max_val = transformation.get("max", None)
min_val = transformation.get("min", None)
transformation["method"] = method
transformation["mean"] = mean
transformation["std"] = std
transformation["max"] = max_val
transformation["min"] = min_val
return transformation
return statistics.DataClass(), statistics.DataClass()
elif isinstance(transformation, statistics.DataClass):
return transformation, transformation
elif isinstance(transformation, statistics.TransformationClass):
return copy.deepcopy(transformation.inputs), copy.deepcopy(transformation.targets)
raise NotImplementedError("Cannot handle this.")
def load_data(self):
......@@ -586,8 +553,9 @@ class DataHandlerSingleStation(AbstractDataHandler):
def transform(self, dim: Union[str, int] = 0, method: str = 'standardise', inverse: bool = False, mean=None,
std=None, min_val=None, max_val=None) -> None:
def transform(self, data_class, dim: Union[str, int] = 0, transform_method: str = 'standardise',
inverse: bool = False, mean=None,
std=None, min=None, max=None) -> None:
Transform data according to given transformation settings.
......@@ -607,9 +575,9 @@ class DataHandlerSingleStation(AbstractDataHandler):
calculated over the data in this class instance.
:param std: Used for transformation (if required by 'method') based on external data. If 'None' the std is
calculated over the data in this class instance.
:param min_val: Used for transformation (if required by 'method') based on external data. If 'None' min_val is
:param min: Used for transformation (if required by 'method') based on external data. If 'None' min_val is
extracted from the data in this class instance.
:param max_val: Used for transformation (if required by 'method') based on external data. If 'None' max_val is
:param max: Used for transformation (if required by 'method') based on external data. If 'None' max_val is
extracted from the data in this class instance.
:return: xarray.DataArrays or pandas.DataFrames:
......@@ -619,36 +587,37 @@ class DataHandlerSingleStation(AbstractDataHandler):
def f(data):
if method == 'standardise':
if transform_method == 'standardise':
return statistics.standardise(data, dim)
elif method == 'centre':
elif transform_method == 'centre':
return statistics.centre(data, dim)
elif method == 'normalise':
elif transform_method == 'normalise':
# use min/max of data or given min/max
raise NotImplementedError
raise NotImplementedError
def f_apply(data):
if method == "standardise":
if transform_method == "standardise":
return mean, std, statistics.standardise_apply(data, mean, std)
elif method == "centre":
elif transform_method == "centre":
return mean, None, statistics.centre_apply(data, mean)
raise NotImplementedError
if not inverse:
if self._transform_method is not None:
raise AssertionError(f"Transform method is already set. Therefore, data was already transformed with "
f"{self._transform_method}. Please perform inverse transformation of data first.")
if data_class._method is not None:
raise AssertionError(f"Internal _method is already set. Therefore, data was already transformed with "
f"{data_class._method}. Please perform inverse transformation of data first.")
# apply transformation on local data instance (f) if mean is None, else apply by using mean (and std) from
# external data.
self.mean, self.std, = locals()["f" if mean is None else "f_apply"](
data_class.mean, data_class.std, = locals()["f" if mean is None else "f_apply"](
# set transform method to find correct method for inverse transformation.
self._transform_method = method
data_class._method = transform_method
def check_inverse_transform_params(mean: data_or_none, std: data_or_none, method: str) -> None:
......@@ -670,7 +639,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
if len(msg) > 0:
raise AttributeError(f"Inverse transform {method} can not be executed because following is None: {msg}")
def inverse_transform(self) -> None:
def inverse_transform(self, data_class) -> None:
Perform inverse transformation.
......@@ -690,36 +659,26 @@ class DataHandlerSingleStation(AbstractDataHandler):
raise NotImplementedError
if self._transform_method is None:
if data_class.transform_method is None:
raise AssertionError("Inverse transformation method is not set. Data cannot be inverse transformed.")
self.check_inverse_transform_params(self.mean, self.std, self._transform_method), self.mean, self.std = f_inverse(, self.mean, self.std, self._transform_method)
self._transform_method = None
self.check_inverse_transform_params(data_class.mean, data_class.std, data_class._method), data_class.mean, data_class.std = f_inverse(, data_class.mean, data_class.std,
data_class.transform_method = None
# update X and Y
def get_transformation_information(self, variable: str = None) -> Tuple[data_or_none, data_or_none, str]:
def get_transformation_targets(self) -> Tuple[data_or_none, data_or_none, str]:
Extract transformation statistics and method.
Get mean and standard deviation for given variable and the transformation method if set. If a transformation
Get mean and standard deviation for target values and the transformation method if set. If a transformation
depends only on particular statistics (e.g. only mean is required for centering), the remaining statistics are
returned with None as fill value.
:param variable: Variable for which the information on transformation is requested.
:return: mean, standard deviation and transformation method
variable = self.target_var if variable is None else variable
mean = self.mean.sel({'variables': variable}).values
except AttributeError:
mean = None
std = self.std.sel({'variables': variable}).values
except AttributeError:
std = None
return mean, std, self._transform_method
return self.target_data.mean, self.target_data.std, self.target_data.transform_method
if __name__ == "__main__":
......@@ -732,7 +691,6 @@ if __name__ == "__main__":
time_dim='datetime', window_history_size=7, window_lead_time=3,
) # transformation={'method': 'standardise'})
# sp.set_transformation({'method': 'standardise', 'mean': sp.mean+2, 'std': sp.std+1})
sp2 = DataHandlerSingleStation(data_path='/home/felix/PycharmProjects/mlt_new/data/', station='DEBY122',
statistics_per_var=statistics_per_var, station_type='background',
network='UBA', sampling='daily', target_dim='variables', target_var='o3',
......@@ -145,7 +145,7 @@ class DefaultDataHandler(AbstractDataHandler):
return self.id_class.observation.copy().squeeze()
def get_transformation_Y(self):
return self.id_class.get_transformation_information()
return self.id_class.get_transformation_targets()
def multiply_extremes(self, extreme_values: num_or_list = 1., extremes_on_right_tail_only: bool = False,
timedelta: Tuple[int, str] = (1, 'm'), dim="datetime"):
......@@ -218,26 +218,30 @@ class DefaultDataHandler(AbstractDataHandler):
def transformation(cls, set_stations, **kwargs):
sp_keys = {k: copy.deepcopy(kwargs[k]) for k in cls._requirements if k in kwargs}
transformation_dict = sp_keys.pop("transformation")
if transformation_dict is None:
transformation_class = sp_keys.get("transformation", None)
if transformation_class is None:
scope = transformation_dict.pop("scope")
method = transformation_dict.pop("method")
if transformation_dict.pop("mean", None) is not None:
transformation_inputs = transformation_class.inputs
if transformation_inputs.mean is not None:
mean, std = None, None
means = [None, None]
stds = [None, None]
for station in set_stations:
sp = cls.data_handler_transformation(station, transformation={"method": method}, **sp_keys)
mean = sp.mean.copy(deep=True) if mean is None else mean.combine_first(sp.mean)
std = sp.std.copy(deep=True) if std is None else std.combine_first(sp.std)
sp = cls.data_handler_transformation(station, **sp_keys)
for i, data in enumerate([sp.input_data, sp.target_data]):
means[i] = data.mean.copy(deep=True) if means[i] is None else means[i].combine_first(data.mean)
stds[i] = data.std.copy(deep=True) if stds[i] is None else stds[i].combine_first(data.std)
except (AttributeError, EmptyQueryResult):
if mean is None:
if means[0] is None:
return None
mean_estimated = mean.mean("Stations")
std_estimated = std.mean("Stations")
return {"scope": scope, "method": method, "mean": mean_estimated, "std": std_estimated}
transformation_class.inputs.mean = means[0].mean("Stations")
transformation_class.inputs.std = stds[0].mean("Stations")
transformation_class.targets.mean = means[1].mean("Stations")
transformation_class.targets.std = stds[1].mean("Stations")
return transformation_class
def get_coordinates(self):
return self.id_class.get_coordinates()
\ No newline at end of file
......@@ -11,11 +11,34 @@ import pandas as pd
from typing import Union, Tuple, Dict
from matplotlib import pyplot as plt
from mlair.helpers import to_list
from mlair.helpers import to_list, remove_items
Data = Union[xr.DataArray, pd.DataFrame]
class DataClass:
    """Container for one data set together with its transformation statistics.

    Holds the raw data plus the statistics a transformation may need:
    mean/std (standardise, centre) and max/min (normalise), along with the
    name of the transformation method to apply. The internal ``_method``
    attribute records which method has actually been applied to ``data``
    (``None`` means the data is still untransformed).
    """

    def __init__(self, data=None, mean=None, std=None, max=None, min=None, transform_method=None):
        # `max`/`min` shadow builtins, but the names are kept so existing
        # keyword callers (e.g. DataClass(max=..., min=...)) stay valid.
        self.data = data
        self.mean = mean
        self.std = std
        self.max = max
        self.min = min
        self.transform_method = transform_method
        self._method = None  # set when a transformation is applied

    def as_dict(self):
        """Return all public attributes as a dict, excluding internal `_method`."""
        return remove_items(self.__dict__, "_method")
class TransformationClass:
    """Pair of `DataClass` objects describing transformations independently.

    Introduced so that the transformation (method plus optional precomputed
    mean/std) can be configured separately for model inputs and targets,
    e.g. ``TransformationClass(inputs_method="standardise",
    targets_method="standardise")``.
    """

    def __init__(self, inputs_mean=None, inputs_std=None, inputs_method=None, targets_mean=None, targets_std=None,
                 targets_method=None):
        self.inputs = DataClass(mean=inputs_mean, std=inputs_std, transform_method=inputs_method)
        self.targets = DataClass(mean=targets_mean, std=targets_std, transform_method=targets_method)
def apply_inverse_transformation(data: Data, mean: Data, std: Data = None, method: str = "standardise") -> Data:
Apply inverse transformation for given statistics.
......@@ -137,15 +137,16 @@ class PlotMonthlySummary(AbstractPlotClass):
data_cnn = data.sel(type="CNN").squeeze()
if len(data_cnn.shape) > 1:
data_cnn.coords["ahead"].values = [f"{days}d" for days in data_cnn.coords["ahead"].values]
data_cnn = data_cnn.assign_coords(ahead=[f"{days}d" for days in data_cnn.coords["ahead"].values])
data_obs = data.sel(type="obs", ahead=1).squeeze()
data_obs.coords["ahead"] = "obs"
data_concat = xr.concat([data_obs, data_cnn], dim="ahead")
data_concat = data_concat.drop("type")
data_concat = data_concat.drop_vars("type")
data_concat.index.values = data_concat.index.values.astype("datetime64[M]").astype(int) % 12 + 1
new_index = data_concat.index.values.astype("datetime64[M]").astype(int) % 12 + 1
data_concat = data_concat.assign_coords(index=new_index)
data_concat = data_concat.clip(min=0)
forecasts = xr.concat([forecasts, data_concat], 'index') if forecasts is not None else data_concat
......@@ -399,10 +399,10 @@ class PostProcessing(RunEnvironment):
:return: filled data array with ols predictions
tmp_ols = self.ols_model.predict(input_data)
if not normalised:
tmp_ols = statistics.apply_inverse_transformation(tmp_ols, mean, std, transformation_method)
target_shape = ols_prediction.values.shape
ols_prediction.values = np.swapaxes(tmp_ols, 2, 0) if target_shape != tmp_ols.shape else tmp_ols
if not normalised:
ols_prediction = statistics.apply_inverse_transformation(ols_prediction, mean, std, transformation_method)
return ols_prediction
def _create_persistence_forecast(self, data, persistence_prediction: xr.DataArray, mean: xr.DataArray,
......@@ -423,9 +423,10 @@ class PostProcessing(RunEnvironment):
:return: filled data array with persistence predictions
tmp_persi = data.copy()
if not normalised:
tmp_persi = statistics.apply_inverse_transformation(tmp_persi, mean, std, transformation_method)
persistence_prediction.values = np.tile(tmp_persi, (self.window_lead_time, 1)).T
if not normalised:
persistence_prediction = statistics.apply_inverse_transformation(persistence_prediction, mean, std,
return persistence_prediction
def _create_nn_forecast(self, input_data: xr.DataArray, nn_prediction: xr.DataArray, mean: xr.DataArray,
......@@ -447,8 +448,6 @@ class PostProcessing(RunEnvironment):
:return: filled data array with nn predictions
tmp_nn = self.model.predict(input_data)
if not normalised:
tmp_nn = statistics.apply_inverse_transformation(tmp_nn, mean, std, transformation_method)
if isinstance(tmp_nn, list):
nn_prediction.values = tmp_nn[-1]
elif tmp_nn.ndim == 3:
......@@ -457,6 +456,8 @@ class PostProcessing(RunEnvironment):
nn_prediction.values = tmp_nn
raise NotImplementedError(f"Number of dimension of model output must be 2 or 3, but not {tmp_nn.dims}.")
if not normalised:
nn_prediction = statistics.apply_inverse_transformation(nn_prediction, mean, std, transformation_method)
return nn_prediction
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment