Commit c63cff8a authored by lukas leufen

Merge branch 'lukas_issue195_feat_kz-filter-dimension' into 'develop'

Resolve "KZ Filter creating additional dimension"

See merge request !177
parents 99539a22 23ce7ee0
Pipeline #51112 passed with stages in 7 minutes and 55 seconds
-join_settings.py
\ No newline at end of file
+join_settings.py
+join_rest
\ No newline at end of file
__author__ = "Lukas Leufen"
__date__ = '2020-06-25'
+from mlair.helpers.statistics import TransformationClass
DEFAULT_STATIONS = ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087']
DEFAULT_VAR_ALL_DICT = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values',
@@ -13,8 +14,7 @@ DEFAULT_START = "1997-01-01"
DEFAULT_END = "2017-12-31"
DEFAULT_WINDOW_HISTORY_SIZE = 13
DEFAULT_OVERWRITE_LOCAL_DATA = False
-# DEFAULT_TRANSFORMATION = {"scope": "data", "method": "standardise", "mean": "estimate"}
-DEFAULT_TRANSFORMATION = {"scope": "data", "method": "standardise"}
+DEFAULT_TRANSFORMATION = TransformationClass(inputs_method="standardise", targets_method="standardise")
DEFAULT_HPC_LOGIN_LIST = ["ju", "hdfmll"]  # first part of login node names for Juwels (ju[wels]) and HDF-ML (hdfmll[ogin])
DEFAULT_HPC_HOST_LIST = ["jw", "hdfmlc"]  # first part of compute node names for Juwels (jw[comp]) and HDF-ML (hdfmlc[ompute])
DEFAULT_CREATE_NEW_MODEL = True
@@ -46,15 +46,11 @@ DEFAULT_USE_ALL_STATIONS_ON_ALL_DATA_SETS = True
DEFAULT_EVALUATE_BOOTSTRAPS = True
DEFAULT_CREATE_NEW_BOOTSTRAPS = False
DEFAULT_NUMBER_OF_BOOTSTRAPS = 20
-#DEFAULT_PLOT_LIST = ["PlotMonthlySummary", "PlotStationMap", "PlotClimatologicalSkillScore", "PlotTimeSeries",
-#                     "PlotCompetitiveSkillScore", "PlotBootstrapSkillScore", "PlotConditionalQuantiles",
-#                     "PlotAvailability"]
-DEFAULT_PLOT_LIST = ["PlotMonthlySummary", "PlotStationMap", "PlotClimatologicalSkillScore",
+DEFAULT_PLOT_LIST = ["PlotMonthlySummary", "PlotStationMap", "PlotClimatologicalSkillScore", "PlotTimeSeries",
"PlotCompetitiveSkillScore", "PlotBootstrapSkillScore", "PlotConditionalQuantiles",
"PlotAvailability"]
def get_defaults():
"""Return all default parameters set in defaults.py"""
return {key: value for key, value in globals().items() if key.startswith('DEFAULT')}
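For orientation, a quick sketch of how these defaults can be consumed downstream (assuming the module is importable as mlair.configuration.defaults):

# Hypothetical usage: collect every DEFAULT_* constant defined in this module.
from mlair.configuration import defaults

params = defaults.get_defaults()
print(params["DEFAULT_WINDOW_HISTORY_SIZE"])  # 13
assert all(key.startswith("DEFAULT") for key in params)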
...
@@ -13,4 +13,4 @@ from .bootstraps import BootStraps
from .iterator import KerasIterator, DataCollection
from .default_data_handler import DefaultDataHandler
from .abstract_data_handler import AbstractDataHandler
-from .data_preparation_neighbors import DataHandlerNeighbors
+from .data_handler_neighbors import DataHandlerNeighbors
@@ -27,7 +27,10 @@ class AbstractDataHandler:
@classmethod
def own_args(cls, *args):
-        return remove_items(inspect.getfullargspec(cls).args, ["self"] + list(args))
+        """Return all arguments (including kwonlyargs)."""
+        arg_spec = inspect.getfullargspec(cls)
+        list_of_args = arg_spec.args + arg_spec.kwonlyargs
+        return remove_items(list_of_args, ["self"] + list(args))
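The motivation for this change can be reproduced in isolation: inspect.getfullargspec(...).args does not contain keyword-only arguments such as kz_filter_length further down in this MR, so the old own_args silently dropped them. A minimal sketch (Demo is a hypothetical stand-in class):

import inspect

class Demo:
    # mimics the keyword-only style of DataHandlerKzFilterSingleStation.__init__
    def __init__(self, station, *args, kz_filter_length, kz_filter_iter, **kwargs):
        pass

spec = inspect.getfullargspec(Demo)
print(spec.args)                    # ['self', 'station'] -> keyword-only args missing
print(spec.kwonlyargs)              # ['kz_filter_length', 'kz_filter_iter']
print(spec.args + spec.kwonlyargs)  # what own_args now reports (before removing "self")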
@classmethod
def transformation(cls, *args, **kwargs):
...
@@ -10,15 +10,18 @@ import datetime as dt
from mlair.data_handler import AbstractDataHandler
-from typing import Union, List
+from typing import Union, List, Tuple, Dict
import logging
from functools import reduce
from mlair.helpers.join import EmptyQueryResult
from mlair.helpers import TimeTracking
number = Union[float, int]
num_or_list = Union[number, List[number]]
def run_data_prep():
-    from .data_preparation_neighbors import DataHandlerNeighbors
+    from .data_handler_neighbors import DataHandlerNeighbors
data = DummyDataHandler("main_class")
data.get_X()
data.get_Y()
@@ -33,8 +36,7 @@ def run_data_prep():
def create_data_prep():
-    from .data_preparation_neighbors import DataHandlerNeighbors
+    from .data_handler_neighbors import DataHandlerNeighbors
path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "testdata")
station_type = None
network = 'UBA'
@@ -98,7 +100,7 @@ class DummyDataHandler(AbstractDataHandler):
if __name__ == "__main__":
-    from mlair.data_handler.station_preparation import DataHandlerSingleStation
+    from mlair.data_handler.data_handler_single_station import DataHandlerSingleStation
from mlair.data_handler.iterator import KerasIterator, DataCollection
data_prep = create_data_prep()
data_collection = DataCollection(data_prep)
...
"""Data Handler using kz-filtered data."""
__author__ = 'Lukas Leufen'
__date__ = '2020-08-26'
import inspect
import numpy as np
import pandas as pd
import xarray as xr
from typing import List, Union
from mlair.data_handler.data_handler_single_station import DataHandlerSingleStation
from mlair.data_handler import DefaultDataHandler
from mlair.helpers import remove_items, to_list, TimeTrackingWrapper
from mlair.helpers.statistics import KolmogorovZurbenkoFilterMovingWindow as KZFilter
# define a more general date type for type hinting
str_or_list = Union[str, List[str]]
class DataHandlerKzFilterSingleStation(DataHandlerSingleStation):
"""Data handler for a single station to be used by a superior data handler. Inputs are kz filtered."""
_requirements = remove_items(inspect.getfullargspec(DataHandlerSingleStation).args, ["self", "station"])
def __init__(self, *args, kz_filter_length, kz_filter_iter, **kwargs):
assert kwargs.get("sampling") == "hourly" # This data handler requires hourly data resolution
kz_filter_length = to_list(kz_filter_length)
kz_filter_iter = to_list(kz_filter_iter)
# self.original_data = None # ToDo: implement here something to store unfiltered data
self.kz_filter_length = kz_filter_length
self.kz_filter_iter = kz_filter_iter
self.cutoff_period = None
self.cutoff_period_days = None
super().__init__(*args, **kwargs)
def setup_samples(self):
"""
Set up samples. This method prepares and creates samples X and labels Y.
"""
self.load_data()
self.interpolate(dim=self.time_dim, method=self.interpolation_method, limit=self.interpolation_limit)
self.set_inputs_and_targets()
self.apply_kz_filter()
# this is just a code snippet to check the results of the kz filter
# import matplotlib
# matplotlib.use("TkAgg")
# import matplotlib.pyplot as plt
# self.input_data.data.sel(filter="74d", variables="temp", Stations="DEBW107").plot()
# self.input_data.data.sel(variables="temp", Stations="DEBW107").plot.line(hue="filter")
if self.do_transformation is True:
self.call_transform()
self.make_samples()
@TimeTrackingWrapper
def apply_kz_filter(self):
"""Apply kolmogorov zurbenko filter only on inputs."""
kz = KZFilter(self.input_data.data, wl=self.kz_filter_length, itr=self.kz_filter_iter, filter_dim="datetime")
filtered_data: List[xr.DataArray] = kz.run()
self.cutoff_period = kz.period_null()
self.cutoff_period_days = kz.period_null_days()
self.input_data.data = xr.concat(filtered_data, pd.Index(self.create_filter_index(), name="filter"))
def create_filter_index(self) -> pd.Index:
"""
Round cut-off periods in days and append 'res' for the residuum index.
Small numbers (<10) are rounded to a single decimal, larger numbers to int. The values are transformed into a
list of str, and 'res' is appended for the residuum index.
"""
index = np.round(self.cutoff_period_days, 1)
f = lambda x: int(np.round(x)) if x >= 10 else np.round(x, 1)
index = list(map(f, index.tolist()))
index = list(map(lambda x: str(x) + "d", index)) + ["res"]
return pd.Index(index, name="filter")
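A worked sketch of this rounding rule with made-up cut-off periods (real values come from kz.period_null_days()):

import numpy as np
import pandas as pd

cutoff_period_days = np.array([1.7361, 11.6149, 74.3890])  # assumed example values
index = np.round(cutoff_period_days, 1)
f = lambda x: int(np.round(x)) if x >= 10 else np.round(x, 1)
index = list(map(f, index.tolist()))
index = list(map(lambda x: str(x) + "d", index)) + ["res"]
print(pd.Index(index, name="filter"))  # Index(['1.7d', '12d', '74d', 'res'], dtype='object', name='filter')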
def get_transposed_history(self) -> xr.DataArray:
"""Return history.
:return: history with dimensions datetime, window, Stations, variables, filter.
"""
return self.history.transpose("datetime", "window", "Stations", "variables", "filter").copy()
class DataHandlerKzFilter(DefaultDataHandler):
"""Data handler using kz filtered data."""
data_handler = DataHandlerKzFilterSingleStation
data_handler_transformation = DataHandlerKzFilterSingleStation
_requirements = data_handler.requirements()
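This added "filter" axis is the dimension the MR title refers to: after apply_kz_filter, the inputs carry one extra dimension with one entry per filter component plus the residuum. A toy sketch of the xr.concat step (shapes and labels are made up):

import numpy as np
import pandas as pd
import xarray as xr

raw = xr.DataArray(np.random.rand(24, 1, 2), dims=("datetime", "Stations", "variables"))
components = [raw * 0.8, raw * 0.2]  # stand-ins for the list returned by kz.run()
filtered = xr.concat(components, pd.Index(["74d", "res"], name="filter"))
print(filtered.dims)  # ('filter', 'datetime', 'Stations', 'variables')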
@@ -4,9 +4,9 @@ __date__ = '2020-07-17'
from mlair.helpers import to_list
from mlair.data_handler.station_preparation import DataHandlerSingleStation
from mlair.data_handler import DefaultDataHandler
import os
import copy
from typing import Union, List
@@ -15,6 +15,7 @@ num_or_list = Union[number, List[number]]
class DataHandlerNeighbors(DefaultDataHandler):
"""Data handler including neighboring stations."""
def __init__(self, id_class, data_path, neighbors=None, min_length=0,
extreme_values: num_or_list = None, extremes_on_right_tail_only: bool = False):
@@ -24,14 +25,14 @@ class DataHandlerNeighbors(DefaultDataHandler):
@classmethod
def build(cls, station, **kwargs):
-        sp_keys = {k: kwargs[k] for k in cls._requirements if k in kwargs}
-        sp = DataHandlerSingleStation(station, **sp_keys)
+        sp_keys = {k: copy.deepcopy(kwargs[k]) for k in cls._requirements if k in kwargs}
+        sp = cls.data_handler(station, **sp_keys)
n_list = []
for neighbor in kwargs.get("neighbors", []):
-            n_list.append(DataHandlerSingleStation(neighbor, **sp_keys))
+            n_list.append(cls.data_handler(neighbor, **sp_keys))
else:
kwargs["neighbors"] = n_list if len(n_list) > 0 else None
-        dp_args = {k: kwargs[k] for k in cls.own_args("id_class") if k in kwargs}
+        dp_args = {k: copy.deepcopy(kwargs[k]) for k in cls.own_args("id_class") if k in kwargs}
return cls(sp, **dp_args)
def _create_collection(self):
...
@@ -4,6 +4,7 @@ __date__ = '2020-09-21'
import copy
import inspect
+import gc
import logging
import os
import pickle
@@ -15,7 +16,6 @@ import numpy as np
import xarray as xr
from mlair.data_handler.abstract_data_handler import AbstractDataHandler
-from mlair.data_handler.station_preparation import DataHandlerSingleStation
from mlair.helpers import remove_items, to_list
from mlair.helpers.join import EmptyQueryResult
@@ -25,11 +25,14 @@ num_or_list = Union[number, List[number]]
class DefaultDataHandler(AbstractDataHandler):
+    from mlair.data_handler.data_handler_single_station import DataHandlerSingleStation as data_handler
+    from mlair.data_handler.data_handler_single_station import DataHandlerSingleStation as data_handler_transformation
-    _requirements = remove_items(inspect.getfullargspec(DataHandlerSingleStation).args, ["self", "station"])
+    _requirements = remove_items(inspect.getfullargspec(data_handler).args, ["self", "station"])
-    def __init__(self, id_class: DataHandlerSingleStation, data_path: str, min_length: int = 0,
-                 extreme_values: num_or_list = None, extremes_on_right_tail_only: bool = False, name_affix=None):
+    def __init__(self, id_class: data_handler, data_path: str, min_length: int = 0,
+                 extreme_values: num_or_list = None, extremes_on_right_tail_only: bool = False, name_affix=None,
+                 store_processed_data=True):
super().__init__()
self.id_class = id_class
self.interpolation_dim = "datetime"
@@ -43,12 +46,12 @@ class DefaultDataHandler(AbstractDataHandler):
self._collection = self._create_collection()
self.harmonise_X()
self.multiply_extremes(extreme_values, extremes_on_right_tail_only, dim=self.interpolation_dim)
-        self._store(fresh_store=True)
+        self._store(fresh_store=True, store_processed_data=store_processed_data)
@classmethod
def build(cls, station: str, **kwargs):
sp_keys = {k: copy.deepcopy(kwargs[k]) for k in cls._requirements if k in kwargs}
-        sp = DataHandlerSingleStation(station, **sp_keys)
+        sp = cls.data_handler(station, **sp_keys)
dp_args = {k: copy.deepcopy(kwargs[k]) for k in cls.own_args("id_class") if k in kwargs}
return cls(sp, **dp_args)
@@ -61,6 +64,7 @@ class DefaultDataHandler(AbstractDataHandler):
def _reset_data(self):
self._X, self._Y, self._X_extreme, self._Y_extreme = None, None, None, None
+        gc.collect()
def _cleanup(self):
directory = os.path.dirname(self._save_file)
@@ -69,13 +73,14 @@ class DefaultDataHandler(AbstractDataHandler):
if os.path.exists(self._save_file):
shutil.rmtree(self._save_file, ignore_errors=True)
-    def _store(self, fresh_store=False):
-        self._cleanup() if fresh_store is True else None
-        data = {"X": self._X, "Y": self._Y, "X_extreme": self._X_extreme, "Y_extreme": self._Y_extreme}
-        with open(self._save_file, "wb") as f:
-            pickle.dump(data, f)
-        logging.debug(f"save pickle data to {self._save_file}")
-        self._reset_data()
+    def _store(self, fresh_store=False, store_processed_data=True):
+        if store_processed_data is True:
+            self._cleanup() if fresh_store is True else None
+            data = {"X": self._X, "Y": self._Y, "X_extreme": self._X_extreme, "Y_extreme": self._Y_extreme}
+            with open(self._save_file, "wb") as f:
+                pickle.dump(data, f)
+            logging.debug(f"save pickle data to {self._save_file}")
+            self._reset_data()
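A minimal sketch of the pickle-based caching pattern guarded by the new store_processed_data flag (file name and payload are placeholders, not the handler's real state):

import pickle

store_processed_data = True
save_file = "data_prep_DEBW107.pickle"  # hypothetical cache location
if store_processed_data is True:
    data = {"X": None, "Y": None, "X_extreme": None, "Y_extreme": None}
    with open(save_file, "wb") as f:
        pickle.dump(data, f)
    with open(save_file, "rb") as f:
        assert pickle.load(f).keys() == data.keys()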
def _load(self):
try:
@@ -140,7 +145,7 @@ class DefaultDataHandler(AbstractDataHandler):
return self.id_class.observation.copy().squeeze()
def get_transformation_Y(self):
-        return self.id_class.get_transformation_information()
+        return self.id_class.get_transformation_targets()
def multiply_extremes(self, extreme_values: num_or_list = 1., extremes_on_right_tail_only: bool = False,
timedelta: Tuple[int, str] = (1, 'm'), dim="datetime"):
@@ -212,27 +217,55 @@ class DefaultDataHandler(AbstractDataHandler):
@classmethod
def transformation(cls, set_stations, **kwargs):
"""
### supported transformation methods
Currently supported methods are:
* standardise (default, if method is not given)
* centre
### mean and std estimation
Mean and std (depending on method) are estimated. For each station, mean and std are calculated and afterwards
aggregated using the mean value over all station-wise metrics. This method is not exactly accurate, especially
regarding the std calculation but therefore much faster. Furthermore, it is a weighted mean weighted by the
time series length / number of data itself - a longer time series has more influence on the transformation
settings than a short time series. The estimation of the std in less accurate, because the unweighted mean of
all stds in not equal to the true std, but still the mean of all station-wise std is a decent estimate. Finally,
the real accuracy of mean and std is less important, because it is "just" a transformation / scaling.
### mean and std given
If mean and std are not None, the default data handler expects this parameters to match the data and applies
this values to the data. Make sure that all dimensions and/or coordinates are in agreement.
"""
sp_keys = {k: copy.deepcopy(kwargs[k]) for k in cls._requirements if k in kwargs}
-        transformation_dict = sp_keys.pop("transformation")
-        if transformation_dict is None:
+        transformation_class = sp_keys.get("transformation", None)
+        if transformation_class is None:
return
-        scope = transformation_dict.pop("scope")
-        method = transformation_dict.pop("method")
-        if transformation_dict.pop("mean", None) is not None:
+        transformation_inputs = transformation_class.inputs
+        if transformation_inputs.mean is not None:
return
-        mean, std = None, None
+        means = [None, None]
+        stds = [None, None]
for station in set_stations:
try:
-                sp = DataHandlerSingleStation(station, transformation={"method": method}, **sp_keys)
-                mean = sp.mean.copy(deep=True) if mean is None else mean.combine_first(sp.mean)
-                std = sp.std.copy(deep=True) if std is None else std.combine_first(sp.std)
+                sp = cls.data_handler_transformation(station, **sp_keys)
+                for i, data in enumerate([sp.input_data, sp.target_data]):
+                    means[i] = data.mean.copy(deep=True) if means[i] is None else means[i].combine_first(data.mean)
+                    stds[i] = data.std.copy(deep=True) if stds[i] is None else stds[i].combine_first(data.std)
except (AttributeError, EmptyQueryResult):
continue
-        if mean is None:
+        if means[0] is None:
return None
-        mean_estimated = mean.mean("Stations")
-        std_estimated = std.mean("Stations")
-        return {"scope": scope, "method": method, "mean": mean_estimated, "std": std_estimated}
+        transformation_class.inputs.mean = means[0].mean("Stations")
+        transformation_class.inputs.std = stds[0].mean("Stations")
+        transformation_class.targets.mean = means[1].mean("Stations")
+        transformation_class.targets.std = stds[1].mean("Stations")
+        return transformation_class
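A condensed sketch of the station-wise estimation described in the docstring above, with toy per-station means (two stations, two variables; combine_first merges the station-wise results, and the final estimate is the unweighted mean over all stations):

import xarray as xr

m1 = xr.DataArray([[1.0, 2.0]], dims=("Stations", "variables"),
                  coords={"Stations": ["DEBW107"], "variables": ["o3", "temp"]})
m2 = xr.DataArray([[3.0, 4.0]], dims=("Stations", "variables"),
                  coords={"Stations": ["DEBW013"], "variables": ["o3", "temp"]})
mean = m1.combine_first(m2)          # union over the Stations dimension
print(mean.mean("Stations").values)  # [2. 3.]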
def get_coordinates(self):
return self.id_class.get_coordinates()
\ No newline at end of file
@@ -55,7 +55,7 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t
for var in _lower_list(sorted(vars_dict.keys())):
if var in stat_var.keys():
-            logging.debug('load: {}'.format(var))
+            logging.debug('load: {}'.format(var))  # ToDo start here for #206
# create data link
opts = {'base': join_url_base, 'service': 'stats', 'id': vars_dict[var], 'statistics': stat_var[var],
@@ -138,6 +138,7 @@ def load_series_information(station_name: List[str], station_type: str_or_none,
opts = {"base": join_url_base, "service": "series", "station_id": station_name[0], "station_type": station_type,
"network_name": network_name}
station_vars = get_data(opts, headers)
logging.debug(f"{station_name}: {station_vars}") # ToDo start here for #206
vars_dict = {item[3].lower(): item[0] for item in station_vars}
return vars_dict
...
@@ -9,10 +9,36 @@ import numpy as np
import xarray as xr
import pandas as pd
from typing import Union, Tuple, Dict
from matplotlib import pyplot as plt
from mlair.helpers import to_list, remove_items
Data = Union[xr.DataArray, pd.DataFrame]
class DataClass:
def __init__(self, data=None, mean=None, std=None, max=None, min=None, transform_method=None):
self.data = data
self.mean = mean
self.std = std
self.max = max
self.min = min
self.transform_method = transform_method
self._method = None
def as_dict(self):
return remove_items(self.__dict__, "_method")
class TransformationClass:
def __init__(self, inputs_mean=None, inputs_std=None, inputs_method=None, targets_mean=None, targets_std=None,
targets_method=None):
self.inputs = DataClass(mean=inputs_mean, std=inputs_std, transform_method=inputs_method)
self.targets = DataClass(mean=targets_mean, std=targets_std, transform_method=targets_method)
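A usage sketch of the new container (this mirrors DEFAULT_TRANSFORMATION in defaults.py above; means and stds stay None until estimated later by DefaultDataHandler.transformation):

transformation = TransformationClass(inputs_method="standardise", targets_method="standardise")
print(transformation.inputs.transform_method)   # 'standardise'
print(transformation.targets.mean)              # None -> estimated later
print(sorted(transformation.inputs.as_dict()))  # ['data', 'max', 'mean', 'min', 'std', 'transform_method']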
def apply_inverse_transformation(data: Data, mean: Data, std: Data = None, method: str = "standardise") -> Data:
"""
Apply inverse transformation for given statistics.
@@ -345,3 +371,168 @@ class SkillScores:
monthly_mean[monthly_mean.index.dt.month == month, :] = mu[mu.month == month].values
return monthly_mean
class KolmogorovZurbenkoBaseClass:
def __init__(self, df, wl, itr, is_child=False, filter_dim="window"):
"""
Initialize the variables associated with the Kolmogorov-Zurbenko filter.
Args:
df(pd.DataFrame, None): time series of a variable
wl(list of int): window length
itr(list of int): number of iterations
"""
self.df = df
self.filter_dim = filter_dim
self.wl = to_list(wl)
self.itr = to_list(itr)
if abs(len(self.wl) - len(self.itr)) > 0:
raise ValueError("Length of lists for wl and itr must agree!")
self._isChild = is_child
self.child = self.set_child()
self.type = type(self).__name__
def set_child(self):
if len(self.wl) > 1:
return KolmogorovZurbenkoBaseClass(None, self.wl[1:], self.itr[1:], True, self.filter_dim)
else:
return None
def kz_filter(self, df, m, k):
pass
def spectral_calc(self):
df_start = self.df
kz = self.kz_filter(df_start, self.wl[0], self.itr[0])
filtered = self.subtract(df_start, kz)
# case I: no child avail -> return kz and remaining
if self.child is None:
return [kz, filtered]
# case II: has child -> return current kz and all child results
else:
self.child.df = filtered
kz_next = self.child.spectral_calc()
return [kz] + kz_next
@staticmethod
def subtract(minuend, subtrahend):
try: # pandas implementation
return minuend.sub(subtrahend, axis=0)
except AttributeError: # general implementation
return minuend - subtrahend
def run(self):
return self.spectral_calc()
def transfer_function(self):
m = self.wl[0]
k = self.itr[0]
omega = np.linspace(0.00001, 0.15, 5000)
return omega, (np.sin(m * np.pi * omega) / (m * np.sin(np.pi * omega))) ** (2 * k)
def omega_null(self, alpha=0.5):
a = np.sqrt(6) / np.pi
b = 1 / (2 * np.array(self.itr))
c = 1 - alpha ** b
d = np.array(self.wl) ** 2 - alpha ** b
return a * np.sqrt(c / d)
def period_null(self, alpha=0.5):
return 1. / self.omega_null(alpha)
def period_null_days(self, alpha=0.5):
return self.period_null(alpha) / 24.
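As a rough numeric check of these formulas (assumed parameters, half-amplitude criterion alpha=0.5): a single KZ(m=13, k=2) pass on hourly data cuts off at about 42 time steps, i.e. roughly 1.7 days:

import numpy as np

# omega_0 = sqrt(6)/pi * sqrt((1 - alpha**(1/(2*k))) / (m**2 - alpha**(1/(2*k))))
m, k, alpha = 13, 2, 0.5  # assumed window length, iterations, attenuation
b = 1 / (2 * k)
omega_0 = np.sqrt(6) / np.pi * np.sqrt((1 - alpha ** b) / (m ** 2 - alpha ** b))
print(round(omega_0, 4))             # ~0.024 cycles per time step
print(round(1. / omega_0, 1))        # ~41.7 time steps
print(round(1. / omega_0 / 24., 2))  # ~1.74 days for hourly data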
def plot_transfer_function(self, fig=None, name=None):
if fig is None:
fig = plt.figure()
omega, transfer_function = self.transfer_function()
if self.child is not None:
transfer_function_child = self.child.plot_transfer_function(fig)
else:
transfer_function_child = transfer_function * 0
plt.semilogx(omega, transfer_function - transfer_function_child,
label="m={:3.0f}, k={:3.0f}, T={:6.2f}d".format(self.wl[0],
self.itr[0],
self.period_null_days()))
plt.axvline(x=self.omega_null())
if not self._isChild:
locs, labels = plt.xticks()
plt.xticks(locs, np.round(1. / (locs * 24), 1))
plt.xlim([0.00001, 0.15])
plt.legend()
if name is None:
plt.show()
else:
plt.savefig(name)
else:
return transfer_function
class KolmogorovZurbenkoFilterMovingWindow(KolmogorovZurbenkoBaseClass):
def __init__(self, df, wl: Union[list, int], itr: Union[list, int], is_child=False, filter_dim="window",
method="mean", percentile=0.5):
"""
Initialize the variables associated with the KolmogorovZurbenkoFilterMovingWindow class.
Args:
df(pd.DataFrame, xr.DataArray): time series of a variable
wl: window length
itr: number of iterations
"""
self.valid_methods = ["mean", "percentile", "median", "max", "min"]
if method not in self.valid_methods:
raise ValueError("Method '{}' is not supported. Please select from [{}].".format(
method, ", ".join(self.valid_methods)))
else:
self.method = method
if percentile > 1 or percentile < 0:
raise ValueError("Percentile must be in range [0, 1]. Given was {}!".format(percentile))
else:
self.percentile = percentile
super().__init__(df, wl, itr, is_child, filter_dim)
def set_child(self):
if len(self.wl) > 1:
return KolmogorovZurbenkoFilterMovingWindow(self.df, self.wl[1:], self.itr[1:], is_child=True,
filter_dim=self.filter_dim, method=self.method,
percentile=self.percentile)
else:
return None
def kz_filter(self, df, wl, itr):
"""
Low-pass filter the time series and return its low-frequency component.
Args:
wl(int): window length
itr(int): number of iterations
"""
df_itr = df.__deepcopy__()
try:
kwargs = {"min_periods": 1,
"center": True,
self.filter_dim: wl}
iter_vars = df_itr.coords["variables"].values
for var in iter_vars:
df_itr_var = df_itr.sel(variables=[var]).chunk()
for _ in np.arange(0, itr):
rolling = df_itr_var.rolling(**kwargs)
if self.method == "median":
df_mv_avg_tmp = rolling.median()
elif self.method == "percentile":
df_mv_avg_tmp = rolling.quantile(self.percentile)
elif self.method == "max":
df_mv_avg_tmp = rolling.max()
elif self.method == "min":
df_mv_avg_tmp = rolling.min()
else:
df_mv_avg_tmp = rolling.mean()
df_itr_var = df_mv_avg_tmp.compute()
df_itr = df_itr.drop_sel(variables=var).combine_first(df_itr_var)
return df_itr
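Finally, a hedged end-to-end sketch of the moving-window filter on toy hourly data (dimension and coordinate names follow the rest of this MR; note that kz_filter calls .chunk(), so dask must be installed):

import numpy as np
import pandas as pd
import xarray as xr

time = pd.date_range("2020-01-01", periods=24 * 30, freq="h")
data = xr.DataArray(np.random.rand(len(time), 1, 1),
                    dims=("datetime", "Stations", "variables"),
                    coords={"datetime": time, "Stations": ["DEBW107"], "variables": ["temp"]})
kz = KolmogorovZurbenkoFilterMovingWindow(data, wl=[13], itr=[2], filter_dim="datetime")
filtered = kz.run()           # [low-pass component, residuum]
print(len(filtered))          # 2
print(kz.period_null_days())  # cut-off period(s) in days, ~[1.74]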