"""Django views of the observatory configuration ("obsconfig") pages."""

import json
import os
import re
import tempfile
from datetime import datetime, timezone

import yaml
from django import conf  # noqa: F401  (unused here, kept for compatibility)
from django.conf import settings  # noqa: F401  (unused here, kept for compatibility)
from django.contrib.auth.decorators import login_required
from django.http import Http404, HttpResponse
from django.shortcuts import render
from django.views.decorators.csrf import csrf_exempt

from .configpyros import ConfigPyros
from src.core.pyros_django.dashboard.decorator import level_required

# Matches a single underscore at the very beginning of a key.
_LEADING_UNDERSCORE = re.compile(r"^_")


def _load_config():
    """Build a ``ConfigPyros`` from the file named by ``PATH_TO_OBSCONF_FILE``."""
    return ConfigPyros(os.environ["PATH_TO_OBSCONF_FILE"])


def _strip_leading_underscore(mapping):
    """Return a copy of ``mapping`` whose keys have one leading "_" removed."""
    return {
        _LEADING_UNDERSCORE.sub("", key): value for key, value in mapping.items()
    }


def _json_response(payload, status=200):
    """Serialize ``payload`` to JSON and wrap it in an ``HttpResponse``."""
    return HttpResponse(
        json.dumps(payload), content_type="application/json", status=status
    )


def _replace_component_agents(topology, resolve):
    """Replace in place each agent name found in ``topology`` by ``resolve(name)``.

    Every category of the topology holds a "COMPONENT_AGENTS" list of
    single-entry dictionaries ``{component name: agent name}``. The "CHANNELS"
    category is one level deeper: it maps each channel name to such a section.

    Args:
        topology: topology dictionary of one unit (modified in place).
        resolve: callable receiving the agent name and returning its replacement.
    """
    for category, content in topology.items():
        sections = content.values() if category == "CHANNELS" else (content,)
        for section in sections:
            for component_agent in section["COMPONENT_AGENTS"]:
                component_name = next(iter(component_agent))
                # Update the entry itself rather than looking it up again with
                # list.index(), which is O(n) and may select an equal duplicate.
                component_agent[component_name] = resolve(
                    component_agent[component_name]
                )


def _read_config_lines(path):
    """Return the lines of the text file at ``path``, closing it afterwards."""
    with open(path) as config_file:
        return config_file.readlines()


def _raw_config_file_response(request, folder):
    """Answer a POST carrying a "yaml_file" field with that file's content.

    Args:
        request: the incoming Django request.
        folder: folder prefix to which the posted file name is appended.

    Returns:
        The file content (200), a 400 response when the POST data or the
        "yaml_file" field is missing, or a 404 response when the file does
        not exist.
    """
    yaml_file = request.POST.get("yaml_file")
    if not yaml_file:
        return HttpResponse("Missing 'yaml_file' POST field", status=400)
    # NOTE(security): ``yaml_file`` is user-supplied and appended to ``folder``
    # without any containment check, so relative segments ("../") can escape
    # the folder. Not enforced here because legitimate values may rely on
    # relative paths -- review before exposing this view more widely.
    try:
        content = _read_config_lines(folder + yaml_file)
    except FileNotFoundError:
        return HttpResponse("Configuration file not found", status=404)
    return HttpResponse(content)


def _format_yaml_error(exc):
    """Build the message shown to the user for a YAML parsing error."""
    mark = getattr(exc, "problem_mark", None)
    if mark is None:
        # Errors without position information: fall back to their text.
        return str(exc)
    if exc.context is not None:
        return str(mark) + '\n ' + str(exc.problem) + ' ' + str(exc.context)
    return str(mark.name) + '\n ' + str(exc.problem)


def get_nested_dictionaries_as_list(dic, result=None):
    """Flatten nested dictionaries into a list of ``(key, value)`` pairs.

    Dictionary values are explored recursively. A list stored under the
    "CAPABILITIES" key has each of its dictionary elements explored too (its
    other elements are ignored). Any other value is appended as ``(key, value)``.

    Args:
        dic: dictionary to flatten.
        result: optional list to extend; a new list is created when omitted.

    Returns:
        The list of collected ``(key, value)`` pairs.
    """
    # A fresh list per call: a mutable default would be shared between calls.
    if result is None:
        result = []
    for key, value in dic.items():
        if isinstance(value, dict):
            get_nested_dictionaries_as_list(value, result)
        elif isinstance(value, list) and key == "CAPABILITIES":
            for element in value:
                if isinstance(element, dict):
                    get_nested_dictionaries_as_list(element, result)
        else:
            result.append((key, value))
    return result


@login_required
@level_required("Admin", "Observer", "Management", "Operator", "Unit-PI")
def obs_global_config(request):
    """Render the global configuration page with the role-based permissions."""
    config = _load_config()
    units_names = list(config.get_units().keys())
    pickle_file_mtime = os.path.getmtime(config.CONFIG_PATH + config.pickle_file)
    # Modification time of the pickle file, formatted in UTC.
    pickle_datetime = datetime.fromtimestamp(
        pickle_file_mtime, tz=timezone.utc
    ).strftime("%Y/%m/%d %H:%M:%S")
    role = request.session.get("role")
    hardware_roles = ("Admin", "Operator", "Unit-PI", "Unit-board")
    astronomer_roles = (
        "Admin",
        "Operator",
        "Unit-PI",
        "Observer",
        "Management board manager",
        "Unit-board",
    )
    return render(
        request,
        "obsconfig/global_obs_configuration.html",
        {
            "units_names": units_names,
            "obs_name": config.get_obs_name(),
            "last_modification_time": pickle_datetime,
            "CAN_EDIT_CONFIG": role in hardware_roles,
            "CAN_VIEW_HARDWARE_CONFIG": role in hardware_roles,
            "CAN_VIEW_ASTRONOMER_CONFIG": role in astronomer_roles,
            "CAN_VIEW_AGENTS_CONFIG": role == "Admin",
        },
    )


@login_required
@level_required("Admin", "Unit-PI", "Operator")
def obs_hardware_config(request):
    """Render the devices and computers of the observatory."""
    config = _load_config()
    devices = config.get_devices()
    active_devices = config.get_active_devices()
    computers = config.get_computers()
    active_computers = config.get_active_computers()
    # For a device name (key), the name of the unit where it is used (value).
    device_to_unit = {
        device: config.get_unit_of_device(device) for device in active_devices
    }
    # For a computer name (key), the name of the unit where it is used (value).
    computer_to_unit = {
        computer: config.get_unit_of_computer(computer)
        for computer in active_computers
    }
    return render(
        request,
        'obsconfig/obs_hardware_configuration.html',
        {
            'devices': devices,
            "active_devices": active_devices,
            "computers": computers,
            "active_computers": active_computers,
            "device_to_unit": device_to_unit,
            "computer_to_unit": computer_to_unit,
            "devices_links": config.devices_links,
        },
    )


@login_required
@level_required("Admin", "Unit-PI", "Operator")
def unit_hardware_configuration(request, unit_name):
    """Render the hardware configuration page, giving the template the config.

    ``unit_name`` comes from the URL and is not used by the view itself.
    """
    config = _load_config()
    return render(
        request, 'obsconfig/unit_hardware_configuration.html', {'config': config}
    )


@login_required
@level_required("Admin", "Unit-PI", "Operator")
def computer_details(request, computer_name):
    """Render the YAML dump of the configuration of one computer.

    Raises:
        Http404: if ``computer_name`` is not a configured computer.
    """
    config = _load_config()
    try:
        computer = config.get_computers()[computer_name]
    except KeyError:
        raise Http404(f"Unknown computer '{computer_name}'") from None
    # NOTE: an earlier version split the description into separate "detail" and
    # "power" dictionaries for the template; the raw YAML dump is rendered instead.
    computer_detail = yaml.dump(computer)
    return render(
        request,
        "obsconfig/computer_details.html",
        {"computer_detail": computer_detail},
    )


@login_required
@level_required("Admin", "Unit-PI", "Unit-board", "Operator", "Observer")
def device_details(request, device_name):
    """Render the configuration of one device.

    The power, connector and capabilities sections are handed to the template
    separately and removed from the generic "device_config" section. The
    leading "_" of the keys is removed in each of them.

    Raises:
        Http404: if ``device_name`` is not a configured device.
    """
    config = _load_config()
    try:
        device = config.get_devices()[device_name]
    except KeyError:
        raise Http404(f"Unknown device '{device_name}'") from None
    devices_links = config.devices_links
    file_name_to_device_name = {
        v: k for k, v in config.get_devices_names_and_file().items()
    }
    active_devices = config.get_active_devices()

    # Alternative solutions considered:
    # - flattening everything with get_nested_dictionaries_as_list(): we lose a
    #   level of precision (no separation between capabilities, for example);
    # - the approach below, with one getter per section: not generic enough,
    #   a new key in the configuration requires changing this code.
    device_detail = _strip_leading_underscore(device)

    raw_power = config.get_device_power(device_name)
    power = None if raw_power is None else _strip_leading_underscore(raw_power)

    raw_connector = config.get_device_connector(device_name)
    connector = (
        None if raw_connector is None else _strip_leading_underscore(raw_connector)
    )

    raw_capabilities = config.get_device_capabilities(device_name)
    capabilities = (
        []
        if raw_capabilities is None
        else [_strip_leading_underscore(capability) for capability in raw_capabilities]
    )

    device_config = _strip_leading_underscore(device_detail["device_config"])
    device_detail["device_config"] = device_config
    # These sections are displayed on their own: do not show them twice. The
    # default avoids a KeyError when a section is absent from "device_config".
    if power is not None:
        device_config.pop("power", None)
    if connector is not None:
        device_config.pop("connector", None)
    if capabilities:
        device_config.pop("CAPABILITIES", None)

    return render(
        request,
        "obsconfig/device_details.html",
        {
            "device_detail": device_detail,
            "power": power,
            "connector": connector,
            "capabilities": capabilities,
            "devices_links": devices_links,
            "file_name_to_device_name": file_name_to_device_name,
            "active_devices": active_devices,
        },
    )


@login_required
@level_required("Admin", "Observer", "Management", "Operator", "Unit-PI", "TAC")
def obs_astronomer_config(request):
    """Render the topology of each unit with the device behind each component."""
    config = _load_config()
    units = config.get_units()
    units_topologies = {}
    for unit_name in units:
        unit = units[unit_name]
        topology = config.get_topology(unit)
        # Channel groups are not useful in this view.
        topology.pop("CHANNEL_GROUPS", None)
        # Show the device handled by each agent instead of the agent name.
        _replace_component_agents(
            topology,
            lambda agent, unit=unit: config.get_device_for_agent(unit, agent),
        )
        units_topologies[unit_name] = topology
    return render(
        request,
        "obsconfig/obs_astronomer_config.html",
        {"units_topologies": units_topologies},
    )


@login_required
@level_required("Admin")
def obs_agents_config(request):
    """Render the topology of each unit with the agent behind each component."""
    config = _load_config()
    units = config.get_units()
    units_topologies = {}
    active_agents_by_unit = {}
    for unit_name in units:
        agents = config.get_agents(units[unit_name])
        # Active agents of the current unit.
        active_agents_by_unit[unit_name] = config.get_active_agents(
            config.get_unit_by_name(unit_name)
        )
        topology = config.get_topology(units[unit_name])
        # Channel groups are not useful in this view.
        topology.pop("CHANNEL_GROUPS", None)
        # Replace each agent name by the matching entry of ``agents``.
        _replace_component_agents(
            topology, lambda agent_name, agents=agents: agents[agent_name]
        )
        units_topologies[unit_name] = topology
    return render(
        request,
        "obsconfig/obs_agents_config.html",
        {
            "units_topologies": units_topologies,
            "active_agents_by_unit": active_agents_by_unit,
        },
    )


@login_required
@level_required("Admin", "Operator", "Unit-PI", "Unit-board")
def edit_config(request):
    """Render the configuration editor filled with the raw configuration."""
    config = _load_config()
    return render(
        request, "obsconfig/edit_config.html", {"config_file": config.raw_config}
    )


@login_required
@level_required("Admin", "Operator", "Unit-PI", "Unit-board")
def verify_config(request):
    """Check the configuration posted in the "config" field.

    Returns:
        A JSON response with an "is_valid" boolean and, on failure, either a
        "yaml_error_message" (YAML syntax error) or a "message" (missing
        schema, or list of ``(error, path)`` pairs reported by the schema
        check). A 400 JSON response is returned when "config" is missing.
    """
    submitted_config = request.POST.get("config")
    if not submitted_config:
        return _json_response(
            {"is_valid": False, "message": "Missing configuration"}, status=400
        )

    response_data = {}
    # The context manager deletes the temporary file on every exit path.
    with tempfile.NamedTemporaryFile(mode='w+', suffix=".yml") as temp_config_file:
        temp_config_file.write(submitted_config)
        # The schema check reads the file by name: make sure it is on disk.
        temp_config_file.flush()
        temp_config_file.seek(0)
        try:
            config_file = yaml.safe_load(temp_config_file.read())
        except yaml.YAMLError as exc:
            response_data["is_valid"] = False
            response_data["yaml_error_message"] = _format_yaml_error(exc)
            return _json_response(response_data)

        # The document may be empty or not a mapping: no schema in that case.
        schema = config_file.get("schema") if isinstance(config_file, dict) else None
        if not isinstance(schema, str) or not schema:
            response_data["is_valid"] = False
            response_data["message"] = "Missing schema"
            return _json_response(response_data)

        schema_path = os.path.join(
            os.environ["DJANGO_PATH"], "../../../config/schemas/"
        )
        check_result = ConfigPyros.check_config(
            temp_config_file.name, schema_path + schema
        )

    if check_result is True:
        response_data["is_valid"] = True
    else:
        response_data["is_valid"] = False
        if isinstance(check_result, bool):
            # Presumably check_config() may answer a bare False when the check
            # could not be run at all -- TODO confirm against ConfigPyros.
            errors = [
                ("Error : configuration could not be validated", "Path to error : ''")
            ]
        else:
            errors = [
                (
                    f"Error : {str(error).split('. Path')[0]}",
                    f"Path to error : '{error.path}'",
                )
                for error in check_result
            ]
        response_data["message"] = errors
    return _json_response(response_data)


@login_required
@level_required("Admin", "Operator", "Unit-PI", "Unit-board")
def save_config(request):
    """Overwrite the observatory configuration file with the posted "config".

    Returns:
        "Ok !" once the file is written, or a 400 response when no
        configuration was posted (nothing is written in that case).
    """
    new_config = request.POST.get("config")
    if not new_config:
        return HttpResponse("No configuration provided", status=400)
    with open(os.environ["PATH_TO_OBSCONF_FILE"], "w") as obs_config_file:
        obs_config_file.write(new_config)
    return HttpResponse("Ok !")


@login_required
@level_required("Admin", "Unit-PI", "Unit-board", "Operator", "Observer")
@csrf_exempt
def view_raw_component_config_file(request):
    """Return the raw content of the component file named by "yaml_file".

    When the file cannot be read, a fixed sentence saying that the component
    is defined within the device configuration file is returned instead.
    """
    COMPONENT_PATH = os.path.join(
        os.environ["DJANGO_PATH"], "../../../config/components/"
    )
    if not request.POST:
        return HttpResponse("Missing 'yaml_file' POST field", status=400)
    # NOTE(security): the posted name is appended to the folder without any
    # containment check ("../" can escape it) -- review before wider exposure.
    try:
        content = _read_config_lines(COMPONENT_PATH + request.POST["yaml_file"])
    except (OSError, KeyError, ValueError):
        content = "Component defined within the device configuration file"
    return HttpResponse(content)


@login_required
@level_required("Admin", "Unit-PI", "Unit-board", "Operator", "Observer")
@csrf_exempt
def view_raw_generic_device_config_file(request):
    """Return the raw content of the generic device file named by "yaml_file"."""
    GENERIC_DEVICES_PATH = os.path.join(
        os.environ["DJANGO_PATH"], "../../../config/devices/"
    )
    return _raw_config_file_response(request, GENERIC_DEVICES_PATH)


@login_required
@level_required("Admin", "Unit-PI", "Unit-board", "Operator", "Observer")
@csrf_exempt
def view_raw_device_config_file(request):
    """Return the raw content of the device file named by "yaml_file"."""
    obs_folder = os.environ["PATH_TO_OBSCONF_FOLDER"]
    return _raw_config_file_response(request, obs_folder)


@login_required
@level_required("Admin", "Unit-PI", "Unit-board", "Operator", "Observer")
def obs_config_help(request):
    """Render the help page of the observatory configuration."""
    return render(request, "obsconfig/obs_config_help.html")