#!/usr/bin/env python3
"""Base (static) device simulator used by Device Controllers.

The simulator runs a server (UDP/TCP socket by default) that answers the
native commands sent by a Device Controller (DC) with simulated answers.
Subclasses customize it by overriding ``get_server()`` (channel type) and/or
``make_answer_for_request()`` (answers).
"""

import sys

from device_controller.channels.server_udp_or_tcp import get_SocketServer_UDP_TCP
from device_controller.abstract_component.device_controller import printd, DeviceController


HOST = "localhost"


class UnknownCommandException(Exception):
    """Raised when a received request cannot be parsed as a known command."""


# See https://stackoverflow.com/questions/10085996/shutdown-socketserver-serve-forever-in-one-thread-python-application


# Abstract (static) class
class DeviceSimulator:
    """Static simulator: only class attributes and class methods are used."""

    # Server instance created by serve_forever() (None until then)
    myserver = None

    # Protocol object of the associated DC (set by set_dc())
    protoc = None

    # My associated Device Controller (set by set_dc())
    my_dc = None

    # Native commands accepted without a leading ':' (from the original
    # notes: bC# = Cold Start, bW# = Warm Start, bR# = Warm Restart)
    _START_COMMANDS = ('bC#', 'bW#', 'bR#')

    @classmethod
    def set_dc(cls, dc: DeviceController):
        """Attach the Device Controller to this simulator.

        This method will be called by the DC when launching its simulator.
        It stores the DC and the protocol object returned by ``dc.getp()``.
        """
        cls.my_dc = dc
        cls.protoc = dc.getp()

    # shortcut
    @classmethod
    def get_simulated_answer_for_native_cmd(cls, command_start):
        """Return the simulated answer for the native command *command_start*.

        A ``gen2nat_cmds`` table is used when a (sub)class provides one;
        this base class no longer defines it, so the lookup is otherwise
        delegated to the associated Device Controller.
        """
        legacy_cmds = getattr(cls, "gen2nat_cmds", None)
        if legacy_cmds is not None:
            return legacy_cmds.get_simulated_answer_for_generic_cmd(command_start)
        return cls.my_dc.get_simulated_answer_for_native_cmd(command_start)

    @classmethod
    def serve_forever(cls, PORT, PROTOCOL: str = "UDP"):
        """Create the server on (HOST, PORT) and handle requests until stopped.

        PORT     -- port the server listens on
        PROTOCOL -- protocol name passed to get_server() ("UDP" by default,
                    which was the previously hard-coded value)

        A KeyboardInterrupt ends the loop silently. Whatever the way the loop
        ends, the server is closed so that its resources are not leaked.
        """
        cls.myserver = cls.get_server(HOST, PORT, PROTOCOL)
        printd("******** myserver START: *********", cls.myserver)
        try:
            # Handle requests in an infinite loop.
            # Runs until the loop is broken with an exception (or by stop())
            cls.myserver.serve_forever()
        except KeyboardInterrupt:
            pass
        finally:
            printd("******** myserver SHUTDOWN: *********", cls.myserver)
            cls.myserver.server_close()
        # Kept from the original implementation; presumably returns at once
        # since the serving loop has already exited -- TODO confirm
        cls.myserver.shutdown()

    @classmethod
    def stop(cls):
        """Ask the running server (if any) to leave its serving loop."""
        printd("******** myserver: *********", cls.myserver)
        if cls.myserver is None:
            # serve_forever() was never called: nothing to stop
            return
        cls.myserver.shutdown()

    # To be overridden by subclasses to change the simulator server channel
    # type (socket, serial, ...)
    # DEFAULT server type
    @classmethod
    def get_server(cls, myhost: str = "localhost", myport: int = 11110, PROTOCOL: str = "TCP"):
        """Return the default server: a UDP/TCP socket server.

        The server is given ``cls.make_answer_for_request`` as the function
        building the answer for each request.
        """
        printd("\n****************** ORIG get_server() ********************************\n")
        return get_SocketServer_UDP_TCP(myhost, myport, PROTOCOL, cls.make_answer_for_request)

    # DEFAULT simulator answers = send cmd name in upper case
    @classmethod
    def make_answer_for_request_CMD_TO_UPPER(cls, request_bytes):
        """Answer 'ANSWER TO ' followed by the request in upper case (as bytes)."""
        printd("(SIM serv) Request received is", request_bytes)

        # Convert to string (no stamp nor terminator handling here)
        command = request_bytes.decode("utf-8")
        printd("(SIM serv) Command received is", repr(command))

        answer = 'ANSWER TO ' + command.upper()

        full_answer_in_bytes = bytes(answer, "utf-8")
        printd("(SIM serv) Answer sent is", full_answer_in_bytes)
        return full_answer_in_bytes

    # DEFAULT simulator answers
    @classmethod
    def make_answer_for_request_DEFAULT_A_VIRER(cls, request_bytes):
        """Answer with the simulated answer of the native command.

        NOTE: "A_VIRER" is French for "to be removed"; this method is kept
        only so that existing callers keep working.

        The command name is taken from characters 1 and 2 of the request.
        When no simulated answer is available, 'EMPTY ' + request is sent.
        """
        printd("(SIM serv) Request received is", request_bytes)

        # Convert to string (no stamp nor terminator handling here)
        command = request_bytes.decode("utf-8")
        printd("(SIM serv) Command received is", repr(command))

        command_start = command[1:3]
        answer = cls.get_simulated_answer_for_native_cmd(command_start)
        if answer is None:
            answer = 'EMPTY ' + command
        printd("****** (SIM) answer for native cmd", command_start, "is:", answer)

        full_answer_in_bytes = bytes(answer, "utf-8")
        printd("(SIM serv) Answer sent is", full_answer_in_bytes)
        return full_answer_in_bytes

    # DEFAULT simulator answers
    @classmethod
    def make_answer_for_request_DEFAULT(cls, request_bytes: bytes):
        """Build the answer (bytes) for a stamped native request.

        The request is expected to be: stamp + command + 1-char terminator.

        3 types of commands:
        - TYPE1: '06' or '050000' (presumably protoc.COMMAND6 / COMMAND5)
        - TYPE2: ':cde#' (mainly ':??#' - i.e : ':GD#', ':GR#', ...)
        - TYPE3: 'b?#' (bC# = Cold Start, bW# = selecting Warm Start,
          bR# = selecting Warm Restart)

        Returns stamp + answer + '#' + protoc.TERMINATOR, encoded in utf-8.

        Raises UnknownCommandException when the request is too short or when
        the command is neither TYPE1 nor a well-formed TYPE2/TYPE3 command.
        """
        printd("(SIM Gemini) Request received is", request_bytes)

        # Convert to string
        request = request_bytes.decode("utf-8")
        stamp_length = cls.protoc.STAMP_LENGTH
        if len(request) < stamp_length + 2 + 1:
            raise UnknownCommandException(request)

        # Remove TERMINATOR
        request = request[0:-1]
        # Split leading stamp and command
        stamp = request[0:stamp_length]
        command = request[stamp_length:]
        printd("(SIM Gemini) Command received is", repr(command))

        # TODO: GEMINI SPECIFIC => move from this default method
        if command == cls.protoc.COMMAND6:
            # TYPE1
            answer = "G"
        elif command == cls.protoc.COMMAND5:
            # TYPE1
            answer = cls.protoc.COMMAND5
        else:
            # TYPE2 or 3
            is_well_formed = (
                len(command) >= 3
                and command.endswith('#')
                and (command.startswith(':') or command in cls._START_COMMANDS)
            )
            if not is_well_formed:
                raise UnknownCommandException(command)

            command_start = command[1:3]
            # If "set_xxx" command, update related "get_xxx" command simulated
            # answer in the commands dictionary
            if command_start.startswith('S'):
                related_get_cmd_name = cls.my_dc.get_related_native_get_cmd_name_for_set_cmd(command_start)
                if related_get_cmd_name:
                    cls.my_dc.set_simulated_answer_for_native_get_cmd(related_get_cmd_name, command)

            answer = cls.my_dc.get_simulated_answer_for_native_cmd(command_start)
            if answer is None:
                answer = 'NO ANSWER IMPLEMENTED FOR COMMAND ' + command
            printd("****** (SIM) answer for native cmd", command_start, "is:", answer)

        full_answer_in_bytes = bytes(stamp + answer + '#' + cls.protoc.TERMINATOR, "utf-8")
        printd("(SIM Gemini) Answer sent is", full_answer_in_bytes)
        return full_answer_in_bytes

    # To be overridden by subclasses to change the simulator answers
    # DEFAULT simulator answers
    @classmethod
    def make_answer_for_request(cls, request_bytes):
        """Return the answer (bytes) for *request_bytes* (default behaviour)."""
        return cls.make_answer_for_request_DEFAULT(request_bytes)