#!/usr/bin/env python3 import sys #from device_controller.concrete_component.sbig.server_udp_or_tcp_sbig import get_SocketServer_UDP_TCP_sbig from device_controller.abstract_component.device_simulator import DeviceSimulator #HOST = "localhost" #!/usr/bin/env python3 #from device_controller.concrete_component.device_simulator_common.server_udp_or_tcp import get_SocketServer_UDP_TCP from device_controller.channels.server_udp_or_tcp import get_SocketServer_UDP_TCP ###STAMP = '01000000' #STAMP = '0100000000000000' #HOST, PORT = "localhost", 11110 #HOST, PORT = "localhost", 9999 #HOST, PORT = "localhost", 20001 class UnknownCommandException(Exception): pass # Voir https://stackoverflow.com/questions/10085996/shutdown-socketserver-serve-forever-in-one-thread-python-application #class DeviceSimulatorSBIG(DeviceSimulator): class DS_SBIG(DeviceSimulator): #with socketserver_type((HOST, PORT), MyUDPorTCPHandler_classic) as myserver: #_protoc = None #_gen2nat_cmds = {} @classmethod def serve_forever(cls, PORT): super().serve_forever(PORT, get_SocketServer_UDP_TCP_sbig) @classmethod def set_protoc_and_cmds(cls, protoc, gen2nat_cmds, DEBUG=False): cls._protoc = protoc cls._gen2nat_cmds = gen2nat_cmds print("****** (SIM) my cmds are:", cls._gen2nat_cmds, "******") ''' def __init__(self, protoc, gen2nat_cmds, DEBUG=False): self._protoc = protoc self._gen2nat_cmds = gen2nat_cmds print("****** (SIM) my cmds are:", self._gen2nat_cmds, "******") def serve_forever(self, PORT): super().serve_forever(PORT, get_SocketServer_UDP_TCP_sbig) ''' # SHORTCUTS #_protoc = getp() #_gen2nat_cmds = DS_SBIG._gen2nat_cmds class Memo: @classmethod def get(cls, var): return cls._variables[var] @classmethod def set(cls, var, value): cls._variables[var] = value _variables = { "C" : '20/10/19', "L" : '20:20:36', "g" : '+10', "t" : '+45:00:00', } ''' @property def protoc(): return getp() ''' def getp(): return DS_SBIG._protoc def getc(): return DS_SBIG._gen2nat_cmds # @override superclass method def make_answer_for_request(request_bytes:bytes): #if request == STAMP + ":GD#": return STAMP + "+12:28" #if request == b'0100000000000000:GD#': return bytes(STAMP + "+12:28", "utf-8") print("(SIM) Request received is", request_bytes) ''' 3 types of commands: - TYPE1: '06' or '050000' - TYPE2: ':cde#' (mainly ':??#' - i.e : ':GD#', ':GR#', ...) - TYPE3: 'b?#' (bC# = Cold Start, bW# = selecting Warm Start, bR# = selecting Warm Restart) ''' # Convert to string request = request_bytes.decode("utf-8") #if len(request) < getp().STAMP_LENGTH+2+1: raise UnknownCommandException(request) if len(request) < getp().STAMP_LENGTH+2+1: raise UnknownCommandException(request) # Remove TERMINATOR request = request[0:-1] # Remove leading stamp stamp = request[0:getp().STAMP_LENGTH] command = request[getp().STAMP_LENGTH:] print("(SIM) Command received is", repr(command)) # TYPE1 if command not in (getp().COMMAND5, getp().COMMAND6): # TYPE2 or 3 if len(command) < 3: raise UnknownCommandException() if not (command[-1]=='#'): raise UnknownCommandException() if not (command[0]==':') and command not in ('bC#','bW#','bR#'): raise UnknownCommandException() command_start = command[1:3] if command == getp().COMMAND6: answer = "G" elif command == getp().COMMAND5: answer = getp().COMMAND5 #gr_request = STAMP + ':GR#' + END #return bytes(stamp + "15:01:48#" + TERMINATOR, "utf-8") elif command == ':GR#': answer = "25:01:49" #gd_request = STAMP + ':GD#' + END #if request == bytes(gd_request, "utf-8"): return bytes(STAMP + "+12:28#", "utf-8") #elif useful_request == 'GD': answer = "+12:28" elif command == ':GD#': answer = "+22:29" elif command == ':SG+00#': answer = "1" elif command == ':GG#': answer = "+00" elif command_start == 'Gv': answer = 'T' elif command_start in ('GC','GL', 'Gg', 'Gt'): answer = Memo.get(command_start[1]) # Gemini telescope replaces "*" with ":" if command_start in ('Gg','Gt'): answer = answer.replace('*',':') else: # Remove ending '#' command = command[0:-1] if command_start in ('SC', 'SL', 'Sg', 'St'): Memo.set(command_start[1],command[3:]) if command[0] == ':': command = command[1:] answer = command.upper() full_answer_in_bytes = bytes(stamp + answer + '#' + getp().TERMINATOR, "utf-8") #print("request str upper is", str(request).upper()) print("(SIM) Answer sent is", full_answer_in_bytes) return full_answer_in_bytes def get_SocketServer_UDP_TCP_sbig(myhost:str="localhost", myport:int=11112, PROTOCOL:str="TCP"): #return get_SocketServer_UDP_TCP(myhost, myport, PROTOCOL) return get_SocketServer_UDP_TCP(myhost, myport, PROTOCOL, make_answer_for_request)