# plc_checker.py 6.53 KB
import json
import logging
# Module-wide logger, named after this module; its threshold is forced to
# DEBUG so every message emitted below reaches the configured handlers.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)




class PlcChecker(object):
    """Map the sensors reported by a PLC onto a fixed set of monitored names.

    A status document (JSON) is parsed by :meth:`chk_config`. Whenever the
    document comes from a different origin/site than the one remembered, every
    sensor value it contains is listed in ``sensors_table`` and the first valid
    sensor advertising a known ``monitoring_name`` is selected for that name in
    ``sensors_monitor``. Origin, table and selection are persisted as JSON in
    ``CONFIG_PATH`` and reloaded on construction.
    """

    # Location of the persisted configuration, relative to the working
    # directory of the process.
    CONFIG_PATH = "./monitoring/plc_config.json"

    # Separator between device name, serial number and value name in a
    # sensor id, e.g. 'DHT22:/MiFe_DHT1_1:/Humidity'.
    ID_SEPARATOR = ":/"

    def __init__(self):
        """Set default state, then overlay any configuration found on disk."""
        # Origin and site the current sensor selection was built for.
        self.origin = {"plc_origin": None, "plc_site": None}
        # Last status document accepted by chk_config().
        self.struct = None
        self.path_list = None
        # Fields of the device most recently visited by scan_sensors().
        self.device_name = None
        self.device_type = None
        self.serial_number = None
        self.valid = None
        # Monitored name -> id of the sensor supplying it (None = unassigned).
        self.sensors_monitor = {
            "Temperature_outside": None,
            "Humidity_outside": None,
            "Rain_boolean": None,
            "Wind_direction": None,
            "Wind_speed": None,
            "Temperature_sky": None,
            "status": None,
            "current": None,
            "plc_mode": None,
            "LIGHTS": None,
            "SHUTTERS": None,
        }
        # One row per sensor value found by scan_sensors().
        self.sensors_table = []
        self.load_config()

    def load_config(self):
        """Load origin, sensor table and selection from ``CONFIG_PATH``.

        Best effort: when the file is missing, unreadable, not valid JSON or
        lacks one of the expected keys, the current state is left untouched
        and "No plc_config.json" is logged.
        """
        try:
            with open(self.CONFIG_PATH) as config_file:
                _struct = json.load(config_file)
            # Read every key before assigning anything so that an incomplete
            # file cannot leave the instance half-updated.
            origin = _struct["origin"]
            sensors = _struct["sensors"]
            selected = _struct["selected_sensors"]
        except (OSError, ValueError, KeyError, TypeError):
            logger.info("No plc_config.json")
            return
        self.origin = origin
        self.sensors_table = sensors
        self.sensors_monitor = selected
        logger.info("Loaded : plc_config.json")

    def save_config(self):
        """Write origin, sensor table and selection to ``CONFIG_PATH`` as JSON.

        Raises:
            OSError: the file cannot be opened or written.
            TypeError, ValueError: the state is not JSON-serialisable.
        The failure is logged before being re-raised.
        """
        _struct = {
            "origin": self.origin,
            "sensors": self.sensors_table,
            "selected_sensors": self.sensors_monitor,
        }
        try:
            # Serialise before opening: opening with 'w' truncates the file,
            # so a serialisation error must not happen after that point.
            payload = json.dumps(_struct)
            with open(self.CONFIG_PATH, "w") as config_file:
                config_file.write(payload)
        except (OSError, TypeError, ValueError):
            logger.exception("Not saved plc_config.json")
            raise
        logger.info("Saved : plc_config.json")

    def init_sensors_monitor(self):
        """Unassign every monitored name (the dict is updated in place)."""
        for sensor in self.sensors_monitor:
            self.sensors_monitor[sensor] = None

    def chk_config(self, status_plc):
        """Parse a PLC status document and refresh the configuration if needed.

        Args:
            status_plc: JSON text holding either the status object or a list
                whose first element is the status object.

        Returns:
            True when the document was accepted; False when it could not be
            parsed or did not contain a status object, in which case the
            previously accepted status is kept.

        Raises:
            KeyError: the status object has no "origin" or "site" entry.
        """
        try:
            struct = json.loads(status_plc)
        except (TypeError, ValueError):
            logger.info("Error loading plc json")
            return False
        if isinstance(struct, list):
            struct = struct[0] if struct else None
        if not isinstance(struct, dict):
            logger.info("Error loading plc json")
            return False
        self.struct = struct
        if not self.same_origin():
            # New PLC: rebuild the sensor table and selection, then persist.
            self.scan_sensors()
            self.save_config()
        return True

    def same_origin(self):
        """Tell whether the current status comes from the remembered origin.

        When origin or site differ, the remembered values are replaced by the
        new ones and every monitored name is unassigned before returning.

        Returns:
            True if both origin and site match the remembered ones, else False.
        """
        if (self.struct["origin"] == self.origin["plc_origin"]
                and self.struct["site"] == self.origin["plc_site"]):
            return True
        # Update the remembered origin and start from an empty selection.
        self.origin["plc_origin"] = self.struct["origin"]
        self.origin["plc_site"] = self.struct["site"]
        self.init_sensors_monitor()
        return False

    def known_sensor(self, name):
        """Return ``name`` if it is a monitored name, otherwise None."""
        return name if name in self.sensors_monitor else None

    def monitored_sensor(self, name, _sensor_id):
        """Assign ``_sensor_id`` to monitored ``name`` if it is still free.

        Returns:
            "checked" when the assignment was made, "" when the name already
            had a sensor.

        Raises:
            KeyError: ``name`` is not a monitored name.
        """
        if self.sensors_monitor[name] is None:
            self.sensors_monitor[name] = _sensor_id
            return "checked"
        return ""

    def get_key(self, struct, key):
        """Return ``struct[key]``, or None when the lookup is not possible."""
        try:
            return struct[key]
        except (KeyError, IndexError, TypeError):
            return None

    def get_sensor(self, monitored):
        """Return the current value of the sensor assigned to ``monitored``.

        Returns:
            The "value" of the matching entry in the last accepted status, or
            None when the name is unknown or unassigned, no status has been
            accepted yet, the stored id is malformed, or nothing matches.
        """
        _id = self.sensors_monitor.get(monitored)
        if _id is None or not self.struct:
            return None
        # maxsplit=2 keeps a separator occurring inside the value name intact.
        sp_id = _id.split(self.ID_SEPARATOR, 2)
        if len(sp_id) != 3:
            return None
        device_name, serial_number, value_name = sp_id
        _value = None
        for device in self.get_key(self.struct, "devices") or []:
            if (self.get_key(device, "device_name") != device_name
                    or self.get_key(device, "serial_number") != serial_number):
                continue
            for device_values in self.get_key(device, "device_values") or []:
                if self.get_key(device_values, "name") == value_name:
                    # No early exit: with duplicate entries the last one wins.
                    _value = self.get_key(device_values, "value")
        return _value

    def scan_sensors(self):
        """Rebuild ``sensors_table`` from the last accepted status.

        Each row is a list:
        ``[device_name, device_type, serial_number, valid, name,
        monitoring_name, type, selected, sensor_id]``, where ``valid`` is True
        when the device "error_code" equals ``'0'``, ``monitoring_name`` is
        None unless it is a monitored name, and ``selected`` is "checked" when
        this scan assigned the sensor to its monitored name. A sensor whose
        device name, serial number or value name is not a string gets a
        ``sensor_id`` of None and is never selected.
        """
        self.sensors_table = []
        date = self.get_key(self.struct, "date")
        if date is None:
            logger.debug("No date")
        else:
            logger.debug("%s", date)

        for device in self.get_key(self.struct, "devices") or []:
            self.device_name = self.get_key(device, "device_name")
            self.device_type = self.get_key(device, "device_type")
            self.serial_number = self.get_key(device, "serial_number")
            self.valid = self.get_key(device, "error_code") == '0'
            for device_values in self.get_key(device, "device_values") or []:
                _name = self.get_key(device_values, "name")
                _monitoring_name = self.known_sensor(
                    self.get_key(device_values, "monitoring_name"))
                id_parts = (self.device_name, self.serial_number, _name)
                if all(isinstance(part, str) for part in id_parts):
                    _sensor_id = self.ID_SEPARATOR.join(id_parts)
                else:
                    # Incomplete description: the sensor cannot be addressed.
                    _sensor_id = None
                if self.valid and _monitoring_name and _sensor_id is not None:
                    _selected = self.monitored_sensor(_monitoring_name, _sensor_id)
                else:
                    _selected = ""
                self.sensors_table.append([
                    self.device_name,
                    self.device_type,
                    self.serial_number,
                    self.valid,
                    _name,
                    _monitoring_name,
                    self.get_key(device_values, "type"),
                    _selected,
                    _sensor_id,
                ])