Commit b04f5797 authored by lukas leufen's avatar lukas leufen

Merge branch 'lukas_issue135_feat_decouple-join' into 'develop'

MLAir is decoupled from join

See merge request !134
parents 0d9ea05d 7f9c4ac1
Pipeline #43673 passed with stages
in 7 minutes and 51 seconds
......@@ -60,7 +60,7 @@ Thumbs.db
#!/usr/bin/env bash
# run coverage twice, 1) for html deploy 2) for success evaluation
python3.6 -m pytest --cov=src --cov-report term --cov-report html test/ | tee coverage_results.out
python3.6 -m pytest --cov=mlair --cov-report term --cov-report html test/ | tee coverage_results.out
......@@ -13,7 +13,8 @@ DEFAULT_START = "1997-01-01"
DEFAULT_END = "2017-12-31"
DEFAULT_TRANSFORMATION = {"scope": "data", "method": "standardise", "mean": "estimate"}
# DEFAULT_TRANSFORMATION = {"scope": "data", "method": "standardise", "mean": "estimate"}
DEFAULT_TRANSFORMATION = {"scope": "data", "method": "standardise"}
DEFAULT_HPC_LOGIN_LIST = ["ju", "hdfmll"] # ju[wels} #hdfmll(ogin)
DEFAULT_HPC_HOST_LIST = ["jw", "hdfmlc"] # first part of node names for Juwels (jw[comp], hdfmlc(ompute).
......@@ -28,9 +29,9 @@ DEFAULT_TARGET_VAR = "o3"
DEFAULT_TARGET_DIM = "variables"
DEFAULT_DIMENSIONS = {"new_index": ["datetime", "Stations"]}
DEFAULT_TIME_DIM = "datetime"
DEFAULT_TRAIN_START = "1997-01-01"
DEFAULT_TRAIN_END = "2007-12-31"
......@@ -33,13 +33,13 @@ def prepare_host(create_new=True, data_path=None, sampling="daily") -> str:
elif hostname == "zam347":
data_path = f"/home/{user}/Data/toar_{sampling}/"
elif hostname == "linux-aa9b":
data_path = f"/home/{user}/machinelearningtools/data/toar_{sampling}/"
data_path = f"/home/{user}/mlair/data/toar_{sampling}/"
elif (len(hostname) > 2) and (hostname[:2] == "jr"):
data_path = f"/p/project/cjjsc42/{user}/DATA/toar_{sampling}/"
elif (len(hostname) > 2) and (hostname[:2] in ['jw', 'ju'] or hostname[:5] in ['hdfml']):
data_path = f"/p/project/deepacf/intelliaq/{user}/DATA/toar_{sampling}/"
elif runner_regex.match(hostname) is not None:
data_path = f"/home/{user}/machinelearningtools/data/toar_{sampling}/"
data_path = f"/home/{user}/mlair/data/toar_{sampling}/"
data_path = os.path.join(os.getcwd(), "data", sampling)
# raise OSError(f"unknown host '{hostname}'")
......@@ -10,6 +10,6 @@ __date__ = '2020-04-17'
from .bootstraps import BootStraps
from .data_preparation_join import DataPrepJoin
from .data_generator import DataGenerator
from .data_distributor import Distributor
from .iterator import KerasIterator, DataCollection
from .advanced_data_handler import DefaultDataPreparation, AbstractDataPreparation
from .data_preparation_neighbors import DataPreparationNeighbors
This diff is collapsed.
Collections of bootstrap methods and classes.
How to use
__author__ = 'Felix Kleinert, Lukas Leufen'
__date__ = '2020-02-07'
import os
from collections import Iterator, Iterable
from itertools import chain
import numpy as np
import xarray as xr
from mlair.data_handler.advanced_data_handler import AbstractDataPreparation
class BootstrapIterator(Iterator):
_position: int = None
def __init__(self, data: "BootStraps"):
assert isinstance(data, BootStraps)
self._data = data
self._dimension = data.bootstrap_dimension
self._collection = self._data.bootstraps()
self._position = 0
def __next__(self):
"""Return next element or stop iteration."""
index, dimension = self._collection[self._position]
nboot = self._data.number_of_bootstraps
_X, _Y =
_X = list(map(lambda x: x.expand_dims({'boots': range(nboot)}, axis=-1), _X))
_Y = _Y.expand_dims({"boots": range(nboot)}, axis=-1)
single_variable = _X[index].sel({self._dimension: [dimension]})
shuffled_variable = self.shuffle(single_variable.values)
shuffled_data = xr.DataArray(shuffled_variable, coords=single_variable.coords, dims=single_variable.dims)
_X[index] = shuffled_data.combine_first(_X[index]).reindex_like(_X[index])
self._position += 1
except IndexError:
raise StopIteration()
_X, _Y = self._to_numpy(_X), self._to_numpy(_Y)
return self._reshape(_X), self._reshape(_Y), (index, dimension)
def _reshape(d):
if isinstance(d, list):
return list(map(lambda x: np.rollaxis(x, -1, 0).reshape(x.shape[0] * x.shape[-1], *x.shape[1:-1]), d))
shape = d.shape
return np.rollaxis(d, -1, 0).reshape(shape[0] * shape[-1], *shape[1:-1])
def _to_numpy(d):
if isinstance(d, list):
return list(map(lambda x: x.values, d))
return d.values
def shuffle(data: np.ndarray) -> np.ndarray:
Shuffle randomly from given data (draw elements with replacement).
:param data: data to shuffle
:return: shuffled data as numpy array
size = data.shape
return np.random.choice(data.reshape(-1, ), size=size)
class BootStraps(Iterable):
Main class to perform bootstrap operations.
This class requires a data handler following the definition of the AbstractDataPreparation, the number of bootstraps
to create and the dimension along this bootstrapping is performed (default dimension is `variables`).
When iterating on this class, it returns the bootstrapped X, Y and a tuple with (position of variable in X, name of
this variable). The tuple is interesting if X consists on mutliple input streams X_i (e.g. two or more stations)
because it shows which variable of which input X_i has been bootstrapped. All bootstrap combinations can be
retrieved by calling the .bootstraps() method. Further more, by calling the .get_orig_prediction() this class
imitates according to the set number of bootstraps the original prediction
def __init__(self, data: AbstractDataPreparation, number_of_bootstraps: int = 10,
bootstrap_dimension: str = "variables"):
Create iterable class to be ready to iter.
:param data: a data generator object to get data / history
:param number_of_bootstraps: the number of bootstrap realisations
""" = data
self.number_of_bootstraps = number_of_bootstraps
self.bootstrap_dimension = bootstrap_dimension
def __iter__(self):
return BootstrapIterator(self)
def __len__(self):
return len(self.bootstraps())
def bootstraps(self):
l = []
for i, x in enumerate(
l.append(list(map(lambda y: (i, y), x.indexes['variables'])))
return list(chain(*l))
def get_orig_prediction(self, path: str, file_name: str, prediction_name: str = "CNN") -> np.ndarray:
Repeat predictions from given file(_name) in path by the number of boots.
:param path: path to file
:param file_name: file name
:param prediction_name: name of the prediction to select from loaded file (default CNN)
:return: repeated predictions
file = os.path.join(path, file_name)
prediction = xr.open_dataarray(file).sel(type=prediction_name).squeeze()
vals = np.tile(, (self.number_of_bootstraps, 1))
return vals[~np.isnan(vals).any(axis=1), :]
__author__ = 'Lukas Leufen'
__date__ = '2020-07-17'
from mlair.helpers import to_list
from mlair.data_handler.station_preparation import StationPrep
from mlair.data_handler.advanced_data_handler import DefaultDataPreparation
import os
from typing import Union, List
number = Union[float, int]
num_or_list = Union[number, List[number]]
class DataPreparationNeighbors(DefaultDataPreparation):
def __init__(self, id_class, data_path, neighbors=None, min_length=0,
extreme_values: num_or_list = None, extremes_on_right_tail_only: bool = False):
self.neighbors = to_list(neighbors) if neighbors is not None else []
super().__init__(id_class, data_path, min_length=min_length, extreme_values=extreme_values,
def build(cls, station, **kwargs):
sp_keys = {k: kwargs[k] for k in cls._requirements if k in kwargs}
sp = StationPrep(station, **sp_keys)
n_list = []
for neighbor in kwargs.get("neighbors", []):
n_list.append(StationPrep(neighbor, **sp_keys))
kwargs["neighbors"] = n_list if len(n_list) > 0 else None
dp_args = {k: kwargs[k] for k in cls.own_args("id_class") if k in kwargs}
return cls(sp, **dp_args)
def _create_collection(self):
return [self.id_class] + self.neighbors
def get_coordinates(self, include_neighbors=False):
neighbors = list(map(lambda n: n.get_coordinates(), self.neighbors)) if include_neighbors is True else []
return [super(DataPreparationNeighbors, self).get_coordinates()].append(neighbors)
if __name__ == "__main__":
a = DataPreparationNeighbors
requirements = a.requirements()
kwargs = {"path": os.path.join(os.path.dirname(os.path.abspath(__file__)), "testdata"),
"station_type": None,
"network": 'UBA',
"sampling": 'daily',
"target_dim": 'variables',
"target_var": 'o3',
"time_dim": 'datetime',
"window_history_size": 7,
"window_lead_time": 3,
"neighbors": ["DEBW034"],
"data_path": os.path.join(os.path.dirname(os.path.abspath(__file__)), "testdata"),
"statistics_per_var": {'o3': 'dma8eu', 'temp': 'maximum'},
"transformation": None,}
a_inst ="DEBW011", **kwargs)
__author__ = 'Lukas Leufen'
__date__ = '2020-07-07'
from collections import Iterator, Iterable
import keras
import numpy as np
import math
import os
import shutil
import pickle
from typing import Tuple, List
class StandardIterator(Iterator):
_position: int = None
def __init__(self, collection: list):
assert isinstance(collection, list)
self._collection = collection
self._position = 0
def __next__(self):
"""Return next element or stop iteration."""
value = self._collection[self._position]
self._position += 1
except IndexError:
raise StopIteration()
return value
class DataCollection(Iterable):
def __init__(self, collection: list = None):
if collection is None:
collection = []
assert isinstance(collection, list)
self._collection = collection
self._mapping = {}
def __len__(self):
return len(self._collection)
def __iter__(self) -> Iterator:
return StandardIterator(self._collection)
def __getitem__(self, index):
if isinstance(index, int):
return self._collection[index]
return self._collection[self._mapping[str(index)]]
def add(self, element):
self._mapping[str(element)] = len(self._collection)
def _set_mapping(self):
for i, e in enumerate(self._collection):
self._mapping[str(e)] = i
def keys(self):
return list(self._mapping.keys())
class KerasIterator(keras.utils.Sequence):
def __init__(self, collection: DataCollection, batch_size: int, batch_path: str, shuffle_batches: bool = False,
model=None, upsampling=False, name=None):
self._collection = collection
batch_path = os.path.join(batch_path, str(name if name is not None else id(self)))
self._path = os.path.join(batch_path, "%i.pickle")
self.batch_size = batch_size
self.model = model
self.shuffle = shuffle_batches
self.upsampling = upsampling
self.indexes: list = []
def __len__(self) -> int:
return len(self.indexes)
def __getitem__(self, index: int) -> Tuple[np.ndarray, np.ndarray]:
"""Get batch for given index."""
return self.__data_generation(self.indexes[index])
def _get_model_rank(self):
if self.model is not None:
mod_out = self.model.output_shape
if isinstance(mod_out, tuple): # only one output branch: (None, ahead)
mod_rank = 1
elif isinstance(mod_out, list): # multiple output branches, e.g.: [(None, ahead), (None, ahead)]
mod_rank = len(mod_out)
else: # pragma: no cover
raise TypeError("model output shape must either be tuple or list.")
return mod_rank
else: # no model provided, assume to use single output
return 1
def __data_generation(self, index: int) -> Tuple[np.ndarray, np.ndarray]:
"""Load pickle data from disk."""
file = self._path % index
with open(file, "rb") as f:
data = pickle.load(f)
return data["X"], data["Y"]
def _concatenate(new: List[np.ndarray], old: List[np.ndarray]) -> List[np.ndarray]:
"""Concatenate two lists of data along axis=0."""
return list(map(lambda n1, n2: np.concatenate((n1, n2), axis=0), old, new))
def _get_batch(self, data_list: List[np.ndarray], b: int) -> List[np.ndarray]:
"""Get batch according to batch size from data list."""
return list(map(lambda data: data[b * self.batch_size:(b+1) * self.batch_size, ...], data_list))
def _permute_data(self, X, Y):
p = np.random.permutation(len(X[0])) # equiv to .shape[0]
X = list(map(lambda x: x[p], X))
Y = list(map(lambda x: x[p], Y))
return X, Y
def _prepare_batches(self) -> None:
Prepare all batches as locally stored files.
Walk through all elements of collection and split (or merge) data according to the batch size. Too long data
sets are divided into multiple batches. Not fully filled batches are merged with data from the next collection
element. If data is remaining after the last element, it is saved as smaller batch. All batches are enumerated
beginning from 0. A list with all batch numbers is stored in class's parameter indexes.
index = 0
remaining = None
mod_rank = self._get_model_rank()
for data in self._collection:
X = data.get_X(upsampling=self.upsampling)
Y = [data.get_Y(upsampling=self.upsampling)[0] for _ in range(mod_rank)]
if self.upsampling:
X, Y = self._permute_data(X, Y)
if remaining is not None:
X, Y = self._concatenate(X, remaining[0]), self._concatenate(Y, remaining[1])
length = X[0].shape[0]
batches = self._get_number_of_mini_batches(length)
for b in range(batches):
batch_X, batch_Y = self._get_batch(X, b), self._get_batch(Y, b)
self._save_to_pickle(X=batch_X, Y=batch_Y, index=index)
index += 1
if (batches * self.batch_size) < length: # keep remaining to concatenate with next data element
remaining = (self._get_batch(X, batches), self._get_batch(Y, batches))
remaining = None
if remaining is not None: # add remaining as smaller batch
self._save_to_pickle(X=remaining[0], Y=remaining[1], index=index)
index += 1
self.indexes = np.arange(0, index).tolist()
def _save_to_pickle(self, X: List[np.ndarray], Y: List[np.ndarray], index: int) -> None:
"""Save data as pickle file with variables X and Y and given index as <index>.pickle ."""
data = {"X": X, "Y": Y}
file = self._path % index
with open(file, "wb") as f:
pickle.dump(data, f)
def _get_number_of_mini_batches(self, number_of_samples: int) -> int:
"""Return number of mini batches as the floored ration of number of samples to batch size."""
return math.floor(number_of_samples / self.batch_size)
def _cleanup_path(path: str, create_new: bool = True) -> None:
"""First remove existing path, second create empty path if enabled."""
if os.path.exists(path):
if create_new is True:
def on_epoch_end(self) -> None:
"""Randomly shuffle indexes if enabled."""
if self.shuffle is True:
class DummyData: # pragma: no cover
def __init__(self, number_of_samples=np.random.randint(100, 150)):
self.number_of_samples = number_of_samples
def get_X(self):
X1 = np.random.randint(0, 10, size=(self.number_of_samples, 14, 5)) # samples, window, variables
X2 = np.random.randint(21, 30, size=(self.number_of_samples, 10, 2)) # samples, window, variables
X3 = np.random.randint(-5, 0, size=(self.number_of_samples, 1, 2)) # samples, window, variables
return [X1, X2, X3]
def get_Y(self):
Y1 = np.random.randint(0, 10, size=(self.number_of_samples, 5, 1)) # samples, window, variables
Y2 = np.random.randint(21, 30, size=(self.number_of_samples, 5, 1)) # samples, window, variables
return [Y1, Y2]
if __name__ == "__main__":
collection = []
for _ in range(3):
data_collection = DataCollection(collection=collection)
path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "testdata")
iterator = KerasIterator(data_collection, 25, path, shuffle=True)
for data in data_collection:
\ No newline at end of file
This diff is collapsed.
Data Distribution Module.
How to use
Create distributor object from a generator object and parse it to the fit generator method. Provide the number of
steps per epoch with distributor's length method.
.. code-block:: python
model = YourKerasModel()
data_generator = DataGenerator(*args, **kwargs)
data_distributor = Distributor(data_generator, model, **kwargs)
history = model.fit_generator(generator=data_distributor.distribute_on_batches(),
Additionally, a validation data set can be parsed using the length and distribute methods.
from __future__ import generator_stop
__author__ = "Lukas Leufen, Felix Kleinert"
__date__ = '2019-12-05'
import math
import keras
import numpy as np
from mlair.data_handling.data_generator import DataGenerator
class Distributor(keras.utils.Sequence):
"""Distribute data generator elements according to mini batch size."""
def __init__(self, generator: DataGenerator, model: keras.models, batch_size: int = 256,
permute_data: bool = False, upsampling: bool = False):
Set up distributor.
:param generator: The generator object must be iterable and return inputs and targets on each iteration
:param model: a keras model with one or more output branches
:param batch_size: batch size to use
:param permute_data: data is randomly permuted if enabled on each train step
:param upsampling: upsample data with upsample extremes data from generator object and shuffle data or use only
the standard input data.
self.generator = generator
self.model = model
self.batch_size = batch_size
self.do_data_permutation = permute_data
self.upsampling = upsampling
def _get_model_rank(self):
mod_out = self.model.output_shape
if isinstance(mod_out, tuple):
# only one output branch: (None, ahead)
mod_rank = 1
elif isinstance(mod_out, list):
# multiple output branches, e.g.: [(None, ahead), (None, ahead)]
mod_rank = len(mod_out)
else: # pragma: no cover
raise TypeError("model output shape must either be tuple or list.")
return mod_rank
def _get_number_of_mini_batches(self, values):
return math.ceil(values.shape[0] / self.batch_size)
def _permute_data(self, x, y):
Permute inputs x and labels y if permutation is enabled in instance.
:param x: inputs
:param y: labels
:return: permuted or original data
if self.do_data_permutation:
p = np.random.permutation(len(x)) # equiv to .shape[0]
x = x[p]
y = y[p]
return x, y
def distribute_on_batches(self, fit_call=True):
Create generator object to distribute mini batches.
Split data from given generator object (usually for single station) according to the given batch size. Also
perform upsampling if enabled and random shuffling (either if data permutation is enabled or if upsampling is
enabled). Lastly multiply targets if provided model has multiple output branches.
:param fit_call: switch to exit while loop after first iteration. This is used to determine the length of all
distributed mini batches. For default, fit_call is True to obtain infinite loop for training.
:return: yields next mini batch
while True:
for k, v in enumerate(self.generator):
# get rank of output
mod_rank = self._get_model_rank()
# get data
x_total = np.copy(v[0])
y_total = np.copy(v[1])
if self.upsampling:
s = self.generator.get_data_generator(k)
x_total = np.concatenate([x_total, np.copy(s.get_extremes_history())], axis=0)
y_total = np.concatenate([y_total, np.copy(s.get_extremes_label())], axis=0)
except AttributeError: # no extremes history / labels available, copy will fail
# get number of mini batches
num_mini_batches = self._get_number_of_mini_batches(x_total)
# permute order for mini-batches
x_total, y_total = self._permute_data(x_total, y_total)
for prev, curr in enumerate(range(1, num_mini_batches + 1)):
x = x_total[prev * self.batch_size:curr * self.batch_size, ...]
y = [y_total[prev * self.batch_size:curr * self.batch_size, ...] for _ in range(mod_rank)]
if x is not None: # pragma: no branch