Commit 923775d4 authored by lukas leufen

Merge branch 'lukas_issue206_refac_inconsistent-variable-naming' into 'develop'

Resolve "handle inconsistent naming in data source"

See merge request !185
parents 45a48051 758a8616
Pipeline #52383 canceled with stages in 3 minutes and 12 seconds
@@ -50,6 +50,9 @@ DEFAULT_PLOT_LIST = ["PlotMonthlySummary", "PlotStationMap", "PlotClimatological
                      "PlotCompetitiveSkillScore", "PlotBootstrapSkillScore", "PlotConditionalQuantiles",
                      "PlotAvailability", "PlotSeparationOfScales"]
 DEFAULT_SAMPLING = "daily"
+DEFAULT_DATA_ORIGIN = {"cloudcover": "REA", "humidity": "REA", "pblheight": "REA", "press": "REA", "relhum": "REA",
+                       "temp": "REA", "totprecip": "REA", "u": "REA", "v": "REA", "no": "", "no2": "", "o3": "",
+                       "pm10": "", "so2": ""}


 def get_defaults():
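The new DEFAULT_DATA_ORIGIN maps each variable to the TOAR series origin preferred by default: meteorological variables come from reanalysis ("REA"), chemical species from observation (empty string). A minimal sketch of overriding single entries, assuming MLAir is importable; the merge idiom is illustrative and not part of this merge request:

    from mlair.configuration.defaults import DEFAULT_DATA_ORIGIN

    # Request observational temperature instead of reanalysis; all other
    # variables keep their default origin.
    data_origin = {**DEFAULT_DATA_ORIGIN, "temp": ""}

    assert data_origin["temp"] == ""      # observation
    assert data_origin["press"] == "REA"  # still reanalysis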
@@ -40,7 +40,7 @@ class DataHandlerKzFilterSingleStation(DataHandlerSingleStation):
         Setup samples. This method prepares and creates samples X, and labels Y.
         """
         data, self.meta = self.load_data(self.path, self.station, self.statistics_per_var, self.sampling,
-                                         self.station_type, self.network, self.store_data_locally)
+                                         self.station_type, self.network, self.store_data_locally, self.data_origin)
         self._data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method,
                                       limit=self.interpolation_limit)
         self.set_inputs_and_targets()
@@ -37,7 +37,8 @@ class DataHandlerMixedSamplingSingleStation(DataHandlerSingleStation):
     def load_and_interpolate(self, ind) -> [xr.DataArray, pd.DataFrame]:
         data, self.meta = self.load_data(self.path[ind], self.station, self.statistics_per_var, self.sampling[ind],
-                                         self.station_type, self.network, self.store_data_locally, self.start, self.end)
+                                         self.station_type, self.network, self.store_data_locally, self.data_origin,
+                                         self.start, self.end)
         data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method,
                                 limit=self.interpolation_limit)
         return data
@@ -110,7 +111,8 @@ class DataHandlerMixedSamplingWithFilterSingleStation(DataHandlerMixedSamplingSi
         start, end = self.start, self.end
         data, self.meta = self.load_data(self.path[ind], self.station, self.statistics_per_var, self.sampling[ind],
-                                         self.station_type, self.network, self.store_data_locally, start, end)
+                                         self.station_type, self.network, self.store_data_locally, self.data_origin,
+                                         start, end)
         data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method,
                                 limit=self.interpolation_limit)
         return data
@@ -49,11 +49,12 @@ class DataHandlerSingleStation(AbstractDataHandler):
                  window_history_size=DEFAULT_WINDOW_HISTORY_SIZE, window_lead_time=DEFAULT_WINDOW_LEAD_TIME,
                  interpolation_limit: int = 0, interpolation_method: str = DEFAULT_INTERPOLATION_METHOD,
                  overwrite_local_data: bool = False, transformation=None, store_data_locally: bool = True,
-                 min_length: int = 0, start=None, end=None, variables=None, **kwargs):
+                 min_length: int = 0, start=None, end=None, variables=None, data_origin: Dict = None, **kwargs):
         super().__init__()  # path, station, statistics_per_var, transformation, **kwargs)
         self.station = helpers.to_list(station)
         self.path = self.setup_data_path(data_path, sampling)
         self.statistics_per_var = statistics_per_var
+        self.data_origin = data_origin
         self.do_transformation = transformation is not None
         self.input_data, self.target_data = self.setup_transformation(transformation)
@@ -142,7 +143,8 @@ class DataHandlerSingleStation(AbstractDataHandler):
         Setup samples. This method prepares and creates samples X, and labels Y.
         """
         data, self.meta = self.load_data(self.path, self.station, self.statistics_per_var, self.sampling,
-                                         self.station_type, self.network, self.store_data_locally, self.start, self.end)
+                                         self.station_type, self.network, self.store_data_locally, self.data_origin,
+                                         self.start, self.end)
         self._data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method,
                                       limit=self.interpolation_limit)
         self.set_inputs_and_targets()
@@ -163,7 +165,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
         self.remove_nan(self.time_dim)

     def load_data(self, path, station, statistics_per_var, sampling, station_type=None, network=None,
-                  store_data_locally=False, start=None, end=None):
+                  store_data_locally=False, data_origin: Dict = None, start=None, end=None):
         """
         Load data and meta data either from local disk (preferred) or download new data by using a custom download method.
@@ -182,7 +184,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
                 os.remove(meta_file)
             data, meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling,
                                             station_type=station_type, network=network,
-                                            store_data_locally=store_data_locally)
+                                            store_data_locally=store_data_locally, data_origin=data_origin)
             logging.debug(f"loaded new data")
         else:
             try:
@@ -196,7 +198,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
                 logging.debug(f"load new data")
                 data, meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling,
                                                 station_type=station_type, network=network,
-                                                store_data_locally=store_data_locally)
+                                                store_data_locally=store_data_locally, data_origin=data_origin)
                 logging.debug("loading finished")
         # create slices and check for negative concentration.
         data = self._slice_prep(data, start=start, end=end)
@@ -205,8 +207,8 @@ class DataHandlerSingleStation(AbstractDataHandler):
     @staticmethod
     def download_data_from_join(file_name: str, meta_file: str, station, statistics_per_var, sampling,
-                                station_type=None, network=None, store_data_locally=True) -> [xr.DataArray,
-                                                                                              pd.DataFrame]:
+                                station_type=None, network=None, store_data_locally=True, data_origin: Dict = None) \
+            -> [xr.DataArray, pd.DataFrame]:
         """
         Download data from TOAR database using the JOIN interface.
@@ -220,7 +222,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
         """
         df_all = {}
         df, meta = join.download_join(station_name=station, stat_var=statistics_per_var, station_type=station_type,
-                                      network_name=network, sampling=sampling)
+                                      network_name=network, sampling=sampling, data_origin=data_origin)
         df_all[station[0]] = df
         # convert df_all to xarray
         xarr = {k: xr.DataArray(v, dims=['datetime', 'variables']) for k, v in df_all.items()}
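download_data_from_join now simply forwards the mapping to join.download_join, where the series selection happens. Called directly, the download could look like the following sketch (station, statistics, and origin values are illustrative, not taken from this merge request):

    from mlair.helpers import join

    # One series id per variable is resolved according to data_origin before
    # the actual data download starts.
    df, meta = join.download_join(station_name=["DEBW107"],
                                  stat_var={"o3": "dma8eu", "temp": "maximum"},
                                  sampling="daily",
                                  data_origin={"temp": "REA", "o3": ""})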
@@ -23,7 +23,8 @@ class EmptyQueryResult(Exception):

 def download_join(station_name: Union[str, List[str]], stat_var: dict, station_type: str = None,
-                  network_name: str = None, sampling: str = "daily") -> [pd.DataFrame, pd.DataFrame]:
+                  network_name: str = None, sampling: str = "daily", data_origin: Dict = None) -> [pd.DataFrame,
+                                                                                                   pd.DataFrame]:
     """
     Read data from JOIN/TOAR.
@@ -32,6 +33,8 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t
     :param station_type: set the station type like "traffic" or "background", can be none
     :param network_name: set the measurement network like "UBA" or "AIRBASE", can be none
     :param sampling: sampling rate of the downloaded data, either set to daily or hourly (default daily)
+    :param data_origin: additional dictionary specifying the data origin as (variable, origin) key-value pairs. Valid
+        origins are "REA" for reanalysis data and "" (empty string) for observational data.
     :returns: data frame with all variables and statistics and meta data frame with all meta information
     """
@@ -42,7 +45,7 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t
     join_url_base, headers = join_settings(sampling)

     # load series information
-    vars_dict = load_series_information(station_name, station_type, network_name, join_url_base, headers)
+    vars_dict = load_series_information(station_name, station_type, network_name, join_url_base, headers, data_origin)

     # correct stat_var values if data is not aggregated (hourly)
     if sampling == "hourly":
@@ -123,7 +126,7 @@ def get_data(opts: Dict, headers: Dict) -> Union[Dict, List]:

 def load_series_information(station_name: List[str], station_type: str_or_none, network_name: str_or_none,
-                            join_url_base: str, headers: Dict) -> Dict:
+                            join_url_base: str, headers: Dict, data_origin: Dict = None) -> Dict:
     """
     List all series ids that are available for given station id and network name.
@@ -132,15 +135,36 @@ def load_series_information(station_name: List[str], station_type: str_or_none,
     :param network_name: measurement network of the station like "UBA" or "AIRBASE"
     :param join_url_base: base url name to download data from
     :param headers: additional headers information like authorization, can be empty
+    :param data_origin: additional information to select a distinct series, e.g. from reanalysis ("REA") or from
+        observation ("", empty string). This dictionary should contain a key for each variable and the origin as value.
     :return: all available series for requested station stored in a dictionary with parameter name (variable) as key
         and the series id as value.
     """
-    opts = {"base": join_url_base, "service": "series", "station_id": station_name[0], "station_type": station_type,
-            "network_name": network_name}
+    opts = {"base": join_url_base, "service": "search", "station_id": station_name[0], "station_type": station_type,
+            "network_name": network_name, "as_dict": "true",
+            "columns": "id,network_name,station_id,parameter_name,parameter_label,parameter_attribute"}
     station_vars = get_data(opts, headers)
-    logging.debug(f"{station_name}: {station_vars}")  # ToDo start here for #206
-    vars_dict = {item[3].lower(): item[0] for item in station_vars}
-    return vars_dict
+    return _select_distinct_series(station_vars, data_origin)
+
+
+def _select_distinct_series(vars: List[Dict], data_origin: Dict = None):
+    """
+    Select distinct series ids for all variables. Also check if a parameter is from REA or not.
+    """
+    if data_origin is None:
+        data_origin = {"cloudcover": "REA", "humidity": "REA", "pblheight": "REA", "press": "REA", "relhum": "REA",
+                       "temp": "REA", "totprecip": "REA", "u": "REA", "v": "REA",
+                       "no": "", "no2": "", "o3": "", "pm10": "", "so2": ""}
+        # ToDo: maybe press, wdir, wspeed from obs? or also temp, ... ?
+    selected = {}
+    for var in vars:
+        name = var["parameter_name"].lower()
+        var_attr = var["parameter_attribute"].lower()
+        attr = data_origin.get(name, "").lower()
+        if var_attr == attr:
+            selected[name] = var["id"]
+    return selected


 def _save_to_pandas(df: Union[pd.DataFrame, None], data: dict, stat: str, var: str) -> pd.DataFrame:
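The selection rule in _select_distinct_series: a series survives only if its parameter_attribute equals the origin requested for its variable, with "" (observation) as fallback for variables missing from data_origin; if several series match, the last one wins. A standalone re-implementation of that rule, for illustration only and with invented series records:

    # Toy records in the shape returned by the JOIN "search" service.
    series = [
        {"id": 1, "parameter_name": "temp", "parameter_attribute": ""},     # observation
        {"id": 2, "parameter_name": "temp", "parameter_attribute": "REA"},  # reanalysis
        {"id": 3, "parameter_name": "o3", "parameter_attribute": ""},
    ]

    def select(series, data_origin):
        # Keep a series only if its attribute matches the requested origin.
        selected = {}
        for var in series:
            name = var["parameter_name"].lower()
            if var["parameter_attribute"].lower() == data_origin.get(name, "").lower():
                selected[name] = var["id"]
        return selected

    assert select(series, {"temp": "REA"}) == {"temp": 2, "o3": 3}
    assert select(series, {}) == {"temp": 1, "o3": 3}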
@@ -17,7 +17,7 @@ from mlair.configuration.defaults import DEFAULT_STATIONS, DEFAULT_VAR_ALL_DICT,
     DEFAULT_TRAIN_START, DEFAULT_TRAIN_END, DEFAULT_TRAIN_MIN_LENGTH, DEFAULT_VAL_START, DEFAULT_VAL_END, \
     DEFAULT_VAL_MIN_LENGTH, DEFAULT_TEST_START, DEFAULT_TEST_END, DEFAULT_TEST_MIN_LENGTH, DEFAULT_TRAIN_VAL_MIN_LENGTH, \
     DEFAULT_USE_ALL_STATIONS_ON_ALL_DATA_SETS, DEFAULT_EVALUATE_BOOTSTRAPS, DEFAULT_CREATE_NEW_BOOTSTRAPS, \
-    DEFAULT_NUMBER_OF_BOOTSTRAPS, DEFAULT_PLOT_LIST, DEFAULT_SAMPLING
+    DEFAULT_NUMBER_OF_BOOTSTRAPS, DEFAULT_PLOT_LIST, DEFAULT_SAMPLING, DEFAULT_DATA_ORIGIN
 from mlair.data_handler import DefaultDataHandler
 from mlair.run_modules.run_environment import RunEnvironment
 from mlair.model_modules.model_class import MyLittleModel as VanillaModel
@@ -226,7 +226,7 @@ class ExperimentSetup(RunEnvironment):
                  number_of_bootstraps=None,
                  create_new_bootstraps=None, data_path: str = None, batch_path: str = None, login_nodes=None,
                  hpc_hosts=None, model=None, batch_size=None, epochs=None, data_handler=None, sampling_inputs=None,
-                 sampling_outputs=None, **kwargs):
+                 sampling_outputs=None, data_origin: Dict = None, **kwargs):

         # create run framework
         super().__init__()
@@ -288,6 +288,7 @@ class ExperimentSetup(RunEnvironment):
         self._set_param("stations", stations, default=DEFAULT_STATIONS, apply=helpers.to_list)
         self._set_param("statistics_per_var", statistics_per_var, default=DEFAULT_VAR_ALL_DICT)
         self._set_param("variables", variables, default=list(self.data_store.get("statistics_per_var").keys()))
+        self._set_param("data_origin", data_origin, default=DEFAULT_DATA_ORIGIN)
         self._set_param("start", start, default=DEFAULT_START)
         self._set_param("end", end, default=DEFAULT_END)
         self._set_param("window_history_size", window_history_size, default=DEFAULT_WINDOW_HISTORY_SIZE)
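With data_origin exposed in ExperimentSetup, the origin mapping becomes a per-experiment setting: if it is omitted entirely, DEFAULT_DATA_ORIGIN applies; variables missing from a supplied dictionary default to observation inside _select_distinct_series. A sketch, assuming DefaultWorkflow forwards keyword arguments to ExperimentSetup as in the run script below:

    from mlair.workflows import DefaultWorkflow

    # Prefer observational temperature and pressure for this experiment;
    # all other variables keep the default origin.
    workflow = DefaultWorkflow(stations=["DEBW107", "DEBW013"],
                               data_origin={"temp": "", "press": ""})
    workflow.run()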
@@ -22,6 +22,7 @@ def main(parser_args):
         test_end="2011-12-31",
         stations=["DEBW107", "DEBW013"],
         epochs=100,
+        network="UBA",
     )
     workflow = DefaultWorkflow(**args)
     workflow.run()
@@ -3,7 +3,7 @@ from typing import Iterable

 import pytest

 from mlair.helpers.join import *
-from mlair.helpers.join import _save_to_pandas, _correct_stat_name, _lower_list
+from mlair.helpers.join import _save_to_pandas, _correct_stat_name, _lower_list, _select_distinct_series
 from mlair.configuration.join_settings import join_settings
@@ -52,7 +52,7 @@ class TestGetData:

 class TestLoadSeriesInformation:

     def test_standard_query(self):
-        expected_subset = {'o3': 23031, 'no2': 39002, 'temp--lubw': 17059, 'wspeed': 17060}
+        expected_subset = {'o3': 23031, 'no2': 39002, 'temp': 85584, 'wspeed': 17060}
         assert expected_subset.items() <= load_series_information(['DEBW107'], None, None, join_settings()[0],
                                                                   {}).items()
@@ -60,6 +60,38 @@ class TestLoadSeriesInformation:
         assert load_series_information(['DEBW107'], "traffic", None, join_settings()[0], {}) == {}


+class TestSelectDistinctSeries:
+
+    @pytest.fixture
+    def vars(self):
+        return [{'id': 16686, 'network_name': 'UBA', 'station_id': 'DENW053', 'parameter_name': 'no2',
+                 'parameter_label': 'NO2', 'parameter_attribute': ''},
+                {'id': 16687, 'network_name': 'UBA', 'station_id': 'DENW053', 'parameter_name': 'o3',
+                 'parameter_label': 'O3',
+                 'parameter_attribute': ''},
+                {'id': 16692, 'network_name': 'UBA', 'station_id': 'DENW053', 'parameter_name': 'press',
+                 'parameter_label': 'PRESS--LANUV', 'parameter_attribute': ''},
+                {'id': 16693, 'network_name': 'UBA', 'station_id': 'DENW053', 'parameter_name': 'temp',
+                 'parameter_label': 'TEMP--LANUV', 'parameter_attribute': ''},
+                {'id': 54036, 'network_name': 'UBA', 'station_id': 'DENW053', 'parameter_name': 'cloudcover',
+                 'parameter_label': 'CLOUDCOVER', 'parameter_attribute': 'REA'},
+                {'id': 88491, 'network_name': 'UBA', 'station_id': 'DENW053', 'parameter_name': 'temp',
+                 'parameter_label': 'TEMP-REA-MIUB', 'parameter_attribute': 'REA'},
+                {'id': 102660, 'network_name': 'UBA', 'station_id': 'DENW053', 'parameter_name': 'press',
+                 'parameter_label': 'PRESS-REA-MIUB', 'parameter_attribute': 'REA'}]
+
+    def test_no_origin_given(self, vars):
+        res = _select_distinct_series(vars)
+        assert res == {"no2": 16686, "o3": 16687, "cloudcover": 54036, "temp": 88491, "press": 102660}
+
+    def test_different_origins(self, vars):
+        origin = {"no2": "test", "temp": "", "cloudcover": "REA"}
+        res = _select_distinct_series(vars, data_origin=origin)
+        assert res == {"o3": 16687, "press": 16692, "temp": 16693, "cloudcover": 54036}
+        res = _select_distinct_series(vars, data_origin={})
+        assert res == {"no2": 16686, "o3": 16687, "press": 16692, "temp": 16693}
+
+
 class TestSaveToPandas:

     @staticmethod