#!/usr/bin/env python3 # # To launch this agent from the root of Pyros: # cd /srv/develop/pyros # .\PYROS -t start AgentImagesCalibrator -o tnc -fg # # Edit the file pyros/pyros.py, # Add a new entry in the dict AGENT: # "AgentImagesCalibrator": "observation_manager", # --------------------------------------------------- import sys import time import os pwd = os.environ['PROJECT_ROOT_PATH'] if pwd not in sys.path: sys.path.append(pwd) short_paths = ['src', 'src/core/pyros_django'] for short_path in short_paths: path = os.path.join(pwd, short_path) if path not in sys.path: sys.path.insert(0, path) from src.core.pyros_django.agent.Agent import Agent, build_agent, log # = Specials import glob import shutil import guitastro class AgentImagesCalibrator(Agent): # - All possible running states RUNNING_NOTHING = 0 RUNNING_ONE_IMAGE_PROCESSING = 1 RUNNING_COMPUTE_RON_GAIN = 2 # TODO: Redefine valid timeout _AGENT_SPECIFIC_COMMANDS = { # Format : “cmd_name” : (timeout, exec_mode) "do_create_test_images_1" : (60, Agent.EXEC_MODE.SEQUENTIAL, ''), "do_create_test_images_2" : (60, Agent.EXEC_MODE.SEQUENTIAL, ''), "do_stop_current_processing" : (60, Agent.EXEC_MODE.SEQUENTIAL, ''), } # Scenario to be executed # "self do_stop_current_processing" # AgentCmd.CMD_STATUS_CODE.CMD_EXECUTED _TEST_COMMANDS_LIST = [ # Format : ("self cmd_name cmd_args", timeout, "expected_result", expected_status), (True, "self do_create_test_images_1", 200, '', Agent.CMD_STATUS.CMD_EXECUTED), (True, "self do_exit", 500, "STOPPING", Agent.CMD_STATUS.CMD_EXECUTED), ] """ ================================================================= Methods running inside main thread ================================================================= """ def __init__(self, name:str=None): if name is None: name = self.__class__.__name__ super().__init__() def _init(self): super()._init() log.debug("end super init()") log.info(f"self.TEST_MODE = {self.TEST_MODE}") # === Get config infos agent_alias = self.name log.info(f"agent_alias = {agent_alias}") # === Get the config object self.config = self._oc['config'] # === Get self._path_data_root self._path_data_root = self.config.get_agent_path_data_root(agent_alias) # === Get self._home of current unit self._home = self.config.getHome() # === Get self._paths the directories for all data (images). See obsconfig_class.py to know keys self._paths = self.config.get_agent_path_data_tree(agent_alias, True) # === Get bias strategies unit_name = "TNC" channel_name = "OpticalChannel_up1" category = "BI" self_strategy_bias = self.config.get_image_calibrations(unit_name, channel_name, category) log.debug(f"self_strategy_bias={self_strategy_bias}") # === Instanciate an object Ima to make image processing self._ima = guitastro.Ima() home = guitastro.Home(self._home) # === Instanciate an object Filenames to manage file names self._filename_manager = guitastro.FileNames() self._filename_manager.naming("PyROS.1") # === Set longitude to ima object to generate the night yyyymmdd and subdirectories yyyy/mm/dd longitude = home.longitude log.info(f"Longitude={longitude}") self._ima.longitude(longitude) log.info("Init done with success") # === Status of routine processing self._routine_running = self.RUNNING_NOTHING log.debug("end init()") # Note : called by _routine_process() in Agent # @override def _routine_process_iter_start_body(self): log.debug("in routine_process_before_body()") # Note : called by _routine_process() in Agent # @override def _routine_process_iter_end_body(self): log.debug("in routine_process_after_body()") if self._routine_running == self.RUNNING_NOTHING: # Get files to process fitsfiles = self.glob_images_to_process() n = len(fitsfiles) log.info(f"There are {n} image{self._plural(n)} to process") if n > 0: # - We select the oldest image fitsfile = fitsfiles[0] log.info(f"Process the file {fitsfile}") # - Thread TODO self._routine_running = self.RUNNING_ONE_IMAGE_PROCESSING self.process_one_image(fitsfile) """ ================================================================= Methods of specific commands ================================================================= """ def do_stop_current_processing(self): pass def do_create_test_images_1(self): self._create_test_images_1() def do_create_test_images_2(self): self._create_test_images_2() """ ================================================================= Methods called by commands or routine. Overload these methods ================================================================= """ def glob_images_to_process(self): # - glob the incoming directory: fitsfiles = glob.glob(f"{self._paths['ima_incoming']}/BI_*.fit") # - Please sort list of files in increasing dates (TODO) return fitsfiles def bias_correction(self): # - Search the bias path_bias = os.path.join( self._paths['ima_bias'], self._date_night ) fitsbiasfiles = glob.glob(f"{path_bias}/*.fit") log.info(f"fitsbiasfiles = {fitsbiasfiles}") if len(fitsbiasfiles) > 0: # - Select the bias pass def dark_correction(self): # - Search the dark path_darks = os.path.join( self._paths['ima_darks'], self._date_night ) fitsdarkfiles = glob.glob(f"{path_darks}/*.fit") log.info(f"fitsdarkfiles = {fitsdarkfiles}") if len(fitsdarkfiles) > 0: # - Select two darks and compute the therm using exposure # - Correction of dark pass def flat_correction(self): # - Search the flat path_flats = os.path.join( self._paths['ima_flats'], self._date_night ) fitsflatfiles = glob.glob(f"{path_flats}/*.fit") log.info(f"fitsflatfiles = {fitsflatfiles}") if len(fitsflatfiles) > 0: # - Select the flat (with the filter) # - Correction of flat pass def inversion_correction(self): pass def cosmetic_correction(self): pass def wcs_calibration(self): return 0 def process_one_image(self, fitsfile: str): """This is the general algorithm of processing The processing consists to make corrections of dark, flat, inversions, cosmetic and perform WCS calibration. Args: fitsfile: The file of the FITS file to process. """ # - Load file in memory log.info("Load the file in memory") #self.set_infos("Load the file in memory") f = self._ima.genename(self._ima.load(fitsfile)) # log.info(f"f={f}") # - Save as tmp self._ima.path(self._paths['ima_tmp']) log.info("Save the temporary file as tmp name") self._ima.save("tmp") # - Load tmp and get infos self._ima.load("tmp") date_obs = self._ima.getkwd("DATE-OBS") self._date_night = self._ima.get_night(date_obs) log.info(f"Date_obs = {date_obs}") log.info(f"Night = {self._date_night}") exposure = self._ima.getkwd("EXPOSURE") log.info(f"Exposure = {exposure}") # - Bias correction self.bias_correction() # - Dark correction self.dark_correction() # - Flat correction self.flat_correction() # - Save tmp corrected by dark and flat self._ima.path(self._paths['ima_tmp']) self._ima.save("tmp") # - Inversion of mirrors or mirorxy self.inversion_correction() # - Cosmetic correction self.cosmetic_correction() # - WCS calibration nmatched = self.wcs_calibration() # - Prepare the output file name log.info("Decode the filename") fgen_in = f['genename'] + f['sep'] + f['indexes'][0] + f['suffix'] fext_in = f['file_extension'] fext_out = ".fits" # - Save in processed yyyy = self._date_night[0:4] mm = self._date_night[4:6] dd = self._date_night[6:8] path_processed = os.path.join( self._paths['ima_processed'], yyyy, mm, dd ) self._ima.path(path_processed) fname_out = fgen_in + fext_out fname = self._ima.save(fname_out) log.info(f"Save the processed image {fname}") # - Delete the file in incoming directory os.remove(fitsfile) log.info(f"Delete the raw image {fitsfile}") # - Update the running state self._routine_running = self.RUNNING_NOTHING time.sleep(5) print("\n ...End of image calibration\n") """ ================================================================= Internal methods ================================================================= """ def _create_test_images_1(self): try: # === Define an image to test the processing and copy it in incoming directory self._file_ima_test = os.path.join(self._path_data_root,"vendor/guitastro/tests/data/m57.fit") file_in = self._file_ima_test file_out = f"{self._paths['ima_incoming']}/m57.fit" shutil.copyfile(file_in, file_out) self._filename_manager.naming("") except: raise def _create_test_images_2(self): try: self._ima.etc.camera("Kepler 4040") self._ima.etc.optics("Takahashi_180ED") self._ima.etc.params("msky",18) ra = 132.84583 dec = 11.81333 at = self._ima.simulation("GAIA", "PHOTOM", shutter_mode="closed", t=50) file_out = os.path.join(self._paths['ima_tmp'], "m67.ecsv") print(f"STEP TOTO 1 = {at}") at.t.write(file_out, format='astrotable', overwrite=True) print(f"STEP TOTO 2") date_obs = self.getkwd("DATE-OBS") except: raise def _plural(self, n: int) -> str: """Return "s" if n>1 for plurals. Args: n: Number of entities Returns: The string "s" or "" """ if n > 1: s = "s" else: s = "" return s if __name__ == "__main__": agent = build_agent(AgentImagesCalibrator) print(agent) agent.run()