# component_detector_shutter.py
import time

try:
    from .component import Component, ComponentException
except:
    from component import Component, ComponentException

try:
    from .guitastrotools import GuitastroException, GuitastroTools
except:
    from guitastrotools import GuitastroException, GuitastroTools

# #####################################################################
# #####################################################################
# #####################################################################
# Class Component Detector Shutter
# #####################################################################
# #####################################################################
# #####################################################################

class ComponentDetectorShutterException(GuitastroException):
    """Error codes and messages specific to the detector shutter component."""

    # Error code used as an index into `errors`.
    ERR_SHUTTER_DOES_NOT_RESPOND = 0

    # Message table: the entry at position N is the text for error code N,
    # so the order of this list must follow the ERR_* values above.
    errors = [
        "The shutter communication problem",  # ERR_SHUTTER_DOES_NOT_RESPOND
    ]


class ComponentDetectorShutter(ComponentDetectorShutterException, Component, GuitastroTools):
    """Component for a detector shutter.

    Only simulation operations are implemented here: a DO operation queues an
    update of the ``state_simu`` value, and anything that needs a real device
    is delegated to hook methods (``_my_do``, ``_get_real_state``,
    ``_my_get_final``) that a derived class can override.

    Usage : ComponentDetectorShutter()
    """

    # --- same as motoraxis.py
    SHUTTER_STATE_UNKNOWN = -1
    SHUTTER_STATE_OPENED = 0
    SHUTTER_STATE_SYNCHRO = 1
    SHUTTER_STATE_CLOSED = 2

    # =====================================================================
    # methods
    # =====================================================================

    # ------------ init

    def _my_init_params(self, param_optionals: dict, shutter_types: dict):
        """Hook to extend the parameter tables before they are decoded.

        The base implementation returns both dictionaries unchanged. A derived
        class can override it to add optional parameters or shutter types.

        Args:
            param_optionals: Optional parameters common to all shutter types.
            shutter_types: Shutter types with their own parameters.

        Returns:
            The tuple ``(param_optionals, shutter_types)``.
        """
        return param_optionals, shutter_types

    def init(self, *args, **kwargs):
        """Decode the configuration and publish the initial shutter states.

        The positional and keyword arguments are handed to
        ``decode_args_kwargs`` together with the parameter tables built here;
        the decoded result is kept in ``self._shutter_params``. The three
        state values are then queued as SHUTTER_STATE_UNKNOWN and the
        ``_my_init1`` / ``_my_init2`` hooks are called with the same
        arguments.
        """
        # --- Optional parameters shared by all shutter types.
        # Each entry looks like (type, default value); the exact contract is
        # defined by decode_args_kwargs, which is not in this file.
        # NOTE(review): `any` is the builtin function, used here as a type
        # placeholder -- presumably decode_args_kwargs treats it as
        # "accept any type"; confirm before changing it.
        param_optionals = {
            "MODEL": (str, ""),
            "MANUFACTURER": (str, ""),
            "SERIAL_NUMBER": (str, ""),
            "REAL": (bool, False),
            "DESCRIPTION": (str, "No description."),
            "LIBRARY": (any, None),
            "LOGO_FILE": (str, ""),
            "CLIENT_DATA": (any, None),
        }
        # --- Shutter types and their own parameters
        shutter_types = {
            "IRIS": {
                "MANDATORY": {"NAME": [str, "Unknown"]},
                "OPTIONAL": {"LABEL": [str, "Detector shutter"]},
            },
        }
        # --- Add params and types when inherited
        param_optionals, shutter_types = self._my_init_params(param_optionals, shutter_types)
        # --- Decode args and kwargs parameters
        self._shutter_params = self.decode_args_kwargs(0, shutter_types, param_optionals, *args, **kwargs)
        # ===
        # NOTE(review): the attribute name `_mount_type` does not mention the
        # shutter; it is left unchanged because code outside this file may
        # read it.
        self._mount_type = self._shutter_params["SELECTED_ARG"]
        self.name = self._shutter_params["NAME"]
        self._description = self._shutter_params["DESCRIPTION"]
        self._model = self._shutter_params["MODEL"]
        self._manufacturer = self._shutter_params["MANUFACTURER"]
        self._serial_number = self._shutter_params["SERIAL_NUMBER"]
        # --- Update the database using the queue
        param = {
            "state_simu": self.SHUTTER_STATE_UNKNOWN,
            "state_real": self.SHUTTER_STATE_UNKNOWN,
            "state": self.SHUTTER_STATE_UNKNOWN,
        }
        self._queue.put(param)
        time.sleep(0.02)  # wait to be sure the thread has recorded the new value
        # ---
        self._my_init1(*args, **kwargs)
        # ---
        self._my_init2(*args, **kwargs)
        # ---

    # ------------ repr
    def __repr__(self):
        """Return the parent representation followed by the decoded parameters."""
        name = self.name
        parts = [super().__repr__(), f"\n--- Component {name} parameters:\n"]
        # One line per decoded parameter; joined once instead of repeated +=.
        parts.extend(f" * {key} = {val}\n" for key, val in self._shutter_params.items())
        return "".join(parts)

    # ------------ prop

    def prop(self):
        """Component property minimal concrete method.

        Extended properties will be added in the method _my_prop.

        Returns:
            The properties dictionary after it has been passed through
            ``self._my_prop``. The keys of its 'DO' entry are the operation
            names accepted by ``_do``.
        """
        prop = {}
        prop['category'] = 'DetectorShutter'
        prop['actions'] = ['GET', 'SET', 'DO']
        prop['DO'] = {}
        prop['DO']['NATIVE'] = "The message is sent to the device with no transformation"
        prop['DO']['CLOSE'] = "Close the shutter and keep it closed all the time"
        prop['DO']['OPEN'] = "Open the shutter and keep it opened all the time"
        prop['DO']['SYNCHRO'] = "Close the shutter and open it only during exposures"
        prop = self._my_prop(prop)
        return prop

    # ------------ do

    def _do(self, *args, **kwargs):
        """Execute a DO operation.

        The first positional argument is the operation name (case
        insensitive). CLOSE, OPEN and SYNCHRO queue an update of
        ``state_simu``; NATIVE has no simulated effect. When ``self.real`` is
        true the call is also forwarded to ``_my_do``.

        Returns:
            ``self._fresult(result)`` where ``result`` is the value returned
            by ``_my_do``, or None when ``self.real`` is false.

        Raises:
            ComponentException: ERR_OPERATION_NOT_FOUND when the operation is
                not one of the keys of ``prop()['DO']``.
        """
        result = None
        self._verify_nargs(1, *args)
        operation = args[0].upper()
        operations = list(self.prop()['DO'].keys())
        if operation not in operations:
            msg = f"{operation} not found amongst {operations}"
            raise ComponentException(ComponentException.ERR_OPERATION_NOT_FOUND, msg)
        # --- Simulated state reached by each operation that changes it
        simu_states = {
            "CLOSE": self.SHUTTER_STATE_CLOSED,
            "OPEN": self.SHUTTER_STATE_OPENED,
            "SYNCHRO": self.SHUTTER_STATE_SYNCHRO,
        }
        if operation in simu_states:
            self._queue.put(f"state_simu = {simu_states[operation]}")
        if self.real:
            result = self._my_do(*args, **kwargs)
        return self._fresult(result)

    def _my_do(self, *args, **kwargs):
        """Hook called by ``_do`` when ``self.real`` is true.

        The base implementation does nothing and returns None.
        """
        return None

    # ------------ get

    def _get_real_state(self, *args, **kwargs):
        """Return the state of the real shutter.

        To be overridden; the base implementation returns the simulated state.
        """
        return self._get_simu_state()

    def _get_simu_state(self, *args, **kwargs):
        """Return the value stored under 'state_simu' in the database."""
        return self.database.query('state_simu')

    def _my_get_final(self, *args, **kwargs):
        """Hook for keys not handled by ``_my_get``.

        This method should be overridden; the base implementation returns
        None.
        """
        return None

    def _my_get(self, *args, **kwargs):
        """Return the value of the key given as first positional argument.

        For the key "state", the real or simulated state is read (depending
        on ``self.real``), queued as an update of ``state`` and returned. Any
        other key is delegated to ``_my_get_final``.
        """
        key = args[0]
        if key != "state":
            return self._my_get_final(*args, **kwargs)
        # Same truthiness test on self.real as in _do.
        if self.real:
            state = self._get_real_state()
        else:
            state = self._get_simu_state()
        self._queue.put(f"state = {state}")
        return state

    # ------------ conversion int -> string for human output

    def state2string(self, state: int, verb: bool = False) -> str:
        """Convert a SHUTTER_STATE_* value into a human readable word.

        Args:
            state: One of the SHUTTER_STATE_* constants.
            verb: When True return the participle form ("closed", "opened",
                "unknown"); otherwise the short form ("close", "open",
                "unk"). "synchro" is the same in both forms.

        Returns:
            The word for the state; any value that is not CLOSED, SYNCHRO or
            OPENED is reported as unknown.
        """
        if state == self.SHUTTER_STATE_CLOSED:
            return "closed" if verb else "close"
        if state == self.SHUTTER_STATE_SYNCHRO:
            return "synchro"
        if state == self.SHUTTER_STATE_OPENED:
            return "opened" if verb else "open"
        return "unknown" if verb else "unk"


# #####################################################################
# #####################################################################
# #####################################################################
# Main
# #####################################################################
# #####################################################################
# #####################################################################

if __name__ == "__main__":
    default = 0
    example = input(f"Select the example (0 to 0) ({default}) ")
    try:
        example = int(example)
    except ValueError:
        # Empty or non-numeric answer: fall back to the default example.
        example = default

    print("Example       = {}".format(example))

    if example == 0:
        # Basic example: drive the simulated shutter through each state
        # changing operation and read the state back.
        # "IRIS" is the only shutter type declared in init().
        comp = ComponentDetectorShutter("IRIS", name="test")
        comp.init("IRIS", name="test")
        comp.verbose = 1
        for operation in ("OPEN", "SYNCHRO", "CLOSE"):
            comp.command("DO", operation)
            # Same kind of short pause as in init(), so that the queued
            # update can be recorded before it is read back.
            time.sleep(0.05)
            # NOTE(review): the shape of the value returned by command() is
            # defined outside this file, so it is printed as is -- confirm.
            result = comp.command("GET", "state")
            print(f"{operation}: state={result}")