# General imports
from ast import arguments
import re
import sys
import datetime
from datetime import timezone
import utils.Logger as l
import json
from random import randint
import time,os
from collections import OrderedDict
# Django imports
from django.http import HttpResponse
from django.shortcuts import render, redirect
from django.contrib.auth.decorators import login_required
from django.core.paginator import Paginator
from django.core import serializers
from django.forms import modelformset_factory
from django.http import HttpResponseRedirect
from django.views.generic.edit import UpdateView
from django.shortcuts import get_object_or_404
from django.utils.decorators import method_decorator
from django.urls import reverse_lazy, reverse
from django.http import Http404
from django.db.models.query import QuerySet
from django.conf import settings as pyros_settings
from django.contrib import messages
from django.http import JsonResponse
from django.utils.http import urlencode
# Project imports
from common.models import Log, WeatherWatch, SiteWatch, Config, WeatherWatchHistory, Majordome, AgentSurvey, AgentCmd, Env_data, Env_data_hist
#from user_manager.models import PyrosUser, UserLevel
from user_manager.models import ScientificProgram, SP_Period_Guest
#from scientific_program.models import ScientificProgram, SP_Period_Guest
from devices.models import PlcDeviceStatus, TelescopeCommand #, Telescope
from dashboard.forms import ConfigForm, MajordomeForm #, UserForm
from dashboard.decorator import level_required
from devices.Telescope import TelescopeController
from devices.TelescopeRemoteControlDefault import TelescopeRemoteControlDefault
from devices.CameraVISRemoteControlDefault import CameraVISRemoteControlDefault
from devices.CameraNIRRemoteControlDefault import CameraNIRRemoteControlDefault
from devices import PLC
from src.core.pyros_django.obsconfig.obsconfig_class import OBSConfig
sys.path.append("../../..")
from config.pyros.config_pyros import ConfigPyros
#import utils.celme as celme
import vendor.guitastro.src.guitastro as guitastro
SUN_ELEV_DAY_THRESHOLD = -10
MAX_LOGS_LINES = 100
log = l.setupLogger("dashboard", "dashboard")
def index(request):
    """Render the dashboard landing page.

    Stores the observatory name, the PyROS configuration and the logo path in
    the session. For an authenticated user, a plain-text message is built that
    lists one ``sp_register`` link per ``SP_Period_Guest`` row whose e-mail
    matches the user's; anonymous visitors get the "Visitor" level.
    """
    config = OBSConfig(os.environ["PATH_TO_OBSCONF_FILE"], os.environ["unit_name"])
    observatory_name = config.get_obs_name()
    unit_name = config.unit_name
    request.session["obsname"] = observatory_name + " " + unit_name
    request.session["pyros_config"] = pyros_settings.CONFIG_PYROS
    logo = pyros_settings.CONFIG_PYROS.get("general").get("logo")
    request.session["logo"] = "media/" + logo

    if not request.user.is_authenticated:
        return render(request, 'dashboard/index.html', {"USER_LEVEL": "Visitor", 'base_template': "base.html"})

    message = ""
    # Evaluate the invitations once instead of querying for the truth test
    # and then again for the loop.
    invitations = list(SP_Period_Guest.objects.filter(email=request.user.email))
    if invitations:
        domain = pyros_settings.DEFAULT_DOMAIN
        links = []
        for sp_period_guest in invitations:
            sp_period = sp_period_guest.SP_Period
            path = reverse('sp_register', args=(sp_period.scientific_program.id, sp_period.period.id))
            links.append(f"- http://{domain}{path}\n")
        # Line breaks are written as "\n" escapes: a single-quoted string
        # literal cannot contain a raw newline.
        message = (
            "You are requested to join the following proposals, click on the following link(s) to accept those invitations :\n"
            + "".join(links)
            + "\n"
        )
    # return the initial view (the dashboard's one)
    return render(request, 'dashboard/index.html', {'USER_LEVEL': request.user.get_priority(), 'base_template': "base.html", "message": message})
def observation_index(request):
    """Render the observation landing page.

    Authenticated users get a ``CAN_VIEW_SCIENCE_THEMES`` flag derived from
    the session role; anonymous users are rendered as "Visitor".
    """
    template_name = 'dashboard/observation_index.html'
    if not request.user.is_authenticated:
        return render(request, template_name, {'USER_LEVEL': "Visitor", 'base_template': "base.html"})

    science_theme_roles = ("Admin", "Unit-PI", "Unit-board")
    context = {
        "base_template": "base.html",
        "CAN_VIEW_SCIENCE_THEMES": request.session.get("role") in science_theme_roles,
    }
    return render(request, template_name, context)
def retrieve_env(request):
    """Render the environment status fragment.

    Every local variable is handed to the template through ``locals()``, so
    the local names below are part of the template contract and must not be
    renamed.

    Raises:
        Http404: if ``WeatherWatch.DoesNotExist`` is raised while collecting
            the data.
    """
    try:
        weather_status = get_latest_from_db_or_empty(WeatherWatch)
        plc_device_status = get_latest_plc_device_status_or_empty()
        plc_timeout = Config.objects.get(pk=1).plc_timeout_seconds
        # Seconds elapsed since the PLC status row was created.
        timeout = (datetime.datetime.now() - plc_device_status.created).total_seconds()
        timeout_affichage = datetime.datetime.now() - datetime.timedelta(seconds=timeout)
        sunelev = get_sunelev()
        t = datetime.datetime.now() - datetime.timedelta(hours=7)
        # "Day" means the sun is at or above the configured elevation threshold.
        isDay = bool(sunelev >= SUN_ELEV_DAY_THRESHOLD)
        return render(request, 'dashboard/observation_status_env.html', locals())
    except WeatherWatch.DoesNotExist:
        raise Http404("No WeatherWatch matches the given query.")
'''
Function that calls guitastro code to determine the sun elevation; maybe it should be moved somewhere else?
'''
def get_sunelev():
    """Return the current elevation of the Sun, rounded to 2 decimals.

    The Sun's RA/DEC are first obtained from a planet target, then the target
    is redefined from those coordinates and its ephemeris is computed again to
    read the ``ELEV`` value.
    """
    now = guitastro.dates.Date("now")
    # coords of san pedro martir site
    observing_site = guitastro.siteobs.Siteobs("MPC 244.5367 0.85792 +0.51292")

    sun = guitastro.Target()
    sun.define({'planet': 'Sun'})
    equatorial = sun.ephem(now, observing_site, ['ra', 'dec'])

    # Redefine the target from the coordinates just computed.
    sun.define({'RA': equatorial['RA'], 'DEC': equatorial['DEC']})
    ephemeris = sun.ephem(now, observing_site, ['name', 'ra', 'dec', 'elev'])
    return round(ephemeris['ELEV'], 2)
def retrieve_env_navbar(request):
    '''
    Return the data built by get_weather_data() (latest PLC status, config
    info, sun elevation, ...) to the client as a JSON document.
    According to the original note, this view is called every
    REFRESH_ICONS_FREQUENCE_MILLISECONDS from base.html.
    Raises Http404 when WeatherWatch.DoesNotExist is raised.
    '''
    try:
        payload = json.dumps(get_weather_data())
    except WeatherWatch.DoesNotExist:
        raise Http404("No WeatherWatch matches the given query.")
    return HttpResponse(payload, content_type="application/json")
'''
##if request.is_ajax():
try:
#weather_status = WeatherWatch.objects.latest('updated')
weather_status = WeatherWatch.objects.latest('updated') if WeatherWatch.objects.all().exists() else WeatherWatch()
#plc_device_status = PlcDeviceStatus.objects.exclude(plc_mode=None).latest('created')
#plc_device_status = PlcDeviceStatus.objects.exclude(plc_mode=None).latest('created') if PlcDeviceStatus.objects.all().exists() else PlcDeviceStatus()
if PlcDeviceStatus.objects.all().exists():
plc_device_status = PlcDeviceStatus.objects.exclude(plc_mode=None).latest('created')
else:
plc_device_status = PlcDeviceStatus()
#plc_device_status.created='2018-07-26 12:48:49.949228'
plc_device_status.created=datetime.datetime(2018, 6, 21, 9, 53, 30, 599462)
plc_mode = plc_device_status.plc_mode
is_safe = plc_device_status.is_safe
weather = serializers.serialize('json', [weather_status])
weather = json.loads(weather)
ack = Config.objects.get(id=1).ack
plc_timeout = Config.objects.get(pk=1).plc_timeout_seconds
#if not PlcDeviceStatus.objects.all().exists() plc_device_status.created='2018-07-26 12:48:49.949228'
#timeout = (datetime.datetime.now() - LAST_PLC_ITEM_TIME).total_seconds()
timeout = (datetime.datetime.now() - plc_device_status.created).total_seconds()
weather[0]['max_sunelev'] = SUN_ELEV_DAY_THRESHOLD
#weather[0]['sunelev'] = 10
weather[0]['sunelev'] = get_sunelev()
weather[0]["plc_mode"] = plc_mode
weather[0]["is_safe"] = is_safe
weather[0]["ACK"] = ack
weather[0]["plc_timeout"] = timeout
weather[0]["max_plc_timeout"] = plc_timeout
weather[0]["pyros_mode"] = Config.objects.get(id=1).pyros_state
if SiteWatch.objects.all().exists():
sitewatch = SiteWatch.objects.latest('updated')
else:
sitewatch = SiteWatch()
#sitewatch.global_status='KO'
weather[0]["sitewatch_global_status"] = sitewatch.global_status
#weather[0]["sitewatch_global_status"] = SiteWatch.objects.latest('updated').global_status
#weather[0]["sitewatch_global_status"] = SiteWatch.objects.latest('updated').global_status if SiteWatch.objects.all().exists() else SiteWatch()
return HttpResponse(json.dumps(weather), content_type="application/json")
except WeatherWatch.DoesNotExist:
raise Http404("No WeatherWatch matches the given query.")
'''
@login_required
def settings(request):
    '''
    View called to see the settings (for the software, observatory, users...) page.
    The three CAN_* flags are computed from the role stored in the session.
    '''
    role = request.session.get("role")
    operator_roles = ("Admin", "Unit-PI", "Operator")
    # Same keys as the previous locals()-based context.
    context = {
        "request": request,
        "CAN_VIEW_PERIOD": role in ("Admin", "Unit-PI", "Observer", "Unit-board"),
        "CAN_CHANGE_SOFT_MODE": role in operator_roles,
        "CAN_VIEW_AGENTS_STATE": role in operator_roles,
    }
    return render(request, "dashboard/settings.html", context)
def _wait_for_agent_cmd(cmd_id, poll_interval=0.5, timeout=None):
    """Poll the database until command ``cmd_id`` is executed or in error.

    Args:
        cmd_id: primary key of the ``AgentCmd`` to watch.
        poll_interval: seconds slept between two database reads.
        timeout: maximum number of seconds to wait; ``None`` waits forever.

    Returns:
        The last loaded ``AgentCmd`` (possibly still pending if ``timeout``
        elapsed first).
    """
    waited = 0
    while True:
        # One read per iteration (the command was previously fetched twice).
        cmd = AgentCmd.objects.get(id=cmd_id)
        if cmd.is_executed() or cmd.is_exec_error():
            return cmd
        if timeout is not None and waited >= timeout:
            return cmd
        time.sleep(poll_interval)
        waited += poll_interval


def get_last_all_cmds(agent_name):
    """Return an ``AgentCmd`` "get_all_cmds" result for ``agent_name``.

    For regular agents the latest executed "get_all_cmds" is reused unless a
    "do_stop" command was deposited at or after it, in which case a new
    "get_all_cmds" is sent and awaited. Agents whose name contains "AgentSST"
    have no "do_stop" command and follow a time-based rule instead.

    Returns:
        The ``AgentCmd`` holding (or about to hold) the command list.
    """
    if "AgentSST" in agent_name:
        # AgentSST doesn't have do_stop cmd... (for the moment)
        executed_cmds = AgentCmd.objects.filter(full_name="get_all_cmds", recipient=agent_name, state__contains="CMD_EXECUTED")
        if not executed_cmds.exists():
            # First "get_all_cmds" ever sent to this agent: wait until it is
            # executed. This branch used to fall off the end of the function
            # and return None.
            new_cmd = AgentCmd.send_cmd_from_to("System", agent_name, "get_all_cmds")
            return _wait_for_agent_cmd(new_cmd.id)
        last_agent_all_cmds = executed_cmds.latest("s_deposit_time")
        datetime_now = datetime.datetime.now(tz=timezone.utc)
        time_delta = datetime_now - datetime.timedelta(minutes=30)
        # NOTE(review): a new command is sent when the last one is *newer*
        # than 30 minutes; this looks inverted but is kept as-is - confirm.
        if time_delta <= last_agent_all_cmds.s_deposit_time:
            new_cmd = AgentCmd.send_cmd_from_to("System", agent_name, "get_all_cmds")
            return _wait_for_agent_cmd(new_cmd.id, timeout=3)
        return last_agent_all_cmds

    last_agent_all_cmds = None
    try:
        last_agent_all_cmds = AgentCmd.objects.filter(full_name="get_all_cmds", recipient=agent_name, state__contains="CMD_EXECUTED").latest("s_deposit_time")
        last_do_stop_cmd = AgentCmd.objects.filter(full_name__contains="do_stop", recipient=agent_name).latest("s_deposit_time")
    except AgentCmd.DoesNotExist:
        if last_agent_all_cmds is not None:
            # No do_stop was ever recorded: the stored command list is valid.
            return last_agent_all_cmds
        new_cmd = AgentCmd.send_cmd_from_to("System", agent_name, "get_all_cmds")
        return _wait_for_agent_cmd(new_cmd.id)

    if last_agent_all_cmds.s_deposit_time <= last_do_stop_cmd.s_deposit_time:
        # The agent was stopped after its last answer: ask again.
        new_cmd = AgentCmd.send_cmd_from_to("System", agent_name, "get_all_cmds")
        return _wait_for_agent_cmd(new_cmd.id)
    return last_agent_all_cmds
@login_required
def agent_detail(request, agent_name):
    """Render the detail page of an agent, or answer its AJAX command queries.

    When the GET parameter ``cmd`` is present a JSON document is returned:
    the argument description of one specific command, every command when
    ``cmd`` is "all", or ``null`` when nothing matches or the agent is
    stopping. Otherwise the HTML page is rendered.

    The template context is ``locals()``, so the local names in this
    function are part of the template contract and are deliberately kept.
    """
    agent_log_path = os.path.join(os.environ.get("PROJECT_ROOT_PATH"), f"logs/{agent_name}/", f"{agent_name}.log")
    try:
        # The context manager closes the file even if reading fails.
        # (``log`` shadows the module-level logger inside this view only.)
        with open(agent_log_path, "r") as log:
            log_content = log.readlines()
        log_last_30_lines = log_content[-30:]
    except FileNotFoundError:
        error_message = f"Cannot find agent logs. (Agent log path tried : {agent_log_path})"
    specific_cmd_with_args = None
    cmd_with_choices = []
    cmds_description = {}
    agent_general_commands = AgentCmd._AGENT_GENERAL_COMMANDS.copy()
    requested_cmd = request.GET.get("cmd")
    if not AgentSurvey.objects.get(name=agent_name).is_stopping():
        agent_specific_cmds = get_last_all_cmds(agent_name)
        wait_time = 0
        max_wait_time = 20
        unimplemented_command = None
        cmd = agent_specific_cmds
        if cmd.is_exec_error():
            unimplemented_command = cmd.result
        else:
            # The result is a ";"-separated list of "name(args)/description".
            agent_specific_cmd_to_list = cmd.result.split(";")
            specific_cmd_with_args = {}
            for specific_cmd in agent_specific_cmd_to_list:
                if "(" in specific_cmd:
                    splitted_cmd = specific_cmd.split("/")
                    specific_cmd = splitted_cmd.pop(0)
                    if len(splitted_cmd) > 0:
                        description = splitted_cmd.pop(0)
                    else:
                        description = ""
                    cmd = specific_cmd.split("(")[0]
                    specific_cmd_with_args[cmd] = []
                    cmds_description[cmd] = description
                    # get text between parentheses
                    args = re.findall(pattern=r"\(([^)]+)\)", string=specific_cmd)
                    for arg in args:
                        if "typing.Literal" in arg:
                            arg_name = arg.split(":")[0]
                            values = re.findall(pattern=r"\[(.*?)\]", string=arg)
                            trim_values = []
                            for value in values:
                                if "'" in value:
                                    value = value.replace("'", "")
                                trim_values.append(value)
                            specific_cmd_with_args[cmd].append([arg_name, trim_values])
                            cmd_with_choices.append(cmd)
                        else:
                            # Arguments are separated by "|", each one being "name:type".
                            arguments = arg.split("|")
                            for index, arg in enumerate(arguments):
                                if ("typing.Tuple" in arg or "typing.List" in arg):
                                    arg_name = arg.split(":")[0]
                                    arg_type = arg.split(":")[1]
                                    trim_values = []
                                    values_type = re.findall(pattern=r"\[(.*?)\]", string=arg_type)
                                    if values_type:
                                        # best way to tell user the type allowed in list or tuple is to put the raw content in one field
                                        specific_cmd_with_args[cmd].append([arg_name + " " + str(values_type), "str"])
                                else:
                                    specific_cmd_with_args[cmd].append(arg.split(":"))
        if requested_cmd:
            if requested_cmd != "all":
                # specific_cmd_with_args stays None when the agent command
                # ended in error; guard before the membership test.
                if specific_cmd_with_args is not None and requested_cmd in specific_cmd_with_args:
                    return JsonResponse({"specific_cmd_with_args": specific_cmd_with_args.get(requested_cmd), "cmd_with_choices": cmd_with_choices, "cmds_description": cmds_description}, safe=False)
                else:
                    return JsonResponse(None, safe=False)
            else:
                return JsonResponse({"agent_general_commands": agent_general_commands, "specific_cmd_with_args": specific_cmd_with_args, "unimplemented_command": unimplemented_command, "cmd_with_choices": cmd_with_choices, "cmds_description": cmds_description}, safe=False)
    if requested_cmd:
        # The agent is stopping: no command description can be provided.
        return JsonResponse(None, safe=False)
    config = OBSConfig(os.environ["PATH_TO_OBSCONF_FILE"], os.environ["unit_name"])
    managed_agents = None
    agents_status = None
    managed_by = None
    # Agents declared on the same computer as this agent.
    agent_computer = config.get_agent_information(config.unit_name, agent_name).get("computer")
    computer_hostname = config.get_computers().get(agent_computer).get("computer_config").get("hostname")
    agents = config.get_agents_per_computer(config.unit_name).get(computer_hostname)
    if "AgentSST" in agent_name:
        # TODO: add action buttons to stop, start or restart the agent.
        # Build a new list: removing items from the list being iterated
        # skipped the element following each removal and also mutated the
        # list owned by the configuration object.
        managed_agents = [name for name in agents if "AgentSST" not in name]
        agents_status = {}
        for agent in managed_agents:
            try:
                agents_status[agent] = AgentSurvey.objects.get(name=agent).status
            except AgentSurvey.DoesNotExist:
                agents_status[agent] = "None"
    else:
        # The first AgentSST of the same computer is reported as the manager.
        for agent in agents:
            if "AgentSST" in agent:
                managed_by = agent
                break
    obj, created = Majordome.objects.get_or_create(id=1)
    CAN_SEND_COMMAND = obj.soft_mode == Majordome.MANUAL_MODE
    return render(request, "dashboard/agent_detail.html", locals())
@login_required
def agent_action(request):
    """Send a start / restart / stop command to an AgentSST, then redirect.

    Reads ``action``, ``agentsst``, ``recipient`` and ``args`` from the POST
    data. A restart needs ``args``: "soft" requests a soft restart, any other
    non-empty value a hard one. A flash message reports whether a command was
    created. Requests without POST data get a 405 response.
    """
    if not request.POST:
        # There is no agent to redirect to without POST data (this used to
        # fail on the unbound ``agentsst`` name).
        return HttpResponse(status=405)

    action = request.POST.get("action")
    agentsst = request.POST.get("agentsst")
    recipient = request.POST.get("recipient")
    args = request.POST.get("args")

    # Stays None for an unknown action or a restart without args, so the
    # error message below is shown instead of raising UnboundLocalError.
    new_cmd = None
    if action == "start":
        new_cmd = AgentCmd.send_cmd_from_to(request.user, agentsst, "do_start_agent", recipient)
    elif action == "restart":
        if args:
            restart_kind = " soft" if args == "soft" else " hard"
            new_cmd = AgentCmd.send_cmd_from_to(request.user, agentsst, "do_restart_agent", recipient + restart_kind)
    elif action == "stop":
        new_cmd = AgentCmd.send_cmd_from_to(request.user, agentsst, "do_stop_agent", recipient)

    if new_cmd is not None:
        messages.add_message(request, messages.INFO, "Command sent !")
    else:
        messages.add_message(request, messages.INFO, "Error while creating command, please try again.")
    return redirect(agent_detail, agent_name=agentsst)
@login_required
def agents_commands(request):
    """List agent commands with optional date / status filters and pagination.

    POST data (``start_datetime``, ``end_datetime`` in "%Y/%m/%d %H:%M",
    interpreted as UTC, and ``cmd_status``) defines the filters. GET
    parameters carry the filter values across pagination links; only the
    status is re-applied from GET. "get_specific_cmds" and "get_all_cmds"
    commands are always hidden.

    The template context is ``locals()``, so local names are kept stable.
    """
    # Imported here because the module only imports ``Paginator``; without
    # these names a bad page number raised NameError instead of falling back.
    from django.core.paginator import EmptyPage, PageNotAnInteger

    agents_cmds = AgentCmd.objects.all()
    cmd_status = AgentCmd.CMD_STATUS_CODES
    url_filters = ""
    start_datetime = ""
    end_datetime = ""
    selected_cmd_status = ""
    if request.GET.get("start_datetime"):
        start_datetime = request.GET.get("start_datetime")
        end_datetime = request.GET.get("end_datetime")
    if request.GET.get("selected_cmd_status"):
        selected_cmd_status = request.GET.get("selected_cmd_status")
    if request.POST:
        start_datetime = request.POST.get("start_datetime") or ""
        end_datetime = request.POST.get("end_datetime") or ""
        filtered_cmd_status = request.POST.get("cmd_status")
        selected_cmd_status = filtered_cmd_status
        start_datetime_tz = None
        end_datetime_tz = None
        # Each bound is applied independently. Previously the range filter
        # was only applied when an end date was given, and an empty start
        # date crashed when no command existed yet.
        if start_datetime:
            start_datetime_tz = datetime.datetime.strptime(start_datetime, "%Y/%m/%d %H:%M")
            start_datetime_tz = start_datetime_tz.replace(tzinfo=timezone.utc)
            agents_cmds = agents_cmds.filter(s_deposit_time__gte=start_datetime_tz)
        if end_datetime:
            end_datetime_tz = datetime.datetime.strptime(end_datetime, "%Y/%m/%d %H:%M")
            end_datetime_tz = end_datetime_tz.replace(tzinfo=timezone.utc)
            agents_cmds = agents_cmds.filter(s_deposit_time__lte=end_datetime_tz)
        if start_datetime or end_datetime:
            url_filters = urlencode({"start_datetime": start_datetime, "end_datetime": end_datetime})
    if selected_cmd_status and selected_cmd_status != "all":
        agents_cmds = agents_cmds.filter(state=selected_cmd_status)
        url_filters = urlencode({"start_datetime": start_datetime, "end_datetime": end_datetime, "selected_cmd_status": selected_cmd_status})
    agents_cmds = agents_cmds.exclude(full_name="get_specific_cmds")
    agents_cmds = agents_cmds.exclude(full_name="get_all_cmds")
    # "-id" makes the order deterministic for equal deposit times.
    agents_cmds = agents_cmds.order_by("-s_deposit_time", "-id")
    paginator = Paginator(agents_cmds, pyros_settings.NB_ELEMENT_PER_PAGE)
    page_number = request.GET.get("page", 1)
    try:
        commands = paginator.page(page_number)
    except PageNotAnInteger:
        commands = paginator.page(1)
    except EmptyPage:
        commands = paginator.page(paginator.num_pages)
    return render(request, "dashboard/agents_commands.html", locals())
def send_agent_cmd(request):
    """Send the command described by the POST data to an agent.

    ``agent_name`` and ``cmd_name`` identify the recipient and the command;
    every other POST field (except the CSRF token) is used as an argument
    value, in form order, joined with single spaces.

    Returns:
        ``HttpResponse("OK")`` when the command was created,
        ``HttpResponse("Error")`` otherwise, and a 405 response when the
        request carries no POST data.
    """
    # NOTE(review): this view is not protected by @login_required although
    # it sends commands on behalf of request.user - confirm this is intended.
    if not request.POST:
        # Previously fell through to a redirect using an unbound name.
        return HttpResponse(status=405)

    # make a copy because request.POST is immutable
    post_data = request.POST.copy()
    reciever = post_data.get("agent_name")
    cmd_name = post_data.get("cmd_name")
    # Tolerate missing keys (e.g. a CSRF token sent as a header).
    for consumed_key in ("agent_name", "cmd_name", "csrfmiddlewaretoken"):
        post_data.pop(consumed_key, None)

    cmd_args = " ".join(post_data.get(arg_name) for arg_name in post_data)
    new_cmd = AgentCmd.send_cmd_from_to(request.user, reciever, cmd_name, cmd_args)
    if new_cmd is not None:
        return HttpResponse("OK")
    return HttpResponse("Error")
def retrieve_log_content(request):
    """Return the last 30 lines of an agent's log file as the response body.

    The agent is named by the ``agent_name`` POST field and its log is read
    from ``$PROJECT_ROOT_PATH/logs/<agent_name>/<agent_name>.log``.

    Returns:
        The log tail, an explanatory message when the file does not exist,
        a 400 response for an invalid agent name, or a 405 response when the
        request carries no POST data.
    """
    # NOTE(review): this view is not protected by @login_required - confirm.
    from collections import deque

    if not request.POST:
        # A view must return a response; this path used to return None.
        return HttpResponse(status=405)

    agent_name = request.POST.get("agent_name")
    # agent_name is untrusted and ends up in a filesystem path: refuse
    # anything that could escape the logs directory.
    if (
        not agent_name
        or agent_name in (".", "..")
        or any(forbidden in agent_name for forbidden in ("/", "\\", "\x00"))
    ):
        return HttpResponse("Invalid agent name.", status=400)

    agent_log_path = os.path.join(os.environ.get("PROJECT_ROOT_PATH"), f"logs/{agent_name}/", f"{agent_name}.log")
    try:
        with open(agent_log_path, "r") as log_file:
            # Keep only the tail while streaming through the file.
            log_last_30_lines = list(deque(log_file, maxlen=30))
    except FileNotFoundError:
        error_message = f"Cannot find agent logs. (Agent log path tried : {agent_log_path})"
        return HttpResponse(error_message)
    return HttpResponse(log_last_30_lines)
@login_required
@level_required("Admin", "Unit-PI", "Operator")
def soft_mode(request):
    """Show, and on a valid POST update, the software mode of Majordome #1.

    The page is always rendered with a form bound to the stored Majordome
    row. The template context is ``locals()``, so local names are kept.
    """
    if request.POST:
        form = MajordomeForm(request.POST)
        if form.is_valid():
            obj, created = Majordome.objects.get_or_create(id=1)
            obj.soft_mode = form.cleaned_data["soft_mode"]
            obj.save()
            new_soft_mode = form.instance.soft_mode
            messages.info(request, f"Changed observatory mode to {new_soft_mode}")

    # Reload the stored row and rebuild the form from it.
    obj, created = Majordome.objects.get_or_create(id=1)
    form = MajordomeForm(instance=obj)
    soft_mode = obj.soft_mode
    is_auto = (Majordome.AUTO_MODE == soft_mode)
    return render(request, "dashboard/soft_mode.html", locals())
@login_required
def agents_state(request):
    """Render the agents state page, or answer the running-command query.

    With GET parameters, responds with the ``full_name`` of a command in
    state "CMD_RUNNING" sent by ``agent_name``, or the text "None" when there
    is none. Without GET parameters, renders the page and flags agents whose
    ``updated`` timestamp is at least 30 seconds old.

    The template context is ``locals()``, so local names are kept stable.
    """
    agents = AgentSurvey.objects.all()
    datetime_now = datetime.datetime.now(tz=timezone.utc)
    if request.GET:
        # .get() avoids a crash when the parameter is missing.
        agent_name = request.GET.get("agent_name")
        # filter().first() instead of get(): several running commands for
        # the same sender no longer raise MultipleObjectsReturned.
        running_commands = AgentCmd.objects.filter(state="CMD_RUNNING", sender=agent_name).first()
        if running_commands is None:
            return HttpResponse("None")
        return HttpResponse(running_commands.full_name)

    # The threshold does not depend on the agent: compute it once.
    datetime_now_minus_thirty_sec = datetime_now - datetime.timedelta(seconds=30)
    soon_out_of_date_agents = []
    for agent in agents:
        if datetime_now_minus_thirty_sec >= agent.updated:
            soon_out_of_date_agents.append(agent.name)
    return render(request, "dashboard/agents_state.html", locals())
def retrieve_main_icon(request):
    """Return the latest weather row, PLC mode and site status as JSON.

    Only answers requests carrying the ``X-Requested-With: XMLHttpRequest``
    header; other requests get a 400 response.

    Raises:
        Http404: when no WeatherWatch, PlcDeviceStatus (with a PLC mode) or
            SiteWatch row exists.
    """
    # Same test as the former HttpRequest.is_ajax(), which was deprecated in
    # Django 3.1 and removed in 4.0.
    if request.META.get("HTTP_X_REQUESTED_WITH") != "XMLHttpRequest":
        # A view must return a response; this path used to return None.
        return HttpResponse(status=400)

    try:
        weather_status = WeatherWatch.objects.latest('updated')
        plc_mode = PlcDeviceStatus.objects.exclude(plc_mode=None).latest('created').plc_mode
        sitewatch_global_status = SiteWatch.objects.latest('updated').global_status
    except WeatherWatch.DoesNotExist:
        raise Http404("No WeatherWatch matches the given query.")
    except PlcDeviceStatus.DoesNotExist:
        raise Http404("No PlcDeviceStatus matches the given query.")
    except SiteWatch.DoesNotExist:
        raise Http404("No SiteWatch matches the given query.")

    # Serialize through Django, then enrich the resulting single-item list.
    weather = json.loads(serializers.serialize('json', [weather_status]))
    weather[0]["plc_mode"] = plc_mode
    weather[0]["sitewatch_global_status"] = sitewatch_global_status
    return HttpResponse(json.dumps(weather), content_type="application/json")
@login_required
#@level_required(2)
def routines(request):
    """Redirect to the admin change list of the ``common`` app's ``request`` model."""
    return redirect(reverse('admin:common_request_changelist'))
def get_lastest_and_last_x_minutes_before_latest_weather(x:float=0):
    """Return the environment data to plot for the last ``x`` minutes.

    Args:
        x: size, in minutes, of the window ending at the latest entry.

    Returns:
        - an unsaved, empty ``Env_data`` when the table is empty;
        - the latest ``Env_data`` instance when history exists but no entry
          is old enough to start the window;
        - a queryset (ordered by ``updated``) of the entries referenced by
          the history from the window start on, plus the latest entry;
        - every ``Env_data`` row otherwise.
    """
    if not Env_data.objects.all().exists():
        # No data at all: empty object (empty plot).
        return Env_data()

    latest_entry = Env_data.objects.latest('updated')
    # Datetime of the latest entry minus x minutes.
    time_start_range = latest_entry.updated - datetime.timedelta(minutes=x)

    if Env_data_hist.objects.all().exists():
        # Most recent entry at or before the start of the window.
        window_start_entry = (
            Env_data.objects.filter(updated__lte=time_start_range).order_by("-updated").first()
        )
        if window_start_entry is None:
            # No matching entry yet: return the latest one (plot with one dot).
            return latest_entry
        # Ids referenced by the history from the window start on ...
        history_ids = list(
            Env_data_hist.objects.filter(weather__id__gte=window_start_entry.id).values_list("weather__id", flat=True)
        )
        # ... plus the latest entry, appended at the end.
        history_ids.append(latest_entry.id)
        weather_data = Env_data.objects.filter(id__in=history_ids).order_by("updated")
        if weather_data.count() > 0:
            return weather_data

    return Env_data.objects.all()
def get_latest_from_db_or_empty(db_model):
    """Return the row of ``db_model`` with the latest ``updated`` value.

    When the table has no row, a new unsaved ``db_model()`` instance is
    returned instead.
    """
    if db_model.objects.all().exists():
        return db_model.objects.latest('updated')
    return db_model()
def get_latest_plc_device_status_or_empty():
    """Return the most recently created PLC status that has a PLC mode.

    When no such row exists, an unsaved placeholder is returned with a fixed
    dummy ``created`` date and ``is_safe`` set to False.
    """
    statuses_with_mode = PlcDeviceStatus.objects.exclude(plc_mode=None)
    if not (PlcDeviceStatus.objects.all().exists() and statuses_with_mode.exists()):
        placeholder = PlcDeviceStatus()
        # Dummy date so that callers can still compute an age.
        placeholder.created = datetime.datetime(2018, 6, 21, 9, 53, 30, 599462)
        placeholder.is_safe = False
        return placeholder
    return statuses_with_mode.latest('created')
def get_weather_data():
    """Build the JSON-serializable environment summary used by the views.

    The environment data of the last ``time_before_plot`` minutes (15 when
    the "ENV" section or the key is missing from the PyROS configuration) is
    serialized with Django's JSON serializer. The first item of the
    resulting list is then enriched with the sun elevation, the PLC state,
    values read from ``Config`` #1 and the site global status.

    Returns:
        A list of dicts as produced by ``serializers.serialize('json', ...)``.

    Raises:
        Config.DoesNotExist: when the ``Config`` row with id 1 is missing.
    """
    pyros_config_file = os.environ.get("pyros_config_file")
    config_pyros = ConfigPyros(pyros_config_file).pyros_config
    env_config = config_pyros.get("ENV")
    last_x_minutes = 15
    # Only override the default with an actual value: a missing key used to
    # yield None, which timedelta(minutes=...) rejects.
    if env_config and env_config.get("time_before_plot") is not None:
        last_x_minutes = env_config.get("time_before_plot")
    weather_history = get_lastest_and_last_x_minutes_before_latest_weather(last_x_minutes)
    plc_device_status = get_latest_plc_device_status_or_empty()

    if isinstance(weather_history, QuerySet):
        # If we have weather history + last entry of weather
        weather = serializers.serialize('json', weather_history)
    else:
        # We have only the last entry of weather
        weather = serializers.serialize('json', [weather_history])
    weather = json.loads(weather)

    # Single read of the configuration row (it was fetched three times).
    config = Config.objects.get(id=1)
    # PM 20190416 TODO: the PLC timeout computation is disabled, see
    # (datetime.datetime.now() - plc_device_status.created).total_seconds()
    timeout = 0

    summary = weather[0]
    summary['max_sunelev'] = SUN_ELEV_DAY_THRESHOLD
    summary['sunelev'] = get_sunelev()
    summary["plc_mode"] = plc_device_status.plc_mode
    summary["is_safe"] = plc_device_status.is_safe
    summary["ACK"] = config.ack
    summary["plc_timeout"] = timeout
    summary["max_plc_timeout"] = config.plc_timeout_seconds
    summary["pyros_mode"] = config.pyros_state
    # only for retrieve_env_navbar:
    sitewatch = get_latest_from_db_or_empty(SiteWatch)
    summary["sitewatch_global_status"] = sitewatch.global_status
    return weather
def weather(request):
    """Render the weather page with the monitoring names to be plotted.

    The names are read from the "ENV" section of the PyROS configuration
    file; ``None`` is passed to the template when the section or the key is
    missing.
    """
    pyros_config_file = os.environ.get("pyros_config_file")
    config_pyros = ConfigPyros(pyros_config_file).pyros_config
    # Tolerate a configuration without "ENV" section, as get_weather_data() does.
    env_config = config_pyros.get("ENV") or {}
    monitoring_names_to_plot = env_config.get("monitoring_names_to_be_plotted")
    return render(request, 'dashboard/reload_weather.html', {'base_template': "base.html", "monitoring_names_to_plot": monitoring_names_to_plot})
def weather_current(request):
    """Return the current environment summary as a JSON document.

    Raises:
        Http404: if ``WeatherWatch.DoesNotExist`` is raised while the data
            is collected.
    """
    # PM 20180718: no longer restricted to AJAX requests.
    try:
        body = json.dumps(get_weather_data())
    except WeatherWatch.DoesNotExist:
        raise Http404("No WeatherWatch matches the given query.")
    return HttpResponse(body, content_type="application/json")
'''
try:
#weather_status = WeatherWatch.objects.latest('updated')
#weather_status = WeatherWatch.objects.latest('updated') if WeatherWatch.objects.all().exists() else WeatherWatch()
EMPTY_TABLE = WeatherWatch.objects.all().exists()
weather_status = WeatherWatch.objects.latest('updated') if not EMPTY_TABLE else WeatherWatch()
#plc_device_status = PlcDeviceStatus.objects.exclude(plc_mode=None).latest('created')
plc_device_status = PlcDeviceStatus.objects.exclude(plc_mode=None).latest('created') if PlcDeviceStatus.objects.all().exists() else PlcDeviceStatus()
plc_mode = plc_device_status.plc_mode
is_safe = plc_device_status.is_safe
weather = serializers.serialize('json', [weather_status])
weather = json.loads(weather)
ack = Config.objects.get(id=1).ack
plc_timeout = Config.objects.get(pk=1).plc_timeout_seconds
#timeout = (datetime.datetime.now() - plc_device_status.created).total_seconds()
LAST_PLC_ITEM_TIME = plc_device_status.created if not EMPTY_TABLE else '2018-07-26 12:48:49.949228'
timeout = (datetime.datetime.now() - LAST_PLC_ITEM_TIME).total_seconds()
weather[0]['max_sunelev'] = SUN_ELEV_DAY_THRESHOLD
weather[0]['sunelev'] = get_sunelev()
weather[0]["plc_mode"] = plc_mode
weather[0]["is_safe"] = is_safe
weather[0]["ACK"] = ack
weather[0]["plc_timeout"] = timeout
weather[0]["max_plc_timeout"] = plc_timeout
weather[0]["pyros_mode"] = Config.objects.get(id=1).pyros_state
return HttpResponse(json.dumps(weather), content_type="application/json")
except WeatherWatch.DoesNotExist:
raise Http404("No WeatherWatch matches the given query.")
'''
def weather_config(request):
    """ PM 20180926 prototype without database

    Renders the list of PLC sensors exposed by ``PlcChecker``.

    http://127.0.0.1:8000/dashboard/weather/config

    Moved to monitoring
    http://127.0.0.1:8000/monitoring/weather/config
    """
    template_name = 'dashboard/config_weather.html'
    try:
        # Import PLC status sensor parser
        from monitoring.plc_checker import PlcChecker

        checker = PlcChecker()
        context = {
            'weather_config': {"origin": checker.origin, "sensors_table": checker.sensors_table},
            'base_template': "base.html",
        }
        # Return template with sensors list
        return render(request, template_name, context)
    except Config.DoesNotExist:
        return render(request, template_name, {'weather_info': None})
def weather_current_old(request):
    """Render the legacy current-weather fragment.

    The number of rows shown is ``row_data_save_frequency / 5`` from
    ``Config`` #1 when exactly one Config row exists, 60 otherwise. When the
    ``Config`` lookup fails the fragment is rendered without data.
    """
    try:
        if Config.objects.count() == 1:
            monitoring = int(int(Config.objects.get(id=1).row_data_save_frequency) / 5)
        else:
            monitoring = 60
        if WeatherWatch.objects.exists():
            # Most recent rows first, limited to the monitoring window.
            weather_info = WeatherWatch.objects.order_by("-id")[:monitoring]
        else:
            weather_info = None
        # Pass the computed data: the ``site_current`` view function was
        # passed here by mistake.
        return render(request, 'dashboard/_current_weather.html', {'weather_info': weather_info, 'iteration': monitoring})
    except Config.DoesNotExist:
        return render(request, 'dashboard/_current_weather.html', {'weather_info': None, 'iteration': 60})
def site(request):
    """Render the auto-reloading site page.

    Authenticated and anonymous users currently receive exactly the same
    page; the authentication check is kept as it was.
    """
    template_name = 'dashboard/reload_site.html'
    context = {'base_template': "base.html"}
    if request.user.is_authenticated:
        return render(request, template_name, context)
    return render(request, template_name, context)
def site_current(request):
    """Render the current site status table.

    The row limit is ``Config(id=1).row_data_save_frequency / 5`` when
    exactly one Config row exists, otherwise 60. The newest SiteWatch rows
    (highest id first) are shown, or None when the table is empty, together
    with the ``global_mode`` of Config id=1.

    If Config.DoesNotExist is raised, the page is rendered with no data and
    an iteration of 60 (and no ``global_mode`` key).
    """
    try:
        row_limit = 60
        if Config.objects.count() == 1:
            row_limit = int(int(Config.objects.get(id=1).row_data_save_frequency) / 5)

        latest_rows = None
        if SiteWatch.objects.all().exists():
            latest_rows = SiteWatch.objects.order_by("-id")[:row_limit]

        context = {
            'site_info': latest_rows,
            'iteration': row_limit,
            'global_mode': Config.objects.get(id=1).global_mode,
        }
        return render(request, 'dashboard/current_site.html', context)
    except Config.DoesNotExist:
        return render(request, 'dashboard/current_site.html', {'site_info': None, 'iteration': 60})
@login_required
@level_required("Admin")
def proposal(request):
    """List the most recent scientific programs (proposals).

    Template context:
        proposal_info: up to 100 ScientificProgram rows, highest id first,
            or None when the table is empty.
        nb_info_proposal: number of rows in ``proposal_info`` (0 when the
            table is empty).
    """
    # exists() asks the database for emptiness instead of loading every row,
    # matching what site_current already does for SiteWatch.
    if ScientificProgram.objects.exists():
        proposal_info = ScientificProgram.objects.order_by("-id")[:100]
        nb_info_proposal = len(proposal_info)  # evaluates the sliced queryset
    else:
        # Empty table: the template receives None / 0.
        proposal_info = None
        nb_info_proposal = 0
    return render(request, 'dashboard/proposal.html',
                  {'proposal_info': proposal_info, 'nb_info_proposal': nb_info_proposal})
@login_required
#@level_required(5)
def configUpdate(request):
    """Display and process the edit form for the Config row with id=1.

    On GET (or invalid POST) the bound/unbound ConfigForm is rendered in
    'dashboard/configuration.html'. On a valid POST the form is saved and
    the browser is redirected to the URL named 'settings'.

    Raises Http404 (via get_object_or_404) when no Config with id=1 exists.
    """
    instance = get_object_or_404(Config, id=1)
    form = ConfigForm(request.POST or None, instance=instance)
    if form.is_valid():
        form.save()
        # A view must return an HttpResponse: reverse() only builds the URL
        # string, redirect() wraps it in an HTTP redirect response.
        return redirect('settings')
    return render(request, 'dashboard/configuration.html', {'form': form})
@login_required
def devices(request):
    """Redirect to the admin change list of the ``common.device`` model."""
    return redirect(reverse('admin:common_device_changelist'))
@login_required
def system(request):
    """Render the dashboard system page."""
    template_name = 'dashboard/system.html'
    return render(request, template_name)
@login_required
def system_retrieve_logs(request):
    """Return the latest log lines of each agent for the system page.

    Called by the dashboard system page with an AJAX request every second,
    to get the logs and print them. For each agent, the newest
    MAX_LOGS_LINES Log rows (most recent ``created`` first) are passed to
    'dashboard/system_logs.html'.

    Non-AJAX requests receive an HTTP 400 response; previously the view
    returned None for them, which Django reports as a server error.
    """
    # Same test HttpRequest.is_ajax() performed; that method was deprecated
    # in Django 3.1 and removed in 4.0, the header check works everywhere.
    if request.META.get('HTTP_X_REQUESTED_WITH') != 'XMLHttpRequest':
        return HttpResponse("This endpoint only answers AJAX requests", status=400)

    def latest_logs(agent):
        # Newest MAX_LOGS_LINES entries written by the given agent.
        return Log.objects.filter(agent=agent).order_by("-created")[:MAX_LOGS_LINES]

    # Same names the template received through locals() before.
    context = {
        'request': request,
        'alert_logs': latest_logs('Alert manager'),
        'scheduler_logs': latest_logs('Scheduler'),
        'majordome_logs': latest_logs('Majordome'),
        'obs_logs': latest_logs('Observation manager'),
        'analyzer_logs': latest_logs('Analyzer'),
        'monitoring_logs': latest_logs('Monitoring'),
    }
    return render(request, 'dashboard/system_logs.html', context)
def schedule(request):
    """Redirect to the admin change list of the ``common.schedule`` model."""
    return redirect(reverse('admin:common_schedule_changelist'))
@login_required
#@level_required(6)
def quotas(request):
    """Redirect to the admin change list of the ``common.pyrosuser`` model."""
    return redirect(reverse('admin:common_pyrosuser_changelist'))
@login_required
#@level_required(3)
def change_globalMode(request):
    """Toggle ``global_mode`` on Config id=1, then redirect to 'states'.

    The toggle is skipped when global mode is currently False and the
    latest PLC device status reports ``is_safe == False``.

    NOTE(review): get_object_or_404 raises Http404 rather than
    Config.DoesNotExist, so the except clause looks unreachable from that
    call — confirm before relying on it.
    """
    try:
        cfg = get_object_or_404(Config, id=1)
        plc = get_latest_plc_device_status_or_empty()
        # '== False' is kept on purpose: a None value must not block.
        blocked = cfg.global_mode == False and plc.is_safe == False
        if not blocked:
            cfg.global_mode = not cfg.global_mode
            cfg.save()
    except Config.DoesNotExist:
        pass
    return redirect('states')
@login_required
#@level_required(4)
def change_bypass(request):
    """Toggle ``bypass`` on Config id=1, then redirect to 'states'.

    Nothing is changed when the latest PLC device status reports
    ``is_safe == True``.

    NOTE(review): get_object_or_404 raises Http404 rather than
    Config.DoesNotExist, so the except clause looks unreachable from that
    call — confirm before relying on it.
    """
    try:
        cfg = get_object_or_404(Config, id=1)
        plc = get_latest_plc_device_status_or_empty()
        if not (plc.is_safe == True):
            cfg.bypass = not cfg.bypass
            cfg.save()
    except Config.DoesNotExist:
        pass
    return redirect('states')
@login_required
#@level_required(3)
def change_lock(request):
    """Toggle ``lock`` on Config id=1, then redirect to 'states'.

    The toggle is skipped when the lock is currently False and the latest
    PLC device status reports ``is_safe == False``. When the lock ends up
    set, ``ntc`` is inverted as well.

    NOTE(review): get_object_or_404 raises Http404 rather than
    Config.DoesNotExist, so the except clause looks unreachable from that
    call — confirm before relying on it.
    """
    try:
        cfg = get_object_or_404(Config, id=1)
        plc = get_latest_plc_device_status_or_empty()
        # '== False' is kept on purpose: a None value must not block.
        blocked = cfg.lock == False and plc.is_safe == False
        if not blocked:
            cfg.lock = not cfg.lock
            if cfg.lock:
                cfg.ntc = not cfg.ntc
            cfg.save()
    except Config.DoesNotExist:
        pass
    return redirect('states')
@login_required
#@level_required(3)
def send_command_to_cameraVIS_1(request):
    """Render the cameraVIS_1 remote-control page with the command grammar.

    The grammar is loaded from '../simulators/config/grammar.json' (a path
    relative to the process working directory) with key order preserved,
    then serialised again into ``json_str``.
    """
    with open('../simulators/config/grammar.json') as f:
        data = json.load(f, object_pairs_hook=OrderedDict)
    json_str = json.dumps(data)
    # locals() is the template context, so the local names above
    # (data, json_str, f) are part of the template contract.
    return render(request, "dashboard/send_command_cameraVIS_1.html", locals())
@login_required
#@level_required(3)
def submit_command_to_cameraVIS_1(request):
    """Handle a command submitted for cameraVIS_1 from the remote control page.

    (Called in the corresponding control_command file.)

    On POST, the command list is built from ``first_command``,
    ``first_param`` and the non-empty ``input_number_0`` ..
    ``input_number_3`` fields, then executed through
    CameraVISRemoteControlDefault on camera "ddrago_r". The browser is
    always redirected back to 'send_command_to_cameraVIS_1'.
    """
    if request.method == 'POST':
        commands = [request.POST.get("first_command"), request.POST.get("first_param")]
        # Optional extra inputs; QueryDict.get returns None for a missing
        # field, so no exception handling is needed here.
        for index in range(4):
            value = request.POST.get("input_number_%d" % index)
            if value:
                commands.append(value)
        CameraVISRemoteControlDefault(commands, expert_mode=False, chosen_camera="ddrago_r").exec_command()
        # TODO: move to JS so that responses are sent back through AJAX
    return redirect('send_command_to_cameraVIS_1')
@login_required
#@level_required(3)
def submit_command_to_cameraVIS_1_expert(request):
    """Handle an expert-mode command submitted for cameraVIS_1.

    (Called in the corresponding control_command file.)

    On POST, the ``commande_expert`` field is executed through
    CameraVISRemoteControlDefault on camera "ddrago_r" and the answer is
    returned as a JSON-encoded body; an empty or missing field yields a
    "Missing command data" message. Any other method is redirected to
    'submit_command_to_cameraVIS_1'.
    """
    if request.method == 'POST':
        param = request.POST.get("commande_expert")
        if param:
            response = CameraVISRemoteControlDefault(param, expert_mode=True, chosen_camera="ddrago_r").exec_command()
            return HttpResponse(json.dumps({'message': "Command send OK", 'response': response}))
        return HttpResponse(json.dumps({'message': "Missing command data"}))
    # Redirect to this camera's own view, like the cameraNIR and cameraVIS_2
    # expert views do; the telescope target was a copy-paste leftover.
    return redirect('submit_command_to_cameraVIS_1')
@login_required
#@level_required(3)
def send_command_to_cameraNIR(request):
    """Render the cameraNIR remote-control page with the command grammar.

    The grammar is loaded from '../simulators/config/grammar.json' (a path
    relative to the process working directory) with key order preserved,
    then serialised again into ``json_str``.
    """
    with open('../simulators/config/grammar.json') as f:
        data = json.load(f, object_pairs_hook=OrderedDict)
    json_str = json.dumps(data)
    # locals() is the template context, so the local names above
    # (data, json_str, f) are part of the template contract.
    return render(request, "dashboard/send_command_cameraNIR.html", locals())
@login_required
#@level_required(3)
def submit_command_to_cameraNIR(request):
    """Handle a command submitted for cameraNIR from the remote control page.

    (Called in the corresponding control_command file.)

    On POST, the command list is built from ``first_command``,
    ``first_param`` and the non-empty ``input_number_0`` ..
    ``input_number_3`` fields, then executed through
    CameraNIRRemoteControlDefault. The browser is always redirected back
    to 'send_command_to_cameraNIR'.
    """
    if request.method == 'POST':
        commands = [request.POST.get("first_command"), request.POST.get("first_param")]
        # Optional extra inputs; QueryDict.get returns None for a missing
        # field, so no exception handling is needed here.
        for index in range(4):
            value = request.POST.get("input_number_%d" % index)
            if value:
                commands.append(value)
        CameraNIRRemoteControlDefault(commands, expert_mode=False).exec_command()
        # TODO: move to JS so that responses are sent back through AJAX
    return redirect('send_command_to_cameraNIR')
@login_required
#@level_required(3)
def submit_command_to_cameraNIR_expert(request):
    """Handle an expert-mode command submitted for cameraNIR.

    (Called in the corresponding control_command file.)

    Non-POST requests are redirected to 'submit_command_to_cameraNIR'.
    On POST, the ``commande_expert`` field is executed through
    CameraNIRRemoteControlDefault and the answer is returned as a
    JSON-encoded body; an empty or missing field yields a
    "Missing command data" message.
    """
    if request.method != 'POST':
        return redirect('submit_command_to_cameraNIR')
    expert_command = request.POST.get("commande_expert")
    if not expert_command:
        return HttpResponse(json.dumps({'message': "Missing command data"}))
    answer = CameraNIRRemoteControlDefault(expert_command, expert_mode=True).exec_command()
    return HttpResponse(json.dumps({'message': "Command send OK", 'response': answer}))
@login_required
#@level_required(3)
def send_command_to_cameraVIS_2(request):
    """Render the cameraVIS_2 remote-control page with the command grammar.

    The grammar is loaded from '../simulators/config/grammar.json' (a path
    relative to the process working directory) with key order preserved,
    then serialised again into ``json_str``.
    """
    with open('../simulators/config/grammar.json') as f:
        data = json.load(f, object_pairs_hook=OrderedDict)
    json_str = json.dumps(data)
    # locals() is the template context, so the local names above
    # (data, json_str, f) are part of the template contract.
    return render(request, "dashboard/send_command_cameraVIS_2.html", locals())
@login_required
#@level_required(3)
def submit_command_to_cameraVIS_2(request):
    """Handle a command submitted for cameraVIS_2 from the remote control page.

    (Called in the corresponding control_command file.)

    On POST, the command list is built from ``first_command``,
    ``first_param`` and the non-empty ``input_number_0`` ..
    ``input_number_3`` fields, then executed through
    CameraVISRemoteControlDefault on camera "ddrago_b". The browser is
    always redirected back to 'send_command_to_cameraVIS_2'.
    """
    if request.method == 'POST':
        commands = [request.POST.get("first_command"), request.POST.get("first_param")]
        # Optional extra inputs; QueryDict.get returns None for a missing
        # field, so no exception handling is needed here.
        for index in range(4):
            value = request.POST.get("input_number_%d" % index)
            if value:
                commands.append(value)
        CameraVISRemoteControlDefault(commands, expert_mode=False, chosen_camera="ddrago_b").exec_command()
        # TODO: move to JS so that responses are sent back through AJAX
    return redirect('send_command_to_cameraVIS_2')
@login_required
#@level_required(3)
def submit_command_to_cameraVIS_2_expert(request):
    """Handle an expert-mode command submitted for cameraVIS_2.

    (Called in the corresponding control_command file.)

    Non-POST requests are redirected to 'submit_command_to_cameraVIS_2'.
    On POST, the ``commande_expert`` field is executed through
    CameraVISRemoteControlDefault on camera "ddrago_b" and the answer is
    returned as a JSON-encoded body; an empty or missing field yields a
    "Missing command data" message.
    """
    if request.method != 'POST':
        return redirect('submit_command_to_cameraVIS_2')
    expert_command = request.POST.get("commande_expert")
    if not expert_command:
        return HttpResponse(json.dumps({'message': "Missing command data"}))
    answer = CameraVISRemoteControlDefault(expert_command, expert_mode=True, chosen_camera="ddrago_b").exec_command()
    return HttpResponse(json.dumps({'message': "Command send OK", 'response': answer}))
@login_required
#@level_required(3)
def send_command_to_telescope(request):
    """Render the telescope remote-control page with the command grammar.

    The grammar is loaded from '../simulators/config/grammar.json' (a path
    relative to the process working directory) with key order preserved,
    then serialised again into ``json_str``.
    """
    with open('../simulators/config/grammar.json') as f:
        data = json.load(f, object_pairs_hook=OrderedDict)
    json_str = json.dumps(data)
    # locals() is the template context, so the local names above
    # (data, json_str, f) are part of the template contract.
    return render(request, "dashboard/send_command_telescope.html", locals())
@login_required
#@level_required(3)
def submit_command_to_telescope(request):
    """Handle a command submitted for the Telescope from the remote control page.

    (Called in control_command.js.)

    On POST, the command is built from ``first_command``, ``first_param``
    and the non-empty ``input_number_0`` .. ``input_number_3`` fields
    (at most four extra inputs). In the current mode (NEW_MODE) the parts
    are joined with spaces and stored as a TelescopeCommand row; nothing is
    stored when ``first_command`` is missing from the POST data. The
    browser is always redirected back to 'send_command_to_telescope'.
    """
    if request.method == 'POST':
        commands = [request.POST.get("first_command"), request.POST.get("first_param")]
        # Optional extra inputs; QueryDict.get returns None for a missing
        # field, so no exception handling is needed here.
        for index in range(4):
            value = request.POST.get("input_number_%d" % index)
            if value:
                commands.append(value)
        NEW_MODE = True
        if NEW_MODE:
            # A missing field is None and would make str.join raise
            # TypeError; without a command verb there is nothing to queue.
            if commands[0] is not None:
                command_line = ' '.join(part for part in commands if part is not None)
                TelescopeCommand.objects.create(request=command_line)
        else:
            # Legacy path: talk to the telescope directly.
            TelescopeRemoteControlDefault(commands, False).exec_command()
        # TODO: move to JS so that responses are sent back through AJAX
    return redirect('send_command_to_telescope')
@login_required
#@level_required(3)
def submit_command_to_telescope_expert(request):
    """Handle an expert-mode command submitted for the Telescope.

    (Called in control_command.js.)

    Non-POST requests are redirected to 'submit_command_to_telescope'.
    On POST, the ``commande_expert`` field is stored as a TelescopeCommand
    row; after a one-second pause the row is read back and its ``answer``
    is returned in a JSON-encoded body. An empty or missing field yields a
    "Missing command data" message.
    """
    if request.method != 'POST':
        return redirect('submit_command_to_telescope')
    expert_command = request.POST.get("commande_expert")
    if not expert_command:
        return HttpResponse(json.dumps({'message': "Missing command data"}))
    queued = TelescopeCommand.objects.create(request=expert_command)
    # presumably leaves time for the command to be answered — TODO confirm
    time.sleep(1)
    answer = TelescopeCommand.objects.get(pk=queued.id).answer
    return HttpResponse(json.dumps({'message': "Command send OK", 'response': answer}))
@login_required
#@level_required(3)
def submit_command_to_plc(request):
    """Handle an expert command submitted for the PLC.

    On POST, the ``commande_expert`` field is dispatched to a PLCController:
      - "GET STATUS" or "GET STATUS ..."  -> getStatus
      - "SWITCH LIGHTS..."                -> switch_lights
      - "... SHUTTERS"                    -> manage_shutters
    Any other text yields the response "Unrecognized command". The result
    is returned in a JSON-encoded body; an empty or missing field yields a
    "Missing command data" message.

    Any other method is redirected to the PLC command page.
    """
    if request.method == 'POST':
        param = request.POST.get("commande_expert")
        if param:
            plc_controller = PLC.PLCController()
            response = "Unrecognized command"
            # TODO: ugly dispatch, to be reworked (dedicated class?)
            if param.startswith("GET STATUS ") or param == "GET STATUS":
                response = plc_controller.getStatus(param)
            elif param.startswith("SWITCH LIGHTS"):
                response = plc_controller.switch_lights(param)
            elif param.endswith(" SHUTTERS"):
                response = plc_controller.manage_shutters(param)
            return HttpResponse(json.dumps({'message': "Command send OK", 'response': response}))
        return HttpResponse(json.dumps({'message': "Missing command data"}))
    # Redirecting to this very view would loop forever on GET; send the
    # browser to the page that hosts the PLC command form instead.
    return redirect('send_command_to_plc')
@login_required
#@level_required(3)
def send_command_to_plc(request):
    """Render the PLC command page."""
    template_name = 'dashboard/send_command_to_plc.html'
    return render(request, template_name)
@login_required
#@level_required(3)
def operator_state(request):
    """Render the operator state page.

    The template receives Config id=1 as ``config`` and the ``is_safe``
    attribute of the latest PLC device status. Raises Http404 (via
    get_object_or_404) when no Config with id=1 exists.
    """
    config = get_object_or_404(Config, id=1)
    latest_status = get_latest_plc_device_status_or_empty()
    context = {'config': config, 'is_safe': latest_status.is_safe}
    return render(request, 'dashboard/operator_state.html', context)
@login_required
#@level_required(3)
def simulator(request):
    """Render the simulator page.

    The template receives ``request``, Config id=1 as ``config`` and the
    latest PLC device status as ``plc_status``.

    NOTE(review): get_object_or_404 raises Http404 rather than
    Config.DoesNotExist, so the except clause looks unreachable from that
    call — confirm before relying on it.
    """
    try:
        config = get_object_or_404(Config, id=1)
        plc_status = get_latest_plc_device_status_or_empty()
        # Explicit equivalent of the locals() mapping used before.
        context = {'request': request, 'config': config, 'plc_status': plc_status}
        return render(request, 'dashboard/simulator.html', context)
    except Config.DoesNotExist:
        return redirect('simulator')
@login_required
@level_required("Admin")
def simulator_switch_ack(request):
    """Invert ``ack`` on Config id=1, then redirect to 'simulator'."""
    try:
        cfg = get_object_or_404(Config, id=1)
        cfg.ack = not cfg.ack
        cfg.save()
    except Config.DoesNotExist:
        # Same destination whether or not the update happened.
        pass
    return redirect('simulator')
@login_required
@level_required("Admin")
def simulator_switch_safe(request):
    """Invert ``is_safe`` on the latest PLC device status.

    If safe => switch to unsafe; if unsafe => switch to safe.
    The status is saved and the browser redirected to 'simulator'.
    """
    try:
        status = get_latest_plc_device_status_or_empty()
        status.is_safe = not status.is_safe
        status.save()
    except Config.DoesNotExist:
        # Same destination whether or not the update happened.
        pass
    return redirect('simulator')
@login_required
@level_required("Admin")
def simulator_give_auto(request):
    """Set ``plc_mode`` of the latest PLC device status to "AUTO".

    The status is saved and the browser redirected to 'simulator'.
    """
    try:
        status = get_latest_plc_device_status_or_empty()
        status.plc_mode = "AUTO"
        status.save()
    except Config.DoesNotExist:
        # Same destination whether or not the update happened.
        pass
    return redirect('simulator')
@login_required
@level_required("Admin")
def simulator_give_manu(request):
    """Set ``plc_mode`` of the latest PLC device status to "MANU".

    The status is saved and the browser redirected to 'simulator'.
    """
    try:
        status = get_latest_plc_device_status_or_empty()
        status.plc_mode = "MANU"
        status.save()
    except Config.DoesNotExist:
        # Same destination whether or not the update happened.
        pass
    return redirect('simulator')
@login_required
@level_required("Admin")
def simulator_give_ko(request):
    """Set ``plc_mode`` of the latest PLC device status to "OFF".

    The status is saved and the browser redirected to 'simulator'.
    """
    try:
        status = get_latest_plc_device_status_or_empty()
        status.plc_mode = "OFF"
        status.save()
    except Config.DoesNotExist:
        # Same destination whether or not the update happened.
        pass
    return redirect('simulator')
@login_required
@level_required("Admin")
def simulator_switch_bypass(request):
    """Invert ``bypass`` on Config id=1, then redirect to 'simulator'."""
    try:
        cfg = get_object_or_404(Config, id=1)
        cfg.bypass = not cfg.bypass
        cfg.save()
    except Config.DoesNotExist:
        # Same destination whether or not the update happened.
        pass
    return redirect('simulator')
@login_required
@level_required("Admin")
def simulator_switch_lock(request):
    """Invert ``lock`` on Config id=1, then redirect to 'simulator'."""
    try:
        cfg = get_object_or_404(Config, id=1)
        cfg.lock = not cfg.lock
        cfg.save()
    except Config.DoesNotExist:
        # Same destination whether or not the update happened.
        pass
    return redirect('simulator')
@login_required
@level_required("Admin")
def simulator_switch_globalMode(request):
    """Invert ``global_mode`` on Config id=1, then redirect to 'simulator'."""
    try:
        cfg = get_object_or_404(Config, id=1)
        cfg.global_mode = not cfg.global_mode
        cfg.save()
    except Config.DoesNotExist:
        # Same destination whether or not the update happened.
        pass
    return redirect('simulator')
@login_required
@level_required("Admin")
def simulator_majordome_restart(request):
    """Set ``majordome_state`` of Config id=1 to "OCS-RESTART".

    The configuration is saved and the browser redirected to 'simulator'.
    """
    try:
        cfg = get_object_or_404(Config, id=1)
        cfg.majordome_state = "OCS-RESTART"
        cfg.save()
    except Config.DoesNotExist:
        # Same destination whether or not the update happened.
        pass
    return redirect('simulator')
@login_required
@level_required("Admin")
def simulator_majordome_shutdown(request):
    """Set ``majordome_state`` of Config id=1 to "OCS-SHUTDOWN".

    The configuration is saved and the browser redirected to 'simulator'.
    """
    try:
        cfg = get_object_or_404(Config, id=1)
        cfg.majordome_state = "OCS-SHUTDOWN"
        cfg.save()
    except Config.DoesNotExist:
        # Same destination whether or not the update happened.
        pass
    return redirect('simulator')