Commit 4261fd26 authored by lukas leufen's avatar lukas leufen

introduce garbage collection, accelerated preprocessing

parent e00531c8
Pipeline #45959 passed with stages
in 7 minutes and 14 seconds
......@@ -13,12 +13,13 @@ import datetime as dt
import shutil
import inspect
import copy
import gc
from typing import Union, List, Tuple, Dict
import logging
from functools import reduce
from mlair.helpers.join import EmptyQueryResult
from mlair.helpers import TimeTracking
number = Union[float, int]
num_or_list = Union[number, List[number]]
......@@ -96,7 +97,8 @@ class DefaultDataHandler(AbstractDataHandler):
_requirements = remove_items(inspect.getfullargspec(data_handler).args, ["self", "station"])
def __init__(self, id_class: data_handler, data_path: str, min_length: int = 0,
extreme_values: num_or_list = None, extremes_on_right_tail_only: bool = False, name_affix=None):
extreme_values: num_or_list = None, extremes_on_right_tail_only: bool = False, name_affix=None,
store_processed_data=True):
super().__init__()
self.id_class = id_class
self.interpolation_dim = "datetime"
......@@ -110,7 +112,7 @@ class DefaultDataHandler(AbstractDataHandler):
self._collection = self._create_collection()
self.harmonise_X()
self.multiply_extremes(extreme_values, extremes_on_right_tail_only, dim=self.interpolation_dim)
self._store(fresh_store=True)
self._store(fresh_store=True, store_processed_data=store_processed_data)
@classmethod
def build(cls, station: str, **kwargs):
......@@ -128,6 +130,7 @@ class DefaultDataHandler(AbstractDataHandler):
def _reset_data(self):
self._X, self._Y, self._X_extreme, self._Y_extreme = None, None, None, None
gc.collect()
def _cleanup(self):
directory = os.path.dirname(self._save_file)
......@@ -136,13 +139,14 @@ class DefaultDataHandler(AbstractDataHandler):
if os.path.exists(self._save_file):
shutil.rmtree(self._save_file, ignore_errors=True)
def _store(self, fresh_store=False):
self._cleanup() if fresh_store is True else None
data = {"X": self._X, "Y": self._Y, "X_extreme": self._X_extreme, "Y_extreme": self._Y_extreme}
with open(self._save_file, "wb") as f:
pickle.dump(data, f)
logging.debug(f"save pickle data to {self._save_file}")
self._reset_data()
def _store(self, fresh_store=False, store_processed_data=True):
if store_processed_data is True:
self._cleanup() if fresh_store is True else None
data = {"X": self._X, "Y": self._Y, "X_extreme": self._X_extreme, "Y_extreme": self._Y_extreme}
with open(self._save_file, "wb") as f:
pickle.dump(data, f)
logging.debug(f"save pickle data to {self._save_file}")
self._reset_data()
def _load(self):
try:
......
......@@ -8,7 +8,7 @@ import copy
from mlair.data_handler.data_handler_single_station import DataHandlerSingleStation
from mlair.data_handler import DefaultDataHandler
from mlair.helpers import remove_items, to_list
from mlair.helpers import remove_items, to_list, TimeTrackingWrapper
from mlair.helpers.statistics import KolmogorovZurbenkoFilterMovingWindow as KZFilter
......@@ -40,6 +40,7 @@ class DataHandlerKzFilterSingleStation(DataHandlerSingleStation):
self.apply_kz_filter()
self.make_samples()
@TimeTrackingWrapper
def apply_kz_filter(self):
    """Run the Kolmogorov-Zurbenko filter over ``self.data`` along the datetime dimension and keep only the last filter result."""
    kz_filter = KZFilter(self.data, wl=self.kz_filter_length, itr=self.kz_filter_iter, filter_dim="datetime")
    filtered = kz_filter.run()
    self.data = filtered[-1]
......@@ -51,4 +52,5 @@ class DataHandlerKzFilter(DefaultDataHandler):
"""Data handler using kz filtered data."""
data_handler = DataHandlerKzFilterSingleStation
data_handler_transformation = DataHandlerKzFilterSingleStation
_requirements = data_handler.requirements()
......@@ -15,9 +15,10 @@ import xarray as xr
from mlair.configuration import check_path_and_create
from mlair import helpers
from mlair.helpers import join, statistics
from mlair.helpers import join, statistics, TimeTracking, TimeTrackingWrapper
from mlair.data_handler.advanced_data_handler import AbstractDataHandler
# define a more general date type for type hinting
date = Union[dt.date, dt.datetime]
str_or_list = Union[str, List[str]]
......@@ -169,6 +170,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
self.call_transform()
self.make_samples()
@TimeTrackingWrapper
def setup_samples(self):
"""
Setup samples. This method prepares and creates samples X, and labels Y.
......
......@@ -56,7 +56,7 @@ class PreProcessing(RunEnvironment):
def _run(self):
stations = self.data_store.get("stations")
data_handler = self.data_store.get("data_handler")
_, valid_stations = self.validate_station(data_handler, stations, "preprocessing", overwrite_local_data=True)
_, valid_stations = self.validate_station(data_handler, stations, "preprocessing")#, store_processed_data=False)
if len(valid_stations) == 0:
raise ValueError("Couldn't find any valid data according to given parameters. Abort experiment run.")
self.data_store.set("stations", valid_stations)
......@@ -192,7 +192,8 @@ class PreProcessing(RunEnvironment):
self.data_store.set("stations", valid_stations, scope=set_name)
self.data_store.set("data_collection", collection, scope=set_name)
def validate_station(self, data_handler: AbstractDataHandler, set_stations, set_name=None, overwrite_local_data=False):
def validate_station(self, data_handler: AbstractDataHandler, set_stations, set_name=None,
store_processed_data=True):
"""
Check if all given stations in `all_stations` are valid.
......@@ -212,7 +213,8 @@ class PreProcessing(RunEnvironment):
kwargs = self.data_store.create_args_dict(data_handler.requirements(), scope=set_name)
for station in set_stations:
try:
dp = data_handler.build(station, name_affix=set_name, **kwargs)
dp = data_handler.build(station, name_affix=set_name, store_processed_data=store_processed_data,
**kwargs)
collection.add(dp)
valid_stations.append(station)
except (AttributeError, EmptyQueryResult):
......@@ -259,6 +261,6 @@ class PreProcessingUpdateHandler(PreProcessing):
# perform default pre processing afterwards
super()._run()
def transformation(self, data_handler: AbstractDataHandler, stations):
    """Intentionally a no-op override: the transformation from a previous run is reused, so no fresh calculation is required here."""
    return None
# def transformation(self, data_handler: AbstractDataHandler, stations):
# """INFO: fresh calculation of the transformation is not required because it can be reused from previous runs."""
# pass
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment