#!/usr/bin/env python3
"""Command-line launcher for the PYROS software.

This first section only declares the imports and the general constants
(paths, agent names, platform-dependent settings, terminal colors).
"""

import argparse
import fileinput
import glob
import os
import platform
import signal
import subprocess
import sys
import time


# *****************************************************************
# ******************** GENERAL CONSTANTS **************************
# *****************************************************************

# Directory (relative to the project root) that contains the Django project.
PYROS_DJANGO_BASE_DIR = "src/core/pyros_django"

# Name of the fixture file holding the initial database content.
INIT_FIXTURE = "initial_fixture.json"

# Directory we were in before the last directory change (None until then).
_previous_dir = None

# Maps the agent name accepted on the command line to its associated name.
AGENTS = {
    "agent": "Agent",
    "agentX": "AgentX",
    "agentA": "AgentA",
    "agentB": "AgentB",
    "agentM": "AgentM",
    "agentDeviceGemini": "AgentDeviceGemini",
    "agentDeviceSBIG": "AgentDeviceSBIG",
    "agentTelescopeRequester": "AgentTelescopeRequester",
    "agentMultiRequester": "AgentMultiRequester",
    "webserver": "webserver",
    "monitoring": "monitoring",
    "majordome": "majordome",
    "scheduler": "scheduler",
    "alert_manager": "alert_manager",
}

# Platform-dependent settings: requirements file, name of the venv
# sub-directory holding the executables, and python executable name.
IS_WINDOWS = platform.system() == "Windows"
if IS_WINDOWS:
    REQUIREMENTS = 'REQUIREMENTS_WINDOWS.txt'
    b_in_dir = "Scripts"
    PYTHON = "python.exe"
else:
    REQUIREMENTS = 'REQUIREMENTS.txt'
    b_in_dir = "bin"
    PYTHON = "python3"  # "python" should also be ok from the venv

# Absolute path of the directory containing this script.
my_abs_path = os.path.dirname(os.path.realpath(__file__))

# Absolute path of the python interpreter of the pyros virtual environment.
VENV_BIN = os.path.join(my_abs_path, "venv", "venv_py3_pyros", b_in_dir, PYTHON)


class Colors:
    """ANSI escape sequences used to colorize terminal output."""

    HEADER = "\033[95m"
    BLUE = "\033[94m"
    GREEN = "\033[92m"
    WARNING = "\033[93m"
    FAIL = "\033[91m"
    ENDC = "\033[0m"  # resets the terminal to its default style
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"


# First, install the click package !!!
# import fire
try:
    import click  # https://click.palletsprojects.com
except ImportError:
    # Bootstrap: click is missing, so install it with the very interpreter
    # that runs this script ("python -m pip" guarantees that the package
    # lands where the "import click" below will look for it).
    _click_install_status = subprocess.call(
        [sys.executable, "-m", "pip", "install", "--upgrade", "click"]
    )
    if _click_install_status == 0:
        print("click package installation successfull")
        import click
    else:
        print("click package installation failed")
        # Nothing below can work without click: stop here instead of
        # failing later with an obscure NameError.
        sys.exit(1)


# **************************************************************************
# ******************** GENERAL AND UTIL FUNCTIONS **************************
# **************************************************************************

def _in_dir(dirname: str = "") -> bool:
    """Return True if the last component of the current directory is `dirname`."""
    return os.path.basename(os.getcwd()) == dirname


def _in_abs_dir(dirname: str = "") -> bool:
    """Return True if the current directory is exactly the path `dirname`."""
    return os.getcwd() == dirname


def die(msg: str = ""):
    """Print an error banner followed by `msg`, then exit with status 1."""
    print()
    print("...ERROR...")
    print(msg)
    print()
    # sys.exit() rather than the exit() helper, which is only injected by
    # the "site" module and is not guaranteed to exist.
    sys.exit(1)


def execProcess(command, from_venv=False, is_async=False):
    """Run `command` through the shell.

    :param command: the command line to execute (a single string)
    :param from_venv: if True, `command` is passed as arguments to the python
        interpreter of the virtual environment (VENV_BIN)
    :param is_async: if True, do not wait for the end of the process
    :return: the subprocess.Popen object when `is_async` is True; otherwise
        True if the process exited with status 0, False if not
    """
    from_venv_str = " from venv (" + VENV_BIN + ")" if from_venv else ""
    printd("Executing command" + " [" + command + "]" + from_venv_str)
    if from_venv:
        # Quote the interpreter path so that an installation directory
        # containing spaces does not break the shell command line.
        command = '"' + VENV_BIN + '" ' + command
    process = subprocess.Popen(command, shell=True)
    if is_async:
        # The caller is responsible for waiting on the returned process.
        return process

    # Synchronous mode => wait for the end of execution
    process.wait()
    succeeded = process.returncode == 0
    if succeeded:
        printFullTerm(Colors.GREEN, "Process executed successfully")
    else:
        printFullTerm(Colors.WARNING, "Process execution failed")
    return succeeded


def execProcessFromVenv(command: str, is_async=False):
    """Run `command` with the python interpreter of the virtual environment.

    Same return value as execProcess().
    """
    return execProcess(command, from_venv=True, is_async=is_async)
# TODO: merge into execProcess using its is_async parameter
def execProcessFromVenvAsync(command: str):
    """Launch `command` with the venv python interpreter without waiting.

    :return: whatever execProcessFromVenv() returns in asynchronous mode
    """
    return execProcessFromVenv(command, is_async=True)


def printColor(color: str, message, file=None, eol="\n", forced=False):
    """Print `message` in the given color (colors are skipped on Windows).

    :param color: escape sequence to emit before the message
        (one of the Colors attributes)
    :param message: the text to print
    :param file: output stream; None means the current sys.stdout
    :param eol: string appended after the message. The default is "\\n" and
        not os.linesep: text streams already translate "\\n" into the
        platform line ending, so os.linesep would give "\\r\\r\\n" on Windows.
    :param forced: unused, kept for backward compatibility
    :return: always 0
    """
    if IS_WINDOWS:
        print(message, file=file, end=eol)
    else:
        print(color + message + Colors.ENDC, file=file, end=eol)
    return 0


def printFullTerm(color: str, string: str):
    """Print `string` centered in a 100-column line padded with dashes.

    :param color: escape sequence passed to printColor()
    :param string: the text to center
    :return: always 0
    """
    columns = 100
    # Number of dashes on the left of the text (a negative value simply
    # produces no dash at all).
    value = int(columns / 2 - len(string) / 2)
    printColor(color, "-" * value, eol="")
    printColor(color, string, eol="")
    # Fill the rest of the line, and end it with the default line ending.
    value += len(string)
    printColor(color, "-" * (columns - value))
    return 0


# ********************************************************************************
# ******************** CLI COMMANDS DEFINITION (click format) ********************
# ********************************************************************************

def printd(*args, **kwargs):
    """Same as print(), but only when the PYROS_DEBUG environment variable is '1'."""
    if os.environ.get('PYROS_DEBUG', '0') == '1':
        print(*args, **kwargs)
# Values of the global command-line flags, filled in by pyros_launcher().
GLOBAL_OPTIONS = {}


def verbose_mode():
    """Return the value of the --verbose flag (False if not set yet)."""
    return GLOBAL_OPTIONS.get("verbose", False)


def test_mode():
    """Return the value of the --test flag (False if not set yet)."""
    return GLOBAL_OPTIONS.get("test", False)


def sim_mode():
    """Return the value of the --sim flag (False if not set yet)."""
    return GLOBAL_OPTIONS.get("sim", False)


def debug_mode():
    """Return the value of the --debug flag (False if not set yet)."""
    return GLOBAL_OPTIONS.get("debug", False)


@click.group()
@click.option('--debug', '-d', is_flag=True, help='DEBUG mode')
@click.option('--sim', '-s', is_flag=True, help="only for an AgentDevice, asking it to start its simulator and work with it (instead of real device)")
@click.option('--test', '-t', is_flag=True, help="don't do it for real, just show what it would do")
@click.option('--verbose', '-v', is_flag=True, help='Verbose output')
def pyros_launcher(debug, sim, test, verbose):
    """Root command group: record the global flags for the sub-commands."""
    # printd() reads this environment variable to decide whether to print.
    os.environ['PYROS_DEBUG'] = '1' if debug else '0'
    if debug:
        click.echo('DEBUG mode')
    if sim:
        click.echo('Starting and connecting to simulator instead of real device')
    if test:
        click.echo('Test mode')
    if verbose:
        click.echo('Verbose mode')
    GLOBAL_OPTIONS["debug"] = debug
    GLOBAL_OPTIONS["sim"] = sim
    GLOBAL_OPTIONS["test"] = test
    GLOBAL_OPTIONS["verbose"] = verbose


@pyros_launcher.command(help="Run a pyros shell (django included)")
def shell():
    """Print a short help, then run "manage.py shell" from the Django directory."""
    print()
    print("Launching a pyros shell")
    print()
    print("NB1: If you want to play with an agent, type:")
    print(" >>> from agent.AgentA import AgentA")
    print(" >>> agent=AgentA('agent_toto')")
    print(" >>> agent")
    print(" >>> agent.run(2) (=> will run 2 iterations)")
    print(" >>> cmd = agent.send_command('AgentB','eval 2+2')")
    print(" >>> cmd")
    print(" >>> cmd.get_updated_result()")
    print(" >>> ...")
    print(" - See documentation, section 'Play with a pyros agent' inside the chapter 'Running pyros' for more details")
    print()
    print("NB2: If you want to play with the pyros objects, type:")
    print(" >>> from common.models import *")
    print(" - (This will import all the pyros objects)")
    print(" - Then, you can create any pyros object just by typing its name.")
    print(" - For example, to create an AgentSurvey object, type:")
    print(" >>> agent_survey = AgentSurvey()")
    print(" >>> agent_survey")
    print(" >>> ...")
    print(" - See documentation, section 'Play with the pyros objects' inside the chapter 'Running pyros' for more details")
    print()
    print("Type 'exit()' to quit")
    print()
    change_dir(PYROS_DJANGO_BASE_DIR)
    if not test_mode():
        execProcessFromVenv("manage.py shell")
    # Go back to the initial dir
    change_dir("PREVIOUS")
    return True


@pyros_launcher.command(help="Run a database (Mysql) shell")
def dbshell():
    """Print a short help, then run "manage.py dbshell" (skipped in test mode)."""
    print()
    print("Launching a database (mysql) shell")
    print("From this shell, you can type 'show tables;' to see all the pyros tables")
    print("Then for example, type 'select * from config;' to see the content of the 'config' table")
    print("Type 'exit' to quit")
    print()
    if not test_mode():
        execProcessFromVenv(PYROS_DJANGO_BASE_DIR + "/manage.py dbshell")
    return True


@pyros_launcher.command(help="Install the pyros software")
@click.option('--packages_only', '-p', is_flag=True, help='install only the python packages (no database installation)')
@click.option('--database_only', '-d', is_flag=True, help='install only the pyros database (no python packages installation)')
def install(packages_only, database_only):
    """Run install/install.py from the "install" directory.

    The selected option (-p or -d) is forwarded to install.py; when both
    flags are given, -d wins. Nothing is executed in test mode.
    """
    print("Running install command")
    print("- packages_only:", packages_only)
    print("- database_only:", database_only)
    start_dir = os.getcwd()
    os.chdir("install/")
    if not _in_dir("install"):
        die("Bad dir")
    option = ''
    if packages_only:
        option = '-p'
    if database_only:
        option = '-d'
    if not test_mode():
        execProcess(PYTHON + " install.py " + option)
    # Go back to where we started from
    os.chdir(start_dir)
    if not _in_abs_dir(start_dir):
        die("Bad dir")
    return True


@pyros_launcher.command(help="Run some tests")
def test():
    """Run the Django tests of a fixed list of applications, one by one.

    The initial fixture is reloaded before each application, and the
    command aborts (die) as soon as a step fails.
    """
    print("Running tests")
    apps = ['common', 'scheduler', 'routine_manager', 'user_manager', 'alert_manager.tests.TestStrategyChange']
    for app in apps:
        if not _loaddata():
            die()
        change_dir(PYROS_DJANGO_BASE_DIR)
        # "--keep" is passed so that the test database is kept after the tests
        if not execProcessFromVenv('manage.py test --keep ' + app):
            die()
        change_dir("PREVIOUS")
    return True


@pyros_launcher.command(help="Run ALL tests")
def testall():
    """Launch "manage.py test --keep" from the Django directory.

    NOTE(review): the process is launched asynchronously and never waited
    for, so its exit status is not reported here -- confirm this is intended.
    """
    change_dir(PYROS_DJANGO_BASE_DIR)
    # "--keep" is passed so that the test database is kept after the tests
    execProcessFromVenvAsync("manage.py test --keep")
    change_dir("PREVIOUS")
    return True


@pyros_launcher.command(help="Update (only if necessary) the python packages AND the source code AND the DB structure")
def update():
    """Update source code, python packages, PlantUML diagrams and database.

    Each step aborts the whole command (die) when it fails.
    """
    print("Running update command")

    # 1) Update source code (git pull)
    printFullTerm(Colors.BLUE, "1) UPDATING SOURCE CODE: Running git pull")
    if not _gitpull():
        die()

    # 2) Update python packages (pip install requirements)
    printFullTerm(Colors.BLUE, "2) UPDATING PYTHON PACKAGES")
    if not _update_python_packages_from_requirements():
        die()

    # 3) Update PlantUML diagrams
    printFullTerm(Colors.BLUE, "3) UPDATING PLANTUML DIAGRAMS")
    if not _update_plantuml_diags():
        die()

    # 4) Update database structure (make migrations + migrate)
    printFullTerm(Colors.BLUE, "4) UPDATING DATABASE")
    if not _updatedb():
        die()

    return True


def _gitpull():
    """Run "git pull"; return its success status (always True in test mode)."""
    print("-- running git pull")
    GIT = "git.exe" if IS_WINDOWS else "git"
    if not test_mode():
        return execProcess(f"{GIT} pull")
    return True


def _updatedb():
    """Run makemigrations then migrate (skipped in test mode); die on failure."""
    print("-- update db (make migrations + migrate)")
    if not test_mode():
        if not _makemigrations():
            die()
        if not _migrate():
            die()
    return True


@pyros_launcher.command(help="Update the pyros database and fill it with initial fixture data")
def initdb():
    """Run makemigrations, migrate and loaddata (skipped in test mode).

    The result of each step is ignored: the command always returns True.
    """
    if not test_mode():
        _makemigrations()
        _migrate()
        _loaddata()
    return True


def _agent_launch_spec(agent_name, configfile):
    """Return (command, working_dir) to launch `agent_name` with the venv python.

    `working_dir` is None when the agent is launched from the current directory.
    """
    # Agent "webserver"
    if agent_name == "webserver":
        return "manage.py runserver", PYROS_DJANGO_BASE_DIR

    # Agents "agentM", "agentA", "agentB", "agentX", ...
    if agent_name.startswith("agent"):
        subdir = "/monitoring/" if agent_name == "agentM" else "/agent/"
        # "agentX" => "AgentX.py"
        cmd = f"Agent{agent_name[5:]}.py"
        # Forward the global flags to the agent script
        if debug_mode():
            cmd += " -d"
        if sim_mode():
            cmd += " -s"
        if test_mode():
            cmd += " -t"
        if verbose_mode():
            cmd += " -v"
        if configfile:
            # f-string: pass the actual file name, not the text "{configfile}"
            cmd += f" {configfile}"
        return cmd, PYROS_DJANGO_BASE_DIR + subdir

    # OLD format agents: majordome, monitoring, alert...
    return "start_agent.py " + agent_name + " " + configfile, None


@pyros_launcher.command(help="Launch an agent")
@click.argument('agent')
@click.option('--configfile', '-c', help='the configuration file to be used')
def start(agent: str, configfile: str):
    """Launch one or several agents and wait for all of them to finish.

    :param agent: an agent name, a comma-separated list of agent names,
        or "all" to launch every agent declared in AGENTS
    :param configfile: optional configuration file given to the agent(s)
    :return: None if an agent name is unknown; otherwise True if the last
        launched process exited with status 0, False if not
    """
    printd("Running start command")
    if configfile:
        printd("With config file", configfile)
    else:
        configfile = ''

    # Check each agent in the given list
    agents = agent.split(",")
    for name in agents:
        if not _check_agent(name):
            return
    launch_all = "all" in agents

    # Start agents (processes).
    # Each entry is [process, agent_name, returncode]; the return code stays
    # at -1 (failure) until the process has actually ended.
    current_processes = []
    for agent_name in AGENTS:
        if not (launch_all or agent_name in agents):
            continue
        printd(VENV_BIN)
        printd("Launching agent", agent_name, "...")
        cmd, working_dir = _agent_launch_spec(agent_name, configfile)
        current_dir = os.getcwd()
        try:
            if working_dir is not None:
                os.chdir(working_dir)
            current_processes.append([execProcessFromVenvAsync(cmd), agent_name, -1])
        finally:
            # Always come back, even if the launch raised
            os.chdir(current_dir)

    # Wait for the end of each process execution
    for entry in current_processes:
        process, name, _ = entry
        printd(f"************ Waiting for end of execution of agent {name} ************")
        process.wait()
        entry[2] = process.returncode
        print(f"************ END of execution of agent {name} ************")
        if process.returncode == 0:
            printFullTerm(Colors.GREEN, f"Process {name} executed successfully")
        else:
            printFullTerm(Colors.WARNING, f"Process {name} execution failed")

    print()
    print()
    print("Synthesis of the results:")
    for _, name, returncode in current_processes:
        if returncode == 0:
            printFullTerm(Colors.GREEN, f"Process {name} executed successfully")
        else:
            printFullTerm(Colors.WARNING, f"Process {name} execution failed")

    # The result only depends on the status of the last process
    return bool(current_processes) and current_processes[-1][2] == 0


# TODO: implement the STOP !!!
@pyros_launcher.command(help="Kill an agent")
@click.argument('agent')
def stop(agent):
    """Check the agent name; stopping itself is not implemented yet."""
    print("Running stop command")
    if not _check_agent(agent):
        return


# ********************************************************************************
# ******************** PRIVATE FUNCTIONS DEFINITION ******************************
# ********************************************************************************

def _update_python_packages_from_requirements():
    """Upgrade pip, then install the requirements; return the success status."""
    # 1) try to upgrade pip
    if not execProcessFromVenv("-m pip install --upgrade pip"):
        return False
    # 2) install the packages listed in the requirements file
    return execProcessFromVenv("-m pip install -r install/" + REQUIREMENTS)


def _update_plantuml_diags():
    """Regenerate the PNG of each PlantUML source (*.pu) found in a "doc" directory.

    A diagram is only regenerated if its PNG file is missing or older than
    its source. Return False as soon as a generation fails, True otherwise.
    """
    for dirpath, _dirnames, _files in os.walk(PYROS_DJANGO_BASE_DIR):
        if os.path.basename(dirpath) != "doc":
            continue
        for diag in glob.glob(os.path.join(dirpath, "*.pu")):
            # splitext() only removes the final extension, whereas splitting
            # on '.pu' would cut the path at any earlier ".pu" it contains.
            diag_png = os.path.splitext(diag)[0] + '.png'
            is_outdated = (
                not os.path.isfile(diag_png)
                or os.path.getmtime(diag) > os.path.getmtime(diag_png)
            )
            if is_outdated and not execProcessFromVenv("-m plantuml " + diag):
                return False
    return True


def _migrate():
    """Run "manage.py migrate" (all apps) from the Django directory."""
    change_dir(PYROS_DJANGO_BASE_DIR)
    res = execProcessFromVenv("manage.py migrate")
    change_dir("PREVIOUS")
    return res


def _makemigrations():
    """Run "manage.py makemigrations" from the Django directory."""
    change_dir(PYROS_DJANGO_BASE_DIR)
    res = execProcessFromVenv("manage.py makemigrations")
    change_dir("PREVIOUS")
    return res


# TODO: use naive dates (without time zone) in the fixture
def _loaddata():
    """Load the initial fixture with "manage.py loaddata" from the Django directory."""
    change_dir(PYROS_DJANGO_BASE_DIR)
    res = execProcessFromVenv("manage.py loaddata misc" + os.sep + "fixtures" + os.sep + INIT_FIXTURE)
    change_dir("PREVIOUS")
    return res


def change_dir(path):
    """Change the current directory, remembering the one we are leaving.

    :param path: the directory to go to, or the special value "PREVIOUS"
        to go back to the directory left by the previous call
    """
    global _previous_dir
    if path == "PREVIOUS":
        path = _previous_dir
        if path is None:
            die("No previous directory to go back to")
    origin = os.getcwd()
    printd("Moving to : " + path)
    os.chdir(path)
    # Only remember the origin once the move has actually succeeded
    _previous_dir = origin
    printd("Current directory : " + str(os.getcwd()))


def _check_agent(agent):
    """Return True if `agent` is "all" or a key of AGENTS.

    Otherwise print the list of allowed agents and return False.
    """
    if agent == "all" or agent in AGENTS:
        return True
    print("This agent does not exist")
    print("Here is the allowed list of agents:")
    print("- all => will launch ALL agents")
    for agent_name in AGENTS:
        print('-', agent_name)
    return False


# ********************************************************************************
# ********************************* main() FUNCTION ******************************
# ********************************************************************************

def main():
    """Entry point: hand over to the click command group."""
    pyros_launcher()


# TO BE REMOVED
def oldmain():
    """Former entry point, which parsed sys.argv by hand.

    NOTE(review): this function reads COMMANDS, which is only present in this
    file as a commented-out line; calling it as is would raise a NameError.
    """
    # Read command
    if len(sys.argv) <= 1:
        die("You must give a command name")
    command = sys.argv[1]
    if command not in COMMANDS.keys():
        die("This command does not exist")
    command_all_args = COMMANDS[command]

    # Read the command argument, if the command takes one
    command_arg = None
    if command_all_args:
        if len(sys.argv) <= 2:
            die("This command should be given an argument")
        command_arg = sys.argv[2]
        if command_arg not in command_all_args:
            die("This argument does not exist for command " + command)

    print("Executing command", command)
    # The command name is also the name of the function implementing it
    if command_arg:
        print("with arg", command_arg)
        globals()[command](command_arg)
    else:
        globals()[command]()


if __name__ == "__main__":
    main()