gemini_simulator.py 5.52 KB
#!/usr/bin/env python3

import sys

#from device_controller.concrete_component.sbig.server_udp_or_tcp_sbig import get_SocketServer_UDP_TCP_sbig
from device_controller.abstract_component.device_simulator import DeviceSimulator, UnknownCommandException
#HOST = "localhost"


#from device_controller.concrete_component.device_simulator_common.server_udp_or_tcp import get_SocketServer_UDP_TCP

# Only if we override the get_server() method to change the server type
from device_controller.channels.server_udp_or_tcp import get_SocketServer_UDP_TCP



###STAMP = '01000000'
#STAMP = '0100000000000000'
#HOST, PORT = "localhost", 11110
#HOST, PORT = "localhost", 9999
#HOST, PORT = "localhost", 20001
'''
# stamp is 8 digits long
STAMP_LENGTH = 8

# COMMON CONSTANTS WITH CLIENT
TERMINATOR = '\x00'
COMMAND5 = '050000'
#COMMAND6_SIMPLE = '6'
COMMAND6 = '\x00\x06\x00'
'''

class Memo:
    
    @classmethod
    def get(cls, var):
        return cls._variables[var]
    
    @classmethod
    def set(cls, var, value):
        cls._variables[var] = value
    
    _variables = {
        "C" : '10/10/20',
        "L" : '10:20:37',
        "g" : '+11',
        "t" : '+45:00:01',
    }


# Voir https://stackoverflow.com/questions/10085996/shutdown-socketserver-serve-forever-in-one-thread-python-application

#class DeviceSimulatorTelescopeGemini(DeviceSimulator):
class DS_Gemini(DeviceSimulator):
    #with socketserver_type((HOST, PORT), MyUDPorTCPHandler_classic) as myserver:

    '''
    ##
    # @override
    @classmethod
    def serve_forever(cls, PORT):
        super().serve_forever(PORT, cls.get_SocketServer_UDP_TCP_gemini)
        #super().serve_forever(PORT, cls.get_Server)
    '''

    # @override superclass method (if named make_answer_for_request)
    @classmethod
    #def make_answer_for_request(cls, request_bytes:bytes): 
    def make_answer_for_request_OTHER(cls, request_bytes:bytes): 
        #if request == STAMP + ":GD#": return STAMP + "+12:28"
        #if request == b'0100000000000000:GD#': return bytes(STAMP + "+12:28", "utf-8")
        print("(SIM Gemini) Request received is", request_bytes)
    
        '''
        3 types of commands:
        - TYPE1: '06' or '050000'
        - TYPE2: ':cde#' (mainly ':??#' - i.e : ':GD#', ':GR#', ...)
        - TYPE3: 'b?#' (bC# = Cold Start, bW# = selecting Warm Start, bR# = selecting Warm Restart)
        '''
        
        # Convert to string
        request = request_bytes.decode("utf-8")
        if len(request) < cls.protoc.STAMP_LENGTH+2+1: raise UnknownCommandException(request)
        
        # Remove TERMINATOR
        request = request[0:-1]
        
        # Remove leading stamp
        stamp = request[0:cls.protoc.STAMP_LENGTH]
        command = request[cls.protoc.STAMP_LENGTH:]
        print("(SIM Gemini) Command received is", repr(command))
        
        # TYPE1
        if command not in (cls.protoc.COMMAND5, cls.protoc.COMMAND6):
            # TYPE2 or 3
            if len(command) < 3: raise UnknownCommandException()
            if not (command[-1]=='#'): raise UnknownCommandException()
            if not (command[0]==':') and command not in ('bC#','bW#','bR#'): raise UnknownCommandException()
        
        command_start = command[1:3]
        if command == cls.protoc.COMMAND6: answer = "G"
        elif command == cls.protoc.COMMAND5: answer = cls.protoc.COMMAND5
        
        #gr_request = STAMP + ':GR#' + END
        #return bytes(stamp + "15:01:48#" + TERMINATOR, "utf-8")
        elif command == ':GR#': answer = "15:01:50"
        
        #gd_request = STAMP + ':GD#' + END
        #if request == bytes(gd_request, "utf-8"): return bytes(STAMP + "+12:28#", "utf-8")
        #elif useful_request == 'GD': answer = "+12:28"
        elif command == ':GD#': answer = "+12:30"
        elif command == ':SG+00#': answer = "1"
        elif command == ':GG#': answer = "+00"
        elif command_start == 'Gv': answer = 'T'
        elif command_start in ('GC','GL', 'Gg', 'Gt'):
            answer = Memo.get(command_start[1])
            ##answer = getc().get_answer_for_native_command(command_start)
            ##print("****** (SIM Gemini) answer for native cmd", command_start, "is:", getc().get_simulated_answer_for_native_cmd(command_start))
            print("****** (SIM Gemini) answer for native cmd", command_start, "is:", cls.get_simulated_answer_for_native_cmd(command_start))
            # Gemini telescope replaces "*" with ":"
            if command_start in ('Gg','Gt'): answer = answer.replace('*',':')
        else:
            # Remove ending '#'
            command = command[0:-1]
            ##if command_start in ('SC', 'SL', 'Sg', 'St'): Memo.set(command_start[1],command[3:])
            if command_start in ('SC', 'SL', 'Sg', 'St'): Memo.set(command_start[1],command[3:])
            if command[0] == ':': command = command[1:]  
            answer = command.upper()

        full_answer_in_bytes = bytes(stamp + answer + '#' + cls.protoc.TERMINATOR, "utf-8")
        #print("request str upper is", str(request).upper())
        print("(SIM Gemini) Answer sent is", full_answer_in_bytes)
        return full_answer_in_bytes


    # @override superclass to change the simulator server channel type (socket, serial, ...)
    @classmethod
    def get_server(cls, myhost:str="localhost", myport:int=11110, PROTOCOL:str="TCP"):
        print("\n****************** GEMINI get_server() ********************************\n")
        return get_SocketServer_UDP_TCP(myhost, myport, PROTOCOL, cls.make_answer_for_request)


'''
def cls.protoc: return DS_Gemini.protoc
def getc(): return DS_Gemini.gen2nat_cmds
'''