#!/usr/bin/env python3 """Socket Client Telescope (abstract) implementation To be used as a base class (interface) for any concrete socket client telescope class """ # Standard library imports #from enum import Enum import functools import logging import socket import sys import threading import time # Third party imports # from sockets_tele/ #sys.path.append("..") # from src_socket/client/ sys.path.append("../../..") # sys.path.append("../../../..") #import src.core.pyros_django.utils.celme as celme import src.core.celme as celme sys.path.append("../..") from device_controller.logs import * # Local application imports #sys.path.append('../..') #from src.client.socket_client_abstract import UnknownCommandException, SocketClientAbstract ##from src_socket.client.socket_client_abstract import * ##from src_device.client.client_channel import * from device_controller.channels.client_channel_socket import ClientChannelSocket from device_controller.channels.client_channel_serial import ClientChannelSerial from device_controller.channels.client_channel_usb import ClientChannelUSB # Execute also "set" and "do" commands GET_ONLY=False # Execute only "get" commands #GET_ONLY=True # Default timeouts TIMEOUT_SEND = 10 TIMEOUT_RECEIVE = 10 ''' class c(Enum): # GET, SET DEC = 'DEC' RA = 'RA' RA_DEC = 'RA_DEC' # DO PARK = 'PARK' WARM_START = 'WARM_START' ''' # DECORATOR def generic_cmd(func): #def wrapper_generic_cmd(*args, **kwargs): @functools.wraps(func) def wrapper_generic_cmd(self, values_to_set=None): #print("func name is", func.__name__) return self.execute_generic_cmd(func.__name__, values_to_set) return wrapper_generic_cmd class DeviceCommand: full_name = None name = None args = None # Device component type devtype = None def __init__(self, cmd_full_name:str, dev_comp_type:str=None, cmd_args:str=None): self.full_name = cmd_full_name self.name = cmd_full_name self.devtype = dev_comp_type self.args = cmd_args if self.is_generic(): dev_comp_type,cmd_name,cmd_args = self.get_full_name_parts() self.name = cmd_name if dev_comp_type: self.devtype = dev_comp_type if cmd_args: self.args = cmd_args def __str__(self): return (f"Commmand '{self.full_name}'") def is_generic(self): ''' cmd_name = self.full_name if '.' in self.full_name: cmd_name = self.full_name.split('.')[1] ''' cmd_name = self.full_name[self.full_name.find('.')+1:] return cmd_name.startswith('do_') or cmd_name.startswith('get_') or cmd_name.startswith('set_') ''' @classmethod def is_generic_cmd_name(cls, cmd_name:str): if '.' in cmd_name: cmd_name = cmd_name.split('.')[1] return cmd_name.startswith('do_') or cmd_name.startswith('get_') or cmd_name.startswith('set_') def is_generic(self): #return type(self).is_generic_cmd_name(self.full_name) return self.is_generic_cmd_name(self.full_name) #return DeviceCommand.is_generic_cmd_name(self.full_name) #return self.name.startswith('do_') or self.name.startswith('get_') or self.name.startswith('set_') ''' @property def name_and_args(self): cmd_name_and_args = self.full_name if '.' in cmd_name_and_args: cmd_name_and_args = cmd_name_and_args.split('.')[1] return cmd_name_and_args def get_full_name_parts(self): cmd_name = self.full_name devtype = None cmd_args = None if '.' in cmd_name: devtype, cmd_name = cmd_name.split('.') if ' ' in cmd_name: cmd_name, *cmd_args = cmd_name.split(' ') return devtype, cmd_name, cmd_args class GenericResult: ''' Usage: res = execute(command) print("result is", res) if res.ko: raise UnexpectedReturnCode() if res.ok: ... ''' # By default, bad result ok = True ko = False def __init__(self, native_result:str, ok=True): self.txt = native_result self.ok = ok self.ko = not ok def __str__(self): return self.txt ''' def __repr__(self): return self.txt def __get__(self, instance, owner): return self.b def __set__(self, instance, value): self.b = value ''' class UnexpectedCommandReturnCode(Exception): pass class TimeoutException(Exception): pass class UnknownCommandException(Exception): pass ''' def __init__(self,*args,**kwargs): super().__init__(self,*args,**kwargs) ''' #TODO: remove ClientChannelAbstract, and set instead a ClientChannel #class DeviceControllerAbstract(SocketClientAbstract): ##class DeviceControllerAbstract(ClientChannel): class DeviceControllerAbstract(): _device_simulator = None _thread_device_simulator = None _device_host = "localhost" _device_port = None # Ancienne Solution à base de COMPOSITION : # List of device controller (dc) components (by default, None) #_my_dc_components = [] # ClientChannel used by the device controller (to be set during __init__ via set_client_channel()) my_channel = None # @abstract (to be overriden) _cmd_device_concrete = {} _cmd_device_abstract = {} _cmd = { # GET-SET commands: 'get_timezone': [], 'set_timezone': [], 'get_date': [], 'set_date': [], 'get_time': [], 'set_time': [], # DO commands: 'do_init': ['do_init'], 'do_park': [], } ##def __init__(self, device_host:str="localhost", device_port:int=11110, PROTOCOL:str="TCP", buffer_size=1024, DEBUG=False): def __init__(self, device_host:str="localhost", device_port:int=11110, PROTOCOL:str="SOCKET-TCP", buffer_size=1024, DEBUG=False, device_sim=None): ''' :param device_host: server IP or hostname :param device_port: server port :param PROTOCOL: "SOCKET-TCP", "SOCKET-UDP", "SERIAL", or "USB" ''' print("IN DeviceControllerAbstract") self._device_host = device_host self._device_port = device_port ##super().__init__(device_host, device_port, PROTOCOL, buffer_size, DEBUG) set_logger(DEBUG) log_d("Logger configured") # If LOCALHOST, launch the device SIMULATOR if device_host=="localhost": self._device_simulator = device_sim print("SIMU IS", device_sim, self._device_simulator) self._thread_device_simulator = threading.Thread(target=self.device_simulator_run) self._thread_device_simulator.start() if PROTOCOL.startswith("SOCKET"): self.my_channel:ClientChannel = ClientChannelSocket(device_host, device_port, PROTOCOL, buffer_size, DEBUG) elif PROTOCOL == "SERIAL": self.my_channel:ClientChannel = ClientChannelSerial(device_host, device_port, buffer_size, DEBUG) elif PROTOCOL == "USB": self.my_channel:ClientChannel = ClientChannelUSB(device_host, device_port, buffer_size, DEBUG) else: raise Exception("Unknown Channel", PROTOCOL) # overwrite abstract _cmd dictionary with subclass native _cmd_native dictionary: #self._cmd = {**self._cmd, **self._cmd_native} self._cmd = {**self._cmd, **self._cmd_device_abstract, **self._cmd_device_concrete} print("MY COMMANDS ARE:", self._cmd) # So that we can use this with the "with" statement (context manager) def __enter__(self): return self def __exit__(self, type, value, traceback): self.my_channel.__exit__(type, value, traceback) ''' def set_logger(self, DEBUG): self.my_channel.set_logger(DEBUG) ''' def device_simulator_run(self): #HOST, PORT = "localhost", 11110 #with get_SocketServer_UDP_TCP(HOST, PORT, "UDP") as myserver: print("Starting device simulator on (host:port): ", self._device_host+':'+str(self._device_port)) self._device_simulator.serve_forever(self._device_port) #with get_SocketServer_UDP_TCP(self.HOST, self.PORT, "UDP") as myserver: myserver.serve_forever() ''' myserver = get_SocketServer_UDP_TCP(self.HOST, self.PORT, "UDP") myserver.serve_forever() ''' def _connect_to_device(self): self.my_channel._connect_to_server() def get_celme_longitude(self, longitude): return celme.Angle(longitude).sexagesimal("d:+0180.0") def get_celme_latitude(self, latitude): return celme.Angle(latitude).sexagesimal("d:+090.0") #@override ClientChannel send_data def send_data(self, data:str): data_encapsulated = self.format_data_to_send(data) self.my_channel.send_data(data_encapsulated) ''' The chosen way to send data is this: # - :GD# b'00030000:GD#\x00' Another way to send data (which also works), is it better ? # - :GD# ###tsock.mysock.sendto(b'\x00\x00\x00\x01\x00\x00\x00\x00\x3A\x47\x44\x23\x00', (HOST, PORT)) # - :GR# ###tsock.mysock.sendto(b'\x00\x00\x00\x01\x00\x00\x00\x00\x3A\x47\x52\x23\x00', (HOST, PORT)) # - ACK 06 OK !!! : tsock.mysock.sendto(b'\x00\x00\x00\x01\x00\x00\x00\x00\x00\x06\x00\x00', (HOST, PORT)) Which one is the best method ? ''' #log_d("NATIVE Command to send is "+repr(data)) ##encapsulated_data = self.encapsulate_data_to_send(data) #print("before _send", encapsulated_data) #print("before _send", repr(encapsulated_data)) ##self._send_data(encapsulated_data) ##self.my_channel.send_data(encapsulated_data) #log_i(f'Sent: {encapsulated_data}') ''' def _send_data(self, data): self.my_channel._send_data(data) ''' #@override ClientChannel receive_data def receive_data(self)->str: ##data_received_bytes = self._receive_data() data_received = self.my_channel.receive_data() #log_d("Received (all data): {}".format(data_received)) #log_d("data in bytes: "+str(bytes(data_received, "utf-8"))) data = self.unformat_received_data(data_received) log_i("RECEIVED (useful data): {}".format(data)) return data ''' def _receive_data(self): return self.my_channel._receive_data() ''' # Encapsulate useful data to be ready for sending # By default, do nothing #@abstract def format_data_to_send(self, data:str): return self.encapsulate_data_to_send(data) #@deprecated def encapsulate_data_to_send(self, data:str): return data # Extract useful data from received raw data # By default, do nothing #@abstract def unformat_received_data(self, data:str): return self.uncap_received_data(data) #@deprecated def uncap_received_data(self, data:str): #return data_received.decode() return data ''' def encapsulate_data_to_send(self, command:str): return self.my_channel.encapsulate_data_to_send(command) def uncap_received_data(self, data_received:str): return self.my_channel.uncap_received_data(data_received) ''' def get_utc_date(self): return celme.Date("now").iso(0) #return celme.Date("now").ymdhms() def close(self): self.my_channel.close() # Stop device simulator (only if used) if self._device_host=="localhost": print("Stopping device simulator") self._device_simulator.stop() ''' def is_generic_cmd(self, raw_input_cmd:str) -> bool: print("raw_input_cmd is", raw_input_cmd) # Using Google documentation format (https://www.sphinx-doc.org/en/master/usage/extensions/example_google.html#example-google) #"" Is this a generic command ? Args: raw_input_cmd: a command in string format (like 'set_state active' or 'get_ra' or 'set_ra 20:00:00' or 'set_radec 20:00:00 90:00:00" or 'do_park'...) Returns: either False or (cmd, [args]) with cmd like "get_ra" and [args] like ['20:00:00', '90:00:00'] #"" #return cmd.startswith('get_') or cmd.startswith('set_') or cmd.startswith('do_') #cmds = ['get ', 'get_', 'set ', 'set_', 'do ', 'do_'] #'' seps = (" ", "_") #cmds = list(x+y for x in cmd for y in sep) for cmd in cmds: for sep in seps: generic_cmd = cmd+sep if raw_input_cmd.startswith(generic_cmd): # Is there value(s) passed ? if len(raw_input_cmd) > len(generic_cmd): values = raw_input_cmd[len(generic_cmd):] values = values.split(' ') # return cmd like "get_ra", [and values] return generic_cmd.replace(' ','_'), values return False, False #'' ##cmds = ("get","set","do") #'' # ex: "set radec" => "set_radec" raw_input_cmd = raw_input_cmd.strip() cmd_splitted = raw_input_cmd.split(' ') if len(cmd_splitted) == 1: return False,False generic_cmd = cmd_splitted[0] + '_' + cmd_splitted[1] #'' # Ex: "set_radec 15 30", "do_init", "get_radec", "set_state active", "do_goto_radec 15 45"... tokens = raw_input_cmd.split(' ') generic_cmd = tokens[0] # Check this generic command exists #if (generic_cmd not in self._cmd.keys()): return False,False if generic_cmd not in self._cmd: return False,False # Is there value(s) passed ? ###if len(cmd_splitted) > 2: values_to_set = cmd_splitted[2:] args = tokens[1:] if len(tokens)>1 else None # ex: return "set_radec", ["20:00:00", "90:00:00"] return generic_cmd, args ''' #res = self.getDeviceControllerForType(cmd.device_type).execute_cmd(cmd.full_name) def get_dc_component_for_type(self, dc_component_type:str): #->DeviceControllerAbstract: # By default, return myself (as a DeviceController component) # ex1: None # ex2: "Telescope" (is in "AgentDeviceTelescopeGemini") #if dc_component_type is None or dc_component_type in self.__class__.__name__ : return self if dc_component_type is None or dc_component_type in type(self).__name__ : return self #for dcc in self._my_dc_components: #for dcc in type(self).mro(): for dcc in type(self).__bases__ : print(dc_component_type, "in ??????", dcc.__name__) #if dc_component_type in dcct.__class__.__name__: if dc_component_type in dcc.__name__: return dcc raise Exception("NO DEVICE CONTROLLER COMPONENT FOUND FOR THIS TYPE: "+dc_component_type) #def execute_cmd(self, cmd:DeviceCommand)->GenericResult: def execute_cmd(self, raw_input_cmd:str)->GenericResult: ''' :param raw_input_cmd: ''' #generic_cmd, args = self.is_generic_cmd(raw_input_cmd) cmd = DeviceCommand(raw_input_cmd) print("cmd is", cmd, raw_input_cmd) # GENERIC command #if generic_cmd is not False: if cmd.is_generic(): print("GENERIC COMMAND") #return self.execute_generic_cmd(generic_cmd, args) return self.execute_generic_cmd(cmd.name, cmd.args, cmd.devtype) # NATIVE command ''' if cmd.startswith('get_'): #generic_cmd,_ = request[4:].split('(') generic_cmd = cmd[4:] if (generic_cmd not in self._cmd_getset.keys()) and (generic_cmd not in self._cmd_do.keys()): #eval(request) return self.get_radec() print("cmd is", generic_cmd) return self._get(generic_cmd) return ''' # NATIVE command print("NATIVE COMMAND") #res_native = self.execute_native_cmd(raw_input_cmd) res_native = self.execute_native_cmd(cmd.name_and_args) return GenericResult(res_native) #def execute_native_cmd(self, request:str, awaited_res_if_ok=None)->GenericResult: def execute_native_cmd(self, native_cmd:str) -> str: """ Execute a native command Args: native_cmd: a native command Returns: the command result """ print("NATIVE Command to send is "+ repr(native_cmd)) #self.send_request(native_cmd) self.send_native_cmd(native_cmd) native_res = self.receive_data() return native_res ''' ok = True if not awaited_res_if_ok else (native_res == awaited_res_if_ok) return GenericResult(native_res, ok) ''' ''' def execute_native_cmd_OLD(self, request:str)->str: self.send_request(request) native_res = self.receive_data() return native_res ''' def execute_unformated_native_cmd(self, request:str) -> str: request = self.formated_cmd(request) #return self.execute_native_cmd_OLD(request) return self.execute_native_cmd(request) def send_native_cmd(self, native_cmd:str)->str: return self.send_data(native_cmd) #@deprecated def send_request(self, request:str)->str: return self.send_native_cmd(request) def print_available_commands(self): print("\nAvailable commands are:") print("- GET commands:") #print (list(cmd.replace('_',' ') for cmd in self._cmd.keys() if cmd.startswith('get_'))) print (list(cmd for cmd in self._cmd.keys() if cmd.startswith('get_'))) print("- SET commands:") print (list(cmd for cmd in self._cmd.keys() if cmd.startswith('set_'))) print("- DO commands:") print (list(cmd for cmd in self._cmd.keys() if cmd.startswith('do_'))) def available_commands(self): return list(self._cmd.keys()) # @abstract def formated_cmd(self, cmd:str, value:str=None)->str: return cmd #def run_func(self, func, arg=None): def run_func(self, func, *args): #print("args", args) if args: return getattr(self, func)(*args) else: return getattr(self, func)() ''' TELESCOPE COMMANDS (abstract methods) ''' #def execute_generic_cmd(self, generic_cmd:DeviceCommand)->str: def execute_generic_cmd(self, generic_cmd:str, values_to_set:str=None, dc_component_type:str=None)->str: ''' Execute a generic command :param generic_cmd: str like "get_ra" or "set_ra" or "do_park"... :param value: only for a "set_" cmd ''' print("execute_generic_cmd() from", self) # If generic_cmd is for a specific device component (dc), pass it to this dc (instead of me) print("dc_component_type is: ", dc_component_type) ####if dc_component_type and dc_component_type not in self.__class__.__name__ : if dc_component_type: DCC = self.get_dc_component_for_type(dc_component_type) print("*** EXECUTÉ PAR COMPONENT", DCC) #return (DCC)(self.execute_generic_cmd(generic_cmd, values_to_set, None)) return DCC.execute_generic_cmd(self, generic_cmd, values_to_set, None) #log_d("\n\nGENERIC Command to send is "+generic_cmd) print("\n\nGENERIC Command to send is ", generic_cmd) # Check if generic_param exists #if generic_cmd not in self._cmd.keys(): raise UnknownCommandException() # if this generic command has no corresponding native command, raise NotImplementedError native_cmd_infos = self._cmd[generic_cmd] if not native_cmd_infos: raise NotImplementedError # Get corresponding native command: native_cmd = native_cmd_infos[0] if not native_cmd: raise NotImplementedError # MACRO-COMMAND (ex: native_cmd == "do_goto", "do_init", "get_radec") if native_cmd == generic_cmd: print("MACRO-COMMAND") #print("cmd,val", native_cmd, values_to_set) #res:GenericResult = self.run_func(native_cmd, *values_to_set) if values_to_set: print("with args") res = self.run_func(native_cmd, *values_to_set) #res = getattr(self, native_cmd)(values_to_set) else: res = self.run_func(native_cmd) #res = getattr(self, native_cmd)() #if res is None: res = 'ok' # res should be a GenericResult if not isinstance(res, GenericResult): raise Exception("Should be a GenericResult", res) return res # NATIVE COMMAND (ex: native_cmd == "GR") native_cmd = self.formated_cmd(native_cmd, values_to_set) awaited_res_if_ok = None if len(native_cmd_infos) > 1: awaited_res_if_ok = native_cmd_infos[1] #native_res = self.execute_native_cmd(self.formated_cmd(native_cmd,value), awaited_res_ok) native_res = self.execute_native_cmd(native_cmd) ok = True if not awaited_res_if_ok else (native_res == awaited_res_if_ok) return GenericResult(native_res, ok) ''' **************************** **************************** GENERIC TELESCOPE COMMANDS (abstract methods) **************************** **************************** ''' ''' **************************** GENERIC GET & SET commands **************************** ''' @generic_cmd def get_timezone(self): pass #def get_timezone(self): return self.execute_generic_cmd('get_timezone') @generic_cmd def set_timezone(self, hh): pass #def set_timezone(self, hh): return self.execute_generic_cmd('set_timezone', hh) @generic_cmd def get_date(self): pass @generic_cmd def set_date(self, mmddyy): pass @generic_cmd def get_time(self): pass @generic_cmd def set_time(self, hhmmss): pass ''' **************************** GENERIC DO commands **************************** ''' # @abstract #def do_INIT(self): return self._do("INIT") ''' do_PARK() (p103) - STARTUP position = CWD - :hC# - position required for a Cold or Warm Start, pointing to the celestial pole of the given hemisphere (north or south), with the counterweight pointing downwards (CWD position). From L4, V1.0 up - HOME position parking => par defaut, c'est CWD, mais ca peut etre different - :hP# - defaults to the celestial pole visible at the given hemisphere (north or south) and can be set by the user ''' # @abstract def do_PARK(self): pass #def do_PARK(self): return self._do("PARK") # @abstract def do_start(self): pass def do_stop(self): pass # @abstract MACRO def do_init(self): return NotImplementedError # TODO: empecher de creer une instance de cette classe abstraite # Avec ABC ? ''' if __name__ == "__main__": #HOST, PORT = "localhost", 9999 #HOST, PORT = "localhost", 20001 HOST, PORT = "localhost", 11110 # Classic usage: #tsock = SocketClient_UDP_TCP(HOST, PORT, "UDP") # More elegant usage, using "with": with SocketClient_ABSTRACT(HOST, PORT, "UDP") as tsock: # 0) CONNECT to server (only for TCP, does nothing for UDP) tsock._connect_to_server() while True: # 1) SEND REQUEST data to server # saisie de la requête au clavier et suppression des espaces des 2 côtés data = input("REQUEST TO SERVER [ex: ':GD#' (Get Dec), ':GR#' (Get RA)']: ").strip() # test d'arrêt if data=="": break #data_to_send = bytes(data + "\n", "utf-8") tsock.send_data(data) #mysock.sendto("%s" % data, (HOST, PORT)) #print("Sent: {}".format(data)) # 2) RECEIVE REPLY data from server data_received = tsock.receive_data() #reponse, adr = mysock.recvfrom(buf) #print("Received: {}".format(data_received)) #print("Useful data received: {}".format(data_useful)) print('\n') #tsock.close() '''