#!/usr/bin/env python3
"""Generic PyROS agent: module bootstrap (sys.path, Django setup) and imports.

TODO:
- one log per agent, plus a "lastlog" (last 30 lines)
- an agents_log table holding a minimal log for display in the dashboard,
  in chronological order to follow the activity: agent name, timestamp, message
- one table per agent (agent__vars): variable name, value, description
"""

VERSION = "0.5"

DEBUG = True
#DEBUG = False

# =================================================================
#     SETUP FOR DJANGO
#     (see https://docs.djangoproject.com/en/dev/topics/settings)
#     (see also https://docs.djangoproject.com/en/dev/ref/settings)
# =================================================================

import os
from pathlib import Path
import sys

from django.conf import settings as djangosettings

# Make the project packages importable from the launch directory:
# - "<cwd>/.."          avoids "ModuleNotFoundError: No module named 'dashboard'"
# - "<cwd>/../../../.." avoids "ModuleNotFoundError: No module named 'src'"
# (see https://stackoverflow.com/questions/16853649/how-to-execute-a-python-script-from-the-django-shell)
for _relative_root in ("/..", "/../../../.."):
    py_pwd = os.path.normpath(os.getcwd() + _relative_root)
    # sys.path is used directly: "os.sys" only works by accident
    # (os happens to import sys) and is not a documented attribute.
    if py_pwd not in sys.path:
        sys.path.append(py_pwd)
print("Starting with this sys.path", sys.path)

# DJANGO setup
print("Current directory : " + str(os.getcwd()))
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "src.core.pyros_django.pyros.settings")

# Django must be configured BEFORE any model is imported below,
# which is why the imports of this module are split in two groups.
import django
django.setup()

print("DB2 used is:", djangosettings.DATABASES["default"]["NAME"])
print()

# =================================================================
#     IMPORT PYTHON PACKAGES
# =================================================================

# --- GENERAL PURPOSE IMPORT ---
import utils.Logger as L
import platform
import random
from threading import Thread
import threading
import multiprocessing
import time
import socket

# --- DJANGO IMPORT ---
from django.db import transaction
from django import db

# --- SPECIFIC IMPORT ---
from common.models import AgentSurvey, Command, AgentLogs
from config.configpyros import ConfigPyros
from agent.logpyros import LogPyros

# =================================================================
#     GENERAL MODULE CONSTANT & FUNCTIONS DEFINITIONS
# =================================================================

log = L.setupLogger("AgentLogger", "Agent")
IS_WINDOWS = platform.system() == "Windows"


class Colors:
    """ANSI escape sequences used to colorize terminal output."""
    HEADER = "\033[95m"
    BLUE = "\033[94m"
    GREEN = "\033[92m"
    WARNING = "\033[93m"
    FAIL = "\033[91m"
    ENDC = "\033[0m"
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"


def printColor(color: str, message, file=None, eol="\n", forced=False):
    """Print *message* wrapped in the ANSI *color* code.

    On Windows (IS_WINDOWS) the message is printed without any escape code.

    :param color: one of the Colors.* escape sequences
    :param message: text to print (a str, it is concatenated to the color codes)
    :param file: output stream; None means "sys.stdout at call time", so that
                 a later redirection of sys.stdout is honoured (a
                 ``file=sys.stdout`` default would be frozen at import time)
    :param eol: line terminator; "\\n" (not os.linesep) because text streams
                already translate "\\n" into the platform line ending
    :param forced: unused, kept for backward compatibility
    :return: always 0
    """
    stream = sys.stdout if file is None else file
    text = message if IS_WINDOWS else color + message + Colors.ENDC
    print(text, file=stream, end=eol)
    return 0


def printFullTerm(color: str, string: str, columns: int = 100):
    """Print *string* centered in a line of dashes that is *columns* wide.

    :param color: one of the Colors.* escape sequences
    :param string: text to center (printed as is when longer than *columns*)
    :param columns: total width of the line (default 100, the historical value)
    :return: always 0
    """
    left = max(0, (columns - len(string)) // 2)
    right = max(0, columns - left - len(string))
    printColor(color, "-" * left, eol="")
    printColor(color, string, eol="")
    printColor(color, "-" * right)
    return 0
# =================================================================
#     class StoppableThread
# =================================================================

class StoppableThreadEvenWhenSleeping(threading.Thread):
    """Thread that can be asked to stop, even while it is "sleeping".

    The thread itself has to check regularly for the stopped() condition,
    and must sleep through wait() (not time.sleep()) so that a call to
    terminate() wakes it up immediately.

    See https://python.developpez.com/faq/?page=Thread#ThreadKill
    See also https://www.oreilly.com/library/view/python-cookbook/0596001673/ch06s03.html
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Set by terminate(), polled by stopped(), waited on by wait()
        self._stop_event = threading.Event()

    def terminate(self):
        """Ask the thread to stop (cooperative: the target must poll stopped())."""
        self._stop_event.set()

    def stopped(self):
        """Return True once terminate() has been called."""
        return self._stop_event.is_set()

    def wait(self, nbsec: float = 2.0):
        """Sleep up to *nbsec* seconds, returning early if a stop is requested.

        :return: True if the stop was requested (sleep interrupted or already
                 requested), False if the full delay elapsed.
        """
        return self._stop_event.wait(nbsec)
# =================================================================
#     class Agent
# =================================================================

class Agent:
    """
    See Agent_activity_diag.pu for PlantUML activity diagram

    Behavior of an agent:
    - If idle :
        - still does routine_process() and general_process()
        - does not do specific_process()
    - Once a command has been sent to another agent :
        - It waits (non blocking) for the end of execution of the command and get its result
        - If command is timed out or has been skipped or killed, then it is NOT re-executed at next iteration (except if needed explicitely)
    """

    # ---
    # --- CLASS (STATIC) attributes (CONSTANTS)
    # --- If agent is instance of Agent:
    # ---     - CLASS attributes are accessible via agent.__class__.__dict__
    # ---     - INSTANCE attributes are accessible via agent.__dict__
    # ---

    # Maximum duration of this agent (only for SIMULATION mode)
    # If set to None, it will never exit except if asked (or CTRL-C)
    # If set to 20, it will exit after 20s
    SIMULATOR_MAX_DURATION_SEC = None
    #SIMULATOR_MAX_DURATION_SEC = 30

    # FOR TEST ONLY
    # Run this agent in simulator mode
    SIMULATOR_MODE = True
    # Run the assertion tests at the end
    SIMULATOR_WITH_TEST = False
    # Who should I send commands to ?
    # ("myself" means this agent; any other value is used as the recipient name)
    SIMULATOR_COMMANDS_DEST = "myself"
    # Default scenario to be executed
    # (consumed one command at a time through the SIMULATOR_COMMANDS iterator
    # created in __init__)
    SIMULATOR_COMMANDS_LIST = [
        "go_active",

        "go_idle",
        # specific0 not_executed_because_idle
        "specific0",

        "go_active",
        # specific1 executed_because_not_idle, should complete ok
        "specific1",

        # specific2 will be executed only when specific1 is finished,
        # and should be aborted before end of execution,
        # because of the 1st coming "abort" command below
        "specific2",

        # specific3 should be executed only when specific2 is finished (in fact, aborted),
        # and should be aborted before end of execution,
        # because of the 2nd coming "abort" command below
        "specific3",

        # These commands should not have the time to be processed
        # because the "exit" command below should be executed before
        "specific4",
        "specific5",
        "specific6",
        "specific7",
        "specific8",

        # Should abort the current running command (which should normally be specific2)
        # even if commands above (specific3, ..., specific8) are already pending
        "abort",

        # These commands (except abort) won't be executed
        # because too many commands are already pending (above)
        "specific9",
        "abort",
        "go_active",
        "go_idle",

        # Should stop the agent even before the previous pending commands are executed
        "exit",

        # Because of the previous "exit" command,
        # these following commands should not be executed,
        # and not even be added to the database command table
        "go_active",
        "specific10"
    ]
    #SIMULATOR_COMMANDS = iter(SIMULATOR_COMMANDS_LIST)

    """
    How to run this agent exec_specific_cmd() method ?
    - True = inside a Thread (cannot be killed, must be asked to stop, and inadequate for computation)
    - False = inside a Process
    If thread, displays :
    >>>>> Thread: starting execution of command specific1
    >>>>> Thread: PID: 2695, Process Name: MainProcess, Thread Name: Thread-1
    ...
    >>>>> Thread: starting execution of command specific2
    >>>>> Thread: PID: 2695, Process Name: MainProcess, Thread Name: Thread-2
    ...
    >>>>> Thread: starting execution of command specific3
    >>>>> Thread: PID: 2695, Process Name: MainProcess, Thread Name: Thread-3
    If process, displays :
    >>>>> Thread: starting execution of command specific1
    >>>>> Thread: PID: 2687, Process Name: Process-1, Thread Name: MainThread
    ...
    >>>>> Thread: starting execution of command specific2
    >>>>> Thread: PID: 2689, Process Name: Process-2, Thread Name: MainThread
    ...
    >>>>> Thread: starting execution of command specific3
    >>>>> Thread: PID: 2690, Process Name: Process-3, Thread Name: MainThread
    """
    # with thread
    RUN_IN_THREAD = True
    # with process
    #RUN_IN_THREAD = False

    _thread_total_steps_number = 1

    #COMMANDS_PEREMPTION_HOURS = 48
    #COMMANDS_PEREMPTION_HOURS = 60/60

    # Default agent name (overwritten per instance in __init__)
    name = "Generic Agent"
    # Waiting times in seconds
    # NOTE(review): only referenced from commented-out waitfor() calls in the
    # visible code - confirm whether subclasses still use them
    mainloop_waittime = 3
    subloop_waittime = 2
    # Current status (one of STATUS_*) and mode (one of MODE_*)
    status = None
    mode = None
    # ConfigPyros instance (set in __init__)
    config = None

    # Statuses
    STATUS_LAUNCH = "LAUNCHED"
    STATUS_INIT = "INITIALIZING"
    STATUS_MAIN_LOOP = "IN_MAIN_LOOP"
    STATUS_GET_NEXT_COMMAND = "IN_GET_NEXT_COMMAND"
    STATUS_GENERAL_PROCESS = "IN_GENERAL_PROCESS"
    STATUS_ROUTINE_PROCESS = "IN_ROUTINE_PROCESS"
    STATUS_SPECIFIC_PROCESS = "IN_SPECIFIC_PROCESS"
    STATUS_EXIT = "EXITING"

    # Modes
    MODE_ACTIVE = "ACTIVE"
    MODE_IDLE = "IDLE"

    # Project sub-path used in __init__ to locate the config directory
    PYROS_DJANGO_BASE_DIR = Path("src/core/pyros_django") # pathlib
    DEFAULT_CONFIG_FILE_NAME = "config_unit_simulunit1.xml"
    CONFIG_DIR_NAME = "config"

    # Parameters from config file
    # for /src/
    #_path_data = '../../config'
    # for /src/core/pyros_django/
    #_path_data = '../../../../config'
    # Path to config
    _path_data = ''
    _computer_alias = ''
    _computer_description = ''

    # Current and next command to send
    cmdts = None
    next_cmdts = None

    # AgentSurvey row of this agent (set in __init__)
    _agent_survey = None
    # Pending commands; rebound (never mutated in place) by
    # get_next_valid_and_not_running_command()
    _pending_commands = []

    # Specific command currently executed, and the thread/process running it
    _current_specific_cmd = None
    _current_specific_thread = None

    # Main loop iteration counter (set to 1 by run())
    _iter_num = None

    # Log object
    _log = None
Path(__file__).resolve().parent #TODO: on doit pouvoir faire mieux avec pathlib (sans utiliser str()) self._path_data = str( Path( str(my_parent_abs_dir).split(str(self.PYROS_DJANGO_BASE_DIR))[0] ) / self.CONFIG_DIR_NAME ) #self.set_mode(self.MODE_IDLE) if not config_filename: config_filename = self.DEFAULT_CONFIG_FILE_NAME # If config file name is RELATIVE (i.e. without path, just the file name) # => give it an absolute path (and remove "src/agent/" from it) if config_filename == os.path.basename(config_filename): # Build abs path including current agent dir config_filename = os.path.abspath(self.CONFIG_DIR_NAME + os.sep + config_filename) # Remove "src/agent_name/" from abs dir : # (1) Remove "src/core/pyros_django/" config_filename = config_filename.replace(str(self.PYROS_DJANGO_BASE_DIR)+os.sep, os.sep) # (2) Remove "agent_name/" #TODO: bidouille, faire plus propre config_filename = config_filename.replace(os.sep+"agent"+os.sep, os.sep) config_filename = config_filename.replace(os.sep+"monitoring"+os.sep, os.sep) config_filename = os.path.normpath(config_filename) self.printd(f"Config file used is={config_filename}") self.config = ConfigPyros(config_filename) if self.config.get_last_errno() != self.config.NO_ERROR: raise Exception(f"Bad config file name '{config_filename}', error {str(self.config.get_last_errno())}: {str(self.config.get_last_errmsg())}") ### self._agent_logs = AgentLogs.objects.create(name=self.name, message="Step __init__") self.printdb("Step __init__") self.SIMULATOR_COMMANDS = iter(self.SIMULATOR_COMMANDS_LIST) self.RUN_IN_THREAD = RUN_IN_THREAD self.set_status(self.STATUS_LAUNCH) self.set_idle() self.set_mode_from_config(name) # TODO: remove #self.set_idle() self.set_active() # Create 1st survey if none #tmp = AgentSurvey.objects.filter(name=self.name) #if len(tmp) == 0: #nb_agents = AgentSurvey.objects.filter(name=self.name).count() #if nb_agents == 0: if AgentSurvey.objects.filter(name=self.name).exists(): self._agent_survey = 
AgentSurvey.objects.get(name=self.name) else: self._agent_survey = AgentSurvey.objects.create(name=self.name, validity_duration=60, mode=self.mode, status=self.status, iteration=-1) self.printd("Agent survey is", self._agent_survey) def __repr__(self): return "I am agent " + self.name def __str__(self): return self.__repr__() #return "I am agent " + self.name # Normal print def print(self, *args, **kwargs): self._log.print(*args, **kwargs) """ if args: print(f"({self.name}): ", *args, **kwargs) else: print() """ # DEBUG print def printd(self, *args, **kwargs): self._log.printd(*args, **kwargs) """ if DEBUG: self.print(*args, **kwargs) """ def printdb(self, *args, **kwargs): self._log.db( *args, **kwargs) def sleep(self, nbsec:float=2.0): # thread if self._current_specific_thread and self.RUN_IN_THREAD: self._current_specific_thread.wait(nbsec) # process (or main thread) else: time.sleep(nbsec) def run(self, nb_iter:int=None, FOR_REAL:bool=True): """ FOR_REAL: set to False if you don't want Agent to send commands to devices but just print messages without really doing anything """ self.DO_EXIT = False self.DO_RESTART = True # Will loop again only if DO_RESTART is True while self.DO_RESTART: self.DO_RESTART = False self.start_time = time.time() self.printd("on est ici: ", os.getcwd()) self.load_config() self.print_simulator_mode() self.init() # Avoid blocking on false "running" commands # (old commands that stayed with "running" status when agent was killed) Command.delete_commands_with_running_status_for_agent(self.name) self._iter_num = 1 self.DO_MAIN_LOOP = True # Main loop while self.DO_MAIN_LOOP: try: self.main_loop(nb_iter,FOR_REAL) if not self.DO_MAIN_LOOP: break except KeyboardInterrupt: # In case of CTRL-C, kill the current thread (process) before dying (in error) self.print("CTRL-C Interrupted, I kill the current thread (process) before exiting (if exists)") self.kill_running_specific_cmd_if_exists("USER_CTRLC") exit(1) #if self.DO_EXIT: exit(0) def 
main_loop(self, nb_iter:int=None, FOR_REAL:bool=True): # Bad number of iterations, so exit if nb_iter is not None: if nb_iter <= 0: self.DO_MAIN_LOOP = False return # Number of iterations asked is reached, so exit if self._iter_num > nb_iter: print(f"Exit because number of iterations asked ({nb_iter}) has been reached") self.DO_MAIN_LOOP = False return self.print() self.print() #self.printd("-"*80) self.print("-"*20, f"MAIN LOOP ITERATION {self._iter_num} (START)", "-"*20) self.printdb(f"Iteration {self._iter_num}") self.set_status(self.STATUS_MAIN_LOOP) self.show_mode_and_status() # Wait a random number of sec before starting iteration # (to let another agent having the chance to send a command before me) random_waiting_sec = random.randint(0,5) self.print(f"Waiting {random_waiting_sec} sec (random) before starting new iteration...") time.sleep(random_waiting_sec) self.load_config() # only if changed # Log this agent status (update my current mode and status in DB) self.log_agent_status() self.printd("------START COMMMAND PROCESSING------") # Purge commandes (every N iterations, delete old commands) N=5 if ((self._iter_num-1) % N) == 0: self.print("Looking for old commands to purge...") #Command.purge_old_commands_for_agent(self.name) self.purge_old_commands_sent_to_me() # ROUTINE process #if self.is_active(): self.routine_process() #self.printd("I am IDLE, so I bypass the routine_process (do not send any new command)") # Get next command and process it (if exists) cmd = self.get_next_valid_and_not_running_command() #if cmd: self.command_process(cmd) if cmd: # GENERIC cmd : execute it always if cmd.is_generic(): self.exec_generic_cmd(cmd) # SPECIFIC cmd : execute it only if active and no currently running specific cmd else: self.exec_specific_cmd_if_possible(cmd) # if restart, exit this loop to restart from beginning if self.DO_RESTART or self.DO_EXIT: self.DO_MAIN_LOOP = False return self.printd("------END COMMMAND PROCESSING------") 
#self.waitfor(self.mainloop_waittime) self.print("-"*20, "MAIN LOOP ITERATION (END)", "-"*20) #self.do_log(LOG_DEBUG, "Ending main loop iteration") # (simulator only) Exit if max duration is reached if self.SIMULATOR_MAX_DURATION_SEC and (time.time()-self.start_time > self.SIMULATOR_MAX_DURATION_SEC): self.print("Exit because of max duration set to ", self.SIMULATOR_MAX_DURATION_SEC, "s") self.kill_running_specific_cmd_if_exists(self.name) if self.SIMULATOR_MODE and self.SIMULATOR_WITH_TEST: self.simulator_test_results() self.DO_MAIN_LOOP = False return self._iter_num += 1 def print_simulator_mode(self): if self.SIMULATOR_MODE: self._log.print("[IN SIMULATOR MODE]") self.print("flush previous commands to be sure to start in clean state") Command.delete_pending_commands_for_agent(self.name) else: self._log.print("[IN NORMAL MODE]") self.SIMULATOR_MAX_DURATION_SEC=None def purge_old_commands_sent_to_me(self): Command.purge_old_commands_sent_to_agent(self.name) # To be overriden by subclass def routine_process(self): """ Routine processing. This is a command or set of commands that this agent sends regularly, (or just a regular processing) at each iteration """ self.set_status(self.STATUS_ROUTINE_PROCESS) if self.SIMULATOR_MODE: self.simulator_routine_process() """ def purge_commands(self): ### Delete commands (which I am recipient of) older than COMMANDS_PEREMPTION_HOURS (like 48h) ATTENTION !!! EXCEPT the RUNNING command !!! 
NB: datetime.utcnow() is equivalent to datetime.now(timezone.utc) ### self.printd("Looking for old commands to purge...") ### COMMAND_PEREMPTION_DATE_FROM_NOW = datetime.utcnow() - timedelta(hours = self.COMMANDS_PEREMPTION_HOURS) #self.printd("peremption date", COMMAND_PEREMPTION_DATE_FROM_NOW) old_commands = Command.objects.filter( # only commands for me recipient = self.name, # only pending commands sender_deposit_time__lt = COMMAND_PEREMPTION_DATE_FROM_NOW, ) ### old_commands = Command.get_old_commands_for_agent(self.name) if old_commands.exists(): self.printd("Found old commands to delete:") for cmd in old_commands: self.printd(cmd) old_commands.delete() """ def waitfor(self, nbsec): self.printd(f"Now, waiting for {nbsec} seconds...") time.sleep(nbsec) def set_status(self, status:str): #self.print(f"[{status}] (switching from status {self.status})") self.print(f"[{status}]") self.status = status return False def set_mode(self, mode:str): #self.print(f"Switching from mode {self.mode} to mode {mode}") self.print(f"[NEW MODE {mode}]") self.mode = mode def is_active(self): return self.mode == self.MODE_ACTIVE def is_idle(self): return not self.is_active() def set_active(self): self.set_mode(self.MODE_ACTIVE) def set_idle(self): self.set_mode(self.MODE_IDLE) def show_mode_and_status(self): self.print(f"CURRENT MODE is {self.mode} (status is {self.status})") def die(self): self.set_status(self.STATUS_EXIT) """ suspend/resume """ def suspend(self): """ TODO: Mode IDLE (doit rester à l'écoute d'un resume, et doit continuer à alimenter les tables pour informer de son état via tables agents_logs, et lire table agents_command pour reprendre via resume, et update la table agents_survey pour donner son status "idle" """ self.set_idle() return True def resume(self): """ Quit suspend() mode """ self.set_active() return True def set_mode_from_config(self, agent_alias): # --- Get the startmode of the AgentX modestr = 
self.config.get_paramvalue(agent_alias,'general','startmode') if self.config.get_last_errno() != self.config.NO_ERROR: raise Exception(f"error {str(self.config.get_last_errno())}: {str(self.config.get_last_errmsg())}") if (modestr == None): return True # --- Set the mode according the startmode value mode = self.MODE_IDLE if modestr.upper() == 'RUN': mode = self.MODE_ACTIVE self.set_mode(mode) return True """ ================================================================= Generic methods that may be specialized (overriden) by subclasses ================================================================= """ def init(self): self.printd("Initializing...") self.set_status(self.STATUS_INIT) def load_config(self): """ TODO: only si date fichier xml changée => en RAM, un objet Config avec méthodes d'accès, appelle le parser de AK (classe Config.py indépendante) """ ''' # SETUP try: self.config = get_object_or_404(Config, id=1) # By default, set mode to SCHEDULER (False = REMOTE, which should never be the default) self.config.global_mode = True self.config.save() # self.config = Config.objects.get(pk=1) # self.config = Config.objects.get()[0] except Exception as e: # except Config.ObjectDoesNotExist: self.printd("Config read (or write) exception", str(e)) return -1 ''' self.printd("Loading the config file...") self.config.load() if self.config.get_last_errno() != self.config.NO_ERROR: raise Exception(f"error {str(self.config.get_last_errno())}: {str(self.config.get_last_errmsg())}") if not self.config.is_config_contents_changed(): return # === display informations # --- Get the assembly aliases of the unit assembled_mount_aliases = [] assembled_channel_aliases = [] assembled_aliases = [] unit_alias = self.config.get_aliases('unit')[0] params = self.config.get_params(unit_alias) for param in params: if param['section']=="assembly" and param['key']=="alias": assembled_aliases.append(param['value']) #print(f"Unit {unit_alias} is the assembly of {assembled_aliases}") 
self.print("--------- Components of the unit -----------") self.print("Configuration file is {}".format(self.config.get_configfile())) alias = self.config.get_aliases('unit')[0] namevalue = self.config.get_paramvalue(alias,'unit','description') self.print(f"Unit alias is {alias}. Description is {namevalue}:") unit_subtags = self.config.get_unit_subtags() for unit_subtag in unit_subtags: aliases = self.config.get_aliases(unit_subtag) for alias in aliases: namevalue = self.config.get_paramvalue(alias,unit_subtag,'description') self.print(f"- {unit_subtag} alias is {alias}. Description is {namevalue}") # --- fill the list of mount and channel assembled if alias in assembled_aliases: if unit_subtag=="mount": assembled_mount_aliases.append(alias) elif unit_subtag=="channel": assembled_channel_aliases.append(alias) self.print("--------- Assembly of the unit -----------") self.print(f"Assembled mount aliases: {assembled_mount_aliases}") self.print(f"Assembled channel aliases: {assembled_channel_aliases}") # --- Get the home of the mount[0] mount_alias = assembled_mount_aliases[0] home = self.config.get_paramvalue(mount_alias,'MountPointing','home') self.print("------------------------------------------") hostname = socket.gethostname() self._computer_alias = '' unit_subtag = 'computer' aliases = self.config.get_aliases(unit_subtag) for alias in aliases: self.printd("alias", alias) value = self.config.get_paramvalue(alias,'local','hostname') self.printd("value", value) if value == hostname: self.printd("value", value) self._computer_alias = alias value = self.config.get_paramvalue(alias,unit_subtag,'description') self._computer_description = value value = self.config.get_paramvalue(alias,'path','data') # Overrides default value self._path_data = value break self.print(f"hostname = {hostname}") self.print(f"path_data = {self._path_data}") self.print(f"home = {home}") self.print("------------------------------------------") # --- update the log parameters self._log.path_data 
= self._path_data self._log.home = home #def update_survey(self): def log_agent_status(self): """ Save (update) this agent current mode and status in DB """ self.printd("Updating the agent survey database table...") #self.printd("- fetching table line for agent", self.name) # only necessary when using process (not necessary with threads) #with transaction.atomic(): #self._agent_survey = AgentSurvey.objects.get(name=self.name) self._agent_survey.mode = self.mode self._agent_survey.status = self.status self._agent_survey.iteration = self._iter_num self._agent_survey.save() #self._agent_survey.save(update_fields=["mode", "status"]) """ def send_command(self, cmd_name): recipient_agent = self.name if self.SIMULATOR_COMMANDS_DEST=="myself" else self.SIMULATOR_COMMANDS_DEST Command.objects.create(sender=self.name, recipient=recipient_agent, name=cmd_name) """ #def send_command(self, to_agent, cmd_type, cmd_name, cmd_args=None): def send_command(self, to_agent, cmd_name, cmd_args=None): """ #ex: send_command(“AgentX”,”GENERIC”,”EVAL”,“3+4”) ex: send_command(“AgentX”,"EVAL”,“3+4”) """ return Command.send_command(self.name, to_agent, cmd_name, cmd_args) def get_next_valid_and_not_running_command(self)->Command: """ Return next VALID (not expired) command (read from the DB command table) which is relevant to this agent. Commands are read in chronological order """ self.set_status(self.STATUS_GET_NEXT_COMMAND) #self.printd("Looking for new commands from the database ...") # 1) Get all pending commands for me (return if None) # Not sure this is necessary to do it in a transaction, # but there might be a risk # that a command status is modified while we are reading... 
with transaction.atomic(): self._pending_commands = Command.get_pending_and_running_commands_for_agent(self.name) commands = self._pending_commands if not commands.exists(): self.printd("No new command to process") return None self.print("Current pending commands are (time ordered) :") Command.show_commands(commands) # 2) If there is a "exit" or "abort" command pending (even at the end of the list), # which is VALID (not expired), # then pass it straight away to general_process() for execution for cmd in commands: if cmd.name in ("exit", "abort", "flush_commands"): break if cmd.name in ("exit", "abort", "flush_commands"): if cmd.is_running(): return None if cmd.is_expired(): cmd.set_as_outofdate() return None return cmd # 3) If first (oldest) command is currently running # (status CMD_RUNNING), then do nothing and return """ cmd_executing = Command.objects.filter( # only commands for me recipient = self.name, # only pending commands recipient_status_code = Command.CMD_STATUS_CODES.CMD_RUNNING, ).first() #if cmd_executing.exists(): if cmd_executing: """ #cmd = commands[0] cmd = commands.first() if cmd.is_expired(): cmd.set_as_outofdate() return None if cmd.is_running(): #self.printd(f"There is currently a running command ({cmd_executing.first().name}), so I do nothing (wait for end of execution)") self.printd(f"There is currently a running command ({cmd.name})") """ # Check that this command is not expired if cmd.is_expired(): self.printd("But this command is expired, so set its status to OUTOFDATE, and go on") cmd_executing.set_as_outofdate() else: """ self.printd(f"Thus, I won't execute any new command until this command execution is finished") # TODO: kill si superieur a MAX_EXEC_TIME return None ''' # 4) Tag all expired commands for cmd in commands: if cmd.is_expired(): cmd.set_as_outofdate() # break at 1st "valid" command (not expired) else: break # 5) If no more commands to process, return None if cmd.is_expired(): return None ''' # 6) Current cmd must now be 
a valid (not expired) and PENDING one, # so return it for execution #self.printd(f"Got command {cmd.name} sent by agent {cmd.sender} at {cmd.sender_deposit_time}") self.set_status(self.STATUS_GENERAL_PROCESS) self.print("***") self.print("*** RECEIVED", cmd) self.print("***") self.printd(f"Starting processing of this command") return cmd def AVIRER_command_process(self, cmd:Command)->Command: assert cmd is not None cmd = self.general_process(cmd) # Sub-level loop (only if ACTIVE) if cmd and self.is_active(): self.specific_process(cmd) def AVIRER_general_process(self, cmd:Command)->Command: assert cmd is not None self.set_status(self.STATUS_GENERAL_PROCESS) self.print("***") self.print("*** RECEIVED", cmd) self.print("***") self.printd(f"Starting general processing of this command") # Update read time to say that the command has been READ cmd.set_read_time() # Precondition: command cmd is valid (not expired), has already been read, is pending assert (not cmd.is_expired()) and cmd.is_pending() and cmd.is_read() #self.printd(f"Starting general processing of command {cmd.name} sent by agent {cmd.sender} at {cmd.sender_deposit_time}") """ # 2) If expired command, change its status to expired and return if cmd.is_expired(): self.printd("This command is expired, so mark it as such, and ignore it") cmd.set_as_outofdate() return None """ # If cmd is generic, execute it, change its status to executed, and return if cmd.is_generic(): self.print("This command is generic, execute it...") self.exec_generic_cmd(cmd) # If cmd is "exit", kill myself (without any question, this is an order soldier !) # This "exit" should normally kill any current thread (to be checked...) 
if cmd.name == "exit": self.printd("(before exiting) Here are the current (still) pending commands (time ordered) :") commands = Command.get_pending_and_running_commands_for_agent(self.name) Command.show_commands(commands) #if self.SIMULATOR_MODE and self.SIMULATOR_WITH_TEST and self.SIMULATOR_COMMANDS_DEST == "myself": self.simulator_test_results() if self.SIMULATOR_MODE and self.SIMULATOR_WITH_TEST: self.simulator_test_results() #self.DO_EXIT=True #exit(0) # Command is executed, so return None return None # cmd is not generic else: # cmd is not generic but, as I am idle, change its status to SKIPPED, ignore it, and return #if self.mode == self.MODE_IDLE: if self.is_idle(): self.printd("This command is not generic but, as I am IDLE, I mark it SKIPPED and ignore it") cmd.set_as_skipped() return None # Je suis pas idle et cde pas générique: je la traite pas, elle sera traitée par core_process : # attendre que cette commande soit exécutée avant de passer à la commande suivante (situation “bloquante” normale) self.printd("This command is not generic and, as I am not IDLE, I pass it to the specific processing") self.printd("(then I will not execute any other new command until this command is EXECUTED)") return cmd def exec_generic_cmd(self, cmd:Command): self.printd(f"Starting execution of a GENERIC cmd {cmd}") # Update read time to say that the command has been READ cmd.set_read_time() cmd.set_as_running() # Executing command if cmd.name == "go_active": self.set_active() cmd.set_result("I am now active") time.sleep(1) elif cmd.name == "go_idle": self.set_idle() cmd.set_result("I am now idle") time.sleep(1) elif cmd.name in ("flush_commands"): self.print("flush_commands received: Delete all pending commands") Command.delete_pending_commands_for_agent(self.name) cmd.set_result('DONE') elif cmd.name in ("abort", "exit", "restart_init"): #self.printd("Current pending commands are:") #Command.show_commands(self._pending_commands) self.print("Aborting current executing 
command if exists:") self.kill_running_specific_cmd_if_exists(cmd.sender) if cmd.name == "restart_init": self.print("restart_init received: Restarting from init()") self.DO_RESTART=True elif cmd.name == "exit": self.DO_EXIT=True cmd.set_result('SHOULD BE DONE NOW') else: name = cmd.name args = None if " " in name: name,args = name.split() if name == "eval": if args==None: raise(ValueError) cmd.set_result(eval(args)) cmd.set_as_processed() self.printd("...Generic cmd has been executed") # If cmd is "exit", kill myself (without any question, this is an order soldier !) # This "exit" should normally kill any current thread (to be checked...) if cmd.name == "exit": self.printd("(before exiting) Here are the current (still) pending commands (time ordered) :") commands = Command.get_pending_and_running_commands_for_agent(self.name) Command.show_commands(commands) #if self.SIMULATOR_MODE and self.SIMULATOR_WITH_TEST and self.SIMULATOR_COMMANDS_DEST == "myself": self.simulator_test_results() if self.SIMULATOR_MODE and self.SIMULATOR_WITH_TEST: self.simulator_test_results() #self.DO_EXIT=True #exit(0) def do_log(self): """ log à 2 endroits ou 1 seul - in file - in db """ self.printd("Logging data...") def exec_specific_cmd_if_possible(self, cmd:Command): self.set_status(self.STATUS_SPECIFIC_PROCESS) self.printd(f"Starting execution of a SPECIFIC cmd {cmd}") if self.is_idle(): self.printd("I am IDLE ==> I mark the cmd SKIPPED and ignore it") cmd.set_as_skipped() return if self.is_running_specific_cmd(): self.printd("There is still a thread executing a command ==> I cannot execute this new one now (I will try again to execute it at next iteration)") return """ Dans le cas du Majordome, cette methode doit lancer la prise d'une séquence. La sequence elle même va être une boucle. Donc on peut voir une séquence comme un appel unique à une fonction qui va durer un certain temps (max 20 min par exemple) mais dans cette méthode il y a une boucle sur la prise des images. 
""" self.printd("Starting specific process...") self._current_specific_cmd = cmd # Update read time to say that the command has been READ cmd.set_read_time() #self._current_thread = threading.Thread(target=self.exec_command) # Run in a thread if self.RUN_IN_THREAD: self.printd("(run cmd in a thread)") self._current_specific_thread = StoppableThreadEvenWhenSleeping(target=self.thread_exec_specific_cmd) #self._current_specific_thread = StoppableThreadEvenWhenSleeping(target=self.exec_specific_cmd, args=(cmd,)) #self._current_thread = threading.Thread(target=self.exec_command) #self._current_specific_thread = StoppableThread(target=self.exec_specific_cmd, args=(cmd,)) #self._current_specific_thread = threading.Thread(target=self.exec_specific_cmd, args=(cmd,)) #self._current_specific_thread = thread_with_exception('thread test') # Run in a process else: self.printd("(run cmd in a process)") # close the database connection first, it will be re-opened in each process db.connections.close_all() self._current_specific_thread = multiprocessing.Process(target=self.thread_exec_specific_cmd) #self._current_specific_thread = multiprocessing.Process(target=self.exec_specific_cmd, args=(cmd,)) #self._current_specific_thread = threading.Thread(target=self.exec_specific_cmd, args=(cmd,)) #self._current_specific_thread = thread_with_exception('thread test') self._current_specific_thread.start() #my_thread.join() #self.waitfor(self.subloop_waittime) self.printd("Ending specific process (thread has been launched)") def is_running_specific_cmd(self): #return (self._current_specific_thread is not None) or self._current_specific_thread.is_alive() #return (self._current_specific_thread is None) or not self._current_specific_thread.is_alive() return self._current_specific_thread and self._current_specific_thread.is_alive() def is_in_simulator_mode(self): return self.SIMULATOR_MODE def kill_running_specific_cmd_if_exists(self, abort_sender): if not self.is_running_specific_cmd(): 
self.print("...No current specific command thread to abort...") else: self.print(f"Killing command {self._current_specific_cmd.name}") # Ask the thread to stop itself #self._current_specific_thread.stop() #self._current_specific_thread._stop() #self._current_specific_thread.shutdown() #threading._shutdown() #self._current_specific_thread.raise_exception() self._current_specific_cmd.set_as_killed_by(abort_sender) self._current_specific_thread.terminate() # Now, wait for the end of the thread if self.RUN_IN_THREAD: self._current_specific_thread.join() self._current_specific_thread = None #self._current_specific_cmd.set_as_killed() self._current_specific_cmd = None """ ================================================================= SIMULATOR DEDICATED FUNCTIONS ================================================================= """ def setSimulatorMode(self, mode): self.SIMULATOR_MODE=mode def simulator_get_next_command_to_send(self)->Command: cmd_name = next(self.SIMULATOR_COMMANDS, None) #return cmd_name if cmd_name is None: return None recipient_agent = self.name if self.SIMULATOR_COMMANDS_DEST=="myself" else self.SIMULATOR_COMMANDS_DEST return Command(sender=self.name, recipient=recipient_agent, name=cmd_name) """ def simulator_send_next_command(self): #self._current_test_cmd = "go_idle" if self._current_test_cmd=="go_active" else "go_active" #if self._nb_test_cmds == 4: self._current_test_cmd = "exit" cmd_name = next(self.SIMULATOR_COMMANDS, None) #self.printd("next cmd is ", cmd_name) if cmd_name is None: return #Command.objects.create(sender=self.name, recipient=self.name, name=cmd_name) recipient_agent = self.name if self.SIMULATOR_COMMANDS_DEST=="myself" else self.SIMULATOR_COMMANDS_DEST Command.objects.create(sender=self.name, recipient=recipient_agent, name=cmd_name) #time.sleep(1) #self._simulator_current_cmd_idx += 1 #self._nb_test_cmds += 1 """ def simulator_routine_process(self): """ SIMULATOR MODE ONLY """ # There is a current command being processed # 
=> check if next command is "abort" # => if so, instantly send an "abort" to abort previous command if self.cmdts is not None: self.print(f"Waiting for end of cmd '{self.cmdts.name}' execution...") # Update cmdts fields from DB self.cmdts.refresh_from_db() if self.cmdts.is_pending() or self.cmdts.is_running(): if self.next_cmdts is None: # If next command is "abort" then abort becomes the new current command (to be sent) self.next_cmdts = self.simulator_get_next_command_to_send() if self.next_cmdts and self.next_cmdts.name == "abort": # Wait a little to give a chance to agentB to start execution of current command, # so that we can abort it then (otherwise it won't be aborted!!) time.sleep(4) self.cmdts = self.next_cmdts self.next_cmdts = None self.print("***") self.print(f"*** SEND command", self.cmdts) self.print("***") self.cmdts.send() # Current cmd is no more running else: # Execution was not completed #if self.cmdts.is_expired() or self.cmdts.is_skipped() or self.cmdts.is_killed(): if self.cmdts.is_skipped() or self.cmdts.is_killed(): self.printd("Command was not completed") # 2 possible scenarios: # - (1) Send the SAME command again ''' self.printd("Command was not completed, so I send it again") # The command was not completed, so, make a copy of it and send it again # For this, it is enough to set primary key to None, # then the send() command below will save a NEW command #self.cmdts = copy.copy(self.cmdts) self.cmdts.id = None SEND_A_NEW_COMMAND = True ''' # - (2) Send next command #self.cmdts = None # Execution was not complete => get result elif self.cmdts.is_executed(): cmdts_res = self.cmdts.get_result() self.print(f"Cmd executed. 
Result is '{cmdts_res}'") #cmdts_is_processed = True ''' Optimisation possible pour gagner une iteration: self.cmdts = self.simulator_get_next_command_to_send() # No more command to send (from simulator) => return if self.cmdts is None: return SEND_A_NEW_COMMAND = True ''' # Set cmdts to None so that a new command will be sent at next iteration self.cmdts = None # No currently running command => get a new command and SEND it if self.cmdts is None: if self.next_cmdts is not None: self.cmdts = self.next_cmdts self.next_cmdts = None else: self.cmdts = self.simulator_get_next_command_to_send() # No more command to send (from simulator) => return if self.cmdts is None: return # Send cmd (= set as pending and save) self.print("***") self.print(f"*** SEND command", self.cmdts) self.print("***") #self.cmdts.set_as_pending() # SEND self.cmdts.send() #cmdts_is_processed = False #cmdts_res = None def simulator_test_results(self): if self.SIMULATOR_COMMANDS_LIST == [] : return commands = self.simulator_test_results_start() nb_asserted = self.simulator_test_results_main(commands) self.simulator_test_results_end(nb_asserted) def simulator_test_results_start(self): self.print("\n--- Testing if the commands I SENT had the awaited result") self.print("Here are the last commands I sent:") #commands = list(Command.get_last_N_commands_for_agent(self.name, 16)) #commands = Command.get_last_N_commands_sent_to_agent(self.name, 16) commands = Command.get_last_N_commands_sent_by_agent(self.name, len(self.SIMULATOR_COMMANDS_LIST)) Command.show_commands(commands) assert commands[0].name == self.SIMULATOR_COMMANDS_LIST[0] assert commands[-1].name == self.SIMULATOR_COMMANDS_LIST[-1] return commands """ OLD SCENARIO nb_asserted = 0 for cmd in commands: if cmd.name == "specific0": assert cmd.is_skipped() nb_asserted+=1 if cmd.name == "specific1": assert cmd.result == "in step #5/5" assert cmd.is_executed() nb_asserted+=1 if cmd.name in ("specific2","specific3"): assert cmd.is_killed() 
nb_asserted+=1 if cmd.name in ("specific4", "specific5", "specific6", "specific7", "specific8"): assert cmd.is_pending() nb_asserted+=1 # 2 cmds abort if cmd.name in ("abort"): assert cmd.is_executed() nb_asserted+=1 if cmd.name in ("exit"): assert cmd.is_executed() nb_asserted+=1 assert nb_asserted == 12 self.printd("--- Finished testing => result is ok") """ # To be overriden by subclass def simulator_test_results_main(self, commands): nb_asserted = 0 for cmd in commands: assert cmd.is_executed() nb_asserted+=1 return nb_asserted def simulator_test_results_end(self, nb_asserted): nb_commands_to_send = len(self.SIMULATOR_COMMANDS_LIST) assert nb_asserted == nb_commands_to_send #self.print(f"************** Finished testing => result is ok ({nb_asserted} assertions) **************") printFullTerm(Colors.GREEN, f"************** Finished testing => result is ok ({nb_asserted} assertions) **************") """ ================================================================= FUNCTIONS RUN INSIDE A THREAD (OR A PROCESS) (thread_*()) ================================================================= """ def thread_exec_specific_cmd(self): assert self._current_specific_thread is not None # thread execution setting up self.thread_exec_specific_cmd_start() # processing body (main) # Specific case of the EVAL command if self._current_specific_cmd.name.startswith("eval"): self.thread_set_total_steps_number(1) self.thread_exec_specific_cmd_step(1, self.cmd_step_eval) return # to be overriden by subclasses self.thread_exec_specific_cmd_main() # thread execution tearing down self.thread_exec_specific_cmd_end() ''' except TimeoutError: pass ''' #def exec_specific_cmd_start(self, cmd:Command): def thread_exec_specific_cmd_start(self): cmd = self._current_specific_cmd """ specific command execution setting up """ #cmd = self.get_current_specific_cmd() self.printd(">>>>> Thread: starting execution of command", cmd.name) self.printd(">>>>> Thread: PID: %s, Process Name: %s, Thread 
Name: %s" % ( os.getpid(), multiprocessing.current_process().name, threading.current_thread().name) ) cmd.set_as_running() """ if self.RUN_IN_THREAD: cmd.set_as_running() else: with transaction.atomic(): cmd.set_as_running() """ # Define your own command step(s) here def cmd_step(self, step:int): cmd = self._current_specific_cmd """ cmd.result = f"in step #{step}/{self._thread_total_steps_number}" cmd.save() """ cmd.set_result(f"in step #{step}/{self._thread_total_steps_number}") def cmd_step_eval(self, step:int): cmd = self._current_specific_cmd cmd_args = self._current_specific_cmd.name.split()[1] """ cmd.result = f"in step #{step}/{self._thread_total_steps_number}" cmd.save() """ cmd.set_result(eval(cmd_args)) # Default body of the specific processing # Should be overriden by subclass def thread_exec_specific_cmd_main(self): """ cmd = self._current_specific_cmd self.printd("Doing nothing, just sleeping...") self.sleep(3) """ # This is optional self.thread_set_total_steps_number(5) # HERE, write your own scenario # scenario OK self.thread_exec_specific_cmd_step(1, self.cmd_step, 1) self.thread_exec_specific_cmd_step(2, self.cmd_step, 3) self.thread_exec_specific_cmd_step(3, self.cmd_step, 5) self.thread_exec_specific_cmd_step(4, self.cmd_step, 10) self.thread_exec_specific_cmd_step(5, self.cmd_step, 4) # ... 
as many as you need """ other scenario self.thread_exec_specific_cmd_step(1, self.cmd_step1, 1) self.thread_exec_specific_cmd_step(2, self.cmd_step2, 2) self.thread_exec_specific_cmd_step(3, self.cmd_step1, 2) self.thread_exec_specific_cmd_step(4, self.cmd_step3, 2) self.thread_exec_specific_cmd_step(5, self.cmd_step1, 3) """ def thread_exec_specific_cmd_end(self): """ specific command execution tearing down """ cmd = self._current_specific_cmd cmd.set_as_processed() """ if self.RUN_IN_THREAD: cmd.set_as_processed() else: with transaction.atomic(): cmd.set_as_processed() """ self.printd(">>>>> Thread: ended execution of command", cmd.name) cmd = None # No more current thread #self._current_specific_thread = None # Default body of a specific cmd step # Should be overriden by subclass def thread_exec_specific_cmd_step(self, step:int, cmd_step_function, sleep_time:float=1.0): # Exit if I was asked to stop cmd = self._current_specific_cmd if self.RUN_IN_THREAD and threading.current_thread().stopped(): self.printd(f">>>>> Thread (cmd {cmd.name}): I received the stop signal, so I stop (in error)") exit(1) self.printd(f">>>>> Thread (cmd {cmd.name}): step #{step}/{self._thread_total_steps_number}") # call a specific function to be defined by subclass cmd_step_function(step) # Wait for a specific time (interruptible) self.sleep(sleep_time) def thread_stop_if_asked(self): assert self._current_specific_thread is not None if self.RUN_IN_THREAD and threading.current_thread().stopped(): self.printd("(Thread) I received the stop signal, so I stop (in error)") exit(1) def thread_set_total_steps_number(self, nbsteps): self._thread_total_steps_number = nbsteps """ =================================== OLD FUNCTIONS TO BE REMOVED =================================== def _plc_is_not_auto(self): if not self.plc_is_connected(): return True # now, self.plc_status has been updated, so check it: return self.plc_status.plc_mode != "AUTO" def plc_is_auto(self): return not 
self._plc_is_not_auto() def plc_is_safe(self): if not self.plc_is_connected(): return False # now, self.plc_status has been updated, so check it: return self.plc_status.is_safe def is_night(self): return get_sunelev() < -10 """ """ ================================================================= MAIN ================================================================= """ def extract_parameters(): """ Usage: Agent.py [-t] [configfile] """ # arg 1 : -t # arg 2 : configfile TEST_MODE = False configfile = None if len(sys.argv) > 1: if sys.argv[1] == "-t": TEST_MODE = True if len(sys.argv) == 3: configfile = sys.argv[2] else: configfile = sys.argv[1] return TEST_MODE, configfile if __name__ == "__main__": TEST_MODE, configfile = extract_parameters() agent = Agent("GenericAgent", configfile, RUN_IN_THREAD=True) agent.setSimulatorMode(TEST_MODE) print(agent) agent.run()