Commit 31e31f3b16edcd4460619064fafdb93a311ed882
1 parent
7d679ea0
Exists in
dev
pyros.py peut lancer plusieurs agents (A et B) en même temps...
- Mode opératoire : - pour lancer agentA seulement : ./pyros.py start agentA [-c configfile] - pour lancer plusieurs agents : ./pyros.py start agentA,agentB,... [-c configfile] (ou encore: activer l'environnement virtuel, puis lancer "./AgentA.py configfile") - pour utiliser thread ou processus : il suffit de mettre la constante RUN_IN_THREAD de AgentA (ou AgentB ou AgentX) à False ou True - Scenario de test : - lancer agents A et B : ./pyros.py start agentA,agentB - attendre 1mn et attendre les 2 résultats suivants: (AgentA): Finished testing => result is ok (AgentB): Finished testing => result is ok - Autres remarques: - Nouvelle commande "flush_commands" pour purger les commmandes en attente - routine_process() implemented - Eval command implemented - Timeout géré : si commande pas exécutée en temps raisonnable => la même commande est ré-exéuctée à l'itération suivante - Chaque agent a son propre scenario de commandes à envoyer - GROSSE OPTIMISATION : plus besoin du script intermédiaire "start_agent.py" !!! ==> pyros.py lance directement "cd src/agent/ ; python AgentX.py"
Showing
7 changed files
with
349 additions
and
205 deletions
Show diff stats
README.md
... | ... | @@ -74,18 +74,28 @@ Author: E. Pallier |
74 | 74 | VERSION: 0.20.22 |
75 | 75 | |
76 | 76 | Comment: |
77 | - Multi-agents (2 agents) : AgentA (sender) sends commands to AgentB (receiver, and sender too) | |
78 | - | |
79 | - - Eval command | |
80 | - - Timeout géré si commande pas exécutée en temps raisonnable : la même commande est ré-exéuctée à l'itération suivante | |
81 | - - AgentA (et AgentB) a son propre scenario de commandes à envoyer à AgentB | |
82 | - - Mode opératoire pour lancer un agent: | |
83 | - - pour démarrer agentA : ./pyros.py start agentA [-c configfile] | |
77 | + pyros.py peut lancer plusieurs agents (A et B) en même temps (+ flush_commands + test) | |
78 | + | |
79 | + - Mode opératoire : | |
80 | + - pour lancer agentA seulement : ./pyros.py start agentA [-c configfile] | |
81 | + - pour lancer plusieurs agents : ./pyros.py start agentA,agentB,... [-c configfile] | |
84 | 82 | (ou encore: activer l'environnement virtuel, puis lancer "./AgentA.py configfile") |
85 | - - pour démarrer agentB : ouvrir un autre terminal et taper "./pyros.py start agentB" | |
86 | 83 | - pour utiliser thread ou processus : il suffit de mettre la constante RUN_IN_THREAD de AgentA (ou AgentB ou AgentX) à False ou True |
87 | - - GROSSE OPTIMISATION : plus besoin du script intermédiaire "start_agent.py" !!! | |
88 | - ==> pyros.py lance directement "cd src/agent/ ; python AgentX.py" | |
84 | + | |
85 | + - Scenario de test : | |
86 | + - lancer agents A et B : ./pyros.py start agentA,agentB | |
87 | + - attendre 1mn et attendre les 2 résultats suivants: | |
88 | + (AgentA): Finished testing => result is ok | |
89 | + (AgentB): Finished testing => result is ok | |
90 | + | |
91 | + - Autres remarques: | |
92 | + - Nouvelle commande "flush_commands" pour purger les commmandes en attente | |
93 | + - routine_process() implemented | |
94 | + - Eval command implemented | |
95 | + - Timeout géré : si commande pas exécutée en temps raisonnable => la même commande est ré-exéuctée à l'itération suivante | |
96 | + - Chaque agent a son propre scenario de commandes à envoyer | |
97 | + - GROSSE OPTIMISATION : plus besoin du script intermédiaire "start_agent.py" !!! | |
98 | + ==> pyros.py lance directement "cd src/agent/ ; python AgentX.py" | |
89 | 99 | |
90 | 100 | -------------------------------------------------------------------------------------------- |
91 | 101 | - TECHNICAL DOC: tinyurl.com/pyros-doc | ... | ... |
pyros.py
... | ... | @@ -123,6 +123,10 @@ def execProcess(command, from_venv=False, is_async=False): |
123 | 123 | printFullTerm(Colors.BLUE, "Executing command" + " [" + command + "]" + from_venv_str) |
124 | 124 | if from_venv: command = VENV_BIN+' ' + command |
125 | 125 | process = subprocess.Popen(command, shell=True) |
126 | + if is_async: | |
127 | + #current_processes.append(process) | |
128 | + return process | |
129 | + # Not is_async => Wait for end of execution | |
126 | 130 | process.wait() |
127 | 131 | if process.returncode == 0: |
128 | 132 | printFullTerm(Colors.GREEN, "Process executed successfully") |
... | ... | @@ -133,11 +137,14 @@ def execProcess(command, from_venv=False, is_async=False): |
133 | 137 | #return process.returncode |
134 | 138 | return True if process.returncode==0 else False |
135 | 139 | |
136 | -def execProcessFromVenv(command:str): | |
137 | - return execProcess(command, from_venv=True) | |
140 | +def execProcessFromVenv(command:str, is_async=False): | |
141 | + #return execProcess(command, from_venv=True, is_async) | |
142 | + return execProcess(command, True, is_async) | |
138 | 143 | |
139 | 144 | #TODO: fusionner dans execProcess avec param is_async |
140 | 145 | def execProcessFromVenvAsync(command:str): |
146 | + return execProcessFromVenv(command, True) | |
147 | + """ | |
141 | 148 | args = command.split() |
142 | 149 | printFullTerm( |
143 | 150 | Colors.BLUE, "Executing command from venv [" + str(" ".join(args[1:])) + "]" |
... | ... | @@ -148,7 +155,7 @@ def execProcessFromVenvAsync(command:str): |
148 | 155 | # self.addExecuted(self.current_command, str(' '.join(args[1:]))) |
149 | 156 | # p.wait() |
150 | 157 | return p |
151 | - | |
158 | + """ | |
152 | 159 | |
153 | 160 | def printColor(color: Colors, message, file=sys.stdout, eol=os.linesep, forced=False): |
154 | 161 | #system = platform.system() |
... | ... | @@ -336,23 +343,16 @@ def start(agent:str, configfile:str): |
336 | 343 | if not _check_agent(a): return |
337 | 344 | print("Agents are:", agents) |
338 | 345 | # 1 agent only |
339 | - elif not _check_agent(agent): return | |
340 | - # VENV_BIN = 'private/venv_py3_pyros' + os.sep + self.b_in_dir + os.sep + self.bin_name | |
341 | - | |
342 | - | |
343 | - # Start Agents | |
344 | - """ | |
345 | - if agent=="majordome" or agent=="all": | |
346 | - from majordome.tasks import Majordome | |
347 | - Majordome().run() | |
348 | - if agent=="alert_manager" or agent=="all": | |
349 | - from alert_manager.tasks import AlertListener | |
350 | - AlertListener().run() | |
351 | - """ | |
346 | + else: | |
347 | + agents = [agent] | |
348 | + if not _check_agent(agent): return | |
349 | + | |
350 | + # Start Agents (processes) | |
351 | + current_processes = [] | |
352 | 352 | for agent_name,agent_folder in AGENTS.items(): |
353 | 353 | |
354 | - if agent in ("all", agent_name) : | |
355 | - | |
354 | + #if agent in ("all", agent_name) : | |
355 | + if agent=="all" or agent_name in agents: | |
356 | 356 | # Default case, launch agentX |
357 | 357 | #if agent_name == "agentX": |
358 | 358 | |
... | ... | @@ -362,6 +362,8 @@ def start(agent:str, configfile:str): |
362 | 362 | #if not test_mode(): execProcess(VENV_BIN + " manage.py runserver") |
363 | 363 | #if not test_mode(): execProcessFromVenv("start_agent_" + agent_name + ".py " + configfile) |
364 | 364 | |
365 | + current_dir = os.getcwd() | |
366 | + | |
365 | 367 | # OLD format agents: majordome, monitoring, alert... |
366 | 368 | cmd = "start_agent.py " + agent_name + " " + configfile |
367 | 369 | |
... | ... | @@ -379,39 +381,29 @@ def start(agent:str, configfile:str): |
379 | 381 | #cmd = "-m AgentX" |
380 | 382 | cmd = f" Agent{agent_name[5:]}.py {configfile}" |
381 | 383 | |
382 | - if not test_mode(): execProcessFromVenv(cmd) | |
384 | + #if not test_mode(): execProcessFromVenv(cmd) | |
385 | + if not test_mode(): current_processes.append( (execProcessFromVenvAsync(cmd), agent_name) ) | |
383 | 386 | # self._change_dir("..") |
387 | + os.chdir(current_dir) | |
384 | 388 | |
385 | - ''' | |
386 | - # Any other agent | |
387 | - else: | |
388 | - # Go into src/ | |
389 | - # self._change_dir("src") | |
390 | - os.chdir("src") | |
391 | - # print("Current directory : " + str(os.getcwd())) | |
392 | - if agent_name != "webserver": os.chdir(agent_folder) | |
393 | - # self.execProcessFromVenvAsync(self.VENV_BIN + ' start_agent_'+agent+'.py') | |
394 | - #print(VENV_BIN) | |
395 | - print("Launching agent", agent_name, "...") | |
396 | - #if not test_mode(): execProcess(VENV_BIN + " start_agent_" + agent_name + ".py") | |
397 | - #TODO: | |
398 | - # - start_agent agent_name (1 script unique) | |
399 | - # - start_agent -c configfile | |
400 | - cmd = "start_agent_" + agent_name + ".py " + configfile | |
401 | - # Django default dev web server | |
402 | - if agent_name == "webserver": cmd = "manage.py runserver" | |
403 | - if not test_mode(): execProcessFromVenv(cmd) | |
404 | - # Go back to src/ | |
405 | - # self._change_dir('..') | |
406 | - os.chdir("..") | |
407 | - ''' | |
408 | - | |
409 | 389 | # Go back to root folder (/) |
410 | 390 | # self._change_dir('..') |
411 | - os.chdir("..") | |
412 | - return True | |
413 | - | |
414 | - | |
391 | + #os.chdir("..") | |
392 | + # Wait for end of each process execution | |
393 | + for (p,agent) in current_processes: | |
394 | + print(f"************ Waiting for end of execution of agent {agent} ************") | |
395 | + p.wait() | |
396 | + print(f"************ END of execution of agent {agent} ************") | |
397 | + if p.returncode == 0: | |
398 | + printFullTerm(Colors.GREEN, f"Process {agent} executed successfully") | |
399 | + # self.addExecuted(self.current_command, command) | |
400 | + else: | |
401 | + printFullTerm(Colors.WARNING, f"Process {agent} execution failed") | |
402 | + # self.addError(self.current_command, command) | |
403 | + | |
404 | + #print("************ end of START() ************") | |
405 | + # Only according to the last process status: | |
406 | + return True if p.returncode==0 else False | |
415 | 407 | |
416 | 408 | |
417 | 409 | @pyros_launcher.command(help="Kill an agent") | ... | ... |
src/agent/Agent.py
... | ... | @@ -44,7 +44,7 @@ sys.path.append("../..") |
44 | 44 | print("Starting with this sys.path", sys.path) |
45 | 45 | |
46 | 46 | # DJANGO setup |
47 | -# print("file is", __file__) | |
47 | +# self.print("file is", __file__) | |
48 | 48 | # mypath = os.getcwd() |
49 | 49 | # Go into src/ |
50 | 50 | ##os.chdir("..") |
... | ... | @@ -160,7 +160,7 @@ class Agent: |
160 | 160 | SIMULATOR_COMMANDS_DEST = "myself" |
161 | 161 | # Default scenario to be executed |
162 | 162 | #SIMULATOR_COMMANDS = iter([ |
163 | - SIMULATOR_COMMANDS = [ | |
163 | + SIMULATOR_COMMANDS_LIST = [ | |
164 | 164 | "go_active", |
165 | 165 | "go_idle", |
166 | 166 | |
... | ... | @@ -210,6 +210,8 @@ class Agent: |
210 | 210 | "go_active", |
211 | 211 | "specific10" |
212 | 212 | ] |
213 | + #SIMULATOR_COMMANDS = iter(SIMULATOR_COMMANDS_LIST) | |
214 | + | |
213 | 215 | |
214 | 216 | |
215 | 217 | """ |
... | ... | @@ -290,25 +292,25 @@ class Agent: |
290 | 292 | |
291 | 293 | def __init__(self, name:str="Agent", config_filename:str=None, RUN_IN_THREAD=True): |
292 | 294 | self.name = name |
293 | - self.SIMULATOR_COMMANDS = iter(self.SIMULATOR_COMMANDS) | |
295 | + self.SIMULATOR_COMMANDS = iter(self.SIMULATOR_COMMANDS_LIST) | |
294 | 296 | self.RUN_IN_THREAD = RUN_IN_THREAD |
295 | 297 | self.set_status(self.STATUS_LAUNCH) |
296 | 298 | self.set_mode(self.MODE_IDLE) |
297 | 299 | if not config_filename: |
298 | 300 | config_filename = self.DEFAULT_CONFIG_FILE_NAME |
299 | - | |
300 | - print(f"config_filename={config_filename}") | |
301 | + | |
302 | + self.print(f"config_filename={config_filename}") | |
301 | 303 | # If config file name is RELATIVE (i.e. without path, just the file name) |
302 | 304 | # => give it an absolute path (and remove "src/agent/" from it) |
303 | 305 | if config_filename == os.path.basename(config_filename): |
304 | 306 | tmp = os.path.abspath(self.CONFIG_DIR + os.sep + config_filename) |
305 | 307 | config_filename = os.path.abspath(self.CONFIG_DIR + os.sep + config_filename).replace(os.sep+"src"+os.sep,os.sep).replace(os.sep+"agent"+os.sep,os.sep) |
306 | - print("Config file used is", config_filename) | |
307 | - #print("current path", os.getcwd()) | |
308 | - #print("this file path :", __file__) | |
309 | - #print("config file path is", config_filename) | |
308 | + self.print("Config file used is", config_filename) | |
309 | + #self.print("current path", os.getcwd()) | |
310 | + #self.print("this file path :", __file__) | |
311 | + #self.print("config file path is", config_filename) | |
310 | 312 | # Instantiate an object for configuration |
311 | - #print("config file path is ", config_abs_filename) | |
313 | + #self.print("config file path is ", config_abs_filename) | |
312 | 314 | self.config = ConfigPyros(config_filename) |
313 | 315 | if self.config.get_last_errno() != self.config.NO_ERROR: |
314 | 316 | raise Exception(f"Bad config file name '{config_filename}', error {str(self.config.get_last_errno())}: {str(self.config.get_last_errmsg())}") |
... | ... | @@ -324,7 +326,7 @@ class Agent: |
324 | 326 | #if nb_agents == 0: |
325 | 327 | if not AgentSurvey.objects.filter(name=self.name).exists(): |
326 | 328 | self._agent_survey = AgentSurvey.objects.create(name=self.name, validity_duration_sec=60, mode=self.mode, status=self.status) |
327 | - print("Agent survey is", self._agent_survey) | |
329 | + self.print("Agent survey is", self._agent_survey) | |
328 | 330 | #self._agent_survey = AgentSurvey(name=self.name, validity_duration_sec=60, mode=self.mode, status=self.status) |
329 | 331 | #self._agent_survey.save() |
330 | 332 | |
... | ... | @@ -332,6 +334,14 @@ class Agent: |
332 | 334 | def __str__(self): |
333 | 335 | return "I am agent " + self.name |
334 | 336 | |
337 | + #def print(self, msg): | |
338 | + def print(self, *args, **kwargs): | |
339 | + #print(f"({self.name}): ", msg) | |
340 | + if args: | |
341 | + print(f"({self.name}): ", *args, **kwargs) | |
342 | + else: | |
343 | + print() | |
344 | + | |
335 | 345 | def sleep(self, nbsec:float=2.0): |
336 | 346 | # thread |
337 | 347 | if self._current_specific_thread and self.RUN_IN_THREAD: |
... | ... | @@ -355,13 +365,13 @@ class Agent: |
355 | 365 | |
356 | 366 | # Avoid blocking on false "running" commands |
357 | 367 | # (old commands that stayed with "running" status when agent was killed) |
358 | - Command.delete_commands_with_running_status_if_exists_for_agent(self.name) | |
368 | + Command.delete_commands_with_running_status_for_agent(self.name) | |
359 | 369 | |
360 | 370 | ''' |
361 | - print() | |
362 | - print(self) | |
363 | - print("FOR REAL ?", self.FOR_REAL) | |
364 | - print("DB3 used is:", djangosettings.DATABASES["default"]["NAME"]) | |
371 | + self.print() | |
372 | + self.print(self) | |
373 | + self.print("FOR REAL ?", self.FOR_REAL) | |
374 | + self.print("DB3 used is:", djangosettings.DATABASES["default"]["NAME"]) | |
365 | 375 | |
366 | 376 | # SETUP |
367 | 377 | try: |
... | ... | @@ -373,7 +383,7 @@ class Agent: |
373 | 383 | # self.config = Config.objects.get()[0] |
374 | 384 | except Exception as e: |
375 | 385 | # except Config.ObjectDoesNotExist: |
376 | - print("Config read (or write) exception", str(e)) | |
386 | + self.print("Config read (or write) exception", str(e)) | |
377 | 387 | return -1 |
378 | 388 | ''' |
379 | 389 | |
... | ... | @@ -383,15 +393,15 @@ class Agent: |
383 | 393 | # Wait a random number of sec before starting new iteration |
384 | 394 | # (to let another agent having the chance to send a command) |
385 | 395 | random_waiting_sec = random.randint(0,5) |
386 | - print(f"Waiting {random_waiting_sec} sec (random) before starting new iteration...") | |
396 | + self.print(f"Waiting {random_waiting_sec} sec (random) before starting new iteration...") | |
387 | 397 | time.sleep(random_waiting_sec) |
388 | 398 | |
389 | 399 | try: |
390 | 400 | |
391 | - print() | |
392 | - print() | |
393 | - #print("-"*80) | |
394 | - print("-"*20, f"MAIN LOOP ITERATION {self._iter_num} (START)", "-"*20) | |
401 | + self.print() | |
402 | + self.print() | |
403 | + #self.print("-"*80) | |
404 | + self.print("-"*20, f"MAIN LOOP ITERATION {self._iter_num} (START)", "-"*20) | |
395 | 405 | self.set_status(self.STATUS_MAIN_LOOP) |
396 | 406 | self.show_mode_and_status() |
397 | 407 | |
... | ... | @@ -402,7 +412,7 @@ class Agent: |
402 | 412 | #if self.SIMULATOR_MODE: self.simulator_send_next_command() |
403 | 413 | |
404 | 414 | # generic cmd in json format |
405 | - print("------START COMMMAND PROCESSING------") | |
415 | + self.print("------START COMMMAND PROCESSING------") | |
406 | 416 | cmd = self.get_next_valid_command() |
407 | 417 | #if cmd: cmd = self.general_process(cmd) |
408 | 418 | if cmd: cmd = self.command_process(cmd) |
... | ... | @@ -412,32 +422,33 @@ class Agent: |
412 | 422 | if cmd: self.specific_process(cmd) |
413 | 423 | ''' |
414 | 424 | self.routine_process() |
415 | - print("------END COMMMAND PROCESSING------") | |
425 | + self.print("------END COMMMAND PROCESSING------") | |
416 | 426 | |
417 | 427 | # Every N iterations, delete old commands |
418 | 428 | N=3 |
419 | - if ((self._iter_num-1) % N) == 0: Command.purge_old_commands_for_agent(self.name) | |
429 | + if ((self._iter_num-1) % N) == 0: | |
430 | + self.print("Looking for old commands to purge...") | |
431 | + Command.purge_old_commands_for_agent(self.name) | |
420 | 432 | |
421 | 433 | #self.waitfor(self.mainloop_waittime) |
422 | 434 | |
423 | - print("-"*20, "MAIN LOOP ITERATION (END)", "-"*20) | |
424 | - #print("-"*80) | |
435 | + self.print("-"*20, "MAIN LOOP ITERATION (END)", "-"*20) | |
436 | + #self.print("-"*80) | |
425 | 437 | |
426 | 438 | #self.do_log(LOG_DEBUG, "Ending main loop iteration") |
427 | 439 | |
428 | 440 | self._iter_num += 1 |
429 | 441 | |
430 | 442 | # Exit if max duration is timed out |
431 | - if self.MAX_DURATION_SEC: | |
432 | - #print ("time", time.time()-start_time) | |
433 | - if time.time()-start_time > self.MAX_DURATION_SEC: | |
434 | - print("Exit because of max duration set to ", self.MAX_DURATION_SEC, "s") | |
435 | - self.kill_running_specific_cmd_if_exists() | |
436 | - break | |
443 | + if self.MAX_DURATION_SEC and (time.time()-start_time > self.MAX_DURATION_SEC): | |
444 | + self.print("Exit because of max duration set to ", self.MAX_DURATION_SEC, "s") | |
445 | + self.kill_running_specific_cmd_if_exists() | |
446 | + if self.SIMULATOR_MODE and self.SIMULATOR_WITH_TEST: self.simulator_test_results() | |
447 | + break | |
437 | 448 | |
438 | 449 | except KeyboardInterrupt: |
439 | 450 | # In case of CTRL-C, kill the current thread (process) before dying (in error) |
440 | - print("CTRL-C Interrupted, I kill the current thread (process) before exiting") | |
451 | + self.print("CTRL-C Interrupted, I kill the current thread (process) before exiting") | |
441 | 452 | self.kill_running_specific_cmd_if_exists() |
442 | 453 | exit(1) |
443 | 454 | |
... | ... | @@ -467,7 +478,7 @@ class Agent: |
467 | 478 | if self.cmdts is None: return |
468 | 479 | |
469 | 480 | # 1) Send cmd |
470 | - print(f"Send command", self.cmdts) | |
481 | + self.print(f"Send command", self.cmdts) | |
471 | 482 | self.cmdts.send() |
472 | 483 | cmdts_is_processed = False |
473 | 484 | cmdts_res = None |
... | ... | @@ -475,7 +486,7 @@ class Agent: |
475 | 486 | # 2) Wait for end of cmd execution |
476 | 487 | #self.wait_for_execution_of_cmd(self.cmdts) |
477 | 488 | while not cmdts_is_processed: |
478 | - print(f"Waiting for end of cmd {self.cmdts.name} execution...") | |
489 | + self.print(f"Waiting for end of cmd {self.cmdts.name} execution...") | |
479 | 490 | self.cmdts.refresh_from_db() |
480 | 491 | # timeout ? |
481 | 492 | if self.cmdts.is_expired(): break |
... | ... | @@ -489,9 +500,9 @@ class Agent: |
489 | 500 | |
490 | 501 | # 3) Get cmd result |
491 | 502 | if cmdts_is_processed: |
492 | - print(f"Cmd executed. Result is '{cmdts_res}'") | |
503 | + self.print(f"Cmd executed. Result is '{cmdts_res}'") | |
493 | 504 | else: |
494 | - print("Command was not completed") | |
505 | + self.print("Command was not completed") | |
495 | 506 | |
496 | 507 | |
497 | 508 | """ |
... | ... | @@ -503,10 +514,10 @@ class Agent: |
503 | 514 | NB: datetime.utcnow() is equivalent to datetime.now(timezone.utc) |
504 | 515 | ### |
505 | 516 | |
506 | - print("Looking for old commands to purge...") | |
517 | + self.print("Looking for old commands to purge...") | |
507 | 518 | ### |
508 | 519 | COMMAND_PEREMPTION_DATE_FROM_NOW = datetime.utcnow() - timedelta(hours = self.COMMANDS_PEREMPTION_HOURS) |
509 | - #print("peremption date", COMMAND_PEREMPTION_DATE_FROM_NOW) | |
520 | + #self.print("peremption date", COMMAND_PEREMPTION_DATE_FROM_NOW) | |
510 | 521 | old_commands = Command.objects.filter( |
511 | 522 | # only commands for me |
512 | 523 | receiver = self.name, |
... | ... | @@ -516,22 +527,22 @@ class Agent: |
516 | 527 | ### |
517 | 528 | old_commands = Command.get_old_commands_for_agent(self.name) |
518 | 529 | if old_commands.exists(): |
519 | - print("Found old commands to delete:") | |
520 | - for cmd in old_commands: print(cmd) | |
530 | + self.print("Found old commands to delete:") | |
531 | + for cmd in old_commands: self.print(cmd) | |
521 | 532 | old_commands.delete() |
522 | 533 | """ |
523 | 534 | |
524 | 535 | def waitfor(self, nbsec): |
525 | - print(f"Now, waiting for {nbsec} seconds...") | |
536 | + self.print(f"Now, waiting for {nbsec} seconds...") | |
526 | 537 | time.sleep(nbsec) |
527 | 538 | |
528 | 539 | def set_status(self, status:str): |
529 | - print(f"[NEW CURRENT STATUS: {status}] (switching from status {self.status})") | |
540 | + self.print(f"[NEW CURRENT STATUS: {status}] (switching from status {self.status})") | |
530 | 541 | self.status = status |
531 | 542 | return False |
532 | 543 | |
533 | 544 | def set_mode(self, mode:str): |
534 | - print(f"Switching from mode {self.mode} to mode {mode}") | |
545 | + self.print(f"Switching from mode {self.mode} to mode {mode}") | |
535 | 546 | self.mode = mode |
536 | 547 | |
537 | 548 | def is_active(self): |
... | ... | @@ -544,7 +555,7 @@ class Agent: |
544 | 555 | self.set_mode(self.MODE_IDLE) |
545 | 556 | |
546 | 557 | def show_mode_and_status(self): |
547 | - print(f"CURRENT MODE is {self.mode} (with status {self.status})") | |
558 | + self.print(f"CURRENT MODE is {self.mode} (with status {self.status})") | |
548 | 559 | |
549 | 560 | def die(self): |
550 | 561 | self.set_status(self.STATUS_EXIT) |
... | ... | @@ -592,7 +603,7 @@ class Agent: |
592 | 603 | """ |
593 | 604 | |
594 | 605 | def init(self): |
595 | - print("Initializing...") | |
606 | + self.print("Initializing...") | |
596 | 607 | self.set_status(self.STATUS_INIT) |
597 | 608 | |
598 | 609 | def load_config(self): |
... | ... | @@ -600,7 +611,7 @@ class Agent: |
600 | 611 | TODO: |
601 | 612 | only si date fichier xml changée => en RAM, un objet Config avec méthodes d'accès, appelle le parser de AK (classe Config.py indépendante) |
602 | 613 | """ |
603 | - print("Loading the config file...") | |
614 | + self.print("Loading the config file...") | |
604 | 615 | #config_filename = 'c:/srv/develop/pyros/config/config_unit_simulunit1.xml' |
605 | 616 | #config.set_configfile(config_filename) |
606 | 617 | self.config.load() |
... | ... | @@ -609,21 +620,21 @@ class Agent: |
609 | 620 | # --- display informations |
610 | 621 | # --- Get all the assembly of this unit[0] (mount + channels) |
611 | 622 | if self.config.is_config_contents_changed(): |
612 | - print("--------- Components of the unit -----------") | |
613 | - print("Configuration file is {}".format(self.config.get_configfile())) | |
623 | + self.print("--------- Components of the unit -----------") | |
624 | + self.print("Configuration file is {}".format(self.config.get_configfile())) | |
614 | 625 | alias = self.config.get_aliases('unit')[0] |
615 | 626 | namevalue = self.config.get_paramvalue(alias,'unit','name') |
616 | - print("Unit alias is {}. Name is {}".format(alias,namevalue), ":") | |
627 | + self.print("Unit alias is {}. Name is {}".format(alias,namevalue), ":") | |
617 | 628 | unit_subtags = self.config.get_unit_subtags() |
618 | 629 | for unit_subtag in unit_subtags: |
619 | 630 | aliases = self.config.get_aliases(unit_subtag) |
620 | 631 | for alias in aliases: |
621 | 632 | namevalue = self.config.get_paramvalue(alias,unit_subtag,'name') |
622 | - print(f"- {unit_subtag} alias is {alias}. Name is {namevalue}") | |
623 | - print("------------------------------------------") | |
633 | + self.print(f"- {unit_subtag} alias is {alias}. Name is {namevalue}") | |
634 | + self.print("------------------------------------------") | |
624 | 635 | #params = self.config.get_params(unit_alias) |
625 | 636 | #for param in params: |
626 | - # print("Unit component is {}".format(param)) | |
637 | + # self.print("Unit component is {}".format(param)) | |
627 | 638 | |
628 | 639 | """ |
629 | 640 | # self.config = Config.objects.get(pk=1) |
... | ... | @@ -637,15 +648,15 @@ class Agent: |
637 | 648 | except Exception as e: |
638 | 649 | # except Config.ObjectDoesNotExist: |
639 | 650 | # except Config.DoesNotExist: |
640 | - print("Config read (or write) exception", str(e)) | |
651 | + self.print("Config read (or write) exception", str(e)) | |
641 | 652 | # return self.config |
642 | 653 | # return -1 |
643 | 654 | return False |
644 | 655 | """ |
645 | 656 | |
646 | 657 | def update_survey(self): |
647 | - print("Updating the survey database table...") | |
648 | - print("- fetching table line for agent", self.name) | |
658 | + self.print("Updating the survey database table...") | |
659 | + #self.print("- fetching table line for agent", self.name) | |
649 | 660 | # only necessary when using process (not necessary with threads) |
650 | 661 | #with transaction.atomic(): |
651 | 662 | self._agent_survey = AgentSurvey.objects.get(name=self.name) |
... | ... | @@ -653,26 +664,6 @@ class Agent: |
653 | 664 | self._agent_survey.status = self.status |
654 | 665 | self._agent_survey.save() |
655 | 666 | |
656 | - | |
657 | - def simulator_get_next_command_to_send(self)->Command: | |
658 | - cmd_name = next(self.SIMULATOR_COMMANDS, None) | |
659 | - if cmd_name is None: return None | |
660 | - receiver_agent = self.name if self.SIMULATOR_COMMANDS_DEST=="myself" else self.SIMULATOR_COMMANDS_DEST | |
661 | - return Command(sender=self.name, receiver=receiver_agent, name=cmd_name) | |
662 | - | |
663 | - def simulator_send_next_command(self): | |
664 | - #self._current_test_cmd = "go_idle" if self._current_test_cmd=="go_active" else "go_active" | |
665 | - #if self._nb_test_cmds == 4: self._current_test_cmd = "exit" | |
666 | - cmd_name = next(self.SIMULATOR_COMMANDS, None) | |
667 | - #print("next cmd is ", cmd_name) | |
668 | - if cmd_name is None: return | |
669 | - #Command.objects.create(sender=self.name, receiver=self.name, name=cmd_name) | |
670 | - receiver_agent = self.name if self.SIMULATOR_COMMANDS_DEST=="myself" else self.SIMULATOR_COMMANDS_DEST | |
671 | - Command.objects.create(sender=self.name, receiver=receiver_agent, name=cmd_name) | |
672 | - #time.sleep(1) | |
673 | - #self._simulator_current_cmd_idx += 1 | |
674 | - #self._nb_test_cmds += 1 | |
675 | - | |
676 | 667 | """ |
677 | 668 | def send_command(self, cmd_name): |
678 | 669 | receiver_agent = self.name if self.SIMULATOR_COMMANDS_DEST=="myself" else self.SIMULATOR_COMMANDS_DEST |
... | ... | @@ -686,26 +677,28 @@ class Agent: |
686 | 677 | Commands are read in chronological order |
687 | 678 | """ |
688 | 679 | self.set_status(self.STATUS_GET_NEXT_COMMAND) |
689 | - print("Looking for new commands from the database ...") | |
680 | + self.print("Looking for new commands from the database ...") | |
690 | 681 | |
691 | 682 | # 1) Get all pending commands for me (return if None) |
692 | - # Not sure this is necessary, but there might be a risk | |
683 | + # Not sure this is necessary to do it in a transaction, | |
684 | + # but there might be a risk | |
693 | 685 | # that a command status is modified while we are reading... |
694 | 686 | with transaction.atomic(): |
695 | 687 | self._pending_commands = Command.get_pending_commands_for_agent(self.name) |
696 | 688 | commands = self._pending_commands |
697 | 689 | if not commands.exists(): |
698 | - print("No new command to process") | |
690 | + self.print("No new command to process") | |
699 | 691 | return None |
700 | - print("Current pending commands are (time ordered) :") | |
692 | + self.print("Current pending commands are (time ordered) :") | |
701 | 693 | Command.show_commands(commands) |
702 | 694 | |
703 | 695 | # 2) If there is a "exit" or "abort" command pending (even at the end of the list), |
704 | 696 | # which is VALID (not expired), |
705 | 697 | # then pass it straight away to general_process() for execution |
706 | - for cmd in commands: | |
707 | - if cmd.name in ("exit", "abort"): break | |
708 | - if cmd.name in ("exit", "abort") and not cmd.is_expired(): return cmd | |
698 | + for cmd in commands: | |
699 | + if cmd.name in ("exit", "abort", "flush_commands"): break | |
700 | + if cmd.name in ("exit", "abort", "flush_commands") and not cmd.is_expired(): | |
701 | + return cmd | |
709 | 702 | |
710 | 703 | # 3) If first (oldest) command is currently running |
711 | 704 | # (status CMD_RUNNING), then do nothing and return |
... | ... | @@ -721,16 +714,16 @@ class Agent: |
721 | 714 | """ |
722 | 715 | cmd = commands[0] |
723 | 716 | if cmd.is_running(): |
724 | - #print(f"There is currently a running command ({cmd_executing.first().name}), so I do nothing (wait for end of execution)") | |
725 | - print(f"There is currently a running command ({cmd.name})") | |
717 | + #self.print(f"There is currently a running command ({cmd_executing.first().name}), so I do nothing (wait for end of execution)") | |
718 | + self.print(f"There is currently a running command ({cmd.name})") | |
726 | 719 | """ |
727 | 720 | # Check that this command is not expired |
728 | 721 | if cmd.is_expired(): |
729 | - print("But this command is expired, so set its status to OUTOFDATE, and go on") | |
722 | + self.print("But this command is expired, so set its status to OUTOFDATE, and go on") | |
730 | 723 | cmd_executing.set_as_outofdate() |
731 | 724 | else: |
732 | 725 | """ |
733 | - print(f"Thus, I will do nothing until this command execution is finished") | |
726 | + self.print(f"Thus, I will do nothing until this command execution is finished") | |
734 | 727 | # TODO: kill si superieur a MAX_EXEC_TIME |
735 | 728 | return None |
736 | 729 | |
... | ... | @@ -745,10 +738,10 @@ class Agent: |
745 | 738 | |
746 | 739 | # 6) Current cmd must now be a valid (not expired) and PENDING one, |
747 | 740 | # so pass it to general_process() for execution |
748 | - #print(f"Got command {cmd.name} sent by agent {cmd.sender} at {cmd.sender_deposit_time}") | |
749 | - print("***") | |
750 | - print("*** Got", cmd) | |
751 | - print("***") | |
741 | + #self.print(f"Got command {cmd.name} sent by agent {cmd.sender} at {cmd.sender_deposit_time}") | |
742 | + self.print("***") | |
743 | + self.print("*** Got", cmd) | |
744 | + self.print("***") | |
752 | 745 | return cmd |
753 | 746 | |
754 | 747 | |
... | ... | @@ -762,34 +755,35 @@ class Agent: |
762 | 755 | def general_process(self, cmd:Command)->Command: |
763 | 756 | |
764 | 757 | self.set_status(self.STATUS_GENERAL_PROCESS) |
765 | - print(f"Starting general processing of {cmd}") | |
758 | + self.print(f"Starting general processing of {cmd}") | |
766 | 759 | |
767 | 760 | # Update read time to say that the command has been READ |
768 | 761 | cmd.set_read_time() |
769 | 762 | # Precondition: command cmd is valid (not expired), has already been read, is pending |
770 | 763 | assert (not cmd.is_expired()) and cmd.is_pending() and cmd.is_read() |
771 | 764 | |
772 | - #print(f"Starting general processing of command {cmd.name} sent by agent {cmd.sender} at {cmd.sender_deposit_time}") | |
765 | + #self.print(f"Starting general processing of command {cmd.name} sent by agent {cmd.sender} at {cmd.sender_deposit_time}") | |
773 | 766 | |
774 | 767 | """ |
775 | 768 | # 2) If expired command, change its status to expired and return |
776 | 769 | if cmd.is_expired(): |
777 | - print("This command is expired, so mark it as such, and ignore it") | |
770 | + self.print("This command is expired, so mark it as such, and ignore it") | |
778 | 771 | cmd.set_as_outofdate() |
779 | 772 | return None |
780 | 773 | """ |
781 | 774 | |
782 | 775 | # If cmd is generic, execute it, change its status to executed, and return |
783 | 776 | if cmd.is_generic(): |
784 | - print("This command is generic, execute it...") | |
777 | + self.print("This command is generic, execute it...") | |
785 | 778 | self.exec_generic_cmd(cmd) |
786 | 779 | # If cmd is "exit", kill myself (without any question, this is an order soldier !) |
787 | 780 | # This "exit" should normally kill any current thread (to be checked...) |
788 | 781 | if cmd.name == "exit": |
789 | - print("(before exiting) Here are the current (still) pending commands (time ordered) :") | |
782 | + self.print("(before exiting) Here are the current (still) pending commands (time ordered) :") | |
790 | 783 | commands = Command.get_pending_commands_for_agent(self.name) |
791 | 784 | Command.show_commands(commands) |
792 | - if self.SIMULATOR_MODE and self.SIMULATOR_WITH_TEST and self.SIMULATOR_COMMANDS_DEST == "myself": self.simulator_test_results() | |
785 | + #if self.SIMULATOR_MODE and self.SIMULATOR_WITH_TEST and self.SIMULATOR_COMMANDS_DEST == "myself": self.simulator_test_results() | |
786 | + if self.SIMULATOR_MODE and self.SIMULATOR_WITH_TEST: self.simulator_test_results() | |
793 | 787 | exit(0) |
794 | 788 | # Command is executed, so return None |
795 | 789 | return None |
... | ... | @@ -799,19 +793,19 @@ class Agent: |
799 | 793 | # cmd is not generic but, as I am idle, change its status to SKIPPED, ignore it, and return |
800 | 794 | #if self.mode == self.MODE_IDLE: |
801 | 795 | if not self.is_active(): |
802 | - print("This command is not generic but, as I am IDLE, I mark it SKIPPED and ignore it") | |
796 | + self.print("This command is not generic but, as I am IDLE, I mark it SKIPPED and ignore it") | |
803 | 797 | cmd.set_as_skipped() |
804 | 798 | return None |
805 | 799 | |
806 | 800 | # Je suis pas idle et cde pas générique: je la traite pas, elle sera traitée par core_process : |
807 | 801 | # attendre que cette commande soit exécutée avant de passer à la commande suivante (situation “bloquante” normale) |
808 | - print("This command is not generic and, as I am not IDLE, I pass it to the specific processing") | |
809 | - print("(then I will not execute any other new command until this command is EXECUTED)") | |
802 | + self.print("This command is not generic and, as I am not IDLE, I pass it to the specific processing") | |
803 | + self.print("(then I will not execute any other new command until this command is EXECUTED)") | |
810 | 804 | return cmd |
811 | 805 | |
812 | 806 | |
813 | 807 | def exec_generic_cmd(self, cmd:Command): |
814 | - print("Starting execution of a Generic cmd...") | |
808 | + self.print("Starting execution of a Generic cmd...") | |
815 | 809 | cmd.set_as_running() |
816 | 810 | |
817 | 811 | # Executing command |
... | ... | @@ -819,19 +813,22 @@ class Agent: |
819 | 813 | self.set_active() |
820 | 814 | cmd.set_result("I am now active") |
821 | 815 | time.sleep(1) |
822 | - if cmd.name == "go_idle": | |
816 | + elif cmd.name == "go_idle": | |
823 | 817 | self.set_idle() |
824 | 818 | cmd.set_result("I am now idle") |
825 | 819 | time.sleep(1) |
826 | - # If cmd is "abort", kill any currently running thread | |
827 | - if cmd.name in ("abort", "exit"): | |
828 | - #print("Current pending commands are:") | |
820 | + elif cmd.name in ("flush_commands"): | |
821 | + self.print("flush_commands received: Delete all pending commands") | |
822 | + Command.delete_pending_commands_for_agent(self.name) | |
823 | + # If cmd is "abort" or "exit", kill any currently running thread | |
824 | + elif cmd.name in ("abort", "exit"): | |
825 | + #self.print("Current pending commands are:") | |
829 | 826 | #Command.show_commands(self._pending_commands) |
830 | - print("Aborting current executing command if exists:") | |
827 | + self.print("Aborting current executing command if exists:") | |
831 | 828 | self.kill_running_specific_cmd_if_exists() |
832 | 829 | |
833 | 830 | cmd.set_as_processed() |
834 | - print("...Generic cmd has been executed") | |
831 | + self.print("...Generic cmd has been executed") | |
835 | 832 | |
836 | 833 | |
837 | 834 | |
... | ... | @@ -842,7 +839,7 @@ class Agent: |
842 | 839 | - in file |
843 | 840 | - in db |
844 | 841 | """ |
845 | - print("Logging data...") | |
842 | + self.print("Logging data...") | |
846 | 843 | |
847 | 844 | |
848 | 845 | |
... | ... | @@ -860,11 +857,11 @@ class Agent: |
860 | 857 | self.set_status(self.STATUS_SPECIFIC_PROCESS) |
861 | 858 | assert self.is_active() |
862 | 859 | self._current_specific_cmd = cmd |
863 | - print("Starting specific process...") | |
860 | + self.print("Starting specific process...") | |
864 | 861 | #self._current_thread = threading.Thread(target=self.exec_command) |
865 | 862 | # Run in a thread |
866 | 863 | if self.RUN_IN_THREAD: |
867 | - print("(run cmd in a thread)") | |
864 | + self.print("(run cmd in a thread)") | |
868 | 865 | self._current_specific_thread = StoppableThreadEvenWhenSleeping(target=self.thread_exec_specific_cmd) |
869 | 866 | #self._current_specific_thread = StoppableThreadEvenWhenSleeping(target=self.exec_specific_cmd, args=(cmd,)) |
870 | 867 | #self._current_thread = threading.Thread(target=self.exec_command) |
... | ... | @@ -873,7 +870,7 @@ class Agent: |
873 | 870 | #self._current_specific_thread = thread_with_exception('thread test') |
874 | 871 | # Run in a process |
875 | 872 | else: |
876 | - print("(run cmd in a process)") | |
873 | + self.print("(run cmd in a process)") | |
877 | 874 | # close the database connection first, it will be re-opened in each process |
878 | 875 | db.connections.close_all() |
879 | 876 | self._current_specific_thread = multiprocessing.Process(target=self.thread_exec_specific_cmd) |
... | ... | @@ -883,14 +880,14 @@ class Agent: |
883 | 880 | self._current_specific_thread.start() |
884 | 881 | #my_thread.join() |
885 | 882 | #self.waitfor(self.subloop_waittime) |
886 | - print("Ending specific process (thread has been launched)") | |
883 | + self.print("Ending specific process (thread has been launched)") | |
887 | 884 | |
888 | 885 | |
889 | 886 | def kill_running_specific_cmd_if_exists(self): |
890 | 887 | if (self._current_specific_thread is None) or not self._current_specific_thread.is_alive(): |
891 | - print("...No current specific command thread to abort...") | |
888 | + self.print("...No current specific command thread to abort...") | |
892 | 889 | else: |
893 | - print(f"Killing command {self._current_specific_cmd.name}") | |
890 | + self.print(f"Killing command {self._current_specific_cmd.name}") | |
894 | 891 | # Ask the thread to stop itself |
895 | 892 | #self._current_specific_thread.stop() |
896 | 893 | #self._current_specific_thread._stop() |
... | ... | @@ -907,13 +904,42 @@ class Agent: |
907 | 904 | self._current_specific_cmd = None |
908 | 905 | |
909 | 906 | |
907 | + """ | |
908 | + ================================================================= | |
909 | + SIMULATOR DEDICATED FUNCTIONS | |
910 | + ================================================================= | |
911 | + """ | |
912 | + | |
913 | + def simulator_get_next_command_to_send(self)->Command: | |
914 | + cmd_name = next(self.SIMULATOR_COMMANDS, None) | |
915 | + if cmd_name is None: return None | |
916 | + receiver_agent = self.name if self.SIMULATOR_COMMANDS_DEST=="myself" else self.SIMULATOR_COMMANDS_DEST | |
917 | + return Command(sender=self.name, receiver=receiver_agent, name=cmd_name) | |
918 | + | |
919 | + """ | |
920 | + def simulator_send_next_command(self): | |
921 | + #self._current_test_cmd = "go_idle" if self._current_test_cmd=="go_active" else "go_active" | |
922 | + #if self._nb_test_cmds == 4: self._current_test_cmd = "exit" | |
923 | + cmd_name = next(self.SIMULATOR_COMMANDS, None) | |
924 | + #self.print("next cmd is ", cmd_name) | |
925 | + if cmd_name is None: return | |
926 | + #Command.objects.create(sender=self.name, receiver=self.name, name=cmd_name) | |
927 | + receiver_agent = self.name if self.SIMULATOR_COMMANDS_DEST=="myself" else self.SIMULATOR_COMMANDS_DEST | |
928 | + Command.objects.create(sender=self.name, receiver=receiver_agent, name=cmd_name) | |
929 | + #time.sleep(1) | |
930 | + #self._simulator_current_cmd_idx += 1 | |
931 | + #self._nb_test_cmds += 1 | |
932 | + """ | |
933 | + | |
910 | 934 | def simulator_test_results(self): |
911 | - print("\n--- Starting testing if result is ok") | |
912 | - print("Here are the 14 last commands:") | |
935 | + self.print("\n--- Testing if the commands I SENT had the awaited result") | |
936 | + self.print("Here are the last commands I sent:") | |
913 | 937 | #commands = list(Command.get_last_N_commands_for_agent(self.name, 16)) |
914 | - commands = Command.get_last_N_commands_for_agent(self.name, 16) | |
915 | - Command.show_commands(commands) | |
938 | + #commands = Command.get_last_N_commands_sent_to_agent(self.name, 16) | |
939 | + commands = Command.get_last_N_commands_sent_by_agent(self.name, len(self.SIMULATOR_COMMANDS_LIST)) | |
916 | 940 | Command.show_commands(commands) |
941 | + return commands | |
942 | + """ OLD SCENARIO | |
917 | 943 | nb_asserted = 0 |
918 | 944 | for cmd in commands: |
919 | 945 | if cmd.name == "specific0": |
... | ... | @@ -937,7 +963,8 @@ class Agent: |
937 | 963 | assert cmd.is_executed() |
938 | 964 | nb_asserted+=1 |
939 | 965 | assert nb_asserted == 12 |
940 | - print("--- Finished testing => result is ok") | |
966 | + self.print("--- Finished testing => result is ok") | |
967 | + """ | |
941 | 968 | |
942 | 969 | |
943 | 970 | |
... | ... | @@ -965,8 +992,8 @@ class Agent: |
965 | 992 | cmd = self._current_specific_cmd |
966 | 993 | """ specific command execution setting up """ |
967 | 994 | #cmd = self.get_current_specific_cmd() |
968 | - print(">>>>> Thread: starting execution of command", cmd.name) | |
969 | - print(">>>>> Thread: PID: %s, Process Name: %s, Thread Name: %s" % ( | |
995 | + self.print(">>>>> Thread: starting execution of command", cmd.name) | |
996 | + self.print(">>>>> Thread: PID: %s, Process Name: %s, Thread Name: %s" % ( | |
970 | 997 | os.getpid(), |
971 | 998 | multiprocessing.current_process().name, |
972 | 999 | threading.current_thread().name) |
... | ... | @@ -1004,7 +1031,7 @@ class Agent: |
1004 | 1031 | def thread_exec_specific_cmd_main(self): |
1005 | 1032 | """ |
1006 | 1033 | cmd = self._current_specific_cmd |
1007 | - print("Doing nothing, just sleeping...") | |
1034 | + self.print("Doing nothing, just sleeping...") | |
1008 | 1035 | self.sleep(3) |
1009 | 1036 | """ |
1010 | 1037 | |
... | ... | @@ -1047,7 +1074,7 @@ class Agent: |
1047 | 1074 | with transaction.atomic(): |
1048 | 1075 | cmd.set_as_processed() |
1049 | 1076 | """ |
1050 | - print(">>>>> Thread: ended execution of command", cmd.name) | |
1077 | + self.print(">>>>> Thread: ended execution of command", cmd.name) | |
1051 | 1078 | cmd = None |
1052 | 1079 | # No more current thread |
1053 | 1080 | #self._current_specific_thread = None |
... | ... | @@ -1058,9 +1085,9 @@ class Agent: |
1058 | 1085 | # Exit if I was asked to stop |
1059 | 1086 | cmd = self._current_specific_cmd |
1060 | 1087 | if self.RUN_IN_THREAD and threading.current_thread().stopped(): |
1061 | - print(f">>>>> Thread (cmd {cmd.name}): I received the stop signal, so I stop (in error)") | |
1088 | + self.print(f">>>>> Thread (cmd {cmd.name}): I received the stop signal, so I stop (in error)") | |
1062 | 1089 | exit(1) |
1063 | - print(f">>>>> Thread (cmd {cmd.name}): step #{step}/{self._thread_total_steps_number}") | |
1090 | + self.print(f">>>>> Thread (cmd {cmd.name}): step #{step}/{self._thread_total_steps_number}") | |
1064 | 1091 | # call a specific function to be defined by subclass |
1065 | 1092 | cmd_step_function(step) |
1066 | 1093 | # Wait for a specific time (interruptible) |
... | ... | @@ -1069,7 +1096,7 @@ class Agent: |
1069 | 1096 | def thread_stop_if_asked(self): |
1070 | 1097 | assert self._current_specific_thread is not None |
1071 | 1098 | if self.RUN_IN_THREAD and threading.current_thread().stopped(): |
1072 | - print("(Thread) I received the stop signal, so I stop (in error)") | |
1099 | + self.print("(Thread) I received the stop signal, so I stop (in error)") | |
1073 | 1100 | exit(1) |
1074 | 1101 | |
1075 | 1102 | def thread_set_total_steps_number(self, nbsteps): |
... | ... | @@ -1103,6 +1130,11 @@ class Agent: |
1103 | 1130 | """ |
1104 | 1131 | |
1105 | 1132 | |
1133 | +""" | |
1134 | +================================================================= | |
1135 | + MAIN FUNCTION | |
1136 | +================================================================= | |
1137 | +""" | |
1106 | 1138 | if __name__ == "__main__": |
1107 | 1139 | |
1108 | 1140 | configfile = None | ... | ... |
src/agent/AgentA.py
... | ... | @@ -15,27 +15,36 @@ from Agent import Agent |
15 | 15 | class AgentA(Agent): |
16 | 16 | |
17 | 17 | #MAX_DURATION_SEC = None |
18 | - MAX_DURATION_SEC = 85 | |
18 | + MAX_DURATION_SEC = 90 | |
19 | 19 | |
20 | 20 | # FOR TEST ONLY |
21 | 21 | # Run this agent in simulator mode |
22 | 22 | SIMULATOR_MODE = True |
23 | 23 | # Run the assertion tests at the end |
24 | - SIMULATOR_WITH_TEST = False | |
24 | + SIMULATOR_WITH_TEST = True | |
25 | 25 | # Who should I send commands to ? |
26 | 26 | #SIMULATOR_COMMANDS_DEST = "myself" |
27 | 27 | SIMULATOR_COMMANDS_DEST = "AgentB" |
28 | 28 | # Scenario to be executed |
29 | - SIMULATOR_COMMANDS = [ | |
29 | + SIMULATOR_COMMANDS_LIST = [ | |
30 | + # Ask receiver to delete all its previous commands | |
31 | + "flush_commands", | |
32 | + | |
30 | 33 | "go_active", |
31 | 34 | |
32 | 35 | "go_idle", |
33 | 36 | # Not executed because receiver agent is now "idle" |
34 | 37 | #"specific0", |
35 | 38 | |
36 | - # Executed because receiver agent is now "active" | |
37 | 39 | "go_active", |
38 | - #"specific1", | |
40 | + | |
41 | + # Executed because receiver agent is now "active" | |
42 | + "specific1", | |
43 | + | |
44 | + # should abort previous command, but does not because specific1 is already executed | |
45 | + "abort", | |
46 | + | |
47 | + # fully executed, result is 7 | |
39 | 48 | "eval 4+3", |
40 | 49 | |
41 | 50 | "go_idle", |
... | ... | @@ -161,14 +170,68 @@ class AgentA(Agent): |
161 | 170 | super().exec_specific_cmd_end(cmd, from_thread) |
162 | 171 | ''' |
163 | 172 | |
173 | + # @override | |
174 | + def simulator_test_results(self): | |
175 | + commands = super().simulator_test_results() | |
176 | + #self.print(commands) | |
177 | + """ | |
178 | + "go_active", | |
164 | 179 | |
180 | + "go_idle", | |
181 | + # Not executed because receiver agent is now "idle" | |
182 | + #"specific0", | |
183 | + | |
184 | + # Executed because receiver agent is now "active" | |
185 | + "go_active", | |
186 | + #"specific1", | |
187 | + "eval 4+3", | |
165 | 188 | |
189 | + "go_idle", | |
190 | + "exit", | |
191 | + """ | |
192 | + nb_asserted = 0 | |
193 | + for cmd in commands: | |
194 | + if cmd.name == "flush_commands": | |
195 | + assert cmd.is_executed() | |
196 | + nb_asserted+=1 | |
197 | + # 2 times | |
198 | + if cmd.name == "go_active": | |
199 | + assert cmd.is_executed() | |
200 | + nb_asserted+=1 | |
201 | + # 2 times | |
202 | + if cmd.name == "go_idle": | |
203 | + assert cmd.is_executed() | |
204 | + nb_asserted+=1 | |
205 | + if cmd.name == "specific1": | |
206 | + assert cmd.is_executed() | |
207 | + assert cmd.result == "in step #5/5" | |
208 | + nb_asserted+=1 | |
209 | + if cmd.name == "eval 4+3": | |
210 | + assert cmd.is_executed() | |
211 | + assert cmd.get_result() == "7" | |
212 | + nb_asserted+=1 | |
213 | + if cmd.name in ("abort"): | |
214 | + assert cmd.is_executed() | |
215 | + nb_asserted+=1 | |
216 | + if cmd.name in ("exit"): | |
217 | + assert cmd.is_executed() | |
218 | + nb_asserted+=1 | |
219 | + #assert nb_asserted == 6 | |
220 | + assert nb_asserted == len(self.SIMULATOR_COMMANDS_LIST) | |
221 | + self.print("************** Finished testing => result is ok **************") | |
222 | + | |
223 | + | |
224 | +""" | |
225 | +================================================================= | |
226 | + MAIN FUNCTION | |
227 | +================================================================= | |
228 | +""" | |
166 | 229 | if __name__ == "__main__": |
167 | 230 | |
168 | 231 | # with thread |
169 | 232 | RUN_IN_THREAD=True |
170 | 233 | # with process |
171 | - RUN_IN_THREAD=False | |
234 | + #RUN_IN_THREAD=False | |
172 | 235 | |
173 | 236 | configfile = None |
174 | 237 | ... | ... |
src/agent/AgentB.py
... | ... | @@ -14,16 +14,19 @@ from Agent import Agent |
14 | 14 | |
15 | 15 | class AgentB(Agent): |
16 | 16 | |
17 | + #MAX_DURATION_SEC = None | |
18 | + MAX_DURATION_SEC = 120 | |
19 | + | |
17 | 20 | # FOR TEST ONLY |
18 | 21 | # Run this agent in simulator mode |
19 | 22 | SIMULATOR_MODE = True |
20 | 23 | # Run the assertion tests at the end |
21 | - SIMULATOR_WITH_TEST = False | |
24 | + SIMULATOR_WITH_TEST = True | |
22 | 25 | # Who should I send commands to ? |
23 | 26 | #SIMULATOR_COMMANDS_DEST = "myself" |
24 | 27 | SIMULATOR_COMMANDS_DEST = "AgentA" |
25 | 28 | # Scenario to be executed |
26 | - SIMULATOR_COMMANDS = [ | |
29 | + SIMULATOR_COMMANDS_LIST = [ | |
27 | 30 | "go_active", |
28 | 31 | "go_idle", |
29 | 32 | ] |
... | ... | @@ -150,15 +153,30 @@ class AgentB(Agent): |
150 | 153 | self.thread_exec_specific_cmd_step(5, self.cmd_step1, 3) |
151 | 154 | ### |
152 | 155 | """ |
153 | - | |
156 | + | |
154 | 157 | ''' |
155 | 158 | # @override |
156 | 159 | def exec_specific_cmd_end(self, cmd:Command, from_thread=True): |
157 | 160 | super().exec_specific_cmd_end(cmd, from_thread) |
158 | 161 | ''' |
159 | 162 | |
160 | - | |
161 | - | |
163 | + # @override | |
164 | + def simulator_test_results(self): | |
165 | + commands = super().simulator_test_results() | |
166 | + nb_asserted = 0 | |
167 | + for cmd in commands: | |
168 | + assert cmd.is_executed() | |
169 | + nb_asserted+=1 | |
170 | + #assert nb_asserted == 2 | |
171 | + assert nb_asserted == len(self.SIMULATOR_COMMANDS_LIST) | |
172 | + self.print("************** Finished testing => result is ok **************") | |
173 | + | |
174 | + | |
175 | +""" | |
176 | +================================================================= | |
177 | + MAIN FUNCTION | |
178 | +================================================================= | |
179 | +""" | |
162 | 180 | if __name__ == "__main__": |
163 | 181 | |
164 | 182 | # with thread | ... | ... |
src/agent/AgentX.py
src/common/models.py
... | ... | @@ -244,7 +244,7 @@ class Command(models.Model): |
244 | 244 | "CMD_KILLED", # cde ignorée (je suis idle… et j’ai ignoré cette commande, et je passe à la cde suivante) |
245 | 245 | "CMD_OUTOFDATE" # cde périmée |
246 | 246 | ) |
247 | - GENERIC_COMMANDS = ["go_idle", "go_active", "abort", "exit"] | |
247 | + GENERIC_COMMANDS = ["go_idle", "go_active", "flush_commands", "abort", "exit"] | |
248 | 248 | #COMMANDS_PEREMPTION_HOURS = 48 |
249 | 249 | COMMANDS_PEREMPTION_HOURS = 60/60 |
250 | 250 | COMMANDS_VALIDITY_DURATION_SEC_DEFAULT = 30 |
... | ... | @@ -254,8 +254,8 @@ class Command(models.Model): |
254 | 254 | |
255 | 255 | #sender = models.CharField(max_length=50, blank=True, null=True, unique=True) |
256 | 256 | sender = models.CharField(max_length=50, help_text='sender agent name') |
257 | - receiver = models.CharField(max_length=50) | |
258 | - name = models.CharField(max_length=400) | |
257 | + receiver = models.CharField(max_length=50, help_text='receiver agent name') | |
258 | + name = models.CharField(max_length=400, help_text='command name') | |
259 | 259 | validity_duration_sec = models.PositiveIntegerField(default=COMMANDS_VALIDITY_DURATION_SEC_DEFAULT) |
260 | 260 | # Automatically set at table line creation (line created by the sender) |
261 | 261 | sender_deposit_time = models.DateTimeField(blank=True, null=True, auto_now_add=True) |
... | ... | @@ -289,7 +289,7 @@ class Command(models.Model): |
289 | 289 | return datetime.utcnow().astimezone() - timedelta(hours = cls.COMMANDS_PEREMPTION_HOURS) |
290 | 290 | |
291 | 291 | @classmethod |
292 | - def delete_commands_with_running_status_if_exists_for_agent(cls, agent_name): | |
292 | + def delete_commands_with_running_status_for_agent(cls, agent_name): | |
293 | 293 | running_commands = cls.objects.filter( |
294 | 294 | # only commands for agent agent_name |
295 | 295 | receiver = agent_name, |
... | ... | @@ -302,6 +302,23 @@ class Command(models.Model): |
302 | 302 | print("Delete (false) 'running' command:") |
303 | 303 | Command.show_commands(running_commands) |
304 | 304 | running_commands.delete() |
305 | + else: print("<None>") | |
306 | + | |
307 | + @classmethod | |
308 | + def delete_pending_commands_for_agent(cls, agent_name): | |
309 | + pending_commands = cls.objects.filter( | |
310 | + # only commands for agent agent_name | |
311 | + receiver = agent_name, | |
312 | + # only running commands | |
313 | + receiver_status_code = cls.CMD_STATUS_CODES.CMD_PENDING, | |
314 | + # only not expired commands | |
315 | + #sender_deposit_time__gte = cls.get_peremption_date_from_now(), | |
316 | + ) | |
317 | + if pending_commands: | |
318 | + print("Delete these pending command(s):") | |
319 | + Command.show_commands(pending_commands) | |
320 | + pending_commands.delete() | |
321 | + else: print("<None>") | |
305 | 322 | |
306 | 323 | @classmethod |
307 | 324 | def get_pending_commands_for_agent(cls, agent_name): |
... | ... | @@ -317,15 +334,27 @@ class Command(models.Model): |
317 | 334 | ).order_by("sender_deposit_time") |
318 | 335 | |
319 | 336 | @classmethod |
320 | - def get_commands_for_agent(cls, agent_name): | |
321 | - return cls.objects.filter(receiver = agent_name) | |
337 | + def get_commands_sent_to_agent(cls, agent_name): | |
338 | + return cls.objects.filter(receiver=agent_name) | |
339 | + | |
340 | + @classmethod | |
341 | + def get_commands_sent_by_agent(cls, agent_name): | |
342 | + return cls.objects.filter(sender=agent_name) | |
343 | + | |
344 | + @classmethod | |
345 | + def get_last_N_commands_sent_to_agent(cls, agent_name, N): | |
346 | + #filter(since=since) | |
347 | + #return cls.objects.all()[:nb_cmds] | |
348 | + #commands = cls.objects.filter(receiver = agent_name).order_by('-id')[:N] | |
349 | + commands = cls.get_commands_sent_to_agent(agent_name).order_by('-id')[:N] | |
350 | + return list(reversed(commands)) | |
322 | 351 | |
323 | 352 | @classmethod |
324 | - def get_last_N_commands_for_agent(cls, agent_name, N): | |
353 | + def get_last_N_commands_sent_by_agent(cls, agent_name, N): | |
325 | 354 | #filter(since=since) |
326 | 355 | #return cls.objects.all()[:nb_cmds] |
327 | 356 | #commands = cls.objects.filter(receiver = agent_name).order_by('-id')[:N] |
328 | - commands = cls.get_commands_for_agent(agent_name).order_by('-id')[:N] | |
357 | + commands = cls.get_commands_sent_by_agent(agent_name).order_by('-id')[:N] | |
329 | 358 | return list(reversed(commands)) |
330 | 359 | |
331 | 360 | @classmethod |
... | ... | @@ -336,7 +365,7 @@ class Command(models.Model): |
336 | 365 | NB: datetime.utcnow() is equivalent to datetime.now(timezone.utc) |
337 | 366 | """ |
338 | 367 | |
339 | - print(f"Looking for old commands to purge... (commands that are not executing and older than {cls.COMMANDS_PEREMPTION_HOURS} hour(s))") | |
368 | + print(f"(Looking for commands that are not executing and older than {cls.COMMANDS_PEREMPTION_HOURS} hour(s))") | |
340 | 369 | """ |
341 | 370 | COMMAND_PEREMPTION_DATE_FROM_NOW = datetime.utcnow() - timedelta(hours = self.COMMANDS_PEREMPTION_HOURS) |
342 | 371 | #print("peremption date", COMMAND_PEREMPTION_DATE_FROM_NOW) | ... | ... |