#!/usr/bin/env python3 import subprocess import sys import os ##import utils.Logger as L pwd = os.environ['PROJECT_ROOT_PATH'] if pwd not in sys.path: sys.path.append(pwd) ##from .Agent import Agent from src.core.pyros_django.agent.Agent import Agent, build_agent, log ##log = L.setupLogger("AgentXTaskLogger", "AgentX") class AgentMCO(Agent): # FOR TEST ONLY # Run this agent in simulator mode TEST_MODE = False # Run the assertion tests at the end TEST_WITH_FINAL_TEST = False #TEST_MAX_DURATION_SEC = None TEST_MAX_DURATION_SEC = 100 # Who should I send commands to ? TEST_COMMANDS_DEST = "myself" _AGENT_SPECIFIC_COMMANDS = [ ("do_process_image",10,0), ] TEST_COMMANDS_LIST = [ ("self do_process_image i.fit", 200, None, "CMD_EXECUTED"), ("self do_exit", 500, "STOPPING", "CMD_EXECUTED"), ] """ ================================================================= FUNCTIONS RUN INSIDE MAIN THREAD ================================================================= """ def __init__(self, name:str=None): if name is None: name = self.__class__.__name__ super().__init__() #super().__init__(RUN_IN_THREAD) # @override def _init(self): self.processes = [] mco_folder = self.get_config().get_agent_information(self.get_config().unit_name,self.name)["path"] mco_folder_abs_path = pwd + "/" + mco_folder + "/" self.mco_folder_abs_path = mco_folder_abs_path if not os.path.exists(mco_folder_abs_path+"output/"): os.makedirs(mco_folder_abs_path+"output/") if not os.path.exists(mco_folder_abs_path+"input/"): os.makedirs(mco_folder_abs_path+"input/") self.output_folder = mco_folder_abs_path+"output/" self.input_folder = mco_folder_abs_path+"input/" def do_process_image(self, image_name:str): image_path = self.input_folder + image_name output_file_name = image_name.split(".")[0] + ".json" output_file_path = self.output_folder + output_file_name if os.path.exists(image_path): cmd = "python3 " + self.mco_folder_abs_path + "estimation_FWHM.py -i " + image_path + " -o " + self.output_folder print(f"Running cmd {cmd}") process = subprocess.Popen(cmd,shell=True) self.processes.append(process) cmd = "python3 " + self.mco_folder_abs_path + "image_stat.py -i " + image_path + " -o " + self.output_folder print(f"Running cmd {cmd}") process = subprocess.Popen(cmd,shell=True) self.processes.append(process) for process in self.processes: process.wait() else: print(f"Can't find image {image_path}") #@override def main_loop_start(self): print("LOOP START") #@override def main_loop_end(self): print("LOOP END"); ''' # @override def load_config(self): super().load_config() ''' ''' # @override def update_survey(self): super().update_survey() ''' ''' # @override def get_next_command(self): return super().get_next_command() ''' # @override def do_log(self): super().do_log() # @override def do_things_before_exit(self, stopper_agent_name=None): print("AgentMCO fait quelques trucs à lui avant de stopper...") """ ================================================================= MAIN FUNCTION ================================================================= """ if __name__ == "__main__": agent = build_agent(AgentMCO) agent.run()