Commit 312f4f18 authored by lukas leufen

removed outdated data_generator and data_distributor

parent 4ead7464
Pipeline #41715 failed with stages in 2 minutes and 6 seconds
@@ -10,9 +10,6 @@ __date__ = '2020-04-17'
from .bootstraps import BootStraps
from .data_preparation_join import DataPrepJoin
from .data_generator import DataGenerator
from .data_distributor import Distributor
from .iterator import KerasIterator, DataCollection
from .advanced_data_handling import DefaultDataPreparation
from .data_preparation import StationPrep
\ No newline at end of file
from .advanced_data_handling import DefaultDataPreparation, AbstractDataPreparation
from .data_preparation_neighbors import DataPreparationNeighbors
@@ -16,7 +16,7 @@ import inspect
from typing import Union, List, Tuple
import logging
from functools import reduce
from src.data_handling.data_preparation import StationPrep
from src.data_handling.station_preparation import StationPrep
from src.helpers.join import EmptyQueryResult
......
"""
Data Distribution Module.
How to use
----------
Create distributor object from a generator object and parse it to the fit generator method. Provide the number of
steps per epoch with distributor's length method.
.. code-block:: python
model = YourKerasModel()
data_generator = DataGenerator(*args, **kwargs)
data_distributor = Distributor(data_generator, model, **kwargs)
history = model.fit_generator(generator=data_distributor.distribute_on_batches(),
steps_per_epoch=len(data_distributor),
epochs=10,)
Additionally, a validation data set can be parsed using the length and distribute methods.
"""
from __future__ import generator_stop
__author__ = "Lukas Leufen, Felix Kleinert"
__date__ = '2019-12-05'
import math
import keras
import numpy as np
from src.data_handling.data_generator import DataGenerator
class Distributor(keras.utils.Sequence):
"""Distribute data generator elements according to mini batch size."""
def __init__(self, generator: DataGenerator, model: keras.models, batch_size: int = 256,
permute_data: bool = False, upsampling: bool = False):
"""
Set up distributor.
:param generator: The generator object must be iterable and return inputs and targets on each iteration
:param model: a keras model with one or more output branches
:param batch_size: batch size to use
:param permute_data: data is randomly permuted if enabled on each train step
:param upsampling: upsample data with upsample extremes data from generator object and shuffle data or use only
the standard input data.
"""
self.generator = generator
self.model = model
self.batch_size = batch_size
self.do_data_permutation = permute_data
self.upsampling = upsampling
def _get_model_rank(self):
mod_out = self.model.output_shape
if isinstance(mod_out, tuple):
# only one output branch: (None, ahead)
mod_rank = 1
elif isinstance(mod_out, list):
# multiple output branches, e.g.: [(None, ahead), (None, ahead)]
mod_rank = len(mod_out)
else: # pragma: no cover
raise TypeError("model output shape must either be tuple or list.")
return mod_rank
def _get_number_of_mini_batches(self, values):
return math.ceil(values.shape[0] / self.batch_size)
def _permute_data(self, x, y):
"""
Permute inputs x and labels y if permutation is enabled in instance.
:param x: inputs
:param y: labels
:return: permuted or original data
"""
if self.do_data_permutation:
p = np.random.permutation(len(x)) # equiv to .shape[0]
x = x[p]
y = y[p]
return x, y
def distribute_on_batches(self, fit_call=True):
"""
Create generator object to distribute mini batches.
Split data from given generator object (usually for single station) according to the given batch size. Also
perform upsampling if enabled and random shuffling (either if data permutation is enabled or if upsampling is
enabled). Lastly multiply targets if provided model has multiple output branches.
:param fit_call: switch to exit while loop after first iteration. This is used to determine the length of all
distributed mini batches. For default, fit_call is True to obtain infinite loop for training.
:return: yields next mini batch
"""
while True:
for k, v in enumerate(self.generator):
# get rank of output
mod_rank = self._get_model_rank()
# get data
x_total = np.copy(v[0])
y_total = np.copy(v[1])
if self.upsampling:
try:
s = self.generator.get_data_generator(k)
x_total = np.concatenate([x_total, np.copy(s.get_extremes_history())], axis=0)
y_total = np.concatenate([y_total, np.copy(s.get_extremes_label())], axis=0)
except AttributeError: # no extremes history / labels available, copy will fail
pass
# get number of mini batches
num_mini_batches = self._get_number_of_mini_batches(x_total)
# permute order for mini-batches
x_total, y_total = self._permute_data(x_total, y_total)
for prev, curr in enumerate(range(1, num_mini_batches + 1)):
x = x_total[prev * self.batch_size:curr * self.batch_size, ...]
y = [y_total[prev * self.batch_size:curr * self.batch_size, ...] for _ in range(mod_rank)]
if x is not None: # pragma: no branch
yield x, y
if (k + 1) == len(self.generator) and curr == num_mini_batches and not fit_call:
return
def __len__(self) -> int:
"""
Total number of distributed mini batches.
:return: the length of the distribute on batches object
"""
num_batch = 0
for _ in self.distribute_on_batches(fit_call=False):
num_batch += 1
return num_batch
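The module docstring above mentions that a validation data set can be passed using the length and distribute methods. A minimal sketch of how that could look with the Keras 2 fit_generator API is given below; the model and the train_args/val_args placeholders are illustrative assumptions, not part of this commit.

    model = YourKerasModel()
    train_distributor = Distributor(DataGenerator(*train_args), model)
    val_distributor = Distributor(DataGenerator(*val_args), model)
    # distribute_on_batches() loops forever by default, so Keras needs the number
    # of steps per epoch and per validation pass, which __len__ provides.
    history = model.fit_generator(generator=train_distributor.distribute_on_batches(),
                                  steps_per_epoch=len(train_distributor),
                                  validation_data=val_distributor.distribute_on_batches(),
                                  validation_steps=len(val_distributor),
                                  epochs=10)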
This diff is collapsed.
"""Data Preparation class to handle data processing for machine learning."""
__author__ = 'Felix Kleinert, Lukas Leufen'
__date__ = '2019-10-16'
import datetime as dt
import inspect
import logging
from typing import Union, List
import pandas as pd
import xarray as xr
from src import helpers
from src.helpers import join
from src.data_handling.data_preparation import AbstractDataPrep
# define a more general date type for type hinting
date = Union[dt.date, dt.datetime]
str_or_list = Union[str, List[str]]
number = Union[float, int]
num_or_list = Union[number, List[number]]
data_or_none = Union[xr.DataArray, None]
class DataPrepJoin(AbstractDataPrep):
"""
This class prepares data to be used in neural networks.
The instance searches for local stored data, that meet the given demands. If no local data is found, the DataPrep
instance will load data from TOAR database and store this data locally to use the next time. For the moment, there
is only support for daily aggregated time series. The aggregation can be set manually and differ for each variable.
After data loading, different data pre-processing steps can be executed to prepare the data for further
applications. Especially the following methods can be used for the pre-processing step:
- interpolate: interpolate between data points by using xarray's interpolation method
- standardise: standardise data to mean=1 and std=1, centralise to mean=0, additional methods like normalise on \
interval [0, 1] are not implemented yet.
- make window history: represent the history (time steps before) for training/ testing; X
- make labels: create target vector with given leading time steps for training/ testing; y
- remove Nans jointly from desired input and output, only keeps time steps where no NaNs are present in X AND y. \
Use this method after the creation of the window history and labels to clean up the data cube.
To create a DataPrep instance, it is needed to specify the stations by id (e.g. "DEBW107"), its network (e.g. UBA,
"Umweltbundesamt") and the variables to use. Further options can be set in the instance.
* `statistics_per_var`: define a specific statistic to extract from the TOAR database for each variable.
* `start`: define a start date for the data cube creation. Default: Use the first entry in time series
* `end`: set the end date for the data cube. Default: Use last date in time series.
* `store_data_locally`: store recently downloaded data on local disk. Default: True
* set further parameters for xarray's interpolation methods to modify the interpolation scheme
"""
def __init__(self, path: str, station: Union[str, List[str]], variables: List[str], network: str = None,
station_type: str = None, **kwargs):
self.network = network
self.station_type = station_type
params = helpers.remove_items(inspect.getfullargspec(AbstractDataPrep.__init__).args, "self")
kwargs = {**{k: v for k, v in locals().items() if k in params and v is not None}, **kwargs}
super().__init__(**kwargs)
def download_data(self, file_name, meta_file):
"""
Download data and meta from join.
:param file_name: name of file to save data to (containing full path)
:param meta_file: name of the meta data file (also containing full path)
"""
data, meta = self.download_data_from_join(file_name, meta_file)
return data, meta
def check_station_meta(self):
"""
Search for the entries in meta data and compare the value with the requested values.
Will raise a FileNotFoundError if the values mismatch.
"""
if self.station_type is not None:
check_dict = {"station_type": self.station_type, "network_name": self.network}
for (k, v) in check_dict.items():
if v is None:
continue
if self.meta.at[k, self.station[0]] != v:
logging.debug(f"meta data does not agree with given request for {k}: {v} (requested) != "
f"{self.meta.at[k, self.station[0]]} (local). Raise FileNotFoundError to trigger new "
f"grapping from web.")
raise FileNotFoundError
def download_data_from_join(self, file_name: str, meta_file: str) -> [xr.DataArray, pd.DataFrame]:
"""
Download data from TOAR database using the JOIN interface.
Data is transformed to a xarray dataset. If class attribute store_data_locally is true, data is additionally
stored locally using given names for file and meta file.
:param file_name: name of file to save data to (containing full path)
:param meta_file: name of the meta data file (also containing full path)
:return: downloaded data and its meta data
"""
df_all = {}
df, meta = join.download_join(station_name=self.station, stat_var=self.statistics_per_var,
station_type=self.station_type, network_name=self.network, sampling=self.sampling)
df_all[self.station[0]] = df
# convert df_all to xarray
xarr = {k: xr.DataArray(v, dims=['datetime', 'variables']) for k, v in df_all.items()}
xarr = xr.Dataset(xarr).to_array(dim='Stations')
if self.kwargs.get('store_data_locally', True):
# save locally as nc/csv file
xarr.to_netcdf(path=file_name)
meta.to_csv(meta_file)
return xarr, meta
def __repr__(self):
"""Represent class attributes."""
return f"Dataprep(path='{self.path}', network='{self.network}', station={self.station}, " \
f"variables={self.variables}, station_type={self.station_type}, **{self.kwargs})"
if __name__ == "__main__":
dp = DataPrepJoin('data/', 'dummy', 'DEBW107', ['o3', 'temp'], statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'})
print(dp)
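For orientation, a hedged usage sketch of DataPrepJoin follows. The keyword options are those listed in the class docstring (statistics_per_var, start, end, store_data_locally) plus network and station_type from __init__; whether start/end/store_data_locally are consumed unchanged by AbstractDataPrep is an assumption, and the concrete values are illustrative only.

    from src.data_handling.data_preparation_join import DataPrepJoin

    prep = DataPrepJoin('data/', 'DEBW107', ['o3', 'temp'],
                        network='UBA', station_type='background',
                        statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'},
                        start='1997-01-01', end='2017-12-31',
                        store_data_locally=True)
    print(prep)  # __repr__ defined above summarises path, network, station and variables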
@@ -4,7 +4,7 @@ __date__ = '2020-07-17'
from src.helpers import to_list
from src.data_handling.data_preparation import StationPrep
from src.data_handling.station_preparation import StationPrep
from src.data_handling.advanced_data_handling import DefaultDataPreparation
import os
......
@@ -7,8 +7,7 @@ import numpy as np
import pytest
import xarray as xr
from src.data_handling.bootstraps import BootStraps, CreateShuffledData, BootStrapGenerator
from src.data_handling.data_generator import DataGenerator
from src.data_handling.bootstraps import BootStraps
from src.data_handling import DataPrepJoin
......
import math
import os

import keras
import numpy as np
import pytest

from src.data_handling.data_distributor import Distributor
from src.data_handling.data_generator import DataGenerator
from src.data_handling import DataPrepJoin
from test.test_modules.test_training import my_test_model


class TestDistributor:

    @pytest.fixture
    def generator(self):
        return DataGenerator(os.path.join(os.path.dirname(__file__), 'data'), 'DEBW107', ['o3', 'temp'],
                             'datetime', 'variables', 'o3', statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'},
                             data_preparation=DataPrepJoin)

    @pytest.fixture
    def generator_two_stations(self):
        return DataGenerator(os.path.join(os.path.dirname(__file__), 'data'), ['DEBW107', 'DEBW013'],
                             ['o3', 'temp'], 'datetime', 'variables', 'o3',
                             statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'},
                             data_preparation=DataPrepJoin)

    @pytest.fixture
    def model(self):
        return my_test_model(keras.layers.PReLU, 5, 3, 0.1, False)

    @pytest.fixture
    def model_with_minor_branch(self):
        return my_test_model(keras.layers.PReLU, 5, 3, 0.1, True)

    @pytest.fixture
    def distributor(self, generator, model):
        return Distributor(generator, model)

    def test_init_defaults(self, distributor):
        assert distributor.batch_size == 256
        assert distributor.do_data_permutation is False

    def test_get_model_rank(self, distributor, model_with_minor_branch):
        assert distributor._get_model_rank() == 1
        distributor.model = model_with_minor_branch
        assert distributor._get_model_rank() == 2
        distributor.model = 1

    def test_get_number_of_mini_batches(self, distributor):
        values = np.zeros((2311, 19))
        assert distributor._get_number_of_mini_batches(values) == math.ceil(2311 / distributor.batch_size)

    def test_distribute_on_batches_single_loop(self, generator_two_stations, model):
        d = Distributor(generator_two_stations, model)
        for e in d.distribute_on_batches(fit_call=False):
            assert e[0].shape[0] <= d.batch_size

    def test_distribute_on_batches_infinite_loop(self, generator_two_stations, model):
        d = Distributor(generator_two_stations, model)
        elements = []
        for i, e in enumerate(d.distribute_on_batches()):
            if i < len(d):
                elements.append(e[0])
            elif i == 2 * len(d):  # check if all elements are repeated
                assert np.testing.assert_array_equal(e[0], elements[i - len(d)]) is None
            else:  # break when 3rd iteration starts (is called as infinite loop)
                break

    def test_len(self, distributor):
        assert len(distributor) == math.ceil(len(distributor.generator[0][0]) / 256)

    def test_len_two_stations(self, generator_two_stations, model):
        gen = generator_two_stations
        d = Distributor(gen, model)
        expected = math.ceil(len(gen[0][0]) / 256) + math.ceil(len(gen[1][0]) / 256)
        assert len(d) == expected

    def test_permute_data_no_permutation(self, distributor):
        x = np.array(range(20)).reshape(2, 10).T
        y = np.array(range(10)).reshape(10, 1)
        x_perm, y_perm = distributor._permute_data(x, y)
        assert np.testing.assert_equal(x, x_perm) is None
        assert np.testing.assert_equal(y, y_perm) is None

    def test_permute_data(self, distributor):
        x = np.array(range(20)).reshape(2, 10).T
        y = np.array(range(10)).reshape(10, 1)
        distributor.do_data_permutation = True
        x_perm, y_perm = distributor._permute_data(x, y)
        assert x_perm[0, 0] == y_perm[0]
        assert x_perm[0, 1] == y_perm[0] + 10
        assert x_perm[5, 0] == y_perm[5]
        assert x_perm[5, 1] == y_perm[5] + 10
        assert x_perm[-1, 0] == y_perm[-1]
        assert x_perm[-1, 1] == y_perm[-1] + 10
        # resort x_perm and compare if equal to x
        x_perm.sort(axis=0)
        y_perm.sort(axis=0)
        assert np.testing.assert_equal(x, x_perm) is None
        assert np.testing.assert_equal(y, y_perm) is None

    def test_distribute_on_batches_upsampling_no_extremes_given(self, generator, model):
        d = Distributor(generator, model, upsampling=True)
        gen_len = d.generator.get_data_generator(0, load_local_tmp_storage=False).get_transposed_label().shape[0]
        num_mini_batches = math.ceil(gen_len / d.batch_size)
        i = 0
        for i, e in enumerate(d.distribute_on_batches(fit_call=False)):
            assert e[0].shape[0] <= d.batch_size
        assert i + 1 == num_mini_batches

    def test_distribute_on_batches_upsampling(self, generator, model):
        generator.extreme_values = [1]
        d = Distributor(generator, model, upsampling=True)
        gen_len = d.generator.get_data_generator(0, load_local_tmp_storage=False).get_transposed_label().shape[0]
        extr_len = d.generator.get_data_generator(0, load_local_tmp_storage=False).get_extremes_label().shape[0]
        i = 0
        for i, e in enumerate(d.distribute_on_batches(fit_call=False)):
            assert e[0].shape[0] <= d.batch_size
        assert i + 1 == math.ceil((gen_len + extr_len) / d.batch_size)
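The length assertions in these tests reduce to simple ceiling arithmetic; a short self-contained illustration, with the sample count taken from test_get_number_of_mini_batches and everything else assumed:

    import math

    samples, batch_size = 2311, 256                 # values used in the test above
    assert math.ceil(samples / batch_size) == 10    # a single station yields 10 mini batches
    # for several stations, the per-station counts are summed, as in test_len_two_stations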
@@ -6,7 +6,6 @@ import numpy as np
import pytest
import xarray as xr
from src.data_handling.data_generator import DataGenerator
from src.data_handling import DataPrepJoin
from src.helpers.join import EmptyQueryResult
......
@@ -10,8 +10,6 @@ import pytest
from keras.callbacks import History
from src.data_handling import DataPrepJoin
from src.data_handling.data_distributor import Distributor
from src.data_handling.data_generator import DataGenerator
from src.helpers import PyTestRegex
from src.model_modules.flatten import flatten_tail
from src.model_modules.inception_model import InceptionModelBase
......