Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
M
MLAir
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
76
Issues
76
List
Boards
Labels
Service Desk
Milestones
Merge Requests
1
Merge Requests
1
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Operations
Operations
Metrics
Incidents
Environments
Packages & Registries
Packages & Registries
Container Registry
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
toar
MLAir
Commits
af182919
Commit
af182919
authored
Nov 17, 2020
by
lukas leufen
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
use data_origin to specify if data is from REA or observation
parent
5c58d438
Pipeline
#52280
passed with stages
in 7 minutes and 34 seconds
Changes
7
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
82 additions
and
21 deletions
+82
-21
mlair/configuration/defaults.py
mlair/configuration/defaults.py
+3
-0
mlair/data_handler/data_handler_kz_filter.py
mlair/data_handler/data_handler_kz_filter.py
+1
-1
mlair/data_handler/data_handler_mixed_sampling.py
mlair/data_handler/data_handler_mixed_sampling.py
+1
-1
mlair/data_handler/data_handler_single_station.py
mlair/data_handler/data_handler_single_station.py
+9
-8
mlair/helpers/join.py
mlair/helpers/join.py
+31
-7
mlair/run_modules/experiment_setup.py
mlair/run_modules/experiment_setup.py
+3
-2
test/test_join.py
test/test_join.py
+34
-2
No files found.
mlair/configuration/defaults.py
View file @
af182919
...
...
@@ -50,6 +50,9 @@ DEFAULT_PLOT_LIST = ["PlotMonthlySummary", "PlotStationMap", "PlotClimatological
"PlotCompetitiveSkillScore"
,
"PlotBootstrapSkillScore"
,
"PlotConditionalQuantiles"
,
"PlotAvailability"
]
DEFAULT_SAMPLING
=
"daily"
DEFAULT_DATA_ORIGIN
=
{
"cloudcover"
:
"REA"
,
"humidity"
:
"REA"
,
"pblheight"
:
"REA"
,
"press"
:
"REA"
,
"relhum"
:
"REA"
,
"temp"
:
"REA"
,
"totprecip"
:
"REA"
,
"u"
:
"REA"
,
"v"
:
"REA"
,
"no"
:
""
,
"no2"
:
""
,
"o3"
:
""
,
"pm10"
:
""
,
"so2"
:
""
}
def
get_defaults
():
...
...
mlair/data_handler/data_handler_kz_filter.py
View file @
af182919
...
...
@@ -42,7 +42,7 @@ class DataHandlerKzFilterSingleStation(DataHandlerSingleStation):
Setup samples. This method prepares and creates samples X, and labels Y.
"""
data
,
self
.
meta
=
self
.
load_data
(
self
.
path
,
self
.
station
,
self
.
statistics_per_var
,
self
.
sampling
,
self
.
station_type
,
self
.
network
,
self
.
store_data_locally
)
self
.
station_type
,
self
.
network
,
self
.
store_data_locally
,
self
.
data_origin
)
self
.
_data
=
self
.
interpolate
(
data
,
dim
=
self
.
time_dim
,
method
=
self
.
interpolation_method
,
limit
=
self
.
interpolation_limit
)
self
.
set_inputs_and_targets
()
...
...
mlair/data_handler/data_handler_mixed_sampling.py
View file @
af182919
...
...
@@ -37,7 +37,7 @@ class DataHandlerMixedSamplingSingleStation(DataHandlerSingleStation):
def
load_and_interpolate
(
self
,
ind
)
->
[
xr
.
DataArray
,
pd
.
DataFrame
]:
data
,
self
.
meta
=
self
.
load_data
(
self
.
path
[
ind
],
self
.
station
,
self
.
statistics_per_var
,
self
.
sampling
[
ind
],
self
.
station_type
,
self
.
network
,
self
.
store_data_locally
)
self
.
station_type
,
self
.
network
,
self
.
store_data_locally
,
self
.
data_origin
)
data
=
self
.
interpolate
(
data
,
dim
=
self
.
time_dim
,
method
=
self
.
interpolation_method
,
limit
=
self
.
interpolation_limit
)
return
data
...
...
mlair/data_handler/data_handler_single_station.py
View file @
af182919
...
...
@@ -49,11 +49,12 @@ class DataHandlerSingleStation(AbstractDataHandler):
window_history_size
=
DEFAULT_WINDOW_HISTORY_SIZE
,
window_lead_time
=
DEFAULT_WINDOW_LEAD_TIME
,
interpolation_limit
:
int
=
0
,
interpolation_method
:
str
=
DEFAULT_INTERPOLATION_METHOD
,
overwrite_local_data
:
bool
=
False
,
transformation
=
None
,
store_data_locally
:
bool
=
True
,
min_length
:
int
=
0
,
start
=
None
,
end
=
None
,
variables
=
None
,
**
kwargs
):
min_length
:
int
=
0
,
start
=
None
,
end
=
None
,
variables
=
None
,
data_origin
:
Dict
=
None
,
**
kwargs
):
super
().
__init__
()
# path, station, statistics_per_var, transformation, **kwargs)
self
.
station
=
helpers
.
to_list
(
station
)
self
.
path
=
self
.
setup_data_path
(
data_path
,
sampling
)
self
.
statistics_per_var
=
statistics_per_var
self
.
data_origin
=
data_origin
self
.
do_transformation
=
transformation
is
not
None
self
.
input_data
,
self
.
target_data
=
self
.
setup_transformation
(
transformation
)
...
...
@@ -142,7 +143,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
Setup samples. This method prepares and creates samples X, and labels Y.
"""
data
,
self
.
meta
=
self
.
load_data
(
self
.
path
,
self
.
station
,
self
.
statistics_per_var
,
self
.
sampling
,
self
.
station_type
,
self
.
network
,
self
.
store_data_locally
)
self
.
station_type
,
self
.
network
,
self
.
store_data_locally
,
self
.
data_origin
)
self
.
_data
=
self
.
interpolate
(
data
,
dim
=
self
.
time_dim
,
method
=
self
.
interpolation_method
,
limit
=
self
.
interpolation_limit
)
self
.
set_inputs_and_targets
()
...
...
@@ -163,7 +164,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
self
.
remove_nan
(
self
.
time_dim
)
def
load_data
(
self
,
path
,
station
,
statistics_per_var
,
sampling
,
station_type
=
None
,
network
=
None
,
store_data_locally
=
False
):
store_data_locally
=
False
,
data_origin
:
Dict
=
None
):
"""
Load data and meta data either from local disk (preferred) or download new data by using a custom download method.
...
...
@@ -182,7 +183,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
os
.
remove
(
meta_file
)
data
,
meta
=
self
.
download_data
(
file_name
,
meta_file
,
station
,
statistics_per_var
,
sampling
,
station_type
=
station_type
,
network
=
network
,
store_data_locally
=
store_data_locally
)
store_data_locally
=
store_data_locally
,
data_origin
=
data_origin
)
logging
.
debug
(
f
"loaded new data"
)
else
:
try
:
...
...
@@ -196,7 +197,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
logging
.
debug
(
f
"load new data"
)
data
,
meta
=
self
.
download_data
(
file_name
,
meta_file
,
station
,
statistics_per_var
,
sampling
,
station_type
=
station_type
,
network
=
network
,
store_data_locally
=
store_data_locally
)
store_data_locally
=
store_data_locally
,
data_origin
=
data_origin
)
logging
.
debug
(
"loading finished"
)
# create slices and check for negative concentration.
data
=
self
.
_slice_prep
(
data
)
...
...
@@ -205,8 +206,8 @@ class DataHandlerSingleStation(AbstractDataHandler):
@
staticmethod
def
download_data_from_join
(
file_name
:
str
,
meta_file
:
str
,
station
,
statistics_per_var
,
sampling
,
station_type
=
None
,
network
=
None
,
store_data_locally
=
True
)
->
[
xr
.
DataArray
,
pd
.
DataFrame
]:
station_type
=
None
,
network
=
None
,
store_data_locally
=
True
,
data_origin
:
Dict
=
None
)
\
->
[
xr
.
DataArray
,
pd
.
DataFrame
]:
"""
Download data from TOAR database using the JOIN interface.
...
...
@@ -220,7 +221,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
"""
df_all
=
{}
df
,
meta
=
join
.
download_join
(
station_name
=
station
,
stat_var
=
statistics_per_var
,
station_type
=
station_type
,
network_name
=
network
,
sampling
=
sampling
)
network_name
=
network
,
sampling
=
sampling
,
data_origin
=
data_origin
)
df_all
[
station
[
0
]]
=
df
# convert df_all to xarray
xarr
=
{
k
:
xr
.
DataArray
(
v
,
dims
=
[
'datetime'
,
'variables'
])
for
k
,
v
in
df_all
.
items
()}
...
...
mlair/helpers/join.py
View file @
af182919
...
...
@@ -23,7 +23,8 @@ class EmptyQueryResult(Exception):
def
download_join
(
station_name
:
Union
[
str
,
List
[
str
]],
stat_var
:
dict
,
station_type
:
str
=
None
,
network_name
:
str
=
None
,
sampling
:
str
=
"daily"
)
->
[
pd
.
DataFrame
,
pd
.
DataFrame
]:
network_name
:
str
=
None
,
sampling
:
str
=
"daily"
,
data_origin
:
Dict
=
None
)
->
[
pd
.
DataFrame
,
pd
.
DataFrame
]:
"""
Read data from JOIN/TOAR.
...
...
@@ -32,6 +33,8 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t
:param station_type: set the station type like "traffic" or "background", can be none
:param network_name: set the measurement network like "UBA" or "AIRBASE", can be none
:param sampling: sampling rate of the downloaded data, either set to daily or hourly (default daily)
:param data_origin: additional dictionary to specify data origin as key (for variable) value (origin) pair. Valid
origins are "REA" for reanalysis data and "" (empty string) for observational data.
:returns: data frame with all variables and statistics and meta data frame with all meta information
"""
...
...
@@ -42,7 +45,7 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t
join_url_base
,
headers
=
join_settings
(
sampling
)
# load series information
vars_dict
=
load_series_information
(
station_name
,
station_type
,
network_name
,
join_url_base
,
headers
)
vars_dict
=
load_series_information
(
station_name
,
station_type
,
network_name
,
join_url_base
,
headers
,
data_origin
)
# correct stat_var values if data is not aggregated (hourly)
if
sampling
==
"hourly"
:
...
...
@@ -123,7 +126,7 @@ def get_data(opts: Dict, headers: Dict) -> Union[Dict, List]:
def
load_series_information
(
station_name
:
List
[
str
],
station_type
:
str_or_none
,
network_name
:
str_or_none
,
join_url_base
:
str
,
headers
:
Dict
)
->
Dict
:
join_url_base
:
str
,
headers
:
Dict
,
data_origin
:
Dict
=
None
)
->
Dict
:
"""
List all series ids that are available for given station id and network name.
...
...
@@ -132,15 +135,36 @@ def load_series_information(station_name: List[str], station_type: str_or_none,
:param network_name: measurement network of the station like "UBA" or "AIRBASE"
:param join_url_base: base url name to download data from
:param headers: additional headers information like authorization, can be empty
:param data_origin: additional information to select a distinct series e.g. from reanalysis (REA) or from observation
("", empty string). This dictionary should contain a key for each variable and the information as key
:return: all available series for requested station stored in an dictionary with parameter name (variable) as key
and the series id as value.
"""
opts
=
{
"base"
:
join_url_base
,
"service"
:
"series"
,
"station_id"
:
station_name
[
0
],
"station_type"
:
station_type
,
"network_name"
:
network_name
}
opts
=
{
"base"
:
join_url_base
,
"service"
:
"search"
,
"station_id"
:
station_name
[
0
],
"station_type"
:
station_type
,
"network_name"
:
network_name
,
"as_dict"
:
"true"
,
"columns"
:
"id,network_name,station_id,parameter_name,parameter_label,parameter_attribute"
}
station_vars
=
get_data
(
opts
,
headers
)
logging
.
debug
(
f
"
{
station_name
}
:
{
station_vars
}
"
)
# ToDo start here for #206
vars_dict
=
{
item
[
3
].
lower
():
item
[
0
]
for
item
in
station_vars
}
return
vars_dict
return
_select_distinct_series
(
station_vars
,
data_origin
)
def
_select_distinct_series
(
vars
:
List
[
Dict
],
data_origin
:
Dict
=
None
):
"""
Select distinct series ids for all variables. Also check if a parameter is from REA or not.
"""
if
data_origin
is
None
:
data_origin
=
{
"cloudcover"
:
"REA"
,
"humidity"
:
"REA"
,
"pblheight"
:
"REA"
,
"press"
:
"REA"
,
"relhum"
:
"REA"
,
"temp"
:
"REA"
,
"totprecip"
:
"REA"
,
"u"
:
"REA"
,
"v"
:
"REA"
,
"no"
:
""
,
"no2"
:
""
,
"o3"
:
""
,
"pm10"
:
""
,
"so2"
:
""
}
# ToDo: maybe press, wdir, wspeed from obs? or also temp, ... ?
selected
=
{}
for
var
in
vars
:
name
=
var
[
"parameter_name"
].
lower
()
var_attr
=
var
[
"parameter_attribute"
].
lower
()
attr
=
data_origin
.
get
(
name
,
""
).
lower
()
if
var_attr
==
attr
:
selected
[
name
]
=
var
[
"id"
]
return
selected
def
_save_to_pandas
(
df
:
Union
[
pd
.
DataFrame
,
None
],
data
:
dict
,
stat
:
str
,
var
:
str
)
->
pd
.
DataFrame
:
...
...
mlair/run_modules/experiment_setup.py
View file @
af182919
...
...
@@ -17,7 +17,7 @@ from mlair.configuration.defaults import DEFAULT_STATIONS, DEFAULT_VAR_ALL_DICT,
DEFAULT_TRAIN_START
,
DEFAULT_TRAIN_END
,
DEFAULT_TRAIN_MIN_LENGTH
,
DEFAULT_VAL_START
,
DEFAULT_VAL_END
,
\
DEFAULT_VAL_MIN_LENGTH
,
DEFAULT_TEST_START
,
DEFAULT_TEST_END
,
DEFAULT_TEST_MIN_LENGTH
,
DEFAULT_TRAIN_VAL_MIN_LENGTH
,
\
DEFAULT_USE_ALL_STATIONS_ON_ALL_DATA_SETS
,
DEFAULT_EVALUATE_BOOTSTRAPS
,
DEFAULT_CREATE_NEW_BOOTSTRAPS
,
\
DEFAULT_NUMBER_OF_BOOTSTRAPS
,
DEFAULT_PLOT_LIST
,
DEFAULT_SAMPLING
DEFAULT_NUMBER_OF_BOOTSTRAPS
,
DEFAULT_PLOT_LIST
,
DEFAULT_SAMPLING
,
DEFAULT_DATA_ORIGIN
from
mlair.data_handler
import
DefaultDataHandler
from
mlair.run_modules.run_environment
import
RunEnvironment
from
mlair.model_modules.model_class
import
MyLittleModel
as
VanillaModel
...
...
@@ -226,7 +226,7 @@ class ExperimentSetup(RunEnvironment):
number_of_bootstraps
=
None
,
create_new_bootstraps
=
None
,
data_path
:
str
=
None
,
batch_path
:
str
=
None
,
login_nodes
=
None
,
hpc_hosts
=
None
,
model
=
None
,
batch_size
=
None
,
epochs
=
None
,
data_handler
=
None
,
sampling_inputs
=
None
,
sampling_outputs
=
None
,
**
kwargs
):
sampling_outputs
=
None
,
data_origin
:
Dict
=
None
,
**
kwargs
):
# create run framework
super
().
__init__
()
...
...
@@ -288,6 +288,7 @@ class ExperimentSetup(RunEnvironment):
self
.
_set_param
(
"stations"
,
stations
,
default
=
DEFAULT_STATIONS
,
apply
=
helpers
.
to_list
)
self
.
_set_param
(
"statistics_per_var"
,
statistics_per_var
,
default
=
DEFAULT_VAR_ALL_DICT
)
self
.
_set_param
(
"variables"
,
variables
,
default
=
list
(
self
.
data_store
.
get
(
"statistics_per_var"
).
keys
()))
self
.
_set_param
(
"data_origin"
,
data_origin
,
default
=
DEFAULT_DATA_ORIGIN
)
self
.
_set_param
(
"start"
,
start
,
default
=
DEFAULT_START
)
self
.
_set_param
(
"end"
,
end
,
default
=
DEFAULT_END
)
self
.
_set_param
(
"window_history_size"
,
window_history_size
,
default
=
DEFAULT_WINDOW_HISTORY_SIZE
)
...
...
test/test_join.py
View file @
af182919
...
...
@@ -3,7 +3,7 @@ from typing import Iterable
import
pytest
from
mlair.helpers.join
import
*
from
mlair.helpers.join
import
_save_to_pandas
,
_correct_stat_name
,
_lower_list
from
mlair.helpers.join
import
_save_to_pandas
,
_correct_stat_name
,
_lower_list
,
_select_distinct_series
from
mlair.configuration.join_settings
import
join_settings
...
...
@@ -52,7 +52,7 @@ class TestGetData:
class
TestLoadSeriesInformation
:
def
test_standard_query
(
self
):
expected_subset
=
{
'o3'
:
23031
,
'no2'
:
39002
,
'temp
--lubw'
:
17059
,
'wspeed'
:
17060
}
expected_subset
=
{
'o3'
:
23031
,
'no2'
:
39002
,
'temp
'
:
85584
,
'wspeed'
:
17060
}
assert
expected_subset
.
items
()
<=
load_series_information
([
'DEBW107'
],
None
,
None
,
join_settings
()[
0
],
{}).
items
()
...
...
@@ -60,6 +60,38 @@ class TestLoadSeriesInformation:
assert
load_series_information
([
'DEBW107'
],
"traffic"
,
None
,
join_settings
()[
0
],
{})
==
{}
class
TestSelectDistinctSeries
:
@
pytest
.
fixture
def
vars
(
self
):
return
[{
'id'
:
16686
,
'network_name'
:
'UBA'
,
'station_id'
:
'DENW053'
,
'parameter_name'
:
'no2'
,
'parameter_label'
:
'NO2'
,
'parameter_attribute'
:
''
},
{
'id'
:
16687
,
'network_name'
:
'UBA'
,
'station_id'
:
'DENW053'
,
'parameter_name'
:
'o3'
,
'parameter_label'
:
'O3'
,
'parameter_attribute'
:
''
},
{
'id'
:
16692
,
'network_name'
:
'UBA'
,
'station_id'
:
'DENW053'
,
'parameter_name'
:
'press'
,
'parameter_label'
:
'PRESS--LANUV'
,
'parameter_attribute'
:
''
},
{
'id'
:
16693
,
'network_name'
:
'UBA'
,
'station_id'
:
'DENW053'
,
'parameter_name'
:
'temp'
,
'parameter_label'
:
'TEMP--LANUV'
,
'parameter_attribute'
:
''
},
{
'id'
:
54036
,
'network_name'
:
'UBA'
,
'station_id'
:
'DENW053'
,
'parameter_name'
:
'cloudcover'
,
'parameter_label'
:
'CLOUDCOVER'
,
'parameter_attribute'
:
'REA'
},
{
'id'
:
88491
,
'network_name'
:
'UBA'
,
'station_id'
:
'DENW053'
,
'parameter_name'
:
'temp'
,
'parameter_label'
:
'TEMP-REA-MIUB'
,
'parameter_attribute'
:
'REA'
},
{
'id'
:
102660
,
'network_name'
:
'UBA'
,
'station_id'
:
'DENW053'
,
'parameter_name'
:
'press'
,
'parameter_label'
:
'PRESS-REA-MIUB'
,
'parameter_attribute'
:
'REA'
}]
def
test_no_origin_given
(
self
,
vars
):
res
=
_select_distinct_series
(
vars
)
assert
res
==
{
"no2"
:
16686
,
"o3"
:
16687
,
"cloudcover"
:
54036
,
"temp"
:
88491
,
"press"
:
102660
}
def
test_different_origins
(
self
,
vars
):
origin
=
{
"no2"
:
"test"
,
"temp"
:
""
,
"cloudcover"
:
"REA"
}
res
=
_select_distinct_series
(
vars
,
data_origin
=
origin
)
assert
res
==
{
"o3"
:
16687
,
"press"
:
16692
,
"temp"
:
16693
,
"cloudcover"
:
54036
}
res
=
_select_distinct_series
(
vars
,
data_origin
=
{})
assert
res
==
{
"no2"
:
16686
,
"o3"
:
16687
,
"press"
:
16692
,
"temp"
:
16693
}
class
TestSaveToPandas
:
@
staticmethod
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment