"""Consistency checker for a local DDBASE archive.

Walks the VI (virtual instrument) entries declared in DDsys.xml and, for each
locally hosted dataset, verifies that the times/info/cache NetCDF files exist,
that start/stop times are coherent and sorted, and that every gzipped data
file can be read and contains sane Time values.
"""

import gzip
import logging
import os
import pickle
import sys
from datetime import datetime, timezone

import netCDF4 as nc
import numpy as np
from lxml import etree

# VIs skipped during a full scan (they can still be checked explicitly by name).
vi_to_exclude = [
    'juno_ephem_orb1',
    'juno_fgm_orbfull',
    'juno_fgm_orb1',
    'juno_fgm_orb60',
    'juno_jedi_i090',
    'juno_jedi_i180',
    'juno_jedi_i270',
    'juno_jedi_e270',
    'juno_jedi_e180',
    'juno_jedi_e090',
    'juno_fgm_cruise60',
    'ros_magob_1s',
]


def datetime_to_ddtime(date_time):
    """Format a datetime as a DDTime string, YYYYDDDHHMMSSmmm, with a
    zero-based day of year and milliseconds on three digits."""
    return "%04d%03d%02d%02d%02d%03d" % (
        date_time.year,
        date_time.timetuple().tm_yday - 1,
        date_time.hour,
        date_time.minute,
        date_time.second,
        date_time.microsecond // 1000,
    )


def is_sorted(values):
    """Return True if values are in non-decreasing order."""
    return all(a <= b for a, b in zip(values, values[1:]))


def check_vi(e, cachedir):
    """Check the consistency of a single <VI> entry.

    Returns True when the VI passes (or is skipped), False on a structural
    error. Per-file anomalies are logged as warnings and recorded in a per-VI
    cache so that unchanged data files are not re-checked on the next run.
    """
    name = e.find("NAME").text
    logging.info('========== {} =========='.format(name))
    if e.find("NAME").attrib['base'] != 'LOCAL':
        # Only locally hosted datasets can be checked.
        return True
    if name == 'iball_acc_all':
        # Specific VI used to manage users.
        return True
    location = e.find("LOCATION").text
    times = e.find("TIMES").text
    info = e.find("INFO").text
    cache = e.find("CACHE").text

    # Load the per-VI cache of previously checked data files.
    cachefile = os.path.join(cachedir, name)
    if os.path.isfile(cachefile):
        with open(cachefile, 'rb') as handle:
            cache_check = pickle.load(handle)
    else:
        cache_check = {}

    if not os.path.isdir(location):
        logging.error('{} does not exist'.format(location))
        return False
    times_path = os.path.join(location, times)
    info_path = os.path.join(location, info)
    cache_path = os.path.join(location, cache)
    for f in (times_path, info_path, cache_path):
        if not os.path.isfile(f):
            logging.error('{} does not exist'.format(f))
            return False

    ds = nc.Dataset(times_path)
    for v in ('StartTime', 'StopTime', 'FileName'):
        if v not in ds.variables:
            logging.error('Missing {} variable in times file'.format(v))
            return False

    # StartTime/StopTime are stored as character arrays encoding DDTime values.
    start_times = []
    for st in np.array(ds.variables["StartTime"]):
        try:
            start_times.append(int("".join(k.decode("UTF-8") for k in st)))
        except Exception:
            logging.error('Cannot parse StartTime in times file')
            return False
    stop_times = []
    for et in np.array(ds.variables["StopTime"]):
        try:
            stop_times.append(int("".join(k.decode("UTF-8") for k in et)))
        except Exception:
            logging.error('Cannot parse StopTime in times file')
            return False
    files_names = []
    for fn in np.array(ds.variables["FileName"]):
        try:
            files_names.append("".join(k.decode("UTF-8") for k in fn))
        except Exception:
            logging.error('Cannot parse FileName in times file')
            return False

    if len(start_times) != len(stop_times) or len(start_times) != len(files_names):
        logging.error('Inconsistent variable sizes in times file')
        return False
    if len(start_times) == 0:
        logging.warning('Dataset is empty')
        return True

    # Start and stop times are each expected to be monotonically increasing.
    prev = None
    for d in start_times:
        if prev is not None and d < prev:
            logging.warning("Start time lower than previous one: {}".format(d))
        prev = d
    prev = None
    for d in stop_times:
        if prev is not None and d < prev:
            logging.warning("Stop time lower than previous one: {}".format(d))
        prev = d
    for start, stop in zip(start_times, stop_times):
        if start > stop:
            logging.warning("Start time higher than stop time: {} - {}".format(start, stop))

    if os.path.isfile(os.path.join(location, 'LOCK')):
        logging.warning("LOCK file detected")

    for f, start, stop in zip(files_names, start_times, stop_times):
        gzipped_f = os.path.join(location, f) + ".gz"
        if not os.path.isfile(gzipped_f):
            logging.error("Missing data file {}".format(gzipped_f))
            continue
        # Skip files already validated whose modification time is unchanged.
        entry = cache_check.get(f)
        if entry and entry['status'] and os.path.getmtime(gzipped_f) == entry['mtime']:
            continue
        logging.info(f)
        try:
            cache_check[f] = {'status': True, 'mtime': os.path.getmtime(gzipped_f)}
            with gzip.open(gzipped_f, 'rb') as gf:
                ncdata = gf.read()
            dds = nc.Dataset("in-mem-file", mode='r', memory=ncdata)
        except Exception:
            logging.error("Cannot load or extract data file {}".format(gzipped_f))
            cache_check[f]['status'] = False
            continue

        # Report every missing variable, then move on to the next file.
        missing = [v for v in ('Time', 'StartTime', 'StopTime') if v not in dds.variables]
        if missing:
            for v in missing:
                logging.error("Missing {} variable in data file {}".format(v, gzipped_f))
            cache_check[f]['status'] = False
            dds.close()
            continue

        # 'Time' is either a float64 Unix timestamp or a DDTime character array.
        is_timestamp = dds.variables['Time'].dtype == np.float64
        file_times = []
        for t in np.array(dds.variables["Time"]):
            if is_timestamp:
                t_str = datetime_to_ddtime(datetime.fromtimestamp(t, tz=timezone.utc))
            else:
                t_str = "".join(k.decode("UTF-8") for k in t)
            file_times.append(int(t_str))
        dds.close()

        if len(file_times) != len(set(file_times)):
            logging.warning("Duplicate times in {}".format(gzipped_f))
            cache_check[f]['status'] = False
        if not is_sorted(file_times):
            logging.warning("Time not sorted in {}".format(gzipped_f))
            cache_check[f]['status'] = False
        if any(t < start or t > stop for t in file_times):
            logging.warning("Time outside [StartTime, StopTime] detected in {}".format(gzipped_f))
            cache_check[f]['status'] = False

    # Persist the cache so unchanged files are skipped on the next run.
    with open(cachefile, 'wb') as handle:
        pickle.dump(cache_check, handle, protocol=pickle.HIGHEST_PROTOCOL)
    return True


def check_ddbase(ddsys, vi=None, cachedir=None):
    """Check every <VI> declared in DDsys.xml, or a single one by name."""
    tree = etree.parse(ddsys)
    for e in tree.iter(tag="VI"):
        name = e.find("NAME").text
        if vi and name != vi:
            continue
        if not vi and name in vi_to_exclude:
            continue
        check_vi(e, cachedir)


if __name__ == '__main__':
    logging.basicConfig(format='[%(levelname)s] %(message)s', level=logging.INFO)
    ddbase = '/data1/DDBASE'
    ddsys = os.path.join(ddbase, 'DATA', 'DDsys.xml')
    cachedir = os.path.join(os.path.dirname(__file__), '.cache')
    os.makedirs(cachedir, exist_ok=True)
    vi = sys.argv[1] if len(sys.argv) > 1 else None
    check_ddbase(ddsys, vi=vi, cachedir=cachedir)
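

# Example usage (the script filename below is illustrative, not part of the
# source):
#
#   python check_ddbase.py                  # check every local VI in DDsys.xml
#   python check_ddbase.py juno_fgm_orb1    # check one VI by name; an explicit
#                                           # name bypasses vi_to_exclude
#
# A minimal sketch of the <VI> element this checker expects, inferred from the
# lookups in check_vi() above (element names are real, values hypothetical):
#
#   <VI>
#     <NAME base="LOCAL">my_dataset</NAME>
#     <LOCATION>/data1/DDBASE/DATA/my_dataset/</LOCATION>
#     <TIMES>my_dataset_times.nc</TIMES>
#     <INFO>my_dataset_info.nc</INFO>
#     <CACHE>my_dataset_cache.nc</CACHE>
#   </VI>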