#!/usr/bin/env python3
# AgentSST.py

from pathlib import Path
import subprocess
import sys, os
from datetime import datetime, timezone, timedelta
##import utils.Logger as L
#import threading #, multiprocessing, os
import time

#from django.db import transaction
#from common.models import Command

from Agent import Agent, build_agent, log, extract_parameters
import socket
from common.models import AgentCmd, AgentSurvey

from src.core.pyros_django.obsconfig.obsconfig_class import OBSConfig

class AgentSST(Agent):
    """Supervisor agent: launches the agents configured for this computer and watches them.

    At ``init()`` every agent that the observatory configuration associates
    with ``self.computer`` is spawned as a subprocess.  On every routine
    iteration the ``AgentSurvey`` row of each launched agent is inspected; an
    agent whose last update is older than its validity duration gets a
    ``do_start_agent`` (process already exited) or ``do_restart_agent``
    (process still alive) command queued for this agent.
    """

    # Hostname key used to look up this computer's agents in the obs config.
    computer = "XCY1"
    _previous_dir = ""
    PROJECT_ROOT_PATH = ""
    VENV_PYTHON = ""
    # agent name -> {"process": Popen, "nb_try_restart": int}
    # NOTE(review): class-level dicts are shared by every AgentSST instance.
    subprocess_dict = {}
    # agent name -> bool, whether the agent is launched with the " -t" flag
    agent_in_mode_test = {}

    # (command name, number) pairs.
    # NOTE(review): the number is presumably a timeout in seconds - confirm in Agent.
    AGENT_SPECIFIC_COMMANDS = [
        ("do_kill_agent",10),
        ("do_restart_agent",20),
        ("do_start_agent",20),
    ]
    TEST_COMMANDS_LIST = [
    ]

    # Minimum age (seconds) of the last executed start/restart command before
    # a new one is requested for the same agent.
    _REQUEST_COOLDOWN_SECONDS = 30

    def __init__(self, name:str=None,sim_computer=None):
        """Resolve the project root and the python interpreter used to spawn agents.

        Args:
            name: Agent name; defaults to the class name.
                NOTE(review): the resolved value is not forwarded to
                ``Agent.__init__`` - confirm whether it should be.
            sim_computer: Unused in this class.

        Raises:
            KeyError: if the ``PROJECT_ROOT_PATH`` environment variable is unset.
        """
        super().__init__()
        self.PROJECT_ROOT_PATH = os.environ["PROJECT_ROOT_PATH"]
        if name is None:
            name = self.__class__.__name__
        # self.computer = socket.gethostname()
        # With WITH_DOCKER set, no virtualenv prefix is used and the bare
        # "python3" is resolved by the shell that runs the command.
        if os.environ.get("WITH_DOCKER"):
            self.VENV_PYTHON = "python3"
        else:
            self.VENV_PYTHON = os.path.join(
                self.PROJECT_ROOT_PATH, "venv", "venv_py3_pyros", "bin", "python3"
            )
        log.info(f"PC hostname is {self.computer}")
        #name_from_config = self.get_config().get_agent_sst_of_current_computer()

    def init(self):
        """Run the base initialisation, launch all agents, then set the loop delay."""
        super().init()
        # NOTE(review): agents are launched with the TEST_MODE value in effect
        # *before* it is forced to False on the next line - confirm this is intended.
        self.start_agents()
        self.TEST_MODE = False
        # Presumably leaves the spawned agents time to come up - TODO confirm.
        time.sleep(10)
        self.set_delay(3)

    def _launch_agent(self, obs_config, agent, is_restart):
        """Spawn the protocol script of one agent and record its process.

        Args:
            obs_config: Observatory configuration returned by ``get_config()``.
            agent: Name of the agent to launch.
            is_restart: When True the agent's restart counter is incremented,
                otherwise it is reset to zero.

        Returns:
            bool: True if a subprocess was started, False if the agent has no
            protocol script or the script file does not exist.
        """
        agent_informations = obs_config.get_agent_information(obs_config.unit_name, agent)
        protocol = agent_informations.get("protocol")
        if not protocol:
            log.info(f"Agent {agent} not started: no protocol script in obs config")
            return False

        protocol_folder_abs_path = os.path.join(self.PROJECT_ROOT_PATH, os.path.dirname(protocol))
        script_path = protocol_folder_abs_path + os.sep + protocol.split("/")[-1]
        if not os.path.exists(script_path):
            log.info(f"Agent {agent} not started: script {script_path} not found")
            return False

        cmd = self.VENV_PYTHON + " " + script_path
        # Remember, per agent, whether it runs in test mode so that a later
        # restart reuses the same flag.
        if agent not in self.agent_in_mode_test:
            self.agent_in_mode_test[agent] = self.TEST_MODE
        if self.agent_in_mode_test[agent]:
            cmd += " -t"

        # process = subprocess.Popen(cmd, shell=True, stdout=subprocess.DEVNULL,stderr=subprocess.STDOUT)
        process = subprocess.Popen(cmd, shell=True)

        # setdefault: a restart may target an agent that has no entry yet.
        entry = self.subprocess_dict.setdefault(agent, {})
        entry["process"] = process
        if is_restart:
            entry["nb_try_restart"] = entry.get("nb_try_restart", 0) + 1
        else:
            # Reset to zero nb_try when AgentSST start (launch all agents)
            entry["nb_try_restart"] = 0
        log.info(f"Start agent {agent} with cmd {cmd}")
        return True

    def start_agents(self,agent_name=None):
        """
        Start all agents or one agent of obs_config

        Args:
            agent_name (str, optional): Specific agent name to start (counts
                as a restart). Defaults to None, meaning every agent that the
                obs config associates with ``self.computer`` (initial start).

        Raises:
            SystemExit: with code 1 if ``self.computer`` is not a hostname
                known to the obs config.
        """
        obs_config = self.get_config()

        if agent_name:
            # Start a specific agent of obs_config (restart)
            self._launch_agent(obs_config, agent_name, is_restart=True)
            return

        agents_per_computer = obs_config.get_agents_per_computer(obs_config.unit_name)
        agents = agents_per_computer.get(self.computer)
        if agents is None:
            available_hostnames = agents_per_computer.keys()
            log.info("Computer not found in obs config")
            log.info(f"Available hostnames {available_hostnames}. Current hostname is {self.computer}")
            sys.exit(1)
        #self.change_dir(self.PROJECT_ROOT_PATH)
        log.info(f"Agents associated to this computer : {agents}")
        # Start every agent of obs_config (initial start)
        for agent in agents:
            self._launch_agent(obs_config, agent, is_restart=False)

    def do_start_agent(self, agent_name:str):
        """
        Start a specific agent of obs_config (Restart)

        The restart counter kept in ``subprocess_dict`` is copied to the
        agent's ``AgentSurvey.current_nb_restart``.

        Args:
            agent_name (str): Name of agent to start

        Raises:
            KeyError: if no process could be started for ``agent_name`` and it
                has no entry in ``subprocess_dict``.
            AgentSurvey.DoesNotExist: if the agent has no AgentSurvey row.
        """
        self.start_agents(agent_name)
        nb_try_restart_agent = self.subprocess_dict[agent_name]["nb_try_restart"]
        agent_survey = AgentSurvey.objects.get(name=agent_name)
        agent_survey.current_nb_restart = nb_try_restart_agent
        agent_survey.save()

    def do_kill_agent(self, agent):
        """Ask a launched agent to exit by sending it a ``do_exit`` command.

        Args:
            agent: Name of the agent to stop.

        Returns:
            Whatever ``send_cmd_to`` returns, or None if ``agent`` was not
            launched by this AgentSST.
        """
        if agent not in self.subprocess_dict:
            return None
        #cmd = self.send_cmd_to(agent,"do_exit","asap")
        return self.send_cmd_to(agent,"do_exit")

    def do_restart_agent(self, agent)->None:
        """Kill then start again an agent, unless its restart budget is used up.

        Args:
            agent: Name of the agent to restart.

        Raises:
            KeyError: if ``agent`` has no entry in ``subprocess_dict``.
            AgentSurvey.DoesNotExist: if the agent has no AgentSurvey row.
        """
        nb_try_restart_agent = self.subprocess_dict[agent]["nb_try_restart"]
        agent_survey = AgentSurvey.objects.get(name=agent)
        if nb_try_restart_agent < agent_survey.nb_restart_max:
            # NOTE(review): the new process is spawned right after the exit
            # request, without waiting for the old one to terminate.
            self.do_kill_agent(agent)
            # do_start_agent increments the counter and persists the new
            # value itself; writing the pre-restart value afterwards would
            # overwrite it with a stale number.
            self.do_start_agent(agent)
            return
        # Restart budget exhausted.
        #sendmail
        agent_survey.current_nb_restart = nb_try_restart_agent
        agent_survey.save()

        # if agent in self.subprocess_dict.keys():
        #     cmd.set_result(f"Agent {agent} restarted")
        #     cmd.set_as_processed()
        # else:
        #     cmd.set_result(f"Agent {agent} failed to restart")
        #     log.debug(f"Agent {agent} failed to restart")

    def force_kill_agent(self, *args)->None:
        """Kill the subprocess recorded for the agent named by ``args[0]``.

        Does nothing when called without argument or for an agent that has
        no recorded process.
        """
        if not args:
            return
        entry = self.subprocess_dict.get(args[0])
        if entry is None:
            return
        process = entry.get("process")
        if process is not None:
            # process.terminate()
            # process.wait()
            # Original author's note: kill() is preferred with Popen(shell=True).
            # NOTE(review): kill() signals the spawned shell - confirm that the
            # agent's python process is also terminated.
            process.kill()

    def do_things_before_exit(self,abort_cmd_sender):
        """Ask every launched agent to exit, reset its counter, and wait for it.

        Args:
            abort_cmd_sender: Unused in this class.
        """
        if not self.subprocess_dict:
            return
        for agent in self.subprocess_dict:
            self.do_kill_agent(agent)
            try:
                agent_survey = AgentSurvey.objects.get(name=agent)
            except AgentSurvey.DoesNotExist:
                # The agent never registered itself; nothing to reset.
                continue
            # Reset counter before exiting
            agent_survey.current_nb_restart = 0
            agent_survey.save()
        # wait 10 seconds in order to agents to exit themselves properly
        time.sleep(10)
        # NOTE(review): this wait is unbounded - an agent that ignores
        # do_exit blocks AgentSST from exiting.
        for entry in self.subprocess_dict.values():
            while entry.get("process").poll() is None:
                time.sleep(0.5)

    def _request_command_if_due(self, agent, command, executed_names, now_time):
        """Queue ``command`` for ``agent`` on this AgentSST unless already handled.

        If a command named in ``executed_names`` was already executed by this
        agent, a new one is sent only when the most recent one was deposited
        more than ``_REQUEST_COOLDOWN_SECONDS`` ago.  Otherwise a new one is
        sent only if none is currently pending or running.

        Args:
            agent: Name of the supervised agent the command is about.
            command: ``"do_start_agent"`` or ``"do_restart_agent"``.
            executed_names: Full command names counted as a previous attempt.
            now_time: Timezone-aware current time (UTC).
        """
        last_executed = (
            AgentCmd.objects
            .filter(state="CMD_EXECUTED", full_name__in=executed_names, recipient=self.name)
            .order_by("-s_deposit_time")
            .first()
        )
        if last_executed is not None:
            cooldown_limit = now_time - timedelta(seconds=self._REQUEST_COOLDOWN_SECONDS)
            if last_executed.s_deposit_time <= cooldown_limit:
                self.send_cmd_to(self.name, command, agent)
            return

        # Nothing executed yet: do not stack a second request if one was
        # already asked in a previous iteration and is still waiting.
        already_queued = (
            AgentCmd.get_pending_and_running_commands_for_agent(self.name)
            .filter(full_name=f"{command} {agent}")
            .exists()
        )
        if not already_queued:
            self.send_cmd_to(self.name, command, agent)

    def routine_process_after_body(self):
        """Supervise launched agents: force-kill, start or restart them as needed."""
        now_time = datetime.now(timezone.utc)

        # NOTE(review): do_kill_agent sends "do_exit", so a running command
        # named "KILL_AGENT" may never be found here - confirm.
        last_running_commands = AgentCmd.get_commands_sent_by_agent("AgentSST").filter(state="CMD_RUNNING",recipient__in=list(self.subprocess_dict.keys()))
        for cmd in last_running_commands:
            if cmd.full_name == "KILL_AGENT" and cmd.is_expired():
                self.force_kill_agent(cmd.args[0])

        # checking status of agent if they are timeout
        for agent, entry in self.subprocess_dict.items():
            try:
                agent_survey = AgentSurvey.objects.get(name=agent)
            except AgentSurvey.DoesNotExist:
                # No AgentSurvey entry yet: most likely the agent was launched
                # for the first time and has not registered itself so far.
                continue

            timeout_datetime = agent_survey.updated + timedelta(seconds=agent_survey.validity_duration)
            timeout_datetime = timeout_datetime.replace(tzinfo=timezone.utc)
            if timeout_datetime >= now_time:
                # Survey entry still valid, agent considered alive.
                continue

            start_name = f"do_start_agent {agent}"
            if entry.get("process").poll() is not None:
                # Process has exited: ask for a start.
                self._request_command_if_due(agent, "do_start_agent", (start_name,), now_time)
            else:
                # Process still alive but silent: ask for a restart.
                self._request_command_if_due(
                    agent,
                    "do_restart_agent",
                    (start_name, f"do_restart_agent {agent}"),
                    now_time,
                )

        log.info("Check status of process")
        for agent in self.subprocess_dict:
            proc = self.subprocess_dict.get(agent).get("process")
            log.info(f"{agent} poll result is {proc.poll()}")
     


if __name__ == "__main__":
    # Script entry point: build the AgentSST instance through the shared
    # agent factory and hand control to its run loop.
    build_agent(AgentSST).run()