AgentDeviceTelescopeGemini.py
5.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
#!/usr/bin/env python3
import sys
##import utils.Logger as L
##import threading
import time
##from .Agent import Agent
sys.path.append("..")
from agent.AgentDevice import AgentDevice, extract_parameters
from common.models import AgentDeviceTelescopeStatus, get_or_create_unique_row_from_model
sys.path.append("../../..")
from devices_controller.devices_controller_concrete.device_simulator_common.server_udp_or_tcp import get_SocketServer_UDP_TCP
from devices_controller.devices_controller_concrete.device_controller_Gemini.telescope_controller_gemini import TelescopeControllerGEMINI
##log = L.setupLogger("AgentXTaskLogger", "AgentX")
class AgentDeviceTelescopeGemini(AgentDevice):
    """Device agent wired to the Gemini telescope controller.

    The class passes ``TelescopeControllerGEMINI`` and the class-level
    ``HOST``/``PORT`` to ``AgentDevice.__init__`` and implements
    ``get_device_status()`` by sending three "get ..." commands through
    ``self._device_ctrl``.
    """

    # Only assigned here (to None); the code that would fill it in
    # __init__ is currently disabled.
    _agent_device_telescope_status = None

    # FOR TEST ONLY
    # Run this agent in simulator mode
    SIMULATOR_MODE = False
    # Run the assertion tests at the end
    SIMULATOR_WITH_TEST = True
    # NOTE(review): the original comment here was a copy of the one above.
    # Judging from the disabled code in __init__, this flag presumably means
    # "in simulator mode, talk to the real device instead of starting the
    # local device simulator" -- confirm against AgentDevice.
    SIMULATOR_WITH_REAL_DEVICE = False
    # Alternative kept from the original: SIMULATOR_MAX_DURATION_SEC = None
    SIMULATOR_MAX_DURATION_SEC = 70
    # Who should I send commands to ?
    # (disabled) SIMULATOR_COMMANDS_DEST = "myself"
    # (disabled) SIMULATOR_COMMANDS_DEST = "AgentB"
    # Scenario to be executed
    SIMULATOR_COMMANDS_LIST = []

    # Address passed to the parent constructor as host/port, and used by
    # device_simulator() to create its server.
    HOST = "82.64.28.71"
    PORT = 11110

    # =================================================================
    # FUNCTIONS RUN INSIDE MAIN THREAD
    # =================================================================

    # @override
    def __init__(self, config_filename=None, RUN_IN_THREAD=True):
        """Build the agent on top of ``AgentDevice``.

        Args:
            config_filename: passed through unchanged to the parent
                constructor.
            RUN_IN_THREAD: passed through unchanged to the parent
                constructor.

        The parent also receives this class's name, the
        ``TelescopeControllerGEMINI`` class as ``device_controller`` and
        ``self.HOST`` / ``self.PORT`` as ``host`` / ``port``.
        """
        # Disabled in the original (kept for reference):
        #   if self.is_in_simulator_mode() and not self.SIMULATOR_WITH_REAL_DEVICE:
        #       # START device SIMULATOR (in a thread) so that we can connect
        #       # to it in place of the real device
        #       self.HOST = "localhost"
        #       thread_device_simulator = threading.Thread(target=self.device_simulator)
        #       thread_device_simulator.start()
        agent_name = self.__class__.__name__
        super().__init__(
            agent_name,
            config_filename,
            RUN_IN_THREAD,
            device_controller=TelescopeControllerGEMINI,
            host=self.HOST,
            port=self.PORT,
        )

        # Initialize the device table status
        # If table is empty, create a default 1st row
        # Disabled in the original (kept for reference):
        #   self._agent_device_telescope_status = \
        #       get_or_create_unique_row_from_model(AgentDeviceTelescopeStatus)
        #
        #   if not AgentDeviceTelescopeStatus.objects.exists():
        #       print("CREATE first row")
        #       self._agent_device_telescope_status = \
        #           AgentDeviceTelescopeStatus.objects.create(id=1)
        #   # Get 1st row (will be updated at each iteration by
        #   # routine_process() with current device status)
        #   print("GET first row")
        #   self._agent_device_telescope_status = \
        #       AgentDeviceTelescopeStatus.objects.get(id=1)

        # Initialize the device socket
        # Local AK port 8085 = forwarded to the telescope IP 192.168.0.12
        # on port 11110
        # Disabled in the original (kept for reference):
        #   HOST, PORT = "82.64.28.71", 11110
        #   HOST, PORT = "localhost", 11110
        #   self.tele_ctrl = TelescopeControllerGEMINI(HOST, PORT, True)
        #   self._log.print(f"init done for {name}")

    def init(self):
        """Call the parent ``init()``; nothing telescope-specific is done yet."""
        super().init()
        # TODO: telescope (long) init

    def device_simulator(self):
        """Call the parent ``device_simulator()``, then serve on HOST/PORT.

        Prints the host and port, creates a server with
        ``get_SocketServer_UDP_TCP(host, port, "UDP")`` used as a context
        manager, and calls ``serve_forever()`` on it.
        """
        super().device_simulator()
        host, port = self.HOST, self.PORT
        print(host, port)
        with get_SocketServer_UDP_TCP(host, port, "UDP") as udp_server:
            udp_server.serve_forever()
        # Disabled alternative without the context manager (kept for reference):
        #   myserver = get_SocketServer_UDP_TCP(self.HOST, self.PORT, "UDP")
        #   myserver.serve_forever()

    # Disabled overrides kept from the original; each one only delegated
    # to the parent implementation:
    #   load_config(self)      -> super().load_config()
    #   update_survey(self)    -> super().update_survey()
    #   get_next_command(self) -> return super().get_next_command()

    # @Override
    def do_log(self):
        """Call the parent ``do_log()`` with no additional behavior."""
        super().do_log()

    # @Override
    def get_device_status(self):
        """Send "get date", "get time" and "get radec" to the controller.

        For each command, in that order: execute it through
        ``self._device_ctrl.execute_cmd``, pass the stringified result to
        ``self.printd``, print "OK" when the result's ``ok`` attribute is
        truthy, keep ``str(result)`` and sleep for one second.

        Returns:
            dict: ``{'date': ..., 'time': ..., 'radec': ...}`` where each
            value is ``str()`` of the corresponding command result.
        """
        queries = (
            ("date", "get date"),
            ("time", "get time"),
            ("radec", "get radec"),
        )
        status = {}
        for key, command in queries:
            reply = self._device_ctrl.execute_cmd(command)
            self.printd("result is", str(reply))
            if reply.ok:
                print("OK")
            status[key] = str(reply)
            # One-second pause after every command, including the last one.
            time.sleep(1)
        return status
"""
=================================================================
MAIN FUNCTION
=================================================================
"""
if __name__ == "__main__":
    # Script entry point: build the agent from the command-line parameters,
    # set its simulator mode, print it and run it.

    # True -> run with a thread; set to False to run with a process.
    RUN_IN_THREAD = True

    TEST_MODE, configfile = extract_parameters()
    # Disabled alternative kept from the original: default configfile to
    # None and take it from sys.argv[1] when exactly one argument is given.

    # (disabled) agent = AgentX()
    agent = AgentDeviceTelescopeGemini(
        config_filename=configfile,
        RUN_IN_THREAD=RUN_IN_THREAD,
    )
    agent.setSimulatorMode(TEST_MODE)
    print(agent)
    agent.run()