Commit 454ecf1f16e0cdce13091442769dfce9a41ddca0

Authored by Etienne Pallier
1 parent db13d8ac
Exists in dev

bugfix flush_command & routine_process... + refactor & cleanup

README.md
... ... @@ -89,10 +89,12 @@ Comment:
89 89 (AgentB): Finished testing => result is ok
90 90  
91 91 - Autres remarques:
  92 + - mode DEBUG (2 niveaux pour print)
92 93 - Nouvelle commande "flush_commands" pour purger les commmandes en attente
93 94 - routine_process() implemented
94 95 - Eval command implemented
95 96 - Timeout géré : si commande pas exécutée en temps raisonnable => la même commande est ré-exéuctée à l'itération suivante
  97 + - self.print() pour les agents
96 98 - Chaque agent a son propre scenario de commandes à envoyer
97 99 - GROSSE OPTIMISATION : plus besoin du script intermédiaire "start_agent.py" !!!
98 100 ==> pyros.py lance directement "cd src/agent/ ; python AgentX.py"
... ...
pyros.py
... ... @@ -410,7 +410,6 @@ def start(agent:str, configfile:str):
410 410 print("Synthesis of the results:")
411 411 for process in current_processes:
412 412 p,agent,returncode = process
413   - print(process)
414 413 if returncode == 0:
415 414 printFullTerm(Colors.GREEN, f"Process {agent} executed successfully")
416 415 # self.addExecuted(self.current_command, command)
... ...
src/agent/Agent.py
... ... @@ -2,6 +2,8 @@
2 2  
3 3 VERSION = "0.5"
4 4  
  5 +DEBUG=True
  6 +DEBUG=False
5 7  
6 8 """TODO:
7 9  
... ... @@ -44,7 +46,7 @@ sys.path.append("../..")
44 46 print("Starting with this sys.path", sys.path)
45 47  
46 48 # DJANGO setup
47   -# self.print("file is", __file__)
  49 +# self.printd("file is", __file__)
48 50 # mypath = os.getcwd()
49 51 # Go into src/
50 52 ##os.chdir("..")
... ... @@ -70,11 +72,12 @@ print()
70 72  
71 73 # --- GENERAL PURPOSE IMPORT ---
72 74 #from __future__ import absolute_import
73   -import time
74   -import threading, multiprocessing
  75 +import platform
  76 +import random
75 77 from threading import Thread
  78 +import threading, multiprocessing
  79 +import time
76 80 import utils.Logger as L
77   -import random
78 81 #import ctypes
79 82 #import copy
80 83  
... ... @@ -87,8 +90,8 @@ from django import db
87 90 #from django.conf import settings as djangosettings
88 91  
89 92 # --- SPECIFIC IMPORT ---
90   -from config.configpyros import ConfigPyros
91 93 from common.models import AgentSurvey, Command
  94 +from config.configpyros import ConfigPyros
92 95 #from dashboard.views import get_sunelev
93 96 #from devices.TelescopeRemoteControlDefault import TelescopeRemoteControlDefault
94 97 #from utils.JDManipulator import *
... ... @@ -96,15 +99,50 @@ from common.models import AgentSurvey, Command
96 99  
97 100 """
98 101 =================================================================
99   - GENERAL MODULE CONSTANT DEFINITIONS
  102 + GENERAL MODULE CONSTANT & FUNCTIONS DEFINITIONS
100 103 =================================================================
101 104 """
102 105  
103   -DEBUG_FILE = False
  106 +#DEBUG_FILE = False
104 107  
105 108 log = L.setupLogger("AgentLogger", "Agent")
106 109  
  110 +IS_WINDOWS = platform.system() == "Windows"
107 111  
  112 +class Colors:
  113 + HEADER = "\033[95m"
  114 + BLUE = "\033[94m"
  115 + GREEN = "\033[92m"
  116 + WARNING = "\033[93m"
  117 + FAIL = "\033[91m"
  118 + ENDC = "\033[0m"
  119 + BOLD = "\033[1m"
  120 + UNDERLINE = "\033[4m"
  121 +
  122 +def printColor(color: Colors, message, file=sys.stdout, eol=os.linesep, forced=False):
  123 + #system = platform.system()
  124 + """
  125 + if (self.disp == False and forced == False):
  126 + return 0
  127 + """
  128 + #if system == "Windows":
  129 + if IS_WINDOWS:
  130 + print(message, file=file, end=eol)
  131 + else:
  132 + print(color + message + Colors.ENDC, file=file, end=eol)
  133 + return 0
  134 +
  135 +def printFullTerm(color: Colors, string: str):
  136 + #system = platform.system()
  137 + columns = 100
  138 + row = 1000
  139 + disp = True
  140 + value = int(columns / 2 - len(string) / 2)
  141 + printColor(color, "-" * value, eol="")
  142 + printColor(color, string, eol="")
  143 + value += len(string)
  144 + printColor(color, "-" * (columns - value))
  145 + return 0
108 146  
109 147  
110 148 """
... ... @@ -300,18 +338,18 @@ class Agent:
300 338 if not config_filename:
301 339 config_filename = self.DEFAULT_CONFIG_FILE_NAME
302 340  
303   - self.print(f"config_filename={config_filename}")
  341 + self.printd(f"config_filename={config_filename}")
304 342 # If config file name is RELATIVE (i.e. without path, just the file name)
305 343 # => give it an absolute path (and remove "src/agent/" from it)
306 344 if config_filename == os.path.basename(config_filename):
307 345 tmp = os.path.abspath(self.CONFIG_DIR + os.sep + config_filename)
308 346 config_filename = os.path.abspath(self.CONFIG_DIR + os.sep + config_filename).replace(os.sep+"src"+os.sep,os.sep).replace(os.sep+"agent"+os.sep,os.sep)
309   - self.print("Config file used is", config_filename)
310   - #self.print("current path", os.getcwd())
311   - #self.print("this file path :", __file__)
312   - #self.print("config file path is", config_filename)
  347 + self.printd("Config file used is", config_filename)
  348 + #self.printd("current path", os.getcwd())
  349 + #self.printd("this file path :", __file__)
  350 + #self.printd("config file path is", config_filename)
313 351 # Instantiate an object for configuration
314   - #self.print("config file path is ", config_abs_filename)
  352 + #self.printd("config file path is ", config_abs_filename)
315 353 self.config = ConfigPyros(config_filename)
316 354 if self.config.get_last_errno() != self.config.NO_ERROR:
317 355 raise Exception(f"Bad config file name '{config_filename}', error {str(self.config.get_last_errno())}: {str(self.config.get_last_errmsg())}")
... ... @@ -328,7 +366,7 @@ class Agent:
328 366 #if nb_agents == 0:
329 367 if not AgentSurvey.objects.filter(name=self.name).exists():
330 368 self._agent_survey = AgentSurvey.objects.create(name=self.name, validity_duration_sec=60, mode=self.mode, status=self.status)
331   - self.print("Agent survey is", self._agent_survey)
  369 + self.printd("Agent survey is", self._agent_survey)
332 370 #self._agent_survey = AgentSurvey(name=self.name, validity_duration_sec=60, mode=self.mode, status=self.status)
333 371 #self._agent_survey.save()
334 372  
... ... @@ -336,14 +374,17 @@ class Agent:
336 374 def __str__(self):
337 375 return "I am agent " + self.name
338 376  
339   - #def print(self, msg):
  377 + # Normal print
340 378 def print(self, *args, **kwargs):
341   - #print(f"({self.name}): ", msg)
342 379 if args:
343 380 print(f"({self.name}): ", *args, **kwargs)
344 381 else:
345 382 print()
346 383  
  384 + # DEBUG print
  385 + def printd(self, *args, **kwargs):
  386 + if DEBUG: self.print(*args, **kwargs)
  387 +
347 388 def sleep(self, nbsec:float=2.0):
348 389 # thread
349 390 if self._current_specific_thread and self.RUN_IN_THREAD:
... ... @@ -369,11 +410,18 @@ class Agent:
369 410 # (old commands that stayed with "running" status when agent was killed)
370 411 Command.delete_commands_with_running_status_for_agent(self.name)
371 412  
  413 + """
  414 + # SIMULATOR MODE ONLY : flush previous commands to be sure to restart clean
  415 + if self.SIMULATOR_MODE:
  416 + self.print("flush previous commands to be sure to start in clean state")
  417 + Command.delete_pending_commands_for_agent(self.name)
  418 + """
  419 +
372 420 '''
373   - self.print()
374   - self.print(self)
375   - self.print("FOR REAL ?", self.FOR_REAL)
376   - self.print("DB3 used is:", djangosettings.DATABASES["default"]["NAME"])
  421 + self.printd()
  422 + self.printd(self)
  423 + self.printd("FOR REAL ?", self.FOR_REAL)
  424 + self.printd("DB3 used is:", djangosettings.DATABASES["default"]["NAME"])
377 425  
378 426 # SETUP
379 427 try:
... ... @@ -385,7 +433,7 @@ class Agent:
385 433 # self.config = Config.objects.get()[0]
386 434 except Exception as e:
387 435 # except Config.ObjectDoesNotExist:
388   - self.print("Config read (or write) exception", str(e))
  436 + self.printd("Config read (or write) exception", str(e))
389 437 return -1
390 438 '''
391 439  
... ... @@ -393,7 +441,7 @@ class Agent:
393 441 while True:
394 442  
395 443 # Wait a random number of sec before starting new iteration
396   - # (to let another agent having the chance to send a command)
  444 + # (to let another agent having the chance to send a command before me)
397 445 random_waiting_sec = random.randint(0,5)
398 446 self.print(f"Waiting {random_waiting_sec} sec (random) before starting new iteration...")
399 447 time.sleep(random_waiting_sec)
... ... @@ -402,7 +450,7 @@ class Agent:
402 450  
403 451 self.print()
404 452 self.print()
405   - #self.print("-"*80)
  453 + #self.printd("-"*80)
406 454 self.print("-"*20, f"MAIN LOOP ITERATION {self._iter_num} (START)", "-"*20)
407 455 self.set_status(self.STATUS_MAIN_LOOP)
408 456 self.show_mode_and_status()
... ... @@ -413,35 +461,34 @@ class Agent:
413 461  
414 462 #if self.SIMULATOR_MODE: self.simulator_send_next_command()
415 463  
416   - # generic cmd in json format
417   - self.print("------START COMMMAND PROCESSING------")
418   - cmd = self.get_next_valid_command()
419   - #if cmd: cmd = self.general_process(cmd)
420   - if cmd: cmd = self.command_process(cmd)
421   - '''
422   - # Sub-level loop (only if ACTIVE)
423   - if self.is_active():
424   - if cmd: self.specific_process(cmd)
425   - '''
426   - self.routine_process()
427   - self.print("------END COMMMAND PROCESSING------")
  464 + self.printd("------START COMMMAND PROCESSING------")
428 465  
429   - # Every N iterations, delete old commands
  466 + # Purge commandes (every N iterations, delete old commands)
430 467 N=3
431 468 if ((self._iter_num-1) % N) == 0:
432 469 self.print("Looking for old commands to purge...")
433 470 Command.purge_old_commands_for_agent(self.name)
434 471  
  472 + # Get next command to execute
  473 + cmd = self.get_next_valid_command()
  474 + #if cmd: cmd = self.general_process(cmd)
  475 +
  476 + # Process this (next) command (if exists)
  477 + if cmd: cmd = self.command_process(cmd)
  478 +
  479 + # ROUTINE process
  480 + self.routine_process()
  481 +
  482 + self.printd("------END COMMMAND PROCESSING------")
  483 +
435 484 #self.waitfor(self.mainloop_waittime)
436 485  
437 486 self.print("-"*20, "MAIN LOOP ITERATION (END)", "-"*20)
438   - #self.print("-"*80)
439   -
440 487 #self.do_log(LOG_DEBUG, "Ending main loop iteration")
441 488  
442 489 self._iter_num += 1
443 490  
444   - # Exit if max duration is timed out
  491 + # Exit if max duration is reached
445 492 if self.MAX_DURATION_SEC and (time.time()-start_time > self.MAX_DURATION_SEC):
446 493 self.print("Exit because of max duration set to ", self.MAX_DURATION_SEC, "s")
447 494 self.kill_running_specific_cmd_if_exists()
... ... @@ -466,7 +513,7 @@ class Agent:
466 513 """
467 514 self.set_status(self.STATUS_ROUTINE_PROCESS)
468 515 if not self.is_active():
469   - self.print("I am IDLE, so I bypass the routine_process (do not send any new command)")
  516 + self.printd("I am IDLE, so I bypass the routine_process (do not send any new command)")
470 517 return
471 518  
472 519 if self.cmdts is None:
... ... @@ -476,14 +523,14 @@ class Agent:
476 523 # For this, it is enough to set primary key to None,
477 524 # then the send() command below will save a NEW command
478 525 #self.cmdts = copy.copy(self.cmdts)
479   - self.cmdts.set_as_pending()
480 526 self.cmdts.id = None
481 527  
482 528 # No more command to send (from simulator), return
483 529 if self.cmdts is None: return
484 530  
485   - # 1) Send cmd
  531 + # 1) Send cmd (= set as pending and save)
486 532 self.print(f"Send command", self.cmdts)
  533 + #self.cmdts.set_as_pending()
487 534 self.cmdts.send()
488 535 cmdts_is_processed = False
489 536 cmdts_res = None
... ... @@ -491,7 +538,7 @@ class Agent:
491 538 # 2) Wait for end of cmd execution
492 539 #self.wait_for_execution_of_cmd(self.cmdts)
493 540 while not cmdts_is_processed:
494   - self.print(f"Waiting for end of cmd {self.cmdts.name} execution...")
  541 + self.print(f"Waiting for end of cmd '{self.cmdts.name}' execution...")
495 542 self.cmdts.refresh_from_db()
496 543 # timeout ?
497 544 if self.cmdts.is_expired(): break
... ... @@ -507,7 +554,7 @@ class Agent:
507 554 if cmdts_is_processed:
508 555 self.print(f"Cmd executed. Result is '{cmdts_res}'")
509 556 else:
510   - self.print("Command was not completed")
  557 + self.printd("Command was not completed")
511 558  
512 559  
513 560 """
... ... @@ -519,10 +566,10 @@ class Agent:
519 566 NB: datetime.utcnow() is equivalent to datetime.now(timezone.utc)
520 567 ###
521 568  
522   - self.print("Looking for old commands to purge...")
  569 + self.printd("Looking for old commands to purge...")
523 570 ###
524 571 COMMAND_PEREMPTION_DATE_FROM_NOW = datetime.utcnow() - timedelta(hours = self.COMMANDS_PEREMPTION_HOURS)
525   - #self.print("peremption date", COMMAND_PEREMPTION_DATE_FROM_NOW)
  572 + #self.printd("peremption date", COMMAND_PEREMPTION_DATE_FROM_NOW)
526 573 old_commands = Command.objects.filter(
527 574 # only commands for me
528 575 receiver = self.name,
... ... @@ -532,22 +579,24 @@ class Agent:
532 579 ###
533 580 old_commands = Command.get_old_commands_for_agent(self.name)
534 581 if old_commands.exists():
535   - self.print("Found old commands to delete:")
536   - for cmd in old_commands: self.print(cmd)
  582 + self.printd("Found old commands to delete:")
  583 + for cmd in old_commands: self.printd(cmd)
537 584 old_commands.delete()
538 585 """
539 586  
540 587 def waitfor(self, nbsec):
541   - self.print(f"Now, waiting for {nbsec} seconds...")
  588 + self.printd(f"Now, waiting for {nbsec} seconds...")
542 589 time.sleep(nbsec)
543 590  
544 591 def set_status(self, status:str):
545   - self.print(f"[NEW CURRENT STATUS: {status}] (switching from status {self.status})")
  592 + #self.print(f"[{status}] (switching from status {self.status})")
  593 + self.print(f"[{status}]")
546 594 self.status = status
547 595 return False
548 596  
549 597 def set_mode(self, mode:str):
550   - self.print(f"Switching from mode {self.mode} to mode {mode}")
  598 + #self.print(f"Switching from mode {self.mode} to mode {mode}")
  599 + self.print(f"[NEW MODE {mode}]")
551 600 self.mode = mode
552 601  
553 602 def is_active(self):
... ... @@ -560,7 +609,7 @@ class Agent:
560 609 self.set_mode(self.MODE_IDLE)
561 610  
562 611 def show_mode_and_status(self):
563   - self.print(f"CURRENT MODE is {self.mode} (with status {self.status})")
  612 + self.print(f"CURRENT MODE is {self.mode} (status is {self.status})")
564 613  
565 614 def die(self):
566 615 self.set_status(self.STATUS_EXIT)
... ... @@ -608,7 +657,7 @@ class Agent:
608 657 """
609 658  
610 659 def init(self):
611   - self.print("Initializing...")
  660 + self.printd("Initializing...")
612 661 self.set_status(self.STATUS_INIT)
613 662  
614 663 def load_config(self):
... ... @@ -616,7 +665,7 @@ class Agent:
616 665 TODO:
617 666 only si date fichier xml changée => en RAM, un objet Config avec méthodes d'accès, appelle le parser de AK (classe Config.py indépendante)
618 667 """
619   - self.print("Loading the config file...")
  668 + self.printd("Loading the config file...")
620 669 #config_filename = 'c:/srv/develop/pyros/config/config_unit_simulunit1.xml'
621 670 #config.set_configfile(config_filename)
622 671 self.config.load()
... ... @@ -625,21 +674,21 @@ class Agent:
625 674 # --- display informations
626 675 # --- Get all the assembly of this unit[0] (mount + channels)
627 676 if self.config.is_config_contents_changed():
628   - self.print("--------- Components of the unit -----------")
629   - self.print("Configuration file is {}".format(self.config.get_configfile()))
  677 + self.printd("--------- Components of the unit -----------")
  678 + self.printd("Configuration file is {}".format(self.config.get_configfile()))
630 679 alias = self.config.get_aliases('unit')[0]
631 680 namevalue = self.config.get_paramvalue(alias,'unit','name')
632   - self.print("Unit alias is {}. Name is {}".format(alias,namevalue), ":")
  681 + self.printd("Unit alias is {}. Name is {}".format(alias,namevalue), ":")
633 682 unit_subtags = self.config.get_unit_subtags()
634 683 for unit_subtag in unit_subtags:
635 684 aliases = self.config.get_aliases(unit_subtag)
636 685 for alias in aliases:
637 686 namevalue = self.config.get_paramvalue(alias,unit_subtag,'name')
638   - self.print(f"- {unit_subtag} alias is {alias}. Name is {namevalue}")
639   - self.print("------------------------------------------")
  687 + self.printd(f"- {unit_subtag} alias is {alias}. Name is {namevalue}")
  688 + self.printd("------------------------------------------")
640 689 #params = self.config.get_params(unit_alias)
641 690 #for param in params:
642   - # self.print("Unit component is {}".format(param))
  691 + # self.printd("Unit component is {}".format(param))
643 692  
644 693 """
645 694 # self.config = Config.objects.get(pk=1)
... ... @@ -653,15 +702,15 @@ class Agent:
653 702 except Exception as e:
654 703 # except Config.ObjectDoesNotExist:
655 704 # except Config.DoesNotExist:
656   - self.print("Config read (or write) exception", str(e))
  705 + self.printd("Config read (or write) exception", str(e))
657 706 # return self.config
658 707 # return -1
659 708 return False
660 709 """
661 710  
662 711 def update_survey(self):
663   - self.print("Updating the survey database table...")
664   - #self.print("- fetching table line for agent", self.name)
  712 + self.printd("Updating the survey database table...")
  713 + #self.printd("- fetching table line for agent", self.name)
665 714 # only necessary when using process (not necessary with threads)
666 715 #with transaction.atomic():
667 716 self._agent_survey = AgentSurvey.objects.get(name=self.name)
... ... @@ -682,17 +731,17 @@ class Agent:
682 731 Commands are read in chronological order
683 732 """
684 733 self.set_status(self.STATUS_GET_NEXT_COMMAND)
685   - self.print("Looking for new commands from the database ...")
  734 + #self.printd("Looking for new commands from the database ...")
686 735  
687 736 # 1) Get all pending commands for me (return if None)
688 737 # Not sure this is necessary to do it in a transaction,
689 738 # but there might be a risk
690 739 # that a command status is modified while we are reading...
691 740 with transaction.atomic():
692   - self._pending_commands = Command.get_pending_commands_for_agent(self.name)
  741 + self._pending_commands = Command.get_pending_and_running_commands_for_agent(self.name)
693 742 commands = self._pending_commands
694 743 if not commands.exists():
695   - self.print("No new command to process")
  744 + self.printd("No new command to process")
696 745 return None
697 746 self.print("Current pending commands are (time ordered) :")
698 747 Command.show_commands(commands)
... ... @@ -719,16 +768,16 @@ class Agent:
719 768 """
720 769 cmd = commands[0]
721 770 if cmd.is_running():
722   - #self.print(f"There is currently a running command ({cmd_executing.first().name}), so I do nothing (wait for end of execution)")
723   - self.print(f"There is currently a running command ({cmd.name})")
  771 + #self.printd(f"There is currently a running command ({cmd_executing.first().name}), so I do nothing (wait for end of execution)")
  772 + self.printd(f"There is currently a running command ({cmd.name})")
724 773 """
725 774 # Check that this command is not expired
726 775 if cmd.is_expired():
727   - self.print("But this command is expired, so set its status to OUTOFDATE, and go on")
  776 + self.printd("But this command is expired, so set its status to OUTOFDATE, and go on")
728 777 cmd_executing.set_as_outofdate()
729 778 else:
730 779 """
731   - self.print(f"Thus, I will do nothing until this command execution is finished")
  780 + self.printd(f"Thus, I will do nothing until this command execution is finished")
732 781 # TODO: kill si superieur a MAX_EXEC_TIME
733 782 return None
734 783  
... ... @@ -743,10 +792,10 @@ class Agent:
743 792  
744 793 # 6) Current cmd must now be a valid (not expired) and PENDING one,
745 794 # so pass it to general_process() for execution
746   - #self.print(f"Got command {cmd.name} sent by agent {cmd.sender} at {cmd.sender_deposit_time}")
747   - self.print("***")
748   - self.print("*** Got", cmd)
749   - self.print("***")
  795 + #self.printd(f"Got command {cmd.name} sent by agent {cmd.sender} at {cmd.sender_deposit_time}")
  796 + self.printd("***")
  797 + self.printd("*** Got", cmd)
  798 + self.printd("***")
750 799 return cmd
751 800  
752 801  
... ... @@ -760,32 +809,32 @@ class Agent:
760 809 def general_process(self, cmd:Command)->Command:
761 810  
762 811 self.set_status(self.STATUS_GENERAL_PROCESS)
763   - self.print(f"Starting general processing of {cmd}")
  812 + self.print(f"Starting processing of {cmd}")
764 813  
765 814 # Update read time to say that the command has been READ
766 815 cmd.set_read_time()
767 816 # Precondition: command cmd is valid (not expired), has already been read, is pending
768 817 assert (not cmd.is_expired()) and cmd.is_pending() and cmd.is_read()
769 818  
770   - #self.print(f"Starting general processing of command {cmd.name} sent by agent {cmd.sender} at {cmd.sender_deposit_time}")
  819 + #self.printd(f"Starting general processing of command {cmd.name} sent by agent {cmd.sender} at {cmd.sender_deposit_time}")
771 820  
772 821 """
773 822 # 2) If expired command, change its status to expired and return
774 823 if cmd.is_expired():
775   - self.print("This command is expired, so mark it as such, and ignore it")
  824 + self.printd("This command is expired, so mark it as such, and ignore it")
776 825 cmd.set_as_outofdate()
777 826 return None
778 827 """
779 828  
780 829 # If cmd is generic, execute it, change its status to executed, and return
781 830 if cmd.is_generic():
782   - self.print("This command is generic, execute it...")
  831 + self.printd("This command is generic, execute it...")
783 832 self.exec_generic_cmd(cmd)
784 833 # If cmd is "exit", kill myself (without any question, this is an order soldier !)
785 834 # This "exit" should normally kill any current thread (to be checked...)
786 835 if cmd.name == "exit":
787   - self.print("(before exiting) Here are the current (still) pending commands (time ordered) :")
788   - commands = Command.get_pending_commands_for_agent(self.name)
  836 + self.printd("(before exiting) Here are the current (still) pending commands (time ordered) :")
  837 + commands = Command.get_pending_and_running_commands_for_agent(self.name)
789 838 Command.show_commands(commands)
790 839 #if self.SIMULATOR_MODE and self.SIMULATOR_WITH_TEST and self.SIMULATOR_COMMANDS_DEST == "myself": self.simulator_test_results()
791 840 if self.SIMULATOR_MODE and self.SIMULATOR_WITH_TEST: self.simulator_test_results()
... ... @@ -798,19 +847,19 @@ class Agent:
798 847 # cmd is not generic but, as I am idle, change its status to SKIPPED, ignore it, and return
799 848 #if self.mode == self.MODE_IDLE:
800 849 if not self.is_active():
801   - self.print("This command is not generic but, as I am IDLE, I mark it SKIPPED and ignore it")
  850 + self.printd("This command is not generic but, as I am IDLE, I mark it SKIPPED and ignore it")
802 851 cmd.set_as_skipped()
803 852 return None
804 853  
805 854 # Je suis pas idle et cde pas générique: je la traite pas, elle sera traitée par core_process :
806 855 # attendre que cette commande soit exécutée avant de passer à la commande suivante (situation “bloquante” normale)
807   - self.print("This command is not generic and, as I am not IDLE, I pass it to the specific processing")
808   - self.print("(then I will not execute any other new command until this command is EXECUTED)")
  856 + self.printd("This command is not generic and, as I am not IDLE, I pass it to the specific processing")
  857 + self.printd("(then I will not execute any other new command until this command is EXECUTED)")
809 858 return cmd
810 859  
811 860  
812 861 def exec_generic_cmd(self, cmd:Command):
813   - self.print("Starting execution of a Generic cmd...")
  862 + self.printd("Starting execution of a Generic cmd...")
814 863 cmd.set_as_running()
815 864  
816 865 # Executing command
... ... @@ -823,17 +872,17 @@ class Agent:
823 872 cmd.set_result("I am now idle")
824 873 time.sleep(1)
825 874 elif cmd.name in ("flush_commands"):
826   - self.print("flush_commands received: Delete all pending commands")
  875 + self.printd("flush_commands received: Delete all pending commands")
827 876 Command.delete_pending_commands_for_agent(self.name)
828 877 # If cmd is "abort" or "exit", kill any currently running thread
829 878 elif cmd.name in ("abort", "exit"):
830   - #self.print("Current pending commands are:")
  879 + #self.printd("Current pending commands are:")
831 880 #Command.show_commands(self._pending_commands)
832   - self.print("Aborting current executing command if exists:")
  881 + self.printd("Aborting current executing command if exists:")
833 882 self.kill_running_specific_cmd_if_exists()
834 883  
835 884 cmd.set_as_processed()
836   - self.print("...Generic cmd has been executed")
  885 + self.printd("...Generic cmd has been executed")
837 886  
838 887  
839 888  
... ... @@ -844,7 +893,7 @@ class Agent:
844 893 - in file
845 894 - in db
846 895 """
847   - self.print("Logging data...")
  896 + self.printd("Logging data...")
848 897  
849 898  
850 899  
... ... @@ -862,11 +911,11 @@ class Agent:
862 911 self.set_status(self.STATUS_SPECIFIC_PROCESS)
863 912 assert self.is_active()
864 913 self._current_specific_cmd = cmd
865   - self.print("Starting specific process...")
  914 + self.printd("Starting specific process...")
866 915 #self._current_thread = threading.Thread(target=self.exec_command)
867 916 # Run in a thread
868 917 if self.RUN_IN_THREAD:
869   - self.print("(run cmd in a thread)")
  918 + self.printd("(run cmd in a thread)")
870 919 self._current_specific_thread = StoppableThreadEvenWhenSleeping(target=self.thread_exec_specific_cmd)
871 920 #self._current_specific_thread = StoppableThreadEvenWhenSleeping(target=self.exec_specific_cmd, args=(cmd,))
872 921 #self._current_thread = threading.Thread(target=self.exec_command)
... ... @@ -875,7 +924,7 @@ class Agent:
875 924 #self._current_specific_thread = thread_with_exception('thread test')
876 925 # Run in a process
877 926 else:
878   - self.print("(run cmd in a process)")
  927 + self.printd("(run cmd in a process)")
879 928 # close the database connection first, it will be re-opened in each process
880 929 db.connections.close_all()
881 930 self._current_specific_thread = multiprocessing.Process(target=self.thread_exec_specific_cmd)
... ... @@ -885,14 +934,14 @@ class Agent:
885 934 self._current_specific_thread.start()
886 935 #my_thread.join()
887 936 #self.waitfor(self.subloop_waittime)
888   - self.print("Ending specific process (thread has been launched)")
  937 + self.printd("Ending specific process (thread has been launched)")
889 938  
890 939  
891 940 def kill_running_specific_cmd_if_exists(self):
892 941 if (self._current_specific_thread is None) or not self._current_specific_thread.is_alive():
893   - self.print("...No current specific command thread to abort...")
  942 + self.printd("...No current specific command thread to abort...")
894 943 else:
895   - self.print(f"Killing command {self._current_specific_cmd.name}")
  944 + self.printd(f"Killing command {self._current_specific_cmd.name}")
896 945 # Ask the thread to stop itself
897 946 #self._current_specific_thread.stop()
898 947 #self._current_specific_thread._stop()
... ... @@ -926,7 +975,7 @@ class Agent:
926 975 #self._current_test_cmd = "go_idle" if self._current_test_cmd=="go_active" else "go_active"
927 976 #if self._nb_test_cmds == 4: self._current_test_cmd = "exit"
928 977 cmd_name = next(self.SIMULATOR_COMMANDS, None)
929   - #self.print("next cmd is ", cmd_name)
  978 + #self.printd("next cmd is ", cmd_name)
930 979 if cmd_name is None: return
931 980 #Command.objects.create(sender=self.name, receiver=self.name, name=cmd_name)
932 981 receiver_agent = self.name if self.SIMULATOR_COMMANDS_DEST=="myself" else self.SIMULATOR_COMMANDS_DEST
... ... @@ -937,12 +986,19 @@ class Agent:
937 986 """
938 987  
939 988 def simulator_test_results(self):
940   - self.print("\n--- Testing if the commands I SENT had the awaited result")
941   - self.print("Here are the last commands I sent:")
  989 + commands = self.simulator_test_results_start()
  990 + nb_asserted = self.simulator_test_results_main(commands)
  991 + self.simulator_test_results_end(nb_asserted)
  992 +
  993 + def simulator_test_results_start(self):
  994 + self.printd("\n--- Testing if the commands I SENT had the awaited result")
  995 + self.printd("Here are the last commands I sent:")
942 996 #commands = list(Command.get_last_N_commands_for_agent(self.name, 16))
943 997 #commands = Command.get_last_N_commands_sent_to_agent(self.name, 16)
944 998 commands = Command.get_last_N_commands_sent_by_agent(self.name, len(self.SIMULATOR_COMMANDS_LIST))
945 999 Command.show_commands(commands)
  1000 + assert commands[0].name == self.SIMULATOR_COMMANDS_LIST[0]
  1001 + assert commands[-1].name == self.SIMULATOR_COMMANDS_LIST[-1]
946 1002 return commands
947 1003 """ OLD SCENARIO
948 1004 nb_asserted = 0
... ... @@ -968,9 +1024,24 @@ class Agent:
968 1024 assert cmd.is_executed()
969 1025 nb_asserted+=1
970 1026 assert nb_asserted == 12
971   - self.print("--- Finished testing => result is ok")
  1027 + self.printd("--- Finished testing => result is ok")
972 1028 """
973 1029  
  1030 + # To be overriden by subclass
  1031 + def simulator_test_results_main(self, commands):
  1032 + nb_asserted = 0
  1033 + for cmd in commands:
  1034 + assert cmd.is_executed()
  1035 + nb_asserted+=1
  1036 + return nb_asserted
  1037 +
  1038 + def simulator_test_results_end(self, nb_asserted):
  1039 + nb_commands_to_send = len(self.SIMULATOR_COMMANDS_LIST)
  1040 + assert nb_asserted == nb_commands_to_send
  1041 + #self.print(f"************** Finished testing => result is ok ({nb_asserted} assertions) **************")
  1042 + printFullTerm(Colors.GREEN, f"************** Finished testing => result is ok ({nb_asserted} assertions) **************")
  1043 +
  1044 +
974 1045  
975 1046  
976 1047 """
... ... @@ -997,8 +1068,8 @@ class Agent:
997 1068 cmd = self._current_specific_cmd
998 1069 """ specific command execution setting up """
999 1070 #cmd = self.get_current_specific_cmd()
1000   - self.print(">>>>> Thread: starting execution of command", cmd.name)
1001   - self.print(">>>>> Thread: PID: %s, Process Name: %s, Thread Name: %s" % (
  1071 + self.printd(">>>>> Thread: starting execution of command", cmd.name)
  1072 + self.printd(">>>>> Thread: PID: %s, Process Name: %s, Thread Name: %s" % (
1002 1073 os.getpid(),
1003 1074 multiprocessing.current_process().name,
1004 1075 threading.current_thread().name)
... ... @@ -1036,7 +1107,7 @@ class Agent:
1036 1107 def thread_exec_specific_cmd_main(self):
1037 1108 """
1038 1109 cmd = self._current_specific_cmd
1039   - self.print("Doing nothing, just sleeping...")
  1110 + self.printd("Doing nothing, just sleeping...")
1040 1111 self.sleep(3)
1041 1112 """
1042 1113  
... ... @@ -1079,7 +1150,7 @@ class Agent:
1079 1150 with transaction.atomic():
1080 1151 cmd.set_as_processed()
1081 1152 """
1082   - self.print(">>>>> Thread: ended execution of command", cmd.name)
  1153 + self.printd(">>>>> Thread: ended execution of command", cmd.name)
1083 1154 cmd = None
1084 1155 # No more current thread
1085 1156 #self._current_specific_thread = None
... ... @@ -1090,9 +1161,9 @@ class Agent:
1090 1161 # Exit if I was asked to stop
1091 1162 cmd = self._current_specific_cmd
1092 1163 if self.RUN_IN_THREAD and threading.current_thread().stopped():
1093   - self.print(f">>>>> Thread (cmd {cmd.name}): I received the stop signal, so I stop (in error)")
  1164 + self.printd(f">>>>> Thread (cmd {cmd.name}): I received the stop signal, so I stop (in error)")
1094 1165 exit(1)
1095   - self.print(f">>>>> Thread (cmd {cmd.name}): step #{step}/{self._thread_total_steps_number}")
  1166 + self.printd(f">>>>> Thread (cmd {cmd.name}): step #{step}/{self._thread_total_steps_number}")
1096 1167 # call a specific function to be defined by subclass
1097 1168 cmd_step_function(step)
1098 1169 # Wait for a specific time (interruptible)
... ... @@ -1101,7 +1172,7 @@ class Agent:
1101 1172 def thread_stop_if_asked(self):
1102 1173 assert self._current_specific_thread is not None
1103 1174 if self.RUN_IN_THREAD and threading.current_thread().stopped():
1104   - self.print("(Thread) I received the stop signal, so I stop (in error)")
  1175 + self.printd("(Thread) I received the stop signal, so I stop (in error)")
1105 1176 exit(1)
1106 1177  
1107 1178 def thread_set_total_steps_number(self, nbsteps):
... ...
src/agent/AgentA.py
... ... @@ -15,7 +15,7 @@ from Agent import Agent
15 15 class AgentA(Agent):
16 16  
17 17 #MAX_DURATION_SEC = None
18   - MAX_DURATION_SEC = 90
  18 + MAX_DURATION_SEC = 120
19 19  
20 20 # FOR TEST ONLY
21 21 # Run this agent in simulator mode
... ... @@ -32,11 +32,12 @@ class AgentA(Agent):
32 32  
33 33 "go_active",
34 34  
35   - # Because of this command, the receiver agent
36   - # will no more send any new command
  35 + # Because of this command, the receiver agent :
  36 + # - will no more send any new command
  37 + # - will only execute "generic" commands (and not the "specific" ones)
37 38 "go_idle",
38 39  
39   - # Not executed because receiver agent is now "idle"
  40 + # Not executed (skipped) because receiver agent is now "idle"
40 41 #"specific0",
41 42  
42 43 # Because of this command, the receiver agent
... ... @@ -176,24 +177,7 @@ class AgentA(Agent):
176 177 '''
177 178  
178 179 # @override
179   - def simulator_test_results(self):
180   - commands = super().simulator_test_results()
181   - #self.print(commands)
182   - """
183   - "go_active",
184   -
185   - "go_idle",
186   - # Not executed because receiver agent is now "idle"
187   - #"specific0",
188   -
189   - # Executed because receiver agent is now "active"
190   - "go_active",
191   - #"specific1",
192   - "eval 4+3",
193   -
194   - "go_idle",
195   - "exit",
196   - """
  180 + def simulator_test_results_main(self, commands):
197 181 nb_asserted = 0
198 182 for cmd in commands:
199 183 if cmd.name == "flush_commands":
... ... @@ -207,6 +191,12 @@ class AgentA(Agent):
207 191 if cmd.name == "go_idle":
208 192 assert cmd.is_executed()
209 193 nb_asserted+=1
  194 + """
  195 + if cmd.name == "specific0":
  196 + assert cmd.is_skipped()
  197 + assert cmd.result == "in step #5/5"
  198 + nb_asserted+=1
  199 + """
210 200 if cmd.name == "specific1":
211 201 assert cmd.is_executed()
212 202 assert cmd.result == "in step #5/5"
... ... @@ -221,9 +211,7 @@ class AgentA(Agent):
221 211 if cmd.name in ("exit"):
222 212 assert cmd.is_executed()
223 213 nb_asserted+=1
224   - #assert nb_asserted == 6
225   - assert nb_asserted == len(self.SIMULATOR_COMMANDS_LIST)
226   - self.print("************** Finished testing => result is ok **************")
  214 + return nb_asserted
227 215  
228 216  
229 217 """
... ...
src/agent/AgentB.py
... ... @@ -170,17 +170,15 @@ class AgentB(Agent):
170 170 super().exec_specific_cmd_end(cmd, from_thread)
171 171 '''
172 172  
  173 + """
173 174 # @override
174   - def simulator_test_results(self):
175   - commands = super().simulator_test_results()
  175 + def simulator_test_results_main(self, commands):
176 176 nb_asserted = 0
177 177 for cmd in commands:
178 178 assert cmd.is_executed()
179 179 nb_asserted+=1
180   - #assert nb_asserted == 2
181   - assert nb_asserted == len(self.SIMULATOR_COMMANDS_LIST)
182   - self.print("************** Finished testing => result is ok **************")
183   -
  180 + return nb_asserted
  181 + """
184 182  
185 183 """
186 184 =================================================================
... ...
src/common/models.py
... ... @@ -306,14 +306,24 @@ class Command(models.Model):
306 306  
307 307 @classmethod
308 308 def delete_pending_commands_for_agent(cls, agent_name):
309   - print("Delete pending command(s) if exists:")
  309 + """
  310 + Delete all pending commands sent to agent_name,
  311 + except very recent commands.
  312 + This (exception) is to avoid a "data race" where for example agentB is executing a "flush" command
  313 + while agentA is sending command to agentB... :
  314 + - agentB will then delete the command just sent by agentA
  315 + - agentA will check regularly the status of its sent command, and this will crash as this command exists no more !!
  316 + """
  317 + print("Delete all pending command(s) if exists (except very recent ones):")
  318 + now_minus_2sec = datetime.utcnow().astimezone() - timedelta(seconds = 2)
  319 + #print("now_minus_2sec", now_minus_2sec)
310 320 pending_commands = cls.objects.filter(
311 321 # only commands for agent agent_name
312 322 receiver = agent_name,
313 323 # only running commands
314 324 receiver_status_code = cls.CMD_STATUS_CODES.CMD_PENDING,
315   - # only not expired commands
316   - #sender_deposit_time__gte = cls.get_peremption_date_from_now(),
  325 + # except very recent commands : take only commands that are more than 2 sec old
  326 + sender_deposit_time__lt = now_minus_2sec
317 327 )
318 328 if pending_commands:
319 329 Command.show_commands(pending_commands)
... ... @@ -321,7 +331,7 @@ class Command(models.Model):
321 331 else: print("<None>")
322 332  
323 333 @classmethod
324   - def get_pending_commands_for_agent(cls, agent_name):
  334 + def get_pending_and_running_commands_for_agent(cls, agent_name):
325 335 #print("peremption date", COMMAND_PEREMPTION_DATE_FROM_NOW)
326 336 return cls.objects.filter(
327 337 # only pending commands
... ... @@ -391,6 +401,8 @@ class Command(models.Model):
391 401 #for cmd in old_commands: print(cmd)
392 402 cls.show_commands(old_commands)
393 403 old_commands.delete()
  404 + else:
  405 + print("<None>")
394 406  
395 407 @classmethod
396 408 #def show_commands(cls, commands:models.query):
... ... @@ -409,7 +421,9 @@ class Command(models.Model):
409 421  
410 422 # --- BOOLEAN (test) functions ---
411 423  
412   - def send(self): self.save()
  424 + def send(self):
  425 + #self.save()
  426 + self.set_as_pending()
413 427  
414 428 def is_generic(self):
415 429 """
... ... @@ -465,7 +479,7 @@ class Command(models.Model):
465 479  
466 480 def set_as_processed(self):
467 481 print(f"- Set command {self.name} as processed")
468   - print(self)
  482 + #print(self)
469 483 self.receiver_status_code = self.CMD_STATUS_CODES.CMD_EXECUTED
470 484 self.receiver_processed_time = datetime.utcnow().astimezone()
471 485 self.save()
... ...