#!/usr/bin/env python3 """Socket Client Telescope (abstract) implementation To be used as a base class (interface) for any concrete socket client telescope class """ # Standard library imports #from enum import Enum import copy from dataclasses import dataclass, field import functools #import inspect import logging import os import pprint import socket import sys import threading import time from typing import Dict # Third party imports # Local application imports #sys.path.append("../../..") sys.path.append("../../../..") #import src.core.pyros_django.utils.celme as celme import vendor.guitastro.src.guitastro as guitastro from src.logpyros import LogPyros #sys.path.append('../..') #from src.client.socket_client_abstract import UnknownNativeCmdException, SocketClientAbstract ##from src_socket.client.socket_client_abstract import * ##from src_device.client.client_channel import * sys.path.append("../..") ##from device_controller.logs import * from src.device_controller.channels.client_channel import ChannelCommunicationException from src.device_controller.channels.client_channel_socket import ClientChannelSocket from src.device_controller.channels.client_channel_serial import ClientChannelSerial from src.device_controller.channels.client_channel_usb import ClientChannelUSB import config # Execute also "set" and "do" commands GET_ONLY=False # Execute only "get" commands #GET_ONLY=True # Default timeouts TIMEOUT_SEND = 10 TIMEOUT_RECEIVE = 10 ''' logger = LogPyros(__name__) def log(self, *args, **kwargs): logger.print(*args, **kwargs) # DEBUG print def printd(self, *args, **kwargs): logger.printd(*args, **kwargs) def tprintd(self, *args, **kwargs): printd('(THREAD):', *args, *kwargs) ''' def printd(*args, **kwargs): if os.environ.get('PYROS_DEBUG', '0')=='1': print(*args, **kwargs) ''' class c(Enum): # GET, SET DEC = 'DEC' RA = 'RA' RA_DEC = 'RA_DEC' # DO PARK = 'PARK' WARM_START = 'WARM_START' ''' # DECORATOR def generic_cmd(func): #def wrapper_generic_cmd(*args, **kwargs): @functools.wraps(func) def wrapper_generic_cmd(self, values_to_set=None): #printd("func name is", func.__name__) return self.exec_generic_cmd(func.__name__, values_to_set) return wrapper_generic_cmd # DECORATOR def recursive_search(f): @functools.wraps(f) def wrapped(self, *args, **kwargs): #def wrapped(*args, **kwargs): #printd(f.__name__) #return f(*args, **kwargs) ko_return_value = f(self, *args, **kwargs) return self.do_command_recursive_search_using_function(ko_return_value, f.__name__, *args, **kwargs) return wrapped class DeviceCmd: full_name:str = '' name = None args = None # Device component type devtype = None def __init__(self, cmd_full_name:str, dev_comp_type:str=None, cmd_args:str=None): self.full_name = cmd_full_name dev_comp_type,cmd_name,cmd_args = self.get_full_name_parts() self.name = cmd_name self.devtype = dev_comp_type self.args = cmd_args if self.is_generic(): dev_comp_type,cmd_name,cmd_args = self.get_full_name_parts() self.name = cmd_name if dev_comp_type: self.devtype = dev_comp_type if cmd_args: self.args = cmd_args def __str__(self): return (f"Commmand '{self.full_name}'") def is_generic(self): ''' cmd_name = self.full_name if '.' in self.full_name: cmd_name = self.full_name.split('.')[1] ''' cmd_name = self.full_name[self.full_name.find('.')+1:] return cmd_name.startswith('do_') or cmd_name.startswith('get_') or cmd_name.startswith('set_') ''' @classmethod def is_generic_cmd_name(cls, cmd_name:str): if '.' in cmd_name: cmd_name = cmd_name.split('.')[1] return cmd_name.startswith('do_') or cmd_name.startswith('get_') or cmd_name.startswith('set_') def is_generic(self): #return type(self).is_generic_cmd_name(self.full_name) return self.is_generic_cmd_name(self.full_name) #return DeviceCmd.is_generic_cmd_name(self.full_name) #return self.name.startswith('do_') or self.name.startswith('get_') or self.name.startswith('set_') ''' @property def name_and_args(self): cmd_name_and_args = self.full_name if '.' in cmd_name_and_args: cmd_name_and_args = cmd_name_and_args.split('.')[1] return cmd_name_and_args def get_full_name_parts(self): cmd_name = self.full_name devtype = None cmd_args = None if '.' in cmd_name: devtype, cmd_name = cmd_name.split('.') if ' ' in cmd_name: cmd_name, *cmd_args = cmd_name.split(' ') return devtype, cmd_name, cmd_args class GenericResult: ''' Usage: res = execute(command) printd("result is", res) if res.ko: raise UnexpectedReturnCode() if res.ok: ... ''' # By default, bad result ok = True ko = not ok unknown_command=False unknown_result=False def __init__(self, native_result:str, ok=True, unknown_command=False, unknown_result=False): self.txt = native_result self.ok = ok self.ko = not ok self.unknown_command = unknown_command self.unknown_result = unknown_result def __str__(self): return self.txt ''' def __repr__(self): return self.txt def __get__(self, instance, owner): return self.b def __set__(self, instance, value): self.b = value ''' ''' ************************** EXCEPTIONS DEFINITION ************************** See https://docs.python.org/3/tutorial/errors.html ''' class DCCNotFoundException(Exception): ''' Raised when a specific DCC is not available ''' pass class UnknownNativeCmdException(Exception): ''' Raised when a NATIVE command name is not recognized by the controller ''' pass ''' def __init__(self,*args,**kwargs): super().__init__(self,*args,**kwargs) ''' class UnknownNativeResException(Exception): ''' Raised when a NATIVE command result is not recognized by the controller ''' pass class UnknownGenericCmdException(Exception): pass class UnimplementedGenericCmdException(Exception): ''' Raised when a GENERIC cmd has no implementation in the controller (no native cmd available for the generic cmd) ''' def __str__(self): #return f"({type(self).__name__}): Generic command '{self.args[0]}' has no implementation in the controller" return f"({type(self).__name__}): Device Generic command has no implementation in the controller" class UnknownGenericCmdArgException(Exception): ''' Raised when a GENERIC cmd argument is not recognized by the controller (no native cmd available for the generic cmd) ''' def __init__(self, name, arg): self.name = name self.arg = arg def __str__(self): return f"The argument '{self.arg}' does not exist for generic cmd {self.name}" class UnexpectedCommandReturnCode(Exception): pass class DeviceTimeoutException(Exception): pass # PYTHON 3.7+ only @dataclass class Cmd: generic_name: str = 'generic name' native_name: str = '' desc: str = 'Description' params: Dict[str, str] = field(default_factory=dict) final_simul_response: str = 'simulator response' final_device_responses: Dict[str, str] = field(default_factory=dict) immediate_responses: Dict[str, str] = field(default_factory=dict) errors: Dict[str, str] = field(default_factory=dict) # MORE CLASSICAL VERSION class Cmd2: def __init__(self, generic_name: str = 'generic name', native_name: str = 'native name', params: Dict[str, str] = {}, final_device_responses: Dict[str, str] = {}, final_simul_response: str = 'simulator response', immediate_responses: Dict[str, str] = {}, errors: Dict[str, str] = {} ): self.generic_name = generic_name self.native_name = native_name self.params = params self.final_device_responses = final_device_responses self.final_simul_response = final_simul_response self.immediate_responses = immediate_responses self.errors = errors # TODO: move dans classe Cmd def cmd_get_native_name(native_cmd_infos:Cmd): #if isinstance(native_cmd_infos, Cmd): return [native_cmd_infos.native_name, native_cmd_infos.final_simul_response] if isinstance(native_cmd_infos, Cmd): return native_cmd_infos.native_name return native_cmd_infos[0] def cmd_get_simul_response(native_cmd_infos:Cmd): if isinstance(native_cmd_infos, Cmd): return native_cmd_infos.final_simul_response if native_cmd_infos.final_simul_response!='simulator response' else None return native_cmd_infos[1] if len(native_cmd_infos)>1 else None def cmd_set_simulated_answer(get_cmd:Cmd, simulated_answer:str): if isinstance(get_cmd, Cmd): get_cmd.final_simul_response = simulated_answer else: get_cmd[1] = simulated_answer def cmd_native_name_upper_is(native_cmd_infos:Cmd, cmd:str): if isinstance(native_cmd_infos, Cmd): return cmd.upper() == native_cmd_infos.native_name.upper() return cmd in native_cmd_infos and cmd.upper() == native_cmd_infos[0].upper() def cmd_native_name_is(native_cmd_infos:Cmd, cmd:str): if isinstance(native_cmd_infos, Cmd): return cmd == native_cmd_infos.native_name return cmd in native_cmd_infos and cmd == native_cmd_infos[0] def cmd_has_native_infos(native_infos): if isinstance(native_infos, Cmd): return native_infos.native_name != '' return native_infos is not None def cmd_get_native_infos(native_infos): if isinstance(native_infos, Cmd): return [native_infos.native_name, native_infos.final_simul_response] return native_infos class Gen2NatCmds: # To be set by constructor GEN2NAT_CMDS = {} # NEW format #cmds = {} def __init__(self, cmds:dict={}): #self.cmds = {} # cmds is a dict of cmd: self.GEN2NAT_CMDS = cmds # cmds is a list of cmd: if isinstance(cmds, list): self.GEN2NAT_CMDS = {} self.add_cmds(cmds) def __str__(self)->str: return str(self.GEN2NAT_CMDS) # build cmd as dict def build_cmd(self, generic_name: str, native_name: str, params: dict = {}, final_device_responses: dict = {}, final_simul_response: str = '', immediate_responses: dict = {}, errors: dict = {}, )->dict: cmd = { 'generic_name': generic_name, 'native_name': native_name, 'params': params, # ready 'final_device_responses': final_device_responses, 'final_simul_response': final_simul_response, 'immediate_responses': immediate_responses, # native error codes 'errors': errors } return cmd # TODO: def build_cmd_get_set(self, generic_name:str, native_get_name:str, native_set_name:str)->(dict,dict): get_cmd = set_cmd = {} return (get_cmd, set_cmd) # TODO: def build_cmd_do(self, generic_name:str, native_name:str): return {} ''' def add_cmd(self, cmd:dict): self.cmds[cmd['generic_name']] = cmd ''' def add_cmd(self, cmd:Cmd): self.GEN2NAT_CMDS[cmd.generic_name] = cmd def add_cmds(self, *cmds): ''' # add a dict elif isinstance(cmds[0], dict): self.GEN2NAT_CMDS.update(cmds[0]) ''' # add another instance if isinstance(cmds[0], Gen2NatCmds): self.GEN2NAT_CMDS.update(cmds[0].get_as_dict()) # add a list elif isinstance(cmds[0], list): for cmd in cmds[0]: self.add_cmd(cmd) elif isinstance(cmds[0], str): #if len(cmds) == 1: raise Exception("Missing second arg (obj, list or dict)") if len(cmds) == 1: raise Exception("Missing second arg (a list)") ''' # add a key:dict elif isinstance(cmds[1], dict): self.GEN2NAT_CMDS[cmds[0]] = cmds[1] # add a key:list if isinstance(cmds[1], list): self.GEN2NAT_CMDS[cmds[0]] = { cmd.generic_name:cmd for cmd in cmds[1] } ''' # add another instance if isinstance(cmds[1], Gen2NatCmds): self.GEN2NAT_CMDS[cmds[0]] = cmds[1].get_as_dict() else: #raise Exception("Second arg should be a obj, list, or dict") raise Exception("Second arg should be a list") else: raise Exception("bad arguments") ''' # add a list of cmd (cmd1, cmd2, cmd3, ...) elif isinstance(cmds[0], Cmd): for cmd in cmds: self.add_cmd(cmd) ''' def get_as_dict(self): return self.GEN2NAT_CMDS def print_mes_commandes(self): pprint.sorted = lambda x, key=None: x pprint.pprint(self.GEN2NAT_CMDS) #for cmd in self.cmds: print(cmd) def get(self, cmd:str=None): #if cmd: return self.GEN2NAT_CMDS.get(cmd) #return self.GEN2NAT_CMDS if cmd is None: return self.GEN2NAT_CMDS # 1) search in my MAIN commands native_infos = self.GEN2NAT_CMDS.get(cmd) printd("native infos:", native_infos) # native_cmd can be [] or [infos] (or None) ##if native_infos is not None: return native_infos if cmd_has_native_infos(native_infos) : return cmd_get_native_infos(native_infos) #printd(self.GEN2NAT_CMDS) ''' # 2) search in each DCC commands # !! BAD !! because general dict is not complete for each dcc (especially, does not contain macro-commands, like get_radec, ...) # So, better not to use this for key in self.GEN2NAT_CMDS.keys(): if isinstance(self.GEN2NAT_CMDS[key], dict): native_infos = self.GEN2NAT_CMDS[key].get(cmd) printd('key is', key, 'native cmd is', native_infos) if native_infos is not None: return native_infos ''' # Native infos not found return None #def update(self, newdict:dict): self.GEN2NAT_CMDS = { **self.GEN2NAT_CMDS, **newdict } def get_native_infos_for_generic_cmd(self, cmd:str)->str: return self.get(cmd) def get_native_cmd_for_generic(self, cmd:str)->str: val = self.get_native_infos_for_generic_cmd(cmd) if not val: return None return val[0] def get_simulated_answer_for_generic_cmd(self, cmd:str)->str: val = self.get(cmd) if not val: return None # no answer available if len(val) == 1: return None return val[1] #TODO: get answer from dict def get_related_native_get_cmd_name_for_set_cmd(self, cmd:str): return 'G'+ cmd[1] def get_simulated_answer_for_native_cmd(self, cmd:str)->str: for val in self.GEN2NAT_CMDS.values(): if isinstance(val, Cmd): if val.native_name == cmd: return val.final_simul_response continue if cmd in val: # No native cmd defined if cmd != val[0]: return None # no answer available if len(val) < 2: return None # return simulated answer return val[1] # no simulated answer found return None def set_simulated_answer_for_native_get_cmd(self, get_cmd_name:str, get_cmd_simulated_answer:str): for key,val in self.GEN2NAT_CMDS.items(): # do not search in components (it will be done anyway at another level) #if isinstance(val, dict): return False if isinstance(val, dict): continue ##if val and get_cmd_name.upper() in val[0].upper(): if cmd_native_name_upper_is(val, get_cmd_name): # no answer available ##if len(val) < 2: return False if not cmd_get_simul_response(val): return False ##val[1] = get_cmd_simulated_answer[3:-1] cmd_set_simulated_answer(val, get_cmd_simulated_answer[3:-1]) self.GEN2NAT_CMDS[key] = val return True # nothing set return False ''' #TODO: chercher dans les DCC !! (recursive) def is_valid_native_cmd(self, cmd:str)->bool: # 1) search in my MAIN commands if cmd in self.GEN2NAT_CMDS.values(): return True # 2) search in each DCC commands for key in self.GEN2NAT_CMDS.keys(): if isinstance(self.GEN2NAT_CMDS[key], dict): if cmd in self.GEN2NAT_CMDS[key].values(): return True return False ''' def has_native_cmd(self, cmd:str)->bool: # Simplist version ##return cmd in self.GEN2NAT_CMDS.values() # More elaborated version for native_cmd_infos in self.GEN2NAT_CMDS.values(): #if cmd in native_cmd_infos and cmd == native_cmd_infos[0]: return True if cmd_native_name_is(native_cmd_infos, cmd): return True # no notive cmd found return False def print_available_cmds(self): #printd("All commands are:", self._gen2nat_cmds.keys()) print("\nAvailable commands:") print("=======================") # 1) print general commands self.print_available_cmds_for_dcc("General") # 2) print commands for each DCC: for key in self.GEN2NAT_CMDS.keys(): if isinstance(self.GEN2NAT_CMDS[key], dict): self.print_available_cmds_for_dcc(key) def print_available_cmds_for_dcc(self, dcc_key): d = self.GEN2NAT_CMDS if dcc_key=="General" else self.GEN2NAT_CMDS[dcc_key] print(f"\n{dcc_key} commands are:") print("- GET commands:") #print (list(cmd.replace('_',' ') for cmd in self._gen2nat_cmds.keys() if cmd.startswith('get_'))) print(list(cmd for cmd in d.keys() if cmd.startswith('get_'))) print("- SET commands:") print(list(cmd for cmd in d.keys() if cmd.startswith('set_'))) print("- DO commands:") print(list(cmd for cmd in d.keys() if cmd.startswith('do_'))) #TODO: remove ClientChannelAbstract, and set instead a ClientChannel #class DeviceController(SocketClientAbstract): ##class DeviceController(ClientChannel): class DeviceController(): DEBUG = False _device_simulator = None _thread_device_simulator = None _device_host = "localhost" _device_port = None # List of device controller (dc) components (by default, None) _my_dc_components = [] # ClientChannel used by the device controller (to be set during __init__ via set_client_channel()) _my_channel = None # @abstract (to be overriden) _cmd_device_concrete = {} _cmd_device_abstract = {} ##_cmd = { GEN2NAT_CMDS_obj = Gen2NatCmds([ # General format: #'cmd_generic_name': ['cmd_native_name', 'default_simulator_answer', 'native_answer1', 'native_answer2', ...], # GET-SET commands: Cmd('get_timezone'), Cmd('set_timezone'), Cmd('get_date'), Cmd('set_date'), Cmd('get_time'), Cmd('set_time'), # for test only Cmd('dc_only'), # DO commands: Cmd('do_init','do_init'), Cmd('do_park'), ]) #my_cmds3.add_cmds(GEN2NAT_CMDS_obj) ''' print("******************************") print("(DC) Mes commandes") my_cmds3.print_mes_commandes() print("******************************") ''' GEN2NAT_CMDS_dict = { # General format: #'cmd_generic_name': ['cmd_native_name', 'default_simulator_answer', 'native_answer1', 'native_answer2', ...], # GET-SET commands: 'get_timezone': [], 'set_timezone': [], 'get_date': [], 'set_date': [], 'get_time': [], 'set_time': [], # for test only 'dc_only':[], # DO commands: 'do_init': ['do_init'], 'do_park': [], } GEN2NAT_CMDS = GEN2NAT_CMDS_dict #GEN2NAT_CMDS = my_cmds3.get_as_dict() GEN2NAT_CMDS = GEN2NAT_CMDS_obj class Protocol: # By default, do nothing @classmethod def formated_cmd(cls, cmd:str, value:str=None)->str: return cmd # Encapsulate useful data to be ready for sending # By default, do nothing #def encapsulate_data_to_send(self, data:str): @classmethod def encap(cls, data:str): printd("*** Default encap ***") ##return data return "stamp", data ''' #@deprecated def format_data_to_send(self, data:str): return self.encapsulate_data_to_send(data) ''' # Extract useful data from received raw data # By default, do nothing #def uncap_received_data(self, data:str): @classmethod #def uncap(cls, dcc_name:str, data:str): def uncap(cls, dcc_name:str, stamp:str, data:str): #return data_received.decode() printd(f"*** Default uncap (from {dcc_name})***") return data ''' def unformat_received_data(self, data:str): return self.uncap_received_data(data) ''' ''' def encapsulate_data_to_send(self, command:str): return self._my_channel.encapsulate_data_to_send(command) def uncap_received_data(self, data_received:str): return self._my_channel.uncap_received_data(data_received) ''' # WRAPPER methods def getp(self)->Protocol: return self._protoc def formated_cmd(self, cmd:str, value:str=None)->str: ##return self._protoc.formated_cmd(self, cmd, value) return self._protoc.formated_cmd(cmd, value) def encap(self, data:str): ##self.Protoc.encap(self) ##return self._protoc.encap(self, data) return self._protoc.encap(data) self.log_w("WARNING", "watch your step !!!") ##def uncap(self, data:str): def uncap(self, stamp:str, data:str): ##self.Protoc.encap(self) #return self._protoc.uncap(self, data) ##return self._protoc.uncap(type(self).__name__, data) return self._protoc.uncap(type(self).__name__, stamp, data) ##def __init__(self, device_host:str="localhost", device_port:int=11110, PROTOCOL:str="TCP", buffer_size=1024, DEBUG=False): #def __init__(self, device_host:str="localhost", device_port:int=11110, channel="TCP", buffer_size=1024, protoc=None, gen2nat_cmds={}, device_sim=None, DEBUG=False): #v3 #def __init__(self, device_host:str="localhost", device_port:int=11110, channel="TCP", buffer_size=1024, protoc=None, gen2nat_cmds={}, device_sim=None): #v4 def __init__(self, device_host:str="localhost", device_port:int=11110, channel="TCP", buffer_size=1024, protoc=None, gen2nat_cmds:Gen2NatCmds=None, device_sim=None): ''' :param device_host: server IP or hostname :param device_port: server port :param channel: "SOCKET-TCP", "SOCKET-UDP", "SERIAL", "USB", or instance of Channel ''' self.stamp_current = None #self.DEBUG_MODE = DEBUG ##self.DEBUG_MODE = os.environ.get('PYROS_DEBUG', '0') == '1' self.DEBUG_MODE = config.is_debug() ##set_logger(self.DEBUG_MODE) ##log_d("Logger configured") self.log = LogPyros(self.__class__.__name__) self.log_d("coucou") if self.DEBUG_MODE: self.log.set_global_log_level(LogPyros.LOG_LEVEL_DEBUG) # Set host (IP) and port #printd("IN DeviceController") self._device_host = device_host self._device_port = device_port # Set Protocol self._protoc = protoc if protoc else self.Protoc # Set Commands # overwrite abstract _cmd dictionary with subclass native _cmd_native dictionary: # Merge my commands dictionary with the one passed by subclass # _gen2nat_cmds = GEN2NAT_CMDS + gen2nat_cmds #printd("(dc1) MY COMMANDS ARE (before merge):", self.GEN2NAT_CMDS) #printd("(dc3) GIVEN CMDS (before merge):", gen2nat_cmds) #self._gen2nat_cmds = copy.deepcopy(self.GEN2NAT_CMDS) #self._gen2nat_cmds.update(gen2nat_cmds) ##self._gen2nat_cmds = { **self.GEN2NAT_CMDS, **gen2nat_cmds } #v3 #self._gen2nat_cmds = { **DeviceController.GEN2NAT_CMDS, **gen2nat_cmds } #v4 self._gen2nat_cmds = { **(DeviceController.GEN2NAT_CMDS.get_as_dict()), **(gen2nat_cmds.get_as_dict()) } self.printd(self, "MY COMMANDS ARE (after merge):", self._gen2nat_cmds) #self.Gen2NatCmds.update(gen2nat_cmds) #self._gen2nat_cmds = self.Gen2NatCmds #self._gen2nat_cmds = {**self._gen2nat_cmds, **self._gen2nat_cmds_native} ##self._gen2nat_cmds = {**self._gen2nat_cmds, **self._gen2nat_cmds_device_abstract, **self._gen2nat_cmds_device_concrete} self._my_cmds = Gen2NatCmds(self._gen2nat_cmds) # Set Channel and Simulator if not isinstance(channel, str): self._my_channel = channel self._device_simulator = None else: if channel.startswith("SOCKET"): #self._my_channel:ClientChannel = ClientChannelSocket(device_host, device_port, channel, buffer_size, DEBUG) self._my_channel:ClientChannel = ClientChannelSocket(device_host, device_port, channel, buffer_size) elif channel == "SERIAL": self._my_channel:ClientChannel = ClientChannelSerial(device_host, device_port, buffer_size) elif channel == "USB": self._my_channel:ClientChannel = ClientChannelUSB(device_host, device_port, buffer_size) else: raise Exception("Unknown Channel", channel) # If LOCALHOST, launch the device SIMULATOR if device_host=="localhost": if not device_sim: raise Exception("No simulator class available") ##self._device_simulator = device_sim(protoc, gen2nat_cmds) self._device_simulator = device_sim # Pass to my simulator a reference to myself self._device_simulator.set_dc(self) # TODO: a virer car plus necessaire vu qu'on passe le DC au simu ##self._device_simulator.set_protoc_and_cmds(protoc, self._my_cmds) self.printd("SIMU IS", device_sim, self._device_simulator) self._thread_device_simulator = threading.Thread(target=self.device_simulator_run) self._thread_device_simulator.start() #self.log_w("WARNING", "watch your step !!") # Set my list of dc components def set_dc_components(self, dc_components:list): self._my_dc_components = dc_components # So that we can use this with the "with" statement (context manager) def __enter__(self): return self def __exit__(self, type, value, traceback): self._my_channel.__exit__(type, value, traceback) ''' def set_logger(self, DEBUG): self._my_channel.set_logger(DEBUG) ''' def print(self, *args, **kwargs): self.log.print(*args, **kwargs) def printd(self, *args, **kwargs): self.log.printd(*args, **kwargs) def tprintd(self, *args, **kwargs): self.printd('(THREAD):', *args, *kwargs) def log_d(self, *args, **kwargs): self.log.log_d(*args, **kwargs) def log_i(self, *args, **kwargs): self.log.log_i(*args, **kwargs) def log_w(self, *args, **kwargs): self.log.log_w(*args, **kwargs) def log_e(self, *args, **kwargs): self.log.log_e(*args, **kwargs) def log_c(self, *args, **kwargs): self.log.log_c(*args, **kwargs) def device_simulator_run(self): #HOST, PORT = "localhost", 11110 #with get_SocketServer_UDP_TCP(HOST, PORT, "UDP") as myserver: self.printd("\n******************************") self.printd("******* (DC) Starting device simulator on (host:port): ", self._device_host+':'+str(self._device_port)) self.printd("******************************\n") self._device_simulator.serve_forever(self._device_port) #with get_SocketServer_UDP_TCP(self.HOST, self.PORT, "UDP") as myserver: myserver.serve_forever() ''' myserver = get_SocketServer_UDP_TCP(self.HOST, self.PORT, "UDP") myserver.serve_forever() ''' def _connect_to_device(self): self._my_channel._connect_to_server() def get_guitastro_longitude(self, longitude): return guitastro.Angle(longitude).sexagesimal("d:+0180.0") def get_celme_latitude(self, latitude): return guitastro.Angle(latitude).sexagesimal("d:+090.0") #@override ClientChannel send_data def send_data(self, data:str): ##data_encapsulated = self.format_data_to_send(data) self.stamp_current, data_encapsulated = self.encap(data) self._my_channel.send_data(data_encapsulated) ''' The chosen way to send data is this: # - :GD# b'00030000:GD#\x00' Another way to send data (which also works), is it better ? # - :GD# ###tsock.mysock.sendto(b'\x00\x00\x00\x01\x00\x00\x00\x00\x3A\x47\x44\x23\x00', (HOST, PORT)) # - :GR# ###tsock.mysock.sendto(b'\x00\x00\x00\x01\x00\x00\x00\x00\x3A\x47\x52\x23\x00', (HOST, PORT)) # - ACK 06 OK !!! : tsock.mysock.sendto(b'\x00\x00\x00\x01\x00\x00\x00\x00\x00\x06\x00\x00', (HOST, PORT)) Which one is the best method ? ''' #log_d("NATIVE Command to send is "+repr(data)) ##encapsulated_data = self.encapsulate_data_to_send(data) #printd("before _send", encapsulated_data) #printd("before _send", repr(encapsulated_data)) ##self._send_data(encapsulated_data) ##self._my_channel.send_data(encapsulated_data) #log_i(f'Sent: {encapsulated_data}') ''' def _send_data(self, data): self._my_channel._send_data(data) ''' #@override ClientChannel receive_data def receive_data(self)->str: ##data_received_bytes = self._receive_data() data_received = self._my_channel.receive_data() #log_d("Received (all data): {}".format(data_received)) #log_d("data in bytes: "+str(bytes(data_received, "utf-8"))) ##data = self.unformat_received_data(data_received) data = self.uncap(self.stamp_current, data_received) self.printd(f"(dc) ({self}) RECEIVED (useful data): {data}") ##log_i(f"(dc) ({self}) RECEIVED (useful data): {data}") return data ''' def _receive_data(self): return self._my_channel._receive_data() ''' def get_utc_date(self): return guitastro.Date("now").iso(0) #return celme.Date("now").ymdhms() def close(self): self._my_channel.close() # Stop device simulator (only if used) if self._device_host=="localhost": self.printd("Stopping device simulator") self._device_simulator.stop() ''' def is_generic_cmd(self, raw_input_cmd:str) -> bool: printd("raw_input_cmd is", raw_input_cmd) # Using Google documentation format (https://www.sphinx-doc.org/en/master/usage/extensions/example_google.html#example-google) #"" Is this a generic command ? Args: raw_input_cmd: a command in string format (like 'set_state active' or 'get_ra' or 'set_ra 20:00:00' or 'set_radec 20:00:00 90:00:00" or 'do_park'...) Returns: either False or (cmd, [args]) with cmd like "get_ra" and [args] like ['20:00:00', '90:00:00'] #"" #return cmd.startswith('get_') or cmd.startswith('set_') or cmd.startswith('do_') #cmds = ['get ', 'get_', 'set ', 'set_', 'do ', 'do_'] #'' seps = (" ", "_") #cmds = list(x+y for x in cmd for y in sep) for cmd in cmds: for sep in seps: generic_cmd = cmd+sep if raw_input_cmd.startswith(generic_cmd): # Is there value(s) passed ? if len(raw_input_cmd) > len(generic_cmd): values = raw_input_cmd[len(generic_cmd):] values = values.split(' ') # return cmd like "get_ra", [and values] return generic_cmd.replace(' ','_'), values return False, False #'' ##cmds = ("get","set","do") #'' # ex: "set radec" => "set_radec" raw_input_cmd = raw_input_cmd.strip() cmd_splitted = raw_input_cmd.split(' ') if len(cmd_splitted) == 1: return False,False generic_cmd = cmd_splitted[0] + '_' + cmd_splitted[1] #'' # Ex: "set_radec 15 30", "do_init", "get_radec", "set_state active", "do_goto_radec 15 45"... tokens = raw_input_cmd.split(' ') generic_cmd = tokens[0] # Check this generic command exists #if (generic_cmd not in self._gen2nat_cmds.keys()): return False,False if generic_cmd not in self._gen2nat_cmds: return False,False # Is there value(s) passed ? ###if len(cmd_splitted) > 2: values_to_set = cmd_splitted[2:] args = tokens[1:] if len(tokens)>1 else None # ex: return "set_radec", ["20:00:00", "90:00:00"] return generic_cmd, args ''' def has_dc_component_for_type(self, dc_component_type:str): for dcc in self._my_dc_components: self.printd(dc_component_type, "in ??????", type(dcc).__name__) #if dc_component_type in dcct.__class__.__name__: if dc_component_type in type(dcc).__name__: return True return False #res = self.getDeviceControllerForType(cmd.device_type).execute_cmd(cmd.full_name) def get_dc_component_for_type(self, dc_component_type:str): #->DeviceController: # By default, return myself (as a DeviceController component) # ex1: None # ex2: "Telescope" (is in "AgentDeviceTelescopeGemini") #if dc_component_type is None or dc_component_type in self.__class__.__name__ : return self if dc_component_type is None or dc_component_type in type(self).__name__ : return self #for dcc in type(self).mro(): #for dcc in type(self).__bases__ : self.printd("components are", self._my_dc_components) self.printd("1st component is", self._my_dc_components[0]) for dcc in self._my_dc_components: self.printd(dc_component_type, "in ??????", type(dcc).__name__) #if dc_component_type in dcct.__class__.__name__: if dc_component_type in type(dcc).__name__: return dcc raise DCCNotFoundException("DEVICE CONTROLLER COMPONENT NOT FOUND: "+dc_component_type) def is_valid_cmd(self, cmd:DeviceCmd): self.printd("cmd.name is", cmd.name) self.printd(cmd) self.printd("generic ?", cmd.is_generic()) self.printd("is valid generic ?", self.is_valid_generic_cmd(cmd)) return ( ( cmd.is_generic() and self.is_valid_generic_cmd(cmd) ) or self.is_valid_native_cmd(cmd) ) def is_generic_but_UNIMPLEMENTED_cmd(self, cmd:DeviceCmd): return cmd.is_generic() and not self.is_implemented_generic_cmd(cmd) # WRAPPER methods def has_generic_cmd(self, cmd:DeviceCmd): #return self._my_cmds.get(cmd.name) is not None return cmd.name in self._my_cmds.get() def has_native_cmd(self, cmd:DeviceCmd)->bool: #return self._my_cmds.get(cmd.name) is not None return self._my_cmds.has_native_cmd(cmd.name) def has_native_cmd_for_generic(self, generic_cmd:DeviceCmd): return self._my_cmds.get_native_infos_for_generic_cmd(generic_cmd.name) not in [None, []] # check if generic cmd exists def is_valid_generic_cmd(self, cmd:DeviceCmd): #printd("_my_cmds", self._my_cmds) # 1) If a DCC given, return search result in this DCC commands if cmd.devtype: if not self.has_dc_component_for_type(cmd.devtype): return False return self.get_dc_component_for_type(cmd.devtype).has_generic_cmd(cmd) # 2) Search in my general commands if self.has_generic_cmd(cmd): return True # 3) Search in all my DCCs for dcc in self._my_dc_components: if dcc.has_generic_cmd(cmd): return True # 4) not found return False ''' ##if not cmd.devtype: return self._my_cmds.get(cmd.name) is not None if not cmd.devtype: return self.has_generic_cmd(cmd) if not self.has_dc_component_for_type(cmd.devtype): return False ##return self.get_dc_component_for_type(cmd.devtype)._my_cmds.get(cmd.name) is not None return self.get_dc_component_for_type(cmd.devtype).has_generic_cmd(cmd) ''' # check if generic cmd exists and is implemented as a native cmd (in dictionary) def is_implemented_generic_cmd(self, cmd:DeviceCmd): self.printd("is implemented generic ?") self.printd("cmd.devtype", cmd.devtype) #printd("_my_cmds", self._my_cmds) #return self._my_cmds.get_native_infos_for_generic_cmd(cmd.name) not in [None, []] # 1) If a DCC given, return search result in this DCC commands if cmd.devtype: if not self.has_dc_component_for_type(cmd.devtype): return False #self.printd("1.2") return self.get_dc_component_for_type(cmd.devtype).has_native_cmd_for_generic(cmd) # 2) Search in my general commands if self.has_native_cmd_for_generic(cmd): #self.printd("2") return True # 3) Search in all my DCCs for dcc in self._my_dc_components: #self.printd("3") if dcc.has_native_cmd_for_generic(cmd): return True # 4) not found return False ''' ##if not cmd.devtype: return self._my_cmds.get_native_infos_for_generic_cmd(cmd.name) not in [None, []] if not cmd.devtype: return self.has_native_cmd_for_generic(cmd) if not self.has_dc_component_for_type(cmd.devtype): return False ##return self.get_dc_component_for_type(cmd.devtype)._my_cmds.get(cmd.name) not in [None, []] return self.get_dc_component_for_type(cmd.devtype).has_native_cmd_for_generic(cmd) ''' def is_valid_native_cmd(self, cmd:DeviceCmd)->bool: ##return self._my_cmds.is_valid_native_cmd(cmd.name) # 1) If a DCC given, return search result in this DCC commands if cmd.devtype: if not self.has_dc_component_for_type(cmd.devtype): return False return self.get_dc_component_for_type(cmd.devtype).has_native_cmd(cmd) # 2) Search in my general commands if self.has_native_cmd(cmd): return True # 3) Search in all my DCCs for dcc in self._my_dc_components: if dcc.has_native_cmd(cmd): return True # 4) not found return False def get_dcc_and_native_cmd_for_generic(self, generic_cmd:str): #if generic_cmd not in self._gen2nat_cmds.keys(): raise UnknownNativeCmdException() # Is it a general command for the (general) controller ? ''' if generic_cmd in self._gen2nat_cmds: return self, self._gen2nat_cmds[generic_cmd] ''' # 1) Search in my MAIN commands nc_infos = self._my_cmds.get_native_infos_for_generic_cmd(generic_cmd) # return nc_infos only if not None (but can be an empty answer like []) # same as "if nc_infos is not None" if nc_infos: return self, nc_infos # 2) Search in each DCC commands # Is it a command for one of my dcc ? ''' # Bad because general dict is not complete for each dcc (especially, does not contain macro-commands, like get_radec, ...) for key in self._gen2nat_cmds.keys(): if isinstance(self._gen2nat_cmds[key], dict): # dcc = key if generic_cmd in self._gen2nat_cmds[key]: return key, self._gen2nat_cmds[key][generic_cmd] ''' for dcc in self._my_dc_components: _, native_cmd_infos = dcc.get_dcc_and_native_cmd_for_generic(generic_cmd) if native_cmd_infos: return dcc, native_cmd_infos # Native command not found return None, None def get_native_cmd_for_generic(self, generic_cmd:str): native = self.get_dcc_and_native_cmd_for_generic(generic_cmd)[1] print("native", native) if isinstance(native, Cmd): return [native.native_name, native.final_simul_response] return native ''' def get_related_native_get_cmd_name_for_set_cmd(self, native_cmd:str): #return self._my_cmds.get_related_native_get_cmd_name_for_set_cmd(cmd) # 1) Search in my MAIN commands answer = self._my_cmds.get_related_native_get_cmd_name_for_set_cmd(native_cmd) if answer: return answer # 2) Search in each DCC commands for dcc in self._my_dc_components: answer = dcc.get_related_native_get_cmd_name_for_set_cmd(native_cmd) if answer: return answer # Native command not found return None ''' @recursive_search def get_related_native_get_cmd_name_for_set_cmd(self, native_cmd:str): #return self.do_command_recursive_search_using_function(None, "get_related_native_get_cmd_name_for_set_cmd", native_cmd) return None ''' def get_simulated_answer_for_native_cmd(self, native_cmd:str): # 1) Search in my MAIN commands answer = self._my_cmds.get_simulated_answer_for_native_cmd(native_cmd) if answer: return answer # 2) Search in each DCC commands for dcc in self._my_dc_components: answer = dcc.get_simulated_answer_for_native_cmd(native_cmd) if answer: return answer # Native command not found return None ''' @recursive_search def get_simulated_answer_for_native_cmd(self, native_cmd:str): #return self.do_command_recursive_search_using_function(None, "get_simulated_answer_for_native_cmd", native_cmd) return None ''' def set_simulated_answer_for_native_get_cmd(self, get_cmd_name:str, get_cmd_simulated_answer:str): # 1) Search in my MAIN commands answer = self._my_cmds.set_simulated_answer_for_native_get_cmd(get_cmd_name, get_cmd_simulated_answer) if answer: return answer # 2) Search in each DCC commands for dcc in self._my_dc_components: answer = dcc.set_simulated_answer_for_native_get_cmd(get_cmd_name, get_cmd_simulated_answer) if answer: return answer # Native command not found return False ''' @recursive_search def set_simulated_answer_for_native_get_cmd(self, get_cmd_name:str, get_cmd_simulated_answer:str): #return self.do_command_recursive_search_using_function(False, "set_simulated_answer_for_native_get_cmd", get_cmd_name, get_cmd_simulated_answer) return False def do_command_recursive_search_using_function(self, ko_return_value, fname, *args): # 1) Search in my MAIN commands ##answer = self._my_cmds.get_simulated_answer_for_native_cmd(native_cmd) answer = getattr(self._my_cmds, fname)(*args) if answer: return answer # 2) Search in each DCC commands for dcc in self._my_dc_components: ##answer = dcc.get_simulated_answer_for_native_cmd(native_cmd) answer = getattr(dcc, fname)(*args) if answer: return answer # Native command not found return ko_return_value ''' This method is either called - DIRECTLY from AgentDevice.routine_process().get_device_status() (NO THREAD) or - FROM A THREAD from AgentDevice._thread_exec_specific_cmd().exec_specific_cmd() ''' #def execute_cmd(self, cmd:DeviceCmd)->GenericResult: def exec_cmd(self, raw_input_cmd:str)->GenericResult: ''' :param raw_input_cmd: ''' #generic_cmd, args = self.is_generic_cmd(raw_input_cmd) cmd = DeviceCmd(raw_input_cmd) self.tprintd("cmd is", cmd, raw_input_cmd) # GENERIC command (pyros grammar) #if generic_cmd is not False: if cmd.is_generic(): self.tprintd("GENERIC COMMAND") if not self.is_valid_generic_cmd(cmd): raise UnknownGenericCmdException(cmd.name) #return self.exec_generic_cmd(generic_cmd, args) ''' try: res = self.exec_generic_cmd(cmd.name, cmd.args, cmd.devtype) ''' # Set dcc # Default is myself dcc = self # But can be a component if cmd.devtype: # Get the dcc in charge of this command try: dcc = self.get_dc_component_for_type(cmd.devtype) self.tprintd("*** EXECUTÉ PAR COMPONENT", dcc) except DCCNotFoundException as e: self.log_e(f"(THREAD?) EXCEPTION caught by {type(self).__name__}", e) raise #return (DCC)(self.exec_generic_cmd(generic_cmd, values_to_set, None)) # Delegate cmd execution to the dcc try: res = dcc.exec_generic_cmd(cmd.name, cmd.args) ##except (UnknownGenericCmdException, UnimplementedGenericCmdException, DCCNotFoundException) as e: except (UnknownGenericCmdException, UnimplementedGenericCmdException, UnknownNativeCmdException, UnknownNativeResException) as e: self.log_e(f"(THREAD?) EXCEPTION caught by {type(self).__name__} (from DC {dcc})", e) raise # not executed ? return None return res # NATIVE command ''' if cmd.startswith('get_'): #generic_cmd,_ = request[4:].split('(') generic_cmd = cmd[4:] if (generic_cmd not in self._gen2nat_cmds_getset.keys()) and (generic_cmd not in self._gen2nat_cmds_do.keys()): #eval(request) return self.get_radec() printd("cmd is", generic_cmd) return self._get(generic_cmd) return ''' # NATIVE command self.tprintd("NATIVE COMMAND") if not self.is_valid_native_cmd(cmd): raise UnknownNativeCmdException(cmd.name) #res_native = self.exec_native_cmd(raw_input_cmd) try: res_native = self.exec_native_cmd(cmd.name_and_args) except (UnknownNativeCmdException, UnknownNativeResException) as e: self.log_e(f"THREAD EXCEPTION caught by {type(self).__name__} (from DC)", e) raise return GenericResult(res_native) # To be overriden by subclasses def means_unknown_cmd(self, native_res:str)->bool: return False # To be overriden by subclasses def is_unknown_res(self, native_res:str)->bool: return False #def exec_native_cmd(self, request:str, awaited_res_if_ok=None)->GenericResult: def exec_native_cmd(self, native_cmd:str) -> str: """ Execute a native command Args: native_cmd: a native command Returns: the command result """ self.tprintd("NATIVE Command to send is "+ repr(native_cmd)) # SEND command TO my device try: #self.send_request(native_cmd) self.send_native_cmd(native_cmd) except ChannelCommunicationException as e: self.log_e("Channel error while sending command", e) raise # RECEIVE device answer FROM my device try: native_res = self.receive_data() except ChannelCommunicationException as e: self.log_e("Channel error while sending command", e) raise if self.means_unknown_cmd(native_res): raise UnknownNativeCmdException(native_cmd) if self.is_unknown_res(native_res): raise UnknownNativeResException(native_res) return native_res ''' ok = True if not awaited_res_if_ok else (native_res == awaited_res_if_ok) return GenericResult(native_res, ok) ''' ''' def exec_native_cmd_OLD(self, request:str)->str: self.send_request(request) native_res = self.receive_data() return native_res ''' def execute_unformated_native_cmd(self, request:str) -> str: request = self.formated_cmd(request) #return self.exec_native_cmd_OLD(request) return self.exec_native_cmd(request) def send_native_cmd(self, native_cmd:str)->str: return self.send_data(native_cmd) #@deprecated def send_request(self, request:str)->str: return self.send_native_cmd(request) # wrapper shortcut methods def print_available_cmds(self): self.printd("Here are my available commands:") self._my_cmds.print_available_cmds() def print_available_cmds_for_dcc(self, dcc_key): self.printd("DCC available commands:") self._my_cmds.print_available_cmds_for_dcc(dcc_key) ''' def print_available_commands(self): #printd("All commands are:", self._gen2nat_cmds.keys()) printd("\nAvailable commands:") printd("=======================") # 1) print general commands self.print_available_commands_for_dcc("General") # 2) print commands for each DCC: for key in self._gen2nat_cmds.keys(): if isinstance(self._gen2nat_cmds[key], dict): self.print_available_commands_for_dcc(key) def print_available_commands_for_dcc(self, dcc_key): d = self._gen2nat_cmds if dcc_key=="General" else self._gen2nat_cmds[dcc_key] printd(f"\n{dcc_key} commands are:") printd("- GET commands:") #print (list(cmd.replace('_',' ') for cmd in self._gen2nat_cmds.keys() if cmd.startswith('get_'))) print (list(cmd for cmd in d.keys() if cmd.startswith('get_'))) printd("- SET commands:") print (list(cmd for cmd in d.keys() if cmd.startswith('set_'))) printd("- DO commands:") print (list(cmd for cmd in d.keys() if cmd.startswith('do_'))) def available_commands(self): return list(self._gen2nat_cmds.keys()) ''' #def run_func(self, func, arg=None): def run_func(self, func, *args): #printd("args", args) if args: return getattr(self, func)(*args) else: return getattr(self, func)() ''' def cmd_get_simul_response(self, native_cmd_infos:Cmd): if isinstance(native_cmd_infos, Cmd): return native_cmd_infos.final_simul_response if native_cmd_infos.final_simul_response!='simulator response' else None return native_cmd_infos[1] if len(native_cmd_infos)>1 else None ''' #def exec_generic_cmd(self, generic_cmd:DeviceCmd)->str: ##def exec_generic_cmd(self, generic_cmd:str, values_to_set:str=None, dcc_type:str=None)->str: def exec_generic_cmd(self, generic_cmd:str, values_to_set:str=None)->str: ''' Execute a generic command :param generic_cmd: str like "get_ra" or "set_ra" or "do_park"... :param value: only for a "set_" cmd ''' self.tprintd("(DC):exec_generic_cmd() from", self) #log_d("\n\nGENERIC Command to send is "+generic_cmd) self.tprintd("\n(DC): GENERIC Command to execute is ", generic_cmd) ##printd("(DC): My ("+type(self).__name__+") commands are:", self._gen2nat_cmds) #printd("(DC): My ("+type(self).__name__+") commands are:", self._my_cmds.get()) #printd("(DC): My ("+type(self).__name__+") commands are:", self._my_cmds.get()) if self.DEBUG_MODE: self.tprintd("(DC): My ("+type(self).__name__+") commands are:") pprint.pprint(self._my_cmds.get()) # 1) Get Native command corresponding to the generic one dcc, native_cmd_infos = self.get_dcc_and_native_cmd_for_generic(generic_cmd) self.tprintd("native_cmd_infos", native_cmd_infos) # Command is unknown if native_cmd_infos is None: raise UnknownGenericCmdException(generic_cmd) # Command is not implemented (empty native infos) if not native_cmd_infos: raise UnimplementedGenericCmdException(generic_cmd) ''' # Command is one of my dcc's, so pass it the command for exec if dcc != self: #dcc = self.get_dc_component_for_type(dcc_type) return dcc.exec_generic_cmd(generic_cmd, values_to_set) ''' # Get corresponding native command: #native_cmd = native_cmd_infos[0] native_cmd = cmd_get_native_name(native_cmd_infos) print("***** generic, native_cmd", generic_cmd, native_cmd) if not native_cmd: raise UnimplementedGenericCmdException(generic_cmd) # 2) MACRO-COMMAND or NORMAL NATIVE COMMAND ? # 2.a) MACRO-COMMAND (ex: native_cmd == "do_goto", "do_init", "get_radec") if native_cmd == generic_cmd: self.tprintd("MACRO-COMMAND") #printd("cmd,val", native_cmd, values_to_set) #res:GenericResult = self.run_func(native_cmd, *values_to_set) try: if values_to_set: self.tprintd("with args") res = dcc.run_func(native_cmd, *values_to_set) ##res = self.run_func(native_cmd, *values_to_set) #res = getattr(self, native_cmd)(values_to_set) else: res = dcc.run_func(native_cmd) ##res = self.run_func(native_cmd) #res = getattr(self, native_cmd)() except (AttributeError, ChannelCommunicationException) as e: self.log_e("Unknown Native command ?", native_cmd) raise UnknownNativeCmdException(native_cmd) #if res is None: res = 'ok' # res should be a GenericResult if not isinstance(res, GenericResult): raise Exception("Should be a GenericResult", res) # raise Exception if ERROR if res.unknown_command: raise UnknownNativeCmdException(native_cmd) if res.unknown_result: raise UnknownNativeResException(res.txt) # Return Generic result return res # 2.b) NORMAL NATIVE COMMAND (ex: native_cmd == "GR") ##native_cmd = self.formated_cmd(native_cmd, values_to_set) native_cmd = dcc.formated_cmd(native_cmd, values_to_set) awaited_res_if_ok = cmd_get_simul_response(native_cmd_infos) ''' awaited_res_if_ok = None if isinstance(native_cmd_infos, Cmd) and native_cmd_infos.final_simul_response!='simulator response': awaited_res_if_ok = native_cmd_infos.final_simul_response elif len(native_cmd_infos) > 1: awaited_res_if_ok = native_cmd_infos[1] ''' #native_res = self.exec_native_cmd(self.formated_cmd(native_cmd,value), awaited_res_ok) ##native_res = self.exec_native_cmd(native_cmd) try: native_res = dcc.exec_native_cmd(native_cmd) except (ChannelCommunicationException, UnknownNativeCmdException, UnknownNativeResException) as e: # TODO: a implementer dans exec_native_cmd() self.log_e(f"THREAD Native command execution caught by {type(self).__name__} (from DC)", e) raise # 3) Make Generic result from native and return it ok = True if not awaited_res_if_ok else (native_res == awaited_res_if_ok) return GenericResult(native_res, ok) ''' **************************** **************************** GENERIC TELESCOPE COMMANDS (abstract methods) **************************** **************************** ''' ''' **************************** GENERIC GET & SET commands **************************** ''' @generic_cmd def get_timezone(self): pass #def get_timezone(self): return self.exec_generic_cmd('get_timezone') @generic_cmd def set_timezone(self, hh): pass #def set_timezone(self, hh): return self.exec_generic_cmd('set_timezone', hh) @generic_cmd def get_date(self): pass @generic_cmd def set_date(self, mmddyy): pass @generic_cmd def get_time(self): pass @generic_cmd def set_time(self, hhmmss): pass ''' **************************** GENERIC DO commands **************************** ''' # @abstract #def do_INIT(self): return self._do("INIT") ''' do_PARK() (p103) - STARTUP position = CWD - :hC# - position required for a Cold or Warm Start, pointing to the celestial pole of the given hemisphere (north or south), with the counterweight pointing downwards (CWD position). From L4, V1.0 up - HOME position parking => par defaut, c'est CWD, mais ca peut etre different - :hP# - defaults to the celestial pole visible at the given hemisphere (north or south) and can be set by the user ''' # @abstract def do_PARK(self): pass #def do_PARK(self): return self._do("PARK") # @abstract def do_start(self): pass def do_stop(self): pass # @abstract MACRO def do_init(self): raise UnimplementedGenericCmdException # TODO: empecher de creer une instance de cette classe abstraite # Avec ABC ? ''' if __name__ == "__main__": #HOST, PORT = "localhost", 9999 #HOST, PORT = "localhost", 20001 HOST, PORT = "localhost", 11110 # Classic usage: #tsock = SocketClient_UDP_TCP(HOST, PORT, "UDP") # More elegant usage, using "with": with SocketClient_ABSTRACT(HOST, PORT, "UDP") as tsock: # 0) CONNECT to server (only for TCP, does nothing for UDP) tsock._connect_to_server() while True: # 1) SEND REQUEST data to server # saisie de la requête au clavier et suppression des espaces des 2 côtés data = input("REQUEST TO SERVER [ex: ':GD#' (Get Dec), ':GR#' (Get RA)']: ").strip() # test d'arrêt if data=="": break #data_to_send = bytes(data + "\n", "utf-8") tsock.send_data(data) #mysock.sendto("%s" % data, (HOST, PORT)) #printd("Sent: {}".format(data)) # 2) RECEIVE REPLY data from server data_received = tsock.receive_data() #reponse, adr = mysock.recvfrom(buf) #printd("Received: {}".format(data_received)) #printd("Useful data received: {}".format(data_useful)) printd('\n') #tsock.close() '''