#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""This script parses netCdf files."""
import re
import os
import os.path as op
from datetime import datetime
from mimetypes import MimeTypes
from netCDF4 import Dataset
import pathlib
from collections import namedtuple
from typing import List, Optional
from tempfile import gettempdir
from urllib.request import urlretrieve
from urllib.error import HTTPError

# dates format
SPASE_DATE_FORMAT = '%Y%j%H%M%S'  # ex: 2016238000000*
XML_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'  # ex: 2016-08-26T00:00:00Z

# Local path where the remote granules index is downloaded before parsing.
SPASE_INDEX_TEMP_PATH = op.join(gettempdir(), 'index.nc')

# One row of the granules index: ISO-8601 start/stop dates and the granule file name.
GranuleIndex = namedtuple('GranuleIndex', 'start_date stop_date filename')


class GranuleIndexReader:
    """Downloads and parses a netCDF granules-index file into ``GranuleIndex`` rows.

    All anomalies are reported through ``log_fct`` (signature:
    ``log_fct(code, location, problem, resolution)``) and worked around with
    best-effort fallback values instead of raising.
    """

    def __init__(self, log_fct):
        # Callable used to report every parsing/download anomaly.
        self.log_fct = log_fct

    def load_dataset(self, target_name: str, granule_index_url: str) -> Optional[Dataset]:
        """Download the granules index from ``granule_index_url`` and open it with netCDF4.

        The file is fetched into ``SPASE_INDEX_TEMP_PATH`` (any previous copy is
        removed first). Returns the open ``Dataset``, or ``None`` after logging
        when the file is inaccessible, missing, not netCDF, or unparsable.
        """
        if op.isfile(SPASE_INDEX_TEMP_PATH):
            os.remove(SPASE_INDEX_TEMP_PATH)
        try:
            urlretrieve(granule_index_url, SPASE_INDEX_TEMP_PATH)
        except HTTPError:
            self.log_fct('INDEX_INACCESSIBLE', 'dataset %s' % target_name,
                         'Can not access to %s.' % granule_index_url,
                         'Filled this dataset with 1 granule containing default values, granules URLs will be wrong!')
            return None
        if not op.isfile(SPASE_INDEX_TEMP_PATH):
            self.log_fct('INDEX_FILE_NOT_FOUND', 'dataset %s' % target_name,
                         'The granules index file has not been correctly downloaded.',
                         'Filled this dataset with 1 granule containing default values, granules URLs will be wrong!')
            return None
        # Guess the mime-type from the file URI; netCDF is expected.
        mime_type = MimeTypes().guess_type(pathlib.Path(op.abspath(SPASE_INDEX_TEMP_PATH)).as_uri())[0]
        if mime_type != 'application/x-netcdf':
            self.log_fct('INDEX_FILE_NOT_NET-CDF', 'dataset %s' % target_name,
                         'The mime-type of the granules index file is not application/netcdf but "%s". See %s.'
                         % (mime_type, SPASE_INDEX_TEMP_PATH),
                         'Filled this dataset with 1 granule containing default values, granules URLs will be wrong!')
            return None
        try:
            return Dataset(SPASE_INDEX_TEMP_PATH)
        except Exception as e:
            # BUG FIX: the original message used the float conversion '%e' applied to
            # e.__cause__, which raised TypeError while trying to log the failure
            # (and e.__cause__ is typically None). Use '%s' with the exception itself.
            self.log_fct('CANT_LOAD_INDEX_FILE', 'dataset %s' % target_name,
                         'Can not load the granules index file with NetCDF4 (%s). See %s.'
                         % (e, SPASE_INDEX_TEMP_PATH),
                         'Filled this dataset with 1 granule containing default values, granules URLs will be wrong!')
            return None

    def get_granules_index(self, target_name: str, nc_file_path: str) -> List[GranuleIndex]:
        """Return the list of ``GranuleIndex`` rows stored in the index at ``nc_file_path``.

        Falls back to a single placeholder granule (epoch-like dates and an
        ``*_unknown.nc`` file name) when no path is given or the index cannot
        be loaded, so callers always get at least one row.
        """
        if not nc_file_path:
            return [GranuleIndex('0001-01-01T00:00:00Z', '0001-01-01T00:00:00Z',
                                 target_name + '_unknown.nc')]
        dataset = self.load_dataset(target_name, nc_file_path)
        # BUG FIX: test explicitly against None instead of relying on the
        # truthiness of a netCDF4 Dataset object.
        if dataset is None:
            return [GranuleIndex('0001-01-01T00:00:00Z', '0001-01-01T00:00:00Z',
                                 target_name + '_unknown.nc')]
        str_start_time = self.nc_ba_to_strings(target_name, 'StartTime',
                                               dataset.variables['StartTime'][:])
        str_stop_time = self.nc_ba_to_strings(target_name, 'StopTime',
                                              dataset.variables['StopTime'][:])
        file_names = self.nc_ba_to_strings(target_name, 'FileName',
                                           dataset.variables['FileName'][:])
        xml_start_times = self.get_nc_times(target_name, str_start_time)
        xml_stop_times = self.get_nc_times(target_name, str_stop_time)
        # The 'record' dimension gives the number of granules in the index.
        rec_len = dataset.dimensions['record'].size
        granules_index = [GranuleIndex(xml_start_times[i], xml_stop_times[i], file_names[i])
                          for i in range(rec_len)]
        dataset.close()
        return granules_index

    def nc_ba_to_strings(self, target_name: str, col_name: str, byte_arrays: List) -> List[str]:
        """Convert a net-cdf byte array to a string.

        If ``UnicodeDecodeError`` is raised, converts only the bytes before the
        first ``b''``.

        - ``byte_arrays``: A net-cdf bytes array;
        - ``return``: The string representation of the bytes array.
        """
        strings = []
        for i, bytes_array in enumerate(byte_arrays):
            txt = []
            string_ended = False  # set once an empty/falsy byte (terminator) is seen
            for j, byte in enumerate(bytes_array):
                if byte:
                    if string_ended:
                        # Non-empty byte found *after* a terminator: log and keep
                        # only the characters decoded before the terminator.
                        hex_array = ', '.join([str(b) for b in bytes_array])
                        self.log_fct('INVISIBLE_BYTES',
                                     'granules index "%s" on column %s and row %d'
                                     % (target_name, col_name, i),
                                     'The bytes array contains the byte b\'\' (at index %d), ' % j
                                     + 'followed by other characters: [%s]. ' % hex_array,
                                     'Removed all characters after the first occurrence of b\'\' in the array.')
                        break
                    try:
                        txt.append(byte.decode('utf-8'))
                    except UnicodeDecodeError:
                        # Undecodable byte: log it and truncate at this position.
                        hex_array = ', '.join([str(b) for b in bytes_array])
                        self.log_fct('BAD_BYTES',
                                     'granules index "%s" on column %s and row %d'
                                     % (target_name, col_name, i),
                                     'Can not decode byte %s at index %d on the the bytes array: [%s].'
                                     % (str(byte), j, hex_array),
                                     'Changed bad byte by byte\'\'.')
                        break
                else:
                    string_ended = True
            strings.append(''.join(txt))
        return strings

    def get_nc_times(self, target_name: str, nc_times: List[str]) -> List[str]:
        """Converts an array of *SPASE dates* to an array of *XML dates*.

        Each malformed component (non-digits, wrong length, out-of-range year/
        day/hour/minute/second) is logged and clamped to the nearest valid value.

        - ``nc_times``: An array of string, containing the dates in their
          net-cdf format.
        - ``target_name``: Dataset name, only used in log messages.
        - ``return``: An array of string, containing the dates in their XML
          format.
        """
        contains_no_digit_chars = re.compile(r'.*\D.*')
        dates = []
        for nc_time in nc_times:
            if contains_no_digit_chars.match(nc_time):
                self.log_fct('DATE_NO_NUM', 'granules index "%s"' % target_name,
                             'The date "%s" contains non numerical characters.' % nc_time,
                             'Removed other chars.')
                nc_time = re.sub(r'\D', '', nc_time)
            # NOTE(review): dates are normalized to 16 digits although only the
            # first 13 are parsed below — presumably the last 3 are sub-second
            # digits; confirm against the index format specification.
            if len(nc_time) > 16:
                self.log_fct('DATE_TOO_LONG', 'granules index "%s"' % target_name,
                             'The length of the date "%s" is more than 16 chars.' % nc_time,
                             'Removed other chars.')
                nc_time = nc_time[:16]
            if len(nc_time) < 16:
                self.log_fct('DATE_TOO_SHORT', 'granules index "%s"' % target_name,
                             'The length of the date "%s" is less than 16 chars.' % nc_time,
                             'Replaced other chars by 0.')
                nc_time = nc_time.ljust(16, '0')
            # Layout: YYYY DDD HH MM SS — the day-of-year is stored 0-based in
            # the index (hence the +1); TODO confirm with the producer.
            year, days = int(nc_time[:4]), int(nc_time[4:7]) + 1
            hour, minute, sec = int(nc_time[7:9]), int(nc_time[9:11]), int(nc_time[11:13])
            if year == 0:
                self.log_fct('WRONG_YEAR', 'granules index "%s", date ' % target_name,
                             'The year of the date "%s" is 0.' % nc_time,
                             'Replaced by 1.')
                year = 1
            # check leap years:
            max_days = 366 if (year % 4 == 0 and not (year % 100 == 0 and year % 400 != 0)) else 365
            if days > max_days:
                self.log_fct('WRONG_DAY', 'granules index "%s"' % target_name,
                             'The day of the year in the date "%s" is > %d.' % (nc_time, max_days),
                             'Replaced by %d.' % max_days)
                days = max_days
            if hour > 23:
                self.log_fct('WRONG_HOUR', 'granules index "%s"' % target_name,
                             'The hour of the time "%s" is > 23.' % nc_time,
                             'Replaced by 23.')
                hour = 23
            if minute > 59:
                self.log_fct('WRONG_MIN', 'granules index "%s"' % target_name,
                             'The minute of the time %s is > 59.' % nc_time,
                             'Replaced by 59.')
                minute = 59
            if sec > 59:
                self.log_fct('WRONG_SEC', 'granules index "%s"' % target_name,
                             'The second of the time "%s" is > 59.' % nc_time,
                             'Replaced by 59.')
                sec = 59
            # Re-assemble the sanitized SPASE date and reformat it as XML.
            str_date = '%04d%03d%02d%02d%02d' % (year, days, hour, minute, sec)
            dates.append(datetime.strptime(str_date, SPASE_DATE_FORMAT).strftime(XML_DATE_FORMAT))
        return dates