device_simulator.py
11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
#!/usr/bin/env python3
import sys
#from device_controller.abstract_component.sbig.server_udp_or_tcp_sbig import get_SocketServer_UDP_TCP_sbig
from device_controller.channels.server_udp_or_tcp import get_SocketServer_UDP_TCP
from device_controller.abstract_component.device_controller import DeviceController
HOST = "localhost"
class UnknownCommandException(Exception):
pass
# Voir https://stackoverflow.com/questions/10085996/shutdown-socketserver-serve-forever-in-one-thread-python-application
# Abstract class
class DeviceSimulator:
#with socketserver_type((HOST, PORT), MyUDPorTCPHandler_classic) as myserver:
# class attributes
myserver = None
protoc = None
gen2nat_cmds = None
# My associated Device Controller
my_dc = None
# This method will be called by the DC when launching its simulator
@classmethod
def set_dc(cls, dc:DeviceController):
cls.my_dc = dc
# This method will be called by the DC when launching its simulator
@classmethod
def set_protoc_and_cmds(cls, protoc, gen2nat_cmds, DEBUG=False):
# (EP) This is a bit ugly but I had to do this...
# Set protoc and gen2nat_cmds class attributes for class DeviceSimulator
'''
DeviceSimulator.protoc = protoc
DeviceSimulator.gen2nat_cmds = gen2nat_cmds
'''
# Idem for subclasses (DS_Gemini, DS_SBIG, ...) class attributes
cls.protoc = protoc
cls.gen2nat_cmds = gen2nat_cmds
print("****** (SIM) my cmds are:", cls.gen2nat_cmds, "******")
@classmethod
def getp(cls): return cls.protoc
'''
@classmethod
def getc(cls): return cls.gen2nat_cmds
'''
# shortcut
@classmethod
def get_simulated_answer_for_native_cmd(cls, command_start):
return cls.gen2nat_cmds.get_simulated_answer_for_generic_cmd(command_start)
@classmethod
##def serve_forever(cls, PORT, func_server = get_SocketServer_UDP_TCP):
def serve_forever(cls, PORT):
#def serve_forever(cls, PORT, PROTOCOL):
#with get_SocketServer_UDP_TCP(HOST, PORT, "UDP") as myserver: myserver.serve_forever()
##cls.myserver = func_server(HOST, PORT, "UDP")
#cls.myserver = cls.get_SocketServer_UDP_TCP_gemini(HOST, PORT, "UDP")
cls.myserver = cls.get_server(HOST, PORT, "UDP")
#cls.myserver = cls.get_Server(HOST, PORT, PROTOCOL)
print("******** myserver START: *********", cls.myserver)
try:
# Handle requests in an infinite loop. Runs until the loop is broken with an exception
cls.myserver.serve_forever()
except KeyboardInterrupt:
pass
print("******** myserver SHUTDOWN: *********", cls.myserver)
cls.myserver.server_close()
cls.myserver.shutdown()
@classmethod
def stop(cls):
print("******** myserver: *********", cls.myserver)
cls.myserver.shutdown()
# To be overriden by subclasses to change the simulator server channel type (socket, serial, ...)
# DEFAULT server type
@classmethod
def get_server(cls, myhost:str="localhost", myport:int=11110, PROTOCOL:str="TCP"):
print("\n****************** ORIG get_server() ********************************\n")
return get_SocketServer_UDP_TCP(myhost, myport, PROTOCOL, cls.make_answer_for_request)
'''
@classmethod
def get_Server(cls, myhost:str="localhost", myport:int=11110, PROTOCOL:str="TCP"):
# Default simulator using default make_answer_for_request() function
#return get_SocketServer_UDP_TCP(myhost, myport, PROTOCOL)
# Specific simulator using the make_answer_for_request() function from this module
return get_SocketServer_UDP_TCP(myhost, myport, PROTOCOL, cls.make_answer_for_request)
'''
# DEFAULT simulator answers = send cmd name in upper case
@classmethod
def make_answer_for_request_CMD_TO_UPPER(cls, request_bytes):
#raise NotImplementedError
print("(SIM serv) Request received is", request_bytes)
# Convert to string
request = request_bytes.decode("utf-8")
#if len(request) < STAMP_LENGTH+2+1: raise UnknownCommandException(request)
# Remove TERMINATOR
#request = request[0:-1]
# Remove leading stamp
#stamp = request[0:STAMP_LENGTH]
#command = request[STAMP_LENGTH:]
command = request
print("(SIM serv) Command received is", repr(command))
answer = 'ANSWER TO '+command.upper()
#if command == ':GR#': answer = "15:01:49"
#elif command == ':GD#': answer = "+12:29"
#elif command == ':SG+00#': answer = "1"
#elif command == ':GG#': answer = "+00"
#full_answer_in_bytes = bytes(stamp + answer + '#' + TERMINATOR, "utf-8")
full_answer_in_bytes = bytes(answer, "utf-8")
#print("request str upper is", str(request).upper())
print("(SIM serv) Answer sent is", full_answer_in_bytes)
return full_answer_in_bytes
# DEFAULT simulator answers
@classmethod
def make_answer_for_request_DEFAULT_A_VIRER(cls, request_bytes):
#raise NotImplementedError
print("(SIM serv) Request received is", request_bytes)
# Convert to string
request = request_bytes.decode("utf-8")
#if len(request) < STAMP_LENGTH+2+1: raise UnknownCommandException(request)
# Remove TERMINATOR
#request = request[0:-1]
# Remove leading stamp
#stamp = request[0:STAMP_LENGTH]
#command = request[STAMP_LENGTH:]
command = request
print("(SIM serv) Command received is", repr(command))
command_start = command[1:3]
answer = cls.get_simulated_answer_for_native_cmd(command_start)
if answer is None: answer='EMPTY '+command
print("****** (SIM) answer for native cmd", command_start, "is:", answer)
#if command == ':GR#': answer = "15:01:49"
#elif command == ':GD#': answer = "+12:29"
#elif command == ':SG+00#': answer = "1"
#elif command == ':GG#': answer = "+00"
#full_answer_in_bytes = bytes(stamp + answer + '#' + TERMINATOR, "utf-8")
full_answer_in_bytes = bytes(answer, "utf-8")
#print("request str upper is", str(request).upper())
print("(SIM serv) Answer sent is", full_answer_in_bytes)
return full_answer_in_bytes
# DEFAULT simulator answers
@classmethod
def make_answer_for_request_DEFAULT(cls, request_bytes:bytes):
#if request == STAMP + ":GD#": return STAMP + "+12:28"
#if request == b'0100000000000000:GD#': return bytes(STAMP + "+12:28", "utf-8")
print("(SIM Gemini) Request received is", request_bytes)
'''
3 types of commands:
- TYPE1: '06' or '050000'
- TYPE2: ':cde#' (mainly ':??#' - i.e : ':GD#', ':GR#', ...)
- TYPE3: 'b?#' (bC# = Cold Start, bW# = selecting Warm Start, bR# = selecting Warm Restart)
'''
# Convert to string
request = request_bytes.decode("utf-8")
if len(request) < cls.protoc.STAMP_LENGTH+2+1: raise UnknownCommandException(request)
# Remove TERMINATOR
request = request[0:-1]
# Remove leading stamp
stamp = request[0:cls.protoc.STAMP_LENGTH]
command = request[cls.protoc.STAMP_LENGTH:]
print("(SIM Gemini) Command received is", repr(command))
# TYPE1
if command not in (cls.protoc.COMMAND5, cls.protoc.COMMAND6):
# TYPE2 or 3
if (
(len(command) < 3)
or
(not command[-1]=='#')
or (
not (command[0]==':') and command not in ('bC#','bW#','bR#')
)
):
raise UnknownCommandException()
command_start = command[1:3]
# If "set_xxx" command, update related "get_xxx" command simulated answer in the commands dictionary
#if command_start in ('SC', 'SL', 'Sg', 'St'):
if command_start[0] == 'S':
#Memo.set(command_start[1],command[3:])
#related_get_cmd_name = 'G'+ command_start[1]
related_get_cmd_name = cls.my_dc.get_related_native_get_cmd_name_for_set_cmd(command_start)
#print('(SIM) related_get_cmd_name:', related_get_cmd_name)
if related_get_cmd_name:
cls.my_dc.set_simulated_answer_for_native_get_cmd(related_get_cmd_name, command)
##answer = cls.get_simulated_answer_for_native_cmd(command_start)
answer = cls.my_dc.get_simulated_answer_for_native_cmd(command_start)
if answer is None: answer='NO ANSWER IMPLEMENTED FOR COMMAND '+command
print("****** (SIM) answer for native cmd", command_start, "is:", answer)
'''
if command == cls.protoc.COMMAND6: answer = "G"
elif command == cls.protoc.COMMAND5: answer = cls.protoc.COMMAND5
#gr_request = STAMP + ':GR#' + END
#return bytes(stamp + "15:01:48#" + TERMINATOR, "utf-8")
elif command == ':GR#': answer = "15:01:49"
#gd_request = STAMP + ':GD#' + END
#if request == bytes(gd_request, "utf-8"): return bytes(STAMP + "+12:28#", "utf-8")
#elif useful_request == 'GD': answer = "+12:28"
elif command == ':GD#': answer = "+12:29"
elif command == ':SG+00#': answer = "1"
elif command == ':GG#': answer = "+00"
elif command_start == 'Gv': answer = 'T'
elif command_start in ('GC','GL', 'Gg', 'Gt'):
answer = Memo.get(command_start[1])
##answer = getc().get_answer_for_native_command(command_start)
##print("****** (SIM Gemini) answer for native cmd", command_start, "is:", getc().get_simulated_answer_for_native_cmd(command_start))
print("****** (SIM Gemini) answer for native cmd", command_start, "is:", cls.get_simulated_answer_for_native_cmd(command_start))
# Gemini telescope replaces "*" with ":"
if command_start in ('Gg','Gt'): answer = answer.replace('*',':')
else:
# Remove ending '#'
command = command[0:-1]
##if command_start in ('SC', 'SL', 'Sg', 'St'): Memo.set(command_start[1],command[3:])
if command_start in ('SC', 'SL', 'Sg', 'St'): Memo.set(command_start[1],command[3:])
if command[0] == ':': command = command[1:]
answer = command.upper()
'''
full_answer_in_bytes = bytes(stamp + answer + '#' + cls.protoc.TERMINATOR, "utf-8")
#print("request str upper is", str(request).upper())
print("(SIM Gemini) Answer sent is", full_answer_in_bytes)
return full_answer_in_bytes
# To be overriden by subclasses to change the simulator answers
# DEFAULT simulator answers
@classmethod
def make_answer_for_request(cls, request_bytes): return cls.make_answer_for_request_DEFAULT(request_bytes)
'''
def getp(): return DeviceSimulator.getp()
def getc(): return DeviceSimulator.getc()
'''
'''
def getp(): return DeviceSimulator.protoc
def getc(): return DeviceSimulator.gen2nat_cmds
'''