import os import sys import time DEBUG_FILE = False PACKAGE_PARENT = '..' SCRIPT_DIR = os.path.dirname(os.path.realpath(os.path.join(os.getcwd(), os.path.expanduser(__file__)))) sys.path.append(os.path.normpath(os.path.join(SCRIPT_DIR, PACKAGE_PARENT))) from utils.DeviceSim import DeviceSim from utils.StatusManager import StatusManager class TelescopeSimulator(DeviceSim, StatusManager): _coords = "0, 0, 0" _real_coords = "0, 0 ,0" doors_timer = 5 move_time = 5 _loop_mode = False status = {"status": "VALID", "coords": _coords, "real_coords" : _real_coords, "doors_timer": doors_timer, "move_time" : move_time, "loop_mode": _loop_mode} def __init__(self, argv): super().__init__(argv) self.setDevice("Telescope") if len(argv) == 1: self.loop_mode = True if (len(argv) > 1): self.setStatusManager("telescopeSimulator", argv) def telescopePrint(self, string: str): if DEBUG_FILE: print("telescopeSimulator : " + string) def deal_command(self, cmd_type, cmd, args): answer = "" if cmd_type == "GET": if cmd == "INFORMATIONS": status = {"status": "VALID", "coords": self._coords, "real_coords": self._real_coords, "doors_timer": self.doors_timer, "move_time": self.move_time, "loop_mode": self._loop_mode} answer = str(status) elif cmd.startswith("COORD"): answer = cmd +" " + str(self._coords) else: answer = "NOT IMPLEMENTED YET" elif cmd_type == "SET": if cmd.startswith("COORD"): self._coords = str(args) answer = "COORDS set to " + self._coords else: answer = "NOT IMPLEMENTED YET" elif cmd_type == "DO": if cmd == "GOTO": self._real_coords = self._coords answer = "Starting slew of the mount" elif cmd == "HOMING": self._coords = "0, 0, 0" self._real_coords = "0, 0, 0" answer = "Start the Homong of the mount" else: answer = "NOT IMPLEMENTED YET" else: answer ="Invalid or not implemented cmd" return answer def handleConnection(self): for co in self.connections: data = self.readBytes(128, co) #answer = "" cut_data = data.split() if (len(cut_data) < 2): self.telescopePrint("Received invalid message on socket : " + data) continue cmd_type = cut_data[0] cmd = cut_data[1] args = cut_data[2:] answer = self.deal_command(cmd_type, cmd, args) ''' sleep pour simuler temps de latence de mouvement : probleme sans multi-thread le simulateur est inactif pendant ce temps ''' if (len(answer) > 0): self.sendMessage(answer, co) return (0) def loop(self): i = 0 if not self.loop_mode and self.ended == 0: self.telescopePrint("Not entry for telescope found in config file : " + self.config_file) return (0) while True: self.updateStatus(i) if self.isConnected(): self.handleConnection() i += 1 if not self.loop_mode and i > self.ended: return 0 return 1 def run(self): if DEBUG_FILE: print("Telescope simulator running") self.parse() self.configSocket() self.loop() return 0 if __name__ == "__main__": sim = TelescopeSimulator(sys.argv) sim.run() # class Telescope(Device): # # def __init__(self): # super().__init__("Telescope") # # ''' I will just fill the attributes with their names and params without regarding them ''' # self.attributes = {} # # self.doors_status = "True" # self.status = "IDLE" # # # def loop(self): # move_timer = 0 # doors_timer = 0 # # while True: # try: # conn, addr = self.sock.accept() # except socket.error as e: # print("There was a socket error: " + repr(e)) # break # # data = conn.recv(128).decode() # # # print("Received: " + data) # # ''' # Pour le moment, les messages sont de taille variable, contenant : # CMD_TYPE CMD ARG1 ARG2 ... # On considère que ce qu'on reçoit peut être faux, mais que dans tous les cas, on recoit le message au complet. # Pas de souci s'async puisqu'on prend une connection à la fois. # ''' # # answer = "" # cut_data = data.split(" ") # # if move_timer != 0 and time.time() > move_timer: # move_timer = 0 # status = "IDLE" # print("Stopped movement") # if doors_timer != 0 and time.time() > doors_timer: # doors_timer = 0 # print("Stopped doors") # # if len(cut_data) < 2: # print("Invalid message: " + data) # else: # cmd_type = cut_data[0] # cmd = cut_data[1] # args = cut_data[2:] # if cmd_type == "SET": # self.attributes[cmd] = args # elif cmd_type == "GET": # if cmd == "POSITION": # answer = "101.287 -16.716" # elif cmd == "STATUS": # answer = self.status # elif cmd == "SETUP": # answer = "GET SETUP answer not implemented" # else: # answer = "Invalid cmd for GET: " + cmd # print(answer) # elif cmd_type == "DO": # if cmd == "START": # print("Start movement") # move_timer = time.time() + MOVE_TIME # elif cmd == "STOP": # move_timer = 0 # elif cmd == "ABORT": # move_timer = 0 # elif cmd == "HOMING": # print("Start movement (homing)") # move_timer = time.time() + MOVE_TIME # elif cmd == "DOORS": # if len(args) > 0: # if args[0] != self.doors_status: # print("Start door") # self.doors_status = args[0] # doors_timer = time.time() + DOORS_TIME # else: # print("Invalid cmd for DO: " + cmd) # else: # print("Invalid message: " + data) # # if len(answer) > 0: # conn.send(bytes(answer, "UTF-8")) # # print("send: " + answer) # conn.close() # # tel = Telescope() # tel.loop()