AgentSST.py
7.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
#!/usr/bin/env python3
from pathlib import Path
import subprocess
import sys, os
from datetime import datetime, timezone, timedelta
##import utils.Logger as L
#import threading #, multiprocessing, os
import time
#from django.db import transaction
#from common.models import Command
from Agent import Agent, build_agent, log, extract_parameters
import socket
from common.models import AgentCmd, AgentSurvey
from src.core.pyros_django.obsconfig.obsconfig_class import OBSConfig
class AgentSST(Agent):
    """Agent that starts, watches, stops and restarts the other agents.

    The agents to manage are read from the observatory configuration: they
    are the ones listed for ``self.computer``. Each of them is run as a child
    process and its ``subprocess.Popen`` handle is kept in
    ``self.subprocess_dict`` under the agent name.
    """

    # Name of the computer entry looked up in the observatory configuration.
    computer = "MainComputer"
    _previous_dir = ""
    # Filled in by __init__ from the PROJECT_ROOT_PATH environment variable.
    PROJECT_ROOT_PATH = ""
    # Python interpreter used to launch agent scripts (set by __init__).
    VENV_PYTHON = ""
    # Agent name -> Popen handle. Kept at class level for backward
    # compatibility; __init__ installs a per-instance dict so that instances
    # do not share (and mutate) the same mapping.
    subprocess_dict = {}
    AGENT_SPECIFIC_COMMANDS = [
        "do_kill_agent",
        "do_restart_agent"
    ]

    def __init__(self, name: str = None, sim_computer=None):
        """Resolve the Python interpreter path and start the configured agents.

        Args:
            name: Accepted for interface compatibility; it is not forwarded
                to the parent constructor and is currently unused.
            sim_computer: Accepted for interface compatibility; currently
                unused (``self.computer`` keeps its class-level value).

        Raises:
            KeyError: If the ``PROJECT_ROOT_PATH`` environment variable is
                not set.
        """
        super().__init__()
        self.PROJECT_ROOT_PATH = os.environ["PROJECT_ROOT_PATH"]
        # Per-instance process table (see the class attribute comment).
        self.subprocess_dict = {}

        # Any non-empty WITH_DOCKER value (even "0") enables docker mode, in
        # which the bare "python3" executable is used instead of the venv one.
        if os.environ.get("WITH_DOCKER"):
            self.VENV_PYTHON = "python3"
        else:
            self.VENV_PYTHON = os.sep.join(
                (self.PROJECT_ROOT_PATH, "venv", "venv_py3_pyros", "bin", "python3")
            )

        log.info(f"PC hostname is {self.computer}")
        self.start_agents()

    def start_agents(self, agent_name=None):
        """Start every agent configured for this computer, or a single one.

        Exits the interpreter with status 1 when ``self.computer`` has no
        entry in the observatory configuration.

        Args:
            agent_name (str, optional): Specific agent name to start
                (restart case). When falsy, all agents configured for this
                computer are started. Defaults to None.
        """
        obs_config = self.get_config()
        agents = obs_config.get_agents_per_computer(obs_config.unit_name).get(self.computer)
        if agents is None:
            print("Computer not found in obs config")
            sys.exit(1)

        if agent_name:
            # Restart of one agent: its output is discarded.
            self._launch_agent(obs_config, agent_name, quiet=True)
            return

        # Initial start: children inherit this process' stdout/stderr.
        for agent in agents:
            self._launch_agent(obs_config, agent, quiet=False)

    def _launch_agent(self, obs_config, agent, quiet):
        """Launch the protocol script of ``agent`` and record its process.

        Nothing is launched when the agent has no ``protocol`` entry, when
        the script file does not exist, or when the process cannot be created.

        Args:
            obs_config: Observatory configuration object providing
                ``get_agent_information`` and ``unit_name``.
            agent (str): Name of the agent to launch.
            quiet (bool): When True, the child's stdout and stderr are sent
                to ``subprocess.DEVNULL``.

        Returns:
            The ``subprocess.Popen`` handle, or None when nothing was started.
        """
        agent_informations = obs_config.get_agent_information(obs_config.unit_name, agent)
        protocol = agent_informations.get("protocol")
        if not protocol:
            return None

        script_path = os.path.join(
            self.PROJECT_ROOT_PATH,
            os.path.dirname(protocol),
            protocol.split("/")[-1],
        )
        if not os.path.exists(script_path):
            log.info(f"Agent {agent} not started: protocol script {script_path} not found")
            return None

        # An argument list without a shell keeps paths containing spaces
        # intact and makes the Popen handle refer to the agent process itself
        # rather than to an intermediate shell, so poll()/kill() act on the
        # agent.
        cmd = [self.VENV_PYTHON, script_path]
        redirection = (
            {"stdout": subprocess.DEVNULL, "stderr": subprocess.STDOUT} if quiet else {}
        )
        try:
            process = subprocess.Popen(cmd, **redirection)
        except OSError as error:
            # Without a shell, a missing interpreter raises here instead of
            # producing a shell exit status; do not let it abort the caller.
            log.info(f"Agent {agent} could not be started: {error}")
            return None

        self.subprocess_dict[agent] = process
        cmd_text = " ".join(cmd)
        log.info(f"Start agent {agent} with cmd {cmd_text}")
        return process

    def start_agent(self, agent_name: str):
        """Start a specific agent of the observatory configuration (restart).

        Args:
            agent_name (str): Name of agent to start.
        """
        self.start_agents(agent_name)

    def do_kill_agent(self, *args):
        """Ask a managed agent to exit by sending it the ``do_exit`` command.

        Args:
            *args: The first positional argument is the agent name.

        Returns:
            Whatever ``send_cmd_to`` returns for the ``do_exit`` command, or
            None when the agent is not in ``self.subprocess_dict``.

        Raises:
            IndexError: If no agent name is given.
        """
        agent = args[0]
        if agent not in self.subprocess_dict:
            return None
        return self.send_cmd_to(agent, "do_exit")

    def do_restart_agent(self, *args) -> None:
        """Ask an agent to exit, then launch it again.

        Args:
            *args: The first positional argument is the agent name.

        Raises:
            IndexError: If no agent name is given.
        """
        agent = args[0]
        # NOTE(review): the exit request is only sent here; nothing waits for
        # the previous process to end before the new one is launched - confirm
        # that a short overlap of both processes is acceptable.
        self.do_kill_agent(agent)
        self.start_agent(agent)

    def force_kill_agent(self, *args) -> None:
        """Kill the process of a managed agent, if there is one.

        Args:
            *args: The first positional argument is the agent name. Nothing
                happens when it is missing or unknown.
        """
        if not args:
            return None
        process = self.subprocess_dict.get(args[0])
        if process is not None:
            process.kill()
        return None

    def do_things_before_exit(self, abort_cmd_sender):
        """Ask every managed agent to exit and wait until all have stopped.

        Args:
            abort_cmd_sender: Not used by this implementation.
        """
        kill_agent_commands = {}
        for agent in self.subprocess_dict:
            kill_agent_commands[agent] = self.do_kill_agent(agent)

        # Give the agents 10 seconds to exit by themselves.
        time.sleep(10)

        # Then block until each process has actually terminated.
        # NOTE(review): there is no upper bound on this wait - an agent that
        # never exits keeps this method blocked.
        for agent in kill_agent_commands:
            self.subprocess_dict[agent].wait()

    def routine_process_after_body(self):
        """Force-kill agents whose kill command expired and restart stale agents.

        Steps:
            1. For each command sent by ``AgentSST`` in state ``CMD_RUNNING``
               whose full name starts with ``KILL_AGENT <agent>`` and which
               is expired, kill that agent's process.
            2. For each managed agent, request a restart when its
               ``AgentSurvey`` entry has not been updated within its
               validity duration.
            3. Log the ``poll()`` result of every managed process.
        """
        now_time = datetime.now(timezone.utc)

        last_running_commands = AgentCmd.get_commands_sent_by_agent("AgentSST").filter(state="CMD_RUNNING")
        for cmd in last_running_commands:
            # Expected form: "<COMMAND> <agent> ..."; names with fewer than
            # two tokens cannot designate an agent and are skipped.
            tokens = cmd.full_name.split(" ")
            if len(tokens) < 2:
                continue
            last_running_cmd, agent = tokens[:2]
            if last_running_cmd == "KILL_AGENT" and cmd.is_expired():
                self.force_kill_agent(agent)

        # Check whether any agent has timed out.
        for agent in self.subprocess_dict:
            agent_survey = AgentSurvey.objects.get(name=agent)
            validity = timedelta(seconds=agent_survey.validity_duration)
            timeout_datetime = (agent_survey.updated + validity).replace(tzinfo=timezone.utc)
            if timeout_datetime < now_time:
                # The agent did not report in time: ask for a restart.
                # NOTE(review): the command sent is "restart_agent" while
                # AGENT_SPECIFIC_COMMANDS declares "do_restart_agent" -
                # confirm which name the command dispatcher expects.
                self.send_cmd_to("AgentSST", "restart_agent", agent)

        log.info("Check status of process")
        for agent, proc in self.subprocess_dict.items():
            log.info(f"{agent} poll result is {proc.poll()}")
if __name__ == "__main__":
    # Script entry point: build the AgentSST instance through the project's
    # build_agent factory and call its run() method.
    build_agent(AgentSST).run()