AgentMCO.py
3.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#!/usr/bin/env python3
import subprocess
import sys
import os
##import utils.Logger as L
# Absolute path of the project root, read from the PROJECT_ROOT_PATH
# environment variable (a KeyError is raised here if it is not set).
pwd = os.environ['PROJECT_ROOT_PATH']
# Put the project root on sys.path (once) so that the absolute
# "src. ..." import that follows can be resolved.
if pwd not in sys.path:
    sys.path.append(pwd)
##from .Agent import Agent
from src.core.pyros_django.agent.Agent import Agent, build_agent, log
##log = L.setupLogger("AgentXTaskLogger", "AgentX")
class AgentMCO(Agent):
    """Agent that runs the MCO analysis scripts on an image.

    For each ``do_process_image`` command, the scripts ``estimation_FWHM.py``
    and ``image_stat.py`` located in the agent folder are launched on an
    image taken from the agent's ``input/`` folder, with the agent's
    ``output/`` folder passed as their output directory.
    """

    # FOR TEST ONLY
    # Run this agent in simulator mode
    TEST_MODE = False
    # Run the assertion tests at the end
    TEST_WITH_FINAL_TEST = False
    #TEST_MAX_DURATION_SEC = None
    TEST_MAX_DURATION_SEC = 100
    # Who should I send commands to ?
    TEST_COMMANDS_DEST = "myself"

    # Commands that only this agent understands.
    # NOTE(review): the meaning of the two numeric fields is defined by the
    # Agent base class, not visible here - confirm before changing them.
    _AGENT_SPECIFIC_COMMANDS = [
        ("do_process_image", 10, 0),
    ]

    # Commands sent in test mode.
    # NOTE(review): tuple layout is defined by the Agent base class - confirm.
    TEST_COMMANDS_LIST = [
        ("self do_process_image i.fit", 200, None, "CMD_EXECUTED"),
        ("self do_exit", 500, "STOPPING", "CMD_EXECUTED"),
    ]

    # =================================================================
    # FUNCTIONS RUN INSIDE MAIN THREAD
    # =================================================================

    def __init__(self, name: str = None):
        """Create the agent.

        Args:
            name: kept for signature compatibility. It is not forwarded to
                the base class (the original code computed a default from
                the class name but never used it).
                NOTE(review): confirm whether Agent.__init__ should get it.
        """
        super().__init__()
        #super().__init__(RUN_IN_THREAD)

    # @override
    def _init(self):
        """Resolve the agent folder and make sure input/ and output/ exist.

        Sets ``self.processes``, ``self.mco_folder_abs_path``,
        ``self.input_folder`` and ``self.output_folder`` (the three paths are
        strings ending with "/").
        """
        # NOTE(review): the base-class _init() is not called here (same as the
        # original code) - confirm this is intentional.
        self.processes = []  # child processes currently being waited for

        config = self.get_config()
        mco_folder = config.get_agent_information(config.unit_name, self.name)["path"]
        self.mco_folder_abs_path = pwd + "/" + mco_folder + "/"
        self.output_folder = self.mco_folder_abs_path + "output/"
        self.input_folder = self.mco_folder_abs_path + "input/"

        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(self.output_folder, exist_ok=True)
        os.makedirs(self.input_folder, exist_ok=True)

    def do_process_image(self, image_name: str):
        """Run the analysis scripts on ``image_name`` and wait for them.

        Both scripts are started first and then waited for, so they run
        concurrently. Nothing is launched when the image is not a file of the
        input folder; a message is printed instead.

        Args:
            image_name: file name of the image, relative to the input folder.
        """
        image_path = self.input_folder + image_name
        # isfile() rather than exists(): an empty image_name would otherwise
        # resolve to the input folder itself and be accepted.
        if not os.path.isfile(image_path):
            print(f"Can't find image {image_path}")
            return

        launched = []
        for script in ("estimation_FWHM.py", "image_stat.py"):
            # Argument list + no shell: image_name comes from a received
            # command and must never be interpreted by a shell.
            argv = [
                "python3",
                self.mco_folder_abs_path + script,
                "-i", image_path,
                "-o", self.output_folder,
            ]
            cmd = " ".join(argv)
            print(f"Running cmd {cmd}")
            try:
                process = subprocess.Popen(argv)
            except OSError as err:
                # Without a shell, a missing interpreter raises instead of
                # yielding exit status 127; report it and go on.
                print(f"Can't run cmd {cmd}: {err}")
                continue
            self.processes.append(process)
            launched.append((cmd, process))

        for cmd, process in launched:
            return_code = process.wait()
            # Forget finished processes so the list does not grow with every
            # processed image.
            self.processes.remove(process)
            if return_code != 0:
                print(f"Cmd {cmd} exited with code {return_code}")

    #@override
    def main_loop_start(self):
        """Hook called at the start of each main loop iteration."""
        print("LOOP START")

    #@override
    def main_loop_end(self):
        """Hook called at the end of each main loop iteration."""
        print("LOOP END")

    # load_config(), update_survey() and get_next_command() are deliberately
    # not overridden: the previous (disabled) overrides only called super().

    # @override
    def do_log(self):
        """Delegate logging to the base class."""
        super().do_log()

    # @override
    def do_things_before_exit(self, stopper_agent_name=None):
        """Hook run before the agent stops; only prints a message.

        Args:
            stopper_agent_name: unused here.
        """
        print("AgentMCO fait quelques trucs à lui avant de stopper...")
# =================================================================
# MAIN FUNCTION
# =================================================================
if __name__ == "__main__":
    # Build the agent through the project factory, then start it.
    build_agent(AgentMCO).run()