# nc_parser.py (9.25 KB)
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""This script parses netCdf files."""

import re
import os
import os.path as op
from datetime import datetime
from mimetypes import MimeTypes
from netCDF4 import Dataset
import pathlib
from collections import namedtuple
from typing import List, Optional
from tempfile import gettempdir
from urllib.request import urlretrieve
from urllib.error import HTTPError

# Date formats used to convert between the SPASE/net-CDF representation
# (year + zero-padded day-of-year + time) and the XML (ISO-8601) representation.
SPASE_DATE_FORMAT = '%Y%j%H%M%S'  # ex: 2016238000000*
XML_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'  # ex: <StartDate>2016-08-26T00:00:00Z</StartDate>
# Temporary local file where the remote granules index is downloaded before parsing.
SPASE_INDEX_TEMP_PATH = op.join(gettempdir(), 'index.nc')

# One record of the granules index: start/stop dates (XML format) and granule file name.
GranuleIndex = namedtuple('GranuleIndex', 'start_date stop_date filename')


class GranuleIndexReader:
    """Downloads a net-CDF granules index file and converts it into a list of
    `GranuleIndex` tuples (XML-formatted start/stop dates plus file name).

    Every recoverable anomaly (download failure, bad bytes, malformed dates)
    is reported through `log_fct` and worked around with a default value, so
    parsing never aborts the whole dataset.
    """

    def __init__(self, log_fct):
        # log_fct(code, location, problem, action_taken): callable used to
        # report every anomaly found while downloading or parsing the index.
        self.log_fct = log_fct

    def load_dataset(self, target_name: str, granule_index_url: str) -> Optional['Dataset']:
        """Download the granules index at `granule_index_url` and open it.

        The file is fetched into `SPASE_INDEX_TEMP_PATH`, its mime-type is
        checked, then it is opened with netCDF4.

        - ``target_name``: dataset name, only used in log messages;
        - ``granule_index_url``: URL of the net-CDF granules index file;
        - ``return``: the opened `Dataset`, or None (after logging) when the
          download fails, the file is missing, its mime-type is not net-CDF,
          or netCDF4 can not open it.
        """
        # Drop any stale index left over from a previous run.
        if op.isfile(SPASE_INDEX_TEMP_PATH):
            os.remove(SPASE_INDEX_TEMP_PATH)

        try:
            urlretrieve(granule_index_url, SPASE_INDEX_TEMP_PATH)
        except HTTPError:
            self.log_fct('INDEX_INACCESSIBLE',
                         'dataset %s' % target_name,
                         'Can not access to %s.' % granule_index_url,
                         'Filled this dataset with 1 granule containing default values, granules URLs will be wrong!')
            return None

        if not op.isfile(SPASE_INDEX_TEMP_PATH):
            self.log_fct('INDEX_FILE_NOT_FOUND',
                         'dataset %s' % target_name,
                         'The granules index file has not been correctly downloaded.',
                         'Filled this dataset with 1 granule containing default values, granules URLs will be wrong!')
            return None

        # Guess the mime-type from the downloaded file's URI: it must be net-CDF.
        mime_type = MimeTypes().guess_type(pathlib.Path(op.abspath(SPASE_INDEX_TEMP_PATH)).as_uri())[0]
        if mime_type != 'application/x-netcdf':
            # BUG FIX: the message previously named 'application/netcdf'
            # although the check above expects 'application/x-netcdf'.
            self.log_fct('INDEX_FILE_NOT_NET-CDF',
                         'dataset %s' % target_name,
                         'The mime-type of the granules index file is not application/x-netcdf but "%s". See %s.' %
                         (mime_type, SPASE_INDEX_TEMP_PATH),
                         'Filled this dataset with 1 granule containing default values, granules URLs will be wrong!')
            return None

        try:
            return Dataset(SPASE_INDEX_TEMP_PATH)
        except Exception as e:
            # BUG FIX: the original format string used '%e' (a float
            # conversion) with `e.__cause__` (usually None), which raised a
            # TypeError while building this very log message; report the
            # exception itself with '%s' instead.
            self.log_fct('CANT_LOAD_INDEX_FILE',
                         'dataset %s' % target_name,
                         'Can not load the granules index file with NetCDF4 (%s). See %s.' % (e, SPASE_INDEX_TEMP_PATH),
                         'Filled this dataset with 1 granule containing default values, granules URLs will be wrong!')
            return None

    def get_granules_index(self, target_name: str, nc_file_path: str) -> List['GranuleIndex']:
        """Build the list of granules described by the index at `nc_file_path`.

        - ``target_name``: dataset name, used in log messages and in the
          placeholder file name;
        - ``nc_file_path``: URL of the net-CDF granules index, may be empty;
        - ``return``: one `GranuleIndex` per index record, or a single
          placeholder granule when the index is missing or unreadable.
        """
        # Placeholder returned whenever the real index can not be loaded.
        default_index = [GranuleIndex('0001-01-01T00:00:00Z', '0001-01-01T00:00:00Z', target_name + '_unknown.nc')]

        if not nc_file_path:
            return default_index

        dataset = self.load_dataset(target_name, nc_file_path)
        # Explicit None test: do not rely on the Dataset object's truthiness.
        if dataset is None:
            return default_index

        str_start_time = self.nc_ba_to_strings(target_name, 'StartTime', dataset.variables['StartTime'][:])
        str_stop_time = self.nc_ba_to_strings(target_name, 'StopTime', dataset.variables['StopTime'][:])
        file_names = self.nc_ba_to_strings(target_name, 'FileName', dataset.variables['FileName'][:])
        xml_start_times = self.get_nc_times(target_name, str_start_time)
        xml_stop_times = self.get_nc_times(target_name, str_stop_time)

        rec_len = dataset.dimensions['record'].size
        granules_index = [GranuleIndex(xml_start_times[i], xml_stop_times[i], file_names[i]) for i in range(rec_len)]
        dataset.close()

        return granules_index

    def nc_ba_to_strings(self, target_name: str, col_name: str, byte_arrays: List) -> List[str]:
        """Convert each net-CDF byte array of `byte_arrays` into a string.

        Bytes are decoded one by one; the first empty byte (b'') marks the end
        of the string.  Non-empty bytes found after it, or a byte that can not
        be decoded as UTF-8, are reported through `log_fct` and the string is
        truncated at that point.

        - ``target_name``: granules index name, only used in log messages;
        - ``col_name``: net-CDF column name, only used in log messages;
        - ``byte_arrays``: a list of net-CDF byte arrays;
        - ``return``: the string representation of each byte array.
        """
        strings = []
        for i, bytes_array in enumerate(byte_arrays):
            txt = []
            string_ended = False
            for j, byte in enumerate(bytes_array):
                if byte:
                    if string_ended:
                        # Non-empty byte found after the string terminator.
                        hex_array = ', '.join(str(b) for b in bytes_array)
                        self.log_fct('INVISIBLE_BYTES',
                                     'granules index "%s" on column %s and row %d' % (target_name, col_name, i),
                                     'The bytes array contains the byte b\'\' (at index %d), ' % j +
                                     'followed by other characters: [%s]. ' % hex_array,
                                     'Removed all characters after the first occurrence of b\'\' in the array.')
                        break
                    try:
                        txt.append(byte.decode('utf-8'))
                    except UnicodeDecodeError:
                        hex_array = ', '.join(str(b) for b in bytes_array)
                        # NOTE(review): the action message says the byte was
                        # replaced, but the code truncates the string here —
                        # confirm which behavior is intended.
                        self.log_fct('BAD_BYTES',
                                     'granules index "%s" on column %s and row %d' % (target_name, col_name, i),
                                     'Can not decode byte %s at index %d on the the bytes array: [%s].'
                                     % (str(byte), j, hex_array),
                                     'Changed bad byte by byte\'\'.')
                        break
                else:
                    string_ended = True
            strings.append(''.join(txt))
        return strings

    def get_nc_times(self, target_name: str, nc_times: List[str]) -> List[str]:
        """Convert an array of *SPASE dates* to an array of *XML dates*.

        Each malformed component (non-digit characters, wrong length,
        out-of-range year/day/hour/minute/second) is reported through
        `log_fct` and clamped to the nearest valid value before conversion.

        - ``target_name``: granules index name, only used in log messages;
        - ``nc_times``: dates as strings in their net-CDF (SPASE) format;
        - ``return``: the same dates as strings in their XML format.
        """
        contains_no_digit_chars = re.compile(r'.*\D.*')
        dates = []
        for nc_time in nc_times:
            # Normalize the raw date to exactly 16 digit characters.
            if contains_no_digit_chars.match(nc_time):
                self.log_fct('DATE_NO_NUM',
                             'granules index "%s"' % target_name,
                             'The date "%s" contains non numerical characters.' % nc_time,
                             'Removed other chars.')
                nc_time = re.sub(r'\D', '', nc_time)
            if len(nc_time) > 16:
                self.log_fct('DATE_TOO_LONG',
                             'granules index "%s"' % target_name,
                             'The length of the date "%s" is more than 16 chars.' % nc_time,
                             'Removed other chars.')
                nc_time = nc_time[:16]
            if len(nc_time) < 16:
                self.log_fct('DATE_TOO_SHORT',
                             'granules index "%s"' % target_name,
                             'The length of the date "%s" is less than 16 chars.' % nc_time,
                             'Replaced other chars by 0.')
                nc_time = nc_time.ljust(16, '0')

            # Only the first 13 characters are consumed: YYYY DDD HH MM SS.
            # The day-of-year field appears to be stored 0-based, hence the
            # +1 — TODO(review): confirm with the index producer.
            year, days = int(nc_time[:4]), int(nc_time[4:7]) + 1
            hour, minute, sec = int(nc_time[7:9]), int(nc_time[9:11]), int(nc_time[11:13])

            if year == 0:
                self.log_fct('WRONG_YEAR',
                             'granules index "%s", date ' % target_name,
                             'The year of the date "%s" is 0.' % nc_time,
                             'Replaced by 1.')
                year = 1
            # Clamp the day of year, accounting for leap years (Gregorian rule).
            max_days = 366 if (year % 4 == 0 and not (year % 100 == 0 and year % 400 != 0)) else 365
            if days > max_days:
                self.log_fct('WRONG_DAY',
                             'granules index "%s"' % target_name,
                             'The day of the year in the date "%s" is > %d.' % (nc_time, max_days),
                             'Replaced by %d.' % max_days)
                days = max_days
            if hour > 23:
                self.log_fct('WRONG_HOUR',
                             'granules index "%s"' % target_name,
                             'The hour of the time "%s" is > 23.' % nc_time,
                             'Replaced by 23.')
                hour = 23
            if minute > 59:
                self.log_fct('WRONG_MIN',
                             'granules index "%s"' % target_name,
                             'The minute of the time %s is > 59.' % nc_time,
                             'Replaced by 59.')
                minute = 59
            if sec > 59:
                self.log_fct('WRONG_SEC',
                             'granules index "%s"' % target_name,
                             'The second of the time "%s" is > 59.' % nc_time,
                             'Replaced by 59.')
                sec = 59

            # Re-assemble a canonical SPASE date, then convert to XML format.
            str_date = '%04d%03d%02d%02d%02d' % (year, days, hour, minute, sec)
            dates.append(datetime.strptime(str_date, SPASE_DATE_FORMAT).strftime(XML_DATE_FORMAT))
        return dates