# component_sensor_detector.py 11 KB
import numpy as np

try:
    from .siteobs import Siteobs
except:
    from siteobs import Siteobs

try:
    from .ima import Ima
except:
    from ima import Ima

try:
    from .etc import ExposureTimeCalculator
except:
    from etc import ExposureTimeCalculator

try:
    from .component import Component, ComponentException
except:
    from component import Component, ComponentException

try:
    from .guitastrotools import GuitastroException, GuitastroTools
except:
    from guitastrotools import GuitastroException, GuitastroTools

# #####################################################################
# #####################################################################
# #####################################################################
# Class Component Sensor Detector
# #####################################################################
# #####################################################################
# #####################################################################

class ComponentSensorDetectorException(GuitastroException):
    """Error codes and messages specific to the sensor detector component.

    ``errors`` is a list indexed by the ``ERR_*`` codes declared in this
    class; index 0 holds an empty placeholder string.
    """

    ERR_DETECTOR_IS_EXPOSING = 1
    ERR_DETECTOR_IS_READING = 2

    # Message table: the position of each message is its error code.
    errors = [
        "",  # 0: placeholder, no error code uses it here
        "The detector is in exposure. Cannot start other exposures",  # ERR_DETECTOR_IS_EXPOSING
        "The detector is reading. Cannot start other exposures",  # ERR_DETECTOR_IS_READING
    ]


class ComponentSensorDetector(ComponentSensorDetectorException, Component, GuitastroTools):
    """Component for Sensor detector

    Implement only simulation operations.

    Usage : ComponentSensorDetector()
    """

    # --- same as sensoraxis.py
    SENSOR_STATE_UNKNOWN = -1
    SENSOR_STATE_STOPED = 0
    SENSOR_STATE_IDLE = 1
    SENSOR_STATE_EXPOSING = 2
    SENSOR_STATE_READING = 3

    # =====================================================================
    # methods
    # =====================================================================

    # ------------ init

    def _my_init_params(self, param_optionals: dict, sensor_types: dict):
        # --- This method could be overriden
        self._matrix = np.array([[0]])
        self._header = {}
        self._header['NAXIS'] = ['NAXIS', 2, 'int', 'Number of dimensions', '']
        self._header['NAXIS1'] = ['NAXIS', 1, 'int', 'pixels along X', 'pix']
        self._header['NAXIS2'] = ['NAXIS', 1, 'int', 'pixels along Y', 'pix']
        return param_optionals, sensor_types

    def init(self, *args, **kwargs):
        """Initialise the sensor detector from the constructor arguments.

        Usage : ComponentSensorDetector("CCD", name="Test")

        Declares the optional parameters and the supported sensor types
        ("CCD" and "CMOS"), decodes ``args``/``kwargs`` against them, then
        sets up the observatory site, the in-memory image, the component
        name, and finally puts the default acquisition parameters
        (exptime 0.1, binning [1, 1]) on ``self._queue``.

        Args:
            *args: Positional arguments decoded by ``decode_args_kwargs``;
                they are also forwarded to ``_my_init1`` and ``_my_init2``.
            **kwargs: Keyword arguments decoded by ``decode_args_kwargs``;
                they are also forwarded to ``_my_init1`` and ``_my_init2``.
        """
        # === Parameters of the command line
        # --- Dictionary of optional parameters for all sensor types
        param_optionals = {}
        param_optionals["MODEL"] = (str, "")
        param_optionals["MANUFACTURER"] = (str, "")
        param_optionals["SERIAL_NUMBER"] = (str, "")
        param_optionals["REAL"] = (bool, False)
        param_optionals["DESCRIPTION"] = (str, "No description.")
        param_optionals["SITE"] = (Siteobs,"GPS 0 E 45 100")
        param_optionals["LOGO_FILE"] = (str,"")
        param_optionals["ETC"] = (ExposureTimeCalculator, None)
        # --- Dictionary of sensor types and their parameters
        sensor_types = {}
        sensor_types["CCD"]= {"MANDATORY" : {"NAME":[str,"Unknown"]}, "OPTIONAL" : {"LABEL":[str,"CCD Sensor"]} }
        sensor_types["CMOS"]= {"MANDATORY" : {"NAME":[str,"Unknown"]}, "OPTIONAL" : {"LABEL":[str,"CMOS Sensor"]} }
        # --- Add params and types when inherited
        param_optionals, sensor_types = self._my_init_params(param_optionals, sensor_types)
        # --- Decode args and kwargs parameters
        self._sensor_params = self.decode_args_kwargs(0, sensor_types, param_optionals, *args, **kwargs)
        self._sensor_type = self._sensor_params["SELECTED_ARG"]
        # === Observatory location for ephemeris
        # isinstance() is used instead of comparing str(type(...)) with a
        # hard-coded module path: that text differs when this module is
        # imported from a package, so the comparison never matched there.
        site = self._sensor_params["SITE"]
        if isinstance(site, Siteobs):
            self.siteobs = site
        else:
            self.siteobs = Siteobs(site)
        # === Image in memory
        self.ima = Ima()
        self.ima.longitude(self.siteobs.longitude)
        etc = self._sensor_params["ETC"]
        if isinstance(etc, ExposureTimeCalculator):
            self.ima.etc = etc
        # === Name
        self.name = self._sensor_params["NAME"]
        # === Initial state real or simulation
        # NOTE(review): this value is read but not used in this method --
        # confirm whether it should be applied to the component state.
        real = self._sensor_params["REAL"]
        # === Call init1 (inherited from Component. Will be overridden by a concrete method)
        self._my_init1(*args, **kwargs)
        # === Call init2 (inherited from Component. Will be overridden by a concrete method)
        self._my_init2(*args, **kwargs)
        # ===  Default database
        param = {}
        param['exptime'] = 0.1
        param['binning'] = [1, 1]
        self._queue.put(param)

    # ------------ prop

    def prop(self):
        """Return the property dictionary of the component.

        The dictionary built here holds the category, the list of actions
        and the DO operations (NATIVE, and ACQ with its sub operations).
        It is passed to ``_my_prop`` and the value returned by that method
        is the result.
        """
        acq_operations = {
            'START': "Start a new image acquisition",
            'STOP': "Stop the current sensor image with a readout",
            'CANCEL': "Cancel the current sensor image without readout",
        }
        do_operations = {
            'NATIVE': "The message is sent to the device with no transformation",
            'ACQ': acq_operations,
        }
        properties = {
            'category': 'SensorDetector',
            'actions': ['GET', 'SET', 'DO'],
            'DO': do_operations,
        }
        # Extended properties are added by _my_prop
        return self._my_prop(properties)

    # ------------ do

    def _my_do_native(self, *args, **kwargs):
        # This method should be overriden
        return ""

    def _do_native(self, *args, **kwargs):
        operation = args[0].upper()
        # --- make the simulation
        self.log = f"native simulation of {operation}"
        result = ""
        # --- call the real
        if self.real:
            result = self._my_do_native(*args, **kwargs)
        return result

    def _my_do_acq(self, *args, **kwargs):
        # This method should be overriden
        return ""

    def _do_acq(self, *args, **kwargs):
        result = None
        operation = args[0].upper()
        # --- Manage sub operations of ACQ
        na = len(args)
        if na==1:
            result = ""
        if na==2:
            suboperation = args[1].upper()
            # --- make the simulation
            self.log = f"acq simulation of {operation}"
            param = self.database.query()
            exptime = param['exptime']
            binning = param['binning']
            # TBD
            # --- call the real
            if self.real:
                result = self._my_do_acq(*args, **kwargs)
        return result

    def _do(self, *args, **kwargs):
        """Do an action of the component.

        Args:

            *args: List of operations extracted from split of the command:

                * args[0]: main operation
                * args[1]: sub operation

            **kwargs: Forwarded to the handlers; the key 'operation' is set
                to the upper-cased main operation before forwarding.

        Returns:

            The value of ``_fresult`` applied to the operation result.

        Raises:

            ComponentException: ERR_OPERATION_NOT_FOUND when the main
                operation is not a key of ``prop()['DO']``.
        """
        self._verify_nargs(1, *args)
        operation = args[0].upper()
        known_operations = list(self.prop()['DO'])
        if operation not in known_operations:
            msg = f"{operation} not found amongst {known_operations}"
            raise ComponentException(ComponentException.ERR_OPERATION_NOT_FOUND, msg)
        # ---
        kwargs['operation'] = operation
        # Handlers used for both real and simu; an operation listed in
        # prop() without a handler here leaves the result to None.
        handlers = {
            "NATIVE": self._do_native,
            "ACQ": self._do_acq,
        }
        handler = handlers.get(operation)
        result = None if handler is None else handler(*args, **kwargs)
        # --- only call the real method
        self.log = f"start operation {operation}"
        if self.real:
            # In real mode the value of _my_do replaces the handler result
            result = self._my_do(*args, **kwargs)
        return self._fresult(result)

    def _my_do(self, *args, **kwargs):
        value = None
        return value

    # ------------ set

    def _my_set(self, *args, **kwargs):
        return args, kwargs

    # ------------ get

    def _my_get_final(self, *args, **kwargs):
        # This method should be overriden
        pass

    def _my_get(self, *args, **kwargs):
        return self._my_get_final(*args, **kwargs)

    # =====================================================================
    # =====================================================================
    # Ima methods
    # =====================================================================
    # =====================================================================

    # ------------ conversion int -> string for human output

    def status2string(self, status:int, verb:bool= False):
        if status == self.SENSOR_STATE_STOPED:
            msg = "stoped" if verb else "stop"
        elif status == self.SENSOR_STATE_IDLE:
            msg = "idle" if verb else "idle"
        elif status == self.SENSOR_STATE_EXPOSING:
            msg = "exposing" if verb else "expose"
        elif status == self.SENSOR_STATE_READING:
            msg = "reading" if verb else "read"
        else:
            msg = "unknown" if verb else "unk"
        return msg



# #####################################################################
# #####################################################################
# #####################################################################
# Main
# #####################################################################
# #####################################################################
# #####################################################################

if __name__ == "__main__":
    # Demonstration script: asks for an example number, then runs it.

    import time

    default = 0
    example = input(f"Select the example (0 to 0) ({default}) ")
    try:
        example = int(example)
    except ValueError:
        # Empty or non numeric answer: fall back to the default example.
        # Only ValueError is caught so that an interruption is not swallowed.
        example = default

    print("Example       = {}".format(example))

    if example == 0:
        # Simple camera simulator
        # --- siteobs
        siteobs = Siteobs("GPS 0 E 49 200")
        # --- horizon
        siteobs.horizon_altaz = [(0,40), (180,0), (360,40)]
        # --- Image simulator
        etc = ExposureTimeCalculator()
        etc.camera("Audine Kaf401ME")
        etc.optics("Takahashi_180ED")
        # --- Component init
        comp = ComponentSensorDetector("CCD", name="test", site=siteobs, etc=etc)
        comp.verbose = 1
        comp.command("SET", "exptime", 1.0)
        comp.command("SET", "binning", [1, 1])
        # --- Start the ACQ
        comp.command("DO", "ACQ", "START")
        # --- Poll the status three times, two seconds apart
        for k in range(3):
            # --- Get the current status
            status = comp.command("GET", "status")
            print(f"{k:2d} status = {status} {comp.status2string(status)}")
            time.sleep(2)