sbig_simulator.py
5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
#!/usr/bin/env python3
import sys
#from device_controller.concrete_component.sbig.server_udp_or_tcp_sbig import get_SocketServer_UDP_TCP_sbig
from device_controller.abstract_component.device_simulator import DeviceSimulator
#HOST = "localhost"
#!/usr/bin/env python3
#from device_controller.concrete_component.device_simulator_common.server_udp_or_tcp import get_SocketServer_UDP_TCP
from device_controller.channels.server_udp_or_tcp import get_SocketServer_UDP_TCP
###STAMP = '01000000'
#STAMP = '0100000000000000'
#HOST, PORT = "localhost", 11110
#HOST, PORT = "localhost", 9999
#HOST, PORT = "localhost", 20001
class UnknownCommandException(Exception):
    """Raised when the simulator receives a request it cannot interpret."""
# See https://stackoverflow.com/questions/10085996/shutdown-socketserver-serve-forever-in-one-thread-python-application
#class DeviceSimulatorSBIG(DeviceSimulator):
class DS_SBIG(DeviceSimulator):
    """SBIG flavour of the device simulator.

    Configuration is held on the class itself: ``_protoc`` and
    ``_gen2nat_cmds`` are assigned by :meth:`set_protoc_and_cmds` and are not
    declared at class level here.
    """

    # NOTE: an earlier instance-based variant (an ``__init__`` taking
    # protoc/gen2nat_cmds plus an instance-level ``serve_forever``) was
    # replaced by the classmethods below.

    @classmethod
    def serve_forever(cls, PORT):
        """Delegate to the parent ``serve_forever`` with the SBIG server factory.

        Args:
            PORT: port value forwarded unchanged to the parent class.
        """
        super().serve_forever(PORT, get_SocketServer_UDP_TCP_sbig)

    @classmethod
    def set_protoc_and_cmds(cls, protoc, gen2nat_cmds, DEBUG=False):
        """Store the protocol object and the command table on the class.

        Args:
            protoc: protocol object, later returned by ``getp()``.
            gen2nat_cmds: command table, later returned by ``getc()``.
            DEBUG: accepted but not used by this method.
        """
        cls._protoc = protoc
        cls._gen2nat_cmds = gen2nat_cmds
        # Always printed, regardless of DEBUG.
        print("****** (SIM) my cmds are:", cls._gen2nat_cmds, "******")
# SHORTCUTS
#_protoc = getp()
#_gen2nat_cmds = DS_SBIG._gen2nat_cmds
class Memo:
    """Class-level key/value store for values the simulator must remember.

    The store is shared by every caller (it lives on the class, not on
    instances) and is pre-populated with four one-letter keys.
    """

    # Initial content; keys are the one-letter names used by get() and set().
    _variables = {
        "C": '20/10/19',
        "L": '20:20:36',
        "g": '+10',
        "t": '+45:00:00',
    }

    @classmethod
    def get(cls, var):
        """Return the value stored under *var* (KeyError if it is absent)."""
        store = cls._variables
        return store[var]

    @classmethod
    def set(cls, var, value):
        """Store *value* under *var*, replacing any previous value."""
        store = cls._variables
        store[var] = value
'''
@property
def protoc(): return getp()
'''
def getp():
    """Shortcut: return the protocol object stored on ``DS_SBIG``."""
    return DS_SBIG._protoc
def getc():
    """Shortcut: return the command table stored on ``DS_SBIG``."""
    return DS_SBIG._gen2nat_cmds
# @override superclass method
def make_answer_for_request(request_bytes:bytes) -> bytes:
    """Build the simulated device reply for one raw request.

    The request is read as ``<stamp><command><terminator>``: the stamp is the
    first ``getp().STAMP_LENGTH`` characters and the terminator is the last
    character (it is dropped without being inspected).

    3 types of commands:
    - TYPE1: '06' or '050000' (compared against ``getp().COMMAND6`` and
      ``getp().COMMAND5``)
    - TYPE2: ':cde#' (mainly ':??#' - i.e : ':GD#', ':GR#', ...)
    - TYPE3: 'b?#' (bC# = Cold Start, bW# = selecting Warm Start,
      bR# = selecting Warm Restart)

    Args:
        request_bytes: UTF-8 encoded request.

    Returns:
        ``stamp + answer + '#' + getp().TERMINATOR`` encoded as UTF-8.

    Raises:
        UnknownCommandException: if the request is too short, or the command
            is neither TYPE1 nor a well-formed TYPE2/TYPE3 command. The
            offending text is attached to the exception.
    """
    print("(SIM) Request received is", request_bytes)

    # Look the protocol object up once instead of on every access.
    protoc = getp()
    stamp_length = protoc.STAMP_LENGTH

    # Convert to string
    request = request_bytes.decode("utf-8")
    # Minimum size: stamp + 2 command characters + 1 terminator character.
    if len(request) < stamp_length + 2 + 1:
        raise UnknownCommandException(request)

    # Remove TERMINATOR (the last character, whatever it is)
    request = request[:-1]

    # Split the leading stamp from the command
    stamp = request[:stamp_length]
    command = request[stamp_length:]
    print("(SIM) Command received is", repr(command))

    # Anything that is not TYPE1 must be a well-formed TYPE2 or TYPE3 command.
    if command not in (protoc.COMMAND5, protoc.COMMAND6):
        if len(command) < 3:
            raise UnknownCommandException(command)
        if command[-1] != '#':
            raise UnknownCommandException(command)
        if command[0] != ':' and command not in ('bC#', 'bW#', 'bR#'):
            raise UnknownCommandException(command)
        # Two characters following the leading one, e.g. 'GR' for ':GR#'.
        command_start = command[1:3]

    # Commands whose reply is a constant.
    fixed_answers = {
        ':GR#': "25:01:49",
        ':GD#': "+22:29",
        ':SG+00#': "1",
        ':GG#': "+00",
    }

    # TYPE1 commands are matched first, so command_start is only read on
    # paths where the validation above has assigned it.
    if command == protoc.COMMAND6:
        answer = "G"
    elif command == protoc.COMMAND5:
        answer = protoc.COMMAND5
    elif command in fixed_answers:
        answer = fixed_answers[command]
    elif command_start == 'Gv':
        answer = 'T'
    elif command_start in ('GC', 'GL', 'Gg', 'Gt'):
        # Getter: reply with the remembered value for this one-letter key.
        answer = Memo.get(command_start[1])
        # Gemini telescope replaces "*" with ":"
        if command_start in ('Gg', 'Gt'):
            answer = answer.replace('*', ':')
    else:
        # Remove ending '#'
        command = command[:-1]
        # Setter: remember everything after the 3-character prefix (e.g. ':SC').
        if command_start in ('SC', 'SL', 'Sg', 'St'):
            Memo.set(command_start[1], command[3:])
        if command[0] == ':':
            command = command[1:]
        # Default reply: echo the command in upper case.
        answer = command.upper()

    full_answer_in_bytes = bytes(stamp + answer + '#' + protoc.TERMINATOR, "utf-8")
    print("(SIM) Answer sent is", full_answer_in_bytes)
    return full_answer_in_bytes
def get_SocketServer_UDP_TCP_sbig(myhost:str="localhost", myport:int=11112, PROTOCOL:str="TCP"):
    """Return the generic UDP/TCP socket server wired to the SBIG answer builder.

    Args:
        myhost: host value forwarded to ``get_SocketServer_UDP_TCP``.
        myport: port value forwarded to ``get_SocketServer_UDP_TCP``.
        PROTOCOL: protocol name forwarded to ``get_SocketServer_UDP_TCP``.

    Returns:
        Whatever ``get_SocketServer_UDP_TCP`` returns when given
        ``make_answer_for_request`` as its fourth argument.
    """
    answer_builder = make_answer_for_request
    return get_SocketServer_UDP_TCP(myhost, myport, PROTOCOL, answer_builder)