#!/usr/bin/env python3 import sys ##import utils.Logger as L ##from .Agent import Agent ##sys.path.append("..") ###from agent.Agent import Agent, build_agent sys.path.append("../../../..") from src.core.pyros_django.agent.Agent import Agent, build_agent ##log = L.setupLogger("AgentXTaskLogger", "AgentX") class AgentBasic(Agent): # FOR TEST ONLY # Run this agent in simulator mode TEST_MODE = False # Run the assertion tests at the end TEST_WITH_FINAL_TEST = False #TEST_MAX_DURATION_SEC = None TEST_MAX_DURATION_SEC = 100 # Who should I send commands to ? TEST_COMMANDS_DEST = "myself" #TEST_COMMANDS_DEST = "AgentB" # Scenario to be executed TEST_COMMANDS_LIST = [ # Format : ("self cmd_name cmd_args", timeout, "expected_result", expected_status), #("self do_stop now", 200, '', Agent.CMD_STATUS.CMD_EXECUTED), ("self do_specific1 2 2 3.5 titi (3,'titi',5) [1,3,5,7,9]", 200, '16.5', None), # 1) First, 3 EXCEPTION CASES (uncomment to activate exception) # Each of these lines will stop execution with an exception # ------------------------------------------------------------ # - Agent command, unknown => ko, UnknownCmdException #("self do_unknown", 10, None,None), # - Agent general command malformed (missing arg) => ko, AgentCmdBadArgsException #("Agent set_mode", 10, None,None), # - Agent specific command, known but not implemented => ko, AgentCmdUnimplementedException #("self set_specific2", 10, None,None), # - Agent specific command, implemented but missing args => ko, AgentCmdBadArgsException #(" self do_specific1 1 ", 10, None,None), # 2) NORMAL CASES (test scenario) # All these commands should be executed without error, from the 1st to the last one # ------------------------------- # This command has a validity of 0s and thus should be tagged as "expired" ("self set_mode ATTENTIVE", 0, "MODE is ATTENTIVE", Agent.CMD_STATUS.CMD_EXPIRED), # ("self set_mode ATTENTIVE", 0, "MODE = ATTENTIVE", AgentCmd.CMD_STATUS_CODES), # Agent general command ("self set_mode ROUTINE", 200, "MODE is ROUTINE", Agent.CMD_STATUS.CMD_EXECUTED), ("self get_mode", 200, "MODE is ROUTINE", Agent.CMD_STATUS.CMD_EXECUTED), ("self set_mode ATTENTIVE", 200, "MODE is ATTENTIVE", Agent.CMD_STATUS.CMD_EXECUTED), ("self get_mode", 200, "MODE is ATTENTIVE", Agent.CMD_STATUS.CMD_EXECUTED), # End test ("self do_stop asap", 200, "STOPPING", Agent.CMD_STATUS.CMD_EXECUTED), ] """ ================================================================= FUNCTIONS RUN INSIDE MAIN THREAD ================================================================= """ # @override ''' #def __init__(self, name:str=None, config_filename=None, RUN_IN_THREAD=True): def __init__(self, config_filename=None): ##if name is None: name = self.__class__.__name__ #super().__init__(name, config_filename, RUN_IN_THREAD) super().__init__(config_filename) #self._log.print(f"init done for {name}") self._log.print("init done") ''' def __init__(self, name:str=None): if name is None: name = self.__class__.__name__ super().__init__() #super().__init__(RUN_IN_THREAD) # @override def init(self): #TODO: a faire super().init() # --- Set the mode according the startmode value ##agent_alias = self.__class__.__name__ ##self.set_mode_from_config(agent_alias) #@override def main_loop_start(self): print("LOOP START") #@override def main_loop_end(self): print("LOOP END"); ''' # @override def load_config(self): super().load_config() ''' ''' # @override def update_survey(self): super().update_survey() ''' ''' # @override def get_next_command(self): return super().get_next_command() ''' # @override def do_log(self): super().do_log() # @override def do_things_before_exit(self, stopper_agent_name=None): print("AgentBasic fait quelques trucs à lui avant de stopper...") """ ================================================================= FUNCTIONS RUN INSIDE A SUB-THREAD (OR A PROCESS) (thread_*()) ================================================================= """ # Define your own command step(s) here def cmd_step1(self, step:int): cmd = self._current_specific_cmd cmd.result = f"in step #{step}/{self._thread_total_steps_number}" cmd.save() """ if self.RUN_IN_THREAD: print("(save from thread)") cmd.save() else: #@transaction.atomic print("(save from process)") with transaction.atomic(): cmd.save() #Command.objects.select_for_update() """ def cmd_step2(self, step:int): self.cmd_step1(step) def cmd_step3(self, step:int): self.cmd_step1(step) def cmd_step4(self, step:int): self.cmd_step1(step) """ # @override def thread_exec_specific_cmd_step(self, step:int, sleep_time:float=1.0): self.thread_stop_if_asked() cmd = self._current_specific_cmd print(f">>>>> Thread (cmd {cmd.name}): step #{step}/5") self.sleep(sleep_time) """ ''' # @override def exec_specific_cmd_start(self, cmd:Command, from_thread=True): super().exec_specific_cmd_start(cmd, from_thread) ''' # @override def simulator_test_results_main(self, commands): nb_asserted = 0 for cmd in commands: if cmd.name == "flush_commands": assert cmd.is_executed() nb_asserted+=1 # 2 times if cmd.name == "go_active": assert cmd.is_executed() nb_asserted+=1 # 2 times if cmd.name == "go_idle": assert cmd.is_executed() nb_asserted+=1 """ if cmd.name == "specific0": assert cmd.is_skipped() assert cmd.result == "in step #5/5" nb_asserted+=1 """ if cmd.name == "specific1": assert cmd.is_killed() nb_asserted+=1 if cmd.name == "specific2": assert cmd.is_executed() assert cmd.result == "in step #5/5" nb_asserted+=1 if cmd.name == "eval 4+3": assert cmd.is_executed() assert cmd.get_result() == "7" nb_asserted+=1 if cmd.name in ("abort"): assert cmd.is_executed() nb_asserted+=1 if cmd.name in ("exit"): assert cmd.is_executed() nb_asserted+=1 return nb_asserted """ ================================================================= MAIN FUNCTION ================================================================= """ if __name__ == "__main__": agent = build_agent(AgentBasic) ''' TEST_MODE, configfile = extract_parameters() #agent = AgentX() agent = AgentA("AgentA", configfile, RUN_IN_THREAD) agent.setSimulatorMode(TEST_MODE) print(agent) ''' agent.run()