#!/usr/bin/env python3
import logging
import os
import pickle
import sys
import time

import pykwalify.core
import yaml
from pykwalify.errors import PyKwalifyException, SchemaError


class ConfigPyros:
    """Load, validate and query the YAML configuration of an observatory.

    The observatory configuration and the device/computer files it references
    are validated with pykwalify schemas. The parsed result is cached in a
    pickle file (``pickle_file``) that is rebuilt whenever one of the
    configuration files is more recent than the cache.
    """

    # (AKo) : Config file path is checked on the settings file, if the file
    # isn't valid (i.e. not found) the error will be raised by the settings
    # file when starting the application.

    # Maps the name of an attached device to the name of the device it is
    # attached to (filled by read_device_config_file). Kept at class level for
    # backward compatibility; __init__ gives every instance its own dict.
    devices_links = {}
    # Path of the configuration file currently being processed (used in
    # error messages).
    current_file = None
    COMPONENT_PATH = "../../../../config/components/"
    GENERIC_DEVICES_PATH = "../../../../config/devices/"
    pickle_file = "obsconfig.p"
    obs_config = None
    devices = None
    computers = None
    agents = None

    def __init__(self, observatory_config_file: str) -> None:
        """Load the observatory configuration (from cache when possible).

        Args:
            observatory_config_file (str): path to the observatory config file
        """
        # Per-instance dict so that instances do not share (and mutate) the
        # class-level default.
        self.devices_links = {}
        self.load(observatory_config_file)

    def verify_if_pickle_needs_to_be_updated(self, observatory_config_file) -> bool:
        """Tell whether the pickle cache is missing or out of date.

        The cache is considered stale when it does not exist, or when the
        observatory config file, or any device/computer file it references, has
        a modification time more recent than the pickle file.

        Side effect: when the pickle file exists and is not older than the
        observatory config, the config is read and validated, and stored in
        ``self.obs_config`` if valid.

        Args:
            observatory_config_file (str): path to the observatory config file

        Returns:
            bool: True if the pickle file must be (re)built, False otherwise
        """
        if not os.path.isfile(self.pickle_file):
            return True

        pickle_file_mtime = os.path.getmtime(self.pickle_file)
        if os.path.getmtime(observatory_config_file) > pickle_file_mtime:
            return True

        obs_config = self.read_and_check_config_file(observatory_config_file)
        if obs_config is None:
            print(f"Error when trying to read config file (path of config file : {observatory_config_file}")
            # The cache cannot be trusted: ask for a rebuild (the previous
            # value, -1, was truthy as well, so load() takes the same branch).
            return True
        self.obs_config = obs_config

        # Check the last modification date of the device and computer files.
        observatory = obs_config["OBSERVATORY"]
        for section, entry_key in (("DEVICES", "DEVICE"), ("COMPUTERS", "COMPUTER")):
            for entry in observatory[section]:
                # "file" is optional (see get_devices / get_computers).
                file_name = entry[entry_key].get("file")
                if file_name is None:
                    continue
                if os.path.getmtime(self.CONFIG_PATH + file_name) > pickle_file_mtime:
                    return True
        return False

    def _read_pickle_file(self):
        """Return the content of the pickle cache, or None if it can't be read.

        Waits (polling every 0.5 s) until the file is readable.
        """
        try:
            while not os.access(self.pickle_file, os.R_OK):
                time.sleep(0.5)
            # NOTE(review): pickle must only be used on trusted data; this
            # assumes the cache file is written exclusively by this class.
            with open(self.pickle_file, "rb") as stream:
                return pickle.load(stream)
        except (IOError, EOFError, pickle.UnpicklingError):
            print("Error when reading the pickle file")
            return None

    def load(self, observatory_config_file):
        """Populate the instance from the pickle cache or from the YAML files.

        When the cache is up to date it is read; otherwise (or if reading it
        fails) the configuration files are parsed and the cache is rewritten.

        Args:
            observatory_config_file (str): path to the observatory config file
        """
        needs_update = self.verify_if_pickle_needs_to_be_updated(observatory_config_file)

        pickle_dict = None
        if os.path.isfile(self.pickle_file) and not needs_update:
            print("Reading pickle file")
            pickle_dict = self._read_pickle_file()

        if pickle_dict is not None:
            self.obs_config = pickle_dict["obs_config"]
            self.computers = pickle_dict["computers"]
            self.devices = pickle_dict["devices"]
            self.devices_links = pickle_dict["devices_links"]
            return

        print("Pickle file needs to be created or updated")
        self.obs_config = self.read_and_check_config_file(observatory_config_file)
        # Drop any previous cache so the getters really re-read the files,
        # then keep the result so later calls don't parse everything again.
        self.devices = None
        self.computers = None
        self.devices = self.get_devices()
        self.computers = self.get_computers()
        pickle_dict = {
            "obs_config": self.obs_config,
            "devices": self.devices,
            "computers": self.computers,
            "devices_links": self.devices_links,
        }
        print("Writing pickle file")
        with open(self.pickle_file, "wb") as stream:
            pickle.dump(pickle_dict, stream)

    def check_and_return_config(self, yaml_file: str, schema_file: str) -> dict:
        """Validate yaml_file against schema_file and return its content.

        Waits (polling every 0.5 s) until yaml_file is readable.

        Args:
            yaml_file (str): Path to the config file to be validated
            schema_file (str): Path to the schema file

        Returns:
            dict: dictionary of the config file (with values), or None when
            the file doesn't match the schema or can't be read
        """
        # Disable pykwalify error logging to clean the output.
        # NOTE(review): this silences logging up to ERROR for the whole
        # process and is never re-enabled - confirm this is intended.
        logging.disable(logging.ERROR)
        validator = None
        try:
            while not os.access(yaml_file, os.R_OK):
                print(f"{yaml_file} can't be accessed, waiting for availability")
                time.sleep(0.5)
            validator = pykwalify.core.Core(source_file=yaml_file, schema_files=[schema_file])
            return validator.validate(raise_exception=True)
        except SchemaError:
            if validator is not None:
                for error in validator.errors:
                    print("Error :", str(error).split(". Path")[0])
                    print("Path to error :", error.path)
            return None
        except IOError:
            print("Error when reading the observatory config file")
            return None

    def read_and_check_config_file(self, yaml_file: str) -> dict:
        """Read a config file and validate it against the schema it names.

        The schema file name is taken from the "schema" key of the config
        file. The SCHEMA_PATH, CONFIG_PATH, COMPONENT_PATH and
        GENERIC_DEVICES_PATH attributes are (re)computed from the location of
        yaml_file.

        Args:
            yaml_file (str): path to the config file

        Returns:
            dict: Dictionary of the config file (with values), or None on error
        """
        self.current_file = yaml_file
        try:
            while not os.access(yaml_file, os.R_OK):
                print(f"{yaml_file} can't be accessed, waiting for availability")
                time.sleep(0.5)
            with open(yaml_file, 'r') as stream:
                print(f"Reading {yaml_file}")
                config_file = yaml.safe_load(stream)

            relative_dir = os.path.dirname(yaml_file)
            absolute_dir = os.path.abspath(relative_dir)
            self.SCHEMA_PATH = os.path.join(absolute_dir, "../../../config/schemas/")
            # A bare file name has an empty dirname: keep the prefix empty
            # instead of producing "/" (the filesystem root).
            self.CONFIG_PATH = relative_dir + "/" if relative_dir else ""
            self.COMPONENT_PATH = os.path.join(absolute_dir, "../../../config/components/")
            self.GENERIC_DEVICES_PATH = os.path.join(absolute_dir, "../../../config/devices/")

            result = self.check_and_return_config(yaml_file, self.SCHEMA_PATH + config_file["schema"])
            if result is None:
                print("Error when reading and validating config file, please check the errors right above")
            return result
        except Exception as exc:
            # Covers yaml.YAMLError as well; errors are reported, not raised.
            print(exc)
        return None

    def read_generic_component_and_return_attributes(self, component_name: str) -> dict:
        """Read the generic definition of a component.

        Args:
            component_name (str): component name; the file read is
                COMPONENT_PATH + component_name + ".yml"

        Returns:
            dict: attributes of the component indexed by their "key" value
            (the "key" entry is removed from each attribute), or None on error
        """
        file_path = self.COMPONENT_PATH + component_name + ".yml"
        try:
            with open(file_path, 'r') as stream:
                config_file = yaml.safe_load(stream)
            attributes = {}
            for entry in config_file:
                attribute = entry["attribute"]
                attributes[attribute.pop("key")] = attribute
            return attributes
        except Exception as exc:
            # Covers yaml.YAMLError as well; errors are reported, not raised.
            print(exc)
        return None

    def read_capability_of_device(self, capability: dict) -> dict:
        """Merge the attributes of a capability with its generic component.

        Values given in the device config file override those of the generic
        component definition. For attributes flagged "is_Enum" in the generic
        component, the first value given by the device must be one of the
        values listed by the generic component; otherwise an error is printed
        and the program exits with status 1.

        Args:
            capability (dict): capability as found in a device config file
                ("component" and "attributes" keys); modified in place

        Returns:
            dict: the same capability, whose "attributes" entry is now a
            dictionary of merged attributes indexed by attribute key
        """
        component_attributes = self.read_generic_component_and_return_attributes(capability["component"])

        attributes = {}
        for entry in capability["attributes"]:
            attribute = entry["attribute"]
            attributes[attribute.pop("key")] = attribute

        for attribute_name, specific_attribute in attributes.items():
            generic_attribute = component_attributes[attribute_name]
            if "is_Enum" in generic_attribute:
                # The value of this attribute must belong to the enum of the
                # generic component attribute definition.
                enum_values = generic_attribute["value"]
                if specific_attribute["value"][0] not in enum_values:
                    print(f"Error in file {self.current_file} : attribute \"{attribute_name}\" isn't in enum. Accepted values for this attribute : \"{enum_values}\"")
                    sys.exit(1)
            # Merge the generic definition with the values of the device file.
            component_attributes[attribute_name] = {**generic_attribute, **specific_attribute}

        capability["attributes"] = component_attributes
        return capability

    def get_devices_names_and_file(self) -> dict:
        """Return the config file name of every device of the observatory.

        Returns:
            dict: device name -> value of the "file" key of that device
        """
        return {
            entry["DEVICE"]["name"]: entry["DEVICE"]["file"]
            for entry in self.obs_config["OBSERVATORY"]["DEVICES"]
        }

    def _merge_attached_devices(self, result: dict, attached_devices: list, config_file_name: str) -> None:
        """Append the capabilities of attached devices to result["DEVICE"].

        Attached devices that are the device itself or that are already
        associated with an agent are skipped. Each merged attached device is
        recorded in self.devices_links (attached name -> parent name).
        """
        devices_name_and_file = self.get_devices_names_and_file()
        active_files = {devices_name_and_file[name] for name in self.get_active_devices()}

        def device_name_for(wanted_file):
            # Name of the device whose config file is wanted_file.
            return [name for name, file_name in devices_name_and_file.items() if file_name == wanted_file][0]

        for attached_device in attached_devices:
            attached_file = attached_device["file"]
            if self.CONFIG_PATH + attached_file == config_file_name or attached_file in active_files:
                continue
            config_of_attached_device = self.read_device_config_file(self.CONFIG_PATH + attached_file)
            # The nested read changed current_file; point it back to this file.
            self.current_file = config_file_name
            attached_capabilities = config_of_attached_device["DEVICE"].get("CAPABILITIES")
            if attached_capabilities is None:
                continue
            parent_device_name = device_name_for(config_file_name[len(self.CONFIG_PATH):])
            attached_device_name = device_name_for(attached_file)
            self.devices_links[attached_device_name] = parent_device_name
            # The parent device may have no capability of its own.
            result["DEVICE"].setdefault("CAPABILITIES", []).extend(attached_capabilities)

    def read_device_config_file(self, config_file_name: str, is_generic=False) -> dict:
        """Read, validate and expand the config file of a device.

        The capabilities of the device are merged with their generic
        components, the device is merged with its generic device (key
        "generic") if any, and the capabilities of its attached devices
        (key "ATTACHED_DEVICES") are appended to its own.

        Args:
            config_file_name (str): path to the device config file
            is_generic (bool): True when the file is a generic device file
                (the schemas are then looked up in "../schemas/" relative to
                the file)

        Returns:
            dict: content of the config file with the expanded "DEVICE" entry,
            or None on error
        """
        self.current_file = config_file_name
        print(f"Reading {config_file_name}")
        try:
            with open(config_file_name, 'r') as stream:
                config_file = yaml.safe_load(stream)

            config_dir = os.path.abspath(os.path.dirname(config_file_name))
            schema_dir = "../schemas/" if is_generic else "../../../config/schemas/"
            self.SCHEMA_PATH = os.path.join(config_dir, schema_dir)

            result = self.check_and_return_config(config_file_name, self.SCHEMA_PATH + config_file["schema"])
            if result is None:
                print("Error when reading and validating config file, please check the errors right above")
                return None

            own_device = result["DEVICE"]

            generic_device = None
            if "generic" in own_device:
                generic_device_config = self.read_device_config_file(
                    self.GENERIC_DEVICES_PATH + own_device["generic"], True)
                generic_device = generic_device_config["DEVICE"]
                # The nested read changed current_file; point it back to this file.
                self.current_file = config_file_name

            # Expand the device's own capabilities BEFORE merging with the
            # generic device: the capabilities coming from the generic device
            # are already expanded, and the merged dict must hold the expanded
            # list rather than the raw {"CAPABILITY": ...} entries.
            if "CAPABILITIES" in own_device:
                own_device["CAPABILITIES"] = [
                    self.read_capability_of_device(entry["CAPABILITY"])
                    for entry in own_device["CAPABILITIES"]
                ]

            if generic_device is not None:
                # Values of the device override those of the generic device.
                result["DEVICE"] = {**generic_device, **own_device}

            if "ATTACHED_DEVICES" in own_device:
                self._merge_attached_devices(result, own_device["ATTACHED_DEVICES"], config_file_name)
            return result
        except Exception as exc:
            # Covers yaml.YAMLError as well; errors are reported, not raised.
            print(exc)
        return None

    def get_obs_name(self) -> str:
        """Return name of the observatory.

        Returns:
            str: Name of the observatory
        """
        return self.obs_config["OBSERVATORY"]["name"]

    def get_channels(self, unit) -> dict:
        """Return the channels of a unit indexed by channel name.

        Args:
            unit (dict): dictionary representing the unit

        Returns:
            dict: channel name -> channel definition
        """
        return {
            entry["CHANNEL"]["name"]: entry["CHANNEL"]
            for entry in unit["TOPOLOGY"]["CHANNELS"]
        }

    def get_computers(self) -> dict:
        """Return the computers of the observatory indexed by name.

        The cached value is returned when available. Otherwise, for each
        computer that has a "file" key, that file is read and its "COMPUTER"
        entry is stored under the "computer_config" key.

        Returns:
            dict: computer name -> computer definition
        """
        if self.computers is not None:
            return self.computers

        # read_and_check_config_file reassigns CONFIG_PATH: resolve every
        # computer file against the value it has before the first read.
        config_path = self.CONFIG_PATH
        computers = {}
        for entry in self.obs_config["OBSERVATORY"]["COMPUTERS"]:
            computer = entry["COMPUTER"]
            if "file" in computer:
                computer["computer_config"] = self.read_and_check_config_file(
                    config_path + computer["file"])["COMPUTER"]
            computers[computer["name"]] = computer
        return computers

    def get_devices(self) -> dict:
        """Return the devices of the observatory indexed by name.

        The cached value is returned when available. Otherwise, for each
        device that has a "file" key, that file is read and its "DEVICE" entry
        is stored under the "device_config" key.

        Returns:
            dict: device name -> device definition
        """
        if self.devices is not None:
            return self.devices

        devices = {}
        for entry in self.obs_config["OBSERVATORY"]["DEVICES"]:
            device = entry["DEVICE"]
            if "file" in device:
                device["device_config"] = self.read_device_config_file(
                    self.CONFIG_PATH + device["file"])["DEVICE"]
            devices[device["name"]] = device
        return devices

    def get_agents(self, unit) -> dict:
        """Return the agents of a unit indexed by agent name.

        Args:
            unit (dict): dictionary representing the unit

        Returns:
            dict: agent name -> agent definition (name, computer, device, ...)
        """
        if self.agents is not None:
            return self.agents

        agents = {}
        for entry in unit["AGENTS"]:
            # Each entry is a dictionary that has only one key.
            agent = list(entry.values())[0]
            agents[agent["name"]] = agent
        return agents

    def get_channel_groups(self, unit: dict) -> dict:
        """Return the channel groups of a unit and the logic between them.

        Args:
            unit (dict): dictionary containing all values of a unit

        Returns:
            dict: "global_groups_logic" holds the logic between groups and
            "groups" maps the index of each group to its definition
        """
        channel_groups = unit["TOPOLOGY"]["CHANNEL_GROUPS"]
        return {
            "global_groups_logic": channel_groups["logic"],
            "groups": {
                group_id: entry["GROUP"]
                for group_id, entry in enumerate(channel_groups["GROUPS"])
            },
        }

    def get_channel_information(self, unit: dict, channel_name: str) -> dict:
        """Return information of the given channel name of a unit.

        Args:
            unit (dict): dictionary representing the unit
            channel_name (str): name of the channel

        Returns:
            dict: dictionary containing all values that define this channel
        """
        return self.get_channels(unit)[channel_name]

    def get_topology(self, unit: dict) -> dict:
        """Return the topology of a unit.

        Args:
            unit (dict): dictionary representing the unit

        Returns:
            dict: topology of the unit, where "CHANNELS" is replaced by the
            result of get_channels and "CHANNEL_GROUPS" by the result of
            get_channel_groups; other branches are returned unchanged
        """
        topology = {}
        for key, branch in unit["TOPOLOGY"].items():
            if key == "CHANNELS":
                topology[key] = self.get_channels(unit)
            elif key == "CHANNEL_GROUPS":
                topology[key] = self.get_channel_groups(unit)
            else:
                topology[key] = branch
        return topology

    def get_active_agents(self, unit: dict) -> list:
        """Return the list of the agent names of a unit.

        Args:
            unit (dict): dictionary representing the unit

        Returns:
            list: list of the names of the agents
        """
        return list(self.get_agents(unit).keys())

    def get_units(self) -> dict:
        """Return all units defined in the config file indexed by name.

        Returns:
            dict: unit name -> unit content (name, database, topology, ...)
        """
        return {
            entry["UNIT"]["name"]: entry["UNIT"]
            for entry in self.obs_config["OBSERVATORY"]["UNITS"]
        }

    def get_components_agents(self, unit: dict) -> dict:
        """Return the agent associated with each component of a unit.

        The "COMPONENT_AGENTS" lists of the "SECURITY" and "MOUNT" branches
        and of every channel are gathered.

        Args:
            unit (dict): dictionary representing the unit

        Returns:
            dict: component name -> associated agent
        """
        components_agents = {}
        topology = self.get_topology(unit)
        for element in topology:
            if element in ("SECURITY", "MOUNT"):
                holders = [topology[element]]
            elif element == "CHANNELS":
                holders = topology[element].values()
            else:
                continue
            for holder in holders:
                for component_agent in holder["COMPONENT_AGENTS"]:
                    component_name = list(component_agent.keys())[0]
                    components_agents[component_name] = component_agent[component_name]
        return components_agents

    def get_units_name(self) -> list:
        """Return list of units names.

        Returns:
            list: names of units
        """
        return list(self.get_units().keys())

    def get_unit_by_name(self, name: str) -> dict:
        """Return the definition of the unit that matches the given name.

        Args:
            name (str): name of the unit

        Returns:
            dict: dictionary representing the unit
        """
        return self.get_units()[name]

    def get_agents_per_computer(self, unit: dict) -> dict:
        """Return, for each computer, the list of agents associated with it.

        Args:
            unit (dict): dictionary representing the unit

        Returns:
            dict: computer name -> list of agent names
        """
        agents_per_computer = {}
        for agent_name, agent in self.get_agents(unit).items():
            agents_per_computer.setdefault(agent["computer"], []).append(agent_name)
        return agents_per_computer

    def get_agents_per_device(self, unit: dict) -> dict:
        """Return, for each device, the list of agents associated with it.

        Agents without a "device" key are ignored.

        Args:
            unit (dict): dictionary representing the unit

        Returns:
            dict: device name -> list of agent names
        """
        agents_per_device = {}
        for agent_name, agent in self.get_agents(unit).items():
            if "device" in agent:
                agents_per_device.setdefault(agent["device"], []).append(agent_name)
        return agents_per_device

    def get_active_devices(self) -> list:
        """Return the names of the devices associated with an agent.

        Returns:
            list: list of active device names (over all units)
        """
        active_devices = []
        for unit in self.get_units().values():
            active_devices.extend(self.get_agents_per_device(unit))
        return active_devices

    def get_active_computers(self) -> list:
        """Return the names of the computers associated with an agent.

        Returns:
            list: list of active computer names (over all units)
        """
        active_computers = []
        for unit in self.get_units().values():
            active_computers.extend(self.get_agents_per_computer(unit))
        return active_computers

    def get_agent_information(self, unit: dict, agent_name: str) -> dict:
        """Give the dictionary of attributes of the agent for a unit.

        Args:
            unit (dict): dictionary representing the unit
            agent_name (str): agent name

        Returns:
            dict: dictionary containing attributes of the agent
        """
        return self.get_agents(unit)[agent_name]

    def get_device_information(self, device_name: str) -> dict:
        """Give the dictionary of the attributes of the device.

        Args:
            device_name (str): device name

        Returns:
            dict: dictionary containing attributes of the device
        """
        return self.get_devices()[device_name]

    def get_database_for_unit(self, unit_name: str) -> dict:
        """Return dictionary of attributes of the database for a unit.

        Args:
            unit_name (str): unit name

        Returns:
            dict: dictionary of attributes of the database for a unit
        """
        return self.get_unit_by_name(unit_name)["DATABASE"]

    def get_device_for_agent(self, unit: dict, agent_name: str) -> dict:
        """Return the device associated with the agent.

        Args:
            unit (dict): dictionary representing the unit
            agent_name (str): agent name

        Returns:
            dict: attributes of the device associated with this agent (as
            returned by get_device_information), or None if the agent has no
            device
        """
        for device, agent_names in self.get_agents_per_device(unit).items():
            if agent_name in agent_names:
                return self.get_device_information(device)
        return None

    def get_unit_of_computer(self, computer_name: str) -> str:
        """Return the name of the unit where the computer is used.

        Args:
            computer_name (str): computer name

        Returns:
            str: unit name, or None if no unit uses this computer
        """
        for unit_name, unit in self.get_units().items():
            if computer_name in self.get_agents_per_computer(unit):
                return unit_name
        return None

    def get_unit_of_device(self, device_name: str) -> str:
        """Return the name of the unit where the device is used.

        Args:
            device_name (str): device name

        Returns:
            str: unit name, or None if no unit uses this device
        """
        for unit_name, unit in self.get_units().items():
            if device_name in self.get_agents_per_device(unit):
                return unit_name
        return None

    def get_device_power(self, device_name: str) -> dict:
        """Return the "power" information of a device.

        Args:
            device_name (str): name of the device

        Returns:
            dict: information about power of the device, or None if it isn't
            stored in the device's config file
        """
        return self.get_devices()[device_name]["device_config"].get("power")

    def get_device_capabilities(self, device_name: str) -> list:
        """Return the capabilities of a device.

        Args:
            device_name (str): name of the device

        Returns:
            list: list of capabilities of the device, or an empty list if the
            device's config file doesn't define any
        """
        return self.get_devices()[device_name]["device_config"].get("CAPABILITIES", [])

    def get_device_connector(self, device_name: str) -> dict:
        """Return the "connector" information of a device.

        Args:
            device_name (str): name of the device

        Returns:
            dict: information about connector of the device, or None if it
            isn't stored in the device's config file
        """
        return self.get_devices()[device_name]["device_config"].get("connector")

    def get_computer_power(self, computer_name: str) -> dict:
        """Return the "power" information of a computer.

        Args:
            computer_name (str): name of the computer

        Returns:
            dict: information about power of the computer, or None if it isn't
            stored in the computer's config file
        """
        return self.get_computers()[computer_name]["computer_config"].get("power")


def main():
    """Load a sample observatory configuration and print parts of a device."""
    config = ConfigPyros("../../../../privatedev/config/guitalens/observatory_guitalens.yml")
    device_config = config.get_devices()["FLI-Kepler4040"]["device_config"]
    print(device_config)
    print(device_config["CAPABILITIES"][1]["attributes"]["manufacturer"])
    print(device_config["CAPABILITIES"])


if __name__ == "__main__":
    main()