#!/usr/bin/env python3 import subprocess import sys import os ##import utils.Logger as L pwd = os.environ['PROJECT_ROOT_PATH'] if pwd not in sys.path: sys.path.append(pwd) ##from .Agent import Agent from src.core.pyros_django.agent.Agent import Agent, build_agent ##log = L.setupLogger("AgentXTaskLogger", "AgentX") class AgentTriton(Agent): # FOR TEST ONLY # Run this agent in simulator mode TEST_MODE = False # Run the assertion tests at the end TEST_WITH_FINAL_TEST = False #TEST_MAX_DURATION_SEC = None TEST_MAX_DURATION_SEC = 100 # Who should I send commands to ? TEST_COMMANDS_DEST = "myself" #TEST_COMMANDS_DEST = "AgentB" # Scenario to be executed _AGENT_SPECIFIC_COMMANDS = { # Format : “cmd_name” : (timeout, exec_mode) "do_process_image" : (10, Agent.EXEC_MODE.SEQUENTIAL), } _TEST_COMMANDS_LIST = [ (True, "self do_process_image i.fit", 200, None, Agent.CMD_STATUS.CMD_EXECUTED), (True, "self do_stop", 500, "STOPPING", Agent.CMD_STATUS.CMD_EXECUTED), ] """ ================================================================= FUNCTIONS RUN INSIDE MAIN THREAD ================================================================= """ # @Override ''' #def __init__(self, name:str=None, config_filename=None, RUN_IN_THREAD=True): def __init__(self, config_filename=None): ##if name is None: name = self.__class__.__name__ #super().__init__(name, config_filename, RUN_IN_THREAD) super().__init__(config_filename) #self._log.print(f"init done for {name}") self._log.print("init done") ''' def __init__(self, name:str=None): if name is None: name = self.__class__.__name__ super().__init__() #super().__init__(RUN_IN_THREAD) def shell_source(self,script): """ Sometime you want to emulate the action of "source" in bash, settings some environment variables. Here is a way to do it. """ pipe = subprocess.Popen(". %s && env -0" % script, stdout=subprocess.PIPE, shell=True) output = pipe.communicate()[0].decode('utf-8') output = output[:-1] # fix for index out for range in 'env[ line[0] ] = line[1]' env = {} # split using null char for line in output.split('\x00'): line = line.split( '=', 1) # print(line) env[ line[0] ] = line[1] os.environ.update(env) # @Override def _init(self): triton_folder = self.get_config().get_agent_information(self.get_config().unit_name,self.name)["path"] triton_folder_abs_path = pwd + "/" + triton_folder + "/" self.triton_folder_abs_path = triton_folder_abs_path if not os.path.exists(triton_folder_abs_path+"output/"): os.makedirs(triton_folder_abs_path+"output/") if not os.path.exists(triton_folder_abs_path+"input/"): os.makedirs(triton_folder_abs_path+"input/") self.output_folder = triton_folder_abs_path+"output/" self.input_folder = triton_folder_abs_path+"input/" self.shell_source(triton_folder_abs_path+'triton/Triton_ubuntu_20.04/Triton_3.1.2/conf/configure.sh') #main_cmd = triton_folder_abs_path + "triton/Triton_ubuntu_20.04/Triton_3.1.2/bin/triton -i " + triton_folder_abs_path + "input/" + " -s " + triton_folder_abs_path + "output/" #subprocess.Popen(main_cmd) # --- Set the mode according the startmode value ##agent_alias = self.__class__.__name__ ##self.set_mode_from_config(agent_alias) def do_process_image(self, image_name:str): image_path = self.input_folder + image_name output_file_name = image_name.split(".")[0] + ".txt" output_file_path = self.output_folder + output_file_name if os.path.exists(image_path): main_cmd = self.triton_folder_abs_path + "triton/Triton_ubuntu_20.04/Triton_3.1.2/bin/triton -i " + image_path + " -s " + output_file_path subprocess.Popen(main_cmd,shell=True) else: print(f"Can't find image {image_path}") #@override def _main_loop_start(self): print("LOOP START") #@override def _main_loop_end(self): print("LOOP END"); ''' # @override def load_config(self): super().load_config() ''' ''' # @override def update_survey(self): super().update_survey() ''' ''' # @override def get_next_command(self): return super().get_next_command() ''' # @override def do_log(self): super().do_log() # @override def _do_things_before_exit(self, stopper_agent_name=None): print("AgentBasic fait quelques trucs à lui avant de stopper...") """ ================================================================= MAIN FUNCTION ================================================================= """ if __name__ == "__main__": agent = build_agent(AgentTriton) ''' TEST_MODE, configfile = extract_parameters() #agent = AgentX() agent = AgentA("AgentA", configfile, RUN_IN_THREAD) agent.setSimulatorMode(TEST_MODE) print(agent) ''' agent.run()