Commit 0c4f523c authored by Lukas Leufen

Merge branch 'lukas_issue202_refac_transformation-setup' into...

Merge branch 'lukas_issue202_refac_transformation-setup' into 'lukas_issue195_feat_kz-filter-dimension'

Resolve "REFAC: transformation setup"

See merge request !181
parents 7c403660 0d92f77e
Pipeline #50946 passed with stages
in 7 minutes and 54 seconds
__author__ = "Lukas Leufen"
__date__ = '2020-06-25'
from mlair.helpers.statistics import TransformationClass
DEFAULT_STATIONS = ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087']
DEFAULT_VAR_ALL_DICT = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values',
......@@ -13,8 +14,7 @@ DEFAULT_START = "1997-01-01"
DEFAULT_END = "2017-12-31"
DEFAULT_WINDOW_HISTORY_SIZE = 13
DEFAULT_OVERWRITE_LOCAL_DATA = False
# DEFAULT_TRANSFORMATION = {"scope": "data", "method": "standardise", "mean": "estimate"}
DEFAULT_TRANSFORMATION = {"scope": "data", "method": "standardise"}
DEFAULT_TRANSFORMATION = TransformationClass(inputs_method="standardise", targets_method="standardise")
DEFAULT_HPC_LOGIN_LIST = ["ju", "hdfmll"]  # first part of login node names for Juwels (ju(wels)) and HDFML (hdfmll(ogin))
DEFAULT_HPC_HOST_LIST = ["jw", "hdfmlc"]  # first part of compute node names for Juwels (jw(comp)) and HDFML (hdfmlc(ompute))
DEFAULT_CREATE_NEW_MODEL = True
......
......@@ -32,7 +32,6 @@ class DataHandlerKzFilterSingleStation(DataHandlerSingleStation):
self.kz_filter_iter = kz_filter_iter
self.cutoff_period = None
self.cutoff_period_days = None
self.data_target: xr.DataArray = None
super().__init__(*args, **kwargs)
def setup_samples(self):
......@@ -41,26 +40,25 @@ class DataHandlerKzFilterSingleStation(DataHandlerSingleStation):
"""
self.load_data()
self.interpolate(dim=self.time_dim, method=self.interpolation_method, limit=self.interpolation_limit)
self.set_inputs_and_targets()
import matplotlib
matplotlib.use("TkAgg")
import matplotlib.pyplot as plt
# self.original_data = self.data # ToDo: implement here something to store unfiltered data
self.apply_kz_filter()
# self.data.sel(filter="74d", variables="temp", Stations="DEBW107").plot()
# self.data.sel(variables="temp", Stations="DEBW107").plot.line(hue="filter")
if self.transformation is not None:
if self.do_transformation is True:
self.call_transform()
self.make_samples() # ToDo: target samples are still coming from filtered data
self.make_samples()
@TimeTrackingWrapper
def apply_kz_filter(self):
"""Apply kolmogorov zurbenko filter only on inputs."""
self.data_target = self.data.sel({self.target_dim: [self.target_var]})
kz = KZFilter(self.data, wl=self.kz_filter_length, itr=self.kz_filter_iter, filter_dim="datetime")
kz = KZFilter(self.input_data.data, wl=self.kz_filter_length, itr=self.kz_filter_iter, filter_dim="datetime")
filtered_data: List[xr.DataArray] = kz.run()
self.cutoff_period = kz.period_null()
self.cutoff_period_days = kz.period_null_days()
self.data = xr.concat(filtered_data, pd.Index(self.create_filter_index(), name="filter"))
self.input_data.data = xr.concat(filtered_data, pd.Index(self.create_filter_index(), name="filter"))
def create_filter_index(self) -> pd.Index:
"""
......@@ -75,36 +73,6 @@ class DataHandlerKzFilterSingleStation(DataHandlerSingleStation):
index = list(map(lambda x: str(x) + "d", index)) + ["res"]
return pd.Index(index, name="filter")
def make_labels(self, dim_name_of_target: str, target_var: str_or_list, dim_name_of_shift: str,
                window: int) -> None:
    """
    Build the label array and store it on the ``label`` attribute.

    Labels are the consecutive target values (t+1, ..., t+n) following the
    current time step t, selected from the unfiltered target data.

    :param dim_name_of_target: name of the dimension holding the target variable
    :param target_var: name of the target variable(s) in that dimension
    :param dim_name_of_shift: dimension along which xarray's shift is applied
    :param window: lead time of the label (sign is ignored)
    """
    lead_time = abs(window)  # a negative lead time makes no sense here
    selected = self.data_target.sel({dim_name_of_target: target_var})
    self.label = self.shift(selected, dim_name_of_shift, lead_time)
def make_observation(self, dim_name_of_target: str, target_var: str_or_list, dim_name_of_shift: str) -> None:
    """
    Build the observation array and store it on the ``observation`` attribute.

    The observation is the value at the current time step t (shift of zero),
    selected from the unfiltered target data.

    :param dim_name_of_target: name of the dimension holding the observation variable
    :param target_var: name of the observation variable(s) in that dimension
    :param dim_name_of_shift: dimension along which xarray's shift is applied
    """
    selected = self.data_target.sel({dim_name_of_target: target_var})
    self.observation = self.shift(selected, dim_name_of_shift, 0)
def get_transposed_history(self) -> xr.DataArray:
"""Return history.
......
......@@ -145,7 +145,7 @@ class DefaultDataHandler(AbstractDataHandler):
return self.id_class.observation.copy().squeeze()
def get_transformation_Y(self):
return self.id_class.get_transformation_information()
return self.id_class.get_transformation_targets()
def multiply_extremes(self, extreme_values: num_or_list = 1., extremes_on_right_tail_only: bool = False,
timedelta: Tuple[int, str] = (1, 'm'), dim="datetime"):
......@@ -217,27 +217,55 @@ class DefaultDataHandler(AbstractDataHandler):
@classmethod
def transformation(cls, set_stations, **kwargs):
"""
### supported transformation methods
Currently supported methods are:
* standardise (default, if method is not given)
* centre
### mean and std estimation
Mean and std (depending on method) are estimated. For each station, mean and std are calculated and afterwards
aggregated using the mean value over all station-wise metrics. This method is not exactly accurate, especially
regarding the std calculation but therefore much faster. Furthermore, it is a weighted mean weighted by the
time series length / number of data itself - a longer time series has more influence on the transformation
settings than a short time series. The estimation of the std in less accurate, because the unweighted mean of
all stds in not equal to the true std, but still the mean of all station-wise std is a decent estimate. Finally,
the real accuracy of mean and std is less important, because it is "just" a transformation / scaling.
### mean and std given
If mean and std are not None, the default data handler expects this parameters to match the data and applies
this values to the data. Make sure that all dimensions and/or coordinates are in agreement.
"""
sp_keys = {k: copy.deepcopy(kwargs[k]) for k in cls._requirements if k in kwargs}
transformation_dict = sp_keys.pop("transformation")
if transformation_dict is None:
transformation_class = sp_keys.get("transformation", None)
if transformation_class is None:
return
scope = transformation_dict.pop("scope")
method = transformation_dict.pop("method")
if transformation_dict.pop("mean", None) is not None:
transformation_inputs = transformation_class.inputs
if transformation_inputs.mean is not None:
return
mean, std = None, None
means = [None, None]
stds = [None, None]
for station in set_stations:
try:
sp = cls.data_handler_transformation(station, transformation={"method": method}, **sp_keys)
mean = sp.mean.copy(deep=True) if mean is None else mean.combine_first(sp.mean)
std = sp.std.copy(deep=True) if std is None else std.combine_first(sp.std)
sp = cls.data_handler_transformation(station, **sp_keys)
for i, data in enumerate([sp.input_data, sp.target_data]):
means[i] = data.mean.copy(deep=True) if means[i] is None else means[i].combine_first(data.mean)
stds[i] = data.std.copy(deep=True) if stds[i] is None else stds[i].combine_first(data.std)
except (AttributeError, EmptyQueryResult):
continue
if mean is None:
if means[0] is None:
return None
mean_estimated = mean.mean("Stations")
std_estimated = std.mean("Stations")
return {"scope": scope, "method": method, "mean": mean_estimated, "std": std_estimated}
transformation_class.inputs.mean = means[0].mean("Stations")
transformation_class.inputs.std = stds[0].mean("Stations")
transformation_class.targets.mean = means[1].mean("Stations")
transformation_class.targets.std = stds[1].mean("Stations")
return transformation_class
def get_coordinates(self):
    """Return the station coordinates by delegating to the wrapped single-station data handler."""
    return self.id_class.get_coordinates()
\ No newline at end of file
......@@ -11,11 +11,34 @@ import pandas as pd
from typing import Union, Tuple, Dict
from matplotlib import pyplot as plt
from mlair.helpers import to_list
from mlair.helpers import to_list, remove_items
Data = Union[xr.DataArray, pd.DataFrame]
class DataClass:
    """Container for a data array plus the statistics and method used to transform it."""

    # attribute names in declaration order; ``max``/``min`` intentionally mirror the
    # parameter names and therefore shadow the builtins inside ``__init__``
    _FIELDS = ("data", "mean", "std", "max", "min", "transform_method")

    def __init__(self, data=None, mean=None, std=None, max=None, min=None, transform_method=None):
        for name, value in zip(self._FIELDS, (data, mean, std, max, min, transform_method)):
            setattr(self, name, value)
        self._method = None  # private slot, excluded from as_dict(); may be set by callers

    def as_dict(self):
        """Return all public attributes as a dict (the private ``_method`` is dropped)."""
        return remove_items(self.__dict__, "_method")
class TransformationClass:
    """Bundle of transformation settings: one DataClass for inputs, one for targets."""

    def __init__(self, inputs_mean=None, inputs_std=None, inputs_method=None, targets_mean=None, targets_std=None,
                 targets_method=None):
        # build both sides from the same template to keep them structurally identical
        self.inputs, self.targets = (
            DataClass(mean=m, std=s, transform_method=f)
            for (m, s, f) in ((inputs_mean, inputs_std, inputs_method),
                              (targets_mean, targets_std, targets_method)))
def apply_inverse_transformation(data: Data, mean: Data, std: Data = None, method: str = "standardise") -> Data:
"""
Apply inverse transformation for given statistics.
......
......@@ -137,15 +137,16 @@ class PlotMonthlySummary(AbstractPlotClass):
data_cnn = data.sel(type="CNN").squeeze()
if len(data_cnn.shape) > 1:
data_cnn.coords["ahead"].values = [f"{days}d" for days in data_cnn.coords["ahead"].values]
data_cnn = data_cnn.assign_coords(ahead=[f"{days}d" for days in data_cnn.coords["ahead"].values])
data_obs = data.sel(type="obs", ahead=1).squeeze()
data_obs.coords["ahead"] = "obs"
data_concat = xr.concat([data_obs, data_cnn], dim="ahead")
data_concat = data_concat.drop("type")
data_concat = data_concat.drop_vars("type")
data_concat.index.values = data_concat.index.values.astype("datetime64[M]").astype(int) % 12 + 1
new_index = data_concat.index.values.astype("datetime64[M]").astype(int) % 12 + 1
data_concat = data_concat.assign_coords(index=new_index)
data_concat = data_concat.clip(min=0)
forecasts = xr.concat([forecasts, data_concat], 'index') if forecasts is not None else data_concat
......
......@@ -399,10 +399,10 @@ class PostProcessing(RunEnvironment):
:return: filled data array with ols predictions
"""
tmp_ols = self.ols_model.predict(input_data)
if not normalised:
tmp_ols = statistics.apply_inverse_transformation(tmp_ols, mean, std, transformation_method)
target_shape = ols_prediction.values.shape
ols_prediction.values = np.swapaxes(tmp_ols, 2, 0) if target_shape != tmp_ols.shape else tmp_ols
if not normalised:
ols_prediction = statistics.apply_inverse_transformation(ols_prediction, mean, std, transformation_method)
return ols_prediction
def _create_persistence_forecast(self, data, persistence_prediction: xr.DataArray, mean: xr.DataArray,
......@@ -423,9 +423,10 @@ class PostProcessing(RunEnvironment):
:return: filled data array with persistence predictions
"""
tmp_persi = data.copy()
if not normalised:
tmp_persi = statistics.apply_inverse_transformation(tmp_persi, mean, std, transformation_method)
persistence_prediction.values = np.tile(tmp_persi, (self.window_lead_time, 1)).T
if not normalised:
persistence_prediction = statistics.apply_inverse_transformation(persistence_prediction, mean, std,
transformation_method)
return persistence_prediction
def _create_nn_forecast(self, input_data: xr.DataArray, nn_prediction: xr.DataArray, mean: xr.DataArray,
......@@ -447,8 +448,6 @@ class PostProcessing(RunEnvironment):
:return: filled data array with nn predictions
"""
tmp_nn = self.model.predict(input_data)
if not normalised:
tmp_nn = statistics.apply_inverse_transformation(tmp_nn, mean, std, transformation_method)
if isinstance(tmp_nn, list):
nn_prediction.values = tmp_nn[-1]
elif tmp_nn.ndim == 3:
......@@ -457,6 +456,8 @@ class PostProcessing(RunEnvironment):
nn_prediction.values = tmp_nn
else:
raise NotImplementedError(f"Number of dimension of model output must be 2 or 3, but not {tmp_nn.dims}.")
if not normalised:
nn_prediction = statistics.apply_inverse_transformation(nn_prediction, mean, std, transformation_method)
return nn_prediction
@staticmethod
......
from mlair.configuration.defaults import *
class TestGetDefaults:
    """Smoke test for the get_defaults() accessor."""

    def test_get_defaults(self):
        defaults = get_defaults()
        assert isinstance(defaults, dict)
        for expected_key in ["DEFAULT_STATIONS", "DEFAULT_BATCH_SIZE", "DEFAULT_PLOT_LIST"]:
            assert expected_key in defaults.keys()
        assert all(name.startswith("DEFAULT") for name in defaults.keys())
class TestAllDefaults:
    """Pin every package-wide default value so unintended changes are caught by CI."""

    def test_training_parameters(self):
        """Defaults controlling model creation, training splits and batching."""
        assert DEFAULT_CREATE_NEW_MODEL is True
        assert DEFAULT_TRAIN_MODEL is True
        assert DEFAULT_FRACTION_OF_TRAINING == 0.8
        assert DEFAULT_EXTREME_VALUES is None
        assert DEFAULT_EXTREMES_ON_RIGHT_TAIL_ONLY is False
        assert DEFAULT_PERMUTE_DATA is False
        assert DEFAULT_BATCH_SIZE == int(256 * 2)
        assert DEFAULT_EPOCHS == 20

    def test_data_handler_parameters(self):
        """Defaults for station selection, variables, time range and transformation."""
        assert DEFAULT_STATIONS == ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087']
        assert DEFAULT_VAR_ALL_DICT == {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum',
                                        'u': 'average_values',
                                        'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu',
                                        'cloudcover': 'average_values',
                                        'pblheight': 'maximum'}
        assert DEFAULT_NETWORK == "AIRBASE"
        assert DEFAULT_STATION_TYPE == "background"
        assert DEFAULT_VARIABLES == DEFAULT_VAR_ALL_DICT.keys()
        assert DEFAULT_START == "1997-01-01"
        assert DEFAULT_END == "2017-12-31"
        assert DEFAULT_WINDOW_HISTORY_SIZE == 13
        assert DEFAULT_OVERWRITE_LOCAL_DATA is False
        # default transformation standardises both inputs and targets
        assert isinstance(DEFAULT_TRANSFORMATION, TransformationClass)
        assert DEFAULT_TRANSFORMATION.inputs.transform_method == "standardise"
        assert DEFAULT_TRANSFORMATION.targets.transform_method == "standardise"
        assert DEFAULT_TARGET_VAR == "o3"
        assert DEFAULT_TARGET_DIM == "variables"
        assert DEFAULT_WINDOW_LEAD_TIME == 3
        assert DEFAULT_DIMENSIONS == {"new_index": ["datetime", "Stations"]}
        assert DEFAULT_TIME_DIM == "datetime"
        assert DEFAULT_INTERPOLATION_METHOD == "linear"
        assert DEFAULT_INTERPOLATION_LIMIT == 1

    def test_subset_parameters(self):
        """Defaults for the train/val/test date ranges and minimum lengths."""
        assert DEFAULT_TRAIN_START == "1997-01-01"
        assert DEFAULT_TRAIN_END == "2007-12-31"
        assert DEFAULT_TRAIN_MIN_LENGTH == 90
        assert DEFAULT_VAL_START == "2008-01-01"
        assert DEFAULT_VAL_END == "2009-12-31"
        assert DEFAULT_VAL_MIN_LENGTH == 90
        assert DEFAULT_TEST_START == "2010-01-01"
        assert DEFAULT_TEST_END == "2017-12-31"
        assert DEFAULT_TEST_MIN_LENGTH == 90
        assert DEFAULT_TRAIN_VAL_MIN_LENGTH == 180
        assert DEFAULT_USE_ALL_STATIONS_ON_ALL_DATA_SETS is True

    def test_hpc_parameters(self):
        """Defaults identifying HPC login and compute hosts by name prefix."""
        assert DEFAULT_HPC_HOST_LIST == ["jw", "hdfmlc"]
        assert DEFAULT_HPC_LOGIN_LIST == ["ju", "hdfmll"]

    def test_postprocessing_parameters(self):
        """Defaults for bootstrapping and the plot list used in post-processing."""
        assert DEFAULT_EVALUATE_BOOTSTRAPS is True
        assert DEFAULT_CREATE_NEW_BOOTSTRAPS is False
        assert DEFAULT_NUMBER_OF_BOOTSTRAPS == 20
        assert DEFAULT_PLOT_LIST == ["PlotMonthlySummary", "PlotStationMap", "PlotClimatologicalSkillScore",
                                     "PlotTimeSeries", "PlotCompetitiveSkillScore", "PlotBootstrapSkillScore",
                                     "PlotConditionalQuantiles", "PlotAvailability"]
......@@ -3,7 +3,9 @@ import pandas as pd
import pytest
import xarray as xr
from mlair.helpers.statistics import standardise, standardise_inverse, standardise_apply, centre, centre_inverse, centre_apply, \
from mlair.helpers.statistics import DataClass, TransformationClass
from mlair.helpers.statistics import standardise, standardise_inverse, standardise_apply, centre, centre_inverse, \
centre_apply, \
apply_inverse_transformation
lazy = pytest.lazy_fixture
......@@ -113,3 +115,50 @@ class TestCentre:
data = centre_apply(data_orig, mean)
mean_expected = np.array([2, -5, 10]) - np.array([2, 10, 3])
assert np.testing.assert_almost_equal(data.mean(dim), mean_expected, decimal=1) is None
class TestDataClass:
    """Unit tests for the DataClass statistics container."""

    def test_init(self):
        dc = DataClass()
        for attr in ("data", "mean", "std", "max", "min", "transform_method", "_method"):
            assert getattr(dc, attr) is None

    def test_init_values(self):
        dc = DataClass(data=12, mean=2, std="test", max=23.4, min=np.array([3]), transform_method="f")
        assert dc.data == 12
        assert dc.mean == 2
        assert dc.std == "test"
        assert dc.max == 23.4
        assert np.testing.assert_array_equal(dc.min, np.array([3])) is None
        assert dc.transform_method == "f"
        assert dc._method is None

    def test_as_dict(self):
        dc = DataClass(std=23)
        dc._method = "f(x)"
        expected = {"data": None, "mean": None, "std": 23, "max": None, "min": None,
                    "transform_method": None}
        assert dc.as_dict() == expected
class TestTransformationClass:
    """Unit tests for the TransformationClass inputs/targets pairing."""

    def test_init(self):
        tc = TransformationClass()
        assert hasattr(tc, "inputs")
        assert hasattr(tc, "targets")
        for side in (tc.inputs, tc.targets):
            assert isinstance(side, DataClass)
        assert tc.inputs.mean is None
        assert tc.targets.std is None

    def test_init_values(self):
        tc = TransformationClass(inputs_mean=1, inputs_std=2, inputs_method="f", targets_mean=3, targets_std=4,
                                 targets_method="g")
        assert (tc.inputs.mean, tc.inputs.std, tc.inputs.transform_method) == (1, 2, "f")
        assert tc.inputs.max is None
        assert (tc.targets.mean, tc.targets.std, tc.targets.transform_method) == (3, 4, "g")
        assert tc.inputs.min is None
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment