#!/usr/bin/env python3
"""Agent driving a Gemini device through the ``DC_Gemini`` device controller.

Run as a script, this module builds an ``AgentDeviceGemini`` with
``build_agent`` and starts it with ``run()``.
"""

import sys
##import utils.Logger as L
##import threading
import time

##from .Agent import Agent
# NOTE: the two sys.path entries below are relative to the current working
# directory and must stay ahead of the project imports that follow them.
sys.path.append("..")
from agent.AgentDevice import AgentDevice, build_agent
from majordome.models import AgentDeviceTelescopeStatus, get_or_create_unique_row_from_model

sys.path.append("../../..")
##from device_controller.concrete_component.device_simulator_common.server_udp_or_tcp import get_SocketServer_UDP_TCP
###from device_controller.concrete_component.gemini.gemini_telescope_simulator import DeviceSimulatorTelescopeGemini
from device_controller.concrete_component.gemini.gemini_controller import DC_Gemini

##log = L.setupLogger("AgentXTaskLogger", "AgentX")


class AgentDeviceGemini(AgentDevice):
    """Device agent bound to the Gemini device controller (``DC_Gemini``).

    The class only supplies configuration (class attributes), a few
    agent-specific commands, and ``get_device_status``; everything else is
    delegated to the ``AgentDevice`` superclass.
    """

    # Agent level specific commands
    # @override superclass Agent
    AGENT_SPECIFIC_COMMANDS = [
        "do_ad_gemini_specific1",
        "set_ad_gemini_specific2",
        # Error case
        "do_ad_gemini_specific3_unimplemented",
    ]

    # Never assigned anything but this default within this file.
    _agent_device_telescope_status = None

    # Host and Port of the device
    HOST, PORT = "82.64.28.71", 11110

    # FOR TEST ONLY :
    # - Run this agent in simulator mode ?
    TEST_MODE = None
    # - Run the assertion tests at the end ?
    TEST_WITH_FINAL_TEST = True
    # - Run the test with the real device ?
    WITH_SIMULATOR = False
    # WITH_SIMULATOR = True
    # - How many seconds should I be running ?
    TEST_MAX_DURATION_SEC = None
    # TEST_MAX_DURATION_SEC = 70
    # - Who should I send commands to ?
    # TEST_COMMANDS_DEST = "myself"
    # TEST_COMMANDS_DEST = "AgentB"
    # - Scenario to be executed
    TEST_COMMANDS_LIST = []

    # =================================================================
    # FUNCTIONS RUN INSIDE MAIN THREAD
    # =================================================================

    # @override
    def __init__(self, config_filename=None, RUN_IN_THREAD=True):
        """Initialize the agent with the Gemini controller and device address.

        Args:
            config_filename: Forwarded unchanged to ``AgentDevice.__init__``.
            RUN_IN_THREAD: Forwarded unchanged to ``AgentDevice.__init__``.
        """
        super().__init__(
            config_filename,
            RUN_IN_THREAD,
            device_controller=DC_Gemini,
            host=self.HOST,
            port=self.PORT,
        )
        # Device socket note (translated from the original comment):
        # local port AK 8085 = redirected to the telescope IP 192.168.0.12
        # on port 11110.

    def init(self):
        """Run the superclass ``init()``; no Gemini-specific step yet."""
        super().init()
        # Telescope (long) init
        # TODO:

    # @Override
    def do_log(self):
        """Delegate logging to the superclass implementation."""
        super().do_log()

    def _gemini_query_as_str(self, cmd, pause_sec=1):
        """Execute one device command and return its result as a string.

        Args:
            cmd: Command name handed to ``self._device_ctrl.exec_cmd``.
            pause_sec: Seconds to sleep after the command (the original code
                slept 1 second after every query; presumably to pace the
                device - TODO confirm).

        Returns:
            ``str(res)`` of the controller result, whether or not ``res.ok``
            is true.
        """
        res = self._device_ctrl.exec_cmd(cmd)
        res_text = str(res)
        self.printd("result is", res_text)
        if res.ok:
            self.printd("OK")
        time.sleep(pause_sec)
        return res_text

    # @Override
    def get_device_status(self):
        """Query the device for its date, time and RA/DEC.

        Returns:
            A dict with keys ``'date'``, ``'time'`` and ``'radec'``, each
            holding the stringified controller result of the matching command.
        """
        dev_date = self._gemini_query_as_str("get_date")
        dev_time = self._gemini_query_as_str("get_time")

        # For a dcc command, both methods work:
        # - "get_radec": guess which dcc can execute this command
        # - "Mount.get_radec": tell explicitly which one it is ("Mount")
        # The explicit form is the one actually sent.
        dev_radec = self._gemini_query_as_str("Mount.get_radec")

        return {'date': dev_date, 'time': dev_time, 'radec': dev_radec}

    # AGENT LEVEL SPECIFIC COMMANDS

    def do_ad_gemini_specific1(self):
        """Agent-specific command 1: only prints a progress message."""
        self.printd("processing ad_specific1... ")

    def set_ad_gemini_specific2(self):
        """Agent-specific command 2: only prints a progress message."""
        self.printd("processing set_specific2... ")


# =================================================================
# MAIN FUNCTION
# =================================================================
if __name__ == "__main__":
    # with thread
    RUN_IN_THREAD = True
    # with process
    # RUN_IN_THREAD = False
    agent = build_agent(AgentDeviceGemini, RUN_IN_THREAD=RUN_IN_THREAD)
    agent.run()