Commit 101cfd69 authored by felix kleinert's avatar felix kleinert

Merge remote-tracking branch...

Merge remote-tracking branch 'origin/lukas_issue144_feat_workflow-with-advanced-data-handling' into felix_issue140-implement-station-preparation
parents a25a884b 64467572
......@@ -31,13 +31,13 @@ def prepare_host(create_new=True, data_path=None, sampling="daily") -> str:
elif hostname == "zam347":
path = f"/home/{user}/Data/toar_{sampling}/"
elif hostname == "linux-aa9b":
path = f"/home/{user}/machinelearningtools/data/toar_{sampling}/"
path = f"/home/{user}/mlair/data/toar_{sampling}/"
elif (len(hostname) > 2) and (hostname[:2] == "jr"):
path = f"/p/project/cjjsc42/{user}/DATA/toar_{sampling}/"
elif (len(hostname) > 2) and (hostname[:2] in ['jw', 'ju'] or hostname[:5] in ['hdfml']):
path = f"/p/project/deepacf/intelliaq/{user}/DATA/toar_{sampling}/"
elif runner_regex.match(hostname) is not None:
path = f"/home/{user}/machinelearningtools/data/toar_{sampling}/"
path = f"/home/{user}/mlair/data/toar_{sampling}/"
raise OSError(f"unknown host '{hostname}'")
if not os.path.exists(path):
......@@ -13,3 +13,6 @@ from .bootstraps import BootStraps
from .data_preparation_join import DataPrepJoin
from .data_generator import DataGenerator
from .data_distributor import Distributor
from .iterator import KerasIterator, DataCollection
from .advanced_data_handling import DefaultDataPreparation
from .data_preparation import StationPrep
\ No newline at end of file
This diff is collapsed.
......@@ -68,8 +68,8 @@ class AbstractStationPrep():
class StationPrep(AbstractStationPrep):
def __init__(self, path, station, statistics_per_var, transformation, station_type, network, sampling, target_dim, target_var,
interpolate_dim, window_history_size, window_lead_time, **kwargs):
def __init__(self, station, data_path, statistics_per_var, transformation, station_type, network, sampling, target_dim, target_var,
interpolate_dim, window_history_size, window_lead_time, overwrite_local_data: bool = False, **kwargs):
super().__init__() # path, station, statistics_per_var, transformation, **kwargs)
self.station_type = station_type = network
......@@ -80,12 +80,10 @@ class StationPrep(AbstractStationPrep):
self.window_history_size = window_history_size
self.window_lead_time = window_lead_time
self.path = os.path.abspath(path)
self.path = os.path.abspath(data_path)
self.station = helpers.to_list(station)
self.statistics_per_var = statistics_per_var
# self.target_dim = 'variable'
self.transformation = self.setup_transformation(transformation)
self.kwargs = kwargs
# internal = None
......@@ -95,8 +93,15 @@ class StationPrep(AbstractStationPrep):
self.label = None
self.observation = None
self.transformation = None # self.setup_transformation(transformation)
self.kwargs = kwargs
self.kwargs["overwrite_local_data"] = overwrite_local_data
def __str__(self):
return self.station[0]
def __repr__(self):
return f"StationPrep(path='{self.path}', station={self.station}, statistics_per_var={self.statistics_per_var}, " \
f"transformation={self.transformation}, station_type='{self.station_type}', network='{}', " \
__author__ = 'Lukas Leufen'
__date__ = '2020-07-17'
from src.helpers import to_list, remove_items
from src.data_handling.data_preparation import StationPrep
from src.data_handling.advanced_data_handling import AbstractDataPreparation, DefaultDataPreparation
import numpy as np
import xarray as xr
import pickle
import os
import shutil
import inspect
from typing import Union, List, Tuple
import logging
from functools import reduce
number = Union[float, int]
num_or_list = Union[number, List[number]]
class DataPreparationNeighbors(DefaultDataPreparation):
def __init__(self, id_class, data_path, neighbors=None, min_length=0,
extreme_values: num_or_list = None, extremes_on_right_tail_only: bool = False):
self.neighbors = to_list(neighbors) if neighbors is not None else []
super().__init__(id_class, data_path, min_length=min_length, extreme_values=extreme_values,
def build(cls, station, **kwargs):
    """Create the central StationPrep plus one StationPrep per neighbor and wrap them in ``cls``.

    Only kwargs listed in the class requirements are forwarded to StationPrep;
    remaining own-args (except ``id_class``) are forwarded to the constructor.
    """
    sp_keys = {k: kwargs[k] for k in cls._requirements if k in kwargs}
    main_station = StationPrep(station, **sp_keys)
    neighbor_preps = [StationPrep(neighbor, **sp_keys) for neighbor in kwargs.get("neighbors", [])]
    # store None (not an empty list) when no neighbors were requested
    kwargs["neighbors"] = neighbor_preps if len(neighbor_preps) > 0 else None
    dp_args = {k: kwargs[k] for k in cls.own_args("id_class") if k in kwargs}
    return cls(main_station, **dp_args)
def _create_collection(self):
    """Return the main element (``id_class``) followed by all neighbor elements."""
    return [self.id_class, *self.neighbors]
if __name__ == "__main__":
a = DataPreparationNeighbors
requirements = a.requirements()
kwargs = {"path": os.path.join(os.path.dirname(os.path.abspath(__file__)), "testdata"),
"station_type": None,
"network": 'UBA',
"sampling": 'daily',
"target_dim": 'variables',
"target_var": 'o3',
"interpolate_dim": 'datetime',
"window_history_size": 7,
"window_lead_time": 3,
"neighbors": ["DEBW034"],
"data_path": os.path.join(os.path.dirname(os.path.abspath(__file__)), "testdata"),
"statistics_per_var": {'o3': 'dma8eu', 'temp': 'maximum'},
"transformation": None,}
a_inst ="DEBW011", **kwargs)
__author__ = 'Lukas Leufen'
__date__ = '2020-07-07'
from collections import Iterator, Iterable
import keras
import numpy as np
import math
import os
import shutil
import pickle
from typing import Tuple, List
class StandardIterator(Iterator):
_position: int = None
def __init__(self, collection: list):
    """Wrap the given list and start iteration at the first element."""
    assert isinstance(collection, list)
    self._position = 0
    self._collection = collection
def __next__(self):
"""Return next element or stop iteration."""
value = self._collection[self._position]
self._position += 1
except IndexError:
raise StopIteration()
return value
class DataCollection(Iterable):
def __init__(self, collection: list = None):
    """Store the given list of data elements; ``None`` defaults to an empty list."""
    collection = [] if collection is None else collection
    assert isinstance(collection, list)
    self._collection = collection
def __len__(self):
    """Return the number of stored data elements."""
    size = len(self._collection)
    return size
def __iter__(self) -> Iterator:
    """Return a fresh StandardIterator over the stored elements."""
    iterator = StandardIterator(self._collection)
    return iterator
def __getitem__(self, index):
    """Return the element stored at position ``index``."""
    element = self._collection[index]
    return element
def add(self, element):
class KerasIterator(keras.utils.Sequence):
def __init__(self, collection: DataCollection, batch_size: int, batch_path: str, shuffle_batches: bool = False,
model=None, upsampling=False, name=None):
self._collection = collection
batch_path = os.path.join(batch_path, str(name if name is not None else id(self)))
self._path = os.path.join(batch_path, "%i.pickle")
self.batch_size = batch_size
self.model = model
self.shuffle = shuffle_batches
self.upsampling = upsampling
self.indexes: list = []
def __len__(self) -> int:
    """Return the number of prepared batches (one index per pickle file)."""
    count = len(self.indexes)
    return count
def __getitem__(self, index: int) -> Tuple[np.ndarray, np.ndarray]:
    """Load and return the batch stored under the given position."""
    batch_number = self.indexes[index]
    return self.__data_generation(batch_number)
def _get_model_rank(self):
    """Return the number of output branches of the attached model.

    Without a model a single output branch is assumed.
    """
    if self.model is None:  # no model provided, assume to use single output
        return 1
    mod_out = self.model.output_shape
    if isinstance(mod_out, tuple):  # only one output branch: (None, ahead)
        return 1
    if isinstance(mod_out, list):  # multiple output branches, e.g.: [(None, ahead), (None, ahead)]
        return len(mod_out)
    raise TypeError("model output shape must either be tuple or list.")  # pragma: no cover
def __data_generation(self, index: int) -> Tuple[np.ndarray, np.ndarray]:
    """Load the batch with the given number from its pickle file on disk."""
    with open(self._path % index, "rb") as source:
        payload = pickle.load(source)
    return payload["X"], payload["Y"]
def _concatenate(new: List[np.ndarray], old: List[np.ndarray]) -> List[np.ndarray]:
    """Concatenate two lists of data along axis=0 (element-wise, old first)."""
    return [np.concatenate((prev, nxt), axis=0) for prev, nxt in zip(old, new)]
def _get_batch(self, data_list: List[np.ndarray], b: int) -> List[np.ndarray]:
    """Cut the b-th slice of size ``self.batch_size`` out of each array in ``data_list``."""
    start = b * self.batch_size
    stop = (b + 1) * self.batch_size
    return [data[start:stop, ...] for data in data_list]
def _permute_data(self, X, Y):
    """Shuffle X and Y along the sample axis using one shared random permutation."""
    order = np.random.permutation(len(X[0]))  # equivalent to .shape[0] of the first input
    X = [x[order] for x in X]
    Y = [y[order] for y in Y]
    return X, Y
def _prepare_batches(self) -> None:
Prepare all batches as locally stored files.
Walk through all elements of collection and split (or merge) data according to the batch size. Too long data
sets are divided into multiple batches. Not fully filled batches are merged with data from the next collection
element. If data is remaining after the last element, it is saved as smaller batch. All batches are enumerated
beginning from 0. A list with all batch numbers is stored in class's parameter indexes.
index = 0
remaining = None
mod_rank = self._get_model_rank()
for data in self._collection:
X = data.get_X(upsampling=self.upsampling)
Y = [data.get_Y(upsampling=self.upsampling)[0] for _ in range(mod_rank)]
if self.upsampling:
X, Y = self._permute_data(X, Y)
if remaining is not None:
X, Y = self._concatenate(X, remaining[0]), self._concatenate(Y, remaining[1])
length = X[0].shape[0]
batches = self._get_number_of_mini_batches(length)
for b in range(batches):
batch_X, batch_Y = self._get_batch(X, b), self._get_batch(Y, b)
self._save_to_pickle(X=batch_X, Y=batch_Y, index=index)
index += 1
if (batches * self.batch_size) < length: # keep remaining to concatenate with next data element
remaining = (self._get_batch(X, batches), self._get_batch(Y, batches))
remaining = None
if remaining is not None: # add remaining as smaller batch
self._save_to_pickle(X=remaining[0], Y=remaining[1], index=index)
index += 1
self.indexes = np.arange(0, index).tolist()
def _save_to_pickle(self, X: List[np.ndarray], Y: List[np.ndarray], index: int) -> None:
    """Save data as pickle file with variables X and Y and given index as <index>.pickle ."""
    target_file = self._path % index
    with open(target_file, "wb") as target:
        pickle.dump({"X": X, "Y": Y}, target)
def _get_number_of_mini_batches(self, number_of_samples: int) -> int:
    """Return number of mini batches as the floored ratio of number of samples to batch size.

    Uses integer floor division instead of ``math.floor(a / b)``: true division
    goes through float and can round incorrectly for very large sample counts,
    while ``//`` stays exact for integers.
    """
    return number_of_samples // self.batch_size
def _cleanup_path(path: str, create_new: bool = True) -> None:
"""First remove existing path, second create empty path if enabled."""
if os.path.exists(path):
if create_new is True:
def on_epoch_end(self) -> None:
"""Randomly shuffle indexes if enabled."""
if self.shuffle is True:
class DummyData:  # pragma: no cover
    """Minimal stand-in data source producing random X/Y arrays for manual testing."""

    def __init__(self, number_of_samples=None):
        # NOTE: a call like np.random.randint(...) in the default expression is
        # evaluated only ONCE at definition time, so every default-constructed
        # instance would share the same "random" size. Use a None sentinel and
        # draw the random size per instance instead.
        if number_of_samples is None:
            number_of_samples = np.random.randint(100, 150)
        self.number_of_samples = number_of_samples

    def get_X(self):
        """Return three random input arrays shaped (samples, window, variables)."""
        X1 = np.random.randint(0, 10, size=(self.number_of_samples, 14, 5))  # samples, window, variables
        X2 = np.random.randint(21, 30, size=(self.number_of_samples, 10, 2))  # samples, window, variables
        X3 = np.random.randint(-5, 0, size=(self.number_of_samples, 1, 2))  # samples, window, variables
        return [X1, X2, X3]

    def get_Y(self):
        """Return two random target arrays shaped (samples, window, variables)."""
        Y1 = np.random.randint(0, 10, size=(self.number_of_samples, 5, 1))  # samples, window, variables
        Y2 = np.random.randint(21, 30, size=(self.number_of_samples, 5, 1))  # samples, window, variables
        return [Y1, Y2]
if __name__ == "__main__":
collection = []
for _ in range(3):
data_collection = DataCollection(collection=collection)
path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "testdata")
iterator = KerasIterator(data_collection, 25, path, shuffle=True)
for data in data_collection:
\ No newline at end of file
......@@ -42,21 +42,27 @@ class OrdinaryLeastSquaredModel:
return self.ordinary_least_squared_model(self.x, self.y)
def _set_x_y_from_generator(self):
data_x = None
data_y = None
data_x, data_y = None, None
for item in self.generator:
x = self.reshape_xarray_to_numpy(item[0])
y = item[1].values
data_x = np.concatenate((data_x, x), axis=0) if data_x is not None else x
data_y = np.concatenate((data_y, y), axis=0) if data_y is not None else y
self.x = data_x
self.y = data_y
x, y = item.get_data(as_numpy=True)
x = self.flatten(x)
data_x = self._concatenate(x, data_x)
data_y = self._concatenate(y, data_y)
self.x, self.y = np.concatenate(data_x, axis=1), data_y[0]
def _concatenate(self, new, old):
    """Append ``new`` arrays to ``old`` along axis 0; pass ``new`` through if ``old`` is None."""
    if old is None:
        return new
    return [np.concatenate((prev, nxt), axis=0) for prev, nxt in zip(old, new)]
def predict(self, data):
    """Apply OLS model on data."""
    features = self.reshape_xarray_to_numpy(data)
    features = sm.add_constant(features, has_constant="add")
    prediction = self.model.predict(features)
    return np.atleast_2d(prediction)
def flatten(data):
    """Flatten every array in ``data`` to 2D, keeping the sample axis (axis 0) intact."""
    return [entry.reshape(entry.shape[0], -1) for entry in data]
def reshape_xarray_to_numpy(data):
"""Reshape xarray data to numpy data and flatten."""
This diff is collapsed.
......@@ -29,7 +29,7 @@ def run(stations=None,
params = inspect.getfullargspec(DefaultWorkflow).args
kwargs = {k: v for k, v in locals().items() if k in params and v is not None}
......@@ -39,5 +39,4 @@ def run(stations=None,
if __name__ == "__main__":
run(stations=["DEBW013","DEBW025"], statistics_per_var={'o3': 'dma8eu', "temp": "maximum"}, trainable=True, create_new_model=True)
......@@ -18,7 +18,7 @@ from src.configuration.defaults import DEFAULT_STATIONS, DEFAULT_VAR_ALL_DICT, D
from src.data_handling import DataPrepJoin
from src.data_handling.advanced_data_handling import DefaultDataPreparation
from src.run_modules.run_environment import RunEnvironment
from src.model_modules.model_class import MyLittleModel as VanillaModel
......@@ -228,8 +228,8 @@ class ExperimentSetup(RunEnvironment):
create_new_model = None, bootstrap_path=None, permute_data_on_training = None, transformation=None,
train_min_length=None, val_min_length=None, test_min_length=None, extreme_values: list = None,
extremes_on_right_tail_only: bool = None, evaluate_bootstraps=None, plot_list=None, number_of_bootstraps=None,
create_new_bootstraps=None, data_path: str = None, login_nodes=None, hpc_hosts=None, model=None,
batch_size=None, epochs=None, data_preparation=None):
create_new_bootstraps=None, data_path: str = None, batch_path: str = None, login_nodes=None,
hpc_hosts=None, model=None, batch_size=None, epochs=None, data_preparation=None):
# create run framework
......@@ -265,6 +265,9 @@ class ExperimentSetup(RunEnvironment):"Experiment path is: {experiment_path}")
# batch path (temporary)
self._set_param("batch_path", batch_path, default=os.path.join(experiment_path, "batch_data"))
# set model path
self._set_param("model_path", None, os.path.join(experiment_path, "model"))
......@@ -297,7 +300,7 @@ class ExperimentSetup(RunEnvironment):
self._set_param("sampling", sampling)
self._set_param("transformation", transformation, default=DEFAULT_TRANSFORMATION)
self._set_param("transformation", None, scope="preprocessing")
self._set_param("data_preparation", data_preparation, default=DataPrepJoin)
self._set_param("data_preparation", data_preparation, default=DefaultDataPreparation)
# target
self._set_param("target_var", target_var, default=DEFAULT_TARGET_VAR)
......@@ -344,6 +347,7 @@ class ExperimentSetup(RunEnvironment):
self._set_param("number_of_bootstraps", number_of_bootstraps, default=DEFAULT_NUMBER_OF_BOOTSTRAPS,
self._set_param("plot_list", plot_list, default=DEFAULT_PLOT_LIST, scope="general.postprocessing")
self._set_param("neighbors", ["DEBW030"]) # TODO: just for testing
# check variables, statistics and target variable
......@@ -70,7 +70,7 @@ class ModelSetup(RunEnvironment):
def _run(self):
# set channels depending on inputs
# build model graph using settings from my_model_settings()
......@@ -88,10 +88,12 @@ class ModelSetup(RunEnvironment):
# compile model
def _set_channels(self):
    """Set channels as number of variables of train generator."""
    first_sample = self.data_store.get("generator", "train")[0][0]
    self.data_store.set("channels", first_sample.shape[-1], self.scope)
def _set_shapes(self):
    """Set input and output shapes from train collection."""
    input_shapes = [x.shape[1:] for x in self.data_store.get("data_collection", "train")[0].get_X()]
    self.data_store.set("shape_inputs", input_shapes, self.scope)
    output_shapes = [y.shape[1:] for y in self.data_store.get("data_collection", "train")[0].get_Y()]
    self.data_store.set("shape_outputs", output_shapes, self.scope)
def compile_model(self):
......@@ -128,8 +130,8 @@ class ModelSetup(RunEnvironment):'no weights to reload...')
def build_model(self):
"""Build model using window_history_size, window_lead_time and channels from data store."""
args_list = ["window_history_size", "window_lead_time", "channels"]
"""Build model using input and output shapes from data store."""
args_list = ["shape_inputs", "shape_outputs"]
args = self.data_store.create_args_dict(args_list, self.scope)
model = self.data_store.get("model_class")
self.model = model(**args)
......@@ -13,7 +13,7 @@ import numpy as np
import pandas as pd
import xarray as xr
from src.data_handling import BootStraps, Distributor, DataGenerator, DataPrepJoin
from src.data_handling import BootStraps, Distributor, DataGenerator, DataPrepJoin, KerasIterator
from src.helpers.datastore import NameNotFoundInDataStore
from src.helpers import TimeTracking, statistics
from src.model_modules.linear_model import OrdinaryLeastSquaredModel
......@@ -65,11 +65,12 @@ class PostProcessing(RunEnvironment):
self.model: keras.Model = self._load_model()
self.ols_model = None
self.batch_size: int = self.data_store.get_default("batch_size", "model", 64)
self.test_data: DataGenerator = self.data_store.get("generator", "test")
self.test_data_distributed = Distributor(self.test_data, self.model, self.batch_size)
self.train_data: DataGenerator = self.data_store.get("generator", "train")
self.val_data: DataGenerator = self.data_store.get("generator", "val")
self.train_val_data: DataGenerator = self.data_store.get("generator", "train_val")
self.test_data = self.data_store.get("data_collection", "test")
batch_path = self.data_store.get("batch_path", scope="test")
self.test_data_distributed = KerasIterator(self.test_data, self.batch_size, model=self.model, name="test", batch_path=batch_path)
self.train_data = self.data_store.get("data_collection", "train")
self.val_data = self.data_store.get("data_collection", "val")
self.train_val_data = self.data_store.get("data_collection", "train_val")
self.plot_path: str = self.data_store.get("plot_path")
self.target_var = self.data_store.get("target_var")
self._sampling = self.data_store.get("sampling")
......@@ -311,17 +312,17 @@ class PostProcessing(RunEnvironment):
be found inside `forecast_path`.
logging.debug("start make_prediction")
for i, _ in enumerate(self.test_data):
data = self.test_data.get_data_generator(i)
input_data = data.get_transposed_history()
for i, data in enumerate(self.test_data):
input_data = data.get_X()
target_data = data.get_Y()
# get scaling parameters
mean, std, transformation_method = data.get_transformation_information(variable=self.target_var)
# mean, std, transformation_method = data.get_transformation_information(variable=self.target_var)
for normalised in [True, False]:
# create empty arrays
nn_prediction, persistence_prediction, ols_prediction, observation = self._create_empty_prediction_arrays(
data, count=4)
target_data, count=4)
# nn forecast
nn_prediction = self._create_nn_forecast(input_data, nn_prediction, mean, std, transformation_method,
......@@ -459,8 +460,8 @@ class PostProcessing(RunEnvironment):
return nn_prediction
def _create_empty_prediction_arrays(generator, count=1):
return [generator.label.copy() for _ in range(count)]
def _create_empty_prediction_arrays(target_data, count=1):
    """Return ``count`` independent copies of ``target_data`` to be filled with predictions."""
    copies = []
    for _ in range(count):
        copies.append(target_data.copy())
    return copies
def create_fullindex(df: Union[xr.DataArray, pd.DataFrame, pd.DatetimeIndex], freq: str) -> pd.DataFrame:
......@@ -11,6 +11,7 @@ import numpy as np
import pandas as pd
from src.data_handling import DataGenerator
from src.data_handling import DataCollection
from src.helpers import TimeTracking
from src.configuration import path_config
from src.helpers.join import EmptyQueryResult
......@@ -59,10 +60,9 @@ class PreProcessing(RunEnvironment):
def _run(self):
args = self.data_store.create_args_dict(DEFAULT_ARGS_LIST, scope="preprocessing")
kwargs = self.data_store.create_args_dict(DEFAULT_KWARGS_LIST, scope="preprocessing")
stations = self.data_store.get("stations")
valid_stations = self.check_valid_stations(args, kwargs, stations, load_tmp=False, save_tmp=False, name="all")
data_preparation = self.data_store.get("data_preparation")
_, valid_stations = self.validate_station(data_preparation, stations, "preprocessing", overwrite_local_data=True)
self.data_store.set("stations", valid_stations)
......@@ -70,16 +70,14 @@ class PreProcessing(RunEnvironment):
def report_pre_processing(self):
"""Log some metrics on data and create latex report."""
logging.debug(20 * '##')
n_train = len(self.data_store.get('generator', 'train'))
n_val = len(self.data_store.get('generator', 'val'))
n_test = len(self.data_store.get('generator', 'test'))
n_train = len(self.data_store.get('data_collection', 'train'))
n_val = len(self.data_store.get('data_collection', 'val'))
n_test = len(self.data_store.get('data_collection', 'test'))
n_total = n_train + n_val + n_test
logging.debug(f"Number of all stations: {n_total}")
logging.debug(f"Number of training stations: {n_train}")
logging.debug(f"Number of val stations: {n_val}")
logging.debug(f"Number of test stations: {n_test}")
logging.debug(f"TEST SHAPE OF GENERATOR CALL: {self.data_store.get('generator', 'test')[0][0].shape}"
f"{self.data_store.get('generator', 'test')[0][1].shape}")
def create_latex_report(self):
......@@ -121,11 +119,12 @@ class PreProcessing(RunEnvironment):
set_names = ["train", "val", "test"]
df = pd.DataFrame(columns=meta_data + set_names)
for set_name in set_names:
data: DataGenerator = self.data_store.get("generator", set_name)
for station in data.stations:
df.loc[station, set_name] = data.get_data_generator(station).get_transposed_label().shape[0]
if df.loc[station, meta_data].isnull().any():
df.loc[station, meta_data] = data.get_data_generator(station).meta.loc[meta_data].values.flatten()
data = self.data_store.get("data_collection", set_name)
for station in data:
station_name = str(station.id_class)
df.loc[station_name, set_name] = station.get_Y()[0].shape[0]
if df.loc[station_name, meta_data].isnull().any():
df.loc[station_name, meta_data] = station.id_class.meta.loc[meta_data].values.flatten()
df.loc["# Samples", set_name] = df.loc[:, set_name].sum()
df.loc["# Stations", set_name] = df.loc[:, set_name].count()
df[meta_round] = df[meta_round].astype(float).round(precision)
......@@ -147,7 +146,7 @@ class PreProcessing(RunEnvironment):
Split data into subsets.
Currently: train, val, test and train_val (actually this is only the merge of train and val, but as an separate
generator). IMPORTANT: Do not change to order of the execution of create_set_split. The train subset needs
data_collection). IMPORTANT: Do not change to order of the execution of create_set_split. The train subset needs
always to be executed at first, to set a proper transformation.
fraction_of_training = self.data_store.get("fraction_of_training")
......@@ -159,7 +158,7 @@ class PreProcessing(RunEnvironment):
raise AssertionError(f"Make sure, that the train subset is always at first execution position! Given subset"
f"order was: {subset_names}.")
for (ind, scope) in zip([train_index, val_index, test_index, train_val_index], subset_names):
self.create_set_split(ind, scope)
self.create_set_split_new(ind, scope)