AgentMajordome.py
2.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#!/usr/bin/env python3
from datetime import datetime, timezone, timedelta
import time,sys
sys.path.append("..")
sys.path.append("../../../..")
from src.core.pyros_django.agent.Agent import Agent, build_agent, log
from common.models import SiteWatch, WeatherWatch, Majordome, AgentCmd
class AgentMajordome(Agent):
    """Agent that drives the dome from the observation mode and the weather.

    It reads the latest ``SiteWatch`` / ``WeatherWatch`` / ``Majordome`` rows
    and, when needed, sends ``do_open_dome`` / ``do_close_dome`` commands to
    the agent registered under the ``"BuildingCover"`` component.
    """

    # No command beyond the ones inherited from Agent is declared here.
    AGENT_SPECIFIC_COMMANDS = [
    ]

    # Key used to look up the dome agent in self._oc.get_components_agents().
    _DOME_COMPONENT = "BuildingCover"

    # Values of SiteWatch.dome this agent reacts to.
    _DOME_OPEN = "Open"
    _DOME_CLOSED = "Closed"

    # Observation modes accepted by do_switch_obs_mode.
    _OBS_MODE_DAY = "DAY"
    _OBS_MODE_NIGHT = "NIGHT"

    # NOTE(review): the original compared soft_mode with an undefined bare
    # name `AUTO`; the string "AUTO" is assumed to be the stored value.
    # TODO confirm against the Majordome model.
    _SOFT_MODE_AUTO = "AUTO"

    def __init__(self, name: str = None, sim_computer=None):
        """Initialise the underlying Agent.

        Args:
            name: accepted for interface compatibility; not used here.
            sim_computer: accepted for interface compatibility; not used here.
        """
        # NOTE(review): neither argument is forwarded to Agent.__init__,
        # exactly as before. Confirm whether the base class expects them.
        super().__init__()

    def _init(self):
        """Run the base-class ``_init`` hook; nothing agent-specific yet."""
        super()._init()

    def _dome_state(self):
        """Return ``dome`` of the latest SiteWatch row, or None if no row."""
        latest_site = SiteWatch.objects.last()
        if latest_site is None:
            return None
        return latest_site.dome

    def _send_dome_cmd(self, cmd_name):
        """Send ``cmd_name`` to the dome agent, if one is registered."""
        dome_agent = self._oc.get_components_agents().get(self._DOME_COMPONENT)
        if dome_agent is None:
            # Without a target agent there is nobody to send the command to.
            log.warning(
                f"No agent found for component {self._DOME_COMPONENT}; "
                f"cannot send {cmd_name}"
            )
            return
        self.send_cmd_to(to_agent=dome_agent, cmd_name=cmd_name)

    def close_dome_if_openned(self):
        """Send ``do_close_dome`` when the latest SiteWatch says "Open"."""
        if self._dome_state() == self._DOME_OPEN:
            self._send_dome_cmd("do_close_dome")

    def open_dome_if_closed(self):
        """Send ``do_open_dome`` when the latest SiteWatch says "Closed"."""
        if self._dome_state() == self._DOME_CLOSED:
            self._send_dome_cmd("do_open_dome")

    def do_switch_obs_mode(self, mode):
        """Record the observation mode and move the dome accordingly.

        This replaces two same-named definitions: the later one shadowed the
        earlier and called names that do not exist (``sendcmd_to``,
        ``self.do_close_dome``).

        Args:
            mode: "DAY" or "NIGHT"; any other value is logged and ignored.
        """
        if mode not in (self._OBS_MODE_DAY, self._OBS_MODE_NIGHT):
            log.warning(f"Unknown obs mode {mode}; nothing done")
            return

        # NOTE(review): with no `defaults`, update_or_create() only fetches
        # or creates a row whose obs_mode equals `mode` - confirm this is
        # the intended way to persist the current mode.
        Majordome.objects.update_or_create(obs_mode=mode)

        if mode == self._OBS_MODE_DAY:
            # Close the roof if the dome is open.
            self.close_dome_if_openned()
        else:
            # NOTE(review): one of the two original definitions opened the
            # dome here while the other tried to close it; opening is kept.
            # Confirm before running against real hardware.
            self.open_dome_if_closed()

    def _routine_process_before_body(self):
        """Placeholder: no work is done before the routine body yet."""
        # TODO: check the time to decide whether to switch DAY/NIGHT mode.
        pass

    def routine_process_after(self):
        """Ask to close the dome when soft mode is AUTO and weather is not OK.

        Does nothing when no WeatherWatch or Majordome row exists yet.
        """
        latest_weather = WeatherWatch.objects.last()
        majordome = Majordome.objects.last()
        if latest_weather is None or majordome is None:
            return
        if majordome.soft_mode != self._SOFT_MODE_AUTO:
            return

        global_status_weather = latest_weather.global_status
        if global_status_weather != "OK":
            log.info(f"Weather {global_status_weather}. Asking to close dome")
            # TODO: check whether the command was already created; if x
            # seconds have passed and it was not executed, mail the operator
            # so the dome can be operated manually.
            self.close_dome_if_openned()
if __name__ == "__main__":
    # Script entry point: build the agent with the project's build_agent
    # factory and immediately call its run() method.
    build_agent(AgentMajordome).run()