#!/usr/bin/env python3 VERSION = "0.5" #DEBUG=True #DEBUG=False """TODO: - 1 log par agent + lastlog (30 dernières lignes) - table agents_log avec log minimum pour affichage dans dashboard, et ordre chrono intéressant pour suivi activité : nom agent, timestamp, message - 1 table par agent: agent__vars : nom variable, value, desc """ """ ================================================================= SETUP FOR DJANGO (see https://docs.djangoproject.com/en/dev/topics/settings) (see also https://docs.djangoproject.com/en/dev/ref/settings) ================================================================= """ import os from pathlib import Path import sys import logging from django.conf import settings as djangosettings # Conseil sur le net: #https://stackoverflow.com/questions/16853649/how-to-execute-a-python-script-from-the-django-shell #"" #import sys, os #sys.path.append('/path/to/your/django/app') #os.environ['DJANGO_SETTINGS_MODULE'] = 'settings' #from django.conf import settings #"" # To avoid a "ModuleNotFoundError: No module named 'dashboard'"... (not even 1 app found) : ##sys.path.insert(0, os.path.abspath("..")) ##sys.path.insert(0, os.path.abspath("src")) ##sys.path.insert(0, "../src") ##sys.path.insert(0, "src") # To avoid a "ModuleNotFoundError: No module named 'dashboard'" ## sys.path.append("..") py_pwd = os.path.normpath(os.getcwd() + "/..") if (py_pwd not in os.sys.path): (os.sys.path).append(py_pwd) # To avoid a "ModuleNotFoundError: No module named 'src'" ## sys.path.append("../../../..") py_pwd = os.path.normpath(os.getcwd() + "/../../../..") if (py_pwd not in os.sys.path): (os.sys.path).append(py_pwd) ##sys.path.append("src") from src.pyros_logger import log, handler_filebyagent #from src.pyros_logger import logger as logg, handler_filebyagent, ##from src import pyros_logger ''' def printd(*args, **kwargs): if os.environ.get('PYROS_DEBUG', '0')=='1': print(*args, **kwargs) ''' #printd("Starting with this sys.path", sys.path) log.debug("Starting with this sys.path" + str(sys.path)) # DJANGO setup # self.printd("file is", __file__) # mypath = os.getcwd() # Go into src/ ##os.chdir("..") ##os.chdir("src") #printd("Current directory : " + str(os.getcwd())) log.debug("Current directory : " + str(os.getcwd())) #os.environ.setdefault("DJANGO_SETTINGS_MODULE", "src.core.pyros_django.pyros.settings") os.environ.setdefault("DJANGO_SETTINGS_MODULE", "src.core.pyros_django.pyros.settings") # os.environ['SECRET_KEY'] = 'abc' # os.environ['ENVIRONMENT'] = 'production' import django django.setup() #printd("DB2 used is:", djangosettings.DATABASES["default"]["NAME"]) log.debug("DB2 used is:" + djangosettings.DATABASES["default"]["NAME"]) """ ================================================================= IMPORT PYTHON PACKAGES ================================================================= """ # --- GENERAL PURPOSE IMPORT --- #from __future__ import absolute_import ##import utils.Logger as L import platform import random import threading #import multiprocessing from threading import Thread import time import socket #import ctypes #import copy # --- DJANGO IMPORT --- from django.db import transaction from django import db # from django.core.exceptions import ObjectDoesNotExist # from django.db.models import Q #from django.shortcuts import get_object_or_404 #from django.conf import settings as djangosettings # --- SPECIFIC IMPORT --- # Many ways to import configuration settings: ##import config import config.old_config as config_old #from config import PYROS_ENV, ROOT_DIR, DOC_DIR #from config import * from common.models import AgentSurvey, AgentCmd, AgentLogs #from config.configpyros import ConfigPyros from config.old_config.configpyros import ConfigPyros as ConfigPyrosOld from src.core.pyros_django.obsconfig.configpyros import ConfigPyros #from dashboard.views import get_sunelev #from devices.TelescopeRemoteControlDefault import TelescopeRemoteControlDefault #from utils.JDManipulator import * ##from agent.logpyros import LogPyros ##from src.logpyros import LogPyros from device_controller.abstract_component.device_controller import ( DCCNotFoundException, UnknownGenericCmdException, UnimplementedGenericCmdException, UnknownNativeCmdException ) """ ================================================================= GENERAL MODULE CONSTANT & FUNCTIONS DEFINITIONS ================================================================= """ #DEBUG_FILE = False ##log = L.setupLogger("AgentLogger", "Agent") IS_WINDOWS = platform.system() == "Windows" class Colors: HEADER = "\033[95m" BLUE = "\033[94m" GREEN = "\033[92m" WARNING = "\033[93m" FAIL = "\033[91m" ENDC = "\033[0m" BOLD = "\033[1m" UNDERLINE = "\033[4m" def printColor(color: Colors, message, file=sys.stdout, eol=os.linesep, forced=False): #system = platform.system() """ if (self.disp == False and forced == False): return 0 """ #if system == "Windows": if IS_WINDOWS: log.info(message, file=file, end=eol) else: log.info(color + message + Colors.ENDC, file=file, end=eol) return 0 def printFullTerm(color: Colors, string: str): #system = platform.system() columns = 100 row = 1000 disp = True value = int(columns / 2 - len(string) / 2) printColor(color, "-" * value, eol="") printColor(color, string, eol="") value += len(string) printColor(color, "-" * (columns - value)) return 0 """ ================================================================= class StoppableThread ================================================================= """ class StoppableThreadEvenWhenSleeping(threading.Thread): # Thread class with a stop() method. The thread itself has to check # regularly for the stopped() condition. # It stops even if sleeping # See https://python.developpez.com/faq/?page=Thread#ThreadKill # See also https://www.oreilly.com/library/view/python-cookbook/0596001673/ch06s03.html def __init__(self, *args, **kwargs): #super(StoppableThreadSimple, self).__init__(*args, **kwargs) super().__init__(*args, **kwargs) self._stop_event = threading.Event() #def stop(self): def terminate(self): self._stop_event.set() def stopped(self): return self._stop_event.is_set() def wait(self, nbsec:float=2.0): self._stop_event.wait(nbsec) """ ================================================================= class Agent ================================================================= """ class Agent: """ See Agent_activity_diag.pu for PlantUML activity diagram Behavior of an agent: - If idle : - still does routine_process() and general_process() - does not do specific_process() - Once a command has been sent to another agent : - It waits (non blocking) for the end of execution of the command and get its result - If command is timed out or has been skipped or killed, then it is NOT re-executed at next iteration (except if needed explicitely) """ # --- # --- CLASS (STATIC) attributes (CONSTANTS) # --- If agent is instance of Agent: # --- - CLASS attributes are accessible via agent.__class__.__dict__ # --- - INSTANCE attributes are accessible via agent.__dict__ # --- # Default modes DEBUG_MODE = False WITH_SIMULATOR = False TEST_MODE = False # Default LOG level is INFO #PYROS_DEFAULT_GLOBAL_LOG_LEVEL = LogPyros.LOG_LEVEL_INFO # INFO # To be overriden by subclasses (empty by default, no agent specific command) AGENT_SPECIFIC_COMMANDS = [ #"do_eval", #"set_state", ] # Maximum duration of this agent (only for SIMULATION mode) # If set to None, it will never exit except if asked (or CTRL-C) # If set to 20, it will exit after 20s TEST_MAX_DURATION_SEC = None #TEST_MAX_DURATION_SEC = 30 # FOR TEST ONLY # Run this agent in simulator mode TEST_MODE = True WITH_SIMULATOR = False # Run the assertion tests at the end TEST_WITH_FINAL_TEST = False # Who should I send commands to ? TEST_COMMANDS_DEST = "myself" # Default scenario to be executed #TEST_COMMANDS = iter([ TEST_COMMANDS_LIST = [ "set_state:active", "set_state:idle", # specific0 not_executed_because_idle "specific0", "set_state:active", # specific1 executed_because_not_idle, should complete ok "specific1", # specific2 will be executed only when specific1 is finished, # and should be aborted before end of execution, # because of the 1st coming "do_abort" command below "specific2", # specific3 should be executed only when specific2 is finished (in fact, aborted), # and should be aborted before end of execution, # because of the 2nd coming "do_abort" command below "specific3", # These commands should not have the time to be processed # because the "do_exit" command below should be executed before "specific4", "specific5", "specific6", "specific7", "specific8", # Should abort the current running command (which should normally be specific2) # even if commands above (specific3, ..., specific8) are already pending "do_abort", # These commands (except abort) won't be executed # because too many commands are already pending (above) "specific9", "do_abort", "set_state:active", "set_state:idle", # Should stop the agent even before the previous pending commands are executed "do_exit", # Because of the previous "do_exit" command, # these following commands should not be executed, # and not even be added to the database command table "set_state:active", "specific10" ] #TEST_COMMANDS = iter(TEST_COMMANDS_LIST) """ How to run this agent exec_specific_cmd() method ? - True = inside a Thread (cannot be killed, must be asked to stop, and inadequate for computation) - False = inside a Process If thread, displays : >>>>> Thread: starting execution of command specific1 >>>>> Thread: PID: 2695, Process Name: MainProcess, Thread Name: Thread-1 ... >>>>> Thread: starting execution of command specific2 >>>>> Thread: PID: 2695, Process Name: MainProcess, Thread Name: Thread-2 ... >>>>> Thread: starting execution of command specific3 >>>>> Thread: PID: 2695, Process Name: MainProcess, Thread Name: Thread-3 If process, displays : >>>>> Thread: starting execution of command specific1 >>>>> Thread: PID: 2687, Process Name: Process-1, Thread Name: MainThread ... >>>>> Thread: starting execution of command specific2 >>>>> Thread: PID: 2689, Process Name: Process-2, Thread Name: MainThread ... >>>>> Thread: starting execution of command specific3 >>>>> Thread: PID: 2690, Process Name: Process-3, Thread Name: MainThread """ # with thread RUN_IN_THREAD = True # with process #RUN_IN_THREAD = False _thread_total_steps_number = 1 #COMMANDS_PEREMPTION_HOURS = 48 #COMMANDS_PEREMPTION_HOURS = 60/60 name = "Generic Agent" mainloop_waittime = 3 subloop_waittime = 2 status = None mode = None config = None # Statuses STATUS_LAUNCH = "LAUNCHED" STATUS_INIT = "INITIALIZING" STATUS_MAIN_LOOP = "IN_MAIN_LOOP" STATUS_GET_NEXT_COMMAND = "IN_GET_NEXT_COMMAND" STATUS_GENERAL_PROCESS = "IN_GENERAL_PROCESS" STATUS_ROUTINE_PROCESS = "IN_ROUTINE_PROCESS" STATUS_SPECIFIC_PROCESS = "IN_SPECIFIC_PROCESS" STATUS_EXIT = "EXITING" # Modes MODE_ACTIVE = "ACTIVE" MODE_IDLE = "IDLE" ''' Moved to more central file : config.config_base PYROS_DJANGO_BASE_DIR = Path("src/core/pyros_django") # pathlib DEFAULT_CONFIG_FILE_NAME = "config_unit_simulunit1.xml" CONFIG_DIR_NAME = "config" ''' # My own ConfigPyros instance (Observatory Configuration) _oc = None # Parameters from config file # for /src/ #_path_data = '../../config' # for /src/core/pyros_django/ #_path_data = '../../../../config' # Path to config _path_data = '' _computer_alias = '' _computer_description = '' # Current and next command to send _cmdts: AgentCmd = None _next_cmdts = None _agent_survey = None _pending_commands = [] ''' _current_device_cmd = None _current_device_cmd_thread = None ''' # List of agents I will send commands to _my_client_agents_aliases = [] _my_client_agents = {} _iter_num = None # Log object _log = None #def __init__(self, name:str="Agent", config_filename:str=None, RUN_IN_THREAD=True): #def __init__(self, config_filename:str=None, RUN_IN_THREAD=True, DEBUG_MODE=False): def __init__(self, config_filename:str=None, RUN_IN_THREAD=True): ''' print('PYROS_ENV', PYROS_ENV) print('ROOT_DIR', ROOT_DIR) print('DOC_DIR', DOC_DIR) ''' # Set logger ##pyros_logger.set_logger_config() ##logg = logging.getLogger('pyroslogger') log.addHandler(handler_filebyagent(logging.INFO, self.__class__.__name__)) log.info("start Agent init") # SET CONFIG INSTANCE # - OLD CONFIG log.info(f'config file is {config_filename}') ##log.info('PYROS_ENV', config.PYROS_ENV) log.info('PYROS_ENV' + config_old.PYROS_ENV) log.info('ROOT_DIR' + config_old.ROOT_DIR) log.info('DOC_DIR' + config_old.DOC_DIR) ##if config.is_dev_env(): print("DEV ENV") if config_old.is_dev_env(): log.info("DEV ENV") if config_old.is_prod_env(): log.info("PROD ENV") if config_old.is_debug(): log.info("IN DEBUG MODE") # - NEW CONFIG obs_config_file_path = os.environ["PATH_TO_OBSCONF_FILE"] path_to_obs_config_folder = os.environ["PATH_TO_OBSCONF_FOLDER"] unit = os.environ["unit_name"] oc = ConfigPyros(obs_config_file_path) log.info("start Agent init 2") self.set_config(oc, obs_config_file_path, path_to_obs_config_folder, unit) #self.name = name self.name = self.__class__.__name__ ''' printd("*** ENVIRONMENT VARIABLE PYROS_DEBUG is:", os.environ.get('PYROS_DEBUG'), '***') ##self.DEBUG_MODE = DEBUG_MODE self.DEBUG_MODE = os.environ.get('PYROS_DEBUG', '0')=='1' ''' #self.DEBUG_MODE = config.PYROS_ENV ##self.log = LogPyros(self.name, AgentLogs) ##self.DEBUG_MODE = config.is_debug() self.DEBUG_MODE = config_old.is_debug() ##self.log.debug_level = DEBUG_MODE ''' # Default LOG level is INFO log_level = LogPyros.LOG_LEVEL_INFO # INFO self.log.set_global_log_level(LogPyros.LOG_LEVEL_DEBUG) if self.DEBUG_MODE else self.log.set_global_log_level(log_level) ''' #global_log_level = LogPyros.LOG_LEVEL_DEBUG if self.DEBUG_MODE else self.PYROS_DEFAULT_GLOBAL_LOG_LEVEL ##global_log_level = LogPyros.LOG_LEVEL_DEBUG if self.DEBUG_MODE else config.PYROS_DEFAULT_GLOBAL_LOG_LEVEL ##global_log_level = LogPyros.LOG_LEVEL_DEBUG if self.DEBUG_MODE else config_old.PYROS_DEFAULT_GLOBAL_LOG_LEVEL ##self.log.set_global_log_level(global_log_level) ##self.printd("LOG LEVEL IS:", self.log.debug_level) ##self.print("LOG LEVEL IS:", self.log.get_global_log_level()) # Est-ce bien utile ??? # New way with PathLib my_parent_abs_dir = Path(__file__).resolve().parent #TODO: on doit pouvoir faire mieux avec pathlib (sans utiliser str()) ##self._path_data = str( Path( str(my_parent_abs_dir).split(str(self.PYROS_DJANGO_BASE_DIR))[0] ) / self.CONFIG_DIR_NAME ) ##self._path_data = config.CONFIG_DIR self._path_data = config_old.CONFIG_DIR #self._set_mode(self.MODE_IDLE) config_filename = self.get_config_filename(config_filename) #self.printd(f"*** Config file used is={config_filename}") log.debug(f"*** Config file used is={config_filename}") self.config = ConfigPyrosOld(config_filename) if self.config.get_last_errno() != self.config.NO_ERROR: raise Exception(f"Bad config file name '{config_filename}', error {str(self.config.get_last_errno())}: {str(self.config.get_last_errmsg())}") #TODO: : à mettre dans la config ''' 'AgentDeviceTelescope1': 'AgentDeviceTelescopeGemini', 'AgentDeviceFilterSelector1': 'AgentDeviceSBIG', 'AgentDeviceShutter1': 'AgentDeviceSBIG', 'AgentDeviceSensor1': 'AgentDeviceSBIG', ''' #self._my_client_agents = {} ### self._agent_logs = AgentLogs.objects.create(name=self.name, message="Step __init__") #self.printdb("Step __init__") log.debug("Step __init__") self.TEST_COMMANDS = iter(self.TEST_COMMANDS_LIST) self.RUN_IN_THREAD = RUN_IN_THREAD self._set_status(self.STATUS_LAUNCH) self._set_idle() self._set_agent_device_aliases_from_config(self.name) self._set_mode_from_config(self.name) # TODO: remove #self._set_idle() self._set_active() # Create 1st survey if none #tmp = AgentSurvey.objects.filter(name=self.name) #if len(tmp) == 0: #nb_agents = AgentSurvey.objects.filter(name=self.name).count() #if nb_agents == 0: if AgentSurvey.objects.filter(name=self.name).exists(): self._agent_survey = AgentSurvey.objects.get(name=self.name) else: self._agent_survey = AgentSurvey.objects.create(name=self.name, validity_duration=60, mode=self.mode, status=self.status, iteration=-1) log.debug("Agent survey is" + str(self._agent_survey)) #self.printd("Agent survey is", self._agent_survey) log.info("end Agent init") def set_config(self, oc: ConfigPyros, obs_config_file_path: str, path_to_obs_config_folder: str, unit: str): self._oc = { 'config' : oc, 'env' : [ obs_config_file_path, path_to_obs_config_folder, unit ] } ''' config_env = { obs_config_file_path: obs_config_file_path, path_to_obs_config_folder: path_to_obs_config_folder, unit: unit } ''' def get_config(self): return self._oc['config'] def get_config_env(self): return self._oc['env'] def show_config(self): oc = self.get_config() obs_config_file_path, path_to_obs_config_folder, unit = self.get_config_env() #self.printd(obs_config_file_path) log.debug(obs_config_file_path) log.debug(path_to_obs_config_folder) log.debug(unit) print("\n") print("- Observatory:" + oc.get_obs_name()) my_unit_name = oc.get_units_name()[0] my_unit = (oc.get_units()[my_unit_name]) #print("- Unit description:", my_unit) print("\n") print("- Computers:", oc.get_computers()) print("\n") print("- Active Computers:", oc.get_active_computers()) print("\n") print("- Active Devices:", oc.get_active_devices()) print("\n") print("- Unit:", my_unit_name) print(oc.get_unit_by_name(my_unit_name)) print("\n") print("- Unit topology:", oc.get_topology(my_unit_name)) print("\n") print("- Unit active Agents:", oc.get_active_agents(my_unit_name)) print(oc.get_agents(my_unit_name)) print("\n") print("- Unit Agents per computer:", oc.get_agents_per_computer(my_unit_name)) print("\n") print("- Unit Agents per device:", oc.get_agents_per_device(my_unit_name)) print("\n") print("- Unit Channel groups:", oc.get_channel_groups(my_unit_name)) print("\n") print("- Unit Channels:", oc.get_channels(my_unit_name)) #print("\n") #print("- Unit/Channel info:", oc.get_channel_information(my_unit_name, 'OpticalChannel_up')) print("\n") print("- Unit Components agents:", oc.get_components_agents(my_unit_name)) print("\n") print("- Unit database:", oc.get_database_for_unit(my_unit_name)) print("\n") print("- Devices names:", oc.get_devices_names()) print("\n") print("- Devices names & files:", oc.get_devices_names_and_file()) print("\n") print("- Devices:", oc.get_devices()) print("\n") def get_config_filename(self, config_filename: str): if not config_filename: #config_filename = self.DEFAULT_CONFIG_FILE_NAME ##config_filename = config.DEFAULT_CONFIG_FILE_NAME config_filename = config_old.DEFAULT_CONFIG_FILE_NAME # If config file name is RELATIVE (i.e. without path, just the file name) # => give it an absolute path (and remove "src/agent/" from it) if config_filename == os.path.basename(config_filename): ##config_filename = os.path.join(config.CONFIG_DIR, config_filename) config_filename = os.path.join(config_old.CONFIG_DIR, config_filename) ''' # Build abs path including current agent dir config_filename = os.path.abspath(self.CONFIG_DIR_NAME + os.sep + config_filename) # Remove "src/agent_name/" from abs dir : # (1) Remove "src/core/pyros_django/" config_filename = config_filename.replace(str(self.PYROS_DJANGO_BASE_DIR)+os.sep, os.sep) # (2) Remove "agent_name/" #TODO: bidouille, faire plus propre config_filename = config_filename.replace(os.sep+"agent"+os.sep, os.sep) config_filename = config_filename.replace(os.sep+"monitoring"+os.sep, os.sep) ''' return os.path.normpath(config_filename) def __repr__(self): return "I am agent " + self.name def __str__(self): return self.__repr__() #return "I am agent " + self.name # Normal print ##def print(self, *args, **kwargs): self.log.print(*args, **kwargs) """ if args: self.printd(f"({self.name}): ", *args, **kwargs) else: self.printd() """ """ # DEBUG print shortcut ##def printd(self, *args, **kwargs): self.log.printd(*args, **kwargs) #if DEBUG: self.printd(d(*args, **kwargs) def log_d(self, *args, **kwargs): self.log.log_d(*args, **kwargs) def log_i(self, *args, **kwargs): self.log.log_i(*args, **kwargs) def log_w(self, *args, **kwargs): self.log.log_w(*args, **kwargs) def log_e(self, *args, **kwargs): self.log.log_e(*args, **kwargs) def log_c(self, *args, **kwargs): self.log.log_c(*args, **kwargs) def printdb(self, *args, **kwargs): self.log.db( *args, **kwargs) """ def sleep(self, nbsec:float=2.0): ''' # thread if self._current_device_cmd_thread and self.RUN_IN_THREAD: self._current_device_cmd_thread.wait(nbsec) # process (or main thread) else: time.sleep(nbsec) ''' time.sleep(nbsec) def _get_real_agent_name(self, agent_alias_name:str)->str: #self.printd("key is", agent_alias_name) ''' if not self._my_client_agents: return agent_alias_name return self._my_client_agents[agent_alias_name] ''' return self._my_client_agents.get(agent_alias_name) def run(self, nb_iter:int=None, FOR_REAL:bool=True): """ FOR_REAL: set to False if you don't want Agent to send commands to devices but just print messages without really doing anything """ log.info("in run()") #return # TEST MODE ONLY # IF in test mode but with REAL devices (no SIMULATOR), delete all dangerous commands from the test commands list scenario: if self.TEST_MODE: log.debug("\n!!! In TEST mode !!! => preparing to run a scenario of test commands") log.debug("- Current test commands list scenario is:\n" + self.TEST_COMMANDS_LIST) if not self.WITH_SIMULATOR: log.debug("\n!!! In TEST but no SIMULATOR mode (using REAL device) !!! => removing dangerous commands for real devices... :") TEST_COMMANDS_LIST_copy = self.TEST_COMMANDS_LIST.copy() for cmd in TEST_COMMANDS_LIST_copy: cmd_key = cmd.split()[1] if ("set_" in cmd_key) or ("do_start" in cmd_key) or cmd_key in ["do_init", "do_goto", "do_open", "do_close"]: self.TEST_COMMANDS_LIST.remove(cmd) log.debug("- NEW test commands list scenario is:\n" + self.TEST_COMMANDS_LIST, '\n') self._DO_EXIT = False self._DO_RESTART = True # Will loop again only if _DO_RESTART is True while self._DO_RESTART: self._DO_RESTART = False self.start_time = time.time() log.debug("on est ici: " + os.getcwd()) self._load_config() self.print_TEST_MODE() self.init() ''' testing log: self.log_e("ERROR") self.log_c("FATAL critical ERROR") ''' #self.log_w("WARNING", "watch your step !") log.warning("WARNING"+ "watch your step !") # Avoid blocking on false "running" commands # (old commands that stayed with "running" status when agent was killed) AgentCmd.delete_commands_with_running_status_for_agent(self.name) self._iter_num = 1 self._DO_MAIN_LOOP = True # Main loop while self._DO_MAIN_LOOP: try: self._main_loop(nb_iter,FOR_REAL) if not self._DO_MAIN_LOOP: break except KeyboardInterrupt: # CTRL-C # In case of CTRL-C, kill the current thread (process) before dying (in error) log.info("CTRL-C Interrupted, I kill the current thread (process) before exiting (if exists)") self._kill_running_device_cmd_if_exists("USER_CTRLC") exit(1) if self.TEST_MODE and self.TEST_WITH_FINAL_TEST: self._TEST_test_results() #if self._DO_EXIT: exit(0) # DEVICE level def _kill_running_device_cmd_if_exists(self, abort_cmd_sender): pass def _main_loop(self, nb_iter:int=None, FOR_REAL:bool=True): self._main_loop_start(nb_iter) if not self._DO_MAIN_LOOP: return self._load_config() # only if changed self.show_config() # Log this agent status (update my current mode and status in DB) self._log_agent_status() #self.printd("====== START COMMMANDS PROCESSING ======") # ROUTINE process # To be overriden to be specified by subclass self._routine_process() #self.printd("I am IDLE, so I bypass the routine_process (do not send any new command)") # Processing the next pending command if exists self._command_process_if_exists() # if restart, exit this loop to restart from beginning if self._DO_RESTART or self._DO_EXIT: self._DO_MAIN_LOOP = False return #self.printd("====== END COMMMANDS PROCESSING ======") #self.waitfor(self.mainloop_waittime) log.info("*"*20 + "MAIN LOOP ITERATION (END)" + "*"*20) #self.do_log(LOG_DEBUG, "Ending main loop iteration") self._iter_num += 1 def _main_loop_start(self, nb_iter:int): for i in range(3): print() #self.printd("-"*80) log.info("*"*73) log.info("*"*20 + f"MAIN LOOP ITERATION {self._iter_num} (START)" + "*"*20) log.info("*"*73 + '\n') #self.print(f"Iteration {self._iter_num}") # EXIT because of nb of iterations ? if nb_iter is not None: # Bad number of iterations or nb iterations reached => exit if nb_iter <= 0 or nb_iter < self._iter_num: log.info(f"Exit because number of iterations asked ({nb_iter}) has been reached") self._DO_MAIN_LOOP = False return # Wait a random number of sec before starting iteration # (to let another agent having the chance to send a command before me) random_waiting_sec = random.randint(0,5) log.info(f"Waiting {random_waiting_sec} sec (random) before starting new iteration...") time.sleep(random_waiting_sec) # (Test only) # EXIT because max duration reached ? if self.TEST_MAX_DURATION_SEC and (time.time()-self.start_time > self.TEST_MAX_DURATION_SEC): log.info("Exit because of max duration set to ", self.TEST_MAX_DURATION_SEC, "s") self._kill_running_device_cmd_if_exists(self.name) #if self.TEST_MODE and self.TEST_WITH_FINAL_TEST: self._TEST_test_results() self._DO_MAIN_LOOP = False return self._set_status(self.STATUS_MAIN_LOOP) self.show_mode_and_status() def _command_process_if_exists(self): ''' Processing the next pending command if exists ''' print() print() log.info("*"*10 + "NEXT COMMAND PROCESSING (START)" + "*"*10 + '\n') # Purge commands (every N iterations, delete old commands) N=5 if ((self._iter_num-1) % N) == 0: log.info("Purging old commands if exists") #AgentCmd.purge_old_commands_for_agent(self.name) self._purge_old_commands_sent_to_me() # Get next command and process it (if exists) cmd = self._get_next_valid_and_not_running_command() #self._set_status(self.STATUS_GENERAL_PROCESS) #if cmd: self.command_process(cmd) if cmd: log.info('-'*6) log.info('-'*6, "RECEIVED NEW COMMAND TO PROCESS:") log.info('-'*6, cmd) log.info('-'*6) # CASE 1 - AGENT LEVEL command # => I process it directly without asking my DC # => Simple action, short execution time, so I execute it directly (in current main thread, not in parallel) if self.is_agent_level_cmd(cmd): log.info("(AGENT LEVEL CMD)") try: self._exec_agent_cmd(cmd) except AttributeError as e: #self.log_e(f"EXCEPTION: Agent level specific command '{cmd.name}' unknown (not implemented as a function) :", e) #self.log_e("Thus => I ignore this command...") log.e(f"EXCEPTION: Agent level specific command '{cmd.name}' unknown (not implemented as a function) :", e) log.e("Thus => I ignore this command...") cmd.set_result("ERROR: INVALID AGENT LEVEL SPECIFIC COMMAND") cmd.set_as_pending() cmd.set_as_skipped() # CASE 2 - DEVICE LEVEL command # => I delegate it to my DC # => Execute it only if I am active and no currently running another device level cmd # => Long execution time, so I will execute it in parallel (in a new thread or process) elif self.is_device_level_cmd(cmd): log.info("(DEVICE LEVEL CMD)") try: self.exec_device_cmd_if_possible(cmd) except (UnimplementedGenericCmdException) as e: #except (UnknownGenericCmdException, UnimplementedGenericCmdException, UnknownNativeCmdException) as e: log.e(f"EXCEPTION caught by {type(self).__name__} (from Agent mainloop) for command '{cmd.name}'", e) log.e("Thus ==> ignore this command") cmd.set_result(e) #cmd.set_as_killed_by(type(self).__name__) cmd.set_as_skipped() #raise # CASE 3 - INVALID COMMAND else: #raise Exception("INVALID COMMAND: " + cmd.name) log.info("******************************************************") log.info("*************** ERROR: INVALID COMMAND ***************") log.info("******************************************************") log.info("Thus => I ignore this command...") cmd.set_result("ERROR: INVALID COMMAND") cmd.set_as_skipped() print() log.info("*"*10 + "NEXT COMMAND PROCESSING (END)" + "*"*10 + "\n") def print_TEST_MODE(self): if self.TEST_MODE: log.debug("[IN TEST MODE]") log.info("Flush previous commands to be sure to start in clean state") AgentCmd.delete_pending_commands_for_agent(self.name) else: log.debug("[IN NORMAL MODE]") self.TEST_MAX_DURATION_SEC=None def _purge_old_commands_sent_to_me(self): AgentCmd.purge_old_commands_sent_to_agent(self.name) def _routine_process(self): """ Routine processing. This is a command or set of commands that this agent sends regularly, (or just a regular processing) at each iteration """ print() print() log.info("*"*10+ "ROUTINE PROCESSING (START)"+ "*"*10+ '\n') self._set_status(self.STATUS_ROUTINE_PROCESS) self.routine_process_body() print() log.info("*"*10 + "ROUTINE PROCESSING (END)"+ "*"*10) # To be overridden by subclasses def routine_process_body(self): if self.TEST_MODE: self._test_routine_process() """ def purge_commands(self): ### Delete commands (which I am recipient of) older than COMMANDS_PEREMPTION_HOURS (like 48h) ATTENTION !!! EXCEPT the RUNNING command !!! NB: datetime.utcnow() is equivalent to datetime.now(timezone.utc) ### self.printd("Looking for old commands to purge...") ### COMMAND_PEREMPTION_DATE_FROM_NOW = datetime.utcnow() - timedelta(hours = self.COMMANDS_PEREMPTION_HOURS) #self.printd("peremption date", COMMAND_PEREMPTION_DATE_FROM_NOW) old_commands = AgentCmd.objects.filter( # only commands for me recipient = self.name, # only pending commands sender_deposit_time__lt = COMMAND_PEREMPTION_DATE_FROM_NOW, ) ### old_commands = AgentCmd.get_old_commands_for_agent(self.name) if old_commands.exists(): self.printd("Found old commands to delete:") for cmd in old_commands: self.printd(cmd) old_commands.delete() """ def waitfor(self, nbsec): log.info(f"Now, waiting for {nbsec} seconds...") time.sleep(nbsec) def _set_status(self, status:str): #self.printd(f"[{status}] (switching from status {self.status})") log.debug(f"[{status}]") self.status = status return False def _set_mode(self, mode:str): #self.printd(f"Switching from mode {self.mode} to mode {mode}") log.info(f"[NEW MODE {mode}]") self.mode = mode def _is_active(self): return self.mode == self.MODE_ACTIVE def _is_idle(self): return not self._is_active() def _set_active(self): self._set_mode(self.MODE_ACTIVE) log.info("in set active") def _set_idle(self): self._set_mode(self.MODE_IDLE) def show_mode_and_status(self): log.info(f"CURRENT MODE is {self.mode} (status is {self.status})") def die(self): self._set_status(self.STATUS_EXIT) """ suspend/resume """ def suspend(self): """ TODO: Mode IDLE (doit rester à l'écoute d'un resume, et doit continuer à alimenter les tables pour informer de son état via tables agents_logs, et lire table agents_command pour reprendre via resume, et update la table agents_survey pour donner son status "idle" """ self._set_idle() return True def resume(self): """ Quit suspend() mode """ self._set_active() return True #TODO: def _set_agent_device_aliases_from_config(self, agent_alias): for a in self._my_client_agents_aliases: # TODO: activer ##self._my_client_agents[a] = self.config.get_paramvalue(a,'general','real_agent_device_name') pass def _set_mode_from_config(self, agent_name): # --- Get the startmode of the AgentX modestr = self.config.get_paramvalue(agent_name,'general','startmode') if self.config.get_last_errno() != self.config.NO_ERROR: raise Exception(f"error {str(self.config.get_last_errno())}: {str(self.config.get_last_errmsg())}") if (modestr == None): return True # --- Set the mode according the startmode value mode = self.MODE_IDLE if modestr.upper() == 'RUN': mode = self.MODE_ACTIVE self._set_mode(mode) return True """ ================================================================= Generic methods that may be specialized (overriden) by subclasses ================================================================= """ def init(self): log.debug("Initializing...") self._set_status(self.STATUS_INIT) def _load_config(self): """ TODO: only si date fichier xml changée => en RAM, un objet Config avec méthodes d'accès, appelle le parser de AK (classe Config.py indépendante) """ ''' # SETUP try: self.config = get_object_or_404(Config, id=1) # By default, set mode to SCHEDULER (False = REMOTE, which should never be the default) self.config.global_mode = True self.config.save() # self.config = Config.objects.get(pk=1) # self.config = Config.objects.get()[0] except Exception as e: # except Config.ObjectDoesNotExist: self.printd("Config read (or write) exception", str(e)) return -1 ''' log.debug("Loading the config file...") # - NEW CONFIG self.get_config().load(self.get_config_env()[0]) # - OLD CONFIG self.config.load() if self.config.get_last_errno() != self.config.NO_ERROR: raise Exception(f"error {str(self.config.get_last_errno())}: {str(self.config.get_last_errmsg())}") if not self.config.is_config_contents_changed(): return # === display informations # --- Get the assembly aliases of the unit assembled_mount_aliases = [] assembled_channel_aliases = [] assembled_aliases = [] unit_alias = self.config.get_aliases('unit')[0] params = self.config.get_params(unit_alias) for param in params: if param['section']=="assembly" and param['key']=="alias": assembled_aliases.append(param['value']) #self.printd(f"Unit {unit_alias} is the assembly of {assembled_aliases}") log.debug("--------- Components of the unit -----------") log.debug("Configuration file is {}".format(self.config.get_configfile())) alias = self.config.get_aliases('unit')[0] namevalue = self.config.get_paramvalue(alias,'unit','description') log.debug(f"Unit alias is {alias}. Description is {namevalue}:") unit_subtags = self.config.get_unit_subtags() for unit_subtag in unit_subtags: aliases = self.config.get_aliases(unit_subtag) for alias in aliases: namevalue = self.config.get_paramvalue(alias,unit_subtag,'description') log.debug(f"- {unit_subtag} alias is {alias}. Description is {namevalue}") # --- fill the list of mount and channel assembled if alias in assembled_aliases: if unit_subtag=="mount": assembled_mount_aliases.append(alias) elif unit_subtag=="channel": assembled_channel_aliases.append(alias) log.debug("--------- Assembly of the unit -----------") log.debug(f"Assembled mount aliases: {assembled_mount_aliases}") log.debug(f"Assembled channel aliases: {assembled_channel_aliases}") # --- Get the home of the mount[0] mount_alias = assembled_mount_aliases[0] home = self.config.get_paramvalue(mount_alias,'MountPointing','home') log.debug("------------------------------------------") hostname = socket.gethostname() self._computer_alias = '' unit_subtag = 'computer' aliases = self.config.get_aliases(unit_subtag) for alias in aliases: log.debug("alias" + alias) value = self.config.get_paramvalue(alias,'local','hostname') log.debug("value" + value) if value == hostname: log.debug("value" + value) self._computer_alias = alias value = self.config.get_paramvalue(alias,unit_subtag,'description') self._computer_description = value value = self.config.get_paramvalue(alias,'path','data') # Overrides default value self._path_data = value break log.debug(f"hostname = {hostname}") log.debug(f"path_data = {self._path_data}") log.debug(f"home = {home}") log.debug("------------------------------------------") # --- update the log parameters ##self.log.path_data = self._path_data ##print("new self.log.path_data is", self.log.path_data) ##self.log.set_global_path_data(self._path_data) ##self.printd("new self.log.global_path_data is", self.log.get_global_path_data()) ##self.log.home = home #def update_survey(self): def _log_agent_status(self): """ Save (update) this agent current mode and status in DB """ "Updating the agent survey database table..." #self.printd("- fetching table line for agent", self.name) # only necessary when using process (not necessary with threads) #with transaction.atomic(): #self._agent_survey = AgentSurvey.objects.get(name=self.name) self._agent_survey.mode = self.mode self._agent_survey.status = self.status self._agent_survey.iteration = self._iter_num self._agent_survey.save() #self._agent_survey.save(update_fields=["mode", "status"]) """ def send_command(self, cmd_name): recipient_agent = self.name if self.TEST_COMMANDS_DEST=="myself" else self.TEST_COMMANDS_DEST AgentCmd.objects.create(sender=self.name, recipient=recipient_agent, name=cmd_name) """ #def send_command(self, to_agent, cmd_type, cmd_name, cmd_args=None): def send_cmd_to(self, to_agent, cmd_name, cmd_args=None): """ #ex: send_command(“AgentX”,”GENERIC”,”EVAL”,“3+4”) ex: send_command(“AgentX”,"EVAL”,“3+4”) """ #return AgentCmd.send_cmd(self.name, self._get_real_agent_name_for_alias(to_agent), cmd_name, cmd_args) cmd = self.create_cmd_for(to_agent, cmd_name, cmd_args) cmd.send() return cmd def create_cmd_for(self, to_agent, cmd_name, cmd_args=None) -> AgentCmd: ''' real_agent_name = self._get_real_agent_name(to_agent) real_cmd_name = cmd_name if '.' in real_agent_name: real_agent_name, component_name = real_agent_name.split('.') real_cmd_name = component_name+'.'+cmd_name return AgentCmd.create(self.name, real_agent_name, real_cmd_name, cmd_args) try: real_agent_name = self._get_real_agent_name(to_agent) except KeyError as e: ''' real_agent_name = self._get_real_agent_name(to_agent) if not real_agent_name: log.e("UNKNOWN AgentDevice ALIAS", to_agent) #self.log_e("Exception raised", e) log.e(f"=> Thus, I do not send this command '{cmd_name}'") return None return AgentCmd.create(self.name, real_agent_name, cmd_name, cmd_args) ''' return AgentCmd( sender=self.name, recipient=self._get_real_agent_name_for_alias(recipient_agent_alias_name), name=cmd_name ) ''' def _get_next_valid_and_not_running_command(self)->AgentCmd: """ Return next VALID (not expired) command (read from the DB command table) which is relevant to this agent. Commands are read in chronological order """ self._set_status(self.STATUS_GET_NEXT_COMMAND) log.info("Looking for a new command to process (sent by another agent):") # 1) Get all pending commands for me (return if None) # Not sure this is necessary to do it in a transaction, # but there might be a risk # that a command status is modified while we are reading... with transaction.atomic(): self._pending_commands = AgentCmd.get_pending_and_running_commands_for_agent(self.name) commands = self._pending_commands if not commands.exists(): log.info("") return None "Current pending (or running) commands are (time ordered):" AgentCmd.show_commands(commands) # 2) If there is a "do_exit" or "do_abort" command pending (even at the end of the list), # which is VALID (not expired), # then pass it straight away to general_process() for execution for cmd in commands: #if cmd.name in ("do_exit", "do_abort", "do_flush_commands"): break if cmd.name in ("do_exit", "do_abort"): break #if cmd.name in ("do_exit", "do_abort", "do_flush_commands"): if cmd.name in ("do_exit", "do_abort"): if cmd.is_running(): return None if cmd.is_expired(): cmd.set_as_outofdate() return None return cmd # 3) If first (oldest) command is currently running # (status CMD_RUNNING), then do nothing and return """ cmd_executing = Command.objects.filter( # only commands for me recipient = self.name, # only pending commands recipient_status_code = Command.CMD_STATUS_CODES.CMD_RUNNING, ).first() #if cmd_executing.exists(): if cmd_executing: """ #cmd = commands[0] cmd = commands.first() if cmd.is_expired(): cmd.set_as_outofdate() return None if cmd.is_running(): #self.printd(f"There is currently a running command ({cmd_executing.first().name}), so I do nothing (wait for end of execution)") log.info(f"There is currently a running command ({cmd.name})") """ # Check that this command is not expired if cmd.is_expired(): self.printd("But this command is expired, so set its status to OUTOFDATE, and go on") cmd_executing.set_as_outofdate() else: """ log.info(f"Thus, I won't execute any new command until this command execution is finished") # TODO: kill si superieur a MAX_EXEC_TIME return None ''' # 4) Tag all expired commands for cmd in commands: if cmd.is_expired(): cmd.set_as_outofdate() # break at 1st "valid" command (not expired) else: break # 5) If no more commands to process, return None if cmd.is_expired(): return None ''' # 6) Current cmd must now be a valid (not expired) and PENDING one, # so return it for execution #self.printd(f"Got command {cmd.name} sent by agent {cmd.sender} at {cmd.sender_deposit_time}") #self.printd(f"Starting processing of this command") return cmd #def _exec_agent_general_cmd(self, cmd:Command): def _exec_agent_cmd(self, cmd:AgentCmd): #self.print(f"Starting execution of an AGENT LEVEL cmd {cmd}...") log.info(f"Starting execution of an AGENT LEVEL cmd...") # Update read time to say that the command has been READ cmd.set_read_time() cmd.set_as_running() # SPECIFIC command (only related to me, not to any agent) if self._is_agent_specific_cmd(cmd): log.info("(Agent level SPECIFIC cmd)") # Execute method self."cmd.name"() # This can raise an exception (caught by this method caller) self.exec_cmd_from_its_name(cmd) ''' try: except AttributeError as e: self.printd(f"EXCEPTION: Agent level specific command '{cmd.name}' unknown (not implemented as a function) :", e) self.printd("Thus => I ignore this command...") cmd.set_result("ERROR: INVALID AGENT LEVEL SPECIFIC COMMAND") cmd.set_as_pending() cmd.set_as_skipped() return ''' cmd.set_result("Agent level SPECIFIC cmd done") cmd.set_as_processed() log.info("...Agent level SPECIFIC cmd has been executed") return # GENERAL command (related to any agent) log.info("(Agent level GENERAL CMD)") _,cmd_name,cmd_args = cmd.get_full_name_parts() #cmd_name, cmd_args = cmd.tokenize() #if cmd.name == "set_state:active": #elif cmd.name == "set_state:idle": if cmd_name == "set_state": if not cmd_args: raise ValueError() state = cmd_args[0] if state == "active": self._set_active() if state == "idle": self._set_idle() cmd.set_result("I am now " + state) time.sleep(1) elif cmd_name in ("do_flush_commands"): "flush_commands received: Delete all pending commands" AgentCmd.delete_pending_commands_for_agent(self.name) cmd.set_result('DONE') elif cmd_name in ("do_abort", "do_exit", "do_restart_init"): #self.printd("Current pending commands are:") #Command.show_commands(self._pending_commands) log.info("Aborting current executing command if exists:") self._kill_running_device_cmd_if_exists(cmd.sender) if cmd_name == "do_restart_init": log.info("restart_init received: Restarting from init()") self._DO_RESTART=True elif cmd.name == "do_exit": self._DO_EXIT=True cmd.set_result('SHOULD BE DONE NOW') else: ''' name = cmd.name args = None if " " in name: name,args = name.split() if name == "do_eval": if args==None: raise(ValueError) cmd.set_result(eval(args)) ''' if cmd_name == "do_eval": if not cmd_args: raise ValueError() cmd.set_result(eval(cmd_args)) cmd.set_as_processed() log.info("...Agent level GENERAL cmd has been executed") # If cmd is "do_exit", kill myself (without any question, this is an order soldier !) # This "do_exit" should normally kill any current thread (to be checked...) if cmd.name == "do_exit": log.info("Before exiting, Here are (if exists) the current (still) pending commands (time ordered) :") commands = AgentCmd.get_pending_and_running_commands_for_agent(self.name) AgentCmd.show_commands(commands, True) #if self.TEST_MODE and self.TEST_WITH_FINAL_TEST and self.TEST_COMMANDS_DEST == "myself": self.simulator_test_results() if self.TEST_MODE and self.TEST_WITH_FINAL_TEST: self._TEST_test_results() #self._DO_EXIT=True #exit(0) ''' def do_log(self): #"" log à 2 endroits ou 1 seul - in file - in db #"" self.printd("Logging data...") ''' def exec_cmd_from_its_name(self, cmd:AgentCmd): func = cmd.name if cmd.args: return getattr(self, func)(*cmd.args) else: return getattr(self, func)() def is_agent_level_cmd(self, cmd:AgentCmd): return cmd.is_agent_general_cmd() or self._is_agent_specific_cmd(cmd) ''' def _exec_agent_cmd(self, cmd:Command): # AGENT "GENERAL LEVEL" command # => I process it directly without asking my DC # => Simple action, short execution time, so I execute it directly (in current main thread, not in parallel) if cmd.is_agent_level_general_cmd(): self.printd("********** -- AGENT LEVEL GENERAL CMD *********") self._exec_agent_general_cmd(cmd) # AGENT "SPECIFIC LEVEL" command # => I process it directly without asking my DC # => Simple action, short execution time, so I execute it directly (in current main thread, not in parallel) #elif self._is_agent_level_specific_cmd(cmd): else: self.printd("********** -- AGENT LEVEL SPECIFIC CMD *********") self._exec_agent_specific_cmd(cmd) ''' def _is_agent_specific_cmd(self, cmd:AgentCmd): return cmd.name in self.AGENT_SPECIFIC_COMMANDS ''' def _exec_agent_specific_cmd(self, cmd:Command): # Execute method self."cmd.name"() self.exec_cmd_from_its_name(cmd) ''' def is_in_test_mode(self): return self.TEST_MODE """ ================================================================================================ DEVICE SPECIFIC FUNCTIONS (abstract for Agent, overriden and implemented by AgentDevice) ================================================================================================ """ # To be overriden by subclass (AgentDevice) # @abstract def is_device_level_cmd(self, cmd): return False # to be overriden by subclass (AgentDevice) # @abstract def exec_device_cmd_if_possible(self, cmd:AgentCmd): pass # TO BE OVERRIDEN by subclass (AgentDevice) # @abstract def exec_device_cmd(self, cmd:AgentCmd): #self.exec_cmd_from_its_name(cmd) pass """ ================================================================= TEST DEDICATED FUNCTIONS ================================================================= """ def _set_debug_mode(self, mode:bool): self.DEBUG_MODE=mode def _set_with_simulator(self, mode:bool): self.WITH_SIMULATOR=mode def _set_test_mode(self, mode:bool): self.TEST_MODE=mode log.info("in test mode") def _TEST_get_next_command_to_send(self)->AgentCmd: cmd_full_name = next(self.TEST_COMMANDS, None) #return cmd_name if cmd_full_name is None: return None if ' ' not in cmd_full_name: raise Exception('Command is malformed:', cmd_full_name) agent_recipient,cmd_name = cmd_full_name.split(' ', 1) ##recipient_agent = self.name if self.TEST_COMMANDS_DEST=="myself" else self.TEST_COMMANDS_DEST #return Command(sender=self.name, recipient=recipient_agent, name=cmd_name) cmd = self.create_cmd_for(agent_recipient, cmd_name) # If no cmd created (because of error, bad AgentDevice name), call again this method for next cmd if cmd is None: return self._TEST_get_next_command_to_send() return cmd """ def simulator_send_next_command(self): #self._current_test_cmd = "set_state:idle" if self._current_test_cmd=="set_state:active" else "set_state:active" #if self._nb_test_cmds == 4: self._current_test_cmd = "do_exit" cmd_name = next(self.TEST_COMMANDS, None) #self.printd("next cmd is ", cmd_name) if cmd_name is None: return #Command.objects.create(sender=self.name, recipient=self.name, name=cmd_name) recipient_agent = self.name if self.TEST_COMMANDS_DEST=="myself" else self.TEST_COMMANDS_DEST Command.objects.create(sender=self.name, recipient=recipient_agent, name=cmd_name) #time.sleep(1) #self._TEST_current_cmd_idx += 1 #self._nb_test_cmds += 1 """ def _test_routine_process(self): """ TEST MODE ONLY """ log.info("(TEST mode) Trying to send a new command if possible...") # There is a current command being processed # => check if next command is "do_abort" # => if so, instantly send a "do_abort" to abort previous command if self._cmdts is not None: log.info(f"Waiting for end execution of cmd '{self._cmdts.name}' (sent to {self._cmdts.recipient}) ...") # Update cmdts fields from DB self._cmdts.refresh_from_db() if self._cmdts.is_pending() or self._cmdts.is_running(): if self._next_cmdts is None: # If next command is "do_abort" then abort becomes the new current command (to be sent) self._next_cmdts = self._TEST_get_next_command_to_send() if self._next_cmdts and self._next_cmdts.name == "do_abort": # Wait a little to give a chance to agentB to start execution of current command, # so that we can abort it then (otherwise it won't be aborted!!) time.sleep(4) self._cmdts = self._next_cmdts self._next_cmdts = None log.info("***") #log.info(f"*** SEND ", self._cmdts) log.info(f"***", self._cmdts) log.info("***") self._cmdts.send() # Current cmd is no more running else: # Execution was not completed #if self._cmdts.is_expired() or self._cmdts.is_skipped() or self._cmdts.is_killed(): if self._cmdts.is_skipped() or self._cmdts.is_killed(): log.info("Command was not completed") # 2 possible scenarios: # - (1) Send the SAME command again ''' self.printd("Command was not completed, so I send it again") # The command was not completed, so, make a copy of it and send it again # For this, it is enough to set primary key to None, # then the send() command below will save a NEW command #self._cmdts = copy.copy(self._cmdts) self._cmdts.id = None SEND_A_NEW_COMMAND = True ''' # - (2) Send next command #self._cmdts = None # Execution was not complete => get result elif self._cmdts.is_executed(): cmdts_res = self._cmdts.get_result() log.info(f"Cmd executed. Result is '{cmdts_res}'") #cmdts_is_processed = True ''' Optimisation possible pour gagner une iteration: self._cmdts = self.simulator_get_next_command_to_send() # No more command to send (from simulator) => return if self._cmdts is None: return SEND_A_NEW_COMMAND = True ''' # Set cmdts to None so that a new command will be sent at next iteration self._cmdts = None # No currently running command => get a new command and SEND it if self._cmdts is None: if self._next_cmdts is not None: self._cmdts = self._next_cmdts self._next_cmdts = None else: self._cmdts = self._TEST_get_next_command_to_send() # No more command to send (from simulator) => return and EXIT if self._cmdts is None: self._DO_MAIN_LOOP = False return # Send cmd (= set as pending and save) log.info("***") #self.printd(f"*** SEND ", self._cmdts) log.info(f"*** NEW COMMAND TO SEND is:", self._cmdts) log.info("***") #self._cmdts.set_as_pending() # SEND self._cmdts.send() #cmdts_is_processed = False #cmdts_res = None def _TEST_test_results(self): if self.TEST_COMMANDS_LIST == [] : return nb_commands_to_send = len(self.TEST_COMMANDS_LIST) nb_commands_sent, commands = self._TEST_test_results_start() #nb_commands_to_send = len(self.TEST_COMMANDS_LIST) # General (default) test #self.printd(commands[0].name, "compared to", self.TEST_COMMANDS_LIST[0].split()[1]) assert commands[0].name == self.TEST_COMMANDS_LIST[0].split()[1] last_cmd = commands[-1] assert last_cmd.name == self.TEST_COMMANDS_LIST[-1].split()[1] assert last_cmd.name == "do_exit" assert last_cmd.is_executed() assert last_cmd.get_result() == "SHOULD BE DONE NOW" nb_asserted = 0 nb_agent_general = 0 nb_unknown = 0 nb_unimplemented = 0 for cmd in commands: assert cmd.is_executed() or cmd.is_killed() or cmd.is_skipped() nb_asserted += 1 if cmd.is_agent_general_cmd(): nb_agent_general += 1 if cmd.name == "do_unknown": assert cmd.is_skipped() #assert "UnimplementedGenericCmdException" in cmd.get_result() assert "INVALID COMMAND" in cmd.get_result() nb_unknown += 1 #if cmd.name in ["do_unimplemented", "do_unknown"]: if cmd.name == "do_unimplemented": assert cmd.is_skipped() assert "UnimplementedGenericCmdException" in cmd.get_result() nb_unimplemented += 1 assert nb_asserted == nb_commands_sent log.info(nb_commands_to_send, "cmds I had to send <==>", nb_asserted, "cmds executed (or killed), ", nb_commands_to_send-nb_commands_sent, "cmd ignored") log.info("Among executed commands:") log.info(f"- {nb_agent_general} AGENT general command(s)") log.info("-", nb_unimplemented, "unimplemented command(s) => UnimplementedGenericCmdException raised then command was skipped") log.info("-", nb_unknown, "unknown command(s) => skipped") # Now test that any "AD get_xx" following a "AD set_xx value" command has result = value for i,cmd_set in enumerate(commands): if cmd_set.name.startswith('set_'): commands_after = commands[i+1:] for cmd_get in commands_after: if cmd_get.name.startswith('get_') and cmd_get.name[4:]==cmd_set.name[4:] and cmd_get.device_type==cmd_set.device_type: log.info("cmd_get.result == cmd_set.args ?", cmd_get.result, cmd_set.args) assert cmd_get.get_result() == ','.join(cmd_set.args), "A get_xx command did not gave the expected result as set by a previous set_xx command" break # Specific (detailed) test (to be overriden by subclass) nb_asserted2 = self.TEST_test_results_main(commands) self._TEST_test_results_end(nb_asserted) def _TEST_test_results_start(self): print() log.info("--- Testing if the commands I SENT had the awaited result") log.info("Here are the last commands I sent:") #commands = list(Command.get_last_N_commands_for_agent(self.name, 16)) #commands = Command.get_last_N_commands_sent_to_agent(self.name, 16) nb_commands = len(self.TEST_COMMANDS_LIST) if "ad_unknown get_dec" in self.TEST_COMMANDS_LIST: nb_commands -= 1 commands = AgentCmd.get_last_N_commands_sent_by_agent(self.name, nb_commands) AgentCmd.show_commands(commands) return nb_commands, commands """ OLD SCENARIO nb_asserted = 0 for cmd in commands: if cmd.name == "specific0": assert cmd.is_skipped() nb_asserted+=1 if cmd.name == "specific1": assert cmd.result == "in step #5/5" assert cmd.is_executed() nb_asserted+=1 if cmd.name in ("specific2","specific3"): assert cmd.is_killed() nb_asserted+=1 if cmd.name in ("specific4", "specific5", "specific6", "specific7", "specific8"): assert cmd.is_pending() nb_asserted+=1 # 2 cmds abort if cmd.name in ("do_abort"): assert cmd.is_executed() nb_asserted+=1 if cmd.name in ("do_exit"): assert cmd.is_executed() nb_asserted+=1 assert nb_asserted == 12 self.printd("--- Finished testing => result is ok") """ # To be overriden by subclass def TEST_test_results_main(self, commands): return 0 ''' nb_asserted = 0 self.printd("from simulator_test_results_main", commands) for cmd in commands: assert cmd.is_executed() nb_asserted+=1 return nb_asserted ''' def _TEST_test_results_end(self, nb_asserted): #nb_commands_to_send = len(self.TEST_COMMANDS_LIST) #self.printd(nb_asserted, "vs", nb_commands_to_send) #assert nb_asserted == nb_commands_to_send #self.printd(f"************** Finished testing => result is ok ({nb_asserted} assertions) **************") printFullTerm(Colors.GREEN, f"************** Finished testing => result is ok ({nb_asserted} assertions) **************") """ ================================================================= MAIN ================================================================= """ def extract_parameters(): """ Usage: Agent.py [-t] [configfile] """ # arg 1 : -t # arg 2 : configfile DEBUG_MODE = False TEST_MODE = False WITH_SIM = False VERBOSE_MODE = False configfile = None log.debug("args:" + str(sys.argv)) for arg in sys.argv[1:] : if arg == "-t": TEST_MODE = True elif arg == "-s": WITH_SIM = True elif arg == "-d": DEBUG_MODE = True elif arg == "-v": VERBOSE_MODE = True else: configfile = arg ''' if len(sys.argv) > 1: if sys.argv[1] == "-t": TEST_MODE = True if len(sys.argv) == 3: configfile = sys.argv[2] else: configfile = sys.argv[1] ''' return DEBUG_MODE, WITH_SIM, TEST_MODE, VERBOSE_MODE, configfile #def build_agent(Agent_type:Agent, name="GenericAgent", RUN_IN_THREAD=True): def build_agent(Agent_type:Agent, RUN_IN_THREAD=True): DEBUG_MODE, WITH_SIM, TEST_MODE, VERBOSE_MODE, configfile = extract_parameters() log.info("env is" + os.getenv("PYROS_DEBUG")) #print(logger) #agent = Agent("GenericAgent", configfile, RUN_IN_THREAD=True) #agent = Agent_type(configfile, RUN_IN_THREAD, DEBUG_MODE=DEBUG_MODE) agent = Agent_type(configfile, RUN_IN_THREAD) #agent = Agent_type(name, configfile, RUN_IN_THREAD) agent._set_with_simulator(WITH_SIM) agent._set_test_mode(TEST_MODE) #print(logger) #logg.info("agent built, return it") #agent._set_debug_mode(DEBUG_MODE) return agent if __name__ == "__main__": # with thread RUN_IN_THREAD=True # with process #RUN_IN_THREAD=False agent = build_agent(Agent, RUN_IN_THREAD=RUN_IN_THREAD) agent.show_config() #agent = build_agent(Agent, name="GenericAgent", RUN_IN_THREAD=RUN_IN_THREAD) ''' TEST_MODE, WITH_SIM, configfile = extract_parameters() agent = Agent("GenericAgent", configfile, RUN_IN_THREAD=True) #agent.setSimulatorMode(TEST_MODE) agent.setTestMode(TEST_MODE) agent.setWithSimulator(WITH_SIM) self.printd(agent) ''' agent.run()