Commit 6db0375f authored by lukas leufen

Merge branch 'develop' into 'master'

include new development

Closes #39, #46, #36, #4, #37, #30, #32, #34, and #6

See merge request toar/machinelearningtools!37
parents eff70314 926cccdf
Pipeline #30916 passed with stages
in 5 minutes and 49 seconds
......@@ -58,4 +58,4 @@ htmlcov/
/test/test_modules/data/
report.html
/TestExperiment/
/testrun_network*/
......@@ -6,4 +6,10 @@ This is a collection of all relevant functions used for ML stuff in the ESDE gro
See a description [here](https://towardsdatascience.com/a-simple-guide-to-the-versions-of-the-inception-network-7fc52b863202)
or take a look at the papers [Going Deeper with Convolutions (Szegedy et al., 2014)](https://arxiv.org/abs/1409.4842)
and [Network In Network (Lin et al., 2014)](https://arxiv.org/abs/1312.4400).
# Installation
* Install __proj__ on your machine using the console, e.g. for openSUSE Leap: `zypper install proj`
* A C++ compiler is required for the cartopy installation (see the quick import check below)
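A minimal sketch to verify the native dependencies after installing the Python requirements; this check is an illustration and not part of the repository:

```python
# If proj and a C++ compiler were available while cartopy was built,
# these imports and the projection setup succeed.
import pyproj
import cartopy.crs as ccrs

print(pyproj.__version__)
print(ccrs.PlateCarree())
```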
......@@ -11,4 +11,16 @@ pytest-html
pydot
mock
statsmodels
seaborn
dask==0.20.2
toolz # for dask
cloudpickle # for dask
cython==0.29.14
pyshp
six
pyproj
shapely
Cartopy==0.16.0
matplotlib
pillow
scipy
......@@ -2,15 +2,15 @@ __author__ = "Lukas Leufen"
__date__ = '2019-11-14'
import argparse
import logging

from src.run_modules.experiment_setup import ExperimentSetup
from src.run_modules.model_setup import ModelSetup
from src.run_modules.post_processing import PostProcessing
from src.run_modules.pre_processing import PreProcessing
from src.run_modules.run_environment import RunEnvironment
from src.run_modules.training import Training
def main(parser_args):
......@@ -31,6 +31,7 @@ if __name__ == "__main__":
formatter = '%(asctime)s - %(levelname)s: %(message)s [%(filename)s:%(funcName)s:%(lineno)s]'
logging.basicConfig(format=formatter, level=logging.INFO)
# logging.basicConfig(format=formatter, level=logging.DEBUG)
parser = argparse.ArgumentParser()
parser.add_argument('--experiment_date', metavar='--exp_date', type=str, default=None,
......
__author__ = "Lukas Leufen"
__date__ = '2019-11-14'

import argparse
import logging

from src.run_modules.experiment_setup import ExperimentSetup
from src.run_modules.model_setup import ModelSetup
from src.run_modules.post_processing import PostProcessing
from src.run_modules.pre_processing import PreProcessing
from src.run_modules.run_environment import RunEnvironment
from src.run_modules.training import Training

def main(parser_args):
with RunEnvironment():
ExperimentSetup(parser_args, stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087', 'DEBW001'],
station_type='background', trainable=True, sampling="hourly", window_history_size=48)
PreProcessing()
ModelSetup()
Training()
PostProcessing()

if __name__ == "__main__":
formatter = '%(asctime)s - %(levelname)s: %(message)s [%(filename)s:%(funcName)s:%(lineno)s]'
logging.basicConfig(format=formatter, level=logging.INFO)
# logging.basicConfig(format=formatter, level=logging.DEBUG)
parser = argparse.ArgumentParser()
parser.add_argument('--experiment_date', metavar='--exp_date', type=str, default=None,
help="set experiment date as string")
args = parser.parse_args(["--experiment_date", "testrun"])
main(args)
join_settings.py
......@@ -45,7 +45,7 @@ class Distributor(keras.utils.Sequence):
for prev, curr in enumerate(range(1, num_mini_batches+1)):
x = x_total[prev*self.batch_size:curr*self.batch_size, ...]
y = [y_total[prev*self.batch_size:curr*self.batch_size, ...] for _ in range(mod_rank)]
if x is not None: # pragma: no branch
yield (x, y)
if (k + 1) == len(self.generator) and curr == num_mini_batches and not fit_call:
return
......
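For context, a standalone sketch of the mini-batch slicing pattern used in `Distributor` above; the toy array sizes and the `mod_rank` value are illustrative assumptions, not values from the repository:

```python
import numpy as np

# 10 samples, batch size 3, and a model with two output branches (mod_rank=2)
# that both receive the same labels, mirroring the loop shown above.
x_total = np.arange(10).reshape(10, 1)
y_total = np.arange(10)
batch_size, mod_rank = 3, 2
num_mini_batches = int(np.ceil(len(x_total) / batch_size))
for prev, curr in enumerate(range(1, num_mini_batches + 1)):
    x = x_total[prev * batch_size:curr * batch_size, ...]
    y = [y_total[prev * batch_size:curr * batch_size, ...] for _ in range(mod_rank)]
    print(x.shape, [len(yi) for yi in y])  # (3, 1) [3, 3] three times, then (1, 1) [1, 1]
```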
__author__ = 'Felix Kleinert, Lukas Leufen'
__date__ = '2019-11-07'
import logging
import os
import pickle
from typing import Union, List, Tuple, Any

import keras
import xarray as xr

from src import helpers
from src.data_handling.data_preparation import DataPrep
class DataGenerator(keras.utils.Sequence):
......@@ -23,6 +27,9 @@ class DataGenerator(keras.utils.Sequence):
interpolate_method: str = "linear", limit_nan_fill: int = 1, window_history_size: int = 7,
window_lead_time: int = 4, transform_method: str = "standardise", **kwargs):
self.data_path = os.path.abspath(data_path)
self.data_path_tmp = os.path.join(os.path.abspath(data_path), "tmp")
if not os.path.exists(self.data_path_tmp):
os.makedirs(self.data_path_tmp)
self.network = network
self.stations = helpers.to_list(stations)
self.variables = variables
......@@ -70,11 +77,11 @@ class DataGenerator(keras.utils.Sequence):
if self._iterator < self.__len__():
data = self.get_data_generator()
self._iterator += 1
if data.history is not None and data.label is not None: # pragma: no branch
return data.history.transpose("datetime", "window", "Stations", "variables"), \
data.label.squeeze("Stations").transpose("datetime", "window")
else:
self.__next__() # pragma: no cover
else:
raise StopIteration
......@@ -85,24 +92,61 @@ class DataGenerator(keras.utils.Sequence):
:return: The generator's time series of history data and its labels
"""
data = self.get_data_generator(key=item)
return data.get_transposed_history(), data.label.squeeze("Stations").transpose("datetime", "window")

def get_data_generator(self, key: Union[str, int] = None, local_tmp_storage: bool = True) -> DataPrep:
"""
Select data for the given key, create a DataPrep object, then interpolate, transform, create history and labels,
and remove NaNs.
:param key: station key to choose the data generator.
:param local_tmp_storage: if True (default), load already processed data from a tmp pickle file to save
computation time (at the cost of additional disk space); if False, process the data from scratch.
:return: preprocessed data as a DataPrep instance
"""
station = self.get_station_key(key)
try:
if not local_tmp_storage:
raise FileNotFoundError
data = self._load_pickle_data(station, self.variables)
except FileNotFoundError:
logging.info(f"could not load pickle data for {station}")
data = DataPrep(self.data_path, self.network, station, self.variables, station_type=self.station_type,
**self.kwargs)
data.interpolate(self.interpolate_dim, method=self.interpolate_method, limit=self.limit_nan_fill)
data.transform("datetime", method=self.transform_method)
data.make_history_window(self.interpolate_dim, self.window_history_size)
data.make_labels(self.target_dim, self.target_var, self.interpolate_dim, self.window_lead_time)
data.history_label_nan_remove(self.interpolate_dim)
self._save_pickle_data(data)
return data
def _save_pickle_data(self, data: Any):
"""
Save the given data locally as .pickle in self.data_path_tmp with the name
'<station>_<var1>_<var2>_..._<varX>_<start>_<end>_.pickle'.
:param data: any data that should be saved
"""
date = f"{self.kwargs.get('start')}_{self.kwargs.get('end')}"
vars = '_'.join(sorted(data.variables))
station = ''.join(data.station)
file = os.path.join(self.data_path_tmp, f"{station}_{vars}_{date}_.pickle")
with open(file, "wb") as f:
pickle.dump(data, f)
logging.debug(f"save pickle data to {file}")
def _load_pickle_data(self, station: Union[str, List[str]], variables: List[str]) -> Any:
"""
Load locally saved data from self.data_path_tmp with the name '<station>_<var1>_<var2>_..._<varX>_<start>_<end>_.pickle'.
:param station: station to load
:param variables: list of variables to load
:return: loaded data
"""
date = f"{self.kwargs.get('start')}_{self.kwargs.get('end')}"
vars = '_'.join(sorted(variables))
station = ''.join(station)
file = os.path.join(self.data_path_tmp, f"{station}_{vars}_{date}_.pickle")
with open(file, "rb") as f:
data = pickle.load(f)
logging.debug(f"load pickle data from {file}")
return data
def get_station_key(self, key: Union[None, str, int, List[Union[None, str, int]]]) -> str:
......
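A hedged usage sketch of the new pickle caching in `DataGenerator.get_data_generator`; the generator instance `gen` and the station id are assumptions for illustration, and the construction of the generator is omitted:

```python
# Assuming `gen` is an already constructed DataGenerator instance: the first call
# prepares the data with DataPrep and writes <station>_<vars>_<start>_<end>_.pickle
# into gen.data_path_tmp, subsequent calls load that pickle instead of reprocessing.
data = gen.get_data_generator("DEBW107")
# Force a fresh preparation; the resulting pickle overwrites the cached file.
data_fresh = gen.get_data_generator("DEBW107", local_tmp_storage=False)
```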
__author__ = 'Felix Kleinert, Lukas Leufen'
__date__ = '2019-10-16'
import datetime as dt
import logging
import os
from typing import Union, List, Iterable

import numpy as np
import pandas as pd
import xarray as xr

from src import join, helpers
from src import statistics
# define a more general date type for type hinting
date = Union[dt.date, dt.datetime]
......@@ -60,50 +60,56 @@ class DataPrep(object):
self.meta = None
self._transform_method = None
self.statistics_per_var = kwargs.get("statistics_per_var", None)
self.sampling = kwargs.get("sampling", "daily")
if self.statistics_per_var is not None or self.sampling == "hourly":
self.load_data()
else:
raise NotImplementedError("Either select hourly data or provide statistics_per_var.")
def load_data(self):
"""
Load data and meta data either from local disk (preferred) or, if no local data is available, download new data
from the TOAR database. In the latter case, the downloaded data are stored locally if desired (default: yes).
"""
helpers.check_path_and_create(self.path)
file_name = self._set_file_name()
meta_file = self._set_meta_file_name()
if self.kwargs.get('overwrite_local_data', False):
logging.debug(f"overwrite_local_data is true, therefore reload {file_name} from JOIN")
if os.path.exists(file_name):
os.remove(file_name)
if os.path.exists(meta_file):
os.remove(meta_file)
self.download_data(file_name, meta_file)
logging.debug("loaded new data from JOIN")
else:
try:
logging.debug(f"try to load local data from: {file_name}")
data = self._slice_prep(xr.open_dataarray(file_name))
self.data = self.check_for_negative_concentrations(data)
self.meta = pd.read_csv(meta_file, index_col=0)
if self.station_type is not None:
self.check_station_meta()
logging.debug("loading finished")
except FileNotFoundError as e:
logging.warning(e)
self.download_data(file_name, meta_file)
logging.debug("loaded new data from JOIN")
def download_data(self, file_name, meta_file):
data, self.meta = self.download_data_from_join(file_name, meta_file)
data = self._slice_prep(data)
self.data = self.check_for_negative_concentrations(data)
def check_station_meta(self):
"""
Search for the given entries in the meta data and compare them with the requested values. Raise a
FileNotFoundError if they do not match.
"""
check_dict = {"station_type": self.station_type, "network_name": self.network}
for (k, v) in check_dict.items():
if self.meta.at[k, self.station[0]] != v:
logging.debug(f"meta data does not agree which given request for {k}: {v} (requested) != "
logging.debug(f"meta data does not agree with given request for {k}: {v} (requested) != "
f"{self.meta.at[k, self.station[0]]} (local). Raise FileNotFoundError to trigger new "
f"grapping from web.")
raise FileNotFoundError
......@@ -117,7 +123,7 @@ class DataPrep(object):
"""
df_all = {}
df, meta = join.download_join(station_name=self.station, stat_var=self.statistics_per_var,
station_type=self.station_type, network_name=self.network, sampling=self.sampling)
df_all[self.station[0]] = df
# convert df_all to xarray
xarr = {k: xr.DataArray(v, dims=['datetime', 'variables']) for k, v in df_all.items()}
......@@ -138,8 +144,8 @@ class DataPrep(object):
return f"Dataprep(path='{self.path}', network='{self.network}', station={self.station}, " \
f"variables={self.variables}, station_type={self.station_type}, **{self.kwargs})"
def interpolate(self, dim: str, method: str = 'linear', limit: int = None, use_coordinate: Union[bool, str] = True,
**kwargs):
"""
(Copy paste from dataarray.interpolate_na)
Interpolate values according to different methods.
......@@ -193,6 +199,7 @@ class DataPrep(object):
Perform inverse transformation
:return:
"""
def f_inverse(data, mean, std, method_inverse):
if method_inverse == 'standardise':
return statistics.standardise_inverse(data, mean, std), None, None
......@@ -319,8 +326,7 @@ class DataPrep(object):
if (self.history is not None) and (self.label is not None):
non_nan_history = self.history.dropna(dim=dim)
non_nan_label = self.label.dropna(dim=dim)
intersect = np.intersect1d(non_nan_history.coords[dim].values, non_nan_label.coords[dim].values)
if len(intersect) == 0:
self.history = None
......@@ -380,8 +386,11 @@ class DataPrep(object):
data.loc[..., used_chem_vars] = data.loc[..., used_chem_vars].clip(min=minimum)
return data
def get_transposed_history(self):
if self.history is not None:
return self.history.transpose("datetime", "window", "Stations", "variables")
if __name__ == "__main__":
dp = DataPrep('data/', 'dummy', 'DEBW107', ['o3', 'temp'], statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'})
print(dp)
......@@ -2,8 +2,8 @@ __author__ = 'Lukas Leufen'
__date__ = '2019-11-22'
from abc import ABC
from typing import Any, List, Tuple, Dict
class NameNotFoundInDataStore(Exception):
......@@ -144,6 +144,22 @@ class DataStoreByVariable(AbstractDataStore):
"""
return self._stride_through_scopes(name, scope)[2]
def get_default(self, name: str, scope: str, default: Any) -> Any:
"""
Same functionality as the standard get method, but with an additional default argument that is returned if no
data was stored in the data store. Use this method with care, because it does not report any errors and simply
returns the given default value. Currently, there is no indication whether the returned value comes from the
data store or from the default.
:param name: Name to look for
:param scope: scope to search the name for
:param default: default value that is returned if no data was found for the given name and scope
:return: the stored object or the default value
"""
try:
return self._stride_through_scopes(name, scope)[2]
except (NameNotFoundInDataStore, NameNotFoundInScope):
return default
def _stride_through_scopes(self, name, scope, depth=0):
if depth <= scope.count("."):
local_scope = scope.rsplit(".", maxsplit=depth)[0]
......@@ -267,6 +283,22 @@ class DataStoreByScope(AbstractDataStore):
"""
return self._stride_through_scopes(name, scope)[2]
def get_default(self, name: str, scope: str, default: Any) -> Any:
"""
Same functionality as the standard get method, but with an additional default argument that is returned if no
data was stored in the data store. Use this method with care, because it does not report any errors and simply
returns the given default value. Currently, there is no indication whether the returned value comes from the
data store or from the default.
:param name: Name to look for
:param scope: scope to search the name for
:param default: default value that is returned if no data was found for the given name and scope
:return: the stored object or the default value
"""
try:
return self._stride_through_scopes(name, scope)[2]
except (NameNotFoundInDataStore, NameNotFoundInScope):
return default
def _stride_through_scopes(self, name, scope, depth=0):
if depth <= scope.count("."):
local_scope = scope.rsplit(".", maxsplit=depth)[0]
......
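A short usage sketch of the new `get_default` method available on both data store variants; the stored name, the scope string and the default value are illustrative assumptions:

```python
from src.datastore import DataStoreByVariable, NameNotFoundInDataStore, NameNotFoundInScope

ds = DataStoreByVariable()  # assuming the default no-argument constructor
try:
    ds.get("batch_size", "general")  # nothing stored yet -> raises
except (NameNotFoundInDataStore, NameNotFoundInScope):
    print("not found")
print(ds.get_default("batch_size", "general", default=512))  # silently falls back to 512
```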
......@@ -5,16 +5,17 @@ __date__ = '2019-10-21'
import datetime as dt
import logging
import math
import os
import socket
import time
from typing import Union, Dict, Callable

import keras
import keras.backend as K
import numpy as np
import xarray as xr
def to_list(arg):
if not isinstance(arg, list):
......@@ -42,55 +43,6 @@ def l_p_loss(power: int):
return loss
class LearningRateDecay(keras.callbacks.History):
"""
Decay the learning rate during model training. Start with a base learning rate and lower it after every
n (= epochs_drop) epochs by the factor drop in (0, 1]; drop = 1 means no decay of the learning rate.
"""
def __init__(self, base_lr: float = 0.01, drop: float = 0.96, epochs_drop: int = 8):
super().__init__()
self.lr = {'lr': []}
self.base_lr = self.check_param(base_lr, 'base_lr')
self.drop = self.check_param(drop, 'drop')
self.epochs_drop = self.check_param(epochs_drop, 'epochs_drop', upper=None)
@staticmethod
def check_param(value: float, name: str, lower: Union[float, None] = 0, upper: Union[float, None] = 1):
"""
Check if the given value lies in the interval. The left (lower) endpoint is open, the right (upper) endpoint is
closed. To check only one side of the interval, set the other endpoint to None. If both endpoints are set to
None, the value is returned without any check.
:param value: value to check
:param name: name of the variable to display in error message
:param lower: left (lower) endpoint of interval, opened
:param upper: right (upper) endpoint of interval, closed
:return: unchanged value or raise ValueError
"""
if lower is None:
lower = -np.inf
if upper is None:
upper = np.inf
if lower < value <= upper:
return value
else:
raise ValueError(f"{name} is out of allowed range ({lower}, {upper}{')' if upper == np.inf else ']'}: "
f"{name}={value}")
def on_epoch_begin(self, epoch: int, logs=None):
"""
Lower learning rate every epochs_drop epochs by factor drop.
:param epoch: current epoch
:param logs: ?
:return: update keras learning rate
"""
current_lr = self.base_lr * math.pow(self.drop, math.floor(epoch / self.epochs_drop))
K.set_value(self.model.optimizer.lr, current_lr)
self.lr['lr'].append(current_lr)
logging.info(f"Set learning rate to {current_lr}")
return K.get_value(self.model.optimizer.lr)
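# Worked illustration (not part of the original class): with the defaults base_lr=0.01,
# drop=0.96 and epochs_drop=8, on_epoch_begin sets the learning rate to 0.01 for epochs 0-7,
# 0.01 * 0.96 = 0.0096 for epochs 8-15 and 0.01 * 0.96**2 = 0.009216 for epochs 16-23,
# i.e. the rate is multiplied by drop once every epochs_drop epochs.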
class TimeTracking(object):
"""
Track time to measure execution time. Time tracking automatically starts on initialisation and ends by calling stop
......@@ -136,25 +88,32 @@ class TimeTracking(object):
def duration(self):
return self._duration()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
logging.info(f"undefined job finished after {self}")
def prepare_host(create_new=True, sampling="daily"):
hostname = socket.gethostname()
try:
user = os.getlogin()
except OSError:
user = "default"
if hostname == "ZAM144":
path = f"/home/{user}/Data/toar_{sampling}/"
elif hostname == "zam347":
path = f"/home/{user}/Data/toar_{sampling}/"
elif hostname == "linux-aa9b":
path = f"/home/{user}/machinelearningtools/data/toar_{sampling}/"
elif (len(hostname) > 2) and (hostname[:2] == "jr"):
path = f"/p/project/cjjsc42/{user}/DATA/toar_{sampling}/"
elif (len(hostname) > 2) and (hostname[:2] == "jw"):
path = f"/p/home/jusers/{user}/juwels/intelliaq/DATA/toar_{sampling}/"
elif "runner-6HmDp9Qd-project-2411-concurrent" in hostname:
path = f"/home/{user}/machinelearningtools/data/toar_{sampling}/"
else:
logging.error(f"unknown host '{hostname}'")
raise OSError(f"unknown host '{hostname}'")
......@@ -173,12 +132,14 @@ def prepare_host(create_new=True):
return path
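# Illustration of the host/path mapping above (user name hypothetical): on host "zam347" with
# sampling="hourly" the resolved data path is "/home/<user>/Data/toar_hourly/", while the default
# sampling="daily" reproduces the former toar_daily paths.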
def set_experiment_name(experiment_date=None, experiment_path=None, sampling=None):
if experiment_date is None:
experiment_name = "TestExperiment"
else:
experiment_name = f"{experiment_date}_network"
if sampling == "hourly":
experiment_name += f"_{sampling}"
if experiment_path is None:
experiment_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", experiment_name))
else:
......@@ -197,3 +158,35 @@ class PyTestRegex:
def __repr__(self) -> str:
return self._regex.pattern
def dict_to_xarray(d: Dict, coordinate_name: str) -> xr.DataArray:
"""
Convert a dictionary of 2D-xarrays to a single 3D-xarray. The name of the new coordinate axis follows <coordinate_name>.
:param d: dictionary with 2D-xarrays
:param coordinate_name: name of the newly created axis (2D -> 3D)
:return: combined xarray
"""