#!/usr/bin/env python3
# File: AgentDeviceSBIG.py
import sys
##import utils.Logger as L
##import threading
import time
##from .Agent import Agent
sys.path.append("..")
from agent.AgentDevice import AgentDevice, build_agent
from common.models import AgentDeviceTelescopeStatus, get_or_create_unique_row_from_model
sys.path.append("../../..")
##from device_controller.concrete_component.device_simulator_common.server_udp_or_tcp import get_SocketServer_UDP_TCP
#from device_controller.concrete_component.sbig.sbig_simulator import DeviceSimulatorSBIG
from device_controller.concrete_component.sbig.sbig_controller import DC_SBIG
##log = L.setupLogger("AgentXTaskLogger", "AgentX")
class AgentDeviceSBIG(AgentDevice):
    """Device agent wired to the SBIG device controller (``DC_SBIG``).

    The class declares its agent-specific command names, the device
    endpoint and a set of test switches as class attributes, forwards
    construction to ``AgentDevice`` and reads the device date and time
    through the controller in ``get_device_status``.
    """

    # Agent level specific commands (marked in the original as overriding
    # the list of the superclass Agent).
    AGENT_SPECIFIC_COMMANDS = [
        "do_ad_sbig_specific1",
        "set_ad_sbig_specific2",
        # Error case: no method with these names is defined in this class.
        "do_ad_sbig_specific3_unimplemented",
        "do_ad_sbig_specific4_unimplemented",
    ]

    # Only assigned by disabled code (see __init__); stays None here.
    _agent_device_telescope_status = None

    # Host and port of the device.
    # TODO: set IP address ("xx.xx.xx.xx" is a placeholder).
    HOST = "xx.xx.xx.xx"
    PORT = 11112

    # ---- FOR TEST ONLY ------------------------------------------------
    # Run this agent in simulator mode ?
    TEST_MODE = None
    # Run the assertion tests at the end ?
    TEST_WITH_FINAL_TEST = True
    # Run the test with the real device ? (alternative value kept from the
    # original: WITH_SIMULATOR = True)
    WITH_SIMULATOR = False
    # How many seconds should I be running ? (alternative value kept from
    # the original: TEST_MAX_DURATION_SEC = 110)
    TEST_MAX_DURATION_SEC = None
    # Who should I send commands to ? (disabled in the original:
    # TEST_COMMANDS_DEST = "myself" / "AgentB")
    # Scenario to be executed
    TEST_COMMANDS_LIST = []

    # =================================================================
    # FUNCTIONS RUN INSIDE MAIN THREAD
    # =================================================================

    def __init__(self, config_filename=None, RUN_IN_THREAD=True):
        """Build the agent, handing the SBIG controller settings to the base class.

        Args:
            config_filename: passed through unchanged to ``AgentDevice.__init__``.
            RUN_IN_THREAD: passed through unchanged to ``AgentDevice.__init__``.
        """
        # Disabled draft kept from the original (starting a device simulator
        # in a thread so that it can stand in for the real device):
        #   if self.is_in_simulator_mode() and not self.WITH_SIMULATOR:
        #       self.HOST = "localhost"
        #       thread_device_simulator = threading.Thread(target=self.device_simulator)
        #       thread_device_simulator.start()

        controller_settings = {
            "device_controller": DC_SBIG,
            "host": self.HOST,
            "port": self.PORT,
        }
        super().__init__(config_filename, RUN_IN_THREAD, **controller_settings)
        # Further arguments that were disabled in the original call:
        #   DEBUG_MODE=DEBUG_MODE
        #   device_simulator=DeviceSimulatorSBIG

        # Disabled draft kept from the original (device status table init;
        # if the table is empty, create a default first row):
        #   self._agent_device_telescope_status = \
        #       get_or_create_unique_row_from_model(AgentDeviceTelescopeStatus)
        # or, older variant:
        #   if not AgentDeviceTelescopeStatus.objects.exists():
        #       self.printd("CREATE first row")
        #       self._agent_device_telescope_status = AgentDeviceTelescopeStatus.objects.create(id=1)
        #   # Get 1st row (will be updated at each iteration by routine_process()
        #   # with current device status)
        #   self.printd("GET first row")
        #   self._agent_device_telescope_status = AgentDeviceTelescopeStatus.objects.get(id=1)

        # Disabled draft kept from the original (device socket init).
        # Local port AK 8085 = redirected to the telescope IP 192.168.0.12
        # on port 11110:
        #   HOST, PORT = "82.64.28.71", 11110
        #   HOST, PORT = "localhost", 11110
        #   self.tele_ctrl = TelescopeControllerGemini(HOST, PORT, True)
        #   self._log.self.printd(f"init done for {name}")

    def init(self):
        """Run the superclass ``init()``; the device-specific part is still to do."""
        super().init()
        # Telescope (long) init
        # TODO:

    # Disabled draft kept from the original:
    #   def device_simulator(self):
    #       super().device_simulator()
    #       #HOST, PORT = "localhost", 11110
    #       #with get_SocketServer_UDP_TCP(HOST, PORT, "UDP") as myserver:
    #       #with get_SocketServer_UDP_TCP(self.HOST, self.PORT, "UDP") as myserver: myserver.serve_forever()
    #       TelescopeGeminiSimulator.serve_forever(self.PORT)
    #       # myserver = get_SocketServer_UDP_TCP(self.HOST, self.PORT, "UDP")
    #       # myserver.serve_forever()
    #
    # Disabled pass-through overrides kept from the original:
    #   def load_config(self):       super().load_config()
    #   def update_survey(self):     super().update_survey()
    #   def get_next_command(self):  return super().get_next_command()

    # @Override
    def do_log(self):
        """Delegate logging to the superclass implementation."""
        super().do_log()

    # -----------------------------------------------------------------
    # AGENT LEVEL SPECIFIC COMMANDS
    # -----------------------------------------------------------------

    def do_ad_sbig_specific1(self):
        """Placeholder command: only emits a debug message."""
        self.printd("processing do_sbig_specific1... ")

    def set_ad_sbig_specific2(self):
        """Placeholder command: only emits a debug message."""
        self.printd("processing set_sbig_specific2... ")

    # @Override
    def get_device_status(self):
        """Ask the device controller for its date and time.

        Each command is sent with ``self._device_ctrl.exec_cmd``; the result
        is printed, "OK" is printed when ``result.ok`` is truthy, and the
        string form of the result is stored. A one-second pause follows each
        command (kept from the original; the reason is not visible here).

        Returns:
            dict with keys ``'date'`` and ``'time'`` holding ``str()`` of the
            corresponding controller results.
        """
        queries = (
            ("date", "get_date"),
            ("time", "get_time"),
        )
        status = {}
        for key, command in queries:
            outcome = self._device_ctrl.exec_cmd(command)
            self.printd("result is", str(outcome))
            if outcome.ok:
                self.printd("OK")
            status[key] = str(outcome)
            time.sleep(1)

        # Disabled draft kept from the original (third query, note that it
        # used execute_cmd rather than exec_cmd):
        #   cmd = "get radec"
        #   res = self._device_ctrl.execute_cmd(cmd)
        #   self.printd("result is", str(res))
        #   if res.ok: self.printd("OK")
        #   dev_radec = str(res)
        #   time.sleep(1)
        #   return { 'date':dev_date, 'time':dev_time, 'radec':dev_radec }
        return status
"""
=================================================================
MAIN FUNCTION
=================================================================
"""
if __name__ == "__main__":
    # Execution mode handed to build_agent:
    #   True  -> "with thread"
    #   False -> "with process" (alternative noted in the original)
    RUN_IN_THREAD = True

    agent = build_agent(AgentDeviceSBIG, RUN_IN_THREAD=RUN_IN_THREAD)

    # Disabled draft kept from the original (manual construction):
    #   TEST_MODE, WITH_SIM, configfile = extract_parameters()
    #   #agent = AgentX()
    #   agent = AgentDeviceTelescopeGemini(configfile, RUN_IN_THREAD)
    #   agent.setSimulatorMode(TEST_MODE)
    #   self.printd(agent)

    agent.run()