Commit cc7be2a8 authored by lukas leufen's avatar lukas leufen

new data handler using kz filter (not fully completed), there went something...

new data handler using kz filter (not fully completed), there went something wrong in the spectral calculation but is fixed now, tests still missing, updated xarray version
parent 07c8bcf5
Pipeline #50412 passed with stages
in 7 minutes and 52 seconds
"""Data Handler using kz-filtered data."""
__author__ = 'Lukas Leufen'
__date__ = '2020-08-26'
import inspect
import numpy as np
import pandas as pd
import xarray as xr
from typing import List
from mlair.data_handler.data_handler_single_station import DataHandlerSingleStation
from mlair.data_handler import DefaultDataHandler
from mlair.helpers import remove_items, to_list, TimeTrackingWrapper
from mlair.helpers.statistics import KolmogorovZurbenkoFilterMovingWindow as KZFilter
class DataHandlerKzFilterSingleStation(DataHandlerSingleStation):
"""Data handler for a single station to be used by a superior data handler. Data is kz filtered."""
_requirements = remove_items(inspect.getfullargspec(DataHandlerSingleStation).args, ["self", "station"])
def __init__(self, *args, kz_filter_length, kz_filter_iter, **kwargs):
assert kwargs.get("sampling") == "hourly" # This data handler requires hourly data resolution
kz_filter_length = to_list(kz_filter_length)
kz_filter_iter = to_list(kz_filter_iter)
# self.original_data = None # ToDo: implement here something to store unfiltered data
self.kz_filter_length = kz_filter_length
self.kz_filter_iter = kz_filter_iter
self.cutoff_period = None
self.cutoff_period_days = None
super().__init__(*args, **kwargs)
def setup_samples(self):
"""
Setup samples. This method prepares and creates samples X, and labels Y.
"""
self.load_data()
self.interpolate(dim=self.time_dim, method=self.interpolation_method, limit=self.interpolation_limit)
import matplotlib
matplotlib.use("TkAgg")
import matplotlib.pyplot as plt
# self.original_data = self.data # ToDo: implement here something to store unfiltered data
self.apply_kz_filter()
self.data.sel(filter="74d", variables="temp", Stations="DEBW107").plot()
if self.transformation is not None:
self.call_transform()
self.make_samples() # ToDo: target samples are still comming from filtered data
@TimeTrackingWrapper
def apply_kz_filter(self):
kz = KZFilter(self.data, wl=self.kz_filter_length, itr=self.kz_filter_iter, filter_dim="datetime")
filtered_data: List[xr.DataArray] = kz.run()
self.cutoff_period = kz.period_null()
self.cutoff_period_days = kz.period_null_days()
self.data = xr.concat(filtered_data, pd.Index(self.create_filter_index(), name="filter"))
def create_filter_index(self) -> pd.Index:
"""
Round cut off periods in days and append 'res' for residuum index.
Round small numbers (<10) to single decimal, and higher numbers to int. Transform as list of str and append
'res' for residuum index.
"""
index = np.round(self.cutoff_period_days, 1)
f = lambda x: int(np.round(x)) if x >= 10 else np.round(x, 1)
index = list(map(f, index.tolist()))
index = list(map(lambda x: str(x) + "d", index)) + ["res"]
return pd.Index(index, name="filter")
def get_transposed_history(self) -> xr.DataArray:
"""Return history.
:return: history with dimensions datetime, window, Stations, variables.
"""
return self.history.transpose("datetime", "window", "Stations", "variables", "filter").copy()
def get_transposed_label(self) -> xr.DataArray:
"""Return label.
:return: label with dimensions datetime*, window*, Stations, variables.
"""
return self.label.squeeze("Stations").transpose("datetime", "window", "filter").copy()
class DataHandlerKzFilter(DefaultDataHandler):
"""Data handler using kz filtered data."""
data_handler = DataHandlerKzFilterSingleStation
data_handler_transformation = DataHandlerKzFilterSingleStation
_requirements = data_handler.requirements()
......@@ -357,7 +357,7 @@ class KolmogorovZurbenkoBaseClass:
It create the variables associate with the Kolmogorov-Zurbenko-filter.
Args:
df(pd.DataFrame): time series of a variable
df(pd.DataFrame, None): time series of a variable
wl(list of int): window length
itr(list of int): number of iteration
"""
......@@ -373,27 +373,25 @@ class KolmogorovZurbenkoBaseClass:
def set_child(self):
if len(self.wl) > 1:
return KolmogorovZurbenkoBaseClass(self.df, self.wl[1:], self.itr[1:], True, self.filter_dim)
return KolmogorovZurbenkoBaseClass(None, self.wl[1:], self.itr[1:], True, self.filter_dim)
else:
return None
def kz_filter(self, m, k):
def kz_filter(self, df, m, k):
pass
def spectral_calc(self):
kz = self.kz_filter(self.wl[0], self.itr[0])
df_start = self.df
kz = self.kz_filter(df_start, self.wl[0], self.itr[0])
filtered = self.subtract(df_start, kz)
# case I: no child avail -> return kz and remaining
if self.child is None:
if not self._isChild:
return [self.subtract(self.df, kz), kz]
else:
return [kz, kz]
return [kz, filtered]
# case II: has child -> return current kz and all child results
else:
if not self._isChild:
kz_next = self.child.spectral_calc()
return [self.subtract(self.df, kz), self.subtract(kz, kz_next[0])] + kz_next[1:]
else:
kz_next = self.child.spectral_calc()
return [kz, self.subtract(kz, kz_next[0])] + kz_next[1:]
self.child.df = filtered
kz_next = self.child.spectral_calc()
return [kz] + kz_next
@staticmethod
def subtract(minuend, subtrahend):
......@@ -413,9 +411,9 @@ class KolmogorovZurbenkoBaseClass:
def omega_null(self, alpha=0.5):
a = np.sqrt(6) / np.pi
b = 1 / (2 * self.itr[0])
b = 1 / (2 * np.array(self.itr))
c = 1 - alpha ** b
d = self.wl[0] ** 2 - alpha ** b
d = np.array(self.wl) ** 2 - alpha ** b
return a * np.sqrt(c / d)
def period_null(self, alpha=0.5):
......@@ -482,7 +480,7 @@ class KolmogorovZurbenkoFilterMovingWindow(KolmogorovZurbenkoBaseClass):
else:
return None
def kz_filter(self, wl, itr):
def kz_filter(self, df, wl, itr):
"""
It passes the low frequency time series.
......@@ -490,24 +488,28 @@ class KolmogorovZurbenkoFilterMovingWindow(KolmogorovZurbenkoBaseClass):
wl(int): a window length
itr(int): a number of iteration
"""
df_itr = self.df.__deepcopy__()
df_itr = df.__deepcopy__()
try:
kwargs = {"min_periods": 1,
"center": True,
self.filter_dim: wl}
for _ in np.arange(0, itr):
rolling = df_itr.rolling(**kwargs)
if self.method == "median":
df_mv_avg_tmp = rolling.median()
elif self.method == "percentile":
df_mv_avg_tmp = rolling.quantile(self.percentile)
elif self.method == "max":
df_mv_avg_tmp = rolling.max()
elif self.method == "min":
df_mv_avg_tmp = rolling.min()
else:
df_mv_avg_tmp = rolling.mean()
df_itr = df_mv_avg_tmp
iter_vars = df_itr.coords["variables"].values
for var in iter_vars:
df_itr_var = df_itr.sel(variables=[var]).chunk()
for _ in np.arange(0, itr):
rolling = df_itr_var.rolling(**kwargs)
if self.method == "median":
df_mv_avg_tmp = rolling.median()
elif self.method == "percentile":
df_mv_avg_tmp = rolling.quantile(self.percentile)
elif self.method == "max":
df_mv_avg_tmp = rolling.max()
elif self.method == "min":
df_mv_avg_tmp = rolling.min()
else:
df_mv_avg_tmp = rolling.mean()
df_itr_var = df_mv_avg_tmp.compute()
df_itr = df_itr.drop_sel(variables=var).combine_first(df_itr_var)
return df_itr
except ValueError:
raise ValueError
......@@ -61,7 +61,7 @@ typing-extensions
urllib3==1.25.8
wcwidth==0.1.8
Werkzeug==1.0.0
xarray==0.15.0
xarray==0.16.1
zipp==3.1.0
setuptools~=49.6.0
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment