Commit 39fd2bca0262123eb589c9955698583323bb6468
1 parent
ef335590
Exists in
dev
Agent v2 en cours... added PRIORITY commands management
Showing
4 changed files
with
145 additions
and
53 deletions
Show diff stats
CHANGELOG
VERSION
src/core/pyros_django/agent/Agent.py
... | ... | @@ -643,9 +643,14 @@ class Agent: |
643 | 643 | # Declaration of Instance attributes, default values |
644 | 644 | #self.UP_SINCE = datetime.utcnow() |
645 | 645 | self.__UP_SINCE = datetime.now(tz=timezone.utc) |
646 | + self.__ROUTINE_PROCESS_BEFORE_IS_RUNNING:bool = False | |
647 | + self.__ROUTINE_PROCESS_AFTER_IS_RUNNING:bool = False | |
646 | 648 | self.__test_cmd_received_num:int = 0 # only for tests |
647 | 649 | # Current Command running |
648 | 650 | self.__CC :AgentCmd = None |
651 | + # Previous Command running | |
652 | + self.__CC_prev :AgentCmd = None | |
653 | + # Current Command exception (if occurs) | |
649 | 654 | self.__CCE :Exception = None |
650 | 655 | self.name = "Generic Agent" |
651 | 656 | self.__status :str = None |
... | ... | @@ -1104,7 +1109,7 @@ class Agent: |
1104 | 1109 | self.DO_MAIN_LOOP = True |
1105 | 1110 | self.__CC = None |
1106 | 1111 | self.__CC_prev = None |
1107 | - self._CCE = None | |
1112 | + self.__CCE = None | |
1108 | 1113 | while self.DO_MAIN_LOOP: |
1109 | 1114 | # EXIT because of nb of iterations ? |
1110 | 1115 | if nb_iter is not None: |
... | ... | @@ -1155,17 +1160,25 @@ class Agent: |
1155 | 1160 | |
1156 | 1161 | # TRY TO START NEXT COMMAND if exists and if current cmd is finished |
1157 | 1162 | #if self.__CC is None or self.__CC.is_finished() or self.__priority_cmd_received(): |
1158 | - if not self.__CC or self.__CC.is_finished() or self.__priority_cmd_received(): | |
1163 | + CMD_PRIO=None | |
1164 | + if not self.__CC or self.__CC.is_finished() or (CMD_PRIO := self.__priority_cmd_received() is not None) : | |
1159 | 1165 | print() |
1160 | 1166 | print() |
1161 | 1167 | log.info("*"*10 + " NEXT CMD RECEIVED PROCESSING (START) " + "*"*10 + '\n') |
1162 | - if self.__priority_cmd_received(): self.__CC_prev = self.__CC | |
1163 | - self.__CC = self.__get_next_received_command() | |
1168 | + ##if self.__priority_cmd_received() and self.__CC.is_running(): self.__CC_prev = self.__CC | |
1169 | + # Save "current cmd" as now "previous cmd" | |
1170 | + self.__CC_prev = self.__CC | |
1171 | + # New CC will be either : | |
1172 | + # - None if no new command received | |
1173 | + # or | |
1174 | + # - a priority command if exists in the commands list, otherwise the next command in the list | |
1175 | + self.__CC = CMD_PRIO if CMD_PRIO else self.__get_next_received_command() | |
1164 | 1176 | if self.__CC: |
1165 | - try: | |
1177 | + try: | |
1178 | + # If CC is a priority cmd, it will be executed SEQUENTIALLY | |
1179 | + # Thus this call will be blocking until exec finished | |
1166 | 1180 | self.__start_next_cmd() |
1167 | - except Exception as e: | |
1168 | - self.__CCE = e | |
1181 | + except Exception as e: self.__CCE=e | |
1169 | 1182 | log.info("*"*10 + " NEXT CMD RECEIVED PROCESSING (END) " + "*"*10 + "\n") |
1170 | 1183 | |
1171 | 1184 | # Check if running cmd timeout |
... | ... | @@ -1173,7 +1186,7 @@ class Agent: |
1173 | 1186 | self.__CC.set_as_exec_timeout() |
1174 | 1187 | |
1175 | 1188 | # If (SEQUENTIAL OR PARALLEL) current cmd is finished => process exception if exists (and test result if in testing mode) |
1176 | - if ( self.__CC and self.__CC.is_finished() ) or self.__CCE : | |
1189 | + if ( self.__CC and self.__CC.is_finished() ) or self.__CCE : | |
1177 | 1190 | self.__process_finished_cmd() |
1178 | 1191 | self.__CC = None |
1179 | 1192 | |
... | ... | @@ -1210,12 +1223,29 @@ class Agent: |
1210 | 1223 | #self.CC = None |
1211 | 1224 | |
1212 | 1225 | |
1213 | - | |
1214 | - | |
1215 | - | |
1216 | 1226 | def __priority_cmd_received(self): |
1217 | - return False | |
1218 | - | |
1227 | + ''' | |
1228 | + Return True if priority cmd found in the commands list | |
1229 | + (if not irrelevant or running or expired) | |
1230 | + ''' | |
1231 | + with transaction.atomic(): | |
1232 | + commands = AgentCmd.get_pending_and_running_commands_for_agent(self.name) | |
1233 | + if not commands.exists(): return None | |
1234 | + for cmd in commands: | |
1235 | + #if cmd.name in ("do_exit", "do_abort", "do_flush_commands"): break | |
1236 | + #if cmd.name in ("do_exit", "do_abort"): break | |
1237 | + if cmd.is_agent_general_priority_cmd(): | |
1238 | + # If prio cmd but irrelevant => skip it | |
1239 | + if cmd.name in ("do_stop","do_restart") and self.__cmd_was_sent_before_my_start(cmd): | |
1240 | + cmd.set_as_skipped("Cmd stop/restart was sent before agent start => skip it") | |
1241 | + #return None | |
1242 | + continue | |
1243 | + # if prio cmd already running, wait for it to finish | |
1244 | + if cmd.is_running(): | |
1245 | + return None | |
1246 | + if not cmd.is_expired(): | |
1247 | + return cmd | |
1248 | + return None | |
1219 | 1249 | |
1220 | 1250 | |
1221 | 1251 | # process self.current_cmd_exception |
... | ... | @@ -1286,6 +1316,23 @@ class Agent: |
1286 | 1316 | #self._DO_EXIT=True |
1287 | 1317 | #exit(0) |
1288 | 1318 | |
1319 | + wait_nbsec = 2 | |
1320 | + | |
1321 | + # Waiting for current (in fact previous) command still running if need be (if exists) | |
1322 | + while self.__CC_prev and self.__CC_prev.is_running(): | |
1323 | + log.info(f"The previous command {self.__CC_prev.name} is still running => Waiting for it to finish (waiting {wait_nbsec} sec...)") | |
1324 | + self.waitfor(wait_nbsec) | |
1325 | + | |
1326 | + # Waiting for current ROUTINE BEFORE process to finish if still running | |
1327 | + while self.__ROUTINE_PROCESS_BEFORE_IS_RUNNING: | |
1328 | + log.info(f"The ROUTINE BEFORE process is still running => Waiting for it to finish (waiting {wait_nbsec} sec...)") | |
1329 | + self.waitfor(wait_nbsec) | |
1330 | + | |
1331 | + # Waiting for current ROUTINE BEFORE process to finish if still running | |
1332 | + while self.__ROUTINE_PROCESS_AFTER_IS_RUNNING: | |
1333 | + log.info(f"The ROUTINE AFTER process is still running => Waiting for it to finish (waiting {wait_nbsec} sec...)") | |
1334 | + self.waitfor(wait_nbsec) | |
1335 | + | |
1289 | 1336 | log.info(f"{self.name}: Before exiting, calling do_things_before_exit()") |
1290 | 1337 | self._do_things_before_exit(stopper_agent_name) |
1291 | 1338 | ##self._set_and_log_status(self.AGT_STATUS.EXITING) |
... | ... | @@ -1481,13 +1528,6 @@ class Agent: |
1481 | 1528 | |
1482 | 1529 | |
1483 | 1530 | |
1484 | - def __abort_current_running_cmd_if_exists(self): | |
1485 | - log.info("Aborting current running command if exists...") | |
1486 | - pass | |
1487 | - | |
1488 | - def __abort_current_routine_process_if_exists(self): | |
1489 | - log.info("Aborting current routine process if exists...") | |
1490 | - pass | |
1491 | 1531 | |
1492 | 1532 | |
1493 | 1533 | |
... | ... | @@ -1506,6 +1546,7 @@ class Agent: |
1506 | 1546 | This is a command or set of processings that this agent does or commands that it sends regularly, |
1507 | 1547 | at each iteration |
1508 | 1548 | """ |
1549 | + self.__ROUTINE_PROCESS_BEFORE_IS_RUNNING = True | |
1509 | 1550 | print() |
1510 | 1551 | print() |
1511 | 1552 | log.info("*"*10+ " ROUTINE BEFORE (START) "+ "*"*10+ '\n') |
... | ... | @@ -1515,6 +1556,7 @@ class Agent: |
1515 | 1556 | |
1516 | 1557 | print() |
1517 | 1558 | log.info("*"*10 + " ROUTINE BEFORE (END) "+ "*"*10) |
1559 | + self.__ROUTINE_PROCESS_BEFORE_IS_RUNNING = False | |
1518 | 1560 | |
1519 | 1561 | |
1520 | 1562 | def __routine_process_after(self): |
... | ... | @@ -1528,6 +1570,7 @@ class Agent: |
1528 | 1570 | This is a command or set of processings that this agent does or commands that it sends regularly, |
1529 | 1571 | at each iteration |
1530 | 1572 | """ |
1573 | + self.__ROUTINE_PROCESS_AFTER_IS_RUNNING = True | |
1531 | 1574 | print() |
1532 | 1575 | print() |
1533 | 1576 | log.info("*"*10+ " ROUTINE AFTER (START) "+ "*"*10+ '\n') |
... | ... | @@ -1537,6 +1580,7 @@ class Agent: |
1537 | 1580 | |
1538 | 1581 | print() |
1539 | 1582 | log.info("*"*10 + " ROUTINE AFTER (END) "+ "*"*10 + '\n') |
1583 | + self.__ROUTINE_PROCESS_AFTER_IS_RUNNING = False | |
1540 | 1584 | |
1541 | 1585 | |
1542 | 1586 | # To be overridden by subclasses |
... | ... | @@ -2228,7 +2272,41 @@ class Agent: |
2228 | 2272 | # Default result (null) |
2229 | 2273 | result = None |
2230 | 2274 | |
2231 | - # STOP kind of command ? | |
2275 | + # | |
2276 | + # - 1) PRIORITY commands | |
2277 | + # | |
2278 | + | |
2279 | + # 1.a) not disturbing commands | |
2280 | + | |
2281 | + if cmd_name == "get_specific_cmds": | |
2282 | + result = self.get_specific_cmds() | |
2283 | + ''' | |
2284 | + # CmdUnimplementedException if one specific command (in the list) is unknown | |
2285 | + try: | |
2286 | + result = self.get_specific_cmds() | |
2287 | + except CmdExceptionUnimplemented as e: | |
2288 | + # raise CmdUnimplementedException("get_specific_cmds.unknown_cmd_name") | |
2289 | + #cmd.set_as_exec_error("EXCEPTION - One specific cmd is unimplemented: "+e.cmd_name) | |
2290 | + raise CmdExceptionExecError(cmd, "EXCEPTION - One specific cmd is unimplemented => "+e.cmd_name) from None | |
2291 | + ''' | |
2292 | + | |
2293 | + #elif cmd_name in ("do_flush_commands"): | |
2294 | + elif cmd_name == "do_flush_commands": | |
2295 | + "flush_commands received: Delete all pending commands" | |
2296 | + self.do_flush_commands() | |
2297 | + #cmd.set_result('DONE') | |
2298 | + result = "FLUSH DONE" | |
2299 | + | |
2300 | + elif cmd_name == "do_exec_commands": | |
2301 | + pass | |
2302 | + | |
2303 | + # 1.b) partially disturbing commands | |
2304 | + | |
2305 | + elif cmd_name == "do_stop_current": | |
2306 | + what = 'both' if not cmd.args else cmd.args[0] | |
2307 | + result = self.do_stop_current(what) | |
2308 | + | |
2309 | + # 1.c) really disturbing commands (STOP command type) | |
2232 | 2310 | if cmd_name in ("do_stop", |
2233 | 2311 | "do_restart", |
2234 | 2312 | #@ deprecated |
... | ... | @@ -2255,6 +2333,12 @@ class Agent: |
2255 | 2333 | when = 'asap' if not cmd.args else cmd.args[0] |
2256 | 2334 | result = self.do_stop_or_restart(False if cmd_name=='do_stop' else True, when) |
2257 | 2335 | |
2336 | + | |
2337 | + | |
2338 | + # | |
2339 | + # - 2) NO priority commands | |
2340 | + # | |
2341 | + | |
2258 | 2342 | elif cmd_name == "get_state": |
2259 | 2343 | result = self.get_state() |
2260 | 2344 | |
... | ... | @@ -2273,13 +2357,6 @@ class Agent: |
2273 | 2357 | result = "MODE is " + mode |
2274 | 2358 | #time.sleep(1) |
2275 | 2359 | #self.waitfor(1) |
2276 | - | |
2277 | - #elif cmd_name in ("do_flush_commands"): | |
2278 | - elif cmd_name == "do_flush_commands": | |
2279 | - "flush_commands received: Delete all pending commands" | |
2280 | - self.do_flush_commands() | |
2281 | - #cmd.set_result('DONE') | |
2282 | - result = "FLUSH DONE" | |
2283 | 2360 | |
2284 | 2361 | elif cmd_name == "do_eval": |
2285 | 2362 | #if not cmd_args: raise ValueError() |
... | ... | @@ -2288,17 +2365,6 @@ class Agent: |
2288 | 2365 | #result = eval(cmd_args) |
2289 | 2366 | result = self.do_eval(cmd_args[0]) |
2290 | 2367 | |
2291 | - elif cmd_name == "get_specific_cmds": | |
2292 | - result = self.get_specific_cmds() | |
2293 | - ''' | |
2294 | - # CmdUnimplementedException if one specific command (in the list) is unknown | |
2295 | - try: | |
2296 | - result = self.get_specific_cmds() | |
2297 | - except CmdExceptionUnimplemented as e: | |
2298 | - # raise CmdUnimplementedException("get_specific_cmds.unknown_cmd_name") | |
2299 | - #cmd.set_as_exec_error("EXCEPTION - One specific cmd is unimplemented: "+e.cmd_name) | |
2300 | - raise CmdExceptionExecError(cmd, "EXCEPTION - One specific cmd is unimplemented => "+e.cmd_name) from None | |
2301 | - ''' | |
2302 | 2368 | |
2303 | 2369 | cmd.set_as_processed(result) |
2304 | 2370 | log.info("...Agent level GENERAL cmd has been executed") |
... | ... | @@ -2512,17 +2578,35 @@ class Agent: |
2512 | 2578 | |
2513 | 2579 | # Stop currently running cmd or routine |
2514 | 2580 | def do_stop_current(self, what:str): |
2515 | - if what == "cmd": self.do_stop_current_cmd() | |
2516 | - if what == "routine": self.do_stop_current_routine() | |
2581 | + if what == "cmd": | |
2582 | + self.__do_stop_current_cmd_if_exists() | |
2583 | + if what == "routine": | |
2584 | + self.__do_stop_current_routine_before_if_exists() | |
2585 | + self.__do_stop_current_routine_after_if_exists() | |
2517 | 2586 | if what == "both": |
2518 | - self.do_stop_current_cmd() | |
2519 | - self.do_stop_current_routine() | |
2587 | + self.__do_stop_current_cmd_if_exists() | |
2588 | + self.__do_stop_current_routine_before_if_exists() | |
2589 | + self.__do_stop_current_routine_after_if_exists() | |
2520 | 2590 | # Bad arg |
2521 | 2591 | raise CmdExceptionBadArgs(self.CC) |
2522 | - def do_stop_current_cmd(self): | |
2523 | - pass | |
2524 | - def do_stop_current_routine(self): | |
2525 | - pass | |
2592 | + #FIXME: TODO | |
2593 | + def __do_stop_current_cmd_if_exists(self): | |
2594 | + if self.__CC_prev and self.__CC_prev.is_running(): | |
2595 | + log.info("Aborting current running command if exists...") | |
2596 | + pass | |
2597 | + | |
2598 | + #FIXME: TODO | |
2599 | + def __do_stop_current_routine_before_if_exists(self): | |
2600 | + if self.__ROUTINE_PROCESS_BEFORE_IS_RUNNING: | |
2601 | + log.info("Aborting current routine BEFORE process if exists...") | |
2602 | + pass | |
2603 | + #FIXME: TODO | |
2604 | + def __do_stop_current_routine_after_if_exists(self): | |
2605 | + if self.__ROUTINE_PROCESS_AFTER_IS_RUNNING: | |
2606 | + log.info("Aborting current routine AFTER process if exists...") | |
2607 | + pass | |
2608 | + | |
2609 | + | |
2526 | 2610 | |
2527 | 2611 | def do_exit(self): self.do_stop("asap"); |
2528 | 2612 | def do_abort(self): self.do_stop("now"); |
... | ... | @@ -2532,8 +2616,10 @@ class Agent: |
2532 | 2616 | def do_stop_or_restart(self, restart:bool=False, when:str='asap'): |
2533 | 2617 | if when not in ('asap','now','noprio'): raise CmdExceptionBadArgs(self.CC) |
2534 | 2618 | # NOT PRIO |
2619 | + ''' | |
2535 | 2620 | if when == "noprio": |
2536 | 2621 | pass |
2622 | + ''' | |
2537 | 2623 | # log last agent mode & status |
2538 | 2624 | self.__set_and_log_status(self.AGT_STATUS.RESTARTING if restart else self.AGT_STATUS.EXITING) |
2539 | 2625 | #self._log_agent_state() | ... | ... |
src/core/pyros_django/common/models.py
... | ... | @@ -626,7 +626,7 @@ class AgentCmd(models.Model): |
626 | 626 | # ARGS : |
627 | 627 | # - "cmd" : stop currently running specific command |
628 | 628 | # - "routine" : stop currently running routine process |
629 | - # - "both" : stop currently running current cmd and routine process | |
629 | + # - "both" (default) : stop currently running current cmd and routine process | |
630 | 630 | "do_stop_current", # + arg "cmd" or "routine" |
631 | 631 | # TODO: a virer => do_stop_current now |
632 | 632 | # @deprecated |
... | ... | @@ -649,17 +649,21 @@ class AgentCmd(models.Model): |
649 | 649 | # Priority commands, executed as soon as in the received commands list (BEFORE any other pending command) |
650 | 650 | # NB : must be a subset of _AGENT_GENERAL_COMMANDS |
651 | 651 | _AGENT_GENERAL_PRIORITY_COMMANDS = [ |
652 | + | |
653 | + # NO STOP commands => will not stop the Agent or any running cmd | |
652 | 654 | "get_specific_cmds", |
653 | 655 | #"do_exit", |
654 | 656 | "do_flush_commands", |
655 | 657 | "do_exec_commands" |
656 | 658 | |
657 | - "do_restart", | |
658 | - # @deprecated | |
659 | - "do_restart_loop", | |
660 | - | |
659 | + # PARTIAL STOP command => will just stop current running cmd (and routine) | |
661 | 660 | "do_stop_current", |
661 | + | |
662 | + # (TOTAL) STOP commands => will stop the Agent (waiting or not for it to finish what it is currently doing) | |
662 | 663 | "do_stop", |
664 | + "do_restart", | |
665 | + # @deprecated | |
666 | + ###"do_restart_loop", | |
663 | 667 | ] |
664 | 668 | |
665 | 669 | # -------------- Command CLASS (static) METHODS -------------- | ... | ... |