# imaseries.py
# -*- coding: utf-8 -*-
"""
@author: aklotz@irap.omp.eu
"""

import numpy as np
import os
#import inspect
from astropy.wcs import WCS
import multiprocessing
import psutil

try:
    from .filenames import FileNames
except:
    from filenames import FileNames

try:
    from .ima import Ima
except:
    from ima import Ima

try:
    from .guitastrotools import GuitastroException
except:
    from guitastrotools import GuitastroException

# #####################################################################
# #####################################################################
# #####################################################################
# Class ImaSeries
# #####################################################################
# #####################################################################
# This class provides an Astropy wrapper
# #####################################################################

class ImaSeriesException(GuitastroException):
    """Exception raised for errors in the ImaSeries class.

    Each error is identified by an integer class constant. The ``errors``
    list holds the matching human-readable messages, indexed by that constant.
    """

    # --- Error codes (also used as indexes in the 'errors' list)
    SHAPES_NOT_SAME = 0
    FILE_NOT_FOUND = 1

    # --- Messages, ordered by error code
    errors = [
        "Shapes of images are not the same",  # SHAPES_NOT_SAME
        "File not found",  # FILE_NOT_FOUND
    ]



class ImaSeries(ImaSeriesException, FileNames):
    """Image processing applied to a series of images.

    This class processes a series of images to generate another series of images.

    The class ImaSeries imports the following methods from the class FileNames. See the FileNames class documentation for these methods:

        * longitude
        * namings
        * naming
        * naming_rules
        * naming_ident
        * naming_get
        * naming_set
        * path
        * extension
        * get_night
        * fullfilename
        * basename
        * itername
        * enumname
        * genename
        * genenames
        * innames
        * inoutnames
        * outfilename
        * indexname
        * indexnames
        * deletenames

    """
    VERBOSE_NONE = 0
    VERBOSE_SPECIFIC = 1
    VERBOSE_DEBUG = 2
    VERBOSE_ESSENTIAL = 4
    VERBOSE_ALL = 8

    def __init__(self, *args, **kwargs):
        """Create the series processor with its default state.

        The positional and keyword arguments are accepted but not used here.

        The instance owns one ``Ima`` object that is reused for every image of
        a series. Multiprocessing is switched off on that ``Ima`` object and
        switched on at the series level.
        """
        super().__init__()
        # --- Single image object reused for each file of a series
        ima = Ima()
        self._ima = ima
        # --- Default settings
        self._verbose_level = self.VERBOSE_NONE
        self._longiau_deg = 0.0
        # --- Default output file: "noname" + extension in the Ima path
        folder = ima.path()
        default_name = "noname" + ima.extension()
        self._outfilename = os.path.join(folder, default_name)
        # --- Parallelism is handled per series, not per image
        ima.do_multiprocessing = False
        self.do_multiprocessing = True

    def execute_process_type1(self, method_process, process_args, genename, outfilegene, outfile_extension):
        """Run a per-file processing method over a series, split across processes.

        The file list of the series is built with ``indexname``, cut into
        contiguous chunks of equal size (the last one may be shorter), and each
        non-empty chunk is handed to its own ``multiprocessing.Process``. Every
        process runs the method on a separate copy of this object, obtained
        with ``self.copy()``.

        Args:
            method_process: Bound method of this object to run in each process.
                It is called as ``method(affinity, subfnames, process_args)``.
            process_args: Tuple of extra arguments forwarded unchanged to
                ``method_process``.
            genename: Description of the input series. Only its ``"indexes"``
                entry is read here; the object itself is passed to ``indexname``.
            outfilegene: Output generic name, passed to ``indexname``.
            outfile_extension: Output file extension, passed to ``indexname``.

        Returns:
            int: The number of files in the series.
        """
        # --- Total list of files
        fnames = [
            self.indexname(genename, outfilegene, outfile_extension, index, k)
            for k, index in enumerate(genename["indexes"], start=1)
        ]
        nitot = len(fnames)
        if nitot == 0:
            # Nothing to process: do not spawn idle processes
            return 0
        # --- Effective number of processes (two CPUs are left free)
        effective_cpu_count = max(1, self._ima.cpu_count - 2)
        if not self.do_multiprocessing:
            effective_cpu_count = 1
        # --- CPU affinity of the current process, forwarded to each worker
        # NOTE(review): psutil does not provide cpu_affinity() on every
        # platform -- confirm the supported targets.
        proc = psutil.Process()
        affinity = proc.cpu_affinity()
        # --- Jobs attribution: at most effective_cpu_count chunks of ni files
        ni = int(np.ceil(nitot / effective_cpu_count))
        # Name used to re-bind the method on each copy. Falls back to the
        # callable itself when it carries no name.
        method_name = getattr(method_process, "__name__", None)
        jobs = []
        for i1 in range(0, nitot, ni):
            subfnames = fnames[i1:i1 + ni]
            # Each process works on its own copy of this object.
            # NOTE(review): copy() is not defined in this class; presumably
            # inherited from a parent class -- confirm.
            worker = self.copy()
            if method_name is None:
                target = method_process
            else:
                target = getattr(worker, method_name, method_process)
            process = multiprocessing.Process(target=target, args=(affinity, subfnames, process_args))
            jobs.append(process)
        # --- Jobs start
        for job in jobs:
            job.start()
        # --- Jobs join
        for job in jobs:
            job.join()
        # --- Jobs close (every job has finished once join() returns)
        for job in jobs:
            job.close()
        return nitot


    # =============================================
    # Print filters
    # =============================================

    def print_level(self, level:int=""):
        if isinstance(level,type(int)) == True:
            self._verbose_level = level
        return self._verbose_level

    def print_msg(self, level:int, message:str):
        if message == "":
            self._verbose_level = level
            return
        else:
            if self._verbose_level >= level:
                print(message)

    # =============================================
    # Image processing (pixels are modified)
    # =============================================

    def calibwcs(self, fitsname, outfilename, method="duplicate", **kwargs):
        """Propagate the WCS of the first image to the other images of a series.

        Only the "duplicate" method (case-insensitive) is implemented. The WCS
        is read from the header of the first image, then its header cards are
        appended to the header of every following image. Each image, the first
        one included, is saved under its output name.

        Args:
            fitsname: Input series name, resolved by ``inoutnames``.
            outfilename: Output series name, resolved by ``inoutnames``.
            method: Calibration method. Only "duplicate" has an effect.
            **kwargs: Forwarded to ``inoutnames``.

        Returns:
            The number of images processed, or None when ``method`` is not
            "duplicate".
        """
        genename, outfilegene, outfile_extension = self.inoutnames(fitsname, outfilename, **kwargs)
        # --- Only one method is available
        if method.upper() != "DUPLICATE":
            return None
        reference_wcs = None
        processed = 0
        for rank, index in enumerate(genename["indexes"], start=1):
            _, _, infile, outfile = self.indexname(genename, outfilegene, outfile_extension, index, rank)
            self._ima.load(infile)
            header = self._ima._hdu.header
            if rank == 1:
                # The first image provides the reference WCS
                reference_wcs = WCS(header, naxis = 2)
            else:
                # NOTE(review): extend() is used with its default options;
                # confirm how WCS keywords already present in this header
                # are meant to be handled.
                header.extend(reference_wcs.to_header())
            self._ima.save(outfile)
            processed = rank
        return processed

    def div(self, fitsname, outfilename, filename_to_div, mult:float, **kwargs):
        """Apply ``Ima.div`` to every image of a series.

        Each input image is loaded, ``Ima.div`` is called with the divisor file
        and the multiplier, and the result is saved under its output name.

        Args:
            fitsname: Input series name, resolved by ``inoutnames``.
            outfilename: Output series name, resolved by ``inoutnames``.
            filename_to_div: File name of the divisor image, resolved with
                ``Ima.fullfilename``.
            mult: Value converted to float and passed to ``Ima.div``.
            **kwargs: Forwarded to ``inoutnames``.

        Returns:
            The number of images processed.

        Raises:
            ImaSeriesException: The divisor file does not exist.
        """
        genename, outfilegene, outfile_extension = self.inoutnames(fitsname, outfilename, **kwargs)
        # --- The divisor image must exist before starting the series
        divisor_path = self._ima.fullfilename(filename_to_div)
        if not os.path.exists(divisor_path):
            # NOTE(review): the error is identified by the string
            # "FILE_NOT_FOUND" while ImaSeriesException defines an integer
            # constant of that name -- confirm which form the base class expects.
            raise ImaSeriesException("FILE_NOT_FOUND",divisor_path)
        factor = float(mult)
        # --- Process the series
        processed = 0
        for rank, index in enumerate(genename["indexes"], start=1):
            _, _, infile, outfile = self.indexname(genename, outfilegene, outfile_extension, index, rank)
            self._ima.load(infile)
            self._ima.div(divisor_path, factor)
            self._ima.save(outfile)
            processed = rank
        return processed

    def ngain(self, fitsname: str, outfilename: str, norm:float, **kwargs) -> int:
        """Normalize the mean level of every image of a series.

        The per-file work is done by ``ngain_process``, dispatched through
        ``execute_process_type1``.

        Args:
            fitsname: Input series name, resolved by ``inoutnames``.
            outfilename: Output series name, resolved by ``inoutnames``.
            norm: Target mean value, converted to float.
            **kwargs: Forwarded to ``inoutnames``.

        Returns:
            The value returned by ``execute_process_type1``.
        """
        names = self.inoutnames(fitsname, outfilename, **kwargs)
        genename, outfilegene, outfile_extension = names
        # --- Arguments handed to each call of ngain_process
        process_args = (float(norm), )
        return self.execute_process_type1(self.ngain_process, process_args, genename, outfilegene, outfile_extension)

    def ngain_process(self, affinity: list, fnames: list, process_args: tuple):
        """Normalize the mean of a list of images (worker side of ``ngain``).

        Each image is loaded, multiplied by ``norm / mean`` where ``mean`` is
        the "mean" entry of ``Ima.stat()``, and saved under its output name.

        Args:
            affinity: CPU affinity applied to the current process.
            fnames: List of 4-item entries. The third item is the input file
                and the fourth the output file.
            process_args: One-element tuple holding the target mean ``norm``.
        """
        (norm,) = process_args
        # --- Bind the current process to the requested CPUs
        psutil.Process().cpu_affinity(affinity)
        # --- Normalize each image
        for _, _, infile, outfile in fnames:
            self._ima.load(infile)
            mean_level = self._ima.stat()["mean"]
            self._ima.mult(norm / mean_level)
            self._ima.save(outfile)

    def register(self, fitsname, outfilename, **kwargs):
        """Register every image of a series on the first one.

        The first image is the reference and is saved as loaded. For each
        following image ``Ima.register`` is called with the reference file
        name. The translation found is printed for every image (``[0 0]`` for
        the reference) and the image is saved under its output name.

        Args:
            fitsname: Input series name, resolved by ``inoutnames``.
            outfilename: Output series name, resolved by ``inoutnames``.
            **kwargs: Forwarded to ``inoutnames``.

        Returns:
            The number of images processed.
        """
        genename, outfilegene, outfile_extension = self.inoutnames(fitsname, outfilename, **kwargs)
        reference = None
        processed = 0
        for rank, index in enumerate(genename["indexes"], start=1):
            _, _, infile, outfile = self.indexname(genename, outfilegene, outfile_extension, index, rank)
            self._ima.load(infile)
            if rank > 1:
                # Align on the first image of the series
                translation = self._ima.register(reference).translation
            else:
                # The first image is the reference: no shift
                reference = infile
                translation = np.array([0, 0])
            print(f"{rank} trans = {translation}")
            self._ima.save(outfile)
            processed = rank
        return processed

    def sub(self, fitsname, outfilename, filename_to_sub, **kwargs):
        """Apply ``Ima.sub`` to every image of a series.

        Each input image is loaded, ``Ima.sub`` is called with the file to
        subtract, and the result is saved under its output name.

        Args:
            fitsname: Input series name, resolved by ``inoutnames``.
            outfilename: Output series name, resolved by ``inoutnames``.
            filename_to_sub: File name of the image to subtract, resolved with
                ``Ima.fullfilename``.
            **kwargs: Forwarded to ``inoutnames``.

        Returns:
            The number of images processed.

        Raises:
            ImaSeriesException: The file to subtract does not exist.
        """
        genename, outfilegene, outfile_extension = self.inoutnames(fitsname, outfilename, **kwargs)
        # --- The image to subtract must exist before starting the series
        operand_path = self._ima.fullfilename(filename_to_sub)
        if not os.path.exists(operand_path):
            # NOTE(review): the error is identified by the string
            # "FILE_NOT_FOUND" while ImaSeriesException defines an integer
            # constant of that name -- confirm which form the base class expects.
            raise ImaSeriesException("FILE_NOT_FOUND",operand_path)
        # --- Process the series
        processed = 0
        for rank, index in enumerate(genename["indexes"], start=1):
            _, _, infile, outfile = self.indexname(genename, outfilegene, outfile_extension, index, rank)
            self._ima.load(infile)
            self._ima.sub(operand_path)
            self._ima.save(outfile)
            processed = rank
        return processed

    def uncosmic(self, fitsname, outfilename, **kwargs):
        """Apply ``Ima.uncosmic`` to every image of a series.

        Each input image is loaded, processed by ``Ima.uncosmic`` and saved
        under its output name.

        Args:
            fitsname: Input series name, resolved by ``inoutnames``.
            outfilename: Output series name, resolved by ``inoutnames``.
            **kwargs: Forwarded to ``inoutnames``.

        Returns:
            The number of images processed.
        """
        genename, outfilegene, outfile_extension = self.inoutnames(fitsname, outfilename, **kwargs)
        processed = 0
        for rank, index in enumerate(genename["indexes"], start=1):
            _, _, infile, outfile = self.indexname(genename, outfilegene, outfile_extension, index, rank)
            self._ima.load(infile)
            self._ima.uncosmic()
            self._ima.save(outfile)
            processed = rank
        return processed

# #####################################################################
# #####################################################################
# #####################################################################
# Main
# #####################################################################
# #####################################################################
# #####################################################################

if __name__ == "__main__":
    # Interactive demo: ask for an example number and run it.

    default = 3
    example = input(f"Select the example (0 to 3) ({default}) ")
    try:
        example = int(example)
    except ValueError:
        # Empty or non-numeric answer: fall back to the default example
        example = default
    print("Example       = {}".format(example))

    # --- Get path_images for examples
    ima = Ima()
    path_images = ima.conf_guitastro()['path_data']
    path_products = ima.conf_guitastro()['path_products']

    if example == 1:
        # Apply sub() with "Dypeg offset" to the series found from a file name
        imaseries = ImaSeries()
        path = r"C:\d\mp-g2a\photometrie_pulsante\2017-11-07"
        f = os.path.join(path,"Dypeg flat-001.fit")
        res = imaseries.sub(f, "i", "Dypeg offset")

    elif example == 2:
        # Normalize the mean of series "i" to 10000, output series "j"
        imaseries = ImaSeries()
        path = r"C:\d\mp-g2a\photometrie_pulsante\2017-11-07"
        imaseries.path(path)
        res = imaseries.ngain("i", "j", 10000)

    elif example == 3:
        # Normalize the mean of series "bb" to 10000, output series "j"
        imaseries = ImaSeries()
        path = r"C:\d\mp-g2a\photometrie_pulsante\2017-11-07"
        imaseries.path(path)
        res = imaseries.ngain("bb", "j", 10000)