Commit dd27c2bcf059a98268b8c0d34767cac7c440e487
1 parent
564bc49b
Exists in
dev
Updating agent constructor and adding new-start for pyros.py (launching agentSST)
Showing
9 changed files
with
238 additions
and
28 deletions
Show diff stats
CHANGELOG
privatedev/plugin/agent/AgentSP.py
... | ... | @@ -29,12 +29,12 @@ class AgentSP(Agent): |
29 | 29 | # period = next_period |
30 | 30 | |
31 | 31 | # new init with obsconfig |
32 | - def __init__(self, RUN_IN_THREAD=True,use_db_test=False): | |
32 | + def __init__(self,use_db_test=False): | |
33 | 33 | ##if name is None: name = self.__class__.__name__ |
34 | 34 | if use_db_test: |
35 | 35 | print("USE DB TEST") |
36 | 36 | setup_test_environment() |
37 | - super().__init__(RUN_IN_THREAD) | |
37 | + super().__init__() | |
38 | 38 | next_period = Period.objects.next_period() |
39 | 39 | period = next_period |
40 | 40 | |
... | ... | @@ -228,7 +228,7 @@ if __name__ == "__main__": |
228 | 228 | agentSP.run() |
229 | 229 | #agent = build_agent(agentSP, RUN_IN_THREAD=True) |
230 | 230 | else: |
231 | - agent = build_agent(AgentSP, RUN_IN_THREAD=RUN_IN_THREAD) | |
231 | + agent = build_agent(AgentSP) | |
232 | 232 | ''' |
233 | 233 | TEST_MODE, configfile = extract_parameters() |
234 | 234 | agent = AgentM("AgentM", configfile, RUN_IN_THREAD) | ... | ... |
privatedev/plugin/agent/AgentScheduler.py
... | ... | @@ -68,10 +68,10 @@ class AgentScheduler(Agent): |
68 | 68 | # super().__init__(config_filename, RUN_IN_THREAD) |
69 | 69 | |
70 | 70 | # new config (obsconfig) |
71 | - def __init__(self, name:str=None, RUN_IN_THREAD=True): | |
71 | + def __init__(self, name:str=None): | |
72 | 72 | if name is None: |
73 | 73 | name = self.__class__.__name__ |
74 | - super().__init__(RUN_IN_THREAD) | |
74 | + super().__init__() | |
75 | 75 | # @override |
76 | 76 | def init(self): |
77 | 77 | super().init() | ... | ... |
privatedev/plugin/agent_devices/AgentM.py
... | ... | @@ -7,6 +7,7 @@ import sys |
7 | 7 | |
8 | 8 | #from django.db import transaction |
9 | 9 | #from common.models import Command |
10 | + | |
10 | 11 | sys.path.append("..") |
11 | 12 | sys.path.append("../../../..") |
12 | 13 | from src.core.pyros_django.agent.Agent import Agent, build_agent, log |
... | ... | @@ -75,10 +76,10 @@ class AgentM(Agent): |
75 | 76 | # super().__init__(config_filename, RUN_IN_THREAD) |
76 | 77 | |
77 | 78 | # new config (obsconfig) |
78 | - def __init__(self, name:str=None, RUN_IN_THREAD=True): | |
79 | + def __init__(self, name:str=None): | |
79 | 80 | if name is None: |
80 | 81 | name = self.__class__.__name__ |
81 | - super().__init__(RUN_IN_THREAD) | |
82 | + super().__init__() | |
82 | 83 | PYROS_CONFIG_FILE = os.environ.get("pyros_config_file") |
83 | 84 | if PYROS_CONFIG_FILE: |
84 | 85 | CONFIG_PYROS = ConfigPyros(PYROS_CONFIG_FILE).pyros_config |
... | ... | @@ -265,7 +266,7 @@ if __name__ == "__main__": |
265 | 266 | # with process |
266 | 267 | #RUN_IN_THREAD=False |
267 | 268 | |
268 | - agent = build_agent(AgentM, RUN_IN_THREAD=RUN_IN_THREAD) | |
269 | + agent = build_agent(AgentM) | |
269 | 270 | ''' |
270 | 271 | TEST_MODE, configfile = extract_parameters() |
271 | 272 | agent = AgentM("AgentM", configfile, RUN_IN_THREAD) | ... | ... |
pyros.py
... | ... | @@ -997,6 +997,161 @@ def start(agent: str, configfile: str, observatory: str, unit: str, computer_hos |
997 | 997 | return True if p.returncode == 0 else False |
998 | 998 | |
999 | 999 | |
1000 | + | |
1001 | +@pyros_launcher.command(help="Launch an agent") | |
1002 | +# @global_test_options | |
1003 | +@click.option('--configfile', '-c', help='the configuration file to be used') | |
1004 | +@click.option('--observatory', '-o', help='the observatory name to be used') | |
1005 | +@click.option('--unit', '-u', help='the unit name to be used') | |
1006 | +@click.option("--computer_hostname","-cp", help="The name of simulated computer hostname") | |
1007 | +def new_start(configfile: str, observatory: str, unit: str, computer_hostname: str): | |
1008 | + log.debug("Running start command") | |
1009 | + try: | |
1010 | + from config.pyros.config_pyros import ConfigPyros | |
1011 | + except: | |
1012 | + # pip = "pip" if platform.system() == "Windows" else "pip3" | |
1013 | + # TODO: "python -m pip" au lieu de "pip" | |
1014 | + pip = "pip" if IS_WINDOWS else "pip3" | |
1015 | + process = subprocess.Popen( | |
1016 | + pip + " install --user --upgrade pykwalify", shell=True) | |
1017 | + process.wait() | |
1018 | + if process.returncode == 0: | |
1019 | + # self.addExecuted(self.current_command, command) | |
1020 | + from config.pyros.config_pyros import configpyros | |
1021 | + else: | |
1022 | + log.error( | |
1023 | + "pykwalify package (required for obsconfig class) installation failed") | |
1024 | + if configfile: | |
1025 | + log.debug("With config file" + configfile) | |
1026 | + os.environ["pyros_config_file"] = os.path.join(os.path.abspath( | |
1027 | + PYROS_DJANGO_BASE_DIR), "../../../config/pyros/", configfile) | |
1028 | + else: | |
1029 | + configfile = 'config_pyros.yml' | |
1030 | + os.environ["pyros_config_file"] = os.path.join(os.path.abspath( | |
1031 | + PYROS_DJANGO_BASE_DIR), "../../../config/pyros/", configfile) | |
1032 | + | |
1033 | + logo_name = ConfigPyros( | |
1034 | + os.environ["pyros_config_file"]).pyros_config["general"]["logo"] | |
1035 | + logo_path = os.path.join(os.path.abspath( | |
1036 | + PYROS_DJANGO_BASE_DIR), "../../../config/pyros/", logo_name) | |
1037 | + media_path = os.path.join(os.path.abspath( | |
1038 | + PYROS_DJANGO_BASE_DIR), "misc/static/media", logo_name) | |
1039 | + shutil.copy(logo_path, media_path) | |
1040 | + # if test_mode(): print("in test mode") | |
1041 | + # if verbose_mode(): print("in verbose mode") | |
1042 | + if observatory == None or len(observatory) == 0: | |
1043 | + observatory = "default" | |
1044 | + observatories_configuration_folder = os.path.join( | |
1045 | + os.path.abspath(PYROS_DJANGO_BASE_DIR), "../../../privatedev/config/") | |
1046 | + if len(glob.glob(observatories_configuration_folder+observatory+"/")) != 1: | |
1047 | + # Observatory configuration folder not found | |
1048 | + print( | |
1049 | + f"Observatory configuration folder for observatory '{observatory}' not found in {observatories_configuration_folder}") | |
1050 | + print("Available observatories configuration :") | |
1051 | + for obs_conf_folder in os.listdir(observatories_configuration_folder): | |
1052 | + print(obs_conf_folder) | |
1053 | + exit(1) | |
1054 | + | |
1055 | + path_to_obs_config_folder = observatories_configuration_folder+observatory+"/" | |
1056 | + obs_config_file_name = "" | |
1057 | + # Search for observatory config file | |
1058 | + obs_config_file_name = glob.glob( | |
1059 | + path_to_obs_config_folder+"/observatory*.yml")[0] | |
1060 | + | |
1061 | + obs_config_file_path = os.path.join( | |
1062 | + path_to_obs_config_folder, obs_config_file_name) | |
1063 | + os.environ["PATH_TO_OBSCONF_FILE"] = obs_config_file_path | |
1064 | + os.environ["PATH_TO_OBSCONF_FOLDER"] = path_to_obs_config_folder | |
1065 | + os.environ["unit_name"] = unit if unit else '' | |
1066 | + ''' | |
1067 | + if unit: | |
1068 | + os.environ["unit_name"] = unit | |
1069 | + else: | |
1070 | + os.environ["unit_name"] = "" | |
1071 | + ''' | |
1072 | + | |
1073 | + # add path to pyros_django folder as the config class is supposed to work within this folder | |
1074 | + # cmd_test_obs_config = f"-c \"from src.core.pyros_django.obsconfig.obsconfig_class import OBSConfig\nOBSConfig('{os.path.join(PYROS_DJANGO_BASE_DIR,os.environ.get('PATH_TO_OBSCONF_FILE'))}')\"" | |
1075 | + cmd_test_obs_config = f"-c \"from src.core.pyros_django.obsconfig.obsconfig_class import OBSConfig\nOBSConfig('{obs_config_file_path}')\"" | |
1076 | + if not execProcessFromVenv(cmd_test_obs_config): | |
1077 | + # Observatory configuration has an issue | |
1078 | + exit(1) | |
1079 | + | |
1080 | + current_processes = [] | |
1081 | + # cmd = "manage.py runserver" | |
1082 | + # if(WITH_DOCKER): | |
1083 | + # # If we're running pyros within docker, we need to specify a specific adress in order to access the website on our host machine | |
1084 | + # PYROS_WEBSITE_PORT = os.environ.get("PYROS_WEBSITE_PORT") | |
1085 | + # cmd = f"manage.py runserver 0.0.0.0:{PYROS_WEBSITE_PORT}" | |
1086 | + # os.chdir(PYROS_DJANGO_BASE_DIR) | |
1087 | + # current_processes.append( | |
1088 | + # [execProcessFromVenvAsync(cmd), "webserver", -1]) | |
1089 | + # Start AgentSST (process) | |
1090 | + agent_name = "agentSST" | |
1091 | + agent_folder = AGENTS.get(agent_name) | |
1092 | + current_dir = os.getcwd() | |
1093 | + | |
1094 | + os.chdir(PYROS_DJANGO_BASE_DIR + os.sep + agent_folder) | |
1095 | + cmd = f"Agent{agent_name[5:]}.py" | |
1096 | + if debug_mode(): | |
1097 | + cmd += " -d" | |
1098 | + if sim_mode(): | |
1099 | + cmd += " -s" | |
1100 | + if test_mode(): | |
1101 | + cmd += " -t" | |
1102 | + if verbose_mode(): | |
1103 | + cmd += " -v" | |
1104 | + if configfile: | |
1105 | + cmd += " {configfile}" | |
1106 | + if computer_hostname: | |
1107 | + cmd += " -c {computer_hostname}" | |
1108 | + | |
1109 | + # if not test_mode(): current_processes.append( [execProcessFromVenvAsync(cmd), agent_name, -1] ) | |
1110 | + # Append this process ( [process id, agent_name, result=failure] ) | |
1111 | + # ("result" will be updated at the end of execution) | |
1112 | + current_processes.append( | |
1113 | + [execProcessFromVenvAsync(cmd), agent_name, -1]) | |
1114 | + # self.change_dir("..") | |
1115 | + os.chdir(current_dir) | |
1116 | + | |
1117 | + # Go back to root folder (/) | |
1118 | + # self.change_dir('..') | |
1119 | + # os.chdir("..") | |
1120 | + # Wait for end of each process execution | |
1121 | + # for (p,agent) in current_processes: | |
1122 | + for process in current_processes: | |
1123 | + p, agent, _ = process | |
1124 | + log.debug( | |
1125 | + f"************ Waiting for end of execution of agent {agent} ************") | |
1126 | + p.wait() | |
1127 | + process[2] = p.returncode | |
1128 | + print(f"************ END of execution of agent {agent} ************") | |
1129 | + if p.returncode == 0: | |
1130 | + printFullTerm( | |
1131 | + Colors.GREEN, f"Process {agent} executed successfully") | |
1132 | + # self.addExecuted(self.current_command, command) | |
1133 | + else: | |
1134 | + printFullTerm(Colors.WARNING, f"Process {agent} execution failed") | |
1135 | + # self.addError(self.current_command, command) | |
1136 | + | |
1137 | + print() | |
1138 | + print() | |
1139 | + print("Synthesis of the results:") | |
1140 | + for process in current_processes: | |
1141 | + p, agent, returncode = process | |
1142 | + if returncode == 0: | |
1143 | + printFullTerm( | |
1144 | + Colors.GREEN, f"Process {agent} executed successfully") | |
1145 | + # self.addExecuted(self.current_command, command) | |
1146 | + else: | |
1147 | + printFullTerm(Colors.WARNING, f"Process {agent} execution failed") | |
1148 | + # self.addError(self.current_command, command) | |
1149 | + | |
1150 | + # print("************ end of START() ************") | |
1151 | + # Only according to the last process status: | |
1152 | + # return True if p.returncode==0 else False | |
1153 | + return True if p.returncode == 0 else False | |
1154 | + | |
1000 | 1155 | # TODO: implรฉmenter le STOP !!! |
1001 | 1156 | @pyros_launcher.command(help="Kill an agent") |
1002 | 1157 | @click.argument('agent') | ... | ... |
src/core/pyros_django/agent/AgentSST.py
... | ... | @@ -2,7 +2,7 @@ |
2 | 2 | |
3 | 3 | from pathlib import Path |
4 | 4 | import subprocess |
5 | -import sys, os | |
5 | +import sys, os, datetime | |
6 | 6 | ##import utils.Logger as L |
7 | 7 | #import threading #, multiprocessing, os |
8 | 8 | import time |
... | ... | @@ -10,9 +10,11 @@ import time |
10 | 10 | #from django.db import transaction |
11 | 11 | #from common.models import Command |
12 | 12 | |
13 | -from Agent import Agent, build_agent, log | |
13 | +from Agent import Agent, build_agent, log, extract_parameters | |
14 | 14 | import socket |
15 | +from common.models import AgentCmd | |
15 | 16 | |
17 | +from src.core.pyros_django.obsconfig.obsconfig_class import OBSConfig | |
16 | 18 | |
17 | 19 | class AgentSST(Agent): |
18 | 20 | computer = "MainComputer" |
... | ... | @@ -21,10 +23,14 @@ class AgentSST(Agent): |
21 | 23 | VENV_PYTHON = "" |
22 | 24 | subprocess_dict = {} |
23 | 25 | |
26 | + AGENT_SPECIFIC_COMMANDS = [ | |
27 | + "KILL_AGENT", | |
28 | + "RESTART_AGENT" | |
29 | + ] | |
24 | 30 | |
25 | - def __init__(self, name:str=None, RUN_IN_THREAD=True,sim_computer=None): | |
31 | + def __init__(self, name:str=None,sim_computer=None): | |
26 | 32 | |
27 | - super().__init__(RUN_IN_THREAD) | |
33 | + super().__init__() | |
28 | 34 | self.PROJECT_ROOT_PATH = os.environ["PROJECT_ROOT_PATH"] |
29 | 35 | if name is None: |
30 | 36 | name = self.__class__.__name__ |
... | ... | @@ -49,7 +55,7 @@ class AgentSST(Agent): |
49 | 55 | + os.sep + "bin" |
50 | 56 | + os.sep |
51 | 57 | ) |
52 | - VENV_PYTHON = VENV_BIN + "python3" | |
58 | + self.VENV_PYTHON = VENV_BIN + "python3" | |
53 | 59 | log.info(f"PC hostname is {self.computer}") |
54 | 60 | self.start_agents() |
55 | 61 | |
... | ... | @@ -77,7 +83,7 @@ class AgentSST(Agent): |
77 | 83 | |
78 | 84 | protocol_script_name = protocol.split("/")[-1] |
79 | 85 | if os.path.exists(protocol_folder_abs_path + os.sep + protocol_script_name): |
80 | - cmd = self.VENV_PYTHON + protocol_folder_abs_path + os.sep + protocol_script_name | |
86 | + cmd = self.VENV_PYTHON +" "+ protocol_folder_abs_path + os.sep + protocol_script_name | |
81 | 87 | |
82 | 88 | process = subprocess.Popen(f"{cmd}", shell=True, stdout=subprocess.DEVNULL,stderr=subprocess.STDOUT) |
83 | 89 | self.subprocess_dict[agent] = process |
... | ... | @@ -93,7 +99,7 @@ class AgentSST(Agent): |
93 | 99 | |
94 | 100 | protocol_script_name = protocol.split("/")[-1] |
95 | 101 | if os.path.exists(protocol_folder_abs_path + os.sep + protocol_script_name): |
96 | - cmd = self.VENV_PYTHON + protocol_folder_abs_path + os.sep + protocol_script_name | |
102 | + cmd = self.VENV_PYTHON +" "+ protocol_folder_abs_path + os.sep + protocol_script_name | |
97 | 103 | |
98 | 104 | process = subprocess.Popen(f"{cmd}", shell=True, stdout=subprocess.DEVNULL,stderr=subprocess.STDOUT) |
99 | 105 | self.subprocess_dict[agent] = process |
... | ... | @@ -108,13 +114,56 @@ class AgentSST(Agent): |
108 | 114 | """ |
109 | 115 | self.start_agents(agent_name) |
110 | 116 | |
111 | - def routine_process_body(self): | |
112 | - for process in self.subprocess_dict.values(): | |
117 | + | |
118 | + def cmd_kill_agent(self, cmd:AgentCmd, agent:str)->None: | |
119 | + if agent in self.subprocess_dict.keys(): | |
120 | + self.send_cmd_to(agent,"do_exit") | |
121 | + | |
122 | + def cmd_restart_agent(self, cmd:AgentCmd ,agent:str)->None: | |
123 | + self.start_agent(agent) | |
124 | + if agent in self.subprocess_dict.keys(): | |
125 | + cmd.set_result(f"Agent {agent} restarted") | |
126 | + cmd.set_as_processed() | |
127 | + else: | |
128 | + cmd.set_result(f"Agent {agent} failed to restart") | |
129 | + log.debug(f"Agent {agent} failed to restart") | |
130 | + | |
131 | + def force_kill_agent(self, cmd:AgentCmd, agent:str)->None: | |
132 | + if agent in self.subprocess_dict.get(agent) is not None: | |
133 | + process = self.subprocess_dict.get(agent) | |
113 | 134 | process.terminate() |
114 | - process.wait() | |
115 | - exit(0) | |
116 | - # needs to receive new commands to stop or start new agents | |
117 | - #start_agents(self) | |
135 | + if process.poll() is not None: | |
136 | + cmd.set_as_processed() | |
137 | + else: | |
138 | + return None | |
139 | + | |
140 | + def routine_process_before_body(self): | |
141 | + try: | |
142 | + now_time = datetime.datetime.utcnow() | |
143 | + last_running_commands = AgentCmd.get_commands_sent_by_agent("AgentSST").filter(state="CMD_RUNNING") | |
144 | + | |
145 | + for cmd in last_running_commands: | |
146 | + last_running_cmd, agent = cmd.full_name.split(" ")[:1] | |
147 | + if last_running_cmd == "KILL_AGENT" and cmd.is_expired(): | |
148 | + self.force_killed_agent(agent) | |
149 | + next_cmd = self._get_next_valid_and_not_running_command() | |
150 | + if next_cmd is not None: | |
151 | + cmd, agent = next_cmd.full_name.split(" ")[:1] | |
152 | + if cmd == "KILL_AGENT": | |
153 | + self.cmd_kill_agent(next_cmd,agent) | |
154 | + elif cmd == "RESTART_AGENT": | |
155 | + self.cmd_restart_agent(next_cmd,agent) | |
156 | + log.info("Check status of process") | |
157 | + for agent in self.subprocess_dict: | |
158 | + proc = self.subprocess_dict.get(agent) | |
159 | + log.info(f"{agent} poll result is {proc.poll()}") | |
160 | + except KeyboardInterrupt: | |
161 | + | |
162 | + for process in self.subprocess_dict.values(): | |
163 | + print(process) | |
164 | + process.kill() | |
165 | + exit(0) | |
166 | + | |
118 | 167 | |
119 | 168 | if __name__ == "__main__": |
120 | 169 | ... | ... |
src/core/pyros_django/monitoring/AgentM.py
... | ... | @@ -76,10 +76,10 @@ class AgentM(Agent): |
76 | 76 | # super().__init__(config_filename, RUN_IN_THREAD) |
77 | 77 | |
78 | 78 | # new config (obsconfig) |
79 | - def __init__(self, name:str=None, RUN_IN_THREAD=True): | |
79 | + def __init__(self, name:str=None): | |
80 | 80 | if name is None: |
81 | 81 | name = self.__class__.__name__ |
82 | - super().__init__(RUN_IN_THREAD) | |
82 | + super().__init__() | |
83 | 83 | PYROS_CONFIG_FILE = os.environ.get("pyros_config_file") |
84 | 84 | if PYROS_CONFIG_FILE: |
85 | 85 | CONFIG_PYROS = ConfigPyros(PYROS_CONFIG_FILE).pyros_config |
... | ... | @@ -266,7 +266,7 @@ if __name__ == "__main__": |
266 | 266 | # with process |
267 | 267 | #RUN_IN_THREAD=False |
268 | 268 | |
269 | - agent = build_agent(AgentM, RUN_IN_THREAD=RUN_IN_THREAD) | |
269 | + agent = build_agent(AgentM) | |
270 | 270 | ''' |
271 | 271 | TEST_MODE, configfile = extract_parameters() |
272 | 272 | agent = AgentM("AgentM", configfile, RUN_IN_THREAD) | ... | ... |
src/core/pyros_django/obsconfig/obsconfig_class.py
src/core/pyros_django/scientific_program/AgentSP.py
... | ... | @@ -29,12 +29,12 @@ class AgentSP(Agent): |
29 | 29 | # period = next_period |
30 | 30 | |
31 | 31 | # new init with obsconfig |
32 | - def __init__(self, RUN_IN_THREAD=True,use_db_test=False): | |
32 | + def __init__(self,use_db_test=False): | |
33 | 33 | ##if name is None: name = self.__class__.__name__ |
34 | 34 | if use_db_test: |
35 | 35 | print("USE DB TEST") |
36 | 36 | setup_test_environment() |
37 | - super().__init__(RUN_IN_THREAD) | |
37 | + super().__init__() | |
38 | 38 | next_period = Period.objects.next_period() |
39 | 39 | period = next_period |
40 | 40 | |
... | ... | @@ -228,7 +228,7 @@ if __name__ == "__main__": |
228 | 228 | agentSP.run() |
229 | 229 | #agent = build_agent(agentSP, RUN_IN_THREAD=True) |
230 | 230 | else: |
231 | - agent = build_agent(AgentSP, RUN_IN_THREAD=RUN_IN_THREAD) | |
231 | + agent = build_agent(AgentSP) | |
232 | 232 | ''' |
233 | 233 | TEST_MODE, configfile = extract_parameters() |
234 | 234 | agent = AgentM("AgentM", configfile, RUN_IN_THREAD) | ... | ... |