#!/usr/bin/env python3 VERSION = "0.5" DEBUG=True #DEBUG=False """TODO: - 1 log par agent + lastlog (30 dernières lignes) - table agents_log avec log minimum pour affichage dans dashboard, et ordre chrono intéressant pour suivi activité : nom agent, timestamp, message - 1 table par agent: agent__vars : nom variable, value, desc """ """ ================================================================= SETUP FOR DJANGO (see https://docs.djangoproject.com/en/dev/topics/settings) (see also https://docs.djangoproject.com/en/dev/ref/settings) ================================================================= """ import os from pathlib import Path import sys from django.conf import settings as djangosettings # Conseil sur le net: #https://stackoverflow.com/questions/16853649/how-to-execute-a-python-script-from-the-django-shell #"" #import sys, os #sys.path.append('/path/to/your/django/app') #os.environ['DJANGO_SETTINGS_MODULE'] = 'settings' #from django.conf import settings #"" # To avoid a "ModuleNotFoundError: No module named 'dashboard'"... (not even 1 app found) : ##sys.path.insert(0, os.path.abspath("..")) ##sys.path.insert(0, os.path.abspath("src")) ##sys.path.insert(0, "../src") ##sys.path.insert(0, "src") # To avoid a "ModuleNotFoundError: No module named 'src'" sys.path.append("..") sys.path.append("../..") sys.path.append("../../..") sys.path.append("../../../..") ##sys.path.append("src") print("Starting with this sys.path", sys.path) # DJANGO setup # self.printd("file is", __file__) # mypath = os.getcwd() # Go into src/ ##os.chdir("..") ##os.chdir("src") print("Current directory : " + str(os.getcwd())) #os.environ.setdefault("DJANGO_SETTINGS_MODULE", "src.core.pyros_django.pyros.settings") os.environ.setdefault("DJANGO_SETTINGS_MODULE", "src.core.pyros_django.pyros.settings") # os.environ['SECRET_KEY'] = 'abc' # os.environ['ENVIRONMENT'] = 'production' import django django.setup() print("DB2 used is:", djangosettings.DATABASES["default"]["NAME"]) print() """ ================================================================= IMPORT PYTHON PACKAGES ================================================================= """ # --- GENERAL PURPOSE IMPORT --- #from __future__ import absolute_import import utils.Logger as L import platform import random from threading import Thread import threading, multiprocessing import time import socket #import ctypes #import copy # --- DJANGO IMPORT --- from django.db import transaction from django import db # from django.core.exceptions import ObjectDoesNotExist # from django.db.models import Q #from django.shortcuts import get_object_or_404 #from django.conf import settings as djangosettings # --- SPECIFIC IMPORT --- from common.models import AgentSurvey, Command from config.configpyros import ConfigPyros #from dashboard.views import get_sunelev #from devices.TelescopeRemoteControlDefault import TelescopeRemoteControlDefault #from utils.JDManipulator import * from agent.logpyros import LogPyros """ ================================================================= GENERAL MODULE CONSTANT & FUNCTIONS DEFINITIONS ================================================================= """ #DEBUG_FILE = False log = L.setupLogger("AgentLogger", "Agent") IS_WINDOWS = platform.system() == "Windows" class Colors: HEADER = "\033[95m" BLUE = "\033[94m" GREEN = "\033[92m" WARNING = "\033[93m" FAIL = "\033[91m" ENDC = "\033[0m" BOLD = "\033[1m" UNDERLINE = "\033[4m" def printColor(color: Colors, message, file=sys.stdout, eol=os.linesep, forced=False): #system = platform.system() """ if (self.disp == False and forced == False): return 0 """ #if system == "Windows": if IS_WINDOWS: print(message, file=file, end=eol) else: print(color + message + Colors.ENDC, file=file, end=eol) return 0 def printFullTerm(color: Colors, string: str): #system = platform.system() columns = 100 row = 1000 disp = True value = int(columns / 2 - len(string) / 2) printColor(color, "-" * value, eol="") printColor(color, string, eol="") value += len(string) printColor(color, "-" * (columns - value)) return 0 """ ================================================================= class StoppableThread ================================================================= """ class StoppableThreadEvenWhenSleeping(threading.Thread): # Thread class with a stop() method. The thread itself has to check # regularly for the stopped() condition. # It stops even if sleeping # See https://python.developpez.com/faq/?page=Thread#ThreadKill # See also https://www.oreilly.com/library/view/python-cookbook/0596001673/ch06s03.html def __init__(self, *args, **kwargs): #super(StoppableThreadSimple, self).__init__(*args, **kwargs) super().__init__(*args, **kwargs) self._stop_event = threading.Event() #def stop(self): def terminate(self): self._stop_event.set() def stopped(self): return self._stop_event.is_set() def wait(self, nbsec:float=2.0): self._stop_event.wait(nbsec) """ ================================================================= class Agent ================================================================= """ class Agent: """ See Agent_activity_diag.pu for PlantUML activity diagram Behavior of an agent: - If idle : - still does routine_process() and general_process() - does not do specific_process() - Once a command has been sent to another agent : - It waits (non blocking) for the end of execution of the command and get its result - If command is timed out or has been skipped or killed, then it is NOT re-executed at next iteration (except if needed explicitely) """ # --- # --- CLASS (STATIC) attributes (CONSTANTS) # --- If agent is instance of Agent: # --- - CLASS attributes are accessible via agent.__class__.__dict__ # --- - INSTANCE attributes are accessible via agent.__dict__ # --- # Maximum duration of this agent (only for SIMULATION mode) # If set to None, it will never exit except if asked (or CTRL-C) # If set to 20, it will exit after 20s SIMULATOR_MAX_DURATION_SEC = None #SIMULATOR_MAX_DURATION_SEC = 30 # FOR TEST ONLY # Run this agent in simulator mode SIMULATOR_MODE = True # Run the assertion tests at the end SIMULATOR_WITH_TEST = False # Who should I send commands to ? SIMULATOR_COMMANDS_DEST = "myself" # Default scenario to be executed #SIMULATOR_COMMANDS = iter([ SIMULATOR_COMMANDS_LIST = [ "go_active", "go_idle", # specific0 not_executed_because_idle "specific0", "go_active", # specific1 executed_because_not_idle, should complete ok "specific1", # specific2 will be executed only when specific1 is finished, # and should be aborted before end of execution, # because of the 1st coming "abort" command below "specific2", # specific3 should be executed only when specific2 is finished (in fact, aborted), # and should be aborted before end of execution, # because of the 2nd coming "abort" command below "specific3", # These commands should not have the time to be processed # because the "exit" command below should be executed before "specific4", "specific5", "specific6", "specific7", "specific8", # Should abort the current running command (which should normally be specific2) # even if commands above (specific3, ..., specific8) are already pending "abort", # These commands (except abort) won't be executed # because too many commands are already pending (above) "specific9", "abort", "go_active", "go_idle", # Should stop the agent even before the previous pending commands are executed "exit", # Because of the previous "exit" command, # these following commands should not be executed, # and not even be added to the database command table "go_active", "specific10" ] #SIMULATOR_COMMANDS = iter(SIMULATOR_COMMANDS_LIST) """ How to run this agent exec_specific_cmd() method ? - True = inside a Thread (cannot be killed, must be asked to stop, and inadequate for computation) - False = inside a Process If thread, displays : >>>>> Thread: starting execution of command specific1 >>>>> Thread: PID: 2695, Process Name: MainProcess, Thread Name: Thread-1 ... >>>>> Thread: starting execution of command specific2 >>>>> Thread: PID: 2695, Process Name: MainProcess, Thread Name: Thread-2 ... >>>>> Thread: starting execution of command specific3 >>>>> Thread: PID: 2695, Process Name: MainProcess, Thread Name: Thread-3 If process, displays : >>>>> Thread: starting execution of command specific1 >>>>> Thread: PID: 2687, Process Name: Process-1, Thread Name: MainThread ... >>>>> Thread: starting execution of command specific2 >>>>> Thread: PID: 2689, Process Name: Process-2, Thread Name: MainThread ... >>>>> Thread: starting execution of command specific3 >>>>> Thread: PID: 2690, Process Name: Process-3, Thread Name: MainThread """ # with thread RUN_IN_THREAD = True # with process #RUN_IN_THREAD = False _thread_total_steps_number = 1 # Run for real, otherwise just print messages without really doing anything FOR_REAL = True #COMMANDS_PEREMPTION_HOURS = 48 #COMMANDS_PEREMPTION_HOURS = 60/60 name = "Generic Agent" mainloop_waittime = 3 subloop_waittime = 2 status = None mode = None config = None # Statuses STATUS_LAUNCH = "LAUNCHED" STATUS_INIT = "INITIALIZING" STATUS_MAIN_LOOP = "IN_MAIN_LOOP" STATUS_GET_NEXT_COMMAND = "IN_GET_NEXT_COMMAND" STATUS_GENERAL_PROCESS = "IN_GENERAL_PROCESS" STATUS_ROUTINE_PROCESS = "IN_ROUTINE_PROCESS" STATUS_SPECIFIC_PROCESS = "IN_SPECIFIC_PROCESS" STATUS_EXIT = "EXITING" # Modes MODE_ACTIVE = "ACTIVE" MODE_IDLE = "IDLE" PYROS_DJANGO_BASE_DIR = Path("src/core/pyros_django") # pathlib DEFAULT_CONFIG_FILE_NAME = "config_unit_simulunit1.xml" CONFIG_DIR_NAME = "config" # Parameters from config file # for /src/ #_path_data = '../../config' # for /src/core/pyros_django/ #_path_data = '../../../../config' # Path to config _path_data = '' _computer_alias = '' _computer_description = '' # Current and next command to send cmdts = None next_cmdts = None _agent_survey = None _pending_commands = [] _current_specific_cmd = None _current_specific_thread = None _iter_num = None # Log object _log = None def __init__(self, name:str="Agent", config_filename:str=None, RUN_IN_THREAD=True): self.name = name self.SIMULATOR_COMMANDS = iter(self.SIMULATOR_COMMANDS_LIST) self.RUN_IN_THREAD = RUN_IN_THREAD self.set_status(self.STATUS_LAUNCH) self.set_idle() self._log = LogPyros() self._log.set_agent_alias(self.name) ''' (old way) #print(os.path.dirname(__file__)) my_parent_abs_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) #print(os.path.dirname(os.path.realpath(__file__))) #print(djangosettings.BASE_DIR) self.BASE_DIR = my_parent_abs_dir.split(self.PYROS_DJANGO_BASE_DIR)[0] #print(self.BASE_DIR) self._path_data = os.path.join(self.BASE_DIR, self.CONFIG_DIR_NAME) ''' # New way with PathLib my_parent_abs_dir = Path(__file__).resolve().parent #TODO: on doit pouvoir faire mieux avec pathlib (sans utiliser str()) self._path_data = str( Path( str(my_parent_abs_dir).split(str(self.PYROS_DJANGO_BASE_DIR))[0] ) / self.CONFIG_DIR_NAME ) #self.set_mode(self.MODE_IDLE) if not config_filename: config_filename = self.DEFAULT_CONFIG_FILE_NAME self.printd(f"config_filename={config_filename}") # If config file name is RELATIVE (i.e. without path, just the file name) # => give it an absolute path (and remove "src/agent/" from it) if config_filename == os.path.basename(config_filename): # Build abs path including current agent dir config_filename = os.path.abspath(self.CONFIG_DIR_NAME + os.sep + config_filename) # Remove "src/agent_name/" from abs dir : # (1) Remove "src/core/pyros_django/" config_filename = config_filename.replace(str(self.PYROS_DJANGO_BASE_DIR)+os.sep, os.sep) # (2) Remove "agent_name/" #TODO: bidouille, faire plus propre config_filename = config_filename.replace(os.sep+"agent"+os.sep, os.sep) config_filename = config_filename.replace(os.sep+"monitoring"+os.sep, os.sep) self.printd("Config file used is", config_filename) #self.printd("current path", os.getcwd()) #self.printd("this file path :", __file__) #self.printd("config file path is", config_filename) # Instantiate an object for configuration #self.printd("config file path is ", config_abs_filename) self.config = ConfigPyros(config_filename) if self.config.get_last_errno() != self.config.NO_ERROR: raise Exception(f"Bad config file name '{config_filename}', error {str(self.config.get_last_errno())}: {str(self.config.get_last_errmsg())}") self.set_mode_from_config(name) # TODO: remove #self.set_idle() self.set_active() # Create 1st survey if none #tmp = AgentSurvey.objects.filter(name=self.name) #if len(tmp) == 0: #nb_agents = AgentSurvey.objects.filter(name=self.name).count() #if nb_agents == 0: if AgentSurvey.objects.filter(name=self.name).exists(): self._agent_survey = AgentSurvey.objects.get(name=self.name) else: self._agent_survey = AgentSurvey.objects.create(name=self.name, validity_duration=60, mode=self.mode, status=self.status) self.printd("Agent survey is", self._agent_survey) def __repr__(self): return "I am agent " + self.name def __str__(self): return self.__repr__() #return "I am agent " + self.name # Normal print def print(self, *args, **kwargs): if args: print(f"({self.name}): ", *args, **kwargs) else: print() # DEBUG print def printd(self, *args, **kwargs): if DEBUG: self.print(*args, **kwargs) def sleep(self, nbsec:float=2.0): # thread if self._current_specific_thread and self.RUN_IN_THREAD: self._current_specific_thread.wait(nbsec) # process (or main thread) else: time.sleep(nbsec) def run(self, nb_iter:int=None, FOR_REAL:bool=True): """ FOR_REAL: set to False if you don't want Majordome to send commands to devices """ self.DO_EXIT = False self.DO_RESTART = True # Will loop again only if DO_RESTART is True while self.DO_RESTART: self.DO_RESTART = False self.printd("on est ici: ", os.getcwd()) self.load_config() if self.SIMULATOR_MODE: self._log.print("[IN SIMULATOR MODE]") else: self._log.print("[IN NORMAL MODE]") self.SIMULATOR_MAX_DURATION_SEC=None self.start_time = time.time() self.FOR_REAL = FOR_REAL self.init() # Avoid blocking on false "running" commands # (old commands that stayed with "running" status when agent was killed) Command.delete_commands_with_running_status_for_agent(self.name) # SIMULATOR MODE ONLY : flush previous commands to be sure to restart clean if self.SIMULATOR_MODE: self.print("flush previous commands to be sure to start in clean state") Command.delete_pending_commands_for_agent(self.name) ''' # SETUP try: self.config = get_object_or_404(Config, id=1) # By default, set mode to SCHEDULER (False = REMOTE, which should never be the default) self.config.global_mode = True self.config.save() # self.config = Config.objects.get(pk=1) # self.config = Config.objects.get()[0] except Exception as e: # except Config.ObjectDoesNotExist: self.printd("Config read (or write) exception", str(e)) return -1 ''' self._iter_num = 1 self.DO_MAIN_LOOP = True # Main loop while self.DO_MAIN_LOOP: try: self.main_loop(nb_iter,FOR_REAL) if not self.DO_MAIN_LOOP: break except KeyboardInterrupt: # In case of CTRL-C, kill the current thread (process) before dying (in error) self.print("CTRL-C Interrupted, I kill the current thread (process) before exiting (if exists)") self.kill_running_specific_cmd_if_exists("USER_CTRLC") exit(1) #if self.DO_EXIT: exit(0) def main_loop(self, nb_iter:int=None, FOR_REAL:bool=True): # Bad number of iterations, so exit if nb_iter is not None: if nb_iter <= 0: self.DO_MAIN_LOOP = False return # Number of iterations asked is reached, so exit if self._iter_num > nb_iter: print(f"Exit because number of iterations asked ({nb_iter}) has been reached") self.DO_MAIN_LOOP = False return self.print() self.print() #self.printd("-"*80) self.print("-"*20, f"MAIN LOOP ITERATION {self._iter_num} (START)", "-"*20) self.set_status(self.STATUS_MAIN_LOOP) self.show_mode_and_status() # Wait a random number of sec before starting iteration # (to let another agent having the chance to send a command before me) random_waiting_sec = random.randint(0,5) self.print(f"Waiting {random_waiting_sec} sec (random) before starting new iteration...") time.sleep(random_waiting_sec) self.load_config() # only if changed # Log this agent status (update my current mode and status in DB) self.log_agent_status() self.printd("------START COMMMAND PROCESSING------") # Purge commandes (every N iterations, delete old commands) N=3 if ((self._iter_num-1) % N) == 0: self.print("Looking for old commands to purge...") #Command.purge_old_commands_for_agent(self.name) self.purge_old_commands_sent_to_me() # ROUTINE process #if self.is_active(): self.routine_process() #self.printd("I am IDLE, so I bypass the routine_process (do not send any new command)") # Get next command to execute cmd = self.get_next_valid_command() #if cmd: cmd = self.general_process(cmd) # Process this (next) command (if exists) if cmd: self.command_process(cmd) # if restart, exit this loop to restart from beginning if self.DO_RESTART or self.DO_EXIT: self.DO_MAIN_LOOP = False return self.printd("------END COMMMAND PROCESSING------") #self.waitfor(self.mainloop_waittime) self.print("-"*20, "MAIN LOOP ITERATION (END)", "-"*20) #self.do_log(LOG_DEBUG, "Ending main loop iteration") # (simulator only) Exit if max duration is reached if self.SIMULATOR_MAX_DURATION_SEC and (time.time()-self.start_time > self.SIMULATOR_MAX_DURATION_SEC): self.print("Exit because of max duration set to ", self.SIMULATOR_MAX_DURATION_SEC, "s") self.kill_running_specific_cmd_if_exists(self.name) if self.SIMULATOR_MODE and self.SIMULATOR_WITH_TEST: self.simulator_test_results() self.DO_MAIN_LOOP = False return self._iter_num += 1 def purge_old_commands_sent_to_me(self): Command.purge_old_commands_sent_to_agent(self.name) # To be overriden by subclass def routine_process(self): """ Routine processing. This is a command or set of commands that this agent sends regularly, (or just a regular processing) at each iteration """ self.set_status(self.STATUS_ROUTINE_PROCESS) if self.SIMULATOR_MODE: self.simulator_routine_process() """ def purge_commands(self): ### Delete commands (which I am recipient of) older than COMMANDS_PEREMPTION_HOURS (like 48h) ATTENTION !!! EXCEPT the RUNNING command !!! NB: datetime.utcnow() is equivalent to datetime.now(timezone.utc) ### self.printd("Looking for old commands to purge...") ### COMMAND_PEREMPTION_DATE_FROM_NOW = datetime.utcnow() - timedelta(hours = self.COMMANDS_PEREMPTION_HOURS) #self.printd("peremption date", COMMAND_PEREMPTION_DATE_FROM_NOW) old_commands = Command.objects.filter( # only commands for me recipient = self.name, # only pending commands sender_deposit_time__lt = COMMAND_PEREMPTION_DATE_FROM_NOW, ) ### old_commands = Command.get_old_commands_for_agent(self.name) if old_commands.exists(): self.printd("Found old commands to delete:") for cmd in old_commands: self.printd(cmd) old_commands.delete() """ def waitfor(self, nbsec): self.printd(f"Now, waiting for {nbsec} seconds...") time.sleep(nbsec) def set_status(self, status:str): #self.print(f"[{status}] (switching from status {self.status})") self.print(f"[{status}]") self.status = status return False def set_mode(self, mode:str): #self.print(f"Switching from mode {self.mode} to mode {mode}") self.print(f"[NEW MODE {mode}]") self.mode = mode def is_active(self): return self.mode == self.MODE_ACTIVE def set_active(self): self.set_mode(self.MODE_ACTIVE) def set_idle(self): self.set_mode(self.MODE_IDLE) def show_mode_and_status(self): self.print(f"CURRENT MODE is {self.mode} (status is {self.status})") def die(self): self.set_status(self.STATUS_EXIT) """ suspend/resume """ def suspend(self): """ TODO: Mode IDLE (doit rester à l'écoute d'un resume, et doit continuer à alimenter les tables pour informer de son état via tables agents_logs, et lire table agents_command pour reprendre via resume, et update la table agents_survey pour donner son status "idle" """ self.set_idle() return True def resume(self): """ Quit suspend() mode """ self.set_active() return True def set_mode_from_config(self, agent_alias): # --- Get the startmode of the AgentX modestr = self.config.get_paramvalue(agent_alias,'general','startmode') if self.config.get_last_errno() != self.config.NO_ERROR: raise Exception(f"error {str(self.config.get_last_errno())}: {str(self.config.get_last_errmsg())}") if (modestr == None): return True # --- Set the mode according the startmode value mode = self.MODE_IDLE if modestr.upper() == 'RUN': mode = self.MODE_ACTIVE self.set_mode(mode) return True """ ================================================================= Generic methods that may be specialized (overriden) by subclasses ================================================================= """ def init(self): self.printd("Initializing...") self.set_status(self.STATUS_INIT) def load_config(self): """ TODO: only si date fichier xml changée => en RAM, un objet Config avec méthodes d'accès, appelle le parser de AK (classe Config.py indépendante) """ self.printd("Loading the config file...") self.config.load() if self.config.get_last_errno() != self.config.NO_ERROR: raise Exception(f"error {str(self.config.get_last_errno())}: {str(self.config.get_last_errmsg())}") if not self.config.is_config_contents_changed(): return # === display informations # --- Get the assembly aliases of the unit assembled_mount_aliases = [] assembled_channel_aliases = [] assembled_aliases = [] unit_alias = self.config.get_aliases('unit')[0] params = self.config.get_params(unit_alias) for param in params: if param['section']=="assembly" and param['key']=="alias": assembled_aliases.append(param['value']) #print(f"Unit {unit_alias} is the assembly of {assembled_aliases}") print("--------- Components of the unit -----------") print("Configuration file is {}".format(self.config.get_configfile())) alias = self.config.get_aliases('unit')[0] namevalue = self.config.get_paramvalue(alias,'unit','description') print(f"Unit alias is {alias}. Description is {namevalue}:") unit_subtags = self.config.get_unit_subtags() for unit_subtag in unit_subtags: aliases = self.config.get_aliases(unit_subtag) for alias in aliases: namevalue = self.config.get_paramvalue(alias,unit_subtag,'description') print(f"- {unit_subtag} alias is {alias}. Description is {namevalue}") # --- fill the list of mount and channel assembled if alias in assembled_aliases: if unit_subtag=="mount": assembled_mount_aliases.append(alias) elif unit_subtag=="channel": assembled_channel_aliases.append(alias) print("--------- Assembly of the unit -----------") print(f"Assembled mount aliases: {assembled_mount_aliases}") print(f"Assembled channel aliases: {assembled_channel_aliases}") # --- Get the home of the mount[0] mount_alias = assembled_mount_aliases[0] home = self.config.get_paramvalue(mount_alias,'MountPointing','home') print("------------------------------------------") hostname = socket.gethostname() self._computer_alias = '' unit_subtag = 'computer' aliases = self.config.get_aliases(unit_subtag) for alias in aliases: self.printd("alias", alias) value = self.config.get_paramvalue(alias,'local','hostname') self.printd("value", value) if value == hostname: self.printd("value", value) self._computer_alias = alias value = self.config.get_paramvalue(alias,unit_subtag,'description') self._computer_description = value value = self.config.get_paramvalue(alias,'path','data') # Overrides default value self._path_data = value break print(f"hostname = {hostname}") print(f"path_data = {self._path_data}") print(f"home = {home}") print("------------------------------------------") # --- update the log parameters self._log.set_path_data(self._path_data) self._log.set_home(home) #def update_survey(self): def log_agent_status(self): """ Save (update) this agent current mode and status in DB """ self.printd("Updating the agent survey database table...") #self.printd("- fetching table line for agent", self.name) # only necessary when using process (not necessary with threads) #with transaction.atomic(): #self._agent_survey = AgentSurvey.objects.get(name=self.name) self._agent_survey.mode = self.mode self._agent_survey.status = self.status #self._agent_survey.save() self._agent_survey.save(update_fields=["mode", "status"]) """ def send_command(self, cmd_name): recipient_agent = self.name if self.SIMULATOR_COMMANDS_DEST=="myself" else self.SIMULATOR_COMMANDS_DEST Command.objects.create(sender=self.name, recipient=recipient_agent, name=cmd_name) """ #def send_command(self, to_agent, cmd_type, cmd_name, cmd_args=None): def send_command(self, to_agent, cmd_name, cmd_args=None): """ #ex: send_command(“AgentX”,”GENERIC”,”EVAL”,“3+4”) ex: send_command(“AgentX”,"EVAL”,“3+4”) """ return Command.send_command(self.name, to_agent, cmd_name, cmd_args) def get_next_valid_command(self)->Command: """ Return next VALID (not expired) command (read from the DB command table) which is relevant to this agent. Commands are read in chronological order """ self.set_status(self.STATUS_GET_NEXT_COMMAND) #self.printd("Looking for new commands from the database ...") # 1) Get all pending commands for me (return if None) # Not sure this is necessary to do it in a transaction, # but there might be a risk # that a command status is modified while we are reading... with transaction.atomic(): self._pending_commands = Command.get_pending_and_running_commands_for_agent(self.name) commands = self._pending_commands if not commands.exists(): self.printd("No new command to process") return None self.print("Current pending commands are (time ordered) :") Command.show_commands(commands) # 2) If there is a "exit" or "abort" command pending (even at the end of the list), # which is VALID (not expired), # then pass it straight away to general_process() for execution for cmd in commands: if cmd.name in ("exit", "abort", "flush_commands"): break if cmd.name in ("exit", "abort", "flush_commands") and not cmd.is_expired(): return cmd # 3) If first (oldest) command is currently running # (status CMD_RUNNING), then do nothing and return """ cmd_executing = Command.objects.filter( # only commands for me recipient = self.name, # only pending commands recipient_status_code = Command.CMD_STATUS_CODES.CMD_RUNNING, ).first() #if cmd_executing.exists(): if cmd_executing: """ cmd = commands[0] if cmd.is_running(): #self.printd(f"There is currently a running command ({cmd_executing.first().name}), so I do nothing (wait for end of execution)") self.printd(f"There is currently a running command ({cmd.name})") """ # Check that this command is not expired if cmd.is_expired(): self.printd("But this command is expired, so set its status to OUTOFDATE, and go on") cmd_executing.set_as_outofdate() else: """ self.printd(f"Thus, I will do nothing until this command execution is finished") # TODO: kill si superieur a MAX_EXEC_TIME return None # 4) Mark all expired commands as such for cmd in commands: if cmd.is_expired(): cmd.set_as_outofdate() # break at 1st "valid" command (not expired) else: break # 5) If no more commands to process, return None if cmd.is_expired(): return None # 6) Current cmd must now be a valid (not expired) and PENDING one, # so pass it to general_process() for execution #self.printd(f"Got command {cmd.name} sent by agent {cmd.sender} at {cmd.sender_deposit_time}") return cmd def command_process(self, cmd:Command)->Command: assert cmd is not None cmd = self.general_process(cmd) # Sub-level loop (only if ACTIVE) if cmd and self.is_active(): self.specific_process(cmd) def general_process(self, cmd:Command)->Command: assert cmd is not None self.set_status(self.STATUS_GENERAL_PROCESS) self.print("***") self.print("*** RECEIVED", cmd) self.print("***") self.printd(f"Starting general processing of this command") # Update read time to say that the command has been READ cmd.set_read_time() # Precondition: command cmd is valid (not expired), has already been read, is pending assert (not cmd.is_expired()) and cmd.is_pending() and cmd.is_read() #self.printd(f"Starting general processing of command {cmd.name} sent by agent {cmd.sender} at {cmd.sender_deposit_time}") """ # 2) If expired command, change its status to expired and return if cmd.is_expired(): self.printd("This command is expired, so mark it as such, and ignore it") cmd.set_as_outofdate() return None """ # If cmd is generic, execute it, change its status to executed, and return if cmd.is_generic(): self.print("This command is generic, execute it...") self.exec_generic_cmd(cmd) # If cmd is "exit", kill myself (without any question, this is an order soldier !) # This "exit" should normally kill any current thread (to be checked...) if cmd.name == "exit": self.printd("(before exiting) Here are the current (still) pending commands (time ordered) :") commands = Command.get_pending_and_running_commands_for_agent(self.name) Command.show_commands(commands) #if self.SIMULATOR_MODE and self.SIMULATOR_WITH_TEST and self.SIMULATOR_COMMANDS_DEST == "myself": self.simulator_test_results() if self.SIMULATOR_MODE and self.SIMULATOR_WITH_TEST: self.simulator_test_results() #self.DO_EXIT=True #exit(0) # Command is executed, so return None return None # cmd is not generic else: # cmd is not generic but, as I am idle, change its status to SKIPPED, ignore it, and return #if self.mode == self.MODE_IDLE: if not self.is_active(): self.printd("This command is not generic but, as I am IDLE, I mark it SKIPPED and ignore it") cmd.set_as_skipped() return None # Je suis pas idle et cde pas générique: je la traite pas, elle sera traitée par core_process : # attendre que cette commande soit exécutée avant de passer à la commande suivante (situation “bloquante” normale) self.printd("This command is not generic and, as I am not IDLE, I pass it to the specific processing") self.printd("(then I will not execute any other new command until this command is EXECUTED)") return cmd def exec_generic_cmd(self, cmd:Command): self.printd("Starting execution of a Generic cmd...") cmd.set_as_running() # Executing command if cmd.name == "go_active": self.set_active() cmd.set_result("I am now active") time.sleep(1) elif cmd.name == "go_idle": self.set_idle() cmd.set_result("I am now idle") time.sleep(1) elif cmd.name in ("flush_commands"): self.print("flush_commands received: Delete all pending commands") Command.delete_pending_commands_for_agent(self.name) elif cmd.name in ("abort", "exit", "restart_init"): #self.printd("Current pending commands are:") #Command.show_commands(self._pending_commands) self.print("Aborting current executing command if exists:") self.kill_running_specific_cmd_if_exists(cmd.sender) if cmd.name == "restart_init": self.print("restart_init received: Restarting from init()") self.DO_RESTART=True elif cmd.name == "exit": self.DO_EXIT=True else: name = cmd.name args = None if " " in name: name,args = name.split() if name == "eval": if args==None: raise(ValueError) cmd.set_result(eval(args)) cmd.set_as_processed() self.printd("...Generic cmd has been executed") def do_log(self): """ log à 2 endroits ou 1 seul - in file - in db """ self.printd("Logging data...") def specific_process(self, cmd:Command): """ Sublevel Loop (only if ACTIVE) Dans le cas du Majordome, le specific_process() doit lancer la prise d'une séquence. La sequence elle même va être une boucle dans le specific_process. Donc on peut voir une séquence comme un appel unique à une fonction qui va durer un certain temps (max 20 min par exemple) mais dans la méthode specific_process il y a une boucle sur la prise des images. """ assert cmd is not None assert self.is_active() self.set_status(self.STATUS_SPECIFIC_PROCESS) self._current_specific_cmd = cmd self.printd("Starting specific process...") #self._current_thread = threading.Thread(target=self.exec_command) # Run in a thread if self.RUN_IN_THREAD: self.printd("(run cmd in a thread)") self._current_specific_thread = StoppableThreadEvenWhenSleeping(target=self.thread_exec_specific_cmd) #self._current_specific_thread = StoppableThreadEvenWhenSleeping(target=self.exec_specific_cmd, args=(cmd,)) #self._current_thread = threading.Thread(target=self.exec_command) #self._current_specific_thread = StoppableThread(target=self.exec_specific_cmd, args=(cmd,)) #self._current_specific_thread = threading.Thread(target=self.exec_specific_cmd, args=(cmd,)) #self._current_specific_thread = thread_with_exception('thread test') # Run in a process else: self.printd("(run cmd in a process)") # close the database connection first, it will be re-opened in each process db.connections.close_all() self._current_specific_thread = multiprocessing.Process(target=self.thread_exec_specific_cmd) #self._current_specific_thread = multiprocessing.Process(target=self.exec_specific_cmd, args=(cmd,)) #self._current_specific_thread = threading.Thread(target=self.exec_specific_cmd, args=(cmd,)) #self._current_specific_thread = thread_with_exception('thread test') self._current_specific_thread.start() #my_thread.join() #self.waitfor(self.subloop_waittime) self.printd("Ending specific process (thread has been launched)") def kill_running_specific_cmd_if_exists(self, abort_sender): if (self._current_specific_thread is None) or not self._current_specific_thread.is_alive(): self.print("...No current specific command thread to abort...") else: self.print(f"Killing command {self._current_specific_cmd.name}") # Ask the thread to stop itself #self._current_specific_thread.stop() #self._current_specific_thread._stop() #self._current_specific_thread.shutdown() #threading._shutdown() #self._current_specific_thread.raise_exception() self._current_specific_cmd.set_as_killed_by(abort_sender) self._current_specific_thread.terminate() # Now, wait for the end of the thread if self.RUN_IN_THREAD: self._current_specific_thread.join() self._current_specific_thread = None #self._current_specific_cmd.set_as_killed() self._current_specific_cmd = None """ ================================================================= SIMULATOR DEDICATED FUNCTIONS ================================================================= """ def setSimulatorMode(self, mode): self.SIMULATOR_MODE=mode def simulator_get_next_command_to_send(self)->Command: cmd_name = next(self.SIMULATOR_COMMANDS, None) #return cmd_name if cmd_name is None: return None recipient_agent = self.name if self.SIMULATOR_COMMANDS_DEST=="myself" else self.SIMULATOR_COMMANDS_DEST return Command(sender=self.name, recipient=recipient_agent, name=cmd_name) """ def simulator_send_next_command(self): #self._current_test_cmd = "go_idle" if self._current_test_cmd=="go_active" else "go_active" #if self._nb_test_cmds == 4: self._current_test_cmd = "exit" cmd_name = next(self.SIMULATOR_COMMANDS, None) #self.printd("next cmd is ", cmd_name) if cmd_name is None: return #Command.objects.create(sender=self.name, recipient=self.name, name=cmd_name) recipient_agent = self.name if self.SIMULATOR_COMMANDS_DEST=="myself" else self.SIMULATOR_COMMANDS_DEST Command.objects.create(sender=self.name, recipient=recipient_agent, name=cmd_name) #time.sleep(1) #self._simulator_current_cmd_idx += 1 #self._nb_test_cmds += 1 """ def simulator_routine_process(self): """ SIMULATOR MODE ONLY """ # There is a current command being processed # => check if next command is "abort" # => if so, instantly send an "abort" to abort previous command if self.cmdts is not None: self.print(f"Waiting for end of cmd '{self.cmdts.name}' execution...") # Update cmdts fields from DB self.cmdts.refresh_from_db() if self.cmdts.is_pending() or self.cmdts.is_running(): if self.next_cmdts is None: # If next command is "abort" then abort becomes the new current command (to be sent) self.next_cmdts = self.simulator_get_next_command_to_send() if self.next_cmdts and self.next_cmdts.name == "abort": # Wait a little to give a chance to agentB to start execution of current command, # so that we can abort it then (otherwise it won't be aborted!!) time.sleep(4) self.cmdts = self.next_cmdts self.next_cmdts = None self.print("***") self.print(f"*** SEND command", self.cmdts) self.print("***") self.cmdts.send() # Current cmd is no more running else: # Execution was not completed #if self.cmdts.is_expired() or self.cmdts.is_skipped() or self.cmdts.is_killed(): if self.cmdts.is_skipped() or self.cmdts.is_killed(): self.printd("Command was not completed") # 2 possible scenarios: # - (1) Send the SAME command again ''' self.printd("Command was not completed, so I send it again") # The command was not completed, so, make a copy of it and send it again # For this, it is enough to set primary key to None, # then the send() command below will save a NEW command #self.cmdts = copy.copy(self.cmdts) self.cmdts.id = None SEND_A_NEW_COMMAND = True ''' # - (2) Send next command #self.cmdts = None # Execution was not complete => get result elif self.cmdts.is_executed(): cmdts_res = self.cmdts.get_result() self.print(f"Cmd executed. Result is '{cmdts_res}'") #cmdts_is_processed = True ''' Optimisation possible pour gagner une iteration: self.cmdts = self.simulator_get_next_command_to_send() # No more command to send (from simulator) => return if self.cmdts is None: return SEND_A_NEW_COMMAND = True ''' # Set cmdts to None so that a new command will be sent at next iteration self.cmdts = None # No currently running command => get a new command and SEND it if self.cmdts is None: if self.next_cmdts is not None: self.cmdts = self.next_cmdts self.next_cmdts = None else: self.cmdts = self.simulator_get_next_command_to_send() # No more command to send (from simulator) => return if self.cmdts is None: return # Send cmd (= set as pending and save) self.print("***") self.print(f"*** SEND command", self.cmdts) self.print("***") #self.cmdts.set_as_pending() # SEND self.cmdts.send() #cmdts_is_processed = False #cmdts_res = None def simulator_test_results(self): commands = self.simulator_test_results_start() nb_asserted = self.simulator_test_results_main(commands) self.simulator_test_results_end(nb_asserted) def simulator_test_results_start(self): self.print("\n--- Testing if the commands I SENT had the awaited result") self.print("Here are the last commands I sent:") #commands = list(Command.get_last_N_commands_for_agent(self.name, 16)) #commands = Command.get_last_N_commands_sent_to_agent(self.name, 16) commands = Command.get_last_N_commands_sent_by_agent(self.name, len(self.SIMULATOR_COMMANDS_LIST)) Command.show_commands(commands) assert commands[0].name == self.SIMULATOR_COMMANDS_LIST[0] assert commands[-1].name == self.SIMULATOR_COMMANDS_LIST[-1] return commands """ OLD SCENARIO nb_asserted = 0 for cmd in commands: if cmd.name == "specific0": assert cmd.is_skipped() nb_asserted+=1 if cmd.name == "specific1": assert cmd.result == "in step #5/5" assert cmd.is_executed() nb_asserted+=1 if cmd.name in ("specific2","specific3"): assert cmd.is_killed() nb_asserted+=1 if cmd.name in ("specific4", "specific5", "specific6", "specific7", "specific8"): assert cmd.is_pending() nb_asserted+=1 # 2 cmds abort if cmd.name in ("abort"): assert cmd.is_executed() nb_asserted+=1 if cmd.name in ("exit"): assert cmd.is_executed() nb_asserted+=1 assert nb_asserted == 12 self.printd("--- Finished testing => result is ok") """ # To be overriden by subclass def simulator_test_results_main(self, commands): nb_asserted = 0 for cmd in commands: assert cmd.is_executed() nb_asserted+=1 return nb_asserted def simulator_test_results_end(self, nb_asserted): nb_commands_to_send = len(self.SIMULATOR_COMMANDS_LIST) assert nb_asserted == nb_commands_to_send #self.print(f"************** Finished testing => result is ok ({nb_asserted} assertions) **************") printFullTerm(Colors.GREEN, f"************** Finished testing => result is ok ({nb_asserted} assertions) **************") """ ================================================================= FUNCTIONS RUN INSIDE A THREAD (OR A PROCESS) (thread_*()) ================================================================= """ def thread_exec_specific_cmd(self): assert self._current_specific_thread is not None # thread execution setting up self.thread_exec_specific_cmd_start() # processing body (main) self.thread_exec_specific_cmd_main() # thread execution tearing down self.thread_exec_specific_cmd_end() ''' except TimeoutError: pass ''' #def exec_specific_cmd_start(self, cmd:Command): def thread_exec_specific_cmd_start(self): cmd = self._current_specific_cmd """ specific command execution setting up """ #cmd = self.get_current_specific_cmd() self.printd(">>>>> Thread: starting execution of command", cmd.name) self.printd(">>>>> Thread: PID: %s, Process Name: %s, Thread Name: %s" % ( os.getpid(), multiprocessing.current_process().name, threading.current_thread().name) ) cmd.set_as_running() """ if self.RUN_IN_THREAD: cmd.set_as_running() else: with transaction.atomic(): cmd.set_as_running() """ # Define your own command step(s) here def cmd_step(self, step:int): cmd = self._current_specific_cmd """ cmd.result = f"in step #{step}/{self._thread_total_steps_number}" cmd.save() """ cmd.set_result(f"in step #{step}/{self._thread_total_steps_number}") def cmd_step_eval(self, step:int): cmd = self._current_specific_cmd cmd_args = self._current_specific_cmd.name.split()[1] """ cmd.result = f"in step #{step}/{self._thread_total_steps_number}" cmd.save() """ cmd.set_result(eval(cmd_args)) # Default body of the specific processing # Should be overriden by subclass def thread_exec_specific_cmd_main(self): """ cmd = self._current_specific_cmd self.printd("Doing nothing, just sleeping...") self.sleep(3) """ # Specific case of the EVAL command if self._current_specific_cmd.name.startswith("eval"): self.thread_set_total_steps_number(1) self.thread_exec_specific_cmd_step(1, self.cmd_step_eval) return # This is optional self.thread_set_total_steps_number(5) # HERE, write your own scenario # scenario OK self.thread_exec_specific_cmd_step(1, self.cmd_step, 1) self.thread_exec_specific_cmd_step(2, self.cmd_step, 3) self.thread_exec_specific_cmd_step(3, self.cmd_step, 5) self.thread_exec_specific_cmd_step(4, self.cmd_step, 10) self.thread_exec_specific_cmd_step(5, self.cmd_step, 4) # ... as many as you need """ other scenario self.thread_exec_specific_cmd_step(1, self.cmd_step1, 1) self.thread_exec_specific_cmd_step(2, self.cmd_step2, 2) self.thread_exec_specific_cmd_step(3, self.cmd_step1, 2) self.thread_exec_specific_cmd_step(4, self.cmd_step3, 2) self.thread_exec_specific_cmd_step(5, self.cmd_step1, 3) """ def thread_exec_specific_cmd_end(self): """ specific command execution tearing down """ cmd = self._current_specific_cmd cmd.set_as_processed() """ if self.RUN_IN_THREAD: cmd.set_as_processed() else: with transaction.atomic(): cmd.set_as_processed() """ self.printd(">>>>> Thread: ended execution of command", cmd.name) cmd = None # No more current thread #self._current_specific_thread = None # Default body of a specific cmd step # Should be overriden by subclass def thread_exec_specific_cmd_step(self, step:int, cmd_step_function, sleep_time:float=1.0): # Exit if I was asked to stop cmd = self._current_specific_cmd if self.RUN_IN_THREAD and threading.current_thread().stopped(): self.printd(f">>>>> Thread (cmd {cmd.name}): I received the stop signal, so I stop (in error)") exit(1) self.printd(f">>>>> Thread (cmd {cmd.name}): step #{step}/{self._thread_total_steps_number}") # call a specific function to be defined by subclass cmd_step_function(step) # Wait for a specific time (interruptible) self.sleep(sleep_time) def thread_stop_if_asked(self): assert self._current_specific_thread is not None if self.RUN_IN_THREAD and threading.current_thread().stopped(): self.printd("(Thread) I received the stop signal, so I stop (in error)") exit(1) def thread_set_total_steps_number(self, nbsteps): self._thread_total_steps_number = nbsteps """ =================================== OLD FUNCTIONS TO BE REMOVED =================================== def _plc_is_not_auto(self): if not self.plc_is_connected(): return True # now, self.plc_status has been updated, so check it: return self.plc_status.plc_mode != "AUTO" def plc_is_auto(self): return not self._plc_is_not_auto() def plc_is_safe(self): if not self.plc_is_connected(): return False # now, self.plc_status has been updated, so check it: return self.plc_status.is_safe def is_night(self): return get_sunelev() < -10 """ """ ================================================================= MAIN ================================================================= """ def extract_parameters(): """ Usage: Agent.py [-t] [configfile] """ # arg 1 : -t # arg 2 : configfile TEST_MODE = False configfile = None if len(sys.argv) > 1: if sys.argv[1] == "-t": TEST_MODE = True if len(sys.argv) == 3: configfile = sys.argv[2] else: configfile = sys.argv[1] return TEST_MODE, configfile if __name__ == "__main__": TEST_MODE, configfile = extract_parameters() agent = Agent("GenericAgent", configfile, RUN_IN_THREAD=True) agent.setSimulatorMode(TEST_MODE) print(agent) agent.run()