#!/usr/bin/env python3

"""Socket Client Gemini Telescope implementation

To be used as a concrete class to system control a Gemini telescope
"""

# Standard library imports
import os
import sys
import time

# Third party imports
# None

# Local application imports
sys.path.append('../../..')
# My parent class and exceptions
from device_controller.abstract_component.device_controller import (
    printd,
    DeviceController, Gen2NatCmds, Cmd, Cmd2,
    UnknownNativeCmdException, UnknownGenericCmdArgException
)
# My device controller component(s)
from device_controller.abstract_component.mount import DC_Mount
# My simulator
from device_controller.concrete_component.gemini.gemini_simulator import DS_Gemini


# Buffer size handed to the device channel (superclass and DC components).
# Timeouts and protocol constants live in DC_Gemini.Protocol.
MY_DEVICE_CHANNEL_BUFFER_SIZE = 1024


class DC_MountBis(DC_Mount):
    """Second mount component; adds nothing to DC_Mount."""
    pass


class DC_Gemini(DeviceController):
    """Device controller for a Gemini telescope, using a "SOCKET-UDP" channel.

    Holds the Gemini communication protocol (nested ``Protocol`` class) and
    the generic-to-native command tables, in two parallel flavours:
    ``*_dict`` (plain dicts/lists) and ``*_obj`` (``Gen2NatCmds``/``Cmd``
    objects). ``GEN2NAT_CMDS`` and ``GEN2NAT_CMDS_MOUNT`` point at the
    ``*_obj`` flavour.
    """

    # Gemini communication protocol
    # @override
    class Protocol:
        """Framing rules for Gemini requests and answers.

        A request is ``<stamp><command><TERMINATOR>`` where the stamp is the
        zero-padded request number followed by ``STAMP_FILLER``.
        All state (request number, last stamp) is kept at class level.
        """

        # Default timeouts
        TIMEOUT_SEND = 10
        TIMEOUT_RECEIVE = 10

        # COMMON CONSTANTS WITH SERVER
        TERMINATOR = '\x00'
        COMMAND5 = '050000'
        COMMAND6 = '\x00\x06\x00'
        COMMAND6_SIMPLE = '6'

        # STAMP :
        # 00 00 00 00 00 00 00' (8 bytes from '00' to 'FF', 1st one is LOW BYTE)
        # ex: 01 + STAMP = 01 00 00 00 00 00 00 00' (8 bytes from '00' to 'FF', 1st one is LOW BYTE)

        # Initialize request number (will be increased at each request)
        request_num = 0
        # Request number stands on 4 digits (from 0001 to 9999)
        request_num_nb_digits = 4
        # For 4 digits, we should get the format '{:04d}':
        request_num_format = '{:0' + str(request_num_nb_digits) + 'd}'
        # Stamp filler with 0
        STAMP_LENGTH = 8
        STAMP_FILLER = '0' * (STAMP_LENGTH - request_num_nb_digits)
        # Stamp of the most recent request built by encap(); None until then,
        # so that uncap() can be called (and can log it) before any encap().
        last_stamp = None

        # @override
        @classmethod
        def formated_cmd(cls, cmd:str, values_to_set:str=None)->str:
            """Turn a bare native command into its on-the-wire form.

            :param cmd: native command name (e.g. 'GD', 'bW') or one of the
                special commands COMMAND5 / COMMAND6
            :param values_to_set: optional iterable of strings appended to
                the command, in order
            :return: COMMAND5 / COMMAND6 unchanged; otherwise the command
                with a trailing '#', and a leading ':' unless the result is
                one of 'bC#', 'bW#', 'bR#'
            """
            if values_to_set is not None:
                cmd += ''.join(values_to_set)
            # NOTE(review): the ':' test is assumed to sit inside this branch;
            # the collapsed original does not show the nesting.
            if cmd not in (cls.COMMAND6, cls.COMMAND5):
                cmd += '#'
                if cmd not in ('bC#', 'bW#', 'bR#'):
                    cmd = ':' + cmd
            return cmd

        # @override
        @classmethod
        def encap(cls, command:str):
            r'''
            Encapsulate useful data to be ready for sending

            The command is validated, a new stamp is generated, and the
            result is stamp + command + TERMINATOR.

            3 types of commands:
            - TYPE1: '06' or '050000'
            - TYPE2: ':cde#' (mainly ':??#' - i.e : ':GD#', ':GR#', ...)
            - TYPE3: 'b?#' (bC# = Cold Start, bW# = selecting Warm Start, bR# = selecting Warm Restart)

            :param command: formatted native command ('6' is accepted as a
                shorthand for COMMAND6)
            :return: tuple (stamp, encapsulated data)
            :raises UnknownNativeCmdException: if the command is neither
                TYPE1, TYPE2 nor TYPE3

            :Examples:

            >>> tele = DC_Gemini("localhost", 11110)
            Starting device simulator on (host:port): localhost:11110
            >>> tele.encap(':GD#')
            ('00010000', '00010000:GD#\x00')
            >>> tele.encap(':GR#')
            ('00020000', '00020000:GR#\x00')
            >>> tele.close()

            (Original author's note: returning only '00010000:GD#\x00' does not work.)
            '''
            # TYPE1
            if command == cls.COMMAND6_SIMPLE:
                command = cls.COMMAND6
            if command not in (cls.COMMAND6, cls.COMMAND5):
                # TYPE2 or 3
                if len(command) < 3:
                    raise UnknownNativeCmdException(command)
                if command[-1] != '#':
                    raise UnknownNativeCmdException(command)
                if command[0] != ':' and command not in ('bC#', 'bW#', 'bR#'):
                    raise UnknownNativeCmdException(command)

            # Next request number, wrapping back to 1 after the largest value
            # that fits on request_num_nb_digits digits (9999), so that the
            # stamp never grows beyond STAMP_LENGTH characters.
            cls.request_num = cls.request_num % (10 ** cls.request_num_nb_digits - 1) + 1
            # Format to request_num_nb_digits (4) digits
            request_num_str = cls.request_num_format.format(cls.request_num)
            cls.last_stamp = request_num_str + cls.STAMP_FILLER

            data_encapsulated = cls.last_stamp + command + cls.TERMINATOR
            return cls.last_stamp, data_encapsulated

        # @override
        @classmethod
        def uncap(cls, dcc_name:str, stamp:str, data_received:str)->str:
            r"""
            Extract useful data from received raw data

            :param dcc_name: name of the caller, only used in debug output
            :param stamp: stamp of the request this data should answer
            :param data_received: raw (already decoded) data received
            :return: what follows the stamp, without its last character
                (the terminator) and without one trailing '#', if any;
                '' when nothing is left
            :raises Exception: if ``stamp`` is not found in ``data_received``

            >>> tele = DC_Gemini("localhost", 11110)
            Starting device simulator on (host:port): localhost:11110
            >>> tele.last_stamp = '00170000'
            >>> tele.uncap('00170000', '001700001#')
            '1'
            >>> tele.close()

            NOTE(review): the example above passes two arguments whereas this
            classmethod takes (dcc_name, stamp, data_received) - confirm how
            the controller forwards the call.
            """
            printd("\n*********** DEBUG IS ON ***********\n")
            printd(f"(gemini protoc used from {dcc_name}) data_received_bytes type is", type(data_received))
            printd(f"(gemini protoc used from {dcc_name}) data received is", data_received)
            printd(f"*** (gemini protoc used from {dcc_name}) Last stamp is ***", cls.last_stamp, ", data received is", data_received)

            if stamp not in data_received:
                raise Exception("BAD STAMP, this is not the answer to my request, but another request's")

            # Remove STAMP (and the last character at the end)
            useful_data = data_received.split(stamp)[1][:-1]
            # Remove '#' at the end, if exists (safe on an empty payload)
            if useful_data.endswith('#'):
                useful_data = useful_data[:-1]
            return useful_data

    # end of class Protocol

    #
    # Commands dictionary
    #
    # Earlier iterations of this table (one big dict per command, then a
    # build_cmd() helper called with keyword or positional arguments) were
    # dropped in favour of the Command class (Cmd) used below.
    #

    # NOTE(review): cmd1 / cmd2 look like sample commands - confirm they are
    # still needed.
    cmd1 = Cmd('cmd_generic', 'cmd_native')
    cmd2 = Cmd2('cmd2_generic', 'cmd2_native')

    # NOTE(review): 'get_ack' and Protocol.COMMAND6 are assumed to be live
    # positional arguments (generic name, native name) - confirm against the
    # Cmd signature.
    get_ack = Cmd(
        'get_ack',
        Protocol.COMMAND6,
        'Description',
        {},
        'G',
        {
            'G' : 'after completed startup',
            'B' : 'while the initial startup message is being displayed (new in L4)',
            'b' : 'while waiting for the selection of the Startup Mode',
            'S' : 'during a Cold Start (new in L4)',
        },
        {
            'ready' : 'gg',
            'wait' : 10,
            'error' : '',
        },
        {
            '0' : 'pb 0 ...',
            '1' : 'pb 1 ...',
        }
    )
    get_date = Cmd('get_date', 'GC', final_simul_response='10/10/19')
    set_date = Cmd('set_date', 'SC')
    get_time = Cmd('get_time', 'GL', final_simul_response='10:20:36')
    set_time = Cmd('set_time', 'SL')

    GEN2NAT_CMDS_GENERAL_obj = Gen2NatCmds([
        get_ack,
        # General commands for the Gemini controller
        get_date,
        set_date,
        get_time,
        set_time,
    ])

    GEN2NAT_CMDS_GENERAL_dict = {
        # Possible answers:
        # - B# while the initial startup message is being displayed (new in L4),
        # - b# while waiting for the selection of the Startup Mode,
        # - S# during a Cold Start (new in L4),
        # - G# after completed startup
        'get_ack': [Protocol.COMMAND6, 'G', ['G', 'B', 'b', 'S']],

        # General commands for the Gemini controller
        'get_date': ['GC', '10/10/19'],
        'set_date': ['SC'],
        'get_time': ['GL', '10:20:36'],
        'set_time': ['SL'],
    }

    # RA-DEC (p109-110)
    get_ra = Cmd(
        'get_ra', 'GR',
        desc='get right ascension',
        final_simul_response = '15:01:49',
        immediate_responses = {
            'ready' : 'gg',
            'wait' : 10,
            'error' : '',
        },
        errors = {
            '0' : 'pb 0 ...',
            '1' : 'pb 1 ...',
        }
    )

    # Sets the object's Right Ascension and the object status to "Not Selected".
    # The :Sd# command has to follow to complete the selection.
    # The subsequent use of the :ON...# command is recommended
    # :Sr:.# or :Sr::#
    # NOTE(review): the field placeholders of the syntax above (and of the
    # 'params' key below) appear to be missing - confirm against the manual.
    set_ra = Cmd(
        'set_ra', 'Sr',
        params = {':.# | ::#' : 'hour:minutes | hour:minutes:seconds'},
        immediate_responses = {
            'ready' : 'gg',
            'wait' : 10,
            'error' : '',
        },
        errors = {
            '0' : 'pb 0 ...',
            '1' : 'pb 1 ...',
        }
    )

    get_dec = Cmd(
        'get_dec', 'GD',
        desc='get declination',
        final_simul_response = '+12:29',
    )

    # :Sd{+-}{*°}# or :Sd{+- }{*°:}:#
    # Sets the object's declination.
    # It is important that the :Sr# command has been send prior.
    # Internal calculations are done that may take up to 0.5 seconds.
    # If the coordinate selection is valid the object status is set to "Selected"
    # NOTE(review): the 'params' key below contained raw line breaks where
    # field placeholders appear to have been lost; they are kept as '\n'
    # escapes - confirm the intended text against the manual.
    set_dec = Cmd(
        'set_dec', 'Sd',
        desc='set declination',
        params = {'{+-}\n{*°}# | :Sd{+- }\n{*°:}:#' : 'TODO'},
    )

    GEN2NAT_CMDS_MOUNT_obj = Gen2NatCmds([
        # GET & SET commands
        get_ra,
        set_ra,
        get_dec,
        set_dec,
        # get_radec and set_radec are already defined in abstract class

        Cmd('get_timezone', 'GG', "+00"),
        Cmd('set_timezone', 'SG', '1'),

        # ALT-AZ (p?)
        Cmd('get_alt', 'GA'),
        Cmd('get_az', 'GZ'),

        # LONG-LAT
        Cmd('get_long','Gg', '+10'),
        Cmd('set_long', 'Sg'),
        Cmd('get_lat', 'Gt', '+45:00:00'),
        Cmd('set_lat', 'St'),

        Cmd('get_hangle', 'GH'),

        Cmd('get_vel', 'Gv', 'T'),

        # DO commands
        # (do_init is defined in abstract class)
        Cmd('do_park', 'hP'),
        Cmd('do_warm_start', 'bW'),
        Cmd('do_prec_refr', 'p3'),

        # for test only
        Cmd('gem_only'),

        Cmd('do_move', 'MS', '0',
            final_device_responses = {
                '1':'Object below horizon.',
                '2':'No object selected.',
                '3':'Manual Control.',
                '4':'Position unreachable.',
                '5':'Not aligned.',
                '6':'Outside Limits.'
            }
        ),

        Cmd('do_movenorth', 'Mn'),
        Cmd('do_movesouth', 'Ms'),
        Cmd('do_movewest', 'Mw'),
        Cmd('do_moveeast', 'Me'),

        Cmd('do_stop', 'Q'),
    ])

    GEN2NAT_CMDS_MOUNT_dict = {
        # GET & SET commands
        'get_ra': ['GR', '15:01:49'],
        'set_ra': ['Sr'],
        'get_dec': ['GD', '+12:29'],
        'set_dec': ['Sd'],
        # get_radec and set_radec are already defined in abstract class

        'get_timezone': ['GG', "+00"],
        'set_timezone': ['SG', '1'],

        # ALT-AZ (p?)
        "get_alt": ['GA'],
        "get_az": ['GZ'],

        # LONG-LAT
        "get_long": ['Gg', '+10'],
        "set_long": ['Sg'],
        "get_lat": ['Gt', '+45:00:00'],
        "set_lat": ['St'],

        "get_hangle": ['GH'],

        'get_vel': ['Gv', 'T'],

        # DO commands
        # (do_init is defined in abstract class)
        'do_park': ['hP'],
        'do_warm_start': ['bW'],
        'do_prec_refr': ['p3'],

        # for test only
        'gem_only': [],

        'do_move': ['MS', '0',
            '1Object below horizon.', '2No object selected.', '3Manual Control.',
            '4Position unreachable.', '5Not aligned.', '6Outside Limits.'
        ],

        'do_movenorth': ['Mn'],
        'do_movesouth': ['Ms'],
        'do_movewest': ['Mw'],
        'do_moveeast': ['Me'],

        'do_stop': ['Q'],
    }

    # The object flavour is the one in use (the dict flavour is kept above)
    GEN2NAT_CMDS_MOUNT = GEN2NAT_CMDS_MOUNT_obj

    GEN2NAT_CMDS_dict = {
        # My GENERAL commands
        **GEN2NAT_CMDS_GENERAL_dict,
        # SPECIFIC commands for my DCCs
        'DC_Mount' : GEN2NAT_CMDS_MOUNT_dict
    }

    GEN2NAT_CMDS_obj = Gen2NatCmds()
    # My GENERAL commands
    GEN2NAT_CMDS_obj.add_cmds(GEN2NAT_CMDS_GENERAL_obj)
    # SPECIFIC commands for my DCCs
    GEN2NAT_CMDS_obj.add_cmds('DC_Mount', GEN2NAT_CMDS_MOUNT_obj)

    # The object flavour is the one in use (the dict flavour is kept above)
    GEN2NAT_CMDS = GEN2NAT_CMDS_obj

    # Generic speed rate -> native command, from quickest to slowest
    _SPEED_RATE_NATIVE_CMDS = {
        "slew": 'RS',       # quick
        "average": 'RM',    # average
        "center": 'RC',     # slow
        "guide": 'RG',      # very slow
    }

    # Gemini is using UDP
    def __init__(self, device_host:str="localhost", device_port:int=11110, dcc_list=None):
        """Set up the controller and its device controller components.

        :param device_host: host of the device
        :param device_port: port of the device
        :param dcc_list: list of DC component classes; defaults to
            [DC_Mount, DC_MountBis] when empty or not given
            (TODO: not used yet, the two components below are hard-coded)
        :raises AssertionError: if the 'get_date' or 'get_ra' lookup does
            not give the expected native command
        """
        super().__init__(
            device_host, device_port, "SOCKET-UDP", MY_DEVICE_CHANNEL_BUFFER_SIZE,
            protoc=self.Protocol, gen2nat_cmds=self.GEN2NAT_CMDS, device_sim=DS_Gemini,
        )

        # Sanity check on one of my GENERAL commands
        native_get_date = self.get_native_cmd_for_generic('get_date')
        print("dict is", native_get_date)
        assert native_get_date == ['GC', '10/10/19']

        # Initialize my dcc(s), passing them the SAME parameters as I use :
        # - device_host, device_port,
        # - self._my_channel (passed by superclass), MY_DEVICE_CHANNEL_BUFFER_SIZE,
        # - self.Protocol,
        # - the subset of my commands related to the dcc,
        # - NO SIMULATOR (because my dccs will use the same as me and I have already launched it)

        # TODO: use dcc_list
        # Default list of DCCs (if not given)
        if not dcc_list:
            dcc_list = [DC_Mount, DC_MountBis]

        # @override superclass empty list of dc components
        self.set_dc_components(
            [
                DC_Mount(
                    device_host, device_port, self._my_channel, MY_DEVICE_CHANNEL_BUFFER_SIZE,
                    protoc=self.Protocol, gen2nat_cmds=self.GEN2NAT_CMDS_MOUNT, device_sim=None,
                ),
                DC_MountBis(
                    device_host, device_port, self._my_channel, MY_DEVICE_CHANNEL_BUFFER_SIZE,
                    protoc=self.Protocol, gen2nat_cmds=self.GEN2NAT_CMDS_MOUNT, device_sim=None,
                ),
            ]
        )

        # Sanity check on one of the mount commands
        native_get_ra = self.get_native_cmd_for_generic('get_ra')
        print("dict is", native_get_ra)
        assert native_get_ra == ['GR', '15:01:49']

    #TODO: move to parent with new commands set_speed_slew, set_speed_average, ...
    # @override
    def set_speed(self, speed_rate):
        """Select the speed rate by executing the matching native command.

        :param speed_rate: one of "slew", "average", "center", "guide"
        :return: whatever execute_unformated_native_cmd() returns
        :raises UnknownGenericCmdArgException: for any other speed rate
        """
        # Only strings can match; the guard also avoids a TypeError on
        # unhashable arguments.
        native_cmd = None
        if isinstance(speed_rate, str):
            native_cmd = self._SPEED_RATE_NATIVE_CMDS.get(speed_rate)
        if not native_cmd:
            raise UnknownGenericCmdArgException(__name__, speed_rate)
        return self.execute_unformated_native_cmd(native_cmd)


if __name__ == "__main__":

    # Run the examples embedded in the docstrings, then stop.
    # (An interactive "expert mode" loop used to follow; it was unreachable
    # and referred to a controller class and DEBUG flag that do not exist in
    # this file.)
    import doctest
    doctest.testmod()
    sys.exit()