import os import sys DEBUG_FILE = False PACKAGE_PARENT = '..' SCRIPT_DIR = os.path.dirname(os.path.realpath(os.path.join(os.getcwd(), os.path.expanduser(__file__)))) sys.path.append(os.path.normpath(os.path.join(SCRIPT_DIR, PACKAGE_PARENT))) #(EP renamed) from utils.Device import Device from utils.DeviceSim import DeviceSim from utils.StatusManager import StatusManager #(EP) class CameraNIRSimulator(Device, StatusManager): class CameraNIRSimulator(DeviceSim, StatusManager): exposure_time = 5 shutter_time = 3 cooler_timer = 5 _window_x1 = 0 _window_x2 = 0 _window_y1 = 0 _window_y2 = 0 _cooler = "OFF" status = {"status": "VALID", "exposure_time" : exposure_time, "shutter_time": shutter_time, "cooler_timer" : cooler_timer, "cooler" : _cooler, "win x1" : _window_x1, "win x2" : _window_x2, "win y1" : _window_y1, "win_y2" : _window_y2} def __init__(self, argv): super().__init__(argv) self.setDevice("CameraNIR") self.setStatusManager("cameraNIRSimulator", argv) def cameraNIRPrint(self, string: str): if DEBUG_FILE: print("cameraNIRSimulator : " + string) def deal_command(self, cmd_type, cmd, args): answer = "" if cmd_type == "GET": if cmd == "STATUS": answer = str({"status": "VALID", "exposure_time" : self.exposure_time, "shutter_time": self.shutter_time, "cooler_timer" : self.cooler_timer, "cooler" : self._cooler, "win x1" : self._window_x1, "win x2" : self._window_x2, "win y1" : self._window_y1, "win_y2" : self._window_y2}) elif cmd == "TIMER": answer = "cooler timer : " + str(self.cooler_timer) else: answer = "NOT IMPLEMENTED YET" elif cmd_type == "SET": if cmd == "WINDOW": os.system("echo " + str(args) + " >> /home/portos/IRAP/PYROS/argss") self._window_x1 = int(args[0]) self._window_x2 = int(args[1]) self._window_y1 = int(args[2]) self._window_y2 = int(args[3]) answer = "Window coordinates are set to " + str(args) else: answer = "NOT IMPLEMENTED YET" elif cmd_type == "DO": if cmd == "COOLER": if args[0] == "ON": self._cooler = "ON tmp: " + str(args[1]) answer = "COOLER TURNED ON tmp: " + str(args[1]) elif args[0] == "OFF": self._cooler = "OFF" answer = "COOLER TURNED OFF" else: answer = "NOT IMPLEMENTED YET" else: answer = "Invalid or not implemented cmd" return answer def handleConnection(self): for co in self.connections: data = self.readBytes(128, co) answer = "" cut_data = data.split(" ") if (len(cut_data) < 2): self.cameraNIRPrint("Received invalid message on socket : " + data) continue cmd_type = cut_data[0] cmd = cut_data[1] args = cut_data[2:] answer = self.deal_command(cmd_type, cmd, args) ''' if (cmd_type == "GET"): if (cmd == "STATUS"): answer = self.create_status() else: answer = "Invalid cmd for GET: " + cmd self.cameraNIRPrint(answer) elif (cmd_type == "DO"): if (cmd == "CAPTURE"): folder_name = args[0] nb_images = int(args[1]) duration = args[2] i = 0 while i < nb_images: try: with open(folder_name + os.sep + "nir_image_"+str(i), "w") as f: f.write(folder_name+" "+str(nb_images)+" "+str(duration)) except Exception as e: self.cameraNIRPrint(str(e)) i += 1 answer = "CAPTURING" else: self.cameraNIRPrint("Ignored message " + data) ''' if (len(answer) > 0): self.sendMessage(answer, co) return (0) def loop(self): i = 0 if (self.ended == 0): self.cameraNIRPrint("No entry for cameraNIR found in config file : " + self.config_file) return (0) while (True): self.updateStatus(i) if (self.isConnected()): self.handleConnection() i += 1 if (i > self.ended): return (0) return (1) def run(self): if DEBUG_FILE: print("cameraNIR simulator running") self.parse() self.configSocket() self.loop() return (0) if (__name__ == "__main__"): sim = CameraNIRSimulator(sys.argv) sim.run() # class CameraNIR(Camera): # # def __init__(self): # super().__init__("CameraNIR") # # ''' I will just fill the attributes with their names and params without regarding them ''' # self.attributes = {} # self.attributes["FILENAME"] = "default_nir_" + str(randint(1, 10000)) # # self.status = "IDLE" # self.shutter_status = "CLOSE" # # def loop(self): # # à gérer : la réception des paramètres, le temps de shutter (et son statut), le temps de start, le stop, le abort, la création d'images # # cooler_timer = 0 # shutter_timer = 0 # exposure_timer = 0 # # while True: # try: # conn, addr = self.sock.accept() # except socket.error as e: # print("There was a socket error: " + repr(e)) # break # # data = conn.recv(128).decode() # # # print("Received: " + data) # # answer = "" # cut_data = data.split(" ") # # if cooler_timer != 0 and time.time() > cooler_timer: # cooler_timer = 0 # status = "IDLE" # print("Ended cooling") # if shutter_timer != 0 and time.time() > shutter_timer: # shutter_timer = 0 # print("Ended shutter") # if exposure_timer > 0 and time.time() > exposure_timer: # exposure_timer = -1 # print("Ended exposure") # with open(os.path.join(IMAGES_FOLDER, self.attributes["FILENAME"]), 'w'): # pass # # if len(cut_data) < 2: # print("Invalid message: " + data) # else: # cmd_type = cut_data[0] # cmd = cut_data[1] # args = cut_data[2:] # if cmd_type == "SET": # if cmd == "FILENAME": # if len(args) > 0: # self.attributes["FILENAME"] = " ".join(args) # else: # print("An argument is needed for the FILENAME command") # else: # self.attributes[cmd] = args # elif cmd_type == "GET": # if cmd == "STATUS": # answer = self.status # elif cmd == "TEMPERATURE": # answer = "GET TEMPERATURE answer not implemented" # elif cmd == "SETUP": # answer = "GET SETUP answer not implemented" # elif cmd == "TIMER": # if exposure_timer > 0: # answer = str(int(exposure_timer - time.time())) # else: # answer = str(exposure_timer) # else: # answer = "Invalid cmd for GET: " + cmd # print(answer) # elif cmd_type == "DO": # if cmd == "START": # exposure_timer = time.time() + EXPOSURE_TIME # elif cmd == "STOP": # exposure_timer = -1 # elif cmd == "ABORT": # exposure_timer = -1 # elif cmd == "COOLER": # cooler_timer = time.time() + COOLER_TIME # elif cmd == "SHUTTER": # shutter_timer = time.time() + SHUTTER_TIME # else: # print("Invalid cmd for GET: " + cmd) # else: # print("Invalid message: " + data) # # if len(answer) > 0: # conn.send(bytes(answer, "UTF-8")) # # print("send: " + answer) # # conn.close() # # cam = CameraNIR() # cam.loop()