Blame view

dataset_getter/nc_parser.py 9.25 KB
016e9465   Nathanael Jourdane   Add granules builder
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""This script parses netCdf files."""

import re
import os
import os.path as op
from datetime import datetime
from mimetypes import MimeTypes
from netCDF4 import Dataset
import pathlib
from collections import namedtuple
from typing import List, Optional
from tempfile import gettempdir
from urllib.request import urlretrieve
from urllib.error import HTTPError

# Dates formats.
# Dates of the granules index: year, day of the year, hours, minutes and seconds, ex: 2016238000000*
SPASE_DATE_FORMAT = '%Y%j%H%M%S'
# Dates written in the XML files, ex: <StartDate>2016-08-26T00:00:00Z</StartDate>
XML_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

# Path of the local copy of the granules index, located in the temporary directory of the system.
SPASE_INDEX_TEMP_PATH = op.join(gettempdir(), 'index.nc')

# One record of a granules index: the start date, the stop date and the file name of a granule.
GranuleIndex = namedtuple('GranuleIndex', ['start_date', 'stop_date', 'filename'])


class GranuleIndexReader:
    """Download a granules index (a net-cdf file) and convert its records to ``GranuleIndex`` tuples.

    The problems detected while downloading or decoding the index are reported by calling ``log_fct``.

    - ``log_fct``: A callable taking 4 strings: an error code, the location of the problem,
      its description, and the action done to work around it."""

    # Last argument given to ``log_fct`` whenever the granules index can not be loaded.
    _FALLBACK_ACTION = 'Filled this dataset with 1 granule containing default values, granules URLs will be wrong!'
    # Start and stop dates of the granule returned when the granules index is not available.
    _DEFAULT_DATE = '0001-01-01T00:00:00Z'
    # Number of characters of an XML date whose year is written with 4 digits.
    _XML_DATE_LENGTH = len(_DEFAULT_DATE)
    # Number of characters expected for a date of the granules index.
    _NC_TIME_LENGTH = 16
    # Matches any character which is not a digit.
    _NON_DIGIT = re.compile(r'\D')

    def __init__(self, log_fct):
        self.log_fct = log_fct

    def load_dataset(self, target_name: str, granule_index_url: str) -> Optional['Dataset']:
        """Download the granules index located at ``granule_index_url`` and open it with NetCDF4.

        The file is downloaded to ``SPASE_INDEX_TEMP_PATH``, after removing any file already there.

        - ``target_name``: The name of the dataset, only used in the messages given to ``log_fct``;
        - ``granule_index_url``: The URL of the granules index;
        - ``return``: The opened ``Dataset``, or ``None`` (after a call to ``log_fct``) if the index
          can not be downloaded or loaded."""

        location = 'dataset %s' % target_name

        if op.isfile(SPASE_INDEX_TEMP_PATH):
            os.remove(SPASE_INDEX_TEMP_PATH)

        try:
            urlretrieve(granule_index_url, SPASE_INDEX_TEMP_PATH)
        except (OSError, ValueError):
            # OSError is the base class of HTTPError and URLError (unreachable host, truncated download, ...),
            # ValueError is raised by urllib when the URL itself is malformed.
            self.log_fct('INDEX_INACCESSIBLE',
                         location,
                         'Can not access to %s.' % granule_index_url,
                         self._FALLBACK_ACTION)
            return None

        if not op.isfile(SPASE_INDEX_TEMP_PATH):
            self.log_fct('INDEX_FILE_NOT_FOUND',
                         location,
                         'The granules index file has not been correctly downloaded.',
                         self._FALLBACK_ACTION)
            return None

        # NOTE: the mime-type is guessed from the name of the local file, not from its content.
        index_uri = pathlib.Path(op.abspath(SPASE_INDEX_TEMP_PATH)).as_uri()
        mime_type = MimeTypes().guess_type(index_uri)[0]
        if mime_type != 'application/x-netcdf':
            self.log_fct('INDEX_FILE_NOT_NET-CDF',
                         location,
                         'The mime-type of the granules index file is not application/netcdf but "%s". See %s.' %
                         (mime_type, SPASE_INDEX_TEMP_PATH),
                         self._FALLBACK_ACTION)
            return None

        try:
            return Dataset(SPASE_INDEX_TEMP_PATH)
        except Exception as e:  # whatever the failure is, the index is reported as not loadable
            self.log_fct('CANT_LOAD_INDEX_FILE',
                         location,
                         'Can not load the granules index file with NetCDF4 (%s). See %s.'
                         % (e, SPASE_INDEX_TEMP_PATH),
                         self._FALLBACK_ACTION)
            return None

    def get_granules_index(self, target_name: str, nc_file_path: str) -> List['GranuleIndex']:
        """Build the list of the granules described in a granules index.

        - ``target_name``: The name of the dataset, used in the log messages and in the default file name;
        - ``nc_file_path``: The location of the granules index, given as it is to ``load_dataset()``
          (which downloads it as an URL);
        - ``return``: One ``GranuleIndex`` per record of the index, or a list containing 1 default granule
          if ``nc_file_path`` is empty or if the index can not be loaded."""

        if not nc_file_path:
            return self._default_granules(target_name)

        dataset = self.load_dataset(target_name, nc_file_path)
        if dataset is None:
            return self._default_granules(target_name)

        try:
            variables = dataset.variables
            str_start_time = self.nc_ba_to_strings(target_name, 'StartTime', variables['StartTime'][:])
            str_stop_time = self.nc_ba_to_strings(target_name, 'StopTime', variables['StopTime'][:])
            file_names = self.nc_ba_to_strings(target_name, 'FileName', variables['FileName'][:])
            xml_start_times = self.get_nc_times(target_name, str_start_time)
            xml_stop_times = self.get_nc_times(target_name, str_stop_time)

            rec_len = dataset.dimensions['record'].size
            return [GranuleIndex(xml_start_times[i], xml_stop_times[i], file_names[i]) for i in range(rec_len)]
        finally:
            # Release the file even if a variable is missing or if a conversion fails.
            dataset.close()

    def _default_granules(self, target_name: str) -> List['GranuleIndex']:
        """Return the list containing the single default granule used when the index is not available."""
        return [GranuleIndex(self._DEFAULT_DATE, self._DEFAULT_DATE, target_name + '_unknown.nc')]

    def nc_ba_to_strings(self, target_name: str, col_name: str, byte_arrays: List) -> List[str]:
        """Convert each bytes array of a net-cdf column to a string.

        Each item of a bytes array is decoded on its own as UTF-8; the conversion of an array stops
        (after a call to ``log_fct``) as soon as an item can not be decoded, or when a non-empty item
        is found after an empty one (``b''``). Trailing empty items are silently ignored.

        - ``target_name``: The name of the granules index, only used in the log messages;
        - ``col_name``: The name of the converted column, only used in the log messages;
        - ``byte_arrays``: The rows of the column, each row being a sequence of bytes objects;
        - ``return``: The list of the strings, one per row."""

        strings = []
        for i, bytes_array in enumerate(byte_arrays):
            txt = []
            end_index = None  # index of the first empty item, if any
            for j, byte in enumerate(bytes_array):
                if not byte:
                    if end_index is None:
                        end_index = j
                    continue

                if end_index is not None:
                    self.log_fct('INVISIBLE_BYTES',
                                 'granules index "%s" on column %s and row %d' % (target_name, col_name, i),
                                 'The bytes array contains the byte b\'\' (at index %d), ' % end_index +
                                 'followed by other characters: [%s]. ' % self._bytes_array_repr(bytes_array),
                                 'Removed all characters after the first occurrence of b\'\' in the array.')
                    break

                try:
                    txt.append(byte.decode('utf-8'))
                except UnicodeDecodeError:
                    # The bad byte is handled as the end of the string: the rest of the array is dropped.
                    self.log_fct('BAD_BYTES',
                                 'granules index "%s" on column %s and row %d' % (target_name, col_name, i),
                                 'Can not decode byte %s at index %d on the the bytes array: [%s].'
                                 % (str(byte), j, self._bytes_array_repr(bytes_array)),
                                 'Changed bad byte by byte\'\'.')
                    break
            strings.append(''.join(txt))
        return strings

    @staticmethod
    def _bytes_array_repr(bytes_array) -> str:
        """Return the items of ``bytes_array`` separated by commas, to display them in a log message."""
        return ', '.join([str(item) for item in bytes_array])

    def get_nc_times(self, target_name: str, nc_times: List[str]) -> List[str]:
        """Convert a list of *SPASE dates* to a list of *XML dates*.

        A SPASE date is expected to be made of 16 digits: the year (4), the day of the year (3), the hours (2),
        the minutes (2) and the seconds (2); the 3 last digits are ignored. The day of the year is incremented
        by 1 before being used (presumably because the index counts the days from 0 - to be confirmed).
        Each date which does not respect this format is fixed, after a call to ``log_fct``.

        - ``target_name``: The name of the granules index, only used in the log messages;
        - ``nc_times``: A list of strings, containing the dates in their net-cdf format;
        - ``return``: A list of strings, containing the dates in their XML format."""

        location = 'granules index "%s"' % target_name
        dates = []
        for nc_time in nc_times:
            if self._NON_DIGIT.search(nc_time):
                self.log_fct('DATE_NO_NUM',
                             location,
                             'The date "%s" contains non numerical characters.' % nc_time,
                             'Removed other chars.')
                nc_time = self._NON_DIGIT.sub('', nc_time)
            if len(nc_time) > self._NC_TIME_LENGTH:
                self.log_fct('DATE_TOO_LONG',
                             location,
                             'The length of the date "%s" is more than 16 chars.' % nc_time,
                             'Removed other chars.')
                nc_time = nc_time[:self._NC_TIME_LENGTH]
            if len(nc_time) < self._NC_TIME_LENGTH:
                self.log_fct('DATE_TOO_SHORT',
                             location,
                             'The length of the date "%s" is less than 16 chars.' % nc_time,
                             'Replaced other chars by 0.')
                nc_time = nc_time.ljust(self._NC_TIME_LENGTH, '0')

            year, days = int(nc_time[:4]), int(nc_time[4:7]) + 1
            hour, minute, sec = int(nc_time[7:9]), int(nc_time[9:11]), int(nc_time[11:13])

            if year == 0:
                self.log_fct('WRONG_YEAR',
                             'granules index "%s", date ' % target_name,
                             'The year of the date "%s" is 0.' % nc_time,
                             'Replaced by 1.')
                year = 1
            # check leap years:
            max_days = 366 if (year % 4 == 0 and not (year % 100 == 0 and year % 400 != 0)) else 365
            if days > max_days:
                self.log_fct('WRONG_DAY',
                             location,
                             'The day of the year in the date "%s" is > %d.' % (nc_time, max_days),
                             'Replaced by %d.' % max_days)
                days = max_days
            if hour > 23:
                self.log_fct('WRONG_HOUR',
                             location,
                             'The hour of the time "%s" is > 23.' % nc_time,
                             'Replaced by 23.')
                hour = 23
            if minute > 59:
                self.log_fct('WRONG_MIN',
                             location,
                             'The minute of the time %s is > 59.' % nc_time,
                             'Replaced by 59.')
                minute = 59
            if sec > 59:
                self.log_fct('WRONG_SEC',
                             location,
                             'The second of the time "%s" is > 59.' % nc_time,
                             'Replaced by 59.')
                sec = 59

            str_date = '%04d%03d%02d%02d%02d' % (year, days, hour, minute, sec)
            xml_date = datetime.strptime(str_date, SPASE_DATE_FORMAT).strftime(XML_DATE_FORMAT)
            # On some platforms strftime() does not pad the years < 1000 with zeros: restore the 4 digits.
            dates.append(xml_date.zfill(self._XML_DATE_LENGTH))
        return dates