#!/usr/bin/env python3

"""Socket Client Telescope (abstract) implementation

To be used as a base class (interface) for any concrete socket client telescope class
"""

# Standard library imports
import copy
import functools
import logging
import pprint
import socket
import sys
import threading
import time

# Third party imports

# Local application imports
sys.path.append("../../../..")
import src.core.celme as celme

sys.path.append("../..")
from device_controller.logs import *
from device_controller.channels.client_channel_socket import ClientChannelSocket
from device_controller.channels.client_channel_serial import ClientChannelSerial
from device_controller.channels.client_channel_usb import ClientChannelUSB

# Execute also "set" and "do" commands
GET_ONLY = False
# Execute only "get" commands
#GET_ONLY=True

# Default timeouts
TIMEOUT_SEND = 10
TIMEOUT_RECEIVE = 10


# DECORATOR
def generic_cmd(func):
    """Turn an (empty) method into a generic command.

    The decorated method body is never executed: calling the method runs
    ``self.exec_generic_cmd(<method name>, values_to_set)`` instead.
    """
    @functools.wraps(func)
    def wrapper_generic_cmd(self, values_to_set=None):
        return self.exec_generic_cmd(func.__name__, values_to_set)
    return wrapper_generic_cmd


class DeviceCommand:
    """A command given as raw text: ``[<devtype>.]<name>[ <arg> ...]``.

    A command is "generic" when its name starts with ``do_``, ``get_`` or
    ``set_``; any other command is handled as a native one.

    Attributes:
        full_name: the raw command text, as given.
        name: the command name alone for a generic command, the raw text otherwise.
        args: list of arguments of a generic command (or the ``cmd_args`` given).
        devtype: device component type (optional ``<devtype>.`` prefix).
    """

    full_name: str = ''
    name = None
    args = None
    # Device component type
    devtype = None

    _GENERIC_PREFIXES = ('do_', 'get_', 'set_')

    def __init__(self, cmd_full_name: str, dev_comp_type: str = None, cmd_args: str = None):
        self.full_name = cmd_full_name
        self.name = cmd_full_name
        self.devtype = dev_comp_type
        self.args = cmd_args
        if self.is_generic():
            dev_comp_type, cmd_name, cmd_args = self.get_full_name_parts()
            self.name = cmd_name
            # Parts found in the raw text take precedence over the given ones
            if dev_comp_type:
                self.devtype = dev_comp_type
            if cmd_args:
                self.args = cmd_args

    def __str__(self):
        return (f"Commmand '{self.full_name}'")

    def _split_devtype(self):
        """Return ``(devtype, name_and_args)``.

        Only a '.' located BEFORE the first space is a device type separator,
        so that a '.' inside an argument (ex: "set_ra 20.5") is left untouched.
        """
        head, sep, tail = self.full_name.partition(' ')
        if '.' not in head:
            return None, self.full_name
        devtype, _, cmd_name = head.partition('.')
        return devtype, cmd_name + sep + tail

    def is_generic(self):
        """Tell if the command name starts with ``do_``, ``get_`` or ``set_``."""
        _, cmd_name = self._split_devtype()
        return cmd_name.startswith(self._GENERIC_PREFIXES)

    @property
    def name_and_args(self):
        """The raw command text without its optional ``<devtype>.`` prefix."""
        _, cmd_name_and_args = self._split_devtype()
        return cmd_name_and_args

    def get_full_name_parts(self):
        """Split the raw command text.

        Returns:
            a ``(devtype, cmd_name, cmd_args)`` tuple, where ``devtype`` is
            None when there is no prefix and ``cmd_args`` is None when there
            is no argument (a list of strings otherwise).
        """
        devtype, cmd_name_and_args = self._split_devtype()
        tokens = cmd_name_and_args.split()
        if not tokens:
            return devtype, cmd_name_and_args, None
        cmd_name, *cmd_args = tokens
        return devtype, cmd_name, (cmd_args or None)


class GenericResult:
    '''
    Result of a command: the native answer (txt) and a status (ok / ko)

    Usage:

    res = execute(command)
    print("result is", res)
    if res.ko: raise UnexpectedReturnCode()
    if res.ok: ...
    '''

    # By default, good result
    ok = True
    ko = False

    def __init__(self, native_result: str, ok=True):
        self.txt = native_result
        self.ok = ok
        self.ko = not ok

    def __str__(self):
        # str() so that a non-string native result does not make print() fail
        return str(self.txt)


'''
**************************
EXCEPTIONS DEFINITION
**************************
See https://docs.python.org/3/tutorial/errors.html
'''


class DCCNotFoundException(Exception):
    ''' Raised when a specific DCC is not available '''
    pass


class UnknownNativeCmdException(Exception):
    ''' Raised when a NATIVE cmd is not recognized by the controller '''
    pass


class UnknownGenericCmdException(Exception):
    pass


class UnimplementedGenericCmdException(Exception):
    ''' Raised when a GENERIC cmd has no implementation in the controller (no native cmd available for the generic cmd) '''

    def __str__(self):
        return f"({type(self).__name__}): Device Generic command has no implementation in the controller"


class UnknownGenericCmdArgException(Exception):
    ''' Raised when a GENERIC cmd argument is not recognized by the controller (no native cmd available for the generic cmd) '''

    def __init__(self, name, arg):
        self.name = name
        self.arg = arg

    def __str__(self):
        return f"The argument '{self.arg}' does not exist for generic cmd {self.name}"


class UnexpectedCommandReturnCode(Exception):
    pass


class TimeoutException(Exception):
    pass


class Gen2NatCmds:
    """Dictionary of generic commands and their native counterparts.

    Each entry has the format
    ``'generic_name': ['native_name', 'simulator_answer', ...]`` (the list may
    be empty when the generic command has no native implementation).
    A value can also be a nested dictionary (same format) holding the commands
    of one device controller component (DCC).
    """

    # To be set by constructor
    GEN2NAT_CMDS = {}

    def __init__(self, cmds: dict):
        self.GEN2NAT_CMDS = cmds

    def __str__(self) -> str:
        return str(self.GEN2NAT_CMDS)

    def get(self, cmd: str = None):
        """Return the whole dictionary (no ``cmd`` given) or the native infos of ``cmd``.

        The native infos can be None (unknown generic command), [] or [infos].
        Only the MAIN commands are searched: the nested DCC dictionaries are
        not, because they are not complete (no macro-commands like get_radec).
        """
        if cmd is None:
            return self.GEN2NAT_CMDS
        native_infos = self.GEN2NAT_CMDS.get(cmd)
        print("native infos:", native_infos)
        return native_infos

    def get_native_infos_for_generic_cmd(self, cmd: str) -> list:
        return self.get(cmd)

    def get_native_cmd_for_generic(self, cmd: str) -> str:
        """Return the native command name for generic ``cmd`` (None if none)."""
        val = self.get_native_infos_for_generic_cmd(cmd)
        if not val:
            return None
        return val[0]

    def get_simulated_answer_for_generic_cmd(self, cmd: str) -> str:
        """Return the simulator answer for generic ``cmd`` (None if none)."""
        val = self.get(cmd)
        if not val:
            return None
        # no answer available
        if len(val) == 1:
            return None
        return val[1]

    def get_simulated_answer_for_native_cmd(self, cmd: str) -> str:
        """Return the simulator answer of the first MAIN entry containing ``cmd``."""
        for native_infos in self.GEN2NAT_CMDS.values():
            # Nested DCC dictionaries have no positional answer: skip them
            if not isinstance(native_infos, (list, tuple)):
                continue
            if cmd in native_infos:
                # 1st answer available (if any)
                return native_infos[1] if len(native_infos) > 1 else None
        return None

    @staticmethod
    def _is_native_cmd_of(native_infos, cmd: str) -> bool:
        """Tell if ``cmd`` is the native command described by ``native_infos``."""
        if native_infos == cmd:
            return True
        # Documented format: ['native_name', 'answer', ...]
        return (
            isinstance(native_infos, (list, tuple))
            and len(native_infos) > 0
            and native_infos[0] == cmd
        )

    def is_valid_native_cmd(self, cmd: str) -> bool:
        """Tell if ``cmd`` is a native command of the MAIN or of a DCC dictionary."""
        for native_infos in self.GEN2NAT_CMDS.values():
            if isinstance(native_infos, dict):
                # Commands of a DCC
                if any(self._is_native_cmd_of(v, cmd) for v in native_infos.values()):
                    return True
            elif self._is_native_cmd_of(native_infos, cmd):
                return True
        return False

    def print_available_cmds(self):
        print("\nAvailable commands:")
        print("=======================")
        # 1) print general commands
        self.print_available_cmds_for_dcc("General")
        # 2) print commands for each DCC:
        for key, value in self.GEN2NAT_CMDS.items():
            if isinstance(value, dict):
                self.print_available_cmds_for_dcc(key)

    def print_available_cmds_for_dcc(self, dcc_key):
        d = self.GEN2NAT_CMDS if dcc_key == "General" else self.GEN2NAT_CMDS[dcc_key]
        print(f"\n{dcc_key} commands are:")
        print("- GET commands:")
        print([cmd for cmd in d if cmd.startswith('get_')])
        print("- SET commands:")
        print([cmd for cmd in d if cmd.startswith('set_')])
        print("- DO commands:")
        print([cmd for cmd in d if cmd.startswith('do_')])


#TODO: remove ClientChannelAbstract, and set instead a ClientChannel
class DeviceController():
    """Base class of the device controllers.

    A device controller sends generic commands (``get_xxx``, ``set_xxx``,
    ``do_xxx``) or native commands to a device through a client channel.
    It can own device controller components (DCC), to which it delegates the
    commands it does not handle itself.
    """

    DEBUG = False

    _device_simulator = None
    _thread_device_simulator = None
    _device_host = "localhost"
    _device_port = None

    # List of device controller (dc) components (by default, none)
    _my_dc_components = []

    # ClientChannel used by the device controller (set during __init__)
    _my_channel = None

    # @abstract (to be overriden)
    _cmd_device_concrete = {}
    _cmd_device_abstract = {}

    GEN2NAT_CMDS = {
        # General format:
        #'cmd_generic_name': ['cmd_native_name', 'default_simulator_answer', 'native_answer1', 'native_answer2', ...],

        # GET-SET commands:
        'get_timezone': [],
        'set_timezone': [],
        'get_date': [],
        'set_date': [],
        'get_time': [],
        'set_time': [],

        # for test only
        'dc_only': [],

        # DO commands:
        'do_init': ['do_init'],
        'do_park': [],
    }

    class Protocol:
        """Default protocol: commands and data are passed unchanged."""

        # By default, do nothing
        @classmethod
        def formated_cmd(cls, cmd: str, value: str = None) -> str:
            return cmd

        # Encapsulate useful data to be ready for sending
        # By default, do nothing (but return a stamp along with the data)
        @classmethod
        def encap(cls, data: str):
            print("*** Default encap ***")
            return "stamp", data

        # Extract useful data from received raw data
        # By default, do nothing
        @classmethod
        def uncap(cls, dcc_name: str, stamp: str, data: str):
            print(f"*** Default uncap (from {dcc_name})***")
            return data

    # WRAPPER methods

    def formated_cmd(self, cmd: str, value: str = None) -> str:
        return self._protoc.formated_cmd(cmd, value)

    def encap(self, data: str):
        return self._protoc.encap(data)

    def uncap(self, stamp: str, data: str):
        return self._protoc.uncap(type(self).__name__, stamp, data)

    def __init__(self, device_host: str = "localhost", device_port: int = 11110, channel="TCP", buffer_size=1024, protoc=None, gen2nat_cmds=None, device_sim=None, DEBUG=False):
        '''
        :param device_host: server IP or hostname
        :param device_port: server port
        :param channel: "SOCKET-TCP", "SOCKET-UDP", "SERIAL", "USB", or instance of Channel
        :param buffer_size: buffer size passed to the channel
        :param protoc: protocol class (by default, my Protocol class)
        :param gen2nat_cmds: commands dictionary, merged over DeviceController.GEN2NAT_CMDS
        :param device_sim: device simulator (mandatory if device_host is "localhost" and channel is a name)
        :param DEBUG: activate debug prints
        '''
        # NOTE(review): the default channel "TCP" is not one of the names
        # accepted below, so it raises Exception("Unknown Channel") - confirm
        # whether the default should be "SOCKET-TCP"

        self.stamp_current = None

        self.DEBUG = DEBUG
        set_logger(DEBUG)
        log_d("Logger configured")

        # Set host (IP) and port
        self._device_host = device_host
        self._device_port = device_port

        # Own list of components (copy, so that the class-level list is never shared)
        self._my_dc_components = list(self._my_dc_components)

        # Set Protocol
        # (the default was looked up as "Protoc", a name that is not defined
        # here; it is still honoured when a subclass does define it)
        self._protoc = protoc if protoc else getattr(self, "Protoc", self.Protocol)

        # Set Commands: merge my commands dictionary with the one passed by subclass
        # _gen2nat_cmds = GEN2NAT_CMDS + gen2nat_cmds
        if gen2nat_cmds is None:
            gen2nat_cmds = {}
        self._gen2nat_cmds = {**DeviceController.GEN2NAT_CMDS, **gen2nat_cmds}
        print(self, "MY COMMANDS ARE (after merge):", self._gen2nat_cmds)
        self._my_cmds = Gen2NatCmds(self._gen2nat_cmds)

        # Set Channel and Simulator
        if not isinstance(channel, str):
            # Ready-made channel instance: no simulator
            self._my_channel = channel
            self._device_simulator = None
        else:
            if channel.startswith("SOCKET"):
                self._my_channel = ClientChannelSocket(device_host, device_port, channel, buffer_size, DEBUG)
            elif channel == "SERIAL":
                self._my_channel = ClientChannelSerial(device_host, device_port, buffer_size, DEBUG)
            elif channel == "USB":
                self._my_channel = ClientChannelUSB(device_host, device_port, buffer_size, DEBUG)
            else:
                raise Exception("Unknown Channel", channel)

            # If LOCALHOST, launch the device SIMULATOR
            # NOTE(review): assumed to belong to the "channel name" branch - confirm
            if device_host == "localhost":
                if not device_sim:
                    raise Exception("No simulator class available")
                self._device_simulator = device_sim
                self._device_simulator.set_protoc_and_cmds(protoc, self._my_cmds)
                print("SIMU IS", device_sim, self._device_simulator)
                self._thread_device_simulator = threading.Thread(target=self.device_simulator_run)
                self._thread_device_simulator.start()

    # Set my list of dc components
    def set_dc_components(self, dc_components: list):
        self._my_dc_components = dc_components

    # So that we can use this with the "with" statement (context manager)
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._my_channel.__exit__(exc_type, exc_value, traceback)

    # DEBUG print
    def printd(self, *args, **kwargs):
        if self.DEBUG:
            print(*args, **kwargs)

    def tprint(self, *args, **kwargs):
        # **kwargs (and not *kwargs), so that print() options such as
        # sep= or end= are forwarded instead of being printed as text
        self.printd('(THREAD):', *args, **kwargs)

    def device_simulator_run(self):
        """Run the device simulator (target of the simulator thread)."""
        print("\n******************************")
        print("******* (DC) Starting device simulator on (host:port): ", self._device_host+':'+str(self._device_port))
        print("******************************\n")
        self._device_simulator.serve_forever(self._device_port)

    def _connect_to_device(self):
        self._my_channel._connect_to_server()

    def get_celme_longitude(self, longitude):
        return celme.Angle(longitude).sexagesimal("d:+0180.0")

    def get_celme_latitude(self, latitude):
        return celme.Angle(latitude).sexagesimal("d:+090.0")

    #@override ClientChannel send_data
    def send_data(self, data: str):
        """Encapsulate ``data`` (keeping the stamp for the answer) and send it."""
        self.stamp_current, data_encapsulated = self.encap(data)
        self._my_channel.send_data(data_encapsulated)

    #@override ClientChannel receive_data
    def receive_data(self) -> str:
        """Receive raw data from the channel and return the useful data only."""
        data_received = self._my_channel.receive_data()
        data = self.uncap(self.stamp_current, data_received)
        print(f"(dc) ({self}) RECEIVED (useful data): {data}")
        log_i(f"(dc) ({self}) RECEIVED (useful data): {data}")
        return data

    def get_utc_date(self):
        return celme.Date("now").iso(0)

    def close(self):
        """Close the channel and stop the device simulator (only if used)."""
        self._my_channel.close()
        # No simulator when a ready-made channel instance was given
        if self._device_host == "localhost" and self._device_simulator is not None:
            print("Stopping device simulator")
            self._device_simulator.stop()

    def has_dc_component_for_type(self, dc_component_type: str):
        """Tell if one of my components has ``dc_component_type`` in its class name."""
        for dcc in self._my_dc_components:
            print(dc_component_type, "in ??????", type(dcc).__name__)
            if dc_component_type in type(dcc).__name__:
                return True
        return False

    def get_dc_component_for_type(self, dc_component_type: str):
        """Return the component in charge of ``dc_component_type``.

        By default (type is None or is part of my own class name), return myself.
        ex1: None
        ex2: "Telescope" (is in "AgentDeviceTelescopeGemini")

        Raises:
            DCCNotFoundException: no component has this type in its class name.
        """
        if dc_component_type is None or dc_component_type in type(self).__name__:
            return self
        print("components are", self._my_dc_components)
        # Guarded, so that an empty list ends with DCCNotFoundException (not IndexError)
        if self._my_dc_components:
            print("1st component is", self._my_dc_components[0])
        for dcc in self._my_dc_components:
            print(dc_component_type, "in ??????", type(dcc).__name__)
            if dc_component_type in type(dcc).__name__:
                return dcc
        raise DCCNotFoundException("DEVICE CONTROLLER COMPONENT NOT FOUND: "+dc_component_type)

    def is_valid_cmd(self, cmd: DeviceCommand):
        """Tell if ``cmd`` is a known generic command or a known native command."""
        print("cmd.name is", cmd.name)
        print(cmd)
        print("generic ?", cmd.is_generic())
        print("is valid generic ?", self.is_valid_generic_cmd(cmd))
        return (
            (cmd.is_generic() and self.is_valid_generic_cmd(cmd))
            or
            self.is_valid_native_cmd(cmd)
        )

    def is_generic_but_UNIMPLEMENTED_cmd(self, cmd: DeviceCommand):
        return cmd.is_generic() and not self.is_implemented_generic_cmd(cmd)

    # WRAPPER methods

    def has_generic_command(self, generic_cmd: DeviceCommand):
        return generic_cmd.name in self._my_cmds.get()

    def has_native_cmd_for_generic(self, generic_cmd: DeviceCommand):
        return self._my_cmds.get_native_infos_for_generic_cmd(generic_cmd.name) not in [None, []]

    # check if generic cmd exists
    def is_valid_generic_cmd(self, cmd: DeviceCommand):
        # 1) If a DCC given, return search result in this DCC commands
        if cmd.devtype:
            if not self.has_dc_component_for_type(cmd.devtype):
                return False
            return self.get_dc_component_for_type(cmd.devtype).has_generic_command(cmd)
        # 2) Search in my general commands
        if self.has_generic_command(cmd):
            return True
        # 3) Search in all my DCCs
        for dcc in self._my_dc_components:
            if dcc.has_generic_command(cmd):
                return True
        # 4) not found
        return False

    # check if generic cmd exists and is implemented as a native cmd (in dictionary)
    def is_implemented_generic_cmd(self, cmd: DeviceCommand):
        print("is implemented generic ?")
        print("cmd.devtype", cmd.devtype)
        # 1) If a DCC given, return search result in this DCC commands
        if cmd.devtype:
            print("1")
            if not self.has_dc_component_for_type(cmd.devtype):
                return False
            print("1.2")
            return self.get_dc_component_for_type(cmd.devtype).has_native_cmd_for_generic(cmd)
        # 2) Search in my general commands
        if self.has_native_cmd_for_generic(cmd):
            print("2")
            return True
        # 3) Search in all my DCCs
        for dcc in self._my_dc_components:
            print("3")
            if dcc.has_native_cmd_for_generic(cmd):
                return True
        # 4) not found
        return False

    def is_valid_native_cmd(self, native_cmd: DeviceCommand):
        return self._my_cmds.is_valid_native_cmd(native_cmd.name)

    def get_dcc_and_native_cmd_for_generic(self, generic_cmd: str):
        """Return ``(controller, native infos)`` for ``generic_cmd``.

        The controller is myself or the first of my components having native
        infos for the command. Return ``(None, None)`` if not found.
        """
        # 1) Search in my MAIN commands
        nc_infos = self._my_cmds.get_native_infos_for_generic_cmd(generic_cmd)
        if nc_infos:
            return self, nc_infos
        # 2) Search in each DCC commands
        # (the nested dictionaries of my general dictionary are NOT used for
        # that, because they are not complete for each dcc: especially, they
        # do not contain macro-commands like get_radec, ...)
        for dcc in self._my_dc_components:
            _, native_cmd_infos = dcc.get_dcc_and_native_cmd_for_generic(generic_cmd)
            if native_cmd_infos:
                return dcc, native_cmd_infos
        # Native command not found
        return None, None

    '''
    This method is either called
    - DIRECTLY from AgentDevice.routine_process().get_device_status() (NO THREAD)
    or
    - FROM A THREAD from AgentDevice._thread_exec_specific_cmd().exec_specific_cmd()
    '''
    def exec_cmd(self, raw_input_cmd: str) -> GenericResult:
        '''
        Execute a (generic or native) command

        :param raw_input_cmd: a command like 'set_state active', 'get_ra', 'set_radec 20:00:00 90:00:00', 'do_park', ':GR#'...
        :return: the result, as a GenericResult
        :raises UnknownGenericCmdException: unknown generic command
        :raises UnknownNativeCmdException: unknown native command
        '''
        cmd = DeviceCommand(raw_input_cmd)
        self.tprint("cmd is", cmd, raw_input_cmd)

        # GENERIC command (pyros grammar)
        if cmd.is_generic():
            self.tprint("GENERIC COMMAND")
            if not self.is_valid_generic_cmd(cmd):
                raise UnknownGenericCmdException(cmd.name)
            try:
                res = self.exec_generic_cmd(cmd.name, cmd.args, cmd.devtype)
            except (UnimplementedGenericCmdException, DCCNotFoundException) as e:
                self.tprint(f"EXCEPTION caught by {type(self).__name__} (from DC)", e)
                raise
            return res

        # NATIVE command
        self.tprint("NATIVE COMMAND")
        if not self.is_valid_native_cmd(cmd):
            raise UnknownNativeCmdException(cmd.name)
        try:
            res_native = self.exec_native_cmd(cmd.name_and_args)
        except UnknownNativeCmdException as e:
            self.tprint(f"EXCEPTION caught by {type(self).__name__} (from DC)", e)
            raise
        return GenericResult(res_native)

    def exec_native_cmd(self, native_cmd: str) -> str:
        """ Execute a native command

        Args:
            native_cmd: a native command

        Returns:
            the command result
        """
        self.tprint("NATIVE Command to send is " + repr(native_cmd))
        self.send_native_cmd(native_cmd)
        native_res = self.receive_data()
        return native_res

    def execute_unformated_native_cmd(self, request: str) -> str:
        request = self.formated_cmd(request)
        return self.exec_native_cmd(request)

    def send_native_cmd(self, native_cmd: str) -> str:
        return self.send_data(native_cmd)

    #@deprecated
    def send_request(self, request: str) -> str:
        return self.send_native_cmd(request)

    # wrapper methods
    def print_available_cmds(self):
        self._my_cmds.print_available_cmds()

    def print_available_cmds_for_dcc(self, dcc_key):
        self._my_cmds.print_available_cmds_for_dcc(dcc_key)

    def run_func(self, func, *args):
        """Call my method named ``func`` with ``args``."""
        return getattr(self, func)(*args)

    def exec_generic_cmd(self, generic_cmd: str, values_to_set: str = None, dcc_type: str = None) -> str:
        ''' Execute a generic command

        :param generic_cmd: str like "get_ra" or "set_ra" or "do_park"...
        :param values_to_set: argument(s) of the command (only for a "set_" cmd)
        :param dcc_type: type of the device component in charge of the command (if any)
        :return: the result, as a GenericResult
        :raises DCCNotFoundException: no component for dcc_type
        :raises UnimplementedGenericCmdException: no native command for this generic command
        '''
        self.tprint("(DC):exec_generic_cmd() from", self)

        # If generic_cmd is for a specific device component (dc), pass it to this dc (instead of me)
        self.tprint("(DC):dc_component_type is: ", dcc_type)
        if dcc_type:
            # Get the dcc in charge of this command
            try:
                dcc = self.get_dc_component_for_type(dcc_type)
                self.tprint("*** EXECUTÉ PAR COMPONENT", dcc)
            except DCCNotFoundException as e:
                self.tprint(f"EXCEPTION caught by {type(self).__name__} (from dcc)", e)
                raise
            # Delegate cmd execution to the dcc
            try:
                return dcc.exec_generic_cmd(generic_cmd, values_to_set, None)
            except UnimplementedGenericCmdException as e:
                self.tprint(f"EXCEPTION caught by {type(self).__name__} (from dcc)", e)
                raise

        self.tprint("\n\n(DC): GENERIC Command to execute is ", generic_cmd)
        self.tprint("(DC): My ("+type(self).__name__+") commands are:")
        pprint.pprint(self._my_cmds.get())

        dcc, native_cmd_infos = self.get_dcc_and_native_cmd_for_generic(generic_cmd)
        self.tprint("native_cmd_infos", native_cmd_infos)

        # Command is not implemented
        if not native_cmd_infos:
            raise UnimplementedGenericCmdException(generic_cmd)

        # Command is one of my dcc's, so pass it the command for exec
        if dcc is not self:
            return dcc.exec_generic_cmd(generic_cmd, values_to_set, None)

        # Get corresponding native command:
        native_cmd = native_cmd_infos[0]
        if not native_cmd:
            raise UnimplementedGenericCmdException(generic_cmd)

        # MACRO-COMMAND (ex: native_cmd == "do_goto", "do_init", "get_radec")
        if native_cmd == generic_cmd:
            self.tprint("MACRO-COMMAND")
            if values_to_set:
                self.tprint("with args")
                # A single value given as a plain string is ONE argument
                # (and not one argument per character)
                macro_args = [values_to_set] if isinstance(values_to_set, str) else values_to_set
                res = self.run_func(native_cmd, *macro_args)
            else:
                res = self.run_func(native_cmd)
            # res should be a GenericResult
            if not isinstance(res, GenericResult):
                raise Exception("Should be a GenericResult", res)
            return res

        # NATIVE COMMAND (ex: native_cmd == "GR")
        native_cmd = self.formated_cmd(native_cmd, values_to_set)
        # The 2nd element (if any) is the answer awaited when everything is ok
        awaited_res_if_ok = native_cmd_infos[1] if len(native_cmd_infos) > 1 else None
        native_res = self.exec_native_cmd(native_cmd)
        ok = True if not awaited_res_if_ok else (native_res == awaited_res_if_ok)
        return GenericResult(native_res, ok)

    '''
    ****************************
    ****************************
    GENERIC TELESCOPE COMMANDS (abstract methods)
    ****************************
    ****************************
    '''

    '''
    ****************************
    GENERIC GET & SET commands
    ****************************
    '''

    @generic_cmd
    def get_timezone(self): pass

    @generic_cmd
    def set_timezone(self, hh): pass

    @generic_cmd
    def get_date(self): pass

    @generic_cmd
    def set_date(self, mmddyy): pass

    @generic_cmd
    def get_time(self): pass

    @generic_cmd
    def set_time(self, hhmmss): pass

    '''
    ****************************
    GENERIC DO commands
    ****************************
    '''

    '''
    do_PARK() (p103)
    - STARTUP position = CWD
        - :hC#
        - position required for a Cold or Warm Start, pointing to the celestial pole of the given hemisphere (north or south),
        with the counterweight pointing downwards (CWD position). From L4, V1.0 up
    - HOME position parking => by default it is CWD, but it can be different
        - :hP#
        - defaults to the celestial pole visible at the given hemisphere (north or south) and can be set by the user
    '''
    # @abstract
    def do_PARK(self): pass

    # @abstract
    def do_start(self): pass

    def do_stop(self): pass

    # @abstract MACRO
    def do_init(self): raise UnimplementedGenericCmdException

# TODO: prevent the instantiation of this abstract class (with ABC ?)
''' if __name__ == "__main__": #HOST, PORT = "localhost", 9999 #HOST, PORT = "localhost", 20001 HOST, PORT = "localhost", 11110 # Classic usage: #tsock = SocketClient_UDP_TCP(HOST, PORT, "UDP") # More elegant usage, using "with": with SocketClient_ABSTRACT(HOST, PORT, "UDP") as tsock: # 0) CONNECT to server (only for TCP, does nothing for UDP) tsock._connect_to_server() while True: # 1) SEND REQUEST data to server # saisie de la requête au clavier et suppression des espaces des 2 côtés data = input("REQUEST TO SERVER [ex: ':GD#' (Get Dec), ':GR#' (Get RA)']: ").strip() # test d'arrêt if data=="": break #data_to_send = bytes(data + "\n", "utf-8") tsock.send_data(data) #mysock.sendto("%s" % data, (HOST, PORT)) #print("Sent: {}".format(data)) # 2) RECEIVE REPLY data from server data_received = tsock.receive_data() #reponse, adr = mysock.recvfrom(buf) #print("Received: {}".format(data_received)) #print("Useful data received: {}".format(data_useful)) print('\n') #tsock.close() '''