diff --git a/README.md b/README.md index 113ee52..fe68426 100644 --- a/README.md +++ b/README.md @@ -89,10 +89,12 @@ Comment: (AgentB): Finished testing => result is ok - Autres remarques: + - mode DEBUG (2 niveaux pour print) - Nouvelle commande "flush_commands" pour purger les commmandes en attente - routine_process() implemented - Eval command implemented - Timeout géré : si commande pas exécutée en temps raisonnable => la même commande est ré-exéuctée à l'itération suivante + - self.print() pour les agents - Chaque agent a son propre scenario de commandes à envoyer - GROSSE OPTIMISATION : plus besoin du script intermédiaire "start_agent.py" !!! ==> pyros.py lance directement "cd src/agent/ ; python AgentX.py" diff --git a/pyros.py b/pyros.py index 073c34a..16f9dd0 100755 --- a/pyros.py +++ b/pyros.py @@ -410,7 +410,6 @@ def start(agent:str, configfile:str): print("Synthesis of the results:") for process in current_processes: p,agent,returncode = process - print(process) if returncode == 0: printFullTerm(Colors.GREEN, f"Process {agent} executed successfully") # self.addExecuted(self.current_command, command) diff --git a/src/agent/Agent.py b/src/agent/Agent.py index ca7f9db..8aa2a0e 100755 --- a/src/agent/Agent.py +++ b/src/agent/Agent.py @@ -2,6 +2,8 @@ VERSION = "0.5" +DEBUG=True +DEBUG=False """TODO: @@ -44,7 +46,7 @@ sys.path.append("../..") print("Starting with this sys.path", sys.path) # DJANGO setup -# self.print("file is", __file__) +# self.printd("file is", __file__) # mypath = os.getcwd() # Go into src/ ##os.chdir("..") @@ -70,11 +72,12 @@ print() # --- GENERAL PURPOSE IMPORT --- #from __future__ import absolute_import -import time -import threading, multiprocessing +import platform +import random from threading import Thread +import threading, multiprocessing +import time import utils.Logger as L -import random #import ctypes #import copy @@ -87,8 +90,8 @@ from django import db #from django.conf import settings as djangosettings # --- SPECIFIC IMPORT --- -from config.configpyros import ConfigPyros from common.models import AgentSurvey, Command +from config.configpyros import ConfigPyros #from dashboard.views import get_sunelev #from devices.TelescopeRemoteControlDefault import TelescopeRemoteControlDefault #from utils.JDManipulator import * @@ -96,15 +99,50 @@ from common.models import AgentSurvey, Command """ ================================================================= - GENERAL MODULE CONSTANT DEFINITIONS + GENERAL MODULE CONSTANT & FUNCTIONS DEFINITIONS ================================================================= """ -DEBUG_FILE = False +#DEBUG_FILE = False log = L.setupLogger("AgentLogger", "Agent") +IS_WINDOWS = platform.system() == "Windows" +class Colors: + HEADER = "\033[95m" + BLUE = "\033[94m" + GREEN = "\033[92m" + WARNING = "\033[93m" + FAIL = "\033[91m" + ENDC = "\033[0m" + BOLD = "\033[1m" + UNDERLINE = "\033[4m" + +def printColor(color: Colors, message, file=sys.stdout, eol=os.linesep, forced=False): + #system = platform.system() + """ + if (self.disp == False and forced == False): + return 0 + """ + #if system == "Windows": + if IS_WINDOWS: + print(message, file=file, end=eol) + else: + print(color + message + Colors.ENDC, file=file, end=eol) + return 0 + +def printFullTerm(color: Colors, string: str): + #system = platform.system() + columns = 100 + row = 1000 + disp = True + value = int(columns / 2 - len(string) / 2) + printColor(color, "-" * value, eol="") + printColor(color, string, eol="") + value += len(string) + printColor(color, "-" * (columns - value)) + return 0 """ @@ -300,18 +338,18 @@ class Agent: if not config_filename: config_filename = self.DEFAULT_CONFIG_FILE_NAME - self.print(f"config_filename={config_filename}") + self.printd(f"config_filename={config_filename}") # If config file name is RELATIVE (i.e. without path, just the file name) # => give it an absolute path (and remove "src/agent/" from it) if config_filename == os.path.basename(config_filename): tmp = os.path.abspath(self.CONFIG_DIR + os.sep + config_filename) config_filename = os.path.abspath(self.CONFIG_DIR + os.sep + config_filename).replace(os.sep+"src"+os.sep,os.sep).replace(os.sep+"agent"+os.sep,os.sep) - self.print("Config file used is", config_filename) - #self.print("current path", os.getcwd()) - #self.print("this file path :", __file__) - #self.print("config file path is", config_filename) + self.printd("Config file used is", config_filename) + #self.printd("current path", os.getcwd()) + #self.printd("this file path :", __file__) + #self.printd("config file path is", config_filename) # Instantiate an object for configuration - #self.print("config file path is ", config_abs_filename) + #self.printd("config file path is ", config_abs_filename) self.config = ConfigPyros(config_filename) if self.config.get_last_errno() != self.config.NO_ERROR: raise Exception(f"Bad config file name '{config_filename}', error {str(self.config.get_last_errno())}: {str(self.config.get_last_errmsg())}") @@ -328,7 +366,7 @@ class Agent: #if nb_agents == 0: if not AgentSurvey.objects.filter(name=self.name).exists(): self._agent_survey = AgentSurvey.objects.create(name=self.name, validity_duration_sec=60, mode=self.mode, status=self.status) - self.print("Agent survey is", self._agent_survey) + self.printd("Agent survey is", self._agent_survey) #self._agent_survey = AgentSurvey(name=self.name, validity_duration_sec=60, mode=self.mode, status=self.status) #self._agent_survey.save() @@ -336,14 +374,17 @@ class Agent: def __str__(self): return "I am agent " + self.name - #def print(self, msg): + # Normal print def print(self, *args, **kwargs): - #print(f"({self.name}): ", msg) if args: print(f"({self.name}): ", *args, **kwargs) else: print() + # DEBUG print + def printd(self, *args, **kwargs): + if DEBUG: self.print(*args, **kwargs) + def sleep(self, nbsec:float=2.0): # thread if self._current_specific_thread and self.RUN_IN_THREAD: @@ -369,11 +410,18 @@ class Agent: # (old commands that stayed with "running" status when agent was killed) Command.delete_commands_with_running_status_for_agent(self.name) + """ + # SIMULATOR MODE ONLY : flush previous commands to be sure to restart clean + if self.SIMULATOR_MODE: + self.print("flush previous commands to be sure to start in clean state") + Command.delete_pending_commands_for_agent(self.name) + """ + ''' - self.print() - self.print(self) - self.print("FOR REAL ?", self.FOR_REAL) - self.print("DB3 used is:", djangosettings.DATABASES["default"]["NAME"]) + self.printd() + self.printd(self) + self.printd("FOR REAL ?", self.FOR_REAL) + self.printd("DB3 used is:", djangosettings.DATABASES["default"]["NAME"]) # SETUP try: @@ -385,7 +433,7 @@ class Agent: # self.config = Config.objects.get()[0] except Exception as e: # except Config.ObjectDoesNotExist: - self.print("Config read (or write) exception", str(e)) + self.printd("Config read (or write) exception", str(e)) return -1 ''' @@ -393,7 +441,7 @@ class Agent: while True: # Wait a random number of sec before starting new iteration - # (to let another agent having the chance to send a command) + # (to let another agent having the chance to send a command before me) random_waiting_sec = random.randint(0,5) self.print(f"Waiting {random_waiting_sec} sec (random) before starting new iteration...") time.sleep(random_waiting_sec) @@ -402,7 +450,7 @@ class Agent: self.print() self.print() - #self.print("-"*80) + #self.printd("-"*80) self.print("-"*20, f"MAIN LOOP ITERATION {self._iter_num} (START)", "-"*20) self.set_status(self.STATUS_MAIN_LOOP) self.show_mode_and_status() @@ -413,35 +461,34 @@ class Agent: #if self.SIMULATOR_MODE: self.simulator_send_next_command() - # generic cmd in json format - self.print("------START COMMMAND PROCESSING------") - cmd = self.get_next_valid_command() - #if cmd: cmd = self.general_process(cmd) - if cmd: cmd = self.command_process(cmd) - ''' - # Sub-level loop (only if ACTIVE) - if self.is_active(): - if cmd: self.specific_process(cmd) - ''' - self.routine_process() - self.print("------END COMMMAND PROCESSING------") + self.printd("------START COMMMAND PROCESSING------") - # Every N iterations, delete old commands + # Purge commandes (every N iterations, delete old commands) N=3 if ((self._iter_num-1) % N) == 0: self.print("Looking for old commands to purge...") Command.purge_old_commands_for_agent(self.name) + # Get next command to execute + cmd = self.get_next_valid_command() + #if cmd: cmd = self.general_process(cmd) + + # Process this (next) command (if exists) + if cmd: cmd = self.command_process(cmd) + + # ROUTINE process + self.routine_process() + + self.printd("------END COMMMAND PROCESSING------") + #self.waitfor(self.mainloop_waittime) self.print("-"*20, "MAIN LOOP ITERATION (END)", "-"*20) - #self.print("-"*80) - #self.do_log(LOG_DEBUG, "Ending main loop iteration") self._iter_num += 1 - # Exit if max duration is timed out + # Exit if max duration is reached if self.MAX_DURATION_SEC and (time.time()-start_time > self.MAX_DURATION_SEC): self.print("Exit because of max duration set to ", self.MAX_DURATION_SEC, "s") self.kill_running_specific_cmd_if_exists() @@ -466,7 +513,7 @@ class Agent: """ self.set_status(self.STATUS_ROUTINE_PROCESS) if not self.is_active(): - self.print("I am IDLE, so I bypass the routine_process (do not send any new command)") + self.printd("I am IDLE, so I bypass the routine_process (do not send any new command)") return if self.cmdts is None: @@ -476,14 +523,14 @@ class Agent: # For this, it is enough to set primary key to None, # then the send() command below will save a NEW command #self.cmdts = copy.copy(self.cmdts) - self.cmdts.set_as_pending() self.cmdts.id = None # No more command to send (from simulator), return if self.cmdts is None: return - # 1) Send cmd + # 1) Send cmd (= set as pending and save) self.print(f"Send command", self.cmdts) + #self.cmdts.set_as_pending() self.cmdts.send() cmdts_is_processed = False cmdts_res = None @@ -491,7 +538,7 @@ class Agent: # 2) Wait for end of cmd execution #self.wait_for_execution_of_cmd(self.cmdts) while not cmdts_is_processed: - self.print(f"Waiting for end of cmd {self.cmdts.name} execution...") + self.print(f"Waiting for end of cmd '{self.cmdts.name}' execution...") self.cmdts.refresh_from_db() # timeout ? if self.cmdts.is_expired(): break @@ -507,7 +554,7 @@ class Agent: if cmdts_is_processed: self.print(f"Cmd executed. Result is '{cmdts_res}'") else: - self.print("Command was not completed") + self.printd("Command was not completed") """ @@ -519,10 +566,10 @@ class Agent: NB: datetime.utcnow() is equivalent to datetime.now(timezone.utc) ### - self.print("Looking for old commands to purge...") + self.printd("Looking for old commands to purge...") ### COMMAND_PEREMPTION_DATE_FROM_NOW = datetime.utcnow() - timedelta(hours = self.COMMANDS_PEREMPTION_HOURS) - #self.print("peremption date", COMMAND_PEREMPTION_DATE_FROM_NOW) + #self.printd("peremption date", COMMAND_PEREMPTION_DATE_FROM_NOW) old_commands = Command.objects.filter( # only commands for me receiver = self.name, @@ -532,22 +579,24 @@ class Agent: ### old_commands = Command.get_old_commands_for_agent(self.name) if old_commands.exists(): - self.print("Found old commands to delete:") - for cmd in old_commands: self.print(cmd) + self.printd("Found old commands to delete:") + for cmd in old_commands: self.printd(cmd) old_commands.delete() """ def waitfor(self, nbsec): - self.print(f"Now, waiting for {nbsec} seconds...") + self.printd(f"Now, waiting for {nbsec} seconds...") time.sleep(nbsec) def set_status(self, status:str): - self.print(f"[NEW CURRENT STATUS: {status}] (switching from status {self.status})") + #self.print(f"[{status}] (switching from status {self.status})") + self.print(f"[{status}]") self.status = status return False def set_mode(self, mode:str): - self.print(f"Switching from mode {self.mode} to mode {mode}") + #self.print(f"Switching from mode {self.mode} to mode {mode}") + self.print(f"[NEW MODE {mode}]") self.mode = mode def is_active(self): @@ -560,7 +609,7 @@ class Agent: self.set_mode(self.MODE_IDLE) def show_mode_and_status(self): - self.print(f"CURRENT MODE is {self.mode} (with status {self.status})") + self.print(f"CURRENT MODE is {self.mode} (status is {self.status})") def die(self): self.set_status(self.STATUS_EXIT) @@ -608,7 +657,7 @@ class Agent: """ def init(self): - self.print("Initializing...") + self.printd("Initializing...") self.set_status(self.STATUS_INIT) def load_config(self): @@ -616,7 +665,7 @@ class Agent: TODO: only si date fichier xml changée => en RAM, un objet Config avec méthodes d'accès, appelle le parser de AK (classe Config.py indépendante) """ - self.print("Loading the config file...") + self.printd("Loading the config file...") #config_filename = 'c:/srv/develop/pyros/config/config_unit_simulunit1.xml' #config.set_configfile(config_filename) self.config.load() @@ -625,21 +674,21 @@ class Agent: # --- display informations # --- Get all the assembly of this unit[0] (mount + channels) if self.config.is_config_contents_changed(): - self.print("--------- Components of the unit -----------") - self.print("Configuration file is {}".format(self.config.get_configfile())) + self.printd("--------- Components of the unit -----------") + self.printd("Configuration file is {}".format(self.config.get_configfile())) alias = self.config.get_aliases('unit')[0] namevalue = self.config.get_paramvalue(alias,'unit','name') - self.print("Unit alias is {}. Name is {}".format(alias,namevalue), ":") + self.printd("Unit alias is {}. Name is {}".format(alias,namevalue), ":") unit_subtags = self.config.get_unit_subtags() for unit_subtag in unit_subtags: aliases = self.config.get_aliases(unit_subtag) for alias in aliases: namevalue = self.config.get_paramvalue(alias,unit_subtag,'name') - self.print(f"- {unit_subtag} alias is {alias}. Name is {namevalue}") - self.print("------------------------------------------") + self.printd(f"- {unit_subtag} alias is {alias}. Name is {namevalue}") + self.printd("------------------------------------------") #params = self.config.get_params(unit_alias) #for param in params: - # self.print("Unit component is {}".format(param)) + # self.printd("Unit component is {}".format(param)) """ # self.config = Config.objects.get(pk=1) @@ -653,15 +702,15 @@ class Agent: except Exception as e: # except Config.ObjectDoesNotExist: # except Config.DoesNotExist: - self.print("Config read (or write) exception", str(e)) + self.printd("Config read (or write) exception", str(e)) # return self.config # return -1 return False """ def update_survey(self): - self.print("Updating the survey database table...") - #self.print("- fetching table line for agent", self.name) + self.printd("Updating the survey database table...") + #self.printd("- fetching table line for agent", self.name) # only necessary when using process (not necessary with threads) #with transaction.atomic(): self._agent_survey = AgentSurvey.objects.get(name=self.name) @@ -682,17 +731,17 @@ class Agent: Commands are read in chronological order """ self.set_status(self.STATUS_GET_NEXT_COMMAND) - self.print("Looking for new commands from the database ...") + #self.printd("Looking for new commands from the database ...") # 1) Get all pending commands for me (return if None) # Not sure this is necessary to do it in a transaction, # but there might be a risk # that a command status is modified while we are reading... with transaction.atomic(): - self._pending_commands = Command.get_pending_commands_for_agent(self.name) + self._pending_commands = Command.get_pending_and_running_commands_for_agent(self.name) commands = self._pending_commands if not commands.exists(): - self.print("No new command to process") + self.printd("No new command to process") return None self.print("Current pending commands are (time ordered) :") Command.show_commands(commands) @@ -719,16 +768,16 @@ class Agent: """ cmd = commands[0] if cmd.is_running(): - #self.print(f"There is currently a running command ({cmd_executing.first().name}), so I do nothing (wait for end of execution)") - self.print(f"There is currently a running command ({cmd.name})") + #self.printd(f"There is currently a running command ({cmd_executing.first().name}), so I do nothing (wait for end of execution)") + self.printd(f"There is currently a running command ({cmd.name})") """ # Check that this command is not expired if cmd.is_expired(): - self.print("But this command is expired, so set its status to OUTOFDATE, and go on") + self.printd("But this command is expired, so set its status to OUTOFDATE, and go on") cmd_executing.set_as_outofdate() else: """ - self.print(f"Thus, I will do nothing until this command execution is finished") + self.printd(f"Thus, I will do nothing until this command execution is finished") # TODO: kill si superieur a MAX_EXEC_TIME return None @@ -743,10 +792,10 @@ class Agent: # 6) Current cmd must now be a valid (not expired) and PENDING one, # so pass it to general_process() for execution - #self.print(f"Got command {cmd.name} sent by agent {cmd.sender} at {cmd.sender_deposit_time}") - self.print("***") - self.print("*** Got", cmd) - self.print("***") + #self.printd(f"Got command {cmd.name} sent by agent {cmd.sender} at {cmd.sender_deposit_time}") + self.printd("***") + self.printd("*** Got", cmd) + self.printd("***") return cmd @@ -760,32 +809,32 @@ class Agent: def general_process(self, cmd:Command)->Command: self.set_status(self.STATUS_GENERAL_PROCESS) - self.print(f"Starting general processing of {cmd}") + self.print(f"Starting processing of {cmd}") # Update read time to say that the command has been READ cmd.set_read_time() # Precondition: command cmd is valid (not expired), has already been read, is pending assert (not cmd.is_expired()) and cmd.is_pending() and cmd.is_read() - #self.print(f"Starting general processing of command {cmd.name} sent by agent {cmd.sender} at {cmd.sender_deposit_time}") + #self.printd(f"Starting general processing of command {cmd.name} sent by agent {cmd.sender} at {cmd.sender_deposit_time}") """ # 2) If expired command, change its status to expired and return if cmd.is_expired(): - self.print("This command is expired, so mark it as such, and ignore it") + self.printd("This command is expired, so mark it as such, and ignore it") cmd.set_as_outofdate() return None """ # If cmd is generic, execute it, change its status to executed, and return if cmd.is_generic(): - self.print("This command is generic, execute it...") + self.printd("This command is generic, execute it...") self.exec_generic_cmd(cmd) # If cmd is "exit", kill myself (without any question, this is an order soldier !) # This "exit" should normally kill any current thread (to be checked...) if cmd.name == "exit": - self.print("(before exiting) Here are the current (still) pending commands (time ordered) :") - commands = Command.get_pending_commands_for_agent(self.name) + self.printd("(before exiting) Here are the current (still) pending commands (time ordered) :") + commands = Command.get_pending_and_running_commands_for_agent(self.name) Command.show_commands(commands) #if self.SIMULATOR_MODE and self.SIMULATOR_WITH_TEST and self.SIMULATOR_COMMANDS_DEST == "myself": self.simulator_test_results() if self.SIMULATOR_MODE and self.SIMULATOR_WITH_TEST: self.simulator_test_results() @@ -798,19 +847,19 @@ class Agent: # cmd is not generic but, as I am idle, change its status to SKIPPED, ignore it, and return #if self.mode == self.MODE_IDLE: if not self.is_active(): - self.print("This command is not generic but, as I am IDLE, I mark it SKIPPED and ignore it") + self.printd("This command is not generic but, as I am IDLE, I mark it SKIPPED and ignore it") cmd.set_as_skipped() return None # Je suis pas idle et cde pas générique: je la traite pas, elle sera traitée par core_process : # attendre que cette commande soit exécutée avant de passer à la commande suivante (situation “bloquante” normale) - self.print("This command is not generic and, as I am not IDLE, I pass it to the specific processing") - self.print("(then I will not execute any other new command until this command is EXECUTED)") + self.printd("This command is not generic and, as I am not IDLE, I pass it to the specific processing") + self.printd("(then I will not execute any other new command until this command is EXECUTED)") return cmd def exec_generic_cmd(self, cmd:Command): - self.print("Starting execution of a Generic cmd...") + self.printd("Starting execution of a Generic cmd...") cmd.set_as_running() # Executing command @@ -823,17 +872,17 @@ class Agent: cmd.set_result("I am now idle") time.sleep(1) elif cmd.name in ("flush_commands"): - self.print("flush_commands received: Delete all pending commands") + self.printd("flush_commands received: Delete all pending commands") Command.delete_pending_commands_for_agent(self.name) # If cmd is "abort" or "exit", kill any currently running thread elif cmd.name in ("abort", "exit"): - #self.print("Current pending commands are:") + #self.printd("Current pending commands are:") #Command.show_commands(self._pending_commands) - self.print("Aborting current executing command if exists:") + self.printd("Aborting current executing command if exists:") self.kill_running_specific_cmd_if_exists() cmd.set_as_processed() - self.print("...Generic cmd has been executed") + self.printd("...Generic cmd has been executed") @@ -844,7 +893,7 @@ class Agent: - in file - in db """ - self.print("Logging data...") + self.printd("Logging data...") @@ -862,11 +911,11 @@ class Agent: self.set_status(self.STATUS_SPECIFIC_PROCESS) assert self.is_active() self._current_specific_cmd = cmd - self.print("Starting specific process...") + self.printd("Starting specific process...") #self._current_thread = threading.Thread(target=self.exec_command) # Run in a thread if self.RUN_IN_THREAD: - self.print("(run cmd in a thread)") + self.printd("(run cmd in a thread)") self._current_specific_thread = StoppableThreadEvenWhenSleeping(target=self.thread_exec_specific_cmd) #self._current_specific_thread = StoppableThreadEvenWhenSleeping(target=self.exec_specific_cmd, args=(cmd,)) #self._current_thread = threading.Thread(target=self.exec_command) @@ -875,7 +924,7 @@ class Agent: #self._current_specific_thread = thread_with_exception('thread test') # Run in a process else: - self.print("(run cmd in a process)") + self.printd("(run cmd in a process)") # close the database connection first, it will be re-opened in each process db.connections.close_all() self._current_specific_thread = multiprocessing.Process(target=self.thread_exec_specific_cmd) @@ -885,14 +934,14 @@ class Agent: self._current_specific_thread.start() #my_thread.join() #self.waitfor(self.subloop_waittime) - self.print("Ending specific process (thread has been launched)") + self.printd("Ending specific process (thread has been launched)") def kill_running_specific_cmd_if_exists(self): if (self._current_specific_thread is None) or not self._current_specific_thread.is_alive(): - self.print("...No current specific command thread to abort...") + self.printd("...No current specific command thread to abort...") else: - self.print(f"Killing command {self._current_specific_cmd.name}") + self.printd(f"Killing command {self._current_specific_cmd.name}") # Ask the thread to stop itself #self._current_specific_thread.stop() #self._current_specific_thread._stop() @@ -926,7 +975,7 @@ class Agent: #self._current_test_cmd = "go_idle" if self._current_test_cmd=="go_active" else "go_active" #if self._nb_test_cmds == 4: self._current_test_cmd = "exit" cmd_name = next(self.SIMULATOR_COMMANDS, None) - #self.print("next cmd is ", cmd_name) + #self.printd("next cmd is ", cmd_name) if cmd_name is None: return #Command.objects.create(sender=self.name, receiver=self.name, name=cmd_name) receiver_agent = self.name if self.SIMULATOR_COMMANDS_DEST=="myself" else self.SIMULATOR_COMMANDS_DEST @@ -937,12 +986,19 @@ class Agent: """ def simulator_test_results(self): - self.print("\n--- Testing if the commands I SENT had the awaited result") - self.print("Here are the last commands I sent:") + commands = self.simulator_test_results_start() + nb_asserted = self.simulator_test_results_main(commands) + self.simulator_test_results_end(nb_asserted) + + def simulator_test_results_start(self): + self.printd("\n--- Testing if the commands I SENT had the awaited result") + self.printd("Here are the last commands I sent:") #commands = list(Command.get_last_N_commands_for_agent(self.name, 16)) #commands = Command.get_last_N_commands_sent_to_agent(self.name, 16) commands = Command.get_last_N_commands_sent_by_agent(self.name, len(self.SIMULATOR_COMMANDS_LIST)) Command.show_commands(commands) + assert commands[0].name == self.SIMULATOR_COMMANDS_LIST[0] + assert commands[-1].name == self.SIMULATOR_COMMANDS_LIST[-1] return commands """ OLD SCENARIO nb_asserted = 0 @@ -968,9 +1024,24 @@ class Agent: assert cmd.is_executed() nb_asserted+=1 assert nb_asserted == 12 - self.print("--- Finished testing => result is ok") + self.printd("--- Finished testing => result is ok") """ + # To be overriden by subclass + def simulator_test_results_main(self, commands): + nb_asserted = 0 + for cmd in commands: + assert cmd.is_executed() + nb_asserted+=1 + return nb_asserted + + def simulator_test_results_end(self, nb_asserted): + nb_commands_to_send = len(self.SIMULATOR_COMMANDS_LIST) + assert nb_asserted == nb_commands_to_send + #self.print(f"************** Finished testing => result is ok ({nb_asserted} assertions) **************") + printFullTerm(Colors.GREEN, f"************** Finished testing => result is ok ({nb_asserted} assertions) **************") + + """ @@ -997,8 +1068,8 @@ class Agent: cmd = self._current_specific_cmd """ specific command execution setting up """ #cmd = self.get_current_specific_cmd() - self.print(">>>>> Thread: starting execution of command", cmd.name) - self.print(">>>>> Thread: PID: %s, Process Name: %s, Thread Name: %s" % ( + self.printd(">>>>> Thread: starting execution of command", cmd.name) + self.printd(">>>>> Thread: PID: %s, Process Name: %s, Thread Name: %s" % ( os.getpid(), multiprocessing.current_process().name, threading.current_thread().name) @@ -1036,7 +1107,7 @@ class Agent: def thread_exec_specific_cmd_main(self): """ cmd = self._current_specific_cmd - self.print("Doing nothing, just sleeping...") + self.printd("Doing nothing, just sleeping...") self.sleep(3) """ @@ -1079,7 +1150,7 @@ class Agent: with transaction.atomic(): cmd.set_as_processed() """ - self.print(">>>>> Thread: ended execution of command", cmd.name) + self.printd(">>>>> Thread: ended execution of command", cmd.name) cmd = None # No more current thread #self._current_specific_thread = None @@ -1090,9 +1161,9 @@ class Agent: # Exit if I was asked to stop cmd = self._current_specific_cmd if self.RUN_IN_THREAD and threading.current_thread().stopped(): - self.print(f">>>>> Thread (cmd {cmd.name}): I received the stop signal, so I stop (in error)") + self.printd(f">>>>> Thread (cmd {cmd.name}): I received the stop signal, so I stop (in error)") exit(1) - self.print(f">>>>> Thread (cmd {cmd.name}): step #{step}/{self._thread_total_steps_number}") + self.printd(f">>>>> Thread (cmd {cmd.name}): step #{step}/{self._thread_total_steps_number}") # call a specific function to be defined by subclass cmd_step_function(step) # Wait for a specific time (interruptible) @@ -1101,7 +1172,7 @@ class Agent: def thread_stop_if_asked(self): assert self._current_specific_thread is not None if self.RUN_IN_THREAD and threading.current_thread().stopped(): - self.print("(Thread) I received the stop signal, so I stop (in error)") + self.printd("(Thread) I received the stop signal, so I stop (in error)") exit(1) def thread_set_total_steps_number(self, nbsteps): diff --git a/src/agent/AgentA.py b/src/agent/AgentA.py index a3db42c..7b5370b 100755 --- a/src/agent/AgentA.py +++ b/src/agent/AgentA.py @@ -15,7 +15,7 @@ from Agent import Agent class AgentA(Agent): #MAX_DURATION_SEC = None - MAX_DURATION_SEC = 90 + MAX_DURATION_SEC = 120 # FOR TEST ONLY # Run this agent in simulator mode @@ -32,11 +32,12 @@ class AgentA(Agent): "go_active", - # Because of this command, the receiver agent - # will no more send any new command + # Because of this command, the receiver agent : + # - will no more send any new command + # - will only execute "generic" commands (and not the "specific" ones) "go_idle", - # Not executed because receiver agent is now "idle" + # Not executed (skipped) because receiver agent is now "idle" #"specific0", # Because of this command, the receiver agent @@ -176,24 +177,7 @@ class AgentA(Agent): ''' # @override - def simulator_test_results(self): - commands = super().simulator_test_results() - #self.print(commands) - """ - "go_active", - - "go_idle", - # Not executed because receiver agent is now "idle" - #"specific0", - - # Executed because receiver agent is now "active" - "go_active", - #"specific1", - "eval 4+3", - - "go_idle", - "exit", - """ + def simulator_test_results_main(self, commands): nb_asserted = 0 for cmd in commands: if cmd.name == "flush_commands": @@ -207,6 +191,12 @@ class AgentA(Agent): if cmd.name == "go_idle": assert cmd.is_executed() nb_asserted+=1 + """ + if cmd.name == "specific0": + assert cmd.is_skipped() + assert cmd.result == "in step #5/5" + nb_asserted+=1 + """ if cmd.name == "specific1": assert cmd.is_executed() assert cmd.result == "in step #5/5" @@ -221,9 +211,7 @@ class AgentA(Agent): if cmd.name in ("exit"): assert cmd.is_executed() nb_asserted+=1 - #assert nb_asserted == 6 - assert nb_asserted == len(self.SIMULATOR_COMMANDS_LIST) - self.print("************** Finished testing => result is ok **************") + return nb_asserted """ diff --git a/src/agent/AgentB.py b/src/agent/AgentB.py index 4414b77..b0f3030 100755 --- a/src/agent/AgentB.py +++ b/src/agent/AgentB.py @@ -170,17 +170,15 @@ class AgentB(Agent): super().exec_specific_cmd_end(cmd, from_thread) ''' + """ # @override - def simulator_test_results(self): - commands = super().simulator_test_results() + def simulator_test_results_main(self, commands): nb_asserted = 0 for cmd in commands: assert cmd.is_executed() nb_asserted+=1 - #assert nb_asserted == 2 - assert nb_asserted == len(self.SIMULATOR_COMMANDS_LIST) - self.print("************** Finished testing => result is ok **************") - + return nb_asserted + """ """ ================================================================= diff --git a/src/common/models.py b/src/common/models.py index 5cf2f34..42a0cfe 100644 --- a/src/common/models.py +++ b/src/common/models.py @@ -306,14 +306,24 @@ class Command(models.Model): @classmethod def delete_pending_commands_for_agent(cls, agent_name): - print("Delete pending command(s) if exists:") + """ + Delete all pending commands sent to agent_name, + except very recent commands. + This (exception) is to avoid a "data race" where for example agentB is executing a "flush" command + while agentA is sending command to agentB... : + - agentB will then delete the command just sent by agentA + - agentA will check regularly the status of its sent command, and this will crash as this command exists no more !! + """ + print("Delete all pending command(s) if exists (except very recent ones):") + now_minus_2sec = datetime.utcnow().astimezone() - timedelta(seconds = 2) + #print("now_minus_2sec", now_minus_2sec) pending_commands = cls.objects.filter( # only commands for agent agent_name receiver = agent_name, # only running commands receiver_status_code = cls.CMD_STATUS_CODES.CMD_PENDING, - # only not expired commands - #sender_deposit_time__gte = cls.get_peremption_date_from_now(), + # except very recent commands : take only commands that are more than 2 sec old + sender_deposit_time__lt = now_minus_2sec ) if pending_commands: Command.show_commands(pending_commands) @@ -321,7 +331,7 @@ class Command(models.Model): else: print("") @classmethod - def get_pending_commands_for_agent(cls, agent_name): + def get_pending_and_running_commands_for_agent(cls, agent_name): #print("peremption date", COMMAND_PEREMPTION_DATE_FROM_NOW) return cls.objects.filter( # only pending commands @@ -391,6 +401,8 @@ class Command(models.Model): #for cmd in old_commands: print(cmd) cls.show_commands(old_commands) old_commands.delete() + else: + print("") @classmethod #def show_commands(cls, commands:models.query): @@ -409,7 +421,9 @@ class Command(models.Model): # --- BOOLEAN (test) functions --- - def send(self): self.save() + def send(self): + #self.save() + self.set_as_pending() def is_generic(self): """ @@ -465,7 +479,7 @@ class Command(models.Model): def set_as_processed(self): print(f"- Set command {self.name} as processed") - print(self) + #print(self) self.receiver_status_code = self.CMD_STATUS_CODES.CMD_EXECUTED self.receiver_processed_time = datetime.utcnow().astimezone() self.save() -- libgit2 0.21.2