# socket_client_telescope_gemini.py 10.5 KB
#!/usr/bin/env python3

"""Socket Client GEMINI Telescope implementation

To be used as a concrete class to system control a GEMINI telescope
"""


# Standard library imports
#import socket
#import logging
import sys
import time

# Third party imports
# None

# Local application imports
sys.path.append('../..')
#from src.client.socket_client_telescope_abstract import Position, UnknownCommandException, TimeoutException, SocketClientTelescopeAbstract
from src_socket.client.socket_client_telescope_abstract import *


# Default timeouts
TIMEOUT_SEND = 10
TIMEOUT_RECEIVE = 10

# COMMON CONSTANTS WITH SERVER
TERMINATOR = '\x00'
COMMAND5 = '050000'
#COMMAND6_SIMPLE = '\x00\x06'
COMMAND6 = '\x00\x06\x00'
COMMAND6_SIMPLE = '6'




class SocketClientTelescopeGEMINI(SocketClientTelescopeAbstract):
    
    # STAMP : 
    # 00 00 00 00 00 00 00' (8 bytes from '00' to 'FF', 1st one is LOW BYTE)
    # ex: 01 + STAMP = 01 00 00 00 00 00 00 00' (8 bytes from '00' to 'FF', 1st one is LOW BYTE)
    #STAMP_FILLER = '00000000000000' 
    #MY_FULL_STAMP = MYSTAMP + STAMP_FILLER

    # Request counter, advanced by _encapsulate_data_to_send() for each request.
    # It is declared on the class; the first `self.request_num += 1` then
    # creates a per-instance counter that starts from this value.
    request_num = 0
    # Number of digits used to write the request number in the stamp
    # (4 digits, i.e. from 0001 to 9999)
    request_num_nb_digits = 4
    # Zero-padded format built from the number of digits above;
    # for 4 digits this gives '{:04d}'
    request_num_format = '{:0'+str(request_num_nb_digits)+'d}'
    # Zeros appended after the request number so that the whole stamp is
    # 8 characters long
    STAMP_FILLER = '0' * (8 - request_num_nb_digits)
    
    #data = " ".join(sys.argv[1:])
    #data_to_send = bytes(data + "\n", "utf-8")


    ''' Commands dictionary
    'CMD-NAME': ['cmd-get', 'awaited_res_ok_for_get', 'cmd-set', 'awaited_res_ok_for_set', 'comment'],
    '''
    
    # @override
    # Maps a generic command name to a list whose first item is the native
    # GEMINI command.
    # NOTE(review): the remaining items look like the answers the telescope
    # may send back for that command - confirm against the abstract class.
    _cmd = {
        # GET & SET commands

        # Answers to the ACK command: B# while the initial startup message is
        # being displayed (new in L4), b# while waiting for the selection of
        # the Startup Mode, S# during a Cold Start (new in L4), G# after
        # completed startup.
        'get_ack': [COMMAND6, 'G', 'B','b','S'],

        'get_ra': ['GR'],
        'set_ra': ['Sr'],
        
        'get_dec': ['GD'],
        
        'get_timezone': ['GG'],
        'set_timezone': ['SG', '1'],

        
        # DO commands
        # NOTE(review): unlike the get/set entries above, 'do_move' and
        # 'do_stop' are already written with their ':' prefix and '#' suffix;
        # verify they are not passed through formated_cmd() a second time.
        'do_move': [':MS#', '0', '1Object below horizon.#', '2No object selected.#', '3Manual Control.#', '4Position unreachable.#', '5Not aligned.#', '6Outside Limits.#' ],    
        'do_stop': [':Q#'],
        # NOTE(review): 'do_init' maps to itself, presumably a generic (non
        # native) command handled elsewhere - TODO confirm.
        'do_init': ['do_init'],
    }
    
    """
    # @override
    _cmd_getset = {
        
        'ack': [COMMAND6, 'G'],
                
        # RA-DEC (p109-110) 
        #:Sr<hh>:<mm>.<m># or :Sr<hh>:<mm>:<ss>#
        #Sets the object's Right Ascension and the object status to "Not Selected". The :Sd# command has to follow to complete the selection. The subsequent use of the :ON...# command is recommended
        #:Sd{+-}<dd>{*°}<mm># or :Sd{+- }<dd>{*°:}<mm>:<ss>
        #Sets the object's declination. It is important that the :Sr# command has been send prior. Internal calculations are done that may take up to 0.5 seconds. If the coordinate selection is valid the object status is set to "Selected"
        'ra': ['GR', None, 'Sr', None, 'commentaire'],
        'dec': ['GD', None, 'Sd'],
        #'RADEC': [('GR','GD'), ''],

        # ALT-AZ (p?)
        "ALT": ['GA'],
        "AZ": ['GZ'],
        #"ALT-AZ": [('GA','GZ'), ''],

        # LONG-LAT
        "LONGITUDE": ['Gg', None, 'Sg'],
        "LATITUDE": ['Gt', None, 'St'],
        #"LONGLAT": [('Gg','Gt'), ''],
        
        "hour-angle": ['GH'],

        "max-vel": ['Gv'],
        
        'timezone': ['GG', None, 'SG', '1'],
        'DATE': ['GC', None, 'SC'],
        'TIME': ['GL', None, 'SL'],

        'VELOCITY': ['Gv'],


        
    }
    ''' Commands dictionary
    'CMD-NAME': ['cmd-do', 'comment'],
    '''
    # @override
    _cmd_do = {
        #'INIT': [],
        'PARK': ['hP'],
        'WARM_START': ['bW'],
        'PREC_REFR': ['p3'],
        'MOVE': ['MS'],
        'MOVE_NORTH': ['Mn'],
        'MOVE_SOUTH': ['Ms'],
        'MOVE_WEST': ['Mw'],
        'MOVE_EAST': ['Me'],
        'STOP': ['Q'],
    }
    """


    # @override
    # GEMINI is using UDP
    def __init__(self, server_host:str="localhost", server_port:int=11110, DEBUG=False):
        """Create a client for a GEMINI telescope server.

        The connection settings are forwarded to the parent constructor
        together with the fixed values "UDP" and 1024.

        :param server_host: host name or address, handed to the parent class
        :param server_port: port number, handed to the parent class
        :param DEBUG: debug flag, handed unchanged to the parent class
        """
        # GEMINI is using UDP.
        # NOTE(review): 1024 is presumably the receive buffer size - confirm
        # against the abstract class constructor.
        parent_args = (server_host, server_port, "UDP", 1024, DEBUG)
        super().__init__(*parent_args)

        
    # @override
    def formated_cmd(self, cmd:str, values_to_set:str=None)->str:
        """Return `cmd` written in the native GEMINI syntax.

        The values to set (if any) are first appended to the command. Then:
        - COMMAND6 and COMMAND5 are returned as is,
        - 'bC', 'bW' and 'bR' only get the '#' suffix ('bW' gives 'bW#'),
        - any other command gets the ':' prefix and the '#' suffix
          ('GD' gives ':GD#', 'SG' with the value '+01' gives ':SG+01#').

        :param cmd: bare native command, without ':' prefix nor '#' suffix
        :param values_to_set: a string, or any iterable of strings, appended
            to the command in order; None when there is nothing to append
        :return: the formatted command
        """
        if values_to_set is not None:
            # join() handles a plain string as well as a list of strings
            cmd += ''.join(values_to_set)

        # These two commands must be sent without any decoration
        if cmd in (COMMAND6, COMMAND5):
            return cmd

        cmd += '#'
        # The 'b?#' commands are the only '#'-terminated ones without ':' prefix
        if cmd in ('bC#', 'bW#', 'bR#'):
            return cmd
        return ':' + cmd

    def run_func(self, func, arg):
        """Call the attribute of this object named `func` with `arg`.

        :param func: name of the attribute (normally a method) to look up
        :param arg: the single positional argument passed to it
        :return: whatever the called attribute returns
        """
        bound_method = getattr(self, func)
        return bound_method(arg)

        
    # @override
    def _encapsulate_data_to_send(self, command:str):
        r''' Encapsulate useful data to be ready for sending

        The command is checked, then surrounded by a new stamp and the
        terminator: <stamp (8 characters)> + <command> + TERMINATOR

        3 types of commands are accepted:
        - TYPE1: COMMAND6 (or its shortcut COMMAND6_SIMPLE) or COMMAND5
        - TYPE2: ':cde#' (mainly ':??#' - i.e : ':GD#', ':GR#', ...)
        - TYPE3: 'b?#' (bC# = Cold Start, bW# = selecting Warm Start, bR# = selecting Warm Restart)

        The stamp is the request number written on `request_num_nb_digits`
        digits followed by STAMP_FILLER. It is stored in `self.last_stamp` so
        that it can be recognized in the reply. The request number goes back
        to 1 after its highest value (9999 for 4 digits), so that the stamp
        always keeps the same length.

        :param command: native command, already formatted
        :return: the string to send
        :raises UnknownCommandException: if the command is of none of the 3 types

        :Examples:
        >>> tele = SocketClientTelescopeGEMINI("localhost", 11110, DEBUG=False)
        >>> tele._encapsulate_data_to_send(':GD#')
        '00010000:GD#\x00'
        >>> tele._encapsulate_data_to_send(':GR#')
        '00020000:GR#\x00'
        >>> tele.close()
        '''

        # TYPE1: the shortcut stands for the real command
        if command == COMMAND6_SIMPLE:
            command = COMMAND6

        if command not in (COMMAND6, COMMAND5):
            # TYPE2 or TYPE3: at least 3 characters and a final '#' ...
            if len(command) < 3 or not command.endswith('#'):
                raise UnknownCommandException(command)
            # ... and either a ':' prefix (TYPE2) or one of the 'b?#' commands (TYPE3)
            if not command.startswith(':') and command not in ('bC#', 'bW#', 'bR#'):
                raise UnknownCommandException(command)

        # Next request number, wrapping from the highest value back to 1:
        # one more digit would make the stamp longer than expected
        highest_request_num = 10 ** self.request_num_nb_digits - 1
        self.request_num = self.request_num % highest_request_num + 1
        # Format to request_num_nb_digits (4) digits
        request_num_str = self.request_num_format.format(self.request_num)
        self.last_stamp = request_num_str + self.STAMP_FILLER

        return self.last_stamp + command + TERMINATOR

    # @override
    def _uncap_received_data(self, data_received_bytes:bytes)->str:
        r"""
            Extract useful data from received raw data 
        
        >>> tele = SocketClientTelescopeGEMINI("localhost", 11110, DEBUG=False)
        >>> tele.last_stamp = '00170000'
        >>> tele._uncap_received_data(b'001700001\x00')
        '1'
        >>> tele.close()
        """

        #print("data_received_bytes type is", type(data_received_bytes))
        #print("data received is", data_received_bytes)
        
        #TODO: resoudre ce pb plus proprement (utiliser unicode au lieu de utf-8 codec ???
        # BUGFIX: b'\xdf' (should correspond to "°" symbol) generates a UnicodeDecodeError,
        # so, replace it by ':' (b'\x3A')
        if b'\xdf' in data_received_bytes:
            data_received_bytes = data_received_bytes.replace(b'\xdf', b'\x3A')
        
        data_received = data_received_bytes.decode()
        #print("data_received is", data_received)
        # Remove STAMP (and \n at the end):
        #useful_data = data_received.split(self.MY_FULL_STAMP)[1][:-1]
        useful_data = data_received.split(self.last_stamp)[1][:-1]
        # Remove '#' at the end, if exists
        if useful_data[-1] == '#': useful_data = useful_data[:-1]
        return useful_data


    # @overwrite
    #def get_ack(self): return self.execute_native_cmd(COMMAND6, 'G')
    #def do_warm_start(self): return self.execute_native_cmd('bW')

    def set_SPEED(self, speed_rate):
        print("from child")
        native_cmd = None
        # quick
        if speed_rate == "slew": native_cmd = 'RS'
        # average
        if speed_rate == "average": native_cmd = 'RM'
        # slow
        if speed_rate == "center": native_cmd = 'RC'
        # very slow
        if speed_rate == "guide": native_cmd = 'RG'
        if not native_cmd: raise UnknownCommandException(speed_rate)
        return self.execute_unformated_native_cmd(native_cmd)

    '''
    def do_MOVE_NORTH(self): return self.execute_native_cmd_OLD(':Mn#')
    def do_MOVE_SOUTH(self): return self.execute_native_cmd_OLD(':Ms#')
    def do_MOVE_WEST(self): return self.execute_native_cmd_OLD(':Mw#')
    def do_MOVE_EAST(self): return self.execute_native_cmd_OLD(':Me#')
    '''

    '''
    TELESCOPE COMMANDS implementation
    '''
       
    ''' 
    The 3 main generic commands : get(), set(), do()
    '''
    ''' 1) GET methods '''
    ''' 2) SET methods '''
    ''' 3) DO methods '''
    ''' SPECIFIC methods '''


if __name__ == "__main__":
    
    import doctest
    doctest.testmod()
    exit()
    
    # Classic usage:
    #tele_client = SocketClient_GEMINI(HOST, PORT)
    # More elegant usage, using "with":
    with SocketClientTelescopeGEMINI("localhost", 11110, DEBUG=False) as tele_client:
        
        # 0) CONNECT to server (only for TCP, does nothing for UDP)
        tele_client._connect_to_server()
        
        #tele_client.config()
        
        # Send some commands to the Telescope
        #pos = tele_client.get_position()
        
        # Do EXPERT mode execution
        while True:
            
            # 1) SEND REQUEST data to server
            # saisie de la requête au clavier et suppression des espaces des 2 côtés
            data = input("(EXPERT MODE) REQUEST TO SERVER [ex: ':GD#' (Get Dec), ':GR#' (Get RA)']: ").strip()
            # test d'arrêt
            if data == "": break
            #data_to_send = bytes(data + "\n", "utf-8")
            tele_client.send_data(data)
            #mysock.sendto("%s" % data, (HOST, PORT))
            #print("Sent: {}".format(data))
    
            # 2) RECEIVE REPLY data from server
            data_received = tele_client.receive_data()
            #reponse, adr = mysock.recvfrom(buf)
            #print("Received: {}".format(data_received))
            #print("Useful data received: {}".format(data_useful))
            print('\n')

        #tele_client.close()