device_simulator.py 11.7 KB
#!/usr/bin/env python3

import sys

#from device_controller.abstract_component.sbig.server_udp_or_tcp_sbig import get_SocketServer_UDP_TCP_sbig
from device_controller.channels.server_udp_or_tcp import get_SocketServer_UDP_TCP
from device_controller.abstract_component.device_controller import printd, DeviceController

HOST = "localhost"


class UnknownCommandException(Exception):
    pass

# Voir https://stackoverflow.com/questions/10085996/shutdown-socketserver-serve-forever-in-one-thread-python-application


# Abstract (static) class
class DeviceSimulator:
    #with socketserver_type((HOST, PORT), MyUDPorTCPHandler_classic) as myserver:

    # class attributes
    myserver = None
    #protoc = None
    #gen2nat_cmds = None
    # My associated Device Controller
    my_dc = None

    # This method will be called by the DC when launching its simulator
    @classmethod
    def set_dc(cls, dc:DeviceController):
        cls.my_dc = dc
        cls.protoc = dc.getp()

    '''
    # This method will be called by the DC when launching its simulator
    @classmethod
    def set_protoc_and_cmds(cls, protoc, gen2nat_cmds, DEBUG=False):
        # (EP) This is a bit ugly but I had to do this...
        # Set protoc and gen2nat_cmds class attributes for class DeviceSimulator
        #''
        DeviceSimulator.protoc = protoc
        DeviceSimulator.gen2nat_cmds = gen2nat_cmds
        #''
        # Idem for subclasses (DS_Gemini, DS_SBIG, ...) class attributes
        cls.protoc = protoc
        cls.gen2nat_cmds = gen2nat_cmds
        printd("****** (SIM) my cmds are:", cls.gen2nat_cmds, "******")
    '''

    '''
    @classmethod
    def getp(cls): return cls.my_dc.getp()
    '''
    '''
    @classmethod
    def getc(cls): return cls.gen2nat_cmds
    '''

    # shortcut
    @classmethod
    def get_simulated_answer_for_native_cmd(cls, command_start):
        return cls.gen2nat_cmds.get_simulated_answer_for_generic_cmd(command_start)

    @classmethod
    ##def serve_forever(cls, PORT, func_server = get_SocketServer_UDP_TCP):
    def serve_forever(cls, PORT):
    #def serve_forever(cls, PORT, PROTOCOL):
        #with get_SocketServer_UDP_TCP(HOST, PORT, "UDP") as myserver: myserver.serve_forever()
        ##cls.myserver = func_server(HOST, PORT, "UDP")
        #cls.myserver = cls.get_SocketServer_UDP_TCP_gemini(HOST, PORT, "UDP")
        cls.myserver = cls.get_server(HOST, PORT, "UDP")
        #cls.myserver = cls.get_Server(HOST, PORT, PROTOCOL)
        printd("******** myserver START: *********", cls.myserver)
        try:
            # Handle requests in an infinite loop. Runs until the loop is broken with an exception
            cls.myserver.serve_forever()
        except KeyboardInterrupt:
            pass
        printd("******** myserver SHUTDOWN: *********", cls.myserver)
        cls.myserver.server_close()
        cls.myserver.shutdown()


    @classmethod
    def stop(cls):
        printd("******** myserver: *********", cls.myserver)
        cls.myserver.shutdown()


    # To be overriden by subclasses to change the simulator server channel type (socket, serial, ...)
    # DEFAULT server type
    @classmethod
    def get_server(cls, myhost:str="localhost", myport:int=11110, PROTOCOL:str="TCP"):
        printd("\n****************** ORIG get_server() ********************************\n")
        return get_SocketServer_UDP_TCP(myhost, myport, PROTOCOL, cls.make_answer_for_request)
    '''
    @classmethod
    def get_Server(cls, myhost:str="localhost", myport:int=11110, PROTOCOL:str="TCP"):
        # Default simulator using default make_answer_for_request() function
        #return get_SocketServer_UDP_TCP(myhost, myport, PROTOCOL)
        # Specific simulator using the make_answer_for_request() function from this module
        return get_SocketServer_UDP_TCP(myhost, myport, PROTOCOL, cls.make_answer_for_request)
    '''

    # DEFAULT simulator answers = send cmd name in upper case
    @classmethod
    def make_answer_for_request_CMD_TO_UPPER(cls, request_bytes):
        #raise NotImplementedError
        printd("(SIM serv) Request received is", request_bytes)
    
        # Convert to string
        request = request_bytes.decode("utf-8")
        #if len(request) < STAMP_LENGTH+2+1: raise UnknownCommandException(request)
    
        # Remove TERMINATOR
        #request = request[0:-1]
    
        # Remove leading stamp
        #stamp = request[0:STAMP_LENGTH]
        #command = request[STAMP_LENGTH:]
        command = request
        printd("(SIM serv) Command received is", repr(command))
    
        answer = 'ANSWER TO '+command.upper()
        #if command == ':GR#': answer = "15:01:49"
        #elif command == ':GD#': answer = "+12:29"
        #elif command == ':SG+00#': answer = "1"
        #elif command == ':GG#': answer = "+00"
    
        #full_answer_in_bytes = bytes(stamp + answer + '#' + TERMINATOR, "utf-8")
        full_answer_in_bytes = bytes(answer, "utf-8")
        #printd("request str upper is", str(request).upper())
        printd("(SIM serv) Answer sent is", full_answer_in_bytes)
        return full_answer_in_bytes

    # DEFAULT simulator answers
    @classmethod
    def make_answer_for_request_DEFAULT_A_VIRER(cls, request_bytes):
        #raise NotImplementedError
        printd("(SIM serv) Request received is", request_bytes)
    
        # Convert to string
        request = request_bytes.decode("utf-8")
        #if len(request) < STAMP_LENGTH+2+1: raise UnknownCommandException(request)
    
        # Remove TERMINATOR
        #request = request[0:-1]
    
        # Remove leading stamp
        #stamp = request[0:STAMP_LENGTH]
        #command = request[STAMP_LENGTH:]
        command = request
        printd("(SIM serv) Command received is", repr(command))
    
        command_start = command[1:3]
        answer = cls.get_simulated_answer_for_native_cmd(command_start)
        if answer is None: answer='EMPTY '+command
        printd("****** (SIM) answer for native cmd", command_start, "is:", answer)
    
        #if command == ':GR#': answer = "15:01:49"
        #elif command == ':GD#': answer = "+12:29"
        #elif command == ':SG+00#': answer = "1"
        #elif command == ':GG#': answer = "+00"
    
        #full_answer_in_bytes = bytes(stamp + answer + '#' + TERMINATOR, "utf-8")
        full_answer_in_bytes = bytes(answer, "utf-8")
        #printd("request str upper is", str(request).upper())
        printd("(SIM serv) Answer sent is", full_answer_in_bytes)
        return full_answer_in_bytes

    # DEFAULT simulator answers
    @classmethod
    def make_answer_for_request_DEFAULT(cls, request_bytes:bytes): 
        #if request == STAMP + ":GD#": return STAMP + "+12:28"
        #if request == b'0100000000000000:GD#': return bytes(STAMP + "+12:28", "utf-8")
        printd("(SIM Gemini) Request received is", request_bytes)
    
        '''
        3 types of commands:
        - TYPE1: '06' or '050000'
        - TYPE2: ':cde#' (mainly ':??#' - i.e : ':GD#', ':GR#', ...)
        - TYPE3: 'b?#' (bC# = Cold Start, bW# = selecting Warm Start, bR# = selecting Warm Restart)
        '''
        
        # Convert to string
        request = request_bytes.decode("utf-8")
        if len(request) < cls.protoc.STAMP_LENGTH+2+1: raise UnknownCommandException(request)
        
        # Remove TERMINATOR
        request = request[0:-1]
        
        # Remove leading stamp
        stamp = request[0:cls.protoc.STAMP_LENGTH]
        command = request[cls.protoc.STAMP_LENGTH:]
        printd("(SIM Gemini) Command received is", repr(command))

        # TYPE1
        if command not in (cls.protoc.COMMAND5, cls.protoc.COMMAND6):
            # TYPE2 or 3
            '''
            if len(command) < 3: raise UnknownCommandException()
            if not (command[-1]=='#'): raise UnknownCommandException()
            if not (command[0]==':') and command not in ('bC#','bW#','bR#'): raise UnknownCommandException()
            '''        
            if (
                (len(command) < 3)
                or
                (not command[-1]=='#') 
                or (
                not (command[0]==':') and command not in ('bC#','bW#','bR#')
                )
            ):
                raise UnknownCommandException()


        # TODO: GEMINI SPECIFIC => move from this default method
        if command in (cls.protoc.COMMAND6, cls.protoc.COMMAND5):
            if command == cls.protoc.COMMAND6: answer = "G"
            elif command == cls.protoc.COMMAND5: answer = cls.protoc.COMMAND5
        else:
            command_start = command[1:3]
            # If "set_xxx" command, update related "get_xxx" command simulated answer in the commands dictionary
            #if command_start in ('SC', 'SL', 'Sg', 'St'):
            if command_start[0] == 'S':
                #Memo.set(command_start[1],command[3:])
                #related_get_cmd_name = 'G'+ command_start[1]
                related_get_cmd_name = cls.my_dc.get_related_native_get_cmd_name_for_set_cmd(command_start)
                #printd('(SIM) related_get_cmd_name:', related_get_cmd_name)
                if related_get_cmd_name:
                    cls.my_dc.set_simulated_answer_for_native_get_cmd(related_get_cmd_name, command)
            ##answer = cls.get_simulated_answer_for_native_cmd(command_start)
            answer = cls.my_dc.get_simulated_answer_for_native_cmd(command_start)
            if answer is None: answer='NO ANSWER IMPLEMENTED FOR COMMAND '+command
            printd("****** (SIM) answer for native cmd", command_start, "is:", answer)

        '''
        if command == cls.protoc.COMMAND6: answer = "G"
        elif command == cls.protoc.COMMAND5: answer = cls.protoc.COMMAND5
        
        #gr_request = STAMP + ':GR#' + END
        #return bytes(stamp + "15:01:48#" + TERMINATOR, "utf-8")
        elif command == ':GR#': answer = "15:01:49"
        
        #gd_request = STAMP + ':GD#' + END
        #if request == bytes(gd_request, "utf-8"): return bytes(STAMP + "+12:28#", "utf-8")
        #elif useful_request == 'GD': answer = "+12:28"
        elif command == ':GD#': answer = "+12:29"
        elif command == ':SG+00#': answer = "1"
        elif command == ':GG#': answer = "+00"
        elif command_start == 'Gv': answer = 'T'
        elif command_start in ('GC','GL', 'Gg', 'Gt'):
            answer = Memo.get(command_start[1])
            ##answer = getc().get_answer_for_native_command(command_start)
            ##printd("****** (SIM Gemini) answer for native cmd", command_start, "is:", getc().get_simulated_answer_for_native_cmd(command_start))
            printd("****** (SIM Gemini) answer for native cmd", command_start, "is:", cls.get_simulated_answer_for_native_cmd(command_start))
            # Gemini telescope replaces "*" with ":"
            if command_start in ('Gg','Gt'): answer = answer.replace('*',':')
        else:
            # Remove ending '#'
            command = command[0:-1]
            ##if command_start in ('SC', 'SL', 'Sg', 'St'): Memo.set(command_start[1],command[3:])
            if command_start in ('SC', 'SL', 'Sg', 'St'): Memo.set(command_start[1],command[3:])
            if command[0] == ':': command = command[1:]  
            answer = command.upper()
        '''

        full_answer_in_bytes = bytes(stamp + answer + '#' + cls.protoc.TERMINATOR, "utf-8")
        #printd("request str upper is", str(request).upper())
        printd("(SIM Gemini) Answer sent is", full_answer_in_bytes)
        return full_answer_in_bytes


    # To be overriden by subclasses to change the simulator answers
    # DEFAULT simulator answers
    @classmethod
    def make_answer_for_request(cls, request_bytes): return cls.make_answer_for_request_DEFAULT(request_bytes)





'''
def getp(): return DeviceSimulator.getp()
def getc(): return DeviceSimulator.getc()
'''

'''
def getp(): return DeviceSimulator.protoc
def getc(): return DeviceSimulator.gen2nat_cmds
'''