import os import sys DEBUG_FILE = False PACKAGE_PARENT = '..' SCRIPT_DIR = os.path.dirname(os.path.realpath(os.path.join(os.getcwd(), os.path.expanduser(__file__)))) sys.path.append(os.path.normpath(os.path.join(SCRIPT_DIR, PACKAGE_PARENT))) #(EP renamed) from utils.Device import Device from utils.DeviceSim import DeviceSim from utils.StatusManager import StatusManager #(EP) class CameraVISSimulator(Device, StatusManager): class CameraVISSimulator(DeviceSim, StatusManager): exposure_time = 5 shutter_time = 3 cooler_timer = 5 _window_x1 = 0 _window_x2 = 0 _window_y1 = 0 _window_y2 = 0 _cooler = "OFF" status = {"status": "VALID", "exposure_time": exposure_time, "shutter_time": shutter_time, "cooler_timer": cooler_timer, "cooler": _cooler, "win x1": _window_x1, "win x2": _window_x2, "win y1": _window_y1, "win_y2": _window_y2} def __init__(self, argv): super().__init__(argv) self.setDevice("CameraVIS") self.setStatusManager("cameraVISSimulator", argv) self.cameraVISPrint("COUCOU CECI EST UN TEST") def cameraVISPrint(self, string: str): if DEBUG_FILE: print("cameraVISSimulator : " + string) def deal_command(self, cmd_type, cmd, args): answer = "" if cmd_type == "GET": if cmd == "STATUS": answer = str({"status": "VALID", "exposure_time" : self.exposure_time, "shutter_time": self.shutter_time, "cooler_timer" : self.cooler_timer, "cooler" : self._cooler, "win x1" : self._window_x1, "win x2" : self._window_x2, "win y1" : self._window_y1, "win_y2" : self._window_y2}) elif cmd == "TIMER": answer = "cooler timer : " + str(self.cooler_timer) else: answer = "NOT IMPLEMENTED YET" elif cmd_type == "SET": if cmd == "WINDOW": os.system("echo " + str(args) + " >> /home/portos/IRAP/PYROS/argss") self._window_x1 = int(args[0]) self._window_x2 = int(args[1]) self._window_y1 = int(args[2]) self._window_y2 = int(args[3]) answer = "Window coordinates are set to " + str(args) else: answer = "NOT IMPLEMENTED YET" elif cmd_type == "DO": if cmd == "COOLER": if args[0] == "ON": self._cooler = "ON tmp: " + str(args[1]) answer = "COOLER TURNED ON tmp: " + str(args[1]) elif args[0] == "OFF": self._cooler = "OFF" answer = "COOLER TURNED OFF" else: answer = "NOT IMPLEMENTED YET" else: answer = "Invalid or not implemented cmd" return answer def handleConnection(self): for co in self.connections: data = self.readBytes(128, co) answer = "" cut_data = data.split() if (len(cut_data) < 2): self.cameraVISPrint("Received invalid message on socket : " + data) continue cmd_type = cut_data[0] cmd = cut_data[1] args = cut_data[2:] answer = self.deal_command(cmd_type, cmd, args) if (len(answer) > 0): self.sendMessage(answer, co) return (0) def loop(self): i = 0 if (self.ended == 0): self.cameraVISPrint("No entry for cameraVIS found in config file : " + self.config_file) return (0) while (True): self.updateStatus(i) if (self.isConnected()): self.handleConnection() i += 1 if (i > self.ended): return (0) return (1) def run(self): if DEBUG_FILE: print("cameraVIS simulator running") self.parse() self.configSocket() self.loop() return (0) if (__name__ == "__main__"): sim = CameraVISSimulator(sys.argv) sim.run() # from Camera import Camera # # class CameraVIS(Camera): # def __init__(self): # super().__init__("CameraVIS") # # cam = CameraVIS() # cam.loop()