Commit 1f2c0182 authored by Lukas Leufen

Merge branch 'develop' into 'master'

update to v0.4.0

Closes #23, #27, #14, #26, #24, #25, and #13

See merge request toar/machinelearningtools!17
parents 3b54d872 d70a6b27
Pipeline #26710 passed with stages
in 2 minutes and 34 seconds
......@@ -23,3 +23,6 @@ exclude_lines =
# Don't complain about import statements
# Don't complain about abstract class declarations and placeholders
......@@ -47,7 +47,7 @@ tests:
when: always
- badges/
- test/
- test_results/
......@@ -90,7 +90,7 @@ pages:
- cp -af coverage/. public/coverage
- ls public/coverage
- mkdir -p public/test
- cp -af test/. public/test
- cp -af test_results/. public/test
- ls public/test
- ls public
when: always
......@@ -101,7 +101,7 @@ pages:
- public
- badges/
- coverage/
- test/
- test_results/
key: old-pages
......@@ -6,14 +6,14 @@ python3 -m pytest --html=report.html --self-contained-html test/ | tee test_resu
# move html test report
mkdir test/
mkdir test_results/
BRANCH_NAME=$( echo -e "${CI_COMMIT_REF_NAME////_}")
mkdir test/${BRANCH_NAME}
mkdir test/recent
cp report.html test/${BRANCH_NAME}/.
cp report.html test/recent/.
mkdir test_results/${BRANCH_NAME}
mkdir test_results/recent
cp report.html test_results/${BRANCH_NAME}/.
cp report.html test_results/recent/.
if [[ "${CI_COMMIT_REF_NAME}" = "master" ]]; then
cp -r report.html test/.
cp -r report.html test_results/.
# exit 0 if no tests implemented
......@@ -3,145 +3,33 @@ __date__ = '2019-11-14'
import logging
from src.helpers import TimeTracking
from src import helpers
import argparse
import time
from src.modules.experiment_setup import ExperimentSetup
from src.modules import run, PreProcessing, Training, PostProcessing
formatter = "%(asctime)s - %(levelname)s: %(message)s [%(filename)s:%(funcName)s:%(lineno)s]"
logging.basicConfig(level=logging.INFO, format=formatter)
def main():
with run():
exp_setup = ExperimentSetup(args, trainable=True, stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087'])
class run(object):
basic run class to measure execution time. Either call this class calling it by 'with' or delete the class instance
after finishing the measurement. The duration result is logged.
def __init__(self):
self.time = TimeTracking()"{self.__class__.__name__} started")
def __del__(self):
self.time.stop()"{self.__class__.__name__} finished after {self.time}")
def __enter__(self):
def __exit__(self, exc_type, exc_val, exc_tb):
def do_stuff(self):
class ExperimentSetup:
trainable: Train new model if true, otherwise try to load existing model
def __init__(self, **kwargs):
self.data_path = None
self.experiment_path = None
self.experiment_name = None
self.trainable = None
self.fraction_of_train = None
self.use_all_stations_on_all_data_sets = None = None
self.var_all_dict = None
self.all_stations = None
self.variables = None
self.dimensions = None
self.dim = None
self.target_dim = None
self.target_var = None
def _set_param(self, param, value, default=None):
if default is not None:
value = value.get(param, default)
setattr(self, param, value)"set experiment attribute: {param}={value}")
def setup_experiment(self, **kwargs):
# set data path of this experiment
self._set_param("data_path", helpers.prepare_host())
# set experiment name
exp_date = args.experiment_date
exp_name, exp_path = helpers.set_experiment_name(experiment_date=exp_date)
self._set_param("experiment_name", exp_name)
self._set_param("experiment_path", exp_path)
# set if model is trainable
self._set_param("trainable", kwargs, default=True)
# set fraction of train
self._set_param("fraction_of_train", kwargs, default=0.8)
# use all stations on all data sets (train, val, test)
self._set_param("use_all_stations_on_all_data_sets", kwargs, default=True)
self._set_param("network", kwargs, default="AIRBASE")
self._set_param("var_all_dict", kwargs, default={'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum',
'u': 'average_values', 'v': 'average_values', 'no': 'dma8eu',
'no2': 'dma8eu', 'cloudcover': 'average_values',
'pblheight': 'maximum'})
self._set_param("all_stations", kwargs, default=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087',
'DEBY052', 'DEBY032', 'DEBW022', 'DEBY004', 'DEBY020',
'DEBW030', 'DEBW037', 'DEBW031', 'DEBW015', 'DEBW073',
'DEBY039', 'DEBW038', 'DEBW081', 'DEBY075', 'DEBW040',
'DEBY053', 'DEBW059', 'DEBW027', 'DEBY072', 'DEBW042',
'DEBW039', 'DEBY001', 'DEBY113', 'DEBY089', 'DEBW024',
'DEBW004', 'DEBY037', 'DEBW056', 'DEBW029', 'DEBY068',
'DEBW010', 'DEBW026', 'DEBY002', 'DEBY079', 'DEBW084',
'DEBY049', 'DEBY031', 'DEBW019', 'DEBW001', 'DEBY063',
'DEBY005', 'DEBW046', 'DEBW103', 'DEBW052', 'DEBW034',
'DEBY088', ])
self._set_param("variables", kwargs, default=list(self.var_all_dict.keys()))
self._set_param("dimensions", kwargs, default={'new_index': ['datetime', 'Stations']})
self._set_param("dim", kwargs, default='datetime')
self._set_param("target_dim", kwargs, default='variables')
self._set_param("target_var", kwargs, default="o3")
class PreProcessing(run):
    # Stub stage of the run framework; only stores the experiment setup for later use.
    def __init__(self, setup):
        # NOTE(review): no super().__init__() call is visible here, so run's timer would never
        # start — confirm whether the original file contains one (context lines may be stripped).
        self.setup = setup
class Training(run):
    # Stub stage of the run framework; only stores the experiment setup for later use.
    def __init__(self, setup):
        # NOTE(review): no super().__init__() call is visible here, so run's timer would never
        # start — confirm whether the original file contains one (context lines may be stripped).
        self.setup = setup
class PostProcessing(run):
    # Stub stage of the run framework; only stores the experiment setup for later use.
    def __init__(self, setup):
        # NOTE(review): no super().__init__() call is visible here, so run's timer would never
        # start — confirm whether the original file contains one (context lines may be stripped).
        self.setup = setup
if __name__ == "__main__":
formatter = '%(asctime)s - %(levelname)s: %(message)s [%(filename)s:%(funcName)s:%(lineno)s]'
logging.basicConfig(format=formatter, level=logging.INFO)
parser = argparse.ArgumentParser()
parser.add_argument('--experiment_date', metavar='--exp_date', type=str, nargs=1, default=None,
help="set experiment date as string")
args = parser.parse_args()
with run():
exp_setup = ExperimentSetup(trainable=True)
experiment = ExperimentSetup(args, stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087'])
a = 1
# main()
......@@ -6,8 +6,6 @@ from src import helpers
from src.data_preparation import DataPrep
import os
from typing import Union, List, Tuple
import decimal
import numpy as np
import xarray as xr
......@@ -20,17 +18,18 @@ class DataGenerator(keras.utils.Sequence):
one entry of integer or string
def __init__(self, path: str, network: str, stations: Union[str, List[str]], variables: List[str],
interpolate_dim: str, target_dim: str, target_var: str, interpolate_method: str = "linear",
limit_nan_fill: int = 1, window_history: int = 7, window_lead_time: int = 4,
transform_method: str = "standardise", **kwargs):
self.path = os.path.abspath(path)
def __init__(self, data_path: str, network: str, stations: Union[str, List[str]], variables: List[str],
interpolate_dim: str, target_dim: str, target_var: str, station_type: str = None,
interpolate_method: str = "linear", limit_nan_fill: int = 1, window_history: int = 7,
window_lead_time: int = 4, transform_method: str = "standardise", **kwargs):
self.data_path = os.path.abspath(data_path) = network
self.stations = helpers.to_list(stations)
self.variables = variables
self.interpolate_dim = interpolate_dim
self.target_dim = target_dim
self.target_var = target_var
self.station_type = station_type
self.interpolate_method = interpolate_method
self.limit_nan_fill = limit_nan_fill
self.window_history = window_history
......@@ -42,9 +41,10 @@ class DataGenerator(keras.utils.Sequence):
display all class attributes
return f"DataGenerator(path='{self.path}', network='{}', stations={self.stations}, " \
f"variables={self.variables}, interpolate_dim='{self.interpolate_dim}', target_dim='{self.target_dim}'" \
f", target_var='{self.target_var}', **{self.kwargs})"
return f"DataGenerator(path='{self.data_path}', network='{}', stations={self.stations}, " \
f"variables={self.variables}, station_type={self.station_type}, " \
f"interpolate_dim='{self.interpolate_dim}', target_dim='{self.target_dim}', " \
f"target_var='{self.target_var}', **{self.kwargs})"
def __len__(self):
......@@ -96,7 +96,8 @@ class DataGenerator(keras.utils.Sequence):
:return: preprocessed data as a DataPrep instance
station = self.get_station_key(key)
data = DataPrep(self.path,, station, self.variables, **self.kwargs)
data = DataPrep(self.data_path,, station, self.variables, station_type=self.station_type,
data.interpolate(self.interpolate_dim, method=self.interpolate_method, limit=self.limit_nan_fill)
data.transform("datetime", method=self.transform_method)
data.make_history_window(self.interpolate_dim, self.window_history)
......@@ -44,11 +44,13 @@ class DataPrep(object):
def __init__(self, path: str, network: str, station: Union[str, List[str]], variables: List[str], **kwargs):
def __init__(self, path: str, network: str, station: Union[str, List[str]], variables: List[str],
station_type: str = None, **kwargs):
self.path = os.path.abspath(path) = network
self.station = helpers.to_list(station)
self.variables = variables
self.station_type = station_type
self.mean = None
self.std = None
self.history = None
......@@ -75,14 +77,36 @@ class DataPrep(object):
file_name = self._set_file_name()
meta_file = self._set_meta_file_name()
logging.debug(f"try to load local data from: {file_name}")
data = self._slice_prep(xr.open_dataarray(file_name)) = self.check_for_negative_concentrations(data)
self.meta = pd.read_csv(meta_file, index_col=0)
if self.station_type is not None:
logging.debug("loading finished")
except FileNotFoundError as e:
data, self.meta = self.download_data_from_join(file_name, meta_file)
data = self._slice_prep(data) = self.check_for_negative_concentrations(data)
logging.debug("loaded new data from JOIN")
def check_station_meta(self):
Search for the entries in meta data and compare the value with the requested values. Raise a FileNotFoundError
if the values mismatch.
check_dict = {
"station_type": self.station_type,
for (k, v) in check_dict.items():
if[k, self.station[0]] != v:
logging.debug(f"meta data does not agree which given request for {k}: {v} (requested) != "
f"{[k, self.station[0]]} (local). Raise FileNotFoundError to trigger new "
f"grapping from web.")
raise FileNotFoundError
def download_data_from_join(self, file_name: str, meta_file: str) -> [xr.DataArray, pd.DataFrame]:
......@@ -92,7 +116,8 @@ class DataPrep(object):
df_all = {}
df, meta = join.download_join(station_name=self.station, statvar=self.statistics_per_var)
df, meta = join.download_join(station_name=self.station, statvar=self.statistics_per_var,
df_all[self.station[0]] = df
# convert df_all to xarray
xarr = {k: xr.DataArray(v, dims=['datetime', 'variables']) for k, v in df_all.items()}
......@@ -111,7 +136,7 @@ class DataPrep(object):
def __repr__(self):
return f"Dataprep(path='{self.path}', network='{}', station={self.station}, " \
f"variables={self.variables}, **{self.kwargs})"
f"variables={self.variables}, station_type={self.station_type}, **{self.kwargs})"
def interpolate(self, dim: str, method: str = 'linear', limit: int = None,
use_coordinate: Union[bool, str] = True, **kwargs):
This diff is collapsed.
import re
__author__ = 'Lukas Leufen'
__date__ = '2019-10-21'
......@@ -11,7 +13,6 @@ import numpy as np
import os
import time
import socket
import sys
def to_list(arg):
......@@ -23,9 +24,9 @@ def to_list(arg):
def check_path_and_create(path):
    """
    Create the given path if it does not exist yet.

    :param path: directory path to create
    """
    try:
        os.makedirs(path)
        logging.debug(f"Created path: {path}")
    except FileExistsError:
        # the directory is already there — nothing to do, just note it
        logging.debug(f"Path already exists: {path}")
def l_p_loss(power: int):
......@@ -134,27 +135,39 @@ class TimeTracking(object):
return self._duration()
def prepare_host(create_new=True):
    """
    Resolve the data path for the current host.

    :param create_new: if True, create the data path when it does not exist yet;
        if False, raise instead of creating it
    :return: absolute data path for this host
    :raises OSError: if the hostname is not known
    :raises NotADirectoryError: if the path is missing and may not be created
    """
    hostname = socket.gethostname()
    try:
        user = os.getlogin()
    except OSError:
        # no controlling terminal (e.g. CI runner) — fall back to a default user name
        user = "default"
    if hostname == 'ZAM144':
        path = f'/home/{user}/Data/toar_daily/'
    elif hostname == 'zam347':
        path = f'/home/{user}/Data/toar_daily/'
    elif hostname == 'linux-gzsx':
        path = f'/home/{user}/machinelearningtools/data/toar_daily/'
    elif (len(hostname) > 2) and (hostname[:2] == 'jr'):
        path = f'/p/project/cjjsc42/{user}/DATA/toar_daily/'
    elif (len(hostname) > 2) and (hostname[:2] == 'jw'):
        path = f'/p/home/jusers/{user}/juwels/intelliaq/DATA/toar_daily/'
    elif "runner-6HmDp9Qd-project-2411-concurrent" in hostname:
        # gitlab CI runner of this project
        path = f'/home/{user}/machinelearningtools/data/toar_daily/'
    else:
        logging.error(f"unknown host '{hostname}'")
        raise OSError(f"unknown host '{hostname}'")
    if not os.path.exists(path):
        try:
            if create_new:
                # create the missing data directory and use it
                check_path_and_create(path)
                return path
            else:
                raise PermissionError
        except PermissionError:
            logging.error(f"path '{path}' does not exist for host '{hostname}'.")
            raise NotADirectoryError(f"path '{path}' does not exist for host '{hostname}'.")
    else:
        logging.debug(f"set path to: {path}")
        return path
......@@ -169,3 +182,16 @@ def set_experiment_name(experiment_date=None, experiment_path=None):
experiment_path = os.path.abspath(experiment_path)
return experiment_name, experiment_path
class PyTestRegex:
    """Assert that a given string meets some expectations."""

    def __init__(self, pattern: str, flags: int = 0):
        # compile once so the comparison can be reused cheaply across many asserts
        self._regex = re.compile(pattern, flags)

    def __eq__(self, actual: str) -> bool:
        # match() anchors the pattern at the beginning of the candidate string
        return self._regex.match(actual) is not None

    def __repr__(self) -> str:
        # show the raw pattern so failing asserts print something readable
        return self._regex.pattern
......@@ -3,7 +3,6 @@ __date__ = '2019-10-16'
import requests
import json
import logging
import pandas as pd
import datetime as dt
......@@ -11,15 +10,23 @@ from typing import Iterator, Union, List
from src import helpers
join_url_base = ''
def download_join(station_name: Union[str, List[str]], statvar: dict) -> [pd.DataFrame, pd.DataFrame]:
class EmptyQueryResult(Exception):
Exception that get raised if a query to JOIN returns empty results.
def download_join(station_name: Union[str, List[str]], statvar: dict, station_type: str = None, network_name: str = None) -> [pd.DataFrame, pd.DataFrame]:
read data from JOIN/TOAR
:param station_name: Station name e.g. DEBY122
:param statvar: key as variable like 'O3', values as statistics on keys like 'mean'
:param station_type:
:param network_name:
- df - pandas df with all variables and statistics
- meta - pandas df with all meta information
......@@ -28,7 +35,8 @@ def download_join(station_name: Union[str, List[str]], statvar: dict) -> [pd.Dat
station_name = helpers.to_list(station_name)
# load series information
opts = {'base': join_url_base, 'service': 'series', 'station_id': station_name[0]}
opts = {"base": join_url_base, "service": "series", "station_id": station_name[0], "station_type": station_type,
"network_name": network_name}
url = create_url(**opts)
response = requests.get(url)
station_vars = response.json()
......@@ -66,7 +74,7 @@ def download_join(station_name: Union[str, List[str]], statvar: dict) -> [pd.Dat
meta.columns = station_name
return df, meta
raise ValueError("No data found in JOIN.")
raise EmptyQueryResult("No data found in JOIN.")
def _correct_stat_name(stat: str) -> str:
def create_url(base: str, service: str, **kwargs: Union[str, int, float]) -> str:
    """
    Build a request url from the url base, the service endpoint and optional query parameters.

    :param base: base url, e.g. the JOIN rest endpoint
    :param service: service to query, e.g. 'series'
    :param kwargs: keyword pairs for optional request specifications, e.g. 'statistics=maximum'
    :return: combined url as string
    """
    # None-valued parameters are dropped so optional filters do not appear in the query string
    url = '{}{}/?'.format(base, service) + '&'.join('{}={}'.format(k, v) for k, v in kwargs.items() if v is not None)
    return url
__author__ = "Lukas Leufen"
__date__ = '2019-11-15'
import logging
import argparse
from typing import Union, Dict, Any
from src import helpers
from src.modules.run_environment import RunEnvironment
DEFAULT_STATIONS = ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087', 'DEBY052', 'DEBY032', 'DEBW022', 'DEBY004',
'DEBY020', 'DEBW030', 'DEBW037', 'DEBW031', 'DEBW015', 'DEBW073', 'DEBY039', 'DEBW038', 'DEBW081',
'DEBY075', 'DEBW040', 'DEBY053', 'DEBW059', 'DEBW027', 'DEBY072', 'DEBW042', 'DEBW039', 'DEBY001',
'DEBY113', 'DEBY089', 'DEBW024', 'DEBW004', 'DEBY037', 'DEBW056', 'DEBW029', 'DEBY068', 'DEBW010',
'DEBW026', 'DEBY002', 'DEBY079', 'DEBW084', 'DEBY049', 'DEBY031', 'DEBW019', 'DEBW001', 'DEBY063',
'DEBY005', 'DEBW046', 'DEBW103', 'DEBW052', 'DEBW034', 'DEBY088', ]
DEFAULT_VAR_ALL_DICT = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values',
'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values',
'pblheight': 'maximum'}
class ExperimentSetup(RunEnvironment):
trainable: Train new model if true, otherwise try to load existing model
def __init__(self, parser_args=None, var_all_dict=None, stations=None, network=None, station_type=None, variables=None,
statistics_per_var=None, start=None, end=None, window_history=None, target_var="o3", target_dim=None,
window_lead_time=None, dimensions=None, interpolate_dim=None, interpolate_method=None,
limit_nan_fill=None, train_start=None, train_end=None, val_start=None, val_end=None, test_start=None,
test_end=None, use_all_stations_on_all_data_sets=True, trainable=False, fraction_of_train=None,
# create run framework
# experiment setup
self._set_param("data_path", helpers.prepare_host())
self._set_param("trainable", trainable, default=False)
self._set_param("fraction_of_training", fraction_of_train, default=0.8)
# set experiment name
exp_date = self._get_parser_args(parser_args).get("experiment_date")
exp_name, exp_path = helpers.set_experiment_name(experiment_date=exp_date, experiment_path=experiment_path)
self._set_param("experiment_name", exp_name)
self._set_param("experiment_path", exp_path)
helpers.check_path_and_create(self.data_store.get("experiment_path", "general"))
# setup for data
self._set_param("var_all_dict", var_all_dict, default=DEFAULT_VAR_ALL_DICT)
self._set_param("stations", stations, default=DEFAULT_STATIONS)
self._set_param("network", network, default="AIRBASE")
self._set_param("station_type", station_type, default=None)
self._set_param("variables", variables, default=list(self.data_store.get("var_all_dict", "general").keys()))
self._set_param("statistics_per_var", statistics_per_var, default=self.data_store.get("var_all_dict", "general"))
self._set_param("start", start, default="1997-01-01", scope="general")
self._set_param("end", end, default="2017-12-31", scope="general")
self._set_param("window_history", window_history, default=13)
# target
self._set_param("target_var", target_var, default="o3")
self._set_param("target_dim", target_dim, default='variables')
self._set_param("window_lead_time", window_lead_time, default=3)
# interpolation
self._set_param("dimensions", dimensions, default={'new_index': ['datetime', 'Stations']})
self._set_param("interpolate_dim", interpolate_dim, default='datetime')
self._set_param("interpolate_method", interpolate_method, default='linear')
self._set_param("limit_nan_fill", limit_nan_fill, default=1)
# train parameters
self._set_param("start", train_start, default="1997-01-01", scope="general.train")
self._set_param("end", train_end, default="2007-12-31", scope="general.train")
# validation parameters
self._set_param("start", val_start, default="2008-01-01", scope="general.val")
self._set_param("end", val_end, default="2009-12-31", scope="general.val")
# test parameters
self._set_param("start", test_start, default="2010-01-01", scope="general.test")
self._set_param("end", test_end, default="2017-12-31", scope="general.test")
# use all stations on all data sets (train, val, test)
self._set_param("use_all_stations_on_all_data_sets", use_all_stations_on_all_data_sets, default=True)
def _set_param(self, param: str, value: Any, default: Any = None, scope: str = "general") -> None:
    """
    Store an experiment parameter in the data store.

    :param param: name of the parameter
    :param value: value to store; if None and a default is given, the default is stored instead
    :param default: fallback value applied when `value` is None
    :param scope: data store scope the parameter is stored under
    """
    if value is None and default is not None:
        value = default
    # data_store is presumably the shared store of the run framework — TODO confirm against RunEnvironment
    self.data_store.put(param, value, scope)
    logging.debug(f"set experiment attribute: {param}({scope})={value}")
def _get_parser_args(args: Union[Dict, argparse.Namespace]) -> Dict:
    """
    Transform args to dict if given as argparse.Namespace

    :param args: either a dictionary or an argument parser instance
    :return: dictionary with all arguments
    """
    # a plain dict passes straight through
    if isinstance(args, dict):
        return args
    # a Namespace is unwrapped into its attribute dict
    if isinstance(args, argparse.Namespace):
        return vars(args)
    # anything else (e.g. None) yields no arguments
    return {}
if __name__ == "__main__":
    # configure root logging with timestamp, level and code location
    formatter = '%(asctime)s - %(levelname)s: %(message)s [%(filename)s:%(funcName)s:%(lineno)s]'
    logging.basicConfig(format=formatter, level=logging.DEBUG)

    # optional experiment date can be passed on the command line
    parser = argparse.ArgumentParser()
    parser.add_argument('--experiment_date', metavar='--exp_date', type=str, nargs=1, default=None,
                        help="set experiment date as string")
    parser_args = parser.parse_args()

    # run the setup stage inside the run framework so its duration is tracked and logged
    with RunEnvironment():
        setup = ExperimentSetup(parser_args, stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087'])
import logging
import argparse
from src.modules.run_environment import RunEnvironment
from src.modules.experiment_setup import ExperimentSetup
from src.modules.pre_processing import PreProcessing
class Training(RunEnvironment):