AgentImagesCalibrator.py
10.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
#!/usr/bin/env python3
#
# To launch this agent from the root of Pyros:
# cd /srv/develop/pyros
# .\PYROS -t start AgentImagesCalibrator -o tnc -fg
#
# Edit the file pyros/pyros.py and
# add a new entry to the AGENT dict:
# "AgentImagesCalibrator": "observation_manager",
# ---------------------------------------------------
import sys
import time
import os
# Make the project importable: the project root is appended to sys.path,
# then each source sub-directory is pushed to the front of sys.path.
# PROJECT_ROOT_PATH must be set in the environment (KeyError otherwise).
pwd = os.environ['PROJECT_ROOT_PATH']
if pwd not in sys.path:
    sys.path.append(pwd)
short_paths = ['src', 'src/core/pyros_django']
for short_path in short_paths:
    path = os.path.join(pwd, short_path)
    if path in sys.path:
        # Already registered: do not insert it a second time
        continue
    sys.path.insert(0, path)
from src.core.pyros_django.majordome.agent.Agent import Agent, build_agent, log
# = Specials
import glob
import shutil
import guitastro
class AgentImagesCalibrator(Agent):
    """Agent that calibrates the FITS images found in the incoming directory.

    At the end of each routine iteration, when no processing is running, the
    agent lists the incoming directory, takes the oldest file and hands it to
    :meth:`process_one_image`. The individual correction steps (bias, dark,
    flat, inversion, cosmetic, WCS) are still placeholders meant to be
    overloaded or completed.
    """

    # - All possible running states
    RUNNING_NOTHING = 0
    RUNNING_ONE_IMAGE_PROCESSING = 1
    RUNNING_COMPUTE_RON_GAIN = 2

    # TODO: Redefine valid timeout
    _AGENT_SPECIFIC_COMMANDS = {
        # Format : "cmd_name" : (timeout, exec_mode, third_field)
        # NOTE(review): the third field is an empty string for every command
        # here; its meaning is defined by the Agent base class - confirm.
        "do_create_test_images_1" : (60, Agent.EXEC_MODE.SEQUENTIAL, ''),
        "do_create_test_images_2" : (60, Agent.EXEC_MODE.SEQUENTIAL, ''),
        "do_stop_current_processing" : (60, Agent.EXEC_MODE.SEQUENTIAL, ''),
    }

    # Scenario to be executed
    # "self do_stop_current_processing"
    # AgentCmd.CMD_STATUS_CODE.CMD_EXECUTED
    _TEST_COMMANDS_LIST = [
        # Format : (flag, "self cmd_name cmd_args", timeout, "expected_result", expected_status),
        # NOTE(review): the leading boolean presumably enables the entry -
        # confirm against the Agent base class.
        (True, "self do_create_test_images_1", 200, '', Agent.CMD_STATUS.CMD_EXECUTED),
        (True, "self do_exit", 500, "STOPPING", Agent.CMD_STATUS.CMD_EXECUTED),
    ]

    # =================================================================
    # Methods running inside main thread
    # =================================================================

    def __init__(self, name: str = None):
        """Create the agent.

        Args:
            name: Kept for signature compatibility. It is not forwarded:
                ``Agent.__init__`` is called without arguments.
        """
        super().__init__()

    def _init(self):
        """Read the configuration and create the guitastro helper objects.

        Sets ``self.config``, ``self._path_data_root``, ``self._home``,
        ``self._paths``, ``self._ima``, ``self._filename_manager`` and
        ``self._routine_running``.
        """
        super()._init()
        log.debug("end super init()")
        log.info(f"self.TEST_MODE = {self.TEST_MODE}")

        # === Get config infos
        agent_alias = self.name
        log.info(f"agent_alias = {agent_alias}")
        # === Get the config object
        self.config = self._oc['config']
        # === Get self._path_data_root
        self._path_data_root = self.config.get_agent_path_data_root(agent_alias)
        # === Get self._home of current unit
        self._home = self.config.getHome()
        # === Get self._paths the directories for all data (images). See obsconfig_class.py to know keys
        self._paths = self.config.get_agent_path_data_tree(agent_alias, True)

        # === Get bias strategies
        # TODO: unit, channel and category are hard-coded; the result is only logged for now.
        unit_name = "TNC"
        channel_name = "OpticalChannel_up1"
        category = "BI"
        self_strategy_bias = self.config.get_image_calibrations(unit_name, channel_name, category)
        log.debug(f"self_strategy_bias={self_strategy_bias}")

        # === Instantiate an object Ima to make image processing
        self._ima = guitastro.Ima()
        home = guitastro.Home(self._home)
        # === Instantiate an object Filenames to manage file names
        self._filename_manager = guitastro.FileNames()
        self._filename_manager.naming("PyROS.1")
        # === Set longitude to ima object to generate the night yyyymmdd and subdirectories yyyy/mm/dd
        longitude = home.longitude
        log.info(f"Longitude={longitude}")
        self._ima.longitude(longitude)
        log.info("Init done with success")

        # === Status of routine processing
        self._routine_running = self.RUNNING_NOTHING
        log.debug("end init()")

    # Note : called by _routine_process() in Agent
    # @override
    def _routine_process_iter_start_body(self):
        """Hook run at the start of a routine iteration (only logs)."""
        log.debug("in routine_process_before_body()")

    # Note : called by _routine_process() in Agent
    # @override
    def _routine_process_iter_end_body(self):
        """Hook run at the end of a routine iteration.

        When nothing is being processed, process the oldest image waiting
        in the incoming directory (one image per iteration).
        """
        log.debug("in routine_process_after_body()")
        if self._routine_running != self.RUNNING_NOTHING:
            return
        # Get files to process
        fitsfiles = self.glob_images_to_process()
        n = len(fitsfiles)
        log.info(f"There are {n} image{self._plural(n)} to process")
        if n > 0:
            # - We select the oldest image (the list is sorted oldest first)
            fitsfile = fitsfiles[0]
            log.info(f"Process the file {fitsfile}")
            # - TODO: run the processing in a thread; this call is synchronous
            self._routine_running = self.RUNNING_ONE_IMAGE_PROCESSING
            self.process_one_image(fitsfile)

    # =================================================================
    # Methods of specific commands
    # =================================================================

    def do_stop_current_processing(self):
        """Command placeholder: not implemented yet, does nothing."""
        pass

    def do_create_test_images_1(self):
        """Command: copy the m57.fit test image into the incoming directory."""
        self._create_test_images_1()

    def do_create_test_images_2(self):
        """Command: run the guitastro simulation that writes m67.ecsv."""
        self._create_test_images_2()

    # =================================================================
    # Methods called by commands or routine. Overload these methods
    # =================================================================

    def glob_images_to_process(self) -> list:
        """List the images waiting in the incoming directory, oldest first.

        ``glob.glob`` returns its matches in arbitrary order, while the
        routine takes the first element as "the oldest image". The matches
        are therefore sorted by increasing file modification time.

        Returns:
            The paths matching ``BI_*.fit`` in ``self._paths['ima_incoming']``.
        """
        # - glob the incoming directory:
        pattern = f"{self._paths['ima_incoming']}/BI_*.fit"
        # NOTE(review): the modification time is used as the "date" of a file;
        # switch to DATE-OBS if the observation date is what is wanted.
        return sorted(glob.glob(pattern), key=os.path.getmtime)

    def bias_correction(self):
        """Search the bias frames of the current night (correction is a TODO).

        Reads ``self._date_night``, which is set by :meth:`process_one_image`.
        """
        # - Search the bias
        path_bias = os.path.join(self._paths['ima_bias'], self._date_night)
        fitsbiasfiles = glob.glob(f"{path_bias}/*.fit")
        log.info(f"fitsbiasfiles = {fitsbiasfiles}")
        if fitsbiasfiles:
            # - Select the bias (TODO)
            pass

    def dark_correction(self):
        """Search the dark frames of the current night (correction is a TODO).

        Reads ``self._date_night``, which is set by :meth:`process_one_image`.
        """
        # - Search the dark
        path_darks = os.path.join(self._paths['ima_darks'], self._date_night)
        fitsdarkfiles = glob.glob(f"{path_darks}/*.fit")
        log.info(f"fitsdarkfiles = {fitsdarkfiles}")
        if fitsdarkfiles:
            # - Select two darks and compute the therm using exposure (TODO)
            # - Correction of dark (TODO)
            pass

    def flat_correction(self):
        """Search the flat frames of the current night (correction is a TODO).

        Reads ``self._date_night``, which is set by :meth:`process_one_image`.
        """
        # - Search the flat
        path_flats = os.path.join(self._paths['ima_flats'], self._date_night)
        fitsflatfiles = glob.glob(f"{path_flats}/*.fit")
        log.info(f"fitsflatfiles = {fitsflatfiles}")
        if fitsflatfiles:
            # - Select the flat (with the filter) (TODO)
            # - Correction of flat (TODO)
            pass

    def inversion_correction(self):
        """Placeholder for the mirror inversion step: does nothing yet."""
        pass

    def cosmetic_correction(self):
        """Placeholder for the cosmetic correction step: does nothing yet."""
        pass

    def wcs_calibration(self) -> int:
        """Placeholder for the WCS calibration step.

        Returns:
            Always 0 for now.
        """
        return 0

    def process_one_image(self, fitsfile: str):
        """This is the general algorithm of processing

        The processing consists to make corrections of dark, flat, inversions, cosmetic
        and perform WCS calibration. The result is saved under
        ``self._paths['ima_processed']/yyyy/mm/dd`` and the input file is
        then deleted.

        ``self._routine_running`` is always reset to ``RUNNING_NOTHING`` when
        the processing ends, even when a step raises, so that a failure does
        not block the routine for ever. The exception itself is not caught.

        Args:
            fitsfile: The file of the FITS file to process.
        """
        try:
            # - Load file in memory and decode its name
            log.info("Load the file in memory")
            f = self._ima.genename(self._ima.load(fitsfile))

            # - Save as tmp
            self._ima.path(self._paths['ima_tmp'])
            log.info("Save the temporary file as tmp name")
            self._ima.save("tmp")

            # - Load tmp and get infos
            self._ima.load("tmp")
            date_obs = self._ima.getkwd("DATE-OBS")
            # The correction methods read self._date_night
            self._date_night = self._ima.get_night(date_obs)
            log.info(f"Date_obs = {date_obs}")
            log.info(f"Night = {self._date_night}")
            exposure = self._ima.getkwd("EXPOSURE")
            log.info(f"Exposure = {exposure}")

            # - Bias correction
            self.bias_correction()
            # - Dark correction
            self.dark_correction()
            # - Flat correction
            self.flat_correction()

            # - Save tmp corrected by dark and flat
            self._ima.path(self._paths['ima_tmp'])
            self._ima.save("tmp")

            # - Inversion of mirrors or mirorxy
            self.inversion_correction()
            # - Cosmetic correction
            self.cosmetic_correction()
            # - WCS calibration
            nmatched = self.wcs_calibration()
            log.debug(f"nmatched = {nmatched}")

            # - Prepare the output file name
            log.info("Decode the filename")
            fgen_in = f['genename'] + f['sep'] + f['indexes'][0] + f['suffix']
            fext_out = ".fits"

            # - Save in processed (the night is a yyyymmdd string)
            yyyy = self._date_night[0:4]
            mm = self._date_night[4:6]
            dd = self._date_night[6:8]
            path_processed = os.path.join(self._paths['ima_processed'], yyyy, mm, dd)
            # The dated sub-directory may not exist yet for a new night
            os.makedirs(path_processed, exist_ok=True)
            self._ima.path(path_processed)
            fname_out = fgen_in + fext_out
            fname = self._ima.save(fname_out)
            log.info(f"Save the processed image {fname}")

            # - Delete the file in incoming directory
            os.remove(fitsfile)
            log.info(f"Delete the raw image {fitsfile}")
        finally:
            # - Update the running state
            self._routine_running = self.RUNNING_NOTHING
        time.sleep(5)
        print("\n ...End of image calibration\n")

    # =================================================================
    # Internal methods
    # =================================================================

    def _create_test_images_1(self):
        """Copy the guitastro m57.fit test image into the incoming directory.

        Also stores the source path in ``self._file_ima_test`` and resets the
        naming of the file name manager to ``""``.
        """
        # === Define an image to test the processing and copy it in incoming directory
        self._file_ima_test = os.path.join(self._path_data_root, "vendor/guitastro/tests/data/m57.fit")
        file_in = self._file_ima_test
        # NOTE(review): glob_images_to_process() only matches BI_*.fit, so this
        # m57.fit copy is not picked up by the routine as written - confirm intent.
        file_out = f"{self._paths['ima_incoming']}/m57.fit"
        shutil.copyfile(file_in, file_out)
        self._filename_manager.naming("")

    def _create_test_images_2(self):
        """Run a guitastro simulation and write its table to m67.ecsv in the tmp directory."""
        self._ima.etc.camera("Kepler 4040")
        self._ima.etc.optics("Takahashi_180ED")
        self._ima.etc.params("msky", 18)
        # TODO: the coordinates ra = 132.84583, dec = 11.81333 were defined
        # here but never passed to the simulation.
        at = self._ima.simulation("GAIA", "PHOTOM", shutter_mode="closed", t=50)
        file_out = os.path.join(self._paths['ima_tmp'], "m67.ecsv")
        print(f"STEP TOTO 1 = {at}")
        at.t.write(file_out, format='astrotable', overwrite=True)
        print("STEP TOTO 2")
        # The keyword reader is a method of the Ima object (see process_one_image)
        date_obs = self._ima.getkwd("DATE-OBS")
        log.debug(f"Date_obs = {date_obs}")

    def _plural(self, n: int) -> str:
        """Return "s" if n>1 for plurals.

        Args:
            n: Number of entities

        Returns:
            The string "s" or ""
        """
        return "s" if n > 1 else ""
# Script entry point: build the agent, display it, then start its main loop.
if __name__ == "__main__":
    agent = build_agent(AgentImagesCalibrator)
    print(agent)
    agent.run()