diff --git a/privatedev/plugin/agent/Agent.py b/privatedev/plugin/agent/Agent.py new file mode 100755 index 0000000..930e84c --- /dev/null +++ b/privatedev/plugin/agent/Agent.py @@ -0,0 +1,2646 @@ +#!/usr/bin/env python3 + +#DEBUG=True +#DEBUG=False + +""" +================================================================= +Agent class +================================================================= + +This class is the base class for all AgentXXX classes (AgentSST, AgentScheduler, AgentImagesProcessor...) + +It runs an infinite loop (called main_loop) doing at each iteration +some routine tasks (process_routine before and after) +and executing commands sent by other agents. + +To start it up, from the project root : + +- In normal mode : +- ./PYROS start -fg agent -o tnc +(add -d after the PYROS command for debug mode) + +- In test mode (the agent will execute a scenario +as a suite of commands (in TEST_COMMANDS_LIST) +to send to himself (or other agents) +and execute them on reception at each iteration : +- ./PYROS -t start -fg agent -o tnc + +""" + + +### +# ================================================================= +# SETUP FOR DJANGO +# +# (see https://docs.djangoproject.com/en/dev/topics/settings) +# (see also https://docs.djangoproject.com/en/dev/ref/settings) +# ================================================================= +### + + +# For cmd parsing +from typing import List, Tuple +import ast + +import os +from pathlib import Path +import sys + +import logging + +from django.conf import settings as djangosettings + +# Conseil sur le net: +#https://stackoverflow.com/questions/16853649/how-to-execute-a-python-script-from-the-django-shell +#"" +#import sys, os +#sys.path.append('/path/to/your/django/app') +#os.environ['DJANGO_SETTINGS_MODULE'] = 'settings' +#from django.conf import settings +#"" + +# To avoid a "ModuleNotFoundError: No module named 'dashboard'"... (not even 1 app found) : +##sys.path.insert(0, os.path.abspath("..")) +##sys.path.insert(0, os.path.abspath("src")) +##sys.path.insert(0, "../src") +##sys.path.insert(0, "src") +# To avoid a "ModuleNotFoundError: No module named 'dashboard'" +## sys.path.append("..") +py_pwd = os.path.normpath(os.getcwd() + "/..") +if (py_pwd not in os.sys.path): + (os.sys.path).append(py_pwd) +# To avoid a "ModuleNotFoundError: No module named 'src'" +## sys.path.append("../../../..") +py_pwd = os.path.normpath(os.getcwd() + "/../../../..") +if (py_pwd not in os.sys.path): + (os.sys.path).append(py_pwd) +##sys.path.append("src") + +from src.pyros_logger import log, handler_filebyagent +#from src.pyros_logger import logger as logg, handler_filebyagent, +##from src import pyros_logger + +''' +def printd(*args, **kwargs): + if os.environ.get('PYROS_DEBUG', '0')=='1': print(*args, **kwargs) +''' + +#printd("Starting with this sys.path", sys.path) +log.debug("Starting with this sys.path" + str(sys.path)) + +# DJANGO setup +# self.printd("file is", __file__) +# mypath = os.getcwd() +# Go into src/ +##os.chdir("..") +##os.chdir("src") +#printd("Current directory : " + str(os.getcwd())) +log.debug("Current directory : " + str(os.getcwd())) + +#os.environ.setdefault("DJANGO_SETTINGS_MODULE", "src.core.pyros_django.pyros.settings") +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "src.core.pyros_django.pyros.settings") +# os.environ['SECRET_KEY'] = 'abc' +# os.environ['ENVIRONMENT'] = 'production' +import django + +django.setup() + +#printd("DB2 used is:", djangosettings.DATABASES["default"]["NAME"]) +log.debug("DB2 used is:" + djangosettings.DATABASES["default"]["NAME"]) + + +### +# ================================================================= +# IMPORT PYTHON PACKAGES +#================================================================= +### + +# --- GENERAL PURPOSE IMPORT --- +#from __future__ import absolute_import +##import utils.Logger as L +import platform +import random +import threading +#import multiprocessing +import time +''' +from threading import Thread +import socket +''' +#import ctypes +#import copy + +# --- DJANGO IMPORT --- +from django.db import transaction +from django import db +# from django.core.exceptions import ObjectDoesNotExist +# from django.db.models import Q +#from django.shortcuts import get_object_or_404 +#from django.conf import settings as djangosettings + +# --- SPECIFIC IMPORT --- +# Many ways to import configuration settings: +##import config +import config.old_config as config_old +#from config import PYROS_ENV, ROOT_DIR, DOC_DIR +#from config import * + +from common.models import AgentSurvey, AgentCmd, AgentLogs + +#from config.configpyros import ConfigPyros +from config.old_config.configpyros import ConfigPyros as ConfigPyrosOld +from src.core.pyros_django.obsconfig.obsconfig_class import OBSConfig + +#from dashboard.views import get_sunelev +#from devices.TelescopeRemoteControlDefault import TelescopeRemoteControlDefault +#from utils.JDManipulator import * + +##from agent.logpyros import LogPyros +##from src.logpyros import LogPyros + +''' +from device_controller.abstract_component.device_controller import ( + DCCNotFoundException, UnknownGenericCmdException, + UnimplementedGenericCmdException, UnknownNativeCmdException +) +''' + + + +### +# ================================================================= +# GENERAL MODULE CONSTANT & FUNCTIONS DEFINITIONS +# ================================================================= +### + +#DEBUG_FILE = False + +##log = L.setupLogger("AgentLogger", "Agent") + +IS_WINDOWS = platform.system() == "Windows" + + +class Colors: + HEADER = "\033[95m" + BLUE = "\033[94m" + GREEN = "\033[92m" + WARNING = "\033[93m" + FAIL = "\033[91m" + ENDC = "\033[0m" + BOLD = "\033[1m" + UNDERLINE = "\033[4m" + +def printColor(color: Colors, message, file=sys.stdout, eol=os.linesep, forced=False): + #system = platform.system() + """ + if (self.disp == False and forced == False): + return 0 + """ + #if system == "Windows": + if IS_WINDOWS: + print(message, file=file, end=eol) + else: + print(color + message + Colors.ENDC, file=file, end=eol) + return 0 + +def printFullTerm(color: Colors, string: str): + #system = platform.system() + columns = 100 + row = 1000 + disp = True + value = int(columns / 2 - len(string) / 2) + printColor(color, "-" * value, eol="") + printColor(color, string, eol="") + value += len(string) + printColor(color, "-" * (columns - value)) + return 0 + + +""" +================================================================= + class StoppableThread +================================================================= +""" +''' +class StoppableThreadEvenWhenSleeping(threading.Thread): + # Thread class with a stop() method. The thread itself has to check + # regularly for the stopped() condition. + # It stops even if sleeping + # See https://python.developpez.com/faq/?page=Thread#ThreadKill + # See also https://www.oreilly.com/library/view/python-cookbook/0596001673/ch06s03.html + + def __init__(self, *args, **kwargs): + #super(StoppableThreadSimple, self).__init__(*args, **kwargs) + super().__init__(*args, **kwargs) + self._stop_event = threading.Event() + + #def stop(self): + def terminate(self): + self._stop_event.set() + + def stopped(self): + return self._stop_event.is_set() + + def wait(self, nbsec:float=2.0): + self._stop_event.wait(nbsec) +''' + + +### +# ================================================================= +# EXCEPTIONS classes +# ================================================================= +### + +class AgentCmdException(Exception): + ''' Base class for all Agent command exceptions ''' + # pass + def __init__(self, cmd:AgentCmd): + self.cmd = cmd + ''' + def __str__(self): + return f"The Agent command '{self.cmd.name}' is unknown to the agent" + #return f"({type(self).__name__}): Device Generic command has no implementation in the controller" + ''' + +class UnknownCmdException(AgentCmdException): + ''' Raised when an Agent (specific) cmd is NOT known by the agent ''' + def __str__(self): + return f"The Agent command '{self.cmd.name}' is unknown to the agent" + #return f"({type(self).__name__}): Device Generic command has no implementation in the controller" + +class AgentCmdUnimplementedException(AgentCmdException): + ''' Raised when an Agent Specific cmd is known by the agent but not implemented ''' + def __str__(self): + return f"The Agent command '{self.cmd.name}' is known by the agent but not implemented" + #return f"({type(self).__name__}): Device Generic command has no implementation in the controller" + +class AgentCmdBadArgsException(AgentCmdException): + ''' Raised when an Agent cmd has bad, missing, or too many argument(s) ''' + def __str__(self): + return f"The Agent command '{self.cmd.name}' has bad, missing, or too many argument(s)" + #return f"({type(self).__name__}): Device Generic command has no implementation in the controller" + + + +### +# ================================================================= +# class Agent +# ================================================================= +### + +class Agent: + """ + See Agent_activity_diag.pu for PlantUML activity diagram + + Behavior of an agent: + - If idle : + - still does routine_process() and general_process() + - does not do specific_process() + - Once a command has been sent to another agent : + - It waits (non blocking) for the end of execution of the command and get its result + - If command is timed out or has been skipped or killed, then it is NOT re-executed at next iteration (except if needed explicitely) + """ + + # --- + # --- CLASS (STATIC) attributes (CONSTANTS) + # --- If agent is instance of Agent: + # --- - CLASS attributes are accessible via agent.__class__.__dict__ + # --- - INSTANCE attributes are accessible via agent.__dict__ + # --- + + # Default modes + DEBUG_MODE = False + WITH_SIMULATOR = False + #TEST_MODE = False + + # By default, a command is valid during 5s (and then perempted) + DEFAULT_CMD_VALIDITY_DURATION = 5 + + # Wait a fixed number of seconds before each loop ? + #WITH_RANDOM_WAIT = False + # 1 sec by default + _DELAY_NB_SEC = 1 + # - YES if TEST mode (in init()) + + # Default LOG level is INFO + #PYROS_DEFAULT_GLOBAL_LOG_LEVEL = LogPyros.LOG_LEVEL_INFO # INFO + + # This Agent Specific commands list, that he is able to execute + # To be overriden by subclasses (here are just a few examples of commands) + # Format : ("cmd",timeout) with : + # - cmd : the command name + # - timeout : the command timeout (in sec) + AGENT_SPECIFIC_COMMANDS = [ + #"do_specific1", + #"set_specific2", + #"do_specific3", + ("do_specific1", 10), + ("set_specific2", 5), + ("do_specific3", 3), + ] + + # + # --- FOR TEST ONLY --- + # + + # By default, NOT in test mode + #TEST_MODE = False + + # Maximum duration of this agent (only for SIMULATION mode) + # If set to None, it will never exit except if asked (or CTRL-C) + # If set to 20, it will exit after 20s + TEST_MAX_DURATION_SEC = None + #TEST_MAX_DURATION_SEC = 30 + # Run this agent in simulator mode + #TEST_MODE = True + WITH_SIMULATOR = False + # Run the assertion tests at the end + TEST_WITH_FINAL_TEST = False + + CMD_STATUS = AgentCmd.CMD_STATUS_CODES + + # (EP 2022/07) Default SCENARIO to be executed (only in TEST mode), and then STOP + # + # It is a list of commands to be sent by this agent to other agents ("self" means himself) + # + # Format : List of tuples (command, validity, expected_res, expected_status), with : + # + # - command : the format is "recipient cmd args", with : + # - recipient : name of the Agent that the command is to be sent to (use "self" to mean himself) + # - cmd : the command name + # - args : (optional) the list of command arguments, separated by blanks : arg1 arg2 arg3 ... + # + # - validity : the command is valid for this duration, afterwards you can forget it + # + # - expected_res : the expected result + # + # - expected_status : the status of the command expected after execution (expired, killed, skipped, executed...) + # + # Ex : + # - "AgentScheduler set_state ATTENTIVE" => means to send the command "set_state ATTENTIVE" to the agent AgentScheduler + # - "AgentX set_state ATTENTIVE" => means to send the command "set_state ATTENTIVE" to the agent AgentX + # - "self set_state ATTENTIVE" => means to send the command "set_state ATTENTIVE" to MYSELF + # - "self do_restart_loop" => means to send the command "do_restart_loop" to MYSELF (no args) + # + TEST_COMMANDS_LIST = [ + + # 1) First, 3 EXCEPTION CASES (uncomment to activate exception) + # Each of these lines will stop execution with an exception + # ------------------------------------------------------------ + + # - Agent command, unknown => ko, UnknownCmdException + #("self do_unknown", 10, None,None), + + # - Agent general command malformed (missing arg) => ko, AgentCmdBadArgsException + #("Agent set_mode", 10, None,None), + + # - Agent specific command, known but not implemented => ko, AgentCmdUnimplementedException + #("self set_specific2", 10, None,None), + + # - Agent specific command, implemented but missing args => ko, AgentCmdBadArgsException + #(" self do_specific1 1 ", 10, None,None), + + + # 2) NORMAL CASES (test scenario) + # All these commands should be executed without error, from the 1st to the last one + # ------------------------------- + + # This command has a validity of 0s and thus should be tagged as "expired" + ("self set_mode ATTENTIVE", 0, "MODE = ATTENTIVE", CMD_STATUS.CMD_OUTOFDATE), + + # Agent general command + ("self set_mode ATTENTIVE", 200, "MODE = ATTENTIVE", CMD_STATUS.CMD_EXECUTED), + # => should get "ATTENTIVE" + ("self get_mode", 100, "MODE = ATTENTIVE", None), + + # => should get "7" + ("self do_eval 3+5-1", 200, 7, None), + + # END, will not go further + #("self do_exit", 2, "STOPPING"), + + # Agent specific commands => should be executed + ("self do_specific3", 200, None, None), + ("self do_exit", 500, "STOPPING", None), + ("self do_specific1 1 2 3.5 titi (3,'titi',5) [1,3,5,7,9]", 200, 7, None), + + ("self set_mode ROUTINE", 200, "MODE = ROUTINE", None), + # => should get "ROUTINE" + ("self get_mode", 200, "MODE = ROUTINE", None), + # Agent specific command => should be skipped (because not ATTENTIVE) + ("self do_specific1 1 2 3.5 titi (3,'titi',5) [1,3,5,7,9]", 200, "SKIPPED", None), + + # From now on, should not run anymore process_before/after + # => and should skip next specific commands + ("self set_mode IDLE", 200, "MODE = IDLE", None), + # => should get "IDLE" + ("self get_mode", 200, "MODE = IDLE", None), + # Agent specific command => should be skipped (because not ATTENTIVE) + ("self do_specific1 1 2 3.5 titi (3,'titi',5) [1,3,5,7,9]", 200, 'SKIPPED', None), + + # TODO: test priority commands : do_abort, do_flush_cmds, ... + # - Stop executing new commands (just let them accumulate) + ##("self do_stop_exec",), + ##("self set_mode ATTENTIVE",), + ##("self get_mode",), + ##("self do_specific1 1 2 3.5 titi (3,'titi',5) [1,3,5,7,9]",), + ##("self set_mode ROUTINE",), + ##("self get_mode",), + ##("self do_abort",), + # - Now, resume execution of commands : + # we should have the previous list pending, + # but as "do_abort" is priority, it should be executed first even if last ! + ##("self do_resume_exec",), + + # Restart the restart loop (from init()) + ("self do_restart_loop", 200, "RESTARTING", None), + + # Now stop + ("self do_exit", 200, "STOPPING", None), + + ''' + # specific0 not_executed_because_idle + "specific0", + + "set_state:active", + + # specific1 executed_because_not_idle, should complete ok + "specific1", + + # specific2 will be executed only when specific1 is finished, + # and should be aborted before end of execution, + # because of the 1st coming "do_abort" command below + "specific2", + + # specific3 should be executed only when specific2 is finished (in fact, aborted), + # and should be aborted before end of execution, + # because of the 2nd coming "do_abort" command below + "specific3", + + # These commands should not have the time to be processed + # because the "do_exit" command below should be executed before + "specific4", + "specific5", + "specific6", + "specific7", + "specific8", + + # Should abort the current running command (which should normally be specific2) + # even if commands above (specific3, ..., specific8) are already pending + "do_abort", + + # These commands (except abort) won't be executed + # because too many commands are already pending (above) + "specific9", + "do_abort", + "set_state:active", + "set_state:idle", + + # Should stop the agent even before the previous pending commands are executed + "do_exit", + + # Because of the previous "do_exit" command, + # these following commands should not be executed, + # and not even be added to the database command table + "set_state:active", + "specific10" + ''' + ] + #TEST_COMMANDS = iter(TEST_COMMANDS_LIST) + + + + ''' + # with thread + RUN_IN_THREAD = True + # with process + #RUN_IN_THREAD = False + _thread_total_steps_number = 1 + ''' + + + #COMMANDS_PEREMPTION_HOURS = 48 + #COMMANDS_PEREMPTION_HOURS = 60/60 + + name = "Generic Agent" + status = None + mode = None + config = None + + # STATUS + STATUS_LAUNCH = "LAUNCHED" + STATUS_INIT = "INITIALIZING" + STATUS_MAIN_LOOP = "IN_MAIN_LOOP" + STATUS_GET_NEXT_COMMAND = "IN_GET_NEXT_COMMAND" + STATUS_GENERAL_PROCESS = "IN_GENERAL_PROCESS" + STATUS_ROUTINE_PROCESS = "IN_ROUTINE_PROCESS" + ###STATUS_SPECIFIC_PROCESS = "IN_SPECIFIC_PROCESS" + STATUS_EXIT = "EXITING" + + + # MODE + # + # In all modes, the Agent listens to commands sent to him and executes Agent level GENERAL ones. + # - MODE_IDLE : "idle" mode, does nothing, only executes Agent level GENERAL commands (DO_RESTART, DO_EXIT, DO_ABORT, DO_FLUSH, SET_ACTIVE, ...) + # - MODE_ROUTINE : idem IDLE + executes routine process (before & after) + # - MODE_ATTENTIVE : idem ROUTINE + executes Agent level SPECIFIC commands (commands specific to this agent, that only this agent understands and can execute) + # + # Default mode is MODE_ATTENTIVE (most active mode) + MODE_IDLE = "IDLE" + MODE_ROUTINE = "ROUTINE" + MODE_ATTENTIVE = "ATTENTIVE" + + ''' Moved to more central file : config.config_base + PYROS_DJANGO_BASE_DIR = Path("src/core/pyros_django") # pathlib + DEFAULT_CONFIG_FILE_NAME = "config_unit_simulunit1.xml" + CONFIG_DIR_NAME = "config" + ''' + + # My own ConfigPyros instance (Observatory Configuration) + _oc = None + + # Parameters from config file + # for /src/ + #_path_data = '../../config' + # for /src/core/pyros_django/ + #_path_data = '../../../../config' + # Path to config + _path_data = '' + _computer_alias = '' + _computer_description = '' + + # Current and next command to send + _cmdts: AgentCmd = None + _next_cmdts = None + + _agent_survey = None + _pending_commands = [] + + ''' + _current_device_cmd = None + _current_device_cmd_thread = None + ''' + + # List of agents I will send commands to + _my_client_agents_aliases = [] + _my_client_agents = {} + + _iter_num = None + + # Log object + _log = None + + # new obsconfig init for agent: + ##def __init__(self, RUN_IN_THREAD=True): + def __init__(self): + # Agent is by default in mode ATTENTIVE (most active mode) + self.mode = self.MODE_ATTENTIVE + log.addHandler(handler_filebyagent(logging.INFO, self.__class__.__name__)) + log.debug("start Agent init") + obs_config_file_path = os.environ["PATH_TO_OBSCONF_FILE"] + path_to_obs_config_folder = os.environ["PATH_TO_OBSCONF_FOLDER"] + unit = os.environ["unit_name"] + oc = OBSConfig(obs_config_file_path,unit) + self.set_config(oc, obs_config_file_path, path_to_obs_config_folder, unit) + + self.name = self.__class__.__name__ + # set real unit name (the current unit used) + if unit == "": + unit = oc.unit_name + agent_name_from_config = self.get_config().get_agent_name_from_config(self.__class__.__name__) + if agent_name_from_config: + self.name = agent_name_from_config + self.unit = unit + print(f"Agent name : {self.name}") + print(f"Unit name : {self.unit}") + + # (EP) moved to AgentDevice + ###self._set_agent_device_aliases_from_config(self.name) + + self._set_mode_from_config(self.name) + log.debug("Step __init__") + + self.TEST_COMMANDS = iter(self.TEST_COMMANDS_LIST) + ##self.RUN_IN_THREAD = RUN_IN_THREAD + self._set_status(self.STATUS_LAUNCH) + ####self._set_idle() + + # Create 1st survey if none + #tmp = AgentSurvey.objects.filter(name=self.name) + #if len(tmp) == 0: + #nb_agents = AgentSurvey.objects.filter(name=self.name).count() + #if nb_agents == 0: + if AgentSurvey.objects.filter(name=self.name).exists(): + self._agent_survey = AgentSurvey.objects.get(name=self.name) + else: + self._agent_survey = AgentSurvey.objects.create(name=self.name, validity_duration=60, mode=self.mode, status=self.status, iteration=-1) + log.debug("Agent survey is" + str(self._agent_survey)) + #self.printd("Agent survey is", self._agent_survey) + + ("end Agent __init__") + + + #def __init__(self, name:str="Agent", config_filename:str=None, RUN_IN_THREAD=True): + #def __init__(self, config_filename:str=None, RUN_IN_THREAD=True, DEBUG_MODE=False): + # def __init__(self, config_filename:str=None, RUN_IN_THREAD=True): + + # ''' + # print('PYROS_ENV', PYROS_ENV) + # print('ROOT_DIR', ROOT_DIR) + # print('DOC_DIR', DOC_DIR) + # ''' + + # # Set logger + # ##pyros_logger.set_logger_config() + # ##logg = logging.getLogger('pyroslogger') + # log.addHandler(handler_filebyagent(logging.INFO, self.__class__.__name__)) + + # log.debug("start Agent init") + + # # SET CONFIG INSTANCE + + # # - OLD CONFIG + # log.debug(f'config file is {config_filename}') + # ##log.info('PYROS_ENV', config.PYROS_ENV) + # log.debug('PYROS_ENV' + config_old.PYROS_ENV) + # log.debug('ROOT_DIR' + config_old.ROOT_DIR) + # log.debug('DOC_DIR' + config_old.DOC_DIR) + # ##if config.is_dev_env(): print("DEV ENV") + # if config_old.is_dev_env(): log.debug("DEV ENV") + # if config_old.is_prod_env(): log.debug("PROD ENV") + # if config_old.is_debug(): log.debug("IN DEBUG MODE") + + # # - NEW CONFIG + # obs_config_file_path = os.environ["PATH_TO_OBSCONF_FILE"] + # path_to_obs_config_folder = os.environ["PATH_TO_OBSCONF_FOLDER"] + # unit = os.environ["unit_name"] + # oc = OBSConfig(obs_config_file_path) + # self.set_config(oc, obs_config_file_path, path_to_obs_config_folder, unit) + # log.debug("Step __init__") + + # self.TEST_COMMANDS = iter(self.TEST_COMMANDS_LIST) + # self.RUN_IN_THREAD = RUN_IN_THREAD + # self._set_status(self.STATUS_LAUNCH) + # self._set_idle() + + # self._set_agent_device_aliases_from_config(self.name) + # self._set_mode_from_config(self.name) + # #self.name = name + # self.name = self.__class__.__name__ + + + # ''' + # printd("*** ENVIRONMENT VARIABLE PYROS_DEBUG is:", os.environ.get('PYROS_DEBUG'), '***') + # ##self.DEBUG_MODE = DEBUG_MODE + # self.DEBUG_MODE = os.environ.get('PYROS_DEBUG', '0')=='1' + # ''' + # #self.DEBUG_MODE = config.PYROS_ENV + # ##self.log = LogPyros(self.name, AgentLogs) + # ##self.DEBUG_MODE = config.is_debug() + # self.DEBUG_MODE = config_old.is_debug() + # ##self.log.debug_level = DEBUG_MODE + # ''' + # # Default LOG level is INFO + # log_level = LogPyros.LOG_LEVEL_INFO # INFO + # self.log.set_global_log_level(LogPyros.LOG_LEVEL_DEBUG) if self.DEBUG_MODE else self.log.set_global_log_level(log_level) + # ''' + # #global_log_level = LogPyros.LOG_LEVEL_DEBUG if self.DEBUG_MODE else self.PYROS_DEFAULT_GLOBAL_LOG_LEVEL + # ##global_log_level = LogPyros.LOG_LEVEL_DEBUG if self.DEBUG_MODE else config.PYROS_DEFAULT_GLOBAL_LOG_LEVEL + # ##global_log_level = LogPyros.LOG_LEVEL_DEBUG if self.DEBUG_MODE else config_old.PYROS_DEFAULT_GLOBAL_LOG_LEVEL + # ##self.log.set_global_log_level(global_log_level) + # ##self.printd("LOG LEVEL IS:", self.log.debug_level) + # ##self.print("LOG LEVEL IS:", self.log.get_global_log_level()) + + + # # ------------- OLD CONFIG ------------------- + # # Est-ce bien utile ??? + # # New way with PathLib + # # my_parent_abs_dir = Path(__file__).resolve().parent + # #TODO: on doit pouvoir faire mieux avec pathlib (sans utiliser str()) + # ##self._path_data = str( Path( str(my_parent_abs_dir).split(str(self.PYROS_DJANGO_BASE_DIR))[0] ) / self.CONFIG_DIR_NAME ) + # ##self._path_data = config.CONFIG_DIR + # # self._path_data = config_old.CONFIG_DIR + + # #self._set_mode(self.MODE_IDLE) + # # config_filename = self.get_config_filename(config_filename) + # #self.printd(f"*** Config file used is={config_filename}") + # # log.debug(f"*** Config file used is={config_filename}") + # # self.config = ConfigPyrosOld(config_filename) + # # if self.config.get_last_errno() != self.config.NO_ERROR: + # # raise Exception(f"Bad config file name '{config_filename}', error {str(self.config.get_last_errno())}: {str(self.config.get_last_errmsg())}") + + # #TODO: : à mettre dans la config + # ''' + # 'AgentDeviceTelescope1': 'AgentDeviceTelescopeGemini', + # 'AgentDeviceFilterSelector1': 'AgentDeviceSBIG', + # 'AgentDeviceShutter1': 'AgentDeviceSBIG', + # 'AgentDeviceSensor1': 'AgentDeviceSBIG', + # ''' + # #self._my_client_agents = {} + + # ### self._agent_logs = AgentLogs.objects.create(name=self.name, message="Step __init__") + # #self.printdb("Step __init__") + # log.debug("Step __init__") + + # self.TEST_COMMANDS = iter(self.TEST_COMMANDS_LIST) + # self.RUN_IN_THREAD = RUN_IN_THREAD + # self._set_status(self.STATUS_LAUNCH) + # self._set_idle() + + # self._set_agent_device_aliases_from_config(self.name) + # self._set_mode_from_config(self.name) + # # TODO: remove + # #self._set_idle() + # self._set_active() + + # # Create 1st survey if none + # #tmp = AgentSurvey.objects.filter(name=self.name) + # #if len(tmp) == 0: + # #nb_agents = AgentSurvey.objects.filter(name=self.name).count() + # #if nb_agents == 0: + # if AgentSurvey.objects.filter(name=self.name).exists(): + # self._agent_survey = AgentSurvey.objects.get(name=self.name) + # else: + # self._agent_survey = AgentSurvey.objects.create(name=self.name, validity_duration=60, mode=self.mode, status=self.status, iteration=-1) + # log.debug("Agent survey is" + str(self._agent_survey)) + # #self.printd("Agent survey is", self._agent_survey) + + # ("end Agent init") + + + + def set_config(self, oc: OBSConfig, obs_config_file_path: str, path_to_obs_config_folder: str, unit: str): + self._oc = { + 'config' : oc, + 'env' : [ + obs_config_file_path, + path_to_obs_config_folder, + unit + ] + } + ''' + config_env = { + obs_config_file_path: obs_config_file_path, + path_to_obs_config_folder: path_to_obs_config_folder, + unit: unit + } + ''' + + def get_config(self): return self._oc['config'] + + def get_config_env(self): return self._oc['env'] + + def show_config(self): + VERBOSE = False + + oc = self.get_config() + obs_config_file_path, path_to_obs_config_folder, unit = self.get_config_env() + #self.printd(obs_config_file_path) + log.debug(obs_config_file_path) + log.debug(path_to_obs_config_folder) + log.debug(unit) + #log.warning("petit warning bidon") + print("\n") + print("- Observatory:" + oc.get_obs_name()) + my_unit_name = oc.get_units_name()[0] + my_unit = (oc.get_units()[my_unit_name]) + + #print("- Unit description:", my_unit) + + if VERBOSE: + print("\n") + print("- Computers:", oc.get_computers()) + + print("\n") + print("- Active Computers:", oc.get_active_computers()) + + print("\n") + print("- Active Devices:", oc.get_active_devices()) + + print("\n") + print("- Unit:", my_unit_name) + VERBOSE and print(oc.get_unit_by_name(my_unit_name)) + + if VERBOSE: + print("\n") + print("- Unit topology:", oc.get_topology(my_unit_name)) + + print("\n") + print("- Unit active Agents:", oc.get_active_agents(my_unit_name)) + VERBOSE and print(oc.get_agents(my_unit_name)) + + print("\n") + print("- Unit Agents per computer:", oc.get_agents_per_computer(my_unit_name)) + + print("\n") + print("- Unit Agents per device:", oc.get_agents_per_device(my_unit_name)) + + if VERBOSE: + print("\n") + print("- Unit Channel groups:", oc.get_channel_groups(my_unit_name)) + + print("\n") + print("- Unit Channels:", oc.get_channels(my_unit_name)) + + #print("\n") + #print("- Unit/Channel info:", oc.get_channel_information(my_unit_name, 'OpticalChannel_up')) + + if VERBOSE: + print("\n") + print("- Unit Components agents:", oc.get_components_agents(my_unit_name)) + + if VERBOSE: + print("\n") + print("- Unit database:", oc.get_database_for_unit(my_unit_name)) + + print("\n") + print("- Devices names:", oc.get_devices_names()) + if VERBOSE: + print("\n") + print("- Devices names & files:", oc.get_devices_names_and_file()) + print("\n") + print("- Devices:", oc.get_devices()) + + print("\n") + + + + + + + def get_config_filename(self, config_filename: str): + if not config_filename: + #config_filename = self.DEFAULT_CONFIG_FILE_NAME + ##config_filename = config.DEFAULT_CONFIG_FILE_NAME + config_filename = config_old.DEFAULT_CONFIG_FILE_NAME + # If config file name is RELATIVE (i.e. without path, just the file name) + # => give it an absolute path (and remove "src/agent/" from it) + if config_filename == os.path.basename(config_filename): + ##config_filename = os.path.join(config.CONFIG_DIR, config_filename) + config_filename = os.path.join(config_old.CONFIG_DIR, config_filename) + ''' + # Build abs path including current agent dir + config_filename = os.path.abspath(self.CONFIG_DIR_NAME + os.sep + config_filename) + # Remove "src/agent_name/" from abs dir : + # (1) Remove "src/core/pyros_django/" + config_filename = config_filename.replace(str(self.PYROS_DJANGO_BASE_DIR)+os.sep, os.sep) + # (2) Remove "agent_name/" + #TODO: bidouille, faire plus propre + config_filename = config_filename.replace(os.sep+"agent"+os.sep, os.sep) + config_filename = config_filename.replace(os.sep+"monitoring"+os.sep, os.sep) + ''' + return os.path.normpath(config_filename) + + def __repr__(self): + return "I am agent " + self.name + + def __str__(self): + return self.__repr__() + #return "I am agent " + self.name + + # Normal print + ##def print(self, *args, **kwargs): self.log.print(*args, **kwargs) + """ + if args: + self.printd(f"({self.name}): ", *args, **kwargs) + else: + self.printd() + """ + """ + # DEBUG print shortcut + ##def printd(self, *args, **kwargs): self.log.printd(*args, **kwargs) + #if DEBUG: self.printd(d(*args, **kwargs) + def log_d(self, *args, **kwargs): self.log.log_d(*args, **kwargs) + def log_i(self, *args, **kwargs): self.log.log_i(*args, **kwargs) + def log_w(self, *args, **kwargs): self.log.log_w(*args, **kwargs) + def log_e(self, *args, **kwargs): self.log.log_e(*args, **kwargs) + def log_c(self, *args, **kwargs): self.log.log_c(*args, **kwargs) + def printdb(self, *args, **kwargs): self.log.db( *args, **kwargs) + """ + + + + def _get_real_agent_name(self, agent_alias_name:str)->str: + #self.printd("key is", agent_alias_name) + ''' + if not self._my_client_agents: return agent_alias_name + return self._my_client_agents[agent_alias_name] + ''' + return self._my_client_agents.get(agent_alias_name) + + + + def run(self, nb_iter:int=None, FOR_REAL:bool=True): + """ + FOR_REAL: set to False if you don't want Agent to send commands to devices but just print messages without really doing anything + """ + + ("in run()") + #return + + # TEST MODE ONLY + # IF in test mode but with REAL devices (no SIMULATOR), delete all dangerous commands from the test commands list scenario: + self._TEST_prepare() + + #self._DO_EXIT = False + + # No agent specific cmd currently running (thread) + self.AGENT_SPECIFIC_CMD_RUNNING = False + + # No Routine process (before or after) currently running (thread) + self.ROUTINE_PROCESS_RUNNING = False + + ################ + # RESTART loop # + ###############@ + # REPEAT UNTIL not DO_RESTART_LOOP + self.DO_RESTART_LOOP = True + while self.DO_RESTART_LOOP: + # By default, no restart after exit from main loop + self.DO_RESTART_LOOP = False + + self.start_time = time.time() + #log.debug("on est ici: " + os.getcwd()) + + self._load_config() + + self.print_TEST_MODE() + + self.init() + ''' testing log: + self.log_e("ERROR") + self.log_c("FATAL critical ERROR") + ''' + #self.log_w("WARNING", "watch your step !") + #log.warning("WARNING"+ "watch your step !") + + # Avoid blocking on false "running" commands + # (old commands that stayed with "running" status when agent was killed) + AgentCmd.delete_commands_with_running_status_for_agent(self.name) + + self._iter_num = 1 + + ############# + # MAIN loop # + ############@ + self.DO_MAIN_LOOP = True + while self.DO_MAIN_LOOP: + try: + self._main_loop(nb_iter,FOR_REAL) + #if not self.DO_MAIN_LOOP: break + except KeyboardInterrupt: # CTRL-C + # In case of CTRL-C, kill the current thread (process) before dying (in error) + #log.info("CTRL-C Interrupted, I kill the current thread (process) before exiting (if exists)") + log.info("CTRL-C Interrupted, trying to stop cleanly") + #self._kill_running_device_cmd_if_exists("USER_CTRLC") + #self.do_things_before_exit("USER_CTRLC") + self._cleanup_before_exit("USER_CTRLC") + exit(1) + + # TEST mode only + self._TEST_test_results() + #if self._DO_EXIT: exit(0) + + + + #def _kill_running_device_cmd_if_exists(self, abort_cmd_sender): + # to be overriden by subclass + def do_things_before_exit(self, stopper_agent_name=None): + pass + + + def _main_loop(self, nb_iter:int=None, FOR_REAL:bool=True): + + self._main_loop_start(nb_iter) + #if not self.DO_MAIN_LOOP: return + + self._reload_config_if_changed() # only if changed + + # better to do this in a subclass + #self.show_config() + + # Log this agent status (update my current mode and status in DB) + self._log_agent_status() + + #self.printd("====== START COMMMANDS PROCESSING ======") + + # SIMU + #self.send_cmd_to("AgentScheduler", "do_replan") + + # ROUTINE process BEFORE + # To be specified by subclass + if not self.IS_IDLE(): self._routine_process_before() + #self.printd("I am IDLE, so I bypass the routine_process (do not send any new command)") + + # Processing the next pending command if exists + # If Agent general level command like (DO_RESTART, DO_EXIT, DO_ABORT) + # => will set DO_MAIN_LOOP=False and/or DO_RESTART_LOOP=True + print() + print() + log.info("*"*10 + " NEXT COMMAND PROCESSING (START) " + "*"*10 + '\n') + try : + cmd = self._process_next_command_if_exists() + except (AgentCmdUnimplementedException, AgentCmdBadArgsException, UnknownCmdException) as e : + print(e) + log.error(f"EXCEPTION on Agent command '{e.cmd.name}'") + if type(e) is UnknownCmdException: + e.cmd.set_as_skipped("EXCEPTION: unknown command") + else : + e.cmd.set_as_processed("EXCEPTION: command known but unimplemented or bad args") + self._cleanup_before_exit() + raise + log.info("*"*10 + " NEXT COMMAND PROCESSING (END) " + "*"*10 + "\n") + + # only if in TEST mode + self._TEST_check_cmd_res_and_status(cmd) + + if not self.DO_MAIN_LOOP: return + + ''' + # if restart, exit this loop to restart from beginning + if self.DO_RESTART or self.DO_EXIT or self.DO_ABORT: + self.DO_MAIN_LOOP = False + return + ''' + + if not self.IS_IDLE(): self._routine_process_after() + + # TEST MODE only : execute test routine_process always (even if IDLE) in order to (always) send next command from test scenario + self._TEST_test_routine_process() + + #self.printd("====== END COMMMANDS PROCESSING ======") + + #self.waitfor(self.mainloop_waittime) + + self._main_loop_end() + + self._iter_num += 1 + + + + def _main_loop_start(self, nb_iter:int=None): + + for i in range(3): print() + #self.printd("-"*80) + log.info("*"*73) + log.info("*"*20 + f" MAIN LOOP ITERATION {self._iter_num} (START) " + "*"*20) + log.info("*"*73 + '\n') + #self.print(f"Iteration {self._iter_num}") + + # EXIT because of nb of iterations ? + if nb_iter is not None: + # Bad number of iterations or nb iterations reached => exit + if nb_iter <= 0 or nb_iter < self._iter_num: + log.info(f"Exit because number of iterations asked ({nb_iter}) has been reached") + self.DO_MAIN_LOOP = False + return + + # Temporizing (delay) : Wait a random number of sec before starting iteration + # (to avoid to busy to much the processor) + # (also to let another agent having the chance to send a me command) + if self._DELAY_NB_SEC : + #random_waiting_sec = random.randint(0,5) + log.info(f"Waiting {self._DELAY_NB_SEC} sec (random) before starting new iteration...") + #time.sleep(random_waiting_sec) + self.waitfor(self._DELAY_NB_SEC) + + # (Test only) + # EXIT because max duration reached ? + if self.TEST_MODE and self.TEST_MAX_DURATION_SEC and (time.time()-self.start_time > self.TEST_MAX_DURATION_SEC): + log.info("Exit because of max duration set to ", self.TEST_MAX_DURATION_SEC, "s") + #self._kill_running_device_cmd_if_exists(self.name) + self._cleanup_before_exit(self.name) + #if self.TEST_MODE and self.TEST_WITH_FINAL_TEST: self._TEST_test_results() + self.DO_MAIN_LOOP = False + return + + self._set_status(self.STATUS_MAIN_LOOP) + self.show_mode_and_status() + + self.main_loop_start() + + # To be overriden by subclass + def main_loop_start(self): + pass + + def _main_loop_end(self): + self.main_loop_end() + log.info("*"*20 + " MAIN LOOP ITERATION (END) " + "*"*20) + #self.do_log(LOG_DEBUG, "Ending main loop iteration") + + # To be overriden by subclass + def main_loop_end(self): + pass + + + # To be overriden by subclass (AgentDevice) + def process_device_level_cmd(self): + pass + ''' + log.info("(DEVICE LEVEL CMD)") + try: + self.exec_device_cmd_if_possible(cmd) + except (UnimplementedGenericCmdException) as e: + #except (UnknownGenericCmdException, UnimplementedGenericCmdException, UnknownNativeCmdException) as e: + log.e(f"EXCEPTION caught by {type(self).__name__} (from Agent mainloop) for command '{cmd.name}'", e) + log.e("Thus ==> ignore this command") + cmd.set_result(e) + #cmd.set_as_killed_by(type(self).__name__) + cmd.set_as_skipped() + #raise + ''' + + + + def _process_next_command_if_exists(self)->AgentCmd: + ''' Processing the next pending command if exists ''' + + #print() + #print() + #log.info("*"*10 + " NEXT COMMAND PROCESSING (START) " + "*"*10 + '\n') + + + # Purge commands (every N iterations, starting from 1st, delete old commands) + N=5 + if ((self._iter_num-1) % N) == 0: + log.info("Purging expired commands if exists") + #AgentCmd.purge_old_commands_for_agent(self.name) + self._purge_old_commands_sent_to_me() + + # Get next command and process it (if exists) + cmd = self._get_next_valid_and_not_running_command() + self.CURRENT_CMD = cmd + #self._set_status(self.STATUS_GENERAL_PROCESS) + #if cmd: self.command_process(cmd) + + # SIMU + #cmd = "do_replan" + #self.send_cmd_to("AgentScheduler", "do_replan") + + # No new command => nothing to do + if not cmd: return cmd + + log.info('-'*6) + log.info('-'*6 + " RECEIVED NEW COMMAND TO PROCESS: ") + log.info('-'*6 + str(cmd)) + if self.is_in_test_mode() and hasattr(self._cmdts,"expected_res"): + log.info(f"*** (with expected result : " + str(self._cmdts.expected_res) + ')') + log.info('-'*6) + + cmd.set_read_time() + + # CASE 1 - AGENT GENERAL command + # (DO_RESTART, DO_EXIST, DO_ABORT, DO_ABORT_COMMAND, DO_FLUSH_COMMANDS, ...) + # This processing can set DO_MAIN_LOOP and DO_RESTART_LOOP to False + if self._is_agent_general_cmd(cmd): + try: + self._process_agent_general_cmd(cmd) + except AgentCmdUnimplementedException as e: + #print(e) + #cmd.set_as_skipped("ERROR: Unimplemented Agent General command") + #self._cleanup_before_exist() + # This exception is managed at higher level : + raise + #except ValueError as e: + except AgentCmdBadArgsException as e: + #print(e) + #cmd.set_as_skipped("ERROR: Bad Argument(s)") + #self._cleanup_before_exit() + # This exception is managed at higher level : + raise + #raise AgentCmdBadArgsException(cmd.name) + # Must I stop or restart ? + if cmd.name in ('do_stop','do_restart_loop','do_exit','do_abort') : + self.DO_MAIN_LOOP = False + if cmd.name == 'do_abort': + self._abort_current_running_cmd_if_exists() + self._cleanup_before_exit() + if cmd.name != 'do_restart_loop': + self.DO_RESTART_LOOP = False + return cmd + + # CASE 2 - AGENT SPECIFIC command + # => Simple action, short execution time, so I execute it directly (in current main thread, not in parallel) + if self._is_agent_specific_cmd(cmd): + #log.info("(AGENT LEVEL CMD)") + if not self.IS_ATTENTIVE(): + cmd.set_as_skipped("Skipped because I am not ATTENTIVE") + return cmd + try: + self._process_agent_specific_cmd(cmd) + #self._exec_agent_cmd(cmd) + #except AttributeError as e: + except (AgentCmdUnimplementedException, AgentCmdBadArgsException) as e: + ##print(e) + #self.log_e(f"EXCEPTION: Agent level specific command '{cmd.name}' unknown (not implemented as a function) :", e) + #self.log_e("Thus => I ignore this command...") + ##log.error(f"EXCEPTION: Agent specific command '{cmd.name}' unknown (not implemented as a function) :", e) + #log.e("Thus => I ignore this command...") + #cmd.set_result("ERROR: UNIMPLEMENTED AGENT SPECIFIC COMMAND", False) + #cmd.set_as_pending() + ##cmd.set_as_skipped("ERROR: UNIMPLEMENTED AGENT SPECIFIC COMMAND") + ##self._cleanup_before_exit() + ##raise AgentCmdUnimplementedException(cmd.name) + # These exceptions are managed at higher level : + raise + return cmd + + # CASE 3 - OTHER level command (DEVsICE level, only for AgentDevice) + if self.is_device_level_cmd(cmd): + self.process_device_level_cmd(cmd) + return cmd + + # CASE 4 - INVALID COMMAND + #raise Exception("INVALID COMMAND: " + cmd.name) + log.warning("******************************************************") + log.warning("*************** ERROR: INVALID COMMAND (UNKNOWN) ***************") + log.warning("******************************************************") + #log.warning("Thus => I ignore this command...") + #cmd.set_result("ERROR: INVALID COMMAND") + ##cmd.set_as_skipped("ERROR: INVALID AGENT COMMAND") + ##self._cleanup_before_exit() + ##raise UnknownCmdException(cmd.name) + raise UnknownCmdException(cmd) + + #print() + #log.info("*"*10 + " NEXT COMMAND PROCESSING (END) " + "*"*10 + "\n") + + + + + def _abort_current_running_cmd_if_exists(self): + pass + + def _cleanup_before_exit(self, stopper_agent_name:str=None): + if not stopper_agent_name: stopper_agent_name = self.name + self._set_status(self.STATUS_EXIT) + self._log_agent_status() + + log.info("Trying to stop cleanly") + log.info("Before exiting, Here are (if exists) the current (still) pending commands (time ordered) :") + #commands = AgentCmd.get_commands_sent_to_agent(self.name) + commands = AgentCmd.get_pending_and_running_commands_for_agent(self.name) + AgentCmd.show_commands(commands, True) + self.do_flush_commands() + #if self.TEST_MODE and self.TEST_WITH_FINAL_TEST and self.TEST_COMMANDS_DEST == "myself": self.simulator_test_results() + if self.TEST_MODE and self.TEST_WITH_FINAL_TEST: + self._TEST_test_results() + #self._DO_EXIT=True + #exit(0) + + self.do_things_before_exit(stopper_agent_name) + + + def print_TEST_MODE(self): + if self.TEST_MODE: + log.debug("[IN TEST MODE]") + log.info("Flush previous commands to be sure to start in clean state") + AgentCmd.delete_pending_commands_for_agent(self.name) + else: + log.debug("[IN NORMAL MODE]") + self.TEST_MAX_DURATION_SEC=None + + + def _purge_old_commands_sent_to_me(self): + AgentCmd.purge_old_commands_sent_to_agent(self.name) + + + def _routine_process_before(self): + """ + Routine processing BEFORE processing received commands. + + IMPORTANT : + this processing must be as SHORT as possible, + so that the agent can quickly read its received commands and process them + + This is a command or set of processings that this agent does or commands that it sends regularly, + at each iteration + """ + print() + print() + log.info("*"*10+ " ROUTINE PROCESSING BEFORE (START) "+ "*"*10+ '\n') + + self._set_status(self.STATUS_ROUTINE_PROCESS) + self.routine_process_before_body() + print() + log.info("*"*10 + " ROUTINE PROCESSING BEFORE (END) "+ "*"*10) + + + def _routine_process_after(self): + """ + Routine processing AFTER processing received commands. + + This processing can be longer than the "before" one above, + as the agent has already processed its received commands, + but not too long anyway, otherwise it will take too long before the next iteration starts... + + This is a command or set of processings that this agent does or commands that it sends regularly, + at each iteration + """ + print() + print() + log.info("*"*10+ " ROUTINE PROCESSING AFTER (START) "+ "*"*10+ '\n') + + self._set_status(self.STATUS_ROUTINE_PROCESS) + self.routine_process_after_body() + print() + log.info("*"*10 + " ROUTINE PROCESSING AFTER (END) "+ "*"*10) + + + # To be overridden by subclasses + def routine_process_before_body(self): + pass + + # To be overridden by subclasses + def routine_process_after_body(self): + #if self.TEST_MODE: self._TEST_test_routine_process() + pass + + """ + def purge_commands(self): + ### + Delete commands (which I am recipient of) older than COMMANDS_PEREMPTION_HOURS (like 48h) + ATTENTION !!! EXCEPT the RUNNING command !!! + + NB: datetime.utcnow() is equivalent to datetime.now(timezone.utc) + ### + + self.printd("Looking for old commands to purge...") + ### + COMMAND_PEREMPTION_DATE_FROM_NOW = datetime.utcnow() - timedelta(hours = self.COMMANDS_PEREMPTION_HOURS) + #self.printd("peremption date", COMMAND_PEREMPTION_DATE_FROM_NOW) + old_commands = AgentCmd.objects.filter( + # only commands for me + recipient = self.name, + # only pending commands + sender_deposit_time__lt = COMMAND_PEREMPTION_DATE_FROM_NOW, + ) + ### + old_commands = AgentCmd.get_old_commands_for_agent(self.name) + if old_commands.exists(): + self.printd("Found old commands to delete:") + for cmd in old_commands: self.printd(cmd) + old_commands.delete() + """ + + def waitfor(self, nbsec:float=2.0): + ''' + # thread + if self._current_device_cmd_thread and self.RUN_IN_THREAD: + self._current_device_cmd_thread.wait(nbsec) + # process (or main thread) + else: + time.sleep(nbsec) + ''' + log.info(f"Now, waiting for {nbsec} second(s)...") + time.sleep(nbsec) + + def sleep(self, nbsec): + self.waitfor(nbsec) + + def _set_status(self, status:str): + #self.printd(f"[{status}] (switching from status {self.status})") + log.debug(f"[{status}]") + self.status = status + return False + + def _set_mode(self, mode:str): + #self.printd(f"Switching from mode {self.mode} to mode {mode}") + log.info(f"[NEW MODE {mode}]") + self.mode = mode + + # Test mode + def IS_IDLE(self): return self.mode == self.MODE_IDLE + def IS_ROUTINE(self): return self.mode == self.MODE_ROUTINE + def IS_ATTENTIVE(self): return self.mode == self.MODE_ATTENTIVE + + def show_mode_and_status(self): + log.info(f"CURRENT MODE is {self.mode} (status is {self.status})") + + def get_specifics_cmd(self): + specific_commands = "" + for index, command_tuple in enumerate(self.AGENT_SPECIFIC_COMMANDS): + specific_commands += f"{command_tuple[0]}" + if index != len(self.AGENT_SPECIFIC_COMMANDS)-1: + specific_commands += ";" + return specific_commands + + def get_mode(self): + return self.mode + + def get_state(self): + return self.status + + def set_idle(self): + self._set_mode(self.MODE_IDLE) + def set_routine(self): + self._set_mode(self.MODE_ROUTINE) + def set_attentive(self): + self._set_mode(self.MODE_ATTENTIVE) + + + def die(self): + self._set_status(self.STATUS_EXIT) + + """ + suspend/resume + """ + def suspend(self): + """ + TODO: + Mode IDLE (doit rester à l'écoute d'un resume, + et doit continuer à alimenter les tables pour informer de son état via tables agents_logs, + et lire table agents_command pour reprendre via resume, + et update la table agents_survey pour donner son status "idle" + """ + self._set_idle() + return True + + def resume(self): + """ + Quit suspend() mode + """ + self._set_active() + return True + + + ''' + (EP) moved to AgentDevice + def _set_agent_device_aliases_from_config(self, agent_alias): + for a in self._my_client_agents_aliases: + # TODO: activer + ##self._my_client_agents[a] = self.config.get_paramvalue(a,'general','real_agent_device_name') + pass + ''' + + # new config (obsconfig) + def _set_mode_from_config(self, agent_name): + # all agent are active ? + + #mode = self.MODE_ACTIVE + mode = self.MODE_ATTENTIVE + self._set_mode(mode) + return True + # old config + # def _set_mode_from_config(self, agent_name): + # # --- Get the startmode of the AgentX + # modestr = self.config.get_paramvalue(agent_name,'general','startmode') + # if self.config.get_last_errno() != self.config.NO_ERROR: + # raise Exception(f"error {str(self.config.get_last_errno())}: {str(self.config.get_last_errmsg())}") + # if (modestr == None): + # return True + # # --- Set the mode according the startmode value + # mode = self.MODE_IDLE + # if modestr.upper() == 'RUN': + # mode = self.MODE_ACTIVE + # self._set_mode(mode) + # return True + + + def set_delay(self, delay_nb_sec:int): + self._DELAY_NB_SEC = delay_nb_sec + + """ + ======================================================================================= + Generic methods that may be specialized (overriden) by subclasses (except if private) + ======================================================================================= + """ + + # To be overridden by subclasses + def init(self): + #log.debug("*** Initializing... ***") + log.info("*** INITIALIZING... ***") + self._set_status(self.STATUS_INIT) + if self.TEST_MODE: self.set_delay(2) + + + def _reload_config_if_changed(self): + self._load_config() + + def _load_config(self): + """ + TODO: + only si date fichier xml changée => en RAM, un objet Config avec méthodes d'accès, appelle le parser de AK (classe Config.py indépendante) + """ + ''' + # SETUP + try: + self.config = get_object_or_404(Config, id=1) + # By default, set mode to SCHEDULER (False = REMOTE, which should never be the default) + self.config.global_mode = True + self.config.save() + # self.config = Config.objects.get(pk=1) + # self.config = Config.objects.get()[0] + except Exception as e: + # except Config.ObjectDoesNotExist: + self.printd("Config read (or write) exception", str(e)) + return -1 + ''' + log.debug("Loading the config file...") + + # - NEW CONFIG + self.get_config().load(self.get_config_env()[0]) + + + # - OLD CONFIG + # self.config.load() + # if self.config.get_last_errno() != self.config.NO_ERROR: + # raise Exception(f"error {str(self.config.get_last_errno())}: {str(self.config.get_last_errmsg())}") + # if not self.config.is_config_contents_changed(): + # return + + # === display informations + # --- Get the assembly aliases of the unit + # assembled_mount_aliases = [] + # assembled_channel_aliases = [] + # assembled_aliases = [] + # unit_alias = self.config.get_aliases('unit')[0] + # params = self.config.get_params(unit_alias) + # for param in params: + # if param['section']=="assembly" and param['key']=="alias": + # assembled_aliases.append(param['value']) + #self.printd(f"Unit {unit_alias} is the assembly of {assembled_aliases}") + + # log.debug("--------- Components of the unit -----------") + # log.debug("Configuration file is {}".format(self.config.get_configfile())) + # alias = self.config.get_aliases('unit')[0] + # namevalue = self.config.get_paramvalue(alias,'unit','description') + # log.debug(f"Unit alias is {alias}. Description is {namevalue}:") + # unit_subtags = self.config.get_unit_subtags() + # for unit_subtag in unit_subtags: + # aliases = self.config.get_aliases(unit_subtag) + # for alias in aliases: + # namevalue = self.config.get_paramvalue(alias,unit_subtag,'description') + # log.debug(f"- {unit_subtag} alias is {alias}. Description is {namevalue}") + # # --- fill the list of mount and channel assembled + # if alias in assembled_aliases: + # if unit_subtag=="mount": + # assembled_mount_aliases.append(alias) + # elif unit_subtag=="channel": + # assembled_channel_aliases.append(alias) + + # log.debug("--------- Assembly of the unit -----------") + # log.debug(f"Assembled mount aliases: {assembled_mount_aliases}") + # log.debug(f"Assembled channel aliases: {assembled_channel_aliases}") + + # --- Get the home of the mount[0] + # mount_alias = assembled_mount_aliases[0] + # home = self.config.get_paramvalue(mount_alias,'MountPointing','home') + + # log.debug("------------------------------------------") + # hostname = socket.gethostname() + # self._computer_alias = '' + # unit_subtag = 'computer' + # aliases = self.config.get_aliases(unit_subtag) + # for alias in aliases: + # log.debug("alias" + alias) + # value = self.config.get_paramvalue(alias,'local','hostname') + # log.debug("value" + value) + # if value == hostname: + # log.debug("value" + value) + # self._computer_alias = alias + # value = self.config.get_paramvalue(alias,unit_subtag,'description') + # self._computer_description = value + # value = self.config.get_paramvalue(alias,'path','data') + # # Overrides default value + # self._path_data = value + # break + # log.debug(f"hostname = {hostname}") + # log.debug(f"path_data = {self._path_data}") + # log.debug(f"home = {home}") + # log.debug("------------------------------------------") + + # --- update the log parameters + ##self.log.path_data = self._path_data + ##print("new self.log.path_data is", self.log.path_data) + ##self.log.set_global_path_data(self._path_data) + ##self.printd("new self.log.global_path_data is", self.log.get_global_path_data()) + ##self.log.home = home + + + #def update_survey(self): + def _log_agent_status(self): + """ + Save (update) this agent current mode and status in DB + """ + "Updating the agent survey database table..." + #self.printd("- fetching table line for agent", self.name) + # only necessary when using process (not necessary with threads) + #with transaction.atomic(): + #self._agent_survey = AgentSurvey.objects.get(name=self.name) + self._agent_survey.mode = self.mode + self._agent_survey.status = self.status + self._agent_survey.iteration = self._iter_num + self._agent_survey.save() + #self._agent_survey.save(update_fields=["mode", "status"]) + + + """ + def send_command(self, cmd_name): + recipient_agent = self.name if self.TEST_COMMANDS_DEST=="myself" else self.TEST_COMMANDS_DEST + AgentCmd.objects.create(sender=self.name, recipient=recipient_agent, name=cmd_name) + """ + #def send_command(self, to_agent, cmd_type, cmd_name, cmd_args=None): + def send_cmd_to(self, to_agent:str, cmd_name:str, cmd_args:str=None, validity:int=None, timeout:int=None): + """ + #ex: send_command(“AgentX”,”GENERIC”,”EVAL”,“3+4”) + ex: send_command(“AgentX”,"EVAL”,“3+4”) + """ + #return AgentCmd.send_cmd_from_to(self.name, self._get_real_agent_name_for_alias(to_agent), cmd_name, cmd_args) + #cmd = self.create_cmd_for(to_agent, cmd_name, cmd_args, validity, timeout).send() + cmd = self.create_cmd_for(to_agent, cmd_name, cmd_args, validity).send() + #cmd.send() + return cmd + + def create_cmd_for(self, to_agent:str, cmd_name:str, cmd_args:str=None, validity:int=None) -> AgentCmd: + + ''' + real_agent_name = self._get_real_agent_name(to_agent) + real_cmd_name = cmd_name + if '.' in real_agent_name: + real_agent_name, component_name = real_agent_name.split('.') + real_cmd_name = component_name+'.'+cmd_name + return AgentCmd.create(self.name, real_agent_name, real_cmd_name, cmd_args) + try: + real_agent_name = self._get_real_agent_name(to_agent) + except KeyError as e: + ''' + return AgentCmd.create(self.name, to_agent, cmd_name, cmd_args, validity) + ''' + real_agent_name = self._get_real_agent_name(to_agent) + if not real_agent_name: + return AgentCmd.create(self.name, to_agent, cmd_name, cmd_args) + # log.e("UNKNOWN AgentDevice ALIAS", to_agent) + # #self.log_e("Exception raised", e) + # log.e(f"=> Thus, I do not send this command '{cmd_name}'") + # return None + return AgentCmd.create(self.name, real_agent_name, cmd_name, cmd_args) + ''' + ''' + return AgentCmd( + sender=self.name, + recipient=self._get_real_agent_name_for_alias(recipient_agent_alias_name), + name=cmd_name + ) + ''' + + def _get_next_valid_and_not_running_command(self) -> AgentCmd: + """ + Return next VALID (not expired) command (read from the DB command table) + which is relevant to this agent. + Commands are read in chronological order + """ + self._set_status(self.STATUS_GET_NEXT_COMMAND) + log.info("Looking for a new command to process (sent by another agent):") + + # 1) Get all pending commands for me (return if None) + # Not sure this is necessary to do it in a transaction, + # but there might be a risk + # that a command status is modified while we are reading... + with transaction.atomic(): + self._pending_commands = AgentCmd.get_pending_and_running_commands_for_agent(self.name) + commands = self._pending_commands + if not commands.exists(): + log.info("") + return None + "Current pending (or running) commands are (time ordered):" + AgentCmd.show_commands(commands) + + # 2) Check if a PRIORITY command is in the list (even at the very end), + # and if so return this command + # (must be still valid and not yet running) + for cmd in commands: + #if cmd.name in ("do_exit", "do_abort", "do_flush_commands"): break + #if cmd.name in ("do_exit", "do_abort"): break + if cmd.is_agent_general_priority_cmd(): + if cmd.is_running(): + return None + if cmd.is_expired(): + cmd.set_as_outofdate() + return None + return cmd + ''' + #if cmd.name in ("do_exit", "do_abort", "do_flush_commands"): + if cmd.name in ("do_exit", "do_abort"): + if cmd.is_running(): + return None + if cmd.is_expired(): + cmd.set_as_outofdate() + return None + return cmd + ''' + + # 3) If first (oldest) command is currently running + # (status CMD_RUNNING), then do nothing and return + """ + cmd_executing = Command.objects.filter( + # only commands for me + recipient = self.name, + # only pending commands + recipient_status_code = Command.CMD_STATUS_CODES.CMD_RUNNING, + ).first() + #if cmd_executing.exists(): + if cmd_executing: + """ + #cmd = commands[0] + cmd = commands.first() + + if cmd.is_expired(): + cmd.set_as_outofdate() + return None + + if cmd.is_running(): + #self.printd(f"There is currently a running command ({cmd_executing.first().name}), so I do nothing (wait for end of execution)") + log.info(f"There is currently a running command ({cmd.name})") + """ + # Check that this command is not expired + if cmd.is_expired(): + self.printd("But this command is expired, so set its status to OUTOFDATE, and go on") + cmd_executing.set_as_outofdate() + else: + """ + log.info(f"Thus, I won't execute any new command until this command execution is finished") + # TODO: kill si superieur a MAX_EXEC_TIME + return None + + ''' + # 4) Tag all expired commands + for cmd in commands: + if cmd.is_expired(): cmd.set_as_outofdate() + # break at 1st "valid" command (not expired) + else: break + + # 5) If no more commands to process, return None + if cmd.is_expired(): return None + ''' + + # 6) Current cmd must now be a valid (not expired) and PENDING one, + # so return it for execution + #self.printd(f"Got command {cmd.name} sent by agent {cmd.sender} at {cmd.sender_deposit_time}") + #self.printd(f"Starting processing of this command") + return cmd + + + ''' + #def _exec_agent_general_cmd(self, cmd:Command): + def _exec_agent_cmd(self, cmd:AgentCmd): + + #self.print(f"Starting execution of an AGENT LEVEL cmd {cmd}...") + log.info(f"Starting execution of an AGENT LEVEL cmd...") + + # Update read time to say that the command has been READ + cmd.set_read_time() + cmd.set_as_running() + + # SPECIFIC command (only related to me, not to any agent) + if self._is_agent_specific_cmd(cmd): + log.info("(Agent level SPECIFIC cmd)") + # Execute method self."cmd.name"() + # This can raise an exception (caught by this method caller) + self.exec_cmd_from_its_name(cmd) + #'' + try: + except AttributeError as e: + self.printd(f"EXCEPTION: Agent level specific command '{cmd.name}' unknown (not implemented as a function) :", e) + self.printd("Thus => I ignore this command...") + cmd.set_result("ERROR: INVALID AGENT LEVEL SPECIFIC COMMAND") + cmd.set_as_pending() + cmd.set_as_skipped() + return + #'' + cmd.set_result("Agent level SPECIFIC cmd done") + cmd.set_as_processed() + log.info("...Agent level SPECIFIC cmd has been executed") + return + + # GENERAL command (related to any agent) + log.info("(Agent level GENERAL CMD)") + _,cmd_name,cmd_args = cmd.get_full_name_parts() + #cmd_name, cmd_args = cmd.tokenize() + #if cmd.name == "set_state:active": + #elif cmd.name == "set_state:idle": + if cmd_name == "set_state": + if not cmd_args: raise ValueError() + state = cmd_args[0] + if state == "active": self._set_active() + if state == "idle": self._set_idle() + cmd.set_result("I am now " + state) + #time.sleep(1) + self.waitfor(1) + elif cmd_name in ("do_flush_commands"): + "flush_commands received: Delete all pending commands" + AgentCmd.delete_pending_commands_for_agent(self.name) + cmd.set_result('DONE') + elif cmd_name in ("do_abort", "do_exit", "do_restart_init"): + #self.printd("Current pending commands are:") + #Command.show_commands(self._pending_commands) + log.info("Aborting current executing command if exists:") + #self._kill_running_device_cmd_if_exists(cmd.sender) + self.do_things_before_exit(cmd.sender) + if cmd_name == "do_restart_init": + log.info("restart_init received: Restarting from init()") + self._DO_RESTART=True + elif cmd.name == "do_exit": + self._DO_EXIT=True + cmd.set_result('SHOULD BE DONE NOW') + else: + #'' + name = cmd.name + args = None + if " " in name: name,args = name.split() + if name == "do_eval": + if args==None: raise(ValueError) + cmd.set_result(eval(args)) + #'' + if cmd_name == "do_eval": + if not cmd_args: raise ValueError() + cmd.set_result(eval(cmd_args)) + + cmd.set_as_processed() + log.info("...Agent level GENERAL cmd has been executed") + + # If cmd is "do_exit", kill myself (without any question, this is an order soldier !) + # This "do_exit" should normally kill any current thread (to be checked...) + if cmd.name == "do_exit": + log.info("Before exiting, Here are (if exists) the current (still) pending commands (time ordered) :") + commands = AgentCmd.get_pending_and_running_commands_for_agent(self.name) + AgentCmd.show_commands(commands, True) + #if self.TEST_MODE and self.TEST_WITH_FINAL_TEST and self.TEST_COMMANDS_DEST == "myself": self.simulator_test_results() + if self.TEST_MODE and self.TEST_WITH_FINAL_TEST: + self._TEST_test_results() + #self._DO_EXIT=True + #exit(0) + ''' + + + def _process_agent_general_cmd(self, cmd:AgentCmd): + # GENERAL command (related to any agent) + + #self.print(f"Starting execution of an AGENT LEVEL cmd {cmd}...") + log.info(f"Starting execution of an AGENT GENERAL cmd...") + + # Update read time to say that the command has been READ + #cmd.set_read_time() + cmd.set_as_running() + + log.info("(Agent level GENERAL CMD)") + cmd_name,cmd_args = cmd.name_and_args + #cmd_name, cmd_args = cmd.tokenize() + #if cmd.name == "set_state:active": + #elif cmd.name == "set_state:idle": + + # Default result (null) + result = None + + #if cmd_name in ("do_abort", "do_exit", "do_restart_init"): + if cmd_name in ("do_abort", "do_exit", "do_restart_loop"): + #self.printd("Current pending commands are:") + #Command.show_commands(self._pending_commands) + log.info("Stopping/Aborting current executing command if exists:") + #self._kill_running_device_cmd_if_exists(cmd.sender) + ''' + self.do_things_before_exit(cmd.sender) + if cmd_name == "do_restart_init": + log.info("restart_init received: Restarting from init()") + self._DO_RESTART=True + elif cmd.name == "do_exit": + self._DO_EXIT=True + cmd.set_result('SHOULD BE DONE NOW') + ''' + result = "RESTARTING" if cmd_name == "do_restart_loop" else "STOPPING" + + elif cmd_name == "get_state": + result = "I am now " + self.get_state() + + elif cmd_name == "get_mode": + result = "MODE = " + self.get_mode() + + elif cmd_name == "set_mode": + #if not cmd_args: raise ValueError() + if not cmd_args: raise AgentCmdBadArgsException(cmd) + mode = cmd_args[0] + if mode == "IDLE": self.set_idle() + elif mode == "ROUTINE": self.set_routine() + elif mode == "ATTENTIVE": self.set_attentive() + else: raise AgentCmdBadArgsException(cmd) + #cmd.set_result("I am now " + state) + result = "MODE = " + mode + #time.sleep(1) + #self.waitfor(1) + + #elif cmd_name in ("do_flush_commands"): + elif cmd_name == "do_flush_commands": + "flush_commands received: Delete all pending commands" + self.do_flush_commands() + #cmd.set_result('DONE') + result = "FLUSH DONE" + + elif cmd_name == "do_eval": + #if not cmd_args: raise ValueError() + if not cmd_args: raise AgentCmdBadArgsException(cmd) + #cmd.set_result(eval(cmd_args)) + #result = eval(cmd_args) + result = self.do_eval(cmd_args[0]) + elif cmd_name == "get_specifics_cmd": + result = self.get_specifics_cmd() + + cmd.set_as_processed(result) + log.info("...Agent level GENERAL cmd has been executed") + + ''' + # If cmd is "do_exit", kill myself (without any question, this is an order soldier !) + # This "do_exit" should normally kill any current thread (to be checked...) + if cmd.name == "do_exit": + log.info("Before exiting, Here are (if exists) the current (still) pending commands (time ordered) :") + commands = AgentCmd.get_pending_and_running_commands_for_agent(self.name) + AgentCmd.show_commands(commands, True) + #if self.TEST_MODE and self.TEST_WITH_FINAL_TEST and self.TEST_COMMANDS_DEST == "myself": self.simulator_test_results() + if self.TEST_MODE and self.TEST_WITH_FINAL_TEST: + self._TEST_test_results() + #self._DO_EXIT=True + #exit(0) + ''' + + + + + def _process_agent_specific_cmd(self, cmd:AgentCmd): + + #self.print(f"Starting execution of an AGENT LEVEL cmd {cmd}...") + log.info(f"Starting execution of an AGENT level SPECIFIC cmd...") + + # Update read time to say that the command has been READ + #cmd.set_read_time(False) + cmd.set_as_running() + + log.info("(Agent SPECIFIC cmd)") + # Execute method self."cmd.name"() + # This can raise an exception (caught by this method caller) + try: + res = self.exec_cmd_from_its_name(cmd) + except (AgentCmdUnimplementedException, AgentCmdBadArgsException) as e: + # These exceptions are managed at higher level : + raise + ''' + except AttributeError as e: + self.printd(f"EXCEPTION: Agent level specific command '{cmd.name}' unknown (not implemented as a function) :", e) + self.printd("Thus => I ignore this command...") + cmd.set_result("ERROR: INVALID AGENT LEVEL SPECIFIC COMMAND") + cmd.set_as_pending() + cmd.set_as_skipped() + return + ''' + #cmd.set_result("Agent level SPECIFIC cmd done") + #res = res if res else "Agent SPECIFIC cmd done" + cmd.set_as_processed(res) + log.info("...Agent SPECIFIC cmd has been executed") + + + + + ''' + def do_log(self): + #"" + log à 2 endroits ou 1 seul + - in file + - in db + #"" + self.printd("Logging data...") + ''' + + def exec_cmd_from_its_name(self, cmd:AgentCmd): + methods_list = [method for method in dir(self) if callable(getattr(self, method))] + #print(methodsList) + func = cmd.name + if func not in methods_list: raise AgentCmdUnimplementedException(cmd) + #for arg in cmd.args: print(arg) + #print(cmd.args) + #print(*cmd.args) + ''' + try: + if not cmd.args: + # equivalent to calling self.func() + return getattr(self, func)() + else: + args = [] + # Convert all args to their real type + for arg in cmd.args: + #print(arg) + #try: + # Evaluate arg only if it is not a word (letters) + if not arg[0].isalpha(): + arg = ast.literal_eval(arg) + #except ValueError as e: newarg = arg + args.append(arg) + #print(args) + # equivalent to calling self.func(*cmd.args) + return getattr(self, func)(*args) + ''' + args = [] + # Convert all args to their real type + for arg in cmd.args: + #print(arg) + #try: + # Evaluate arg only if it is not a word (letters) + if not arg[0].isalpha(): + arg = ast.literal_eval(arg) + #except ValueError as e: newarg = arg + args.append(arg) + try: + # equivalent to calling self.func(*cmd.args) + return getattr(self, func)(*args) + except (TypeError, AttributeError, ValueError) as e: + #raise e + # "from None" pour ne pas afficher l'exception AttributeError (car interne) + raise AgentCmdBadArgsException(cmd) from None + #print("I know this specific command but it is not yet implemented : ", func) + ##tb = sys.exc_info()[2] + ##raise AgentCmdUnimplementedException(cmd).with_traceback(tb) + ##raise AgentCmdUnimplementedException(cmd).with_traceback(None) + + def _is_agent_level_cmd(self, cmd:AgentCmd): + return self._is_agent_general_cmd(cmd) or self._is_agent_specific_cmd(cmd) + + def _is_agent_general_cmd(self, cmd:AgentCmd): + return cmd.is_agent_general_cmd() + + + + ''' + def _exec_agent_cmd(self, cmd:Command): + # AGENT "GENERAL LEVEL" command + # => I process it directly without asking my DC + # => Simple action, short execution time, so I execute it directly (in current main thread, not in parallel) + if cmd.is_agent_level_general_cmd(): + self.printd("********** -- AGENT LEVEL GENERAL CMD *********") + self._exec_agent_general_cmd(cmd) + + # AGENT "SPECIFIC LEVEL" command + # => I process it directly without asking my DC + # => Simple action, short execution time, so I execute it directly (in current main thread, not in parallel) + #elif self._is_agent_level_specific_cmd(cmd): + else: + self.printd("********** -- AGENT LEVEL SPECIFIC CMD *********") + self._exec_agent_specific_cmd(cmd) + ''' + + def _is_agent_specific_cmd(self, cmd:AgentCmd): + #return cmd.name in self.AGENT_SPECIFIC_COMMANDS + #return (cmd.name,) in self.AGENT_SPECIFIC_COMMANDS + for (cmd_name,timeout) in self.AGENT_SPECIFIC_COMMANDS: + if cmd.name == cmd_name : return True + + ''' + def _exec_agent_specific_cmd(self, cmd:Command): + # Execute method self."cmd.name"() + self.exec_cmd_from_its_name(cmd) + ''' + + def is_in_test_mode(self): + return self.TEST_MODE + + + + ### + # ================================================================================================ + # AGENT GENERAL FUNCTIONS + # ================================================================================================ + ### + + def do_eval(self, eval_str:str): + return eval(eval_str) + + def do_flush_commands(self): + AgentCmd.delete_pending_commands_for_agent(self.name) + + def do_exec_commands(self, what:str): + # - Temporaly stop execution of new commands (let them accumulate) + if what == "stop": pass + # - Resume execution of commands (accumulated since "do_stop_exec_cmd") + if what == "resume": pass + # NOT PRIO + if what == "noprio": pass + # Bad arg + raise AgentCmdBadArgsException(self.CURRENT_CMD) + + # Stop currently running cmd or routine + def do_stop_current(self, what:str): + if what == "cmd": self.do_stop_current_cmd() + if what == "routine": self.do_stop_current_routine() + if what == "both": + self.do_stop_current_cmd() + self.do_stop_current_routine() + # Bad arg + raise AgentCmdBadArgsException(self.CURRENT_CMD) + def do_stop_current_cmd(self): + pass + def do_stop_current_routine(self): + pass + + def do_exit(self): self.do_stop("asap"); + def do_abort(self): self.do_stop("now"); + # Stop agent asap or now + def do_stop(self, when:str): + # PRIO + if when == "asap": + pass + if when == "now": + pass + # NOT PRIO + if when == "noprio": + pass + # Bad arg + raise AgentCmdBadArgsException(self.CURRENT_CMD) + + + # Stop agent asap or now + def do_restart_loop(self, when:str): + # PRIO + if when == "asap": + pass + if when == "now": + pass + # NOT PRIO + if when == "noprio": + pass + # Bad arg + raise AgentCmdBadArgsException(self.CURRENT_CMD) + + + + + ### + # ================================================================================================ + # AGENT SPECIFIC FUNCTIONS + # (here just to serve as examples) + # ================================================================================================ + ### + + #def do_specific1(self, arg1:int, arg2:int=2, arg3:int=3) -> int: + #def do_specific1(self, arg1:int, arg2:int=2, arg3:float=3.1, arg4:list=[1,2,3]) -> float: + def do_specific1(self, + arg1:int, + arg2:int, + arg3:float=3.1, + arg4:str='toto', + arg5:Tuple[int,str,int]=(1,'toto',3), + #arg5:Tuple[int,int,int]=(1,2,3), + arg6:List[int]=[] + ) -> float: + ''' + arg1 = int(arg1) + arg2 = int(arg2) + arg3 = float(arg3) + arg5 = ast.literal_eval(arg5) + print(arg5[1]) + arg6 = ast.literal_eval(arg6) + ''' + print(arg4) + res = arg1 + arg2 + arg3 + arg5[0] + arg5[2] + if arg6: res += arg6[0] + return res + + #def set_specific2(self, arg1:str, arg2:int): pass + + def do_specific3(self): + pass + + + ### + # ================================================================================================ + # DEVICE SPECIFIC FUNCTIONS (abstract for Agent, overriden and implemented by AgentDevice) + # ================================================================================================ + ### + + # To be overriden by subclass (AgentDevice...) + # @abstract + def is_device_level_cmd(self, cmd): + return False + + ''' + # to be overriden by subclass (AgentDevice) + # @abstract + def exec_device_cmd_if_possible(self, cmd:AgentCmd): + pass + + # TO BE OVERRIDEN by subclass (AgentDevice) + # @abstract + def exec_device_cmd(self, cmd:AgentCmd): + #self.exec_cmd_from_its_name(cmd) + pass + ''' + + + + + + """ + ================================================================= + TEST DEDICATED FUNCTIONS + ================================================================= + """ + + def _set_debug_mode(self, mode:bool): + self.DEBUG_MODE=mode + + def _set_with_simulator(self, mode:bool): + self.WITH_SIMULATOR=mode + + def _set_test_mode(self, mode:bool): + self.TEST_MODE = mode + if self.TEST_MODE: log.info("in TEST MODE") + + def _TEST_get_next_command_to_send(self)->AgentCmd: + #cmd_full_name, validity, res_expected, after_status = next(self.TEST_COMMANDS, (None,None,None,None)) + cmd_full_name, validity, expected_res, expected_status = next(self.TEST_COMMANDS) + #print(expected_final_status) + #print(cmd_full_name, res_expected) + #return cmd_name + if cmd_full_name is None: return None + # Remove excessive spaces + import re + cmd_full_name = re.sub(r"\s+", " ", cmd_full_name).strip() + if ' ' not in cmd_full_name: raise Exception('Command is malformed:', cmd_full_name) + agent_recipient, cmd_name_and_args = cmd_full_name.split(' ', 1) + if agent_recipient == 'self': agent_recipient = self.name + cmd_name, cmd_args = cmd_name_and_args, None + if ' ' in cmd_name_and_args: cmd_name,cmd_args = cmd_name_and_args.split(' ', 1) + cmd = self.create_cmd_for(agent_recipient, cmd_name, cmd_args, validity) + cmd.expected_res = expected_res + cmd.expected_status = expected_status + # If no cmd created (because of error, bad AgentDevice name), call again this method for next cmd + #if cmd is None: return self._TEST_get_next_command_to_send() + return cmd + + """ + def simulator_send_next_command(self): + #self._current_test_cmd = "set_state:idle" if self._current_test_cmd=="set_state:active" else "set_state:active" + #if self._nb_test_cmds == 4: self._current_test_cmd = "do_exit" + cmd_name = next(self.TEST_COMMANDS, None) + #self.printd("next cmd is ", cmd_name) + if cmd_name is None: return + #Command.objects.create(sender=self.name, recipient=self.name, name=cmd_name) + recipient_agent = self.name if self.TEST_COMMANDS_DEST=="myself" else self.TEST_COMMANDS_DEST + Command.objects.create(sender=self.name, recipient=recipient_agent, name=cmd_name) + #time.sleep(1) + #self._TEST_current_cmd_idx += 1 + #self._nb_test_cmds += 1 + """ + + def _TEST_check_cmd_res_and_status(self, cmd:AgentCmd): + + if not self.is_in_test_mode(): return + if not cmd: return + + log.debug("*** CHECK ***") + #if hasattr(self._cmdts,'expected_res'): + + log.debug(cmd.result) + log.debug(self._cmdts.expected_res) + if cmd.is_executed() and self._cmdts.expected_res: + assert(cmd.result == self._cmdts.expected_res) + + log.debug(cmd.state) + log.debug(self._cmdts.expected_status) + #if hasattr(self._cmdts,'expected_status'): + if self._cmdts.expected_status: + assert(cmd.state == self._cmdts.expected_status) + + + def _TEST_test_routine_process(self): + """ + TEST MODE ONLY + + Send next command from scenario defined in TEST_COMMANDS_LIST + (only if previous command finished) + """ + + if not self.is_in_test_mode(): return + + log.info("(TEST mode) Trying to send a new command if possible...") + + # There is a current command being processed + # => check if next command is "do_abort" + # => if so, instantly send a "do_abort" to abort previous command + if self._cmdts is not None: + log.info(f"Waiting for end execution of cmd '{self._cmdts.name}' (sent to {self._cmdts.recipient}) ...") + # Update cmdts fields from DB + self._cmdts.refresh_from_db() + + # Current cmd is pending or running + if self._cmdts.is_pending() or self._cmdts.is_running(): + if self._next_cmdts is None: + # If next command is "do_abort" then abort becomes the new current command (to be sent) + self._next_cmdts = self._TEST_get_next_command_to_send() + if self._next_cmdts and self._next_cmdts.name == "do_abort": + # Wait a little to give a chance to agentB to start execution of current command, + # so that we can abort it then (otherwise it won't be aborted!!) + #time.sleep(4) + self.waitfor(4) + self._cmdts = self._next_cmdts + self._next_cmdts = None + log.info("***") + #log.info(f"*** SEND ", self._cmdts) + log.info(f"***" + str(self._cmdts)) + log.info("***") + self._cmdts.send() + + # Current cmd is no more running + else: + + # Execution was not completed + #if self._cmdts.is_expired() or self._cmdts.is_skipped() or self._cmdts.is_killed(): + if self._cmdts.is_skipped() or self._cmdts.is_killed(): + log.info("Command was not completed") + # 2 possible scenarios: + # - (1) Send the SAME command again + ''' + self.printd("Command was not completed, so I send it again") + # The command was not completed, so, make a copy of it and send it again + # For this, it is enough to set primary key to None, + # then the send() command below will save a NEW command + #self._cmdts = copy.copy(self._cmdts) + self._cmdts.id = None + SEND_A_NEW_COMMAND = True + ''' + # - (2) Send next command + #self._cmdts = None + + # Execution was completeted => get result + elif self._cmdts.is_executed(): + cmdts_res = self._cmdts.get_result() + print("toto") + log.info(f"Cmd executed. Result is '{cmdts_res}'") + #cmdts_is_processed = True + ''' Optimisation possible pour gagner une iteration: + self._cmdts = self.simulator_get_next_command_to_send() + # No more command to send (from simulator) => return + if self._cmdts is None: return + SEND_A_NEW_COMMAND = True + ''' + # Set cmdts to None so that a new command will be sent at next iteration + self._cmdts = None + + # No currently running command => get a new command and SEND it + if self._cmdts is None: + if self._next_cmdts is not None: + self._cmdts = self._next_cmdts + self._next_cmdts = None + else: + self._cmdts = self._TEST_get_next_command_to_send() + # No more command to send (from simulator) => return and EXIT + if self._cmdts is None: + self.DO_MAIN_LOOP = False + return + # Send cmd (= set as pending and save) + log.info("***") + #self.printd(f"*** SEND ", self._cmdts) + log.info(f"*** NEW COMMAND TO SEND is: " + str(self._cmdts)) + if hasattr(self._cmdts,"expected_res") : + log.info(f"*** (with expected result & final status : " + "'"+str(self._cmdts.expected_res)+"' & '"+str(self._cmdts.expected_status) + "')") + log.info("***") + #self._cmdts.set_as_pending() + # SEND + self._cmdts.send() + #cmdts_is_processed = False + #cmdts_res = None + + + def _TEST_test_results(self): + if not (self.TEST_MODE and self.TEST_WITH_FINAL_TEST): return + if self.TEST_COMMANDS_LIST == []: return + + nb_commands_to_send = len(self.TEST_COMMANDS_LIST) + nb_commands_sent, commands = self._TEST_test_results_start() + #nb_commands_to_send = len(self.TEST_COMMANDS_LIST) + + # General (default) test + #self.printd(commands[0].name, "compared to", self.TEST_COMMANDS_LIST[0].split()[1]) + assert commands[0].name == self.TEST_COMMANDS_LIST[0].split()[1] + last_cmd = commands[-1] + assert last_cmd.name == self.TEST_COMMANDS_LIST[-1].split()[1] + assert last_cmd.name == "do_exit" + assert last_cmd.is_executed() + assert last_cmd.get_result() == "SHOULD BE DONE NOW" + + nb_asserted = 0 + nb_agent_general = 0 + nb_unknown = 0 + nb_unimplemented = 0 + for cmd in commands: + assert cmd.is_executed() or cmd.is_killed() or cmd.is_skipped() + nb_asserted += 1 + if cmd.is_agent_general_cmd(): + nb_agent_general += 1 + if cmd.name == "do_unknown": + assert cmd.is_skipped() + #assert "UnimplementedGenericCmdException" in cmd.get_result() + assert "INVALID COMMAND" in cmd.get_result() + nb_unknown += 1 + #if cmd.name in ["do_unimplemented", "do_unknown"]: + if cmd.name == "do_unimplemented": + assert cmd.is_skipped() + assert "UnimplementedGenericCmdException" in cmd.get_result() + nb_unimplemented += 1 + assert nb_asserted == nb_commands_sent + log.info(f"{nb_commands_to_send} cmds I had to send <==> {nb_asserted} cmds executed (or killed), {nb_commands_to_send-nb_commands_sent} cmd ignored") + log.info("Among executed commands:") + log.info(f"- {nb_agent_general} AGENT general command(s)") + log.info(f"- {nb_unimplemented} unimplemented command(s) => UnimplementedGenericCmdException raised then command was skipped") + log.info(f"- {nb_unknown} unknown command(s) => skipped") + + + # Can be overriden by subclass (AgentDevice) + self.TEST_test_results_other(commands) + ''' + (EP) moved to AgentDevice + # Now test that any "AD get_xx" following a "AD set_xx value" command has result = value + for i,cmd_set in enumerate(commands): + if cmd_set.name.startswith('set_'): + commands_after = commands[i+1:] + for cmd_get in commands_after: + if cmd_get.name.startswith('get_') and cmd_get.name[4:]==cmd_set.name[4:] and cmd_get.device_type==cmd_set.device_type: + log.info("cmd_get.result == cmd_set.args ?" + str(cmd_get.result) + ' ' + str(cmd_set.args)) + assert cmd_get.get_result() == ','.join(cmd_set.args), "A get_xx command did not gave the expected result as set by a previous set_xx command" + break + ''' + + # Specific (detailed) test (to be overriden by subclass) + nb_asserted2 = self.TEST_test_results_main(commands) + self._TEST_test_results_end(nb_asserted) + + def _TEST_prepare(self): + if not self.is_in_test_mode(): return + log.debug("\n!!! In TEST mode !!! => preparing to run a scenario of test commands") + log.debug("- Current test commands list scenario is:\n" + str(self.TEST_COMMANDS_LIST)) + if not self.WITH_SIMULATOR: + log.debug("\n!!! In TEST but no SIMULATOR mode (using REAL device) !!! => removing dangerous commands for real devices... :") + TEST_COMMANDS_LIST_copy = self.TEST_COMMANDS_LIST.copy() + for cmd in TEST_COMMANDS_LIST_copy: + cmd_key = cmd.split()[1] + if ("set_" in cmd_key) or ("do_start" in cmd_key) or cmd_key in ["do_init", "do_goto", "do_open", "do_close"]: + self.TEST_COMMANDS_LIST.remove(cmd) + log.debug("- NEW test commands list scenario is:\n" + self.TEST_COMMANDS_LIST, '\n') + + # Can be overriden by subclass (AgentDevice) + def TEST_test_results_other(self, commands): + pass + + def _TEST_test_results_start(self): + print() + log.info("--- Testing if the commands I SENT had the awaited result") + log.info("Here are the last commands I sent:") + #commands = list(Command.get_last_N_commands_for_agent(self.name, 16)) + #commands = Command.get_last_N_commands_sent_to_agent(self.name, 16) + nb_commands = len(self.TEST_COMMANDS_LIST) + if "ad_unknown get_dec" in self.TEST_COMMANDS_LIST: nb_commands -= 1 + commands = AgentCmd.get_last_N_commands_sent_by_agent(self.name, nb_commands) + AgentCmd.show_commands(commands) + return nb_commands, commands + """ OLD SCENARIO + nb_asserted = 0 + for cmd in commands: + if cmd.name == "specific0": + assert cmd.is_skipped() + nb_asserted+=1 + if cmd.name == "specific1": + assert cmd.result == "in step #5/5" + assert cmd.is_executed() + nb_asserted+=1 + if cmd.name in ("specific2","specific3"): + assert cmd.is_killed() + nb_asserted+=1 + if cmd.name in ("specific4", "specific5", "specific6", "specific7", "specific8"): + assert cmd.is_pending() + nb_asserted+=1 + # 2 cmds abort + if cmd.name in ("do_abort"): + assert cmd.is_executed() + nb_asserted+=1 + if cmd.name in ("do_exit"): + assert cmd.is_executed() + nb_asserted+=1 + assert nb_asserted == 12 + self.printd("--- Finished testing => result is ok") + """ + + # To be overriden by subclass + def TEST_test_results_main(self, commands): + return 0 + ''' + nb_asserted = 0 + self.printd("from simulator_test_results_main", commands) + for cmd in commands: + assert cmd.is_executed() + nb_asserted+=1 + return nb_asserted + ''' + + def _TEST_test_results_end(self, nb_asserted): + #nb_commands_to_send = len(self.TEST_COMMANDS_LIST) + #self.printd(nb_asserted, "vs", nb_commands_to_send) + #assert nb_asserted == nb_commands_to_send + #self.printd(f"************** Finished testing => result is ok ({nb_asserted} assertions) **************") + printFullTerm(Colors.GREEN, f"************** Finished testing => result is ok ({nb_asserted} assertions) **************") + + + + + + +""" +================================================================= + MAIN +================================================================= +""" + +def extract_parameters(): + """ Usage: Agent.py [-t] [configfile] """ + # arg 1 : -t + # arg 2 : configfile + DEBUG_MODE = False + TEST_MODE = False + WITH_SIM = False + VERBOSE_MODE = False + configfile = None + log.debug("args:" + str(sys.argv)) + for arg in sys.argv[1:] : + if arg == "-t": TEST_MODE = True + elif arg == "-s": WITH_SIM = True + elif arg == "-d": DEBUG_MODE = True + elif arg == "-v": VERBOSE_MODE = True + #else: configfile = arg + ''' + if len(sys.argv) > 1: + if sys.argv[1] == "-t": + TEST_MODE = True + if len(sys.argv) == 3: + configfile = sys.argv[2] + else: + configfile = sys.argv[1] + ''' + #return DEBUG_MODE, WITH_SIM, TEST_MODE, VERBOSE_MODE, configfile + return DEBUG_MODE, TEST_MODE, VERBOSE_MODE + +#def build_agent(Agent_type:Agent, name="GenericAgent", RUN_IN_THREAD=True): +#def build_agent(Agent_type:Agent, RUN_IN_THREAD=True): +def build_agent(Agent_type:Agent) -> Agent : + #DEBUG_MODE, WITH_SIM, TEST_MODE, VERBOSE_MODE, configfile = extract_parameters() + DEBUG_MODE, TEST_MODE, VERBOSE_MODE = extract_parameters() + log.debug("debug mode is" + os.getenv("PYROS_DEBUG")) + + #print(logger) + + #agent = Agent("GenericAgent", configfile, RUN_IN_THREAD=True) + #agent = Agent_type(configfile, RUN_IN_THREAD, DEBUG_MODE=DEBUG_MODE) + #agent = Agent_type(RUN_IN_THREAD) + agent = Agent_type() + # AgentSP isn't in a config, so to avoid that WITH_SIM returns an error it's a special case + if agent.name == "AgentSP": + agent._set_with_simulator(False) + agent._set_test_mode(TEST_MODE) + return agent + #agent = Agent_type(name, configfile, RUN_IN_THREAD) + # Get the information of the agent name (name of class) within obsconfig and get the "is_real" attribute + if agent.name in agent.get_config().get_agents(agent.unit).keys(): + if agent.get_config().get_agent_information(agent.unit,agent.name).get("is_real"): + WITH_SIM = not agent.get_config().get_agent_information(agent.unit,agent.name)["is_real"] + else: + WITH_SIM = True + else: + WITH_SIM = True + agent._set_with_simulator(WITH_SIM) + agent._set_test_mode(TEST_MODE) + #print(logger) + #logg.info("agent built, return it") + + #agent._set_debug_mode(DEBUG_MODE) + return agent + + +if __name__ == "__main__": + + ''' + # with thread + RUN_IN_THREAD=True + # with process + #RUN_IN_THREAD=False + ''' + + #agent = build_agent(Agent, RUN_IN_THREAD=RUN_IN_THREAD) + agent = build_agent(Agent) + + agent.show_config() + + #agent = build_agent(Agent, name="GenericAgent", RUN_IN_THREAD=RUN_IN_THREAD) + ''' + TEST_MODE, WITH_SIM, configfile = extract_parameters() + agent = Agent("GenericAgent", configfile, RUN_IN_THREAD=True) + #agent.setSimulatorMode(TEST_MODE) + agent.setTestMode(TEST_MODE) + agent.setWithSimulator(WITH_SIM) + self.printd(agent) + ''' + agent.run() diff --git a/pyros.py b/pyros.py index 3b6929f..bc9eb55 100755 --- a/pyros.py +++ b/pyros.py @@ -1115,8 +1115,6 @@ def new_start(configfile: str, observatory: str, unit: str, computer_hostname: s cmd += " -t" if verbose_mode(): cmd += " -v" - if configfile: - cmd += f" {configfile}" if computer_hostname: cmd += f" --computer {computer_hostname}" -- libgit2 0.21.2