Source code for sisppeo.masks.waterdetect.Common

import ast
import configparser
from importlib.resources import path
from pathlib import Path

import numpy as np
from sklearn.model_selection import train_test_split

# Resolve the 'WaterDetect.ini' resource that lives in this package.
# Both `p` and `config_file` stay bound at module level once the block exits;
# `DWConfig._config_file` below reads `p` as its default configuration path.
# NOTE(review): the path is used after the context manager has exited. If the
# package is ever imported from a non-filesystem location (e.g. a zip), the
# resource may only exist as a temporary file inside the `with` - confirm the
# package is always installed unpacked.
with path(__package__, 'WaterDetect.ini') as p:
    config_file = p


class DWConfig:
    """Accessor for the options stored in a WaterDetect ``.ini`` file.

    The file is parsed once, at construction time, with
    :class:`configparser.ConfigParser`. Each option is then exposed as a
    read-only property that looks the value up in the ``Clustering`` section
    and falls back on :attr:`_defaults` when the option cannot be read.
    """

    # `p` is the module-level path of the packaged 'WaterDetect.ini' (bound by
    # the `with path(...)` statement at the top of the module). It is used
    # whenever no explicit configuration file is supplied.
    _config_file = p

    # Fallback values, kept as strings exactly as they would be read from the
    # .ini file; `get_option` literal-evaluates them on request.
    _defaults = {
        'reference_band': 'Red',
        'maximum_invalid': '0.8',
        'create_composite': 'True',
        'pdf_reports': 'False',
        'save_indices': 'False',
        'texture_streching': 'False',
        'clustering_method': 'aglomerative',
        'min_clusters': '1',
        'max_clusters': '5',
        'clip_band': 'None',
        'clip_value': 'None',
        'classifier': 'naive_bayes',
        'train_size': '0.1',
        'min_train_size': '1000',
        'max_train_size': '10000',
        'score_index': 'calinsk',
        'detectwatercluster': 'maxmndwi',
        'clustering_bands': "[['ndwi', 'Nir']]",
        'graphs_bands': "[['mbwi', 'mndwi'], ['ndwi', 'mbwi']]",
        'plot_ts': 'False',
    }

    def __init__(self, config_file=None):
        """Load the configuration.

        :param config_file: path of the .ini file to read; when falsy, the
            class-level default path is used
        """
        self.config = self.load_config_file(config_file)

    def return_defaults(self, section, key):
        """Return the fallback value of ``key`` and report it on stdout.

        :param section: section the key was looked up in (only used in the
            printed message)
        :param key: option name
        :return: the string stored in :attr:`_defaults` for ``key``
        :raises KeyError: if ``key`` has no entry in :attr:`_defaults`
        """
        default_value = self._defaults[key]
        print('Key {} not found in section {}: using default value {}'.format(key, section, default_value))
        return default_value

    def get_option(self, section, key, evaluate: bool):
        """Read an option, falling back on the defaults when it is unreadable.

        :param section: section of the configuration file
        :param key: option name inside ``section``
        :param evaluate: when True, the string is parsed as a Python literal
            (number, list, bool, None...); if it is not a valid literal the
            raw string is returned instead
        :return: the parsed literal, or the raw string
        :raises KeyError: if the option is unreadable and has no default
        """
        try:
            str_value = self.config.get(section, key)
        except configparser.Error:
            # Missing section/option (or a value configparser cannot
            # interpolate): use the built-in default instead.
            str_value = self.return_defaults(section, key)

        if evaluate and isinstance(str_value, str):
            try:
                return ast.literal_eval(str_value)
            except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
                # Not a Python literal (e.g. a bare word): keep the raw text.
                return str_value
        return str_value

    def load_config_file(self, config_file):
        """Parse the configuration file and return the parser.

        :param config_file: path of the .ini file; when falsy, the path
            currently stored in ``_config_file`` is used
        :return: the populated :class:`configparser.ConfigParser`
        :raises OSError: if the file does not exist (raised by
            ``DWutils.check_path``)
        """
        if config_file:
            self._config_file = config_file

        print('Loading configuration file {}'.format(self._config_file))
        DWutils.check_path(self._config_file)

        config = configparser.ConfigParser()
        config.read(self._config_file)
        return config

    @staticmethod
    def _as_list(value):
        """Return ``[]`` for None, ``value`` if it is a list, else ``[value]``."""
        if value is None:
            return []
        return value if isinstance(value, list) else [value]

    @property
    def average_results(self):
        """``average_results`` option of the Clustering section (evaluated)."""
        return self.get_option('Clustering', 'average_results', evaluate=True)

    @property
    def min_positive_pixels(self):
        """``min_positive_pixels`` option of the Clustering section (evaluated)."""
        return self.get_option('Clustering', 'min_positive_pixels', evaluate=True)

    @property
    def clustering_method(self):
        """``clustering_method`` option of the Clustering section (raw string)."""
        return self.get_option('Clustering', 'clustering_method', evaluate=False)

    @property
    def linkage(self):
        """``linkage`` option of the Clustering section (raw string)."""
        return self.get_option('Clustering', 'linkage', evaluate=False)

    @property
    def train_size(self):
        """``train_size`` option of the Clustering section (evaluated)."""
        return self.get_option('Clustering', 'train_size', evaluate=True)

    @property
    def min_train_size(self):
        """``min_train_size`` option of the Clustering section (evaluated)."""
        return self.get_option('Clustering', 'min_train_size', evaluate=True)

    @property
    def max_train_size(self):
        """``max_train_size`` option of the Clustering section (evaluated)."""
        return self.get_option('Clustering', 'max_train_size', evaluate=True)

    @property
    def score_index(self):
        """``score_index`` option of the Clustering section (raw string)."""
        return self.get_option('Clustering', 'score_index', evaluate=False)

    @property
    def classifier(self):
        """``classifier`` option of the Clustering section (raw string)."""
        return self.get_option('Clustering', 'classifier', evaluate=False)

    @property
    def detect_water_cluster(self):
        """``detectwatercluster`` option of the Clustering section (raw string)."""
        return self.get_option('Clustering', 'detectwatercluster', evaluate=False)

    @property
    def min_clusters(self):
        """``min_clusters`` option of the Clustering section (evaluated)."""
        return self.get_option('Clustering', 'min_clusters', evaluate=True)

    @property
    def max_clusters(self):
        """``max_clusters`` option of the Clustering section (evaluated)."""
        return self.get_option('Clustering', 'max_clusters', evaluate=True)

    @property
    def clustering_bands(self):
        """``clustering_bands`` option, always returned as a list of lists."""
        bands_lst = self.get_option('Clustering', 'clustering_bands', evaluate=True)

        # A flat list of band names is a single combination: wrap it.
        if isinstance(bands_lst[0], str):
            bands_lst = [bands_lst]
        return bands_lst

    @property
    def clip_band(self):
        """``clip_band`` option as a list (``[]`` when the option is None)."""
        bands_lst = self.get_option('Clustering', 'clip_band', evaluate=True)

        if isinstance(bands_lst, str):
            return [bands_lst]
        return bands_lst if bands_lst is not None else []

    @property
    def clip_inf_value(self):
        """``clip_inf_value`` option as a list (``[]`` when the option is None)."""
        return self._as_list(self.get_option('Clustering', 'clip_inf_value', evaluate=True))

    @property
    def clip_sup_value(self):
        """``clip_sup_value`` option as a list (``[]`` when the option is None)."""
        return self._as_list(self.get_option('Clustering', 'clip_sup_value', evaluate=True))
class DWutils:
    """Collection of stateless helpers used by the water-detection code."""

    @staticmethod
    def check_path(path_str, is_dir=False):
        """
        Check if the path/file exists and return a Path variable with it.

        :param path_str: path string to test; None is passed through as None
        :param is_dir: whether it must be a directory (True) or just exist
        :return: Path type variable, or None when ``path_str`` is None
        :raises OSError: if the folder/file does not exist
        """
        if path_str is None:
            return None

        # Named `checked` so the imported `importlib.resources.path` helper is
        # not shadowed inside this function.
        checked = Path(path_str)

        if is_dir:
            if not checked.is_dir():
                raise OSError('The specified folder {} does not exist'.format(path_str))
        elif not checked.exists():
            raise OSError('The specified file {} does not exist'.format(path_str))

        print(('Folder' if is_dir else 'File') + ' {} verified.'.format(path_str))
        return checked

    @staticmethod
    def calc_normalized_difference(img1, img2, mask=None):
        """
        Calc the normalized difference of given arrays (img1 - img2)/(img1 + img2).

        When the smallest valid value of the two arrays is zero or negative,
        both arrays are shifted upwards so that this minimum becomes 0.001
        before the ratio is computed (small upward shift as proposed by
        Olivier Hagolle, https://github.com/olivierhagolle/modified_NDVI).
        The result is clipped to [-1, 1]. The mask is updated with any NaN
        found in the result.

        :param img1: first array
        :param img2: second array
        :param mask: initial boolean mask (True = invalid) that will be
            updated; None means that no pixel is masked beforehand
        :return: nd array filled with -9999 in the mask and the mask itself
        """
        # Only valid pixels take part in the minimum, so that masked values
        # cannot bias the offset.
        if mask is not None:
            min_cte = np.min([np.min(img1[~mask]), np.min(img2[~mask])])
        else:
            min_cte = np.min([np.min(img1), np.min(img2)])

        # Offset for scenes holding zero/negative reflectances.
        min_cte = -min_cte + 0.001 if min_cte <= 0 else 0

        nd = ((img1 + min_cte) - (img2 + min_cte)) / ((img1 + min_cte) + (img2 + min_cte))

        # Clip to the valid range; this also maps +inf to 1 and -inf to -1,
        # while NaN values are left untouched.
        nd[nd > 1] = 1
        nd[nd < -1] = -1

        nan_mask = np.isnan(nd)
        nd_mask = nan_mask if mask is None else nan_mask | mask

        nd = np.ma.array(nd, mask=nd_mask, fill_value=-9999)
        return nd.filled(), nd.mask

    @staticmethod
    def get_train_test_data(data, train_size, min_train_size, max_train_size):
        """
        Split the provided data in train-test bunches.

        The requested fraction is adjusted so that the train set holds at
        least ``min_train_size`` and at most ``max_train_size`` samples.

        :param data: data to be split (``data.shape[0]`` is the sample count)
        :param train_size: percentage of the data to be used as train dataset
        :param min_train_size: minimum data quantity for train set
        :param max_train_size: maximum data quantity for train set
        :return: train and test datasets
        """
        dataset_size = data.shape[0]
        requested = dataset_size * train_size

        if requested < min_train_size:
            train_size = min_train_size / dataset_size
            # NOTE(review): the cap is the integer 1; scikit-learn documents an
            # int train_size as an absolute number of samples, so this appears
            # to request a single training sample rather than 100 % of the
            # data - confirm the intended behaviour before changing it.
            train_size = 1 if train_size > 1 else train_size
        elif requested > max_train_size:
            train_size = max_train_size / dataset_size

        return train_test_split(data, train_size=train_size)

    @staticmethod
    def create_bands_dict(bands_array, bands_order):
        """
        Map each band name to its 2D slice of a 3D array.

        :param bands_array: array indexed as ``[:, :, band_index]``
        :param bands_order: band names, in the order of the last axis
        :return: dict ``{band_name: bands_array[:, :, i]}``
        """
        return {band: bands_array[:, :, i] for i, band in enumerate(bands_order)}

    @staticmethod
    def listify(lst, uniques=None):
        """
        Flatten arbitrarily nested lists into a single flat list.

        Despite the parameter name, duplicated items are kept.

        :param lst: (possibly nested) list to flatten
        :param uniques: optional list the items are appended to; a fresh list
            is used for each call when omitted
        :return: a new flat list with the accumulated items
        """
        # A new accumulator per call: a shared `[]` default would leak the
        # items of previous calls into the result.
        flat = [] if uniques is None else uniques

        for item in lst:
            if isinstance(item, list):
                flat = DWutils.listify(item, flat)
            else:
                flat.append(item)

        return flat.copy()