views.py 17.5 KB
from collections import OrderedDict
from django import conf
from django.shortcuts import render
from .obsconfig_class import OBSConfig
from django.conf import settings
from src.core.pyros_django.dashboard.decorator import level_required
from django.contrib.auth.decorators import login_required
from django.http import HttpResponse
from datetime import datetime
from django.views.decorators.csrf import csrf_exempt
import re
import yaml
import tempfile
import json
import os


def get_nested_dictionaries_as_list(dic, result=[]):
    for key, value in dic.items():
        print(value, type(value))
        print(result)
        if isinstance(value, dict):
            print("recursive call")
            get_nested_dictionaries_as_list(value, result)
        elif isinstance(value, list) and key == "CAPABILITIES":
            print("VALUE OF LIST =======          ", value)
            for element in value:
                # print(element)
                if isinstance(element, dict):
                    get_nested_dictionaries_as_list(element, result)

        else:
            result.append((key, value))
    return result


@login_required
@level_required("Admin", "Observer", "Management", "Operator", "Unit-PI")
def obs_global_config(request):
    config = OBSConfig(
        os.environ["PATH_TO_OBSCONF_FILE"], os.environ["unit_name"])
    units_names = list(config.get_units().keys())
    pickle_file_mtime = os.path.getmtime(config.CONFIG_PATH+config.pickle_file)
    pickle_datetime = datetime.utcfromtimestamp(
        pickle_file_mtime).strftime("%Y/%m/%d %H:%M:%S")
    CAN_EDIT_CONFIG = request.session.get("role") in (
        "Admin", "Operator", "Unit-PI", "Unit-board")
    CAN_VIEW_HARDWARE_CONFIG = request.session.get("role") in (
        "Admin", "Operator", "Unit-PI", "Unit-board")
    CAN_VIEW_ASTRONOMER_CONFIG = request.session.get("role") in (
        "Admin", "Operator", "Unit-PI", "Observer", "Management board manager", "Unit-board")
    CAN_VIEW_AGENTS_CONFIG = request.session.get("role") == "Admin"
    return render(request, "obsconfig/global_obs_configuration.html", {
        "units_names": units_names,
        "obs_name": config.get_obs_name(),
        "last_modification_time": pickle_datetime,
        "CAN_EDIT_CONFIG": CAN_EDIT_CONFIG,
        "CAN_VIEW_HARDWARE_CONFIG": CAN_VIEW_HARDWARE_CONFIG,
        "CAN_VIEW_ASTRONOMER_CONFIG": CAN_VIEW_ASTRONOMER_CONFIG,
        "CAN_VIEW_AGENTS_CONFIG": CAN_VIEW_AGENTS_CONFIG
    })


@login_required
@level_required("Admin", "Unit-PI", "Operator")
def obs_hardware_config(request):
    config = OBSConfig(
        os.environ["PATH_TO_OBSCONF_FILE"], os.environ["unit_name"])
    devices = config.get_devices()
    active_devices = config.get_active_devices()
    computers = config.get_computers()
    active_computers = config.get_active_computers()
    # give for a device name (key) the unit name where it is used (value)
    device_to_unit = {}
    # give for a computer name (key) the unit name where it is used (value)
    computer_to_unit = {}
    for device in active_devices:
        device_to_unit[device] = config.get_unit_of_device(device)
    for computer in active_computers:
        computer_to_unit[computer] = config.get_unit_of_computer(computer)
    devices_links = config.devices_links
    return render(request, 'obsconfig/obs_hardware_configuration.html', {'devices': devices, "active_devices": active_devices, "computers": computers, "active_computers": active_computers, "device_to_unit": device_to_unit, "computer_to_unit": computer_to_unit, "devices_links": devices_links})


@login_required
@level_required("Admin", "Unit-PI", "Operator")
def unit_hardware_configuration(request, unit_name):
    config = OBSConfig(
        os.environ["PATH_TO_OBSCONF_FILE"], os.environ["unit_name"])
    devices = config.get_devices()

    return render(request, 'obsconfig/unit_hardware_configuration.html', {'config': config})


@login_required
@level_required("Admin", "Unit-PI", "Operator")
def computer_details(request, computer_name):
    config = OBSConfig(
        os.environ["PATH_TO_OBSCONF_FILE"], os.environ["unit_name"])
    computer_detail = yaml.dump(config.get_computers()[computer_name])
    """
    computer_detail = { re.sub("^_","",key):value for key,
                               value in config.get_computers()[computer_name].items() }

     # We're removing "_" at the beginning of each key :
    power = { re.sub("^_","",key):value for key,
                     value in config.get_computer_power(computer_name).items() }

    computer_config = computer_detail["computer_config"]
    # We're removing "_" at the beginning of each key :
    computer_detail["computer_config"] = {
        re.sub("^_","",key):value for key,value in computer_config.items() }
    # Remove power key as we have this information in the same
    if power is not None:
        computer_detail["computer_config"].pop("power")
    return render(request,"obsconfig/computer_details.html", {"computer_detail" : computer_detail, "power" : power})
    """
    return render(request, "obsconfig/computer_details.html", {"computer_detail": computer_detail})


@login_required
@level_required("Admin", "Unit-PI",  "Operator", "Observer")
def device_details(request, device_name):
    config = OBSConfig(
        os.environ["PATH_TO_OBSCONF_FILE"], os.environ["unit_name"])
    # We're removing "_" at the beginning of each key :
    device = config.get_devices()[device_name]
    # device_detail  = yaml.dump(device)
    # config_device = open(config.CONFIG_PATH+device["file"],"r").read()
    devices_links = config.devices_links
    file_name_to_device_name = {
        v: k for k, v in config.get_devices_names_and_file().items()}
    active_devices = config.get_active_devices()
    """
    Alternative solutions:
        - test_device -> we're using a recursive function to get all nested dicitonaries but we loose a level of
        "precision" (no separations of capabilities for example)
        - device_detail -> we use multiple function to get specific value (not global enougth, if we had another key we have to change the code)
    """

    device_detail = {re.sub("^_", "", key): value for key,
                     value in config.get_devices()[device_name].items()}
    # test_device=  get_nested_dictionaries_as_list(device_detail,[])

    if config.get_device_power(device_name) is not None:
        # We're removing "_" at the beginning of each key :
        power = {re.sub("^_", "", key): value for key,
                 value in config.get_device_power(device_name).items()}
    else:
        power = None
    if config.get_device_connector(device_name) is not None:
        # We're removing "_" at the beginning of each key :
        connector = {re.sub("^_", "", key): value for key,
                     value in config.get_device_connector(device_name).items()}
    else:
        connector = None

    capabilities = []
    if config.get_device_capabilities(device_name) is not None:
        copy_capabilities = config.get_device_capabilities(device_name)
        for capability in copy_capabilities:
            # We're removing "_" at the beginning of each key :
            capabilities.append(
                {re.sub("^_", "", key): value for key, value in capability.items()})

    device_config = device_detail["device_config"]
    # We're removing "_" at the beginning of each key :
    device_detail["device_config"] = {
        re.sub("^_", "", key): value for key, value in device_config.items()}
    # Remove power key as we have this information in the same
    if power is not None:
        device_detail["device_config"].pop("power")
    if connector is not None:
        device_detail["device_config"].pop("connector")
    if len(capabilities) != 0:
        device_detail["device_config"].pop("CAPABILITIES")
    return render(request, "obsconfig/device_details.html", {"device_detail": device_detail, "power": power, "connector": connector, "capabilities": capabilities, "devices_links": devices_links, "file_name_to_device_name": file_name_to_device_name, "active_devices": active_devices})

    # return render(request,"obsconfig/device_details.html", { "device_detail" : device_detail, "config" : config_device })


@login_required
@level_required("Admin", "Observer", "Management", "Operator", "Unit-PI", "TAC")
def obs_astronomer_config(request):
    config = OBSConfig(
        os.environ["PATH_TO_OBSCONF_FILE"], os.environ["unit_name"])
    units = config.get_units()
    units_topologies = {}
    # for each unit
    for unit_name in units:

        units_topologies[unit_name] = config.get_topology(unit_name)
        layouts = units_topologies[unit_name].pop("LAYOUTS")["layouts"]
        albums = units_topologies[unit_name].pop("ALBUMS")["albums"]
        # for each category (security, mount, channels)
        for category in units_topologies[unit_name]:
            if category != "CHANNELS":
                # Security and Mount are directly a dictionary containing the attributes of those categories
                # However, component_agents is a list so we need to iterate through this list
                for component_agent in units_topologies[unit_name][category]["COMPONENT_AGENTS"]:
                    component_name = list(component_agent.keys())[0]
                    agent = component_agent[component_name]
                    device_of_agent = config.get_device_for_agent(
                        unit_name, agent)
                    # get the index of the current component, agent couple
                    index = units_topologies[unit_name][category]["COMPONENT_AGENTS"].index(
                        component_agent)
                    units_topologies[unit_name][category]["COMPONENT_AGENTS"][index][component_name] = device_of_agent
            else:
                # Channels is composed of a list of channel, we're looping through it
                for channel_name in units_topologies[unit_name]["CHANNELS"]:
                    for component_agent in units_topologies[unit_name]["CHANNELS"][channel_name]["COMPONENT_AGENTS"]:
                        component_name = list(component_agent.keys())[0]
                        agent = component_agent[component_name]
                        device_of_agent = config.get_device_for_agent(
                            unit_name, agent)
                        index = units_topologies[unit_name]["CHANNELS"][channel_name]["COMPONENT_AGENTS"].index(
                            component_agent)
                        units_topologies[unit_name]["CHANNELS"][channel_name]["COMPONENT_AGENTS"][index][component_name] = device_of_agent
        # Re add layouts and albums
        units_topologies[unit_name]["LAYOUTS"] = layouts
        units_topologies[unit_name]["ALBUMS"] = albums
        order_of_keys = ["SECURITY", "MOUNT", "LAYOUTS", "ALBUMS", "CHANNELS"]
        list_of_tuples = [(key, units_topologies[unit_name][key])
                          for key in order_of_keys]
        units_topologies[unit_name] = OrderedDict(list_of_tuples)
    return render(request, "obsconfig/obs_astronomer_config.html", {
        "units_topologies": units_topologies,
    })


@ login_required
@ level_required("Admin")
def obs_agents_config(request):
    config = OBSConfig(
        os.environ["PATH_TO_OBSCONF_FILE"], os.environ["unit_name"])
    units = config.get_units()
    units_topologies = {}
    active_agents_by_unit = {}
    # for each unit
    for unit_name in units:
        agents = config.get_agents(unit_name)
        # list of active agents of the current unit
        active_agents_by_unit[unit_name] = config.get_active_agents(unit_name)
        # topology of the current unit
        units_topologies[unit_name] = config.get_topology(unit_name)
        # removing albums and layouts, not useful in this view
        units_topologies[unit_name].pop("LAYOUTS")
        units_topologies[unit_name].pop("ALBUMS")
        for category in units_topologies[unit_name]:
            if category != "CHANNELS":
                # Security and Mount are directly a dictionary containing the attributes of those categories
                # However, component_agents is a list so we need to iterate through this list
                for component_agent in units_topologies[unit_name][category]["COMPONENT_AGENTS"]:
                    component_name = list(component_agent.keys())[0]
                    agent = agents[component_agent[component_name]]
                    # get the index of the current component, agent couple
                    index = units_topologies[unit_name][category]["COMPONENT_AGENTS"].index(
                        component_agent)
                    units_topologies[unit_name][category]["COMPONENT_AGENTS"][index][component_name] = agent
            else:
                # Channels is composed of a list of channel, we're looping through it
                for channel_name in units_topologies[unit_name]["CHANNELS"]:
                    for component_agent in units_topologies[unit_name]["CHANNELS"][channel_name]["COMPONENT_AGENTS"]:
                        component_name = list(component_agent.keys())[0]
                        agent = agents[component_agent[component_name]]
                        # get the index of the current component, agent couple
                        index = units_topologies[unit_name]["CHANNELS"][channel_name]["COMPONENT_AGENTS"].index(
                            component_agent)
                        units_topologies[unit_name]["CHANNELS"][channel_name]["COMPONENT_AGENTS"][index][component_name] = agent

    return render(request, "obsconfig/obs_agents_config.html", {"units_topologies": units_topologies, "active_agents_by_unit": active_agents_by_unit})


@ login_required
@ level_required("Admin", "Operator", "Unit-PI", "Unit-board")
def edit_config(request):
    config = OBSConfig(
        os.environ["PATH_TO_OBSCONF_FILE"], os.environ["unit_name"])
    return render(request, "obsconfig/edit_config.html", {"config_file": config.raw_config})


@ login_required
@ level_required("Admin", "Operator", "Unit-PI", "Unit-board")
def verify_config(request):
    if request.POST.get("config"):
        temp_config_file = tempfile.NamedTemporaryFile(
            mode='w+', suffix=".yml")
        temp_config_file.write(request.POST.get("config"))
        temp_config_file.seek(0)
        response_data = {}
        try:
            config_file = yaml.safe_load(temp_config_file.read())
        except yaml.YAMLError as exc:
            if hasattr(exc, 'problem_mark'):
                yaml_error_message = ""
                if exc.context != None:
                    yaml_error_message += str(exc.problem_mark) + \
                        '\n  ' + str(exc.problem) + ' ' + str(exc.context)
                else:
                    yaml_error_message = str(
                        exc.problem_mark.name) + '\n  ' + str(exc.problem)
                response_data["is_valid"] = False
                response_data["yaml_error_message"] = yaml_error_message
            return HttpResponse(json.dumps(response_data), content_type="application/json")
        temp_config_file.seek(0)
        schema = config_file["schema"]
        errors = []
        if schema == None:
            response_data["is_valid"] = False
            response_data["message"] = "Missing schema"
        schema_path = os.path.join(
            os.environ["DJANGO_PATH"], "../../../config/schemas/")
        config = OBSConfig.check_config(
            temp_config_file.name, schema_path+schema)
        if type(config) == bool and config:
            response_data["is_valid"] = True
        else:
            response_data["is_valid"] = False
            for error in config:
                errors.append(
                    (f"Error : {str(error).split('. Path')[0]}", f"Path to error : '{error.path}'"))
            response_data["message"] = errors
        temp_config_file.close()
        return HttpResponse(json.dumps(response_data), content_type="application/json")


@ login_required
@ level_required("Admin", "Operator", "Unit-PI", "Unit-board")
def save_config(request):
    if request.POST:
        if request.POST["config"]:
            with open(os.environ["PATH_TO_OBSCONF_FILE"], "w") as obs_config_file:
                obs_config_file.write(request.POST.get("config"))

            return HttpResponse("Ok !")


@ login_required
@ level_required("Admin", "Unit-PI",  "Operator", "Observer")
@ csrf_exempt
def view_raw_component_config_file(request):
    COMPONENT_PATH = os.path.join(
        os.environ["DJANGO_PATH"], "../../../config/components/")

    if request.POST:
        try:
            yaml_file = open(COMPONENT_PATH+request.POST["yaml_file"])
            content = yaml_file.readlines()
        except:
            content = "Component defined within the device configuration file"
        return HttpResponse(content)


@ login_required
@ level_required("Admin", "Unit-PI",  "Operator", "Observer")
@ csrf_exempt
def view_raw_generic_device_config_file(request):
    GENERIC_DEVICES_PATH = os.path.join(
        os.environ["DJANGO_PATH"], "../../../config/devices/")
    if request.POST:
        yaml_file = open(GENERIC_DEVICES_PATH+request.POST["yaml_file"])
        content = yaml_file.readlines()
        return HttpResponse(content)


@ login_required
@ level_required("Admin", "Unit-PI",  "Operator", "Observer")
@ csrf_exempt
def view_raw_device_config_file(request):
    obs_folder = os.environ["PATH_TO_OBSCONF_FOLDER"]
    if request.POST:
        yaml_file = open(obs_folder+request.POST["yaml_file"])
        content = yaml_file.readlines()
        return HttpResponse(content)


@ login_required
@ level_required("Admin", "Unit-PI",  "Operator", "Observer")
def obs_config_help(request):
    return render(request, "obsconfig/obs_config_help.html")