Source code for sisppeo.readers.C2RCC

# Copyright 2020 Arthur Coqué, Pôle OFB-INRAE ECLA, UR RECOVER
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This module contains a reader for C2RCC products.

This reader is dedicated to extract data from C2RCC products.

Example::

    reader = C2RCCReader(**config)
    reader.extract_bands()
    reader.create_ds()
    extracted_dataset = reader.dataset
"""

from collections import namedtuple
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Tuple, Union

import numpy as np
import xarray as xr
from pyproj import CRS, Transformer
from pyproj.aoi import AreaOfInterest
from pyproj.database import query_utm_crs_info
from tqdm import tqdm

from sisppeo.readers.reader import Reader, Inputs
from sisppeo.utils.exceptions import GeometryError, InputError, ProductError

C2RCCInputs = namedtuple('C2RCCInputs', Inputs._fields + ('sensing_date',))


[docs]class C2RCCReader(Reader): """A reader dedicated to extract data from C2RCC products. Attributes: dataset: A dataset containing extracted data. """
[docs] def __init__(self, input_product: Path, requested_bands: List[str], geom: Optional['dict'] = None, sensing_date: Optional[Union[str, datetime]] = None, **_ignored) -> None: """See base class. Args: sensing_date: The sensing date (in UTC time) of the product. """ super().__init__(input_product, requested_bands, geom) if sensing_date is None: raise InputError('"sensing_date" is missing!') for band in requested_bands: if band in ('B7', 'B8', 'B8A', 'B11', 'B12'): raise ProductError(f'{band} is not available in C2RCC products') if isinstance(sensing_date, str): try: sensing_date = datetime.strptime(sensing_date.replace('-', ''), '%Y%m%d') except ValueError: msg = '"sensing_date" must be in "YYYY-MM-DD" (or "YYYMMDD") format' raise InputError(msg) self._inputs = C2RCCInputs(*self._inputs, sensing_date)
# pylint: disable=too-many-locals # More readable if geotransform is defined.
[docs] def extract_bands(self) -> None: """See base class.""" # Load data dataset = xr.open_dataset(self._inputs.input_product) # Get and store the CRS latlon_bbox = (dataset.lat.max().values, dataset.lon.min().values, dataset.lat.min().values, dataset.lon.max().values) nx, ny = len(dataset.x), len(dataset.y) utm_crs_list = query_utm_crs_info( datum_name='WGS 84', area_of_interest=AreaOfInterest( west_lon_degree=latlon_bbox[1], south_lat_degree=latlon_bbox[2], east_lon_degree=latlon_bbox[3], north_lat_degree=latlon_bbox[0] ) ) utm_crs = CRS.from_epsg(utm_crs_list[0].code) self._intermediate_data['crs'] = utm_crs # Compute projected coordinates and get image boundaries xy_bbox = self._compute_proj_coords(latlon_bbox, nx, ny) # Get ROI and read data data = {} ij_bbox = self._extract_ROI(xy_bbox, nx, ny) for band in tqdm(self._inputs.requested_bands, unit='bands'): band_name = f'rhown_{band}' band_array = _extract_band(dataset, band_name, ij_bbox) data[band] = band_array.reshape(1, *band_array.shape) print('') dataset.close() # Store outputs self._intermediate_data['data'] = data
[docs] def create_ds(self) -> None: """See base class.""" # Create the dataset ds = xr.Dataset( {key: (['time', 'y', 'x'], val) for key, val in self._intermediate_data['data'].items()}, coords={ # 'time': [datetime.strptime( # self._intermediate_data['metadata']['attrs']['start_date'], # '%d-%b-%Y %H:%M:%S.%f')], 'time': [self._inputs.sensing_date], 'x': ('x', self._intermediate_data['x']), 'y': ('y', self._intermediate_data['y']) } ) crs = self._intermediate_data['crs'] # Set up coordinate variables ds.x.attrs['axis'] = 'X' ds.x.attrs['long_name'] = f'x-coordinate ({crs.name})' ds.x.attrs['standard_name'] = "projection_x_coordinate" ds.x.attrs['units'] = 'm' ds.y.attrs['axis'] = 'Y' ds.y.attrs['long_name'] = f'y-coordinate ({crs.name})' ds.y.attrs['standard_name'] = "projection_y_coordinate" ds.y.attrs['units'] = 'm' ds.time.attrs['axis'] = 'T' ds.time.attrs['long_name'] = 'time' # Set up the 'grid mapping variable' ds['crs'] = xr.DataArray(name='crs', attrs=crs.to_cf()) ds['product_metadata'] = xr.DataArray() ds.attrs['data_type'] = 'rho' self.dataset = ds
# pylint: disable=invalid-name # ROI is a common abbreviation for a Region Of Interest. def _extract_ROI(self, xy_bbox, nx, ny) -> Tuple[int, int, int, int]: if self._inputs.ROI is not None: self._reproject_geom() x_min, y_min, x_max, y_max = self._intermediate_data['geom'].bounds x0, y0, x1, y1 = xy_bbox if (x_max < x0 or y_min > y0) or (x_min > x1 or y_max < y1): raise GeometryError('Wanted ROI is outside the input product') row_start = 0 if y_max > y0 else int(np.floor((y0 - y_max) / 20)) col_start = 0 if x_min < x0 else int(np.floor((x_min - x0) / 20)) row_stop = ny - 1 if y_min < y1 else int( np.floor((y0 - y_min) / 20)) col_stop = nx - 1 if x_max > x1 else int( np.floor((x_max - x0) / 20)) else: row_start, col_start = 0, 0 row_stop, col_stop = ny - 1, nx - 1 return row_start, col_start, row_stop, col_stop def _compute_proj_coords(self, latlon_bbox, nx, ny) -> Tuple[int, int, int, int]: utm_crs = self._intermediate_data['crs'] proj = Transformer.from_crs(utm_crs.geodetic_crs, utm_crs) x_min, y_max = np.round(proj.transform(*latlon_bbox[:2])).astype(int) x_max, y_min = np.round(proj.transform(*latlon_bbox[2:])).astype(int) x = np.linspace(x_min, x_max, nx) y = np.linspace(y_max, y_min, ny) self._intermediate_data['x'] = x self._intermediate_data['y'] = y return x_min, y_max, x_max, y_min
def _extract_band(dataset, band, ij_bbox): row_start, col_start, row_stop, col_stop = ij_bbox band_array = dataset[band].values[row_start:row_stop + 1, col_start:col_stop + 1] return band_array