Commit 0e8e07f6 authored by lukas leufen

Merge branch 'lukas_issue164_feat_parallel-station-check' into 'develop'

Resolve "Parallel station check"

See merge request !195
parents 60048b9d f74de303
Pipeline #52947 passed with stages in 12 minutes and 46 seconds
@@ -42,6 +42,13 @@ ehthumbs.db
Thumbs.db
.idea/
/venv/
/venv*/
/build/
# ignore HPC related scripts #
##############################
run_*_develgpus.bash
run_*_gpus.bash
# don't check data and plot folder #
####################################
...
@@ -6,6 +6,8 @@ __date__ = '2019-11-25'
import logging
import os
from typing import Tuple
import multiprocessing
import requests
import numpy as np
import pandas as pd
@@ -201,6 +203,50 @@ class PreProcessing(RunEnvironment):
        Valid means that there is data available for the given time range (which is included in `kwargs`). The shape
        and the loading time are logged in debug mode.

        :return: Corrected list containing only valid station IDs.
        """
        t_outer = TimeTracking()
        logging.info(f"check valid stations started ({set_name if set_name is not None else 'all'})")
        # calculate transformation using train data
        if set_name == "train":
            logging.info("setup transformation using train data exclusively")
            self.transformation(data_handler, set_stations)
        # start station check
        collection = DataCollection()
        valid_stations = []
        kwargs = self.data_store.create_args_dict(data_handler.requirements(), scope=set_name)
        if multiprocessing.cpu_count() > 1:  # parallel solution
            logging.info("use parallel validate station approach")
            pool = multiprocessing.Pool()
            output = [
                pool.apply_async(f_proc, args=(data_handler, station, set_name, store_processed_data), kwds=kwargs)
                for station in set_stations]
            for p in output:
                dh, s = p.get()
                if dh is not None:
                    collection.add(dh)
                    valid_stations.append(s)
        else:  # serial solution
            logging.info("use serial validate station approach")
            for station in set_stations:
                dh, s = f_proc(data_handler, station, set_name, store_processed_data, **kwargs)
                if dh is not None:
                    collection.add(dh)
                    valid_stations.append(s)
        logging.info(f"run for {t_outer} to check {len(set_stations)} station(s). Found {len(collection)}/"
                     f"{len(set_stations)} valid stations.")
        return collection, valid_stations
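
The parallel branch above fans work out with multiprocessing.Pool.apply_async, one task per station, and then collects the (data handler, station) tuples via get(). A minimal, self-contained sketch of that pattern follows; the worker function and station IDs are placeholders, not MLAir code:

import multiprocessing


def check_station(station):
    # placeholder worker: return (result, station), or (None, station) on failure
    try:
        return f"data handler for {station}", station
    except Exception:  # MLAir catches a specific tuple of exceptions instead
        return None, station


if __name__ == "__main__":
    stations = ["DEBW107", "DEBY081", "DEBW013"]  # hypothetical station IDs
    if multiprocessing.cpu_count() > 1:  # parallel branch
        with multiprocessing.Pool() as pool:
            output = [pool.apply_async(check_station, args=(s,)) for s in stations]
            results = [p.get() for p in output]
    else:  # serial fallback
        results = [check_station(s) for s in stations]
    valid = [s for res, s in results if res is not None]
    print(f"{len(valid)}/{len(stations)} valid stations: {valid}")

The sketch uses the pool as a context manager so worker processes are released once all results are collected; the merged code above creates the pool without an explicit close/join.
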
    def validate_station_old(self, data_handler: AbstractDataHandler, set_stations, set_name=None,
                             store_processed_data=True):
        """
        Check if all given stations in `all_stations` are valid.

        Valid means that there is data available for the given time range (which is included in `kwargs`). The shape
        and the loading time are logged in debug mode.

        :return: Corrected list containing only valid station IDs.
        """
        t_outer = TimeTracking()
@@ -231,3 +277,18 @@ class PreProcessing(RunEnvironment):
        transformation_dict = data_handler.transformation(stations, **kwargs)
        if transformation_dict is not None:
            self.data_store.set("transformation", transformation_dict)
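
For context, the train-only transformation mentioned earlier follows the usual fit-on-train, apply-everywhere pattern. A generic sketch of that idea with standardisation statistics and random placeholder data (this is not MLAir's actual transformation code):

import numpy as np


def fit_transformation(train_data):
    # statistics are computed on the training split only
    return {"mean": train_data.mean(axis=0), "std": train_data.std(axis=0)}


def apply_transformation(data, transformation):
    return (data - transformation["mean"]) / transformation["std"]


rng = np.random.default_rng(0)
train = rng.normal(2.0, 3.0, size=(100, 4))
test = rng.normal(2.0, 3.0, size=(20, 4))
stats = fit_transformation(train)
print(apply_transformation(test, stats).mean(axis=0))  # roughly centred around 0
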

def f_proc(data_handler, station, name_affix, store, **kwargs):
    """
    Try to create a data handler for the given arguments. If the build fails, this station does not fulfil all
    requirements and therefore f_proc returns None as an indication. On a successful build, f_proc returns the built
    data handler and the station that was used. This function must be implemented globally (at module level) to work
    together with multiprocessing.
    """
    try:
        res = data_handler.build(station, name_affix=name_affix, store_processed_data=store,
                                 **kwargs)
    except (AttributeError, EmptyQueryResult, KeyError, requests.ConnectionError) as e:
        logging.info(f"remove station {station} because it raised an error: {e}")
        res = None
    return res, station
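
One detail worth spelling out: multiprocessing ships the worker callable to the child processes by pickling it, and only module-level functions can be pickled, which is why f_proc is defined globally rather than nested inside validate_station. A small illustration of that constraint (not MLAir code):

import multiprocessing
import pickle


def module_level_worker(x):
    return x * 2


def main():
    def nested_worker(x):  # defined inside another function, therefore not picklable
        return x * 2

    pickle.dumps(module_level_worker)  # works
    try:
        pickle.dumps(nested_worker)
    except (pickle.PicklingError, AttributeError) as err:
        print(f"cannot pickle nested function: {err}")

    with multiprocessing.Pool(2) as pool:
        print(pool.map(module_level_worker, [1, 2, 3]))  # [2, 4, 6]


if __name__ == "__main__":
    main()
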
@@ -6,6 +6,17 @@ import argparse
from mlair.workflows import DefaultWorkflow

def load_stations():
    import json
    try:
        filename = 'supplement/station_list_north_german_plain.json'
        with open(filename, 'r') as jfile:
            stations = json.load(jfile)
    except FileNotFoundError:
        stations = None
    return stations

def main(parser_args):
    workflow = DefaultWorkflow(sampling="hourly", window_history_size=48, **parser_args.__dict__)
...
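
The station list that follows is presumably the content of the new supplement/station_list_north_german_plain.json read by load_stations above. A usage sketch of how the loaded list could be fed into the workflow; whether DefaultWorkflow accepts a stations keyword is an assumption here, since the relevant call is truncated in this diff:

# hypothetical wiring, not shown in the diff
stations = load_stations()
if stations is None:
    stations = ["DEBW107", "DEBY081"]  # placeholder fallback list
workflow = DefaultWorkflow(stations=stations, sampling="hourly", window_history_size=48)
workflow.run()
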
[
"DENI031",
"DESH016",
"DEBB050",
"DEHH022",
"DEHH049",
"DEHH021",
"DEMV007",
"DESH015",
"DEBE062",
"DEHH012",
"DESH004",
"DENI062",
"DEBE051",
"DEHH011",
"DEHH023",
"DEUB020",
"DESH005",
"DEBB039",
"DEHH050",
"DENI029",
"DESH001",
"DEBE001",
"DEHH030",
"DEHH018",
"DEUB022",
"DEBB038",
"DEBB053",
"DEMV017",
"DENI063",
"DENI058",
"DESH014",
"DEUB007",
"DEUB005",
"DEBB051",
"DEUB034",
"DEST089",
"DEHH005",
"DESH003",
"DEUB028",
"DESH017",
"DEUB030",
"DEMV012",
"DENI052",
"DENI059",
"DENI060",
"DESH013",
"DEUB006",
"DEMV018",
"DEUB027",
"DEUB026",
"DEUB038",
"DEMV001",
"DEUB024",
"DEUB037",
"DESH008",
"DEMV004",
"DEUB040",
"DEMV024",
"DEMV026",
"DESH056",
"DEHH063",
"DEUB001",
"DEST069",
"DEBB040",
"DEBB028",
"DEBB048",
"DEBB063",
"DEBB067",
"DESH006",
"DEBE008",
"DESH012",
"DEHH004",
"DEBE009",
"DEHH007",
"DEBE005",
"DEHH057",
"DEHH047",
"DEBE006",
"DEBB110"
]
import logging
import pytest
import mock
from mlair.data_handler import DefaultDataHandler, DataCollection, AbstractDataHandler
from mlair.helpers.datastore import NameNotFoundInScope
@@ -34,7 +35,8 @@ class TestPreProcessing:
        yield pre
        RunEnvironment().__del__()

    def test_init(self, caplog):
    @mock.patch("multiprocessing.cpu_count", return_value=1)
    def test_init(self, mock_cpu, caplog):
        ExperimentSetup(stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087'],
                        statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'})
        caplog.clear()
...
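
The new mock.patch pins multiprocessing.cpu_count to 1, so this test keeps exercising the serial branch of validate_station. A minimal, standalone illustration of how patching cpu_count steers the branch selection (not MLAir test code):

import multiprocessing
from unittest import mock


def uses_parallel_branch():
    # mirrors the check in validate_station
    return multiprocessing.cpu_count() > 1


@mock.patch("multiprocessing.cpu_count", return_value=1)
def demo_serial(mock_cpu):
    assert uses_parallel_branch() is False


@mock.patch("multiprocessing.cpu_count", return_value=4)
def demo_parallel(mock_cpu):
    assert uses_parallel_branch() is True


demo_serial()
demo_parallel()
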