From 3fec7115dc5723090d891aceb555e1314c36de98 Mon Sep 17 00:00:00 2001 From: Etienne Pallier Date: Fri, 8 Mar 2019 16:30:46 +0100 Subject: [PATCH] Agent encore très refactorisé et (quasi) totalement implémenté --- README.md | 8 +++++--- src/agent/Agent.py | 136 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------------- src/agent/AgentX.py | 57 +++++++++++++++++---------------------------------------- src/common/models.py | 11 +++++++++++ 4 files changed, 141 insertions(+), 71 deletions(-) diff --git a/README.md b/README.md index 5c5cc07..1d776db 100644 --- a/README.md +++ b/README.md @@ -67,14 +67,16 @@ This software has been tested and validated with the following configurations : -------------------------------------------------------------------------------------------- ## LAST VERSION -Date: 07/03/2019 +Date: 08/03/2019 Author: E. Pallier -VERSION: 0.20.9 +VERSION: 0.20.10 Comment: - Agent très refactorisé et presque totalement implémenté (il manque plus que le "abort") + Agent encore très refactorisé et (quasi) totalement implémenté (y-compris "exit" et "abort") + AgentX contient très peu de code (tout est remonté dans Agent) + Il faut cependant encore améliorer le "abort", pas complètement satisfaisant... -------------------------------------------------------------------------------------------- diff --git a/src/agent/Agent.py b/src/agent/Agent.py index e8c2320..910c41a 100644 --- a/src/agent/Agent.py +++ b/src/agent/Agent.py @@ -12,6 +12,7 @@ VERSION = "0.4" import time from datetime import datetime, timedelta import os +import threading, multiprocessing """TODO: @@ -77,6 +78,28 @@ log = L.setupLogger("AgentLogger", "Agent") """ ================================================================= + class StoppableThread +================================================================= +""" + +class StoppableThread(threading.Thread): + """Thread class with a stop() method. The thread itself has to check + regularly for the stopped() condition.""" + # See also https://www.oreilly.com/library/view/python-cookbook/0596001673/ch06s03.html + + def __init__(self, *args, **kwargs): + super(StoppableThread, self).__init__(*args, **kwargs) + self._stop_event = threading.Event() + + def stop(self): + self._stop_event.set() + + def stopped(self): + return self._stop_event.is_set() + + +""" +================================================================= class Agent ================================================================= """ @@ -90,9 +113,10 @@ class Agent: SIMULATOR_MODE = True SIMULATOR_COMMANDS = iter([ "go_active", - "go_idle", - "specific0_not_executed_because_idle", + + # specific0 not_executed_because_idle + "specific0", "go_active", @@ -102,12 +126,13 @@ class Agent: # specific2 executed_later_because_waiting_for_previous_specific_command_to_finish "specific2", + # Should abort the current running command (which should normally be specific1) + # even if specific2 is already pending + "abort", + # specific3 executed_later_because_waiting_for_previous_specific_command_to_finish "specific3", - # Should abort the current running command (which should normally be specific1) - # even if specific2 and 3 are already pending - #"abort" "go_active", "go_idle", @@ -142,7 +167,10 @@ class Agent: STATUS_LAUNCH = "LAUNCHED" STATUS_INIT = "INITIALIZING" STATUS_MAIN_LOOP = "IN_MAIN_LOOP" - STATUS_PROCESS_LOOP = "IN_PROCESS_LOOP" + STATUS_GET_NEXT_COMMAND = "IN_GET_NEXT_COMMAND" + STATUS_GENERAL_PROCESS = "IN_GENERAL_PROCESS" + STATUS_ROUTINE_PROCESS = "IN_ROUTINE_PROCESS" + STATUS_SPECIFIC_PROCESS = "IN_SPECIFIC_PROCESS" STATUS_EXIT = "EXITING" # Modes @@ -153,12 +181,15 @@ class Agent: CONFIG_DIR = "config" _agent_survey = None + _pending_commands = [] + _current_specific_cmd = None + _current_specific_thread = None - _iter_num = 0 + _iter_num = 1 - def __init__(self, name:str=None, config_filename:str=None): - self.set_mode(self.MODE_IDLE) + def __init__(self, name:str="Agent", config_filename:str=None): self.set_status(self.STATUS_LAUNCH) + self.set_mode(self.MODE_IDLE) self.name = name if not config_filename: #config_filename = '/PROJECTS/GFT/SOFT/PYROS_SOFT/CURRENT/config/config_unit_simulunit1.xml' @@ -176,6 +207,10 @@ class Agent: if self.config.get_last_errno() != self.config.NO_ERROR: raise Exception(f"Bad config file name '{config_filename}', error {str(self.config.get_last_errno())}: {str(self.config.get_last_errmsg())}") + self.set_mode_from_config(name) + # TODO: remove + self.set_idle() + # Create 1st survey if none #tmp = AgentSurvey.objects.filter(name=self.name) #if len(tmp) == 0: @@ -232,7 +267,6 @@ class Agent: print() print() #print("-"*80) - print("-"*20, f"MAIN LOOP ITERATION {self._iter_num} (START)", "-"*20) self.set_status(self.STATUS_MAIN_LOOP) self.show_mode_and_status() @@ -262,13 +296,12 @@ class Agent: # Sub-level loop (only if ACTIVE) if self.is_active(): - self.set_status(self.STATUS_PROCESS_LOOP) if cmd: self.specific_process(cmd) print("---") # Every N iterations, delete old commands N=3 - if (self._iter_num % N) == 0: Command.purge_old_commands_for_agent(self.name) + if ((self._iter_num-1) % N) == 0: Command.purge_old_commands_for_agent(self.name) self.waitfor(self.mainloop_waittime) @@ -282,7 +315,8 @@ class Agent: def routine_process(self): - pass + self.set_status(self.STATUS_ROUTINE_PROCESS) + """ def purge_commands(self): @@ -316,7 +350,7 @@ class Agent: time.sleep(nbsec) def set_status(self, status:str): - print(f"Switching from status {self.status} to status {status}") + print(f"[[NEW CURRENT STATUS: {status}]] (switching from status {self.status})") self.status = status return False @@ -458,11 +492,12 @@ class Agent: which is relevant to this agent. Commands are read in chronological order """ - + self.set_status(self.STATUS_GET_NEXT_COMMAND) print("Looking for new commands from the database ...") # 1) Get all pending commands for me (return if None) - commands = Command.get_pending_commands_for_agent(self.name) + self._pending_commands = Command.get_pending_commands_for_agent(self.name) + commands = self._pending_commands if commands is None: return None print("Current pending commands are (time ordered) :") Command.show_commands(commands) @@ -519,15 +554,15 @@ class Agent: def general_process(self, cmd:Command)->Command: + self.set_status(self.STATUS_GENERAL_PROCESS) + print(f"Starting general processing of {cmd}") + # Update read time to say that the command has been READ - ##assert cmd.receiver_read_time is None # f"Command {cmd} should not have been already read !!" cmd.set_read_time() - # Precondition: command cmd is valid (not expired), has already been read, is pending assert (not cmd.is_expired()) and cmd.is_pending() and cmd.is_read() #print(f"Starting general processing of command {cmd.name} sent by agent {cmd.sender} at {cmd.sender_deposit_time}") - print(f"Starting general processing of {cmd}") """ # 2) If expired command, change its status to expired and return @@ -544,9 +579,7 @@ class Agent: # If cmd is "exit", kill myself (without any question, this is an order soldier !) # This "exit" should normally kill any current thread (to be checked...) if cmd.name == "exit": exit(0) - # If cmd is "abort", kill any currently running thread - if cmd.name == "abort": - pass + # Command is executed, so return None return None # cmd is not generic @@ -568,6 +601,7 @@ class Agent: def exec_generic_cmd(self, cmd:Command): print("Starting execution of a Generic cmd...") cmd.set_as_running() + # Executing command if cmd.name == "go_active": self.set_active() @@ -575,12 +609,19 @@ class Agent: if cmd.name == "go_idle": self.set_idle() time.sleep(1) - #cmd.set_as_executed() + # If cmd is "abort", kill any currently running thread + if cmd.name == "abort": + print("Current pending commands are:") + Command.show_commands(self._pending_commands) + print("Aborting current executing command if exists:") + self.kill_running_specific_command_if_exists() + cmd.set_as_processed() print("...Generic cmd has been executed") + def do_log(self): """ log à 2 endroits ou 1 seul @@ -590,18 +631,57 @@ class Agent: print("Logging data...") - # @abstract - # to be implemented by subclasses def specific_process(self, cmd:Command): - #raise NotImplemented() """ Sublevel Loop (only if ACTIVE) : - PLUS TARD, maybe :start_process_thread() dans un thread : ensuite, à chaque tour de boucle il regarde si c'est fini ou pas, et si fini recommence """ + self.set_status(self.STATUS_SPECIFIC_PROCESS) assert self.is_active() - # TODO: LOG + self._current_specific_cmd = cmd + print("Starting specific process...") + #self._current_thread = threading.Thread(target=self.exec_command) + self._current_specific_thread = StoppableThread(target=self.exec_specific_command, args=(cmd,)) + self._current_specific_thread.start() + #my_thread.join() + #self.waitfor(self.subloop_waittime) + print("Ending specific process (thread has been launched)") + + """ + def get_current_specific_command(self): + return self._current_specific_cmd + """ + def exec_specific_command(self, cmd:Command): + """ specific command execution setup """ + #cmd = self.get_current_specific_command() + print(">>>>> Thread starting execution of ", cmd) + print(">>>>> PID: %s, Process Name: %s, Thread Name: %s" % ( + os.getpid(), + multiprocessing.current_process().name, + threading.current_thread().name) + ) + cmd.set_as_running() + + def exec_specific_command_finished(self, cmd:Command): + """ specific command execution tear up """ + cmd.set_as_processed() + print(">>>>> Thread ended execution of command", cmd.name) + cmd = None + # No more current thread + self._current_specific_thread = None + + + def kill_running_specific_command_if_exists(self): + if self._current_specific_thread: + print(f"Killing command {self._current_specific_cmd.name}") + # Ask the thread to stop itself + self._current_specific_thread.stop() + # Now, wait for the end of the thread + self._current_specific_thread.join() + self._current_specific_thread = None + self._current_specific_cmd.set_as_killed() + self._current_specific_cmd = None """ =================================== diff --git a/src/agent/AgentX.py b/src/agent/AgentX.py index 3e16f3c..b6a3c84 100644 --- a/src/agent/AgentX.py +++ b/src/agent/AgentX.py @@ -1,6 +1,6 @@ import utils.Logger as L -import threading, multiprocessing, os +#import threading, multiprocessing, os import time from .Agent import Agent @@ -10,31 +10,23 @@ from common.models import Command log = L.setupLogger("AgentXTaskLogger", "AgentX") -""" - Task to handle the execution of the program - - check the environment status in database - check the devices status (telescope / cameras) - check if the last schedule made has to be planned - launch schedule's sequences -""" class AgentX(Agent): - _current_cmd = None - - def __init__(self, name, config_filename=None): + + + # @override + def __init__(self, name:str=None, config_filename=None): + if name is None: name = self.__class__.__name__ super().__init__(name, config_filename) # @override def init(self): super().init() # --- Set the mode according the startmode value - agent_alias = self.__class__.__name__ - self.set_mode_from_config(agent_alias) - # TODO: remove - self.set_idle() + ##agent_alias = self.__class__.__name__ + ##self.set_mode_from_config(agent_alias) # @override ''' @@ -59,27 +51,12 @@ class AgentX(Agent): super().do_log() # @override - def specific_process(self, cmd:Command): - super().specific_process(cmd) - self._current_cmd = cmd - print("Starting specific process subloop...") - my_thread = threading.Thread(target=self.exec_command) - my_thread.start() - #my_thread.join() - - #self.waitfor(self.subloop_waittime) - - print("Ending specific process subloop...") - - def exec_command(self): - cmd = self._current_cmd - print(">>>>> Thread starting execution of ", cmd) - print(">>>>> PID: %s, Process Name: %s, Thread Name: %s" % ( - os.getpid(), - multiprocessing.current_process().name, - threading.current_thread().name) - ) - cmd.set_as_running() - time.sleep(7) - cmd.set_as_processed() - print(">>>>> Thread ended execution of ", cmd) + def exec_specific_command(self, cmd:Command): + """ This code is executed inside a thread """ + # thread execution setup + super().exec_specific_command(cmd) + + time.sleep(20) + + # thread execution tear up + self.exec_specific_command_finished(cmd) diff --git a/src/common/models.py b/src/common/models.py index b9e0898..9211083 100644 --- a/src/common/models.py +++ b/src/common/models.py @@ -240,6 +240,7 @@ class Command(models.Model): "CMD_EXECUTED", # cde exécutée => simulé par un sleep(3) dans AgentX.core_process()) "CMD_PENDING", # cde en attente d'exécution "CMD_SKIPPED", # cde ignorée (je suis idle… et j’ai ignoré cette commande, et je passe à la cde suivante) + "CMD_KILLED", # cde ignorée (je suis idle… et j’ai ignoré cette commande, et je passe à la cde suivante) "CMD_OUTOFDATE" # cde périmée ) GENERIC_COMMANDS = ["go_idle", "go_active", "abort", "exit"] @@ -383,7 +384,9 @@ class Command(models.Model): def set_read_time(self): self.receiver_read_time = datetime.utcnow().astimezone() self.save() + def set_as_processed(self): + print(f"- Set command {self.name} as processed") self.receiver_status_code = self.CMD_STATUS_CODES.CMD_EXECUTED self.receiver_processed_time = datetime.utcnow().astimezone() self.save() @@ -391,9 +394,17 @@ class Command(models.Model): def set_as_outofdate(self): print(f"- Set this command as expired (older than its validity duration of {self.validity_duration_sec}s): {self}") self.set_status_to(self.CMD_STATUS_CODES.CMD_OUTOFDATE) + def set_as_skipped(self): self.set_status_to(self.CMD_STATUS_CODES.CMD_SKIPPED) + + def set_as_killed(self): + print(f"- Set command {self.name} as killed") + #print(f"- Set this command as killed: {self}") + self.set_status_to(self.CMD_STATUS_CODES.CMD_KILLED) + def set_as_running(self): + print(f"- Set command {self.name} as running") self.set_status_to(self.CMD_STATUS_CODES.CMD_RUNNING) ''' def set_as_executed(self): -- libgit2 0.21.2