#!/usr/bin/env python3 #DEBUG=True #DEBUG=False """ ================================================================= Agent class ================================================================= This class is the base class for all AgentXXX classes (AgentSST, AgentScheduler, AgentImagesProcessor...) It runs an infinite loop (called main_loop) doing at each iteration some routine tasks (process_routine before and after) and executing commands sent by other agents. To start it up, from the project root : - In normal mode : - ./PYROS start -fg agent -o tnc (add -d after the PYROS command for debug mode) - In test mode (the agent will execute a scenario as a suite of commands (in TEST_COMMANDS_LIST) to send to himself (or other agents) and execute them on reception at each iteration : - ./PYROS -t start -fg agent -o tnc """ ### # ================================================================= # SETUP FOR DJANGO # # (see https://docs.djangoproject.com/en/dev/topics/settings) # (see also https://docs.djangoproject.com/en/dev/ref/settings) # ================================================================= ### # For cmd parsing from array import array from typing import List, Tuple, Union, Any import ast from inspect import signature import os from pathlib import Path import sys import logging import typing from django.conf import settings as djangosettings # Conseil sur le net: #https://stackoverflow.com/questions/16853649/how-to-execute-a-python-script-from-the-django-shell #"" #import sys, os #sys.path.append('/path/to/your/django/app') #os.environ['DJANGO_SETTINGS_MODULE'] = 'settings' #from django.conf import settings #"" # To avoid a "ModuleNotFoundError: No module named 'dashboard'"... (not even 1 app found) : ##sys.path.insert(0, os.path.abspath("..")) ##sys.path.insert(0, os.path.abspath("src")) ##sys.path.insert(0, "../src") ##sys.path.insert(0, "src") # To avoid a "ModuleNotFoundError: No module named 'dashboard'" ## sys.path.append("..") py_pwd = os.path.normpath(os.getcwd() + "/..") if (py_pwd not in os.sys.path): (os.sys.path).append(py_pwd) # To avoid a "ModuleNotFoundError: No module named 'src'" ## sys.path.append("../../../..") py_pwd = os.path.normpath(os.getcwd() + "/../../../..") if (py_pwd not in os.sys.path): (os.sys.path).append(py_pwd) ##sys.path.append("src") from src.pyros_logger import log, handler_filebyagent #from src.pyros_logger import logger as logg, handler_filebyagent, ##from src import pyros_logger ''' def printd(*args, **kwargs): if os.environ.get('PYROS_DEBUG', '0')=='1': print(*args, **kwargs) ''' #printd("Starting with this sys.path", sys.path) log.debug("Starting with this sys.path" + str(sys.path)) # DJANGO setup # self.printd("file is", __file__) # mypath = os.getcwd() # Go into src/ ##os.chdir("..") ##os.chdir("src") #printd("Current directory : " + str(os.getcwd())) log.debug("Current directory : " + str(os.getcwd())) #os.environ.setdefault("DJANGO_SETTINGS_MODULE", "src.core.pyros_django.pyros.settings") os.environ.setdefault("DJANGO_SETTINGS_MODULE", "src.core.pyros_django.pyros.settings") # os.environ['SECRET_KEY'] = 'abc' # os.environ['ENVIRONMENT'] = 'production' import django django.setup() #printd("DB2 used is:", djangosettings.DATABASES["default"]["NAME"]) log.debug("DB2 used is:" + djangosettings.DATABASES["default"]["NAME"]) ### # ================================================================= # IMPORT PYTHON PACKAGES #================================================================= ### # --- GENERAL PURPOSE IMPORT --- #from __future__ import absolute_import ##import utils.Logger as L import platform import random import threading #import multiprocessing import time ''' from threading import Thread import socket ''' #import ctypes #import copy # --- DJANGO IMPORT --- from django.db import transaction from django import db # from django.core.exceptions import ObjectDoesNotExist # from django.db.models import Q #from django.shortcuts import get_object_or_404 #from django.conf import settings as djangosettings # --- SPECIFIC IMPORT --- # Many ways to import configuration settings: ##import config import config.old_config as config_old #from config import PYROS_ENV, ROOT_DIR, DOC_DIR #from config import * from common.models import AgentSurvey, AgentCmd, AgentLogs #from config.configpyros import ConfigPyros from config.old_config.configpyros import ConfigPyros as ConfigPyrosOld from src.core.pyros_django.obsconfig.obsconfig_class import OBSConfig #from dashboard.views import get_sunelev #from devices.TelescopeRemoteControlDefault import TelescopeRemoteControlDefault #from utils.JDManipulator import * ##from agent.logpyros import LogPyros ##from src.logpyros import LogPyros ''' from device_controller.abstract_component.device_controller import ( DCCNotFoundException, UnknownGenericCmdException, UnimplementedGenericCmdException, UnknownNativeCmdException ) ''' ### # ================================================================= # GENERAL MODULE CONSTANT & FUNCTIONS DEFINITIONS # ================================================================= ### #DEBUG_FILE = False ##log = L.setupLogger("AgentLogger", "Agent") IS_WINDOWS = platform.system() == "Windows" class Colors: HEADER = "\033[95m" BLUE = "\033[94m" GREEN = "\033[92m" WARNING = "\033[93m" FAIL = "\033[91m" ENDC = "\033[0m" BOLD = "\033[1m" UNDERLINE = "\033[4m" def printColor(color: Colors, message, file=sys.stdout, eol=os.linesep, forced=False): #system = platform.system() """ if (self.disp == False and forced == False): return 0 """ #if system == "Windows": if IS_WINDOWS: print(message, file=file, end=eol) else: print(color + message + Colors.ENDC, file=file, end=eol) return 0 def printFullTerm(color: Colors, string: str): #system = platform.system() columns = 100 row = 1000 disp = True value = int(columns / 2 - len(string) / 2) printColor(color, "-" * value, eol="") printColor(color, string, eol="") value += len(string) printColor(color, "-" * (columns - value)) return 0 """ ================================================================= class StoppableThread ================================================================= """ ''' class StoppableThreadEvenWhenSleeping(threading.Thread): # Thread class with a stop() method. The thread itself has to check # regularly for the stopped() condition. # It stops even if sleeping # See https://python.developpez.com/faq/?page=Thread#ThreadKill # See also https://www.oreilly.com/library/view/python-cookbook/0596001673/ch06s03.html def __init__(self, *args, **kwargs): #super(StoppableThreadSimple, self).__init__(*args, **kwargs) super().__init__(*args, **kwargs) self._stop_event = threading.Event() #def stop(self): def terminate(self): self._stop_event.set() def stopped(self): return self._stop_event.is_set() def wait(self, nbsec:float=2.0): self._stop_event.wait(nbsec) ''' ### # ================================================================= # EXCEPTIONS classes # ================================================================= ### class CmdException(Exception): ''' Base class for all Agent command exceptions ''' # pass def __init__( self, cmd: Union[AgentCmd,str] ): self.cmd = cmd self.cmd_name = cmd if isinstance(cmd,str) else cmd.name ''' def __str__(self): return f"The Agent command '{self.cmd.name}' is unknown to the agent" #return f"({type(self).__name__}): Device Generic command has no implementation in the controller" ''' # --- CMD PENDING (non running) EXCEPTIONS --- class CmdUnknownException(CmdException): ''' Raised when a PENDING (non running) Agent (specific) cmd is NOT known by the agent ''' def __str__(self): return f"The Agent command '{self.cmd_name}' is unknown to the agent" #return f"({type(self).__name__}): Device Generic command has no implementation in the controller" class CmdUnimplementedException(CmdException): ''' Raised when a PENDING (non running) Agent Specific cmd is known by the agent but not implemented ''' def __str__(self): return f"The Agent command '{self.cmd_name}' is known by the agent but not implemented" #return f"({type(self).__name__}): Device Generic command has no implementation in the controller" class CmdBadArgsException(CmdException): ''' Raised when a PENDING (non running) Agent cmd has bad, missing, or too many argument(s) ''' def __str__(self): return f"The Agent command '{self.cmd_name}' has bad, missing, or too many argument(s)" #return f"({type(self).__name__}): Device Generic command has no implementation in the controller" # --- CMD RUNNING EXCEPTIONS --- class CmdExecErrorException(CmdException): ''' Raised when a RUNNING Agent cmd has had a running error ''' def __str__(self): return f"The running Agent command '{self.cmd_name}' has had an error (during execution)" class CmdExecTimeoutException(CmdException): ''' Raised when a RUNNING Agent cmd is timeout ''' def __str__(self): return f"The running Agent command '{self.cmd_name}' is timeout" class CmdExecKilledException(CmdException): ''' Raised when a RUNNING Agent cmd has been aborted (by another agent) ''' def __str__(self): return f"The running Agent command '{self.cmd_name}' has been killed (by another agent)" ### # ================================================================= # class Agent # ================================================================= ### class Agent: """ See Agent_activity_diag.pu for PlantUML activity diagram Behavior of an agent: - If idle : - still does routine_process() and general_process() - does not do specific_process() - Once a command has been sent to another agent : - It waits (non blocking) for the end of execution of the command and get its result - If command is timed out or has been skipped or killed, then it is NOT re-executed at next iteration (except if needed explicitely) """ # --- # --- CLASS (STATIC) attributes (CONSTANTS) # --- If agent is instance of Agent: # --- - CLASS attributes are accessible via agent.__class__.__dict__ # --- - INSTANCE attributes are accessible via agent.__dict__ # --- # Default modes DEBUG_MODE:bool = False #TEST_MODE = False # By default, a command is valid during 5s (and then perempted) DEFAULT_CMD_VALIDITY_DURATION:int = 5 # Wait a fixed number of seconds before each loop ? #WITH_RANDOM_WAIT = False # 1 sec by default _DELAY_NB_SEC:int = 1 # - YES if TEST mode (in init()) # Default LOG level is INFO #PYROS_DEFAULT_GLOBAL_LOG_LEVEL = LogPyros.LOG_LEVEL_INFO # INFO ''' This Agent Specific commands list, that he is able to execute To be overriden by subclasses (here are just a few examples of commands) Format : ("cmd", args, timeout) with : - cmd (str) : the command name - timeout (int) : the command timeout (in sec) - exec_mode (int) : 0 (sequential), 1 (thread), or 2 (process) ''' AGENT_SPECIFIC_COMMANDS: List[ Tuple[str, int, int] ] = [ # Format : (“cmd_name”, timeout, exec_mode) ("do_specific1", 10, 0), #("set_specific2", 5, 0), ("do_specific3", 3, 0), ] # # --- FOR TEST ONLY --- # # By default, NOT in test mode #TEST_MODE = False # Maximum duration of this agent (only for SIMULATION mode) # If set to None, it will never exit except if asked (or CTRL-C) # If set to 20, it will exit after 20s TEST_MAX_DURATION_SEC = None #TEST_MAX_DURATION_SEC = 30 # Run this agent in simulator mode #TEST_MODE = True WITH_SIMULATOR:bool = False # Run the assertion tests at the end TEST_WITH_FINAL_TEST:bool = False CMD_STATUS = AgentCmd.CMD_STATUS_CODES AGT_STATUS = AgentSurvey.STATUS_CHOICES # (EP 2022/07) Default SCENARIO to be executed (only in TEST mode), and then STOP # # It is a list of commands to be sent by this agent to other agents ("self" means himself) # # Format : List of tuples (command, validity, expected_res, expected_status), with : # # - command : the format is "recipient cmd args", with : # - recipient : name of the Agent that the command is to be sent to (use "self" to mean himself) # - cmd : the command name # - args : (optional) the list of command arguments, separated by blanks : arg1 arg2 arg3 ... # # - validity : the command is valid for this duration, afterwards you can forget it # # - expected_res : the expected result (set to None if not to be tested) # # - expected_status : the status of the command expected after execution (expired, killed, skipped, executed...) (set to None if not to be tested) # # Ex : # - "AgentScheduler set_state ATTENTIVE" => means to send the command "set_state ATTENTIVE" to the agent AgentScheduler # - "AgentX set_state ATTENTIVE" => means to send the command "set_state ATTENTIVE" to the agent AgentX # - "self set_state ATTENTIVE" => means to send the command "set_state ATTENTIVE" to MYSELF # - "self do_restart_loop" => means to send the command "do_restart_loop" to MYSELF (no args) # TEST_COMMANDS_LIST: List[ Tuple[ str, int, Union[str,None], Union[int,None] ] ] = [ # Format : ("self cmd_name cmd_args", timeout, "expected_result", expected_status), #("self do_stop now", 200, '15.5', None), ("self do_specific1 1 2 3.5 titi (3,'titi',5) [1,3,5,7,9]", 200, '15.5', None), # 1) First, 3 EXCEPTION CASES (uncomment to activate exception) # Each of these lines will stop execution with an exception # ------------------------------------------------------------ # - Agent command, unknown => ko, UnknownCmdException #("self do_unknown", 10, None,None), # - Agent general command malformed (missing arg) => ko, AgentCmdBadArgsException #("Agent set_mode", 10, None,None), # - Agent specific command, known but not implemented => ko, AgentCmdUnimplementedException #("self set_specific2", 10, None,None), # - Agent specific command, implemented but missing args => ko, AgentCmdBadArgsException #(" self do_specific1 1 ", 10, None,None), # 2) NORMAL CASES (test scenario) # All these commands should be executed without error, from the 1st to the last one # ------------------------------- # This command has a validity of 0s and thus should be tagged as "expired" ("self set_mode ATTENTIVE", 0, "MODE is ATTENTIVE", CMD_STATUS.CMD_EXPIRED), # Agent general command ("self set_mode ATTENTIVE", 200, "MODE is ATTENTIVE", CMD_STATUS.CMD_EXECUTED), # => should get "ATTENTIVE" ("self get_mode", 100, "MODE is ATTENTIVE", None), # => should get "7" ("self do_eval 3+5-1", 200, '7', None), # END, will not go further #("self do_exit", 2, "STOPPING"), # Agent specific commands => should be executed ("self do_specific3", 200, '', None), ("self do_exit", 500, "STOPPING", None), ("self do_specific1 1 2 3.5 titi (3,'titi',5) [1,3,5,7,9]", 200, '15.5', None), ("self set_mode ROUTINE", 200, "MODE is ROUTINE", None), # => should get "ROUTINE" ("self get_mode", 200, "MODE is ROUTINE", None), # Agent specific command => should be skipped (because not ATTENTIVE) ("self do_specific1 1 2 3.5 titi (3,'titi',5) [1,3,5,7,9]", 200, "SKIPPED", None), # From now on, should not run anymore process_before/after # => and should skip next specific commands ("self set_mode IDLE", 200, "MODE is IDLE", None), # => should get "IDLE" ("self get_mode", 200, "MODE is IDLE", None), # Agent specific command => should be skipped (because not ATTENTIVE) ("self do_specific1 1 2 3.5 titi (3,'titi',5) [1,3,5,7,9]", 200, 'SKIPPED', None), # TODO: test priority commands : do_abort, do_flush_cmds, ... # - Stop executing new commands (just let them accumulate) ##("self do_stop_exec",), ##("self set_mode ATTENTIVE",), ##("self get_mode",), ##("self do_specific1 1 2 3.5 titi (3,'titi',5) [1,3,5,7,9]",), ##("self set_mode ROUTINE",), ##("self get_mode",), ##("self do_abort",), # - Now, resume execution of commands : # we should have the previous list pending, # but as "do_abort" is priority, it should be executed first even if last ! ##("self do_resume_exec",), # Restart the restart loop (from init()) ("self do_restart", 200, "RESTARTING", None), # Now stop ("self do_exit", 200, "STOPPING", None), ''' # specific0 not_executed_because_idle "specific0", "set_state:active", # specific1 executed_because_not_idle, should complete ok "specific1", # specific2 will be executed only when specific1 is finished, # and should be aborted before end of execution, # because of the 1st coming "do_abort" command below "specific2", # specific3 should be executed only when specific2 is finished (in fact, aborted), # and should be aborted before end of execution, # because of the 2nd coming "do_abort" command below "specific3", # These commands should not have the time to be processed # because the "do_exit" command below should be executed before "specific4", "specific5", "specific6", "specific7", "specific8", # Should abort the current running command (which should normally be specific2) # even if commands above (specific3, ..., specific8) are already pending "do_abort", # These commands (except abort) won't be executed # because too many commands are already pending (above) "specific9", "do_abort", "set_state:active", "set_state:idle", # Should stop the agent even before the previous pending commands are executed "do_exit", # Because of the previous "do_exit" command, # these following commands should not be executed, # and not even be added to the database command table "set_state:active", "specific10" ''' ] #TEST_COMMANDS = iter(TEST_COMMANDS_LIST) ''' # with thread RUN_IN_THREAD = True # with process #RUN_IN_THREAD = False _thread_total_steps_number = 1 ''' #COMMANDS_PEREMPTION_HOURS = 48 #COMMANDS_PEREMPTION_HOURS = 60/60 ##name = "Generic Agent" ##status = None ##mode = None config = None ''' Moved to more central file : config.config_base PYROS_DJANGO_BASE_DIR = Path("src/core/pyros_django") # pathlib DEFAULT_CONFIG_FILE_NAME = "config_unit_simulunit1.xml" CONFIG_DIR_NAME = "config" ''' # My own ConfigPyros instance (Observatory Configuration) _oc = None # Parameters from config file # for /src/ #_path_data = '../../config' # for /src/core/pyros_django/ #_path_data = '../../../../config' # Path to config _path_data = '' _computer_alias = '' _computer_description = '' # Current and next command to send _cmdts: AgentCmd = None _next_cmdts = None _agent_survey = None _pending_commands = [] ''' _current_device_cmd = None _current_device_cmd_thread = None ''' # List of agents I will send commands to _my_client_agents_aliases = [] _my_client_agents = {} _iter_num = None # Log object _log = None # new obsconfig init for agent: ##def __init__(self, RUN_IN_THREAD=True): def __init__(self): # Declaration of Instance attributes, default values self.name = "Generic Agent" self.status = None self.mode = None self.unit = None self.TEST_COMMANDS = None self.CURRENT_CMD = None #print(AgentSurvey.MODE_CHOICES.IDLE) #sys.exit() # Agent is by default in mode ATTENTIVE (most active mode) #self.mode = self.MODE_ATTENTIVE self._set_mode_attentive() #self._set_mode(MODES.) log.addHandler(handler_filebyagent(logging.INFO, self.__class__.__name__)) log.debug("start Agent init") obs_config_file_path = os.environ["PATH_TO_OBSCONF_FILE"] path_to_obs_config_folder = os.environ["PATH_TO_OBSCONF_FOLDER"] unit = os.environ["unit_name"] oc = OBSConfig(obs_config_file_path,unit) self.set_config(oc, obs_config_file_path, path_to_obs_config_folder, unit) self.name = self.__class__.__name__ # set real unit name (the current unit used) if unit == "": unit = oc.unit_name agent_name_from_config = self.get_config().get_agent_name_from_config(self.__class__.__name__) if agent_name_from_config: self.name = agent_name_from_config self.unit = unit print(f"Agent name : {self.name}") print(f"Unit name : {self.unit}") # (EP) moved to AgentDevice ###self._set_agent_device_aliases_from_config(self.name) self._set_mode_from_config(self.name) log.debug("Step __init__") self.TEST_COMMANDS = iter(self.TEST_COMMANDS_LIST) ##self.RUN_IN_THREAD = RUN_IN_THREAD self._set_status(self.AGT_STATUS.LAUNCHED) ####self._set_idle() # Create 1st survey if none #tmp = AgentSurvey.objects.filter(name=self.name) #if len(tmp) == 0: #nb_agents = AgentSurvey.objects.filter(name=self.name).count() #if nb_agents == 0: if AgentSurvey.objects.filter(name=self.name).exists(): self._agent_survey = AgentSurvey.objects.get(name=self.name) else: self._agent_survey = AgentSurvey.objects.create( name=self.name, validity_duration=60, #mode=self.mode, mode = self.get_mode(), #status=self.status, status=self.get_status(), iteration=-1 ) log.debug("Agent survey is" + str(self._agent_survey)) #self.printd("Agent survey is", self._agent_survey) ("end Agent __init__") #def __init__(self, name:str="Agent", config_filename:str=None, RUN_IN_THREAD=True): #def __init__(self, config_filename:str=None, RUN_IN_THREAD=True, DEBUG_MODE=False): # def __init__(self, config_filename:str=None, RUN_IN_THREAD=True): # ''' # print('PYROS_ENV', PYROS_ENV) # print('ROOT_DIR', ROOT_DIR) # print('DOC_DIR', DOC_DIR) # ''' # # Set logger # ##pyros_logger.set_logger_config() # ##logg = logging.getLogger('pyroslogger') # log.addHandler(handler_filebyagent(logging.INFO, self.__class__.__name__)) # log.debug("start Agent init") # # SET CONFIG INSTANCE # # - OLD CONFIG # log.debug(f'config file is {config_filename}') # ##log.info('PYROS_ENV', config.PYROS_ENV) # log.debug('PYROS_ENV' + config_old.PYROS_ENV) # log.debug('ROOT_DIR' + config_old.ROOT_DIR) # log.debug('DOC_DIR' + config_old.DOC_DIR) # ##if config.is_dev_env(): print("DEV ENV") # if config_old.is_dev_env(): log.debug("DEV ENV") # if config_old.is_prod_env(): log.debug("PROD ENV") # if config_old.is_debug(): log.debug("IN DEBUG MODE") # # - NEW CONFIG # obs_config_file_path = os.environ["PATH_TO_OBSCONF_FILE"] # path_to_obs_config_folder = os.environ["PATH_TO_OBSCONF_FOLDER"] # unit = os.environ["unit_name"] # oc = OBSConfig(obs_config_file_path) # self.set_config(oc, obs_config_file_path, path_to_obs_config_folder, unit) # log.debug("Step __init__") # self.TEST_COMMANDS = iter(self.TEST_COMMANDS_LIST) # self.RUN_IN_THREAD = RUN_IN_THREAD # self._set_status(self.STATUS_LAUNCH) # self._set_idle() # self._set_agent_device_aliases_from_config(self.name) # self._set_mode_from_config(self.name) # #self.name = name # self.name = self.__class__.__name__ # ''' # printd("*** ENVIRONMENT VARIABLE PYROS_DEBUG is:", os.environ.get('PYROS_DEBUG'), '***') # ##self.DEBUG_MODE = DEBUG_MODE # self.DEBUG_MODE = os.environ.get('PYROS_DEBUG', '0')=='1' # ''' # #self.DEBUG_MODE = config.PYROS_ENV # ##self.log = LogPyros(self.name, AgentLogs) # ##self.DEBUG_MODE = config.is_debug() # self.DEBUG_MODE = config_old.is_debug() # ##self.log.debug_level = DEBUG_MODE # ''' # # Default LOG level is INFO # log_level = LogPyros.LOG_LEVEL_INFO # INFO # self.log.set_global_log_level(LogPyros.LOG_LEVEL_DEBUG) if self.DEBUG_MODE else self.log.set_global_log_level(log_level) # ''' # #global_log_level = LogPyros.LOG_LEVEL_DEBUG if self.DEBUG_MODE else self.PYROS_DEFAULT_GLOBAL_LOG_LEVEL # ##global_log_level = LogPyros.LOG_LEVEL_DEBUG if self.DEBUG_MODE else config.PYROS_DEFAULT_GLOBAL_LOG_LEVEL # ##global_log_level = LogPyros.LOG_LEVEL_DEBUG if self.DEBUG_MODE else config_old.PYROS_DEFAULT_GLOBAL_LOG_LEVEL # ##self.log.set_global_log_level(global_log_level) # ##self.printd("LOG LEVEL IS:", self.log.debug_level) # ##self.print("LOG LEVEL IS:", self.log.get_global_log_level()) # # ------------- OLD CONFIG ------------------- # # Est-ce bien utile ??? # # New way with PathLib # # my_parent_abs_dir = Path(__file__).resolve().parent # #TODO: on doit pouvoir faire mieux avec pathlib (sans utiliser str()) # ##self._path_data = str( Path( str(my_parent_abs_dir).split(str(self.PYROS_DJANGO_BASE_DIR))[0] ) / self.CONFIG_DIR_NAME ) # ##self._path_data = config.CONFIG_DIR # # self._path_data = config_old.CONFIG_DIR # #self._set_mode(self.MODE_IDLE) # # config_filename = self.get_config_filename(config_filename) # #self.printd(f"*** Config file used is={config_filename}") # # log.debug(f"*** Config file used is={config_filename}") # # self.config = ConfigPyrosOld(config_filename) # # if self.config.get_last_errno() != self.config.NO_ERROR: # # raise Exception(f"Bad config file name '{config_filename}', error {str(self.config.get_last_errno())}: {str(self.config.get_last_errmsg())}") # #TODO: : à mettre dans la config # ''' # 'AgentDeviceTelescope1': 'AgentDeviceTelescopeGemini', # 'AgentDeviceFilterSelector1': 'AgentDeviceSBIG', # 'AgentDeviceShutter1': 'AgentDeviceSBIG', # 'AgentDeviceSensor1': 'AgentDeviceSBIG', # ''' # #self._my_client_agents = {} # ### self._agent_logs = AgentLogs.objects.create(name=self.name, message="Step __init__") # #self.printdb("Step __init__") # log.debug("Step __init__") # self.TEST_COMMANDS = iter(self.TEST_COMMANDS_LIST) # self.RUN_IN_THREAD = RUN_IN_THREAD # self._set_status(self.STATUS_LAUNCH) # self._set_idle() # self._set_agent_device_aliases_from_config(self.name) # self._set_mode_from_config(self.name) # # TODO: remove # #self._set_idle() # self._set_active() # # Create 1st survey if none # #tmp = AgentSurvey.objects.filter(name=self.name) # #if len(tmp) == 0: # #nb_agents = AgentSurvey.objects.filter(name=self.name).count() # #if nb_agents == 0: # if AgentSurvey.objects.filter(name=self.name).exists(): # self._agent_survey = AgentSurvey.objects.get(name=self.name) # else: # self._agent_survey = AgentSurvey.objects.create(name=self.name, validity_duration=60, mode=self.mode, status=self.status, iteration=-1) # log.debug("Agent survey is" + str(self._agent_survey)) # #self.printd("Agent survey is", self._agent_survey) # ("end Agent init") def set_config(self, oc: OBSConfig, obs_config_file_path: str, path_to_obs_config_folder: str, unit: str): self._oc = { 'config' : oc, 'env' : [ obs_config_file_path, path_to_obs_config_folder, unit ] } ''' config_env = { obs_config_file_path: obs_config_file_path, path_to_obs_config_folder: path_to_obs_config_folder, unit: unit } ''' def get_config(self): return self._oc['config'] def get_config_env(self): return self._oc['env'] def show_config(self): VERBOSE = False oc = self.get_config() obs_config_file_path, path_to_obs_config_folder, unit = self.get_config_env() #self.printd(obs_config_file_path) log.debug(obs_config_file_path) log.debug(path_to_obs_config_folder) log.debug(unit) #log.warning("petit warning bidon") print("\n") print("- Observatory:" + oc.get_obs_name()) my_unit_name = oc.get_units_name()[0] my_unit = (oc.get_units()[my_unit_name]) #print("- Unit description:", my_unit) if VERBOSE: print("\n") print("- Computers:", oc.get_computers()) print("\n") print("- Active Computers:", oc.get_active_computers()) print("\n") print("- Active Devices:", oc.get_active_devices()) print("\n") print("- Unit:", my_unit_name) VERBOSE and print(oc.get_unit_by_name(my_unit_name)) if VERBOSE: print("\n") print("- Unit topology:", oc.get_topology(my_unit_name)) print("\n") print("- Unit active Agents:", oc.get_active_agents(my_unit_name)) VERBOSE and print(oc.get_agents(my_unit_name)) print("\n") print("- Unit Agents per computer:", oc.get_agents_per_computer(my_unit_name)) print("\n") print("- Unit Agents per device:", oc.get_agents_per_device(my_unit_name)) if VERBOSE: print("\n") print("- Unit Channel groups:", oc.get_channel_groups(my_unit_name)) print("\n") print("- Unit Channels:", oc.get_channels(my_unit_name)) #print("\n") #print("- Unit/Channel info:", oc.get_channel_information(my_unit_name, 'OpticalChannel_up')) if VERBOSE: print("\n") print("- Unit Components agents:", oc.get_components_agents(my_unit_name)) if VERBOSE: print("\n") print("- Unit database:", oc.get_database_for_unit(my_unit_name)) print("\n") print("- Devices names:", oc.get_devices_names()) if VERBOSE: print("\n") print("- Devices names & files:", oc.get_devices_names_and_file()) print("\n") print("- Devices:", oc.get_devices()) print("\n") def get_config_filename(self, config_filename: str): if not config_filename: #config_filename = self.DEFAULT_CONFIG_FILE_NAME ##config_filename = config.DEFAULT_CONFIG_FILE_NAME config_filename = config_old.DEFAULT_CONFIG_FILE_NAME # If config file name is RELATIVE (i.e. without path, just the file name) # => give it an absolute path (and remove "src/agent/" from it) if config_filename == os.path.basename(config_filename): ##config_filename = os.path.join(config.CONFIG_DIR, config_filename) config_filename = os.path.join(config_old.CONFIG_DIR, config_filename) ''' # Build abs path including current agent dir config_filename = os.path.abspath(self.CONFIG_DIR_NAME + os.sep + config_filename) # Remove "src/agent_name/" from abs dir : # (1) Remove "src/core/pyros_django/" config_filename = config_filename.replace(str(self.PYROS_DJANGO_BASE_DIR)+os.sep, os.sep) # (2) Remove "agent_name/" #TODO: bidouille, faire plus propre config_filename = config_filename.replace(os.sep+"agent"+os.sep, os.sep) config_filename = config_filename.replace(os.sep+"monitoring"+os.sep, os.sep) ''' return os.path.normpath(config_filename) def __repr__(self): return "I am agent " + self.name def __str__(self): return self.__repr__() #return "I am agent " + self.name # Normal print ##def print(self, *args, **kwargs): self.log.print(*args, **kwargs) """ if args: self.printd(f"({self.name}): ", *args, **kwargs) else: self.printd() """ """ # DEBUG print shortcut ##def printd(self, *args, **kwargs): self.log.printd(*args, **kwargs) #if DEBUG: self.printd(d(*args, **kwargs) def log_d(self, *args, **kwargs): self.log.log_d(*args, **kwargs) def log_i(self, *args, **kwargs): self.log.log_i(*args, **kwargs) def log_w(self, *args, **kwargs): self.log.log_w(*args, **kwargs) def log_e(self, *args, **kwargs): self.log.log_e(*args, **kwargs) def log_c(self, *args, **kwargs): self.log.log_c(*args, **kwargs) def printdb(self, *args, **kwargs): self.log.db( *args, **kwargs) """ def _get_real_agent_name(self, agent_alias_name:str)->str: #self.printd("key is", agent_alias_name) ''' if not self._my_client_agents: return agent_alias_name return self._my_client_agents[agent_alias_name] ''' return self._my_client_agents.get(agent_alias_name) def run(self, nb_iter:int=None, FOR_REAL:bool=True): """ FOR_REAL: set to False if you don't want Agent to send commands to devices but just print messages without really doing anything """ ("in run()") self._set_and_log_status(self.AGT_STATUS.LAUNCHED) #return # TEST MODE ONLY # IF in test mode but with REAL devices (no SIMULATOR), delete all dangerous commands from the test commands list scenario: self._TEST_prepare() #self._DO_EXIT = False # No agent specific cmd currently running (thread) self.AGENT_SPECIFIC_CMD_RUNNING = False # No Routine process (before or after) currently running (thread) self.ROUTINE_PROCESS_RUNNING = False ################ # RESTART loop # ###############@ # REPEAT UNTIL not DO_RESTART_LOOP self.DO_RESTART_LOOP = True while self.DO_RESTART_LOOP: log.info("*"*10+ " STARTING RESTART LOOP "+ "*"*10+ '\n') # By default, no restart after exit from main loop self.DO_RESTART_LOOP = False self.start_time = time.time() #log.debug("on est ici: " + os.getcwd()) self._load_config() self.print_TEST_MODE() self.init() ''' testing log: self.log_e("ERROR") self.log_c("FATAL critical ERROR") ''' #self.log_w("WARNING", "watch your step !") #log.warning("WARNING"+ "watch your step !") # Avoid blocking on false "running" commands # (old commands that stayed with "running" status when agent was killed) AgentCmd.delete_commands_with_running_status_for_agent(self.name) self._iter_num = 1 ############# # MAIN loop # ############@ self.DO_MAIN_LOOP = True while self.DO_MAIN_LOOP: try: self._main_loop(nb_iter,FOR_REAL) #if not self.DO_MAIN_LOOP: break except KeyboardInterrupt: # CTRL-C # In case of CTRL-C, kill the current thread (process) before dying (in error) #log.info("CTRL-C Interrupted, I kill the current thread (process) before exiting (if exists)") log.info("CTRL-C Interrupted, trying to stop cleanly") #self._kill_running_device_cmd_if_exists("USER_CTRLC") #self.do_things_before_exit("USER_CTRLC") ##self._cleanup_before_exit("USER_CTRLC") self.do_stop("asap") exit(1) # TEST mode only self._TEST_test_results() #if self._DO_EXIT: exit(0) #def _kill_running_device_cmd_if_exists(self, abort_cmd_sender): # to be overriden by subclass def do_things_before_exit(self, stopper_agent_name:str=None): pass def _main_loop(self, nb_iter:int=None, FOR_REAL:bool=True): self._set_and_log_status(self.AGT_STATUS.IN_MAIN_LOOP) self._main_loop_start(nb_iter) #if not self.DO_MAIN_LOOP: return self._reload_config_if_changed() # only if changed # Log this agent status (update my current mode and status in DB) ##self._log_agent_state() # better to do this in a subclass #self.show_config() #self.printd("====== START COMMMANDS PROCESSING ======") # SIMU #self.send_cmd_to("AgentScheduler", "do_replan") # ROUTINE process BEFORE # To be specified by subclass if not self.IS_IDLE(): self._routine_process_before() #self.printd("I am IDLE, so I bypass the routine_process (do not send any new command)") # Processing the next pending command if exists # If Agent general level command like (DO_RESTART, DO_EXIT, DO_ABORT) # => will set DO_MAIN_LOOP=False and/or DO_RESTART_LOOP=True print() print() log.info("*"*10 + " NEXT COMMAND PROCESSING (START) " + "*"*10 + '\n') try : cmd = self._process_next_command_if_exists() #print(cmd) except ( # PENDING (non running) cmd exceptions CmdUnimplementedException, CmdBadArgsException, CmdUnknownException, # RUNNING cmd exceptions CmdExecErrorException, CmdExecTimeoutException, CmdExecKilledException ) as e : #print(e) log.error(f"EXCEPTION on Agent command '{e.cmd_name}'") if isinstance(e.cmd, AgentCmd) : cmd = e.cmd if type(e) is CmdUnimplementedException: if cmd.name != "get_specific_cmds": cmd.set_as_unimplemented("EXCEPTION: command known but unimplemented") elif type(e) in (CmdBadArgsException, CmdUnknownException): # set back to "pending" if ever was wrongly marked "running" #cmd.set_as_pending() cmd.set_as_invalid("EXCEPTION: command unknown or bad args") # Execution Exception elif type(e) != CmdExecErrorException: cmd.set_result("EXCEPTION: Problem during execution") # isinstance(e.cmd, str) else: raise Exception("Abnormal error case...") # Ne pas stopper !! ##self._cleanup_before_exit() ##raise log.info("*"*10 + " NEXT COMMAND PROCESSING (END) " + "*"*10 + "\n") # only if in TEST mode self._TEST_check_cmd_res_and_status(cmd) if self.DO_MAIN_LOOP: if not self.IS_IDLE(): self._routine_process_after() # TEST MODE only : execute test routine_process always (even if IDLE) in order to (always) send next command from test scenario self._TEST_test_routine_process() #self.printd("====== END COMMMANDS PROCESSING ======") #self.waitfor(self.mainloop_waittime) self._main_loop_end() self._iter_num += 1 def _main_loop_start(self, nb_iter:int=None): for i in range(3): print() #self.printd("-"*80) log.info("*"*90) log.info("*"*20 + f" {self.name} : MAIN LOOP ITERATION {self._iter_num} (START) " + "*"*20) log.info("*"*90 + '\n') #self.print(f"Iteration {self._iter_num}") # EXIT because of nb of iterations ? if nb_iter is not None: # Bad number of iterations or nb iterations reached => exit if nb_iter <= 0 or nb_iter < self._iter_num: log.info(f"Exit because number of iterations asked ({nb_iter}) has been reached") self.DO_MAIN_LOOP = False return # Temporizing (delay) : Wait a random number of sec before starting iteration # (to avoid to busy to much the processor) # (also to let another agent having the chance to send a me command) if self._DELAY_NB_SEC : #random_waiting_sec = random.randint(0,5) log.info(f"Waiting {self._DELAY_NB_SEC} sec (random) before starting new iteration...") #time.sleep(random_waiting_sec) self.waitfor(self._DELAY_NB_SEC) # (Test only) # EXIT because max duration reached ? if self.TEST_MODE and self.TEST_MAX_DURATION_SEC and (time.time()-self.start_time > self.TEST_MAX_DURATION_SEC): log.info("Exit because of max duration set to ", self.TEST_MAX_DURATION_SEC, "s") #self._kill_running_device_cmd_if_exists(self.name) self._cleanup_before_exit(self.name) #if self.TEST_MODE and self.TEST_WITH_FINAL_TEST: self._TEST_test_results() self.DO_MAIN_LOOP = False return ##self._set_status(self.AGT_STATUS.IN_MAIN_LOOP) self.show_mode_and_status() self.main_loop_start() # To be overriden by subclass def main_loop_start(self): pass def _main_loop_end(self): self.main_loop_end() log.info("*"*20 + " MAIN LOOP ITERATION (END) " + "*"*20 + '\n') #self.do_log(LOG_DEBUG, "Ending main loop iteration") # To be overriden by subclass def main_loop_end(self): pass # To be overriden by subclass (AgentDevice) def process_device_level_cmd(self, cmd): pass ''' log.info("(DEVICE LEVEL CMD)") try: self.exec_device_cmd_if_possible(cmd) except (UnimplementedGenericCmdException) as e: #except (UnknownGenericCmdException, UnimplementedGenericCmdException, UnknownNativeCmdException) as e: log.e(f"EXCEPTION caught by {type(self).__name__} (from Agent mainloop) for command '{cmd.name}'", e) log.e("Thus ==> ignore this command") cmd.set_result(e) #cmd.set_as_killed_by(type(self).__name__) cmd.set_as_skipped() #raise ''' def _process_next_command_if_exists(self)->Union[AgentCmd,None]: ''' Processing the next pending command if exists ''' self._set_and_log_status(self.AGT_STATUS.IN_GET_NEXT_CMD) #print() #print() #log.info("*"*10 + " NEXT COMMAND PROCESSING (START) " + "*"*10 + '\n') # Purge commands (every N iterations, starting from 1st, delete old commands) N=5 if ((self._iter_num-1) % N) == 0: log.info("Purging expired commands if exists") #AgentCmd.purge_old_commands_for_agent(self.name) self._purge_old_commands_sent_to_me() # Get next command and process it (if exists) cmd = self._get_next_valid_and_not_running_command() self.CURRENT_CMD = cmd #self._set_status(self.STATUS_GENERAL_PROCESS) #if cmd: self.command_process(cmd) # SIMU #cmd = "do_replan" #self.send_cmd_to("AgentScheduler", "do_replan") # No new command => nothing to do if not cmd: return cmd log.info('-'*6) log.info('-'*6 + " RECEIVED NEW COMMAND TO PROCESS: ") log.info('-'*6 + str(cmd)) if self.is_in_test_mode() and hasattr(self._cmdts,"expected_res"): log.info(f"*** (with expected result : " + str(self._cmdts.expected_res) + ')') log.info('-'*6) cmd.set_read_time() # CASE 1 - AGENT GENERAL command # (DO_RESTART, DO_EXIST, DO_ABORT, DO_ABORT_COMMAND, DO_FLUSH_COMMANDS, ...) # This processing can set DO_MAIN_LOOP and DO_RESTART_LOOP to False if self._is_agent_general_cmd(cmd): try: self._process_agent_general_cmd(cmd) except (CmdUnimplementedException, CmdBadArgsException) : # cmd should not be set as "running" cmd.set_as_pending() # These exceptions are managed at higher level : raise return cmd # CASE 2 - AGENT SPECIFIC command # => Simple action, short execution time, so I execute it directly (in current main thread, not in parallel) if self._is_agent_specific_cmd(cmd): #log.info("(AGENT LEVEL CMD)") if not self.IS_ATTENTIVE(): cmd.set_as_skipped("Skipped because I am not ATTENTIVE") return cmd try: self._process_agent_specific_cmd(cmd) #self._exec_agent_cmd(cmd) #except AttributeError as e: except (CmdUnimplementedException, CmdBadArgsException) as e: # cmd should not be set as "running" #cmd.set_as_pending() # These exceptions are managed at higher level : raise return cmd # CASE 3 - OTHER level command (DEVsICE level, only for AgentDevice) if self.is_device_level_cmd(cmd): self.process_device_level_cmd(cmd) return cmd # CASE 4 - INVALID COMMAND #raise Exception("INVALID COMMAND: " + cmd.name) log.warning("******************************************************") log.warning("*************** ERROR: INVALID COMMAND (UNKNOWN) ***************") log.warning("******************************************************") #log.warning("Thus => I ignore this command...") #cmd.set_result("ERROR: INVALID COMMAND") ##cmd.set_as_skipped("ERROR: INVALID AGENT COMMAND") ##self._cleanup_before_exit() ##raise UnknownCmdException(cmd.name) raise CmdUnknownException(cmd) #print() #log.info("*"*10 + " NEXT COMMAND PROCESSING (END) " + "*"*10 + "\n") def _abort_current_running_cmd_if_exists(self): log.info("Aborting current running command if exists...") pass def _abort_current_routine_process_if_exists(self): log.info("Aborting current routine process if exists...") pass def _cleanup_before_exit(self, stopper_agent_name:str=None): if not stopper_agent_name: stopper_agent_name = self.name #self._set_status(self.STATUS_EXIT) ##self._log_agent_state() log.info("Trying to stop cleanly") log.info("Before exiting, Here are (if exists) the current (still) pending commands (time ordered) :") #commands = AgentCmd.get_commands_sent_to_agent(self.name) commands = AgentCmd.get_pending_and_running_commands_for_agent(self.name) AgentCmd.show_commands(commands, True) self.do_flush_commands() #if self.TEST_MODE and self.TEST_WITH_FINAL_TEST and self.TEST_COMMANDS_DEST == "myself": self.simulator_test_results() if self.TEST_MODE and self.TEST_WITH_FINAL_TEST: self._TEST_test_results() #self._DO_EXIT=True #exit(0) self.do_things_before_exit(stopper_agent_name) ##self._set_and_log_status(self.AGT_STATUS.EXITING) def print_TEST_MODE(self): if self.TEST_MODE: log.debug("[IN TEST MODE]") log.info("Flush previous commands to be sure to start in clean state") AgentCmd.delete_pending_commands_for_agent(self.name) else: log.debug("[IN NORMAL MODE]") self.TEST_MAX_DURATION_SEC=None def _purge_old_commands_sent_to_me(self): AgentCmd.purge_old_commands_sent_to_agent(self.name) def _routine_process_before(self): """ Routine processing BEFORE processing received commands. IMPORTANT : this processing must be as SHORT as possible, so that the agent can quickly read its received commands and process them This is a command or set of processings that this agent does or commands that it sends regularly, at each iteration """ print() print() log.info("*"*10+ " ROUTINE PROCESSING BEFORE (START) "+ "*"*10+ '\n') self._set_and_log_status(self.AGT_STATUS.IN_ROUTINE_BEF) self.routine_process_before_body() print() log.info("*"*10 + " ROUTINE PROCESSING BEFORE (END) "+ "*"*10) def _routine_process_after(self): """ Routine processing AFTER processing received commands. This processing can be longer than the "before" one above, as the agent has already processed its received commands, but not too long anyway, otherwise it will take too long before the next iteration starts... This is a command or set of processings that this agent does or commands that it sends regularly, at each iteration """ print() print() log.info("*"*10+ " ROUTINE PROCESSING AFTER (START) "+ "*"*10+ '\n') self._set_and_log_status(self.AGT_STATUS.IN_ROUTINE_AFT) self.routine_process_after_body() print() log.info("*"*10 + " ROUTINE PROCESSING AFTER (END) "+ "*"*10 + '\n') # To be overridden by subclasses def routine_process_before_body(self): pass # To be overridden by subclasses def routine_process_after_body(self): #if self.TEST_MODE: self._TEST_test_routine_process() pass """ def purge_commands(self): ### Delete commands (which I am recipient of) older than COMMANDS_PEREMPTION_HOURS (like 48h) ATTENTION !!! EXCEPT the RUNNING command !!! NB: datetime.utcnow() is equivalent to datetime.now(timezone.utc) ### self.printd("Looking for old commands to purge...") ### COMMAND_PEREMPTION_DATE_FROM_NOW = datetime.utcnow() - timedelta(hours = self.COMMANDS_PEREMPTION_HOURS) #self.printd("peremption date", COMMAND_PEREMPTION_DATE_FROM_NOW) old_commands = AgentCmd.objects.filter( # only commands for me recipient = self.name, # only pending commands sender_deposit_time__lt = COMMAND_PEREMPTION_DATE_FROM_NOW, ) ### old_commands = AgentCmd.get_old_commands_for_agent(self.name) if old_commands.exists(): self.printd("Found old commands to delete:") for cmd in old_commands: self.printd(cmd) old_commands.delete() """ def waitfor(self, nbsec:float=2.0): ''' # thread if self._current_device_cmd_thread and self.RUN_IN_THREAD: self._current_device_cmd_thread.wait(nbsec) # process (or main thread) else: time.sleep(nbsec) ''' log.info(f"Now, waiting for {nbsec} second(s)...") time.sleep(nbsec) def sleep(self, nbsec): self.waitfor(nbsec) def get_specific_cmds(self)->str: specific_commands = "" for command_tuple in self.AGENT_SPECIFIC_COMMANDS: cmd_name = command_tuple[0] #print(cmd_name) specific_commands += cmd_name # Exception if exists an unimplemented command try: f = getattr(self, cmd_name) except AttributeError: raise CmdUnimplementedException(cmd_name) from None args = signature(f) #specific_commands += str(args) specific_commands += "(" #print(specific_commands) #print(args.parameters) for arg in args.parameters: arg = args.parameters[arg] #print(arg) #print("name, annot:", arg.name, arg.annotation) #print("annot.name:", arg.annotation.__name__) #print(type(arg.annotation)) arg_type = arg.annotation.__name__ if isinstance(arg.annotation,type) else str(arg.annotation) specific_commands += arg.name+":"+arg_type+"," # specific_commands += str(args) if args.parameters: specific_commands = specific_commands[:-1] specific_commands += ")" specific_commands += ";" if specific_commands: specific_commands = specific_commands[0:-1] return specific_commands # # MODE & STATUS # def show_current_mode_and_status(self): #log.info(f"CURRENT MODE is {self.mode} (status is {self.status})") log.info(f"CURRENT MODE is {self.get_mode()} (status is {self.get_status()})") # @deprecated def show_mode_and_status(self): self.show_current_mode_and_status() # # - STATUS MANAGEMENT # def get_status(self): return self.status # @deprecated def get_state(self): return self.get_status() def _set_status(self, status:str): #self.printd(f"[{status}] (switching from status {self.status})") log.debug(f"[{status}]") self.status = status return False def _set_and_log_status(self, status:str): self._set_status(status) self._log_agent_state() # # - MODE MANAGEMENT # def get_mode(self): return self.mode # Test mode def IS_MODE_IDLE(self): return self.get_mode() == AgentSurvey.MODE_CHOICES.IDLE def IS_MODE_ROUTINE(self): return self.get_mode() == AgentSurvey.MODE_CHOICES.ROUTINE def IS_MODE_ATTENTIVE(self): return self.get_mode() == AgentSurvey.MODE_CHOICES.ATTENTIVE # @deprecated def IS_IDLE(self): return self.IS_MODE_IDLE() # @deprecated def IS_ROUTINE(self): return self.IS_MODE_ROUTINE() # @deprecated def IS_ATTENTIVE(self): return self.IS_MODE_ATTENTIVE() def _set_mode(self, mode:str): #self.printd(f"Switching from mode {self.mode} to mode {mode}") log.info(f"[NEW MODE {mode}]") self.mode = mode def _set_mode_idle(self): self._set_mode(AgentSurvey.MODE_CHOICES.IDLE) # @deprecated def set_idle(self): self._set_mode_idle() def _set_mode_routine(self): self._set_mode(AgentSurvey.MODE_CHOICES.ROUTINE) # @deprecated def set_routine(self): self._set_mode_routine() def _set_mode_attentive(self): #self._set_mode(AgentSurvey.MODE_ATTENTIVE) self._set_mode(AgentSurvey.MODE_CHOICES.ATTENTIVE) # @deprecated def set_attentive(self): self._set_mode_attentive() ''' def die(self): self._set_status(AgentSurvey.STATUS_EXIT) ''' """ suspend/resume """ def suspend(self): """ TODO: Mode IDLE (doit rester à l'écoute d'un resume, et doit continuer à alimenter les tables pour informer de son état via tables agents_logs, et lire table agents_command pour reprendre via resume, et update la table agents_survey pour donner son status "idle" """ self._set_idle() return True def resume(self): """ Quit suspend() mode """ self._set_active() return True ''' (EP) moved to AgentDevice def _set_agent_device_aliases_from_config(self, agent_alias): for a in self._my_client_agents_aliases: # TODO: activer ##self._my_client_agents[a] = self.config.get_paramvalue(a,'general','real_agent_device_name') pass ''' # new config (obsconfig) def _set_mode_from_config(self, agent_name): # all agent are active ? #mode = self.MODE_ACTIVE #mode = self.MODE_ATTENTIVE self._set_mode_attentive() return True # old config # def _set_mode_from_config(self, agent_name): # # --- Get the startmode of the AgentX # modestr = self.config.get_paramvalue(agent_name,'general','startmode') # if self.config.get_last_errno() != self.config.NO_ERROR: # raise Exception(f"error {str(self.config.get_last_errno())}: {str(self.config.get_last_errmsg())}") # if (modestr == None): # return True # # --- Set the mode according the startmode value # mode = self.MODE_IDLE # if modestr.upper() == 'RUN': # mode = self.MODE_ACTIVE # self._set_mode(mode) # return True def set_delay(self, delay_nb_sec:int): self._DELAY_NB_SEC = delay_nb_sec """ ======================================================================================= Generic methods that may be specialized (overriden) by subclasses (except if private) ======================================================================================= """ # To be overridden by subclasses def init(self): #log.debug("*** Initializing... ***") log.info("*"*10+ " INITIALIZING... "+ "*"*10+ '\n') self._set_and_log_status(self.AGT_STATUS.INITIALIZING) if self.TEST_MODE: self.set_delay(2) def _reload_config_if_changed(self): self._load_config() def _load_config(self): """ TODO: only si date fichier xml changée => en RAM, un objet Config avec méthodes d'accès, appelle le parser de AK (classe Config.py indépendante) """ ''' # SETUP try: self.config = get_object_or_404(Config, id=1) # By default, set mode to SCHEDULER (False = REMOTE, which should never be the default) self.config.global_mode = True self.config.save() # self.config = Config.objects.get(pk=1) # self.config = Config.objects.get()[0] except Exception as e: # except Config.ObjectDoesNotExist: self.printd("Config read (or write) exception", str(e)) return -1 ''' log.debug("Loading the config file...") # - NEW CONFIG self.get_config().load(self.get_config_env()[0]) # - OLD CONFIG # self.config.load() # if self.config.get_last_errno() != self.config.NO_ERROR: # raise Exception(f"error {str(self.config.get_last_errno())}: {str(self.config.get_last_errmsg())}") # if not self.config.is_config_contents_changed(): # return # === display informations # --- Get the assembly aliases of the unit # assembled_mount_aliases = [] # assembled_channel_aliases = [] # assembled_aliases = [] # unit_alias = self.config.get_aliases('unit')[0] # params = self.config.get_params(unit_alias) # for param in params: # if param['section']=="assembly" and param['key']=="alias": # assembled_aliases.append(param['value']) #self.printd(f"Unit {unit_alias} is the assembly of {assembled_aliases}") # log.debug("--------- Components of the unit -----------") # log.debug("Configuration file is {}".format(self.config.get_configfile())) # alias = self.config.get_aliases('unit')[0] # namevalue = self.config.get_paramvalue(alias,'unit','description') # log.debug(f"Unit alias is {alias}. Description is {namevalue}:") # unit_subtags = self.config.get_unit_subtags() # for unit_subtag in unit_subtags: # aliases = self.config.get_aliases(unit_subtag) # for alias in aliases: # namevalue = self.config.get_paramvalue(alias,unit_subtag,'description') # log.debug(f"- {unit_subtag} alias is {alias}. Description is {namevalue}") # # --- fill the list of mount and channel assembled # if alias in assembled_aliases: # if unit_subtag=="mount": # assembled_mount_aliases.append(alias) # elif unit_subtag=="channel": # assembled_channel_aliases.append(alias) # log.debug("--------- Assembly of the unit -----------") # log.debug(f"Assembled mount aliases: {assembled_mount_aliases}") # log.debug(f"Assembled channel aliases: {assembled_channel_aliases}") # --- Get the home of the mount[0] # mount_alias = assembled_mount_aliases[0] # home = self.config.get_paramvalue(mount_alias,'MountPointing','home') # log.debug("------------------------------------------") # hostname = socket.gethostname() # self._computer_alias = '' # unit_subtag = 'computer' # aliases = self.config.get_aliases(unit_subtag) # for alias in aliases: # log.debug("alias" + alias) # value = self.config.get_paramvalue(alias,'local','hostname') # log.debug("value" + value) # if value == hostname: # log.debug("value" + value) # self._computer_alias = alias # value = self.config.get_paramvalue(alias,unit_subtag,'description') # self._computer_description = value # value = self.config.get_paramvalue(alias,'path','data') # # Overrides default value # self._path_data = value # break # log.debug(f"hostname = {hostname}") # log.debug(f"path_data = {self._path_data}") # log.debug(f"home = {home}") # log.debug("------------------------------------------") # --- update the log parameters ##self.log.path_data = self._path_data ##print("new self.log.path_data is", self.log.path_data) ##self.log.set_global_path_data(self._path_data) ##self.printd("new self.log.global_path_data is", self.log.get_global_path_data()) ##self.log.home = home #def update_survey(self): def _log_agent_state(self): """ Save (update) this agent current mode, status, and #iteration in DB """ "Updating the agent survey database table..." #self.printd("- fetching table line for agent", self.name) # only necessary when using process (not necessary with threads) #with transaction.atomic(): #self._agent_survey = AgentSurvey.objects.get(name=self.name) self._agent_survey.mode = self.get_mode() self._agent_survey.status = self.get_status() self._agent_survey.iteration = self._iter_num self._agent_survey.save() #self._agent_survey.save(update_fields=["mode", "status"]) """ def send_command(self, cmd_name): recipient_agent = self.name if self.TEST_COMMANDS_DEST=="myself" else self.TEST_COMMANDS_DEST AgentCmd.objects.create(sender=self.name, recipient=recipient_agent, name=cmd_name) """ #def send_command(self, to_agent, cmd_type, cmd_name, cmd_args=None): #def send_cmd_to(self, to_agent:str, cmd_name:str, cmd_args:str=None, validity:int=None, timeout:int=None): def send_cmd_to(self, to_agent:str, cmd_name:str, cmd_args:str=None, validity:int=None): """ #ex: send_command(“AgentX”,”GENERIC”,”EVAL”,“3+4”) ex: send_command(“AgentX”,"EVAL”,“3+4”) """ #return AgentCmd.send_cmd_from_to(self.name, self._get_real_agent_name_for_alias(to_agent), cmd_name, cmd_args) #cmd = self.create_cmd_for(to_agent, cmd_name, cmd_args, validity, timeout).send() cmd = self.create_cmd_for(to_agent, cmd_name, cmd_args, validity).send() #cmd.send() return cmd def create_cmd_for(self, to_agent:str, cmd_name:str, cmd_args:str=None, validity:int=None) -> AgentCmd: ''' real_agent_name = self._get_real_agent_name(to_agent) real_cmd_name = cmd_name if '.' in real_agent_name: real_agent_name, component_name = real_agent_name.split('.') real_cmd_name = component_name+'.'+cmd_name return AgentCmd.create(self.name, real_agent_name, real_cmd_name, cmd_args) try: real_agent_name = self._get_real_agent_name(to_agent) except KeyError as e: ''' return AgentCmd.create(self.name, to_agent, cmd_name, cmd_args, validity) ''' real_agent_name = self._get_real_agent_name(to_agent) if not real_agent_name: return AgentCmd.create(self.name, to_agent, cmd_name, cmd_args) # log.e("UNKNOWN AgentDevice ALIAS", to_agent) # #self.log_e("Exception raised", e) # log.e(f"=> Thus, I do not send this command '{cmd_name}'") # return None return AgentCmd.create(self.name, real_agent_name, cmd_name, cmd_args) ''' ''' return AgentCmd( sender=self.name, recipient=self._get_real_agent_name_for_alias(recipient_agent_alias_name), name=cmd_name ) ''' def _get_next_valid_and_not_running_command(self) -> Union[AgentCmd,None]: """ Return next VALID (not expired) command (read from the DB command table) which is relevant to this agent. Commands are read in chronological order """ self._set_status(self.AGT_STATUS.IN_GET_NEXT_CMD) log.info("Looking for a new command to process (sent by another agent):") # 1) Get all pending commands for me (return if None) # Not sure this is necessary to do it in a transaction, # but there might be a risk # that a command status is modified while we are reading... with transaction.atomic(): self._pending_commands = AgentCmd.get_pending_and_running_commands_for_agent(self.name) commands = self._pending_commands if not commands.exists(): log.info("") return None "Current pending (or running) commands (time ordered):" AgentCmd.show_commands(commands) # 2) Check if a PRIORITY command is in the list (even at the very end), # and if so return this command # (must be still valid and not yet running) for cmd in commands: #if cmd.name in ("do_exit", "do_abort", "do_flush_commands"): break #if cmd.name in ("do_exit", "do_abort"): break if cmd.is_agent_general_priority_cmd(): if cmd.is_running(): return None if cmd.is_expired(): cmd.set_as_expired() return None return cmd ''' #if cmd.name in ("do_exit", "do_abort", "do_flush_commands"): if cmd.name in ("do_exit", "do_abort"): if cmd.is_running(): return None if cmd.is_expired(): cmd.set_as_outofdate() return None return cmd ''' # 3) If first (oldest) command is currently running # (status CMD_RUNNING), then do nothing and return """ cmd_executing = Command.objects.filter( # only commands for me recipient = self.name, # only pending commands recipient_status_code = Command.CMD_STATUS_CODES.CMD_RUNNING, ).first() #if cmd_executing.exists(): if cmd_executing: """ #cmd = commands[0] cmd = commands.first() if cmd.is_expired(): cmd.set_as_expired() return None if cmd.is_running(): #self.printd(f"There is currently a running command ({cmd_executing.first().name}), so I do nothing (wait for end of execution)") log.info(f"There is currently a running command ({cmd.name})") "