PAHTAT.py
15.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
# !/usr/bin/env python3.6
# -*- coding: utf-8 -*-
import numpy as np
from astropy.io import fits
from scipy.optimize import nnls
import matplotlib.pyplot as plt
from scipy import interpolate
from astropy.table import Table
import time
# Fundamental constants
k_b = 1.38065e-23 # m^2 kg s^-2 K^-1 (SI)
c = 299792458. # m*sec^-1
c_microns = 299792458000000 # microns*sec^-1
h = 6.62607e-34 # m^2 kg s^-1
def blackbody(wave, temperature):
"""
Black body function, units are in SI
:param wave: array of floats
wavelength array, steps must be regular
:param temperature: float
black body temperature (in K)
:return black body luminosity profile
"""
luminosity = (2 * h * (c ** 2)) / ((wave ** 5) * (np.exp(h * c / (wave * k_b * temperature)) - 1))
return luminosity
def gauss(wave, peak_center, sigma):
"""
returns Gaussian function on the wavelength array given
:param wave: array
wavelengths array
:param peak_center: float
position of the center of the peak
:param sigma: float
width of the gaussian
:return array of gaussian values centered at the given peak
"""
return np.exp(-np.power(wave - peak_center, 2.) / (2 * np.power(sigma, 2.)))
class Spectrum:
"""
Use a model to fit observed spectra to apply a new NMF on real data
Needs to initialize with some parameters first:
the spectrum to fit, the corresponding wavelength array and the spectral resolution of the spectrum to fit
"""
def __init__(self, spectrum_to_fit, wave_axis, resolution, redshift=0., templates='Foschino', first_run=True):
"""
Initialize the class using the following parameters:
:param spectrum_to_fit: array
The spectrum to fit
:param wave_axis: array
The array of wavelengths corresponding to the spectrum to fit, in microns
:param resolution: float
The spectrum to fit resolution corresponding to R=lambda/delta_lambda
:param redshift: float
The redshift of the source spectrum, default is 0.
:param templates: string
Name of the PAHTAH templates to use, default is 'Foschino'
:param first_run: bool
default is True, if false, means it will load the files gas_lines.fits, continuum.fits and AIBs.fits
to have a quicker fit
"""
if templates == 'Foschino':
wave_elementary_axis_filename = 'Data/Elementary_spectra_Foschino_wave.fits'
elementary_spectra = 'Data/Elementary_spectra_Foschino.fits'
else:
raise NameError('Templates name is not a valid one, try to type "Foschino"')
self.spectrum_to_fit = spectrum_to_fit
self.redshift = redshift
self.wave = wave_axis / (1 + self.redshift)
self.wave_elementary = fits.getdata(wave_elementary_axis_filename)
self.wave_elementary *= 1e6 # because we need wavelengths in microns
self.elementary_spectra = fits.getdata(elementary_spectra)
self.wave_resolution = resolution
first_x_value = max(self.wave[0], self.wave_elementary[0])
last_x_value = min(self.wave[-1], self.wave_elementary[-1])
delta_lambda = first_x_value / self.wave_resolution
x_to_fit = np.arange(first_x_value, last_x_value, delta_lambda)
if self.wave_elementary.shape != self.wave.shape:
interpolate_elementary = interpolate.interp1d(self.wave_elementary, self.elementary_spectra)
interpolate_to_fit = interpolate.interp1d(self.wave, self.spectrum_to_fit)
self.spectrum_to_fit = interpolate_to_fit(x_to_fit)
self.elementary_spectra = interpolate_elementary(x_to_fit)
self.wave = x_to_fit
if first_run is False:
self.continuum = fits.getdata('Data/continuum.fits')
self.gas_lines = fits.getdata('Data/gas_lines.fits')
self.AIBs = fits.getdata('Data/AIBs.fits')
else:
self.continuum = None
self.gas_lines = None
self.AIBs = np.zeros([self.wave.shape[0], self.elementary_spectra.shape[0]])
self.PAHp = np.zeros([self.wave.shape[0], 1])
self.PAH0 = np.zeros([self.wave.shape[0], 1])
self.eVSG = np.zeros([self.wave.shape[0], 1])
self.PAHx = np.zeros([self.wave.shape[0], 1])
self.AIBs_gauss = None
self.model = None
self.fit_result = None
self.nb_comp = None
self.c_ext = None
self.gas_lines_ind = None
self.species = None
self.central_lambda = None
self.first_run = first_run
def build_continuum(self, t_min=40, t_max=6000, delta_t=20, nh=None):
"""
Creates the continuum model
:param t_min: float
Minimal temperature of the blackbodies catalog in K, default is 40
:param t_max: float
Maximal temperature of the blackbodies catalog in K, default is 6000
:param delta_t: float
step size in temperature between two blackbodies, in K, default is 20
:param nh: array
column densities taken into account for the extinction, default is None
fills the continuum Class parameter with an array of nb_CN columns of len(wave) points
Writes the continuum in 'Data/continuum.fits' file
"""
# extinction law from Weingartner and Drain 2001, Rv=5.5
if nh is None:
nh = [0, 1e18, 1.0e19, 1.0e20, 1.6e22, 5e22, 1e23]
wave_draine__, albe, cos, cext__ = np.loadtxt('Data/extinction_curve/Cext_tabulation_draine_55.txt',
unpack=True,
usecols=[0, 1, 2, 3])
Cext_ = cext__[np.where(wave_draine__ < (self.wave[-1]))]
wave_draine_ = wave_draine__[np.where(wave_draine__ < (self.wave[-1]))]
cext_ = Cext_[np.where(wave_draine_ > (self.wave[0]))]
wave_draine = wave_draine_[np.where(wave_draine_ > (self.wave[0]))]
C_ext_new = interpolate.interp1d(wave_draine, cext_, fill_value="extrapolate")
Cext = C_ext_new(self.wave)
self.c_ext = Cext
# =============================================================================
# Grey body parameters
# number of black body
nb_CN = len(nh) * ((t_max - t_min) / delta_t)
nb_CN = int(nb_CN)
slope = np.arange(0.1, 3, 0.1)
# creates continuum model
self.continuum = np.zeros([len(self.wave), nb_CN + slope.shape[0]])
p = 0
for num_discr_cn in range(len(nh)):
extinction = np.exp(-Cext * nh[num_discr_cn])
for cn in range(int(nb_CN / len(nh))):
# grey body
grey_body = extinction * blackbody(self.wave * 1e-6, t_min + cn * delta_t)
integral = np.sum(grey_body)
self.continuum[:, p] = grey_body / integral
p += 1
for factor in slope:
self.continuum[:, p] = factor * self.wave
p += 1
print('continuum array shape:', self.continuum.shape)
fits.writeto('Data/continuum.fits', self.continuum, overwrite=True)
def build_gaslines(self):
"""
Function generating the gas lines model.
Fills the gas lines fitting catalog into the gas-lines parameter, an array of catalog Gaussian columns and
len(wave) spectral points
Write the gas lines in 'Data/gas_lines.fits' file
"""
table_lines = Table.read('Data/line_list_all.txt', format='ascii')
central_lambda = table_lines['wavelength']
self.species = table_lines['Species']
self.central_lambda = central_lambda
width = (central_lambda / self.wave_resolution)
self.gas_lines = np.zeros([len(self.wave), np.size(central_lambda)])
for i in range(len(central_lambda)):
self.gas_lines[:, i] = gauss(self.wave, (central_lambda[i]), width[i] / 2.355)
print('gas array shape:', self.gas_lines.shape)
fits.writeto('Data/gas_lines.fits', self.gas_lines, overwrite=True)
def build_aib(self, norm):
"""
Function generating the catalog used to fit the AIBs (Aromatic Infrared Band).
:param norm: bool
choose or not to normalize the elementary spectra if needed.
Fills the AIB parameter with fitting catalog, an array of 4 columns and len(wave) spectral points
Writes the AIB fitting catalog into 'Data/AIBs.fits' file
"""
if norm:
for i_norm in range(self.elementary_spectra.shape[0]):
print(self.elementary_spectra.shape, self.wave.shape)
self.AIBs[:, i_norm] = self.elementary_spectra[i_norm] / np.trapz(self.elementary_spectra[i_norm],
self.wave)
self.PAHp[:, 0] = self.AIBs[:, 0]
self.PAH0[:, 0] = self.AIBs[:, 2]
self.eVSG[:, 0] = self.AIBs[:, 1]
self.PAHx[:, 0] = self.AIBs[:, 3]
else:
for i_norm in range(self.elementary_spectra.shape[0]):
print(self.elementary_spectra.shape, self.wave.shape)
self.AIBs[:, i_norm] = self.elementary_spectra[i_norm].copy
print('AIBs arrays shape:', self.AIBs.shape)
fits.writeto('Data/AIBs.fits', self.AIBs, overwrite=True)
def build_catalog(self):
"""
Construction of the basis of the model
"""
print('Fit of the observed spectra with the elementary spectra for the AIBs')
if self.first_run:
self.build_aib(norm=True)
self.build_gaslines()
self.build_continuum()
nb_comp = [self.gas_lines.shape[1], self.continuum.shape[1], self.PAHp.shape[1], self.PAH0.shape[1],
self.eVSG.shape[1], self.PAHx.shape[1]]
self.model = np.concatenate((self.gas_lines, self.continuum, self.PAHp, self.PAH0, self.eVSG, self.PAHx), 1)
self.nb_comp = nb_comp
def fit(self, fit=True):
"""
Fitting function using a non-negative least square algorithm, calculating the scale factor of each component
of the base.
It uses the "build_catalog" method the first time if no catalog has been generated.
fill the "fit_result" object with a dictionary containing the fitting information
:param fit: bool
Runs PAHTAT on the spectrum to fit. Default is True.
"""
if fit:
self.build_catalog()
self.fit_result = {}
obj_total_flux = np.zeros([self.spectrum_to_fit.shape[0], 1])
# fitting
print('Fitting spectrum...')
start_time = time.time()
x, norm1 = nnls(self.model, self.spectrum_to_fit)
print("--- %s seconds ---" % (time.time() - start_time))
print(norm1)
# extracting each component
model = np.sum(x * self.model, axis=1)
save_x = x.copy()
obj_total_flux[0] = np.trapz(self.spectrum_to_fit, self.wave)
if len(self.nb_comp) > 1:
gas = np.sum(x[:self.nb_comp[0]] * self.model[:, :self.nb_comp[0]], axis=1)
self.gas_lines_ind = x[:self.nb_comp[0]] * self.model[:, :self.nb_comp[0]]
cont = np.sum(x[self.nb_comp[0]:(self.nb_comp[0] + self.nb_comp[1])] *
self.model[:, self.nb_comp[0]:(self.nb_comp[0] + self.nb_comp[1])], axis=1)
AIBs = np.sum(x[(self.nb_comp[0] + self.nb_comp[1]):] *
self.model[:, (self.nb_comp[0] + self.nb_comp[1]):], axis=1)
pahp_position_l = self.nb_comp[0] + self.nb_comp[1]
pahp_position_h = self.nb_comp[0] + self.nb_comp[1] + self.nb_comp[2]
pah0_position = self.nb_comp[0] + self.nb_comp[1] + self.nb_comp[2] + self.nb_comp[3]
evsg_position = self.nb_comp[0] + self.nb_comp[1] + self.nb_comp[2] + self.nb_comp[3] + self.nb_comp[4]
pahx_position = self.nb_comp[0] + self.nb_comp[1] + self.nb_comp[2] + self.nb_comp[3] + self.nb_comp[4] \
+ self.nb_comp[5]
PAHp = np.sum(x[pahp_position_l:pahp_position_h] * self.model[:, pahp_position_l:pahp_position_h], axis=1)
PAH0 = np.sum(x[pahp_position_h:pah0_position] * self.model[:, pahp_position_h:pah0_position], axis=1)
eVSG = np.sum(x[pah0_position:evsg_position] * self.model[:, pah0_position:evsg_position], axis=1)
PAHx = np.sum(x[evsg_position:pahx_position] * self.model[:, evsg_position:pahx_position], axis=1)
self.fit_result = {'wave': self.wave, 'model': model, 'AIBs': AIBs, 'continuum': cont, 'gas': gas,
'Base': self.model, 'weights': save_x, 'int': obj_total_flux, 'PAHp': PAHp, 'PAH0': PAH0,
'eVSG': eVSG, 'PAHx': PAHx}
else:
self.fit_result = {'wave': self.wave, 'model': model, 'Base': self.model, 'weights': save_x,
'int': obj_total_flux}
def plot_results(self, fit=True):
"""
Plots the fit of the spectrum
:param fit: bool
if True, fits the spectrum using pahtat templates, default is True
"""
if fit:
self.fit()
plt.figure(3, figsize=(16, 6))
plt.plot(self.wave, self.spectrum_to_fit, 'blue', label='data')
plt.plot(self.wave, self.fit_result['continuum'], 'black', ls='--', label='continuum')
plt.plot(self.wave, self.fit_result['gas'], 'k', label='gas')
plt.plot(self.wave, self.fit_result['model'], 'red', label='fit', zorder=9)
plt.plot(self.wave, self.fit_result['PAHp'], label=r'PAH$^+$')
plt.plot(self.wave, self.fit_result['PAH0'], label=r'PAH$^0$')
plt.plot(self.wave, self.fit_result['eVSG'], label=r'eVSG')
plt.plot(self.wave, self.fit_result['PAHx'], label=r'PAH$^x$')
plt.legend(fontsize=15)
plt.xlabel(r"Wavelength ($\mu$m)", fontsize=20)
plt.ylabel(r'Flux density (MJy/sr)', fontsize=20)
plt.xticks(fontsize=20)
plt.yticks(fontsize=20)
# plt.subplots_adjust(top=0.989, bottom=0.175, left=0.08, right=0.995, hspace=0.2, wspace=0.2)
plt.tight_layout()
plt.show()
def save_results(self, filename, gas_lines=False):
"""
saves the fitting results in an ascii file with filename.txt
"""
results = Table()
results['wavelength'] = self.wave
results['data'] = self.spectrum_to_fit
results['gas_lines'] = self.fit_result['gas']
results['continuum'] = self.fit_result['continuum']
results['AIB_1'] = self.fit_result['PAHp']
results['AIB_2'] = self.fit_result['eVSG']
results['AIB_3'] = self.fit_result['PAH0']
results['AIB_4'] = self.fit_result['PAHx']
results['model'] = self.fit_result['model']
results.write(filename + '.txt', format='ascii', overwrite=True)
if gas_lines:
gas_contrib = Table()
gas_contrib['species'] = self.species
gas_contrib['central_lambda'] = self.central_lambda
flux = []
for i in range(np.size(self.species)):
# This computes the line fluxes in erg/s/cm2 assuming the spectrum is in MJy/sr
flux.append(np.abs(np.trapz(np.transpose(self.gas_lines_ind)[i] * 1e-17, c / (self.wave * 1e-6))))
gas_contrib['flux'] = flux
gas_contrib.write(filename + '_line_fluxes.txt', format='ascii', overwrite=True)