Commit c3bca050 authored by lukas leufen's avatar lukas leufen

path inside a data handler is more often parsed as arg to enable further...

The path inside a data handler is now more often passed as an argument to enable further inheritance adjustments; data handler data is now stored separately from the raw data location and inside the experiment; data_path lost its sampling attribute (it is now added by the handler itself).
parent ba00aec8
Pipeline #51131 passed with stages
in 6 minutes and 17 seconds
......@@ -49,6 +49,7 @@ DEFAULT_NUMBER_OF_BOOTSTRAPS = 20
DEFAULT_PLOT_LIST = ["PlotMonthlySummary", "PlotStationMap", "PlotClimatologicalSkillScore", "PlotTimeSeries",
"PlotCompetitiveSkillScore", "PlotBootstrapSkillScore", "PlotConditionalQuantiles",
"PlotAvailability"]
DEFAULT_SAMPLING = "daily"
def get_defaults():
......
......@@ -20,7 +20,7 @@ def prepare_host(create_new=True, data_path=None, sampling="daily") -> str:
:param create_new: Create new path if enabled
:param data_path: Parse your custom path (and therefore ignore preset paths fitting to known hosts)
:param sampling: sampling rate to separate data physically by temporal resolution
:param sampling: sampling rate to separate data physically by temporal resolution (deprecated)
:return: full path to data
"""
......@@ -32,17 +32,14 @@ def prepare_host(create_new=True, data_path=None, sampling="daily") -> str:
data_path = f"/home/{user}/Data/toar_{sampling}/"
elif hostname == "zam347":
data_path = f"/home/{user}/Data/toar_{sampling}/"
elif hostname == "linux-aa9b":
data_path = f"/home/{user}/mlair/data/toar_{sampling}/"
elif (len(hostname) > 2) and (hostname[:2] == "jr"):
data_path = f"/p/project/cjjsc42/{user}/DATA/toar_{sampling}/"
elif (len(hostname) > 2) and (hostname[:2] in ['jw', 'ju'] or hostname[:5] in ['hdfml']):
data_path = f"/p/project/deepacf/intelliaq/{user}/DATA/toar_{sampling}/"
data_path = f"/p/project/deepacf/intelliaq/{user}/DATA/MLAIR/"
elif runner_regex.match(hostname) is not None:
data_path = f"/home/{user}/mlair/data/toar_{sampling}/"
data_path = f"/home/{user}/mlair/data/"
else:
data_path = os.path.join(os.getcwd(), "data", sampling)
# raise OSError(f"unknown host '{hostname}'")
data_path = os.path.join(os.getcwd(), "data")
if not os.path.exists(data_path):
try:
......@@ -97,7 +94,7 @@ def set_experiment_name(name: str = None, sampling: str = None) -> str:
return experiment_name
def set_bootstrap_path(bootstrap_path: str, data_path: str, sampling: str) -> str:
def set_bootstrap_path(bootstrap_path: str, data_path: str) -> str:
"""
Set path for bootstrap input data.
......@@ -105,12 +102,11 @@ def set_bootstrap_path(bootstrap_path: str, data_path: str, sampling: str) -> st
:param bootstrap_path: custom path to store bootstrap data
:param data_path: path of data for default bootstrap path
:param sampling: sampling rate to add, if path is set to default
:return: full bootstrap path
"""
if bootstrap_path is None:
bootstrap_path = os.path.join(data_path, "..", f"bootstrap_{sampling}")
bootstrap_path = os.path.join(data_path, "bootstrap")
check_path_and_create(bootstrap_path)
return os.path.abspath(bootstrap_path)
......
......@@ -52,7 +52,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
min_length: int = 0, start=None, end=None, variables=None, **kwargs):
super().__init__() # path, station, statistics_per_var, transformation, **kwargs)
self.station = helpers.to_list(station)
self.path = os.path.abspath(data_path) # ToDo: data_path could be a dict or list?
self.path = self.setup_data_path(data_path, sampling)
self.statistics_per_var = statistics_per_var
self.do_transformation = transformation is not None
self.input_data, self.target_data = self.setup_transformation(transformation)
......@@ -141,7 +141,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
"""
Setup samples. This method prepares and creates samples X, and labels Y.
"""
self.load_data(self.station, self.statistics_per_var, self.sampling, self.station_type, self.network,
self.load_data(self.path, self.station, self.statistics_per_var, self.sampling, self.station_type, self.network,
self.store_data_locally)
self.interpolate(dim=self.time_dim, method=self.interpolation_method, limit=self.interpolation_limit)
self.set_inputs_and_targets()
......@@ -161,7 +161,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
self.make_observation(self.target_dim, self.target_var, self.time_dim)
self.remove_nan(self.time_dim)
def load_data(self, station, statistics_per_var, sampling, station_type=None, network=None,
def load_data(self, path, station, statistics_per_var, sampling, station_type=None, network=None,
store_data_locally=False):
"""
Load data and meta data either from local disk (preferred) or download new data by using a custom download method.
......@@ -170,9 +170,9 @@ class DataHandlerSingleStation(AbstractDataHandler):
cases, downloaded data is only stored locally if store_data_locally is not disabled. If this parameter is not
set, it is assumed, that data should be saved locally.
"""
check_path_and_create(self.path)
file_name = self._set_file_name(self.path, station, statistics_per_var)
meta_file = self._set_meta_file_name(self.path, station, statistics_per_var)
check_path_and_create(path)
file_name = self._set_file_name(path, station, statistics_per_var)
meta_file = self._set_meta_file_name(path, station, statistics_per_var)
if self.overwrite_local_data is True:
logging.debug(f"overwrite_local_data is true, therefore reload {file_name}")
if os.path.exists(file_name):
......@@ -265,10 +265,15 @@ class DataHandlerSingleStation(AbstractDataHandler):
"""
chem_vars = ["benzene", "ch4", "co", "ethane", "no", "no2", "nox", "o3", "ox", "pm1", "pm10", "pm2p5",
"propane", "so2", "toluene"]
# used_chem_vars = list(set(chem_vars) & set(self.statistics_per_var.keys()))
used_chem_vars = list(set(chem_vars) & set(self.variables))
data.loc[..., used_chem_vars] = data.loc[..., used_chem_vars].clip(min=minimum)
return data
@staticmethod
def setup_data_path(data_path, sampling):
    """
    Build the sampling-specific data location from the given data path.

    :param data_path: location of the data, converted to an absolute path before joining
    :param sampling: sampling rate, appended as the last component of the returned path
    :return: absolute data path joined with sampling
    """
    base_dir = os.path.abspath(data_path)
    return os.path.join(base_dir, sampling)
def shift(self, data: xr.DataArray, dim: str, window: int) -> xr.DataArray:
"""
Shift data multiple times to represent history (if window <= 0) or lead time (if window > 0).
......@@ -462,25 +467,6 @@ class DataHandlerSingleStation(AbstractDataHandler):
"""
return data.loc[{coord: slice(str(start), str(end))}]
def check_for_negative_concentrations(self, data: xr.DataArray, minimum: int = 0) -> xr.DataArray:
    """
    Clip concentration variables from below so that no value is smaller than minimum.

    The names of all concentrations are extracted from https://join.fz-juelich.de/services/rest/surfacedata/
    #2.1 Parameters. Currently, the check covers "benzene", "ch4", "co", "ethane", "no", "no2", "nox", "o3", "ox",
    "pm1", "pm10", "pm2p5", "propane", "so2", and "toluene". Only those of these names that are also keys of
    statistics_per_var are selected (along the last dimension of data) and clipped.

    :param data: data array containing variables to check, the clipped values are written back into this array
    :param minimum: lower bound applied to the selected variables, by default this should be 0
    :return: corrected data
    """
    known_concentrations = ["benzene", "ch4", "co", "ethane", "no", "no2", "nox", "o3", "ox", "pm1", "pm10",
                            "pm2p5", "propane", "so2", "toluene"]
    requested = self.statistics_per_var.keys()
    # restrict the check to concentrations that are actually requested for this handler
    selection = list(set(known_concentrations) & set(requested))
    clipped = data.loc[..., selection].clip(min=minimum)
    data.loc[..., selection] = clipped
    return data
@staticmethod
def setup_transformation(transformation: statistics.TransformationClass):
"""
......
......@@ -30,7 +30,7 @@ class DefaultDataHandler(AbstractDataHandler):
_requirements = remove_items(inspect.getfullargspec(data_handler).args, ["self", "station"])
def __init__(self, id_class: data_handler, data_path: str, min_length: int = 0,
def __init__(self, id_class: data_handler, experiment_path: str, min_length: int = 0,
extreme_values: num_or_list = None, extremes_on_right_tail_only: bool = False, name_affix=None,
store_processed_data=True):
super().__init__()
......@@ -42,7 +42,7 @@ class DefaultDataHandler(AbstractDataHandler):
self._X_extreme = None
self._Y_extreme = None
_name_affix = str(f"{str(self.id_class)}_{name_affix}" if name_affix is not None else id(self))
self._save_file = os.path.join(data_path, f"data_preparation_{_name_affix}.pickle")
self._save_file = os.path.join(experiment_path, "data", f"{_name_affix}.pickle")
self._collection = self._create_collection()
self.harmonise_X()
self.multiply_extremes(extreme_values, extremes_on_right_tail_only, dim=self.interpolation_dim)
......
......@@ -17,7 +17,7 @@ from mlair.configuration.defaults import DEFAULT_STATIONS, DEFAULT_VAR_ALL_DICT,
DEFAULT_TRAIN_START, DEFAULT_TRAIN_END, DEFAULT_TRAIN_MIN_LENGTH, DEFAULT_VAL_START, DEFAULT_VAL_END, \
DEFAULT_VAL_MIN_LENGTH, DEFAULT_TEST_START, DEFAULT_TEST_END, DEFAULT_TEST_MIN_LENGTH, DEFAULT_TRAIN_VAL_MIN_LENGTH, \
DEFAULT_USE_ALL_STATIONS_ON_ALL_DATA_SETS, DEFAULT_EVALUATE_BOOTSTRAPS, DEFAULT_CREATE_NEW_BOOTSTRAPS, \
DEFAULT_NUMBER_OF_BOOTSTRAPS, DEFAULT_PLOT_LIST
DEFAULT_NUMBER_OF_BOOTSTRAPS, DEFAULT_PLOT_LIST, DEFAULT_SAMPLING
from mlair.data_handler import DefaultDataHandler
from mlair.run_modules.run_environment import RunEnvironment
from mlair.model_modules.model_class import MyLittleModel as VanillaModel
......@@ -214,20 +214,25 @@ class ExperimentSetup(RunEnvironment):
dimensions=None,
time_dim=None,
interpolation_method=None,
interpolation_limit=None, train_start=None, train_end=None, val_start=None, val_end=None, test_start=None,
test_end=None, use_all_stations_on_all_data_sets=None, train_model: bool = None, fraction_of_train: float = None,
experiment_path=None, plot_path: str = None, forecast_path: str = None, overwrite_local_data = None, sampling: str = "daily",
create_new_model = None, bootstrap_path=None, permute_data_on_training = None, transformation=None,
interpolation_limit=None, train_start=None, train_end=None, val_start=None, val_end=None,
test_start=None,
test_end=None, use_all_stations_on_all_data_sets=None, train_model: bool = None,
fraction_of_train: float = None,
experiment_path=None, plot_path: str = None, forecast_path: str = None, overwrite_local_data=None,
sampling: str = None,
create_new_model=None, bootstrap_path=None, permute_data_on_training=None, transformation=None,
train_min_length=None, val_min_length=None, test_min_length=None, extreme_values: list = None,
extremes_on_right_tail_only: bool = None, evaluate_bootstraps=None, plot_list=None, number_of_bootstraps=None,
extremes_on_right_tail_only: bool = None, evaluate_bootstraps=None, plot_list=None,
number_of_bootstraps=None,
create_new_bootstraps=None, data_path: str = None, batch_path: str = None, login_nodes=None,
hpc_hosts=None, model=None, batch_size=None, epochs=None, data_handler=None, **kwargs):
hpc_hosts=None, model=None, batch_size=None, epochs=None, data_handler=None, sampling_inputs=None,
sampling_outputs=None, **kwargs):
# create run framework
super().__init__()
# experiment setup, hyperparameters
self._set_param("data_path", path_config.prepare_host(data_path=data_path, sampling=sampling))
self._set_param("data_path", path_config.prepare_host(data_path=data_path))
self._set_param("hostname", path_config.get_host())
self._set_param("hpc_hosts", hpc_hosts, default=DEFAULT_HPC_HOST_LIST + DEFAULT_HPC_LOGIN_LIST)
self._set_param("login_nodes", login_nodes, default=DEFAULT_HPC_LOGIN_LIST)
......@@ -235,7 +240,7 @@ class ExperimentSetup(RunEnvironment):
if self.data_store.get("create_new_model"):
train_model = True
data_path = self.data_store.get("data_path")
bootstrap_path = path_config.set_bootstrap_path(bootstrap_path, data_path, sampling)
bootstrap_path = path_config.set_bootstrap_path(bootstrap_path, data_path)
self._set_param("bootstrap_path", bootstrap_path)
self._set_param("train_model", train_model, default=DEFAULT_TRAIN_MODEL)
self._set_param("fraction_of_training", fraction_of_train, default=DEFAULT_FRACTION_OF_TRAINING)
......@@ -250,6 +255,7 @@ class ExperimentSetup(RunEnvironment):
self._set_param("epochs", epochs, default=DEFAULT_EPOCHS)
# set experiment name
sampling = self._set_param("sampling", sampling, default=DEFAULT_SAMPLING)
experiment_name = path_config.set_experiment_name(name=experiment_date, sampling=sampling)
experiment_path = path_config.set_experiment_path(name=experiment_name, path=experiment_path)
self._set_param("experiment_name", experiment_name)
......@@ -287,7 +293,7 @@ class ExperimentSetup(RunEnvironment):
self._set_param("window_history_size", window_history_size, default=DEFAULT_WINDOW_HISTORY_SIZE)
self._set_param("overwrite_local_data", overwrite_local_data, default=DEFAULT_OVERWRITE_LOCAL_DATA,
scope="preprocessing")
self._set_param("sampling", sampling)
self._set_param("sampling_inputs", sampling_inputs, default=sampling)
self._set_param("transformation", transformation, default=DEFAULT_TRANSFORMATION)
self._set_param("transformation", None, scope="preprocessing")
self._set_param("data_handler", data_handler, default=DefaultDataHandler)
......@@ -356,7 +362,7 @@ class ExperimentSetup(RunEnvironment):
f"conflict with an existing entry with same naming: {k}={self.data_store.get(k)}")
def _set_param(self, param: str, value: Any, default: Any = None, scope: str = "general",
apply: Callable = None) -> None:
apply: Callable = None) -> Any:
"""Set given parameter and log in debug. Use apply parameter to adjust the stored value (e.g. to transform value
to a list use apply=helpers.to_list)."""
if value is None and default is not None:
......@@ -365,6 +371,7 @@ class ExperimentSetup(RunEnvironment):
value = apply(value)
self.data_store.set(param, value, scope)
logging.debug(f"set experiment attribute: {param}({scope})={value}")
return value
def _compare_variables_and_statistics(self):
"""
......
......@@ -11,22 +11,21 @@ from mlair.helpers import PyTestRegex
class TestPrepareHost:
@mock.patch("socket.gethostname", side_effect=["linux-aa9b", "ZAM144", "zam347", "jrtest", "jwtest",
@mock.patch("socket.gethostname", side_effect=["ZAM144", "zam347", "jrtest", "jwtest",
"runner-6HmDp9Qd-project-2411-concurrent-01"])
@mock.patch("getpass.getuser", return_value="testUser")
@mock.patch("os.path.exists", return_value=True)
def test_prepare_host(self, mock_host, mock_user, mock_path):
assert prepare_host() == "/home/testUser/mlair/data/toar_daily/"
assert prepare_host() == "/home/testUser/Data/toar_daily/"
assert prepare_host() == "/home/testUser/Data/toar_daily/"
assert prepare_host() == "/p/project/cjjsc42/testUser/DATA/toar_daily/"
assert prepare_host() == "/p/project/deepacf/intelliaq/testUser/DATA/toar_daily/"
assert prepare_host() == '/home/testUser/mlair/data/toar_daily/'
assert prepare_host() == "/p/project/deepacf/intelliaq/testUser/DATA/MLAIR/"
assert prepare_host() == '/home/testUser/mlair/data/'
@mock.patch("socket.gethostname", return_value="NotExistingHostName")
@mock.patch("getpass.getuser", return_value="zombie21")
def test_prepare_host_unknown(self, mock_user, mock_host):
assert prepare_host() == os.path.join(os.path.abspath(os.getcwd()), 'data', 'daily')
assert prepare_host() == os.path.join(os.path.abspath(os.getcwd()), 'data')
@mock.patch("getpass.getuser", return_value="zombie21")
@mock.patch("mlair.configuration.path_config.check_path_and_create", side_effect=PermissionError)
......@@ -42,13 +41,13 @@ class TestPrepareHost:
# assert "does not exist for host 'linux-aa9b'" in e.value.args[0]
assert PyTestRegex(r"path '.*' does not exist for host '.*'\.") == e.value.args[0]
@mock.patch("socket.gethostname", side_effect=["linux-aa9b"])
@mock.patch("socket.gethostname", side_effect=["zam347"])
@mock.patch("getpass.getuser", return_value="testUser")
@mock.patch("os.path.exists", return_value=False)
@mock.patch("os.makedirs", side_effect=None)
def test_os_path_exists(self, mock_host, mock_user, mock_path, mock_check):
path = prepare_host()
assert path == "/home/testUser/mlair/data/toar_daily/"
assert path == "/home/testUser/Data/toar_daily/"
class TestSetExperimentName:
......@@ -80,12 +79,12 @@ class TestSetBootstrapPath:
@mock.patch("os.makedirs", side_effect=None)
def test_bootstrap_path_is_none(self, mock_makedir):
bootstrap_path = set_bootstrap_path(None, 'TestDataPath/', 'daily')
assert bootstrap_path == os.path.abspath('TestDataPath/../bootstrap_daily')
bootstrap_path = set_bootstrap_path(None, 'TestDataPath/')
assert bootstrap_path == os.path.abspath('TestDataPath/bootstrap')
@mock.patch("os.makedirs", side_effect=None)
def test_bootstap_path_is_given(self, mock_makedir):
bootstrap_path = set_bootstrap_path('Test/path/to/boots', None, None)
bootstrap_path = set_bootstrap_path('Test/path/to/boots', None)
assert bootstrap_path == os.path.abspath('./Test/path/to/boots')
......
......@@ -125,7 +125,8 @@ class TestTraining:
@pytest.fixture
def data_collection(self, path, window_history_size, window_lead_time, statistics_per_var):
data_prep = DefaultDataHandler.build(['DEBW107'], data_path=os.path.join(os.path.dirname(__file__), 'data'),
data_prep = DefaultDataHandler.build(['DEBW107'], data_path=os.path.join(path, 'data'),
experiment_path=os.path.join(path, 'exp_path'),
statistics_per_var=statistics_per_var, station_type="background",
network="AIRBASE", sampling="daily", target_dim="variables",
target_var="o3", time_dim="datetime",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment