gemini_simulator.py
5.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#!/usr/bin/env python3
import sys
from device_controller.abstract_component.device_simulator import printd, DeviceSimulator, UnknownCommandException
# Only if we override the get_server() method to change the server type
from device_controller.channels.server_udp_or_tcp import get_SocketServer_UDP_TCP
###STAMP = '01000000'
#STAMP = '0100000000000000'
#HOST, PORT = "localhost", 11110
#HOST, PORT = "localhost", 9999
#HOST, PORT = "localhost", 20001
'''
# stamp is 8 digits long
STAMP_LENGTH = 8
# COMMON CONSTANTS WITH CLIENT
TERMINATOR = '\x00'
COMMAND5 = '050000'
#COMMAND6_SIMPLE = '6'
COMMAND6 = '\x00\x06\x00'
'''
'''
class Memo:
@classmethod
def get(cls, var):
return cls._variables[var]
@classmethod
def set(cls, var, value):
cls._variables[var] = value
_variables = {
"C" : '10/10/20',
"L" : '10:20:37',
"g" : '+11',
"t" : '+45:00:01',
}
'''
# Voir https://stackoverflow.com/questions/10085996/shutdown-socketserver-serve-forever-in-one-thread-python-application
#class DeviceSimulatorTelescopeGemini(DeviceSimulator):
class DS_Gemini(DeviceSimulator):
#with socketserver_type((HOST, PORT), MyUDPorTCPHandler_classic) as myserver:
'''
##
# @override
@classmethod
def serve_forever(cls, PORT):
super().serve_forever(PORT, cls.get_SocketServer_UDP_TCP_gemini)
#super().serve_forever(PORT, cls.get_Server)
'''
# @override superclass method (if named make_answer_for_request)
@classmethod
#def make_answer_for_request(cls, request_bytes:bytes):
def make_answer_for_request_OTHER(cls, request_bytes:bytes):
#if request == STAMP + ":GD#": return STAMP + "+12:28"
#if request == b'0100000000000000:GD#': return bytes(STAMP + "+12:28", "utf-8")
printd("(SIM Gemini) Request received is", request_bytes)
'''
3 types of commands:
- TYPE1: '06' or '050000'
- TYPE2: ':cde#' (mainly ':??#' - i.e : ':GD#', ':GR#', ...)
- TYPE3: 'b?#' (bC# = Cold Start, bW# = selecting Warm Start, bR# = selecting Warm Restart)
'''
# Convert to string
request = request_bytes.decode("utf-8")
if len(request) < cls.protoc.STAMP_LENGTH+2+1: raise UnknownCommandException(request)
# Remove TERMINATOR
request = request[0:-1]
# Remove leading stamp
stamp = request[0:cls.protoc.STAMP_LENGTH]
command = request[cls.protoc.STAMP_LENGTH:]
printd("(SIM Gemini) Command received is", repr(command))
# TYPE1
if command not in (cls.protoc.COMMAND5, cls.protoc.COMMAND6):
# TYPE2 or 3
if len(command) < 3: raise UnknownCommandException()
if not (command[-1]=='#'): raise UnknownCommandException()
if not (command[0]==':') and command not in ('bC#','bW#','bR#'): raise UnknownCommandException()
command_start = command[1:3]
if command == cls.protoc.COMMAND6: answer = "G"
elif command == cls.protoc.COMMAND5: answer = cls.protoc.COMMAND5
#gr_request = STAMP + ':GR#' + END
#return bytes(stamp + "15:01:48#" + TERMINATOR, "utf-8")
elif command == ':GR#': answer = "15:01:50"
#gd_request = STAMP + ':GD#' + END
#if request == bytes(gd_request, "utf-8"): return bytes(STAMP + "+12:28#", "utf-8")
#elif useful_request == 'GD': answer = "+12:28"
elif command == ':GD#': answer = "+12:30"
elif command == ':SG+00#': answer = "1"
elif command == ':GG#': answer = "+00"
elif command_start == 'Gv': answer = 'T'
elif command_start in ('GC','GL', 'Gg', 'Gt'):
answer = Memo.get(command_start[1])
##answer = getc().get_answer_for_native_command(command_start)
##printd("****** (SIM Gemini) answer for native cmd", command_start, "is:", getc().get_simulated_answer_for_native_cmd(command_start))
printd("****** (SIM Gemini) answer for native cmd", command_start, "is:", cls.get_simulated_answer_for_native_cmd(command_start))
# Gemini telescope replaces "*" with ":"
if command_start in ('Gg','Gt'): answer = answer.replace('*',':')
else:
# Remove ending '#'
command = command[0:-1]
##if command_start in ('SC', 'SL', 'Sg', 'St'): Memo.set(command_start[1],command[3:])
if command_start in ('SC', 'SL', 'Sg', 'St'): Memo.set(command_start[1],command[3:])
if command[0] == ':': command = command[1:]
answer = command.upper()
full_answer_in_bytes = bytes(stamp + answer + '#' + cls.protoc.TERMINATOR, "utf-8")
#printd("request str upper is", str(request).upper())
printd("(SIM Gemini) Answer sent is", full_answer_in_bytes)
return full_answer_in_bytes
# @override superclass to change the simulator server channel type (socket, serial, ...)
@classmethod
def get_server(cls, myhost:str="localhost", myport:int=11110, PROTOCOL:str="TCP"):
printd("\n****************** GEMINI get_server() ********************************\n")
return get_SocketServer_UDP_TCP(myhost, myport, PROTOCOL, cls.make_answer_for_request)
'''
def cls.protoc: return DS_Gemini.protoc
def getc(): return DS_Gemini.gen2nat_cmds
'''