Commit 016e94652d58de4be6d29c91a8d35d7c25d4e747
1 parent ed06e2fa
Exists in master
Add granules builder
Showing 2 changed files with 644 additions and 0 deletions
... | ... | @@ -0,0 +1,450 @@ |
1 | +#!/usr/bin/env python | |
2 | +# -*- coding: utf-8 -*- | |
3 | + | |
4 | +# interpreter: Python 3.6 with anaconda. Please set and prepare the conda environment. | |
5 | +# set PATH $HOME/.anaconda2/bin/ $PATH; and source $HOME/.anaconda2/etc/fish/conf.d/conda.fish | |
6 | +# set PATH $HOME/.anaconda3/bin/ $PATH; and source $HOME/.anaconda3/etc/fish/conf.d/conda.fish | |
7 | +# Add these lines to your init.fish (adapt them for Bash shells), so you can choose which conda version to use: | |
8 | +# conda3 # Using conda3 | |
9 | +# conda create --name granules # 1st time only | |
10 | +# activate granules # or `conda activate granules` in Bash shells | |
11 | +# conda install netCDF4 # 1st time only | |
12 | + | |
13 | +"""This script downloads all files from a ``SPASE`` registry, logs and corrects any errors | |
14 | +it finds, and adds several files and pieces of information, such as the estimated granule sizes.""" | |
15 | + | |
16 | +import os.path as op | |
17 | +from os import makedirs | |
18 | +import xml.etree.ElementTree as ElTr | |
19 | +import re | |
20 | +import shutil | |
21 | +import json | |
22 | +import sys | |
23 | +from tempfile import gettempdir | |
24 | +from datetime import datetime | |
25 | +from urllib.request import urlretrieve | |
26 | +from urllib.error import HTTPError | |
27 | +from time import time, strftime, gmtime | |
28 | +from typing import Tuple, List, Dict | |
29 | +from nc_parser import GranuleIndexReader, GranuleIndex | |
30 | + | |
31 | +# URLs | |
32 | +GET_INDEXES_WEBSERVICE = 'http://amda-dev.irap.omp.eu/BASE/DDService/getGranulesIndex.php' | |
33 | +GET_ESTSIZE_WEBSERVICE = 'http://amda-dev.irap.omp.eu/BASE/DDService/getGranulesSize.php' | |
34 | +RESOLVER_URL = 'http://apus.irap.omp.eu:8080/amda-registry/resolver' | |
35 | +XMLNS = 'http://www.spase-group.org/data/schema' | |
36 | +TARGET_URL_PREFIX = 'http://amda-dev.irap.omp.eu/BASE/DDService/get_cdf.php?id=' | |
37 | +# Used if you want to apply a filter to the downloaded files. | |
38 | +SPASE_PREFIX = 'spase://CDPP/' | |
39 | +# SPASE_PREFIX = 'spase://CDPP/NumericalData/AMDA/THEMIS/A/' | |
40 | + | |
41 | +NUMDATA_KEYWORDS = ['/NumericalData/', '/NumericalOutput/'] | |
42 | +GRANULE_KEYWORD = '/Granules/' | |
43 | + | |
44 | +# local paths | |
45 | +BASE_DIR = op.dirname(op.dirname(op.abspath(__file__))) | |
46 | +SPASE_DIR = op.join(BASE_DIR, 'DATA') # /!\ Double-check this: this directory will be recursively deleted. | |
47 | +LOG_FILE_PATH = op.join(BASE_DIR, 'create_granules.log') | |
48 | +BLACKLIST_PATH = op.join(BASE_DIR, 'blacklist') | |
49 | + | |
50 | +LOG_FILE = open(LOG_FILE_PATH, 'w+') # Set to None if you want to log to stdout instead of a file. | |
51 | + | |
52 | +# dates format | |
53 | +SPASE_DATE_FORMAT = '%Y%j%H%M%S' # ex: 2016238000000* | |
54 | +XML_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ' # ex: <StartDate>2016-08-26T00:00:00Z</StartDate> | |
55 | + | |
56 | +GRANULE_TEMPLATE = '''<?xml version="1.0" encoding="UTF-8"?> | |
57 | +<Spase xmlns="http://www.spase-group.org/data/schema" | |
58 | + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |
59 | + xsi:schemaLocation="http://www.spase-group.org/data/schema | |
60 | + http://cdpp1.cesr.fr/AMDA-NG/public/schemas/spase-amda-1_2_0.xsd"> | |
61 | + <Version>2.2.6</Version> | |
62 | + <Granule> | |
63 | + <ResourceID>%s</ResourceID> | |
64 | + <ReleaseDate>%s</ReleaseDate> | |
65 | + <ParentID>%s</ParentID> | |
66 | + <StartDate>%s</StartDate> | |
67 | + <StopDate>%s</StopDate> | |
68 | + <Source> | |
69 | + <SourceType>Data</SourceType> | |
70 | + <URL>%s</URL> | |
71 | + <DataExtent> | |
72 | + <Quantity>%s</Quantity> | |
73 | + </DataExtent> | |
74 | + </Source> | |
75 | + </Granule> | |
76 | +</Spase>''' | |
77 | + | |
78 | + | |
79 | +def log(error: str, location: str, problem: str, what_is_done: str) -> None: | |
80 | + """Log a warning to the log file, or to stdout if ``LOG_FILE`` is None. | |
81 | + | |
82 | +- ``error``: The error code, ex: ``BAD_BYTES``. | |
83 | +- ``location``: The granule name, or dataset name, or any location information related to the error. | |
84 | +- ``problem``: A phrase describing the problem. | |
85 | +- ``what_is_done``: A phrase describing how the error has been corrected. | |
86 | +""" | |
87 | + | |
88 | + message = '%s\ton %s.\t%s\t%s\n' % (error, location, problem, what_is_done) | |
89 | + if LOG_FILE is not None: | |
90 | + LOG_FILE.write(message) | |
91 | + else: | |
92 | + print(message) | |
93 | + | |
94 | + | |
95 | +def get_datasets_ids(datasets_ids: List[str] = None, spase_id: str = None) -> List[str]: | |
96 | + """Recursively get all dataset ids (``NumericalData``, ``Instrument``, ``Person``, etc.), | |
97 | +using the AMDA registry resolver. | |
98 | + | |
99 | +- no arguments required (``datasets_ids`` and ``spase_id`` are used for the recursion); | |
100 | +- ``return``: A list containing all the dataset spase ids. | |
101 | +""" | |
102 | + | |
103 | + datasets_ids = [] if datasets_ids is None else datasets_ids | |
104 | + id_param = '' if spase_id is None else 'id=%s&' % spase_id | |
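| + # The resolver returns an XML tree: 'node' elements are branches that are expanded recursively, | |
| + # 'leaf' elements are dataset ids that are collected in datasets_ids. | |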
105 | + with open(urlretrieve('%s?%st=yes' % (RESOLVER_URL, id_param))[0]) as http_content: | |
106 | + for node in ElTr.fromstring(http_content.read()): | |
107 | + node_id = node.attrib.get('id') | |
108 | + if node.tag == 'node': | |
109 | + print('Found dataset {:<50.50}'.format(node_id), end='\r') | |
110 | + get_datasets_ids(datasets_ids, node_id) | |
111 | + elif node.tag == 'leaf': | |
112 | + print('Found leaf {:<50.50}'.format(node_id), end='\r') | |
113 | + datasets_ids.append(node_id) | |
114 | + if spase_id is None: | |
115 | + return datasets_ids | |
116 | + | |
117 | + | |
118 | +def download_dataset_files(datasets_spase_raw_ids: List[str], black_list: Tuple[str]) -> Dict[str, str]: | |
119 | + """Download all the spase dataset files, according to the spase id list, and store them | |
120 | +recursively in the appropriate folders. | |
121 | + | |
122 | +- ``datasets_spase_raw_ids``: The list of all dataset spase ids, as returned by get_datasets_ids(); | |
| +- ``black_list``: a tuple of spase id prefixes to skip; | |
123 | +- ``return``: a dictionary with: | |
124 | + | |
125 | + - **key** = dataset spase id; | |
126 | + - **value** = dataset local path. | |
127 | +""" | |
128 | + | |
129 | + nb_datasets = len(datasets_spase_raw_ids) | |
130 | + if nb_datasets == 0: | |
131 | + print('There is no dataset to parse... :/') | |
132 | + sys.exit() | |
133 | + | |
134 | + datasets_path = {} | |
135 | + for n_dataset, dataset_raw_id in enumerate(datasets_spase_raw_ids): | |
136 | + if dataset_raw_id.startswith(black_list): | |
137 | + continue | |
138 | + | |
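| + # dataset_raw_id[8:] strips the 'spase://' scheme, so the dataset is stored under SPASE_DIR | |
| + # following the same hierarchy as its spase id. | |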
139 | + dataset_path = op.abspath(op.join(*([SPASE_DIR] + dataset_raw_id[8:].split('/'))) + '.xml') | |
140 | + if not op.isdir(op.dirname(dataset_path)): | |
141 | + makedirs(op.dirname(dataset_path)) | |
142 | + dataset_raw_id = dataset_raw_id.strip().replace(' ', '+') | |
143 | + | |
144 | + try: | |
145 | + urlretrieve('%s?id=%s' % (RESOLVER_URL, dataset_raw_id), filename=dataset_path) | |
146 | + except HTTPError as err: | |
147 | + log('INDEX_RESOLVER_INACCESSIBLE', | |
148 | + 'dataset %s' % dataset_path, | |
149 | + 'Can not connect to URL %s, because %s' % ('%s?id=%s' % (RESOLVER_URL, dataset_raw_id), err), | |
150 | + 'Ignoring this dataset.') | |
| + continue | |
151 | + | |
152 | + try: | |
153 | + resource_node = ElTr.parse(dataset_path).getroot().find(".//{%s}ResourceID" % XMLNS) | |
154 | + new_dataset_id = getattr(resource_node, 'text', dataset_raw_id) | |
155 | + except ElTr.ParseError: | |
156 | + log('RESOURCE_ID_NOT_FOUND', | |
157 | + 'dataset %s' % dataset_path, | |
158 | + 'Can not find ResourceID in the dataset.', | |
159 | + 'Ignoring this dataset.') | |
160 | + continue | |
161 | + datasets_path[new_dataset_id.split('/')[-1]] = dataset_path | |
162 | + | |
163 | + print('{:<50.50} [{:<50.50}] {:<11.11}'.format('Downloaded ' + new_dataset_id.split('/')[-1], | |
164 | + '.' * int((n_dataset + 1) / nb_datasets * 50), | |
165 | + '%d/%d' % (n_dataset + 1, nb_datasets)), end='\r') | |
166 | + print() | |
167 | + return datasets_path | |
168 | + | |
169 | + | |
170 | +def get_granules_indexes_url() -> Tuple[str, Dict[str, str]]: | |
171 | + """Get the granules indexes URL. | |
172 | + | |
173 | +- ``return``: A tuple containing: | |
174 | + - **the URL prefix** (e.g. *http://manunja.irap.omp.eu/BASE/DATA/*); | |
175 | + - a dictionary as: | |
176 | + - **key**: the dataset id (e.g. *ros-magib-rsmp*); | |
177 | + - **value**: the granule URL suffix (e.g. *ROS/MAG.PSA/IB.RESAMPLED/mag_times.nc*).""" | |
178 | + | |
179 | + try: | |
180 | + with open(urlretrieve(GET_INDEXES_WEBSERVICE)[0]) as http_content: | |
181 | + ws_response = http_content.read().strip() | |
182 | + except HTTPError: | |
183 | + log('GET_INDEXES_WEBSERVICE_INACCESSIBLE', | |
184 | + 'all datasets', | |
185 | + 'Can not access the get_indexes webservice (%s).' % GET_INDEXES_WEBSERVICE, | |
186 | + 'Filled all datasets with 1 granule containing default values, all granules URLs will be wrong!') | |
187 | + return '', {} | |
188 | + | |
189 | + try: | |
190 | + gr_indexes = json.loads(ws_response) | |
191 | + except ValueError: | |
192 | + ws_res_path = op.join(gettempdir(), 'indexes_response') | |
193 | + with open(ws_res_path, 'w') as f_indexes: | |
194 | + f_indexes.write(ws_response) | |
195 | + log('INDEXES_NOT_JSON', | |
196 | + 'all datasets', | |
197 | + 'get_indexes webservice (%s) did not return a JSON file. See %s.' % (GET_INDEXES_WEBSERVICE, ws_res_path), | |
198 | + 'Filled all datasets with 1 granule containing default values, all granules URLs will be wrong!') | |
199 | + return '', {} | |
200 | + | |
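| + # The webservice response is expected to be a single-item JSON object: {dataset_base_url: {dataset_id: index_path, ...}}. | |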
201 | + url_prefix = list(gr_indexes.keys())[0] if len(gr_indexes) > 0 else None | |
202 | + granules = gr_indexes.get(url_prefix, None) | |
203 | + if not url_prefix or not url_prefix.startswith('http://') or not isinstance(granules, dict) or len(granules) <= 1: | |
204 | + indexes_path = op.join(gettempdir(), 'get_indexes.json') | |
205 | + with open(indexes_path, 'w') as f_indexes: | |
206 | + f_indexes.write(json.dumps(gr_indexes)) | |
207 | + log('INCONSISTENT_INDEXES', | |
208 | + 'all datasets', | |
209 | + 'The get_indexes Json file is supposed to contain one root element, ' | |
210 | + 'containing a pair (dataset_url, granules dictionary). See %s.' % indexes_path, | |
211 | + 'Filled all datasets with 1 granule containing default values, all granules URLs will be wrong!') | |
212 | + return '', {} | |
213 | + | |
214 | + return url_prefix.replace('manunja', 'amda-dev'), {k: v for (k, v) in granules.items()} | |
215 | + | |
216 | + | |
217 | +def get_grs_size_dic(dataset_spase_id: str) -> Dict[str, int]: | |
218 | + """Download the dictionary containing the granules sizes.""" | |
219 | + | |
220 | + url = '%s?id=%s' % (GET_ESTSIZE_WEBSERVICE, dataset_spase_id) | |
221 | + try: | |
222 | + with open(urlretrieve(url)[0]) as http_content: | |
| + ws_response = http_content.read().strip() # read once, so the content is still available for the log below | |
223 | + try: | |
224 | + gr_dic = json.loads(ws_response) | |
225 | + for dataset_prefix, granules_sizes in gr_dic.items(): | |
226 | + return granules_sizes # There is only one item in the dictionary. | |
227 | + except ValueError: | |
228 | + log('GRANULES_SIZE_BAD_JSON', | |
229 | + 'dataset %s' % dataset_spase_id, | |
230 | + 'When querying the granules size, can not decode the json string (`%s`...).' | |
231 | + % ws_response[:30], | |
232 | + 'Set the granules size to 0.') | |
233 | + return {} | |
234 | + except HTTPError: | |
235 | + log('GRANULES_SIZE_SERVICE_INACCESSIBLE', | |
236 | + 'dataset %s' % dataset_spase_id, | |
237 | + 'Can not access the webservice at %s when querying the granules size.' % url, | |
238 | + 'Set the granules size to 0.') | |
239 | + return {} | |
240 | + | |
241 | + | |
242 | +def get_gr_size(granules_size: Dict[str, int], granule_name: str) -> int: | |
243 | + """Get the granule size, by looking for the granule id in the dictionary.""" | |
244 | + | |
245 | + if not granules_size: | |
246 | + log('NO_GRANULES_SIZE', | |
247 | + 'granule %s' % granule_name, | |
248 | + 'There is no granules size dictionary.', | |
249 | + 'Set granule estimation size to 0.') | |
250 | + return 0 | |
251 | + try: | |
252 | + return int(granules_size[granule_name]) | |
253 | + except KeyError: | |
254 | + log('GRANULES_KEY_ERROR', | |
255 | + 'granule %s' % granule_name, | |
256 | + 'Can not find the item %s in the granules size dictionary.' % granule_name, | |
257 | + 'Set granule estimation size to 0.') | |
258 | + return 0 | |
259 | + except ValueError: | |
260 | + log('GRANULE_SIZE_NOT_INTEGER', | |
261 | + 'granule %s' % granule_name, | |
262 | + 'When retrieving the granule estimated size, can not convert `%s` to an integer.' % granules_size[granule_name], | |
263 | + 'Set granule estimation size to 0.') | |
264 | + return 0 | |
265 | + except TypeError: | |
266 | + log('GRANULES_SIZE_NOT_DIC', | |
267 | + 'granule %s' % granule_name, | |
268 | + 'The returned json is not a dictionary: `%s...`.' % str(granules_size)[:30], | |
269 | + 'Set granule estimation size to 0.') | |
270 | + return 0 | |
271 | + | |
272 | + | |
273 | +def write_granules(dataset_spase_id: str, granules_dir: str, release_date: str, gr_dir_url_prefix: str, | |
274 | + gr_idx_list: List[GranuleIndex], dataset_info: str) -> int: | |
275 | + """Write the granule files. | |
276 | + | |
277 | +- ``dataset_spase_id``: the spase id of the dataset whose granules we want to write; | |
278 | +- ``granules_dir``: the local directory where the granules must be written; | |
279 | +- ``release_date``: The release date of the granules (i.e. now); | |
 | +- ``gr_dir_url_prefix``: the URL prefix of the directory containing the granule files; | |
280 | +- ``gr_idx_list``: a list of all GranuleIndex of this dataset; | |
281 | +- ``dataset_info``: Some information about the dataset which will be printed in the standard output; | |
282 | +- ``return``: The number of created files.""" | |
283 | + | |
284 | + gr_sizes = get_grs_size_dic(dataset_spase_id) | |
285 | + if not gr_sizes: | |
286 | + return 0 | |
287 | + | |
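| + # Remember the current log size so we can tell afterwards whether this dataset produced new warnings. | |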
288 | + log_size = LOG_FILE.tell() | |
289 | + gr_nb = 1 | |
290 | + start_time = time() | |
291 | + info = '' | |
292 | + for n, granule in enumerate(gr_idx_list): | |
293 | + granule_name = op.splitext(granule.filename)[0] | |
294 | + granule_id = dataset_spase_id + '-%05d' % n | |
295 | + info = '{:<50.50} [{:<50.50}] {:<12.12}'.format(dataset_info, '.' * int(gr_nb / len(gr_idx_list) * 50), | |
296 | + '%d/%d' % (gr_nb, len(gr_idx_list))) | |
297 | + print(info, end='\r') | |
298 | + | |
299 | + access_url = TARGET_URL_PREFIX + gr_dir_url_prefix + '/' + granule_name # CDF file | |
300 | + # access_url = gr_dir_url_prefix + '/' + granule.filename + '.gz' # NetCDF file | |
301 | + | |
302 | + granule = GRANULE_TEMPLATE % (granule_id, release_date, dataset_spase_id, granule.start_date, granule.stop_date, | |
303 | + access_url, get_gr_size(gr_sizes, granule_name)) | |
304 | + gr_nb += 1 | |
305 | + | |
306 | + with open(op.join(granules_dir, granule_id + '.xml'), 'w+') as granule_file: | |
307 | + granule_file.write(granule) | |
308 | + | |
309 | + str_time = strftime('elapsed: %Hh%Mm%S', gmtime(time() - start_time)) | |
310 | + warning = ' see log file' if log_size != LOG_FILE.tell() else '' | |
311 | + print(info + str_time + warning) | |
312 | + return gr_nb - 1 # gr_nb starts at 1, so this is the number of granules actually written | |
313 | + | |
314 | + | |
315 | +def check_num_data(paths: Dict[str, str]) -> None: | |
316 | + """Check the *NumericalData* files, particularly the dataproduct type and XML duration format.""" | |
317 | + | |
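| + # XML schema (ISO 8601) duration, e.g. 'P1DT2H30M'; used below to validate Cadence_Min, Cadence_Max and Exposure. | |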
318 | + regex_xml_duration = re.compile(r'(?P<sign>-?)P(?:(?P<years>\d+)Y)?(?:(?P<months>\d+)M)?(?:(?P<days>\d+)D)?' + | |
319 | + r'(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?') | |
320 | + | |
321 | + for _, dataset_local_path in paths.items(): | |
322 | + tree = ElTr.parse(dataset_local_path) | |
323 | + | |
324 | + if tree.getroot().tag == 'Message': | |
325 | + log('NUM-DATA_XML_MESSAGE', | |
326 | + 'On NumericalData file %s' % dataset_local_path, | |
327 | + 'The XML file contains this message: ' + tree.getroot().text, | |
328 | + 'Skipped this file.') | |
329 | + continue | |
330 | + | |
331 | + numdata_node = tree.getroot().find('{%s}NumericalData' % XMLNS) | |
332 | + numdata_node = tree.getroot().find('{%s}NumericalOutput' % XMLNS) if numdata_node is None else numdata_node | |
333 | + | |
334 | + temporal_description_node = numdata_node.find('{%s}TemporalDescription' % XMLNS) | |
335 | + | |
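| + # Collect the DisplayType of every Parameter's RenderingHints; an empty set means the dataset has no dataproduct type. | |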
336 | + dataproduct_types = set() | |
337 | + for param in numdata_node.findall('{%s}Parameter' % XMLNS): | |
338 | + hints = param.findall('{%s}RenderingHints' % XMLNS) | |
339 | + dt_nodes = [hint.find('{%s}DisplayType' % XMLNS) for hint in hints] | |
340 | + for display in [display.text for display in dt_nodes if display is not None and display.text is not None]: | |
341 | + dataproduct_types.add(display) | |
342 | + if not dataproduct_types: | |
343 | + log('NO_DATAPRODUCT_TYPE', | |
344 | + 'On NumericalData file %s' % dataset_local_path, | |
345 | + 'There is no dataproduct type.', | |
346 | + 'Set the dataproduct type to "TimeSeries".') | |
347 | + # ts is added in build_BDD.py | |
348 | + | |
349 | + if temporal_description_node is not None: | |
350 | + for duration_key in ('Cadence_Min', 'Cadence_Max', 'Exposure'): | |
351 | + duration_node = temporal_description_node.find('{%s}%s' % (XMLNS, duration_key)) | |
352 | + xml_duration = getattr(duration_node, 'text', 'P0D') | |
353 | + try: | |
354 | + regex_xml_duration.match(xml_duration.upper()).groupdict(0) | |
355 | + except AttributeError: | |
356 | + log('NUM-DATA_BAD_DATE', | |
357 | + 'On NumericalData file %s' % dataset_local_path, | |
358 | + 'Can not decode duration: %s.' % xml_duration, | |
359 | + 'Set the duration to 0.') | |
360 | + duration_node.text = 'P0D' | |
361 | + tree.write(dataset_local_path) | |
362 | + | |
363 | + | |
364 | +def write_all_granules() -> None: | |
365 | + """Create the granules.""" | |
366 | + | |
367 | + black_list = tuple() | |
368 | + try: | |
369 | + with open(BLACKLIST_PATH) as f: | |
370 | + black_list += tuple(l.strip() for l in f.readlines() if l.strip() and not l.startswith('#')) | |
371 | + except IOError: | |
372 | + pass | |
373 | + print('ignored datasets: %s' % ', '.join(black_list)) | |
374 | + | |
375 | + print('Getting datasets spase ids...') | |
376 | + all_spase_id = get_datasets_ids() | |
377 | + | |
378 | + print('Downloading dataset files into %s...' % SPASE_DIR) | |
379 | + datasets_spase_id = [num_data for num_data in all_spase_id if num_data.startswith(SPASE_PREFIX)] | |
380 | + | |
381 | + spase_files_path = download_dataset_files(datasets_spase_id, black_list) | |
382 | + # We don't want to write granules from files which are not NumData | |
383 | + paths = {d_id: path for (d_id, path) in spase_files_path.items() | |
384 | + if any(keyword in path for keyword in NUMDATA_KEYWORDS)} | |
385 | + | |
386 | + print('Checking numerical data files...') | |
387 | + check_num_data(paths) | |
388 | + | |
389 | + print('Getting granules index file paths...') | |
390 | + url_prefix, grs_idx_url = get_granules_indexes_url() | |
391 | + reader = GranuleIndexReader(log) | |
392 | + | |
393 | + n_datasets = 0 | |
394 | + n_gr = 0 | |
395 | + | |
396 | + for idx_dataset_id in grs_idx_url: | |
397 | + if idx_dataset_id not in paths: | |
398 | + log('DATASET_INDEX_NOT_LINKED', | |
399 | + 'dataset %s' % idx_dataset_id, | |
400 | + 'This dataset is found in the granules indexes json file (returned by %s), ' | |
401 | + 'but not in the resolver (%s).' % (GET_INDEXES_WEBSERVICE, RESOLVER_URL), | |
402 | + 'Ignored this dataset.') | |
403 | + | |
404 | + print('Creating granules...') | |
405 | + start_time = time() | |
406 | + | |
407 | + for dataset_spase_id, dataset_local_path in paths.items(): | |
408 | + nc_file_path = grs_idx_url.get(dataset_spase_id, '') | |
409 | + if not nc_file_path: | |
410 | + log('DATASET_NOT_IN_IDX_DIC', | |
411 | + 'dataset %s' % dataset_spase_id, | |
412 | + 'This dataset is not found in the granules indexes json file returned by %s.' % GET_INDEXES_WEBSERVICE, | |
413 | + 'Set default times values for all granules of this dataset.') | |
414 | + grs_idx_list = reader.get_granules_index(dataset_spase_id, url_prefix + nc_file_path) | |
415 | + | |
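| + # Granule files go next to their dataset file, but under .../Granules/ instead of .../NumericalData/. | |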
416 | + for keyword in NUMDATA_KEYWORDS: | |
417 | + dataset_local_path = dataset_local_path.replace(keyword, GRANULE_KEYWORD) | |
418 | + grs_local_dir = op.dirname(dataset_local_path) | |
419 | + if not op.exists(grs_local_dir): | |
420 | + makedirs(grs_local_dir) | |
421 | + | |
422 | + release_date = datetime.now().strftime(XML_DATE_FORMAT) | |
423 | + dataset_info = '%s dataset %d/%d (%.2f%%) %s' % \ | |
424 | + (strftime('%H:%M'), n_datasets + 1, len(paths), | |
425 | + (n_datasets / len(paths) * 100), dataset_spase_id) | |
426 | + gr_dir_url_suffix = '' if not nc_file_path else '/'.join(nc_file_path.split('/')[:-1]) | |
427 | + try: | |
428 | + n_gr += write_granules(dataset_spase_id, grs_local_dir, release_date, gr_dir_url_suffix, grs_idx_list, | |
429 | + dataset_info) | |
430 | + except Exception as error: | |
431 | + print('A problem occurred when creating a granule from dataset %s:' % dataset_spase_id) | |
432 | + LOG_FILE.close() | |
433 | + raise error | |
434 | + n_datasets += 1 | |
435 | + | |
436 | + elapsed = strftime('%Hh%Mm%S', gmtime(time() - start_time)) | |
437 | + print('100%%, %d files created in %s.' % (n_gr, elapsed)) | |
438 | + | |
439 | + | |
440 | +if __name__ == '__main__': | |
441 | + if not op.exists(BASE_DIR): | |
442 | + makedirs(BASE_DIR) | |
443 | + | |
444 | + if op.isdir(SPASE_DIR): | |
445 | + print('Clearing SPASE directory (%s)...' % SPASE_DIR) | |
446 | + shutil.rmtree(SPASE_DIR) | |
447 | + | |
448 | + write_all_granules() | |
449 | + | |
450 | + LOG_FILE.close() | |
... | ... | @@ -0,0 +1,194 @@ |
1 | +#!/usr/bin/env python | |
2 | +# -*- coding: utf-8 -*- | |
3 | + | |
4 | +"""This module parses netCDF granule index files.""" | |
5 | + | |
6 | +import re | |
7 | +import os | |
8 | +import os.path as op | |
9 | +from datetime import datetime | |
10 | +from mimetypes import MimeTypes | |
11 | +from netCDF4 import Dataset | |
12 | +import pathlib | |
13 | +from collections import namedtuple | |
14 | +from typing import List, Optional | |
15 | +from tempfile import gettempdir | |
16 | +from urllib.request import urlretrieve | |
17 | +from urllib.error import HTTPError | |
18 | + | |
19 | +# dates format | |
20 | +SPASE_DATE_FORMAT = '%Y%j%H%M%S' # ex: 2016238000000* | |
21 | +XML_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ' # ex: <StartDate>2016-08-26T00:00:00Z</StartDate> | |
22 | +SPASE_INDEX_TEMP_PATH = op.join(gettempdir(), 'index.nc') | |
23 | + | |
24 | +GranuleIndex = namedtuple('GranuleIndex', 'start_date stop_date filename') | |
25 | + | |
26 | + | |
27 | +class GranuleIndexReader: | |
28 | + | |
29 | + def __init__(self, log_fct): | |
30 | + self.log_fct = log_fct | |
31 | + | |
32 | + def load_dataset(self, target_name: str, granule_index_url: str) -> Optional[Dataset]: | |
33 | + """Download the granules index file at ``granule_index_url`` and load it as a netCDF Dataset.""" | |
34 | + if op.isfile(SPASE_INDEX_TEMP_PATH): | |
35 | + os.remove(SPASE_INDEX_TEMP_PATH) | |
36 | + | |
37 | + try: | |
38 | + urlretrieve(granule_index_url, SPASE_INDEX_TEMP_PATH) | |
39 | + except HTTPError: | |
40 | + self.log_fct('INDEX_INACCESSIBLE', | |
41 | + 'dataset %s' % target_name, | |
42 | + 'Can not access %s.' % granule_index_url, | |
43 | + 'Filled this dataset with 1 granule containing default values, granules URLs will be wrong!') | |
44 | + return | |
45 | + | |
46 | + if not op.isfile(SPASE_INDEX_TEMP_PATH): | |
47 | + self.log_fct('INDEX_FILE_NOT_FOUND', | |
48 | + 'dataset %s' % target_name, | |
49 | + 'The granules index file has not been correctly downloaded.', | |
50 | + 'Filled this dataset with 1 granule containing default values, granules URLs will be wrong!') | |
51 | + return | |
52 | + | |
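| + # Guess the mime type from the downloaded file's URI to reject files that are clearly not netCDF. | |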
53 | + mime_type = MimeTypes().guess_type(pathlib.Path(op.abspath(SPASE_INDEX_TEMP_PATH)).as_uri())[0] | |
54 | + if mime_type != 'application/x-netcdf': | |
55 | + self.log_fct('INDEX_FILE_NOT_NET-CDF', | |
56 | + 'dataset %s' % target_name, | |
57 | + 'The mime-type of the granules index file is not application/x-netcdf but "%s". See %s.' % | |
58 | + (mime_type, SPASE_INDEX_TEMP_PATH), | |
59 | + 'Filled this dataset with 1 granule containing default values, granules URLs will be wrong!') | |
60 | + return | |
61 | + | |
62 | + try: | |
63 | + return Dataset(SPASE_INDEX_TEMP_PATH) | |
64 | + except Exception as e: | |
65 | + self.log_fct('CANT_LOAD_INDEX_FILE', | |
66 | + 'dataset %s' % target_name, | |
67 | + 'Can not load the granules index file with netCDF4 (%s). ' | |
68 | + 'See %s.' % (e, SPASE_INDEX_TEMP_PATH), | |
69 | + 'Filled this dataset with 1 granule containing default values, granules URLs will be wrong!') | |
70 | + | |
71 | + def get_granules_index(self, target_name: str, nc_file_path: str) -> List[GranuleIndex]: | |
72 | + if not nc_file_path: | |
73 | + return [GranuleIndex('0001-01-01T00:00:00Z', '0001-01-01T00:00:00Z', target_name + '_unknown.nc')] | |
74 | + | |
75 | + dataset = self.load_dataset(target_name, nc_file_path) | |
76 | + if not dataset: | |
77 | + return [GranuleIndex('0001-01-01T00:00:00Z', '0001-01-01T00:00:00Z', target_name + '_unknown.nc')] | |
78 | + | |
79 | + str_start_time = self.nc_ba_to_strings(target_name, 'StartTime', dataset.variables['StartTime'][:]) | |
80 | + str_stop_time = self.nc_ba_to_strings(target_name, 'StopTime', dataset.variables['StopTime'][:]) | |
81 | + file_names = self.nc_ba_to_strings(target_name, 'FileName', dataset.variables['FileName'][:]) | |
82 | + xml_start_times = self.get_nc_times(target_name, str_start_time) | |
83 | + xml_stop_times = self.get_nc_times(target_name, str_stop_time) | |
84 | + | |
85 | + rec_len = dataset.dimensions['record'].size | |
86 | + granules_index = [GranuleIndex(xml_start_times[i], xml_stop_times[i], file_names[i]) for i in range(rec_len)] | |
87 | + dataset.close() | |
88 | + | |
89 | + return granules_index | |
90 | + | |
91 | + def nc_ba_to_strings(self, target_name: str, col_name: str, byte_arrays: List): | |
92 | + """Convert a list of net-cdf byte arrays to a list of strings. | |
93 | + Decoding stops at the first empty byte (``b''``) or at the first byte that can not be decoded. | |
94 | + | |
95 | + - ``byte_arrays``: A list of net-cdf bytes arrays; | |
96 | + - ``return``: The string representations of the bytes arrays.""" | |
97 | + | |
98 | + strings = [] | |
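| + # Each value is a fixed-width array of single bytes; an empty byte (b'') is treated as the end of the string. | |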
99 | + for i, bytes_array in enumerate(byte_arrays): | |
100 | + txt = [] | |
101 | + string_ended = False | |
102 | + for j, byte in enumerate(bytes_array): | |
103 | + if byte: | |
104 | + if string_ended: | |
105 | + hex_array = ', '.join(str(b) for b in bytes_array) | |
106 | + self.log_fct('INVISIBLE_BYTES', | |
107 | + 'granules index "%s" on column %s and row %d' % (target_name, col_name, i), | |
108 | + 'The bytes array contains the byte b\'\' (at index %d), ' % j + | |
109 | + 'followed by other characters: [%s]. ' % hex_array, | |
110 | + 'Removed all characters after the first occurrence of b\'\' in the array.') | |
111 | + break | |
112 | + try: | |
113 | + txt.append(byte.decode('utf-8')) | |
114 | + except UnicodeDecodeError: | |
115 | + hex_array = ', '.join(str(b) for b in bytes_array) | |
116 | + self.log_fct('BAD_BYTES', | |
117 | + 'granules index "%s" on column %s and row %d' % (target_name, col_name, i), | |
118 | + 'Can not decode byte %s at index %d in the bytes array: [%s].' | |
119 | + % (str(byte), j, hex_array), | |
120 | + 'Kept only the characters decoded before this byte.') | |
121 | + break | |
122 | + else: | |
123 | + string_ended = True | |
124 | + strings.append(''.join(txt)) | |
125 | + return strings | |
126 | + | |
127 | + def get_nc_times(self, target_name: str, nc_times: List[str]): | |
128 | + """Converts an array of *SPASE dates* to an array of *XML dates*. | |
129 | + | |
130 | + - ``nc_times``: An array of string, containing the dates in their net-cdf format. | |
131 | + - ``target_name``: The dataset name, only used in the log_fct messages. | |
132 | + - ``return``: An array of string, containing the dates in their XML format.""" | |
133 | + | |
134 | + contains_non_digit_chars = re.compile(r'.*\D.*') # matches if the date contains at least one non-digit character | |
135 | + dates = [] | |
136 | + for nc_time in nc_times: | |
137 | + if contains_non_digit_chars.match(nc_time): | |
138 | + self.log_fct('DATE_NO_NUM', | |
139 | + 'granules index "%s"' % target_name, | |
140 | + 'The date "%s" contains non numerical characters.' % nc_time, | |
141 | + 'Removed the non-numerical characters.') | |
142 | + nc_time = re.sub(r'\D', '', nc_time) | |
143 | + if len(nc_time) > 16: | |
144 | + self.log_fct('DATE_TOO_LONG', | |
145 | + 'granules index "%s"' % target_name, | |
146 | + 'The length of the date "%s" is more than 16 chars.' % nc_time, | |
147 | + 'Truncated the date to 16 chars.') | |
148 | + nc_time = nc_time[:16] | |
149 | + if len(nc_time) < 16: | |
150 | + self.log_fct('DATE_TOO_SHORT', | |
151 | + 'granules index "%s"' % target_name, | |
152 | + 'The length of the date "%s" is less than 16 chars.' % nc_time, | |
153 | + 'Padded the date with 0 up to 16 chars.') | |
154 | + nc_time = nc_time.ljust(16, '0') | |
155 | + | |
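| + # Slice the 16-char SPASE date: year (4), day of year (3, shifted by 1 here), hour, minute, second; | |
| + # any remaining characters are ignored. | |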
156 | + year, days = int(nc_time[:4]), int(nc_time[4:7]) + 1 | |
157 | + hour, minute, sec = int(nc_time[7:9]), int(nc_time[9:11]), int(nc_time[11:13]) | |
158 | + | |
159 | + if year == 0: | |
160 | + self.log_fct('WRONG_YEAR', | |
161 | + 'granules index "%s"' % target_name, | |
162 | + 'The year of the date "%s" is 0.' % nc_time, | |
163 | + 'Replaced by 1.') | |
164 | + year = 1 | |
165 | + # check leap years: | |
166 | + max_days = 366 if (year % 4 == 0 and not (year % 100 == 0 and year % 400 != 0)) else 365 | |
167 | + if days > max_days: | |
168 | + self.log_fct('WRONG_DAY', | |
169 | + 'granules index "%s"' % target_name, | |
170 | + 'The day of the year in the date "%s" is > %d.' % (nc_time, max_days), | |
171 | + 'Replaced by %d.' % max_days) | |
172 | + days = max_days | |
173 | + if hour > 23: | |
174 | + self.log_fct('WRONG_HOUR', | |
175 | + 'granules index "%s"' % target_name, | |
176 | + 'The hour of the time "%s" is > 23.' % nc_time, | |
177 | + 'Replaced by 23.') | |
178 | + hour = 23 | |
179 | + if minute > 59: | |
180 | + self.log_fct('WRONG_MIN', | |
181 | + 'granules index "%s"' % target_name, | |
182 | + 'The minute of the time "%s" is > 59.' % nc_time, | |
183 | + 'Replaced by 59.') | |
184 | + minute = 59 | |
185 | + if sec > 59: | |
186 | + self.log_fct('WRONG_SEC', | |
187 | + 'granules index "%s"' % target_name, | |
188 | + 'The second of the time "%s" is > 59.' % nc_time, | |
189 | + 'Replaced by 59.') | |
190 | + sec = 59 | |
191 | + | |
192 | + str_date = '%04d%03d%02d%02d%02d' % (year, days, hour, minute, sec) | |
193 | + dates.append(datetime.strptime(str_date, SPASE_DATE_FORMAT).strftime(XML_DATE_FORMAT)) | |
194 | + return dates | |