pyros2 12.1 KB
#!/usr/bin/env python3


import argparse
import fileinput
import os
import platform
import signal
import subprocess
import sys
import time



"""
*****************************************************************
******************** GENERAL CONSTANTS **************************
*****************************************************************
"""

# Agents known to the launcher: agent name -> agent folder.
# The "start" command walks this mapping in insertion order.
AGENTS = dict(
    agentX="agent",  # was previously mapped to "majordome"
    webserver="webserver",
    monitoring="monitoring",
    majordome="majordome",
    scheduler="scheduler",
    alert_manager="alert_manager",
)
#AGENTS = ["agentX", "webserver", "monitoring", "majordome", "scheduler", "alert_manager"]
#AGENTS = ["all", "webserver", "monitoring", "majordome", "scheduler", "alert"]

#COMMANDS = {"install": [], "start": AGENTS, "stop": AGENTS}

# True when running on Windows, where the virtual environment layout and the
# interpreter file name differ from the other platforms.
IS_WINDOWS = platform.system() == "Windows"

# Directory that contains this script (symbolic links resolved).
my_abs_path = os.path.dirname(os.path.realpath(__file__))
if IS_WINDOWS:
    bin_dir = "Scripts"
    PYTHON = "python.exe"  # plain "python" should also be ok
else:
    bin_dir = "bin"
    PYTHON = "python3"  # plain "python" is ok only from the venv

# Full path of the Python interpreter of the project's virtual environment.
# os.path.join is used instead of manual os.sep concatenation so that no
# doubled separator is produced when my_abs_path already ends with one.
VENV_BIN = os.path.join(my_abs_path, "private", "venv_py3_pyros", bin_dir, PYTHON)

class Colors:
    """ANSI escape sequences used to colour and style terminal output."""

    # Reset: ends whatever colour/style was started before it
    ENDC = "\033[0m"

    # Text styles
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"

    # Foreground colours
    FAIL = "\033[91m"
    GREEN = "\033[92m"
    WARNING = "\033[93m"
    BLUE = "\033[94m"
    HEADER = "\033[95m"




# The command line interface is built with the click package
# (https://click.palletsprojects.com). If click is not importable, try to
# install it on the fly before going any further.
try:
    import click
except ImportError:
    # Run pip through the interpreter that is executing this script, so that
    # click gets installed for this very Python and not for whichever "pip"
    # happens to be first on the PATH. No shell is needed for that.
    returncode = subprocess.call(
        [sys.executable, "-m", "pip", "install", "--upgrade", "click"]
    )
    if returncode == 0:
        print("click package installation successfull")
        import click
    else:
        # NOTE: the script keeps going, as before; the click decorators used
        # further down will then fail because "click" is still undefined.
        print("click package installation failed")
        
    




"""
**************************************************************************
******************** GENERAL AND UTIL FUNCTIONS **************************
**************************************************************************
"""

def in_dir(dirname: str = ""):
    """Tell whether the current working directory is named *dirname*.

    Only the last path component of the working directory is compared.
    """
    current_name = os.path.split(os.getcwd())[1]
    return current_name == dirname


def in_abs_dir(dirname: str = ""):
    """Tell whether *dirname* is exactly the current working directory.

    The comparison is a plain string equality against os.getcwd(); no path
    normalisation is performed.
    """
    here = os.getcwd()
    return dirname == here


def abort(msg: str = ""):
    """Print an error message and terminate the program with exit status 1.

    Args:
        msg: explanation printed on the line following "ERROR".

    Raises:
        SystemExit: always, with code 1.
    """
    print("ERROR")
    print(msg)
    # sys.exit rather than the exit() builtin: the latter is injected by the
    # "site" module for interactive use and is not guaranteed to exist.
    sys.exit(1)

# TODO: implement is_async
def execProcess(command, from_venv=False, is_async=False):
    """Run a command through the system shell and wait for it to finish.

    Args:
        command: the command line to execute, as a single string.
        from_venv: when True, the command is handed as arguments to the
            Python interpreter of the virtual environment (VENV_BIN).
        is_async: not implemented yet; it is ignored and the call always
            blocks until the process ends.

    Returns:
        The exit status of the process (0 means success).
    """
    from_venv_str = " from venv (" + VENV_BIN + ")" if from_venv else ""
    printFullTerm(Colors.BLUE, "Executing command" + " [" + command + "]" + from_venv_str)
    if from_venv:
        # The interpreter path goes into a shell command line: quote it so
        # that an installation directory containing spaces does not get split
        # into several words.
        if IS_WINDOWS:
            interpreter = '"' + VENV_BIN + '"'
        else:
            import shlex
            interpreter = shlex.quote(VENV_BIN)
        command = interpreter + " " + command
    process = subprocess.Popen(command, shell=True)
    process.wait()
    if process.returncode == 0:
        printFullTerm(Colors.GREEN, "Process executed successfully")
    else:
        printFullTerm(Colors.WARNING, "Process execution failed")
    return process.returncode

def execProcessFromVenv(command:str):
    """Run *command* with the Python interpreter of the virtual environment.

    Thin wrapper around execProcess(command, from_venv=True).

    Returns:
        The exit status returned by execProcess, so that callers can detect
        a failed command (it used to be dropped).
    """
    return execProcess(command, from_venv=True)

# TODO: merge into execProcess through its is_async parameter

# Processes launched asynchronously, as (Popen object, displayed command) pairs.
# This list was referenced below without ever being defined.
subproc = []


def execProcessFromVenvAsync(command:str):
    """Launch a command without waiting for it to finish.

    The command line is split on whitespace and started directly (no shell).
    The first token is the program to run; only the remaining tokens are
    displayed and recorded.
    NOTE(review): presumably callers pass VENV_BIN as the first token, since
    this function does not prepend it itself -- confirm against callers.

    Args:
        command: the full command line, program included.

    Returns:
        The subprocess.Popen object of the launched process.
    """
    args = command.split()
    shown_command = " ".join(args[1:])
    printFullTerm(
        Colors.BLUE, "Executing command from venv [" + shown_command + "]"
    )
    p = subprocess.Popen(args)
    # Keep a handle on the process so that it can be found again later
    subproc.append((p, shown_command))
    printFullTerm(Colors.GREEN, "Process launched successfully")
    return p


def printColor(color: str, message, file=None, eol="\n", forced=False):
    """Print *message* surrounded by a colour escape sequence.

    On Windows the message is printed as is, without any escape sequence.

    Args:
        color: escape sequence emitted before the message (one of the Colors
            constants); Colors.ENDC is emitted after the message.
        message: the text to print.
        file: stream to write to. None means sys.stdout as it is at call
            time, so a later redirection of sys.stdout is honoured (a
            "file=sys.stdout" default would be frozen at definition time).
        eol: string appended after the message. It defaults to a plain
            newline: text streams already translate it to the platform line
            ending, so os.linesep would end up doubled on Windows.
        forced: currently unused.

    Returns:
        Always 0.
    """
    if IS_WINDOWS:
        print(message, file=file, end=eol)
    else:
        print(color + message + Colors.ENDC, file=file, end=eol)
    return 0


def printFullTerm(color: Colors, string: str):
    """Print *string* centred on a 100-column line filled with dashes.

    The line is emitted in three pieces (left dashes, text, right dashes),
    each one through printColor with the same colour.

    Args:
        color: colour escape sequence forwarded to printColor.
        string: the text to display.

    Returns:
        Always 0.
    """
    width = 100
    text_len = len(string)
    left_dashes = int(width / 2 - text_len / 2)
    right_dashes = width - (left_dashes + text_len)

    printColor(color, "-" * left_dashes, eol="")
    printColor(color, string, eol="")
    # The last piece uses the default line terminator to end the line
    printColor(color, "-" * right_dashes)
    return 0



"""
********************************************************************************
******************** CLI COMMANDS DEFINITION (click format) ********************
********************************************************************************
"""

'''
_global_test_options = [
    click.option('--test', '-t', is_flag=True, help="don't do it for real, just show what it would do"),
    click.option('--verbose', '-v', 'verbosity', flag_value=2, default=1, help='Verbose output'),
    click.option('--quiet', '-q', 'verbosity', flag_value=0, help='Minimal output'),
    #click.option('--fail-fast', '--failfast', '-f', 'fail_fast', is_flag=True, default=False, help='Stop on failure'),
]
def global_test_options(func):
    for option in reversed(_global_test_options):
        func = option(func)
    return func
'''

# Options of the top-level command group, shared with the sub-commands.
# Both flags start as False so that the accessors below are usable even when
# the group callback has not stored anything yet.
GLOBAL_OPTIONS = {"test": False, "verbose": False}


def verbose_mode():
    """Return the global 'verbose' flag (False when it has not been set)."""
    return GLOBAL_OPTIONS.get("verbose", False)


def test_mode():
    """Return the global 'test' (dry-run) flag (False when it has not been set)."""
    return GLOBAL_OPTIONS.get("test", False)


# test_mode is a helper, not a test case: keep pytest/nose-style collectors
# from picking it up because of its "test_" prefix.
test_mode.__test__ = False

# Root command group: every sub-command below is attached to it.
# Options considered but not enabled yet:
#@click.option('--verbose', '-v', 'verbosity', flag_value=2, default=1, help='Verbose output'),
#@click.option('--quiet', '-q', 'verbosity', flag_value=0, help='Minimal output'),
#@click.option('--fail-fast', '--failfast', '-f', 'fail_fast', is_flag=True, default=False, help='Stop on failure'),
@click.group()
@click.option('--test', '-t', is_flag=True, help="don't do it for real, just show what it would do")
@click.option('--verbose', '-v', is_flag=True, help='Verbose output')
def pyros_launcher(test, verbose):
    # Documented with comments on purpose: the group has no explicit help
    # text, so a docstring here would show up in the command line help.
    #
    # Announce each enabled flag, then publish both flags in GLOBAL_OPTIONS
    # for the sub-commands.
    for enabled, banner in ((test, 'Test mode'), (verbose, 'Verbose mode')):
        if enabled:
            click.echo(banner)
    GLOBAL_OPTIONS.update(test=test, verbose=verbose)



@pyros_launcher.command(help="Run a pyros shell (django included)")
def shell():
    """Open the project's "manage.py shell" from the src/ directory.

    Prints a short usage reminder, moves into src/ (relative to the current
    working directory) and, unless test mode is on, runs "manage.py shell"
    with the virtual environment's interpreter.

    Returns:
        Always True.
    """
    print()
    print("Launching a pyros shell")
    print("From this shell, type 'from common import models' to import all the pyros objects")
    print("See documentation, chapter '9.6 - Play with the pyros objects' for more details")
    print("Type 'exit()' to quit")
    print()
    start_dir = os.getcwd()
    os.chdir("src/")
    try:
        if not test_mode():
            execProcessFromVenv("manage.py shell")
    finally:
        # Go back to the directory we started from, whatever happened. A
        # relative "../" could land elsewhere (e.g. if src is a symbolic link).
        os.chdir(start_dir)
    return True


@pyros_launcher.command(help="Install the pyros software")
def install():
    """Run install.py from the install/ directory.

    Moves into install/ (relative to the current working directory), runs
    "install.py" with the PYTHON interpreter unless test mode is on, then
    returns to the starting directory. Aborts if either directory change did
    not end up where expected.

    Returns:
        Always True.
    """
    print("Running install command")
    origin = os.getcwd()

    os.chdir("install/")
    if not in_dir("install"):
        abort("Bad dir")

    if not test_mode():
        execProcess(PYTHON + " install.py")

    # Equivalent of "cd -"
    os.chdir(origin)
    if not in_abs_dir(origin):
        abort("Bad dir")
    return True



def check_agent(agent):
    """Check that *agent* is a known agent name or the keyword "all".

    When it is not, the list of accepted names is printed.

    Returns:
        True if the name is accepted, False otherwise.
    """
    if agent in AGENTS.keys() or agent == "all":
        return True

    print("This agent does not exist")
    print("Here is the allowed list of agents:")
    print("- all => will launch ALL agents")
    for known_agent in AGENTS.keys():
        print('-', known_agent)
    return False


@pyros_launcher.command(help="Launch an agent")
@click.argument('agent')
@click.option('--configfile', '-c', help='the configuration file to be used')
def start(agent:str, configfile:str):
    """Launch one agent, or every agent when *agent* is "all".

    Each selected agent is started with the virtual environment's interpreter
    (nothing is executed in test mode):
    - "webserver" runs "manage.py runserver" from the src/ directory;
    - any other agent runs "start_agent.py <agent name> [configfile]" from
      the directory the command was invoked in.
    Agents are launched one after the other, each call blocking until its
    process ends.

    Args:
        agent: a key of AGENTS, or "all".
        configfile: optional configuration file passed on to start_agent.py.

    Returns:
        None when the agent name is not valid, True otherwise.
    """
    print("Running start command")
    if configfile:
        print("With config file", configfile)
    else:
        configfile = ''
    if not check_agent(agent):
        return

    start_dir = os.getcwd()
    for agent_name in AGENTS:
        if agent not in ("all", agent_name):
            continue

        print(VENV_BIN)
        print("Launching agent", agent_name, "...")
        if agent_name == "webserver":
            # Django default dev web server, to be run from src/
            cmd = "manage.py runserver"
            workdir = "src"
        else:
            # rstrip() drops the trailing blank left by an empty configfile
            cmd = ("start_agent.py " + agent_name + " " + configfile).rstrip()
            workdir = None

        try:
            if workdir:
                os.chdir(workdir)
            if not test_mode():
                execProcessFromVenv(cmd)
        finally:
            # Always come back to the starting directory: the next agents
            # must not be launched from src/, and the caller must not end up
            # one level above where it started.
            os.chdir(start_dir)
    return True




@pyros_launcher.command(help="Kill an agent")
@click.argument('agent')
def stop(agent):
    """Validate the agent name given to the "stop" command.

    For now the command only checks the name (printing the accepted names
    when it is wrong); nothing else is done for a valid name.
    """
    print("Running stop command")
    agent_is_known = check_agent(agent)
    if not agent_is_known:
        return



def main():
    """Script entry point: hand control over to the click command group."""
    pyros_launcher()
    
    
# TO BE REMOVED (legacy entry point)
#@click.command()
#@click.argument('start')
def oldmain():
    """Legacy dispatcher driven directly by sys.argv.

    Reads the command name from sys.argv[1]. When the COMMANDS entry of that
    command is not empty, an argument is also read from sys.argv[2] and must
    belong to that entry. The module-level object named after the command is
    then called, with the argument if there is one.

    NOTE(review): COMMANDS only appears as a commented-out line in this file,
    so this function presumably fails with NameError as it stands -- confirm
    before reviving it.
    """
    argv = sys.argv

    # Command name (mandatory)
    if len(argv) <= 1:
        abort("You must give a command name")
    command = argv[1]
    if command not in COMMANDS.keys():
        abort("This command does not exist")
    allowed_args = COMMANDS[command]

    # Command argument, only for the commands that accept one
    command_arg = None
    if allowed_args:
        if len(argv) <= 2:
            abort("This command should be given an argument")
        command_arg = argv[2]
        if command_arg not in allowed_args:
            abort("This argument does not exist for command " + command)

    print("Executing command", command)
    if command_arg:
        print("with arg", command_arg)
        globals()[command](command_arg)
    else:
        globals()[command]()



# Run the launcher only when this file is executed as a script
if __name__ == "__main__":
    main()