# python c:\srv\develop\pyros\config\configpyros.py # ---------------------------------------------------------------------------------------------------- # Parameter attributes = section, key, value, type, unit, access, comment, label, reference, timestamp # ---------------------------------------------------------------------------------------------------- """ Example of config = ConfigPyros() config.set_configfile('/tmp/myconfig.xml') stmod = config.get_paramvalue('angentX','general','startmode') """ # ---------------------------------------------------------------------------------------------------- from lxml import etree import os import socket import random class ConfigPyros: # === Constants NO_ERROR = 0 ERR_FILE_NOT_EXISTS = 101 ERR_XML_FILE_ERROR = 102 ERR_A_SUBTAG_UNIT_HAVE_NO_ATTRIBUTE_ALIAS = 301 ERR_ASSEMBLED_ALIAS_NOT_DEFINED = 302 ERR_NO_MOUNT_ASSEMBLED = 303 ERR_ONLY_ONE_MOUNT_CAN_BE_ASSEMBLED = 304 ERR_NO_CHANNEL_ASSEMBLED = 305 ERR_NO_DATABASE_ASSEMBLED = 306 # === Private variables _last_errno = NO_ERROR _last_errparam1 = "" # --- XML file _config_filename = "" _config_filename_mtime = 0 _config_filename_need_to_be_read = True # --- XML structure _tags_to_scan = None # --- XML contents _config_contents_valid = False _config_contents_changed = False _tree = None _root = None _unit_subtags = None _tag_aliases = None _assemblies = None # --- Computer infos _hostname = None _this_computer_alias = None # ===================================================================== # ===================================================================== # Private methods # ===================================================================== # ===================================================================== def _configfile_verify(self): """ Update parameters of the configuration file (existing, modification date) """ self._config_filename_need_to_be_read = False self._last_errno = self.NO_ERROR if os.path.isfile(self._config_filename): # --- verify the date of last modification of the file config_filename_mtime = os.path.getmtime(self._config_filename) if config_filename_mtime > self._config_filename_mtime: self._config_filename_need_to_be_read = True #### self._config_filename_mtime = config_filename_mtime else: self._last_errno = self.ERR_FILE_NOT_EXISTS def _parse_xml2lists(self, verbose:bool = False): """ Transform the DOM XML structure into usable lists for PyROS """ if (verbose==True): print("============== Subtags of Information with attribute alias") unit_subtags = [] tags_to_scan = [''] xpath = '/unit' + '/*' for elem in self._root.xpath(xpath): if str(type(elem)) != "": continue if elem.tag in unit_subtags: continue if elem.attrib.get('alias',None) == None: continue unit_subtags.append(elem.tag) tags_to_scan.append(elem.tag) if (verbose==True): print("<{}> subtags = {}".format(xpath, unit_subtags)) # --- verify the presence of at less one of the following subtag of mandatory_tags = ['', 'database', 'mount', 'channel', 'computer', 'agent'] for mandatory_tag in mandatory_tags: if mandatory_tag not in tags_to_scan: self._last_errno = self.ERR_A_SUBTAG_UNIT_HAVE_NO_ATTRIBUTE_ALIAS if mandatory_tag == '': self._last_errparam1 = 'unit' else: self._last_errparam1 = mandatory_tag return False # --- Get all aliases of subtags of # --- Set the tag_aliases dictionnary of lists tag_aliases = {} for tag in tags_to_scan: if tag == "": xpath = '/unit' tag = 'unit' else: xpath = '/unit/' + tag if (verbose==True): print("============== Aliases of <" + tag + ">") attrib_key_to_get = 'alias' aliases = self.get_attribute_level0(xpath,attrib_key_to_get) if (verbose==True): print("<{} {}> = {}".format(xpath, attrib_key_to_get, aliases)) if aliases == [None]: self._last_errno = self.ERR_TAG_MOUNT_HAVE_NO_ATTRIBUTE_ALIAS return False tag_aliases[tag] = aliases if (verbose==True): print("============== Unit all alias intantiations in one list") # --- get all the assemblies xpath = '/unit' attrib_to_select = {'alias': (tag_aliases['unit'])[0]} tag_to_get = "param" tag_attrib_to_select = {'section': 'assembly', 'key': 'alias'} tag_attrib_key_to_get = 'value' assembled_aliases = self.get_attribute_level1(xpath,attrib_to_select,tag_to_get,tag_attrib_to_select,tag_attrib_key_to_get) if (verbose==True): print("{} <{} {}> = {}".format(attrib_to_select, tag_to_get, tag_attrib_to_select, assembled_aliases)) if (verbose==True): print("============== Unit alias intantiations in dictionnary of tags") # --- Set the assembly dictionnary of lists assemblies = {} for assembled_alias in assembled_aliases: als = [] for tag in tags_to_scan: if tag == '': continue #print("tag = {}".format(tag)) #print("assembled_alias = {}".format(assembled_alias)) #print("tag_aliases[tag] = {}".format(tag_aliases[tag])) if assembled_alias in tag_aliases[tag]: als.append(assembled_alias) the_tag = tag #print("n = {}".format(n)) if len(als) == 0: self._last_errno = self.ERR_ASSEMBLED_ALIAS_NOT_DEFINED self._last_errparam1 = tag return False else: assemblies[the_tag] = als if (verbose==True): print("assemblies = {}".format(assemblies)) # --- check the number of mounts assembled if 'mount' in assemblies.keys(): if (len(assemblies['mount']) > 1): if (verbose==True): print("Error: Only one mount can be assembled") self._last_errno = self.ERR_ONLY_ONE_MOUNT_CAN_BE_ASSEMBLED return False else: if (verbose==True): print("Error: No mount assembled") self._last_errno = self.ERR_NO_MOUNT_ASSEMBLED return False # --- check the number of channels assembled if 'channel' not in assemblies.keys(): if (verbose==True): print("Error: No channel assembled") self._last_errno = self.ERR_NO_CHANNEL_ASSEMBLED return False # --- Chack the computer infos if (verbose==True): print("============== Unit all alias intantiations in one list") # --- get all the assemblies this_computer_alias = None hostname = socket.gethostname() aliases = tag_aliases['computer'] xpath = '/unit/computer' tag_to_get = "param" tag_attrib_to_select = {'section': 'local', 'key': 'hostname'} tag_attrib_key_to_get = 'value' for alias in aliases: attrib_to_select = {'alias': alias} hname = self.get_attribute_level1(xpath,attrib_to_select,tag_to_get,tag_attrib_to_select,tag_attrib_key_to_get) if hname[0] == hostname: this_computer_alias = alias # --- Everything was OK until this point. Validate the XML self._unit_subtags = unit_subtags self._tags_to_scan = tags_to_scan self._tag_aliases = tag_aliases self._assemblies = assemblies self._this_computer_alias = this_computer_alias return True def _alias2xpath(self, alias, subtag=""): #print("alias={}".format(alias)) tag = self.get_unit_subtag(alias) #print("tag={}".format(tag)) if tag == 'unit': xpath = '/unit' elif tag == None: return None else: xpath = '/unit/' + tag if subtag != "": xpath += '/' + subtag return xpath def _encode(self, texte:str, method:str = 'crypted1', phrase:str = "(Af%"): if method == 'crypted1': lphrase = len(phrase) plus = random.randint(1,9) for i in range(plus): texte += chr(random.randint(32,255)) texte += chr(ord(str(plus))) texte_hexa_codeds = "" for i in range(len(texte)): phrase_deci = ord(phrase[i%lphrase])+i texte_deci = ord(texte[i]) #texte_hexa = hex(texte_deci) texte_deci_coded = (texte_deci + phrase_deci)%256 texte_hexa_coded = "{0:#0{1}x}".format(texte_deci_coded,4) texte_hexa_codeds += texte_hexa_coded[2:4] #print("{} raw={} {} {} coded={} {}".format(i,texte[i],texte_deci,phrase_deci,texte_deci_coded,texte_hexa_coded)) return texte_hexa_codeds else: return texte def _decode(self, crypted:str, method:str = 'crypted1', phrase:str = "(Af%"): if method == 'crypted1': if crypted == '': return '' lphrase = len(phrase) texte_hexa_codeds = crypted texte = "" for i in range(int(len(texte_hexa_codeds)/2)): phrase_deci = ord(phrase[i%lphrase])+i texte_hexa_coded = '0x'+texte_hexa_codeds[2*i:2*i+2] texte_deci_coded = int(texte_hexa_coded,16) texte_deci = (texte_deci_coded - phrase_deci + 256)%256 texte += chr(texte_deci) #print("{} coded={} raw={} {}".format(i,texte_hexa_coded,texte_deci,chr(texte_deci))) plus = int(texte[-1]) texte = texte[0:-1-plus] return texte else: return crypted # ===================================================================== # ===================================================================== # Methods for experimented users # ===================================================================== # ===================================================================== def get_attribute_level0(self, xpath:str, attrib_key_to_get:str): """ Read a list of values ['d', 'e'] from the following XML structure: ... ... Select the path: xpath = '/a/b' Select key attribute: attrib_key_to_get = 'c' Call the method: result = ConfigPyros.get_attribute_level0(xpath,attrib_to_select,tag_to_get,tag_attrib_to_select tag_attrib_key_to_get) """ if self._config_contents_valid == False: return "" root = self._root # # 1) get the path of /unit/agent with a given attribute # -- attrib_value_to_gets = [] for elem in root.xpath(xpath): if str(type(elem)) != "": continue attrib_value_to_gets.append( elem.attrib.get(attrib_key_to_get,None) ) return attrib_value_to_gets def get_attribute_level1(self, xpath:str, attrib_to_select:dict, tag_to_get:str, tag_attrib_to_select:dict, tag_attrib_key_to_get:str): """ Read a value 'k' from the following XML structure: Select the path: xpath = '/a/b' Select the attribute dictionary: attrib_to_select = {'c': 'd'} Select tag: tag_to_get = "e" Select tag attribute dictionary: tag_attrib_to_select = {'f': 'g', 'h': 'i'} Select key attribute: tag_attrib_key_to_get = 'j' Call the method: result = ConfigPyros.get_attribute_level1(xpath,attrib_to_select,tag_to_get,tag_attrib_to_select tag_attrib_key_to_get) """ if self._config_contents_valid == False: return "" root = self._root tree = self._tree # # 1) get the path of /a/b with a given attribute c="d" # -- #print("============== xpath {}".format(xpath)) nb = 0 for elem in root.xpath(xpath): if str(type(elem)) != "": continue nbitems = len(attrib_to_select) nb = 0 for key in attrib_to_select.keys(): value = attrib_to_select[key] vxml = elem.attrib.get(key,None) #print("key={} value={} vxml={}".format(key,value,vxml)) if value == vxml: nb += 1 #print("nb={} nbitems={}".format(nb,nbitems)) if nb == nbitems: path = tree.getpath(elem) #print("tree.getpath(elem) = {}".format(tree.getpath(elem))) if nb==0: nb = 1 break # # 2) get the list of params correspondong to the selected 'b' attrib_value_to_gets = [] if nb==0: return attrib_value_to_gets #print("---------------") for elem in root.xpath(path+"/*"): if str(type(elem)) != "": continue #print(" elem.tag= {}".format(elem.tag)) if elem.tag == tag_to_get: if tag_attrib_to_select == "" and tag_attrib_key_to_get == "": attrib_value_to_gets.append(elem.attrib) #print(" elem.attrib = {}".format(elem.attrib)) else: nbitems = len(tag_attrib_to_select) #print(" elem.attrib = {}".format(elem.attrib)) nb = 0 for key in tag_attrib_to_select.keys(): value = tag_attrib_to_select[key] vxml = elem.attrib.get(key,None) #print("key={} value={} vxml={}".format(key,value,vxml)) if value == vxml: nb += 1 #print("nb={} nbitems={}".format(nb,nbitems)) if nb == nbitems: if tag_attrib_key_to_get == "": attrib_value_to_gets.append( elem.attrib ) else: attrib_value_to_gets.append( elem.attrib.get(tag_attrib_key_to_get,None) ) return attrib_value_to_gets # ===================================================================== # ===================================================================== # Methods for users # ===================================================================== # ===================================================================== def get_last_errno(self): """ Get the last error code """ return self._last_errno def get_last_errmsg(self): """ Get the last error message """ msg = "Unknown error" if self._last_errno == self.NO_ERROR: msg = "No error" elif self._last_errno == self.ERR_FILE_NOT_EXISTS: msg = "File does not exists" elif self._last_errno == self.ERR_XML_FILE_ERROR: msg = "XML file error" elif self._last_errno == self.ERR_A_SUBTAG_UNIT_HAVE_NO_ATTRIBUTE_ALIAS: msg = "Tags <{}> have no attribute alias".format(self._last_errparam1) elif self._last_errno == self.ERR_ASSEMBLED_ALIAS_NOT_DEFINED: msg = "Instantiated {} not defined in a <{}>".format(self._last_errparam1,self._last_errparam1) elif self._last_errno == self.ERR_NO_MOUNT_ASSEMBLED: msg = "No mount assembled" elif self._last_errno == self.ERR_ONLY_ONE_MOUNT_CAN_BE_ASSEMBLED: msg = "Only one mount can be assembled" elif self._last_errno == self.ERR_NO_CHANNEL_ASSEMBLED: msg = "No channel assembled" elif self._last_errno == self.ERR_NO_DATABASE_ASSEMBLED: msg = "No database assembled" return msg def set_configfile(self, config_filename:str ): """ To set the config file name :param config_filename: A string that contains the full name of the configuration file. :type config_filename: string """ if config_filename == "" or config_filename != self._config_filename: self._config_filename = config_filename self._config_filename_mtime = 0 self._config_contents_valid = False self._tree = None self._root = None self._configfile_verify() return def get_configfile(self): """ Get the current config file name """ return self._config_filename def is_config_contents_changed(self): response =self._config_contents_changed self._config_contents_changed = False return response def load(self, verbose:bool = False, force:bool = False): """ To load the config file if necessary :Example: ConfigPyros().load() """ # --- Verify the existence and the last modification of the XML file self._configfile_verify() errno = self.get_last_errno() if errno != self.NO_ERROR: if (verbose==True): print("Load Error = {}".format(errno)) return if self._config_filename_need_to_be_read == False and force == False: self._config_contents_changed = False return # --- Read the configuration file and parse it self._last_errno = self.NO_ERROR try: # --- Save the current tree and root as '_old' self._tree_old = self._tree self._root_old = self._root # --- Parse the XML file into DOM tree = etree.parse(self._config_filename) # --- Get root from the tree root = tree.getroot() # --- Set the new tree and root self._tree = tree self._root = root # --- Parse the DOM into usable lists self._config_contents_valid = True valid = self._parse_xml2lists(verbose) # --- Validation step if valid == True: self._config_filename_mtime = os.path.getmtime(self._config_filename) self._config_contents_changed = True else: # --- Se the current tree and root from '_old' self._config_contents_valid = False self._tree = self._tree_old self._root = self._root_old except: self._last_errno = self.ERR_XML_FILE_ERROR return def get_unit_subtags(self): if self._config_contents_valid == False: return "" return self._unit_subtags def get_aliases(self, unit_subtag): if self._config_contents_valid == False: return "" return self._tag_aliases[unit_subtag] def get_unit_subtag(self, alias): if self._config_contents_valid == False: return "" for unit_subtag in self._tag_aliases: aliases = self.get_aliases(unit_subtag) #print("alias={} in aliases={}".format(alias,aliases)) if alias in aliases: return unit_subtag return None def get_params(self, alias, subtag=""): """ Select the tag corresponding to alias Return the list of dictionaries of all attributes """ if self._config_contents_valid == False: return "" # --- search the tag from its alias xpath = self._alias2xpath(alias, subtag) attrib_to_select = {'alias': alias} tag_to_get = "param" tag_attrib_to_select = "" tag_attrib_key_to_get = "" #print("get_params alias={} subtag={} xpath={}".format(alias,subtag,xpath)) value = self.get_attribute_level1(xpath,attrib_to_select,tag_to_get,tag_attrib_to_select,tag_attrib_key_to_get) #print("{} <{} {}> = {}".format(attrib_to_select, tag_to_get, tag_attrib_to_select, value)) if (len(value)>0): return value return None def get_param(self, alias, section, key, subtag=""): """ Select the tag corresponding to alias, and tag with attributes section=, key= Return the dictionary of all attributes """ if self._config_contents_valid == False: return "" # --- search the tag from its alias xpath = self._alias2xpath(alias, subtag) attrib_to_select = {'alias': alias} tag_to_get = "param" tag_attrib_to_select = {'section': section, 'key': key} tag_attrib_key_to_get = "" #print("get_param alias={} subtag={} xpath={}".format(alias,subtag,xpath)) value = self.get_attribute_level1(xpath,attrib_to_select,tag_to_get,tag_attrib_to_select,tag_attrib_key_to_get) #print("{} <{} {}> = {}".format(attrib_to_select, tag_to_get, tag_attrib_to_select, value)) if (len(value)>0): return value[0] return None def get_paramvalue(self, alias, section, key, subtag=""): """ Select the tag corresponding to alias, and tag with attributes section=, key= Return only the value= """ if self._config_contents_valid == False: return "" # --- search the tag from its alias xpath = self._alias2xpath(alias, subtag) attrib_to_select = {'alias': alias} tag_to_get = "param" tag_attrib_to_select = {'section': section, 'key': key} tag_attrib_key_to_get = 'value' #print("get_paramvalue alias={} subtag={} xpath={}".format(alias,subtag,xpath)) value = self.get_attribute_level1(xpath,attrib_to_select,tag_to_get,tag_attrib_to_select,tag_attrib_key_to_get) #print("{} <{} {}> = {}".format(attrib_to_select, tag_to_get, tag_attrib_to_select, value)) if (len(value)>0): return value[0] return None def get_plan(self, alias): """ Select the tag corresponding to alias, and tag with attributes section=, key= Return the dictionary of all attributes """ if self._config_contents_valid == False: return "" # --- search the tag from its alias subtag = 'plan' xpath = self._alias2xpath(alias, subtag) #attrib_to_select = {'alias': 'current'} attrib_to_select = {} tag_to_get = "param" tag_attrib_to_select = "" tag_attrib_key_to_get = "" #print("get_plan alias={} subtag={} xpath={}".format(alias,subtag,xpath)) value = self.get_attribute_level1(xpath,attrib_to_select,tag_to_get,tag_attrib_to_select,tag_attrib_key_to_get) #print("{} <{} {}> = {}".format(attrib_to_select, tag_to_get, tag_attrib_to_select, value)) if (len(value)>0): return value return None def get_computer_alias(self): if self._config_contents_valid == False: return "" return self._this_computer_alias def encode(self, text:str, method:str = 'crypted1'): return self._encode(text,method) def decode(self, crypted:str, method:str = 'crypted1'): return self._decode(crypted,method) # ===================================================================== # ===================================================================== # Special methods # ===================================================================== # ===================================================================== def __init__(self, config_filename:str = ""): self._last_errno = self.NO_ERROR self._unit_component_params = [] self.set_configfile(config_filename) self._configfile_changed = False # ===================================================================== # ===================================================================== # Test if main # ===================================================================== # ===================================================================== if __name__ == "__main__": cwd = os.getcwd() # --- Set the filename of the configuration config_filename = cwd + '/config_unit_simulunit1.xml' print("config_filename = {}\n".format(config_filename)) # --- Instanciate an object for configuration config = ConfigPyros() config.set_configfile(config_filename) if config.get_last_errno() != config.NO_ERROR: print("Error {}. {}".format(config.get_last_errno(),config.get_last_errmsg())) else: # --- Load the configuration file if needed only (i.e. modified) config.load(False) if config.get_last_errno() != config.NO_ERROR: print("Error {}. {}".format(config.get_last_errno(),config.get_last_errmsg())) else: # --- Get the home of the mount[0] mount_alias = config.get_aliases('mount')[0] home = config.get_param(mount_alias,'MountPointing','home') print("home= {}\n".format(home)) # --- Get the serial number of the optics of the channel[0] channel_alias = config.get_aliases('channel')[0] optsn = config.get_paramvalue(channel_alias,'optic','serial_number') print("Serial number {}\n".format(optsn)) # --- Get the startmode of the AgentX agent_alias = 'AgentX' stmod = config.get_paramvalue(agent_alias,'general','startmode') print("startmode {}\n".format(stmod)) # --- Get all the assembly of this unit[0] (mount + channels) unit_alias = config.get_aliases('unit')[0] params = config.get_params(unit_alias) print("params {} = {}\n".format(unit_alias,params)) # --- Get the plan parameters for web pages of the channel[0] channel_alias = config.get_aliases('channel')[0] plans = config.get_plan(channel_alias) print("Plan form = {}\n".format(plans)) # --- Get the alias of this computer print("Alias of this computer = {}\n".format(config.get_computer_alias())) # --- Get the password of the database database_alias = config.get_aliases('database')[0] paswds = config.get_param(database_alias,'access','pswd') if paswds.get('unit',None) == 'crypted1': paswd = config.decode(paswds['value']) else: paswd = paswds['value'] print("db paswd = {}\n".format(paswd))