#!/usr/bin/env python # -*- coding: utf-8 -*- """This script inspect a SPASE dataset folder (containing Granules, NumericalData, Instrument and Observatory folders), then generate a SQL script which insert all the granules in a database, formatted as epn-tap parameters. See http://spase-group.org/data/reference/spase-2_2_6/ for more information about spase specification, and https://voparis-confluence.obspm.fr/display/VES/EPN-TAP+V2.0+parameters for more information about epn-tap-v2 specification.""" import math import re import xml.etree.ElementTree as ElTr import os.path as op from os import walk from datetime import datetime, timedelta from typing import Tuple, List, Dict, Optional import sys # Type aliases SQLDic = Dict[str, object] SpaseDic = Dict[str, List[ElTr.Element]] # Paths WORKING_DIR = op.dirname(op.abspath(__file__)) # parent directory OUTPUT_SQL_FILE_PATH = op.join(WORKING_DIR, 'DaCHS', 'amdadb_db.sql') SPASE_DIR = op.join(WORKING_DIR, 'DATA') LOG_FILE_PATH = op.join(WORKING_DIR, 'log', 'build_granules.log') # Set to None if you want to log in stdout instead of a file # XML and SQL formats XMLNS = 'http://www.spase-group.org/data/schema' XML_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ' SQL_DATE_FORMAT = '%Y-%m-%d' SEP = '#' # Dictionaries of values DATAPRODUCT_TYPE_DIC = {'Image': 'im', 'Plasmagram': 'ds', 'Spectrogram': 'ds', 'StackPlot': 'ts', 'TimeSeries': 'ts', 'time_series': 'ts', 'WaveForm': 'ts'} PROCESSING_LEVEL_DIC = {'Calibrated': 3, 'Raw': 1, 'Uncalibrated': 5} # Based on http://spase-group.org/ TARGET_CLASS_DIC = {'Heliosphere': 'interplanetary_medium', 'Interstellar': 'galaxy', 'Earth': 'planet', 'Saturn': 'planet', 'Mercury': 'planet', 'Uranus': 'planet', 'Mars': 'planet', 'Neptune': 'planet', 'Jupiter': 'planet', 'Venus': 'planet', 'Moon': 'satellite', 'Callisto': 'satellite', 'Europa': 'satellite', 'Ganymede': 'satellite', 'Dione': 'satellite', 'Enceladus': 'satellite', 'Mimas': 'satellite', 'Miranda': 'satellite', 'Phobos': 'satellite', 'Iapetus': 'satellite', 'Titania': 'satellite', 'Oberon': 'satellite', 'Puck': 'satellite', 'Deimos': 'satellite', 'Ariel': 'satellite', 'Umbriel': 'satellite', 'Rhea': 'satellite', 'Tethys': 'satellite', 'Titan': 'satellite', 'Io': 'satellite', 'Pluto': 'dwarf_planet', 'Comet': 'comet' } MIME_TYPE_LIST = {'AVI': 'video/x-msvideo', 'Binary': 'application/octet-stream', 'CDF': 'application/x-cdf-istp', 'CEF': 'application/x-cef1', 'CEF1': 'application/x-cef1', 'CEF2': 'application/x-cef2', 'Excel': 'application/vnd.ms-excel', 'FITS': 'application/x-fits-bintable', 'GIF': 'image/gif', 'HDF': 'application/x-hdf', 'HDF4': 'application/x-hdf', 'HDF5': 'application/x-hdf', 'HTML': 'text/html', 'Hardcopy': None, 'Hardcopy.Film': None, 'Hardcopy.Microfiche': None, 'Hardcopy.Microfilm': None, 'Hardcopy.Photograph': None, 'Hardcopy.PhotographicPlate': None, 'Hardcopy.Print': None, 'IDFS': None, 'IDL': 'application/octet-stream', 'JPEG': 'image/jpeg ', 'MATLAB_4': 'application/octet-stream', 'MATLAB_6': 'application/octet-stream', 'MATLAB_7': 'application/octet-stream', 'MPEG': 'video/mpeg', 'NCAR': None, 'NetCDF': 'application/x-netcdf', 'PDF': 'application/pdf', 'PNG': 'image/png', 'Postscript': 'application/postscript', 'QuickTime': 'video/quicktime', 'TIFF': 'image/tiff', 'Text': 'text/plain', 'Text.ASCII': 'text/plain', 'Text.Unicode': 'text/plain', 'UDF': None, 'VOTable': 'application/x-votable+xml', 'XML': 'text/xml'} # All default SQL values for missing parameters in dataset DEFAULT_DATASET_VALUES = { 'dataproduct_type': 'Unknown', 'target_name': 'Unknown', 'target_class': 'Unknown', 'target_region': None, 'spase_region': None, 'instrument_host_name': None, 'instrument_name': None, 'measurement_type': None, 'spatial_frame_type': None, 'processing_level': 0, 'time_sampling_step_min': None, 'time_sampling_step_max': None, 'time_exp_min': None, 'access_format': 'application/x-cdf-istp' } # All default SQL values for missing parameters in granule DEFAULT_GRANULE_VALUES = { # obs_id: if missing, the script exits directly. 'time_min': 0.0, 'time_max': 0.0, 'access_url': None, 'access_estsize': 0, 'release_date': '01-01-0001' } # SQL code SQL_HEADER = '''-- Generated by build_BDD.py on %s. -- SQL procedure to define amdadb data table. Other parameters comes in the epn_core view. -- Name: amdadb; Type: SCHEMA; Schema: amdadb; Owner: postgres DROP SCHEMA IF EXISTS amdadb cascade; CREATE SCHEMA amdadb; SET search_path = public, pg_catalog; SET default_tablespace = ''; SET default_with_oids = false; SET client_encoding = 'UTF8'; -- Name: data_table; Type: TABLE; Schema: amdadb; Owner: postgres; Tablespace: CREATE TABLE amdadb.data_table ( -- header parameters id SERIAL PRIMARY KEY, obs_id TEXT, dataproduct_type TEXT, target_name TEXT, time_min DOUBLE PRECISION, -- date as JD time_max DOUBLE PRECISION, -- date as JD -- important parameters access_url TEXT, target_class TEXT, target_region TEXT, spase_region TEXT, instrument_host_name TEXT, instrument_name TEXT, measurement_type TEXT, spase_measurement_type TEXT, spatial_frame_type TEXT, processing_level INTEGER, release_date DATE, access_estsize INTEGER, access_format TEXT, time_sampling_step_min DOUBLE PRECISION, -- duration in seconds time_sampling_step_max DOUBLE PRECISION, -- duration in seconds time_exp_min DOUBLE PRECISION -- duration in seconds ); ''' % datetime.now().strftime('%c') SQL_ROW = 'INSERT INTO amdadb.data_table(%s) VALUES (%s);\n' SQL_FOOTER = '''REVOKE ALL ON SCHEMA "amdadb" FROM PUBLIC; REVOKE ALL ON SCHEMA "amdadb" FROM postgres; GRANT ALL ON SCHEMA "amdadb" TO postgres; GRANT ALL PRIVILEGES ON SCHEMA amdadb TO gavo WITH GRANT OPTION; GRANT ALL PRIVILEGES ON SCHEMA amdadb TO gavoadmin WITH GRANT OPTION; GRANT ALL PRIVILEGES ON amdadb.data_table TO gavo WITH GRANT OPTION; GRANT ALL PRIVILEGES ON amdadb.data_table TO gavoadmin WITH GRANT OPTION;''' def log(message: str) -> None: """Log a warning in a log file or the stdout. - ``message``: The message to display or to print in the log file. """ if log_file: log_file.write(message + '\n') else: print(message) def get_nb_files() -> int: """Get the number of files in the ``SPASE`` directory, in order to be able to show a progress bar.""" return sum([len(walker[2]) for walker in walk(SPASE_DIR)]) def get_spase() -> Optional[SpaseDic]: """Get all the spase files - ``return``: a dictionary, where: - **key** = dataset type ('numerical_data', 'granules', etc) ; - **value** = A list of spase ElementTree nodes. """ spase_dic = {} n_file = 0 for dir_path, _, files in walk(SPASE_DIR): for file_path in [op.join(dir_path, file_name) for file_name in files]: try: root = ElTr.parse(file_path).getroot() except FileNotFoundError: print('\nThe spase file is not found on %s.\n' % file_path) with open(file_path) as spase_file: print(spase_file.read()) return for child in root: key = str(child.tag).split('}')[-1] if key != 'Version': if key not in spase_dic: spase_dic[key] = [] spase_dic[key].append(child) print('Parsed {:<23.23} {:<19.19} [{:<50.50}]'.format( '%d/%d (%.2f%%)' % (n_file + 1, nb_files, 100 * float(n_file + 1) / nb_files), op.splitext(op.basename(file_path))[0], '.' * int((n_file + 1) / nb_files * 50)), end='\r') n_file += 1 print() if not spase_dic: print('The SPASE dictionary is empty, please check the SPASE folder: %s.' % SPASE_DIR) return return spase_dic def get_observatory(spase_dic: SpaseDic, observatory_id: str) -> ElTr.Element: """Given the ``observatory_id``, return the *observatory ElementTree node* (by looking in the Observatory spase file). """ obs_ids = [obs.find('{%s}ResourceID' % XMLNS).text for obs in spase_dic['Observatory']] return spase_dic['Observatory'][obs_ids.index(observatory_id)] def get_instrument(spase_dic: SpaseDic, instrument_id: str) -> ElTr.Element: """Given the ``instrument_id``, return the *instrument ElementTree node*, by looking in the Instrument spase file. """ instru_ids = [instru.find('{%s}ResourceID' % XMLNS).text for instru in spase_dic['Instrument']] return spase_dic['Instrument'][instru_ids.index(instrument_id)] def get_access_format(numerical_data_node: ElTr.Element) -> SQLDic: """Given the ``NumericalData`` node, return a dictionary containing the access format (mime-type).""" access_formats = set() for access_info in numerical_data_node.findall('{%s}AccessInformation' % XMLNS): spase_format_node = access_info.find('{%s}Format' % XMLNS) if spase_format_node and spase_format_node.text: access_formats.add(spase_format_node.text) access_format = SEP.join(access_formats) try: return {'access_format': MIME_TYPE_LIST[access_format]} except KeyError: return {'access_format': None} def get_region_info(numerical_data_node: ElTr.Element) -> SQLDic: """Given the ``NumericalData`` node, return a dictionary containing: - **target_class**: the ```target_class`` EPN-TAP parameter; - **target_name**: the ```target_name`` EPN-TAP parameter; - **target_region**: the ``target_region`` EPN-TAP parameter. - **spase_region**: the ``spase_region`` parameter, added to the EPN-TAP parameters for the purposes of AMDA. """ target_name = set() target_class = set() target_region = set() spase_region = set() obs_regions = numerical_data_node.findall('{%s}ObservedRegion' % XMLNS) for target in [o_reg.text.split('.') for o_reg in obs_regions if o_reg.text is not None]: offset = 1 if len(target) >= 2 and target[1] in TARGET_CLASS_DIC \ and TARGET_CLASS_DIC[target[1]] == 'satellite' else 0 target_class.add(TARGET_CLASS_DIC[target[offset]]) target_name.add(target[offset] if target[offset] != 'Heliosphere' else 'Sun') target_region.add('.'.join(target[offset + 1:])) spase_region.add('.'.join(target)) return {'target_class': SEP.join(target_class) if target_class else None, 'target_name': SEP.join(target_name) if target_name else None, 'target_region': SEP.join(target_region) if target_region else None, 'spase_region': SEP.join(spase_region) if spase_region else None} def get_instru_name_and_host_name(spase_dic: SpaseDic, numerical_data_node: ElTr.Element) -> SQLDic: """Given the ``NumericalData`` node, return a dictionary containing: - **instrument_name**: the ``instrument_name`` EPN-TAP parameter; - **instrument_host_name**: the ``instrument_host_name`` EPN-TAP parameter. """ instru_names = set() instru_host_names = set() for instru_id in [i.text for i in numerical_data_node.findall('{%s}InstrumentID' % XMLNS)]: instru = get_instrument(spase_dic, instru_id) instru_names.add(instru.find('{%s}ResourceHeader' % XMLNS).find('{%s}ResourceName' % XMLNS).text) observatory = get_observatory(spase_dic, instru.find('{%s}ObservatoryID' % XMLNS).text) instru_host_names.add(observatory.find('{%s}ResourceHeader' % XMLNS).find('{%s}ResourceName' % XMLNS).text) return {'instrument_name': SEP.join(instru_names) if instru_names else None, 'instrument_host_name': SEP.join(instru_host_names) if instru_host_names else None} def get_types(numerical_data_node: ElTr.Element) -> SQLDic: """Given the ``NumericalData`` node, return a dictionary containing: - **dataproduct_type**: the ``dataproduct_type`` EPN-TAP parameter; - **spatial_frame_type**: the ``spatial_frame_type`` EPN-TAP parameter; - **measurement_type**: the ``measurement_type`` EPN-TAP parameter. - **spase_measurement_type**: the ``spase_measurement_type`` parameter, added to the EPN-TAP parameters for the purposes of AMDA. """ with open('log', 'w') as f_out: dataproduct_types = set() sp_frame_types = set() measurement_types = set() spase_measurement_type = getattr(numerical_data_node.find('{%s}MeasurementType' % XMLNS), 'text', None) for param in numerical_data_node.findall('{%s}Parameter' % XMLNS): hints = param.findall('{%s}RenderingHints' % XMLNS) dt_nodes = [hint.find('{%s}DisplayType' % XMLNS) for hint in hints] for display in [display.text for display in dt_nodes if display is not None and display.text is not None]: dataproduct_types.add(DATAPRODUCT_TYPE_DIC[display]) coord_sys = param.find('{%s}CoordinateSystem' % XMLNS) if coord_sys is not None: sp_frame_types.add(coord_sys.find('{%s}CoordinateRepresentation' % XMLNS).text.lower()) measurement_type = param.find('{%s}Ucd' % XMLNS) if measurement_type is not None and measurement_type.text is not None: f_out.write(measurement_type.text) measurement_types.add(measurement_type.text) return {'dataproduct_type': SEP.join(dataproduct_types) if dataproduct_types else None, 'spatial_frame_type': SEP.join(sp_frame_types) if sp_frame_types else None, 'measurement_type': SEP.join(measurement_types) if measurement_types else None, 'spase_measurement_type': spase_measurement_type} def get_times_min_max(numerical_data_node: ElTr.Element) -> SQLDic: """Given the ``NumericalData`` node, return a dictionary containing: - **time_sampling_step_min**: the ``time_sampling_step_min`` EPN-TAP parameter; - **time_sampling_step_max**: the ``time_sampling_step_max`` EPN-TAP parameter; - **time_exp_min**: the ``time_exp_min`` EPN-TAP parameter. """ temporal_description_node = numerical_data_node.find('{%s}TemporalDescription' % XMLNS) if temporal_description_node is None: return {'time_sampling_step_min': None, 'time_sampling_step_max': None, 'time_exp_min': None} return {'time_sampling_step_min': str(xml_duration_to_seconds(getattr(temporal_description_node.find( '{%s}%s' % (XMLNS, 'Cadence_Min')), 'text', None))), 'time_sampling_step_max': str(xml_duration_to_seconds(getattr(temporal_description_node.find( '{%s}%s' % (XMLNS, 'Cadence_Max')), 'text', None))), 'time_exp_min': str(xml_duration_to_seconds(getattr(temporal_description_node.find( '{%s}%s' % (XMLNS, 'Exposure')), 'text', None))) } def get_processing_lvl(numerical_data_node: ElTr.Element) -> SQLDic: """Given the ``NumericalData`` node, return a dictionary containing: - **processing_level**: the ``processing_level`` EPN-TAP parameter; """ proc_lvl = getattr(numerical_data_node.find('{%s}ProcessingLevel' % XMLNS), 'text', None) return {'processing_level': PROCESSING_LEVEL_DIC.get(proc_lvl, None)} def get_granule_and_parent(gr_node: ElTr.Element) -> Tuple[str, SQLDic]: """Given a Granule node, return a dictionary containing all the parameters inside it: - **obs_id**: the ``obs_id`` EPN-TAP parameter; - **creation_date**: the ``creation_date`` EPN-TAP parameter; - **release_date**: the ``release_date`` EPN-TAP parameter; - **time_min**: the ``time_min`` EPN-TAP parameter; - **time_max**: the ``time_max`` EPN-TAP parameter; - **access_url**: the ``access_url`` EPN-TAP parameter; - **access_estsize**: the ``access_estsize`` EPN-TAP parameter. """ parent_id = getattr(gr_node.find('{%s}ParentID' % XMLNS), 'text', None) obs_id = getattr(gr_node.find('{%s}ResourceID' % XMLNS), 'text', '').split('/')[-1] if not obs_id: print('Can not get the ResourceID content of a granule. Exiting here.') sys.exit() release_date = getattr(gr_node.find('{%s}ReleaseDate' % XMLNS), 'text', None) tim_min = xml_date_to_jd(getattr(gr_node.find('{%s}StartDate' % XMLNS), 'text', None)) time_max = xml_date_to_jd(getattr(gr_node.find('{%s}StopDate' % XMLNS), 'text', None)) src_n = gr_node.find('{%s}Source' % XMLNS) access_url = getattr(src_n.find('{%s}URL' % XMLNS), 'text', None) if src_n else None data_extent_node = src_n.find('{%s}DataExtent' % XMLNS) if src_n else None access_estsize = getattr(data_extent_node.find('{%s}Quantity' % XMLNS), 'text', None) return parent_id, {'obs_id': obs_id, 'release_date': release_date, 'time_min': tim_min, 'time_max': time_max, 'access_url': access_url, 'access_estsize': int(access_estsize) if access_estsize else None} def xml_date_to_jd(xml_date: str) -> Optional[float]: """Convert a *XML date* to *Julian day*.""" try: output_date = datetime.strptime(xml_date, XML_DATE_FORMAT) except ValueError: # Date is not well formatted return None if output_date.month == 1 or output_date.month == 2: year_p = output_date.year - 1 month_p = output_date.month + 12 else: year_p = output_date.year month_p = output_date.month # this checks where we are in relation to October 15, 1582, the beginning # of the Gregorian calendar. if ((output_date.year < 1582) or (output_date.year == 1582 and output_date.month < 10) or (output_date.year == 1582 and output_date.month == 10 and output_date.day < 15)): j_day = 0 else: j_day = 2 - math.trunc(year_p / 100.) + math.trunc(math.trunc(year_p / 100.) / 4.) j_day += math.trunc((365.25 * year_p) - 0.75) if year_p < 0 else math.trunc(365.25 * year_p) j_day += math.trunc(30.6001 * (month_p + 1)) + output_date.day + 1720994.5 j_day += output_date.hour/24 + output_date.minute/1440 + output_date.second/86400 return j_day def xml_date_to_sql_date(xml_date: str) -> str: """Convert a *XML date* to a *SQL date*.""" return datetime.strptime(xml_date, XML_DATE_FORMAT).strftime(SQL_DATE_FORMAT) def xml_duration_to_seconds(xml_duration: str) -> int: """Convert a *XML duration* to seconds.""" if not xml_duration: return 0 regex = re.compile(r'(?P-?)P(?:(?P\d+)Y)?(?:(?P\d+)M)?(?:(?P\d+)D)?' + r'(?:T(?:(?P\d+)H)?(?:(?P\d+)M)?(?:(?P\d+)S)?)?') time = regex.match(xml_duration.upper()).groupdict(0) delta = timedelta( days=int(time['days']) + (int(time['months']) * 30) + (int(time['years']) * 365), hours=int(time['hours']), minutes=int(time['minutes']), seconds=int(time['seconds'])) return (delta * -1 if time['sign'] == "-" else delta).total_seconds() def get_parameters(spase_dic: SpaseDic) -> List[SQLDic]: """Get all the parameters of the entire dataset. Return a list containing the granules, where each granule is a dictionary, with: - **keys**: the EPN-TAP parameter name; - **values**: the EPN-TAP value corresponding to the parameter name. """ datasets = {} missing_parameters = {} nb_elements = len(spase_dic['NumericalData']) + len(spase_dic['NumericalOutput']) + len(spase_dic['Granule']) n_dataset = 0 for numerical_data_node in spase_dic['NumericalData'] + spase_dic['NumericalOutput']: print('Dataset %d/%d' % (n_dataset, nb_elements), end=' ' * 99 + '\r') n_dataset += 1 try: dataset_key = getattr(numerical_data_node.find('{%s}ResourceID' % XMLNS), 'text', None).split('/')[-1] except AttributeError: print('Can not get the ResourceID content of a dataset. Exiting here.') sys.exit() dataset = get_region_info(numerical_data_node) dataset.update(get_instru_name_and_host_name(spase_dic, numerical_data_node)) dataset.update(get_types(numerical_data_node)) dataset.update(get_access_format(numerical_data_node)) dataset.update(get_times_min_max(numerical_data_node)) dataset.update(get_processing_lvl(numerical_data_node)) # Looking for None parameters in each dataset for parameter, default_value in DEFAULT_DATASET_VALUES.items(): if not dataset[parameter]: dataset[parameter] = default_value if dataset_key not in missing_parameters: missing_parameters[dataset_key] = set() missing_parameters[dataset_key].add(parameter) datasets[dataset_key] = dataset granules_list = [] for granule_node in spase_dic['Granule']: parent_id, granule = get_granule_and_parent(granule_node) dataset_key = parent_id.split('/')[-1] print('Granule {:<23.23} {:<18.18} [{:<50.50}]'.format( '%d/%d (%.2f%%)' % (n_dataset + 1, nb_elements, 100 * float(n_dataset + 1) / nb_elements), dataset_key, '.' * int((n_dataset + 1) / nb_files * 50)), end='\r') # Looking for None parameters in each granule for parameter, default_value in DEFAULT_GRANULE_VALUES.items(): if not granule[parameter]: granule[parameter] = default_value if dataset_key not in missing_parameters: missing_parameters[dataset_key] = set() missing_parameters[dataset_key].add(parameter) try: granule.update(datasets[dataset_key]) except KeyError: print('The parent id "%s" of the granule "%s" is not found in the dataset dictionary.' % (parent_id, granule['access_url'])) granules_list.append(granule) n_dataset += 1 print() for bad_dataset, missings in missing_parameters.items(): log('%s\tmissing %s' % (bad_dataset, ', '.join(missings))) return granules_list def write_sql(granules_list): """Write a SQL script which insert all the granules in the database.""" with open(OUTPUT_SQL_FILE_PATH, 'w') as sql_file: sql_file.write(SQL_HEADER) for gr in granules_list: keys = ', '.join(gr.keys()) values = ', '.join(['NULL' if param is None else "'%s'" % param if isinstance(param, str) else str(param) for param in gr.values()]) sql_file.write(SQL_ROW % (keys, values)) sql_file.write(SQL_FOOTER) if __name__ == '__main__': log_file = open(LOG_FILE_PATH, 'w+') if LOG_FILE_PATH else None print('Getting number of files in %s...' % SPASE_DIR) nb_files = get_nb_files() print('Parsing %d files...' % nb_files) spase = get_spase() print('Done. Found these types of data: %s.' % ', '.join([key for (key, val) in spase.items()])) print('Loading numerical data...') granules = get_parameters(spase) print('Creating SQL script...') write_sql(granules) import subprocess subprocess.Popen(['notify-send', 'The SQL script %s has been generated.' % OUTPUT_SQL_FILE_PATH])