AgentTriton.py 5.05 KB
#!/usr/bin/env python3


import subprocess
import sys
import os
##import utils.Logger as L

pwd = os.environ['PROJECT_ROOT_PATH']
if pwd not in sys.path:
    sys.path.append(pwd)

##from .Agent import Agent
from src.core.pyros_django.agent.Agent import Agent, build_agent

##log = L.setupLogger("AgentXTaskLogger", "AgentX")



class AgentTriton(Agent):

    # FOR TEST ONLY
    # Run this agent in simulator mode
    TEST_MODE = False
    # Run the assertion tests at the end
    TEST_WITH_FINAL_TEST = False
    #TEST_MAX_DURATION_SEC = None
    TEST_MAX_DURATION_SEC = 100
    # Who should I send commands to ?
    TEST_COMMANDS_DEST = "myself"
    #TEST_COMMANDS_DEST = "AgentB"
    # Scenario to be executed

    _AGENT_SPECIFIC_COMMANDS = [
        ("do_process_image",10,0),
    ]

    _TEST_COMMANDS_LIST = [
        ("self do_process_image i.fit", 200, None, "CMD_EXECUTED"),
        ("self do_exit", 500, "STOPPING", "CMD_EXECUTED"),
    ]
    """
    =================================================================
        FUNCTIONS RUN INSIDE MAIN THREAD
    =================================================================
    """

    # @Override
    '''
    #def __init__(self, name:str=None, config_filename=None, RUN_IN_THREAD=True):
    def __init__(self, config_filename=None):
        ##if name is None: name = self.__class__.__name__
        #super().__init__(name, config_filename, RUN_IN_THREAD)
        super().__init__(config_filename)
        #self._log.print(f"init done for {name}")
        self._log.print("init done")
    '''
    def __init__(self, name:str=None):
        if name is None:
            name = self.__class__.__name__
        super().__init__()
        #super().__init__(RUN_IN_THREAD)        


    def shell_source(self,script):
        """
        Sometime you want to emulate the action of "source" in bash,
        settings some environment variables. Here is a way to do it.
        """
        
        pipe = subprocess.Popen(". %s && env -0" % script, stdout=subprocess.PIPE, shell=True)
        output = pipe.communicate()[0].decode('utf-8')
        output = output[:-1] # fix for index out for range in 'env[ line[0] ] = line[1]'

        env = {}
        # split using null char
        for line in output.split('\x00'):
            line = line.split( '=', 1)
            # print(line)
            env[ line[0] ] = line[1]

        os.environ.update(env)

    # @Override
    def _init(self):
        triton_folder = self.get_config().get_agent_information(self.get_config().unit_name,self.name)["path"]
        triton_folder_abs_path = pwd + "/" + triton_folder + "/"
        self.triton_folder_abs_path = triton_folder_abs_path

        if not os.path.exists(triton_folder_abs_path+"output/"):
            os.makedirs(triton_folder_abs_path+"output/")
        if not os.path.exists(triton_folder_abs_path+"input/"):
            os.makedirs(triton_folder_abs_path+"input/")
        self.output_folder = triton_folder_abs_path+"output/"
        self.input_folder = triton_folder_abs_path+"input/"
        self.shell_source(triton_folder_abs_path+'triton/Triton_ubuntu_20.04/Triton_3.1.2/conf/configure.sh')
        #main_cmd = triton_folder_abs_path + "triton/Triton_ubuntu_20.04/Triton_3.1.2/bin/triton -i " + triton_folder_abs_path + "input/" + " -s " + triton_folder_abs_path + "output/"
        #subprocess.Popen(main_cmd)
        # --- Set the mode according the startmode value
        ##agent_alias = self.__class__.__name__
        ##self.set_mode_from_config(agent_alias)

    def do_process_image(self, image_name:str):
        image_path = self.input_folder + image_name
        output_file_name = image_name.split(".")[0] + ".txt"
        output_file_path = self.output_folder + output_file_name
        if os.path.exists(image_path):
            main_cmd = self.triton_folder_abs_path + "triton/Triton_ubuntu_20.04/Triton_3.1.2/bin/triton -i " + image_path + " -s " + output_file_path

            subprocess.Popen(main_cmd,shell=True)
        else:  
            print(f"Can't find image {image_path}")
    #@override
    def _main_loop_start(self):
        print("LOOP START")

    #@override
    def _main_loop_end(self):
        print("LOOP END");

    '''
    # @override
    def load_config(self):
        super().load_config()
    '''

    '''
    # @override
    def update_survey(self):
        super().update_survey()
    '''

    '''
    # @override
    def get_next_command(self):
        return super().get_next_command()
    '''

    # @override
    def do_log(self):
        super().do_log()

    # @override
    def _do_things_before_exit(self, stopper_agent_name=None):
        print("AgentBasic fait quelques trucs à lui avant de stopper...")

"""
=================================================================
    MAIN FUNCTION
=================================================================
"""
if __name__ == "__main__":

    agent = build_agent(AgentTriton)

    '''
    TEST_MODE, configfile = extract_parameters()
    #agent = AgentX()
    agent = AgentA("AgentA", configfile, RUN_IN_THREAD)
    agent.setSimulatorMode(TEST_MODE)
    print(agent)
    '''
    agent.run()