# checkDDBase.py
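#
# Consistency checker for a local DDBASE: walks the VI (virtual instrument)
# entries of DDsys.xml, validates each dataset's times file against its
# gzipped NetCDF data files, and caches per-file results to speed up later
# runs. With "clean", copies of problematic files are prepared for fixing
# (the fixing itself, clean_data_file, is still a placeholder).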
import os
import sys
import logging
from lxml import etree
import netCDF4 as nc
import numpy as np
import gzip
import pickle
import glob
from datetime import datetime

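# Virtual instruments skipped during a full-base check (see check_ddbase).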
vi_to_exclude = [
    #'juno_fgm_orbfull',
    'juno_jedi_i090',
    'juno_jedi_i180',
    'juno_jedi_i270',
    'juno_jedi_e270',
    'juno_jedi_e180',
    'juno_jedi_e090',
    'ros_magob_1s',
    'juno_fgm_orb1',
    'gtl_epic_ed'
]

def datetime_to_ddtime(date_time):
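    """Convert a datetime to a DDTime string: YYYYDDDHHMMSSmmm, with DDD the zero-based day of year."""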
    return "%04d%03d%02d%02d%02d%03d" % (date_time.year, int(date_time.timetuple().tm_yday)-1, date_time.hour, date_time.minute, date_time.second, date_time.microsecond/1000.)


def is_sorted(l):
    return all(a <= b for a, b in zip(l, l[1:]))

def diff_dicts(a, b, drop_similar=True):
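    """Return {key: (a_value, b_value)} for the union of keys; with drop_similar, keep only pairs that differ."""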
    res = a.copy()

    for k in res:
        if k not in b:
            res[k] = (res[k], None)

    for k in b:
        if k in res:
            res[k] = (res[k], b[k])
        else:
            res[k] = (None, b[k])

    if drop_similar:
        res = {k:v for k,v in res.items() if v[0] != v[1]}

    return res

def check_vi(e, cachedir, clean):
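    """Check one <VI> element of DDsys.xml.

    Verifies the VI's times file, then each referenced gzipped NetCDF data
    file (structure, time ordering, duplicates, time range). Per-file results
    are cached as a pickle in cachedir; with clean=True, "clean_" copies of
    problematic files are written locally. Returns False on fatal errors.
    """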
    name = e.find("NAME").text
    logging.info('========== {} =========='.format(name))
    base = e.find("NAME").attrib['base']
    if base != 'LOCAL':
        return True
    location = e.find("LOCATION").text
    times = e.find("TIMES").text
    info = e.find("INFO").text
    cache = e.find("CACHE").text

    if name == 'iball_acc_all':
        # specific VI used to manage users
        return True

    cachefile = os.path.join(cachedir, name)
    if os.path.isfile(cachefile):
        with open(cachefile, 'rb') as handle:
            cache_check = pickle.load(handle)
    else:
        cache_check = {}

    if not os.path.isdir(location):
        logging.error('{} does not exist'.format(location))
        return False

    times_path = os.path.join(location, times)
    info_path = os.path.join(location, info)
    cache_path = os.path.join(location, cache)

    for f in [times_path, info_path, cache_path]:
        if not os.path.isfile(f):
            logging.error('{} does not exist'.format(f))
            return False

    ds = nc.Dataset(times_path)

    for v in ['StartTime', 'StopTime', 'FileName']:
        if v not in ds.variables:
            logging.error('Missing {} variable in times file'.format(v))
            return False

    start_times = []
    for st in np.array(ds.variables["StartTime"]):
        try:
            st_str = "".join([k.decode("UTF-8") for k in st])
            start_times.append(int(st_str))
        except Exception:
            logging.error('Cannot parse StartTime in times file')
            return False

    stop_times = []
    for et in np.array(ds.variables["StopTime"]):
        try:
            et_str = "".join([k.decode("UTF-8") for k in et])
            stop_times.append(int(et_str))
        except Exception:
            logging.error('Cannot parse StopTime in times file')
            return False

    files_names = []
    for fn in np.array(ds.variables["FileName"]):
        try:
            fn_str = "".join([k.decode("UTF-8") for k in fn])
            files_names.append(fn_str)
        except Exception:
            logging.error('Cannot parse FileName in times file')
            return False

    if len(start_times) != len(stop_times) or len(start_times) != len(files_names):
        logging.error('Inconsistent variable sizes in times file')
        return False

    if len(start_times) == 0:
        logging.warning('Dataset is empty')
        return True

    prev = None
    for d in start_times:
        if prev is not None and d < prev:
            logging.warning("Start times not sorted: {} is lower than the previous value".format(d))
        prev = d

    prev = None
    for d in stop_times:
        if prev is not None and d < prev:
            logging.warning("Stop times not sorted: {} is lower than the previous value".format(d))
        prev = d

    for i in range(len(start_times)):
        if start_times[i] > stop_times[i]:
            logging.warning("Start time {} is greater than stop time {}".format(start_times[i], stop_times[i]))

    if os.path.isfile(os.path.join(location,'LOCK')):
        logging.warning("LOCK file detected")

    files_in_dir = glob.glob(os.path.join(location, '*.nc.gz'))

    first_file_structure = None
    for i in range(len(files_names)):
        f = files_names[i]
        start = start_times[i]
        stop = stop_times[i]

        gzipped_f = os.path.join(location, f) + ".gz"

        if gzipped_f in files_in_dir:
            files_in_dir.remove(gzipped_f)

        if not os.path.isfile(gzipped_f):
            logging.error("Missing data file {}".format(gzipped_f))
            continue

        if (f in cache_check) and (first_file_structure is not None):  # always check the first dataset file
            if cache_check[f]['status'] and os.path.getmtime(gzipped_f) == cache_check[f]['mtime']:
                continue

        logging.info(f)

        try:
            cache_check[f] = {
                'status': True,
                'mtime': os.path.getmtime(gzipped_f)
            }

            gf = gzip.open(gzipped_f, 'rb')
            ncdata = gf.read()

            dds = nc.Dataset("in-mem-file", mode='r', memory=ncdata)
        except Exception:
            logging.error("Cannot load or extract data file {}".format(gzipped_f))
            cache_check[f]['status'] = False
            continue

        missing_variable = False
        for v in ['Time', 'StartTime', 'StopTime']:
            if v not in dds.variables:
                logging.error("Missing {} variable in data file {}".format(v, gzipped_f))
                cache_check[f]['status'] = False
                missing_variable = True
        if missing_variable:
            # skip this data file: the checks below need the Time variable
            dds.close()
            continue

        crt_file_structure = {}
        for v in dds.variables:
            if v in ['Time', 'StartTime', 'StopTime']:
                continue
            if not dds.dimensions[dds.variables[v].dimensions[0]].isunlimited():
                continue
            crt_file_structure[v] = {"shape": dds.variables[v].shape[1:], "dtype": dds.variables[v].dtype}

        if first_file_structure is None:
            first_file_structure = crt_file_structure
        elif first_file_structure != crt_file_structure:
            logging.error("Incoherence in file structure {}".format(gzipped_f))
            print(first_file_structure)
            print(crt_file_structure)
            print(diff_dicts(first_file_structure, crt_file_structure))
            cache_check[f]['status'] = False
            continue

        times = []
        for t in np.array(dds.variables["Time"]):
            if dds['Time'].dtype == np.float64:
                # TimeStamp
                t_str = datetime_to_ddtime(datetime.utcfromtimestamp(t))
            else:
                # DDTime
                t_str = "".join([k.decode("UTF-8") for k in t])
            try:
                times.append(int(t_str))
            except Exception:
                logging.error("Bad time format in data file {}".format(gzipped_f))
                cache_check[f]['status'] = False
                continue

        remove_duplicate = False
        if len(times) != len(set(times)):
            logging.warning("Duplicate times in {}".format(gzipped_f))
            #print(set([x for x in times if times.count(x) > 1]))
            cache_check[f]['status'] = False
            if clean:
                remove_duplicate = True

        sort_times = False
        if not is_sorted(times):
            prev = 0.
            for t in times:
                if prev > t:
                    print(t)
                prev = t
            logging.warning("Time not sorted in {}".format(gzipped_f))
            cache_check[f]['status'] = False
            if clean:
                sort_times = True

        remove_outside_times = False
        for t in times:
            if t < start or t > stop:
                logging.warning("Time {} outside [{}, {}] detected in {}".format(t, start, stop, gzipped_f))
                cache_check[f]['status'] = False
                if clean:
                    remove_outside_times = True
                break

        dds.close()
        if clean and (remove_duplicate or sort_times or remove_outside_times):
            clean_f = os.path.join("./", "clean_"+f)
            logging.warning(clean_f)
            with open(clean_f, "wb") as clean_nc_file:
                clean_nc_file.write(ncdata)
            dds_clean = nc.Dataset(clean_f, mode='a')
            clean_data_file(dds_clean, remove_duplicate, sort_times, remove_outside_times)
            dds_clean.close()

    with open(cachefile, 'wb') as handle:
        pickle.dump(cache_check, handle, protocol=pickle.HIGHEST_PROTOCOL)

    if files_in_dir:
        for f in files_in_dir:
            logging.warning("File {} in {} but not in *_times.nc file".format(f, location))

    return True


def clean_data_file(dds_clean, remove_duplicate, sort_times, remove_outside_times):
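    """Placeholder for in-place cleaning of an open NetCDF dataset (deduplicate, sort or drop out-of-range times)."""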
    records_to_remove = []
    if remove_outside_times:
        # TODO: mark records whose time falls outside [StartTime, StopTime]
        pass
    if remove_duplicate:
        # TODO: mark records sharing the same time value
        pass
    if sort_times:
        # TODO: reorder records chronologically
        pass


def check_ddbase(ddsys, vi=None, cachedir=None, clean=False):
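    """Run check_vi on every <VI> entry of the DDsys.xml file, or only on the one named by vi."""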
    tree = etree.parse(ddsys)
    for e in tree.iter(tag="VI"):
        if vi and e.find("NAME").text != vi:
            continue
        if not vi and e.find("NAME").text in vi_to_exclude:
            continue
        check_vi(e, cachedir, clean)


if __name__ == '__main__':
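    # Usage: python checkDDBase.py [VI_NAME [clean]]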
    logging.basicConfig(format='[%(levelname)s] %(message)s', level=logging.INFO)
    ddbase = '/data1/DDBASE'
    ddsys = os.path.join(ddbase, 'DATA', 'DDsys.xml')
    cachedir = os.path.join(os.path.dirname(__file__), '.cache')
    if not os.path.exists(cachedir):
        os.makedirs(cachedir)
    vi = None
    clean = False
    if len(sys.argv) > 1:
        vi = sys.argv[1]
        if (len(sys.argv) > 2) and (sys.argv[2] == "clean"):
            clean = True
    check_ddbase(ddsys, vi=vi, cachedir=cachedir, clean=clean)