Commit 7e9cd7eb authored by lukas leufen

Merge branch 'develop' into 'master'

new version v0.2.0

Closes #8 and #5

See merge request toar/machinelearningtools!9
parents c265313e 2dd1364d
Pipeline #25764 passed with stages in 1 minute and 41 seconds
@@ -55,3 +55,4 @@ Thumbs.db
htmlcov/
.pytest_cache
/test/data/
report.html
@@ -47,6 +47,7 @@ tests:
when: always
paths:
- badges/
- test/
coverage:
tags:
@@ -88,6 +89,9 @@ pages:
- mkdir -p public/coverage
- cp -af coverage/. public/coverage
- ls public/coverage
- mkdir -p public/test
- cp -af test/. public/test
- ls public/test
- ls public
when: always
artifacts:
@@ -97,8 +101,10 @@ pages:
- public
- badges/
- coverage/
- test/
cache:
key: old-pages
paths:
- public/badges/
- public/coverage/
- public/test/
#!/bin/bash
# run pytest for all modules
-python3 -m pytest test/ | tee test_results.out
+python3 -m pytest --html=report.html --self-contained-html test/ | tee test_results.out
IS_FAILED=${PIPESTATUS[0]}  # capture pytest's exit status, not tee's
# move html test report
mkdir -p test/
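# ${CI_COMMIT_REF_NAME////_} replaces every "/" in the branch name with "_" so it can be used as a directory name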
BRANCH_NAME=$( echo -e "${CI_COMMIT_REF_NAME////_}")
mkdir -p test/${BRANCH_NAME}
mkdir -p test/recent
cp report.html test/${BRANCH_NAME}/.
cp report.html test/recent/.
if [[ "${CI_COMMIT_REF_NAME}" = "master" ]]; then
cp report.html test/.
fi
# exit 0 if no tests implemented
RUN_NO_TESTS="$(grep -c 'no tests ran' test_results.out)"
if [[ ${RUN_NO_TESTS} -gt 0 ]]; then
......
__author__ = 'Felix Kleinert, Lukas Leufen'
__date__ = '2019-11-07'
import keras
from src import helpers
from src.data_preparation import DataPrep
import os
from typing import Union, List, Tuple
import decimal
import numpy as np
import xarray as xr
class DataGenerator(keras.utils.Sequence):
"""
This class is a generator to handle large arrays for machine learning. It can be used with keras'
fit_generator and predict_generator. The individual stations are the iterables. Internally, the class uses
DataPrep and returns X, y when an item is requested.
Items can be selected by position (integer) or station id (string). Methods also accept lists containing
exactly one integer or string entry.
"""
def __init__(self, path: str, network: str, stations: Union[str, List[str]], variables: List[str],
interpolate_dim: str, target_dim: str, target_var: str, interpolate_method: str = "linear",
limit_nan_fill: int = 1, window_history: int = 7, window_lead_time: int = 4,
transform_method: str = "standardise", **kwargs):
self.path = os.path.abspath(path)
self.network = network
self.stations = helpers.to_list(stations)
self.variables = variables
self.interpolate_dim = interpolate_dim
self.target_dim = target_dim
self.target_var = target_var
self.interpolate_method = interpolate_method
self.limit_nan_fill = limit_nan_fill
self.window_history = window_history
self.window_lead_time = window_lead_time
self.transform_method = transform_method
self.kwargs = kwargs
def __repr__(self):
"""
display all class attributes
"""
return f"DataGenerator(path='{self.path}', network='{self.network}', stations={self.stations}, " \
f"variables={self.variables}, interpolate_dim='{self.interpolate_dim}', target_dim='{self.target_dim}'" \
f", target_var='{self.target_var}', **{self.kwargs})"
def __len__(self):
"""
Return the number of stations.
"""
return len(self.stations)
def __iter__(self) -> "DataGenerator":
"""
Define the __iter__ part of the iterator protocol to iterate through this generator. Sets the private attribute
`_iterator` to 0.
:return: this generator as an iterator
"""
self._iterator = 0
return self
def __next__(self) -> Tuple[xr.DataArray, xr.DataArray]:
"""
Implement the __next__ method of the iterator protocol. Get the data generator of the current station and
return its history and label data; stations without valid data are skipped.
:return: history and label data of the next station
"""
if self._iterator < self.__len__():
data = self.get_data_generator()
self._iterator += 1
if data.history is not None and data.label is not None:
return data.history.transpose("datetime", "window", "Stations", "variables"), \
data.label.squeeze("Stations").transpose("datetime", "window")
else:
return self.__next__()  # skip stations without history or label data
else:
raise StopIteration
def __getitem__(self, item: Union[str, int]) -> Tuple[xr.DataArray, xr.DataArray]:
"""
Defines the get item method for this generator. Retrieve data from generator and return history and labels.
:param item: station key to choose the data generator.
:return: The generator's time series of history data and its labels
"""
data = self.get_data_generator(key=item)
return data.history.transpose("datetime", "window", "Stations", "variables"), \
data.label.squeeze("Stations").transpose("datetime", "window")
def get_data_generator(self, key: Union[str, int] = None) -> DataPrep:
"""
Select data for the given key: create a DataPrep object, then interpolate, transform, create history and
labels, and remove NaNs.
:param key: station key to choose the data generator.
:return: preprocessed data as a DataPrep instance
"""
station = self.get_station_key(key)
data = DataPrep(self.path, self.network, station, self.variables, **self.kwargs)
data.interpolate(self.interpolate_dim, method=self.interpolate_method, limit=self.limit_nan_fill)
data.transform("datetime", method=self.transform_method)
data.make_history_window(self.interpolate_dim, self.window_history)
data.make_labels(self.target_dim, self.target_var, self.interpolate_dim, self.window_lead_time)
data.history_label_nan_remove(self.interpolate_dim)
return data
def get_station_key(self, key: Union[None, str, int, List[Union[None, str, int]]]) -> str:
"""
Return a valid station key or raise a KeyError if this is not possible.
:param key: station key to choose the data generator.
:return: station key (id from database)
"""
# extract value if given as list
if isinstance(key, list):
if len(key) == 1:
key = key[0]
else:
raise KeyError(f"More than one key was given: {key}")
# return station name either from key or from the current element of the iterator
if key is None:
return self.stations[self._iterator]
else:
if isinstance(key, int):
if key < self.__len__():
return self.stations[key]
else:
raise KeyError(f"{key} is not in range(0, {self.__len__()})")
elif isinstance(key, str):
if key in self.stations:
return key
else:
raise KeyError(f"{key} is not in stations")
else:
raise KeyError(f"Key has to be from Union[str, int]. Given was {key} ({type(key)})")
@@ -71,7 +71,7 @@ class DataPrep(object):
data is available. In the latter case, downloaded data is stored locally if wished (default: yes).
"""
-self.check_path_and_create()
+helpers.check_path_and_create(self.path)
file_name = self._set_file_name()
meta_file = self._set_meta_file_name()
try:
@@ -113,14 +113,6 @@ class DataPrep(object):
return f"Dataprep(path='{self.path}', network='{self.network}', station={self.station}, " \
f"variables={self.variables}, **{self.kwargs})"
-def check_path_and_create(self):
-try:
-os.makedirs(self.path)
-logging.info(f"Created path: {self.path}")
-except FileExistsError:
-logging.info(f"Path already exists: {self.path}")
-pass
def interpolate(self, dim: str, method: str = 'linear', limit: int = None,
use_coordinate: Union[bool, str] = True, **kwargs):
"""
......
@@ -2,7 +2,85 @@ __author__ = 'Lukas Leufen'
__date__ = '2019-10-21'
import logging
import keras
import keras.backend as K
import math
from typing import Union
import numpy as np
import os
def to_list(arg):
if not isinstance(arg, list):
arg = [arg]
return arg
def check_path_and_create(path):
try:
os.makedirs(path)
logging.info(f"Created path: {path}")
except FileExistsError:
logging.info(f"Path already exists: {path}")
def l_p_loss(power: int):
"""
Calculate the L<p> loss for a given power p. L1 (p=1) is equal to the mean absolute error (MAE), L2 (p=2) to
the mean squared error (MSE), ...
:param power: the power p of the error calculation
:return: loss for given power
"""
def loss(y_true, y_pred):
return K.mean(K.pow(K.abs(y_pred - y_true), power), axis=-1)
return loss
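As a quick sanity check of what l_p_loss computes, a small numpy sketch (the numbers mirror the test for p=2 further down):

import numpy as np

y_true = np.array([1, 1, 0, 0.5])
y_pred = np.array([1, 0, 2, 0.5])
# p=2: mean of the squared absolute errors -> (0 + 1 + 4 + 0) / 4 = 1.25
print(np.mean(np.abs(y_pred - y_true) ** 2))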
class LearningRateDecay(keras.callbacks.History):
"""
Decay the learning rate during model training. Start with a base learning rate and multiply it by the drop
factor (0, 1] after every epochs_drop epochs; a drop value of 1 means no decay of the learning rate.
"""
def __init__(self, base_lr: float = 0.01, drop: float = 0.96, epochs_drop: int = 8):
super().__init__()
self.lr = {'lr': []}
self.base_lr = self.check_param(base_lr, 'base_lr')
self.drop = self.check_param(drop, 'drop')
self.epochs_drop = self.check_param(epochs_drop, 'epochs_drop', upper=None)
@staticmethod
def check_param(value: float, name: str, lower: Union[float, None] = 0, upper: Union[float, None] = 1):
"""
Check if the given value is inside the interval. The left (lower) endpoint is open, the right (upper) endpoint
is closed. To check only one side of the interval, set the other endpoint to None. If both endpoints are None,
the value is returned without any check.
:param value: value to check
:param name: name of the variable to display in error message
:param lower: left (lower) endpoint of interval, opened
:param upper: right (upper) endpoint of interval, closed
:return: unchanged value or raise ValueError
"""
if lower is None:
lower = -np.inf
if upper is None:
upper = np.inf
if lower < value <= upper:
return value
else:
raise ValueError(f"{name} is out of allowed range ({lower}, {upper}{')' if upper == np.inf else ']'}: "
f"{name}={value}")
def on_epoch_begin(self, epoch: int, logs=None):
"""
Lower learning rate every epochs_drop epochs by factor drop.
:param epoch: current epoch
:param logs: keras logs dictionary (not used here)
:return: the updated keras learning rate
"""
current_lr = self.base_lr * math.pow(self.drop, math.floor(epoch / self.epochs_drop))
K.set_value(self.model.optimizer.lr, current_lr)
self.lr['lr'].append(current_lr)
logging.info(f"Set learning rate to {current_lr}")
return K.get_value(self.model.optimizer.lr)
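The resulting schedule is a plain step decay, current_lr = base_lr * drop ** floor(epoch / epochs_drop). A short sketch reproducing the values asserted in the test further down:

import math

base_lr, drop, epochs_drop = 0.02, 0.95, 2
for epoch in range(5):
    # decay steps 0, 0, 1, 1, 2 -> 0.02, 0.02, 0.019, 0.019, 0.01805
    print(epoch, base_lr * math.pow(drop, math.floor(epoch / epochs_drop)))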
@@ -8,6 +8,7 @@ import logging
import pandas as pd
import datetime as dt
from typing import Iterator, Union, List
from src import helpers
join_url_base = 'https://join.fz-juelich.de/services/rest/surfacedata/'
logging.basicConfig(level=logging.INFO)
@@ -24,8 +25,7 @@ def download_join(station_name: Union[str, List[str]], statvar: dict) -> [pd.Dat
- meta - pandas df with all meta information
"""
# make sure station_name parameter is a list
-if not isinstance(station_name, list):
-station_name = [station_name]
+station_name = helpers.to_list(station_name)
# load series information
opts = {'base': join_url_base, 'service': 'series', 'station_id': station_name[0]}
......
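For context, a hypothetical sketch of how such an opts dict could be turned into a JOIN request (create_url is an illustrative stand-in; the repository's actual request helper is elided above):

import requests  # assumption: requests is used for the REST calls

def create_url(base, service, **kwargs):
    # illustrative helper: append the query parameters to the service endpoint
    return f"{base}{service}/?" + "&".join(f"{k}={v}" for k, v in kwargs.items())

opts = {'base': join_url_base, 'service': 'series', 'station_id': 'DEBW107'}
response = requests.get(create_url(**opts))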
import pytest
import os
from src.data_generator import DataGenerator
import logging
import numpy as np
import xarray as xr
import datetime as dt
import pandas as pd
from operator import itemgetter
class TestDataGenerator:
@pytest.fixture
def gen(self):
return DataGenerator(os.path.join(os.path.dirname(__file__), 'data'), 'UBA', 'DEBW107', ['o3', 'temp'],
'datetime', 'variables', 'o3')
def test_init(self, gen):
assert gen.path == os.path.join(os.path.dirname(__file__), 'data')
assert gen.network == 'UBA'
assert gen.stations == ['DEBW107']
assert gen.variables == ['o3', 'temp']
assert gen.interpolate_dim == 'datetime'
assert gen.target_dim == 'variables'
assert gen.target_var == 'o3'
assert gen.interpolate_method == "linear"
assert gen.limit_nan_fill == 1
assert gen.window_history == 7
assert gen.window_lead_time == 4
assert gen.transform_method == "standardise"
assert gen.kwargs == {}
def test_repr(self, gen):
path = os.path.join(os.path.dirname(__file__), 'data')
assert gen.__repr__().rstrip() == f"DataGenerator(path='{path}', network='UBA', stations=['DEBW107'], "\
f"variables=['o3', 'temp'], interpolate_dim='datetime', " \
f"target_dim='variables', target_var='o3', **{{}})".rstrip()
def test_len(self, gen):
assert len(gen) == 1
gen.stations = ['station1', 'station2', 'station3']
assert len(gen) == 3
def test_iter(self, gen):
assert hasattr(gen, '_iterator') is False
iter(gen)
assert hasattr(gen, '_iterator')
assert gen._iterator == 0
def test_next(self, gen):
gen.kwargs = {'statistics_per_var': {'o3': 'dma8eu', 'temp': 'maximum'}}
for i, d in enumerate(gen, start=1):
assert i == gen._iterator
def test_getitem(self, gen):
gen.kwargs = {'statistics_per_var': {'o3': 'dma8eu', 'temp': 'maximum'}}
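# history is returned as (datetime, window, Stations, variables), labels as (datetime, window)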
station = gen["DEBW107"]
assert len(station) == 2
assert station[0].Stations.data == "DEBW107"
assert station[0].data.shape[1:] == (8, 1, 2)
assert station[1].data.shape[-1] == gen.window_lead_time
assert station[0].data.shape[1] == gen.window_history + 1
def test_get_station_key(self, gen):
gen.stations.append("DEBW108")
f = gen.get_station_key
iter(gen)
assert f(None) == "DEBW107"
with pytest.raises(KeyError) as e:
f([None, None])
assert "More than one key was given: [None, None]" in e.value.args[0]
assert f(1) == "DEBW108"
assert f([1]) == "DEBW108"
with pytest.raises(KeyError) as e:
f(3)
assert "3 is not in range(0, 2)" in e.value.args[0]
assert f("DEBW107") == "DEBW107"
assert f(["DEBW108"]) == "DEBW108"
with pytest.raises(KeyError) as e:
f("DEBW999")
assert "DEBW999 is not in stations" in e.value.args[0]
with pytest.raises(KeyError) as e:
f(6.5)
assert "key has to be from Union[str, int]. Given was 6.5 (float)"
@@ -13,8 +13,8 @@ class TestDataPrep:
@pytest.fixture
def data(self):
-return DataPrep('test/data/', 'dummy', 'DEBW107', ['o3', 'temp'], test='testKWARGS',
-statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'})
+return DataPrep(os.path.join(os.path.dirname(__file__), 'data'), 'dummy', 'DEBW107', ['o3', 'temp'],
+test='testKWARGS', statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'})
def test_init(self, data):
assert data.path == os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
@@ -29,18 +29,6 @@ class TestDataPrep:
with pytest.raises(NotImplementedError):
DataPrep('data/', 'dummy', 'DEBW107', ['o3', 'temp'])
-def test_check_path_and_create(self, caplog):
-caplog.set_level(logging.INFO)
-d = object.__new__(DataPrep)
-d.path = 'data/test'
-assert not os.path.exists('data/test')
-d.check_path_and_create()
-assert os.path.exists('data/test')
-assert caplog.messages[0] == "Created path: data/test"
-d.check_path_and_create()
-assert caplog.messages[1] == "Path already exists: data/test"
-os.rmdir('data/test')
def test_repr(self):
d = object.__new__(DataPrep)
d.path = 'data/test'
@@ -53,13 +41,13 @@ class TestDataPrep:
def test_set_file_name_and_meta(self):
d = object.__new__(DataPrep)
-d.path = os.path.abspath('test/data/test')
+d.path = os.path.abspath('test/data/')
d.station = 'TESTSTATION'
d.variables = ['a', 'bc']
assert d._set_file_name() == os.path.join(os.path.abspath(os.path.dirname(__file__)),
"data/test/TESTSTATION_a_bc.nc")
"data/TESTSTATION_a_bc.nc")
assert d._set_meta_file_name() == os.path.join(os.path.abspath(os.path.dirname(__file__)),
"data/test/TESTSTATION_a_bc_meta.csv")
"data/TESTSTATION_a_bc_meta.csv")
@pytest.mark.parametrize('opts', [{'dim': 'datetime', 'method': 'nearest', 'limit': 10, 'use_coordinate': True},
{'dim': 'datetime', 'limit': 5}, {'dim': 'datetime'}])
......
import pytest
from src.helpers import to_list, check_path_and_create, l_p_loss, LearningRateDecay
import logging
import os
import keras
import numpy as np
class TestToList:
def test_to_list(self):
assert to_list('a') == ['a']
assert to_list('abcd') == ['abcd']
assert to_list([1, 2, 3]) == [1, 2, 3]
assert to_list([45]) == [45]
class TestCheckPath:
def test_check_path_and_create(self, caplog):
caplog.set_level(logging.INFO)
path = 'data/test'
assert not os.path.exists('data/test')
check_path_and_create(path)
assert os.path.exists('data/test')
assert caplog.messages[0] == "Created path: data/test"
check_path_and_create(path)
assert caplog.messages[1] == "Path already exists: data/test"
os.rmdir('data/test')
class TestLoss:
def test_l_p_loss(self):
model = keras.Sequential()
model.add(keras.layers.Lambda(lambda x: x, input_shape=(None, )))
model.compile(optimizer=keras.optimizers.Adam(), loss=l_p_loss(2))
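# identity model: the absolute errors are [0, 1, 2, 0]; mean of squares = (0 + 1 + 4 + 0) / 4 = 1.25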
hist = model.fit(np.array([1, 0, 2, 0.5]), np.array([1, 1, 0, 0.5]), epochs=1)
assert hist.history['loss'][0] == 1.25
model.compile(optimizer=keras.optimizers.Adam(), loss=l_p_loss(3))
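# the absolute errors are again [0, 1, 2, 0]; mean of cubes = (0 + 1 + 8 + 0) / 4 = 2.25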
hist = model.fit(np.array([1, 0, -2, 0.5]), np.array([1, 1, 0, 0.5]), epochs=1)
assert hist.history['loss'][0] == 2.25
class TestLearningRateDecay:
def test_init(self):
lr_decay = LearningRateDecay()
assert lr_decay.lr == {'lr': []}
assert lr_decay.base_lr == 0.01
assert lr_decay.drop == 0.96
assert lr_decay.epochs_drop == 8
def test_check_param(self):
lr_decay = object.__new__(LearningRateDecay)
assert lr_decay.check_param(1, "tester") == 1
assert lr_decay.check_param(0.5, "tester") == 0.5
with pytest.raises(ValueError) as e:
lr_decay.check_param(0, "tester")
assert "tester is out of allowed range (0, 1]: tester=0" in e.value.args[0]
with pytest.raises(ValueError) as e:
lr_decay.check_param(1.5, "tester")
assert "tester is out of allowed range (0, 1]: tester=1.5" in e.value.args[0]
assert lr_decay.check_param(1.5, "tester", upper=None) == 1.5
with pytest.raises(ValueError) as e:
lr_decay.check_param(0, "tester", upper=None)
assert "tester is out of allowed range (0, inf): tester=0" in e.value.args[0]
assert lr_decay.check_param(0.5, "tester", lower=None) == 0.5
with pytest.raises(ValueError) as e:
lr_decay.check_param(0.5, "tester", lower=None, upper=0.2)
assert "tester is out of allowed range (-inf, 0.2]: tester=0.5" in e.value.args[0]
assert lr_decay.check_param(10, "tester", upper=None, lower=None) == 10
def test_on_epoch_begin(self):
lr_decay = LearningRateDecay(base_lr=0.02, drop=0.95, epochs_drop=2)
model = keras.Sequential()
model.add(keras.layers.Dense(1, input_dim=1))
model.compile(optimizer=keras.optimizers.Adam(), loss=l_p_loss(2))
model.fit(np.array([1, 0, 2, 0.5]), np.array([1, 1, 0, 0.5]), epochs=5, callbacks=[lr_decay])
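# with epochs_drop=2, floor(epoch / 2) gives decay steps 0, 0, 1, 1, 2 over the five epochs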
assert lr_decay.lr['lr'] == [0.02, 0.02, 0.02*0.95, 0.02*0.95, 0.02*0.95*0.95]