#!/usr/bin/env python3 """Socket Client Gemini Telescope implementation To be used as a concrete class to system control a Gemini telescope """ # Standard library imports import sys import time # Third party imports # None # Local application imports sys.path.append('../../..') # My parent class and exceptions from device_controller.abstract_component.device_controller import ( printd, DeviceController, Gen2NatCmds, Cmd, Cmd2, UnknownNativeCmdException, UnknownGenericCmdArgException ) # My DC components: from device_controller.abstract_component.filter_selector import DC_FilterSelector from device_controller.abstract_component.detector_sensor import DC_DetectorSensor from device_controller.abstract_component.detector_shutter import DC_DetectorShutter # My simulator from device_controller.concrete_component.sbig.sbig_simulator import DS_SBIG MY_DEVICE_CHANNEL_BUFFER_SIZE = 1024 class DC_SBIG(DeviceController): ''' # Components (list of my capabilities or roles): DeviceControllerDetectorSensor, DeviceControllerFilterSelector, DeviceControllerDetectorShutter): ''' # Gemini communication protocol # @override class Protocol: # Default timeouts TIMEOUT_SEND = 10 TIMEOUT_RECEIVE = 10 # COMMON CONSTANTS WITH SERVER TERMINATOR = '\x00' COMMAND5 = '050000' COMMAND6 = '\x00\x06\x00' COMMAND6_SIMPLE = '6' # STAMP : # Initialize request number (will be increased at each request) request_num = 0 # Request number stands on 4 digits (from 0001 to 9999) request_num_nb_digits = 4 # For 4 digits, we should get the format '{:04d}': request_num_format = '{:0'+str(request_num_nb_digits)+'d}' # Stamp filler with 0 (stamp is 8 digits long) STAMP_LENGTH = 8 STAMP_FILLER = '0' * (STAMP_LENGTH - request_num_nb_digits) # @override @classmethod def formated_cmd(cls, cmd:str, values_to_set:str=None)->str: if values_to_set != None: for value_to_set in values_to_set: cmd += value_to_set if cmd not in (cls.COMMAND6, cls.COMMAND5): cmd += '#' if cmd not in ('bC#','bW#','bR#'): cmd=':'+cmd return cmd # @override #def encapsulate_data_to_send(self, command:str): #def encap_data_to_send(self, command:str): @classmethod def encap(cls, command:str): r''' Encapsulate useful data to be ready for sending If data is "complete" (with stamp, and well formatted), send it as is Otherwise, add stamp and format it 3 types of commands: - TYPE1: '06' or '050000' - TYPE2: ':cde#' (mainly ':??#' - i.e : ':GD#', ':GR#', ...) - TYPE3: 'b?#' (bC# = Cold Start, bW# = selecting Warm Start, bR# = selecting Warm Restart) :Examples: >>> tele = DC_Gemini("localhost", 11110, DEBUG=False) Starting device simulator on (host:port): localhost:11110 >>> tele.encap(':GD#') '00010000:GD#\x00' >>> tele.encap(':GR#') '00020000:GR#\x00' >>> tele.close() # ne marche pas => '00010000:GD#\x00' ''' ####return bytes(self.MYSTAMP + self.STAMP_FILLER + data + "\n", "utf-8") # TYPE1 #printd("command to encapsulate is", repr(command)) if command == cls.COMMAND6_SIMPLE: command = cls.COMMAND6 if command not in (cls.COMMAND6, cls.COMMAND5): # TYPE2 or 3 #has_starting_car = data.find(':') > -1 if len(command) < 3: raise UnknownNativeCmdException(command) if not (command[-1]=='#'): raise UnknownNativeCmdException(command) if not (command[0]==':') and command not in ('bC#','bW#','bR#'): raise UnknownNativeCmdException(command) #stamp,command = data.split(':',1) #if command.find('#')>-1: command,_ = command.split('#',1) cls.request_num += 1 # Format to request_num_nb_digits (4) digits request_num_str = cls.request_num_format.format(cls.request_num) cls.last_stamp = request_num_str + cls.STAMP_FILLER #if command == COMMAND6_SIMPLE: command = COMMAND6_REAL data_encapsulated = cls.last_stamp + command + cls.TERMINATOR #return super().encap(data_encapsulated) #return self._my_channel.encap(data_encapsulated) ##return data_encapsulated return cls.last_stamp, data_encapsulated #return bytes(data + "\n", "utf-8") ####return bytes(self.MYSTAMP + self.STAMP_FILLER + data + "\n", "utf-8") # @override #def uncap_received_data(self, data_received:str)->str: #def uncap(cls, dcc_name:str, data_received:str)->str: @classmethod def uncap(cls, dcc_name:str, stamp:str, data_received:str)->str: #data_received = super().uncap(data_received_bytes) ##data_received = self._my_channel.uncap(data_received_bytes) #>>> tele.uncap(b'001700001\x00') r""" Extract useful data from received raw data >>> tele = DC_Gemini("localhost", 11110, DEBUG=False) Starting device simulator on (host:port): localhost:11110 >>> tele.last_stamp = '00170000' >>> tele.uncap('001700001#') '1' >>> tele.close() """ printd(f"(sbig protoc used from {dcc_name}) data_received_bytes type is", type(data_received)) printd(f"(sbig protoc used from {dcc_name}) data received is", data_received) ##data_received = data_received_bytes.decode() #printd("data_received is", data_received) # Remove STAMP (and \n at the end): #useful_data = data_received.split(self.MY_FULL_STAMP)[1][:-1] printd(f"*** (sbig protoc used from {dcc_name}) Last stamp is ***", cls.last_stamp, ", data received is", data_received) ''' # Normal case: LAST stamp found if cls.last_stamp in data_received: useful_data = data_received.split(cls.last_stamp)[1][:-1] # Bad case: LAST stamp NOT found => try PREVIOUS stamp else: request_num_str = cls.request_num_format.format(cls.request_num-1) temp_last_stamp = request_num_str + cls.STAMP_FILLER if temp_last_stamp in data_received: useful_data = data_received.split(temp_last_stamp)[1][:-1] else: raise Exception("BAD STAMP") ''' if stamp not in data_received: raise Exception("BAD STAMP, this is not the answer to my request, but another request's") else: useful_data = data_received.split(stamp)[1][:-1] # Remove '#' at the end, if exists if useful_data[-1] == '#': useful_data = useful_data[:-1] return useful_data # end of class Protocol #data = " ".join(sys.argv[1:]) #data_to_send = bytes(data + "\n", "utf-8") ''' Commands dictionary NEW foramt is: 'generic-command-name': ['native-command-name', 'awaited_result_if_ok', 'other-awaited-results if not ok...], Old format was: 'CMD-NAME': ['cmd-get', 'awaited_res_ok_for_get', 'cmd-set', 'awaited_res_ok_for_set', 'comment'], ''' # @overwrite #_cmd_device_concrete = { GEN2NAT_CMDS_GENERAL_dict = { # Possible answers: # - B# while the initial startup message is being displayed (new in L4), # - b# while waiting for the selection of the Startup Mode, # - S# during a Cold Start (new in L4), G# after completed startup 'get_ack': [Protocol.COMMAND6, 'G', 'B','b','S'], # General commands for the Gemini controller 'get_date': ['GC', '20/10/19'], 'set_date': ['SC'], 'get_time': ['GL', '20:20:36'], 'set_time': ['SL'], } GEN2NAT_CMDS_GENERAL_obj = Gen2NatCmds([ # Possible answers: # - B# while the initial startup message is being displayed (new in L4), # - b# while waiting for the selection of the Startup Mode, # - S# during a Cold Start (new in L4), G# after completed startup Cmd('get_ack', Protocol.COMMAND6, '', {}, 'G', final_device_responses = { 'G' : 'after completed startup', 'B' : 'while the initial startup message is being displayed (new in L4)', 'b' : 'while waiting for the selection of the Startup Mode', 'S' : 'during a Cold Start (new in L4)', }, ), # General commands for the Gemini controller Cmd('get_date', 'GC', '', {}, '20/10/19'), Cmd('set_date', 'SC'), Cmd('get_time', 'GL', '', {},'20:20:36'), Cmd('set_time', 'SL'), ]) GEN2NAT_CMDS_FILTER_dict = { # GET/SET commands # get_radec and set_radec are already defined in parent abstract class 'get_timezone': ['GG', "+00"], 'set_timezone': ['SG', '1'], # DO commands # defined in abstract class: #'do_init': [], #'do_init': ['do_init'], 'do_init': ['GL'], 'do_goto': ['GC'], 'do_park': ['GC'], } GEN2NAT_CMDS_FILTER_obj = Gen2NatCmds([ # GET/SET commands # get_radec and set_radec are already defined in parent abstract class Cmd('get_timezone', 'GG', '', {}, "+00"), Cmd('set_timezone', 'SG', '', {}, '1'), # DO commands # defined in abstract class: #'do_init': [], #'do_init': ['do_init'], Cmd('do_init','GL'), Cmd('do_goto','GC'), Cmd('do_park','GC'), ]) GEN2NAT_CMDS_FILTER = GEN2NAT_CMDS_FILTER_dict GEN2NAT_CMDS_FILTER = GEN2NAT_CMDS_FILTER_obj GEN2NAT_CMDS_SHUTTER_dict = { 'get_state': ['GC'], 'do_open': ['GC'], 'do_close': ['GC'], 'do_sync': ['GC'], } GEN2NAT_CMDS_SHUTTER_obj = Gen2NatCmds([ Cmd('get_state','GC'), Cmd('do_open','GC'), Cmd('do_close','GC'), Cmd('do_sync','GC'), ]) GEN2NAT_CMDS_SHUTTER = GEN2NAT_CMDS_SHUTTER_dict GEN2NAT_CMDS_SHUTTER = GEN2NAT_CMDS_SHUTTER_obj GEN2NAT_CMDS_SENSOR_dict = { # General commands (for test only because these commands should be in the GENERAL level above) 'get_date': ['GC', '20/10/19'], 'set_date': ['SC'], #'get_time': ['GL', '20:20:36'], #'set_time': ['SL'], 'do_init': ['GC'], 'do_start_acq': ['GC'], 'do_stop_acq': ['GC'], 'do_abort': ['GC'], 'do_shutdown': ['GC'], } GEN2NAT_CMDS_SENSOR_obj = Gen2NatCmds([ # General commands (for test only because these commands should be in the GENERAL level above) Cmd('get_date', 'GC', '', {}, '20/10/19'), Cmd('set_date', 'SC'), #'get_time': ['GL', '20:20:36'], #'set_time': ['SL'], Cmd('do_init','GC'), Cmd('do_start_acq','GC'), Cmd('do_stop_acq','GC'), Cmd('do_abort','GC'), Cmd('do_shutdown','GC'), ]) GEN2NAT_CMDS_SENSOR = GEN2NAT_CMDS_SENSOR_dict GEN2NAT_CMDS_SENSOR = GEN2NAT_CMDS_SENSOR_obj GEN2NAT_CMDS_dict = { # My GENERAL commands **GEN2NAT_CMDS_GENERAL_dict, # SPECIFIC commands for my DCCs 'DC_Filter': GEN2NAT_CMDS_FILTER_dict, 'DC_Shutter': GEN2NAT_CMDS_SHUTTER_dict, 'DC_Sensor': GEN2NAT_CMDS_SENSOR_dict, } GEN2NAT_CMDS_obj = Gen2NatCmds() GEN2NAT_CMDS_obj.add_cmds(GEN2NAT_CMDS_GENERAL_obj) GEN2NAT_CMDS_obj.add_cmds('DC_Filter', GEN2NAT_CMDS_FILTER_obj) GEN2NAT_CMDS_obj.add_cmds('DC_Shutter', GEN2NAT_CMDS_SHUTTER_obj) GEN2NAT_CMDS_obj.add_cmds('DC_Sensor', GEN2NAT_CMDS_SENSOR_obj) GEN2NAT_CMDS = GEN2NAT_CMDS_dict GEN2NAT_CMDS = GEN2NAT_CMDS_obj def __init__(self, device_host:str="localhost", device_port:int=11110): super().__init__(device_host, device_port, "SOCKET-UDP", MY_DEVICE_CHANNEL_BUFFER_SIZE, protoc=self.Protocol, gen2nat_cmds=self.GEN2NAT_CMDS, device_sim=DS_SBIG) printd('*****************************') printd('*****************************') printd('*****************************') #printd(DeviceControllerSBIG.mro()) #for c in DeviceControllerSBIG.mro(): printd(c.__name__) #printd(self.mro()) printd('*****************************') printd('*****************************') printd('*****************************') ''' Initialize my dcc(s), passing them the SAME parameters as I use : - device_host, device_port, - self._my_channel (passed by superclass), MY_DEVICE_CHANNEL_BUFFER_SIZE, - self.Protocol, - self.GEN2NAT_CMDS (subset of my commands related to the dcc), - NO SIMULATOR (because my dccs will use the same as me and I have already launched it), - DEBUG ''' # @override superclass empty list self.set_dc_components( [ #DC_FilterSelector(device_host, device_port, self._my_channel, MY_DEVICE_CHANNEL_BUFFER_SIZE, protoc=self.Protocol, gen2nat_cmds=self.GEN2NAT_CMDS_FILTER, device_sim=None, DEBUG=DEBUG), DC_FilterSelector(device_host, device_port, self._my_channel, MY_DEVICE_CHANNEL_BUFFER_SIZE, protoc=self.Protocol, gen2nat_cmds=self.GEN2NAT_CMDS_FILTER, device_sim=None), DC_DetectorSensor(device_host, device_port, self._my_channel, MY_DEVICE_CHANNEL_BUFFER_SIZE, protoc=self.Protocol, gen2nat_cmds=self.GEN2NAT_CMDS_SENSOR, device_sim=None), DC_DetectorShutter(device_host, device_port, self._my_channel, MY_DEVICE_CHANNEL_BUFFER_SIZE, protoc=self.Protocol, gen2nat_cmds=self.GEN2NAT_CMDS_SHUTTER, device_sim=None), ] ) # @override def set_speed(self, speed_rate): native_cmd = None # quick if speed_rate == "slew": native_cmd = 'RS' # average if speed_rate == "average": native_cmd = 'RM' # slow if speed_rate == "center": native_cmd = 'RC' # very slow if speed_rate == "guide": native_cmd = 'RG' if not native_cmd: raise UnknownGenericCmdArgException(__name__, speed_rate) return self.execute_unformated_native_cmd(native_cmd) if __name__ == "__main__": import doctest doctest.testmod() exit()