sbig_simulator.py 5.18 KB
#!/usr/bin/env python3

import sys

from device_controller.abstract_component.device_simulator import printd, DeviceSimulator, UnknownCommandException

# Only if we override the get_server() method to change the server type
from device_controller.channels.server_udp_or_tcp import get_SocketServer_UDP_TCP



class DS_SBIG(DeviceSimulator):
    """Simulator for an SBIG device, served through a UDP/TCP socket server.

    On top of ``DeviceSimulator`` this class provides:

    * ``get_server()``: builds the socket server used to serve requests;
    * ``make_answer_for_request_OTHER()``: an alternative request handler
      that is currently *disabled*. It only overrides the superclass handler
      if it is renamed ``make_answer_for_request``.
    """

    #_protoc = None
    #_gen2nat_cmds = {}

    # ------------------------------------------------------------------
    # Disabled experiments, kept for reference only.
    # (They used to live in bare string literals, the first of which
    # accidentally became the class docstring.)
    # ------------------------------------------------------------------
    #
    # # @override
    # @classmethod
    # def serve_forever(cls, PORT):
    #     super().serve_forever(PORT, get_SocketServer_UDP_TCP_sbig)
    #
    # @classmethod
    # def set_protoc_and_cmds(cls, protoc, gen2nat_cmds, DEBUG=False):
    #     cls._protoc = protoc
    #     cls._gen2nat_cmds = gen2nat_cmds
    #     printd("****** (SIM SBIG) my cmds are:", cls._gen2nat_cmds, "******")
    #
    # def __init__(self, protoc, gen2nat_cmds, DEBUG=False):
    #     self._protoc = protoc
    #     self._gen2nat_cmds = gen2nat_cmds
    #     printd("****** (SIM SBIG) my cmds are:", self._gen2nat_cmds, "******")
    #
    # def serve_forever(self, PORT):
    #     super().serve_forever(PORT, get_SocketServer_UDP_TCP_sbig)

    # @override superclass method (if named make_answer_for_request)
    @classmethod
    #def make_answer_for_request(cls, request_bytes:bytes):
    def make_answer_for_request_OTHER(cls, request_bytes: bytes) -> bytes:
        """Build the simulated answer for one raw request.

        The request is decoded as UTF-8 and split as
        ``<stamp><command><last char>``, where the stamp is the first
        ``cls.protoc.STAMP_LENGTH`` characters and the last character
        (the terminator) is discarded.

        3 types of commands are accepted:

        - TYPE1: ``cls.protoc.COMMAND5`` or ``cls.protoc.COMMAND6``
          ('06' or '050000')
        - TYPE2: ':cde#' (mainly ':??#' - i.e. ':GD#', ':GR#', ...)
        - TYPE3: 'b?#' (bC# = Cold Start, bW# = selecting Warm Start,
          bR# = selecting Warm Restart)

        Args:
            request_bytes: the raw request, UTF-8 encoded.

        Returns:
            ``stamp + answer + '#' + cls.protoc.TERMINATOR`` encoded as UTF-8.

        Raises:
            UnknownCommandException: if the request is too short or the
                command is neither TYPE1, TYPE2 nor TYPE3. The exception
                carries the rejected request/command.
        """
        printd("(SIM SBIG) Request received is", request_bytes)

        # Convert to string
        request = request_bytes.decode("utf-8")
        # Shortest accepted request: stamp + 2 characters + 1 trailing character
        if len(request) < cls.protoc.STAMP_LENGTH + 2 + 1:
            raise UnknownCommandException(request)

        # Remove TERMINATOR (the last character is dropped unconditionally)
        request = request[:-1]

        # Split the leading stamp from the command itself
        stamp = request[:cls.protoc.STAMP_LENGTH]
        command = request[cls.protoc.STAMP_LENGTH:]
        printd("(SIM SBIG) Command received is", repr(command))

        # TYPE1 commands are accepted as is; anything else must look like
        # a TYPE2 (':...#') or a TYPE3 ('bC#', 'bW#', 'bR#') command.
        if command not in (cls.protoc.COMMAND5, cls.protoc.COMMAND6):
            # Pass the rejected command along so the failure can be diagnosed
            # (same convention as the "request too short" check above).
            if len(command) < 3:
                raise UnknownCommandException(command)
            if command[-1] != '#':
                raise UnknownCommandException(command)
            if command[0] != ':' and command not in ('bC#', 'bW#', 'bR#'):
                raise UnknownCommandException(command)

        # The 2 characters following the first one (e.g. 'GD' for ':GD#')
        command_start = command[1:3]

        if command == cls.protoc.COMMAND6:
            answer = "G"
        elif command == cls.protoc.COMMAND5:
            answer = cls.protoc.COMMAND5
        # Hard-coded answers
        elif command == ':GR#':
            answer = "25:01:49"
        elif command == ':GD#':
            answer = "+22:29"
        elif command == ':SG+00#':
            answer = "1"
        elif command == ':GG#':
            answer = "+00"
        elif command_start == 'Gv':
            answer = 'T'
        elif command_start in ('GC', 'GL', 'Gg', 'Gt'):
            # GET commands: give back the value memorized by the matching
            # SET command (see the 'SC', 'SL', 'Sg', 'St' case below).
            # NOTE(review): `Memo` is not imported in the visible part of this
            # module - confirm where it comes from before enabling this handler.
            answer = Memo.get(command_start[1])
            printd("****** (SIM SBIG) answer for native cmd", command_start, "is:", cls.get_simulated_answer_for_native_cmd(command_start))
            # Gemini telescope replaces "*" with ":"
            if command_start in ('Gg', 'Gt'):
                answer = answer.replace('*', ':')
        else:
            # Remove ending '#'
            command = command[:-1]
            # SET commands: memorize the value (everything after ':Sx')
            # under the key 'x' so that the matching GET can return it.
            if command_start in ('SC', 'SL', 'Sg', 'St'):
                Memo.set(command_start[1], command[3:])
            # Default answer: the command itself, without its leading ':',
            # in upper case.
            if command[0] == ':':
                command = command[1:]
            answer = command.upper()

        full_answer_in_bytes = bytes(stamp + answer + '#' + cls.protoc.TERMINATOR, "utf-8")
        printd("(SIM SBIG) Answer sent is", full_answer_in_bytes)
        return full_answer_in_bytes

    # @override superclass to change the simulator server channel type (socket, serial, ...)
    @classmethod
    def get_server(cls, myhost:str="localhost", myport:int=11112, PROTOCOL:str="TCP"):
        """Return the socket server used to serve this simulator.

        The server is built by ``get_SocketServer_UDP_TCP`` and is given
        ``cls.make_answer_for_request`` as its request handler. This class
        does not define that name itself (its own handler is named
        ``make_answer_for_request_OTHER``), so it is resolved through the
        normal class attribute lookup.

        Args:
            myhost: host passed to ``get_SocketServer_UDP_TCP``.
            myport: port passed to ``get_SocketServer_UDP_TCP``.
            PROTOCOL: protocol name passed to ``get_SocketServer_UDP_TCP``
                ("TCP" by default).

        Returns:
            Whatever ``get_SocketServer_UDP_TCP`` returns.
        """
        printd("\n****************** SBIG get_server() ********************************\n")
        return get_SocketServer_UDP_TCP(myhost, myport, PROTOCOL, cls.make_answer_for_request)

'''
def get_SocketServer_UDP_TCP_sbig(myhost:str="localhost", myport:int=11112, PROTOCOL:str="TCP"):
    #return get_SocketServer_UDP_TCP(myhost, myport, PROTOCOL)
    return get_SocketServer_UDP_TCP(myhost, myport, PROTOCOL, make_answer_for_request)
'''


# SHORTCUTS
#_protoc = cls.protoc
#_gen2nat_cmds = DS_SBIG._gen2nat_cmds



'''
@property
def protoc(): return cls.protoc
'''

'''
def cls.protoc: return DS_SBIG._protoc
def getc(): return DS_SBIG._gen2nat_cmds
'''