"""Django views displaying, verifying and saving the observatory configuration."""

import json
import os
import re
import tempfile
from datetime import datetime, timezone

import yaml
from django import conf
from django.conf import settings
from django.contrib.auth.decorators import login_required
from django.http import Http404, HttpResponse
from django.shortcuts import render

from src.core.pyros_django.dashboard.decorator import level_required

from .configpyros import ConfigPyros


def _load_obs_config():
    """Build a ConfigPyros from the file named by the PATH_TO_OBSCONF_FILE env variable."""
    return ConfigPyros(os.environ["PATH_TO_OBSCONF_FILE"])


def _strip_leading_underscore(mapping):
    """Return a copy of *mapping* whose keys have one leading "_" removed."""
    return {re.sub(r"^_", "", key): value for key, value in mapping.items()}


def _json_response(payload, status=200):
    """Serialize *payload* to JSON and wrap it in an HttpResponse."""
    return HttpResponse(
        json.dumps(payload), content_type="application/json", status=status
    )


def _resolve_component_agents(topology, resolve):
    """Replace in place every agent name found in *topology* by ``resolve(name)``.

    Every category other than "CHANNELS" directly holds a "COMPONENT_AGENTS"
    list; "CHANNELS" maps each channel name to a dictionary holding its own
    "COMPONENT_AGENTS" list. Each list element is a dictionary whose first key
    is the component name and whose value is the agent name.
    """
    for category, content in topology.items():
        if category != "CHANNELS":
            agent_lists = [content["COMPONENT_AGENTS"]]
        else:
            agent_lists = [
                content[channel_name]["COMPONENT_AGENTS"] for channel_name in content
            ]
        for component_agents in agent_lists:
            for component_agent in component_agents:
                component_name = next(iter(component_agent))
                # Update the element itself: searching for it again with
                # list.index() is quadratic and picks the wrong element when
                # an equal entry appears earlier in the list.
                component_agent[component_name] = resolve(
                    component_agent[component_name]
                )


def get_nested_dictionaries_as_list(dic, result=None):
    """Flatten nested dictionaries into a list of (key, value) tuples.

    Dictionary values are walked recursively. A list stored under the
    "CAPABILITIES" key has each of its dictionary elements walked as well.
    Any other value is appended to the result as ``(key, value)``.

    Args:
        dic: dictionary to flatten.
        result: optional list to extend; a new list is created when omitted
            (a literal ``[]`` default would be shared between calls).

    Returns:
        The list of collected (key, value) tuples.
    """
    if result is None:
        result = []
    for key, value in dic.items():
        if isinstance(value, dict):
            get_nested_dictionaries_as_list(value, result)
        elif isinstance(value, list) and key == "CAPABILITIES":
            for element in value:
                if isinstance(element, dict):
                    get_nested_dictionaries_as_list(element, result)
        else:
            result.append((key, value))
    return result


@login_required
@level_required("Admin", "Observer", "Management", "Operator", "Unit-PI")
def obs_global_config(request):
    """Render the global view: observatory name, unit names, pickle file mtime (UTC)."""
    config = _load_obs_config()
    pickle_file_mtime = os.path.getmtime(config.CONFIG_PATH + config.pickle_file)
    # Timezone-aware equivalent of the deprecated datetime.utcfromtimestamp().
    pickle_datetime = datetime.fromtimestamp(
        pickle_file_mtime, tz=timezone.utc
    ).strftime("%Y/%m/%d %H:%M:%S")
    context = {
        "units_names": list(config.get_units().keys()),
        "obs_name": config.get_obs_name(),
        "last_modification_time": pickle_datetime,
    }
    return render(request, "obsconfig/global_obs_configuration.html", context)


@login_required
@level_required("Admin", "Unit-PI", "Operator")
def obs_hardware_config(request):
    """Render the devices and computers of the observatory with the unit using each."""
    config = _load_obs_config()
    active_devices = config.get_active_devices()
    active_computers = config.get_active_computers()
    context = {
        "devices": config.get_devices(),
        "active_devices": active_devices,
        "computers": config.get_computers(),
        "active_computers": active_computers,
        # device name (key) -> name of the unit where it is used (value)
        "device_to_unit": {
            device: config.get_unit_of_device(device) for device in active_devices
        },
        # computer name (key) -> name of the unit where it is used (value)
        "computer_to_unit": {
            computer: config.get_unit_of_computer(computer)
            for computer in active_computers
        },
        "devices_links": config.devices_links,
    }
    return render(request, "obsconfig/obs_hardware_configuration.html", context)


@login_required
@level_required("Admin", "Unit-PI", "Operator")
def unit_hardware_configuration(request, unit_name):
    """Render the hardware configuration page, handing the whole config to the template.

    NOTE(review): ``unit_name`` is not used here; presumably the template
    or a later revision is expected to filter on it - confirm.
    """
    config = _load_obs_config()
    return render(request, "obsconfig/unit_hardware_configuration.html", {"config": config})


@login_required
@level_required("Admin", "Unit-PI", "Operator")
def computer_details(request, computer_name):
    """Render the YAML dump of the configuration of the computer *computer_name*.

    Raises:
        Http404: when no computer with this name exists in the configuration.
    """
    computers = _load_obs_config().get_computers()
    try:
        computer = computers[computer_name]
    except KeyError as error:
        raise Http404(f"Unknown computer: {computer_name}") from error
    return render(
        request,
        "obsconfig/computer_details.html",
        {"computer_detail": yaml.dump(computer)},
    )


@login_required
@level_required("Admin", "Unit-PI", "Operator")
def device_details(request, device_name):
    """Render the details of the device *device_name*.

    The power, connector and capabilities sections are passed to the template
    separately and removed from "device_config" so they are not shown twice.
    The leading "_" of every key is removed before display.

    Raises:
        Http404: when no device with this name exists in the configuration.
    """
    config = _load_obs_config()
    devices = config.get_devices()
    try:
        device = devices[device_name]
    except KeyError as error:
        raise Http404(f"Unknown device: {device_name}") from error

    # Alternative considered: flattening everything with
    # get_nested_dictionaries_as_list(), but it loses one level of structure
    # (capabilities are no longer separated).
    device_detail = _strip_leading_underscore(device)

    raw_power = config.get_device_power(device_name)
    power = None if raw_power is None else _strip_leading_underscore(raw_power)

    raw_connector = config.get_device_connector(device_name)
    connector = (
        None if raw_connector is None else _strip_leading_underscore(raw_connector)
    )

    raw_capabilities = config.get_device_capabilities(device_name)
    capabilities = [
        _strip_leading_underscore(capability)
        for capability in (raw_capabilities or [])
    ]

    device_config = _strip_leading_underscore(device_detail["device_config"])
    device_detail["device_config"] = device_config
    # Drop the sections rendered on their own; the default avoids a KeyError
    # when a section is not stored under the device_config itself.
    if power is not None:
        device_config.pop("power", None)
    if connector is not None:
        device_config.pop("connector", None)
    if capabilities:
        device_config.pop("CAPABILITIES", None)

    context = {
        "device_detail": device_detail,
        "power": power,
        "connector": connector,
        "capabilities": capabilities,
        "devices_links": config.devices_links,
        "file_name_to_device_name": {
            file_name: name
            for name, file_name in config.get_devices_names_and_file().items()
        },
        "active_devices": config.get_active_devices(),
    }
    return render(request, "obsconfig/device_details.html", context)


@login_required
@level_required("Admin", "Observer", "Management", "Operator", "Unit-PI", "TAC")
def obs_astronomer_config(request):
    """Render the topology of each unit with agent names replaced by their device."""
    config = _load_obs_config()
    units_topologies = {}
    for unit_name, unit in config.get_units().items():
        topology = config.get_topology(unit)
        # Channel groups are not useful in this view.
        topology.pop("CHANNEL_GROUPS", None)
        _resolve_component_agents(
            topology,
            lambda agent, unit=unit: config.get_device_for_agent(unit, agent),
        )
        units_topologies[unit_name] = topology
    return render(
        request,
        "obsconfig/obs_astronomer_config.html",
        {"units_topologies": units_topologies},
    )


@login_required
@level_required("Admin")
def obs_agents_config(request):
    """Render the topology of each unit with agent names replaced by the agent config."""
    config = _load_obs_config()
    units_topologies = {}
    active_agents_by_unit = {}
    for unit_name, unit in config.get_units().items():
        agents = config.get_agents(unit)
        # Active agents of the current unit.
        active_agents_by_unit[unit_name] = config.get_active_agents(
            config.get_unit_by_name(unit_name)
        )
        topology = config.get_topology(unit)
        # Channel groups are not useful in this view.
        topology.pop("CHANNEL_GROUPS", None)
        _resolve_component_agents(
            topology, lambda agent_name, agents=agents: agents[agent_name]
        )
        units_topologies[unit_name] = topology
    context = {
        "units_topologies": units_topologies,
        "active_agents_by_unit": active_agents_by_unit,
    }
    return render(request, "obsconfig/obs_agents_config.html", context)


@login_required
@level_required("Admin", "Operator", "Unit-PI")
def edit_config(request):
    """Render the edition page pre-filled with the YAML dump of the current config."""
    config = _load_obs_config()
    return render(
        request,
        "obsconfig/edit_config.html",
        {"config_file": yaml.dump(config.obs_config_file_content)},
    )


@login_required
@level_required("Admin", "Operator", "Unit-PI")
def verify_config(request):
    """Validate the posted "config" YAML text against the schema it names.

    Returns a JSON response with "is_valid" (bool) and, when invalid,
    "message": either a string (missing config, invalid YAML, missing schema)
    or a list of (error, path) pairs built from the errors returned by
    ConfigPyros.check_config.
    """
    config_text = request.POST.get("config")
    if not config_text:
        # A view must always return a response, even when nothing was posted.
        return _json_response(
            {"is_valid": False, "message": "Missing config"}, status=400
        )

    try:
        parsed_config = yaml.safe_load(config_text)
    except yaml.YAMLError as error:
        return _json_response(
            {"is_valid": False, "message": f"Invalid YAML : {error}"}
        )

    # The document may be a scalar or a list, or simply lack the "schema" key.
    schema = parsed_config.get("schema") if isinstance(parsed_config, dict) else None
    if not isinstance(schema, str) or not schema:
        return _json_response({"is_valid": False, "message": "Missing schema"})

    # NOTE(review): `schema` comes from the request and is appended to the
    # schemas directory unchecked ("../" is not rejected) - confirm whether it
    # should be restricted to known schema file names.
    schema_path = os.path.join(os.environ["DJANGO_PATH"], "../../../config/schemas/")
    # The context manager removes the temporary file even if the check raises.
    with tempfile.NamedTemporaryFile(mode="w+", suffix=".yml") as temp_config_file:
        temp_config_file.write(config_text)
        # check_config receives the file name, so the content must be on disk.
        temp_config_file.flush()
        check_result = ConfigPyros.check_config(
            temp_config_file.name, schema_path + schema
        )

    if check_result is True:
        return _json_response({"is_valid": True})

    # Anything but True means invalid; a bare False carries no error details.
    errors = (
        []
        if isinstance(check_result, bool)
        else [
            (
                f"Error : {str(error).split('. Path')[0]}",
                f"Path to error : '{error.path}'",
            )
            for error in check_result
        ]
    )
    return _json_response({"is_valid": False, "message": errors})


@login_required
@level_required("Admin", "Operator", "Unit-PI")
def save_config(request):
    """Overwrite the observatory configuration file with the posted "config" text.

    Returns "Ok !" once the file is written, or a 400 response when no
    configuration was posted.
    """
    config_text = request.POST.get("config")
    if not config_text:
        return HttpResponse("Missing config", status=400)
    # NOTE(review): the other views read os.environ["PATH_TO_OBSCONF_FILE"]
    # while this one writes to settings.PATH_TO_OBSCONF_FILE - confirm both
    # name the same file. The content is also saved without being validated.
    with open(settings.PATH_TO_OBSCONF_FILE, "w") as obs_config_file:
        obs_config_file.write(config_text)
    return HttpResponse("Ok !")