# checkDDBase.py
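"""Consistency checker for a DDBASE data tree.

For every LOCAL virtual instrument (VI) declared in DDsys.xml, verify that
the declared location and its times/info/cache NetCDF files exist, that the
StartTime/StopTime/FileName variables of the times file parse and are sorted
and mutually consistent, and that every gzipped data file holds sorted,
duplicate-free Time values inside its declared [StartTime, StopTime]
interval. Results are cached per VI (pickle, keyed on file mtime) so that
unchanged data files are skipped on subsequent runs.
"""
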
import os
import sys
import logging
from lxml import etree
import netCDF4 as nc
import numpy as np
import gzip
import pickle
from datetime import datetime

# VIs skipped during a full-base scan; they can still be checked explicitly
# by passing their name on the command line (presumably large datasets that
# would make a full scan too long).
vi_to_exclude = [
    'juno_ephem_orb1',
    'juno_fgm_orbfull',
    'juno_fgm_orb1',
    'juno_fgm_orb60',
    'juno_jedi_i090',
    'juno_jedi_i180',
    'juno_jedi_i270',
    'juno_jedi_e270',
    'juno_jedi_e180',
    'juno_jedi_e090',
    'juno_fgm_cruise60',
    'ros_magob_1s',
]

def datetime_to_ddtime(date_time):
    """Format a datetime as a 16-digit DDTime string, YYYYDDDHHMMSSmmm,
    where DDD is the zero-based day of year and mmm the milliseconds."""
    return "%04d%03d%02d%02d%02d%03d" % (date_time.year, date_time.timetuple().tm_yday - 1,
                                         date_time.hour, date_time.minute, date_time.second,
                                         date_time.microsecond // 1000)


def is_sorted(l):
    """Return True if the sequence is in non-decreasing order."""
    return all(a <= b for a, b in zip(l, l[1:]))
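# Example: is_sorted([1, 2, 2, 3]) -> True, is_sorted([2, 1]) -> False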


def check_vi(e, cachedir):
    """Check one <VI> element of DDsys.xml; return False on a fatal inconsistency."""
    name = e.find("NAME").text
    logging.info('========== {} =========='.format(name))
    base = e.find("NAME").attrib['base']
    if base != 'LOCAL':
        # only locally stored bases can be checked here
        return True
    location = e.find("LOCATION").text
    times = e.find("TIMES").text
    info = e.find("INFO").text
    cache = e.find("CACHE").text
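    # Expected shape of a <VI> entry in DDsys.xml, inferred from the lookups
    # above (tag contents are illustrative):
    #   <VI>
    #     <NAME base="LOCAL">some_vi</NAME>
    #     <LOCATION>/path/to/vi/</LOCATION>
    #     <TIMES>some_vi_times.nc</TIMES>
    #     <INFO>some_vi_info.nc</INFO>
    #     <CACHE>some_vi_cache.nc</CACHE>
    #   </VI>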

    if name == 'iball_acc_all':
        # specific VI used to manage users
        return True

    # Load this VI's pickle cache of per-file results; entries are keyed by
    # data file name and invalidated when the data file's mtime changes.
    cachefile = os.path.join(cachedir, name)
    if os.path.isfile(cachefile):
        with open(cachefile, 'rb') as handle:
            cache_check = pickle.load(handle)
    else:
        cache_check = {}

    if not os.path.isdir(location):
        logging.error('{} does not exist'.format(location))
        return False

    times_path = os.path.join(location, times)
    info_path = os.path.join(location, info)
    cache_path = os.path.join(location, cache)

    for f in [times_path, info_path, cache_path]:
        if not os.path.isfile(f):
            logging.error('{} does not exist'.format(f))
            return False

    try:
        ds = nc.Dataset(times_path)
    except Exception:
        logging.error('Cannot open times file {}'.format(times_path))
        return False

    for v in ['StartTime', 'StopTime', 'FileName']:
        if v not in ds.variables:
            logging.error('Missing {} variable in times file'.format(v))
            return False

    # The times file holds three parallel char-array variables: row i maps
    # data file FileName[i] to its coverage interval [StartTime[i], StopTime[i]],
    # the times being encoded as DDTime digit strings.
    def decode_char_array(var):
        # NetCDF char variables store each string as a row of single bytes
        return ["".join(c.decode("UTF-8") for c in row) for row in np.array(var)]

    try:
        start_times = [int(s) for s in decode_char_array(ds.variables["StartTime"])]
    except Exception:
        logging.error('Cannot parse StartTime in times file')
        return False

    try:
        stop_times = [int(s) for s in decode_char_array(ds.variables["StopTime"])]
    except Exception:
        logging.error('Cannot parse StopTime in times file')
        return False

    try:
        files_names = decode_char_array(ds.variables["FileName"])
    except Exception:
        logging.error('Cannot parse FileName in times file')
        return False

    ds.close()

    if not (len(start_times) == len(stop_times) == len(files_names)):
        logging.error('Inconsistent variable sizes in times file')
        return False

    if len(start_times) == 0:
        logging.warning('Dataset is empty')
        return True

    prev = None
    for d in start_times:
        if prev is not None and d < prev:
            logging.warning("Start times not sorted around {}".format(d))
        prev = d

    prev = None
    for d in stop_times:
        if prev is not None and d < prev:
            logging.warning("Stop times not sorted around {}".format(d))
        prev = d

    for start, stop in zip(start_times, stop_times):
        if start > stop:
            logging.warning("Start time is higher than stop time: {} - {}".format(start, stop))

    # a leftover LOCK file may indicate an update in progress or one that
    # was interrupted
    if os.path.isfile(os.path.join(location, 'LOCK')):
        logging.warning("LOCK file detected")

    for f, start, stop in zip(files_names, start_times, stop_times):
        gzipped_f = os.path.join(location, f) + ".gz"

        if not os.path.isfile(gzipped_f):
            logging.error("Missing data file {}".format(gzipped_f))
            continue

        # Skip files already validated in a previous run and unchanged since
        # (the existence check above also protects the getmtime call).
        if f in cache_check and cache_check[f]['status'] \
                and os.path.getmtime(gzipped_f) == cache_check[f]['mtime']:
            continue

        logging.info(f)

        try:
            cache_check[f] = {
                'status': True,
                'mtime': os.path.getmtime(gzipped_f)
            }

            # decompress in memory and open the NetCDF image directly
            with gzip.open(gzipped_f, 'rb') as gf:
                ncdata = gf.read()

            dds = nc.Dataset("in-mem-file", mode='r', memory=ncdata)
        except Exception:
            logging.error("Cannot load or extract data file {}".format(gzipped_f))
            cache_check[f]['status'] = False
            continue

        # note failures for all variables, then skip the file as a whole
        missing_var = False
        for v in ['Time', 'StartTime', 'StopTime']:
            if v not in dds.variables:
                logging.error("Missing {} variable in data file {}".format(v, gzipped_f))
                cache_check[f]['status'] = False
                missing_var = True
        if missing_var:
            dds.close()
            continue

        if dds['Time'].dtype == np.float64:
            # float64 Time values are Unix timestamps
            data_times = [int(datetime_to_ddtime(datetime.utcfromtimestamp(t)))
                          for t in np.array(dds.variables["Time"])]
        else:
            # otherwise Time is a DDTime char array
            data_times = [int(s) for s in decode_char_array(dds.variables["Time"])]

        if len(data_times) != len(set(data_times)):
            logging.warning("Duplicate times in {}".format(gzipped_f))
            cache_check[f]['status'] = False

        if not is_sorted(data_times):
            logging.warning("Time not sorted in {}".format(gzipped_f))
            cache_check[f]['status'] = False

        for t in data_times:
            if t < start or t > stop:
                logging.warning("Time outside [StartTime, StopTime] detected in {}".format(gzipped_f))
                cache_check[f]['status'] = False
                break

        dds.close()

    # persist per-file results so unchanged files are skipped next run
    with open(cachefile, 'wb') as handle:
        pickle.dump(cache_check, handle, protocol=pickle.HIGHEST_PROTOCOL)

    return True


def check_ddbase(ddsys, vi=None, cachedir=None):
    """Check every VI declared in DDsys.xml, or only vi if given.
    Excluded VIs are skipped only when scanning the whole base."""
    tree = etree.parse(ddsys)
    for e in tree.iter(tag="VI"):
        if vi and e.find("NAME").text != vi:
            continue
        if not vi and e.find("NAME").text in vi_to_exclude:
            continue
        check_vi(e, cachedir)
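
# Usage:
#   python checkDDBase.py             # check the whole base, minus vi_to_exclude
#   python checkDDBase.py <vi_name>   # check a single VI, even an excluded one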


if __name__ == '__main__':
    logging.basicConfig(format='[%(levelname)s] %(message)s', level=logging.INFO)
    ddbase = '/data1/DDBASE'
    ddsys = os.path.join(ddbase, 'DATA', 'DDsys.xml')
    cachedir = os.path.join(os.path.dirname(__file__), '.cache')
    os.makedirs(cachedir, exist_ok=True)
    vi = None
    if len(sys.argv) > 1:
        vi = sys.argv[1]
    check_ddbase(ddsys, vi=vi, cachedir=cachedir)