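"""Consistency checker for a DDBASE archive.

Walks the VI (Virtual Instrument) entries of DDsys.xml and, for every
local VI, checks that its times/info/cache files exist, then validates
each gzipped NetCDF data file against the times index: required
variables, time ordering, duplicate or out-of-range times, and a file
structure consistent across the whole dataset.

Usage:
    python checkDDBase.py [VI_NAME [clean]]
"""
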
import glob
import gzip
import logging
import os
import pickle
import sys
from datetime import datetime

import netCDF4 as nc
import numpy as np
from lxml import etree

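# VIs skipped during a full scan; they can still be checked explicitly by
# passing their name on the command line.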
vi_to_exclude = [
    #'juno_fgm_orbfull',
    'juno_jedi_i090',
    'juno_jedi_i180',
    'juno_jedi_i270',
    'juno_jedi_e270',
    'juno_jedi_e180',
    'juno_jedi_e090',
    'ros_magob_1s',
    'juno_fgm_orb1',
    'gtl_epic_ed'
]

def datetime_to_ddtime(date_time):
    """Format a datetime as DDTime: YYYYDDDHHMMSSmmm (day of year is zero-based)."""
    return "%04d%03d%02d%02d%02d%03d" % (
        date_time.year,
        date_time.timetuple().tm_yday - 1,
        date_time.hour,
        date_time.minute,
        date_time.second,
        date_time.microsecond // 1000,
    )


def is_sorted(values):
    """Return True if the sequence is in non-decreasing order."""
    return all(a <= b for a, b in zip(values, values[1:]))

def diff_dicts(a, b, drop_similar=True):
    """Return {key: (a_value, b_value)} over the keys of a and b, with None
    for missing keys; identical pairs are dropped unless drop_similar is False."""
    res = a.copy()

    for k in res:
        if k not in b:
            res[k] = (res[k], None)

    for k in b:
        if k in res:
            res[k] = (res[k], b[k])
        else:
            res[k] = (None, b[k])

    if drop_similar:
        res = {k:v for k,v in res.items() if v[0] != v[1]}

    return res

def check_vi(e, cachedir, clean):
    """Check the consistency of a single VI element; return False on fatal errors."""
    name = e.find("NAME").text
    logging.info('========== {} =========='.format(name))
    base = e.find("NAME").get('base')
    if base != 'LOCAL':
        # only locally hosted VIs can be checked
        return True
    location = e.find("LOCATION").text
    times = e.find("TIMES").text
    info = e.find("INFO").text
    cache = e.find("CACHE").text

    if name == 'iball_acc_all':
        # specific VI used to manage users
        return True

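    # Per-VI pickle cache: file name -> {'status': bool, 'mtime': float},
    # used to skip files already validated and unmodified since the last run.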
    cachefile = os.path.join(cachedir, name)
    if os.path.isfile(cachefile):
        with open(cachefile, 'rb') as handle:
            cache_check = pickle.load(handle)
    else:
        cache_check = {}

    if not os.path.isdir(location):
        logging.error('{} does not exist'.format(location))
        return False

    times_path = os.path.join(location, times)
    info_path = os.path.join(location, info)
    cache_path = os.path.join(location, cache)

    for f in [times_path, info_path, cache_path]:
        if not os.path.isfile(f):
            logging.error('{} does not exist'.format(f))
            return False

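    # The times file is an index mapping each data file of the VI to its
    # [StartTime, StopTime] interval, stored as character-array time stamps.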
    ds = nc.Dataset(times_path)

    for v in ['StartTime', 'StopTime', 'FileName']:
        if v not in ds.variables:
            logging.error('Missing {} variable in times file'.format(v))
            return False

    start_times = []
    for st in np.array(ds.variables["StartTime"]):
        try:
            st_str = "".join([k.decode("UTF-8") for k in st])
            start_times.append(int(st_str))
        except Exception:
            logging.error('Cannot parse StartTime in times file')
            return False

    stop_times = []
    for et in np.array(ds.variables["StopTime"]):
        try:
            et_str = "".join([k.decode("UTF-8") for k in et])
            stop_times.append(int(et_str))
        except Exception:
            logging.error('Cannot parse StopTime in times file')
            return False

    files_names = []
    for fn in np.array(ds.variables["FileName"]):
        try:
            fn_str = "".join([k.decode("UTF-8") for k in fn])
            files_names.append(fn_str)
        except Exception:
            logging.error('Cannot parse FileName in times file')
            return False

    ds.close()

    if len(start_times) != len(stop_times) or len(start_times) != len(files_names):
        logging.error('Inconsistent variable sizes in times file')
        return False

    if len(start_times) == 0:
        logging.warning('Dataset is empty')
        return True

    prev = None
    for d in start_times:
        if prev is not None and d < prev:
            logging.warning("Start times not sorted: {} is lower than its predecessor".format(d))
        prev = d

    prev = None
    for d in stop_times:
        if prev is not None and d < prev:
            logging.warning("Stop times not sorted: {} is lower than its predecessor".format(d))
        prev = d

    for i in range(len(start_times)):
        if start_times[i] > stop_times[i]:
            logging.warning("Start time {} is later than stop time {}".format(start_times[i], stop_times[i]))

    if os.path.isfile(os.path.join(location,'LOCK')):
        logging.warning("LOCK file detected")

    files_in_dir = glob.glob(os.path.join(location, '*.nc.gz'))

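    # Check every data file referenced by the index against its declared
    # time interval and against the structure of the first file.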
    first_file_structure = None
    for i in range(len(files_names)):
        f = files_names[i]
        start = start_times[i]
        stop = stop_times[i]

        gzipped_f = os.path.join(location, f) + ".gz"

        if gzipped_f in files_in_dir:
            files_in_dir.remove(gzipped_f)

        if not os.path.isfile(gzipped_f):
            logging.error("Missing data file {}".format(gzipped_f))
            continue

        if (f in cache_check) and (first_file_structure is not None):
            # skip files already validated and unmodified; the first file is
            # always re-read so that first_file_structure gets populated
            if cache_check[f]['status'] and os.path.getmtime(gzipped_f) == cache_check[f]['mtime']:
                continue

        logging.info(f)

        try:
            cache_check[f] = {
                'status': True,
                'mtime': os.path.getmtime(gzipped_f)
            }

            with gzip.open(gzipped_f, 'rb') as gf:
                ncdata = gf.read()

            dds = nc.Dataset("in-mem-file", mode='r', memory=ncdata)
        except Exception:
            logging.error("Cannot load or extract data file {}".format(gzipped_f))
            cache_check[f]['status'] = False
            continue

        missing = [v for v in ('Time', 'StartTime', 'StopTime') if v not in dds.variables]
        if missing:
            logging.error("Missing {} variable(s) in data file {}".format(", ".join(missing), gzipped_f))
            cache_check[f]['status'] = False
            dds.close()
            continue

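        # Record the shape/dtype of every record variable: all data files of
        # a VI must share the same structure.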
        crt_file_structure = {}
        for v in dds.variables:
            if v in ['Time', 'StartTime', 'StopTime']:
                continue
            # only variables indexed by the unlimited (record) dimension matter
            if not dds.variables[v].dimensions:
                continue
            if not dds.dimensions[dds.variables[v].dimensions[0]].isunlimited():
                continue
            crt_file_structure[v] = {"shape": dds.variables[v].shape[1:], "dtype": dds.variables[v].dtype}

        if first_file_structure is None:
            first_file_structure = crt_file_structure
        elif first_file_structure != crt_file_structure:
            logging.error("Inconsistent file structure {}".format(gzipped_f))
            logging.error("Differing variables: {}".format(diff_dicts(first_file_structure, crt_file_structure)))
            cache_check[f]['status'] = False
            dds.close()
            continue

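        # Time is stored either as float64 Unix timestamps or as DDTime
        # character arrays, depending on the dataset.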
        times = []
        is_timestamp = dds['Time'].dtype == np.float64
        for t in np.array(dds.variables["Time"]):
            if is_timestamp:
                t_str = datetime_to_ddtime(datetime.utcfromtimestamp(t))
            else:
                t_str = "".join([k.decode("UTF-8") for k in t])
            try:
                times.append(int(t_str))
            except ValueError:
                logging.error("Bad time format in data file {}".format(gzipped_f))
                cache_check[f]['status'] = False
                continue

        remove_duplicate = False
        if len(times) != len(set(times)):
            logging.warning("Duplicate times in {}".format(gzipped_f))
            #print(set([x for x in times if times.count(x) > 1]))
            cache_check[f]['status'] = False
            if clean:
                remove_duplicate = True

        sort_times = False
        if not is_sorted(times):
            prev = None
            for t in times:
                if prev is not None and prev > t:
                    logging.warning("Out-of-order time {}".format(t))
                prev = t
            logging.warning("Time not sorted in {}".format(gzipped_f))
            cache_check[f]['status'] = False
            if clean:
                sort_times = True

        remove_outside_times = False
        for t in times:
            if t < start or t > stop:
                logging.warning("Time {} outside [{}, {}] detected in {}".format(t, start, stop, gzipped_f))
                cache_check[f]['status'] = False
                if clean:
                    remove_outside_times = True
                break

        dds.close()
        if clean and (remove_duplicate or sort_times or remove_outside_times):
            # write an uncompressed local copy of the file and fix it in place
            clean_f = os.path.join("./", "clean_" + f)
            logging.warning(clean_f)
            with open(clean_f, "wb") as clean_nc_file:
                clean_nc_file.write(ncdata)
            dds_clean = nc.Dataset(clean_f, mode='a')
            clean_data_file(dds_clean, remove_duplicate, sort_times, remove_outside_times)
            dds_clean.close()

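    # Persist the updated per-file results for the next run.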
    with open(cachefile, 'wb') as handle:
        pickle.dump(cache_check, handle, protocol=pickle.HIGHEST_PROTOCOL)

    for f in files_in_dir:
        logging.warning("File {} in {} but not in *_times.nc file".format(f, location))

    return True


def clean_data_file(dds_clean, remove_duplicate, sort_times, remove_outside_times):
    """Fix an extracted data file in place (not implemented yet)."""
    # TODO: drop duplicate records, sort records by time, and remove records
    # falling outside the interval declared in the times file.
    if remove_outside_times:
        pass
    if remove_duplicate:
        pass
    if sort_times:
        pass


def check_ddbase(ddsys, vi=None, cachedir=None, clean=False):
    """Check every VI of DDsys.xml, or only the one named by vi if given."""
    tree = etree.parse(ddsys)
    for e in tree.iter(tag="VI"):
        if vi and e.find("NAME").text != vi:
            continue
        if not vi and e.find("NAME").text in vi_to_exclude:
            continue
        check_vi(e, cachedir, clean)


if __name__ == '__main__':
    logging.basicConfig(format='[%(levelname)s] %(message)s', level=logging.INFO)
    ddbase = '/data1/DDBASE'
    ddsys = os.path.join(ddbase, 'DATA', 'DDsys.xml')
    cachedir = os.path.join(os.path.dirname(__file__), '.cache')
    if not os.path.exists(cachedir):
        os.makedirs(cachedir)
    vi = None
    clean = False
    if len(sys.argv) > 1:
        vi = sys.argv[1]
        if (len(sys.argv) > 2) and (sys.argv[2] == "clean"):
            clean = True
    check_ddbase(ddsys, vi=vi, cachedir=cachedir, clean=clean)