# AgentMCO.py
#!/usr/bin/env python3


import subprocess
import sys
import os, argparse
##import utils.Logger as L

# Project root folder, taken from the environment; it is put on the module
# search path (once) so that the project-absolute imports below can resolve.
pwd = os.environ['PROJECT_ROOT_PATH']
if sys.path.count(pwd) == 0:
    sys.path.append(pwd)

##from .Agent import Agent
from src.core.pyros_django.agent.Agent import Agent, build_agent, log

##log = L.setupLogger("AgentXTaskLogger", "AgentX")



class AgentMCO(Agent):
    """Agent that runs the MCO image-processing scripts on input images.

    For each image passed to ``do_process_image`` the agent launches the
    scripts listed in ``_PROCESSING_SCRIPTS`` (located in the agent's
    configured folder) as child processes and waits for them to finish.
    """

    # FOR TEST ONLY
    # Run this agent in simulator mode
    TEST_MODE = False
    # Run the assertion tests at the end
    TEST_WITH_FINAL_TEST = False
    #TEST_MAX_DURATION_SEC = None
    TEST_MAX_DURATION_SEC = 100
    # Who should I send commands to ?
    TEST_COMMANDS_DEST = "myself"

    # This agent SPECIFIC commands
    # @override
    _AGENT_SPECIFIC_COMMANDS = {
        # Format : "cmd_name" : (timeout, exec_mode, tooltip)
        "do_process_image" : (10, Agent.EXEC_MODE.SEQUENTIAL, ''),
    }

    # This agent TEST scenario (automatically executed in TEST mode)
    # @override
    _TEST_COMMANDS_LIST = [
        # Format : (DO_IT, "self cmd_name cmd_args", validity, "expected_result", expected_status),
        (True, "self do_process_image i.fit", 200, None, Agent.CMD_STATUS.CMD_EXECUTED),
        (True, "self do_exit", 500, "STOPPING", Agent.CMD_STATUS.CMD_EXECUTED),
    ]

    # Scripts (relative to the agent folder) launched for every processed image.
    _PROCESSING_SCRIPTS = ("estimation_FWHM.py", "image_stat.py")

    # =================================================================
    #     FUNCTIONS RUN INSIDE MAIN THREAD
    # =================================================================

    def __init__(self, name:str=None):
        """Create the agent.

        Args:
            name: Accepted for interface compatibility only.
        """
        # NOTE(review): `name` is not forwarded to the base class (the
        # original code computed a default for it and then dropped it).
        # Presumably the base Agent derives the name itself -- confirm before
        # passing it on.
        super().__init__()
        #super().__init__(RUN_IN_THREAD)

    # @override
    def _init(self):
        """Resolve the agent's working folders and create them if missing.

        Sets ``self.mco_folder_abs_path``, ``self.input_folder`` and
        ``self.output_folder`` (all strings ending with "/") and resets
        ``self.processes``, the list of child processes currently running.
        """
        self.processes = []
        config = self.get_config()
        mco_folder = config.get_agent_information(config.unit_name, self.name)["path"]
        self.mco_folder_abs_path = pwd + "/" + mco_folder + "/"
        self.output_folder = self.mco_folder_abs_path + "output/"
        self.input_folder = self.mco_folder_abs_path + "input/"

        # exist_ok avoids the check-then-create race of a separate exists() test.
        for folder in (self.output_folder, self.input_folder):
            os.makedirs(folder, exist_ok=True)

    def do_process_image(self, image_name:str):
        """Run every processing script on one image and wait for completion.

        The scripts are started together (so they run concurrently) with
        ``-i <image path> -o <output folder>``; the method returns once all
        of them have exited.

        Args:
            image_name: File name of the image, relative to the input folder.
        """
        image_path = self.input_folder + image_name
        if not os.path.exists(image_path):
            print(f"Can't find image {image_path}")
            return

        launched = []
        for script in self._PROCESSING_SCRIPTS:
            # Argument list + no shell: paths containing spaces or shell
            # metacharacters are passed verbatim instead of being interpreted.
            cmd = [
                "python3",
                self.mco_folder_abs_path + script,
                "-i", image_path,
                "-o", self.output_folder,
            ]
            cmd_text = " ".join(cmd)
            print(f"Running cmd {cmd_text}")
            try:
                process = subprocess.Popen(cmd)
            except OSError as err:
                # Without a shell a missing interpreter raises here; report it
                # and carry on with the remaining scripts.
                print(f"Can't run cmd {cmd_text}: {err}")
                continue
            launched.append((cmd_text, process))
            self.processes.append(process)

        try:
            for cmd_text, process in launched:
                return_code = process.wait()
                if return_code != 0:
                    print(f"Cmd {cmd_text} exited with status {return_code}")
        finally:
            # Forget the processes of this call so the list does not grow
            # (and get re-waited) with every processed image.
            started = [process for _, process in launched]
            self.processes = [p for p in self.processes if p not in started]

    #@override
    def main_loop_start(self):
        """Print a marker when the main loop starts."""
        print("LOOP START")

    #@override
    def main_loop_end(self):
        """Print a marker when the main loop ends."""
        print("LOOP END")

    # Overrides kept disabled (the inherited implementations are used):
    #
    # # @override
    # def load_config(self):
    #     super().load_config()
    #
    # # @override
    # def update_survey(self):
    #     super().update_survey()
    #
    # # @override
    # def get_next_command(self):
    #     return super().get_next_command()

    # @override
    def do_log(self):
        """Delegate logging to the base class implementation."""
        super().do_log()

    # @override
    def do_things_before_exit(self, stopper_agent_name=None):
        """Print a message just before the agent stops.

        Args:
            stopper_agent_name: Not used by this implementation.
        """
        print("AgentMCO fait quelques trucs à lui avant de stopper...")

"""
=================================================================
    MAIN FUNCTION
=================================================================
"""
if __name__ == "__main__":
    # Command-line entry point: parse the launch options, build the agent
    # from them and run it.
    # The description previously named another agent ("agentSST").
    parser = argparse.ArgumentParser(description='Start an AgentMCO.')
    parser.add_argument(
        "--computer",
        dest="computer",
        help='Launch agent with simulated computer hostname',
        action="store",
    )
    # NOTE(review): "-t" has no help text; presumably it enables test mode --
    # confirm against build_agent before documenting it.
    parser.add_argument("-t", action="store_true")
    args = vars(parser.parse_args())
    agent = build_agent(AgentMCO, args)

    agent.run()