Commit e1bbf647 authored by lukas leufen

Merge branch 'develop' into 'master'

Develop

Closes #48, #49, #50, #51, #52, #59, #20, #61, #64, #65, #62, #67, and #68

See merge request toar/machinelearningtools!59
parents 8a330277 7742fb56
Pipeline #31457 passed with stages in 7 minutes and 27 seconds
......@@ -32,7 +32,7 @@ tests:
- django
stage: test
variables:
FAILURE_THRESHOLD: 90
FAILURE_THRESHOLD: 100
before_script:
- chmod +x ./CI/update_badge.sh
- ./CI/update_badge.sh > /dev/null
......
absl-py==0.9.0
astor==0.8.1
atomicwrites==1.3.0
attrs==19.3.0
Cartopy==0.17.0
certifi==2019.11.28
chardet==3.0.4
cloudpickle==1.3.0
coverage==5.0.3
cycler==0.10.0
Cython==0.29.15
dask==2.11.0
fsspec==0.6.2
gast==0.3.3
grpcio==1.27.2
h5py==2.10.0
idna==2.8
importlib-metadata==1.5.0
Keras==2.2.4
numpy==1.15.4
Keras-Applications==1.0.8
Keras-Preprocessing==1.1.0
kiwisolver==1.1.0
locket==0.2.0
Markdown==3.2.1
matplotlib==3.2.0
mock==4.0.1
more-itertools==8.2.0
numpy==1.18.1
packaging==20.3
pandas==1.0.1
partd==1.1.0
patsy==0.5.1
Pillow==7.0.0
pluggy==0.13.1
protobuf==3.11.3
py==1.8.1
pydot==1.4.1
pyparsing==2.4.6
pyproj==2.5.0
pyshp==2.1.0
pytest==5.3.5
pytest-cov==2.8.1
pytest-html==2.0.1
pytest-lazy-fixture==0.6.3
pytest-metadata==1.8.0
python-dateutil==2.8.1
pytz==2019.3
PyYAML==5.3
requests==2.23.0
scipy==1.4.1
seaborn==0.10.0
Shapely==1.7.0
six==1.11.0
statsmodels==0.11.1
tensorboard==1.12.2
tensorflow==1.12.0
xarray==0.14.0
pandas==0.25.1
requests==2.22.0
pytest==5.2.1
pytest-lazy-fixture==0.6.1
pytest-cov
pytest-html
pydot
mock
statsmodels
seaborn
dask==0.20.2
toolz # for dask
cloudpickle # for dask
cython==0.29.14
pyshp
six
pyproj
shapely
Cartopy==0.16.0
matplotlib
pillow
scipy
\ No newline at end of file
termcolor==1.1.0
toolz==0.10.0
urllib3==1.25.8
wcwidth==0.1.8
Werkzeug==1.0.0
xarray==0.15.0
zipp==3.1.0
......@@ -17,7 +17,7 @@ def main(parser_args):
with RunEnvironment():
ExperimentSetup(parser_args, stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087', 'DEBW001'],
station_type='background', trainable=True)
station_type='background', trainable=False, create_new_model=False)
PreProcessing()
ModelSetup()
......
__author__ = 'Felix Kleinert, Lukas Leufen'
__date__ = '2020-02-07'
from src.run_modules.run_environment import RunEnvironment
from src.data_handling.data_generator import DataGenerator
import numpy as np
import logging
import dask.array as da
import xarray as xr
import os
import re
from src import helpers
class BootStrapGenerator:
def __init__(self, orig_generator, boots, chunksize, bootstrap_path):
self.orig_generator: DataGenerator = orig_generator
self.stations = self.orig_generator.stations
self.variables = self.orig_generator.variables
self.boots = boots
self.chunksize = chunksize
self.bootstrap_path = bootstrap_path
self._iterator = 0
self.bootstrap_meta = []
def __len__(self):
"""
Return the total number of yielded samples: number of stations * number of bootstraps * number of variables.
"""
return len(self.orig_generator)*self.boots*len(self.variables)
def get_labels(self, key):
_, label = self.orig_generator[key]
for _ in range(self.boots):
yield label
def get_generator(self):
"""
Generator over all bootstrap samples. For every station of the original generator, every variable and every
bootstrap realisation, the history of the selected variable is replaced by its shuffled counterpart and yielded
together with the label data.
:return:
"""
while True:
for i, data in enumerate(self.orig_generator):
station = self.orig_generator.get_station_key(i)
logging.info(f"station: {station}")
hist, label = data
len_of_label = len(label)
shuffled_data = self.load_boot_data(station)
for var in self.variables:
logging.info(f" var: {var}")
for boot in range(self.boots):
logging.debug(f"boot: {boot}")
boot_hist = hist.sel(variables=helpers.list_pop(self.variables, var))
shuffled_var = shuffled_data.sel(variables=var, boots=boot).expand_dims("variables").drop("boots").transpose("datetime", "window", "Stations", "variables")
boot_hist = boot_hist.combine_first(shuffled_var)
boot_hist = boot_hist.sortby("variables")
self.bootstrap_meta.extend([[var, station]]*len_of_label)
yield boot_hist, label
return
def get_orig_prediction(self, path, file_name, prediction_name="CNN"):
file = os.path.join(path, file_name)
data = xr.open_dataarray(file)
for _ in range(self.boots):
yield data.sel(type=prediction_name).squeeze()
def load_boot_data(self, station):
files = os.listdir(self.bootstrap_path)
regex = re.compile(rf"{station}_\w*\.nc")
file_name = os.path.join(self.bootstrap_path, list(filter(regex.search, files))[0])
shuffled_data = xr.open_dataarray(file_name, chunks=100)
return shuffled_data
class BootStraps(RunEnvironment):
def __init__(self, data, bootstrap_path, number_bootstraps=10):
super().__init__()
self.data: DataGenerator = data
self.number_bootstraps = number_bootstraps
self.bootstrap_path = bootstrap_path
self.chunks = self.get_chunk_size()
self.create_shuffled_data()
self._boot_strap_generator = BootStrapGenerator(self.data, self.number_bootstraps, self.chunks, self.bootstrap_path)
def get_boot_strap_meta(self):
return self._boot_strap_generator.bootstrap_meta
def boot_strap_generator(self):
return self._boot_strap_generator.get_generator()
def get_boot_strap_generator_length(self):
return len(self._boot_strap_generator)
def get_labels(self, key):
labels_list = []
chunks = None
for labels in self._boot_strap_generator.get_labels(key):
if len(labels_list) == 0:
chunks = (100, labels.data.shape[1])
labels_list.append(da.from_array(labels.data, chunks=chunks))
labels_out = da.concatenate(labels_list, axis=0)
return labels_out.compute()
def get_orig_prediction(self, path, name):
labels_list = []
chunks = None
for labels in self._boot_strap_generator.get_orig_prediction(path, name):
if len(labels_list) == 0:
chunks = (100, labels.data.shape[1])
labels_list.append(da.from_array(labels.data, chunks=chunks))
labels_out = da.concatenate(labels_list, axis=0)
labels_out = labels_out.compute()
return labels_out[~np.isnan(labels_out).any(axis=1), :]
def get_chunk_size(self):
hist, _ = self.data[0]
return (100, *hist.shape[1:], self.number_bootstraps)
def create_shuffled_data(self):
"""
Create shuffled data. Use the original test data, add a dimension 'boots' with a length equal to the number of
bootstraps, and fill it with randomly resampled (shuffled) values of each variable. If a suitable local file
already exists for the requested window size and number of bootstraps, no additional file is created inside this
function.
"""
logging.info("create shuffled bootstrap data")
variables_str = '_'.join(sorted(self.data.variables))
window = self.data.window_history_size
for station in self.data.stations:
valid, nboot = self.valid_bootstrap_file(station, variables_str, window)
if not valid:
logging.info(f'create bootstrap data for {station}')
hist, _ = self.data[station]
data = hist.copy()
file_name = f"{station}_{variables_str}_hist{window}_nboots{nboot}_shuffled.nc"
file_path = os.path.join(self.bootstrap_path, file_name)
data = data.expand_dims({'boots': range(nboot)}, axis=-1)
shuffled_variable = []
for i, var in enumerate(data.coords['variables']):
single_variable = data.sel(variables=var).values
shuffled_variable.append(self.shuffle_single_variable(single_variable, chunks=(100, *data.shape[1:3], data.shape[-1])))
shuffled_variable_da = da.stack(shuffled_variable, axis=-2, ).rechunk("auto")
shuffled_data = xr.DataArray(shuffled_variable_da, coords=data.coords, dims=data.dims)
shuffled_data.to_netcdf(file_path)
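# Hedged illustration (standalone toy snippet, not part of the original commit): the
# shuffled files keep the original (datetime, window, Stations, variables) layout and
# only append a trailing "boots" axis via expand_dims, as done above.
import numpy as np
import xarray as xr
toy = xr.DataArray(np.zeros((4, 3, 1, 2)), dims=("datetime", "window", "Stations", "variables"))
print(toy.expand_dims({"boots": range(5)}, axis=-1).shape)  # -> (4, 3, 1, 2, 5)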
def valid_bootstrap_file(self, station, variables, window):
"""
Compare the local bootstrap files with the given settings for station, variables, window and number of
bootstraps. If a match is found, this method returns a tuple (True, None). In any other case, it returns
(False, max_nboot), where max_nboot is the highest boot number found in local storage. A match requires that the
window length of the file is greater than or equal to the requested window size and that the number of boots in
the file is greater than or equal to the number of boots requested by this class. Furthermore, this function
deletes local files that match the station pattern but do not satisfy the window and bootstrap condition, because
it is assumed that a corresponding file with at least the requested window size and number of bootstraps will be
created afterwards.
:param station:
:param variables:
:param window:
:return:
"""
regex = re.compile(rf"{station}_{variables}_hist(\d+)_nboots(\d+)_shuffled")
max_nboot = self.number_bootstraps
for file in os.listdir(self.bootstrap_path):
match = regex.match(file)
if match:
window_file = int(match.group(1))
nboot_file = int(match.group(2))
max_nboot = max([max_nboot, nboot_file])
if (window_file >= window) and (nboot_file >= self.number_bootstraps):
return True, None
else:
os.remove(os.path.join(self.bootstrap_path, file))
return False, max_nboot
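# Hedged worked example (file name and variable set are hypothetical): for station
# "DEBW107" with variables "o3_temp", the regex above extracts the window and boot
# counts from a candidate file name; the file is valid if both are at least as large
# as the requested values.
import re
regex = re.compile(r"DEBW107_o3_temp_hist(\d+)_nboots(\d+)_shuffled")
match = regex.match("DEBW107_o3_temp_hist12_nboots20_shuffled.nc")
print(int(match.group(1)), int(match.group(2)))  # -> 12 20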
@staticmethod
def shuffle_single_variable(data: da.array, chunks) -> da.core.Array:
size = data.shape
return da.random.choice(data.reshape(-1,), size=size, chunks=chunks)
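# Minimal sketch (toy data, not from the commit): da.random.choice draws every entry
# with replacement from the flattened original values while keeping the requested
# shape, which is the shuffling trick used by shuffle_single_variable above.
import dask.array as da
import numpy as np
values = np.arange(12.)                                # flattened original data
resampled = da.random.choice(values, size=(3, 4), chunks=(3, 4))
print(resampled.compute().shape)                       # -> (3, 4), entries drawn from 0..11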
if __name__ == "__main__":
from src.run_modules.experiment_setup import ExperimentSetup
from src.run_modules.run_environment import RunEnvironment
from src.run_modules.pre_processing import PreProcessing
formatter = '%(asctime)s - %(levelname)s: %(message)s [%(filename)s:%(funcName)s:%(lineno)s]'
logging.basicConfig(format=formatter, level=logging.INFO)
with RunEnvironment() as run_env:
ExperimentSetup(stations=['DEBW107', 'DEBY081', 'DEBW013'],
station_type='background', trainable=True, window_history_size=9)
PreProcessing()
data = run_env.data_store.get("generator", "general.test")
path = run_env.data_store.get("bootstrap_path", "general")
number_bootstraps = 10
boots = BootStraps(data, path, number_bootstraps)
for b in boots.boot_strap_generator():
a, c = b
logging.info(f"len is {len(boots.get_boot_strap_meta())}")
......@@ -80,8 +80,7 @@ class DataGenerator(keras.utils.Sequence):
data = self.get_data_generator()
self._iterator += 1
if data.history is not None and data.label is not None: # pragma: no branch
return data.history.transpose("datetime", "window", "Stations", "variables"), \
data.label.squeeze("Stations").transpose("datetime", "window")
return data.get_transposed_history(), data.get_transposed_label()
else:
self.__next__() # pragma: no cover
else:
......@@ -94,7 +93,7 @@ class DataGenerator(keras.utils.Sequence):
:return: The generator's time series of history data and its labels
"""
data = self.get_data_generator(key=item)
return data.get_transposed_history(), data.label.squeeze("Stations").transpose("datetime", "window")
return data.get_transposed_history(), data.get_transposed_label()
def setup_transformation(self, transformation):
if transformation is None:
......@@ -182,12 +181,13 @@ class DataGenerator(keras.utils.Sequence):
logging.info(f"load not pickle data for {station}")
data = DataPrep(self.data_path, self.network, station, self.variables, station_type=self.station_type,
**self.kwargs)
data.interpolate(self.interpolate_dim, method=self.interpolate_method, limit=self.limit_nan_fill)
if self.transformation is not None:
data.transform("datetime", **helpers.dict_pop(self.transformation, "scope"))
data.make_history_window(self.interpolate_dim, self.window_history_size)
data.interpolate(self.interpolate_dim, method=self.interpolate_method, limit=self.limit_nan_fill)
data.make_history_window(self.target_dim, self.window_history_size, self.interpolate_dim)
data.make_labels(self.target_dim, self.target_var, self.interpolate_dim, self.window_lead_time)
data.history_label_nan_remove(self.interpolate_dim)
data.make_observation(self.target_dim, self.target_var, self.interpolate_dim)
data.remove_nan(self.interpolate_dim)
if save_local_tmp_storage:
self._save_pickle_data(data)
return data
......
......@@ -2,6 +2,7 @@ __author__ = 'Felix Kleinert, Lukas Leufen'
__date__ = '2019-10-16'
import datetime as dt
from functools import reduce
import logging
import os
from typing import Union, List, Iterable
......@@ -15,6 +16,7 @@ from src import statistics
# define a more general date type for type hinting
date = Union[dt.date, dt.datetime]
str_or_list = Union[str, List[str]]
class DataPrep(object):
......@@ -55,6 +57,7 @@ class DataPrep(object):
self.std = None
self.history = None
self.label = None
self.observation = None
self.kwargs = kwargs
self.data = None
self.meta = None
......@@ -135,10 +138,12 @@ class DataPrep(object):
return xarr, meta
def _set_file_name(self):
return os.path.join(self.path, f"{''.join(self.station)}_{'_'.join(sorted(self.variables))}.nc")
all_vars = sorted(self.statistics_per_var.keys())
return os.path.join(self.path, f"{''.join(self.station)}_{'_'.join(all_vars)}.nc")
def _set_meta_file_name(self):
return os.path.join(self.path, f"{''.join(self.station)}_{'_'.join(sorted(self.variables))}_meta.csv")
all_vars = sorted(self.statistics_per_var.keys())
return os.path.join(self.path, f"{''.join(self.station)}_{'_'.join(all_vars)}_meta.csv")
def __repr__(self):
return f"Dataprep(path='{self.path}', network='{self.network}', station={self.station}, " \
......@@ -275,19 +280,20 @@ class DataPrep(object):
std = None
return mean, std, self._transform_method
def make_history_window(self, dim: str, window: int) -> None:
def make_history_window(self, dim_name_of_inputs: str, window: int, dim_name_of_shift: str) -> None:
"""
This function shifts the data window+1 times and creates an xarray which has a new dimension 'window'
containing the shifted data. This is used to represent history in the data. Results are stored in self.history.
:param dim: Dimension along shift will be applied
:param dim_name_of_inputs: Name of dimension which contains the input variables
:param window: number of time steps to look back in history
Note: window is treated as a negative value, in agreement with looking back on a time line. Positive
values are allowed, but they are converted to their negative expression.
:param dim_name_of_shift: Name of the dimension along which the shift is applied
"""
window = -abs(window)
self.history = self.shift(dim, window)
self.history = self.shift(dim_name_of_shift, window).sel({dim_name_of_inputs: self.variables})
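# Hedged toy example (standalone snippet, not part of DataPrep): shifting a short
# daily series by 2, 1 and 0 steps and stacking the results creates the "window"
# dimension that make_history_window stores in self.history.
import pandas as pd
import xarray as xr
series = xr.DataArray([1., 2., 3., 4.], dims="datetime",
                      coords={"datetime": pd.date_range("2020-01-01", periods=4)})
history = xr.concat([series.shift(datetime=-w) for w in range(-2, 1)],
                    dim=pd.Index(range(-2, 1), name="window"))
print(history.sel(datetime="2020-01-03").values)  # -> [1. 2. 3.]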
def shift(self, dim: str, window: int) -> xr.DataArray:
"""
......@@ -310,7 +316,7 @@ class DataPrep(object):
res = xr.concat(res, dim=window_array)
return res
def make_labels(self, dim_name_of_target: str, target_var: str, dim_name_of_shift: str, window: int) -> None:
def make_labels(self, dim_name_of_target: str, target_var: str_or_list, dim_name_of_shift: str, window: int) -> None:
"""
This function creates an xarray.DataArray containing the labels.
......@@ -322,7 +328,17 @@ class DataPrep(object):
window = abs(window)
self.label = self.shift(dim_name_of_shift, window).sel({dim_name_of_target: target_var})
def history_label_nan_remove(self, dim: str) -> None:
def make_observation(self, dim_name_of_target: str, target_var: str_or_list, dim_name_of_shift: str) -> None:
"""
This function creates an xarray.DataArray containing the unshifted observations of the target variable(s).
Results are stored in self.observation.
:param dim_name_of_target: Name of dimension which contains the target variable
:param target_var: Name of target variable(s) in 'dimension'
:param dim_name_of_shift: Name of dimension on which xarray.DataArray.shift will be applied
"""
self.observation = self.shift(dim_name_of_shift, 0).sel({dim_name_of_target: target_var})
def remove_nan(self, dim: str) -> None:
"""
All slices along dim which contain NaNs in self.history, self.label or self.observation are removed from all three data sets.
This is done to present only a full matrix to keras.fit.
......@@ -334,14 +350,17 @@ class DataPrep(object):
if (self.history is not None) and (self.label is not None):
non_nan_history = self.history.dropna(dim=dim)
non_nan_label = self.label.dropna(dim=dim)
intersect = np.intersect1d(non_nan_history.coords[dim].values, non_nan_label.coords[dim].values)
non_nan_observation = self.observation.dropna(dim=dim)
intersect = reduce(np.intersect1d, (non_nan_history.coords[dim].values, non_nan_label.coords[dim].values, non_nan_observation.coords[dim].values))
if len(intersect) == 0:
self.history = None
self.label = None
self.observation = None
else:
self.history = self.history.sel({dim: intersect})
self.label = self.label.sel({dim: intersect})
self.observation = self.observation.sel({dim: intersect})
@staticmethod
def create_index_array(index_name: str, index_value: Iterable[int]) -> xr.DataArray:
......@@ -395,7 +414,10 @@ class DataPrep(object):
return data
def get_transposed_history(self):
return self.history.transpose("datetime", "window", "Stations", "variables")
return self.history.transpose("datetime", "window", "Stations", "variables").copy()
def get_transposed_label(self):
return self.label.squeeze("Stations").transpose("datetime", "window").copy()
if __name__ == "__main__":
......
......@@ -49,9 +49,10 @@ class TimeTracking(object):
method. Duration can always be shown by printing the time tracking object or calling get_current_duration.
"""
def __init__(self, start=True):
def __init__(self, start=True, name="undefined job"):
self.start = None
self.end = None
self._name = name
if start:
self._start()
......@@ -93,7 +94,7 @@ class TimeTracking(object):
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
logging.info(f"undefined job finished after {self}")
logging.info(f"{self._name} finished after {self}")
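# Hedged usage sketch of the new ``name`` argument (the exact log text is illustrative):
with TimeTracking(name="preprocessing"):
    pass  # timed work goes here
# -> logs something like "preprocessing finished after 0:00:00 (hh:mm:ss)"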
def prepare_host(create_new=True, sampling="daily"):
......@@ -147,6 +148,13 @@ def set_experiment_name(experiment_date=None, experiment_path=None, sampling=Non
return experiment_name, experiment_path
def set_bootstrap_path(bootstrap_path, data_path, sampling):
if bootstrap_path is None:
bootstrap_path = os.path.join(data_path, "..", f"bootstrap_{sampling}")
check_path_and_create(bootstrap_path)
return bootstrap_path
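# Hedged example (paths are hypothetical): with daily sampling and no explicit
# bootstrap path, the default location resolves next to the data folder; note that
# calling the function also creates that directory if it does not exist.
print(set_bootstrap_path(None, "/data/toar/daily", "daily"))
# -> "/data/toar/daily/../bootstrap_daily"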
class PyTestRegex:
"""Assert that a given string meets some expectations."""
......@@ -195,3 +203,18 @@ def float_round(number: float, decimals: int = 0, round_type: Callable = math.ce
def dict_pop(dict: Dict, pop_keys):
pop_keys = to_list(pop_keys)
return {k: v for k, v in dict.items() if k not in pop_keys}
def list_pop(list_full: list, pop_items):
pop_items = to_list(pop_items)
if len(pop_items) > 1:
return [e for e in list_full if e not in pop_items]
else:
list_pop = list_full.copy()
list_pop.remove(pop_items[0])
return list_pop
def dict_pop(dict_orig: Dict, pop_keys):
pop_keys = to_list(pop_keys)
return {k: v for k, v in dict_orig.items() if k not in pop_keys}
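# Hedged usage sketch for the two helpers defined above:
print(list_pop(["o3", "temp", "u"], "temp"))                            # -> ['o3', 'u']
print(dict_pop({"scope": "general", "method": "standardise"}, "scope"))  # -> {'method': 'standardise'}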
__author__ = 'Felix Kleinert'
__date__ = '2020-03-02'
import tensorflow as tf
import numpy as np
import keras.backend as K
from keras.layers.convolutional import _ZeroPadding
from keras.legacy import interfaces
from keras.utils import conv_utils
from keras.utils.generic_utils import transpose_shape
from keras.backend.common import normalize_data_format
class PadUtils:
"""
Helper class for advanced paddings
"""
@staticmethod
def get_padding_for_same(kernel_size, strides=1):
"""
This method calculates the padding size needed to keep input and output dimensions equal for a given kernel size
(STRIDES HAVE TO BE EQUAL TO ONE!).
:param kernel_size: size of the convolution kernel per spatial dimension
:param strides: has to be 1; other stride values are not implemented
:return:
"""
if strides != 1:
raise NotImplementedError("Strides other than 1 not implemented!")
if not all(isinstance(k, int) for k in kernel_size):
raise ValueError(f"The `kernel_size` argument must have a tuple of integers. Got: {kernel_size} "
f"of type {[type(k) for k in kernel_size]}")
ks = np.array(kernel_size, dtype=np.int64)
if any(k <= 0 for k in ks):
raise ValueError(f"All values of kernel_size must be > 0. Got: {kernel_size} ")
if all(k % 2 == 1 for k in ks): # (d & 0x1 for d in ks):
pad = ((ks - 1) / 2).astype(np.int64)
# convert numpy int to base int
pad = [np.asscalar(v) for v in pad]
return tuple(pad)
# return tuple(PadUtils.check_padding_format(pad))
else:
raise NotImplementedError(f"even kernel size not implemented. Got {kernel_size}")
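# Hedged worked example (standalone snippet): for stride 1 and an odd kernel the
# "same" padding is (kernel_size - 1) / 2 per spatial dimension.
print(PadUtils.get_padding_for_same((3, 3)))  # -> (1, 1)
print(PadUtils.get_padding_for_same((5, 1)))  # -> (2, 0)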
@staticmethod
def spatial_2d_padding(padding=((1, 1), (1, 1)), data_format=None):
"""Pads the 2nd and 3rd dimensions of a 4D tensor.
# Arguments
x: Tensor or variable.
padding: Tuple of 2 tuples, padding pattern.
data_format: string, `"channels_last"` or `"channels_first"`.
# Returns
A padded 4D tensor.
# Raises
ValueError: if `data_format` is neither `"channels_last"` or `"channels_first"`.
"""
assert len(padding) == 2
assert len(padding[0]) == 2
assert len(padding[1]) == 2
data_format = normalize_data_format(data_format)
pattern = [[0, 0],
list(padding[0]),
list(padding[1]),
[0, 0]]
pattern = transpose_shape(pattern, data_format, spatial_axes=(1, 2))
return pattern
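# Hedged example (standalone snippet): for "channels_last" data the returned pattern
# pads only the two spatial axes and leaves the batch and channel axes untouched.
print(PadUtils.spatial_2d_padding(((1, 1), (2, 2)), data_format="channels_last"))
# -> [[0, 0], [1, 1], [2, 2], [0, 0]]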
@staticmethod
def check_padding_format(padding):
if isinstance(padding, int):
normalized_padding = ((padding, padding), (padding, padding))
elif hasattr(padding, '__len__'):
if len(padding) != 2:
raise ValueError('`padding` should have two elements. '
'Found: ' + str(padding))
for idx_pad, sub_pad in enumerate(padding):
if isinstance(sub_pad, str):
raise ValueError(f'`padding[{idx_pad}]` is str but must be int')
if hasattr(sub_pad, '__len__'):
if len(sub_pad) != 2:
raise ValueError(f'`padding[{idx_pad}]` should have one or two elements. '