#!/usr/bin/env python3 """Socket Client Gemini Telescope implementation To be used as a concrete class to system control a Gemini telescope """ # Standard library imports #import socket #import logging import sys import time # Third party imports # None # Local application imports sys.path.append('../..') #from src.client.socket_client_telescope_abstract import Position, UnknownCommandException, TimeoutException, SocketClientTelescopeAbstract from src_device.client.plc_controller_abstract import * # Default timeouts TIMEOUT_SEND = 10 TIMEOUT_RECEIVE = 10 # COMMON CONSTANTS WITH SERVER TERMINATOR = '\x00' COMMAND5 = '050000' #COMMAND6_SIMPLE = '\x00\x06' COMMAND6 = '\x00\x06\x00' COMMAND6_SIMPLE = '6' class DeviceControllerPLCAK(DeviceControllerPLC): # STAMP : # 00 00 00 00 00 00 00' (8 bytes from '00' to 'FF', 1st one is LOW BYTE) # ex: 01 + STAMP = 01 00 00 00 00 00 00 00' (8 bytes from '00' to 'FF', 1st one is LOW BYTE) #STAMP_FILLER = '00000000000000' #MY_FULL_STAMP = MYSTAMP + STAMP_FILLER # Initialize request number (will be increased at each request) request_num = 0 # Request number stands on 4 digits (from 0001 to 9999) request_num_nb_digits = 4 # For 4 digits, we should get the format '{:04d}': request_num_format = '{:0'+str(request_num_nb_digits)+'d}' # Stamp filler with 0 STAMP_FILLER = '0' * (8 - request_num_nb_digits) #data = " ".join(sys.argv[1:]) #data_to_send = bytes(data + "\n", "utf-8") ''' Commands dictionary NEW foramt is: 'generic-command-name': ['native-command-name', 'awaited_result_if_ok', 'other-awaited-results if not ok...], Old format was: 'CMD-NAME': ['cmd-get', 'awaited_res_ok_for_get', 'cmd-set', 'awaited_res_ok_for_set', 'comment'], ''' # @overwrite _cmd_native = { # GET @ SET commands 'get_ack': [COMMAND6, 'G', 'B','b','S'], # B# while the initial startup message is being displayed (new in L4), b# while waiting for the selection of the Startup Mode, S# during a Cold Start (new in L4), G# after completed startup. # RA-DEC (p109-110) #:Sr:.# or :Sr::# #Sets the object's Right Ascension and the object status to "Not Selected". The :Sd# command has to follow to complete the selection. The subsequent use of the :ON...# command is recommended #:Sd{+-}
{*°}# or :Sd{+- }
{*°:}: #Sets the object's declination. It is important that the :Sr# command has been send prior. Internal calculations are done that may take up to 0.5 seconds. If the coordinate selection is valid the object status is set to "Selected" #'RADEC': [('GR','GD'), ''], #'ra': ['GR', None, 'Sr', None, 'commentaire'], 'get_ra': ['GR'], 'set_ra': ['Sr'], 'get_dec': ['GD'], 'set_dec': ['Sd'], # get_radec and set_radec are already defined in abstract class 'get_timezone': ['GG'], 'set_timezone': ['SG', '1'], # ALT-AZ (p?) "get_alt": ['GA'], "get_az": ['GZ'], #"ALT-AZ": [('GA','GZ'), ''], # LONG-LAT "get_long": ['Gg'], "set_long": ['Sg'], "get_lat": ['Gt'], "set_lat": ['St'], #"LONGLAT": [('Gg','Gt'), ''], "get_hangle": ['GH'], 'get_vel': ['Gv'], #"get_maxvel": ['Gv'], 'get_date': ['GC'], 'set_date': ['SC'], 'get_time': ['GL'], 'set_time': ['SL'], # DO commands # defined in abstract class: #'do_init': ['do_init'], 'do_park': ['hP'], 'do_warm_start': ['bW'], 'do_prec_refr': ['p3'], 'do_move': ['MS', '0', '1Object below horizon.', '2No object selected.', '3Manual Control.', '4Position unreachable.', '5Not aligned.', '6Outside Limits.' ], 'do_movenorth': ['Mn'], 'do_movesouth': ['Ms'], 'do_movewest': ['Mw'], 'do_moveeast': ['Me'], 'do_stop': ['Q'], } # @overwrite # Gemini is using UDP def __init__(self, server_host:str="localhost", server_port:int=11110, DEBUG=False): super().__init__(server_host, server_port, "TCP", 1024, DEBUG) # @overwrite def formated_cmd(self, cmd:str, values_to_set:str=None)->str: if values_to_set != None: for value_to_set in values_to_set: cmd += value_to_set if cmd not in (COMMAND6, COMMAND5): cmd += '#' if cmd not in ('bC#','bW#','bR#'): cmd=':'+cmd return cmd # @overwrite def encapsulate_data_to_send(self, command:str): r''' Encapsulate useful data to be ready for sending If data is "complete" (with stamp, and well formatted), send it as is Otherwise, add stamp and format it 3 types of commands: - TYPE1: '06' or '050000' - TYPE2: ':cde#' (mainly ':??#' - i.e : ':GD#', ':GR#', ...) - TYPE3: 'b?#' (bC# = Cold Start, bW# = selecting Warm Start, bR# = selecting Warm Restart) :Examples: >>> tele = SocketClientTelescopeGemini("localhost", 11110, DEBUG=False) >>> tele.encapsulate_data_to_send(':GD#') '00010000:GD#\x00' >>> tele.encapsulate_data_to_send(':GR#') '00020000:GR#\x00' >>> tele.close() # ne marche pas => '00010000:GD#\x00' ''' ####return bytes(self.MYSTAMP + self.STAMP_FILLER + data + "\n", "utf-8") # TYPE1 #print("command to encapsulate is", repr(command)) if command == COMMAND6_SIMPLE: command = COMMAND6 if command not in (COMMAND6, COMMAND5): # TYPE2 or 3 #has_starting_car = data.find(':') > -1 if len(command) < 3: raise UnknownCommandException() if not (command[-1]=='#'): raise UnknownCommandException() if not (command[0]==':') and command not in ('bC#','bW#','bR#'): raise UnknownCommandException() #stamp,command = data.split(':',1) #if command.find('#')>-1: command,_ = command.split('#',1) self.request_num += 1 # Format to request_num_nb_digits (4) digits request_num_str = self.request_num_format.format(self.request_num) self.last_stamp = request_num_str + self.STAMP_FILLER #if command == COMMAND6_SIMPLE: command = COMMAND6_REAL return self.last_stamp + command + TERMINATOR #return bytes(data + "\n", "utf-8") ####return bytes(self.MYSTAMP + self.STAMP_FILLER + data + "\n", "utf-8") # @overwrite def uncap_received_data(self, data_received_bytes:bytes)->str: r""" Extract useful data from received raw data >>> tele = SocketClientTelescopeGemini("localhost", 11110, DEBUG=False) >>> tele.last_stamp = '00170000' >>> tele.uncap_received_data(b'001700001\x00') '1' >>> tele.close() """ #print("data_received_bytes type is", type(data_received_bytes)) #print("data received is", data_received_bytes) #TODO: resoudre ce pb plus proprement (utiliser unicode au lieu de utf-8 codec ??? # BUGFIX: b'\xdf' (should correspond to "°" symbol) generates a UnicodeDecodeError, # so, replace it by ':' (b'\x3A') if b'\xdf' in data_received_bytes: data_received_bytes = data_received_bytes.replace(b'\xdf', b'\x3A') data_received = data_received_bytes.decode() #print("data_received is", data_received) # Remove STAMP (and \n at the end): #useful_data = data_received.split(self.MY_FULL_STAMP)[1][:-1] useful_data = data_received.split(self.last_stamp)[1][:-1] # Remove '#' at the end, if exists if useful_data[-1] == '#': useful_data = useful_data[:-1] return useful_data if __name__ == "__main__": import doctest doctest.testmod() exit() # Classic usage: #plc_client = SocketClient_Gemini(HOST, PORT) # More elegant usage, using "with": with DeviceControllerPLCAK("localhost", 11110, DEBUG=False) as plc_client: # 0) CONNECT to server (only for TCP, does nothing for UDP) plc_client._connect_to_server() #plc_client.config() # Send some commands to the Telescope #pos = plc_client.get_position() # Do EXPERT mode execution while True: # 1) SEND REQUEST data to server # saisie de la requête au clavier et suppression des espaces des 2 côtés data = input("(EXPERT MODE) REQUEST TO SERVER [ex: ':GD#' (Get Dec), ':GR#' (Get RA)']: ").strip() # test d'arrêt if data == "": break #data_to_send = bytes(data + "\n", "utf-8") plc_client.send_data(data) #mysock.sendto("%s" % data, (HOST, PORT)) #print("Sent: {}".format(data)) # 2) RECEIVE REPLY data from server data_received = plc_client.receive_data() #reponse, adr = mysock.recvfrom(buf) #print("Received: {}".format(data_received)) #print("Useful data received: {}".format(data_useful)) print('\n') #plc_client.close()