from collections import OrderedDict from django import conf from django.shortcuts import render from .obsconfig_class import OBSConfig from django.conf import settings from src.core.pyros_django.dashboard.decorator import level_required from django.contrib.auth.decorators import login_required from django.http import HttpResponse from datetime import datetime from django.views.decorators.csrf import csrf_exempt import re import yaml import tempfile import json import os def get_nested_dictionaries_as_list(dic, result=[]): for key, value in dic.items(): print(value, type(value)) print(result) if isinstance(value, dict): print("recursive call") get_nested_dictionaries_as_list(value, result) elif isinstance(value, list) and key == "CAPABILITIES": print("VALUE OF LIST ======= ", value) for element in value: # print(element) if isinstance(element, dict): get_nested_dictionaries_as_list(element, result) else: result.append((key, value)) return result @login_required @level_required("Admin", "Observer", "Management", "Operator", "Unit-PI") def obs_global_config(request): config = OBSConfig( os.environ["PATH_TO_OBSCONF_FILE"], os.environ["unit_name"]) units_names = list(config.get_units().keys()) pickle_file_mtime = os.path.getmtime(config.CONFIG_PATH+config.pickle_file) pickle_datetime = datetime.utcfromtimestamp( pickle_file_mtime).strftime("%Y/%m/%d %H:%M:%S") CAN_EDIT_CONFIG = request.session.get("role") in ( "Admin", "Operator", "Unit-PI", "Unit-board") CAN_VIEW_HARDWARE_CONFIG = request.session.get("role") in ( "Admin", "Operator", "Unit-PI", "Unit-board") CAN_VIEW_ASTRONOMER_CONFIG = request.session.get("role") in ( "Admin", "Operator", "Unit-PI", "Observer", "Management board manager", "Unit-board") CAN_VIEW_AGENTS_CONFIG = request.session.get("role") == "Admin" return render(request, "obsconfig/global_obs_configuration.html", { "units_names": units_names, "obs_name": config.get_obs_name(), "last_modification_time": pickle_datetime, "CAN_EDIT_CONFIG": CAN_EDIT_CONFIG, "CAN_VIEW_HARDWARE_CONFIG": CAN_VIEW_HARDWARE_CONFIG, "CAN_VIEW_ASTRONOMER_CONFIG": CAN_VIEW_ASTRONOMER_CONFIG, "CAN_VIEW_AGENTS_CONFIG": CAN_VIEW_AGENTS_CONFIG }) @login_required @level_required("Admin", "Unit-PI", "Operator") def obs_hardware_config(request): config = OBSConfig( os.environ["PATH_TO_OBSCONF_FILE"], os.environ["unit_name"]) devices = config.get_devices() active_devices = config.get_active_devices() computers = config.get_computers() active_computers = config.get_active_computers() # give for a device name (key) the unit name where it is used (value) device_to_unit = {} # give for a computer name (key) the unit name where it is used (value) computer_to_unit = {} for device in active_devices: device_to_unit[device] = config.get_unit_of_device(device) for computer in active_computers: computer_to_unit[computer] = config.get_unit_of_computer(computer) devices_links = config.devices_links return render(request, 'obsconfig/obs_hardware_configuration.html', {'devices': devices, "active_devices": active_devices, "computers": computers, "active_computers": active_computers, "device_to_unit": device_to_unit, "computer_to_unit": computer_to_unit, "devices_links": devices_links}) @login_required @level_required("Admin", "Unit-PI", "Operator") def unit_hardware_configuration(request, unit_name): config = OBSConfig( os.environ["PATH_TO_OBSCONF_FILE"], os.environ["unit_name"]) devices = config.get_devices() return render(request, 'obsconfig/unit_hardware_configuration.html', {'config': config}) @login_required @level_required("Admin", "Unit-PI", "Operator") def computer_details(request, computer_name): config = OBSConfig( os.environ["PATH_TO_OBSCONF_FILE"], os.environ["unit_name"]) computer_detail = yaml.dump(config.get_computers()[computer_name]) """ computer_detail = { re.sub("^_","",key):value for key, value in config.get_computers()[computer_name].items() } # We're removing "_" at the beginning of each key : power = { re.sub("^_","",key):value for key, value in config.get_computer_power(computer_name).items() } computer_config = computer_detail["computer_config"] # We're removing "_" at the beginning of each key : computer_detail["computer_config"] = { re.sub("^_","",key):value for key,value in computer_config.items() } # Remove power key as we have this information in the same if power is not None: computer_detail["computer_config"].pop("power") return render(request,"obsconfig/computer_details.html", {"computer_detail" : computer_detail, "power" : power}) """ return render(request, "obsconfig/computer_details.html", {"computer_detail": computer_detail}) @login_required @level_required("Admin", "Unit-PI", "Operator", "Observer") def device_details(request, device_name): config = OBSConfig( os.environ["PATH_TO_OBSCONF_FILE"], os.environ["unit_name"]) # We're removing "_" at the beginning of each key : device = config.get_devices()[device_name] # device_detail = yaml.dump(device) # config_device = open(config.CONFIG_PATH+device["file"],"r").read() devices_links = config.devices_links file_name_to_device_name = { v: k for k, v in config.get_devices_names_and_file().items()} active_devices = config.get_active_devices() """ Alternative solutions: - test_device -> we're using a recursive function to get all nested dicitonaries but we loose a level of "precision" (no separations of capabilities for example) - device_detail -> we use multiple function to get specific value (not global enougth, if we had another key we have to change the code) """ device_detail = {re.sub("^_", "", key): value for key, value in config.get_devices()[device_name].items()} # test_device= get_nested_dictionaries_as_list(device_detail,[]) if config.get_device_power(device_name) is not None: # We're removing "_" at the beginning of each key : power = {re.sub("^_", "", key): value for key, value in config.get_device_power(device_name).items()} else: power = None if config.get_device_connector(device_name) is not None: # We're removing "_" at the beginning of each key : connector = {re.sub("^_", "", key): value for key, value in config.get_device_connector(device_name).items()} else: connector = None capabilities = [] if config.get_device_capabilities(device_name) is not None: copy_capabilities = config.get_device_capabilities(device_name) for capability in copy_capabilities: # We're removing "_" at the beginning of each key : capabilities.append( {re.sub("^_", "", key): value for key, value in capability.items()}) device_config = device_detail["device_config"] # We're removing "_" at the beginning of each key : device_detail["device_config"] = { re.sub("^_", "", key): value for key, value in device_config.items()} # Remove power key as we have this information in the same if power is not None: device_detail["device_config"].pop("power") if connector is not None: device_detail["device_config"].pop("connector") if len(capabilities) != 0: device_detail["device_config"].pop("CAPABILITIES") return render(request, "obsconfig/device_details.html", {"device_detail": device_detail, "power": power, "connector": connector, "capabilities": capabilities, "devices_links": devices_links, "file_name_to_device_name": file_name_to_device_name, "active_devices": active_devices}) # return render(request,"obsconfig/device_details.html", { "device_detail" : device_detail, "config" : config_device }) @login_required @level_required("Admin", "Observer", "Management", "Operator", "Unit-PI", "TAC") def obs_astronomer_config(request): config = OBSConfig( os.environ["PATH_TO_OBSCONF_FILE"], os.environ["unit_name"]) units = config.get_units() units_topologies = {} # for each unit for unit_name in units: units_topologies[unit_name] = config.get_topology(unit_name) layouts = units_topologies[unit_name].pop("LAYOUTS")["layouts"] albums = units_topologies[unit_name].pop("ALBUMS")["albums"] # for each category (security, mount, channels) for category in units_topologies[unit_name]: if category != "CHANNELS": # Security and Mount are directly a dictionary containing the attributes of those categories # However, component_agents is a list so we need to iterate through this list for component_agent in units_topologies[unit_name][category]["COMPONENT_AGENTS"]: component_name = list(component_agent.keys())[0] agent = component_agent[component_name] device_of_agent = config.get_device_for_agent( unit_name, agent) # get the index of the current component, agent couple index = units_topologies[unit_name][category]["COMPONENT_AGENTS"].index( component_agent) units_topologies[unit_name][category]["COMPONENT_AGENTS"][index][component_name] = device_of_agent else: # Channels is composed of a list of channel, we're looping through it for channel_name in units_topologies[unit_name]["CHANNELS"]: for component_agent in units_topologies[unit_name]["CHANNELS"][channel_name]["COMPONENT_AGENTS"]: component_name = list(component_agent.keys())[0] agent = component_agent[component_name] device_of_agent = config.get_device_for_agent( unit_name, agent) index = units_topologies[unit_name]["CHANNELS"][channel_name]["COMPONENT_AGENTS"].index( component_agent) units_topologies[unit_name]["CHANNELS"][channel_name]["COMPONENT_AGENTS"][index][component_name] = device_of_agent # Re add layouts and albums units_topologies[unit_name]["LAYOUTS"] = layouts units_topologies[unit_name]["ALBUMS"] = albums order_of_keys = ["SECURITY", "MOUNT", "LAYOUTS", "ALBUMS", "CHANNELS"] list_of_tuples = [(key, units_topologies[unit_name][key]) for key in order_of_keys] units_topologies[unit_name] = OrderedDict(list_of_tuples) return render(request, "obsconfig/obs_astronomer_config.html", { "units_topologies": units_topologies, }) @ login_required @ level_required("Admin") def obs_agents_config(request): config = OBSConfig( os.environ["PATH_TO_OBSCONF_FILE"], os.environ["unit_name"]) units = config.get_units() units_topologies = {} active_agents_by_unit = {} # for each unit for unit_name in units: agents = config.get_agents(unit_name) # list of active agents of the current unit active_agents_by_unit[unit_name] = config.get_active_agents(unit_name) # topology of the current unit units_topologies[unit_name] = config.get_topology(unit_name) # removing albums and layouts, not useful in this view units_topologies[unit_name].pop("LAYOUTS") units_topologies[unit_name].pop("ALBUMS") for category in units_topologies[unit_name]: if category != "CHANNELS": # Security and Mount are directly a dictionary containing the attributes of those categories # However, component_agents is a list so we need to iterate through this list for component_agent in units_topologies[unit_name][category]["COMPONENT_AGENTS"]: component_name = list(component_agent.keys())[0] agent = agents[component_agent[component_name]] # get the index of the current component, agent couple index = units_topologies[unit_name][category]["COMPONENT_AGENTS"].index( component_agent) units_topologies[unit_name][category]["COMPONENT_AGENTS"][index][component_name] = agent else: # Channels is composed of a list of channel, we're looping through it for channel_name in units_topologies[unit_name]["CHANNELS"]: for component_agent in units_topologies[unit_name]["CHANNELS"][channel_name]["COMPONENT_AGENTS"]: component_name = list(component_agent.keys())[0] agent = agents[component_agent[component_name]] # get the index of the current component, agent couple index = units_topologies[unit_name]["CHANNELS"][channel_name]["COMPONENT_AGENTS"].index( component_agent) units_topologies[unit_name]["CHANNELS"][channel_name]["COMPONENT_AGENTS"][index][component_name] = agent agents = config.get_agents_per_computer(config.unit_name) computers = {} for computer in config.get_computers(): computers[config.get_computers().get(computer).get("computer_config").get("hostname")] = computer units_topologies[unit_name]["COMPUTERS"] = agents return render(request, "obsconfig/obs_agents_config.html", {"units_topologies": units_topologies, "active_agents_by_unit": active_agents_by_unit,"computers":computers}) @ login_required @ level_required("Admin", "Operator", "Unit-PI", "Unit-board") def edit_config(request): config = OBSConfig( os.environ["PATH_TO_OBSCONF_FILE"], os.environ["unit_name"]) return render(request, "obsconfig/edit_config.html", {"config_file": config.raw_config}) @ login_required @ level_required("Admin", "Operator", "Unit-PI", "Unit-board") def verify_config(request): if request.POST.get("config"): temp_config_file = tempfile.NamedTemporaryFile( mode='w+', suffix=".yml") temp_config_file.write(request.POST.get("config")) temp_config_file.seek(0) response_data = {} try: config_file = yaml.safe_load(temp_config_file.read()) except yaml.YAMLError as exc: if hasattr(exc, 'problem_mark'): yaml_error_message = "" if exc.context != None: yaml_error_message += str(exc.problem_mark) + \ '\n ' + str(exc.problem) + ' ' + str(exc.context) else: yaml_error_message = str( exc.problem_mark.name) + '\n ' + str(exc.problem) response_data["is_valid"] = False response_data["yaml_error_message"] = yaml_error_message return HttpResponse(json.dumps(response_data), content_type="application/json") temp_config_file.seek(0) schema = config_file["schema"] errors = [] if schema == None: response_data["is_valid"] = False response_data["message"] = "Missing schema" schema_path = os.path.join( os.environ["DJANGO_PATH"], "../../../config/schemas/") config = OBSConfig.check_config( temp_config_file.name, schema_path+schema) if type(config) == bool and config: response_data["is_valid"] = True else: response_data["is_valid"] = False for error in config: errors.append( (f"Error : {str(error).split('. Path')[0]}", f"Path to error : '{error.path}'")) response_data["message"] = errors temp_config_file.close() return HttpResponse(json.dumps(response_data), content_type="application/json") @ login_required @ level_required("Admin", "Operator", "Unit-PI", "Unit-board") def save_config(request): if request.POST: if request.POST["config"]: with open(os.environ["PATH_TO_OBSCONF_FILE"], "w") as obs_config_file: obs_config_file.write(request.POST.get("config")) return HttpResponse("Ok !") @ login_required @ level_required("Admin", "Unit-PI", "Operator", "Observer") @ csrf_exempt def view_raw_component_config_file(request): COMPONENT_PATH = os.path.join( os.environ["DJANGO_PATH"], "../../../config/components/") if request.POST: try: yaml_file = open(COMPONENT_PATH+request.POST["yaml_file"]) content = yaml_file.readlines() except: content = "Component defined within the device configuration file" return HttpResponse(content) @ login_required @ level_required("Admin", "Unit-PI", "Operator", "Observer") @ csrf_exempt def view_raw_generic_device_config_file(request): GENERIC_DEVICES_PATH = os.path.join( os.environ["DJANGO_PATH"], "../../../config/devices/") if request.POST: yaml_file = open(GENERIC_DEVICES_PATH+request.POST["yaml_file"]) content = yaml_file.readlines() return HttpResponse(content) @ login_required @ level_required("Admin", "Unit-PI", "Operator", "Observer") @ csrf_exempt def view_raw_device_config_file(request): obs_folder = os.environ["PATH_TO_OBSCONF_FOLDER"] if request.POST: yaml_file = open(obs_folder+request.POST["yaml_file"]) content = yaml_file.readlines() return HttpResponse(content) @ login_required @ level_required("Admin", "Unit-PI", "Operator", "Observer") def obs_config_help(request): return render(request, "obsconfig/obs_config_help.html")