AgentDeviceSBIG.py
5.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
#!/usr/bin/env python3
import sys
##import utils.Logger as L
##import threading
import time
##from .Agent import Agent
sys.path.append("..")
from agent.AgentDevice import AgentDevice, build_agent
from common.models import AgentDeviceTelescopeStatus, get_or_create_unique_row_from_model
sys.path.append("../../..")
##from device_controller.concrete_component.device_simulator_common.server_udp_or_tcp import get_SocketServer_UDP_TCP
from device_controller.concrete_component.sbig.sbig_simulator import DeviceSimulatorSBIG
from device_controller.concrete_component.sbig.sbig_controller import DeviceControllerSBIG
##log = L.setupLogger("AgentXTaskLogger", "AgentX")
class AgentDeviceSBIG(AgentDevice):
_agent_device_telescope_status = None
# Host and Port of the device
# TODO: set IP address
HOST,PORT = "xx.xx.xx.xx", 11112
# FOR TEST ONLY :
# - Run this agent in simulator mode ?
TEST_MODE = None
# - Run the assertion tests at the end ?
TEST_WITH_FINAL_TEST = True
# - Run the test with the real device ?
WITH_SIMULATOR = False
#WITH_SIMULATOR = True
# - How many seconds should I be running ?
#TEST_MAX_DURATION_SEC = None
TEST_MAX_DURATION_SEC = 70
# - Who should I send commands to ?
#TEST_COMMANDS_DEST = "myself"
##TEST_COMMANDS_DEST = "AgentB"
# - Scenario to be executed
TEST_COMMANDS_LIST = []
"""
=================================================================
FUNCTIONS RUN INSIDE MAIN THREAD
=================================================================
"""
# @override
def __init__(self, config_filename=None, RUN_IN_THREAD=True):
'''
if self.is_in_simulator_mode() and not self.WITH_SIMULATOR:
# START device SIMULATOR (in a thread) so that we can connect to it in place of the real device
self.HOST = "localhost"
thread_device_simulator = threading.Thread(target=self.device_simulator)
thread_device_simulator.start()
'''
#super().__init__(self.__class__.__name__,
super().__init__(
config_filename,
RUN_IN_THREAD,
device_controller=DeviceControllerSBIG, host=self.HOST, port=self.PORT,
device_simulator=DeviceSimulatorSBIG)
# Initialize the device table status
# If table is empty, create a default 1st row
##self._agent_device_telescope_status = get_or_create_unique_row_from_model(AgentDeviceTelescopeStatus)
"""
if not AgentDeviceTelescopeStatus.objects.exists():
print("CREATE first row")
self._agent_device_telescope_status = AgentDeviceTelescopeStatus.objects.create(id=1)
# Get 1st row (will be updated at each iteration by routine_process() with current device status)
print("GET first row")
self._agent_device_telescope_status = AgentDeviceTelescopeStatus.objects.get(id=1)
"""
# Initialize the device socket
# Port local AK 8085 = redirigé sur l’IP du tele 192.168.0.12 sur port 11110
##HOST, PORT = "82.64.28.71", 11110
#HOST, PORT = "localhost", 11110
##self.tele_ctrl = TelescopeControllerGemini(HOST, PORT, True)
##self._log.print(f"init done for {name}")
def init(self):
super().init()
# Telescope (long) init
# TODO:
"""
def device_simulator(self):
super().device_simulator()
#HOST, PORT = "localhost", 11110
#with get_SocketServer_UDP_TCP(HOST, PORT, "UDP") as myserver:
#with get_SocketServer_UDP_TCP(self.HOST, self.PORT, "UDP") as myserver: myserver.serve_forever()
TelescopeGeminiSimulator.serve_forever(self.PORT)
'''
myserver = get_SocketServer_UDP_TCP(self.HOST, self.PORT, "UDP")
myserver.serve_forever()
'''
"""
'''
# @override
def load_config(self):
super().load_config()
'''
'''
# @override
def update_survey(self):
super().update_survey()
'''
'''
# @override
def get_next_command(self):
return super().get_next_command()
'''
# @Override
def do_log(self):
super().do_log()
# @Override
def get_device_status(self):
cmd="get date"
res = self._device_ctrl.execute_cmd(cmd)
self.printd("result is", str(res))
if res.ok: print("OK")
dev_date = str(res)
time.sleep(1)
cmd="get time"
res = self._device_ctrl.execute_cmd(cmd)
self.printd("result is", str(res))
if res.ok: print("OK")
dev_time = str(res)
time.sleep(1)
'''
cmd="get radec"
res = self._device_ctrl.execute_cmd(cmd)
self.printd("result is", str(res))
if res.ok: print("OK")
dev_radec = str(res)
time.sleep(1)
return { 'date':dev_date, 'time':dev_time, 'radec':dev_radec }
'''
return { 'date':dev_date, 'time':dev_time }
"""
=================================================================
MAIN FUNCTION
=================================================================
"""
if __name__ == "__main__":
# with thread
RUN_IN_THREAD=True
# with process
#RUN_IN_THREAD=False
agent = build_agent(AgentDeviceSBIG, RUN_IN_THREAD=RUN_IN_THREAD)
'''
TEST_MODE, WITH_SIM, configfile = extract_parameters()
#agent = AgentX()
agent = AgentDeviceTelescopeGemini(configfile, RUN_IN_THREAD)
agent.setSimulatorMode(TEST_MODE)
print(agent)
'''
agent.run()