#!/usr/bin/env python3 #DEBUG=True #DEBUG=False """ ================================================================= Agent class ================================================================= This class is the base class for all AgentXXX classes (AgentSST, AgentScheduler, AgentImagesProcessor...) It runs an infinite loop (called main_loop) doing at each iteration some routine tasks (process_routine before and after) and executing commands sent by other agents. To start it up, from the project root : - In normal mode : - ./PYROS start -fg agent -o tnc (add -d after the PYROS command for debug mode) - In test mode (the agent will execute a scenario as a suite of commands (in TEST_COMMANDS_LIST) to send to himself (or other agents) and execute them on reception at each iteration : - ./PYROS -t start -fg agent -o tnc """ ### # ================================================================= # SETUP FOR DJANGO # # (see https://docs.djangoproject.com/en/dev/topics/settings) # (see also https://docs.djangoproject.com/en/dev/ref/settings) # ================================================================= ### # For cmd parsing from array import array from datetime import datetime from typing import List, Tuple, Union, Any, Optional, Literal import ast from inspect import signature import os from pathlib import Path import sys import logging import typing from django.utils import timezone from django.conf import settings as djangosettings from django.db.models.query import QuerySet # Conseil sur le net: #https://stackoverflow.com/questions/16853649/how-to-execute-a-python-script-from-the-django-shell #"" #import sys, os #sys.path.append('/path/to/your/django/app') #os.environ['DJANGO_SETTINGS_MODULE'] = 'settings' #from django.conf import settings #"" # To avoid a "ModuleNotFoundError: No module named 'dashboard'"... (not even 1 app found) : ##sys.path.insert(0, os.path.abspath("..")) ##sys.path.insert(0, os.path.abspath("src")) ##sys.path.insert(0, "../src") ##sys.path.insert(0, "src") # To avoid a "ModuleNotFoundError: No module named 'dashboard'" ## sys.path.append("..") py_pwd = os.path.normpath(os.getcwd() + "/..") if (py_pwd not in os.sys.path): (os.sys.path).append(py_pwd) # To avoid a "ModuleNotFoundError: No module named 'src'" ## sys.path.append("../../../..") py_pwd = os.path.normpath(os.getcwd() + "/../../../..") if (py_pwd not in os.sys.path): (os.sys.path).append(py_pwd) ##sys.path.append("src") from src.pyros_logger import log, handler_filebyagent #from src.pyros_logger import logger as logg, handler_filebyagent, ##from src import pyros_logger ''' def printd(*args, **kwargs): if os.environ.get('PYROS_DEBUG', '0')=='1': print(*args, **kwargs) ''' #printd("Starting with this sys.path", sys.path) log.debug("Starting with this sys.path" + str(sys.path)) # DJANGO setup # self.printd("file is", __file__) # mypath = os.getcwd() # Go into src/ ##os.chdir("..") ##os.chdir("src") #printd("Current directory : " + str(os.getcwd())) log.debug("Current directory : " + str(os.getcwd())) #os.environ.setdefault("DJANGO_SETTINGS_MODULE", "src.core.pyros_django.pyros.settings") os.environ.setdefault("DJANGO_SETTINGS_MODULE", "src.core.pyros_django.pyros.settings") # os.environ['SECRET_KEY'] = 'abc' # os.environ['ENVIRONMENT'] = 'production' import django django.setup() #printd("DB2 used is:", djangosettings.DATABASES["default"]["NAME"]) log.debug("DB2 used is:" + djangosettings.DATABASES["default"]["NAME"]) ### # ================================================================= # IMPORT PYTHON PACKAGES #================================================================= ### # --- GENERAL PURPOSE IMPORT --- #from __future__ import absolute_import ##import utils.Logger as L import platform import random import threading #import multiprocessing import time import re ''' from threading import Thread import socket ''' #import ctypes #import copy # --- DJANGO IMPORT --- from django.db import transaction from django import db # from django.core.exceptions import ObjectDoesNotExist # from django.db.models import Q #from django.shortcuts import get_object_or_404 #from django.conf import settings as djangosettings # --- SPECIFIC IMPORT --- # Many ways to import configuration settings: ##import config import config.old_config as config_old #from config import PYROS_ENV, ROOT_DIR, DOC_DIR #from config import * from common.models import AgentSurvey, AgentCmd, AgentLogs CmdException = AgentCmd.CmdException CmdExceptionUnknown = AgentCmd.CmdExceptionUnknown CmdExceptionUnimplemented = AgentCmd.CmdExceptionUnimplemented CmdExceptionBadArgs = AgentCmd.CmdExceptionBadArgs CmdExceptionExecError = AgentCmd.CmdExceptionExecError CmdExceptionExecKilled = AgentCmd.CmdExceptionExecKilled CmdExceptionExecTimeout = AgentCmd.CmdExceptionExecTimeout #from config.configpyros import ConfigPyros from config.old_config.configpyros import ConfigPyros as ConfigPyrosOld from src.core.pyros_django.obsconfig.obsconfig_class import OBSConfig #from dashboard.views import get_sunelev #from devices.TelescopeRemoteControlDefault import TelescopeRemoteControlDefault #from utils.JDManipulator import * ##from agent.logpyros import LogPyros ##from src.logpyros import LogPyros ''' from device_controller.abstract_component.device_controller import ( DCCNotFoundException, UnknownGenericCmdException, UnimplementedGenericCmdException, UnknownNativeCmdException ) ''' from enum import Enum ### # ================================================================= # GENERAL MODULE CONSTANT & FUNCTIONS DEFINITIONS # ================================================================= ### #DEBUG_FILE = False ##log = L.setupLogger("AgentLogger", "Agent") IS_WINDOWS = platform.system() == "Windows" class Colors: HEADER = "\033[95m" BLUE = "\033[94m" GREEN = "\033[92m" WARNING = "\033[93m" FAIL = "\033[91m" ENDC = "\033[0m" BOLD = "\033[1m" UNDERLINE = "\033[4m" def printColor(color: Colors, message, file=sys.stdout, eol=os.linesep, forced=False): #system = platform.system() """ if (self.disp == False and forced == False): return 0 """ #if system == "Windows": if IS_WINDOWS: print(message, file=file, end=eol) else: print(color + message + Colors.ENDC, file=file, end=eol) return 0 def printFullTerm(color: Colors, string: str): #system = platform.system() columns = 100 row = 1000 disp = True value = int(columns / 2 - len(string) / 2) printColor(color, "-" * value, eol="") printColor(color, string, eol="") value += len(string) printColor(color, "-" * (columns - value)) return 0 """ ================================================================= class StoppableThread ================================================================= """ ''' class StoppableThreadEvenWhenSleeping(threading.Thread): # Thread class with a stop() method. The thread itself has to check # regularly for the stopped() condition. # It stops even if sleeping # See https://python.developpez.com/faq/?page=Thread#ThreadKill # See also https://www.oreilly.com/library/view/python-cookbook/0596001673/ch06s03.html def __init__(self, *args, **kwargs): #super(StoppableThreadSimple, self).__init__(*args, **kwargs) super().__init__(*args, **kwargs) self._stop_event = threading.Event() #def stop(self): def terminate(self): self._stop_event.set() def stopped(self): return self._stop_event.is_set() def wait(self, nbsec:float=2.0): self._stop_event.wait(nbsec) ''' ### # ================================================================= # class Agent # ================================================================= ### class Agent: """ See Agent_activity_diag.pu for PlantUML activity diagram Behavior of an agent: - If idle : - still does routine_process() and general_process() - does not do specific_process() - Once a command has been sent to another agent : - It waits (non blocking) for the end of execution of the command and get its result - If command is timed out or has been skipped or killed, then it is NOT re-executed at next iteration (except if needed explicitely) """ # --- # --- CLASS (STATIC) attributes (CONSTANTS) # --- If agent is instance of Agent: # --- - CLASS attributes are accessible via agent.__class__.__dict__ # --- - INSTANCE attributes are accessible via agent.__dict__ # --- class EXEC_MODE(Enum): SEQUENTIAL = 0 THREAD = 1 PROCESS = 2 # Default modes DEBUG_MODE:bool = False #TEST_MODE = False # By default, a command is valid during 5s (and then perempted) DEFAULT_CMD_VALIDITY_DURATION:int = 5 # Wait a fixed number of seconds before each loop ? #WITH_RANDOM_WAIT = False # 1 sec by default __DELAY_NB_SEC:int = 1 # - YES if TEST mode (in init()) # Default LOG level is INFO #PYROS_DEFAULT_GLOBAL_LOG_LEVEL = LogPyros.LOG_LEVEL_INFO # INFO ''' This Agent Specific commands list, that he is able to execute To be overriden by subclasses (here are just a few examples of commands) Format : ("cmd", args, timeout) with : - cmd (str) : the command name - timeout (int) : the command timeout (in sec ; special values : 0=not executed ; -1=no timeout) - exec_mode (EXEC_MODE) : EXEC_MODE.SEQUENTIAL, EXEC_MODE.THREAD, or EXEC_MODE.PROCESS ''' #_AGENT_SPECIFIC_COMMANDS: List[ Tuple[str, int, int] ] = [ _AGENT_SPECIFIC_COMMANDS: List[ Tuple[str, int, EXEC_MODE] ] = [ # Format : (“cmd_name”, timeout, exec_mode) ("do_specific1", 10, EXEC_MODE.SEQUENTIAL), #("set_specific2", 5, 0), ("do_specific3", 3, EXEC_MODE.THREAD), ("do_specific4", 3, EXEC_MODE.PROCESS), ] # # --- FOR TEST ONLY --- # # By default, NOT in test mode #TEST_MODE = False # Maximum duration of this agent (only for SIMULATION mode) # If set to None, it will never exit except if asked (or CTRL-C) # If set to 20, it will exit after 20s TEST_MAX_DURATION_SEC = None #TEST_MAX_DURATION_SEC = 30 # Run this agent in simulator mode #TEST_MODE = True WITH_SIMULATOR:bool = False # Run the assertion tests at the end TEST_WITH_FINAL_TEST:bool = False CMD_STATUS = AgentCmd.CMD_STATUS_CODES AGT_STATUS = AgentSurvey.STATUS_CHOICES # (EP 2022/07) Default SCENARIO to be executed (only in TEST mode), and then STOP # # It is a list of commands to be sent by this agent to other agents ("self" means himself) # # Format : List of tuples (command, validity, expected_res, expected_status), with : # # - DO_IT : execute this command yes (true) or no (false) ? (yes by default) # - command : the format is "recipient cmd args", with : # - recipient : name of the Agent that the command is to be sent to (use "self" to mean himself) # - cmd : the command name # - args : (optional) the list of command arguments, separated by blanks : arg1 arg2 arg3 ... # # - validity : the command is valid for this duration, afterwards you can skip it # # - expected_res : the expected result (set to None if not to be tested) # # - expected_status : the status of the command expected after execution (expired, killed, skipped, executed...) (set to None if not to be tested) # # Ex : # - "AgentScheduler set_state ATTENTIVE" => means to send the command "set_state ATTENTIVE" to the agent AgentScheduler # - "AgentX set_state ATTENTIVE" => means to send the command "set_state ATTENTIVE" to the agent AgentX # - "self set_state ATTENTIVE" => means to send the command "set_state ATTENTIVE" to MYSELF # - "self do_restart_loop" => means to send the command "do_restart_loop" to MYSELF (no args) # #_TEST_COMMANDS_LIST: List[ Tuple[ bool, str, int, Union[str,None], Union[int,None] ] ] = [ ##_TEST_COMMANDS_LIST: List[ Tuple[ bool, str, int, Optional[str], AgentCmd.CMD_STATUS_CODES ] ] = [ _TEST_COMMANDS_LIST: List[ Tuple[ bool, str, Optional[int], Optional[str], Optional[int]] ] = [ # Format : (DO_IT, "self cmd_name cmd_args", validity, "expected_result", expected_status), #("self do_stop now", 200, '15.5', None), (True, "self do_specific1 1 2 3.5 titi (3,'titi',5) [1,3,5,7,9]", 200, '15.5', None), # 1) First, 3 EXCEPTION CASES (uncomment to activate exception) # Each of these lines will stop execution with an exception # ------------------------------------------------------------ # - Agent command, unknown => ko, UnknownCmdException #("self do_unknown", 10, None,None), # - Agent general command malformed (missing arg) => ko, AgentCmdBadArgsException #("Agent set_mode", 10, None,None), # - Agent specific command, known but not implemented => ko, AgentCmdUnimplementedException #("self set_specific2", 10, None,None), # - Agent specific command, implemented but missing args => ko, AgentCmdBadArgsException #(" self do_specific1 1 ", 10, None,None), # 2) NORMAL CASES (test scenario) # All these commands should be executed without error, from the 1st to the last one # ------------------------------- # This command has a validity of 0s and thus should be tagged as "expired" (True, "self set_mode ATTENTIVE", 0, "MODE is ATTENTIVE", CMD_STATUS.CMD_EXPIRED), # Agent general command (True, "self set_mode ATTENTIVE", 200, "MODE is ATTENTIVE", CMD_STATUS.CMD_EXECUTED), # => should get "ATTENTIVE" (True, "self get_mode", 100, "MODE is ATTENTIVE", None), # => should get "7" (True, "self do_eval 3+5-1", 200, '7', None), # END, will not go further #("self do_exit", 2, "STOPPING"), # Agent specific commands => should be executed (True, "self do_specific3", 200, '', None), (True, "self do_exit", 500, "STOPPING", None), (True, "self do_specific1 1 2 3.5 titi (3,'titi',5) [1,3,5,7,9]", 200, '15.5', None), (True, "self set_mode ROUTINE", 200, "MODE is ROUTINE", None), # => should get "ROUTINE" (True, "self get_mode", 200, "MODE is ROUTINE", None), # Agent specific command => should be skipped (because not ATTENTIVE) (True, "self do_specific1 1 2 3.5 titi (3,'titi',5) [1,3,5,7,9]", 200, "SKIPPED", None), # From now on, should not run anymore process_before/after # => and should skip next specific commands (True, "self set_mode IDLE", 200, "MODE is IDLE", None), # => should get "IDLE" (True, "self get_mode", 200, "MODE is IDLE", None), # Agent specific command => should be skipped (because not ATTENTIVE) (True, "self do_specific1 1 2 3.5 titi (3,'titi',5) [1,3,5,7,9]", 200, 'SKIPPED', None), # TODO: test priority commands : do_abort, do_flush_cmds, ... # - Stop executing new commands (just let them accumulate) ##("self do_stop_exec",), ##("self set_mode ATTENTIVE",), ##("self get_mode",), ##("self do_specific1 1 2 3.5 titi (3,'titi',5) [1,3,5,7,9]",), ##("self set_mode ROUTINE",), ##("self get_mode",), ##("self do_abort",), # - Now, resume execution of commands : # we should have the previous list pending, # but as "do_abort" is priority, it should be executed first even if last ! ##("self do_resume_exec",), # Restart the restart loop (from init()) (True, "self do_restart", 200, "RESTARTING", None), # Now stop (True, "self do_exit", 200, "STOPPING", None), ''' # specific0 not_executed_because_idle "specific0", "set_state:active", # specific1 executed_because_not_idle, should complete ok "specific1", # specific2 will be executed only when specific1 is finished, # and should be aborted before end of execution, # because of the 1st coming "do_abort" command below "specific2", # specific3 should be executed only when specific2 is finished (in fact, aborted), # and should be aborted before end of execution, # because of the 2nd coming "do_abort" command below "specific3", # These commands should not have the time to be processed # because the "do_exit" command below should be executed before "specific4", "specific5", "specific6", "specific7", "specific8", # Should abort the current running command (which should normally be specific2) # even if commands above (specific3, ..., specific8) are already pending "do_abort", # These commands (except abort) won't be executed # because too many commands are already pending (above) "specific9", "do_abort", "set_state:active", "set_state:idle", # Should stop the agent even before the previous pending commands are executed "do_exit", # Because of the previous "do_exit" command, # these following commands should not be executed, # and not even be added to the database command table "set_state:active", "specific10" ''' ] #TEST_COMMANDS = iter(TEST_COMMANDS_LIST) ''' # with thread RUN_IN_THREAD = True # with process #RUN_IN_THREAD = False _thread_total_steps_number = 1 ''' #COMMANDS_PEREMPTION_HOURS = 48 #COMMANDS_PEREMPTION_HOURS = 60/60 ##name = "Generic Agent" ##status = None ##mode = None config = None ''' Moved to more central file : config.config_base PYROS_DJANGO_BASE_DIR = Path("src/core/pyros_django") # pathlib DEFAULT_CONFIG_FILE_NAME = "config_unit_simulunit1.xml" CONFIG_DIR_NAME = "config" ''' # My own ConfigPyros instance (Observatory Configuration) _oc = None # Parameters from config file # for /src/ #_path_data = '../../config' # for /src/core/pyros_django/ #_path_data = '../../../../config' # Path to config _path_data = '' _computer_alias = '' _computer_description = '' # Current and next command to send ##_cmdts: AgentCmd = None ##_next_cmdts = None __agent_survey = None __pending_commands = QuerySet # [] ''' _current_device_cmd = None _current_device_cmd_thread = None ''' # List of agents I will send commands to _my_client_agents_aliases = [] _my_client_agents = {} # Log object _log = None def __repr__(self): return "I am agent " + self.name def __str__(self): return self.__repr__() #return "I am agent " + self.name # new obsconfig init for agent: ##def __init__(self, RUN_IN_THREAD=True): def __init__(self): # Declaration of Instance attributes, default values #self.UP_SINCE = datetime.utcnow() self.__UP_SINCE = datetime.now(tz=timezone.utc) self.__ROUTINE_ITER_START_IS_RUNNING:bool = False self.__ROUTINE_ITER_END_IS_RUNNING:bool = False self.__test_cmd_received_num:int = 0 # only for tests # Current Command running self.__CC :AgentCmd = None # Previous Command running self.__CC_prev :AgentCmd = None # Current Command exception (if occurs) self.__CCE :Exception = None self.name = "Generic Agent" self.__status :str = None self.__mode :str = None self.unit = None self.TEST_COMMANDS = None self.__iter_num :int = 0 #print(AgentSurvey.MODE_CHOICES.IDLE) #sys.exit() # Agent is by default in mode ATTENTIVE (most active mode) #self.__mode = self.MODE_ATTENTIVE self.set_mode_attentive() #self._set_mode(MODES.) log.addHandler(handler_filebyagent(logging.INFO, self.__class__.__name__)) log.debug("start Agent init") obs_config_file_path = os.environ["PATH_TO_OBSCONF_FILE"] path_to_obs_config_folder = os.environ["PATH_TO_OBSCONF_FOLDER"] unit = os.environ["unit_name"] oc = OBSConfig(obs_config_file_path,unit) self.set_config(oc, obs_config_file_path, path_to_obs_config_folder, unit) self.name = self.__class__.__name__ # set real unit name (the current unit used) if unit == "": unit = oc.unit_name agent_name_from_config = self.get_config().get_agent_name_from_config(self.__class__.__name__) if agent_name_from_config: self.name = agent_name_from_config self.unit = unit print(f"Agent name : {self.name}") print(f"Unit name : {self.unit}") # (EP) moved to AgentDevice ###self._set_agent_device_aliases_from_config(self.name) self._set_mode_from_config(self.name) log.debug("Step __init__") self.TEST_COMMANDS = iter(self._TEST_COMMANDS_LIST) ##self.RUN_IN_THREAD = RUN_IN_THREAD self.__set_status(self.AGT_STATUS.LAUNCHED) ####self._set_idle() # Create 1st survey if none #tmp = AgentSurvey.objects.filter(name=self.name) #if len(tmp) == 0: #nb_agents = AgentSurvey.objects.filter(name=self.name).count() #if nb_agents == 0: if AgentSurvey.objects.filter(name=self.name).exists(): self.__agent_survey = AgentSurvey.objects.get(name=self.name) else: self.__agent_survey = AgentSurvey.objects.create( name=self.name, validity_duration=60, #mode=self.__mode, mode = self.__get_mode(), #status=self.__status, status=self.get_status(), iteration=-1 ) log.debug("Agent survey is" + str(self.__agent_survey)) #self.printd("Agent survey is", self._agent_survey) ("end Agent __init__") #def __init__(self, name:str="Agent", config_filename:str=None, RUN_IN_THREAD=True): #def __init__(self, config_filename:str=None, RUN_IN_THREAD=True, DEBUG_MODE=False): # def __init__(self, config_filename:str=None, RUN_IN_THREAD=True): # ''' # print('PYROS_ENV', PYROS_ENV) # print('ROOT_DIR', ROOT_DIR) # print('DOC_DIR', DOC_DIR) # ''' # # Set logger # ##pyros_logger.set_logger_config() # ##logg = logging.getLogger('pyroslogger') # log.addHandler(handler_filebyagent(logging.INFO, self.__class__.__name__)) # log.debug("start Agent init") # # SET CONFIG INSTANCE # # - OLD CONFIG # log.debug(f'config file is {config_filename}') # ##log.info('PYROS_ENV', config.PYROS_ENV) # log.debug('PYROS_ENV' + config_old.PYROS_ENV) # log.debug('ROOT_DIR' + config_old.ROOT_DIR) # log.debug('DOC_DIR' + config_old.DOC_DIR) # ##if config.is_dev_env(): print("DEV ENV") # if config_old.is_dev_env(): log.debug("DEV ENV") # if config_old.is_prod_env(): log.debug("PROD ENV") # if config_old.is_debug(): log.debug("IN DEBUG MODE") # # - NEW CONFIG # obs_config_file_path = os.environ["PATH_TO_OBSCONF_FILE"] # path_to_obs_config_folder = os.environ["PATH_TO_OBSCONF_FOLDER"] # unit = os.environ["unit_name"] # oc = OBSConfig(obs_config_file_path) # self.set_config(oc, obs_config_file_path, path_to_obs_config_folder, unit) # log.debug("Step __init__") # self.TEST_COMMANDS = iter(self.TEST_COMMANDS_LIST) # self.RUN_IN_THREAD = RUN_IN_THREAD # self._set_status(self.STATUS_LAUNCH) # self._set_idle() # self._set_agent_device_aliases_from_config(self.name) # self._set_mode_from_config(self.name) # #self.name = name # self.name = self.__class__.__name__ # ''' # printd("*** ENVIRONMENT VARIABLE PYROS_DEBUG is:", os.environ.get('PYROS_DEBUG'), '***') # ##self.DEBUG_MODE = DEBUG_MODE # self.DEBUG_MODE = os.environ.get('PYROS_DEBUG', '0')=='1' # ''' # #self.DEBUG_MODE = config.PYROS_ENV # ##self.log = LogPyros(self.name, AgentLogs) # ##self.DEBUG_MODE = config.is_debug() # self.DEBUG_MODE = config_old.is_debug() # ##self.log.debug_level = DEBUG_MODE # ''' # # Default LOG level is INFO # log_level = LogPyros.LOG_LEVEL_INFO # INFO # self.log.set_global_log_level(LogPyros.LOG_LEVEL_DEBUG) if self.DEBUG_MODE else self.log.set_global_log_level(log_level) # ''' # #global_log_level = LogPyros.LOG_LEVEL_DEBUG if self.DEBUG_MODE else self.PYROS_DEFAULT_GLOBAL_LOG_LEVEL # ##global_log_level = LogPyros.LOG_LEVEL_DEBUG if self.DEBUG_MODE else config.PYROS_DEFAULT_GLOBAL_LOG_LEVEL # ##global_log_level = LogPyros.LOG_LEVEL_DEBUG if self.DEBUG_MODE else config_old.PYROS_DEFAULT_GLOBAL_LOG_LEVEL # ##self.log.set_global_log_level(global_log_level) # ##self.printd("LOG LEVEL IS:", self.log.debug_level) # ##self.print("LOG LEVEL IS:", self.log.get_global_log_level()) # # ------------- OLD CONFIG ------------------- # # Est-ce bien utile ??? # # New way with PathLib # # my_parent_abs_dir = Path(__file__).resolve().parent # #TODO: on doit pouvoir faire mieux avec pathlib (sans utiliser str()) # ##self._path_data = str( Path( str(my_parent_abs_dir).split(str(self.PYROS_DJANGO_BASE_DIR))[0] ) / self.CONFIG_DIR_NAME ) # ##self._path_data = config.CONFIG_DIR # # self._path_data = config_old.CONFIG_DIR # #self._set_mode(self.MODE_IDLE) # # config_filename = self.get_config_filename(config_filename) # #self.printd(f"*** Config file used is={config_filename}") # # log.debug(f"*** Config file used is={config_filename}") # # self.config = ConfigPyrosOld(config_filename) # # if self.config.get_last_errno() != self.config.NO_ERROR: # # raise Exception(f"Bad config file name '{config_filename}', error {str(self.config.get_last_errno())}: {str(self.config.get_last_errmsg())}") # #TODO: : à mettre dans la config # ''' # 'AgentDeviceTelescope1': 'AgentDeviceTelescopeGemini', # 'AgentDeviceFilterSelector1': 'AgentDeviceSBIG', # 'AgentDeviceShutter1': 'AgentDeviceSBIG', # 'AgentDeviceSensor1': 'AgentDeviceSBIG', # ''' # #self._my_client_agents = {} # ### self._agent_logs = AgentLogs.objects.create(name=self.name, message="Step __init__") # #self.printdb("Step __init__") # log.debug("Step __init__") # self.TEST_COMMANDS = iter(self.TEST_COMMANDS_LIST) # self.RUN_IN_THREAD = RUN_IN_THREAD # self._set_status(self.STATUS_LAUNCH) # self._set_idle() # self._set_agent_device_aliases_from_config(self.name) # self._set_mode_from_config(self.name) # # TODO: remove # #self._set_idle() # self._set_active() # # Create 1st survey if none # #tmp = AgentSurvey.objects.filter(name=self.name) # #if len(tmp) == 0: # #nb_agents = AgentSurvey.objects.filter(name=self.name).count() # #if nb_agents == 0: # if AgentSurvey.objects.filter(name=self.name).exists(): # self._agent_survey = AgentSurvey.objects.get(name=self.name) # else: # self._agent_survey = AgentSurvey.objects.create(name=self.name, validity_duration=60, mode=self.__mode, status=self.__status, iteration=-1) # log.debug("Agent survey is" + str(self._agent_survey)) # #self.printd("Agent survey is", self._agent_survey) # ("end Agent init") # Current Command (CC) # - getter (to be used by subclasses ; this Agent superclass uses __CC instead) @property def CC(self): return self.__CC ''' No setter defined because only Agent superclass can do this # - setter @CC.setter def CC(self, value): self.__current_cmd = value ''' # Current Command Exception (CCE) # - getter @property def CCE(self): return self.__CCE ''' # - setter No setter defined because only Agent superclass can do this @CCE.setter def CCE(self, value): self.__CCE = value ''' # Current ROUTINE_PROCESS_BEF/AFT state # No setter defined because only Agent superclass can set this # - getter (to be used by subclasses) @property def ROUTINE_ITER_START_IS_RUNNING(self): return self.__ROUTINE_ITER_START_IS_RUNNING @property def ROUTINE_ITER_END_IS_RUNNING(self): return self.__ROUTINE_ITER_END_IS_RUNNING def set_config(self, oc: OBSConfig, obs_config_file_path: str, path_to_obs_config_folder: str, unit: str): self._oc = { 'config' : oc, 'env' : [ obs_config_file_path, path_to_obs_config_folder, unit ] } ''' config_env = { obs_config_file_path: obs_config_file_path, path_to_obs_config_folder: path_to_obs_config_folder, unit: unit } ''' def get_config(self): return self._oc['config'] def get_config_env(self): return self._oc['env'] def show_config(self): VERBOSE = False oc = self.get_config() obs_config_file_path, path_to_obs_config_folder, unit = self.get_config_env() #self.printd(obs_config_file_path) log.debug(obs_config_file_path) log.debug(path_to_obs_config_folder) log.debug(unit) #log.warning("petit warning bidon") print("\n") print("- Observatory:" + oc.get_obs_name()) my_unit_name = oc.get_units_name()[0] my_unit = (oc.get_units()[my_unit_name]) #print("- Unit description:", my_unit) if VERBOSE: print("\n") print("- Computers:", oc.get_computers()) print("\n") print("- Active Computers:", oc.get_active_computers()) print("\n") print("- Active Devices:", oc.get_active_devices()) print("\n") print("- Unit:", my_unit_name) VERBOSE and print(oc.get_unit_by_name(my_unit_name)) if VERBOSE: print("\n") print("- Unit topology:", oc.get_topology(my_unit_name)) print("\n") print("- Unit active Agents:", oc.get_active_agents(my_unit_name)) VERBOSE and print(oc.get_agents(my_unit_name)) print("\n") print("- Unit Agents per computer:", oc.get_agents_per_computer(my_unit_name)) print("\n") print("- Unit Agents per device:", oc.get_agents_per_device(my_unit_name)) if VERBOSE: print("\n") print("- Unit Channel groups:", oc.get_channel_groups(my_unit_name)) print("\n") print("- Unit Channels:", oc.get_channels(my_unit_name)) #print("\n") #print("- Unit/Channel info:", oc.get_channel_information(my_unit_name, 'OpticalChannel_up')) if VERBOSE: print("\n") print("- Unit Components agents:", oc.get_components_agents(my_unit_name)) if VERBOSE: print("\n") print("- Unit database:", oc.get_database_for_unit(my_unit_name)) print("\n") print("- Devices names:", oc.get_devices_names()) if VERBOSE: print("\n") print("- Devices names & files:", oc.get_devices_names_and_file()) print("\n") print("- Devices:", oc.get_devices()) print("\n") def get_config_filename(self, config_filename: str): if not config_filename: #config_filename = self.DEFAULT_CONFIG_FILE_NAME ##config_filename = config.DEFAULT_CONFIG_FILE_NAME config_filename = config_old.DEFAULT_CONFIG_FILE_NAME # If config file name is RELATIVE (i.e. without path, just the file name) # => give it an absolute path (and remove "src/agent/" from it) if config_filename == os.path.basename(config_filename): ##config_filename = os.path.join(config.CONFIG_DIR, config_filename) config_filename = os.path.join(config_old.CONFIG_DIR, config_filename) ''' # Build abs path including current agent dir config_filename = os.path.abspath(self.CONFIG_DIR_NAME + os.sep + config_filename) # Remove "src/agent_name/" from abs dir : # (1) Remove "src/core/pyros_django/" config_filename = config_filename.replace(str(self.PYROS_DJANGO_BASE_DIR)+os.sep, os.sep) # (2) Remove "agent_name/" #TODO: bidouille, faire plus propre config_filename = config_filename.replace(os.sep+"agent"+os.sep, os.sep) config_filename = config_filename.replace(os.sep+"monitoring"+os.sep, os.sep) ''' return os.path.normpath(config_filename) # Normal print ##def print(self, *args, **kwargs): self.log.print(*args, **kwargs) """ if args: self.printd(f"({self.name}): ", *args, **kwargs) else: self.printd() """ """ # DEBUG print shortcut ##def printd(self, *args, **kwargs): self.log.printd(*args, **kwargs) #if DEBUG: self.printd(d(*args, **kwargs) def log_d(self, *args, **kwargs): self.log.log_d(*args, **kwargs) def log_i(self, *args, **kwargs): self.log.log_i(*args, **kwargs) def log_w(self, *args, **kwargs): self.log.log_w(*args, **kwargs) def log_e(self, *args, **kwargs): self.log.log_e(*args, **kwargs) def log_c(self, *args, **kwargs): self.log.log_c(*args, **kwargs) def printdb(self, *args, **kwargs): self.log.db( *args, **kwargs) """ def _get_real_agent_name(self, agent_alias_name:str)->str: #self.printd("key is", agent_alias_name) ''' if not self._my_client_agents: return agent_alias_name return self._my_client_agents[agent_alias_name] ''' return self._my_client_agents.get(agent_alias_name) def run(self, nb_iter:int=None, FOR_REAL:bool=True): """ FOR_REAL: set to False if you don't want Agent to send commands to devices but just print messages without really doing anything """ ("in run()") self.__set_and_log_status(self.AGT_STATUS.LAUNCHED) #return # TEST MODE ONLY # IF in test mode but with REAL devices (no SIMULATOR), delete all dangerous commands from the test commands list scenario: self.__TEST_prepare() #self._DO_EXIT = False # No agent specific cmd currently running (thread) #self.AGENT_SPECIFIC_CMD_RUNNING = False # No Routine process (before or after) currently running (thread) #self.ROUTINE_PROCESS_RUNNING = False ################ # RESTART loop # ###############@ # REPEAT UNTIL not DO_RESTART_LOOP self.DO_RESTART_LOOP = True while self.DO_RESTART_LOOP: # By default, no restart after exit from main loop self.DO_RESTART_LOOP = False try: self.__restart_loop(nb_iter,FOR_REAL) except KeyboardInterrupt: # CTRL-C # In case of CTRL-C, kill the current thread (process) before dying (in error) #log.info("CTRL-C Interrupted, I kill the current thread (process) before exiting (if exists)") log.info(f"{self.name}: CTRL-C Interrupted") break #self._kill_running_device_cmd_if_exists("USER_CTRLC") #self.do_things_before_exit("USER_CTRLC") ##self._cleanup_before_exit("USER_CTRLC") ###self.do_stop("asap") ###exit(1) # exit self.do_stop("asap") def __restart_loop(self, nb_iter:int=None, FOR_REAL:bool=True): log.info("*"*10+ " STARTING RESTART LOOP "+ "*"*10+ '\n') self.start_time = time.time() #log.debug("on est ici: " + os.getcwd()) self.__load_config() # Only if in TEST mode #if self.is_in_test_mode(): self.__TEST_setup_TEST_MODE() self.__init() ''' testing log: self.log_e("ERROR") self.log_c("FATAL critical ERROR") ''' #self.log_w("WARNING", "watch your step !") #log.warning("WARNING"+ "watch your step !") # Avoid blocking on false "running" command # (old commands that stayed with "running" status when agent was killed) #AgentCmd.delete_commands_with_running_status_for_agent(self.name) self.__kill_false_running_cmd_if_exists() ############# # MAIN loop # ############@ self.__iter_num = 1 self.__CC = None self.__CC_prev = None self.__CCE = None self.DO_MAIN_LOOP = True while self.DO_MAIN_LOOP: # EXIT because of nb of iterations ? if nb_iter is not None: # Bad number of iterations or nb iterations reached => exit if nb_iter <= 0 or nb_iter < self.__iter_num: log.info(f"Exit because number of iterations asked ({nb_iter}) has been reached") break self.__main_loop(nb_iter,FOR_REAL) self.__iter_num += 1 #if not self.DO_MAIN_LOOP: break # TEST mode only #if self.TEST_MODE and self.TEST_WITH_FINAL_TEST: self.__TEST_test_results() #if self._DO_EXIT: exit(0) def __kill_false_running_cmd_if_exists(self): AgentCmd.kill_false_running_cmd_if_exists_for_agent(self.name) def __main_loop(self, nb_iter:int=None, FOR_REAL:bool=True): self.__set_and_log_status(self.AGT_STATUS.IN_MAIN_LOOP_START) self.__main_loop_start(nb_iter) #if not self.DO_MAIN_LOOP: return self.__reload_config_if_changed() # only if changed # Log this agent status (update my current mode and status in DB) ##self._log_agent_state() # better to do this in a subclass #self.show_config() #self.printd("====== START COMMMANDS PROCESSING ======") # SIMU #self.send_cmd_to("AgentScheduler", "do_replan") #print("general cmds:", self.get_general_cmds()) ##print("all cmds:", self.get_all_cmds()) # ROUTINE BEFORE (only if not IDLE) #if not self.IS_IDLE(): self.__routine_process_iter_start() # NEW CMD EXEC (if possible and exists) # # ONLY possible if : # - no current command self.__CC # OR # - current command self.__CC finished (can be run in parallel) # OR # - PRIORITY cmd received # => Then start and return next received cmd (only if exists) # => Otherwise, return None # This may throw Exception => stored in self.__CCE # Not good because started (running) commands cannot access the current cmd (not yet in self.__CC) ###cmd = self.__start_next_received_cmd_if_possible_and_exists() # Better : cmd = self.__get_next_received_cmd_if_possible_and_exists() if cmd: self.__CC_prev = self.__CC # This way, started commands will be able to access the current command via self.__CC self.__CC = cmd # Can throw exception => will be stored in self.__CCE self.__start_next_received_cmd(cmd) # Check if running cmd timeout if self.__CC and self.__CC.is_running() and self.__CC.is_exec_timeout(): self.__CC.set_as_exec_timeout() # If (SEQUENTIAL OR PARALLEL) current cmd is finished => process exception if exists (and test result if in testing mode) #FIXME: #if self.__CC and self.__CC.is_finished(): if ( self.__CC and self.__CC.is_finished() ) or self.__CCE : self.__process_finished_cmd() self.__CC = None # if no EXIT condition (no do_stop or do_restart command received) if self.DO_MAIN_LOOP: # ROUTINE AFTER (only if not IDLE) #if not self.IS_IDLE(): # Wait end of routine_process_iter_start before running new routine while self.ROUTINE_ITER_START_IS_RUNNING: self.sleep(1) self.__routine_process_iter_end() # TEST MODE only : send next command from test scenario (not waiting for current cmd to be finished) #if self.is_in_test_mode(): self.__TEST_send_next_test_cmd() #self.printd("====== END COMMMANDS PROCESSING ======") #self.waitfor(self.mainloop_waittime) # Wait end of routine_process_iter_end before starting new iteration while self.ROUTINE_ITER_END_IS_RUNNING: self.sleep(1) self.__main_loop_end() def __get_next_received_cmd_if_possible_and_exists(self)->Optional[AgentCmd]: self.__set_and_log_status(self.AGT_STATUS.IN_MAIN_LOOP_GET_NEXT_CMD) cmd = None # GET NEXT COMMAND if exists and if current cmd is finished #if self.__CC is None or self.__CC.is_finished() or self.__priority_cmd_received(): CMD_PRIO = self.__get_received_priority_cmd_if_exists() #if not self.__CC or self.__CC.is_finished() or (CMD_PRIO := self.__priority_cmd_received() is not None) : if not self.__CC or self.__CC.is_finished() or CMD_PRIO : ##if self.__priority_cmd_received() and self.__CC.is_running(): self.__CC_prev = self.__CC # Save "current cmd" as now "previous cmd" ###self.__CC_prev = self.__CC # New CC will be either : # - None if no new command received # or # - a priority command if exists in the commands list, otherwise the next command in the list #self.__CC = CMD_PRIO if CMD_PRIO else self.__get_next_received_command_if_exists() ###self.__CC = self.__get_next_received_cmd_if_exists() if not CMD_PRIO else CMD_PRIO ###if self.__CC: if CMD_PRIO: log.info("RUNNING PRIORITY COMMAND:") log.info(str(CMD_PRIO)) cmd = CMD_PRIO if CMD_PRIO else self.__get_next_received_cmd_if_exists() return cmd def __start_next_received_cmd(self, cmd:AgentCmd)->None: cmd.set_read_time() print() print() log.info("*"*10 + " NEXT CMD RECEIVED PROCESSING (START) " + "*"*10 + '\n') # EAFP # If cmd is priority, it will be executed SEQUENTIALLY and this call will be blocking until exec finished # Otherwise, it can be executed in parallel => non blocking try: ###self.__start_next_cmd(self.__CC) # Side effect : can modify cmd self.__start_cmd(cmd) except Exception as e: self.__CCE=e pass log.info("*"*10 + " NEXT CMD RECEIVED PROCESSING (END) " + "*"*10 + "\n") # Process current finished command self.CC, then reset it to None def __process_finished_cmd(self): # If current cmd has exception, deal with it log.info('-'*6 + " CMD EXEC FINISHED") log.info('-'*6 + " CMD STATUS & RESULT ARE:") log.info('-'*6 + " - STATUS: " + self.__CC.get_status()) log.info('-'*6 + " - RESULT: " + str(self.__CC.get_result())) if self.__CCE: log.info('-'*6 + "CMD finished WITH EXCEPTION ") self.__process_exception(self.CCE) self.__CCE = None # Check current cmd res and status ONLY if in test mode and cmd sent by myself #if self.is_in_test_mode() and self.__CC.sender==self.__class__.__name__ : if self.is_in_test_mode(): self.__TEST_check_current_cmd_res_and_status() def __get_received_priority_cmd_if_exists(self)->Optional[AgentCmd]: ''' Return True if priority cmd found in the commands list (if not irrelevant or running or expired) ''' with transaction.atomic(): commands = AgentCmd.get_pending_and_running_commands_for_agent(self.name) if not commands.exists(): return None for cmd in commands: #if cmd.name in ("do_exit", "do_abort", "do_flush_pending_commands"): break #if cmd.name in ("do_exit", "do_abort"): break if cmd.is_agent_general_priority_cmd(): # if prio cmd but "noprio" => do not consider it as prio if 'noprio' in cmd.args: continue # If prio cmd but irrelevant => skip it if cmd.name in ("do_stop","do_restart") and self.__cmd_was_sent_before_my_start(cmd) : cmd.set_as_skipped("Cmd stop/restart was sent before agent start => skip it") continue # if prio cmd already running, wait for it to finish if cmd.is_running(): return None if not cmd.is_expired(): return cmd return None # process self.current_cmd_exception def __process_exception(self, e:Exception): if not isinstance( e, ( # PENDING (non running) cmd exceptions CmdExceptionUnknown, CmdExceptionUnimplemented, CmdExceptionBadArgs, # RUNNING cmd exceptions CmdExceptionExecError, CmdExceptionExecKilled, CmdExceptionExecTimeout, ) ): # Unknown exception => raise it to stop agent raise(e) # Exception is known => deal with it log.error(e) #log.error(f"EXCEPTION on Agent command '{e.cmd_name}'") cmd = e.cmd if isinstance(cmd,AgentCmd) : # - Exception from PENGING if isinstance(e, (CmdExceptionUnknown, CmdExceptionUnimplemented, CmdExceptionBadArgs)) : if isinstance(e,CmdExceptionUnimplemented): ##if cmd.name != "get_specific_cmds": cmd.set_as_unimplemented(e.msg) #cmd.set_as_unimplemented("EXCEPTION: Command known but unimplemented") else: # set back to "pending" if ever was wrongly marked "running" #cmd.set_as_pending() cmd.set_as_invalid(e.msg) #cmd.set_as_invalid("EXCEPTION: Command unknown or bad args") # - Exception from RUNNING (Execution Exception) else: if isinstance(e,CmdExceptionExecError): cmd.set_as_exec_error(e.msg) else: #cmd.set_as_exec_error(e.msg if e.msg else "EXCEPTION: Error during execution") #FIXME: S'assurer que cmd.set_as_killed() or cmd.set_as_timeout() a bien été fait !!! cmd.set_result(e.msg) #cmd.set_result("EXCEPTION: Problem during execution") # isinstance(e.cmd, str) else: raise CmdException(cmd, "EXCEPTION: Abnormal error case...") #raise Exception("EXCEPTION: Abnormal error case...") def __cleanup_before_exit(self, stopper_agent_name:str=None): if not stopper_agent_name: stopper_agent_name = self.name #self._set_status(self.STATUS_EXIT) ##self._log_agent_state() log.info(f"{self.name}: Trying to stop cleanly") log.info(f"{self.name}: Before exiting, Here are (if exists) the current (still) pending commands (time ordered) :") #commands = AgentCmd.get_commands_sent_to_agent(self.name) commands = AgentCmd.get_pending_commands_for_agent(self.name) AgentCmd.show_commands(commands, True) ##self.do_flush_pending_commands() # Only if in TEST mode #if self.TEST_MODE and self.TEST_WITH_FINAL_TEST and self.TEST_COMMANDS_DEST == "myself": self.simulator_test_results() #if self.TEST_MODE and self.TEST_WITH_FINAL_TEST: self.__TEST_test_results() #self._DO_EXIT=True #exit(0) wait_nbsec = 2 # Waiting for (previous) current command still running if need be (if exists) while self.__CC_prev and self.__CC_prev.is_running(): log.info(f"The previous command {self.__CC_prev.name} is still running => Waiting for it to finish (waiting {wait_nbsec} sec...)") self.waitfor(wait_nbsec) # Waiting for current ROUTINE BEFORE process to finish if still running while self.ROUTINE_ITER_START_IS_RUNNING: log.info(f"The ROUTINE BEFORE process is still running => Waiting for it to finish (waiting {wait_nbsec} sec...)") self.waitfor(wait_nbsec) # Waiting for current ROUTINE BEFORE process to finish if still running while self.ROUTINE_ITER_END_IS_RUNNING: log.info(f"The ROUTINE AFTER process is still running => Waiting for it to finish (waiting {wait_nbsec} sec...)") self.waitfor(wait_nbsec) log.info(f"{self.name}: Before exiting, calling do_things_before_exit()") self._do_things_before_exit(stopper_agent_name) ##self._set_and_log_status(self.AGT_STATUS.EXITING) #def _kill_running_device_cmd_if_exists(self, abort_cmd_sender): # to be overriden by subclass def _do_things_before_exit(self, stopper_agent_name:str=None): pass def __main_loop_start(self, nb_iter:int=None): for i in range(3): print() #self.printd("-"*80) log.info("*"*90) log.info("*"*20 + f" {self.name} : MAIN LOOP ITERATION #{self.__iter_num} (START) " + "*"*20) log.info("*"*90 + '\n') #self.print(f"Iteration {self._iter_num}") ''' # EXIT because of nb of iterations ? if nb_iter is not None: # Bad number of iterations or nb iterations reached => exit if nb_iter <= 0 or nb_iter < self.__iter_num: log.info(f"Exit because number of iterations asked ({nb_iter}) has been reached") self.DO_MAIN_LOOP = False return ''' # Temporizing (delay) : Wait a random number of sec before starting iteration # (to avoid to busy to much the processor) # (also to let another agent having the chance to send a me command) if self.__DELAY_NB_SEC : #random_waiting_sec = random.randint(0,5) log.info(f"Waiting {self.__DELAY_NB_SEC} sec (random) before starting new iteration...") #time.sleep(random_waiting_sec) self.waitfor(self.__DELAY_NB_SEC) # (Test only) # EXIT because max duration reached ? if self.TEST_MODE and self.TEST_MAX_DURATION_SEC and (time.time()-self.start_time > self.TEST_MAX_DURATION_SEC): log.info("Exit because of max duration set to ", self.TEST_MAX_DURATION_SEC, "s") #self._kill_running_device_cmd_if_exists(self.name) self.__cleanup_before_exit(self.name) #if self.TEST_MODE and self.TEST_WITH_FINAL_TEST: self._TEST_test_results() self.DO_MAIN_LOOP = False return ##self._set_status(self.AGT_STATUS.IN_MAIN_LOOP) self.show_state() self._main_loop_start() # To be overriden by subclass def _main_loop_start(self): pass def __main_loop_end(self): self._main_loop_end() log.info("*"*20 + " MAIN LOOP ITERATION (END) " + "*"*20 + '\n') #self.do_log(LOG_DEBUG, "Ending main loop iteration") # To be overriden by subclass def _main_loop_end(self): pass # To be overriden by subclass (AgentDevice) def _process_device_level_cmd(self, cmd): pass ''' log.info("(DEVICE LEVEL CMD)") try: self.exec_device_cmd_if_possible(cmd) except (UnimplementedGenericCmdException) as e: #except (UnknownGenericCmdException, UnimplementedGenericCmdException, UnknownNativeCmdException) as e: log.e(f"EXCEPTION caught by {type(self).__name__} (from Agent mainloop) for command '{cmd.name}'", e) log.e("Thus ==> ignore this command") cmd.set_result(e) #cmd.set_as_killed_by(type(self).__name__) cmd.set_as_skipped() #raise ''' def __start_cmd(self, cmd:AgentCmd)->None: ''' Processing the next pending command (self.__CC) ''' #cmd = self.__CC #self.__set_and_log_status(self.AGT_STATUS.IN_MAIN_LOOP_GET_NEXT_CMD) #print() #print() #log.info("*"*10 + " NEXT COMMAND PROCESSING (START) " + "*"*10 + '\n') # Purge commands (every N iterations, starting from 1st, delete old commands) ''' N=5 if ((self._iter_num-1) % N) == 0: log.info("Purging expired commands if exists") #AgentCmd.purge_old_commands_for_agent(self.name) self._purge_old_commands_sent_to_me() ''' # Get next command and process it (if exists) ###cmd = self.__get_next_received_command() ###self.__CC = cmd #self._set_status(self.STATUS_GENERAL_PROCESS) #if cmd: self.command_process(cmd) # SIMU #cmd = "do_replan" #self.send_cmd_to("AgentScheduler", "do_replan") # No new command in the pending cmds queue => nothing to do ###if not cmd: return cmd log.info('-'*6) log.info('-'*6 + " RECEIVED NEW COMMAND TO PROCESS: ") log.info('-'*6 + str(cmd)) ''' if self.is_in_test_mode() and hasattr(self._cmdts,"expected_res"): log.info(f"TTT (with expected result : " + str(self._cmdts.expected_res) + ')') ''' log.info('-'*6) ###cmd.set_read_time() if cmd.is_expired(): cmd.set_as_expired() return # Misnamed command => CMD_INVALID if not AgentCmd.is_generic(cmd.name) and not self.is_native_device_command(cmd.name): cmd.set_as_invalid("Command misnamed, must start with do_, get_, or set_") return #cmd # CASE 1 - AGENT GENERAL command # (DO_RESTART, DO_EXIST, DO_ABORT, DO_ABORT_COMMAND, do_flush_pending_commands, ...) # This processing can set DO_MAIN_LOOP and DO_RESTART_LOOP to False if self.is_agent_general_cmd(cmd): try: self.__process_agent_general_cmd(cmd) except (CmdExceptionUnimplemented, CmdExceptionBadArgs) : # cmd should not be set as "running" cmd.set_as_pending() # These exceptions are managed at higher level : raise return #cmd # CASE 2 - AGENT SPECIFIC command # => Simple action, short execution time, so I execute it directly (in current main thread, not in parallel) if self.is_agent_specific_cmd(cmd): #log.info("(AGENT LEVEL CMD)") if not self.IS_ATTENTIVE(): cmd.set_as_skipped("Skipped because I am not ATTENTIVE") return #cmd try: self.__process_agent_specific_cmd(cmd) #self._exec_agent_cmd(cmd) #except AttributeError as e: except (CmdExceptionUnimplemented, CmdExceptionBadArgs, CmdExceptionExecError) as e: # cmd should not be set as "running" #cmd.set_as_pending() # These exceptions are managed at higher level : raise ''' if isinstance(e, (CmdExceptionUnimplemented, CmdExceptionBadArgs)): raise # If exception is unknown (any), replace it with a CmdExceptionExecError raise CmdExceptionExecError(cmd, str(e)) from None ''' return #cmd # CASE 3 - OTHER level command (DEVsICE level, only for AgentDevice) if self.is_device_level_cmd(cmd): self._process_device_level_cmd(cmd) return #cmd # CASE 4 - INVALID COMMAND #raise Exception("INVALID COMMAND: " + cmd.name) log.warning("******************************************************") log.warning("*************** ERROR: INVALID COMMAND (UNKNOWN) ***************") log.warning("******************************************************") #log.warning("Thus => I ignore this command...") #cmd.set_result("ERROR: INVALID COMMAND") ##cmd.set_as_skipped("ERROR: INVALID AGENT COMMAND") ##self._cleanup_before_exit() ##raise UnknownCmdException(cmd.name) #cmd.set_as_invalid() ##return cmd raise CmdExceptionUnknown(cmd) #print() #log.info("*"*10 + " NEXT COMMAND PROCESSING (END) " + "*"*10 + "\n") def purge_old_commands_sent_to_me(self): AgentCmd.purge_old_commands_sent_to_agent(self.name) # Only if not IDLE def __routine_process_iter_start(self): """ Routine processing BEFORE processing received commands. IMPORTANT : this processing must be as SHORT as possible, so that the agent can quickly read its received commands and process them This is a command or set of processings that this agent does or commands that it sends regularly, at each iteration """ if self.IS_IDLE(): return self.__ROUTINE_ITER_START_IS_RUNNING = True print() print() log.info("*"*10+ " ROUTINE BEFORE (START) "+ "*"*10+ '\n') self.__set_and_log_status(self.AGT_STATUS.IN_MAIN_LOOP_ROUTINE_ITER_START) self._routine_process_iter_start_body() print() log.info("*"*10 + " ROUTINE BEFORE (END) "+ "*"*10) self.__ROUTINE_ITER_START_IS_RUNNING = False # Only if not IDLE def __routine_process_iter_end(self): """ Routine processing AFTER processing received commands. This processing can be longer than the "before" one above, as the agent has already processed its received commands, but not too long anyway, otherwise it will take too long before the next iteration starts... This is a command or set of processings that this agent does or commands that it sends regularly, at each iteration """ if self.IS_IDLE(): return self.__ROUTINE_ITER_END_IS_RUNNING = True print() print() log.info("*"*10+ " ROUTINE AFTER (START) "+ "*"*10+ '\n') self.__set_and_log_status(self.AGT_STATUS.IN_MAIN_LOOP_ROUTINE_ITER_END) self._routine_process_iter_end_body() print() log.info("*"*10 + " ROUTINE AFTER (END) "+ "*"*10 + '\n') self.__ROUTINE_ITER_END_IS_RUNNING = False # To be overridden by subclasses def _routine_process_iter_start_body(self): self.waitfor(1) # To be overridden by subclasses def _routine_process_iter_end_body(self): #if self.TEST_MODE: self._TEST_test_routine_process() self.waitfor(1) # To be overriden by AgentDevice def is_native_device_command(self, cmd_name:str): return False """ def purge_commands(self): ### Delete commands (which I am recipient of) older than COMMANDS_PEREMPTION_HOURS (like 48h) ATTENTION !!! EXCEPT the RUNNING command !!! NB: datetime.utcnow() is equivalent to datetime.now(timezone.utc) ### self.printd("Looking for old commands to purge...") ### COMMAND_PEREMPTION_DATE_FROM_NOW = datetime.utcnow() - timedelta(hours = self.COMMANDS_PEREMPTION_HOURS) #self.printd("peremption date", COMMAND_PEREMPTION_DATE_FROM_NOW) old_commands = AgentCmd.objects.filter( # only commands for me recipient = self.name, # only pending commands sender_deposit_time__lt = COMMAND_PEREMPTION_DATE_FROM_NOW, ) ### old_commands = AgentCmd.get_old_commands_for_agent(self.name) if old_commands.exists(): self.printd("Found old commands to delete:") for cmd in old_commands: self.printd(cmd) old_commands.delete() """ def sleep(self, nbsec:float): log.info(f"Now, waiting (sleeping) for {nbsec} second(s)...") time.sleep(nbsec) # alias to sleep def waitfor(self, nbsec:float): ''' # thread if self._current_device_cmd_thread and self.RUN_IN_THREAD: self._current_device_cmd_thread.wait(nbsec) # process (or main thread) else: time.sleep(nbsec) ''' self.sleep(nbsec) # # MODE & STATUS # def get_state(self)->str: return f"MODE is {self.__get_mode()} ; STATUS is {self.get_status()} ; ITERATION #{self.__iter_num}" def show_state(self): log.info(self.get_state()) #log.info(f"CURRENT MODE is {self.__mode} (status is {self.__status})") #log.info(f"CURRENT MODE is {self.__get_mode()} (status is {self.get_status()})") # # - STATUS MANAGEMENT # # - GET def get_status(self)->AgentSurvey.STATUS_CHOICES: return self.__status # - SET # Private because automatic, must not be changed, depends only on the step the Agent is currently running, used only by Agent (superclass) def __set_status(self, status:AgentSurvey.STATUS_CHOICES): #self.printd(f"[{status}] (switching from status {self.__status})") log.debug(f"[{status}]") self.__status = status return False # Private because automatic, must not be changed, used only by Agent (superclass) ##def __set_and_log_status(self, status:str): def __set_and_log_status(self, status:AgentSurvey.STATUS_CHOICES): self.__set_status(status) self.__log_agent_state() # # - MODE MANAGEMENT # # - GET def __get_mode(self)->AgentSurvey.MODE_CHOICES: return self.__mode # Test mode def IS_MODE_IDLE(self)->bool: return self.__get_mode() == AgentSurvey.MODE_CHOICES.IDLE def IS_MODE_ROUTINE(self)->bool: return self.__get_mode() == AgentSurvey.MODE_CHOICES.ROUTINE def IS_MODE_ATTENTIVE(self)->bool: return self.__get_mode() == AgentSurvey.MODE_CHOICES.ATTENTIVE # @deprecated def IS_IDLE(self): return self.IS_MODE_IDLE() # @deprecated def IS_ROUTINE(self): return self.IS_MODE_ROUTINE() # @deprecated def IS_ATTENTIVE(self): return self.IS_MODE_ATTENTIVE() # - SET #def __set_mode(self, mode:str): def __set_mode(self, mode:AgentSurvey.MODE_CHOICES): #self.printd(f"Switching from mode {self.__mode} to mode {mode}") log.info(f"[NEW MODE {mode}]") self.__mode = mode # Public method because can be called from outside like this : agent.set_mode_idle() def set_mode_idle(self): self.__set_mode(AgentSurvey.MODE_CHOICES.IDLE) # Public method because can be called from outside like this : agent.set_mode_routine() def set_mode_routine(self): self.__set_mode(AgentSurvey.MODE_CHOICES.ROUTINE) # Public method because can be called from outside like this : agent.set_mode_attentive() def set_mode_attentive(self): #self._set_mode(AgentSurvey.MODE_ATTENTIVE) self.__set_mode(AgentSurvey.MODE_CHOICES.ATTENTIVE) ''' def die(self): self._set_status(AgentSurvey.STATUS_EXIT) ''' """ suspend/resume """ def suspend(self): """ FIXME: Mode IDLE (doit rester à l'écoute d'un resume, et doit continuer à alimenter les tables pour informer de son état via tables agents_logs, et lire table agents_command pour reprendre via resume, et update la table agents_survey pour donner son status "idle" """ self._set_idle() return True def resume(self): """ FIXME: Quit suspend() mode """ self._set_active() return True ''' (EP) moved to AgentDevice def _set_agent_device_aliases_from_config(self, agent_alias): for a in self._my_client_agents_aliases: # TODO: activer ##self._my_client_agents[a] = self.config.get_paramvalue(a,'general','real_agent_device_name') pass ''' # new config (obsconfig) def _set_mode_from_config(self, agent_name): # all agent are active ? #mode = self.MODE_ACTIVE #mode = self.MODE_ATTENTIVE self.set_mode_attentive() return True # old config # def _set_mode_from_config(self, agent_name): # # --- Get the startmode of the AgentX # modestr = self.config.get_paramvalue(agent_name,'general','startmode') # if self.config.get_last_errno() != self.config.NO_ERROR: # raise Exception(f"error {str(self.config.get_last_errno())}: {str(self.config.get_last_errmsg())}") # if (modestr == None): # return True # # --- Set the mode according the startmode value # mode = self.MODE_IDLE # if modestr.upper() == 'RUN': # mode = self.MODE_ACTIVE # self._set_mode(mode) # return True def set_delay(self, delay_nb_sec:int): self.__DELAY_NB_SEC = delay_nb_sec """ ======================================================================================= Generic methods that may be specialized (overriden) by subclasses (except if private) ======================================================================================= """ def __init(self): #log.debug("*** Initializing... ***") log.info("*"*10+ " INITIALIZING... "+ "*"*10+ '\n') self.__set_and_log_status(self.AGT_STATUS.INITIALIZING) self._init() # To be overridden by subclasses def _init(self): pass def __reload_config_if_changed(self): self.__load_config() def __load_config(self): """ TODO: only si date fichier xml changée => en RAM, un objet Config avec méthodes d'accès, appelle le parser de AK (classe Config.py indépendante) """ ''' # SETUP try: self.config = get_object_or_404(Config, id=1) # By default, set mode to SCHEDULER (False = REMOTE, which should never be the default) self.config.global_mode = True self.config.save() # self.config = Config.objects.get(pk=1) # self.config = Config.objects.get()[0] except Exception as e: # except Config.ObjectDoesNotExist: self.printd("Config read (or write) exception", str(e)) return -1 ''' log.debug("Loading the config file...") # - NEW CONFIG self.get_config().load(self.get_config_env()[0]) # - OLD CONFIG # self.config.load() # if self.config.get_last_errno() != self.config.NO_ERROR: # raise Exception(f"error {str(self.config.get_last_errno())}: {str(self.config.get_last_errmsg())}") # if not self.config.is_config_contents_changed(): # return # === display informations # --- Get the assembly aliases of the unit # assembled_mount_aliases = [] # assembled_channel_aliases = [] # assembled_aliases = [] # unit_alias = self.config.get_aliases('unit')[0] # params = self.config.get_params(unit_alias) # for param in params: # if param['section']=="assembly" and param['key']=="alias": # assembled_aliases.append(param['value']) #self.printd(f"Unit {unit_alias} is the assembly of {assembled_aliases}") # log.debug("--------- Components of the unit -----------") # log.debug("Configuration file is {}".format(self.config.get_configfile())) # alias = self.config.get_aliases('unit')[0] # namevalue = self.config.get_paramvalue(alias,'unit','description') # log.debug(f"Unit alias is {alias}. Description is {namevalue}:") # unit_subtags = self.config.get_unit_subtags() # for unit_subtag in unit_subtags: # aliases = self.config.get_aliases(unit_subtag) # for alias in aliases: # namevalue = self.config.get_paramvalue(alias,unit_subtag,'description') # log.debug(f"- {unit_subtag} alias is {alias}. Description is {namevalue}") # # --- fill the list of mount and channel assembled # if alias in assembled_aliases: # if unit_subtag=="mount": # assembled_mount_aliases.append(alias) # elif unit_subtag=="channel": # assembled_channel_aliases.append(alias) # log.debug("--------- Assembly of the unit -----------") # log.debug(f"Assembled mount aliases: {assembled_mount_aliases}") # log.debug(f"Assembled channel aliases: {assembled_channel_aliases}") # --- Get the home of the mount[0] # mount_alias = assembled_mount_aliases[0] # home = self.config.get_paramvalue(mount_alias,'MountPointing','home') # log.debug("------------------------------------------") # hostname = socket.gethostname() # self._computer_alias = '' # unit_subtag = 'computer' # aliases = self.config.get_aliases(unit_subtag) # for alias in aliases: # log.debug("alias" + alias) # value = self.config.get_paramvalue(alias,'local','hostname') # log.debug("value" + value) # if value == hostname: # log.debug("value" + value) # self._computer_alias = alias # value = self.config.get_paramvalue(alias,unit_subtag,'description') # self._computer_description = value # value = self.config.get_paramvalue(alias,'path','data') # # Overrides default value # self._path_data = value # break # log.debug(f"hostname = {hostname}") # log.debug(f"path_data = {self._path_data}") # log.debug(f"home = {home}") # log.debug("------------------------------------------") # --- update the log parameters ##self.log.path_data = self._path_data ##print("new self.log.path_data is", self.log.path_data) ##self.log.set_global_path_data(self._path_data) ##self.printd("new self.log.global_path_data is", self.log.get_global_path_data()) ##self.log.home = home #def update_survey(self): def __log_agent_state(self): """ Save (update) this agent current mode, status, and #iteration in DB """ "Updating the agent survey database table..." #self.printd("- fetching table line for agent", self.name) # only necessary when using process (not necessary with threads) #with transaction.atomic(): #self._agent_survey = AgentSurvey.objects.get(name=self.name) self.__agent_survey.mode = self.__get_mode() self.__agent_survey.status = self.get_status() self.__agent_survey.iteration = self.__iter_num self.__agent_survey.save() #self._agent_survey.save(update_fields=["mode", "status"]) """ def send_command(self, cmd_name): recipient_agent = self.name if self.TEST_COMMANDS_DEST=="myself" else self.TEST_COMMANDS_DEST AgentCmd.objects.create(sender=self.name, recipient=recipient_agent, name=cmd_name) """ #def send_command(self, to_agent, cmd_type, cmd_name, cmd_args=None): #def send_cmd_to(self, to_agent:str, cmd_name:str, cmd_args:str=None, validity:int=None, timeout:int=None): def send_cmd_to(self, to_agent:str, cmd_name:str, cmd_args:str=None, validity:int=None) -> AgentCmd: """ #ex: send_command(“AgentX”,”GENERIC”,”EVAL”,“3+4”) ex: send_command(“AgentX”,"EVAL”,“3+4”) """ #return AgentCmd.send_cmd_from_to(self.name, self._get_real_agent_name_for_alias(to_agent), cmd_name, cmd_args) #cmd = self.create_cmd_for(to_agent, cmd_name, cmd_args, validity, timeout).send() cmd = self.create_cmd_for(to_agent, cmd_name, cmd_args, validity) cmd.send() return cmd def create_cmd_for(self, to_agent:str, cmd_name:str, cmd_args:str=None, validity:int=None) -> AgentCmd: ''' real_agent_name = self._get_real_agent_name(to_agent) real_cmd_name = cmd_name if '.' in real_agent_name: real_agent_name, component_name = real_agent_name.split('.') real_cmd_name = component_name+'.'+cmd_name return AgentCmd.create(self.name, real_agent_name, real_cmd_name, cmd_args) try: real_agent_name = self._get_real_agent_name(to_agent) except KeyError as e: ''' return AgentCmd.create(self.name, to_agent, cmd_name, cmd_args, validity) ''' real_agent_name = self._get_real_agent_name(to_agent) if not real_agent_name: return AgentCmd.create(self.name, to_agent, cmd_name, cmd_args) # log.e("UNKNOWN AgentDevice ALIAS", to_agent) # #self.log_e("Exception raised", e) # log.e(f"=> Thus, I do not send this command '{cmd_name}'") # return None return AgentCmd.create(self.name, real_agent_name, cmd_name, cmd_args) ''' ''' return AgentCmd( sender=self.name, recipient=self._get_real_agent_name_for_alias(recipient_agent_alias_name), name=cmd_name ) ''' def get_received_pending_commands(self) -> QuerySet: return AgentCmd.get_pending_and_running_commands_for_agent(self.name) def __get_next_received_cmd_if_exists(self) -> Optional[AgentCmd]: """ Return next pending command SENT to this agent (and NOT RUNNING ?) (read from the DB command table) Commands are read in chronological order OLD version was : Return still VALID (not expired) command """ self.__set_and_log_status(self.AGT_STATUS.IN_MAIN_LOOP_GET_NEXT_CMD) log.info("Looking for a new received command to process (sent by another agent):") # 1) Get all pending commands for me (return if None) # Not sure this is necessary to do it in a transaction, # but there might be a risk # that a command status is modified while we are reading... with transaction.atomic(): self.__pending_commands = self.get_received_pending_commands() commands = self.__pending_commands if not commands.exists(): log.info("") return None "Current pending (or running) commands (time ordered):" AgentCmd.show_commands(commands) # 2) GET first command in the list (the oldest one) #cmd = commands[0] next_cmd = commands.first() # 3) If PRIORITY cmd exists, use it instead of first command above # - SKIP any (priority) STOP style command if sent before my start, # and # - GET a PRIORITY command if exists in the list (even at the very end ; must be still valid and not yet running) ''' for cmd in commands: #if cmd.name in ("do_exit", "do_abort", "do_flush_pending_commands"): break #if cmd.name in ("do_exit", "do_abort"): break if cmd.is_agent_general_priority_cmd(): if cmd.name in ("do_stop","do_restart") and self.__cmd_was_sent_before_my_start(cmd): cmd.set_as_skipped("Cmd stop/restart was sent before agent start => skip it") #return None continue # if prio cmd already running, wait for it to finish (return None) elif cmd.is_running(): return None '' elif cmd.is_expired(): cmd.set_as_expired() return None '' # Replace next_cmd with this priority cmd next_cmd = cmd break #return cmd ''' ''' #if cmd.name in ("do_exit", "do_abort", "do_flush_pending_commands"): if cmd.name in ("do_exit", "do_abort"): if cmd.is_running(): return None if cmd.is_expired(): cmd.set_as_outofdate() return None return cmd ''' # 3) If first (oldest) command is currently running # (status CMD_RUNNING), then do nothing and return """ cmd_executing = Command.objects.filter( # only commands for me recipient = self.name, # only pending commands recipient_status_code = Command.CMD_STATUS_CODES.CMD_RUNNING, ).first() #if cmd_executing.exists(): if cmd_executing: """ ''' if cmd.is_expired(): cmd.set_as_expired() ''' #return cmd #return None ''' if cmd.is_running(): #self.printd(f"There is currently a running command ({cmd_executing.first().name}), so I do nothing (wait for end of execution)") log.info(f"There is currently a running command ({cmd.name})") #""" # Check that this command is not expired if cmd.is_expired(): self.printd("But this command is expired, so set its status to OUTOFDATE, and go on") cmd_executing.set_as_outofdate() else: #""" log.info(f"Thus, I won't execute any new command until this command execution is finished") # TODO: kill si superieur a MAX_EXEC_TIME return None ''' ''' # 4) Tag all expired commands for cmd in commands: if cmd.is_expired(): cmd.set_as_outofdate() # break at 1st "valid" command (not expired) else: break # 5) If no more commands to process, return None if cmd.is_expired(): return None ''' # 6) Current cmd must now be a valid (not expired) and PENDING one, # so return it for execution #self.printd(f"Got command {cmd.name} sent by agent {cmd.sender} at {cmd.sender_deposit_time}") #self.printd(f"Starting processing of this command") ##next_cmd.set_read_time() return next_cmd ''' #def _exec_agent_general_cmd(self, cmd:Command): def _exec_agent_cmd(self, cmd:AgentCmd): #self.print(f"Starting execution of an AGENT LEVEL cmd {cmd}...") log.info(f"Starting execution of an AGENT LEVEL cmd...") # Update read time to say that the command has been READ cmd.set_read_time() cmd.set_as_running() # SPECIFIC command (only related to me, not to any agent) if self._is_agent_specific_cmd(cmd): log.info("(Agent level SPECIFIC cmd)") # Execute method self."cmd.name"() # This can raise an exception (caught by this method caller) self.exec_cmd_from_its_name(cmd) #'' try: except AttributeError as e: self.printd(f"EXCEPTION: Agent level specific command '{cmd.name}' unknown (not implemented as a function) :", e) self.printd("Thus => I ignore this command...") cmd.set_result("ERROR: INVALID AGENT LEVEL SPECIFIC COMMAND") cmd.set_as_pending() cmd.set_as_skipped() return #'' cmd.set_result("Agent level SPECIFIC cmd done") cmd.set_as_processed() log.info("...Agent level SPECIFIC cmd has been executed") return # GENERAL command (related to any agent) log.info("(Agent level GENERAL CMD)") _,cmd_name,cmd_args = cmd.get_full_name_parts() #cmd_name, cmd_args = cmd.tokenize() #if cmd.name == "set_state:active": #elif cmd.name == "set_state:idle": if cmd_name == "set_state": if not cmd_args: raise ValueError() state = cmd_args[0] if state == "active": self._set_active() if state == "idle": self._set_idle() cmd.set_result("I am now " + state) #time.sleep(1) self.waitfor(1) elif cmd_name in ("do_flush_pending_commands"): "flush_commands received: Delete all pending commands" AgentCmd.delete_pending_commands_for_agent(self.name) cmd.set_result('DONE') elif cmd_name in ("do_abort", "do_exit", "do_restart_init"): #self.printd("Current pending commands are:") #Command.show_commands(self._pending_commands) log.info("Aborting current executing command if exists:") #self._kill_running_device_cmd_if_exists(cmd.sender) self.do_things_before_exit(cmd.sender) if cmd_name == "do_restart_init": log.info("restart_init received: Restarting from init()") self._DO_RESTART=True elif cmd.name == "do_exit": self._DO_EXIT=True cmd.set_result('SHOULD BE DONE NOW') else: #'' name = cmd.name args = None if " " in name: name,args = name.split() if name == "do_eval": if args==None: raise(ValueError) cmd.set_result(eval(args)) #'' if cmd_name == "do_eval": if not cmd_args: raise ValueError() cmd.set_result(eval(cmd_args)) cmd.set_as_processed() log.info("...Agent level GENERAL cmd has been executed") # If cmd is "do_exit", kill myself (without any question, this is an order soldier !) # This "do_exit" should normally kill any current thread (to be checked...) if cmd.name == "do_exit": log.info("Before exiting, Here are (if exists) the current (still) pending commands (time ordered) :") commands = AgentCmd.get_pending_and_running_commands_for_agent(self.name) AgentCmd.show_commands(commands, True) #if self.TEST_MODE and self.TEST_WITH_FINAL_TEST and self.TEST_COMMANDS_DEST == "myself": self.simulator_test_results() if self.TEST_MODE and self.TEST_WITH_FINAL_TEST: self._TEST_test_results() #self._DO_EXIT=True #exit(0) ''' # DEV mode only # Make agent sleep as soon as a command has started running # so that we can see this command in the "current command" column of agent web pages # To be overriden by subclass def _sleep_as_soon_as_running(self): # Example : # self.sleep(4) # by default, no sleeping after started running (only for DEV MODE) pass def __process_agent_general_cmd(self, cmd:AgentCmd): try: res = self.__exec_cmd_from_its_name(cmd) except (CmdExceptionUnimplemented, CmdExceptionBadArgs, CmdExceptionExecError) as e: ##cmd.set_as_pending() # These exceptions are managed at higher level : raise cmd.set_as_processed(res) log.info("...Agent GENERAL cmd has been executed") def __process_agent_general_cmd_OLD(self, cmd:AgentCmd): # GENERAL command (related to any agent) #self.print(f"Starting execution of an AGENT LEVEL cmd {cmd}...") log.info(f"Starting execution of an AGENT GENERAL cmd...") # Update read time to say that the command has been READ #cmd.set_read_time() cmd.set_as_running() self._sleep_as_soon_as_running() log.info("(Agent level GENERAL CMD)") cmd_name,cmd_args = cmd.name_and_args #cmd_name, cmd_args = cmd.tokenize() #if cmd.name == "set_state:active": #elif cmd.name == "set_state:idle": # Default result (null) result = None # # - 1) PRIORITY commands # # 1.a) not disturbing commands if cmd_name == "get_specific_cmds": result = self.get_specific_cmds() ''' # CmdUnimplementedException if one specific command (in the list) is unknown try: result = self.get_specific_cmds() except CmdExceptionUnimplemented as e: # raise CmdUnimplementedException("get_specific_cmds.unknown_cmd_name") #cmd.set_as_exec_error("EXCEPTION - One specific cmd is unimplemented: "+e.cmd_name) raise CmdExceptionExecError(cmd, "EXCEPTION - One specific cmd is unimplemented => "+e.cmd_name) from None ''' #elif cmd_name in ("do_flush_pending_commands"): elif cmd_name == "do_flush_pending_commands": "flush_commands received: Delete all pending commands" self.do_flush_pending_commands() #cmd.set_result('DONE') result = "FLUSH DONE" elif cmd_name == "do_exec_commands": pass # 1.b) partially disturbing commands elif cmd_name == "do_stop_current": what = 'both' if not cmd.args else cmd.args[0] result = self.do_stop_current(what) # 1.c) really disturbing commands (STOP command type) if cmd_name in ("do_stop", "do_restart", #@ deprecated "do_abort", "do_exit", ): #self.printd("Current pending commands are:") #Command.show_commands(self._pending_commands) log.info("Stopping or Restarting...") #self._kill_running_device_cmd_if_exists(cmd.sender) ''' self.do_things_before_exit(cmd.sender) if cmd_name == "do_restart_init": log.info("restart_init received: Restarting from init()") self._DO_RESTART=True elif cmd.name == "do_exit": self._DO_EXIT=True cmd.set_result('SHOULD BE DONE NOW') ''' #result = "RESTARTING" if cmd_name == "do_restart_loop" else "STOPPING" if cmd_name == "do_exit": cmd.full_name = "do_stop asap" elif cmd_name == "do_abort": cmd.full_name = "do_stop now" when = 'asap' if not cmd.args else cmd.args[0] result = self.__do_stop_or_restart(False if cmd_name=='do_stop' else True, when) # # - 2) NO priority commands # elif cmd_name == "get_state": result = self.get_state() elif cmd_name == "get_mode": result = "MODE is " + self.__get_mode() elif cmd_name == "set_mode": #if not cmd_args: raise ValueError() if not cmd_args: raise CmdExceptionBadArgs(cmd) mode = cmd_args[0] if mode == "IDLE": self.set_idle() elif mode == "ROUTINE": self.set_routine() elif mode == "ATTENTIVE": self.set_attentive() else: raise CmdExceptionBadArgs(cmd) #cmd.set_result("I am now " + state) result = "MODE is " + mode #time.sleep(1) #self.waitfor(1) elif cmd_name == "do_eval": #if not cmd_args: raise ValueError() if not cmd_args: raise CmdExceptionBadArgs(cmd) #cmd.set_result(eval(cmd_args)) #result = eval(cmd_args) result = self.do_eval(cmd_args[0]) cmd.set_as_processed(result) log.info("...Agent level GENERAL cmd has been executed") ''' # If cmd is "do_exit", kill myself (without any question, this is an order soldier !) # This "do_exit" should normally kill any current thread (to be checked...) if cmd.name == "do_exit": log.info("Before exiting, Here are (if exists) the current (still) pending commands (time ordered) :") commands = AgentCmd.get_pending_and_running_commands_for_agent(self.name) AgentCmd.show_commands(commands, True) #if self.TEST_MODE and self.TEST_WITH_FINAL_TEST and self.TEST_COMMANDS_DEST == "myself": self.simulator_test_results() if self.TEST_MODE and self.TEST_WITH_FINAL_TEST: self._TEST_test_results() #self._DO_EXIT=True #exit(0) ''' def __process_agent_specific_cmd(self, cmd:AgentCmd): #self.print(f"Starting execution of an AGENT LEVEL cmd {cmd}...") log.info(f"Starting execution of an AGENT level SPECIFIC cmd...") # Update read time to say that the command has been READ #cmd.set_read_time(False) ##cmd.set_as_running() log.info("(Agent SPECIFIC cmd)") # Execute method self."cmd.name"() # This can raise an exception (caught by this method caller) try: res = self.__exec_cmd_from_its_name(cmd) except (CmdExceptionUnimplemented, CmdExceptionBadArgs, CmdExceptionExecError) as e: ##cmd.set_as_pending() # These exceptions are managed at higher level : raise ''' except AttributeError as e: self.printd(f"EXCEPTION: Agent level specific command '{cmd.name}' unknown (not implemented as a function) :", e) self.printd("Thus => I ignore this command...") cmd.set_result("ERROR: INVALID AGENT LEVEL SPECIFIC COMMAND") cmd.set_as_pending() cmd.set_as_skipped() return ''' #cmd.set_result("Agent level SPECIFIC cmd done") #res = res if res else "Agent SPECIFIC cmd done" cmd.set_as_processed(res) log.info("...Agent SPECIFIC cmd has been executed") ''' def do_log(self): #"" log à 2 endroits ou 1 seul - in file - in db #"" self.printd("Logging data...") ''' def __exec_cmd_from_its_name(self, cmd:AgentCmd)->Any: #print(dir(self)) ''' for method in dir(self): if callable(getattr(self, method)): m = getattr(self, method) print(method, ' => ', signature(m)) ''' #print("***************") #print(self.get_specific_cmds()) #print("***************") methods_list = [method for method in dir(self) if callable(getattr(self, method))] #print(methodsList) func = cmd.name if func not in methods_list: raise CmdExceptionUnimplemented(cmd) ##f = getattr(self, func) ###print(func, ' => ', signature(f)) #for arg in cmd.args: print(arg) #print(cmd.args) #print(*cmd.args) ''' try: if not cmd.args: # equivalent to calling self.func() return getattr(self, func)() else: args = [] # Convert all args to their real type for arg in cmd.args: #print(arg) #try: # Evaluate arg only if it is not a word (letters) if not arg[0].isalpha(): arg = ast.literal_eval(arg) #except ValueError as e: newarg = arg args.append(arg) #print(args) # equivalent to calling self.func(*cmd.args) return getattr(self, func)(*args) ''' args = [] ###print(cmd.args) # Convert all args to their real type for arg in cmd.args: #print(arg) #try: # Evaluate arg only if it is not a word (letters) : # - a word like "toto" is not evaluated => stays as is (=str) # Are evaluated : # - an int or float # - a (tuple) # - a [list] # - ... #print('********', arg, " :") if cmd.name != "do_eval": if not arg[0].isalpha(): arg = ast.literal_eval(arg) #print("evaluated to", type(arg), arg) #except ValueError as e: newarg = arg args.append(arg) cmd.set_as_running() self._sleep_as_soon_as_running() # Command EXECUTION try: # equivalent to calling self.func(*cmd.args) return getattr(self, func)(*args) # Replace low level exception with high level one (CmdExceptionBadArgs) except (TypeError, AttributeError, ValueError, CmdExceptionBadArgs, AssertionError) as e: # set back to PENDING because this command should never has been RUNNING cmd.set_as_pending() raise CmdExceptionBadArgs(cmd) from None # Replace any other exception raised during execution with high level one (CmdExceptionExecError) except Exception as e: #cmd.set_as_exec_error() msg = ': '+str(e) if str(e) else '' raise CmdExceptionExecError(cmd, type(e).__name__ + msg) from None #raise e # "from None" pour ne pas afficher l'exception AttributeError (car interne) #print("I know this specific command but it is not yet implemented : ", func) ##tb = sys.exc_info()[2] ##raise AgentCmdUnimplementedException(cmd).with_traceback(tb) ##raise AgentCmdUnimplementedException(cmd).with_traceback(None) def is_agent_level_cmd(self, cmd:AgentCmd): return self.is_agent_general_cmd(cmd) or self.is_agent_specific_cmd(cmd) def is_agent_general_cmd(self, cmd:AgentCmd): return cmd.is_agent_general_cmd() ''' def _exec_agent_cmd(self, cmd:Command): # AGENT "GENERAL LEVEL" command # => I process it directly without asking my DC # => Simple action, short execution time, so I execute it directly (in current main thread, not in parallel) if cmd.is_agent_level_general_cmd(): self.printd("********** -- AGENT LEVEL GENERAL CMD *********") self._exec_agent_general_cmd(cmd) # AGENT "SPECIFIC LEVEL" command # => I process it directly without asking my DC # => Simple action, short execution time, so I execute it directly (in current main thread, not in parallel) #elif self._is_agent_level_specific_cmd(cmd): else: self.printd("********** -- AGENT LEVEL SPECIFIC CMD *********") self._exec_agent_specific_cmd(cmd) ''' def is_agent_specific_cmd(self, cmd:AgentCmd): #return cmd.name in self.AGENT_SPECIFIC_COMMANDS #return (cmd.name,) in self.AGENT_SPECIFIC_COMMANDS for (cmd_name,_,_) in self._AGENT_SPECIFIC_COMMANDS: if cmd.name == cmd_name : return True ''' def _exec_agent_specific_cmd(self, cmd:Command): # Execute method self."cmd.name"() self.exec_cmd_from_its_name(cmd) ''' def is_in_test_mode(self): return self.TEST_MODE ### # ================================================================================================ # AGENT GENERAL COMMANDS (METHODS) # ================================================================================================ ### def get_all_cmds(self)->str: general_cmds = self.get_general_cmds() specific_cmds = self.get_specific_cmds() all_cmds = general_cmds if specific_cmds: all_cmds += ';' + specific_cmds return all_cmds def get_general_cmds(self)->str: return self.__get_cmds_with_args_from_names_list(AgentCmd._AGENT_GENERAL_COMMANDS) def get_specific_cmds(self)->str: return self.__get_cmds_with_args_from_names_list(self._AGENT_SPECIFIC_COMMANDS) def __get_cmds_with_args_from_names_list(self, names_list:List)->str: ''' Return the list of all specific cmds, with their arguments type (if exists) - Each cmd is separated with a ';' and presented with this format : 'cmd_name(arg1:type,arg2:type,arg3:type...)' - If cmd is (U)nimplemented, it will be returned as : 'cmd_name(U)' - If cmd is (I)nvalid (misnamed), it will be returned as : 'cmd_name(I)' - Ex: 'do_specific10(arg1:int,arg2:int,arg3:float,arg4:str,arg5:typing.Tuple[int, str, int],arg6:typing.List[int]);do_specific30();do_cmd_raising_some_exception();do_cmd_unimplemented(U)', ''' cmds = "" # For each command #for command_tuple in self._AGENT_SPECIFIC_COMMANDS: for command_tuple in names_list: cmd_name = command_tuple[0] if isinstance(command_tuple, tuple) else command_tuple #print(cmd_name) cmds += cmd_name # 1) Error case 1 - Misnamed (Invalid syntax) command (not get, set, do) (implemented or not) if not AgentCmd.is_generic(cmd_name): cmds += '(I);' continue # 2) Error case 2 - Well named command (get, set, do) but not yet implemented try: f = getattr(self, cmd_name) # Exception if exists an unimplemented command except AttributeError: # Tag this command as UNIMPLEMENTED (U) => 'cmd_name(U)' #raise CmdExceptionUnimplemented(cmd_name) from None cmds += '(U);' # next cmd continue # 3) Normal case : well named command and implemented cmds += "(" args = signature(f) #cmds += str(args) #print(cmds) #print(args.parameters) for arg in args.parameters.values(): arg_type = arg.annotation.__name__ if isinstance(arg.annotation,type) else str(arg.annotation) cmds += arg.name+":"+arg_type+"," #print("name, annot:", arg.name, arg.annotation) #print("annot.nam e:", arg.annotation.__name__) #print(type(arg.annotation)) ''' for arg in args.parameters: print(arg) arg = args.parameters[arg] #print(arg) #print("name, annot:", arg.name, arg.annotation) #print("annot.name:", arg.annotation.__name__) #print(type(arg.annotation)) arg_type = arg.annotation.__name__ if isinstance(arg.annotation,type) else str(arg.annotation) cmds += arg.name+":"+arg_type+"," ''' # cmds += str(args) if args.parameters: cmds = cmds[:-1] cmds += ");" #cmds += ";" if cmds: cmds = cmds[0:-1] #print(cmds) return cmds def do_flush_pending_commands(self): AgentCmd.delete_pending_commands_for_agent(self.name) #@deprecated def do_flush_commands(self): self.do_flush_pending_commands() def get_mode(self)->str: return "MODE is " + self.__mode #def set_mode(self, mode:Season)->str: ##def set_mode(self, mode:AgentSurvey.MODE_CHOICES)->str: def set_mode(self, mode:Literal['IDLE','ROUTINE','ATTENTIVE'])->str: assert mode in AgentSurvey.MODE_CHOICES #if not cmd_args: raise ValueError() ##if not cmd_args: raise CmdExceptionBadArgs(cmd) ##mode = cmd_args[0] if mode == "IDLE": self.set_mode_idle() elif mode == "ROUTINE": self.set_mode_routine() elif mode == "ATTENTIVE": self.set_mode_attentive() else: raise CmdExceptionBadArgs(self.__CC) #cmd.set_result("I am now " + state) return "MODE is " + mode def do_eval(self, eval_str:str): try: res = eval(eval_str) except (SyntaxError, NameError, ValueError) as e: raise CmdExceptionBadArgs(self.__CC) from None return res def do_exec_commands(self, what:Literal['stop','resume','noprio']): assert what in ['stop','resume','noprio'] # - Temporaly stop execution of new commands (let them accumulate) if what == "stop": pass # - Resume execution of commands (accumulated since "do_stop_exec_cmd") if what == "resume": pass # NOT PRIO if what == "noprio": pass # Bad arg ##raise CmdExceptionBadArgs(self.CC) #@deprecated def do_abort_current_command(self): self.do_stop_current("cmd") # Stop currently running cmd or routine def do_stop_current(self, what:Literal['cmd','routine','both']): assert what in ['cmd','routine','both'] if what == "cmd": self.__do_stop_current_cmd_if_exists() if what == "routine": self.__do_stop_current_routine_before_if_exists() self.__do_stop_current_routine_after_if_exists() if what == "both": self.__do_stop_current_cmd_if_exists() self.__do_stop_current_routine_before_if_exists() self.__do_stop_current_routine_after_if_exists() # Bad arg ##raise CmdExceptionBadArgs(self.CC) #FIXME: TODO def __do_stop_current_cmd_if_exists(self): if self.__CC_prev and self.__CC_prev.is_running(): log.info("Aborting current running command if exists...") pass #FIXME: TODO def __do_stop_current_routine_before_if_exists(self): if self.ROUTINE_ITER_START_IS_RUNNING: log.info("Aborting current routine BEFORE process if exists...") pass #FIXME: TODO def __do_stop_current_routine_after_if_exists(self): if self.ROUTINE_ITER_END_IS_RUNNING: log.info("Aborting current routine AFTER process if exists...") pass #@deprecated def do_exit(self): self.do_stop("asap"); #@deprecated def do_abort(self): self.do_stop("now"); # Stop agent asap or now def do_stop(self, when:Literal['asap','now','noprio']='asap'): self.__do_stop_or_restart(False, when) def do_restart(self, when:Literal['asap','now','noprio']='asap'): self.__do_stop_or_restart(True, when) def __do_stop_or_restart(self, restart:bool=False, when:str='asap'): assert when in ('asap','now','noprio') ##if when not in ('asap','now','noprio'): raise CmdExceptionBadArgs(self.CC) # NOT PRIO ''' if when == "noprio": pass ''' # log last agent mode & status self.__set_and_log_status(self.AGT_STATUS.RESTARTING if restart else self.AGT_STATUS.EXITING) #self._log_agent_state() # Exit main loop self.DO_MAIN_LOOP = False # PRIO if when in ('asap','noprio'): self.__cleanup_before_exit() #elif when == 'now': else: self.__do_stop_current_cmd_if_exists() self.__do_stop_current_routine_before_if_exists() self.__do_stop_current_routine_after_if_exists() if restart: self.DO_RESTART_LOOP = True # cmd result return ('RESTARTING ' if restart else 'STOPPING ') + when ### # ================================================================================================ # AGENT SPECIFIC COMMANDS (functions) # (here just to serve as examples) # ================================================================================================ ### #def do_specific1(self, arg1:int, arg2:int=2, arg3:int=3) -> int: #def do_specific1(self, arg1:int, arg2:int=2, arg3:float=3.1, arg4:list=[1,2,3]) -> float: def do_specific1(self, arg1:int, arg2:int, arg3:float=3.1, arg4:str='toto', arg5:Tuple[int,str,int]=(1,'toto',3), #arg5:Tuple[int,int,int]=(1,2,3), arg6:List[int]=[] ) -> float: ''' arg1 = int(arg1) arg2 = int(arg2) arg3 = float(arg3) arg5 = ast.literal_eval(arg5) print(arg5[1]) arg6 = ast.literal_eval(arg6) ''' #print(arg4) res = arg1 + arg2 + arg3 + arg5[0] + arg5[2] if arg6: res += arg6[0] return res # Undefined specific cmd #def set_specific2(self, arg1:str, arg2:int): pass def do_specific3(self): pass ### # ================================================================================================ # DEVICE SPECIFIC FUNCTIONS (abstract for Agent, overriden and implemented by AgentDevice) # ================================================================================================ ### # To be overriden by subclass (AgentDevice...) # @abstract def is_device_level_cmd(self, cmd)->bool: return False ''' # to be overriden by subclass (AgentDevice) # @abstract def exec_device_cmd_if_possible(self, cmd:AgentCmd): pass # TO BE OVERRIDEN by subclass (AgentDevice) # @abstract def exec_device_cmd(self, cmd:AgentCmd): #self.exec_cmd_from_its_name(cmd) pass ''' def __cmd_was_sent_before_my_start(self, cmd:AgentCmd)->bool: return cmd.s_deposit_time < self.__UP_SINCE """ ================================================================= TEST DEDICATED FUNCTIONS ================================================================= """ def _set_debug_mode(self, mode:bool): self.DEBUG_MODE=mode def _set_with_simulator(self, mode:bool): self.WITH_SIMULATOR=mode def _set_test_mode(self, mode:bool): self.TEST_MODE = mode if self.TEST_MODE: log.info("in TEST MODE") def __TEST_setup_TEST_MODE(self): if not self.is_in_test_mode(): return log.debug("[IN TEST MODE]") log.info("(TEST mode only) Flush all pending commands to be sure to start in clean state") AgentCmd.delete_pending_commands_for_agent(self.name) self.set_delay(2) ''' else: log.debug("[IN NORMAL MODE]") self.TEST_MAX_DURATION_SEC=None ''' def __TEST_get_next_command_to_send(self)->AgentCmd: #cmd_full_name, validity, res_expected, after_status = next(self.TEST_COMMANDS, (None,None,None,None)) DO_IT = False while not DO_IT: DO_IT, cmd_full_name, validity, expected_res, expected_status = next(self.TEST_COMMANDS, (False,None,None,None,None)) ##print(DO_IT, cmd_full_name) if cmd_full_name is None: return None #print(expected_final_status) #print(cmd_full_name, res_expected) #return cmd_name #if cmd_full_name is None: return None # Remove excessive spaces => already done in AgentCmd.create() cmd_full_name = re.sub(r"\s+", " ", cmd_full_name).strip() if ' ' not in cmd_full_name: raise Exception('Command is malformed:', cmd_full_name) agent_recipient, cmd_name_and_args = cmd_full_name.split(' ', 1) if agent_recipient == 'self': agent_recipient = self.name cmd_name, cmd_args = cmd_name_and_args, None if ' ' in cmd_name_and_args: cmd_name,cmd_args = cmd_name_and_args.split(' ', 1) cmd = self.create_cmd_for(agent_recipient, cmd_name, cmd_args, validity) cmd.expected_res = expected_res cmd.expected_status = expected_status # If no cmd created (because of error, bad AgentDevice name), call again this method for next cmd #if cmd is None: return self._TEST_get_next_command_to_send() return cmd """ def simulator_send_next_command(self): #self._current_test_cmd = "set_state:idle" if self._current_test_cmd=="set_state:active" else "set_state:active" #if self._nb_test_cmds == 4: self._current_test_cmd = "do_exit" cmd_name = next(self.TEST_COMMANDS, None) #self.printd("next cmd is ", cmd_name) if cmd_name is None: return #Command.objects.create(sender=self.name, recipient=self.name, name=cmd_name) recipient_agent = self.name if self.TEST_COMMANDS_DEST=="myself" else self.TEST_COMMANDS_DEST Command.objects.create(sender=self.name, recipient=recipient_agent, name=cmd_name) #time.sleep(1) #self._TEST_current_cmd_idx += 1 #self._nb_test_cmds += 1 """ # Only if in TEST mode def __TEST_send_next_test_cmd(self): """ TEST mode ONLY Send next command from scenario defined in TEST_COMMANDS_LIST (even if current command running) """ if not self.is_in_test_mode(): return #self._cmdts.refresh_from_db() log.info("(TEST mode) Trying to send a new command if possible...") cmdts = self.__TEST_get_next_command_to_send() # No more command to send (from simulator) => return and EXIT if cmdts is None : if not self.get_received_pending_commands().exists(): self.DO_MAIN_LOOP = False return log.info("TTT") log.info(f"TTT NEW COMMAND TO SEND is: " + str(cmdts)) if hasattr(cmdts,"expected_res") : log.info(f"TTT (with expected final status & result : " + "'" + str(cmdts.expected_status) + "' & '" + str(cmdts.expected_res) + "')") log.info("TTT") # SEND cmdts.send() log.info(f"TTT NEW COMMAND SENT") # A VIRER def __TEST_send_next_test_cmd_ORIG(self): """ TEST MODE ONLY Send next command from scenario defined in TEST_COMMANDS_LIST (only if previous command finished) """ #if not self.is_in_test_mode(): return log.info("(TEST mode) Trying to send a new command if possible...") # There is a current command being processed # => check if next command is "do_abort" # => if so, instantly send a "do_abort" to abort previous command ##if self._cmdts is not None: if self._cmdts is not None: log.info(f"Waiting for end execution of cmd '{self._cmdts.name}' (sent to {self._cmdts.recipient}) ...") # Update cmdts fields from DB self._cmdts.refresh_from_db() # Current cmd is pending or running if self._cmdts.is_pending() or self._cmdts.is_running(): pass ''' FIXME: a reactiver ? en mieux (traitement d'une priority cmd) if self._next_cmdts is None: # If next command is "do_abort" then abort becomes the new current command (to be sent) self._next_cmdts = self.__TEST_get_next_command_to_send() if self._next_cmdts and self._next_cmdts.name == "do_abort": # Wait a little to give a chance to agentB to start execution of current command, # so that we can abort it then (otherwise it won't be aborted!!) #time.sleep(4) self.waitfor(4) self._cmdts = self._next_cmdts self._next_cmdts = None log.info("TTT") #log.info(f"T** SEND ", self._cmdts) log.info(f"TTT" + str(self._cmdts)) log.info("TTT") self._cmdts.send() ''' # Current cmd is finished else: # Execution was not completed #if self._cmdts.is_expired() or self._cmdts.is_skipped() or self._cmdts.is_killed(): ##if self._cmdts.is_skipped() or self._cmdts.is_killed(): if self._cmdts.is_finished_with_error(): log.info("Command was not completed") # 2 possible scenarios: # - (1) Send the SAME command again ''' self.printd("Command was not completed, so I send it again") # The command was not completed, so, make a copy of it and send it again # For this, it is enough to set primary key to None, # then the send() command below will save a NEW command #self._cmdts = copy.copy(self._cmdts) self._cmdts.id = None SEND_A_NEW_COMMAND = True ''' # - (2) Send next command #self._cmdts = None # Execution was completeted OK => get result elif self._cmdts.is_executed(): log.info(f"Cmd executed. Result is '{self._cmdts.get_result()}'") #cmdts_is_processed = True ''' Optimisation possible pour gagner une iteration: self._cmdts = self.simulator_get_next_command_to_send() # No more command to send (from simulator) => return if self._cmdts is None: return SEND_A_NEW_COMMAND = True ''' # Set cmdts to None so that a new command will be sent at next iteration self._cmdts = None # No currently running command => get a new command and SEND it if self._cmdts is None: if self._next_cmdts is not None: self._cmdts = self._next_cmdts self._next_cmdts = None else: self._cmdts = self.__TEST_get_next_command_to_send() # No more command to send (from simulator) => return and EXIT if self.cmdts is None: self.DO_MAIN_LOOP = False return # Send cmd (= set as pending and save) log.info("TTT") #self.printd(f"*** SEND ", self._cmdts) log.info(f"TTT NEW COMMAND TO SEND is: " + str(self._cmdts)) if hasattr(self._cmdts,"expected_res") : log.info(f"TTT (with expected final status & result : " + "'" + str(self._cmdts.expected_status) + "' & '" + str(self._cmdts.expected_res) + "')") log.info("TTT") #self._cmdts.set_as_pending() # SEND self._cmdts.send() log.info(f"TTT NEW COMMAND SENT") #cmdts_is_processed = False #cmdts_res = None def _TEST_get_sent_test_command_num(self, cmd_searched_num:int)->Tuple: ''' Return (really sent) test command which has number num ''' #print("num", cmd_num) i=1 cmd_num=0 while cmd_num <= cmd_searched_num: # Format : (DO_IT, "self cmd_name cmd_args", timeout, "expected_result", expected_status), cmd = self._TEST_COMMANDS_LIST[i-1] #print(cmd) DO_IT, cmd_name, _, _, _ = cmd if DO_IT: cmd_num+=1 if cmd_num == cmd_searched_num: cmd_name = re.sub(r"\s+", " ", cmd_name).strip().split(' ')[1] ##print(cmd_name, self.CC.name) assert cmd_name == self.CC.name break i+=1 return cmd #def __TEST_check_current_cmd_res_and_status(self, cmd:AgentCmd): def __TEST_check_current_cmd_res_and_status(self)->None: cmd = self.CC #if not self.is_in_test_mode(): return #if not (self.CC and self.CC.is_finished()): return # Only check TEST commands (not checking commands sent by others, like via the web interface...) #print("*** TEST ***", self.name) if cmd.sender != self.name: return log.debug("TTT CHECK TTT") # number of the current test command (finished) self.__test_cmd_received_num += 1 ##print(self.__test_cmd_received_num, cmd.name) #FIXME: do special check in this case if cmd.is_agent_general_priority_cmd(): return #print("test cmd num", self.__test_cmd_received_num, "for current cmd", cmd) # Format : (DO_IT, "self cmd_name cmd_args", timeout, "expected_result", expected_status), _, cmd_name, _, test_cmd_sent_expected_res, test_cmd_sent_expected_status = self._TEST_get_sent_test_command_num(self.__test_cmd_received_num) ##print(self.__test_cmd_received_num, cmd.name, cmd_name) #if hasattr(self._cmdts,'expected_res'): ##if cmd.is_executed() and self._cmdts.expected_res: #if cmd.is_finished() and self._cmdts.expected_res is not None: if test_cmd_sent_expected_res is not None: actual=str(cmd.result) expected=test_cmd_sent_expected_res log.debug(actual) log.debug(expected) #print(actual, ' <= vs =>', expected) assert actual==expected, f"Cmd result (='{actual}') is not as expected (='{expected}')" #if hasattr(self._cmdts,'expected_status'): if test_cmd_sent_expected_status is not None: #assert(cmd.state == self._cmdts.expected_status) actual=cmd.state expected=test_cmd_sent_expected_status #print(actual, ' <= vs =>', expected) assert actual==expected, f"Cmd status (='{actual}') is not as expected (='{expected}')" # Only if in TEST mode def __TEST_test_results(self): if not (self.TEST_MODE and self.TEST_WITH_FINAL_TEST): return if self._TEST_COMMANDS_LIST == []: return nb_commands_to_send = len(self._TEST_COMMANDS_LIST) nb_commands_sent, commands = self.__TEST_test_results_start() #nb_commands_to_send = len(self.TEST_COMMANDS_LIST) # General (default) test #self.printd(commands[0].name, "compared to", self.TEST_COMMANDS_LIST[0].split()[1]) assert commands[0].name == self._TEST_COMMANDS_LIST[0].split()[1] last_cmd = commands[-1] assert last_cmd.name == self._TEST_COMMANDS_LIST[-1].split()[1] assert last_cmd.name == "do_exit" assert last_cmd.is_executed() ##assert last_cmd.get_result() == "SHOULD BE DONE NOW" nb_asserted = 0 nb_agent_general = 0 nb_unknown = 0 nb_unimplemented = 0 for cmd in commands: ##assert cmd.is_executed() or cmd.is_killed() or cmd.is_skipped() assert cmd.is_finished() nb_asserted += 1 if cmd.is_agent_general_cmd(): nb_agent_general += 1 if cmd.name == "do_unknown": assert cmd.is_skipped() #assert "UnimplementedGenericCmdException" in cmd.get_result() assert "INVALID COMMAND" in cmd.get_result() nb_unknown += 1 #if cmd.name in ["do_unimplemented", "do_unknown"]: if cmd.name == "do_unimplemented": assert cmd.is_skipped() assert "UnimplementedGenericCmdException" in cmd.get_result() nb_unimplemented += 1 assert nb_asserted == nb_commands_sent log.info(f"{nb_commands_to_send} cmds I had to send <==> {nb_asserted} cmds executed (or killed), {nb_commands_to_send-nb_commands_sent} cmd ignored") log.info("Among executed commands:") log.info(f"- {nb_agent_general} AGENT general command(s)") log.info(f"- {nb_unimplemented} unimplemented command(s) => UnimplementedGenericCmdException raised then command was skipped") log.info(f"- {nb_unknown} unknown command(s) => skipped") # Can be overriden by subclass (AgentDevice) self._TEST_test_results_other(commands) ''' (EP) moved to AgentDevice # Now test that any "AD get_xx" following a "AD set_xx value" command has result = value for i,cmd_set in enumerate(commands): if cmd_set.name.startswith('set_'): commands_after = commands[i+1:] for cmd_get in commands_after: if cmd_get.name.startswith('get_') and cmd_get.name[4:]==cmd_set.name[4:] and cmd_get.device_type==cmd_set.device_type: log.info("cmd_get.result == cmd_set.args ?" + str(cmd_get.result) + ' ' + str(cmd_set.args)) assert cmd_get.get_result() == ','.join(cmd_set.args), "A get_xx command did not gave the expected result as set by a previous set_xx command" break ''' # Specific (detailed) test (to be overriden by subclass) nb_asserted2 = self._TEST_test_results_main(commands) self.__TEST_test_results_end(nb_asserted) def __TEST_prepare(self): if not self.is_in_test_mode(): return log.debug("\n!!! In TEST mode !!! => preparing to run a scenario of test commands") log.debug("- Current test commands list scenario is:\n" + str(self._TEST_COMMANDS_LIST)) if not self.WITH_SIMULATOR: log.debug("\n!!! In TEST but no SIMULATOR mode (using REAL device) !!! => removing dangerous commands for real devices... :") TEST_COMMANDS_LIST_copy = self._TEST_COMMANDS_LIST.copy() for cmd in TEST_COMMANDS_LIST_copy: cmd_key = cmd.split()[1] if ("set_" in cmd_key) or ("do_start" in cmd_key) or cmd_key in ["do_init", "do_goto", "do_open", "do_close"]: self._TEST_COMMANDS_LIST.remove(cmd) log.debug("- NEW test commands list scenario is:\n" + self._TEST_COMMANDS_LIST, '\n') # Can be overriden by subclass (AgentDevice) def _TEST_test_results_other(self, commands): pass def __TEST_test_results_start(self): print() log.info("--- Testing if the commands I SENT had the awaited result") log.info("Here are the last commands I sent:") #commands = list(Command.get_last_N_commands_for_agent(self.name, 16)) #commands = Command.get_last_N_commands_sent_to_agent(self.name, 16) nb_commands = len(self._TEST_COMMANDS_LIST) if "ad_unknown get_dec" in self._TEST_COMMANDS_LIST: nb_commands -= 1 commands = AgentCmd.get_last_N_commands_sent_by_agent(self.name, nb_commands) AgentCmd.show_commands(commands) return nb_commands, commands """ OLD SCENARIO nb_asserted = 0 for cmd in commands: if cmd.name == "specific0": assert cmd.is_skipped() nb_asserted+=1 if cmd.name == "specific1": assert cmd.result == "in step #5/5" assert cmd.is_executed() nb_asserted+=1 if cmd.name in ("specific2","specific3"): assert cmd.is_killed() nb_asserted+=1 if cmd.name in ("specific4", "specific5", "specific6", "specific7", "specific8"): assert cmd.is_pending() nb_asserted+=1 # 2 cmds abort if cmd.name in ("do_abort"): assert cmd.is_executed() nb_asserted+=1 if cmd.name in ("do_exit"): assert cmd.is_executed() nb_asserted+=1 assert nb_asserted == 12 self.printd("--- Finished testing => result is ok") """ # To be overriden by subclass def _TEST_test_results_main(self, commands): return 0 ''' nb_asserted = 0 self.printd("from simulator_test_results_main", commands) for cmd in commands: assert cmd.is_executed() nb_asserted+=1 return nb_asserted ''' def __TEST_test_results_end(self, nb_asserted): #nb_commands_to_send = len(self.TEST_COMMANDS_LIST) #self.printd(nb_asserted, "vs", nb_commands_to_send) #assert nb_asserted == nb_commands_to_send #self.printd(f"************** Finished testing => result is ok ({nb_asserted} assertions) **************") printFullTerm(Colors.GREEN, f"TTTTTTTTTTTT Finished testing => result is ok ({nb_asserted} assertions) TTTTTTTTTTTT") """ ================================================================= MAIN ================================================================= """ import argparse def __extract_parameters(): """ Usage: Agent.py [-t] [configfile] """ # arg 1 : -t # arg 2 : configfile DEBUG_MODE = False TEST_MODE = False WITH_SIM = False VERBOSE_MODE = False configfile = None log.debug("args:" + str(sys.argv)) for arg in sys.argv[1:] : if arg == "-t": TEST_MODE = True elif arg == "-s": WITH_SIM = True elif arg == "-d": DEBUG_MODE = True elif arg == "-v": VERBOSE_MODE = True #else: configfile = arg ''' if len(sys.argv) > 1: if sys.argv[1] == "-t": TEST_MODE = True if len(sys.argv) == 3: configfile = sys.argv[2] else: configfile = sys.argv[1] ''' #return DEBUG_MODE, WITH_SIM, TEST_MODE, VERBOSE_MODE, configfile return DEBUG_MODE, TEST_MODE, VERBOSE_MODE #def build_agent(Agent_type:Agent, name="GenericAgent", RUN_IN_THREAD=True): #def build_agent(Agent_type:Agent, RUN_IN_THREAD=True): def build_agent(Agent_type:Agent,param_constr=None) -> Agent : #DEBUG_MODE, WITH_SIM, TEST_MODE, VERBOSE_MODE, configfile = extract_parameters() DEBUG_MODE, TEST_MODE, VERBOSE_MODE = __extract_parameters() log.debug("debug mode is" + os.getenv("PYROS_DEBUG")) #print(logger) #agent = Agent("GenericAgent", configfile, RUN_IN_THREAD=True) #agent = Agent_type(configfile, RUN_IN_THREAD, DEBUG_MODE=DEBUG_MODE) #agent = Agent_type(RUN_IN_THREAD) if param_constr: if "AgentSST" in str(Agent_type): agent = Agent_type(agent=param_constr.get("agent"),simulated_computer=param_constr.get("computer")) else: agent = Agent_type() if agent.name == "AgentSST": # args are passed two times: first in AgentSST script and then to build_agent function, we need to setup again argparse in order to argparse to allow arguments parser = argparse.ArgumentParser(description='Start a agentSST.') parser.add_argument("--computer",dest="computer",help='Launch agent with simulated computer hostname',action="store") parser.add_argument("--agent",dest="agent",help='Launch an specific agent ',action="store") parser.add_argument("-t", action="store_true" ) # parser.add_argument("-d", action="store_true" ) # parser.add_argument("-s", action="store_true" ) # parser.add_argument("-v", action="store_true" ) # args = parser.parse_args() # if args.computer: # agent.set_computer(args.computer) # AgentSP isn't in a config, so to avoid that WITH_SIM returns an error it's a special case if agent.name == "AgentSP": agent._set_with_simulator(False) agent._set_test_mode(TEST_MODE) return agent #agent = Agent_type(name, configfile, RUN_IN_THREAD) # Get the information of the agent name (name of class) within obsconfig and get the "is_real" attribute if agent.name in agent.get_config().get_agents(agent.unit).keys(): if agent.get_config().get_agent_information(agent.unit,agent.name).get("is_real"): WITH_SIM = not agent.get_config().get_agent_information(agent.unit,agent.name)["is_real"] else: WITH_SIM = True else: WITH_SIM = True agent._set_with_simulator(WITH_SIM) agent._set_test_mode(TEST_MODE) #print(logger) #logg.info("agent built, return it") #agent._set_debug_mode(DEBUG_MODE) return agent if __name__ == "__main__": ''' # with thread RUN_IN_THREAD=True # with process #RUN_IN_THREAD=False ''' #agent = build_agent(Agent, RUN_IN_THREAD=RUN_IN_THREAD) agent = build_agent(Agent) agent.show_config() #agent = build_agent(Agent, name="GenericAgent", RUN_IN_THREAD=RUN_IN_THREAD) ''' TEST_MODE, WITH_SIM, configfile = extract_parameters() agent = Agent("GenericAgent", configfile, RUN_IN_THREAD=True) #agent.setSimulatorMode(TEST_MODE) agent.setTestMode(TEST_MODE) agent.setWithSimulator(WITH_SIM) self.printd(agent) ''' agent.run()