#!/usr/bin/env python3 import pykwalify.core import sys import yaml import logging import os import pickle import time import socket from datetime import datetime from pykwalify.errors import PyKwalifyException, SchemaError from pathlib import Path class OBSConfig: # (AKo) : Config file path is checked on the settings file, if the file isn't valid (i.e not found) the error will be launched by the settings file when starting the application devices_links = {} current_file = None #COMPONENT_PATH = os.path.join(os.environ["DJANGO_PATH"],"../../../config/components/") #GENERIC_DEVICES_PATH = os.path.join(os.environ["DJANGO_PATH"],"../../../config/devices/") pickle_file = "obsconfig.p" obs_config = None devices = None computers = None agents = None obs_config_file_content = None #obs_config_path = os.environ.get("PATH_TO_OBSCONF_FOLDER",os.path.join(os.environ["DJANGO_PATH"],"../../../privatedev/config/default/")) errors = None MANDATORY_AGENTS = { "AgentSP":None, "AgentScheduler":None, "AgentMajordome":None, "AgentM":None } def verify_if_pickle_needs_to_be_updated(self, observatory_config_file) -> bool: """ Args: observatory_config_file ([type]): [description] Returns: bool: [description] """ self.CONFIG_PATH = os.path.dirname(observatory_config_file)+"/" self.obs_config_path = self.CONFIG_PATH #self.CONFIG_PATH = self.obs_config_path if os.path.isfile(self.CONFIG_PATH+self.pickle_file) == False: return True else: pickle_file_mtime = os.path.getmtime( self.CONFIG_PATH+self.pickle_file) obs_config_mtime = os.path.getmtime(observatory_config_file) obs_config = self.read_and_check_config_file( observatory_config_file) if obs_config_mtime > pickle_file_mtime: # create obs file (yaml) from pickle["obsconfig"] with date of pickle within history folder-> nom ficher + année + mois + jour + datetime (avec secondes) -> YYYY/MM/DD H:m:s pickle_datetime = datetime.utcfromtimestamp( pickle_file_mtime).strftime("%Y%m%d_%H%M%S") # Create history folder if doesn't exist Path(self.obs_config_path + "/history/").mkdir(parents=True, exist_ok=True) file_name = f"{self.obs_config_path}/history/observatory_{pickle_datetime}.yml" config_file = open(observatory_config_file, "r") with open(file_name, 'w') as f: f.write(config_file.read()) return True if obs_config == None: print( f"Error when trying to read config file (path of config file : {observatory_config_file}") return -1 self.obs_config = obs_config # check last date of modification for devices files for device in self.obs_config["OBSERVATORY"]["INVENTORY"]["DEVICES"]: device_file = self.CONFIG_PATH+device["DEVICE"]["file"] device_file_mtime = os.path.getmtime(device_file) if device_file_mtime > pickle_file_mtime: return True for computer in self.obs_config["OBSERVATORY"]["INVENTORY"]["COMPUTERS"]: computer_file = self.CONFIG_PATH+computer["COMPUTER"]["file"] computer_file_mtime = os.path.getmtime(computer_file) if computer_file_mtime > pickle_file_mtime: return True return False def load(self, observatory_config_file): pickle_needs_to_be_updated = self.verify_if_pickle_needs_to_be_updated( observatory_config_file) # check if we already read and load devices configuration and if pickle needs to be updated if pickle_needs_to_be_updated == False and self.devices != None: return None else: if os.path.isfile(self.CONFIG_PATH+self.pickle_file) and pickle_needs_to_be_updated == False: print("Reading pickle file") try: can_pickle_file_be_read = False while can_pickle_file_be_read != True: if os.access(self.CONFIG_PATH+self.pickle_file, os.R_OK): pickle_dict = pickle.load( open(self.CONFIG_PATH+self.pickle_file, "rb")) can_pickle_file_be_read = True else: print(f"{self.CONFIG_PATH+self.pickle_file} can't be accessed, waiting for availability") time.sleep(0.5) except IOError: print("Error when reading the pickle file") try: self.obs_config = pickle_dict["obs_config"] self.computers = pickle_dict["computers"] self.devices = pickle_dict["devices"] self.devices_links = pickle_dict["devices_links"] self.obs_config_file_content = pickle_dict["obs_config_file_content"] self.raw_config = pickle_dict["raw_config"] #self.agents = pickle_dict["agents"] except: # we rewrite the pickle file, the content will be the same otherwise we would be in the else case print( "Rewritting the pickle file (an error occured while reading it, the content will be the same as it was") pickle_dict = {} self.obs_config = self.read_and_check_config_file( observatory_config_file) obs_file = open(observatory_config_file, "r") pickle_dict["raw_config"] = obs_file.read() obs_file.close() self.raw_config = pickle_dict["raw_config"] pickle_dict["obs_config"] = self.obs_config pickle_dict["devices"] = self.get_devices() pickle_dict["computers"] = self.get_computers() pickle_dict["devices_links"] = self.devices_links pickle_dict["obs_config_file_content"] = self.read_and_check_config_file( observatory_config_file) print("Writing pickle file") pickle.dump(pickle_dict, open( self.CONFIG_PATH+self.pickle_file, "wb")) else: print("Pickle file needs to be created or updated") pickle_dict = {} self.obs_config = self.read_and_check_config_file( observatory_config_file) pickle_dict["obs_config"] = self.obs_config pickle_dict["devices"] = self.get_devices() pickle_dict["computers"] = self.get_computers() pickle_dict["devices_links"] = self.devices_links pickle_dict["obs_config_file_content"] = self.read_and_check_config_file( observatory_config_file) print("Writing pickle file") pickle.dump(pickle_dict, open( self.CONFIG_PATH+self.pickle_file, "wb")) def check_and_return_config(self, yaml_file: str, schema_file: str) -> dict: """ Check if yaml_file is valid for the schema_file and return an dictionary of the config file Args: yaml_file (str): Path to the config_file to be validated schema_file (str): Path to the schema file Returns: dict: dictionary of the config file (with values) """ # disable pykwalify error to clean the output logging.disable(logging.ERROR) try: can_yaml_file_be_read = False while can_yaml_file_be_read != True: if os.access(yaml_file, os.R_OK): can_yaml_file_be_read = True else: print( f"{yaml_file} can't be accessed, waiting for availability") time.sleep(0.5) c = pykwalify.core.Core(source_file=yaml_file, schema_files=[ self.SCHEMA_PATH+schema_file]) return c.validate(raise_exception=True) except SchemaError: for error in c.errors: print("Error :", str(error).split(". Path")[0]) print("Path to error :", error.path) self.errors = c.errors return None except IOError: print("Error when reading the observatory config file") @staticmethod def check_config(yaml_file: str, schema_file: str) -> any: """ Check if yaml_file is valid for the schema_file and return a boolean or list of errors according the schema Args: yaml_file (str): Path to the config_file to be validated schema_file (str): Path to the schema file Returns: any: boolean (True) if the configuration is valid according the schema or a list of error otherwise """ # disable pykwalify error to clean the output # logging.disable(logging.ERROR) try: can_yaml_file_be_read = False while can_yaml_file_be_read != True: if os.access(yaml_file, os.R_OK): can_yaml_file_be_read = True else: print( f"{yaml_file} can't be accessed, waiting for availability") time.sleep(0.5) c = pykwalify.core.Core( source_file=yaml_file, schema_files=[schema_file]) c.validate(raise_exception=True) return True except SchemaError: for error in c.errors: print("Error :", str(error).split(". Path")[0]) print("Path to error :", error.path) return c.errors except IOError: print("Error when reading the observatory config file") def read_and_check_config_file(self, yaml_file: str) -> dict: """ Read the schema key of the config file to retrieve schema name and proceed to the checking of that config file Call check_and_return_config function and print its return. Args: yaml_file (str): path to the config file Returns: dict: Dictionary of the config file (with values) """ self.current_file = yaml_file try: can_config_file_be_read = False while can_config_file_be_read != True: if os.access(yaml_file, os.R_OK): can_config_file_be_read = True else: print( f"{yaml_file} can't be accessed, waiting for availability") time.sleep(0.5) with open(yaml_file, 'r') as stream: print(f"Reading {yaml_file}") config_file = yaml.safe_load(stream) self.DJANGO_PATH = os.environ.get( "DJANGO_PATH", os.path.abspath(os.path.dirname(yaml_file))) self.SCHEMA_PATH = os.path.join( self.DJANGO_PATH, "../../../config/schemas/") self.CONFIG_PATH = self.obs_config_path self.COMPONENT_PATH = os.path.join( self.DJANGO_PATH, "../../../config/components/") self.GENERIC_DEVICES_PATH = os.path.join( self.DJANGO_PATH, "../../../config/devices/") result = self.check_and_return_config( yaml_file, config_file["schema"]) if result == None: print( "Error when reading and validating config file, please check the errors right above") exit(1) return result except yaml.YAMLError as exc: print(exc) except Exception as e: print(e) return None def read_generic_component_and_return_attributes(self, component_name: str) -> dict: file_path = self.COMPONENT_PATH + component_name + ".yml" try: with open(file_path, 'r') as stream: config_file = yaml.safe_load(stream) attributes = {} for attribute in config_file: attribute = attribute["attribute"] attributes[attribute.pop("key")] = attribute return attributes except yaml.YAMLError as exc: print(exc) except Exception as e: print(e) return None def read_capability_of_device(self, capability: dict) -> dict: """ Read capability of device and inherit attributes from generic component then overwrite attributes defined in device config Args: capability (dict): dictionary containing a capabilitiy (keys : component and attributes) Returns: dict: dictionary of capability inherited by generic component and overwritten his attributes by current attributes of capability """ component_attributes = self.read_generic_component_and_return_attributes( capability["component"]) attributes = {} # get all attributes of device's capability for attribute in capability["attributes"]: attribute = attribute["attribute"] attributes[attribute.pop("key")] = attribute output_data = {} if capability.get("output_data"): for data in capability['output_data']: cap_data = data["data"] name = cap_data.pop("key") output_data[name] = cap_data # for each attributes of generic component attributes for attribute_name in attributes.keys(): # merge attributes of general component with specified component in device config file new_attributes = { **component_attributes[attribute_name], **attributes[attribute_name]} if "is_enum" in component_attributes[attribute_name].keys(): # make an intersection of both list of values new_attributes["value"] = list(set(attributes[attribute_name]["value"]) & set( component_attributes[attribute_name]["value"])) if len(new_attributes["value"]) == 0: print( f"Value of lastly read device's attribute '{attribute_name}' isn't one of the values of component configuration for this device (component configuration value(s): {component_attributes[attribute_name]['value']}) (actual value : {attributes[attribute_name]['value']})") exit(1) component_attributes[attribute_name] = new_attributes # return inherited and overwritten attributes of capability capability["attributes"] = component_attributes capability["output_data"] = output_data return capability def get_devices_names_and_file(self) -> dict: """ Return a dictionary giving the device file name by the device name Returns: dict: key is device name, value is file name """ devices_names_and_files = {} for device in self.obs_config["OBSERVATORY"]["INVENTORY"]["DEVICES"]: device = device["DEVICE"] devices_names_and_files[device["name"]] = device["file"] return devices_names_and_files def read_device_config_file(self, config_file_name: str, is_generic=False) -> dict: """ Read the device config file, inherit generic device config if "generic" key is present in "DEVICE". Associate capabilities of attached_devices if this device has attached_devices. Inherit capabilities from generic component and overwritte attributes defined in the device config Args: config_file_name (str): file name to be read is_generic (bool, optional): tells if we're reading a generic configuration (is_generic =True) or not (is_generic = False). Defaults to False. Returns: dict: formatted device configuration (attributes, capabilities...) """ self.current_file = config_file_name devices_name_and_file = self.get_devices_names_and_file() if not is_generic: # check if device config file is listed in observatory configuration current_device_name = [device for device, file_name in devices_name_and_file.items( ) if file_name in config_file_name[len(self.CONFIG_PATH):]] if len(current_device_name) <= 0: print( f"Current file '{config_file_name[len(self.CONFIG_PATH):]}' isn't listed in observatory configuration") print("The devices names and files are: ") for device_name, file_name in devices_name_and_file.items(): print( f"device name: '{device_name}', device filename: '{file_name}'") exit(1) print(f"Reading {config_file_name}") try: with open(config_file_name, 'r') as stream: config_file = yaml.safe_load(stream) # if we're reading a generic device configuration, the path to get the schema is different than usual self.SCHEMA_PATH = os.path.join( self.DJANGO_PATH, "../../../config/schemas/") # read and verify that the device configuration match the schema # result will contain the final device configuration (processed) result = self.check_and_return_config( config_file_name, config_file["schema"]) # if the configuration didn't match the schema or had an error when reading the file if result == None: # TODO : throw exception in check_and_return_config ? print( "Error when reading and validating config file, please check the errors right above") exit(1) else: # the configuration is valid # storing DEVICE key in device (DEVICE can contains : attributes of device, capabilities, attached devices) # device will be used to navigate through the configuration device = result["DEVICE"] generic_device_config = None # if the device is associated to an generic configuration, we'll read that generic configuration to inherit his attributes if "generic" in device: # storing the whole current config current_config = result # read and get the generic device config generic_device_config = self.read_device_config_file( self.GENERIC_DEVICES_PATH+device["generic"], is_generic=True) # merge whole device config but we need to merge capabilities differently after new_config = { **generic_device_config["DEVICE"], **current_config["DEVICE"]} result["DEVICE"] = new_config # device has capabilities if "CAPABILITIES" in device: capabilities = [] # if the device is associated to a generic device we need to associate his capabilities with the generic capabilities and overwrite them if generic_device_config != None: # we're making a copy of generic device config so we can remove items during loop copy_generic_device_config = generic_device_config.copy() # We have to extend capabilities of generic device configuration for capability in current_config["DEVICE"]["CAPABILITIES"]: is_capability_in_generic_config = False current_config_capability = capability["CAPABILITY"] current_config_component = current_config_capability["component"] # find if this component was defined in generic_device_config for index, generic_config_capability in enumerate(generic_device_config["DEVICE"]["CAPABILITIES"]): # if the current capability is the capability of the component we're looking for if current_config_component == generic_config_capability["component"]: is_capability_in_generic_config = True # we're merging their attributes new_attributes = generic_config_capability["attributes"].copy( ) attributes = {} current_config_attributes = current_config_capability["attributes"] generic_config_attributes = generic_config_capability["attributes"] for attribute in current_config_attributes: attribute = attribute["attribute"] attributes[attribute.pop( "key")] = attribute # for each attributes of device component attributes for attribute_name in attributes.keys(): # merge attributes of general component with specified component in device config file new_attributes[attribute_name] = { **generic_config_attributes[attribute_name], **attributes[attribute_name]} if "is_enum" in generic_config_attributes[attribute_name].keys(): # make an intersection of both list of values new_attributes[attribute_name]["value"] = list(set(attributes[attribute_name]["value"]) & set( generic_config_attributes[attribute_name]["value"])) if len(new_attributes[attribute_name]["value"]) == 0: print( f"Value of device '{config_file_name}' for attribute '{attribute_name}' isn't one of the values of generic configuration for this device (generic value(s): {generic_config_attributes[attribute_name]['value']}) (actual value : {attributes[attribute_name]['value']})") exit(1) # removing this capability from generic device configuration generic_device_config["DEVICE"]["CAPABILITIES"].pop( index) capabilities.append( {"component": current_config_component, "attributes": new_attributes}) break if is_capability_in_generic_config == False: current_config_capability = self.read_capability_of_device( current_config_capability) # the component defined in the current_config isn't defined in generic config (should not happen but we'll deal with that case anyway) : we're simply adding this capability capabilities.append( current_config_capability) # looping through generic device config's capabilities in order to add them to current device configuration for generic_config_capability in generic_device_config["DEVICE"]["CAPABILITIES"]: capabilities.append(generic_config_capability) else: # device not associated to a generic device configuration for capability in device["CAPABILITIES"]: capability = capability["CAPABILITY"] capabilities.append( self.read_capability_of_device(capability)) device["CAPABILITIES"] = capabilities # associate capabilities to final device configuration (stored in result variable) result["DEVICE"]["CAPABILITIES"] = device["CAPABILITIES"] if "ATTACHED_DEVICES" in device.keys(): # device has attached devices, we need to read their configuration in order to get their capabilities and add them to the current device devices_name_and_file = self.get_devices_names_and_file() active_devices = self.get_active_devices() for attached_device in device["ATTACHED_DEVICES"]: is_attached_device_link_to_agent = False # we're looking for if the attached device is associated to an agent (i.e. the device is considered as 'active'). However an attached_device shoudn't be active for active_device in active_devices: if devices_name_and_file[active_device] == attached_device["file"]: # the attached device is an active device (so it's linked to an agent) is_attached_device_link_to_agent = True break if self.CONFIG_PATH+attached_device["file"] != config_file_name and not is_attached_device_link_to_agent: # if the attached device isn't the device itself and not active # get configuration of attached device config_of_attached_device = self.read_device_config_file( self.CONFIG_PATH+attached_device["file"]) capabilities_of_attached_device = None if "CAPABILITIES" in config_of_attached_device["DEVICE"].keys(): capabilities_of_attached_device = config_of_attached_device[ "DEVICE"]["CAPABILITIES"] if capabilities_of_attached_device != None: # get name of device corresponding to the config file name parent_device_name = [device for device, file_name in devices_name_and_file.items( ) if file_name == config_file_name[len(self.CONFIG_PATH):]] attached_device_name = [device for device, file_name in devices_name_and_file.items( ) if file_name == attached_device["file"]] if len(parent_device_name) > 0: parent_device_name = parent_device_name[0] else: print( f"Attached device filename '{config_file_name[len(self.CONFIG_PATH):]}' is not listed in observatory devices files names") print( "The devices names and files are: ") for device_name, file_name in devices_name_and_file.items(): print( f"device name: '{device_name}', device filename: '{file_name}'") exit(1) if len(attached_device_name) > 0: attached_device_name = attached_device_name[0] else: print( f"Attached device filename '{attached_device['file']}' is not listed in observatory devices files names") print( "The devices names and files are: ") for device_name, file_name in devices_name_and_file.items(): print( f"device name: '{device_name}', device filename: '{file_name}'") exit(1) # associate attached device to his 'parent' device (parent device is the currently read device) self.devices_links[attached_device_name] = parent_device_name for capability in capabilities_of_attached_device: # add capabilities of attached device to current device result["DEVICE"]["CAPABILITIES"].append( capability) return result except yaml.YAMLError as exc: print(exc) except Exception as e: print(e) exit(1) # return None def __init__(self, observatory_config_file: str, unit_name: str = "") -> None: """ Initiate class with the config file set content attribute to a dictionary containing all values from the config file Args: config_file_name (str): path to the config file """ self.load(observatory_config_file) if unit_name == "": # By default we will use the first unit self.unit_name = self.get_units_name()[0] else: self.unit_name = unit_name # call get_agents so the class will check if mandatory agents are in the obsconfig self.get_agents(self.unit_name) def get_obs_name(self) -> str: """ Return name of the observatory Returns: str: Name of the observatory """ return self.obs_config["OBSERVATORY"]["name"] def get_channels(self, unit_name: str) -> dict: """ return dictionary of channels Args: unit_name (str): Name of the unit Returns: dict: [description] """ unit = self.get_unit_by_name(unit_name) channels = {} for channel_id in range(len(unit["TOPOLOGY"]["CHANNELS"])): channel = unit["TOPOLOGY"]["CHANNELS"][channel_id]["CHANNEL"] channels[channel["name"]] = channel return channels def get_computers(self) -> dict: """ return dictionary of computers Returns: dict: [description] """ if self.computers != None: return self.computers else: computers = {} for computer_id in range(len(self.obs_config["OBSERVATORY"]["INVENTORY"]["COMPUTERS"])): computer = self.obs_config["OBSERVATORY"]["INVENTORY"]["COMPUTERS"][computer_id]["COMPUTER"] if "file" in computer.keys(): computer["computer_config"] = self.read_and_check_config_file( self.CONFIG_PATH+computer["file"])["COMPUTER"] computers[computer["name"]] = computer return computers def get_devices(self) -> dict: """ return dictionary of devices Returns: dict: [description] """ if self.devices != None: return self.devices else: devices = {} for device_id in range(len(self.obs_config["OBSERVATORY"]["INVENTORY"]["DEVICES"])): device = self.obs_config["OBSERVATORY"]["INVENTORY"]["DEVICES"][device_id]["DEVICE"] if("file" in device.keys()): device["device_config"] = self.read_device_config_file( self.CONFIG_PATH+device["file"])["DEVICE"] devices[device["name"]] = device return devices def get_devices_names(self) -> list: """Return the list of devices names of an observatory Returns: list: list of names of devices """ devices_names = [] for device in self.obs_config["OBSERVATORY"]["INVENTORY"]["DEVICES"]: devices_names.append(device["DEVICE"]["name"]) return devices_names def get_agents(self, unit_name) -> dict: """ return dictionary of agents Args: unit_name (str): name of the unit Returns: dict: dictionary of agents. For each agents tell the name, computer, device, protocole, etc... """ unit = self.get_unit_by_name(unit_name) if self.agents != None: return self.agents else: agents = {} for agent_id in range(len(unit["AGENTS"])): # Agents is a list containing dictionary that have only one key key = list(unit["AGENTS"][agent_id].keys())[0] agent = unit["AGENTS"][agent_id][key] agents[agent["name"]] = agent if agent["name"] in self.MANDATORY_AGENTS.keys(): self.MANDATORY_AGENTS[agent["name"]] = agent for key in self.MANDATORY_AGENTS: if self.MANDATORY_AGENTS[key] == None: raise MissingMandatoryAgentException(key) return agents def get_layouts(self, unit_name: str) -> dict: """ Return dictionary of layouts Args: unit_name (str): name of the unit Returns: dict: dictionary of layouts """ unit = self.get_unit_by_name(unit_name) info = {} info["layouts"] = {} for layout_id in range(len(unit["TOPOLOGY"]["LAYOUTS"])): layout = unit["TOPOLOGY"]["LAYOUTS"][layout_id]["LAYOUT"] info["layouts"][layout["name"]] = layout return info def get_albums(self, unit_name: str) -> dict: """ Return dictionary of layouts Args: unit_name (str): name of the unit Returns: dict: dictionary of layouts """ unit = self.get_unit_by_name(unit_name) info = {} info["albums"] = {} for album_id in range(len(unit["TOPOLOGY"]["ALBUMS"])): album = unit["TOPOLOGY"]["ALBUMS"][album_id]["ALBUM"] info["albums"][album["name"]] = album return info def get_channel_information(self, unit_name: str, channel_name: str) -> dict: """ Return information of the given channel name of a unit Args: unit_name (str): Name of the unit channel_name (str): name of the channel Returns: dict: dictionary containing all values that define this channel """ channels = self.get_channels(unit_name) return channels[channel_name] def get_topology(self, unit_name: str) -> dict: """ Return dictionary of the topology of the observatory Args: unit_name (str): Name of the unit Returns: dict: dictionary representing the topology of an unit (security, mount, channels, layouts, albums) """ unit = self.get_unit_by_name(unit_name) topology = {} for key in unit["TOPOLOGY"].keys(): branch = unit["TOPOLOGY"][key] if key == "CHANNELS": topology[key] = self.get_channels(unit_name) elif key == "LAYOUTS": topology[key] = self.get_layouts(unit_name) elif key == "ALBUMS": topology[key] = self.get_albums(unit_name) else: topology[key] = branch return topology def get_image_calibrations(self, unit_name: str, channel_name: str, category: str) -> dict: """ Return dictionary of image calibrations Args: unit_name (str): name of the unit channel_name (str): name of the channel category (str): category name of image calibration (BI, DA, FL) Returns: dict: dictionary of image calibrations """ unit = self.get_unit_by_name(unit_name) info = {} info["image_calibrations"] = {} for kserie in range(len(unit["IMAGE_CALIBRATIONS"]["SERIES"])): scategory = unit["IMAGE_CALIBRATIONS"]["SERIES"][kserie]["category"] if category == scategory: for kchannel in range(len(unit["IMAGE_CALIBRATIONS"]["SERIES"][kserie]["CHANNELS"])): schannel = unit["IMAGE_CALIBRATIONS"]["SERIES"][kserie]["CHANNELS"][kchannel]["name"] if channel_name == schannel: dico = unit["IMAGE_CALIBRATIONS"]["SERIES"][kserie]["CHANNELS"][kchannel] for key, val in dico: info[key] = val return info def get_active_agents(self, unit_name: str) -> list: """ Return the list of active agents (i.e. agents that have an association with a device) Args: unit_name (str): Name of the unit Returns: list: kist of the name of active agents """ return list(self.get_agents(unit_name).keys()) def get_units(self) -> dict: """ Return all units sort by name defined in the config file Returns: dict: dictionary giving for a unit_name, his content (name,database,topology,agents,...) """ result = {} units = self.obs_config["OBSERVATORY"]["UNITS"] for unit in units: unit = unit["UNIT"] result[unit["name"]] = unit return result def get_components_agents(self, unit_name: str) -> dict: """ Return dictionary of component_agents of the given unit Args: unit_name (str): Name of the unit Returns: dict: dictionary sort by component name giving the associated agent (agent name) """ components_agents = {} topology = self.get_topology(unit_name) for element in topology: if element in ("SECURITY", "MOUNT", "CHANNELS"): if(element != "CHANNELS"): for component_agent in topology[element]["COMPONENT_AGENTS"]: component_name = list(component_agent.keys())[0] components_agents[component_name] = component_agent[component_name] else: for channel in topology[element]: for component_agent in topology[element][channel]["COMPONENT_AGENTS"]: component_name = list(component_agent.keys())[0] components_agents[component_name] = component_agent[component_name] return components_agents def get_units_name(self) -> list: """ Return list of units names Returns: [list]: names of units """ return list(self.get_units().keys()) def get_unit_by_name(self, name: str) -> dict: """ Return dictionary containing definition of the unit that matches the given name Args: name (str): name of the unit Returns: dict: dictonary representing the unit """ return self.get_units()[name] def get_agents_per_computer(self, unit_name: str) -> dict: """ Return dictionary that give for each computer, what are the associated agents to it as a list Args: unit_name (str): Name of the unit Returns: dict: dictionary that give for each computer, what are the associated agents to it as a list """ agents_per_computer = {} agents = self.get_agents(self.unit_name) for agent in agents: computer_name = agents[agent]["computer"] computer_hostname = self.get_computers().get(computer_name).get("computer_config").get("hostname") if(computer_hostname not in agents_per_computer.keys()): agents_per_computer[computer_hostname] = [agent] else: agents_per_computer[computer_hostname].append(agent) return agents_per_computer def get_agents_per_device(self, unit_name: str) -> dict: """ Return dictionary that give for each device, what are the associated agents to it as a list Args: unit_name (str): Name of the unit Returns: dict: dictionary that give for each device, what are the associated agents to it as a list """ agents_per_device = {} agents = self.get_agents(unit_name) for agent in agents: if("device" in agents[agent].keys()): device_name = agents[agent]["device"] if device_name in self.get_devices_names(): if(agents[agent]["device"] not in agents_per_device.keys()): agents_per_device[device_name] = [agent] else: agents_per_device[device_name].append(agent) else: print( f"Error: device name '{device_name}' for agent '{agent}' is not known in the configuration file. The device name must match one of the names defined in the DEVICES section") exit(1) return agents_per_device def get_active_devices(self) -> list: """ Return a list of active device names Returns: list: list of active device names """ active_devices = [] for unit_name in self.get_units(): for device in self.get_agents_per_device(unit_name): active_devices.append(device) return active_devices def get_active_computers(self) -> list: """ Return a list of active computer names Returns: list: list of active computer names """ active_computers = [] for unit_name in self.get_units(): unit = self.get_unit_by_name(unit_name) for computer in self.get_agents_per_computer(unit_name): active_computers.append(computer) return active_computers def get_agent_information(self, unit_name: str, agent_name: str) -> dict: """ Give the dictionary of attributes of the agent for an unit. Args: unit (dict): dictonary representing the unit agent_name (str): agent name Returns: dict: dictionary containing attributes of the agent """ return self.get_agents(unit_name)[agent_name] def get_device_information(self, device_name: str) -> dict: """ Give the dictionary of the attributes of the device Args: device_name (str): device name Returns: dict: dictionary containing attributes of the device """ return self.get_devices()[device_name] def get_database_for_unit(self, unit_name: str) -> dict: """ Return dictionary of attributes of the database for an unit Args: unit_name (str): unit name Returns: dict: dictionary of attributes of the database for an unit """ return self.get_unit_by_name(unit_name)["DATABASE"] def get_database_environment(self, unit_name:str)->str: """ Return file path to environment file with the attributes of the database for an unit Args: unit_name (str): unit name Returns: str: path to the environment file of the database for an unit """ database_dict = self.get_unit_by_name(unit_name)["DATABASE"] complete_envfile_path = os.path.join(self.obs_config_path, database_dict["file"]) return complete_envfile_path def get_device_for_agent(self, unit_name: str, agent_name: str) -> str: """ Return device name associated to the agent Args: unit (dict): dictonary representing the unit agent_name (str): agent name Returns: str: device name associated to this agent """ agents_per_device = self.get_agents_per_device(unit_name) for device in agents_per_device: if agent_name in agents_per_device[device]: return self.get_device_information(device) def get_unit_of_computer(self, computer_name: str) -> str: """ Return the name of the unit where the computer is used Args: computer_name (str): computer name Returns: str: unit name """ for unit_name in self.get_units(): if(computer_name in self.get_agents_per_computer(unit_name)): return unit_name def get_unit_of_device(self, device_name: str) -> str: """ Return the name of the unit where the device is used Args: device_name (str): device name Returns: str: unit name """ for unit_name in self.get_units(): if(device_name in self.get_agents_per_device(unit_name)): return unit_name def get_device_power(self, device_name: str) -> dict: """ Return dictionary that contains informations about power if this information is present in the device config file Return None if this information isn't stored in device's config file Args: device_name (str): name of the device Returns: dict: informations about power of device """ return self.get_devices()[device_name]["device_config"].get("power") def get_device_capabilities(self, device_name: str) -> list: """ Return dictionary that contains informations about capabilities if this information is present in the device config file Return empty list if this information isn't stored in device's config file Args: device_name (str): name of the device Returns: list: list of capabilities of device """ list_of_capabilities = [] capabilities = self.get_devices( )[device_name]["device_config"].get("CAPABILITIES") return capabilities def get_device_connector(self, device_name: str) -> dict: """ Return dictionary that contains informations about connector if this information is present in the device config file Return None if this information isn't stored in device's config file Args: device_name (str): name of the device Returns: dict: informations about connector of device """ return self.get_devices()[device_name]["device_config"].get("connector") def get_computer_power(self, computer_name: str) -> dict: """ Return dictionary that contains informations about power if this information is present in the device config file Return None if this information isn't stored in device's config file Args: device_name (str): name of the device Returns: dict: informations about connector of device """ return self.get_computers()[computer_name]["computer_config"].get("power") def getDeviceControllerNameForAgent(self, unit_name: str, agent_name: str) -> tuple: agent = self.get_agent_information(unit_name, agent_name) return (agent["device"], "DeviceController"+agent["device"]) def getDeviceConfigForDeviceController(self, device_name: str) -> dict: return self.get_devices()[device_name]["device_config"] def getCommParamsForAgentDevice(self, unit_name: str, agent_name: str) -> tuple: agent = self.get_agent_information(unit_name, agent_name) device_config = self.getDeviceConfigForDeviceController( agent["device"]) comm_access = agent.get("comm_access", None) comm = device_config["comm"] return (comm_access, comm) def getChannelCapabilities(self, unit_name: str, channel_name: str) -> list: channel = self.get_channel_information(unit_name, channel_name) result = [] for component_agent in channel["COMPONENT_AGENTS"]: component = list(component_agent.keys())[0] agent = component_agent[component] device = self.getDeviceControllerNameForAgent(unit_name, agent)[0] device_capabilities = self.get_device_capabilities(device) for capability in device_capabilities: if capability["component"] == component: result.append(capability) return result def getEditableAttributesOfCapability(self, capability: dict) -> dict: editable_fields = {} attributes = capability.get("attributes") for attribute in attributes: if attributes[attribute]["is_editable"]: editable_fields[attribute] = attributes[attribute] return editable_fields def getUneditableAttributesOfCapability(self, capability: dict) -> dict: uneditable_fields = {} attributes = capability.get("attributes") for attribute in attributes: if attributes[attribute]["is_editable"] == False: uneditable_fields[attribute] = attributes[attribute] return uneditable_fields def getEditableAttributesOfChannel(self, unit_name: str, channel_name: str) -> list: capabilities = self.getChannelCapabilities(unit_name, channel_name) # merged_result = {} # for capability in capabilities: # merged_result[capability["component"]] = self.getEditableAttributesOfCapability(capability) # return merged_result merged_result = [] for capability in capabilities: attributes = self.getEditableAttributesOfCapability(capability) if len(attributes.keys()) > 0: merged_result.append(attributes) return merged_result def getUneditableAttributesOfChannel(self, unit_name: str, channel_name: str) -> list: capabilities = self.getChannelCapabilities(unit_name, channel_name) # merged_result = {} # for capability in capabilities: # merged_result[capability["component"]] = self.getEditableAttributesOfCapability(capability) # return merged_result merged_result = [] for capability in capabilities: attributes = self.getUneditableAttributesOfCapability(capability) if len(attributes.keys()) > 0: merged_result.append(attributes) return merged_result # def getLogicOfChannelGroups(self,unit_name): # return self.get_layouts(unit_name)["global_group_logic"] def getLayoutByName(self, unit_name: str, name_of_layout): return self.get_layouts(unit_name)["layouts"][name_of_layout] def getAlbumByName(self, unit_name: str, name_of_album): return self.get_albums(unit_name)["albums"][name_of_album] def getEditableAttributesOfMount(self, unit_name): capabilities = self.get_device_capabilities( self.get_device_for_agent(unit_name, "mount")["name"]) merged_result = [] for capability in capabilities: attributes = self.getEditableAttributesOfCapability(capability) if len(attributes.keys()) > 0: merged_result.append(attributes) return merged_result def getHorizonLine(self, unit_name): horizon = self.get_unit_by_name(unit_name).get("horizon") return horizon.get("line") def getHome(self): """ Return home of current unit Returns: str: string reprensenting home of unit """ home = self.get_unit_by_name(self.unit_name).get("home") return home def get_agent_path_data_root(self, agent_name:str) -> str: """ Return agent path_data_root Args: agent_name (str): _description_ Returns: str|None: String representing path_data_root """ agent = self.get_agent_information(self.unit_name, agent_name) path_data_root = agent.get("path_data_root", None) if path_data_root == None: # default with docker should be /home/pyros_user/app path_data_root = os.environ['PROJECT_ROOT_PATH'] return path_data_root def get_agent_path_data_tree(self, agent_name:str, makedirs: bool=False) -> dict: """ Return a dictionary containing all paths to store data. Args: agent_name (str): _description_ makedirs (bool): True to create all directories Returns: All paths to store data """ path_data_root = self.get_agent_path_data_root(agent_name) data_paths = {} data_paths['data_root'] = path_data_root data_paths['ima_incoming'] = os.path.join(path_data_root,"data/images/incoming") data_paths['ima_processed'] = os.path.join(path_data_root,"data/images/processed") data_paths['ima_tmp'] = os.path.join(path_data_root,"data/images/tmp") data_paths['ima_darks'] = os.path.join(path_data_root,"data/images/darks") data_paths['ima_flats'] = os.path.join(path_data_root,"data/images/flats") data_paths['ima_bias'] = os.path.join(path_data_root,"data/images/bias") data_paths['triton_input'] = os.path.join(path_data_root,"data/triton/input") data_paths['triton_output'] = os.path.join(path_data_root,"data/triton/out") if makedirs == True: for path in data_paths.values(): os.makedirs(path, exist_ok = True) return data_paths def get_image_calibrations(self, unit_name: str) -> dict: """ Return a dictionary containing all strategies of image calibration. Args: agent_name (str): _description_ Returns: All """ path_data_root = self.get_agent_path_data_root(agent_name) data_paths = {} data_paths['data_root'] = path_data_root data_paths['ima_incoming'] = os.path.join(path_data_root,"data/images/incoming") data_paths['ima_processed'] = os.path.join(path_data_root,"data/images/processed") data_paths['ima_tmp'] = os.path.join(path_data_root,"data/images/tmp") data_paths['ima_darks'] = os.path.join(path_data_root,"data/images/darks") data_paths['ima_flats'] = os.path.join(path_data_root,"data/images/flats") data_paths['ima_bias'] = os.path.join(path_data_root,"data/images/bias") data_paths['triton_input'] = os.path.join(path_data_root,"data/triton/input") data_paths['triton_output'] = os.path.join(path_data_root,"data/triton/out") if makedirs == True: for path in data_paths.values(): os.makedirs(path, exist_ok = True) return data_paths def get_agent_sst_of_computer(self,computer:str)->str: """ Return agent SST config name The agentSST's name in obsconfig must contains AgentSST Args: computer (str): _description_ Returns: str: _description_ """ agents = self.get_agents_per_computer(self.unit_name).get(computer) for agent in agents: if agent.startswith("AgentSST"): return agent return None def get_agent_name_from_config(self,base_agent_name,simulated_computer): computer = socket.gethostname() if simulated_computer: computer = simulated_computer agents = self.get_agents_per_computer(self.unit_name).get(computer) try: for agent in agents: if agent.startswith(base_agent_name): return agent except: # do nothing, only goes to in except condition when launching pyros tests because we're simulating obsconfig pass def get_dependencies(self): unit = self.unit_name dependencies = self.get_unit_by_name(unit).get("DEPENDENCIES") dependencies_return = [] for dependency in dependencies: computers = dependency.get("computers") commands = [] for link in dependency.get("links"): type = link.get("type") url = link.get("url") cmd = "" if type == "git": cmd += f"git clone {url}" elif type == "wget": cmd += f"wget {url}" commands.append(cmd) path_projet_root = os.environ['PROJECT_ROOT_PATH'] install_section = link.get("install") install_cmd = f"{install_section.get('shell')} {path_projet_root}{install_section.get('script')}" cmd.append(install_cmd) dependencies_return.append((computers,commands)) return dependencies_return def get_output_data_device(self,device): device = self.get_device_capabilities(device) result = {} for component in device: if component.get("output_data"): output_data = component.get("output_data") for data in output_data: data = data["data"] key = data.pop("key") result[key] = data return result def get_output_data_device_sort_monitoring_name(self,device): output_data = self.get_output_data_device(device) monitoring_names = {} if len(output_data) > 0: for data in output_data: key = data data_dict = output_data[data] monitoring_name = data_dict.pop("monitoring_name") data_dict["key"] = key monitoring_names[monitoring_name] = data_dict return monitoring_names class MissingMandatoryAgentException(Exception): """ Exception raised when an mandatory Pyros Agent is missing in the observatory configuration. """ def __init__(self, agent_name): message = f"Missing agent {agent_name} in observatory config. Please add an Agent/AgentDevice section in your observatory configuration file with the agent name {agent_name}." super().__init__(message) def main(): # config = ConfigPyros("../../../../privatedev/config/guitalens/observatory_guitalens.yml") # unit_name = config.get_units_name()[0] # dc = config.getDeviceControllerNameForAgent(unit_name,"mount")[0] # print(config.getDeviceConfigForDeviceController(dc)) # print(config.getCommParamsForAgentDevice(unit_name,"mount")) # print(config.getChannelCapabilities(unit_name,"OpticalChannel_up")) # print(config.get_channel_groups(unit_name)) # print(config.getEditableAttributesOfCapability(config.getChannelCapabilities(unit_name,"OpticalChannel_up")[0])) # print(config.getEditableAttributesOfChannel(unit_name,"OpticalChannel_up")) config = OBSConfig("../../../../privatedev/config/tnc/observatory_tnc.yml") unit_name = config.get_units_name()[0] # dc = config.getDeviceControllerNameForAgent(unit_name,"mount")[0] # print(config.getDeviceConfigForDeviceController(dc)) # print(config.getCommParamsForAgentDevice(unit_name,"mount")) # print(config.getChannelCapabilities(unit_name,"OpticalChannel_down2")) # print(config.get_channel_groups(unit_name)) # print(config.getEditableAttributesOfCapability(config.getChannelCapabilities(unit_name,"OpticalChannel_down2")[0])) # print(config.getEditableAttributesOfChannel(unit_name,"OpticalChannel_down2")) #print(config.getEditableAttributesOfMount(unit_name)) config.get_devices() #print(config.get_output_data_device("TAROT_meteo")) print(config.get_output_data_device_sort_monitoring_name("TAROT_meteo").keys()) #print(config.get_device_capabilities("TAROT_meteo")) # print(config.get_devices()["FLI-Kepler4040"]["device_config"]) # print(config.get_devices()["FLI-Kepler4040"]["device_config"]["CAPABILITIES"][1]["attributes"]["manufacturer"]) # print(config.get_devices()["FLI-Kepler4040"]["device_config"]["CAPABILITIES"]) # print(config.get_devices()["AstroMecCA-TM350"]["device_config"]["CAPABILITIES"]) if __name__ == "__main__": main()