# views.py 16.5 KB
from django import conf
from django.shortcuts import render
from .configpyros import ConfigPyros
from django.conf import settings
from src.core.pyros_django.dashboard.decorator import level_required
from django.contrib.auth.decorators import login_required
from django.http import HttpResponse
from datetime import datetime
from django.views.decorators.csrf import csrf_exempt
import re, yaml,tempfile,json,os

def get_nested_dictionaries_as_list(dic, result=None):
    """Flatten a nested dictionary into a list of ``(key, value)`` tuples.

    Values that are dictionaries are walked recursively and their leaf
    entries are appended. A list stored under the key ``"CAPABILITIES"`` is
    also walked: every dictionary it contains is flattened, and its
    non-dictionary elements are ignored. Any other value (including lists
    stored under other keys) is appended as-is together with its key.

    Args:
        dic: The dictionary to flatten.
        result: Optional list to append to. When omitted, a new list is
            created for this call.

    Returns:
        The list of ``(key, value)`` tuples (the same object as ``result``
        when one was supplied).
    """
    # A fresh list per call: a mutable default would be shared by every call
    # and leak entries from one call into the next.
    if result is None:
        result = []
    for key, value in dic.items():
        if isinstance(value, dict):
            get_nested_dictionaries_as_list(value, result)
        elif isinstance(value, list) and key == "CAPABILITIES":
            for element in value:
                if isinstance(element, dict):
                    get_nested_dictionaries_as_list(element, result)
        else:
            result.append((key, value))
    return result

@login_required
@level_required("Admin","Observer","Management","Operator","Unit-PI")
def obs_global_config(request):
    """Render the landing page of the observatory configuration.

    The template receives the unit names, the observatory name, the
    modification time of the configuration pickle file (formatted from a UTC
    timestamp) and four flags, derived from the ``role`` stored in the
    session, that tell it which sections to expose.
    """
    config = ConfigPyros(os.environ["PATH_TO_OBSCONF_FILE"], os.environ["unit_name"])
    units_names = list(config.get_units().keys())

    pickle_path = config.CONFIG_PATH + config.pickle_file
    last_modified = datetime.utcfromtimestamp(os.path.getmtime(pickle_path))

    role = request.session.get("role")
    # Editing and viewing the hardware section are open to the same roles.
    hardware_roles = ("Admin", "Operator", "Unit-PI", "Unit-board")
    # NOTE(review): this tuple uses "Management board manager" whereas the
    # decorator above lists "Management" - confirm which spelling is intended.
    astronomer_roles = (
        "Admin",
        "Operator",
        "Unit-PI",
        "Observer",
        "Management board manager",
        "Unit-board",
    )

    context = {
        "units_names": units_names,
        "obs_name": config.get_obs_name(),
        "last_modification_time": last_modified.strftime("%Y/%m/%d %H:%M:%S"),
        "CAN_EDIT_CONFIG": role in hardware_roles,
        "CAN_VIEW_HARDWARE_CONFIG": role in hardware_roles,
        "CAN_VIEW_ASTRONOMER_CONFIG": role in astronomer_roles,
        "CAN_VIEW_AGENTS_CONFIG": role == "Admin",
    }
    return render(request, "obsconfig/global_obs_configuration.html", context)


@login_required
@level_required("Admin","Unit-PI","Operator")
def obs_hardware_config(request):
    """Render the hardware overview of the observatory.

    The template receives all devices and computers, the active ones, a
    mapping from each active device / computer name to the unit using it,
    and the links between devices.
    """
    config = ConfigPyros(os.environ["PATH_TO_OBSCONF_FILE"], os.environ["unit_name"])
    devices = config.get_devices()
    active_devices = config.get_active_devices()
    computers = config.get_computers()
    active_computers = config.get_active_computers()

    # device name -> name of the unit where it is used
    device_to_unit = {
        name: config.get_unit_of_device(name) for name in active_devices
    }
    # computer name -> name of the unit where it is used
    computer_to_unit = {
        name: config.get_unit_of_computer(name) for name in active_computers
    }

    context = {
        'devices': devices,
        "active_devices": active_devices,
        "computers": computers,
        "active_computers": active_computers,
        "device_to_unit": device_to_unit,
        "computer_to_unit": computer_to_unit,
        "devices_links": config.devices_links,
    }
    return render(request, 'obsconfig/obs_hardware_configuration.html', context)
    
@login_required
@level_required("Admin","Unit-PI","Operator")
def unit_hardware_configuration(request,unit_name):
    """Render the hardware configuration page of a unit.

    The whole configuration object is handed to the template. ``unit_name``
    comes from the URL but is not used by this view.
    """
    config = ConfigPyros(os.environ["PATH_TO_OBSCONF_FILE"], os.environ["unit_name"])
    # NOTE(review): the returned devices are never used; the call is kept in
    # case it has side effects - confirm and remove if it does not.
    devices = config.get_devices()

    context = {'config': config}
    return render(request, 'obsconfig/unit_hardware_configuration.html', context)
    
@login_required
@level_required("Admin","Unit-PI","Operator")
def computer_details(request,computer_name):
    """Render the details of one computer as a YAML dump.

    The entry for ``computer_name`` is looked up in the configured computers
    and serialised with ``yaml.dump`` for display. An earlier variant that
    stripped the leading "_" of each key and passed the power settings
    separately is no longer used.
    """
    config = ConfigPyros(os.environ["PATH_TO_OBSCONF_FILE"], os.environ["unit_name"])
    computers = config.get_computers()
    context = {"computer_detail": yaml.dump(computers[computer_name])}
    return render(request, "obsconfig/computer_details.html", context)


@login_required
@level_required("Admin","Unit-PI","Unit-board","Operator","Observer")
def device_details(request,device_name):
    """Render the details of one device.

    The device entry, its power settings, its connector and its capabilities
    are passed to the template with the leading "_" of every key removed.
    Power, connector and capabilities are passed separately, so they are
    removed from the ``device_config`` copy to avoid showing them twice.

    Args:
        request: The current HTTP request.
        device_name: Key of the device in the configured devices.

    Raises:
        KeyError: If ``device_name`` is not a configured device or its entry
            has no ``device_config`` key.
    """

    def strip_leading_underscore(mapping):
        # Build a copy of the mapping whose keys have lost their leading "_".
        return {re.sub("^_", "", key): value for key, value in mapping.items()}

    config = ConfigPyros(os.environ["PATH_TO_OBSCONF_FILE"], os.environ["unit_name"])
    devices_links = config.devices_links
    file_name_to_device_name = {
        file_name: name
        for name, file_name in config.get_devices_names_and_file().items()
    }
    active_devices = config.get_active_devices()

    # Design note kept from the original implementation: a generic recursive
    # flattening (get_nested_dictionaries_as_list) was considered, but it
    # loses one level of structure (e.g. capabilities are no longer
    # separated), so dedicated getters are used instead.
    device_detail = strip_leading_underscore(config.get_devices()[device_name])

    # Each getter is called once and its result reused.
    raw_power = config.get_device_power(device_name)
    power = strip_leading_underscore(raw_power) if raw_power is not None else None

    raw_connector = config.get_device_connector(device_name)
    connector = (
        strip_leading_underscore(raw_connector) if raw_connector is not None else None
    )

    raw_capabilities = config.get_device_capabilities(device_name)
    capabilities = (
        [strip_leading_underscore(capability) for capability in raw_capabilities]
        if raw_capabilities is not None
        else []
    )

    # Work on a stripped copy so the configuration object is left untouched.
    device_config = strip_leading_underscore(device_detail["device_config"])
    # pop(..., None): tolerate a device_config that lacks the key instead of
    # failing with KeyError.
    if power is not None:
        device_config.pop("power", None)
    if connector is not None:
        device_config.pop("connector", None)
    if capabilities:
        device_config.pop("CAPABILITIES", None)
    device_detail["device_config"] = device_config

    context = {
        "device_detail": device_detail,
        "power": power,
        "connector": connector,
        "capabilities": capabilities,
        "devices_links": devices_links,
        "file_name_to_device_name": file_name_to_device_name,
        "active_devices": active_devices,
    }
    return render(request, "obsconfig/device_details.html", context)
    
    #return render(request,"obsconfig/device_details.html", { "device_detail" : device_detail, "config" : config_device })

@login_required
@level_required("Admin","Observer","Management","Operator","Unit-PI","TAC")
def obs_astronomer_config(request):
    """Render the astronomer view of every unit topology.

    For each unit, the topology is fetched, its ``CHANNEL_GROUPS`` entry is
    dropped (not useful in this view) and, in every ``COMPONENT_AGENTS``
    list, the agent associated with a component is replaced by the device
    returned by ``config.get_device_for_agent``.

    The topology objects returned by ``config.get_topology`` are modified in
    place.
    """
    config = ConfigPyros(os.environ["PATH_TO_OBSCONF_FILE"], os.environ["unit_name"])
    units_topologies = {}
    for unit_name in config.get_units():
        topology = config.get_topology(unit_name)
        units_topologies[unit_name] = topology
        # Not useful in this view; tolerate a topology without the key.
        topology.pop("CHANNEL_GROUPS", None)

        for category in topology:
            content = topology[category]
            if category == "CHANNELS":
                # CHANNELS maps each channel name to its own description,
                # each holding a COMPONENT_AGENTS list.
                agent_lists = [
                    content[channel_name]["COMPONENT_AGENTS"]
                    for channel_name in content
                ]
            else:
                # Other categories (e.g. security, mount) directly hold a
                # COMPONENT_AGENTS list.
                agent_lists = [content["COMPONENT_AGENTS"]]

            for component_agents in agent_lists:
                for component_agent in component_agents:
                    # Only the first (component, agent) pair of the entry is
                    # considered.
                    component_name, agent = next(iter(component_agent.items()))
                    # Update the entry itself: searching it back with
                    # list.index() costs O(n) per entry and can select an
                    # earlier entry that happens to compare equal.
                    component_agent[component_name] = config.get_device_for_agent(
                        unit_name, agent
                    )

    return render(request,"obsconfig/obs_astronomer_config.html", { "units_topologies" : units_topologies })


@login_required
@level_required("Admin")
def obs_agents_config(request):
    """Render the agents view of every unit topology.

    For each unit, the list of active agents is collected, the topology is
    fetched, its ``CHANNEL_GROUPS`` entry is dropped (not useful in this
    view) and, in every ``COMPONENT_AGENTS`` list, the agent name associated
    with a component is replaced by the matching entry of
    ``config.get_agents(unit_name)``.

    The topology objects returned by ``config.get_topology`` are modified in
    place.

    Raises:
        KeyError: If a topology references an agent name that is absent from
            ``config.get_agents(unit_name)``.
    """
    config = ConfigPyros(os.environ["PATH_TO_OBSCONF_FILE"], os.environ["unit_name"])
    units_topologies = {}
    active_agents_by_unit = {}
    for unit_name in config.get_units():
        agents = config.get_agents(unit_name)
        # list of active agents of the current unit
        active_agents_by_unit[unit_name] = config.get_active_agents(unit_name)
        # topology of the current unit
        topology = config.get_topology(unit_name)
        units_topologies[unit_name] = topology
        # Not useful in this view; tolerate a topology without the key.
        topology.pop("CHANNEL_GROUPS", None)

        for category in topology:
            content = topology[category]
            if category == "CHANNELS":
                # CHANNELS maps each channel name to its own description,
                # each holding a COMPONENT_AGENTS list.
                agent_lists = [
                    content[channel_name]["COMPONENT_AGENTS"]
                    for channel_name in content
                ]
            else:
                # Other categories (e.g. security, mount) directly hold a
                # COMPONENT_AGENTS list.
                agent_lists = [content["COMPONENT_AGENTS"]]

            for component_agents in agent_lists:
                for component_agent in component_agents:
                    # Only the first (component, agent) pair of the entry is
                    # considered.
                    component_name, agent_name = next(iter(component_agent.items()))
                    # Update the entry itself: searching it back with
                    # list.index() costs O(n) per entry and can select an
                    # earlier entry that happens to compare equal.
                    component_agent[component_name] = agents[agent_name]

    return render(request,"obsconfig/obs_agents_config.html", { "units_topologies" : units_topologies, "active_agents_by_unit" : active_agents_by_unit})


@login_required
@level_required("Admin","Operator","Unit-PI","Unit-board")
def edit_config(request):
    """Render the configuration editor with the raw configuration content."""
    obs_config = ConfigPyros(
        os.environ["PATH_TO_OBSCONF_FILE"],
        os.environ["unit_name"],
    )
    context = {"config_file": obs_config.raw_config}
    return render(request, "obsconfig/edit_config.html", context)

@login_required
@level_required("Admin","Operator","Unit-PI","Unit-board")
def verify_config(request):
    """Validate a configuration submitted in the ``config`` POST field.

    The text is first parsed as YAML, then written to a temporary ``.yml``
    file and checked with ``ConfigPyros.check_config`` against the schema
    named by the top-level ``schema`` key of the configuration.

    Returns:
        A JSON ``HttpResponse`` that always contains ``is_valid``. Depending
        on the outcome it also contains ``yaml_error_message`` (YAML syntax
        error) or ``message`` (missing config / missing schema, or a list of
        ``(error, path)`` pairs reported by the schema check). A request
        without ``config`` is answered with status 400.
    """

    def json_response(payload, status=200):
        # Single place where the JSON answer is serialised.
        return HttpResponse(
            json.dumps(payload), content_type="application/json", status=status
        )

    raw_config = request.POST.get("config")
    if not raw_config:
        # Previously the view returned None here, which Django rejects.
        return json_response({"is_valid": False, "message": "Missing config"}, status=400)

    try:
        config_file = yaml.safe_load(raw_config)
    except yaml.YAMLError as exc:
        if getattr(exc, 'problem_mark', None) is not None:
            if exc.context is not None:
                yaml_error_message = str(exc.problem_mark) + '\n  ' + str(exc.problem) + ' ' + str(exc.context)
            else:
                yaml_error_message = str(exc.problem_mark.name) + '\n  ' +str(exc.problem)
        else:
            # Errors without position information still get a message
            # instead of an empty JSON object.
            yaml_error_message = str(exc)
        return json_response({"is_valid": False, "yaml_error_message": yaml_error_message})

    # The document may be a scalar or a list, or may lack the "schema" key.
    schema = config_file.get("schema") if isinstance(config_file, dict) else None
    if not isinstance(schema, str) or not schema:
        # Return right away: continuing would fail on schema_path + schema.
        return json_response({"is_valid": False, "message": "Missing schema"})

    # NOTE(review): `schema` comes from the submitted document and is appended
    # to the schema directory as-is; consider restricting it to known names.
    schema_path = os.path.join(os.environ["DJANGO_PATH"],"../../../config/schemas/")

    response_data = {}
    # The context manager removes the temporary file on every exit path.
    with tempfile.NamedTemporaryFile(mode='w+', suffix=".yml") as temp_config_file:
        temp_config_file.write(raw_config)
        # check_config reads the file by name, so the content must be on disk.
        temp_config_file.flush()
        config = ConfigPyros.check_config(temp_config_file.name, schema_path+schema)
        if config is True:
            response_data["is_valid"] = True
        else:
            # Presumably an iterable of validation errors exposing `.path` -
            # TODO confirm against ConfigPyros.check_config.
            response_data["is_valid"] = False
            response_data["message"] = [
                (f"Error : {str(error).split('. Path')[0]}", f"Path to error : '{error.path}'")
                for error in config
            ]
    return json_response(response_data)

@login_required
@level_required("Admin","Operator","Unit-PI","Unit-board")
def save_config(request):
    """Overwrite the observatory configuration file with the posted text.

    The ``config`` POST field is written to the file designated by the
    ``PATH_TO_OBSCONF_FILE`` environment variable.

    Returns:
        ``HttpResponse("Ok !")`` once the file is written, or a 400 response
        when the request carries no ``config`` content.
    """
    config_text = request.POST.get("config")
    if not config_text:
        # Previously the view returned None here (or raised when the key was
        # absent), which Django turns into a server error.
        return HttpResponse("Missing config", status=400)

    # NOTE(review): the content is written without being validated here;
    # presumably the client calls verify_config first - confirm.
    with open(os.environ["PATH_TO_OBSCONF_FILE"],"w") as obs_config_file:
        obs_config_file.write(config_text)
    return HttpResponse("Ok !")

@login_required
@level_required("Admin","Unit-PI","Unit-board","Operator","Observer")
@csrf_exempt
def view_raw_component_config_file(request):
    """Return the raw content of a component configuration file.

    The file named by the ``yaml_file`` POST field is read from the
    components configuration directory. When it cannot be read, a fixed
    message is returned instead.

    Returns:
        An ``HttpResponse`` with the file content or the fallback message,
        or a 400 response when the request carries no POST data.
    """
    COMPONENT_PATH = os.path.join(os.environ["DJANGO_PATH"],"../../../config/components/")

    if not request.POST:
        # Previously the view returned None here, which Django rejects.
        return HttpResponse("Missing yaml_file parameter", status=400)

    # SECURITY NOTE(review): "yaml_file" is user-controlled and appended to
    # the directory as-is, so "../" segments can reach files outside it.
    # Restrict it once the set of legitimate values is confirmed.
    try:
        # The context manager closes the file handle after reading.
        with open(COMPONENT_PATH+request.POST["yaml_file"]) as yaml_file:
            content = yaml_file.read()
    except (OSError, KeyError, ValueError):
        # Missing parameter, unreadable file or invalid path.
        content = "Component defined within the device configuration file"
    return HttpResponse(content)

@login_required
@level_required("Admin","Unit-PI","Unit-board","Operator","Observer")
@csrf_exempt
def view_raw_generic_device_config_file(request):
    """Return the raw content of a generic device configuration file.

    The file named by the ``yaml_file`` POST field is read from the generic
    devices configuration directory.

    Returns:
        An ``HttpResponse`` with the file content, a 400 response when
        ``yaml_file`` is missing, or a 404 response when the file cannot be
        read.
    """
    GENERIC_DEVICES_PATH = os.path.join(os.environ["DJANGO_PATH"],"../../../config/devices/")

    file_name = request.POST.get("yaml_file")
    if not file_name:
        # Previously the view returned None (or raised) here.
        return HttpResponse("Missing yaml_file parameter", status=400)

    # SECURITY NOTE(review): "yaml_file" is user-controlled and appended to
    # the directory as-is, so "../" segments can reach files outside it.
    # Restrict it once the set of legitimate values is confirmed.
    try:
        # The context manager closes the file handle after reading.
        with open(GENERIC_DEVICES_PATH+file_name) as yaml_file:
            content = yaml_file.read()
    except OSError:
        return HttpResponse("Unable to read configuration file", status=404)
    return HttpResponse(content)

@login_required
@level_required("Admin","Unit-PI","Unit-board","Operator","Observer")
@csrf_exempt
def view_raw_device_config_file(request):
    """Return the raw content of a device configuration file.

    The file named by the ``yaml_file`` POST field is read from the folder
    designated by the ``PATH_TO_OBSCONF_FOLDER`` environment variable.

    Returns:
        An ``HttpResponse`` with the file content, a 400 response when
        ``yaml_file`` is missing, or a 404 response when the file cannot be
        read.
    """
    obs_folder = os.environ["PATH_TO_OBSCONF_FOLDER"]

    file_name = request.POST.get("yaml_file")
    if not file_name:
        # Previously the view returned None (or raised) here.
        return HttpResponse("Missing yaml_file parameter", status=400)

    # SECURITY NOTE(review): "yaml_file" is user-controlled and appended to
    # the folder as-is, so "../" segments can reach files outside it.
    # Restrict it once the set of legitimate values is confirmed.
    try:
        # The context manager closes the file handle after reading.
        with open(obs_folder+file_name) as yaml_file:
            content = yaml_file.read()
    except OSError:
        return HttpResponse("Unable to read configuration file", status=404)
    return HttpResponse(content)

@login_required
@level_required("Admin","Unit-PI","Unit-board","Operator","Observer")
def obs_config_help(request):
    """Render the static help page of the observatory configuration."""
    template_name = "obsconfig/obs_config_help.html"
    return render(request, template_name)