#!/usr/bin/env python3

"""Socket Client GEMINI Telescope implementation

To be used as a concrete class to control a GEMINI telescope.

Running this module as a script executes its doctests and exits.
Pass ``--expert`` on the command line to start the interactive
"expert mode" loop instead (raw requests typed at the keyboard).
"""

# Standard library imports
#import socket
#import logging
import sys
import time

# Third party imports
# None

# Local application imports
sys.path.append('../..')
#from src.client.socket_client_telescope_abstract import Position, UnknownCommandException, TimeoutException, SocketClientTelescopeAbstract
from src_socket.client.socket_client_telescope_abstract import *

# Default timeouts
TIMEOUT_SEND = 10
TIMEOUT_RECEIVE = 10

# COMMON CONSTANTS WITH SERVER
TERMINATOR = '\x00'
COMMAND5 = '050000'
#COMMAND6_SIMPLE = '\x00\x06'
COMMAND6 = '\x00\x06\x00'
COMMAND6_SIMPLE = '6'

# Commands that are sent as is: no ':' prefix and no '#' suffix
_RAW_COMMANDS = (COMMAND6, COMMAND5)
# Startup-mode commands: they end with '#' but take no ':' prefix
# (bC# = Cold Start, bW# = selecting Warm Start, bR# = selecting Warm Restart)
_STARTUP_COMMANDS = ('bC#', 'bW#', 'bR#')


class SocketClientTelescopeGEMINI(SocketClientTelescopeAbstract):
    """Concrete telescope socket client for a GEMINI mount (UDP transport).

    Every request sent to the server is prefixed with an 8-character stamp
    (a zero-padded request number followed by a zero filler) and ended with
    TERMINATOR; the stamp of the last request is kept in ``last_stamp`` so
    that the matching answer can be stripped of it.
    """

    # STAMP :
    # 00 00 00 00 00 00 00' (8 bytes from '00' to 'FF', 1st one is LOW BYTE)
    # ex: 01 + STAMP = 01 00 00 00 00 00 00 00' (8 bytes from '00' to 'FF', 1st one is LOW BYTE)
    #STAMP_FILLER = '00000000000000'
    #MY_FULL_STAMP = MYSTAMP + STAMP_FILLER

    # Initialize request number (will be increased at each request)
    request_num = 0
    # Request number stands on 4 digits (from 0001 to 9999)
    request_num_nb_digits = 4
    # Highest request number that still fits on request_num_nb_digits digits;
    # the counter wraps back to 1 after it so the stamp keeps a fixed width
    request_num_max = 10 ** request_num_nb_digits - 1
    # For 4 digits, we should get the format '{:04d}':
    request_num_format = '{:0' + str(request_num_nb_digits) + 'd}'
    # Stamp filler with 0
    STAMP_FILLER = '0' * (8 - request_num_nb_digits)

    #data = " ".join(sys.argv[1:])
    #data_to_send = bytes(data + "\n", "utf-8")

    # Commands dictionary
    # Layout as originally documented here:
    # 'CMD-NAME': ['cmd-get', 'awaited_res_ok_for_get', 'cmd-set', 'awaited_res_ok_for_set', 'comment'],
    # NOTE(review): the entries below look like [native command, possible answers...];
    # how they are interpreted is up to the parent class - confirm there.
    # @override
    _cmd = {
        # GET @ SET commands
        # B# while the initial startup message is being displayed (new in L4),
        # b# while waiting for the selection of the Startup Mode,
        # S# during a Cold Start (new in L4),
        # G# after completed startup.
        'get_ack': [COMMAND6, 'G', 'B', 'b', 'S'],
        'get_ra': ['GR'],
        'set_ra': ['Sr'],
        'get_dec': ['GD'],
        'get_timezone': ['GG'],
        'set_timezone': ['SG', '1'],

        # DO commands
        'do_move': [
            ':MS#',
            '0',
            '1Object below horizon.#',
            '2No object selected.#',
            '3Manual Control.#',
            '4Position unreachable.#',
            '5Not aligned.#',
            '6Outside Limits.#',
        ],
        'do_stop': [':Q#'],
        'do_init': ['do_init'],
    }

    # Generic speed name -> native GEMINI rate command (used by set_SPEED)
    _SPEED_COMMANDS = {
        "slew": 'RS',       # quick
        "average": 'RM',    # average
        "center": 'RC',     # slow
        "guide": 'RG',      # very slow
    }

    # ----------------------------------------------------------------------
    # Legacy command tables (DISABLED - kept for reference only)
    # ----------------------------------------------------------------------
    # # @override
    # _cmd_getset = {
    #     'ack': [COMMAND6, 'G'],
    #
    #     # RA-DEC (p109-110)
    #     #:Sr<hh>:<mm>.<m># or :Sr<hh>:<mm>:<ss>#
    #     #Sets the object's Right Ascension and the object status to "Not Selected".
    #     #The :Sd# command has to follow to complete the selection.
    #     #The subsequent use of the :ON...# command is recommended
    #     #:Sd{+-}<dd>{*°}<mm># or :Sd{+- }<dd>{*°:}<mm>:<ss>#
    #     #Sets the object's declination. It is important that the :Sr# command has been sent prior.
    #     #Internal calculations are done that may take up to 0.5 seconds.
    #     #If the coordinate selection is valid the object status is set to "Selected"
    #     'ra': ['GR', None, 'Sr', None, 'commentaire'],
    #     'dec': ['GD', None, 'Sd'],
    #     #'RADEC': [('GR','GD'), ''],
    #
    #     # ALT-AZ (p?)
    #     "ALT": ['GA'],
    #     "AZ": ['GZ'],
    #     #"ALT-AZ": [('GA','GZ'), ''],
    #
    #     # LONG-LAT
    #     "LONGITUDE": ['Gg', None, 'Sg'],
    #     "LATITUDE": ['Gt', None, 'St'],
    #     #"LONGLAT": [('Gg','Gt'), ''],
    #
    #     "hour-angle": ['GH'],
    #     "max-vel": ['Gv'],
    #     'timezone': ['GG', None, 'SG', '1'],
    #     'DATE': ['GC', None, 'SC'],
    #     'TIME': ['GL', None, 'SL'],
    #     'VELOCITY': ['Gv'],
    # }
    #
    # Commands dictionary
    # 'CMD-NAME': ['cmd-do', 'comment'],
    #
    # # @override
    # _cmd_do = {
    #     #'INIT': [],
    #     'PARK': ['hP'],
    #     'WARM_START': ['bW'],
    #     'PREC_REFR': ['p3'],
    #     'MOVE': ['MS'],
    #     'MOVE_NORTH': ['Mn'],
    #     'MOVE_SOUTH': ['Ms'],
    #     'MOVE_WEST': ['Mw'],
    #     'MOVE_EAST': ['Me'],
    #     'STOP': ['Q'],
    # }
    # ----------------------------------------------------------------------

    # @override
    # GEMINI is using UDP
    def __init__(self, server_host: str = "localhost", server_port: int = 11110, DEBUG=False):
        """Create a client bound to the given GEMINI server.

        :param server_host: host name or address of the telescope server
        :param server_port: port of the telescope server
        :param DEBUG: forwarded unchanged to the parent class
        """
        # "UDP" selects the transport; 1024 is forwarded as is to the parent
        # (presumably the receive buffer size - TODO confirm in the abstract class)
        super().__init__(server_host, server_port, "UDP", 1024, DEBUG)

    # @override
    def formated_cmd(self, cmd: str, values_to_set: str = None) -> str:
        """Turn a bare native command into its on-the-wire GEMINI form.

        The values to set (if any) are appended to the command first. Then,
        unless the command is COMMAND6 or COMMAND5 (sent as is), a '#' is
        appended and, unless the result is one of the startup commands
        ('bC#', 'bW#', 'bR#'), a ':' is prepended.

        :param cmd: native command without ':' prefix nor '#' suffix (ex: 'GR')
        :param values_to_set: string (or iterable of strings) appended to cmd
        :return: the formatted command (ex: ':GR#')
        """
        if values_to_set is not None:
            cmd += ''.join(values_to_set)
        if cmd not in _RAW_COMMANDS:
            cmd += '#'
            if cmd not in _STARTUP_COMMANDS:
                cmd = ':' + cmd
        return cmd

    def run_func(self, func, arg):
        """Call the method of this instance named ``func`` with ``arg``.

        :param func: name of the method to call
        :param arg: single argument passed to that method
        :return: whatever the called method returns
        :raises AttributeError: if this instance has no attribute named ``func``
        """
        return getattr(self, func)(arg)

    # @override
    def _encapsulate_data_to_send(self, command: str):
        r'''
        Encapsulate useful data to be ready for sending

        The command is validated, then prefixed with a fresh stamp and
        ended with TERMINATOR. The stamp is remembered in ``self.last_stamp``.

        3 types of commands:
        - TYPE1: '06' or '050000'
        - TYPE2: ':cde#' (mainly ':??#' - i.e : ':GD#', ':GR#', ...)
        - TYPE3: 'b?#' (bC# = Cold Start, bW# = selecting Warm Start, bR# = selecting Warm Restart)

        The request number wraps from 9999 back to 0001 so that the stamp
        always stays 8 characters long.

        :param command: fully formatted command (or '6' as a shortcut for COMMAND6)
        :return: stamp + command + TERMINATOR
        :raises UnknownCommandException: if the command is neither TYPE1, TYPE2 nor TYPE3

        :Examples:
        >>> tele = SocketClientTelescopeGEMINI("localhost", 11110, DEBUG=False)
        >>> tele._encapsulate_data_to_send(':GD#')
        '00010000:GD#\x00'
        >>> tele._encapsulate_data_to_send(':GR#')
        '00020000:GR#\x00'
        >>> tele.close()

        # does not work => '00010000:GD#\x00'
        '''
        ####return bytes(self.MYSTAMP + self.STAMP_FILLER + data + "\n", "utf-8")

        # TYPE1
        #print("command to encapsulate is", repr(command))
        if command == COMMAND6_SIMPLE:
            command = COMMAND6

        if command not in _RAW_COMMANDS:
            # TYPE2 or 3: at least 3 characters, ending with '#', and either
            # starting with ':' or being one of the startup commands
            well_formed = (
                len(command) >= 3
                and command[-1] == '#'
                and (command[0] == ':' or command in _STARTUP_COMMANDS)
            )
            if not well_formed:
                raise UnknownCommandException(command)

        # Next request number, wrapping after request_num_max so that it
        # never needs more than request_num_nb_digits digits
        self.request_num = self.request_num % self.request_num_max + 1
        # Format to request_num_nb_digits (4) digits
        request_num_str = self.request_num_format.format(self.request_num)
        self.last_stamp = request_num_str + self.STAMP_FILLER

        return self.last_stamp + command + TERMINATOR
        #return bytes(data + "\n", "utf-8")
        ####return bytes(self.MYSTAMP + self.STAMP_FILLER + data + "\n", "utf-8")

    # @override
    def _uncap_received_data(self, data_received_bytes: bytes) -> str:
        r"""
        Extract useful data from received raw data

        The raw answer is decoded, everything up to and including the stamp
        of the last request (``self.last_stamp``) is dropped, then the last
        character (terminator) and a trailing '#', if any, are removed.

        :param data_received_bytes: raw bytes received from the server
        :return: the useful part of the answer (may be an empty string)
        :raises IndexError: if the answer does not contain ``self.last_stamp``
        :raises UnicodeDecodeError: if the bytes cannot be decoded

        >>> tele = SocketClientTelescopeGEMINI("localhost", 11110, DEBUG=False)
        >>> tele.last_stamp = '00170000'
        >>> tele._uncap_received_data(b'001700001\x00')
        '1'
        >>> tele.close()
        """
        #print("data_received_bytes type is", type(data_received_bytes))
        #print("data received is", data_received_bytes)

        # TODO: solve this problem more cleanly (use unicode instead of the utf-8 codec ?)
        # BUGFIX: b'\xdf' (should correspond to the degree symbol) generates a
        # UnicodeDecodeError, so replace it by ':' (b'\x3A') before decoding
        data_received = data_received_bytes.replace(b'\xdf', b'\x3A').decode()
        #print("data_received is", data_received)

        # Remove STAMP (and the terminator at the end).
        # maxsplit=1: only the first occurrence is the stamp, a payload that
        # happens to contain the same digits must not be cut.
        #useful_data = data_received.split(self.MY_FULL_STAMP)[1][:-1]
        useful_data = data_received.split(self.last_stamp, 1)[1][:-1]

        # Remove '#' at the end, if exists (endswith is safe on an empty payload)
        if useful_data.endswith('#'):
            useful_data = useful_data[:-1]
        return useful_data

    # @overwrite
    #def get_ack(self): return self.execute_native_cmd(COMMAND6, 'G')

    #def do_warm_start(self): return self.execute_native_cmd('bW')

    def set_SPEED(self, speed_rate):
        """Select the mount speed from a generic speed name.

        :param speed_rate: one of "slew" (quick), "average", "center" (slow)
            or "guide" (very slow)
        :return: the result of ``execute_unformated_native_cmd`` for the
            matching native command ('RS', 'RM', 'RC' or 'RG')
        :raises UnknownCommandException: if speed_rate is not a known speed name
        """
        print("from child")
        # Only strings can match a speed name (this also keeps unhashable
        # values away from the dictionary lookup)
        native_cmd = self._SPEED_COMMANDS.get(speed_rate) if isinstance(speed_rate, str) else None
        if not native_cmd:
            raise UnknownCommandException(speed_rate)
        return self.execute_unformated_native_cmd(native_cmd)

    # Disabled legacy methods (kept for reference only):
    # def do_MOVE_NORTH(self): return self.execute_native_cmd_OLD(':Mn#')
    # def do_MOVE_SOUTH(self): return self.execute_native_cmd_OLD(':Ms#')
    # def do_MOVE_WEST(self): return self.execute_native_cmd_OLD(':Mw#')
    # def do_MOVE_EAST(self): return self.execute_native_cmd_OLD(':Me#')

    # TELESCOPE COMMANDS implementation
    # The 3 main generic commands : get(), set(), do()
    # 1) GET methods
    # 2) SET methods
    # 3) DO methods
    # SPECIFIC methods


def _run_expert_mode(server_host="localhost", server_port=11110):
    """Interactive loop: send each request typed by the user to the server.

    The loop stops as soon as an empty request is entered.
    """
    # Classic usage:
    #tele_client = SocketClient_GEMINI(HOST, PORT)
    # More elegant usage, using "with":
    with SocketClientTelescopeGEMINI(server_host, server_port, DEBUG=False) as tele_client:

        # 0) CONNECT to server (only for TCP, does nothing for UDP)
        tele_client._connect_to_server()

        #tele_client.config()

        # Send some commands to the Telescope
        #pos = tele_client.get_position()

        # Do EXPERT mode execution
        while True:

            # 1) SEND REQUEST data to server
            # read the request from the keyboard and strip surrounding whitespace
            data = input("(EXPERT MODE) REQUEST TO SERVER [ex: ':GD#' (Get Dec), ':GR#' (Get RA)']: ").strip()
            # stop condition
            if not data:
                break
            #data_to_send = bytes(data + "\n", "utf-8")
            tele_client.send_data(data)
            #mysock.sendto("%s" % data, (HOST, PORT))
            #print("Sent: {}".format(data))

            # 2) RECEIVE REPLY data from server
            tele_client.receive_data()
            #reponse, adr = mysock.recvfrom(buf)
            #print("Received: {}".format(data_received))
            #print("Useful data received: {}".format(data_useful))
            print('\n')

        #tele_client.close()


if __name__ == "__main__":

    if "--expert" in sys.argv[1:]:
        # Opt-in interactive session
        _run_expert_mode()
    else:
        # Default behaviour: run the doctests of this module
        import doctest
        doctest.testmod()

    # sys.exit() rather than the site-provided exit(), which is not
    # guaranteed to exist (e.g. when running with "python -S")
    sys.exit()