#!/usr/bin/env python3

"""SBIG device controller (socket client) implementation.

Concrete device controller class (``DeviceControllerSBIG``) talking to the
device over a "SOCKET-UDP" channel.

NOTE(review): the native command table and the stamp/terminator framing below
look like they were carried over from the Gemini telescope controller this
file was derived from -- confirm they are the ones the SBIG device expects.
"""

# Standard library imports
import logging
import sys
import time
from typing import Optional

# Third party imports
# None

# Local application imports
# device_controller/
sys.path.append('../../..')
# src/
sys.path.append('../../../..')
# NOTE(review): DeviceControllerAbstract was used as base class without being
# imported anywhere (NameError at import time). It is assumed to live in
# ``base`` (the original file had a commented-out wildcard import of that
# module) -- confirm the location.
from device_controller.abstract_component.base import DeviceControllerAbstract
from device_controller.abstract_component.base import UnknownCommandException

# TODO: inheritance or rather COMPOSITION?
# The SBIG controller has 3 capabilities: filter selector, detector sensor,
# and detector shutter. They are currently attached by composition (see
# DeviceControllerSBIG.__init__) instead of multiple inheritance.
from device_controller.abstract_component.filter_selector import DeviceControllerFilterSelector
from device_controller.abstract_component.detector_sensor import DeviceControllerDetectorSensor
from device_controller.abstract_component.detector_shutter import DeviceControllerDetectorShutter

logger = logging.getLogger(__name__)

# Default timeouts
TIMEOUT_SEND = 10
TIMEOUT_RECEIVE = 10

# COMMON CONSTANTS WITH SERVER
TERMINATOR = '\x00'
COMMAND5 = '050000'
COMMAND6 = '\x00\x06\x00'
# Short alias accepted by encapsulate_data_to_send() for COMMAND6
COMMAND6_SIMPLE = '6'


class DeviceControllerSBIG(DeviceControllerAbstract):
    """Device controller for the SBIG device.

    Every request sent is prefixed with an 8-character stamp made of a
    zero-padded request number followed by a '0' filler, and terminated with
    TERMINATOR. The stamp of the last request is kept in ``self.last_stamp``
    so that it can be stripped from the reply.
    """

    # STAMP:
    # 00 00 00 00 00 00 00 (8 bytes from '00' to 'FF', 1st one is LOW BYTE)
    # ex: 01 + STAMP = 01 00 00 00 00 00 00 00

    # Request number (increased at each request)
    request_num = 0
    # Request number stands on 4 digits (from 0001 to 9999)
    request_num_nb_digits = 4
    # For 4 digits, we should get the format '{:04d}'
    request_num_format = '{:0' + str(request_num_nb_digits) + 'd}'
    # Stamp filler with 0 (completes the request number up to 8 characters)
    STAMP_FILLER = '0' * (8 - request_num_nb_digits)

    # Commands that must NOT be prefixed with ':'
    # (bC# = Cold Start, bW# = Warm Start, bR# = Warm Restart)
    _STARTUP_CMDS = ('bC#', 'bW#', 'bR#')

    # Generic speed rate name -> native command (from quickest to slowest)
    _SPEED_NATIVE_CMDS = {
        "slew": 'RS',      # quick
        "average": 'RM',   # average
        "center": 'RC',    # slow
        "guide": 'RG',     # very slow
    }

    # Commands dictionary
    #
    # NEW format is:
    #   'generic-command-name': ['native-command-name', 'awaited_result_if_ok', 'other-awaited-results if not ok...'],
    #
    # Old format was:
    #   'CMD-NAME': ['cmd-get', 'awaited_res_ok_for_get', 'cmd-set', 'awaited_res_ok_for_set', 'comment'],
    #
    # @overwrite
    _cmd_device_concrete = {

        # GET & SET commands
        # get_ack answers: B# while the initial startup message is being
        # displayed (new in L4), b# while waiting for the selection of the
        # Startup Mode, S# during a Cold Start (new in L4), G# after
        # completed startup.
        'get_ack': [COMMAND6, 'G', 'B', 'b', 'S'],

        # get_radec and set_radec are already defined in abstract class

        'get_timezone': ['GG'],
        'set_timezone': ['SG', '1'],

        'get_date': ['GC'],
        'set_date': ['SC'],

        'get_time': ['GL'],
        'set_time': ['SL'],

        # DO commands
        # defined in abstract class:
        # 'do_init': ['do_init'],
        'do_park': ['hP'],
        'do_warm_start': ['bW'],
        'do_stop': ['Q'],
    }

    # @overwrite
    def __init__(self, device_host: str = "localhost", device_port: int = 11110, DEBUG=False):
        """Create the controller and its 3 component controllers.

        The controller itself and each component (detector sensor, detector
        shutter, filter selector) are built with the same host, port,
        "SOCKET-UDP" channel type and 1024 value, then the components are
        appended to ``self._my_device_component_types``.

        :param device_host: host name or address of the device
        :param device_port: port of the device
        :param DEBUG: debug flag, forwarded as is to the parent constructors
        """
        super().__init__(device_host, device_port, "SOCKET-UDP", 1024, DEBUG)
        component_classes = (
            DeviceControllerDetectorSensor,
            DeviceControllerDetectorShutter,
            DeviceControllerFilterSelector,
        )
        # Build all components first, then register them (same order as before)
        components = [
            component_class(device_host, device_port, "SOCKET-UDP", 1024, DEBUG)
            for component_class in component_classes
        ]
        for component in components:
            self._my_device_component_types.append(component)

    # @overwrite
    def formated_cmd(self, cmd: str, values_to_set: Optional[str] = None) -> str:
        """Return the native command ``cmd`` in its wire format.

        The values to set (if any) are appended to the command. Then, unless
        the command is COMMAND6 or COMMAND5, a trailing '#' is added, and a
        leading ':' too except for the startup commands ('bC#', 'bW#', 'bR#').

        :param cmd: native command name (ex: 'GG')
        :param values_to_set: string, or iterable of strings, appended to cmd
        :return: the formatted command (ex: ':GG#')
        """
        if values_to_set is not None:
            cmd += "".join(values_to_set)
        if cmd not in (COMMAND6, COMMAND5):
            cmd += '#'
            if cmd not in self._STARTUP_CMDS:
                cmd = ':' + cmd
        return cmd

    # @override
    def encapsulate_data_to_send(self, command: str):
        r'''Encapsulate a command so that it is ready for sending.

        A new stamp (request number + filler) is put in front of the command
        and TERMINATOR is appended. The stamp is saved in ``self.last_stamp``.
        The request number wraps back to 1 after its maximum value (9999 for
        4 digits) so that the stamp always keeps the same length.

        3 types of commands:
        - TYPE1: '06' or '050000'
        - TYPE2: ':cde#' (mainly ':??#' - i.e : ':GD#', ':GR#', ...)
        - TYPE3: 'b?#' (bC# = Cold Start, bW# = selecting Warm Start, bR# = selecting Warm Restart)

        :param command: formatted command (see formated_cmd()), or
            COMMAND6_SIMPLE as a shortcut for COMMAND6
        :return: stamp + command + TERMINATOR
        :raises UnknownCommandException: if a TYPE2/TYPE3 command is shorter
            than 3 characters, does not end with '#', or does not start with
            ':' while not being one of the 'b?#' startup commands

        :Examples:

        >>> tele = DeviceControllerSBIG("localhost", 11110, DEBUG=False)
        >>> tele.encapsulate_data_to_send(':GD#')
        '00010000:GD#\x00'
        >>> tele.encapsulate_data_to_send(':GR#')
        '00020000:GR#\x00'
        >>> tele.close()
        '''
        # TYPE1
        if command == COMMAND6_SIMPLE:
            command = COMMAND6

        if command not in (COMMAND6, COMMAND5):
            # TYPE2 or TYPE3: validate the format before consuming a request number
            if len(command) < 3:
                raise UnknownCommandException()
            if not command.endswith('#'):
                raise UnknownCommandException()
            if not command.startswith(':') and command not in self._STARTUP_CMDS:
                raise UnknownCommandException()

        # Next request number, kept within [1, 10**nb_digits - 1]
        # (an ever-growing counter would make the stamp longer than 8 characters)
        max_request_num = 10 ** self.request_num_nb_digits - 1
        self.request_num = self.request_num % max_request_num + 1
        # Format to request_num_nb_digits (4) digits
        request_num_str = self.request_num_format.format(self.request_num)
        self.last_stamp = request_num_str + self.STAMP_FILLER

        return self.last_stamp + command + TERMINATOR

    # @overwrite
    def uncap_received_data(self, data_received: str) -> str:
        r"""Extract useful data from received raw data.

        Keeps what follows the stamp of the last request sent
        (``self.last_stamp``), drops the last character (terminator), and
        then drops a trailing '#' if there is one.

        :param data_received: raw reply, expected to contain self.last_stamp
        :return: the useful part of the reply (may be empty)
        :raises IndexError: if self.last_stamp is not found in data_received

        >>> tele = DeviceControllerSBIG("localhost", 11110, DEBUG=False)
        >>> tele.last_stamp = '00170000'
        >>> tele.uncap_received_data('001700001#')
        '1'
        >>> tele.close()
        """
        logger.debug("data received (%s): %r", type(data_received).__name__, data_received)

        # Remove STAMP (split only once: the payload itself may contain the
        # same digits as the stamp) and the terminator at the end
        useful_data = data_received.split(self.last_stamp, 1)[1][:-1]
        # Remove '#' at the end, if exists (endswith() is safe on empty data)
        if useful_data.endswith('#'):
            useful_data = useful_data[:-1]
        return useful_data

    # TODO: move into parent class with new commands set_speed_slew, set_speed_average, ...
    # @override
    def set_speed(self, speed_rate):
        """Set the speed to one of the named rates.

        :param speed_rate: "slew" (quick), "average", "center" (slow) or
            "guide" (very slow)
        :return: whatever execute_unformated_native_cmd() returns for the
            matching native command ('RS', 'RM', 'RC' or 'RG')
        :raises UnknownCommandException: if speed_rate is not a known rate
        """
        # Non-string values can never match a rate name (and may be unhashable)
        native_cmd = self._SPEED_NATIVE_CMDS.get(speed_rate) if isinstance(speed_rate, str) else None
        if not native_cmd:
            raise UnknownCommandException(speed_rate)
        return self.execute_unformated_native_cmd(native_cmd)


if __name__ == "__main__":

    import doctest
    doctest.testmod()
    sys.exit()

    # Disabled interactive usage example, kept for reference:
    #
    # # Classic usage:
    # #tele_ctrl = SocketClient_Gemini(HOST, PORT)
    # # More elegant usage, using "with":
    # with TelescopeControllerGemini("localhost", 11110, DEBUG=False) as tele_ctrl:
    #
    #     # 0) CONNECT to server (only for TCP, does nothing for UDP)
    #     tele_ctrl._connect_to_server()
    #
    #     #tele_ctrl.config()
    #
    #     # Send some commands to the Telescope
    #     #pos = tele_ctrl.get_position()
    #
    #     # Do EXPERT mode execution
    #     while True:
    #
    #         # 1) SEND REQUEST data to server
    #         # read the request from the keyboard and strip spaces on both sides
    #         data = input("(EXPERT MODE) REQUEST TO SERVER [ex: ':GD#' (Get Dec), ':GR#' (Get RA)']: ").strip()
    #         # stop condition
    #         if data == "": break
    #         tele_ctrl.send_data(data)
    #
    #         # 2) RECEIVE REPLY data from server
    #         data_received = tele_ctrl.receive_data()
    #         print('\n')
    #
    #     #tele_ctrl.close()