Commit eff70314 authored by lukas leufen

include new development

parents 1f2c0182 c7621247
Pipeline #29044 failed with stages in 4 minutes and 46 seconds
......@@ -55,5 +55,7 @@ Thumbs.db
htmlcov/
.pytest_cache
/test/data/
/test/test_modules/data/
report.html
/TestExperiment/
/testrun_network/
#!/bin/bash
# run pytest for all modules
# run pytest for all run_modules
python3 -m pytest --html=report.html --self-contained-html test/ | tee test_results.out
IS_FAILED=${PIPESTATUS[0]}  # capture pytest's exit status rather than tee's
......
import os
import shutil
def pytest_runtest_teardown(item, nextitem):
"""
Teardown method to clean up folders created during testing. This method is called after each test, but performs
deletions only after an entire test class has been executed.
:param item: tested item
:param nextitem: next item (could be None, if no following test is available)
"""
if nextitem is None or item.cls != nextitem.cls:
# clean up all TestExperiment and data folders that have been created during testing
rel_path = os.path.relpath(item.fspath.dirname, os.path.abspath(__file__))
path = os.path.dirname(__file__)
for stage in filter(None, rel_path.replace("..", ".").split("/")):
path = os.path.abspath(os.path.join(path, stage))
list_dir = os.listdir(path)
if "data" in list_dir and path != os.path.dirname(__file__): # do not delete data folder in src
shutil.rmtree(os.path.join(path, "data"), ignore_errors=True)
if "TestExperiment" in list_dir:
shutil.rmtree(os.path.join(path, "TestExperiment"), ignore_errors=True)
else:
pass # nothing to do if next test is from same test class
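# Illustration only (these hypothetical test classes are not part of the commit):
# with the hook above, the TestExperiment/ and data/ folders are removed once
# after test_b (last test of TestCaseA) and once after test_c (last test of
# TestCaseB), not after every individual test.
class TestCaseA:
    def test_a(self):
        assert True

    def test_b(self):
        assert True


class TestCaseB:
    def test_c(self):
        assert True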
......@@ -4,20 +4,27 @@ __date__ = '2019-11-14'
import logging
import argparse
from src.modules.experiment_setup import ExperimentSetup
from src.modules import run, PreProcessing, Training, PostProcessing
from src.run_modules.experiment_setup import ExperimentSetup
from src.run_modules.run_environment import RunEnvironment
from src.run_modules.pre_processing import PreProcessing
from src.run_modules.model_setup import ModelSetup
from src.run_modules.training import Training
from src.run_modules.post_processing import PostProcessing
def main():
with run():
exp_setup = ExperimentSetup(args, trainable=True, stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087'])
def main(parser_args):
PreProcessing(exp_setup)
with RunEnvironment():
ExperimentSetup(parser_args, stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087', 'DEBW001'],
station_type='background', trainable=True)
PreProcessing()
Training(exp_setup)
ModelSetup()
PostProcessing(exp_setup)
Training()
PostProcessing()
if __name__ == "__main__":
......@@ -26,10 +33,8 @@ if __name__ == "__main__":
logging.basicConfig(format=formatter, level=logging.INFO)
parser = argparse.ArgumentParser()
parser.add_argument('--experiment_date', metavar='--exp_date', type=str, nargs=1, default=None,
parser.add_argument('--experiment_date', metavar='--exp_date', type=str, default=None,
help="set experiment date as string")
args = parser.parse_args()
args = parser.parse_args(["--experiment_date", "testrun"])
experiment = ExperimentSetup(args, stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087'])
a = 1
# main()
main(args)
from __future__ import generator_stop
__author__ = "Lukas Leufen, Felix Kleinert"
__date__ = '2019-12-05'
import math
import keras
import numpy as np
class Distributor(keras.utils.Sequence):
def __init__(self, generator: keras.utils.Sequence, model: keras.models, batch_size: int = 256,
fit_call: bool = True):
self.generator = generator
self.model = model
self.batch_size = batch_size
self.fit_call = fit_call
def _get_model_rank(self):
mod_out = self.model.output_shape
if isinstance(mod_out, tuple):
# only one output branch: (None, ahead)
mod_rank = 1
elif isinstance(mod_out, list):
# multiple output branches, e.g.: [(None, ahead), (None, ahead)]
mod_rank = len(mod_out)
else: # pragma: no cover
raise TypeError("model output shape must either be tuple or list.")
return mod_rank
def _get_number_of_mini_batches(self, values):
return math.ceil(values[0].shape[0] / self.batch_size)
def distribute_on_batches(self, fit_call=True):
while True:
for k, v in enumerate(self.generator):
# get rank of output
mod_rank = self._get_model_rank()
# get number of mini batches
num_mini_batches = self._get_number_of_mini_batches(v)
x_total = np.copy(v[0])
y_total = np.copy(v[1])
for prev, curr in enumerate(range(1, num_mini_batches+1)):
x = x_total[prev*self.batch_size:curr*self.batch_size, ...]
y = [y_total[prev*self.batch_size:curr*self.batch_size, ...] for _ in range(mod_rank)]
if x is not None:
yield (x, y)
if (k + 1) == len(self.generator) and curr == num_mini_batches and not fit_call:
return
def __len__(self):
num_batch = 0
for _ in self.distribute_on_batches(fit_call=False):
num_batch += 1
return num_batch
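# Hedged usage sketch (not part of the commit): `model` is assumed to be a
# compiled keras model and `data_gen` a keras.utils.Sequence that yields one
# (input, label) array pair per station, e.g. the DataGenerator below.
distributor = Distributor(data_gen, model, batch_size=256)
model.fit_generator(distributor.distribute_on_batches(),
                    steps_per_epoch=len(distributor),
                    epochs=10,
                    verbose=2)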
......@@ -3,7 +3,7 @@ __date__ = '2019-11-07'
import keras
from src import helpers
from src.data_preparation import DataPrep
from src.data_handling.data_preparation import DataPrep
import os
from typing import Union, List, Tuple
import xarray as xr
......@@ -20,7 +20,7 @@ class DataGenerator(keras.utils.Sequence):
def __init__(self, data_path: str, network: str, stations: Union[str, List[str]], variables: List[str],
interpolate_dim: str, target_dim: str, target_var: str, station_type: str = None,
interpolate_method: str = "linear", limit_nan_fill: int = 1, window_history: int = 7,
interpolate_method: str = "linear", limit_nan_fill: int = 1, window_history_size: int = 7,
window_lead_time: int = 4, transform_method: str = "standardise", **kwargs):
self.data_path = os.path.abspath(data_path)
self.network = network
......@@ -32,7 +32,7 @@ class DataGenerator(keras.utils.Sequence):
self.station_type = station_type
self.interpolate_method = interpolate_method
self.limit_nan_fill = limit_nan_fill
self.window_history = window_history
self.window_history_size = window_history_size
self.window_lead_time = window_lead_time
self.transform_method = transform_method
self.kwargs = kwargs
......@@ -100,7 +100,7 @@ class DataGenerator(keras.utils.Sequence):
**self.kwargs)
data.interpolate(self.interpolate_dim, method=self.interpolate_method, limit=self.limit_nan_fill)
data.transform("datetime", method=self.transform_method)
data.make_history_window(self.interpolate_dim, self.window_history)
data.make_history_window(self.interpolate_dim, self.window_history_size)
data.make_labels(self.target_dim, self.target_var, self.interpolate_dim, self.window_lead_time)
data.history_label_nan_remove(self.interpolate_dim)
return data
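# Hedged sketch of constructing the generator with the renamed
# `window_history_size` argument (path, network, stations and variables are
# illustrative assumptions):
gen = DataGenerator(data_path="data", network="AIRBASE",
                    stations=["DEBW107", "DEBY081"], variables=["o3", "temp"],
                    interpolate_dim="datetime", target_dim="variables",
                    target_var="o3", station_type="background",
                    window_history_size=7, window_lead_time=4)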
......
......@@ -116,7 +116,7 @@ class DataPrep(object):
:return:
"""
df_all = {}
df, meta = join.download_join(station_name=self.station, statvar=self.statistics_per_var,
df, meta = join.download_join(station_name=self.station, stat_var=self.statistics_per_var,
station_type=self.station_type, network_name=self.network)
df_all[self.station[0]] = df
# convert df_all to xarray
......@@ -249,6 +249,17 @@ class DataPrep(object):
else:
self.inverse_transform()
def get_transformation_information(self, variable):
try:
mean = self.mean.sel({'variables': variable}).values
except AttributeError:
mean = None
try:
std = self.std.sel({'variables': variable}).values
except AttributeError:
std = None
return mean, std, self._transform_method
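# Hedged usage sketch (assumes `data` is a DataPrep instance on which
# `transform` has already been called, so that mean and std are set):
mean, std, transform_method = data.get_transformation_information("o3")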
def make_history_window(self, dim: str, window: int) -> None:
"""
This function shifts the data window+1 times and returns an xarray which has a new dimension 'window'
......
......@@ -2,7 +2,7 @@ __author__ = 'Lukas Leufen'
__date__ = '2019-11-22'
from typing import Any, List, Tuple
from typing import Any, List, Tuple, Dict
from abc import ABC
......@@ -30,15 +30,15 @@ class EmptyScope(Exception):
class AbstractDataStore(ABC):
"""
Data store for all settings for the experiment workflow to save experiment parameters for the subsequent modules
Data store for all settings for the experiment workflow to save experiment parameters for the subsequent run_modules
and predefine parameters loaded during the experiment setup phase. The data store is hierarchically structured, so
that global settings can be overwritten by local adjustments.
"""
def __init__(self):
# empty initialise the data-store variables
self._store = {}
self._store: Dict = {}
def put(self, name: str, obj: Any, scope: str) -> None:
def set(self, name: str, obj: Any, scope: str) -> None:
"""
Abstract method to add an object to the data store
:param name: Name of object to store
......@@ -89,11 +89,24 @@ class AbstractDataStore(ABC):
def clear_data_store(self) -> None:
self._store = {}
def create_args_dict(self, arg_list: List[str], scope: str = "general") -> Dict:
args = {}
for arg in arg_list:
try:
args[arg] = self.get(arg, scope)
except (NameNotFoundInDataStore, NameNotFoundInScope):
pass
return args
def set_args_from_dict(self, arg_dict: Dict, scope: str = "general") -> None:
for (k, v) in arg_dict.items():
self.set(k, v, scope)
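# Hedged usage sketch of the renamed set/get interface and the new helper
# methods, using the DataStoreByScope implementation below (the dot-separated
# scope names are illustrative assumptions):
data_store = DataStoreByScope()
data_store.set("trainable", True, "general")
data_store.set("batch_size", 512, "general.train")
args = data_store.create_args_dict(["trainable", "batch_size"], scope="general.train")
data_store.set_args_from_dict(args, scope="general.sub")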
class DataStoreByVariable(AbstractDataStore):
"""
Data store for all settings for the experiment workflow to save experiment parameters for the subsequent modules
Data store for all settings for the experiment workflow to save experiment parameters for the subsequent run_modules
and predefine parameters loaded during the experiment setup phase. The data store is hierarchically structured, so
that global settings can be overwritten by local adjustments.
......@@ -106,7 +119,7 @@ class DataStoreByVariable(AbstractDataStore):
<scope3>: value
"""
def put(self, name: str, obj: Any, scope: str) -> None:
def set(self, name: str, obj: Any, scope: str) -> None:
"""
Store an object `obj` with given `name` under `scope`. In the current implementation, existing entries are
overwritten.
......@@ -217,7 +230,7 @@ class DataStoreByVariable(AbstractDataStore):
class DataStoreByScope(AbstractDataStore):
"""
Data store for all settings for the experiment workflow to save experiment parameters for the subsequent modules
Data store for all settings for the experiment workflow to save experiment parameters for the subsequent run_modules
and predefine parameters loaded during the experiment setup phase. The data store is hierarchically structured, so
that global settings can be overwritten by local adjustments.
......@@ -230,7 +243,7 @@ class DataStoreByScope(AbstractDataStore):
<variable3>: value
"""
def put(self, name: str, obj: Any, scope: str) -> None:
def set(self, name: str, obj: Any, scope: str) -> None:
"""
Store an object `obj` with given `name` under `scope`. In the current implementation, existing entries are
overwritten.
......
import re
__author__ = 'Lukas Leufen'
__author__ = 'Lukas Leufen, Felix Kleinert'
__date__ = '2019-10-21'
......@@ -13,6 +13,7 @@ import numpy as np
import os
import time
import socket
import datetime as dt
def to_list(arg):
......@@ -116,7 +117,8 @@ class TimeTracking(object):
return time.time() - self.start
def __repr__(self):
return f"{round(self._duration(), 2)}s"
# return f"{round(self._duration(), 2)}s"
return f"{dt.timedelta(seconds=math.ceil(self._duration()))} (hh:mm:ss)"
def run(self):
self._start()
......@@ -176,11 +178,11 @@ def set_experiment_name(experiment_date=None, experiment_path=None):
if experiment_date is None:
experiment_name = "TestExperiment"
else:
experiment_name = f"{experiment_date}_network/"
experiment_name = f"{experiment_date}_network"
if experiment_path is None:
experiment_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", experiment_name))
else:
experiment_path = os.path.abspath(experiment_path)
experiment_path = os.path.join(os.path.abspath(experiment_path), experiment_name)
return experiment_name, experiment_path
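# Hedged sketch of the changed behaviour (paths are illustrative): the trailing
# slash is dropped from the experiment name and the name is now appended to a
# user-given experiment path.
name, path = set_experiment_name(experiment_date="testrun",
                                 experiment_path="/tmp/experiments")
# name == "testrun_network"
# path == "/tmp/experiments/testrun_network"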
......
......@@ -6,10 +6,11 @@ import requests
import logging
import pandas as pd
import datetime as dt
from typing import Iterator, Union, List
from typing import Iterator, Union, List, Dict
from src import helpers
join_url_base = 'https://join.fz-juelich.de/services/rest/surfacedata/'
str_or_none = Union[str, None]
class EmptyQueryResult(Exception):
......@@ -19,54 +20,46 @@ class EmptyQueryResult(Exception):
pass
def download_join(station_name: Union[str, List[str]], statvar: dict, station_type: str = None, network_name: str = None) -> [pd.DataFrame, pd.DataFrame]:
def download_join(station_name: Union[str, List[str]], stat_var: dict, station_type: str = None,
network_name: str = None) -> [pd.DataFrame, pd.DataFrame]:
"""
read data from JOIN/TOAR
:param station_name: Station name e.g. DEBY122
:param statvar: key as variable like 'O3', values as statistics on keys like 'mean'
:param station_type:
:param network_name:
:param stat_var: dictionary with the variable (e.g. 'O3') as key and the requested statistic (e.g. 'mean') as value
:param station_type: set the station type like "traffic" or "background", can be None
:param network_name: set the measurement network like "UBA" or "AIRBASE", can be None
:returns:
- df - pandas df with all variables and statistics
- meta - pandas df with all meta information
- df - data frame with all variables and statistics
- meta - data frame with all meta information
"""
# make sure station_name parameter is a list
station_name = helpers.to_list(station_name)
# load series information
opts = {"base": join_url_base, "service": "series", "station_id": station_name[0], "station_type": station_type,
"network_name": network_name}
url = create_url(**opts)
response = requests.get(url)
station_vars = response.json()
vars_dict = {item[3].lower(): item[0] for item in station_vars}
vars_dict = load_series_information(station_name, station_type, network_name)
# download all variables with given statistic
data = None
df = None
for var in _lower_list(sorted(vars_dict.keys())):
if var in statvar.keys():
if var in stat_var.keys():
logging.info('load: {}'.format(var))
# create data link
opts = {'base': join_url_base, 'service': 'stats', 'id': vars_dict[var], 'statistics': statvar[var],
opts = {'base': join_url_base, 'service': 'stats', 'id': vars_dict[var], 'statistics': stat_var[var],
'sampling': 'daily', 'capture': 0, 'min_data_length': 1460}
url = create_url(**opts)
# load data
response = requests.get(url)
data = response.json()
data = get_data(opts)
# correct namespace of statistics
stat = _correct_stat_name(statvar[var])
stat = _correct_stat_name(stat_var[var])
# store data in pandas dataframe
index = map(lambda s: dt.datetime.strptime(s, "%Y-%m-%d %H:%M"), data['datetime'])
if df is None:
df = pd.DataFrame(data[stat], index=index, columns=[var])
else:
df = pd.concat([df, pd.DataFrame(data[stat], index=index, columns=[var])], axis=1)
df = _save_to_pandas(df, data, stat, var)
logging.debug('finished: {}'.format(var))
if data:
......@@ -77,6 +70,51 @@ def download_join(station_name: Union[str, List[str]], statvar: dict, station_ty
raise EmptyQueryResult("No data found in JOIN.")
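# Hedged usage sketch (station id, statistics mapping and network are
# illustrative; the call issues live requests against the JOIN REST service):
df, meta = download_join("DEBW107", {"o3": "dma8eu", "temp": "maximum"},
                         station_type="background", network_name="UBA")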
def get_data(opts: Dict) -> Union[Dict, List]:
"""
Download JOIN data using the requests framework. Data is returned as a JSON-like structure. Depending on the response
structure, this can be a list or a dictionary.
:param opts: options to create the request url
:return: requested data (either as list or dictionary)
"""
url = create_url(**opts)
response = requests.get(url)
return response.json()
def load_series_information(station_name: List[str], station_type: str_or_none, network_name: str_or_none) -> Dict:
"""
List all series ids that are available for the given station id and network name.
:param station_name: Station name e.g. DEBW107
:param station_type: station type like "traffic" or "background"
:param network_name: measurement network of the station like "UBA" or "AIRBASE"
:return: all available series for the requested station stored in a dictionary with the parameter name (variable) as key
and the series id as value.
"""
opts = {"base": join_url_base, "service": "series", "station_id": station_name[0], "station_type": station_type,
"network_name": network_name}
station_vars = get_data(opts)
vars_dict = {item[3].lower(): item[0] for item in station_vars}
return vars_dict
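# Hedged sketch of the returned mapping (series ids are illustrative):
# load_series_information(["DEBW107"], "background", "UBA")
# -> {"o3": 12345, "temp": 67890}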
def _save_to_pandas(df: Union[pd.DataFrame, None], data: dict, stat: str, var: str) -> pd.DataFrame:
"""
Save given data in a data frame. If the given data frame is not empty, the data is appended as a new column.
:param df: data frame to which the new data is appended, can be None
:param data: new data to append or to format as a data frame, containing the keys 'datetime' and '<stat>'
:param stat: extracted statistic to get values from data (e.g. 'mean', 'dma8eu')
:param var: variable the data is from (e.g. 'o3')
:return: new created or concatenated data frame
"""
index = map(lambda s: dt.datetime.strptime(s, "%Y-%m-%d %H:%M"), data['datetime'])
if df is None:
df = pd.DataFrame(data[stat], index=index, columns=[var])
else:
df = pd.concat([df, pd.DataFrame(data[stat], index=index, columns=[var])], axis=1)
return df
def _correct_stat_name(stat: str) -> str:
"""
Map the given statistic name to the new namespace defined by the mapping dict. Return the given name stat if it is not an element of the mapping
......@@ -98,7 +136,7 @@ def _lower_list(args: List[str]) -> Iterator[str]:
yield string.lower()
def create_url(base: str, service: str, **kwargs: Union[str, int, float]) -> str:
def create_url(base: str, service: str, **kwargs: Union[str, int, float, None]) -> str:
"""
Create a request url from the given base url, service type and arbitrarily many additional keyword arguments
:param base: basic url of the rest service
......@@ -106,7 +144,9 @@ def create_url(base: str, service: str, **kwargs: Union[str, int, float]) -> str
:param kwargs: keyword pairs for optional request specifications, e.g. 'statistics=maximum'
:return: combined url as string
"""
url = '{}{}/?'.format(base, service) + '&'.join('{}={}'.format(k, v) for k, v in kwargs.items() if v is not None)
if not base.endswith("/"):
base += "/"
url = f"{base}{service}/?{'&'.join(f'{k}={v}' for k, v in kwargs.items() if v is not None)}"
return url
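# Hedged sketch of a resulting url (parameters with value None are dropped and
# a missing trailing slash on the base is appended):
url = create_url(join_url_base, "series", station_id="DEBW107",
                 station_type="background", network_name=None)
# -> "https://join.fz-juelich.de/services/rest/surfacedata/series/?station_id=DEBW107&station_type=background"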
......
__author__ = "Felix Kleinert, Lukas Leufen"
__date__ = '2019-12-02'
import keras
from typing import Callable
def flatten_tail(input_X: keras.layers, name: str, bound_weight: bool = False, dropout_rate: float = 0.0,
window_lead_time: int = 4, activation: Callable = keras.activations.relu,
reduction_filter: int = 64, first_dense: int = 64):
X_in = keras.layers.Conv2D(reduction_filter, (1, 1), padding='same', name='{}_Conv_1x1'.format(name))(input_X)
X_in = activation(name='{}_conv_act'.format(name))(X_in)
X_in = keras.layers.Flatten(name='{}'.format(name))(X_in)
X_in = keras.layers.Dropout(dropout_rate, name='{}_Dropout_1'.format(name))(X_in)
X_in = keras.layers.Dense(first_dense, kernel_regularizer=keras.regularizers.l2(0.01),
name='{}_Dense_1'.format(name))(X_in)
if bound_weight:
X_in = keras.layers.Activation('tanh')(X_in)
else:
try:
X_in = activation(name='{}_act'.format(name))(X_in)
except TypeError:  # plain activation functions do not accept a name argument
X_in = activation()(X_in)
X_in = keras.layers.Dropout(dropout_rate, name='{}_Dropout_2'.format(name))(X_in)
out = keras.layers.Dense(window_lead_time, activation='linear', kernel_regularizer=keras.regularizers.l2(0.01),
name='{}_Dense_2'.format(name))(X_in)
return out
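# Hedged sketch of attaching the tail to a small functional-API model (input
# shape and layer sizes are illustrative; an advanced activation layer such as
# keras.layers.ELU is used because it accepts a `name` argument):
input_x = keras.layers.Input(shape=(8, 1, 2))
out = flatten_tail(input_x, name="Main", bound_weight=False, dropout_rate=0.1,
                   window_lead_time=4, activation=keras.layers.ELU,
                   reduction_filter=32, first_dense=16)
model = keras.models.Model(inputs=input_x, outputs=[out])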
......@@ -3,6 +3,7 @@ __date__ = '2019-10-22'
import keras
import keras.layers as layers
import logging
class InceptionModelBase:
......@@ -51,7 +52,7 @@ class InceptionModelBase:
regularizer = kwargs.get('regularizer', keras.regularizers.l2(0.01))
bn_settings = kwargs.get('bn_settings', {})
act_settings = kwargs.get('act_settings', {})
print(f'Inception Block with activation: {activation}')
logging.debug(f'Inception Block with activation: {activation}')
block_name = f'Block_{self.number_of_blocks}{self.block_part_name()}_{tower_kernel[0]}x{tower_kernel[1]}'
......
__author__ = "Felix Kleinert, Lukas Leufen"
__date__ = '2019-12-11'
import statsmodels.api as sm
import numpy as np
class OrdinaryLeastSquaredModel:
def __init__(self, generator):
self.x = []
self.y = []
self.generator = generator
self.model = self.train_ols_model_from_generator()
def train_ols_model_from_generator(self):
self.set_x_y_from_generator()
self.x = sm.add_constant(self.x)
return self.ordinary_least_squared_model(self.x, self.y)
def set_x_y_from_generator(self):
data_x = None
data_y = None
for item in self.generator:
x = self.reshape_xarray_to_numpy(item[0])
y = item[1].values
data_x = np.concatenate((data_x, x), axis=0) if data_x is not None else x
data_y = np.concatenate((data_y, y), axis=0) if data_y is not None else y
self.x = data_x
self.y = data_y
def predict(self, data):
data = sm.add_constant(self.reshape_xarray_to_numpy(data))
return self.model.predict(data)
@staticmethod
def reshape_xarray_to_numpy(data):
shape = data.values.shape
res = data.values.reshape(shape[0], shape[1] * shape[3])
return res
@staticmethod
def ordinary_least_squared_model(x, y):
ols_model = sm.OLS(y, x)
return ols_model.fit()
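# Hedged usage sketch (assumes `train_gen` is a DataGenerator as defined in
# this commit, yielding one (history, label) xarray pair per station):
ols = OrdinaryLeastSquaredModel(train_gen)
x_first, y_first = train_gen[0]
prediction = ols.predict(x_first)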
__author__ = "Lukas Leufen"
__date__ = '2019-12-12'
from abc import ABC
from typing import Any, Callable
import keras
from src import helpers
class AbstractModelClass(ABC):
"""
The AbstractModelClass provides a unified skeleton for any model provided to the machine learning workflow. The
model can always be accessed by calling ModelClass.model or directly by a model method without naming the model
attribute (e.g. ModelClass.model.compile -> ModelClass.compile). Besides the model, this class provides the
corresponding loss function.
"""
def __init__(self) -> None:
"""
Predefine internal attributes for model and loss.
"""
self.__model = None
self.__loss = None
def __getattr__(self, name: str) -> Any: