#!/usr/bin/env python3

"""SBIG device controller.

Concrete ``DeviceController`` implementation (``DC_SBIG``) that groups three
device-controller components (filter selector, detector sensor and detector
shutter) behind one shared channel, protocol and simulator (``DS_SBIG``).

NOTE(review): the nested ``Protocol`` class and several command names look
inherited from the Gemini telescope controller (the original comments still
mention "Gemini") - confirm they match the real SBIG protocol.
"""

# Standard library imports
import sys
import time

# Third party imports
# None

# Local application imports
sys.path.append('../../..')

# My parent class and exceptions
from device_controller.abstract_component.device_controller import DeviceController, UnknownNativeCmdException, UnknownGenericCmdArgException

# My DC components:
from device_controller.abstract_component.filter_selector import DC_FilterSelector
from device_controller.abstract_component.detector_sensor import DC_DetectorSensor
from device_controller.abstract_component.detector_shutter import DC_DetectorShutter

# My simulator
from device_controller.concrete_component.sbig.sbig_simulator import DS_SBIG

# Default timeouts
TIMEOUT_SEND = 10
TIMEOUT_RECEIVE = 10

# Buffer size passed to the device channel (and to each component)
MY_DEVICE_CHANNEL_BUFFER_SIZE = 1024


class DC_SBIG(DeviceController):
    """Device controller for an SBIG device.

    Components (list of my capabilities or roles), registered in ``__init__``
    through ``set_dc_components``:

    - ``DC_FilterSelector``
    - ``DC_DetectorSensor``
    - ``DC_DetectorShutter``
    """

    # Communication protocol
    # @override
    class Protocol:
        """Stamping / formatting rules for the data exchanged with the device.

        All methods are class methods and the request counter is a class
        attribute, so every user of this class shares the same counter.
        """

        # Default timeouts
        TIMEOUT_SEND = 10
        TIMEOUT_RECEIVE = 10

        # COMMON CONSTANTS WITH SERVER
        TERMINATOR = '\x00'
        COMMAND5 = '050000'
        COMMAND6 = '\x00\x06\x00'
        # Short alias accepted by encap() and expanded to COMMAND6
        COMMAND6_SIMPLE = '6'

        # STAMP: 8 characters = request number (4 digits) + filler ('0000')
        # ex: request 1 => '0001' + '0000' = '00010000'

        # Initialize request number (will be increased at each request)
        request_num = 0
        # Request number stands on 4 digits (from 0001 to 9999)
        request_num_nb_digits = 4
        # For 4 digits, we should get the format '{:04d}':
        request_num_format = '{:0'+str(request_num_nb_digits)+'d}'
        # Stamp filler with 0 (stamp is 8 digits long)
        STAMP_LENGTH = 8
        STAMP_FILLER = '0' * (STAMP_LENGTH - request_num_nb_digits)
        # Stamp of the last encapsulated request (None until encap() is called),
        # defined here so that uncap() can always read it
        last_stamp = None

        # @override
        @classmethod
        def formated_cmd(cls, cmd:str, values_to_set:str=None)->str:
            """Return the native command ``cmd`` in its sendable form.

            ``values_to_set`` (if given) is appended to the command name.
            Then, unless the command is ``COMMAND5`` or ``COMMAND6``, a '#'
            is appended and a ':' is prepended (no ':' for the 'bC#', 'bW#'
            and 'bR#' commands).

            >>> DC_SBIG.Protocol.formated_cmd('GD')
            ':GD#'
            >>> DC_SBIG.Protocol.formated_cmd('SG', '+01')
            ':SG+01#'
            >>> DC_SBIG.Protocol.formated_cmd('bC')
            'bC#'
            >>> DC_SBIG.Protocol.formated_cmd('050000')
            '050000'
            """
            if values_to_set is not None:
                cmd += ''.join(values_to_set)
            if cmd not in (cls.COMMAND6, cls.COMMAND5):
                cmd += '#'
                if cmd not in ('bC#', 'bW#', 'bR#'):
                    cmd = ':' + cmd
            return cmd

        # @override
        @classmethod
        def encap(cls, command:str):
            r"""Encapsulate a native command so that it is ready for sending.

            3 types of commands are accepted:

            - TYPE1: '6' (expanded to ``COMMAND6``), ``COMMAND6`` or ``COMMAND5``
            - TYPE2: ':cde#' (mainly ':??#' - i.e : ':GD#', ':GR#', ...)
            - TYPE3: 'b?#' (only 'bC#', 'bW#' and 'bR#')

            The request number is incremented (wrapping from 9999 back to
            0001 so that the stamp always keeps ``STAMP_LENGTH`` characters),
            and the new stamp is stored in ``cls.last_stamp``.

            :param command: the native command, already formatted
            :return: tuple ``(stamp, stamp + command + TERMINATOR)``;
                e.g. the first request ':GD#' gives
                ``('00010000', '00010000:GD#\x00')``
            :raises UnknownNativeCmdException: if ``command`` is none of the
                3 types above
            """
            # TYPE1
            if command == cls.COMMAND6_SIMPLE:
                command = cls.COMMAND6
            if command not in (cls.COMMAND6, cls.COMMAND5):
                # TYPE2 or 3
                if len(command) < 3:
                    raise UnknownNativeCmdException(command)
                if not command.endswith('#'):
                    raise UnknownNativeCmdException(command)
                if not command.startswith(':') and command not in ('bC#', 'bW#', 'bR#'):
                    raise UnknownNativeCmdException(command)

            # Next request number, kept within request_num_nb_digits digits
            # (1 .. 9999 for 4 digits) so that the stamp length never changes
            highest_request_num = 10 ** cls.request_num_nb_digits - 1
            cls.request_num = cls.request_num % highest_request_num + 1
            # Format to request_num_nb_digits (4) digits
            request_num_str = cls.request_num_format.format(cls.request_num)
            cls.last_stamp = request_num_str + cls.STAMP_FILLER

            data_encapsulated = cls.last_stamp + command + cls.TERMINATOR
            return cls.last_stamp, data_encapsulated

        # @override
        @classmethod
        def uncap(cls, dcc_name:str, stamp:str, data_received:str)->str:
            """Extract the useful data from the received raw data.

            The expected shape is ``stamp + useful_data [+ '#'] + 1 final
            character`` (the final character is dropped unconditionally).
            Example: ``uncap('dcc', '00170000', '001700001#' + TERMINATOR)``
            returns ``'1'``.

            :param dcc_name: name of the calling component (only used in the
                printed traces)
            :param stamp: stamp of the request this data should answer to
            :param data_received: raw data received (already decoded)
            :return: the data located after the stamp, without the final
                character and without a trailing '#' (may be empty)
            :raises Exception: if ``stamp`` is not found in ``data_received``
            """
            print(f"(sbig protoc used from {dcc_name}) data_received_bytes type is", type(data_received))
            print(f"(sbig protoc used from {dcc_name}) data received is", data_received)
            print(f"*** (sbig protoc used from {dcc_name}) Last stamp is ***", cls.last_stamp, ", data received is", data_received)

            if stamp not in data_received:
                raise Exception("BAD STAMP, this is not the answer to my request, but another request's")

            # Split only once: the payload itself may contain the stamp digits.
            # Then drop the final character (terminator)
            useful_data = data_received.split(stamp, 1)[1][:-1]
            # Remove '#' at the end, if exists (safe on an empty payload)
            if useful_data.endswith('#'):
                useful_data = useful_data[:-1]
            return useful_data

    # end of class Protocol

    # Commands dictionaries
    #
    # NEW format is:
    # 'generic-command-name': ['native-command-name', 'awaited_result_if_ok', 'other-awaited-results if not ok...],
    #
    # Old format was:
    # 'CMD-NAME': ['cmd-get', 'awaited_res_ok_for_get', 'cmd-set', 'awaited_res_ok_for_set', 'comment'],

    # @override
    MY_GEN2NAT_CMDS_FILTER = {
        # GET/SET commands
        'get_timezone': ['GG'],
        'set_timezone': ['SG', '1'],

        # DO commands
        'do_goto': ['GC'],
        'do_park': ['GC'],
    }

    MY_GEN2NAT_CMDS_SHUTTER = {
        'get_state': ['GC'],
        'do_open': ['GC'],
        'do_close': ['GC'],
        'do_sync': ['GC'],
    }

    MY_GEN2NAT_CMDS_SENSOR = {
        'do_init': ['GC'],
        'do_start_acq': ['GC'],
        'do_stop_acq': ['GC'],
        'do_abort': ['GC'],
        'do_shutdown': ['GC'],
    }

    MY_GEN2NAT_CMDS = {
        # Possible answers:
        # - B# while the initial startup message is being displayed (new in L4),
        # - b# while waiting for the selection of the Startup Mode,
        # - S# during a Cold Start (new in L4), G# after completed startup
        'get_ack': [Protocol.COMMAND6, 'G', 'B','b','S'],

        # General commands for the controller
        # NOTE(review): 'bidon' (French for "dummy") looks like a placeholder
        # awaited result - confirm
        'get_date': ['GC', 'bidon'],
        'set_date': ['SC'],
        'get_time': ['GL'],
        'set_time': ['SL'],

        # Commands for my dc components
        'DC_Filter' : MY_GEN2NAT_CMDS_FILTER,
        'DC_Shutter' : MY_GEN2NAT_CMDS_SHUTTER,
        'DC_Sensor' : MY_GEN2NAT_CMDS_SENSOR,
    }

    # @override
    def __init__(self, device_host:str="localhost", device_port:int=11110, DEBUG=False):
        """Initialize the controller, then create and register its components.

        The parent is initialized with a "SOCKET-UDP" channel, my Protocol,
        my full commands dictionary and the DS_SBIG simulator.

        :param device_host: host of the device (or of its simulator)
        :param device_port: port of the device (or of its simulator)
        :param DEBUG: passed as is to the parent class and to each component
        """
        super().__init__(device_host, device_port, "SOCKET-UDP", MY_DEVICE_CHANNEL_BUFFER_SIZE, protoc=self.Protocol, gen2nat_cmds=self.MY_GEN2NAT_CMDS, device_sim=DS_SBIG, DEBUG=DEBUG)

        # Visual separator in the console output
        print('*****************************')
        print('*****************************')
        print('*****************************')
        print('*****************************')
        print('*****************************')
        print('*****************************')

        # Initialize my dcc(s), passing them the SAME parameters as I use :
        # - device_host, device_port,
        # - self._my_channel (passed by superclass), MY_DEVICE_CHANNEL_BUFFER_SIZE,
        # - self.Protocol,
        # - the subset of my commands related to the dcc,
        # - NO SIMULATOR (because my dccs will use the same as me and I have already launched it),
        # - DEBUG
        # @override superclass empty list
        self.set_dc_components(
            [
                DC_FilterSelector(device_host, device_port, self._my_channel, MY_DEVICE_CHANNEL_BUFFER_SIZE, protoc=self.Protocol, gen2nat_cmds=self.MY_GEN2NAT_CMDS_FILTER, device_sim=None, DEBUG=DEBUG),
                DC_DetectorSensor(device_host, device_port, self._my_channel, MY_DEVICE_CHANNEL_BUFFER_SIZE, protoc=self.Protocol, gen2nat_cmds=self.MY_GEN2NAT_CMDS_SENSOR, device_sim=None, DEBUG=DEBUG),
                DC_DetectorShutter(device_host, device_port, self._my_channel, MY_DEVICE_CHANNEL_BUFFER_SIZE, protoc=self.Protocol, gen2nat_cmds=self.MY_GEN2NAT_CMDS_SHUTTER, device_sim=None, DEBUG=DEBUG),
            ]
        )

    # TODO: move to the parent class, with new commands set_speed_slew, set_speed_average, ...
    # @override
    def set_speed(self, speed_rate):
        """Execute the native command matching the generic ``speed_rate``.

        :param speed_rate: "slew" (quick), "average", "center" (slow) or
            "guide" (very slow)
        :return: the result of ``execute_unformated_native_cmd``
        :raises UnknownGenericCmdArgException: for any other ``speed_rate``
        """
        native_cmd = None
        if speed_rate == "slew":
            # quick
            native_cmd = 'RS'
        elif speed_rate == "average":
            # average
            native_cmd = 'RM'
        elif speed_rate == "center":
            # slow
            native_cmd = 'RC'
        elif speed_rate == "guide":
            # very slow
            native_cmd = 'RG'
        if not native_cmd:
            raise UnknownGenericCmdArgException(__name__, speed_rate)
        return self.execute_unformated_native_cmd(native_cmd)


if __name__ == "__main__":

    import doctest
    doctest.testmod()
    # sys.exit() rather than exit(): the latter only exists when the site
    # module is loaded
    sys.exit()

    # Legacy "expert mode" example (was unreachable, kept for reference only;
    # it still uses the old telescope controller names):
    #
    # with TelescopeControllerGemini("localhost", 11110, DEBUG=False) as tele_ctrl:
    #
    #     # 0) CONNECT to server (only for TCP, does nothing for UDP)
    #     tele_ctrl._connect_to_server()
    #
    #     # Do EXPERT mode execution
    #     while True:
    #
    #         # 1) SEND REQUEST data to server
    #         # read the request from the keyboard and strip spaces on both sides
    #         data = input("(EXPERT MODE) REQUEST TO SERVER [ex: ':GD#' (Get Dec), ':GR#' (Get RA)']: ").strip()
    #         # stop condition
    #         if data == "": break
    #         tele_ctrl.send_data(data)
    #
    #         # 2) RECEIVE REPLY data from server
    #         data_received = tele_ctrl.receive_data()
    #         print('\n')