AgentBasic.py 7.42 KB
#!/usr/bin/env python3


import sys
##import utils.Logger as L

##from .Agent import Agent
##sys.path.append("..")
###from agent.Agent import Agent, build_agent
sys.path.append("../../../..")
from src.core.pyros_django.agent.Agent import Agent, build_agent

##log = L.setupLogger("AgentXTaskLogger", "AgentX")



class AgentBasic(Agent):

    # FOR TEST ONLY
    # Run this agent in simulator mode
    TEST_MODE = False
    # Run the assertion tests at the end
    TEST_WITH_FINAL_TEST = False
    #TEST_MAX_DURATION_SEC = None
    TEST_MAX_DURATION_SEC = 100
    # Who should I send commands to ?
    TEST_COMMANDS_DEST = "myself"
    #TEST_COMMANDS_DEST = "AgentB"
    # Scenario to be executed


    TEST_COMMANDS_LIST = [

        # Format : ("self cmd_name cmd_args", timeout, "expected_result", expected_status),
        #("self do_stop now", 200, '', Agent.CMD_STATUS.CMD_EXECUTED),
        ("self do_specific1 2 2 3.5 titi (3,'titi',5) [1,3,5,7,9]", 200, '16.5', None),


        # 1) First, 3 EXCEPTION CASES (uncomment to activate exception)
        # Each of these lines will stop execution with an exception
        # ------------------------------------------------------------

        # - Agent command, unknown => ko, UnknownCmdException
        #("self do_unknown", 10, None,None),

        # - Agent general command malformed (missing arg) => ko, AgentCmdBadArgsException
        #("Agent set_mode", 10, None,None),
        
        # - Agent specific command, known but not implemented => ko, AgentCmdUnimplementedException
        #("self set_specific2", 10, None,None),

        # - Agent specific command, implemented but missing args => ko, AgentCmdBadArgsException
        #("    self      do_specific1    1   ", 10, None,None),


        # 2) NORMAL CASES (test scenario)
        # All these commands should be executed without error, from the 1st to the last one
        # -------------------------------

        # This command has a validity of 0s and thus should be tagged as "expired"
        ("self set_mode ATTENTIVE", 0, "MODE is ATTENTIVE", Agent.CMD_STATUS.CMD_EXPIRED),
       # ("self set_mode ATTENTIVE", 0, "MODE = ATTENTIVE", AgentCmd.CMD_STATUS_CODES),

        # Agent general command
        ("self set_mode ROUTINE", 200, "MODE is ROUTINE", Agent.CMD_STATUS.CMD_EXECUTED),
        ("self get_mode", 200, "MODE is ROUTINE", Agent.CMD_STATUS.CMD_EXECUTED),
        ("self set_mode ATTENTIVE", 200, "MODE is ATTENTIVE", Agent.CMD_STATUS.CMD_EXECUTED),
        ("self get_mode", 200, "MODE is ATTENTIVE", Agent.CMD_STATUS.CMD_EXECUTED),
        
        # End test
        ("self do_stop asap", 200, None, Agent.CMD_STATUS.CMD_EXECUTED),
    ]


    """
    =================================================================
        FUNCTIONS RUN INSIDE MAIN THREAD
    =================================================================
    """

    # @override
    '''
    #def __init__(self, name:str=None, config_filename=None, RUN_IN_THREAD=True):
    def __init__(self, config_filename=None):
        ##if name is None: name = self.__class__.__name__
        #super().__init__(name, config_filename, RUN_IN_THREAD)
        super().__init__(config_filename)
        #self._log.print(f"init done for {name}")
        self._log.print("init done")
    '''
    def __init__(self, name:str=None):
        if name is None:
            name = self.__class__.__name__
        super().__init__()
        #super().__init__(RUN_IN_THREAD)        

    # @override
    def init(self):
        #TODO: a faire
        super().init()
        # --- Set the mode according the startmode value
        ##agent_alias = self.__class__.__name__
        ##self.set_mode_from_config(agent_alias)

    #@override
    def main_loop_start(self):
        print("LOOP START")

    #@override
    def main_loop_end(self):
        print("LOOP END");

    '''
    # @override
    def load_config(self):
        super().load_config()
    '''

    '''
    # @override
    def update_survey(self):
        super().update_survey()
    '''

    '''
    # @override
    def get_next_command(self):
        return super().get_next_command()
    '''

    # @override
    def do_log(self):
        super().do_log()

    # @override
    def do_things_before_exit(self, stopper_agent_name=None):
        print("AgentBasic fait quelques trucs à lui avant de stopper...")


    """
    =================================================================
        FUNCTIONS RUN INSIDE A SUB-THREAD (OR A PROCESS) (thread_*())
    =================================================================
    """

    # Define your own command step(s) here
    def cmd_step1(self, step:int):
        cmd = self._current_specific_cmd
        cmd.result = f"in step #{step}/{self._thread_total_steps_number}"
        cmd.save()
        """
        if self.RUN_IN_THREAD:
            print("(save from thread)")
            cmd.save()
        else:
            #@transaction.atomic
            print("(save from process)")
            with transaction.atomic():
                cmd.save()
                #Command.objects.select_for_update()
        """

    def cmd_step2(self, step:int):
        self.cmd_step1(step)
    def cmd_step3(self, step:int):
        self.cmd_step1(step)
    def cmd_step4(self, step:int):
        self.cmd_step1(step)

    """
    # @override
    def thread_exec_specific_cmd_step(self, step:int, sleep_time:float=1.0):
        self.thread_stop_if_asked()
        cmd = self._current_specific_cmd
        print(f">>>>> Thread (cmd {cmd.name}): step #{step}/5")
        self.sleep(sleep_time)
    """

    '''
    # @override
    def exec_specific_cmd_start(self, cmd:Command, from_thread=True):
        super().exec_specific_cmd_start(cmd, from_thread)
    '''



    # @override
    def simulator_test_results_main(self, commands):
        nb_asserted = 0
        for cmd in commands:
            if cmd.name == "flush_commands":
                assert cmd.is_executed()
                nb_asserted+=1
            # 2 times
            if cmd.name == "go_active":
                assert cmd.is_executed()
                nb_asserted+=1
            # 2 times
            if cmd.name == "go_idle":
                assert cmd.is_executed()
                nb_asserted+=1
            """
            if cmd.name == "specific0":
                assert cmd.is_skipped()
                assert cmd.result == "in step #5/5"
                nb_asserted+=1
            """
            if cmd.name == "specific1":
                assert cmd.is_killed()
                nb_asserted+=1
            if cmd.name == "specific2":
                assert cmd.is_executed()
                assert cmd.result == "in step #5/5"
                nb_asserted+=1
            if cmd.name == "eval 4+3":
                assert cmd.is_executed()
                assert cmd.get_result() == "7"
                nb_asserted+=1
            if cmd.name in ("abort"):
                assert cmd.is_executed()
                nb_asserted+=1
            if cmd.name in ("exit"):
                assert cmd.is_executed()
                nb_asserted+=1
        return nb_asserted


"""
=================================================================
    MAIN FUNCTION
=================================================================
"""
if __name__ == "__main__":

    agent = build_agent(AgentBasic)

    '''
    TEST_MODE, configfile = extract_parameters()
    #agent = AgentX()
    agent = AgentA("AgentA", configfile, RUN_IN_THREAD)
    agent.setSimulatorMode(TEST_MODE)
    print(agent)
    '''
    agent.run()