Commit 8a330277 authored by lukas leufen

Merge branch 'develop' into 'master'

release for v0.7.0

Closes #54, #57, #38, #40, #45, #42, and #43

See merge request toar/machinelearningtools!50
parents 6db0375f 0b347cc3
Pipeline #31037 passed with stages
in 6 minutes and 30 seconds
......@@ -59,3 +59,7 @@ htmlcov/
report.html
/TestExperiment/
/testrun_network*/
# secret variables #
####################
/src/join_settings.py
\ No newline at end of file
......@@ -12,4 +12,68 @@ and [Network In Network (Lin et al., 2014)](https://arxiv.org/abs/1312.4400).
# Installation
* Install __proj__ on your machine using the console, e.g. for openSUSE Leap: `zypper install proj`
* A C++ compiler is required for the cartopy installation
# Security
* To use hourly data from ToarDB via the JOIN interface, a private token is required. Request your personal access token and
add it to `src/join_settings.py` in the hourly data section. Replace the `TOAR_SERVICE_URL` and the `Authorization`
value. To make sure that this **sensitive** data is not uploaded to the remote server, use the following command to
prevent git from tracking this file: `git update-index --assume-unchanged src/join_settings.py`
# Customise your experiment
This section summarises which parameters can be customised for training.
## Transformation
There are two different approaches (called scopes) to transform the data:
1) `station`: transform the data of each station independently (similar to batch normalisation)
1) `data`: transform the data of all stations with shared metrics
The transformation is set via the `transformation` attribute. If `transformation = None` is given to `ExperimentSetup`,
the data is not transformed at all. For all other setups, use the following dictionary structure to specify the
transformation.
```
transformation = {"scope": <...>,
                  "method": <...>,
                  "mean": <...>,
                  "std": <...>}
ExperimentSetup(..., transformation=transformation, ...)
```
### scopes
**station**: mean and std are not used.
**data**: either provide already calculated values for mean and std (if required by the transformation method), or choose
from the different calculation schemes explained in the mean and std section.
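For illustration, two minimal configuration sketches based on the options described above (the concrete choices, e.g. `"estimate"`, are placeholders, not recommendations):
```
# station scope: statistics are computed per station internally,
# so no mean/std entries are needed
transformation_station = {"scope": "station", "method": "standardise"}

# data scope: one shared transformation for all stations,
# here with mean/std estimated from the data (see the next sections)
transformation_data = {"scope": "data", "method": "standardise", "mean": "estimate"}
```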
### supported transformation methods
Currently supported methods are:
* standardise (default, if method is not given)
* centre
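For reference, the usual definitions behind these two methods (a short sketch, not quoted from the package documentation): *standardise* scales each variable to zero mean and unit variance, $x' = \frac{x - \mu}{\sigma}$, whereas *centre* only removes the mean, $x' = x - \mu$.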
### mean and std
`"mean"="accurate"`: calculate the accurate values of mean and std (depending on method) by using all data. Although,
this method is accurate, it may take some time for the calculation. Furthermore, this could potentially lead to memory
issue (not explored yet, but could appear for a very big amount of data)
`"mean"="estimate"`: estimate mean and std (depending on method). For each station, mean and std are calculated and
afterwards aggregated using the mean value over all station-wise metrics. This method is less accurate, especially
regarding the std calculation but therefore much faster.
We recommend using the latter method, *estimate*, for the following reasons (a small numerical sketch follows this list):
* much faster calculation
* the real accuracy of mean and std is less important, because it is "just" a transformation / scaling
* the accuracy of the mean is almost as high as in the *accurate* case, because
$\overline{x_{ij}} = \overline{\left(\overline{x_i}\right)_j}$. The only difference is that in the *estimate* case, each mean is
weighted equally for each station, independently of the actual data count of the station.
* the accuracy of the std is lower for *estimate*, because $\mathrm{Var}\left(x_{ij}\right) \ne \overline{\left(\mathrm{Var}\left(x_i\right)\right)_j}$, but the mean of all
station-wise stds is still a decent estimate of the true std.
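A minimal numerical sketch of this difference on synthetic per-station data (purely illustrative, not part of the package):
```
import numpy as np

# hypothetical stations with different sample counts and statistics
station_data = [np.random.normal(0, 1, size=1000),
                np.random.normal(2, 3, size=200),
                np.random.normal(-1, 2, size=5000)]

# "accurate": pool all samples and compute mean/std once
pooled = np.concatenate(station_data)
mean_accurate, std_accurate = pooled.mean(), pooled.std()

# "estimate": average the station-wise statistics, weighting each station
# equally regardless of its sample count
mean_estimate = np.mean([d.mean() for d in station_data])
std_estimate = np.mean([d.std() for d in station_data])

print(f"mean: accurate={mean_accurate:.3f}, estimate={mean_estimate:.3f}")
print(f"std:  accurate={std_accurate:.3f}, estimate={std_estimate:.3f}")
```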
`"mean"=<value, e.g. xr.DataArray>`: If mean and std are already calculated or shall be set manually, just add the
scaling values instead of the calculation method. For method *centre*, std can still be None, but is required for the
*standardise* method. **Important**: Format of given values **must** match internal data format of DataPreparation
class: `xr.DataArray` with `dims=["variables"]` and one value for each variable.
\ No newline at end of file
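A short sketch of manually provided scaling values (the variable names and numbers are hypothetical placeholders):
```
import xarray as xr

# hypothetical precomputed statistics for the variables "o3" and "temp"
mean = xr.DataArray([45.2, 10.1], coords={"variables": ["o3", "temp"]}, dims=["variables"])
std = xr.DataArray([20.3, 8.5], coords={"variables": ["o3", "temp"]}, dims=["variables"])

transformation = {"scope": "data", "method": "standardise", "mean": mean, "std": std}
# ExperimentSetup(..., transformation=transformation, ...)
```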
......@@ -12,11 +12,11 @@ import numpy as np
class Distributor(keras.utils.Sequence):
def __init__(self, generator: keras.utils.Sequence, model: keras.models, batch_size: int = 256,
fit_call: bool = True):
permute_data: bool = False):
self.generator = generator
self.model = model
self.batch_size = batch_size
self.fit_call = fit_call
self.do_data_permutation = permute_data
def _get_model_rank(self):
mod_out = self.model.output_shape
......@@ -33,6 +33,16 @@ class Distributor(keras.utils.Sequence):
def _get_number_of_mini_batches(self, values):
return math.ceil(values[0].shape[0] / self.batch_size)
def _permute_data(self, x, y):
"""
Permute inputs x and labels y
"""
if self.do_data_permutation:
p = np.random.permutation(len(x)) # equiv to .shape[0]
x = x[p]
y = y[p]
return x, y
def distribute_on_batches(self, fit_call=True):
while True:
for k, v in enumerate(self.generator):
......@@ -42,6 +52,8 @@ class Distributor(keras.utils.Sequence):
num_mini_batches = self._get_number_of_mini_batches(v)
x_total = np.copy(v[0])
y_total = np.copy(v[1])
# permute order for mini-batches
x_total, y_total = self._permute_data(x_total, y_total)
for prev, curr in enumerate(range(1, num_mini_batches+1)):
x = x_total[prev*self.batch_size:curr*self.batch_size, ...]
y = [y_total[prev*self.batch_size:curr*self.batch_size, ...] for _ in range(mod_rank)]
......
......@@ -2,8 +2,9 @@ __author__ = 'Felix Kleinert, Lukas Leufen'
__date__ = '2019-11-07'
import os
from typing import Union, List, Tuple, Any
from typing import Union, List, Tuple, Any, Dict
import dask.array as da
import keras
import xarray as xr
import pickle
......@@ -11,6 +12,7 @@ import logging
from src import helpers
from src.data_handling.data_preparation import DataPrep
from src.join import EmptyQueryResult
class DataGenerator(keras.utils.Sequence):
......@@ -25,7 +27,7 @@ class DataGenerator(keras.utils.Sequence):
def __init__(self, data_path: str, network: str, stations: Union[str, List[str]], variables: List[str],
interpolate_dim: str, target_dim: str, target_var: str, station_type: str = None,
interpolate_method: str = "linear", limit_nan_fill: int = 1, window_history_size: int = 7,
window_lead_time: int = 4, transform_method: str = "standardise", **kwargs):
window_lead_time: int = 4, transformation: Dict = None, **kwargs):
self.data_path = os.path.abspath(data_path)
self.data_path_tmp = os.path.join(os.path.abspath(data_path), "tmp")
if not os.path.exists(self.data_path_tmp):
......@@ -41,8 +43,8 @@ class DataGenerator(keras.utils.Sequence):
self.limit_nan_fill = limit_nan_fill
self.window_history_size = window_history_size
self.window_lead_time = window_lead_time
self.transform_method = transform_method
self.kwargs = kwargs
self.transformation = self.setup_transformation(transformation)
def __repr__(self):
"""
......@@ -94,18 +96,86 @@ class DataGenerator(keras.utils.Sequence):
data = self.get_data_generator(key=item)
return data.get_transposed_history(), data.label.squeeze("Stations").transpose("datetime", "window")
def get_data_generator(self, key: Union[str, int] = None, local_tmp_storage: bool = True) -> DataPrep:
def setup_transformation(self, transformation):
if transformation is None:
return
transformation = transformation.copy()
scope = transformation.get("scope", "station")
method = transformation.get("method", "standardise")
mean = transformation.get("mean", None)
std = transformation.get("std", None)
if scope == "data":
if isinstance(mean, str):
if mean == "accurate":
mean, std = self.calculate_accurate_transformation(method)
elif mean == "estimate":
mean, std = self.calculate_estimated_transformation(method)
else:
raise ValueError(f"given mean attribute must either be equal to strings 'accurate' or 'estimate' or"
f"be an array with already calculated means. Given was: {mean}")
elif scope == "station":
mean, std = None, None
else:
raise ValueError(f"Scope argument can either be 'station' or 'data'. Given was: {scope}")
transformation["method"] = method
transformation["mean"] = mean
transformation["std"] = std
return transformation
def calculate_accurate_transformation(self, method):
tmp = []
mean = None
std = None
for station in self.stations:
try:
data = DataPrep(self.data_path, self.network, station, self.variables, station_type=self.station_type,
**self.kwargs)
chunks = (1, 100, data.data.shape[2])
tmp.append(da.from_array(data.data.data, chunks=chunks))
except EmptyQueryResult:
continue
tmp = da.concatenate(tmp, axis=1)
if method in ["standardise", "centre"]:
mean = da.nanmean(tmp, axis=1).compute()
mean = xr.DataArray(mean.flatten(), coords={"variables": sorted(self.variables)}, dims=["variables"])
if method == "standardise":
std = da.nanstd(tmp, axis=1).compute()
std = xr.DataArray(std.flatten(), coords={"variables": sorted(self.variables)}, dims=["variables"])
else:
raise NotImplementedError
return mean, std
def calculate_estimated_transformation(self, method):
data = [[]]*len(self.variables)
coords = {"variables": self.variables, "Stations": range(0)}
mean = xr.DataArray(data, coords=coords, dims=["variables", "Stations"])
std = xr.DataArray(data, coords=coords, dims=["variables", "Stations"])
for station in self.stations:
try:
data = DataPrep(self.data_path, self.network, station, self.variables, station_type=self.station_type,
**self.kwargs)
data.transform("datetime", method=method)
mean = mean.combine_first(data.mean)
std = std.combine_first(data.std)
data.transform("datetime", method=method, inverse=True)
except EmptyQueryResult:
continue
return mean.mean("Stations") if mean.shape[1] > 0 else None, std.mean("Stations") if std.shape[1] > 0 else None
def get_data_generator(self, key: Union[str, int] = None, load_local_tmp_storage: bool = True,
save_local_tmp_storage: bool = True) -> DataPrep:
"""
Select data for given key, create a DataPrep object and interpolate, transform, make history and labels and
remove nans.
:param key: station key to choose the data generator.
:param local_tmp_storage: say if data should be processed from scratch or loaded as already processed data from
tmp pickle file to save computational time (but of course more disk space required).
:param load_local_tmp_storage: say if data should be processed from scratch or loaded as already processed data
from tmp pickle file to save computational time (but of course more disk space required).
:param save_local_tmp_storage: save processed data as temporal file locally (default True)
:return: preprocessed data as a DataPrep instance
"""
station = self.get_station_key(key)
try:
if not local_tmp_storage:
if not load_local_tmp_storage:
raise FileNotFoundError
data = self._load_pickle_data(station, self.variables)
except FileNotFoundError:
......@@ -113,11 +183,13 @@ class DataGenerator(keras.utils.Sequence):
data = DataPrep(self.data_path, self.network, station, self.variables, station_type=self.station_type,
**self.kwargs)
data.interpolate(self.interpolate_dim, method=self.interpolate_method, limit=self.limit_nan_fill)
data.transform("datetime", method=self.transform_method)
if self.transformation is not None:
data.transform("datetime", **helpers.dict_pop(self.transformation, "scope"))
data.make_history_window(self.interpolate_dim, self.window_history_size)
data.make_labels(self.target_dim, self.target_var, self.interpolate_dim, self.window_lead_time)
data.history_label_nan_remove(self.interpolate_dim)
self._save_pickle_data(data)
if save_local_tmp_storage:
self._save_pickle_data(data)
return data
def _save_pickle_data(self, data: Any):
......
......@@ -216,7 +216,7 @@ class DataPrep(object):
self.data, self.mean, self.std = f_inverse(self.data, self.mean, self.std, self._transform_method)
self._transform_method = None
def transform(self, dim: Union[str, int] = 0, method: str = 'standardise', inverse: bool = False) -> None:
def transform(self, dim: Union[str, int] = 0, method: str = 'standardise', inverse: bool = False, mean = None, std=None) -> None:
"""
This function transforms a xarray.dataarray (along dim) or pandas.DataFrame (along axis) either with mean=0
and std=1 (`method=standardise`) or centers the data with mean=0 and no change in data scale
......@@ -247,11 +247,19 @@ class DataPrep(object):
else:
raise NotImplementedError
def f_apply(data):
if method == "standardise":
return mean, std, statistics.standardise_apply(data, mean, std)
elif method == "centre":
return mean, None, statistics.centre_apply(data, mean)
else:
raise NotImplementedError
if not inverse:
if self._transform_method is not None:
raise AssertionError(f"Transform method is already set. Therefore, data was already transformed with "
f"{self._transform_method}. Please perform inverse transformation of data first.")
self.mean, self.std, self.data = f(self.data)
self.mean, self.std, self.data = locals()["f" if mean is None else "f_apply"](self.data)
self._transform_method = method
else:
self.inverse_transform()
......@@ -370,7 +378,7 @@ class DataPrep(object):
:param coord: name of axis to slice
:return:
"""
return data.loc[{coord: slice(start, end)}]
return data.loc[{coord: slice(str(start), str(end))}]
def check_for_negative_concentrations(self, data: xr.DataArray, minimum: int = 0) -> xr.DataArray:
"""
......@@ -387,8 +395,7 @@ class DataPrep(object):
return data
def get_transposed_history(self):
if self.history is not None:
return self.history.transpose("datetime", "window", "Stations", "variables")
return self.history.transpose("datetime", "window", "Stations", "variables")
if __name__ == "__main__":
......
......@@ -190,3 +190,8 @@ def float_round(number: float, decimals: int = 0, round_type: Callable = math.ce
"""
multiplier = 10. ** decimals
return round_type(number * multiplier) / multiplier
def dict_pop(dict: Dict, pop_keys):
pop_keys = to_list(pop_keys)
return {k: v for k, v in dict.items() if k not in pop_keys}
......@@ -10,6 +10,8 @@ import numpy as np
from keras import backend as K
from keras.callbacks import History, ModelCheckpoint
from src import helpers
class HistoryAdvanced(History):
"""
......@@ -125,7 +127,7 @@ class ModelCheckpointAdvanced(ModelCheckpoint):
Update all stored callback objects. The argument callbacks needs to follow the same convention like described
in the class description (list of dictionaries). Must be run before resuming a training process.
"""
self.callbacks = callbacks
self.callbacks = helpers.to_list(callbacks)
def on_epoch_end(self, epoch, logs=None):
"""
......@@ -139,12 +141,73 @@ class ModelCheckpointAdvanced(ModelCheckpoint):
if self.save_best_only:
current = logs.get(self.monitor)
if current == self.best:
if self.verbose > 0:
if self.verbose > 0: # pragma: no branch
print('\nEpoch %05d: save to %s' % (epoch + 1, file_path))
with open(file_path, "wb") as f:
pickle.dump(callback["callback"], f)
else:
with open(file_path, "wb") as f:
if self.verbose > 0:
if self.verbose > 0: # pragma: no branch
print('\nEpoch %05d: save to %s' % (epoch + 1, file_path))
pickle.dump(callback["callback"], f)
class CallbackHandler:
def __init__(self):
self.__callbacks = []
self._checkpoint = None
self.editable = True
@property
def _callbacks(self):
return [{"callback": clbk[clbk["name"]], "path": clbk["path"]} for clbk in self.__callbacks]
@_callbacks.setter
def _callbacks(self, value):
name, callback, callback_path = value
self.__callbacks.append({"name": name, name: callback, "path": callback_path})
def _update_callback(self, pos, value):
name = self.__callbacks[pos]["name"]
self.__callbacks[pos][name] = value
def add_callback(self, callback, callback_path, name="callback"):
if self.editable:
self._callbacks = (name, callback, callback_path)
else:
raise PermissionError(f"{__class__.__name__} is protected and cannot be edited.")
def get_callbacks(self, as_dict=True):
if as_dict:
return self._get_callbacks()
else:
return [clb["callback"] for clb in self._get_callbacks()]
def get_callback_by_name(self, obj_name):
if obj_name != "callback":
return [clbk[clbk["name"]] for clbk in self.__callbacks if clbk["name"] == obj_name][0]
def _get_callbacks(self):
clbks = self._callbacks
if self._checkpoint is not None:
clbks += [{"callback": self._checkpoint, "path": self._checkpoint.filepath}]
return clbks
def get_checkpoint(self):
if self._checkpoint is not None:
return self._checkpoint
def create_model_checkpoint(self, **kwargs):
self._checkpoint = ModelCheckpointAdvanced(callbacks=self._callbacks, **kwargs)
self.editable = False
def load_callbacks(self):
for pos, callback in enumerate(self.__callbacks):
path = callback["path"]
clb = pickle.load(open(path, "rb"))
self._update_callback(pos, clb)
def update_checkpoint(self, history_name="hist"):
self._checkpoint.update_callbacks(self._callbacks)
self._checkpoint.update_best(self.get_callback_by_name(history_name))
......@@ -8,6 +8,8 @@ from abc import ABC
from typing import Any, Callable
import keras
from src.model_modules.inception_model import InceptionModelBase
from src.model_modules.flatten import flatten_tail
class AbstractModelClass(ABC):
......@@ -27,6 +29,7 @@ class AbstractModelClass(ABC):
self.__model = None
self.__loss = None
self.model_name = self.__class__.__name__
def __getattr__(self, name: str) -> Any:
......@@ -239,3 +242,112 @@ class MyBranchedModel(AbstractModelClass):
self.loss = [keras.losses.mean_absolute_error] + [keras.losses.mean_squared_error] + \
[keras.losses.mean_squared_error]
class MyTowerModel(AbstractModelClass):
def __init__(self, window_history_size, window_lead_time, channels):
"""
Sets model and loss depending on the given arguments.
:param activation: activation function
:param window_history_size: number of historical time steps included in the input data
:param channels: number of variables used in input data
:param regularizer: <not used here>
:param dropout_rate: dropout rate used in the model [0, 1)
:param window_lead_time: number of time steps to forecast in the output layer
"""
super().__init__()
# settings
self.window_history_size = window_history_size
self.window_lead_time = window_lead_time
self.channels = channels
self.dropout_rate = 1e-2
self.regularizer = keras.regularizers.l2(0.1)
self.initial_lr = 1e-2
self.optimizer = keras.optimizers.adam(lr=self.initial_lr)
self.lr_decay = src.model_modules.keras_extensions.LearningRateDecay(base_lr=self.initial_lr, drop=.94, epochs_drop=10)
self.epochs = 20
self.batch_size = int(256*4)
self.activation = keras.layers.PReLU
# apply to model
self.set_model()
self.set_loss()
def set_model(self):
"""
Build the model.
:param activation: activation function
:param window_history_size: number of historical time steps included in the input data
:param channels: number of variables used in input data
:param dropout_rate: dropout rate used in the model [0, 1)
:param window_lead_time: number of time steps to forecast in the output layer
:return: built keras model
"""
activation = self.activation
conv_settings_dict1 = {
'tower_1': {'reduction_filter': 8, 'tower_filter': 8 * 2, 'tower_kernel': (3, 1), 'activation': activation},
'tower_2': {'reduction_filter': 8, 'tower_filter': 8 * 2, 'tower_kernel': (5, 1), 'activation': activation},
'tower_3': {'reduction_filter': 8, 'tower_filter': 8 * 2, 'tower_kernel': (1, 1), 'activation': activation},
}
pool_settings_dict1 = {'pool_kernel': (3, 1), 'tower_filter': 8 * 2, 'activation': activation}
conv_settings_dict2 = {
'tower_1': {'reduction_filter': 8 * 2, 'tower_filter': 16 * 2 * 2, 'tower_kernel': (3, 1),
'activation': activation},
'tower_2': {'reduction_filter': 8 * 2, 'tower_filter': 16 * 2 * 2, 'tower_kernel': (5, 1),
'activation': activation},
'tower_3': {'reduction_filter': 8 * 2, 'tower_filter': 16 * 2 * 2, 'tower_kernel': (1, 1),
'activation': activation},
}
pool_settings_dict2 = {'pool_kernel': (3, 1), 'tower_filter': 16, 'activation': activation}
conv_settings_dict3 = {'tower_1': {'reduction_filter': 16 * 4, 'tower_filter': 32 * 2, 'tower_kernel': (3, 1),
'activation': activation},
'tower_2': {'reduction_filter': 16 * 4, 'tower_filter': 32 * 2, 'tower_kernel': (5, 1),
'activation': activation},
'tower_3': {'reduction_filter': 16 * 4, 'tower_filter': 32 * 2, 'tower_kernel': (1, 1),
'activation': activation},
}
pool_settings_dict3 = {'pool_kernel': (3, 1), 'tower_filter': 32, 'activation': activation}
##########################################
inception_model = InceptionModelBase()
X_input = keras.layers.Input(
shape=(self.window_history_size + 1, 1, self.channels)) # add 1 to window_size to include current time step t0
X_in = inception_model.inception_block(X_input, conv_settings_dict1, pool_settings_dict1,
regularizer=self.regularizer,
batch_normalisation=True)
X_in = keras.layers.Dropout(self.dropout_rate)(X_in)
X_in = inception_model.inception_block(X_in, conv_settings_dict2, pool_settings_dict2, regularizer=self.regularizer,
batch_normalisation=True)
X_in = keras.layers.Dropout(self.dropout_rate)(X_in)
X_in = inception_model.inception_block(X_in, conv_settings_dict3, pool_settings_dict3, regularizer=self.regularizer,
batch_normalisation=True)
#############################################
out_main = flatten_tail(X_in, 'Main', activation=activation, bound_weight=True, dropout_rate=self.dropout_rate,
reduction_filter=64, first_dense=64, window_lead_time=self.window_lead_time)
self.model = keras.Model(inputs=X_input, outputs=[out_main])
def set_loss(self):
"""
Set the loss
:return: loss function
"""
self.loss = [keras.losses.mean_squared_error]
......@@ -50,8 +50,8 @@ class PlotMonthlySummary(RunEnvironment):
def _prepare_data(self, stations: List) -> xr.DataArray:
"""
Pre-process data required to plot. For each station, load locally saved predictions, extract the CNN and orig
prediction and group them into monthly bins (no aggregation, only sorting them).
Pre-process data required to plot. For each station, load locally saved predictions, extract the CNN prediction
and the observation and group them into monthly bins (no aggregation, only sorting them).
:param stations: all stations to plot
:return: The entire data set, flagged with the corresponding month.
"""
......@@ -65,10 +65,10 @@ class PlotMonthlySummary(RunEnvironment):
if len(data_cnn.shape) > 1:
data_cnn.coords["ahead"].values = [f"{days}d" for days in data_cnn.coords["ahead"].values]
data_orig = data.sel(type="orig", ahead=1).squeeze()
data_orig.coords["ahead"] = "orig"
data_obs = data.sel(type="obs", ahead=1).squeeze()
data_obs.coords["ahead"] = "obs"
data_concat = xr.concat([data_orig, data_cnn], dim="ahead")
data_concat = xr.concat([data_obs, data_cnn], dim="ahead")
data_concat = data_concat.drop("type")
data_concat.index.values = data_concat.index.values.astype("datetime64[M]").astype(int) % 12 + 1
......@@ -189,7 +189,7 @@ class PlotStationMap(RunEnvironment):
plt.close('all')
def plot_conditional_quantiles(stations: list, plot_folder: str = ".", rolling_window: int = 3, ref_name: str = 'orig',
def plot_conditional_quantiles(stations: list, plot_folder: str = ".", rolling_window: int = 3, ref_name: str = 'obs',
pred_name: str = 'CNN', season: str = "", forecast_path: str = None,
plot_name_affix: str = "", units: str = "ppb"):
"""
......@@ -222,7 +222,7 @@ def plot_conditional_quantiles(stations: list, plot_folder: str = ".", rolling_w
for station in stations:
file = os.path.join(forecast_path, f"forecasts_{station}_test.nc")
data_tmp = xr.open_dataarray(file)
data_collector.append(data_tmp.loc[:, :, ['CNN', 'orig', 'OLS']].assign_coords(station=station))
data_collector.append(data_tmp.loc[:, :, ['CNN', 'obs', 'OLS']].assign_coords(station=station))
return xr.concat(data_collector, dim='station').transpose('index', 'type', 'ahead', 'station')
def segment_data(data):
......@@ -252,7 +252,7 @@ def plot_conditional_quantiles(stations: list, plot_folder: str = ".", rolling_w
def labels(plot_type, data_unit="ppb"):
names = (f"forecast concentration (in {data_unit})", f"observed concentration (in {data_unit})")
if plot_type == "orig":
if plot_type == "obs":
return names
else:
return names[::-1]
......@@ -515,7 +515,7 @@ class PlotTimeSeries(RunEnvironment):
logging.debug(f"... preprocess station {station}")
file_name = os.path.join(self._data_path, self._data_name % station)
data = xr.open_dataarray(file_name)
return data.sel(type=["CNN", "orig"])
return data.sel(type=["CNN", "obs"])
def _plot(self, plot_folder):
pdf_pages = self._create_pdf_pages(plot_folder)
......@@ -527,9 +527,9 @@ class PlotTimeSeries(RunEnvironment):
for i_year in range(end - start + 1):
data_year = data.sel(index=f"{start + i_year}")
for i_half_of_year in range(factor):
pos = 2 * i_year + i_half_of_year
pos = factor * i_year + i_half_of_year