Commit 3fec7115dc5723090d891aceb555e1314c36de98

Authored by Etienne Pallier
1 parent 896081bb
Exists in dev

Agent encore très refactorisé et (quasi) totalement implémenté

- commandes "exit" et "abort" implémentées
- AgentX contient très peu de code (tout est remonté dans Agent)
- Il faut cependant encore améliorer le "abort", pas complètement
satisfaisant...
README.md
... ... @@ -67,14 +67,16 @@ This software has been tested and validated with the following configurations :
67 67 --------------------------------------------------------------------------------------------
68 68 ## LAST VERSION
69 69  
70   -Date: 07/03/2019
  70 +Date: 08/03/2019
71 71  
72 72 Author: E. Pallier
73 73  
74   -VERSION: 0.20.9
  74 +VERSION: 0.20.10
75 75  
76 76 Comment:
77   - Agent très refactorisé et presque totalement implémenté (il manque plus que le "abort")
  77 + Agent encore très refactorisé et (quasi) totalement implémenté (y-compris "exit" et "abort")
  78 + AgentX contient très peu de code (tout est remonté dans Agent)
  79 + Il faut cependant encore améliorer le "abort", pas complètement satisfaisant...
78 80  
79 81  
80 82 --------------------------------------------------------------------------------------------
... ...
src/agent/Agent.py
... ... @@ -12,6 +12,7 @@ VERSION = "0.4"
12 12 import time
13 13 from datetime import datetime, timedelta
14 14 import os
  15 +import threading, multiprocessing
15 16  
16 17  
17 18 """TODO:
... ... @@ -77,6 +78,28 @@ log = L.setupLogger("AgentLogger", "Agent")
77 78  
78 79 """
79 80 =================================================================
  81 + class StoppableThread
  82 +=================================================================
  83 +"""
  84 +
  85 +class StoppableThread(threading.Thread):
  86 + """Thread class with a stop() method. The thread itself has to check
  87 + regularly for the stopped() condition."""
  88 + # See also https://www.oreilly.com/library/view/python-cookbook/0596001673/ch06s03.html
  89 +
  90 + def __init__(self, *args, **kwargs):
  91 + super(StoppableThread, self).__init__(*args, **kwargs)
  92 + self._stop_event = threading.Event()
  93 +
  94 + def stop(self):
  95 + self._stop_event.set()
  96 +
  97 + def stopped(self):
  98 + return self._stop_event.is_set()
  99 +
  100 +
  101 +"""
  102 +=================================================================
80 103 class Agent
81 104 =================================================================
82 105 """
... ... @@ -90,9 +113,10 @@ class Agent:
90 113 SIMULATOR_MODE = True
91 114 SIMULATOR_COMMANDS = iter([
92 115 "go_active",
93   -
94 116 "go_idle",
95   - "specific0_not_executed_because_idle",
  117 +
  118 + # specific0 not_executed_because_idle
  119 + "specific0",
96 120  
97 121 "go_active",
98 122  
... ... @@ -102,12 +126,13 @@ class Agent:
102 126 # specific2 executed_later_because_waiting_for_previous_specific_command_to_finish
103 127 "specific2",
104 128  
  129 + # Should abort the current running command (which should normally be specific1)
  130 + # even if specific2 is already pending
  131 + "abort",
  132 +
105 133 # specific3 executed_later_because_waiting_for_previous_specific_command_to_finish
106 134 "specific3",
107 135  
108   - # Should abort the current running command (which should normally be specific1)
109   - # even if specific2 and 3 are already pending
110   - #"abort"
111 136  
112 137 "go_active",
113 138 "go_idle",
... ... @@ -142,7 +167,10 @@ class Agent:
142 167 STATUS_LAUNCH = "LAUNCHED"
143 168 STATUS_INIT = "INITIALIZING"
144 169 STATUS_MAIN_LOOP = "IN_MAIN_LOOP"
145   - STATUS_PROCESS_LOOP = "IN_PROCESS_LOOP"
  170 + STATUS_GET_NEXT_COMMAND = "IN_GET_NEXT_COMMAND"
  171 + STATUS_GENERAL_PROCESS = "IN_GENERAL_PROCESS"
  172 + STATUS_ROUTINE_PROCESS = "IN_ROUTINE_PROCESS"
  173 + STATUS_SPECIFIC_PROCESS = "IN_SPECIFIC_PROCESS"
146 174 STATUS_EXIT = "EXITING"
147 175  
148 176 # Modes
... ... @@ -153,12 +181,15 @@ class Agent:
153 181 CONFIG_DIR = "config"
154 182  
155 183 _agent_survey = None
  184 + _pending_commands = []
  185 + _current_specific_cmd = None
  186 + _current_specific_thread = None
156 187  
157   - _iter_num = 0
  188 + _iter_num = 1
158 189  
159   - def __init__(self, name:str=None, config_filename:str=None):
160   - self.set_mode(self.MODE_IDLE)
  190 + def __init__(self, name:str="Agent", config_filename:str=None):
161 191 self.set_status(self.STATUS_LAUNCH)
  192 + self.set_mode(self.MODE_IDLE)
162 193 self.name = name
163 194 if not config_filename:
164 195 #config_filename = '/PROJECTS/GFT/SOFT/PYROS_SOFT/CURRENT/config/config_unit_simulunit1.xml'
... ... @@ -176,6 +207,10 @@ class Agent:
176 207 if self.config.get_last_errno() != self.config.NO_ERROR:
177 208 raise Exception(f"Bad config file name '{config_filename}', error {str(self.config.get_last_errno())}: {str(self.config.get_last_errmsg())}")
178 209  
  210 + self.set_mode_from_config(name)
  211 + # TODO: remove
  212 + self.set_idle()
  213 +
179 214 # Create 1st survey if none
180 215 #tmp = AgentSurvey.objects.filter(name=self.name)
181 216 #if len(tmp) == 0:
... ... @@ -232,7 +267,6 @@ class Agent:
232 267 print()
233 268 print()
234 269 #print("-"*80)
235   -
236 270 print("-"*20, f"MAIN LOOP ITERATION {self._iter_num} (START)", "-"*20)
237 271 self.set_status(self.STATUS_MAIN_LOOP)
238 272 self.show_mode_and_status()
... ... @@ -262,13 +296,12 @@ class Agent:
262 296  
263 297 # Sub-level loop (only if ACTIVE)
264 298 if self.is_active():
265   - self.set_status(self.STATUS_PROCESS_LOOP)
266 299 if cmd: self.specific_process(cmd)
267 300 print("---")
268 301  
269 302 # Every N iterations, delete old commands
270 303 N=3
271   - if (self._iter_num % N) == 0: Command.purge_old_commands_for_agent(self.name)
  304 + if ((self._iter_num-1) % N) == 0: Command.purge_old_commands_for_agent(self.name)
272 305  
273 306 self.waitfor(self.mainloop_waittime)
274 307  
... ... @@ -282,7 +315,8 @@ class Agent:
282 315  
283 316  
284 317 def routine_process(self):
285   - pass
  318 + self.set_status(self.STATUS_ROUTINE_PROCESS)
  319 +
286 320  
287 321 """
288 322 def purge_commands(self):
... ... @@ -316,7 +350,7 @@ class Agent:
316 350 time.sleep(nbsec)
317 351  
318 352 def set_status(self, status:str):
319   - print(f"Switching from status {self.status} to status {status}")
  353 + print(f"[[NEW CURRENT STATUS: {status}]] (switching from status {self.status})")
320 354 self.status = status
321 355 return False
322 356  
... ... @@ -458,11 +492,12 @@ class Agent:
458 492 which is relevant to this agent.
459 493 Commands are read in chronological order
460 494 """
461   -
  495 + self.set_status(self.STATUS_GET_NEXT_COMMAND)
462 496 print("Looking for new commands from the database ...")
463 497  
464 498 # 1) Get all pending commands for me (return if None)
465   - commands = Command.get_pending_commands_for_agent(self.name)
  499 + self._pending_commands = Command.get_pending_commands_for_agent(self.name)
  500 + commands = self._pending_commands
466 501 if commands is None: return None
467 502 print("Current pending commands are (time ordered) :")
468 503 Command.show_commands(commands)
... ... @@ -519,15 +554,15 @@ class Agent:
519 554  
520 555 def general_process(self, cmd:Command)->Command:
521 556  
  557 + self.set_status(self.STATUS_GENERAL_PROCESS)
  558 + print(f"Starting general processing of {cmd}")
  559 +
522 560 # Update read time to say that the command has been READ
523   - ##assert cmd.receiver_read_time is None # f"Command {cmd} should not have been already read !!"
524 561 cmd.set_read_time()
525   -
526 562 # Precondition: command cmd is valid (not expired), has already been read, is pending
527 563 assert (not cmd.is_expired()) and cmd.is_pending() and cmd.is_read()
528 564  
529 565 #print(f"Starting general processing of command {cmd.name} sent by agent {cmd.sender} at {cmd.sender_deposit_time}")
530   - print(f"Starting general processing of {cmd}")
531 566  
532 567 """
533 568 # 2) If expired command, change its status to expired and return
... ... @@ -544,9 +579,7 @@ class Agent:
544 579 # If cmd is "exit", kill myself (without any question, this is an order soldier !)
545 580 # This "exit" should normally kill any current thread (to be checked...)
546 581 if cmd.name == "exit": exit(0)
547   - # If cmd is "abort", kill any currently running thread
548   - if cmd.name == "abort":
549   - pass
  582 + # Command is executed, so return None
550 583 return None
551 584  
552 585 # cmd is not generic
... ... @@ -568,6 +601,7 @@ class Agent:
568 601 def exec_generic_cmd(self, cmd:Command):
569 602 print("Starting execution of a Generic cmd...")
570 603 cmd.set_as_running()
  604 +
571 605 # Executing command
572 606 if cmd.name == "go_active":
573 607 self.set_active()
... ... @@ -575,12 +609,19 @@ class Agent:
575 609 if cmd.name == "go_idle":
576 610 self.set_idle()
577 611 time.sleep(1)
578   - #cmd.set_as_executed()
  612 + # If cmd is "abort", kill any currently running thread
  613 + if cmd.name == "abort":
  614 + print("Current pending commands are:")
  615 + Command.show_commands(self._pending_commands)
  616 + print("Aborting current executing command if exists:")
  617 + self.kill_running_specific_command_if_exists()
  618 +
579 619 cmd.set_as_processed()
580 620 print("...Generic cmd has been executed")
581 621  
582 622  
583 623  
  624 +
584 625 def do_log(self):
585 626 """
586 627 log à 2 endroits ou 1 seul
... ... @@ -590,18 +631,57 @@ class Agent:
590 631 print("Logging data...")
591 632  
592 633  
593   - # @abstract
594   - # to be implemented by subclasses
595 634 def specific_process(self, cmd:Command):
596   - #raise NotImplemented()
597 635 """
598 636 Sublevel Loop (only if ACTIVE) :
599   - PLUS TARD, maybe :start_process_thread() dans un thread : ensuite, à chaque tour de boucle il regarde si c'est fini ou pas, et si fini recommence
600 637 """
  638 + self.set_status(self.STATUS_SPECIFIC_PROCESS)
601 639 assert self.is_active()
602   - # TODO: LOG
603 640  
  641 + self._current_specific_cmd = cmd
  642 + print("Starting specific process...")
  643 + #self._current_thread = threading.Thread(target=self.exec_command)
  644 + self._current_specific_thread = StoppableThread(target=self.exec_specific_command, args=(cmd,))
  645 + self._current_specific_thread.start()
  646 + #my_thread.join()
  647 + #self.waitfor(self.subloop_waittime)
  648 + print("Ending specific process (thread has been launched)")
  649 +
  650 + """
  651 + def get_current_specific_command(self):
  652 + return self._current_specific_cmd
  653 + """
604 654  
  655 + def exec_specific_command(self, cmd:Command):
  656 + """ specific command execution setup """
  657 + #cmd = self.get_current_specific_command()
  658 + print(">>>>> Thread starting execution of ", cmd)
  659 + print(">>>>> PID: %s, Process Name: %s, Thread Name: %s" % (
  660 + os.getpid(),
  661 + multiprocessing.current_process().name,
  662 + threading.current_thread().name)
  663 + )
  664 + cmd.set_as_running()
  665 +
  666 + def exec_specific_command_finished(self, cmd:Command):
  667 + """ specific command execution tear up """
  668 + cmd.set_as_processed()
  669 + print(">>>>> Thread ended execution of command", cmd.name)
  670 + cmd = None
  671 + # No more current thread
  672 + self._current_specific_thread = None
  673 +
  674 +
  675 + def kill_running_specific_command_if_exists(self):
  676 + if self._current_specific_thread:
  677 + print(f"Killing command {self._current_specific_cmd.name}")
  678 + # Ask the thread to stop itself
  679 + self._current_specific_thread.stop()
  680 + # Now, wait for the end of the thread
  681 + self._current_specific_thread.join()
  682 + self._current_specific_thread = None
  683 + self._current_specific_cmd.set_as_killed()
  684 + self._current_specific_cmd = None
605 685  
606 686 """
607 687 ===================================
... ...
src/agent/AgentX.py
1 1  
2 2 import utils.Logger as L
3   -import threading, multiprocessing, os
  3 +#import threading, multiprocessing, os
4 4 import time
5 5  
6 6 from .Agent import Agent
... ... @@ -10,31 +10,23 @@ from common.models import Command
10 10  
11 11 log = L.setupLogger("AgentXTaskLogger", "AgentX")
12 12  
13   -"""
14   - Task to handle the execution of the program
15   -
16   - check the environment status in database
17   - check the devices status (telescope / cameras)
18   - check if the last schedule made has to be planned
19   - launch schedule's sequences
20   -"""
21 13  
22 14  
23 15 class AgentX(Agent):
24 16  
25   - _current_cmd = None
26   -
27   - def __init__(self, name, config_filename=None):
  17 +
  18 +
  19 + # @override
  20 + def __init__(self, name:str=None, config_filename=None):
  21 + if name is None: name = self.__class__.__name__
28 22 super().__init__(name, config_filename)
29 23  
30 24 # @override
31 25 def init(self):
32 26 super().init()
33 27 # --- Set the mode according the startmode value
34   - agent_alias = self.__class__.__name__
35   - self.set_mode_from_config(agent_alias)
36   - # TODO: remove
37   - self.set_idle()
  28 + ##agent_alias = self.__class__.__name__
  29 + ##self.set_mode_from_config(agent_alias)
38 30  
39 31 # @override
40 32 '''
... ... @@ -59,27 +51,12 @@ class AgentX(Agent):
59 51 super().do_log()
60 52  
61 53 # @override
62   - def specific_process(self, cmd:Command):
63   - super().specific_process(cmd)
64   - self._current_cmd = cmd
65   - print("Starting specific process subloop...")
66   - my_thread = threading.Thread(target=self.exec_command)
67   - my_thread.start()
68   - #my_thread.join()
69   -
70   - #self.waitfor(self.subloop_waittime)
71   -
72   - print("Ending specific process subloop...")
73   -
74   - def exec_command(self):
75   - cmd = self._current_cmd
76   - print(">>>>> Thread starting execution of ", cmd)
77   - print(">>>>> PID: %s, Process Name: %s, Thread Name: %s" % (
78   - os.getpid(),
79   - multiprocessing.current_process().name,
80   - threading.current_thread().name)
81   - )
82   - cmd.set_as_running()
83   - time.sleep(7)
84   - cmd.set_as_processed()
85   - print(">>>>> Thread ended execution of ", cmd)
  54 + def exec_specific_command(self, cmd:Command):
  55 + """ This code is executed inside a thread """
  56 + # thread execution setup
  57 + super().exec_specific_command(cmd)
  58 +
  59 + time.sleep(20)
  60 +
  61 + # thread execution tear up
  62 + self.exec_specific_command_finished(cmd)
... ...
src/common/models.py
... ... @@ -240,6 +240,7 @@ class Command(models.Model):
240 240 "CMD_EXECUTED", # cde exécutée => simulé par un sleep(3) dans AgentX.core_process())
241 241 "CMD_PENDING", # cde en attente d'exécution
242 242 "CMD_SKIPPED", # cde ignorée (je suis idle… et j’ai ignoré cette commande, et je passe à la cde suivante)
  243 + "CMD_KILLED", # cde ignorée (je suis idle… et j’ai ignoré cette commande, et je passe à la cde suivante)
243 244 "CMD_OUTOFDATE" # cde périmée
244 245 )
245 246 GENERIC_COMMANDS = ["go_idle", "go_active", "abort", "exit"]
... ... @@ -383,7 +384,9 @@ class Command(models.Model):
383 384 def set_read_time(self):
384 385 self.receiver_read_time = datetime.utcnow().astimezone()
385 386 self.save()
  387 +
386 388 def set_as_processed(self):
  389 + print(f"- Set command {self.name} as processed")
387 390 self.receiver_status_code = self.CMD_STATUS_CODES.CMD_EXECUTED
388 391 self.receiver_processed_time = datetime.utcnow().astimezone()
389 392 self.save()
... ... @@ -391,9 +394,17 @@ class Command(models.Model):
391 394 def set_as_outofdate(self):
392 395 print(f"- Set this command as expired (older than its validity duration of {self.validity_duration_sec}s): {self}")
393 396 self.set_status_to(self.CMD_STATUS_CODES.CMD_OUTOFDATE)
  397 +
394 398 def set_as_skipped(self):
395 399 self.set_status_to(self.CMD_STATUS_CODES.CMD_SKIPPED)
  400 +
  401 + def set_as_killed(self):
  402 + print(f"- Set command {self.name} as killed")
  403 + #print(f"- Set this command as killed: {self}")
  404 + self.set_status_to(self.CMD_STATUS_CODES.CMD_KILLED)
  405 +
396 406 def set_as_running(self):
  407 + print(f"- Set command {self.name} as running")
397 408 self.set_status_to(self.CMD_STATUS_CODES.CMD_RUNNING)
398 409 '''
399 410 def set_as_executed(self):
... ...