# sbig_controller.py 11.9 KB
#!/usr/bin/env python3

"""Socket Client Gemini Telescope implementation
To be used as a concrete class to system control a Gemini telescope
"""

# Standard library imports
import sys
import time

# Third party imports
# None

# Local application imports
sys.path.append('../../..')

# My parent class and exceptions
from device_controller.abstract_component.device_controller import printd, DeviceController, UnknownNativeCmdException, UnknownGenericCmdArgException

# My DC components:
from device_controller.abstract_component.filter_selector import DC_FilterSelector
from device_controller.abstract_component.detector_sensor import DC_DetectorSensor
from device_controller.abstract_component.detector_shutter import DC_DetectorShutter

# My simulator
from device_controller.concrete_component.sbig.sbig_simulator import DS_SBIG 


# Value handed to the parent DeviceController constructor and to every DC
# component (4th positional argument) as the device channel buffer size.
# NOTE(review): presumably a number of bytes - confirm against the channel implementation.
MY_DEVICE_CHANNEL_BUFFER_SIZE = 1024


class DC_SBIG(DeviceController):
    '''
    Device controller for an SBIG device.

    Components (list of my capabilities or roles):
    - DC_FilterSelector
    - DC_DetectorSensor
    - DC_DetectorShutter
    '''


    # Gemini communication protocol
    # @override
    class Protocol:
        """Stamp-based framing of the native commands exchanged with the device.

        Every request is prefixed with a stamp (a zero-padded request counter
        followed by a '0' filler, STAMP_LENGTH characters in total) and
        suffixed with TERMINATOR. The answer must contain the stamp of the
        request it replies to, which is how `uncap` matches the two.

        The request counter and the last stamp are stored on the class itself
        (all methods are classmethods), not on instances.
        """

        # Default timeouts
        TIMEOUT_SEND = 10
        TIMEOUT_RECEIVE = 10

        # COMMON CONSTANTS WITH SERVER
        TERMINATOR = '\x00'
        COMMAND5 = '050000'
        COMMAND6 = '\x00\x06\x00'
        # Short alias accepted by encap() and replaced by COMMAND6
        COMMAND6_SIMPLE = '6'
        # Commands that are sent without the leading ':'
        # (bC# = Cold Start, bW# = Warm Start, bR# = Warm Restart)
        STARTUP_CMDS = ('bC#', 'bW#', 'bR#')

        # STAMP :
        # Initialize request number (will be increased at each request)
        request_num = 0
        # Request number stands on 4 digits (from 0001 to 9999)
        request_num_nb_digits = 4
        # Highest request number that still fits on request_num_nb_digits digits
        REQUEST_NUM_MAX = 10 ** request_num_nb_digits - 1
        # For 4 digits, we should get the format '{:04d}':
        request_num_format = '{:0'+str(request_num_nb_digits)+'d}'
        # Stamp filler with 0 (stamp is 8 digits long)
        STAMP_LENGTH = 8
        STAMP_FILLER = '0' * (STAMP_LENGTH - request_num_nb_digits)
        # Stamp of the last encapsulated request (None until encap() is called),
        # defined here so that uncap() can always read it
        last_stamp = None

        # @override
        @classmethod
        def formated_cmd(cls, cmd:str, values_to_set:str=None)->str:
            """Build the native command string from a bare command name.

            :param cmd: bare native command (e.g. 'GD'), or COMMAND5 / COMMAND6
            :param values_to_set: optional value(s) appended to the command;
                either a string or an iterable of strings
            :return: COMMAND5 / COMMAND6 unchanged; otherwise the command with a
                trailing '#', and a leading ':' unless the result is one of
                STARTUP_CMDS

            Examples: 'GD' gives ':GD#', 'bC' gives 'bC#',
            ('SC', '10') gives ':SC10#'.
            """
            if values_to_set is not None:
                cmd += ''.join(values_to_set)
            # The two raw commands are sent as is, without '#' nor ':'
            if cmd in (cls.COMMAND6, cls.COMMAND5):
                return cmd
            cmd += '#'
            if cmd not in cls.STARTUP_CMDS:
                cmd = ':' + cmd
            return cmd

        # @override
        @classmethod
        def encap(cls, command:str)->tuple:
            r"""Encapsulate a native command so that it is ready for sending.

            3 types of commands are accepted:
            - TYPE1: COMMAND6 (or its alias COMMAND6_SIMPLE) or COMMAND5
            - TYPE2: ':cde#' (mainly ':??#' - i.e : ':GD#', ':GR#', ...)
            - TYPE3: 'b?#' (bC# = Cold Start, bW# = selecting Warm Start,
              bR# = selecting Warm Restart)

            Each call increases the class-level request number, which wraps
            back to 1 after REQUEST_NUM_MAX so that the stamp always keeps
            STAMP_LENGTH characters.

            :param command: already formatted native command
            :return: tuple (stamp, stamp + command + TERMINATOR)
            :raises UnknownNativeCmdException: if command is neither a TYPE1
                command nor a well formed TYPE2 / TYPE3 command

            Example, starting with request_num at 0:
                encap(':GD#') returns ('00010000', '00010000:GD#\x00')
                encap(':GR#') returns ('00020000', '00020000:GR#\x00')
            """
            if command == cls.COMMAND6_SIMPLE:
                command = cls.COMMAND6

            if command not in (cls.COMMAND6, cls.COMMAND5):
                # TYPE2 or TYPE3: at least 3 chars, ends with '#', and either
                # starts with ':' or is one of the startup commands
                well_formed = (
                    len(command) >= 3
                    and command.endswith('#')
                    and (command.startswith(':') or command in cls.STARTUP_CMDS)
                )
                if not well_formed:
                    raise UnknownNativeCmdException(command)

            # Next request number, cycling 1..REQUEST_NUM_MAX (never 0)
            cls.request_num = cls.request_num % cls.REQUEST_NUM_MAX + 1
            cls.last_stamp = cls.request_num_format.format(cls.request_num) + cls.STAMP_FILLER
            return cls.last_stamp, cls.last_stamp + command + cls.TERMINATOR

        # @override
        @classmethod
        def uncap(cls, dcc_name:str, stamp:str, data_received:str)->str:
            r"""Extract the useful data from the raw data received.

            Everything up to and including the first occurrence of the stamp is
            dropped, then the final character is dropped unconditionally (it is
            assumed to be a trailing terminator), then one trailing '#' is
            removed if present.

            :param dcc_name: name of the caller, only used in debug messages
            :param stamp: stamp of the request this data should answer to
            :param data_received: raw (already decoded) answer
            :return: the answer without stamp, final character and trailing '#'
            :raises Exception: if the stamp is not found in data_received

            Example:
                uncap('DC', '00170000', '001700001#') returns '1'
                uncap('DC', '00170000', '001700001#\x00') returns '1'
            """
            printd(f"(sbig protoc used from {dcc_name}) data_received_bytes type is", type(data_received))
            printd(f"(sbig protoc used from {dcc_name}) data received is", data_received)
            printd(f"*** (sbig protoc used from {dcc_name}) Last stamp is ***", cls.last_stamp, ", data received is", data_received)

            if stamp not in data_received:
                raise Exception("BAD STAMP, this is not the answer to my request, but another request's")

            # Keep everything after the FIRST occurrence of the stamp (the
            # payload may itself contain the same digits), minus the last char
            useful_data = data_received.partition(stamp)[2][:-1]
            # Remove '#' at the end, if exists (safe on an empty payload)
            if useful_data.endswith('#'):
                useful_data = useful_data[:-1]
            return useful_data

        # end of class Protocol


    
    #data = " ".join(sys.argv[1:])
    #data_to_send = bytes(data + "\n", "utf-8")

    ''' Commands dictionary
    NEW format is:
    'generic-command-name': ['native-command-name', 'awaited_result_if_ok', 'other-awaited-results if not ok', ...],
    
    Old format was:
    'CMD-NAME': ['cmd-get', 'awaited_res_ok_for_get', 'cmd-set', 'awaited_res_ok_for_set', 'comment'],
    '''
    # @overwrite
    #_cmd_device_concrete = {
    # Generic-to-native command tables.
    # Each entry maps a generic command name to a list whose first item is the
    # native command name; the optional following items are the awaited answers
    # (the answer awaited when ok first, then the other possible answers).

    # General commands (not tied to one DC component)
    GEN2NAT_CMDS_GENERAL = {

        # Possible answers:
        # - B# while the initial startup message is being displayed (new in L4),
        # - b# while waiting for the selection of the Startup Mode,
        # - S# during a Cold Start (new in L4), G# after completed startup
        'get_ack': [Protocol.COMMAND6, 'G', 'B','b','S'], 

        # General commands for the Gemini controller
        'get_date': ['GC', '20/10/19'],
        'set_date': ['SC'],
        'get_time': ['GL', '20:20:36'],
        'set_time': ['SL'],
    }
    # Commands handed to the DC_FilterSelector component
    GEN2NAT_CMDS_FILTER = {
        # GET/SET commands
        # get_radec and set_radec are already defined in parent abstract class
        'get_timezone': ['GG', "+00"],
        'set_timezone': ['SG', '1'],

        # DO commands
        # defined in abstract class:
        #'do_init': [],
        #'do_init': ['do_init'],
        # NOTE(review): the DO commands below reuse 'GL' / 'GC' (the native
        # commands of get_time / get_date) - they look like placeholders, confirm
        'do_init': ['GL'],
        'do_goto': ['GC'],
        'do_park': ['GC'],
    }
    # Commands handed to the DC_DetectorShutter component
    # NOTE(review): every entry maps to 'GC' (native command of get_date) -
    # presumably placeholders, confirm
    GEN2NAT_CMDS_SHUTTER = {
        'get_state': ['GC'],
        'do_open': ['GC'],
        'do_close': ['GC'],
        'do_sync': ['GC'],
    }
    # Commands handed to the DC_DetectorSensor component
    GEN2NAT_CMDS_SENSOR = {
        # General commands (for test only because these commands should be in the GENERAL level above)
        'get_date': ['GC', '20/10/19'],
        'set_date': ['SC'],
        #'get_time': ['GL', '20:20:36'],
        #'set_time': ['SL'],

        # NOTE(review): every DO command maps to 'GC' - presumably placeholders, confirm
        'do_init': ['GC'],
        'do_start_acq': ['GC'],
        'do_stop_acq': ['GC'],
        'do_abort': ['GC'],
        'do_shutdown': ['GC'],
    }
    # Full table given to the parent DeviceController: the general commands at
    # the top level, plus one nested table per DC component, keyed by 'DC_...'
    GEN2NAT_CMDS = {
        # My GENERAL commands
        **GEN2NAT_CMDS_GENERAL,

        # SPECIFIC commands for my DCCs
        'DC_Filter': GEN2NAT_CMDS_FILTER,
        'DC_Shutter': GEN2NAT_CMDS_SHUTTER,
        'DC_Sensor': GEN2NAT_CMDS_SENSOR,
    }


    def __init__(self, device_host:str="localhost", device_port:int=11110):
        """Set up the controller and register its three DC components.

        The parent DeviceController is initialised with a "SOCKET-UDP" channel,
        this class's Protocol, the full GEN2NAT_CMDS table and DS_SBIG as the
        device simulator.

        :param device_host: host of the device (or of its simulator)
        :param device_port: port of the device (or of its simulator)
        """
        super().__init__(
            device_host,
            device_port,
            "SOCKET-UDP",
            MY_DEVICE_CHANNEL_BUFFER_SIZE,
            protoc=self.Protocol,
            gen2nat_cmds=self.GEN2NAT_CMDS,
            device_sim=DS_SBIG,
        )

        # Debug banner: the same line of stars, six times
        banner = '*****************************'
        for _ in range(6):
            printd(banner)

        # Each DC component gets the SAME parameters as I use:
        #   - device_host, device_port,
        #   - self._my_channel (set up by the superclass) and the buffer size,
        #   - self.Protocol,
        #   - only the subset of my commands that relates to that component,
        #   - NO SIMULATOR (device_sim=None): the components use mine, which
        #     has already been launched.
        component_specs = (
            (DC_FilterSelector, self.GEN2NAT_CMDS_FILTER),
            (DC_DetectorSensor, self.GEN2NAT_CMDS_SENSOR),
            (DC_DetectorShutter, self.GEN2NAT_CMDS_SHUTTER),
        )
        components = [
            component_cls(
                device_host,
                device_port,
                self._my_channel,
                MY_DEVICE_CHANNEL_BUFFER_SIZE,
                protoc=self.Protocol,
                gen2nat_cmds=component_cmds,
                device_sim=None,
            )
            for component_cls, component_cmds in component_specs
        ]

        # @override superclass empty list
        self.set_dc_components(components)


    # @override
    def set_speed(self, speed_rate):
        """Translate a generic speed-rate name into its native command and execute it.

        :param speed_rate: one of "slew", "average", "center" or "guide"
        :return: whatever execute_unformated_native_cmd() returns for the
            selected native command
        :raises UnknownGenericCmdArgException: if speed_rate is none of the
            names above
        """
        rate_to_native = (
            ("slew", 'RS'),     # quick
            ("average", 'RM'),  # average
            ("center", 'RC'),   # slow
            ("guide", 'RG'),    # very slow
        )
        native_cmd = None
        # Every name is compared in turn (no early exit)
        for rate_name, candidate_cmd in rate_to_native:
            if speed_rate == rate_name:
                native_cmd = candidate_cmd
        if native_cmd is None:
            raise UnknownGenericCmdArgException(__name__, speed_rate)
        return self.execute_unformated_native_cmd(native_cmd)


if __name__ == "__main__":
    # When run as a script: execute the doctests found in this module's
    # docstrings, then terminate.
    import doctest
    doctest.testmod()
    # sys.exit() instead of exit(): the latter is injected by the `site`
    # module for interactive use and is not guaranteed to exist in programs
    # (e.g. when Python is started with -S).
    sys.exit()