#!/usr/bin/env python3 #DEBUG=True #DEBUG=False """ ================================================================= Agent class ================================================================= This class is the base class for all AgentXXX classes (AgentSST, AgentScheduler, AgentImagesProcessor...) It runs an infinite loop (called main_loop) doing at each iteration some routine tasks (process_routine before and after) and executing commands sent by other agents. To start it up, from the project root : - In normal mode : - ./PYROS start -fg agent -o tnc (add -d after the PYROS command for debug mode) - In test mode (the agent will execute a scenario as a suite of commands (in TEST_COMMANDS_LIST) to send to himself (or other agents) and execute them on reception at each iteration : - ./PYROS -t start -fg agent -o tnc """ ### # ================================================================= # SETUP FOR DJANGO # # (see https://docs.djangoproject.com/en/dev/topics/settings) # (see also https://docs.djangoproject.com/en/dev/ref/settings) # ================================================================= ### # For cmd parsing from array import array from typing import List, Tuple, Union, Any import ast from inspect import signature import os from pathlib import Path import sys import logging import typing from django.conf import settings as djangosettings # Conseil sur le net: #https://stackoverflow.com/questions/16853649/how-to-execute-a-python-script-from-the-django-shell #"" #import sys, os #sys.path.append('/path/to/your/django/app') #os.environ['DJANGO_SETTINGS_MODULE'] = 'settings' #from django.conf import settings #"" # To avoid a "ModuleNotFoundError: No module named 'dashboard'"... (not even 1 app found) : ##sys.path.insert(0, os.path.abspath("..")) ##sys.path.insert(0, os.path.abspath("src")) ##sys.path.insert(0, "../src") ##sys.path.insert(0, "src") # To avoid a "ModuleNotFoundError: No module named 'dashboard'" ## sys.path.append("..") py_pwd = os.path.normpath(os.getcwd() + "/..") if (py_pwd not in os.sys.path): (os.sys.path).append(py_pwd) # To avoid a "ModuleNotFoundError: No module named 'src'" ## sys.path.append("../../../..") py_pwd = os.path.normpath(os.getcwd() + "/../../../..") if (py_pwd not in os.sys.path): (os.sys.path).append(py_pwd) ##sys.path.append("src") from src.pyros_logger import log, handler_filebyagent #from src.pyros_logger import logger as logg, handler_filebyagent, ##from src import pyros_logger ''' def printd(*args, **kwargs): if os.environ.get('PYROS_DEBUG', '0')=='1': print(*args, **kwargs) ''' #printd("Starting with this sys.path", sys.path) log.debug("Starting with this sys.path" + str(sys.path)) # DJANGO setup # self.printd("file is", __file__) # mypath = os.getcwd() # Go into src/ ##os.chdir("..") ##os.chdir("src") #printd("Current directory : " + str(os.getcwd())) log.debug("Current directory : " + str(os.getcwd())) #os.environ.setdefault("DJANGO_SETTINGS_MODULE", "src.core.pyros_django.pyros.settings") os.environ.setdefault("DJANGO_SETTINGS_MODULE", "src.core.pyros_django.pyros.settings") # os.environ['SECRET_KEY'] = 'abc' # os.environ['ENVIRONMENT'] = 'production' import django django.setup() #printd("DB2 used is:", djangosettings.DATABASES["default"]["NAME"]) log.debug("DB2 used is:" + djangosettings.DATABASES["default"]["NAME"]) ### # ================================================================= # IMPORT PYTHON PACKAGES #================================================================= ### # --- GENERAL PURPOSE IMPORT --- #from __future__ import absolute_import ##import utils.Logger as L import platform import random import threading #import multiprocessing import time ''' from threading import Thread import socket ''' #import ctypes #import copy # --- DJANGO IMPORT --- from django.db import transaction from django import db # from django.core.exceptions import ObjectDoesNotExist # from django.db.models import Q #from django.shortcuts import get_object_or_404 #from django.conf import settings as djangosettings # --- SPECIFIC IMPORT --- # Many ways to import configuration settings: ##import config import config.old_config as config_old #from config import PYROS_ENV, ROOT_DIR, DOC_DIR #from config import * from common.models import AgentSurvey, AgentCmd, AgentLogs #from config.configpyros import ConfigPyros from config.old_config.configpyros import ConfigPyros as ConfigPyrosOld from src.core.pyros_django.obsconfig.obsconfig_class import OBSConfig #from dashboard.views import get_sunelev #from devices.TelescopeRemoteControlDefault import TelescopeRemoteControlDefault #from utils.JDManipulator import * ##from agent.logpyros import LogPyros ##from src.logpyros import LogPyros ''' from device_controller.abstract_component.device_controller import ( DCCNotFoundException, UnknownGenericCmdException, UnimplementedGenericCmdException, UnknownNativeCmdException ) ''' ### # ================================================================= # GENERAL MODULE CONSTANT & FUNCTIONS DEFINITIONS # ================================================================= ### #DEBUG_FILE = False ##log = L.setupLogger("AgentLogger", "Agent") IS_WINDOWS = platform.system() == "Windows" class Colors: HEADER = "\033[95m" BLUE = "\033[94m" GREEN = "\033[92m" WARNING = "\033[93m" FAIL = "\033[91m" ENDC = "\033[0m" BOLD = "\033[1m" UNDERLINE = "\033[4m" def printColor(color: Colors, message, file=sys.stdout, eol=os.linesep, forced=False): #system = platform.system() """ if (self.disp == False and forced == False): return 0 """ #if system == "Windows": if IS_WINDOWS: print(message, file=file, end=eol) else: print(color + message + Colors.ENDC, file=file, end=eol) return 0 def printFullTerm(color: Colors, string: str): #system = platform.system() columns = 100 row = 1000 disp = True value = int(columns / 2 - len(string) / 2) printColor(color, "-" * value, eol="") printColor(color, string, eol="") value += len(string) printColor(color, "-" * (columns - value)) return 0 """ ================================================================= class StoppableThread ================================================================= """ ''' class StoppableThreadEvenWhenSleeping(threading.Thread): # Thread class with a stop() method. The thread itself has to check # regularly for the stopped() condition. # It stops even if sleeping # See https://python.developpez.com/faq/?page=Thread#ThreadKill # See also https://www.oreilly.com/library/view/python-cookbook/0596001673/ch06s03.html def __init__(self, *args, **kwargs): #super(StoppableThreadSimple, self).__init__(*args, **kwargs) super().__init__(*args, **kwargs) self._stop_event = threading.Event() #def stop(self): def terminate(self): self._stop_event.set() def stopped(self): return self._stop_event.is_set() def wait(self, nbsec:float=2.0): self._stop_event.wait(nbsec) ''' ### # ================================================================= # EXCEPTIONS classes # ================================================================= ### class CmdException(Exception): ''' Base class for all Agent command exceptions ''' # pass def __init__( self, cmd: Union[AgentCmd,str] ): self.cmd = cmd self.cmd_name = cmd if isinstance(cmd,str) else cmd.name ''' def __str__(self): return f"The Agent command '{self.cmd.name}' is unknown to the agent" #return f"({type(self).__name__}): Device Generic command has no implementation in the controller" ''' # --- CMD PENDING (non running) EXCEPTIONS --- class CmdUnknownException(CmdException): ''' Raised when a PENDING (non running) Agent (specific) cmd is NOT known by the agent ''' def __str__(self): return f"The Agent command '{self.cmd_name}' is unknown to the agent" #return f"({type(self).__name__}): Device Generic command has no implementation in the controller" class CmdUnimplementedException(CmdException): ''' Raised when a PENDING (non running) Agent Specific cmd is known by the agent but not implemented ''' def __str__(self): return f"The Agent command '{self.cmd_name}' is known by the agent but not implemented" #return f"({type(self).__name__}): Device Generic command has no implementation in the controller" class CmdBadArgsException(CmdException): ''' Raised when a PENDING (non running) Agent cmd has bad, missing, or too many argument(s) ''' def __str__(self): return f"The Agent command '{self.cmd_name}' has bad, missing, or too many argument(s)" #return f"({type(self).__name__}): Device Generic command has no implementation in the controller" # --- CMD RUNNING EXCEPTIONS --- class CmdExecErrorException(CmdException): ''' Raised when a RUNNING Agent cmd has had a running error ''' def __str__(self): return f"The running Agent command '{self.cmd_name}' has had an error (during execution)" class CmdExecTimeoutException(CmdException): ''' Raised when a RUNNING Agent cmd is timeout ''' def __str__(self): return f"The running Agent command '{self.cmd_name}' is timeout" class CmdExecKilledException(CmdException): ''' Raised when a RUNNING Agent cmd has been aborted (by another agent) ''' def __str__(self): return f"The running Agent command '{self.cmd_name}' has been killed (by another agent)" ### # ================================================================= # class Agent # ================================================================= ### class Agent: """ See Agent_activity_diag.pu for PlantUML activity diagram Behavior of an agent: - If idle : - still does routine_process() and general_process() - does not do specific_process() - Once a command has been sent to another agent : - It waits (non blocking) for the end of execution of the command and get its result - If command is timed out or has been skipped or killed, then it is NOT re-executed at next iteration (except if needed explicitely) """ # --- # --- CLASS (STATIC) attributes (CONSTANTS) # --- If agent is instance of Agent: # --- - CLASS attributes are accessible via agent.__class__.__dict__ # --- - INSTANCE attributes are accessible via agent.__dict__ # --- # Default modes DEBUG_MODE:bool = False #TEST_MODE = False # By default, a command is valid during 5s (and then perempted) DEFAULT_CMD_VALIDITY_DURATION:int = 5 # Wait a fixed number of seconds before each loop ? #WITH_RANDOM_WAIT = False # 1 sec by default _DELAY_NB_SEC:int = 1 # - YES if TEST mode (in init()) # Default LOG level is INFO #PYROS_DEFAULT_GLOBAL_LOG_LEVEL = LogPyros.LOG_LEVEL_INFO # INFO ''' This Agent Specific commands list, that he is able to execute To be overriden by subclasses (here are just a few examples of commands) Format : ("cmd", args, timeout) with : - cmd (str) : the command name - timeout (int) : the command timeout (in sec) - exec_mode (int) : 0 (sequential), 1 (thread), or 2 (process) ''' AGENT_SPECIFIC_COMMANDS: List[ Tuple[str, int, int] ] = [ # Format : (“cmd_name”, timeout, exec_mode) ("do_specific1", 10, 0), #("set_specific2", 5, 0), ("do_specific3", 3, 0), ] # # --- FOR TEST ONLY --- # # By default, NOT in test mode #TEST_MODE = False # Maximum duration of this agent (only for SIMULATION mode) # If set to None, it will never exit except if asked (or CTRL-C) # If set to 20, it will exit after 20s TEST_MAX_DURATION_SEC = None #TEST_MAX_DURATION_SEC = 30 # Run this agent in simulator mode #TEST_MODE = True WITH_SIMULATOR:bool = False # Run the assertion tests at the end TEST_WITH_FINAL_TEST:bool = False CMD_STATUS = AgentCmd.CMD_STATUS_CODES AGT_STATUS = AgentSurvey.STATUS_CHOICES # (EP 2022/07) Default SCENARIO to be executed (only in TEST mode), and then STOP # # It is a list of commands to be sent by this agent to other agents ("self" means himself) # # Format : List of tuples (command, validity, expected_res, expected_status), with : # # - command : the format is "recipient cmd args", with : # - recipient : name of the Agent that the command is to be sent to (use "self" to mean himself) # - cmd : the command name # - args : (optional) the list of command arguments, separated by blanks : arg1 arg2 arg3 ... # # - validity : the command is valid for this duration, afterwards you can forget it # # - expected_res : the expected result # # - expected_status : the status of the command expected after execution (expired, killed, skipped, executed...) # # Ex : # - "AgentScheduler set_state ATTENTIVE" => means to send the command "set_state ATTENTIVE" to the agent AgentScheduler # - "AgentX set_state ATTENTIVE" => means to send the command "set_state ATTENTIVE" to the agent AgentX # - "self set_state ATTENTIVE" => means to send the command "set_state ATTENTIVE" to MYSELF # - "self do_restart_loop" => means to send the command "do_restart_loop" to MYSELF (no args) # TEST_COMMANDS_LIST: List[ Tuple[ str, int, str, Union[int,None] ] ] = [ # Format : ("self cmd_name cmd_args", timeout, "expected_result", expected_status), #("self do_stop now", 200, '15.5', None), ("self do_specific1 1 2 3.5 titi (3,'titi',5) [1,3,5,7,9]", 200, '15.5', None), # 1) First, 3 EXCEPTION CASES (uncomment to activate exception) # Each of these lines will stop execution with an exception # ------------------------------------------------------------ # - Agent command, unknown => ko, UnknownCmdException #("self do_unknown", 10, None,None), # - Agent general command malformed (missing arg) => ko, AgentCmdBadArgsException #("Agent set_mode", 10, None,None), # - Agent specific command, known but not implemented => ko, AgentCmdUnimplementedException #("self set_specific2", 10, None,None), # - Agent specific command, implemented but missing args => ko, AgentCmdBadArgsException #(" self do_specific1 1 ", 10, None,None), # 2) NORMAL CASES (test scenario) # All these commands should be executed without error, from the 1st to the last one # ------------------------------- # This command has a validity of 0s and thus should be tagged as "expired" ("self set_mode ATTENTIVE", 0, "MODE is ATTENTIVE", CMD_STATUS.CMD_EXPIRED), # Agent general command ("self set_mode ATTENTIVE", 200, "MODE is ATTENTIVE", CMD_STATUS.CMD_EXECUTED), # => should get "ATTENTIVE" ("self get_mode", 100, "MODE is ATTENTIVE", None), # => should get "7" ("self do_eval 3+5-1", 200, '7', None), # END, will not go further #("self do_exit", 2, "STOPPING"), # Agent specific commands => should be executed ("self do_specific3", 200, '', None), ("self do_exit", 500, "STOPPING", None), ("self do_specific1 1 2 3.5 titi (3,'titi',5) [1,3,5,7,9]", 200, '15.5', None), ("self set_mode ROUTINE", 200, "MODE is ROUTINE", None), # => should get "ROUTINE" ("self get_mode", 200, "MODE is ROUTINE", None), # Agent specific command => should be skipped (because not ATTENTIVE) ("self do_specific1 1 2 3.5 titi (3,'titi',5) [1,3,5,7,9]", 200, "SKIPPED", None), # From now on, should not run anymore process_before/after # => and should skip next specific commands ("self set_mode IDLE", 200, "MODE is IDLE", None), # => should get "IDLE" ("self get_mode", 200, "MODE is IDLE", None), # Agent specific command => should be skipped (because not ATTENTIVE) ("self do_specific1 1 2 3.5 titi (3,'titi',5) [1,3,5,7,9]", 200, 'SKIPPED', None), # TODO: test priority commands : do_abort, do_flush_cmds, ... # - Stop executing new commands (just let them accumulate) ##("self do_stop_exec",), ##("self set_mode ATTENTIVE",), ##("self get_mode",), ##("self do_specific1 1 2 3.5 titi (3,'titi',5) [1,3,5,7,9]",), ##("self set_mode ROUTINE",), ##("self get_mode",), ##("self do_abort",), # - Now, resume execution of commands : # we should have the previous list pending, # but as "do_abort" is priority, it should be executed first even if last ! ##("self do_resume_exec",), # Restart the restart loop (from init()) ("self do_restart", 200, "RESTARTING", None), # Now stop ("self do_exit", 200, "STOPPING", None), ''' # specific0 not_executed_because_idle "specific0", "set_state:active", # specific1 executed_because_not_idle, should complete ok "specific1", # specific2 will be executed only when specific1 is finished, # and should be aborted before end of execution, # because of the 1st coming "do_abort" command below "specific2", # specific3 should be executed only when specific2 is finished (in fact, aborted), # and should be aborted before end of execution, # because of the 2nd coming "do_abort" command below "specific3", # These commands should not have the time to be processed # because the "do_exit" command below should be executed before "specific4", "specific5", "specific6", "specific7", "specific8", # Should abort the current running command (which should normally be specific2) # even if commands above (specific3, ..., specific8) are already pending "do_abort", # These commands (except abort) won't be executed # because too many commands are already pending (above) "specific9", "do_abort", "set_state:active", "set_state:idle", # Should stop the agent even before the previous pending commands are executed "do_exit", # Because of the previous "do_exit" command, # these following commands should not be executed, # and not even be added to the database command table "set_state:active", "specific10" ''' ] #TEST_COMMANDS = iter(TEST_COMMANDS_LIST) ''' # with thread RUN_IN_THREAD = True # with process #RUN_IN_THREAD = False _thread_total_steps_number = 1 ''' #COMMANDS_PEREMPTION_HOURS = 48 #COMMANDS_PEREMPTION_HOURS = 60/60 ##name = "Generic Agent" ##status = None ##mode = None config = None ''' Moved to more central file : config.config_base PYROS_DJANGO_BASE_DIR = Path("src/core/pyros_django") # pathlib DEFAULT_CONFIG_FILE_NAME = "config_unit_simulunit1.xml" CONFIG_DIR_NAME = "config" ''' # My own ConfigPyros instance (Observatory Configuration) _oc = None # Parameters from config file # for /src/ #_path_data = '../../config' # for /src/core/pyros_django/ #_path_data = '../../../../config' # Path to config _path_data = '' _computer_alias = '' _computer_description = '' # Current and next command to send _cmdts: AgentCmd = None _next_cmdts = None _agent_survey = None _pending_commands = [] ''' _current_device_cmd = None _current_device_cmd_thread = None ''' # List of agents I will send commands to _my_client_agents_aliases = [] _my_client_agents = {} _iter_num = None # Log object _log = None # new obsconfig init for agent: ##def __init__(self, RUN_IN_THREAD=True): def __init__(self): # Declaration of Instance attributes, default values self.name = "Generic Agent" self.status = None self.mode = None self.unit = None self.TEST_COMMANDS = None #print(AgentSurvey.MODE_CHOICES.IDLE) #sys.exit() # Agent is by default in mode ATTENTIVE (most active mode) #self.mode = self.MODE_ATTENTIVE self._set_mode_attentive() #self._set_mode(MODES.) log.addHandler(handler_filebyagent(logging.INFO, self.__class__.__name__)) log.debug("start Agent init") obs_config_file_path = os.environ["PATH_TO_OBSCONF_FILE"] path_to_obs_config_folder = os.environ["PATH_TO_OBSCONF_FOLDER"] unit = os.environ["unit_name"] oc = OBSConfig(obs_config_file_path,unit) self.set_config(oc, obs_config_file_path, path_to_obs_config_folder, unit) self.name = self.__class__.__name__ # set real unit name (the current unit used) if unit == "": unit = oc.unit_name agent_name_from_config = self.get_config().get_agent_name_from_config(self.__class__.__name__) if agent_name_from_config: self.name = agent_name_from_config self.unit = unit print(f"Agent name : {self.name}") print(f"Unit name : {self.unit}") # (EP) moved to AgentDevice ###self._set_agent_device_aliases_from_config(self.name) self._set_mode_from_config(self.name) log.debug("Step __init__") self.TEST_COMMANDS = iter(self.TEST_COMMANDS_LIST) ##self.RUN_IN_THREAD = RUN_IN_THREAD self._set_status(self.AGT_STATUS.LAUNCHED) ####self._set_idle() # Create 1st survey if none #tmp = AgentSurvey.objects.filter(name=self.name) #if len(tmp) == 0: #nb_agents = AgentSurvey.objects.filter(name=self.name).count() #if nb_agents == 0: if AgentSurvey.objects.filter(name=self.name).exists(): self._agent_survey = AgentSurvey.objects.get(name=self.name) else: self._agent_survey = AgentSurvey.objects.create( name=self.name, validity_duration=60, #mode=self.mode, mode = self.get_mode(), #status=self.status, status=self.get_status(), iteration=-1 ) log.debug("Agent survey is" + str(self._agent_survey)) #self.printd("Agent survey is", self._agent_survey) ("end Agent __init__") #def __init__(self, name:str="Agent", config_filename:str=None, RUN_IN_THREAD=True): #def __init__(self, config_filename:str=None, RUN_IN_THREAD=True, DEBUG_MODE=False): # def __init__(self, config_filename:str=None, RUN_IN_THREAD=True): # ''' # print('PYROS_ENV', PYROS_ENV) # print('ROOT_DIR', ROOT_DIR) # print('DOC_DIR', DOC_DIR) # ''' # # Set logger # ##pyros_logger.set_logger_config() # ##logg = logging.getLogger('pyroslogger') # log.addHandler(handler_filebyagent(logging.INFO, self.__class__.__name__)) # log.debug("start Agent init") # # SET CONFIG INSTANCE # # - OLD CONFIG # log.debug(f'config file is {config_filename}') # ##log.info('PYROS_ENV', config.PYROS_ENV) # log.debug('PYROS_ENV' + config_old.PYROS_ENV) # log.debug('ROOT_DIR' + config_old.ROOT_DIR) # log.debug('DOC_DIR' + config_old.DOC_DIR) # ##if config.is_dev_env(): print("DEV ENV") # if config_old.is_dev_env(): log.debug("DEV ENV") # if config_old.is_prod_env(): log.debug("PROD ENV") # if config_old.is_debug(): log.debug("IN DEBUG MODE") # # - NEW CONFIG # obs_config_file_path = os.environ["PATH_TO_OBSCONF_FILE"] # path_to_obs_config_folder = os.environ["PATH_TO_OBSCONF_FOLDER"] # unit = os.environ["unit_name"] # oc = OBSConfig(obs_config_file_path) # self.set_config(oc, obs_config_file_path, path_to_obs_config_folder, unit) # log.debug("Step __init__") # self.TEST_COMMANDS = iter(self.TEST_COMMANDS_LIST) # self.RUN_IN_THREAD = RUN_IN_THREAD # self._set_status(self.STATUS_LAUNCH) # self._set_idle() # self._set_agent_device_aliases_from_config(self.name) # self._set_mode_from_config(self.name) # #self.name = name # self.name = self.__class__.__name__ # ''' # printd("*** ENVIRONMENT VARIABLE PYROS_DEBUG is:", os.environ.get('PYROS_DEBUG'), '***') # ##self.DEBUG_MODE = DEBUG_MODE # self.DEBUG_MODE = os.environ.get('PYROS_DEBUG', '0')=='1' # ''' # #self.DEBUG_MODE = config.PYROS_ENV # ##self.log = LogPyros(self.name, AgentLogs) # ##self.DEBUG_MODE = config.is_debug() # self.DEBUG_MODE = config_old.is_debug() # ##self.log.debug_level = DEBUG_MODE # ''' # # Default LOG level is INFO # log_level = LogPyros.LOG_LEVEL_INFO # INFO # self.log.set_global_log_level(LogPyros.LOG_LEVEL_DEBUG) if self.DEBUG_MODE else self.log.set_global_log_level(log_level) # ''' # #global_log_level = LogPyros.LOG_LEVEL_DEBUG if self.DEBUG_MODE else self.PYROS_DEFAULT_GLOBAL_LOG_LEVEL # ##global_log_level = LogPyros.LOG_LEVEL_DEBUG if self.DEBUG_MODE else config.PYROS_DEFAULT_GLOBAL_LOG_LEVEL # ##global_log_level = LogPyros.LOG_LEVEL_DEBUG if self.DEBUG_MODE else config_old.PYROS_DEFAULT_GLOBAL_LOG_LEVEL # ##self.log.set_global_log_level(global_log_level) # ##self.printd("LOG LEVEL IS:", self.log.debug_level) # ##self.print("LOG LEVEL IS:", self.log.get_global_log_level()) # # ------------- OLD CONFIG ------------------- # # Est-ce bien utile ??? # # New way with PathLib # # my_parent_abs_dir = Path(__file__).resolve().parent # #TODO: on doit pouvoir faire mieux avec pathlib (sans utiliser str()) # ##self._path_data = str( Path( str(my_parent_abs_dir).split(str(self.PYROS_DJANGO_BASE_DIR))[0] ) / self.CONFIG_DIR_NAME ) # ##self._path_data = config.CONFIG_DIR # # self._path_data = config_old.CONFIG_DIR # #self._set_mode(self.MODE_IDLE) # # config_filename = self.get_config_filename(config_filename) # #self.printd(f"*** Config file used is={config_filename}") # # log.debug(f"*** Config file used is={config_filename}") # # self.config = ConfigPyrosOld(config_filename) # # if self.config.get_last_errno() != self.config.NO_ERROR: # # raise Exception(f"Bad config file name '{config_filename}', error {str(self.config.get_last_errno())}: {str(self.config.get_last_errmsg())}") # #TODO: : à mettre dans la config # ''' # 'AgentDeviceTelescope1': 'AgentDeviceTelescopeGemini', # 'AgentDeviceFilterSelector1': 'AgentDeviceSBIG', # 'AgentDeviceShutter1': 'AgentDeviceSBIG', # 'AgentDeviceSensor1': 'AgentDeviceSBIG', # ''' # #self._my_client_agents = {} # ### self._agent_logs = AgentLogs.objects.create(name=self.name, message="Step __init__") # #self.printdb("Step __init__") # log.debug("Step __init__") # self.TEST_COMMANDS = iter(self.TEST_COMMANDS_LIST) # self.RUN_IN_THREAD = RUN_IN_THREAD # self._set_status(self.STATUS_LAUNCH) # self._set_idle() # self._set_agent_device_aliases_from_config(self.name) # self._set_mode_from_config(self.name) # # TODO: remove # #self._set_idle() # self._set_active() # # Create 1st survey if none # #tmp = AgentSurvey.objects.filter(name=self.name) # #if len(tmp) == 0: # #nb_agents = AgentSurvey.objects.filter(name=self.name).count() # #if nb_agents == 0: # if AgentSurvey.objects.filter(name=self.name).exists(): # self._agent_survey = AgentSurvey.objects.get(name=self.name) # else: # self._agent_survey = AgentSurvey.objects.create(name=self.name, validity_duration=60, mode=self.mode, status=self.status, iteration=-1) # log.debug("Agent survey is" + str(self._agent_survey)) # #self.printd("Agent survey is", self._agent_survey) # ("end Agent init") def set_config(self, oc: OBSConfig, obs_config_file_path: str, path_to_obs_config_folder: str, unit: str): self._oc = { 'config' : oc, 'env' : [ obs_config_file_path, path_to_obs_config_folder, unit ] } ''' config_env = { obs_config_file_path: obs_config_file_path, path_to_obs_config_folder: path_to_obs_config_folder, unit: unit } ''' def get_config(self): return self._oc['config'] def get_config_env(self): return self._oc['env'] def show_config(self): VERBOSE = False oc = self.get_config() obs_config_file_path, path_to_obs_config_folder, unit = self.get_config_env() #self.printd(obs_config_file_path) log.debug(obs_config_file_path) log.debug(path_to_obs_config_folder) log.debug(unit) #log.warning("petit warning bidon") print("\n") print("- Observatory:" + oc.get_obs_name()) my_unit_name = oc.get_units_name()[0] my_unit = (oc.get_units()[my_unit_name]) #print("- Unit description:", my_unit) if VERBOSE: print("\n") print("- Computers:", oc.get_computers()) print("\n") print("- Active Computers:", oc.get_active_computers()) print("\n") print("- Active Devices:", oc.get_active_devices()) print("\n") print("- Unit:", my_unit_name) VERBOSE and print(oc.get_unit_by_name(my_unit_name)) if VERBOSE: print("\n") print("- Unit topology:", oc.get_topology(my_unit_name)) print("\n") print("- Unit active Agents:", oc.get_active_agents(my_unit_name)) VERBOSE and print(oc.get_agents(my_unit_name)) print("\n") print("- Unit Agents per computer:", oc.get_agents_per_computer(my_unit_name)) print("\n") print("- Unit Agents per device:", oc.get_agents_per_device(my_unit_name)) if VERBOSE: print("\n") print("- Unit Channel groups:", oc.get_channel_groups(my_unit_name)) print("\n") print("- Unit Channels:", oc.get_channels(my_unit_name)) #print("\n") #print("- Unit/Channel info:", oc.get_channel_information(my_unit_name, 'OpticalChannel_up')) if VERBOSE: print("\n") print("- Unit Components agents:", oc.get_components_agents(my_unit_name)) if VERBOSE: print("\n") print("- Unit database:", oc.get_database_for_unit(my_unit_name)) print("\n") print("- Devices names:", oc.get_devices_names()) if VERBOSE: print("\n") print("- Devices names & files:", oc.get_devices_names_and_file()) print("\n") print("- Devices:", oc.get_devices()) print("\n") def get_config_filename(self, config_filename: str): if not config_filename: #config_filename = self.DEFAULT_CONFIG_FILE_NAME ##config_filename = config.DEFAULT_CONFIG_FILE_NAME config_filename = config_old.DEFAULT_CONFIG_FILE_NAME # If config file name is RELATIVE (i.e. without path, just the file name) # => give it an absolute path (and remove "src/agent/" from it) if config_filename == os.path.basename(config_filename): ##config_filename = os.path.join(config.CONFIG_DIR, config_filename) config_filename = os.path.join(config_old.CONFIG_DIR, config_filename) ''' # Build abs path including current agent dir config_filename = os.path.abspath(self.CONFIG_DIR_NAME + os.sep + config_filename) # Remove "src/agent_name/" from abs dir : # (1) Remove "src/core/pyros_django/" config_filename = config_filename.replace(str(self.PYROS_DJANGO_BASE_DIR)+os.sep, os.sep) # (2) Remove "agent_name/" #TODO: bidouille, faire plus propre config_filename = config_filename.replace(os.sep+"agent"+os.sep, os.sep) config_filename = config_filename.replace(os.sep+"monitoring"+os.sep, os.sep) ''' return os.path.normpath(config_filename) def __repr__(self): return "I am agent " + self.name def __str__(self): return self.__repr__() #return "I am agent " + self.name # Normal print ##def print(self, *args, **kwargs): self.log.print(*args, **kwargs) """ if args: self.printd(f"({self.name}): ", *args, **kwargs) else: self.printd() """ """ # DEBUG print shortcut ##def printd(self, *args, **kwargs): self.log.printd(*args, **kwargs) #if DEBUG: self.printd(d(*args, **kwargs) def log_d(self, *args, **kwargs): self.log.log_d(*args, **kwargs) def log_i(self, *args, **kwargs): self.log.log_i(*args, **kwargs) def log_w(self, *args, **kwargs): self.log.log_w(*args, **kwargs) def log_e(self, *args, **kwargs): self.log.log_e(*args, **kwargs) def log_c(self, *args, **kwargs): self.log.log_c(*args, **kwargs) def printdb(self, *args, **kwargs): self.log.db( *args, **kwargs) """ def _get_real_agent_name(self, agent_alias_name:str)->str: #self.printd("key is", agent_alias_name) ''' if not self._my_client_agents: return agent_alias_name return self._my_client_agents[agent_alias_name] ''' return self._my_client_agents.get(agent_alias_name) def run(self, nb_iter:int=None, FOR_REAL:bool=True): """ FOR_REAL: set to False if you don't want Agent to send commands to devices but just print messages without really doing anything """ ("in run()") self._set_and_log_status(self.AGT_STATUS.LAUNCHED) #return # TEST MODE ONLY # IF in test mode but with REAL devices (no SIMULATOR), delete all dangerous commands from the test commands list scenario: self._TEST_prepare() #self._DO_EXIT = False # No agent specific cmd currently running (thread) self.AGENT_SPECIFIC_CMD_RUNNING = False # No Routine process (before or after) currently running (thread) self.ROUTINE_PROCESS_RUNNING = False ################ # RESTART loop # ###############@ # REPEAT UNTIL not DO_RESTART_LOOP self.DO_RESTART_LOOP = True while self.DO_RESTART_LOOP: log.info("*"*10+ " STARTING RESTART LOOP "+ "*"*10+ '\n') # By default, no restart after exit from main loop self.DO_RESTART_LOOP = False self.start_time = time.time() #log.debug("on est ici: " + os.getcwd()) self._load_config() self.print_TEST_MODE() self.init() ''' testing log: self.log_e("ERROR") self.log_c("FATAL critical ERROR") ''' #self.log_w("WARNING", "watch your step !") #log.warning("WARNING"+ "watch your step !") # Avoid blocking on false "running" commands # (old commands that stayed with "running" status when agent was killed) AgentCmd.delete_commands_with_running_status_for_agent(self.name) self._iter_num = 1 ############# # MAIN loop # ############@ self.DO_MAIN_LOOP = True while self.DO_MAIN_LOOP: try: self._main_loop(nb_iter,FOR_REAL) #if not self.DO_MAIN_LOOP: break except KeyboardInterrupt: # CTRL-C # In case of CTRL-C, kill the current thread (process) before dying (in error) #log.info("CTRL-C Interrupted, I kill the current thread (process) before exiting (if exists)") log.info("CTRL-C Interrupted, trying to stop cleanly") #self._kill_running_device_cmd_if_exists("USER_CTRLC") #self.do_things_before_exit("USER_CTRLC") ##self._cleanup_before_exit("USER_CTRLC") self.do_stop("asap") exit(1) # TEST mode only self._TEST_test_results() #if self._DO_EXIT: exit(0) #def _kill_running_device_cmd_if_exists(self, abort_cmd_sender): # to be overriden by subclass def do_things_before_exit(self, stopper_agent_name:str=None): pass def _main_loop(self, nb_iter:int=None, FOR_REAL:bool=True): self._set_and_log_status(self.AGT_STATUS.IN_MAIN_LOOP) self._main_loop_start(nb_iter) #if not self.DO_MAIN_LOOP: return self._reload_config_if_changed() # only if changed # Log this agent status (update my current mode and status in DB) ##self._log_agent_state() # better to do this in a subclass #self.show_config() #self.printd("====== START COMMMANDS PROCESSING ======") # SIMU #self.send_cmd_to("AgentScheduler", "do_replan") # ROUTINE process BEFORE # To be specified by subclass if not self.IS_IDLE(): self._routine_process_before() #self.printd("I am IDLE, so I bypass the routine_process (do not send any new command)") # Processing the next pending command if exists # If Agent general level command like (DO_RESTART, DO_EXIT, DO_ABORT) # => will set DO_MAIN_LOOP=False and/or DO_RESTART_LOOP=True print() print() log.info("*"*10 + " NEXT COMMAND PROCESSING (START) " + "*"*10 + '\n') try : cmd = self._process_next_command_if_exists() #print(cmd) except ( # PENDING (non running) cmd exceptions CmdUnimplementedException, CmdBadArgsException, CmdUnknownException, # RUNNING cmd exceptions CmdExecErrorException, CmdExecTimeoutException, CmdExecKilledException ) as e : #print(e) log.error(f"EXCEPTION on Agent command '{e.cmd_name}'") if isinstance(e.cmd, AgentCmd) : cmd = e.cmd if type(e) is CmdUnimplementedException: if cmd.name != "get_specific_cmds": cmd.set_as_unimplemented("EXCEPTION: command known but unimplemented") elif type(e) in (CmdBadArgsException, CmdUnknownException): cmd.set_as_invalid("EXCEPTION: command unknown or bad args") # Execution Exception elif type(e) != CmdExecErrorException: cmd.set_result("EXCEPTION: Problem during execution") # isinstance(e.cmd, str) else: raise Exception("Abnormal error case...") # Ne pas stopper !! ##self._cleanup_before_exit() ##raise log.info("*"*10 + " NEXT COMMAND PROCESSING (END) " + "*"*10 + "\n") # only if in TEST mode self._TEST_check_cmd_res_and_status(cmd) if self.DO_MAIN_LOOP: if not self.IS_IDLE(): self._routine_process_after() # TEST MODE only : execute test routine_process always (even if IDLE) in order to (always) send next command from test scenario self._TEST_test_routine_process() #self.printd("====== END COMMMANDS PROCESSING ======") #self.waitfor(self.mainloop_waittime) self._main_loop_end() self._iter_num += 1 def _main_loop_start(self, nb_iter:int=None): for i in range(3): print() #self.printd("-"*80) log.info("*"*90) log.info("*"*20 + f" {self.name} : MAIN LOOP ITERATION {self._iter_num} (START) " + "*"*20) log.info("*"*90 + '\n') #self.print(f"Iteration {self._iter_num}") # EXIT because of nb of iterations ? if nb_iter is not None: # Bad number of iterations or nb iterations reached => exit if nb_iter <= 0 or nb_iter < self._iter_num: log.info(f"Exit because number of iterations asked ({nb_iter}) has been reached") self.DO_MAIN_LOOP = False return # Temporizing (delay) : Wait a random number of sec before starting iteration # (to avoid to busy to much the processor) # (also to let another agent having the chance to send a me command) if self._DELAY_NB_SEC : #random_waiting_sec = random.randint(0,5) log.info(f"Waiting {self._DELAY_NB_SEC} sec (random) before starting new iteration...") #time.sleep(random_waiting_sec) self.waitfor(self._DELAY_NB_SEC) # (Test only) # EXIT because max duration reached ? if self.TEST_MODE and self.TEST_MAX_DURATION_SEC and (time.time()-self.start_time > self.TEST_MAX_DURATION_SEC): log.info("Exit because of max duration set to ", self.TEST_MAX_DURATION_SEC, "s") #self._kill_running_device_cmd_if_exists(self.name) self._cleanup_before_exit(self.name) #if self.TEST_MODE and self.TEST_WITH_FINAL_TEST: self._TEST_test_results() self.DO_MAIN_LOOP = False return ##self._set_status(self.AGT_STATUS.IN_MAIN_LOOP) self.show_mode_and_status() self.main_loop_start() # To be overriden by subclass def main_loop_start(self): pass def _main_loop_end(self): self.main_loop_end() log.info("*"*20 + " MAIN LOOP ITERATION (END) " + "*"*20 + '\n') #self.do_log(LOG_DEBUG, "Ending main loop iteration") # To be overriden by subclass def main_loop_end(self): pass # To be overriden by subclass (AgentDevice) def process_device_level_cmd(self): pass ''' log.info("(DEVICE LEVEL CMD)") try: self.exec_device_cmd_if_possible(cmd) except (UnimplementedGenericCmdException) as e: #except (UnknownGenericCmdException, UnimplementedGenericCmdException, UnknownNativeCmdException) as e: log.e(f"EXCEPTION caught by {type(self).__name__} (from Agent mainloop) for command '{cmd.name}'", e) log.e("Thus ==> ignore this command") cmd.set_result(e) #cmd.set_as_killed_by(type(self).__name__) cmd.set_as_skipped() #raise ''' def _process_next_command_if_exists(self)->Union[AgentCmd,None]: ''' Processing the next pending command if exists ''' self._set_and_log_status(self.AGT_STATUS.IN_GET_NEXT_CMD) #print() #print() #log.info("*"*10 + " NEXT COMMAND PROCESSING (START) " + "*"*10 + '\n') # Purge commands (every N iterations, starting from 1st, delete old commands) N=5 if ((self._iter_num-1) % N) == 0: log.info("Purging expired commands if exists") #AgentCmd.purge_old_commands_for_agent(self.name) self._purge_old_commands_sent_to_me() # Get next command and process it (if exists) cmd = self._get_next_valid_and_not_running_command() self.CURRENT_CMD = cmd #self._set_status(self.STATUS_GENERAL_PROCESS) #if cmd: self.command_process(cmd) # SIMU #cmd = "do_replan" #self.send_cmd_to("AgentScheduler", "do_replan") # No new command => nothing to do if not cmd: return cmd log.info('-'*6) log.info('-'*6 + " RECEIVED NEW COMMAND TO PROCESS: ") log.info('-'*6 + str(cmd)) if self.is_in_test_mode() and hasattr(self._cmdts,"expected_res"): log.info(f"*** (with expected result : " + str(self._cmdts.expected_res) + ')') log.info('-'*6) cmd.set_read_time() # CASE 1 - AGENT GENERAL command # (DO_RESTART, DO_EXIST, DO_ABORT, DO_ABORT_COMMAND, DO_FLUSH_COMMANDS, ...) # This processing can set DO_MAIN_LOOP and DO_RESTART_LOOP to False if self._is_agent_general_cmd(cmd): try: self._process_agent_general_cmd(cmd) except CmdUnimplementedException as e: #print(e) #cmd.set_as_skipped("ERROR: Unimplemented Agent General command") #self._cleanup_before_exist() # This exception is managed at higher level : raise #except ValueError as e: except CmdBadArgsException as e: #print(e) #cmd.set_as_skipped("ERROR: Bad Argument(s)") #self._cleanup_before_exit() # This exception is managed at higher level : raise #raise AgentCmdBadArgsException(cmd.name) # Must I stop or restart ? # cmd.args => ['arg1','arg2',...] #print("nom", cmd.name) ''' if cmd.name=="do_stop": # by default = "asap" if cmd.args is None: cmd.full_name = "do_exit" else: arg = cmd.args[0] if arg =='asap': cmd.full_name = "do_exit" elif arg =='now': cmd.full_name = "do_abort" else: raise CmdBadArgsException(cmd) ''' #print("nom", cmd.name) #print(self.get_specific_cmds()) #exit(0) #if cmd.name in ('do_restart','do_exit','do_abort') : ''' if cmd.name in ('do_restart','do_stop'): #print(cmd) self.DO_MAIN_LOOP = False if cmd.name == 'do_abort': self._abort_current_running_cmd_if_exists() self._cleanup_before_exit() if cmd.name != 'do_restart': self.DO_RESTART_LOOP = False self._set_status(self.AGT_STATUS.EXITING) # log last agent mode & status self._log_agent_state() #cmd.set_result('EXITING') ''' return cmd # CASE 2 - AGENT SPECIFIC command # => Simple action, short execution time, so I execute it directly (in current main thread, not in parallel) if self._is_agent_specific_cmd(cmd): #log.info("(AGENT LEVEL CMD)") if not self.IS_ATTENTIVE(): cmd.set_as_skipped("Skipped because I am not ATTENTIVE") return cmd try: self._process_agent_specific_cmd(cmd) #self._exec_agent_cmd(cmd) #except AttributeError as e: except (CmdUnimplementedException, CmdBadArgsException) as e: ##print(e) #self.log_e(f"EXCEPTION: Agent level specific command '{cmd.name}' unknown (not implemented as a function) :", e) #self.log_e("Thus => I ignore this command...") ##log.error(f"EXCEPTION: Agent specific command '{cmd.name}' unknown (not implemented as a function) :", e) #log.e("Thus => I ignore this command...") #cmd.set_result("ERROR: UNIMPLEMENTED AGENT SPECIFIC COMMAND", False) #cmd.set_as_pending() ##cmd.set_as_skipped("ERROR: UNIMPLEMENTED AGENT SPECIFIC COMMAND") ##self._cleanup_before_exit() ##raise AgentCmdUnimplementedException(cmd.name) # These exceptions are managed at higher level : raise return cmd # CASE 3 - OTHER level command (DEVsICE level, only for AgentDevice) if self.is_device_level_cmd(cmd): self.process_device_level_cmd(cmd) return cmd # CASE 4 - INVALID COMMAND #raise Exception("INVALID COMMAND: " + cmd.name) log.warning("******************************************************") log.warning("*************** ERROR: INVALID COMMAND (UNKNOWN) ***************") log.warning("******************************************************") #log.warning("Thus => I ignore this command...") #cmd.set_result("ERROR: INVALID COMMAND") ##cmd.set_as_skipped("ERROR: INVALID AGENT COMMAND") ##self._cleanup_before_exit() ##raise UnknownCmdException(cmd.name) raise CmdUnknownException(cmd) #print() #log.info("*"*10 + " NEXT COMMAND PROCESSING (END) " + "*"*10 + "\n") def _abort_current_running_cmd_if_exists(self): log.info("Aborting current running command if exists...") pass def _abort_current_routine_process_if_exists(self): log.info("Aborting current routine process if exists...") pass def _cleanup_before_exit(self, stopper_agent_name:str=None): if not stopper_agent_name: stopper_agent_name = self.name #self._set_status(self.STATUS_EXIT) ##self._log_agent_state() log.info("Trying to stop cleanly") log.info("Before exiting, Here are (if exists) the current (still) pending commands (time ordered) :") #commands = AgentCmd.get_commands_sent_to_agent(self.name) commands = AgentCmd.get_pending_and_running_commands_for_agent(self.name) AgentCmd.show_commands(commands, True) self.do_flush_commands() #if self.TEST_MODE and self.TEST_WITH_FINAL_TEST and self.TEST_COMMANDS_DEST == "myself": self.simulator_test_results() if self.TEST_MODE and self.TEST_WITH_FINAL_TEST: self._TEST_test_results() #self._DO_EXIT=True #exit(0) self.do_things_before_exit(stopper_agent_name) ##self._set_and_log_status(self.AGT_STATUS.EXITING) def print_TEST_MODE(self): if self.TEST_MODE: log.debug("[IN TEST MODE]") log.info("Flush previous commands to be sure to start in clean state") AgentCmd.delete_pending_commands_for_agent(self.name) else: log.debug("[IN NORMAL MODE]") self.TEST_MAX_DURATION_SEC=None def _purge_old_commands_sent_to_me(self): AgentCmd.purge_old_commands_sent_to_agent(self.name) def _routine_process_before(self): """ Routine processing BEFORE processing received commands. IMPORTANT : this processing must be as SHORT as possible, so that the agent can quickly read its received commands and process them This is a command or set of processings that this agent does or commands that it sends regularly, at each iteration """ print() print() log.info("*"*10+ " ROUTINE PROCESSING BEFORE (START) "+ "*"*10+ '\n') self._set_and_log_status(self.AGT_STATUS.IN_ROUTINE_BEF) self.routine_process_before_body() print() log.info("*"*10 + " ROUTINE PROCESSING BEFORE (END) "+ "*"*10) def _routine_process_after(self): """ Routine processing AFTER processing received commands. This processing can be longer than the "before" one above, as the agent has already processed its received commands, but not too long anyway, otherwise it will take too long before the next iteration starts... This is a command or set of processings that this agent does or commands that it sends regularly, at each iteration """ print() print() log.info("*"*10+ " ROUTINE PROCESSING AFTER (START) "+ "*"*10+ '\n') self._set_and_log_status(self.AGT_STATUS.IN_ROUTINE_AFT) self.routine_process_after_body() print() log.info("*"*10 + " ROUTINE PROCESSING AFTER (END) "+ "*"*10 + '\n') # To be overridden by subclasses def routine_process_before_body(self): pass # To be overridden by subclasses def routine_process_after_body(self): #if self.TEST_MODE: self._TEST_test_routine_process() pass """ def purge_commands(self): ### Delete commands (which I am recipient of) older than COMMANDS_PEREMPTION_HOURS (like 48h) ATTENTION !!! EXCEPT the RUNNING command !!! NB: datetime.utcnow() is equivalent to datetime.now(timezone.utc) ### self.printd("Looking for old commands to purge...") ### COMMAND_PEREMPTION_DATE_FROM_NOW = datetime.utcnow() - timedelta(hours = self.COMMANDS_PEREMPTION_HOURS) #self.printd("peremption date", COMMAND_PEREMPTION_DATE_FROM_NOW) old_commands = AgentCmd.objects.filter( # only commands for me recipient = self.name, # only pending commands sender_deposit_time__lt = COMMAND_PEREMPTION_DATE_FROM_NOW, ) ### old_commands = AgentCmd.get_old_commands_for_agent(self.name) if old_commands.exists(): self.printd("Found old commands to delete:") for cmd in old_commands: self.printd(cmd) old_commands.delete() """ def waitfor(self, nbsec:float=2.0): ''' # thread if self._current_device_cmd_thread and self.RUN_IN_THREAD: self._current_device_cmd_thread.wait(nbsec) # process (or main thread) else: time.sleep(nbsec) ''' log.info(f"Now, waiting for {nbsec} second(s)...") time.sleep(nbsec) def sleep(self, nbsec): self.waitfor(nbsec) def get_specific_cmds(self)->str: specific_commands = "" for command_tuple in self.AGENT_SPECIFIC_COMMANDS: cmd_name = command_tuple[0] #print(cmd_name) specific_commands += cmd_name # Exception if exists an unimplemented command try: f = getattr(self, cmd_name) except AttributeError: raise CmdUnimplementedException(cmd_name) from None args = signature(f) #specific_commands += str(args) specific_commands += "(" #print(specific_commands) #print(args.parameters) for arg in args.parameters: arg = args.parameters[arg] #print(arg) #print("name, annot:", arg.name, arg.annotation) #print("annot.name:", arg.annotation.__name__) #print(type(arg.annotation)) arg_type = arg.annotation.__name__ if isinstance(arg.annotation,type) else str(arg.annotation) specific_commands += arg.name+":"+arg_type+"," # specific_commands += str(args) if args.parameters: specific_commands = specific_commands[:-1] specific_commands += ")" specific_commands += ";" if specific_commands: specific_commands = specific_commands[0:-1] return specific_commands # # MODE & STATUS # def show_current_mode_and_status(self): #log.info(f"CURRENT MODE is {self.mode} (status is {self.status})") log.info(f"CURRENT MODE is {self.get_mode()} (status is {self.get_status()})") # @deprecated def show_mode_and_status(self): self.show_current_mode_and_status() # # - STATUS MANAGEMENT # def get_status(self): return self.status # @deprecated def get_state(self): return self.get_status() def _set_status(self, status:str): #self.printd(f"[{status}] (switching from status {self.status})") log.debug(f"[{status}]") self.status = status return False def _set_and_log_status(self, status:str): self._set_status(status) self._log_agent_state() # # - MODE MANAGEMENT # def get_mode(self): return self.mode # Test mode def IS_MODE_IDLE(self): return self.get_mode() == AgentSurvey.MODE_CHOICES.IDLE def IS_MODE_ROUTINE(self): return self.get_mode() == AgentSurvey.MODE_CHOICES.ROUTINE def IS_MODE_ATTENTIVE(self): return self.get_mode() == AgentSurvey.MODE_CHOICES.ATTENTIVE # @deprecated def IS_IDLE(self): return self.IS_MODE_IDLE() # @deprecated def IS_ROUTINE(self): return self.IS_MODE_ROUTINE() # @deprecated def IS_ATTENTIVE(self): return self.IS_MODE_ATTENTIVE() def _set_mode(self, mode:str): #self.printd(f"Switching from mode {self.mode} to mode {mode}") log.info(f"[NEW MODE {mode}]") self.mode = mode def _set_mode_idle(self): self._set_mode(AgentSurvey.MODE_CHOICES.IDLE) # @deprecated def set_idle(self): self._set_mode_idle() def _set_mode_routine(self): self._set_mode(AgentSurvey.MODE_CHOICES.ROUTINE) # @deprecated def set_routine(self): self._set_mode_routine() def _set_mode_attentive(self): #self._set_mode(AgentSurvey.MODE_ATTENTIVE) self._set_mode(AgentSurvey.MODE_CHOICES.ATTENTIVE) # @deprecated def set_attentive(self): self._set_mode_attentive() ''' def die(self): self._set_status(AgentSurvey.STATUS_EXIT) ''' """ suspend/resume """ def suspend(self): """ TODO: Mode IDLE (doit rester à l'écoute d'un resume, et doit continuer à alimenter les tables pour informer de son état via tables agents_logs, et lire table agents_command pour reprendre via resume, et update la table agents_survey pour donner son status "idle" """ self._set_idle() return True def resume(self): """ Quit suspend() mode """ self._set_active() return True ''' (EP) moved to AgentDevice def _set_agent_device_aliases_from_config(self, agent_alias): for a in self._my_client_agents_aliases: # TODO: activer ##self._my_client_agents[a] = self.config.get_paramvalue(a,'general','real_agent_device_name') pass ''' # new config (obsconfig) def _set_mode_from_config(self, agent_name): # all agent are active ? #mode = self.MODE_ACTIVE #mode = self.MODE_ATTENTIVE self._set_mode_attentive() return True # old config # def _set_mode_from_config(self, agent_name): # # --- Get the startmode of the AgentX # modestr = self.config.get_paramvalue(agent_name,'general','startmode') # if self.config.get_last_errno() != self.config.NO_ERROR: # raise Exception(f"error {str(self.config.get_last_errno())}: {str(self.config.get_last_errmsg())}") # if (modestr == None): # return True # # --- Set the mode according the startmode value # mode = self.MODE_IDLE # if modestr.upper() == 'RUN': # mode = self.MODE_ACTIVE # self._set_mode(mode) # return True def set_delay(self, delay_nb_sec:int): self._DELAY_NB_SEC = delay_nb_sec """ ======================================================================================= Generic methods that may be specialized (overriden) by subclasses (except if private) ======================================================================================= """ # To be overridden by subclasses def init(self): #log.debug("*** Initializing... ***") log.info("*"*10+ " INITIALIZING... "+ "*"*10+ '\n') self._set_and_log_status(self.AGT_STATUS.INITIALIZING) if self.TEST_MODE: self.set_delay(2) def _reload_config_if_changed(self): self._load_config() def _load_config(self): """ TODO: only si date fichier xml changée => en RAM, un objet Config avec méthodes d'accès, appelle le parser de AK (classe Config.py indépendante) """ ''' # SETUP try: self.config = get_object_or_404(Config, id=1) # By default, set mode to SCHEDULER (False = REMOTE, which should never be the default) self.config.global_mode = True self.config.save() # self.config = Config.objects.get(pk=1) # self.config = Config.objects.get()[0] except Exception as e: # except Config.ObjectDoesNotExist: self.printd("Config read (or write) exception", str(e)) return -1 ''' log.debug("Loading the config file...") # - NEW CONFIG self.get_config().load(self.get_config_env()[0]) # - OLD CONFIG # self.config.load() # if self.config.get_last_errno() != self.config.NO_ERROR: # raise Exception(f"error {str(self.config.get_last_errno())}: {str(self.config.get_last_errmsg())}") # if not self.config.is_config_contents_changed(): # return # === display informations # --- Get the assembly aliases of the unit # assembled_mount_aliases = [] # assembled_channel_aliases = [] # assembled_aliases = [] # unit_alias = self.config.get_aliases('unit')[0] # params = self.config.get_params(unit_alias) # for param in params: # if param['section']=="assembly" and param['key']=="alias": # assembled_aliases.append(param['value']) #self.printd(f"Unit {unit_alias} is the assembly of {assembled_aliases}") # log.debug("--------- Components of the unit -----------") # log.debug("Configuration file is {}".format(self.config.get_configfile())) # alias = self.config.get_aliases('unit')[0] # namevalue = self.config.get_paramvalue(alias,'unit','description') # log.debug(f"Unit alias is {alias}. Description is {namevalue}:") # unit_subtags = self.config.get_unit_subtags() # for unit_subtag in unit_subtags: # aliases = self.config.get_aliases(unit_subtag) # for alias in aliases: # namevalue = self.config.get_paramvalue(alias,unit_subtag,'description') # log.debug(f"- {unit_subtag} alias is {alias}. Description is {namevalue}") # # --- fill the list of mount and channel assembled # if alias in assembled_aliases: # if unit_subtag=="mount": # assembled_mount_aliases.append(alias) # elif unit_subtag=="channel": # assembled_channel_aliases.append(alias) # log.debug("--------- Assembly of the unit -----------") # log.debug(f"Assembled mount aliases: {assembled_mount_aliases}") # log.debug(f"Assembled channel aliases: {assembled_channel_aliases}") # --- Get the home of the mount[0] # mount_alias = assembled_mount_aliases[0] # home = self.config.get_paramvalue(mount_alias,'MountPointing','home') # log.debug("------------------------------------------") # hostname = socket.gethostname() # self._computer_alias = '' # unit_subtag = 'computer' # aliases = self.config.get_aliases(unit_subtag) # for alias in aliases: # log.debug("alias" + alias) # value = self.config.get_paramvalue(alias,'local','hostname') # log.debug("value" + value) # if value == hostname: # log.debug("value" + value) # self._computer_alias = alias # value = self.config.get_paramvalue(alias,unit_subtag,'description') # self._computer_description = value # value = self.config.get_paramvalue(alias,'path','data') # # Overrides default value # self._path_data = value # break # log.debug(f"hostname = {hostname}") # log.debug(f"path_data = {self._path_data}") # log.debug(f"home = {home}") # log.debug("------------------------------------------") # --- update the log parameters ##self.log.path_data = self._path_data ##print("new self.log.path_data is", self.log.path_data) ##self.log.set_global_path_data(self._path_data) ##self.printd("new self.log.global_path_data is", self.log.get_global_path_data()) ##self.log.home = home #def update_survey(self): def _log_agent_state(self): """ Save (update) this agent current mode, status, and #iteration in DB """ "Updating the agent survey database table..." #self.printd("- fetching table line for agent", self.name) # only necessary when using process (not necessary with threads) #with transaction.atomic(): #self._agent_survey = AgentSurvey.objects.get(name=self.name) self._agent_survey.mode = self.get_mode() self._agent_survey.status = self.get_status() self._agent_survey.iteration = self._iter_num self._agent_survey.save() #self._agent_survey.save(update_fields=["mode", "status"]) """ def send_command(self, cmd_name): recipient_agent = self.name if self.TEST_COMMANDS_DEST=="myself" else self.TEST_COMMANDS_DEST AgentCmd.objects.create(sender=self.name, recipient=recipient_agent, name=cmd_name) """ #def send_command(self, to_agent, cmd_type, cmd_name, cmd_args=None): #def send_cmd_to(self, to_agent:str, cmd_name:str, cmd_args:str=None, validity:int=None, timeout:int=None): def send_cmd_to(self, to_agent:str, cmd_name:str, cmd_args:str=None, validity:int=None): """ #ex: send_command(“AgentX”,”GENERIC”,”EVAL”,“3+4”) ex: send_command(“AgentX”,"EVAL”,“3+4”) """ #return AgentCmd.send_cmd_from_to(self.name, self._get_real_agent_name_for_alias(to_agent), cmd_name, cmd_args) #cmd = self.create_cmd_for(to_agent, cmd_name, cmd_args, validity, timeout).send() cmd = self.create_cmd_for(to_agent, cmd_name, cmd_args, validity).send() #cmd.send() return cmd def create_cmd_for(self, to_agent:str, cmd_name:str, cmd_args:str=None, validity:int=None) -> AgentCmd: ''' real_agent_name = self._get_real_agent_name(to_agent) real_cmd_name = cmd_name if '.' in real_agent_name: real_agent_name, component_name = real_agent_name.split('.') real_cmd_name = component_name+'.'+cmd_name return AgentCmd.create(self.name, real_agent_name, real_cmd_name, cmd_args) try: real_agent_name = self._get_real_agent_name(to_agent) except KeyError as e: ''' return AgentCmd.create(self.name, to_agent, cmd_name, cmd_args, validity) ''' real_agent_name = self._get_real_agent_name(to_agent) if not real_agent_name: return AgentCmd.create(self.name, to_agent, cmd_name, cmd_args) # log.e("UNKNOWN AgentDevice ALIAS", to_agent) # #self.log_e("Exception raised", e) # log.e(f"=> Thus, I do not send this command '{cmd_name}'") # return None return AgentCmd.create(self.name, real_agent_name, cmd_name, cmd_args) ''' ''' return AgentCmd( sender=self.name, recipient=self._get_real_agent_name_for_alias(recipient_agent_alias_name), name=cmd_name ) ''' def _get_next_valid_and_not_running_command(self) -> Union[AgentCmd,None]: """ Return next VALID (not expired) command (read from the DB command table) which is relevant to this agent. Commands are read in chronological order """ self._set_status(self.AGT_STATUS.IN_GET_NEXT_CMD) log.info("Looking for a new command to process (sent by another agent):") # 1) Get all pending commands for me (return if None) # Not sure this is necessary to do it in a transaction, # but there might be a risk # that a command status is modified while we are reading... with transaction.atomic(): self._pending_commands = AgentCmd.get_pending_and_running_commands_for_agent(self.name) commands = self._pending_commands if not commands.exists(): log.info("") return None "Current pending (or running) commands (time ordered):" AgentCmd.show_commands(commands) # 2) Check if a PRIORITY command is in the list (even at the very end), # and if so return this command # (must be still valid and not yet running) for cmd in commands: #if cmd.name in ("do_exit", "do_abort", "do_flush_commands"): break #if cmd.name in ("do_exit", "do_abort"): break if cmd.is_agent_general_priority_cmd(): if cmd.is_running(): return None if cmd.is_expired(): cmd.set_as_expired() return None return cmd ''' #if cmd.name in ("do_exit", "do_abort", "do_flush_commands"): if cmd.name in ("do_exit", "do_abort"): if cmd.is_running(): return None if cmd.is_expired(): cmd.set_as_outofdate() return None return cmd ''' # 3) If first (oldest) command is currently running # (status CMD_RUNNING), then do nothing and return """ cmd_executing = Command.objects.filter( # only commands for me recipient = self.name, # only pending commands recipient_status_code = Command.CMD_STATUS_CODES.CMD_RUNNING, ).first() #if cmd_executing.exists(): if cmd_executing: """ #cmd = commands[0] cmd = commands.first() if cmd.is_expired(): cmd.set_as_expired() return None if cmd.is_running(): #self.printd(f"There is currently a running command ({cmd_executing.first().name}), so I do nothing (wait for end of execution)") log.info(f"There is currently a running command ({cmd.name})") """ # Check that this command is not expired if cmd.is_expired(): self.printd("But this command is expired, so set its status to OUTOFDATE, and go on") cmd_executing.set_as_outofdate() else: """ log.info(f"Thus, I won't execute any new command until this command execution is finished") # TODO: kill si superieur a MAX_EXEC_TIME return None ''' # 4) Tag all expired commands for cmd in commands: if cmd.is_expired(): cmd.set_as_outofdate() # break at 1st "valid" command (not expired) else: break # 5) If no more commands to process, return None if cmd.is_expired(): return None ''' # 6) Current cmd must now be a valid (not expired) and PENDING one, # so return it for execution #self.printd(f"Got command {cmd.name} sent by agent {cmd.sender} at {cmd.sender_deposit_time}") #self.printd(f"Starting processing of this command") return cmd ''' #def _exec_agent_general_cmd(self, cmd:Command): def _exec_agent_cmd(self, cmd:AgentCmd): #self.print(f"Starting execution of an AGENT LEVEL cmd {cmd}...") log.info(f"Starting execution of an AGENT LEVEL cmd...") # Update read time to say that the command has been READ cmd.set_read_time() cmd.set_as_running() # SPECIFIC command (only related to me, not to any agent) if self._is_agent_specific_cmd(cmd): log.info("(Agent level SPECIFIC cmd)") # Execute method self."cmd.name"() # This can raise an exception (caught by this method caller) self.exec_cmd_from_its_name(cmd) #'' try: except AttributeError as e: self.printd(f"EXCEPTION: Agent level specific command '{cmd.name}' unknown (not implemented as a function) :", e) self.printd("Thus => I ignore this command...") cmd.set_result("ERROR: INVALID AGENT LEVEL SPECIFIC COMMAND") cmd.set_as_pending() cmd.set_as_skipped() return #'' cmd.set_result("Agent level SPECIFIC cmd done") cmd.set_as_processed() log.info("...Agent level SPECIFIC cmd has been executed") return # GENERAL command (related to any agent) log.info("(Agent level GENERAL CMD)") _,cmd_name,cmd_args = cmd.get_full_name_parts() #cmd_name, cmd_args = cmd.tokenize() #if cmd.name == "set_state:active": #elif cmd.name == "set_state:idle": if cmd_name == "set_state": if not cmd_args: raise ValueError() state = cmd_args[0] if state == "active": self._set_active() if state == "idle": self._set_idle() cmd.set_result("I am now " + state) #time.sleep(1) self.waitfor(1) elif cmd_name in ("do_flush_commands"): "flush_commands received: Delete all pending commands" AgentCmd.delete_pending_commands_for_agent(self.name) cmd.set_result('DONE') elif cmd_name in ("do_abort", "do_exit", "do_restart_init"): #self.printd("Current pending commands are:") #Command.show_commands(self._pending_commands) log.info("Aborting current executing command if exists:") #self._kill_running_device_cmd_if_exists(cmd.sender) self.do_things_before_exit(cmd.sender) if cmd_name == "do_restart_init": log.info("restart_init received: Restarting from init()") self._DO_RESTART=True elif cmd.name == "do_exit": self._DO_EXIT=True cmd.set_result('SHOULD BE DONE NOW') else: #'' name = cmd.name args = None if " " in name: name,args = name.split() if name == "do_eval": if args==None: raise(ValueError) cmd.set_result(eval(args)) #'' if cmd_name == "do_eval": if not cmd_args: raise ValueError() cmd.set_result(eval(cmd_args)) cmd.set_as_processed() log.info("...Agent level GENERAL cmd has been executed") # If cmd is "do_exit", kill myself (without any question, this is an order soldier !) # This "do_exit" should normally kill any current thread (to be checked...) if cmd.name == "do_exit": log.info("Before exiting, Here are (if exists) the current (still) pending commands (time ordered) :") commands = AgentCmd.get_pending_and_running_commands_for_agent(self.name) AgentCmd.show_commands(commands, True) #if self.TEST_MODE and self.TEST_WITH_FINAL_TEST and self.TEST_COMMANDS_DEST == "myself": self.simulator_test_results() if self.TEST_MODE and self.TEST_WITH_FINAL_TEST: self._TEST_test_results() #self._DO_EXIT=True #exit(0) ''' def _process_agent_general_cmd(self, cmd:AgentCmd): # GENERAL command (related to any agent) #self.print(f"Starting execution of an AGENT LEVEL cmd {cmd}...") log.info(f"Starting execution of an AGENT GENERAL cmd...") # Update read time to say that the command has been READ #cmd.set_read_time() cmd.set_as_running() log.info("(Agent level GENERAL CMD)") cmd_name,cmd_args = cmd.name_and_args #cmd_name, cmd_args = cmd.tokenize() #if cmd.name == "set_state:active": #elif cmd.name == "set_state:idle": # Default result (null) result = None #if cmd_name in ("do_abort", "do_exit", "do_restart_init"): if cmd_name in ("do_stop", "do_restart", #@ deprecated "do_abort", "do_exit", ): #self.printd("Current pending commands are:") #Command.show_commands(self._pending_commands) log.info("Stopping or Restarting...") #self._kill_running_device_cmd_if_exists(cmd.sender) ''' self.do_things_before_exit(cmd.sender) if cmd_name == "do_restart_init": log.info("restart_init received: Restarting from init()") self._DO_RESTART=True elif cmd.name == "do_exit": self._DO_EXIT=True cmd.set_result('SHOULD BE DONE NOW') ''' #result = "RESTARTING" if cmd_name == "do_restart_loop" else "STOPPING" if cmd_name == "do_exit": cmd.full_name = "do_stop asap" elif cmd_name == "do_abort": cmd.full_name = "do_stop now" when = 'asap' if not cmd.args else cmd.args[0] result = self.do_stop_or_restart(False if cmd_name=='do_stop' else True, when) elif cmd_name == "get_state": result = "I am now " + self.get_state() elif cmd_name == "get_mode": result = "MODE is " + self.get_mode() elif cmd_name == "set_mode": #if not cmd_args: raise ValueError() if not cmd_args: raise CmdBadArgsException(cmd) mode = cmd_args[0] if mode == "IDLE": self.set_idle() elif mode == "ROUTINE": self.set_routine() elif mode == "ATTENTIVE": self.set_attentive() else: raise CmdBadArgsException(cmd) #cmd.set_result("I am now " + state) result = "MODE is " + mode #time.sleep(1) #self.waitfor(1) #elif cmd_name in ("do_flush_commands"): elif cmd_name == "do_flush_commands": "flush_commands received: Delete all pending commands" self.do_flush_commands() #cmd.set_result('DONE') result = "FLUSH DONE" elif cmd_name == "do_eval": #if not cmd_args: raise ValueError() if not cmd_args: raise CmdBadArgsException(cmd) #cmd.set_result(eval(cmd_args)) #result = eval(cmd_args) result = self.do_eval(cmd_args[0]) elif cmd_name == "get_specific_cmds": # CmdUnimplementedException if one specific command (in the list) is unknown try: result = self.get_specific_cmds() except CmdUnimplementedException as e: # raise CmdUnimplementedException("get_specific_cmds.unknown_cmd_name") cmd.set_as_exec_error("EXCEPTION - One specific cmd is unimplemented: "+e.cmd_name) raise CmdExecErrorException(cmd) from None cmd.set_as_processed(result) log.info("...Agent level GENERAL cmd has been executed") ''' # If cmd is "do_exit", kill myself (without any question, this is an order soldier !) # This "do_exit" should normally kill any current thread (to be checked...) if cmd.name == "do_exit": log.info("Before exiting, Here are (if exists) the current (still) pending commands (time ordered) :") commands = AgentCmd.get_pending_and_running_commands_for_agent(self.name) AgentCmd.show_commands(commands, True) #if self.TEST_MODE and self.TEST_WITH_FINAL_TEST and self.TEST_COMMANDS_DEST == "myself": self.simulator_test_results() if self.TEST_MODE and self.TEST_WITH_FINAL_TEST: self._TEST_test_results() #self._DO_EXIT=True #exit(0) ''' def _process_agent_specific_cmd(self, cmd:AgentCmd): #self.print(f"Starting execution of an AGENT LEVEL cmd {cmd}...") log.info(f"Starting execution of an AGENT level SPECIFIC cmd...") # Update read time to say that the command has been READ #cmd.set_read_time(False) ##cmd.set_as_running() log.info("(Agent SPECIFIC cmd)") # Execute method self."cmd.name"() # This can raise an exception (caught by this method caller) try: res = self._exec_cmd_from_its_name(cmd) except (CmdUnimplementedException, CmdBadArgsException) as e: ##cmd.set_as_pending() # These exceptions are managed at higher level : raise ''' except AttributeError as e: self.printd(f"EXCEPTION: Agent level specific command '{cmd.name}' unknown (not implemented as a function) :", e) self.printd("Thus => I ignore this command...") cmd.set_result("ERROR: INVALID AGENT LEVEL SPECIFIC COMMAND") cmd.set_as_pending() cmd.set_as_skipped() return ''' #cmd.set_result("Agent level SPECIFIC cmd done") #res = res if res else "Agent SPECIFIC cmd done" cmd.set_as_processed(res) log.info("...Agent SPECIFIC cmd has been executed") ''' def do_log(self): #"" log à 2 endroits ou 1 seul - in file - in db #"" self.printd("Logging data...") ''' def _exec_cmd_from_its_name(self, cmd:AgentCmd)->Any: #print(dir(self)) ''' for method in dir(self): if callable(getattr(self, method)): m = getattr(self, method) print(method, ' => ', signature(m)) ''' #print("***************") #print(self.get_specific_cmds()) #print("***************") methods_list = [method for method in dir(self) if callable(getattr(self, method))] #print(methodsList) func = cmd.name if func not in methods_list: raise CmdUnimplementedException(cmd) ##f = getattr(self, func) ###print(func, ' => ', signature(f)) #for arg in cmd.args: print(arg) #print(cmd.args) #print(*cmd.args) ''' try: if not cmd.args: # equivalent to calling self.func() return getattr(self, func)() else: args = [] # Convert all args to their real type for arg in cmd.args: #print(arg) #try: # Evaluate arg only if it is not a word (letters) if not arg[0].isalpha(): arg = ast.literal_eval(arg) #except ValueError as e: newarg = arg args.append(arg) #print(args) # equivalent to calling self.func(*cmd.args) return getattr(self, func)(*args) ''' args = [] # Convert all args to their real type for arg in cmd.args: #print(arg) #try: # Evaluate arg only if it is not a word (letters) : # - a word like "toto" is not evaluated => stays as is (=str) # Are evaluated : # - an int or float # - a (tuple) # - a [list] # - ... #print('********', arg, " :") if not arg[0].isalpha(): arg = ast.literal_eval(arg) #print("evaluated to", type(arg), arg) #except ValueError as e: newarg = arg args.append(arg) cmd.set_as_running() try: # equivalent to calling self.func(*cmd.args) return getattr(self, func)(*args) except (TypeError, AttributeError, ValueError) as e: # set back to PENDING because this command should never has been RUNNING cmd.set_as_pending() #raise e # "from None" pour ne pas afficher l'exception AttributeError (car interne) raise CmdBadArgsException(cmd) from None #print("I know this specific command but it is not yet implemented : ", func) ##tb = sys.exc_info()[2] ##raise AgentCmdUnimplementedException(cmd).with_traceback(tb) ##raise AgentCmdUnimplementedException(cmd).with_traceback(None) def _is_agent_level_cmd(self, cmd:AgentCmd): return self._is_agent_general_cmd(cmd) or self._is_agent_specific_cmd(cmd) def _is_agent_general_cmd(self, cmd:AgentCmd): return cmd.is_agent_general_cmd() ''' def _exec_agent_cmd(self, cmd:Command): # AGENT "GENERAL LEVEL" command # => I process it directly without asking my DC # => Simple action, short execution time, so I execute it directly (in current main thread, not in parallel) if cmd.is_agent_level_general_cmd(): self.printd("********** -- AGENT LEVEL GENERAL CMD *********") self._exec_agent_general_cmd(cmd) # AGENT "SPECIFIC LEVEL" command # => I process it directly without asking my DC # => Simple action, short execution time, so I execute it directly (in current main thread, not in parallel) #elif self._is_agent_level_specific_cmd(cmd): else: self.printd("********** -- AGENT LEVEL SPECIFIC CMD *********") self._exec_agent_specific_cmd(cmd) ''' def _is_agent_specific_cmd(self, cmd:AgentCmd): #return cmd.name in self.AGENT_SPECIFIC_COMMANDS #return (cmd.name,) in self.AGENT_SPECIFIC_COMMANDS for (cmd_name,_,_) in self.AGENT_SPECIFIC_COMMANDS: if cmd.name == cmd_name : return True ''' def _exec_agent_specific_cmd(self, cmd:Command): # Execute method self."cmd.name"() self.exec_cmd_from_its_name(cmd) ''' def is_in_test_mode(self): return self.TEST_MODE ### # ================================================================================================ # AGENT GENERAL FUNCTIONS # ================================================================================================ ### def do_eval(self, eval_str:str): return eval(eval_str) def do_flush_commands(self): AgentCmd.delete_pending_commands_for_agent(self.name) def do_exec_command(self, what:str): # - Temporaly stop execution of new commands (let them accumulate) if what == "stop": pass # - Resume execution of commands (accumulated since "do_stop_exec_cmd") if what == "resume": pass # NOT PRIO if what == "noprio": pass # Bad arg raise CmdBadArgsException(self.CURRENT_CMD) # Stop currently running cmd or routine def do_stop_current(self, what:str): if what == "cmd": self.do_stop_current_cmd() if what == "routine": self.do_stop_current_routine() if what == "both": self.do_stop_current_cmd() self.do_stop_current_routine() # Bad arg raise CmdBadArgsException(self.CURRENT_CMD) def do_stop_current_cmd(self): pass def do_stop_current_routine(self): pass def do_exit(self): self.do_stop("asap"); def do_abort(self): self.do_stop("now"); # Stop agent asap or now def do_stop(self, when:str='asap'): self.do_stop_or_restart(False, when) def do_restart(self, when:str='asap'): self.do_stop_or_restart(True, when) def do_stop_or_restart(self, restart:bool=False, when:str='asap'): if when not in ('asap','now','noprio'): raise CmdBadArgsException(self.CURRENT_CMD) # NOT PRIO if when == "noprio": pass # log last agent mode & status self._set_and_log_status(self.AGT_STATUS.RESTARTING if restart else self.AGT_STATUS.EXITING) #self._log_agent_state() # Exit main loop self.DO_MAIN_LOOP = False # PRIO if when == 'asap': self._cleanup_before_exit() elif when == 'now': self._abort_current_running_cmd_if_exists() self._abort_current_routine_process_if_exists() if restart: self.DO_RESTART_LOOP = True # cmd result return ('RESTARTING ' if restart else 'STOPPING ') + when ### # ================================================================================================ # AGENT SPECIFIC COMMANDS (functions) # (here just to serve as examples) # ================================================================================================ ### #def do_specific1(self, arg1:int, arg2:int=2, arg3:int=3) -> int: #def do_specific1(self, arg1:int, arg2:int=2, arg3:float=3.1, arg4:list=[1,2,3]) -> float: def do_specific1(self, arg1:int, arg2:int, arg3:float=3.1, arg4:str='toto', arg5:Tuple[int,str,int]=(1,'toto',3), #arg5:Tuple[int,int,int]=(1,2,3), arg6:List[int]=[] ) -> float: ''' arg1 = int(arg1) arg2 = int(arg2) arg3 = float(arg3) arg5 = ast.literal_eval(arg5) print(arg5[1]) arg6 = ast.literal_eval(arg6) ''' #print(arg4) res = arg1 + arg2 + arg3 + arg5[0] + arg5[2] if arg6: res += arg6[0] return res # Undefined specific cmd #def set_specific2(self, arg1:str, arg2:int): pass def do_specific3(self): pass ### # ================================================================================================ # DEVICE SPECIFIC FUNCTIONS (abstract for Agent, overriden and implemented by AgentDevice) # ================================================================================================ ### # To be overriden by subclass (AgentDevice...) # @abstract def is_device_level_cmd(self, cmd): return False ''' # to be overriden by subclass (AgentDevice) # @abstract def exec_device_cmd_if_possible(self, cmd:AgentCmd): pass # TO BE OVERRIDEN by subclass (AgentDevice) # @abstract def exec_device_cmd(self, cmd:AgentCmd): #self.exec_cmd_from_its_name(cmd) pass ''' """ ================================================================= TEST DEDICATED FUNCTIONS ================================================================= """ def _set_debug_mode(self, mode:bool): self.DEBUG_MODE=mode def _set_with_simulator(self, mode:bool): self.WITH_SIMULATOR=mode def _set_test_mode(self, mode:bool): self.TEST_MODE = mode if self.TEST_MODE: log.info("in TEST MODE") def _TEST_get_next_command_to_send(self)->AgentCmd: #cmd_full_name, validity, res_expected, after_status = next(self.TEST_COMMANDS, (None,None,None,None)) cmd_full_name, validity, expected_res, expected_status = next(self.TEST_COMMANDS, (None,None,None,None)) #print(expected_final_status) #print(cmd_full_name, res_expected) #return cmd_name if cmd_full_name is None: return None # Remove excessive spaces import re cmd_full_name = re.sub(r"\s+", " ", cmd_full_name).strip() if ' ' not in cmd_full_name: raise Exception('Command is malformed:', cmd_full_name) agent_recipient, cmd_name_and_args = cmd_full_name.split(' ', 1) if agent_recipient == 'self': agent_recipient = self.name cmd_name, cmd_args = cmd_name_and_args, None if ' ' in cmd_name_and_args: cmd_name,cmd_args = cmd_name_and_args.split(' ', 1) cmd = self.create_cmd_for(agent_recipient, cmd_name, cmd_args, validity) cmd.expected_res = expected_res cmd.expected_status = expected_status # If no cmd created (because of error, bad AgentDevice name), call again this method for next cmd #if cmd is None: return self._TEST_get_next_command_to_send() return cmd """ def simulator_send_next_command(self): #self._current_test_cmd = "set_state:idle" if self._current_test_cmd=="set_state:active" else "set_state:active" #if self._nb_test_cmds == 4: self._current_test_cmd = "do_exit" cmd_name = next(self.TEST_COMMANDS, None) #self.printd("next cmd is ", cmd_name) if cmd_name is None: return #Command.objects.create(sender=self.name, recipient=self.name, name=cmd_name) recipient_agent = self.name if self.TEST_COMMANDS_DEST=="myself" else self.TEST_COMMANDS_DEST Command.objects.create(sender=self.name, recipient=recipient_agent, name=cmd_name) #time.sleep(1) #self._TEST_current_cmd_idx += 1 #self._nb_test_cmds += 1 """ def _TEST_check_cmd_res_and_status(self, cmd:AgentCmd): if not self.is_in_test_mode(): return if not cmd: return log.debug("*** CHECK ***") #if hasattr(self._cmdts,'expected_res'): log.debug(cmd.result) log.debug(self._cmdts.expected_res) ##if cmd.is_executed() and self._cmdts.expected_res: if cmd.is_finished() and self._cmdts.expected_res: actual=str(cmd.result) expected=self._cmdts.expected_res #print(actual, ' <= vs =>', expected) assert actual==expected, f"Cmd result (='{actual}') is not as expected (='{expected}')" log.debug(cmd.state) log.debug(self._cmdts.expected_status) #if hasattr(self._cmdts,'expected_status'): if self._cmdts.expected_status: #assert(cmd.state == self._cmdts.expected_status) actual=cmd.state expected=self._cmdts.expected_status #print(actual, ' <= vs =>', expected) assert actual==expected, f"Cmd status (='{actual}') is not as expected (='{expected}')" def _TEST_test_routine_process(self): """ TEST MODE ONLY Send next command from scenario defined in TEST_COMMANDS_LIST (only if previous command finished) """ if not self.is_in_test_mode(): return log.info("(TEST mode) Trying to send a new command if possible...") # There is a current command being processed # => check if next command is "do_abort" # => if so, instantly send a "do_abort" to abort previous command if self._cmdts is not None: log.info(f"Waiting for end execution of cmd '{self._cmdts.name}' (sent to {self._cmdts.recipient}) ...") # Update cmdts fields from DB self._cmdts.refresh_from_db() # Current cmd is pending or running if self._cmdts.is_pending() or self._cmdts.is_running(): if self._next_cmdts is None: # If next command is "do_abort" then abort becomes the new current command (to be sent) self._next_cmdts = self._TEST_get_next_command_to_send() if self._next_cmdts and self._next_cmdts.name == "do_abort": # Wait a little to give a chance to agentB to start execution of current command, # so that we can abort it then (otherwise it won't be aborted!!) #time.sleep(4) self.waitfor(4) self._cmdts = self._next_cmdts self._next_cmdts = None log.info("***") #log.info(f"*** SEND ", self._cmdts) log.info(f"***" + str(self._cmdts)) log.info("***") self._cmdts.send() # Current cmd is no more running else: # Execution was not completed #if self._cmdts.is_expired() or self._cmdts.is_skipped() or self._cmdts.is_killed(): ##if self._cmdts.is_skipped() or self._cmdts.is_killed(): if self._cmdts.is_finished_with_error(): log.info("Command was not completed") # 2 possible scenarios: # - (1) Send the SAME command again ''' self.printd("Command was not completed, so I send it again") # The command was not completed, so, make a copy of it and send it again # For this, it is enough to set primary key to None, # then the send() command below will save a NEW command #self._cmdts = copy.copy(self._cmdts) self._cmdts.id = None SEND_A_NEW_COMMAND = True ''' # - (2) Send next command #self._cmdts = None # Execution was completeted OK => get result elif self._cmdts.is_executed(): cmdts_res = self._cmdts.get_result() log.info(f"Cmd executed. Result is '{cmdts_res}'") #cmdts_is_processed = True ''' Optimisation possible pour gagner une iteration: self._cmdts = self.simulator_get_next_command_to_send() # No more command to send (from simulator) => return if self._cmdts is None: return SEND_A_NEW_COMMAND = True ''' # Set cmdts to None so that a new command will be sent at next iteration self._cmdts = None # No currently running command => get a new command and SEND it if self._cmdts is None: if self._next_cmdts is not None: self._cmdts = self._next_cmdts self._next_cmdts = None else: self._cmdts = self._TEST_get_next_command_to_send() # No more command to send (from simulator) => return and EXIT if self._cmdts is None: self.DO_MAIN_LOOP = False return # Send cmd (= set as pending and save) log.info("***") #self.printd(f"*** SEND ", self._cmdts) log.info(f"*** NEW COMMAND TO SEND is: " + str(self._cmdts)) if hasattr(self._cmdts,"expected_res") : log.info(f"*** (with expected final status & result : " + "'" + str(self._cmdts.expected_status) + "' & '" + str(self._cmdts.expected_res) + "')") log.info("***") #self._cmdts.set_as_pending() # SEND self._cmdts.send() log.info(f"*** NEW COMMAND SENT") #cmdts_is_processed = False #cmdts_res = None def _TEST_test_results(self): if not (self.TEST_MODE and self.TEST_WITH_FINAL_TEST): return if self.TEST_COMMANDS_LIST == []: return nb_commands_to_send = len(self.TEST_COMMANDS_LIST) nb_commands_sent, commands = self._TEST_test_results_start() #nb_commands_to_send = len(self.TEST_COMMANDS_LIST) # General (default) test #self.printd(commands[0].name, "compared to", self.TEST_COMMANDS_LIST[0].split()[1]) assert commands[0].name == self.TEST_COMMANDS_LIST[0].split()[1] last_cmd = commands[-1] assert last_cmd.name == self.TEST_COMMANDS_LIST[-1].split()[1] assert last_cmd.name == "do_exit" assert last_cmd.is_executed() ##assert last_cmd.get_result() == "SHOULD BE DONE NOW" nb_asserted = 0 nb_agent_general = 0 nb_unknown = 0 nb_unimplemented = 0 for cmd in commands: ##assert cmd.is_executed() or cmd.is_killed() or cmd.is_skipped() assert cmd.is_finished() nb_asserted += 1 if cmd.is_agent_general_cmd(): nb_agent_general += 1 if cmd.name == "do_unknown": assert cmd.is_skipped() #assert "UnimplementedGenericCmdException" in cmd.get_result() assert "INVALID COMMAND" in cmd.get_result() nb_unknown += 1 #if cmd.name in ["do_unimplemented", "do_unknown"]: if cmd.name == "do_unimplemented": assert cmd.is_skipped() assert "UnimplementedGenericCmdException" in cmd.get_result() nb_unimplemented += 1 assert nb_asserted == nb_commands_sent log.info(f"{nb_commands_to_send} cmds I had to send <==> {nb_asserted} cmds executed (or killed), {nb_commands_to_send-nb_commands_sent} cmd ignored") log.info("Among executed commands:") log.info(f"- {nb_agent_general} AGENT general command(s)") log.info(f"- {nb_unimplemented} unimplemented command(s) => UnimplementedGenericCmdException raised then command was skipped") log.info(f"- {nb_unknown} unknown command(s) => skipped") # Can be overriden by subclass (AgentDevice) self.TEST_test_results_other(commands) ''' (EP) moved to AgentDevice # Now test that any "AD get_xx" following a "AD set_xx value" command has result = value for i,cmd_set in enumerate(commands): if cmd_set.name.startswith('set_'): commands_after = commands[i+1:] for cmd_get in commands_after: if cmd_get.name.startswith('get_') and cmd_get.name[4:]==cmd_set.name[4:] and cmd_get.device_type==cmd_set.device_type: log.info("cmd_get.result == cmd_set.args ?" + str(cmd_get.result) + ' ' + str(cmd_set.args)) assert cmd_get.get_result() == ','.join(cmd_set.args), "A get_xx command did not gave the expected result as set by a previous set_xx command" break ''' # Specific (detailed) test (to be overriden by subclass) nb_asserted2 = self.TEST_test_results_main(commands) self._TEST_test_results_end(nb_asserted) def _TEST_prepare(self): if not self.is_in_test_mode(): return log.debug("\n!!! In TEST mode !!! => preparing to run a scenario of test commands") log.debug("- Current test commands list scenario is:\n" + str(self.TEST_COMMANDS_LIST)) if not self.WITH_SIMULATOR: log.debug("\n!!! In TEST but no SIMULATOR mode (using REAL device) !!! => removing dangerous commands for real devices... :") TEST_COMMANDS_LIST_copy = self.TEST_COMMANDS_LIST.copy() for cmd in TEST_COMMANDS_LIST_copy: cmd_key = cmd.split()[1] if ("set_" in cmd_key) or ("do_start" in cmd_key) or cmd_key in ["do_init", "do_goto", "do_open", "do_close"]: self.TEST_COMMANDS_LIST.remove(cmd) log.debug("- NEW test commands list scenario is:\n" + self.TEST_COMMANDS_LIST, '\n') # Can be overriden by subclass (AgentDevice) def TEST_test_results_other(self, commands): pass def _TEST_test_results_start(self): print() log.info("--- Testing if the commands I SENT had the awaited result") log.info("Here are the last commands I sent:") #commands = list(Command.get_last_N_commands_for_agent(self.name, 16)) #commands = Command.get_last_N_commands_sent_to_agent(self.name, 16) nb_commands = len(self.TEST_COMMANDS_LIST) if "ad_unknown get_dec" in self.TEST_COMMANDS_LIST: nb_commands -= 1 commands = AgentCmd.get_last_N_commands_sent_by_agent(self.name, nb_commands) AgentCmd.show_commands(commands) return nb_commands, commands """ OLD SCENARIO nb_asserted = 0 for cmd in commands: if cmd.name == "specific0": assert cmd.is_skipped() nb_asserted+=1 if cmd.name == "specific1": assert cmd.result == "in step #5/5" assert cmd.is_executed() nb_asserted+=1 if cmd.name in ("specific2","specific3"): assert cmd.is_killed() nb_asserted+=1 if cmd.name in ("specific4", "specific5", "specific6", "specific7", "specific8"): assert cmd.is_pending() nb_asserted+=1 # 2 cmds abort if cmd.name in ("do_abort"): assert cmd.is_executed() nb_asserted+=1 if cmd.name in ("do_exit"): assert cmd.is_executed() nb_asserted+=1 assert nb_asserted == 12 self.printd("--- Finished testing => result is ok") """ # To be overriden by subclass def TEST_test_results_main(self, commands): return 0 ''' nb_asserted = 0 self.printd("from simulator_test_results_main", commands) for cmd in commands: assert cmd.is_executed() nb_asserted+=1 return nb_asserted ''' def _TEST_test_results_end(self, nb_asserted): #nb_commands_to_send = len(self.TEST_COMMANDS_LIST) #self.printd(nb_asserted, "vs", nb_commands_to_send) #assert nb_asserted == nb_commands_to_send #self.printd(f"************** Finished testing => result is ok ({nb_asserted} assertions) **************") printFullTerm(Colors.GREEN, f"************** Finished testing => result is ok ({nb_asserted} assertions) **************") """ ================================================================= MAIN ================================================================= """ import argparse def extract_parameters(): """ Usage: Agent.py [-t] [configfile] """ # arg 1 : -t # arg 2 : configfile DEBUG_MODE = False TEST_MODE = False WITH_SIM = False VERBOSE_MODE = False configfile = None log.debug("args:" + str(sys.argv)) for arg in sys.argv[1:] : if arg == "-t": TEST_MODE = True elif arg == "-s": WITH_SIM = True elif arg == "-d": DEBUG_MODE = True elif arg == "-v": VERBOSE_MODE = True #else: configfile = arg ''' if len(sys.argv) > 1: if sys.argv[1] == "-t": TEST_MODE = True if len(sys.argv) == 3: configfile = sys.argv[2] else: configfile = sys.argv[1] ''' #return DEBUG_MODE, WITH_SIM, TEST_MODE, VERBOSE_MODE, configfile return DEBUG_MODE, TEST_MODE, VERBOSE_MODE #def build_agent(Agent_type:Agent, name="GenericAgent", RUN_IN_THREAD=True): #def build_agent(Agent_type:Agent, RUN_IN_THREAD=True): def build_agent(Agent_type:Agent,param_constr=None) -> Agent : #DEBUG_MODE, WITH_SIM, TEST_MODE, VERBOSE_MODE, configfile = extract_parameters() DEBUG_MODE, TEST_MODE, VERBOSE_MODE = extract_parameters() log.debug("debug mode is" + os.getenv("PYROS_DEBUG")) #print(logger) #agent = Agent("GenericAgent", configfile, RUN_IN_THREAD=True) #agent = Agent_type(configfile, RUN_IN_THREAD, DEBUG_MODE=DEBUG_MODE) #agent = Agent_type(RUN_IN_THREAD) if param_constr: if "AgentSST" in str(Agent_type): agent = Agent_type(agent=param_constr.get("agent"),simulated_computer=param_constr.get("computer")) else: agent = Agent_type() if agent.name == "AgentSST": # args are passed two times: first in AgentSST script and then to build_agent function, we need to setup again argparse in order to argparse to allow arguments parser = argparse.ArgumentParser(description='Start a agentSST.') parser.add_argument("--computer",dest="computer",help='Launch agent with simulated computer hostname',action="store") parser.add_argument("--agent",dest="agent",help='Launch an specific agent ',action="store") parser.add_argument("-t", action="store_true" ) # parser.add_argument("-d", action="store_true" ) # parser.add_argument("-s", action="store_true" ) # parser.add_argument("-v", action="store_true" ) # args = parser.parse_args() # if args.computer: # agent.set_computer(args.computer) # AgentSP isn't in a config, so to avoid that WITH_SIM returns an error it's a special case if agent.name == "AgentSP": agent._set_with_simulator(False) agent._set_test_mode(TEST_MODE) return agent #agent = Agent_type(name, configfile, RUN_IN_THREAD) # Get the information of the agent name (name of class) within obsconfig and get the "is_real" attribute if agent.name in agent.get_config().get_agents(agent.unit).keys(): if agent.get_config().get_agent_information(agent.unit,agent.name).get("is_real"): WITH_SIM = not agent.get_config().get_agent_information(agent.unit,agent.name)["is_real"] else: WITH_SIM = True else: WITH_SIM = True agent._set_with_simulator(WITH_SIM) agent._set_test_mode(TEST_MODE) #print(logger) #logg.info("agent built, return it") #agent._set_debug_mode(DEBUG_MODE) return agent if __name__ == "__main__": ''' # with thread RUN_IN_THREAD=True # with process #RUN_IN_THREAD=False ''' #agent = build_agent(Agent, RUN_IN_THREAD=RUN_IN_THREAD) agent = build_agent(Agent) agent.show_config() #agent = build_agent(Agent, name="GenericAgent", RUN_IN_THREAD=RUN_IN_THREAD) ''' TEST_MODE, WITH_SIM, configfile = extract_parameters() agent = Agent("GenericAgent", configfile, RUN_IN_THREAD=True) #agent.setSimulatorMode(TEST_MODE) agent.setTestMode(TEST_MODE) agent.setWithSimulator(WITH_SIM) self.printd(agent) ''' agent.run()