Source code for mountastro.mountastro_astromecca

# -*- coding: utf-8 -*-
import os
import time
import socket
import numpy as np
import matplotlib.pyplot as plt
import math

# --- celme imports
modulename = 'celme'
if modulename in dir():
    del celme
if modulename not in dir():    
    import celme

from .mountastro import Mountastro
from .mountaxis import Mountaxis
from .mountchannel import Mountchannel
from .mountutils_eqmod import Mountutils_eqmod

# #####################################################################
# #####################################################################
# #####################################################################
# Class Mountastro_Astromecca
# #####################################################################
# #####################################################################
# #####################################################################

[docs]class Mountastro_Astromecca(Mountastro,Mountchannel): # ===================================================================== # ===================================================================== # Private methods # ===================================================================== # ===================================================================== def error_messages(self, alarm_code): error_codes = [] error_code = {} error_code["Phenomenon"] = "No error" error_code["Alarm_Code"] = "00h" error_code["ALARM_LED_Blinks"] = "none" error_code["ALMCLR_Effect"] = "No effect" error_code["Protective_Function"] = "No error" error_code["Description"] = "No error" error_code["Action"] = "Nothing to do" error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "Motion and sequence execution stop" error_code["Alarm_Code"] = "32h" error_code["ALARM_LED_Blinks"] = "1" error_code["ALMCLR_Effect"] = "Clears alarm" error_code["Protective_Function"] = "Out of position range" error_code["Description"] = "The PABS value exceeded the coordinate control range (-2,147,483.648 to +2,147,483.647)." error_code["Action"] = "Check that PABS is in the range." error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "Motion and sequence execution stop" error_code["Alarm_Code"] = "90h" error_code["ALARM_LED_Blinks"] = "1" error_code["ALMCLR_Effect"] = "Clears alarm" error_code["Protective_Function"] = "Stack overflow" error_code["Description"] = "Sequence memory stack exhausted" error_code["Action"] = "Restructure sequences to reduce the number of nested blocks or subroutine calls" error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "Motion and sequence execution stop" error_code["Alarm_Code"] = "94h" error_code["ALARM_LED_Blinks"] = "1" error_code["ALMCLR_Effect"] = "Clears alarm" error_code["Protective_Function"] = "Sequence reference error" error_code["Description"] = "Attempt to call a non-existing sequence as a subroutine" error_code["Action"] = "Revise the CALL statement or rename the intended target sequence" error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "Motion and sequence execution stop" error_code["Alarm_Code"] = "98h" error_code["ALARM_LED_Blinks"] = "1" error_code["ALMCLR_Effect"] = "Clears alarm" error_code["Protective_Function"] = "Calculation overflow" error_code["Description"] = "Sequence calculation result exceeded numerical limits" error_code["Action"] = "Check math operators, make sure they cannot overflow" error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "Motion and sequence execution stop" error_code["Alarm_Code"] = "99h" error_code["ALARM_LED_Blinks"] = "1" error_code["ALMCLR_Effect"] = "Clears alarm" error_code["Protective_Function"] = "Parameter range error" error_code["Description"] = "Attempt to set a parameter to a value outside its range" error_code["Action"] = "Make sure all assignments stay within defined limits" error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "Motion and sequence execution stop" error_code["Alarm_Code"] = "9Ah" error_code["ALARM_LED_Blinks"] = "1" error_code["ALMCLR_Effect"] = "Clears alarm" error_code["Protective_Function"] = "Zero division" error_code["Description"] = "Attempt to divide by zero" error_code["Action"] = "Check division operations, test divisor for zero before division" error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "Motion and sequence execution stop" error_code["Alarm_Code"] = "9Dh" error_code["ALARM_LED_Blinks"] = "1" error_code["ALMCLR_Effect"] = "Clears alarm" error_code["Protective_Function"] = "PC command execution error" error_code["Description"] = "Attempt to modify position counter PC while a motion was in process" error_code["Action"] = "Make sure that PC is only changed when motor is stopped" error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "Motion and sequence execution stop" error_code["Alarm_Code"] = "9Eh" error_code["ALARM_LED_Blinks"] = "1" error_code["ALMCLR_Effect"] = "Clears alarm" error_code["Protective_Function"] = "User variable reference error" error_code["Description"] = "Attempt to access a non-existing user-defined variable" error_code["Action"] = "Make sure the target user-defined variable exists: use the correct name in sequence" error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "Motion and sequence execution stop" error_code["Alarm_Code"] = "9Fh" error_code["ALARM_LED_Blinks"] = "1" error_code["ALMCLR_Effect"] = "Clears alarm" error_code["Protective_Function"] = "Parameter write error" error_code["Description"] = "Attempt to change a parameter under invalid condition" error_code["Action"] = "Check if you tried to write a parameter, which is not allowed to write to, during operation." error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "Motion and sequence execution stop" error_code["Alarm_Code"] = "A0h" error_code["ALARM_LED_Blinks"] = "1" error_code["ALMCLR_Effect"] = "Clears alarm" error_code["Protective_Function"] = "Motion while in motion" error_code["Description"] = "Attempt to execute a motion while an incompatible motion is in progress" error_code["Action"] = "Make sure motions are not started before a previous motion is complete. Use MEND, poll SIGMOVE, or monitor the MOVE output to detect motion complete." error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "Motion and sequence execution stop" error_code["Alarm_Code"] = "E0h" error_code["ALARM_LED_Blinks"] = "1" error_code["ALMCLR_Effect"] = "Clears alarm" error_code["Protective_Function"] = "User alarm" error_code["Description"] = "ALMSET command intentionally executed" error_code["Action"] = "If a user alarm was not expected, check sequence programming for inappropriate ALMSET command(s)" error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "Motion and sequence execution stop" error_code["Alarm_Code"] = "10h" error_code["ALARM_LED_Blinks"] = "4" error_code["ALMCLR_Effect"] = "Clears alarm" error_code["Protective_Function"] = "Excessive Position Deviation" error_code["Description"] = "When performing the MEND command or mechanical home seeking operation, the END signal was not output in the time set by ENDWAIT." error_code["Action"] = "If DEND=0, check whether the overload was occurred or ENDACT (END range) was too small. If DEND=1, check whether the driver END signal is connected, the overload was occurred or the END signal range of the driver was too small." error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "Motion and sequence execution stop" error_code["Alarm_Code"] = "60h" error_code["ALARM_LED_Blinks"] = "7" error_code["ALMCLR_Effect"] = "Clears alarm" error_code["Protective_Function"] = "LS logic error" error_code["Description"] = "Positive and negative position limit signals on simultaneously" error_code["Action"] = "- Check limit sensors and wiring. - Check input signal configuration. - Check the logic setting for limit sensors (OTLV): Normally open (N.O.) or Normally closed (N.C.)." error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "Motion and sequence execution stop" error_code["Alarm_Code"] = "61h" error_code["ALARM_LED_Blinks"] = "7" error_code["ALMCLR_Effect"] = "Clears alarm" error_code["Protective_Function"] = "LS connected in reverse" error_code["Description"] = "Positive or negative position limit signal detected opposite home seeking direction" error_code["Action"] = "- Check limit sensors and wiring. - Check input signal configuration. - Check the logic setting for limit sensors (OTLV): Normally open (N.O.) or Normally closed (N.C.)." error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "Motion and sequence execution stop" error_code["Alarm_Code"] = "62h" error_code["ALARM_LED_Blinks"] = "7" error_code["ALMCLR_Effect"] = "Clears alarm" error_code["Protective_Function"] = "HOME operation failed" error_code["Description"] = "Unstable or unexpected position limit signal detected while seeking home position" error_code["Action"] = "- Check limit sensors and wiring. - Check input signal configuration. - Check the logic setting for limit sensors (OTLV): Normally open (N.O.) or Normally closed (N.C.)." error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "Motion and sequence execution stop" error_code["Alarm_Code"] = "63h" error_code["ALARM_LED_Blinks"] = "7" error_code["ALMCLR_Effect"] = "Clears alarm" error_code["Protective_Function"] = "HOMELS not found" error_code["Description"] = "No HOME input detected between position limit signals while seeking home position" error_code["Action"] = "Check HOME sensor wiring and connections" error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "Motion and sequence execution stop" error_code["Alarm_Code"] = "64h" error_code["ALARM_LED_Blinks"] = "7" error_code["ALMCLR_Effect"] = "Clears alarm" error_code["Protective_Function"] = "TIM, SENSOR signal error" error_code["Description"] = "TIM position or SENSOR input expected with HOME input: not found" error_code["Action"] = "Selected mechanical home seeking operation (see HOMETYP) requires a valid SENSOR input and/or a valid TIM position while HOME input active. Make sure HOME and other required input(s) can be active at the same location." error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "Motion and sequence execution stop" error_code["Alarm_Code"] = "6Ah" error_code["ALARM_LED_Blinks"] = "7" error_code["ALMCLR_Effect"] = "Clears alarm" error_code["Protective_Function"] = "LS detected during home offset motion" error_code["Description"] = "Positive or negative position limit signal detected while moving to OFFSET position after homing" error_code["Action"] = "Make sure that the OFFSET distance, measured from the HOME signal position, does not trigger a limit sensor" error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "Motion and sequence execution stop" error_code["Alarm_Code"] = "6Eh" error_code["ALARM_LED_Blinks"] = "7" error_code["ALMCLR_Effect"] = "Clears alarm" error_code["Protective_Function"] = "Driver alarm" error_code["Description"] = "Driver alarm signal is active" error_code["Action"] = "Check the driver and see the operating manual of the driver." error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "Motion and sequence execution stop" error_code["Alarm_Code"] = "6Fh" error_code["ALARM_LED_Blinks"] = "7" error_code["ALMCLR_Effect"] = "Clears alarm" error_code["Protective_Function"] = "Driver connection error" error_code["Description"] = "The command was canceled due to no response from the driver during executing the command or before executing the command" error_code["Action"] = "Be sure to check if driver and the CM10/SCX11 are connected securely." error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "Motion and sequence execution stop" error_code["Alarm_Code"] = "70h" error_code["ALARM_LED_Blinks"] = "7" error_code["ALMCLR_Effect"] = "Clears alarm" error_code["Protective_Function"] = "Motion parameter error" error_code["Description"] = "Attempt to execute motion with incompatible motion parameters" error_code["Action"] = "- Make sure current is enabled (CURRENT=1). - Home seeking: make sure required inputs are configured. - Linked indexing: make sure all linked segments execute in the same direction." error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "Motion and sequence execution stop. Motor may or may not have holding torque, depending on ALMACT." error_code["Alarm_Code"] = "68h" error_code["ALARM_LED_Blinks"] = "6" error_code["ALMCLR_Effect"] = "Clears alarm" error_code["Protective_Function"] = "Panic stop" error_code["Description"] = "System executed a panic stop because of a PSTOP input or command" error_code["Action"] = "If a panic stop was unexpected: - Check PSTOP input configuration. - Check sequence programming for inappropriate PSTOP command(s)." error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "Motion and sequence execution stop. Motor may or may not have holding torque, depending on ALMACT." error_code["Alarm_Code"] = "66h" error_code["ALARM_LED_Blinks"] = "7" error_code["ALMCLR_Effect"] = "Clears alarm" error_code["Protective_Function"] = "Hardware over travel" error_code["Description"] = "Positive or negative position limit signal detected" error_code["Action"] = "- Check motion parameters. - Make sure home position is correct. - Check limit sensors and wiring. - Check input signal configuration. - Check the logic setting for limit sensors (OTLV): Normally open (N.O.) or Normally closed (N.C.)." error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "Motion and sequence execution stop. Motor may or may not have holding torque, depending on ALMACT." error_code["Alarm_Code"] = "67h" error_code["ALARM_LED_Blinks"] = "7" error_code["ALMCLR_Effect"] = "Clears alarm" error_code["Protective_Function"] = "Software over travel" error_code["Description"] = "Position outside of programmed positive and negative position limits" error_code["Action"] = "- Check motion parameters. - Check software position limits. - Make sure home position is correct." error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "The motor lacks holding torque." error_code["Alarm_Code"] = "41h" error_code["ALARM_LED_Blinks"] = "9" error_code["ALMCLR_Effect"] = "No effect" error_code["Protective_Function"] = "EEPROM error" error_code["Description"] = "User data in non-volatile EEPROM memory is corrupt" error_code["Action"] = "Contact Oriental Motor to arrange for inspection or repair." error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "The motor lacks holding torque." error_code["Alarm_Code"] = "F0h" error_code["ALARM_LED_Blinks"] = "ON" error_code["ALMCLR_Effect"] = "No effect" error_code["Protective_Function"] = "System error" error_code["Description"] = "System detected unexpected internal logic state" error_code["Action"] = "Contact Oriental Motor to arrange for inspection or repair." error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "The motor lacks holding torque." error_code["Alarm_Code"] = "F1h" error_code["ALARM_LED_Blinks"] = "ON" error_code["ALMCLR_Effect"] = "No effect" error_code["Protective_Function"] = "Memory error" error_code["Description"] = "Internal memory access error" error_code["Action"] = "Contact Oriental Motor to arrange for inspection or repair." error_codes.append(error_code) error_code = {} error_code["Phenomenon"] = "The motor lacks holding torque." error_code["Alarm_Code"] = "F2h" error_code["ALARM_LED_Blinks"] = "ON" error_code["ALMCLR_Effect"] = "No effect" error_code["Protective_Function"] = "Sequence internal error" error_code["Description"] = "Sequence code invalid or corrupt" error_code["Action"] = "Contact Oriental Motor to arrange for inspection or repair." error_codes.append(error_code) found = False for error_code in error_codes: if (error_code["Alarm_Code"] == alarm_code) or (error_code["Alarm_Code"] == alarm_code+'h'): found = True break return found, error_code def _my_open_chan(self): # --- Concrete method # --- Overloading method according the language protocol fid = self._fid_chan if fid==None: return self.ERR_CHAN_NOT_OPENED # --- Active echo, message returned are not verbose, clear all alarms for kaxis in range(Mountaxis.AXIS_MAX): current_axis = self.axis[kaxis] if current_axis == None: continue if current_axis.real == False: continue if current_axis.language_protocol == "SCX11": cmd = self._talks[kaxis] # --- Check echo err, res = self.putread_chan(cmd + " ; ECHO",1) if res!="1": self.putread_chan(cmd + " ; ECHO 1") # --- Check echo err, res = self.putread_chan(cmd + " ; VERBOSE",1) if res!="0": self.putread_chan(cmd + " ; VERBOSE 0") # --- Check alarms err, res = self.putread_chan(cmd + " ; ALM",1) #self.log.print("kaxis={} err={} res={}".format(kaxis,err,res)) #res = "00" almclr = False if res!="00": found, error_code = self.error_messages(res) almclr = True msg = error_code["Protective_Function"] msg += error_code["Description"] self.log.print("ALARM {} axis {} because: {}".format(res,kaxis,msg)) if res=="66" or res=="67": almclr = False msg = error_code["Action"] if almclr==False: self.log.print("Cannot clear alarm {} axis {}. Action to do = {}".format(res,kaxis,msg)) else: self.log.print("Clear alarm {} axis {} ({}) ".format(res,kaxis,msg)) err, res = self.putread_chan(cmd + " ; ALMCLR ; ABORT ; ALMCLR") # --- activate the software limits LIMN and LIMP # --- and download the absolute position from the controler err, res = self.putread_chan(cmd + " ; HOMETYP 12 ; PC = 0 ; EHOME") time.sleep(0.5) err, res = self.putread_chan(cmd + " ; RUN getabs") #err, res = self.putread_chan(cmd + " ; RUN init1") #self.log.print("kaxis={} err={} res={}".format(kaxis,err,res)) time.sleep(0.5) # --- First read incs because the first time it is always zero err, res = self.putread_chan(cmd + " ; PC") #self.log.print("kaxis={} err={} res={}".format(kaxis,err,res)) err, res = self.putread_chan(cmd + " ; TA=0.5 ; TD=0.5") err, res = self.putread_chan(cmd + " ; N_BACKLASH -1000") fid.flush() time.sleep(3) # long time for the axis dec return self.NO_ERROR def _my_read_encs(self, incsimus): # --- Concrete method # --- Overloading method according the language protocol increals = incsimus # --- Loop over all the possible axis types for kaxis in range(Mountaxis.AXIS_MAX): current_axis = self.axis[kaxis] if current_axis == None: continue if current_axis.real == False: continue # --- This axis is valid and real. We read the position if current_axis.language_protocol == "SCX11": cmd = self._talks[kaxis] + " ; PC" err, res = self.putread_chan(cmd,1) if err==self.NO_ERROR: try: increals[kaxis] = float(res) except: self.log.print("Problem reading PC, axis={} res={}".format(kaxis,res)) return increals def _update_motion_states(self): """ In case of slewing started, perhaps the mount is already tracking. This method allows to switch the ASCOM states according the mount. """ # --- Concrete method # --- Overloading method according the language protocol # # --- Loop over all the possible axis types axes_motion_state_reals = [] for kaxis in range(Mountaxis.AXIS_MAX): current_axis = self.axis[kaxis] if current_axis == None: axes_motion_state_reals.append(Mountaxis.MOTION_STATE_UNKNOWN) continue if current_axis.real == False: axes_motion_state_reals.append(Mountaxis.MOTION_STATE_UNKNOWN) continue # --- This axis is valid and real. We read the position if current_axis.language_protocol == "SCX11": cmd = self._talks[kaxis] + " ; SIGMOVE" err, res = self.putread_chan(cmd,1) if err==self.NO_ERROR: try: sigmove = int(res) except: self.log.print("Problem reading SIGMOVE, axis={} res={}".format(kaxis,res)) cmd = self._talks[kaxis] + " ; Z" err, res = self.putread_chan(cmd,1) z = -1 if err==self.NO_ERROR: try: z = float(res) except: self.log.print("Problem reading Z, axis={} res={}".format(kaxis,res)) #print("z={} sigmove={} current_axis.motion_state={}".format(z, sigmove, current_axis.motion_state)) current_axis.motion_state = Mountaxis.MOTION_STATE_UNKNOWN if z==1 and sigmove==1: current_axis.motion_state = Mountaxis.MOTION_STATE_SLEWING elif z==2 and sigmove==1: current_axis.motion_state = Mountaxis.MOTION_STATE_DRIFTING elif z==3 and sigmove==1: current_axis.motion_state = Mountaxis.MOTION_STATE_MOVING elif z==0 or sigmove==0: current_axis.motion_state = Mountaxis.MOTION_STATE_NOMOTION cmd = self._talks[kaxis] + " ; Z=0" err, res = self.putread_chan(cmd,1) # --- axes_motion_state_reals.append(current_axis.motion_state) return axes_motion_state_reals def _my_hadec_drift(self,hadec_speeddrift_ha_deg_per_sec:float, hadec_speeddrift_dec_deg_per_sec:float): # --- Concrete method # --- Overloading method according the language protocol err = self.NO_ERROR res = 0 ha_target = 0 # any value is correct dec_target = 0 # any value is correct # --- Compute incs of the target and the drifts (for any mount_type) celb, celp, celr, dcelb, dcelp, dcelr = self.astro2cel("HADEC", ha_target, dec_target, hadec_speeddrift_ha_deg_per_sec, hadec_speeddrift_dec_deg_per_sec) # --- Loop over all the possible axis types for kaxis in range(Mountaxis.AXIS_MAX): current_axis = self.axis[kaxis] if current_axis == None: continue if current_axis.real == False: continue # --- This axis is valid and real. We read the position if current_axis.language_protocol == "SCX11": # === Target position and drift if kaxis == Mountaxis.BASE: dcel = dcelb elif kaxis == Mountaxis.POLAR: dcel = dcelp elif kaxis == Mountaxis.ROTATOR: dcel = dcelr # === Drift Velocity inc/sec inc_per_sec_drift = dcel * current_axis.senseinc * current_axis.inc_per_deg #print("Inital drift={} inc/sec".format(inc_per_sec_drift)) inc_per_sec_drift *= self.mult_inc_per_sec_drift if self.site.latitude>=0: inc_per_sec_drift *= -1 # --- cmd = self._talks[kaxis] cmd += " ; MSTOP ; ABORT" self.log.print("hadec_drift {} cmd >> {}".format(current_axis._axis_type, cmd)) err, res = self.putread_chan(cmd) time.sleep(0.05) if inc_per_sec_drift!=0: cmd = self._talks[kaxis] cmd += " ; N_DRFVEL={:.3f}".format(inc_per_sec_drift) self.log.print("hadec_drift {} cmd >> {}".format(current_axis._axis_type, cmd)) err, res = self.putread_chan(cmd) cmd = self._talks[kaxis] if kaxis == Mountaxis.BASE: cmd += " ; N_PECPHASE=0; N_PEC1VEL=0.0 ; N_PEC2VEL=0.0" #cmd += " ; N_PECPHASE=-80; N_PEC1VEL=0.120; N_PEC2VEL=0.080" else: cmd += " ; N_PECPHASE=0; N_PEC1VEL=0.0 ; N_PEC2VEL=0.0" cmd += " ; RUN drf1" self.log.print("hadec_drift {} cmd >> {}".format(current_axis._axis_type, cmd)) err, res = self.putread_chan(cmd) current_axis.motion_state = Mountaxis.MOTION_STATE_DRIFTING return err, res def _my_hadec_goto(self, ha_target, dec_target, pierside_target): # --- Concrete method # --- Overloading method according the language protocol err = self.NO_ERROR res = 0 # --- Compute incs of the target and the drifts (for any mount_type) celb, celp, celr, dcelb, dcelp, dcelr = self.astro2cel("HADEC", ha_target, dec_target, self._hadec_speeddrift_ha_deg_per_sec, self._hadec_speeddrift_dec_deg_per_sec) incb, incp = self.cel2enc(celb, celp, pierside_target, self.OUTPUT_SHORT, self.SAVE_NONE) incr = 0 #print("ha_target={} dec_target={} pierside_target={}".format(ha_target,dec_target,pierside_target)) #print("celb={} celp={}".format(celb,celp)) #print("incb={} incp={}".format(incb,incp)) # --- Loop over all the possible axis types for kaxis in range(Mountaxis.AXIS_MAX): current_axis = self.axis[kaxis] if current_axis == None: continue if current_axis.real == False: continue # --- This axis is valid and real. We read the position if current_axis.language_protocol == "SCX11": # === Target position and drift if kaxis == Mountaxis.BASE: inc = incb dcel = dcelb elif kaxis == Mountaxis.POLAR: inc = incp dcel = dcelp elif kaxis == Mountaxis.ROTATOR: inc = incr dcel = dcelr # === Target slew dslw = current_axis.slew_deg_per_sec # === Slew Velocity inc/sec inc_per_sec_slew = dslw * current_axis.senseinc * current_axis.inc_per_deg # === Drift Velocity inc/sec inc_per_sec_drift = dcel * current_axis.senseinc * current_axis.inc_per_deg #print("Inital drift={} inc/sec".format(inc_per_sec_drift)) inc_per_sec_drift *= self.mult_inc_per_sec_drift if self.site.latitude>=0: inc_per_sec_drift *= -1 # --- cmd = self._talks[kaxis] cmd += " ; MSTOP ; ABORT" self.log.print("hadec_goto {} cmd >> {}".format(current_axis._axis_type, cmd)) err, res = self.putread_chan(cmd) time.sleep(0.05) if inc_per_sec_drift==0: cmd = self._talks[kaxis] cmd += " ; N_GOTOVEL={:.3f}".format(abs(inc_per_sec_slew)) cmd += " ; N_GOTOPOS={:.0f}".format(inc) cmd += " ; RUN goto1" self.log.print("hadec_goto {} cmd >> {}".format(current_axis._axis_type, cmd)) err, res = self.putread_chan(cmd) else: cmd = self._talks[kaxis] cmd += " ; N_GOTOVEL={:.3f}".format(abs(inc_per_sec_slew)) cmd += " ; N_GOTOPOS={:.0f}".format(inc) cmd += " ; N_DRFVEL={:.3f}".format(inc_per_sec_drift) self.log.print("hadec_goto {} cmd >> {}".format(current_axis._axis_type, cmd)) err, res = self.putread_chan(cmd) cmd = self._talks[kaxis] if kaxis == Mountaxis.BASE: cmd += " ; N_PECPHASE=0; N_PEC1VEL=0.0 ; N_PEC2VEL=0.0" #cmd += " ; N_PECPHASE=-80; N_PEC1VEL=0.120; N_PEC2VEL=0.080" else: cmd += " ; N_PECPHASE=0; N_PEC1VEL=0.0 ; N_PEC2VEL=0.0" cmd += " ; RUN goto_drf1" self.log.print("hadec_goto {} cmd >> {}".format(current_axis._axis_type, cmd)) err, res = self.putread_chan(cmd) current_axis.motion_state = Mountaxis.MOTION_STATE_SLEWING return err, res def _my_hadec_move(self, ha_move_deg_per_sec, dec_move_deg_per_sec): # --- Concrete method # --- Overloading method according the language protocol err = self.NO_ERROR res = 0 ha_target = 0 # any value is correct dec_target = 0 # any value is correct # --- Compute incs of the target and the drifts (for any mount_type) celb, celp, celr, dcelb, dcelp, dcelr = self.astro2cel("HADEC", ha_target, dec_target, ha_move_deg_per_sec, dec_move_deg_per_sec) # --- # --- Loop over all the possible axis types for kaxis in range(Mountaxis.AXIS_MAX): current_axis = self.axis[kaxis] if current_axis == None: continue if current_axis.real == False: continue # --- This axis is valid and real. We read the position if current_axis.language_protocol == "SCX11": # === Target position and drift if kaxis == Mountaxis.BASE: dmov = dcelb elif kaxis == Mountaxis.POLAR: dmov = dcelp elif kaxis == Mountaxis.ROTATOR: dmov = dcelr # === Move Velocity inc/sec inc_per_sec_move = dmov * current_axis.senseinc * current_axis.inc_per_deg #print("Inital inc_per_sec_move={} inc/sec".format(inc_per_sec_move)) if self.site.latitude>=0: inc_per_sec_move *= -1 # --- cmd = self._talks[kaxis] cmd += " ; MSTOP ; ABORT" self.log.print("hadec_move {} cmd >> {}".format(current_axis._axis_type, cmd)) err, res = self.putread_chan(cmd) time.sleep(0.05) # --- cmd = self._talks[kaxis] #cmd += " ; Z = 0" ; # means no drift cmd += " ; N_GOTOVEL={:.3f}".format(inc_per_sec_move) cmd += " ; RUN gotolimit1" self.log.print("hadec_move {} cmd >> {}".format(current_axis._axis_type, cmd)) err, res = self.putread_chan(cmd) current_axis.motion_state = Mountaxis.MOTION_STATE_MOVING return err, res def _my_hadec_move_stop(self): # --- Concrete method # --- Overloading method according the language protocol err = self.NO_ERROR res = 0 ha_target = 0 # any value is correct dec_target = 0 # any value is correct # --- Compute incs of the target and the drifts (for any mount_type) celb, celp, celr, dcelb, dcelp, dcelr = self.astro2cel("HADEC", ha_target, dec_target, self._hadec_speeddrift_ha_deg_per_sec, self._hadec_speeddrift_dec_deg_per_sec) # --- Loop over all the possible axis types for kaxis in range(Mountaxis.AXIS_MAX): current_axis = self.axis[kaxis] if current_axis == None: continue if current_axis.real == False: continue # --- This axis is valid and real. We read the position if current_axis.language_protocol == "SCX11": # === Target position and drift if kaxis == Mountaxis.BASE: dcel = dcelb elif kaxis == Mountaxis.POLAR: dcel = dcelp elif kaxis == Mountaxis.ROTATOR: dcel = dcelr # === Drift Velocity inc/sec inc_per_sec_drift = dcel * current_axis.senseinc * current_axis.inc_per_deg #print("Inital drift={} inc/sec".format(inc_per_sec_drift)) inc_per_sec_drift *= self.mult_inc_per_sec_drift if self.site.latitude>=0: inc_per_sec_drift *= -1 # --- cmd = self._talks[kaxis] cmd += " ; MSTOP ; ABORT" self.log.print("hadec_move_stop {} cmd >> {}".format(current_axis._axis_type, cmd)) err, res = self.putread_chan(cmd) time.sleep(0.05) if inc_per_sec_drift!=0: cmd = self._talks[kaxis] cmd += " ; N_DRFVEL={:.3f}".format(inc_per_sec_drift) self.log.print("hadec_move_stop {} cmd >> {}".format(current_axis._axis_type, cmd)) err, res = self.putread_chan(cmd) cmd = self._talks[kaxis] if kaxis == Mountaxis.BASE: cmd += " ; N_PECPHASE=0; N_PEC1VEL=0.0 ; N_PEC2VEL=0.0" #cmd += " ; N_PECPHASE=-80; N_PEC1VEL=0.120; N_PEC2VEL=0.080" else: cmd += " ; N_PECPHASE=0; N_PEC1VEL=0.0 ; N_PEC2VEL=0.0" cmd += " ; RUN drf1" current_axis.motion_state = Mountaxis.MOTION_STATE_DRIFTING else: current_axis.motion_state = Mountaxis.MOTION_STATE_NOMOTION self.log.print("hadec_move_stop {} cmd >> {}".format(current_axis._axis_type, cmd)) err, res = self.putread_chan(cmd) return err, res def _my_hadec_stop(self): # --- Concrete method # --- Overloading method according the language protocol err = self.NO_ERROR res = 0 # --- Loop over all the possible axis types for kaxis in range(Mountaxis.AXIS_MAX): current_axis = self.axis[kaxis] if current_axis == None: continue if current_axis.real == False: continue # --- This axis is valid and real. We read the position if current_axis.language_protocol == "SCX11": # --- cmd = self._talks[kaxis] cmd += " ; MSTOP" # or ABORT at high speed ? err, res = self.putread_chan(cmd) current_axis.motion_state = Mountaxis.MOTION_STATE_NOMOTION return err, res def _my_inc_goto(self, axis_id:int, inc:float, inc_per_sec_slew:float): # --- Concrete method # --- Overloading method according the language protocol cmd = "" err = self.NO_ERROR res = 0 kaxis = axis_id current_axis = self.axis[kaxis] if current_axis.real==True: cmd = self._talks[kaxis] if cmd != "": cmd += " ; MSTOP ; ABORT ; VR={:.3f}".format(inc_per_sec_slew) cmd += " ; TA=0.5 ; TD=0.5 ; MA {:.0f}".format(inc) self.log.print("inc_goto axis={} cmd >> {}".format(axis_id,cmd)) err, res = self.putread_chan(cmd) return err, res def _my_goto_park(self, incb:float, incp:float, incr:float): # --- Concrete method # --- Overloading method according the language protocol err = self.NO_ERROR res = 0 self.hadec_goto(self.park_ha,self.park_dec,side=self.park_side) return err, res
[docs] def hadec_travel(self): """ Exemple of using RUN lutpos1 """ dposs = np.zeros(20) vels = np.zeros(20) kaxis = 0 cmd = self._talks[kaxis] + " ; PC" err, res = self.putread_chan(cmd,1) pc0 = float(res) dposs = np.linspace(1000,100000,20) poss = pc0 + dposs vels += 1000 n = len(dposs) for k in range(n): v = 1000 + (10000-1000)*math.exp(-(k-n/2.)*(k-n/2.)/12.0) vels[k] = v #return poss, vels # -- k = 0 cmd = self._talks[kaxis] cmd += " ; A={:.3f}".format(abs(vels[k])) cmd += " ; B={:.3f}".format(abs(poss[k])) self.log.print("hadec_travel cmd >> {}".format(cmd)) err, res = self.putread_chan(cmd) # -- k = 1 cmd = self._talks[kaxis] cmd += " ; C={:.3f}".format(abs(vels[k])) cmd += " ; D={:.3f}".format(abs(poss[k])) self.log.print("hadec_travel cmd >> {}".format(cmd)) err, res = self.putread_chan(cmd) # -- k = 2 cmd = self._talks[kaxis] cmd += " ; E={:.3f}".format(abs(vels[k])) cmd += " ; F={:.3f}".format(abs(poss[k])) self.log.print("hadec_travel cmd >> {}".format(cmd)) err, res = self.putread_chan(cmd) # -- k = 3 cmd = self._talks[kaxis] cmd += " ; G={:.3f}".format(abs(vels[k])) cmd += " ; H={:.3f}".format(abs(poss[k])) self.log.print("hadec_travel cmd >> {}".format(cmd)) err, res = self.putread_chan(cmd) # -- cmd = self._talks[kaxis] cmd += " ; run lutpos1" err, res = self.putread_chan(cmd) # -- cmd = self._talks[kaxis] cmd += " ; Y" err, res = self.putread_chan(cmd,1) y0 = float(res) y = y0 self.log.print("hadec_travel y0={}".format(y0)) # -- n = len(poss) for k in range(4,n): cmd = self._talks[kaxis] + " ; PC" err, res = self.putread_chan(cmd,1) pc = float(res) self.log.print("hadec_travel k={} y={} pc={}".format(k,y,pc)) while True: # -- cmd = self._talks[kaxis] cmd += " ; Y" err, res = self.putread_chan(cmd,1) y = float(res) if (y!=y0): break y0 = y self.log.print("hadec_travel k={} y={}".format(k,y)) cmd = self._talks[kaxis] if (y==2): cmd += " ; A={:.3f}".format(abs(vels[k])) cmd += " ; B={:.3f}".format(abs(poss[k])) elif (y==3): cmd += " ; C={:.3f}".format(abs(vels[k])) cmd += " ; D={:.3f}".format(abs(poss[k])) elif (y==4): cmd += " ; E={:.3f}".format(abs(vels[k])) cmd += " ; F={:.3f}".format(abs(poss[k])) elif (y==1): cmd += " ; G={:.3f}".format(abs(vels[k])) cmd += " ; H={:.3f}".format(abs(poss[k])) self.log.print("hadec_travel cmd >> {}".format(cmd)) err, res = self.putread_chan(cmd) kaxis = 0 cmd = self._talks[kaxis] + " ; SSTOP ; ABORT" err, res = self.putread_chan(cmd,1) self.log.print("hadec_travel Finished")
def tachymeter_read_enc(self, axis_id:int): cmd = "" err = self.NO_ERROR res = 0 kaxis = axis_id current_axis = self.axis[kaxis] if current_axis.real==True: cmd = self._talks[kaxis] if cmd != "": cmd += " ; PC" err, res = self.putread_chan(cmd,1) return err, res def tachymeter_check_vrs(self, duration_sec:float, vrmin:float, vrmax:float, dvr:float): duration_sec0 = duration_sec axis_id = Mountaxis.BASE current_axis = self.axis[axis_id] vr = vrmin while vr <= vrmax: deg_per_sec = 1.0*vr/(current_axis.senseinc * current_axis.inc_per_deg) arcsec_per_sec = deg_per_sec*3600 duration_sec = duration_sec0 deg = deg_per_sec * duration_sec if deg>120: duration_sec /= 2 v, dv, ratio = self.tachymeter_check_vr(duration_sec, vr) msg = "==> VR={:.3f} Vobs={:.4f} +/- {:.4f} ratio={:.5f} drift={:.3f}".format(vr,v,dv,ratio,arcsec_per_sec) self.log.print(msg) vr +=dvr def tachymeter_check_vr(self, duration_sec:float, vr:float=""): axis_id = Mountaxis.BASE # --- if vr=="": cmd = self._talks[axis_id] + " ; VR" err, res = self.putread_chan(cmd,1) if err==self.NO_ERROR: vr = float(res) else: vr = 0 #self.log.print("vr = {}".format(vr)) # --- if False: # --- methode classique motion = "MCN" # --- cmd = "MSTOP".format(vr,motion) err, res = self.putread_chan(cmd,1) time.sleep(1) cmd = "VR={:.3f} ; {}".format(vr,motion) err, res = self.putread_chan(cmd,1) else: # --- methode en alternance inc_per_sec_drift = vr #period = 10.0 # sec #dpr = self.axis[axis_id].inc_per_motor_rev/10000 #fdpr = round((1.0+dpr),3)-1.0 #m = inc_per_sec_drift/fdpr #print("Corrected drift={} inc/sec dpr={} fdpr={} m={}".format(inc_per_sec_drift,dpr,fdpr,m)) #mint1 = math.floor(m) #cycrat = m - mint1 #vr1 = (mint1+0.5)*fdpr #p1 = (1-cycrat)*period #mint2 = mint1+1 #vr2 = (mint2+0.5)*fdpr #p2 = cycrat*period #print("C={:.3f}({:.3f}) D={:.3f} E={:.3f}({:.3f}) F={:.3f}".format(vr1,mint1*fdpr,p1,vr2,mint2*fdpr,p2)) # --- cmd = self._talks[axis_id] cmd += " ; MSTOP ; ABORT" err, res = self.putread_chan(cmd) time.sleep(0.1) cmd = self._talks[axis_id] cmd += " ; N_DRFVEL={:.3f}".format(inc_per_sec_drift) err, res = self.putread_chan(cmd) cmd = self._talks[axis_id] cmd += " ; TA = 0.5 ; TD=0.5 ; RUN drf1" err, res = self.putread_chan(cmd) time.sleep(2) # --- v_av0 = 0 v_av = 0 t0 = time.time() ts = [] incs = [] dincs = [] while True: t = time.time() dt_sec = (t-t0) if dt_sec > duration_sec: break # --- err, inc = self.tachymeter_read_enc(axis_id) inc = float(inc) if incs==[]: inc0 = inc dinc = inc - inc0 ts.append(dt_sec) incs.append(inc) dincs.append(dinc) # --- if len(incs)>=2: # --- instantaneous dinc2 = dincs[-1] dinc1 = dincs[-2] t2 = ts[-1] t1 = ts[-2] dinc = dinc2-dinc1 # inc dt = t2-t1 # sec if len(incs)>=3: # --- averaged dinc2 = dincs[-1] dinc1 = dincs[1] t2 = ts[-1] t1 = ts[1] dinc = dinc2-dinc1 # inc dt = t2-t1 # sec v_av = dinc/dt # inc/sec else: v_av = 0 dv_av = v_av - v_av0 # --- #ligne = "{} {} {}".format(dt_sec,v,v_av) #with open("sideral_positions.txt","a",encoding='utf-8') as fid: # fid.write(ligne+"\n") # --- #self.log.print("dt_sec={:.1f} v={:.4f} v_av={:.4f} dv_av={:.4f}".format(dt_sec,v,v_av,dv_av)) time.sleep(5.0) v_av0 = v_av # --- cmd = "SSTOP" err, res = self.putread_chan(cmd,1) # --- # === if v_av==0: ratio = 1 else: ratio = vr/v_av return v_av, abs(dv_av), ratio def tachymeter_check_sideral(self, duration_min, mult=""): self.log.print("calib_sideral_duration {} minutes".format(duration_min)) # --- if mult=="": mult = self.mult_inc_per_sec_drift self.mult_inc_per_sec_drift = mult self.log.print("mult = {}".format(mult)) # --- angle = str(self.site.longitude) date = celme.Date("now") jd = date.jd() + 3./24 longuai = celme.Angle(angle).rad() meca = celme.Mechanics() lst = meca._mc_tsl(jd,-longuai) ang = celme.Angle(str(lst)+"r") lst_hms = ang.sexagesimal("H0.0") ra = lst_hms dec = 0 self.log.print("Pointing {} {}".format(ra,dec)) self.hadec_speeddrift("diurnal",0) self.radec_goto(ra, dec, blocking=False) time.sleep(10.0) self.log.print("End of pointing") #diurnal_drift = 360*3600/self._sideral_sec_per_day # 15.0410844 arcsec/sec t0 = time.time() ts = [] ras = [] dras = [] while True: t = time.time() dt_sec = (t-t0) dt_min = dt_sec/60. if dt_min > duration_min: self.log.print("exit after dt_min={}".format(dt_min)) break # --- ra, dec, side = self.radec_coord() ra_deg = celme.Angle(ra).deg() if ras==[]: ra_deg0 = ra_deg dra_deg = ra_deg - ra_deg0 ts.append(dt_sec) ras.append(ra_deg) dras.append(dra_deg*3600) # --- if len(ras)>=2: # --- instantaneous dra2 = dras[-1] dra1 = dras[-2] t2 = ts[-1] t1 = ts[-2] dra = dra2-dra1 # arcsec dt = t2-t1 # sec drift = dra/dt # arcsec/sec else: drift = 0 if len(ras)>=3: # --- averaged dra2 = dras[-1] dra1 = dras[1] t2 = ts[-1] t1 = ts[1] dra = dra2-dra1 # arcsec dt = t2-t1 # sec drift_av = dra/dt # arcsec/sec else: drift_av = 0 # --- ligne = "{} {} {}".format(dt_sec,dra_deg*3600,ra_deg) with open("sideral_positions.txt","a",encoding='utf-8') as fid: fid.write(ligne+"\n") # --- self.log.print("dt_sec={:.1f} dra={:.2f} drift={:.2f} arcsec/sec av={:2f}".format(dt_sec,dra_deg*3600,drift,drift_av)) time.sleep(5.0) # === xs = np.array(ts) ys = np.array(dras) fig = plt.figure() ax = fig.add_subplot(1,1,1) h = ax.plot(xs,ys,"r-") h[0].set_linewidth(1.5) plt.title("Derive during sideral drift mult={}".format(mult)) plt.xlabel("time (sec)") plt.ylabel("R.A. offset (arcsec)") plt.grid(True) plt.show() outfile = "sideral_positions.png" plt.savefig(outfile, facecolor='w', edgecolor='w') self.hadec_stop() # ===================================================================== # ===================================================================== # Methods for experimented users (debug, etc) # ===================================================================== # ===================================================================== def putread(self, axis_id:int, msg, index=-1, disp=True): cmd = "" err = self.NO_ERROR res = 0 kaxis = axis_id current_axis = self.axis[kaxis] if current_axis.real==True: cmd = self._talks[kaxis] if cmd != "": cmd += " ; {}".format(msg) if disp==True: self.log.print("putread {} cmd >> {}".format(axis_id,cmd)) err, res = self.putread_chan(cmd, index) return err, res # ===================================================================== # ===================================================================== # Methods for users # ===================================================================== # ===================================================================== # ===================================================================== # ===================================================================== # Special methods # ===================================================================== # ===================================================================== def __init__(self, *args, **kwargs): """ Conversion from Uniform Python object into protocol language Usage : Mountastro("HADEC", name="SCX11") """ # === Decode params # --- Use the __init__ of the parent class Mountastro super(Mountastro_Astromecca,self).__init__(*args, **kwargs) # --- Special params for this mount # --- Dicos of optional and mandatory parameters params_optional = {} # --- special SCX11 for TALK function params_optional["CONTROLLER_BASE_ID"] = (int,1) params_optional["CONTROLLER_POLAR_ID"] = (int,2) params_optional["CONTROLLER_ROT_ID"] = (int,3) params_mandatory = {} # --- Decode parameters params = self.decode_kwargs(params_optional, params_mandatory, **kwargs) # --- Add the personnal dict to the general one self._mount_params.update(params) # === Configure according params # --- talks are string that are prefix of SCX11 commands to switch in the dedicaded controler in daisy chain self._talks = ["" for kaxis in range(Mountaxis.AXIS_MAX)] self._talks[Mountaxis.BASE] = "TALK"+str(self._mount_params["CONTROLLER_BASE_ID"]) self._talks[Mountaxis.POLAR] = "TALK"+str(self._mount_params["CONTROLLER_POLAR_ID"]) self._talks[Mountaxis.ROT] = "TALK"+str(self._mount_params["CONTROLLER_ROT_ID"]) # --- shortcuts axisb = self.axis[Mountaxis.BASE] axisp = self.axis[Mountaxis.POLAR] axisr = self.axis[Mountaxis.ROT] # --- initialize if axisb!=None: axisb.real = True axisb.ratio_wheel_puley = 6.29870 axisb.inc_per_motor_rev = 1540 # IMC parameter. System Confg -> System Parameters - Distance/Revolution axisb.ratio_puley_motor = 100 # harmonic reducer axisb.senseinc = -1 axisb.slew_deg_per_sec = 10 axisb.update_inc0(0,-90,axisb.PIERSIDE_POS1) axisb.language_protocol = "SCX11" if axisp!=None: axisp.real = False axisp.ratio_wheel_puley = 6.4262 axisp.inc_per_motor_rev = 1525 axisp.ratio_puley_motor = 100 # harmonic reducer axisp.senseinc = -1 axisp.slew_deg_per_sec = 10 axisp.update_inc0(0,90,axisp.PIERSIDE_POS1) axisp.language_protocol = "SCX11" if axisr!=None: axisr.real = False axisr.ratio_wheel_puley = 6.4262 axisr.inc_per_motor_rev = 1525 axisp.ratio_puley_motor = 100 axisr.senseinc = -1 axisr.slew_deg_per_sec = 10 axisr.update_inc0(0,90,axisr.PIERSIDE_POS1) axisr.language_protocol = "SCX11" # --- self.mult_inc_per_sec_drift = 1.017 * 0.9914 ; # 0.9984 # 1.00517 self.mult_inc_per_sec_drift = 1.0082+0.0082*1.4 def __del__(self): try: self.close_chan() except: pass
# ##################################################################### # ##################################################################### # ##################################################################### # Main # ##################################################################### # ##################################################################### # ##################################################################### if __name__ == "__main__": cwd = os.getcwd() hostname = socket.gethostname() example = 1 print("Example = {}".format(example)) if 'mount_astromecca' in globals(): del(mount_astromecca) if 'eqmod_chan' in globals(): del(eqmod_chan) #try: # mount_astromecca.close_chan() #except: # pass #try: # eqmod_chan.close_chan() #except: # pass # --- configuration depending the computer if hostname == "titanium": print("Configuration = {}".format(hostname)) port_serial_scx11='//./com6' ; # '//./com4' port_serial_eqmod='//./com1' elif hostname == "rapido2": print("Configuration = {}".format(hostname)) port_serial_scx11='/dev/ttyAMA0' port_serial_eqmod='/dev/ttyAMA1' else: print("Attention, pas de configuration pour {}".format(hostname)) port_serial_scx11='//./com1' port_serial_eqmod='//./com2' print("port_scx11 = {}".format(port_serial_scx11)) print("port_eqmod = {}".format(port_serial_eqmod)) if example == 1: """ Basic example """ home = celme.Home("GPS 2.0375 E 43.6443484725 136.9") site = celme.Site(home) # === ASTROMECCA connection mount_astromecca = Mountastro_Astromecca("HADEC", name="Guitalens Mount", manufacturer="Astro MECCA", model="TM350", serial_number="beta001", site=site, CONTROLLER_BASE_ID=1, CONTROLLER_POLAR_ID=2) mount_astromecca.set_channel_params("SERIAL", port=port_serial_scx11, baud_rate=115200, delay_init_chan=0.1, end_of_command_to_send="\r\n".encode('utf8'), end_of_command_to_receive="\r\n".encode('utf8'), delay_put_read=0.06) mount_astromecca.verbose_chan = False # --- shortcuts mount_astromecca_axisb = mount_astromecca.axis[Mountaxis.BASE] mount_astromecca_axisp = mount_astromecca.axis[Mountaxis.POLAR] # --- simulation or not mount_astromecca_axisb.real = False mount_astromecca_axisb.ratio_wheel_puley = 6.132857; # 6.27819 ; # 6.32721 ; # D=208.0 ; d=32.5 ; f=1.5 ; (D+f/2)/(d+f/2) mount_astromecca_axisb.inc_per_motor_rev = 1540 # DPR for -490000 to +490000 mount_astromecca_axisb.senseinc = 1 mount_astromecca_axisp.real = False mount_astromecca_axisp.ratio_wheel_puley = 6.75 ; # 6.75 ; # 6.7462935 ; # D=208.0 ; d=30.0 ; f=1.5 ; (D+f/2)/(d+f/2) mount_astromecca_axisp.inc_per_motor_rev = 1421 mount_astromecca_axisp.senseinc = -1 # --- Initial ha,dec for encoders #mount_astromecca_axisb.update_inc0(10750,-90,mount_astromecca_axisb.PIERSIDE_POS1) mount_astromecca.set_param("CONFIGURATION","Fork") if mount_astromecca.get_param("CONFIGURATION")=="German": # --- German mount mount_astromecca.set_param("LABEL_REGULAR","Tube West") ; # Tube west = PIERSIDE_POS1 mount_astromecca.set_param("LABEL_FLIPED","Tube East") mount_astromecca.set_param("CAN_REVERSE",True) mount_astromecca.set_param("LIME_REVERSE",+30) ; # Tube west = PIERSIDE_POS1 = [-180 : lim_side_east] mount_astromecca.set_param("LIMW_REVERSE",-30) ; # Tube east = PIERSIDE_POS2 = [lim_side_west : +180] mount_astromecca_axisb.update_inc0(0,-90,mount_astromecca_axisb.PIERSIDE_POS1) mount_astromecca_axisp.update_inc0(0,90,mount_astromecca_axisp.PIERSIDE_POS1) if mount_astromecca_axisb.real == True: mount_astromecca_axisb.update_inc0(62500,-90,mount_astromecca_axisb.PIERSIDE_POS1) if mount_astromecca_axisp.real == True: mount_astromecca_axisp.update_inc0(6500,90,mount_astromecca_axisp.PIERSIDE_POS1) mount_astromecca.park_ha = 270 mount_astromecca.park_dec = 90 mount_astromecca.park_side = mount_astromecca_axisb.PIERSIDE_POS1 elif mount_astromecca.get_param("CONFIGURATION")=="Fork": # --- Fork mount. Tube always "west" in "auto" mount_astromecca.set_param("LABEL_REGULAR","Regular") ; # Regular = PIERSIDE_POS1 mount_astromecca.set_param("LABEL_FLIPED","Fliped") mount_astromecca.set_param("CAN_REVERSE",False) mount_astromecca.set_param("LIME_REVERSE",+90) ; # Tube west = PIERSIDE_POS1 = [-180 : lim_side_east] mount_astromecca.set_param("LIMW_REVERSE",-90) ; # Tube east = PIERSIDE_POS2 = [lim_side_west : +180] mount_astromecca_axisb.update_inc0(0,0,mount_astromecca_axisb.PIERSIDE_POS1) mount_astromecca_axisp.update_inc0(0,90,mount_astromecca_axisp.PIERSIDE_POS1) if mount_astromecca_axisb.real == True: mount_astromecca_axisb.update_inc0(62500,0,mount_astromecca_axisb.PIERSIDE_POS1) if mount_astromecca_axisp.real == True: mount_astromecca_axisp.update_inc0(6500,90,mount_astromecca_axisp.PIERSIDE_POS1) else: mount_astromecca_axisp._incsimu = -239793.8 mount_astromecca.park_ha = 0 mount_astromecca.park_dec = 0 mount_astromecca.park_side = mount_astromecca_axisb.PIERSIDE_POS1 # --- first read of encoders (zero values the first time) incsimus = ["" for kaxis in range(Mountaxis.AXIS_MAX)] mount_astromecca.enc2cel(incsimus, save=mount_astromecca.SAVE_ALL) # --- second read of encoders (valid values) time.sleep(0.05) mount_astromecca.enc2cel(incsimus, save=mount_astromecca.SAVE_ALL) # --- Init the simulation values according the real ones mount_astromecca_axisb.synchro_real2simu() # --- Get the initial position res = mount_astromecca.hadec_coord() mount_astromecca.log.print("Initial position = {}".format(res)) # ======= Controller mount_astromecca.speedslew(10.0,10.0) t0 = time.time() mount_astromecca.disp() dt = time.time()-t0 if True: try: mount_astromecca.pad_create("pad_dev1") except (KeyboardInterrupt, SystemExit): pass except: raise if example == 2: """ EQMOD exploration """ home = celme.Home("GPS 2.25 E 43.567 148") site = celme.Site(home) # === SCX11 connection mount_astromecca = Mountastro_Astromecca("HADEC", name="Oriental Motor", manufacturer="Astro MECCA", model="TM350", serial_number="beta001", site=site, CONTROLLER_BASE_ID=1, CONTROLLER_POLAR_ID=2) mount_astromecca.set_channel_params("SERIAL", port=port_serial_scx11, baud_rate=115200, delay_init_chan=0.1, end_of_command_to_send="\r\n".encode('utf8'), end_of_command_to_receive="\r\n".encode('utf8'), delay_put_read=0.06) mount_astromecca.verbose_chan = False # --- shortcuts mount_astromecca_axisb = mount_astromecca.axis[Mountaxis.BASE] mount_astromecca_axisp = mount_astromecca.axis[Mountaxis.POLAR] # --- simulation or not mount_astromecca_axisb.real = True mount_astromecca_axisb.ratio_wheel_puley = 6.2 mount_astromecca_axisb.inc_per_motor_rev = 1900 mount_astromecca_axisb.senseinc = -1 mount_astromecca_axisp.real = False mount_astromecca_axisp.ratio_wheel_puley = 5.25 mount_astromecca_axisp.inc_per_motor_rev = 1900 mount_astromecca_axisp.senseinc = -1 # --- Initial ha,dec for encoders mount_astromecca_axisb.update_inc0(0,-90,mount_astromecca.axisb.PIERSIDE_POS1) mount_astromecca_axisp.update_inc0(0,90,mount_astromecca.axisp.PIERSIDE_POS1) # --- first read of encoders (zero values the first time) incsimus = ["" for kaxis in range(Mountaxis.AXIS_MAX)] mount_astromecca.enc2cel(incsimus, save=mount_astromecca.SAVE_ALL) # --- second read of encoders (valid values) time.sleep(0.05) mount_astromecca.enc2cel(incsimus, save=mount_astromecca.SAVE_ALL) # --- Init the simulation values according the real ones mount_astromecca_axisb.synchro_real2simu() # --- Get the initial position res = mount_astromecca.hadec_coord() mount_astromecca.log.print("Initial position = {}".format(res)) mount_astromecca.disp() # === EQMOD simulator mount_eqmod = Mountastro("HADEC", name="EQMOD", manufacturer="EQMOD", model="EQ 6", site=site) # --- shortcuts mount_eqmod_axisb = mount_eqmod.axis[Mountaxis.BASE] mount_eqmod_axisp = mount_eqmod.axis[Mountaxis.POLAR] # --- default values to simulate a EQ6 mount a = 9024000 ; # microsteps / 360° : Number of microsteps for one turn over the sky b = 64935 ; # (microsteps2 / sec) : Velocity parameter (i) = (1|g) * (b) / speedtrack(deg/s) / ((a)/360) d = 8388608 ; # (microsteps) : initial position (j) when the mount is just switched on s = 50133 ; # (microsteps) : Microsteps to a complete turnover of worm inc_per_sky_rev = a ratio_puley_motor = 1 inc_per_motor_rev = s ratio_wheel_puley = inc_per_sky_rev/(ratio_puley_motor*inc_per_motor_rev) # --- mount_eqmod_axisb.ratio_wheel_puley = ratio_wheel_puley mount_eqmod_axisb.ratio_puley_motor = ratio_puley_motor mount_eqmod_axisb.inc_per_motor_rev = inc_per_motor_rev mount_eqmod_axisb.update_inc0(d,-90,mount_eqmod_axisb.PIERSIDE_POS1) # --- mount_eqmod_axisp.ratio_wheel_puley = ratio_wheel_puley mount_eqmod_axisp.ratio_puley_motor = ratio_puley_motor mount_eqmod_axisp.inc_per_motor_rev = inc_per_motor_rev mount_eqmod_axisp.update_inc0(d+a/4,90,mount_eqmod_axisp.PIERSIDE_POS1) # --- eqmod_chan = Mountchannel("SERIAL", port=port_serial_eqmod, baud_rate=9600, delay_init_chan=0.1, end_of_command_to_send="\r".encode('utf8'), end_of_command_to_receive="\r".encode('utf8'), delay_put_read=0.06) mount_eqmod.verbose_chan = False # --- A useful class for EQMOD eqmod = Mountutils_eqmod() # --- A useful def for interactions between SCX11 and EQMOD def eqmod_update_coord(): res = mount_astromecca.hadec_coord() ha, dec, pierside = res res = mount_eqmod.hadec_hadec2enc(ha, dec, pierside, output_format=mount_eqmod.OUTPUT_LONG, save=mount_eqmod.SAVE_AS_SIMU) return res # --- update the EQMOD coord eqmod_update_coord() mount_eqmod.disp() # ======= EQMOD a1 = a a2 = a b1 = b b2 = b d1 = d d2 = d j1max = 16777216 j2max = j1max s1 = s s2 = s # --- prepare the loop lastG1 = "00" lastG2 = "00" lastI1 = 0 lastI2 = 0 lastH1 = 0 lastH2 = 0 lastM1 = 0 lastM2 = 0 # motion_type1, motion_sense1 = eqmod.decode_G(lastG1) motion_type2, motion_sense2 = eqmod.decode_G(lastG2) if mount_astromecca.axisb.real == True: current_motion1 = eqmod.MOTION_JOG_FAST_POSITIVE else: current_motion1 = eqmod.MOTION_STOPPED_POSITIVE if mount_astromecca.axisp.real == True: current_motion2 = eqmod.MOTION_JOG_FAST_POSITIVE else: current_motion2 = eqmod.MOTION_STOPPED_POSITIVE current_motor1 = eqmod.MOTOR_ON current_motor2 = eqmod.MOTOR_ON # --- E1_e1_target = -1 try: mount_eqmod.pad_create("pad_dev1") # --- loop to be a server t0 = time.time() mount_astromecca.log.print("Enter in the loop") while True: # --- update from SCX11 ha_sigmove = 0 dec_sigmove = 0 if mount_astromecca_axisb.real == True: err, ha_sigmove = mount_astromecca.putread(Mountaxis.BASE,"SIGMOVE",1,False) ha_sigmove = int(ha_sigmove) else: if mount_astromecca_axisb._simu_signal_move==0: ha_sigmove = 0 else: ha_sigmove = 1 if mount_astromecca_axisp.real == True: err, dec_sigmove = mount_astromecca.putread(Mountaxis.POLAR,"SIGMOVE",1,False) dec_sigmove = int(dec_sigmove) else: if mount_astromecca_axisp._simu_signal_move==0: dec_sigmove = 0 else: dec_sigmove = 1 # --- Update the current motion and moving status if ha_sigmove==0: current_moving1 = eqmod.MOTION_STOPPED if current_motion1==eqmod.MOTION_JOG_FAST_POSITIVE or current_motion1==eqmod.MOTION_JOG_SLOW_POSITIVE: current_motion1 = eqmod.MOTION_STOPPED_POSITIVE elif current_motion1==eqmod.MOTION_JOG_FAST_NEGATIVE or current_motion1==eqmod.MOTION_JOG_SLOW_NEGATIVE: current_motion1 = eqmod.MOTION_STOPPED_NEGATIVE if dec_sigmove==0: current_moving2 = eqmod.MOTION_STOPPED if current_motion2==eqmod.MOTION_JOG_FAST_POSITIVE or current_motion2==eqmod.MOTION_JOG_SLOW_POSITIVE: current_motion2 = eqmod.MOTION_STOPPED_POSITIVE elif current_motion2==eqmod.MOTION_JOG_FAST_NEGATIVE or current_motion2==eqmod.MOTION_JOG_SLOW_NEGATIVE: current_motion2 = eqmod.MOTION_STOPPED_NEGATIVE #print("SIGMOVE motion_type1={} current_motion1={} current_moving1={} current_motor1={}".format(motion_type1,current_motion1,current_moving1,current_motor1)) #print("SIGMOVE motion_type2={} current_motion2={} current_moving2={} current_motor2={}".format(motion_type2,current_motion2,current_moving2,current_motor2)) # --- update f1 and f2 f1 = eqmod.encode_f(motion_type1,current_motion1,current_moving1,current_motor1) f2 = eqmod.encode_f(motion_type2,current_motion2,current_moving2,current_motor2) # (none) Fast tracking speed multiplier (set with :Gm3...) g1 = "10" g2 = "10" # --- update EQMOD from the current position of SCX11 eqmod_update_coord() incb = mount_astromecca_axisb.inc incp = mount_astromecca_axisp.inc j1 = mount_eqmod_axisb.incsimu j2 = mount_eqmod_axisp.incsimu # --- read the EQMOD commands lignes = "" try: err, lignes = eqmod_chan.read_chan() lignes = str(lignes[0]) except: pass if err==eqmod_chan.NO_ERROR and lignes != "": msg = "!" dec = "" if lignes==':e1' or lignes==':e2': msg = "=020400" elif lignes==':a1': hexa = eqmod.int2hexa(a1) msg = "="+hexa elif lignes==':a2': hexa = eqmod.int2hexa(a2) msg = "="+hexa elif lignes==':b1': hexa = eqmod.int2hexa(b1) msg = "="+hexa elif lignes==':b2': hexa = eqmod.int2hexa(b2) msg = "="+hexa elif lignes==':g1': msg = "="+g1 elif lignes==':g2': msg = "="+g2 elif lignes==':s1': hexa = eqmod.int2hexa(s1) msg = "="+hexa elif lignes==':s2': hexa = eqmod.int2hexa(s2) msg = "="+hexa elif lignes==':V200': msg = "=" elif lignes==':q1010000': msg = "!0" elif lignes==':O10': msg = "!0" elif lignes==':V27D': msg = "=" elif lignes==':j1': hexa = eqmod.int2hexa(j1) msg = "="+hexa elif lignes==':j2': hexa = eqmod.int2hexa(j2) msg = "="+hexa elif lignes==':f1': msg = "="+f1 elif lignes==':f2': msg = "="+f2 if lignes.startswith(":K")==True: axis = int(lignes[2]) if axis==1: mount_astromecca.putread(Mountaxis.BASE,"SSTOP",1) mount_astromecca_axisb.simu_motion_stop() lastH1 = 0 lastM1 = 0 else: mount_astromecca.putread(Mountaxis.POLAR,"SSTOP",1) mount_astromecca_axisp.simu_motion_stop() lastH2 = 0 lastM2 = 0 msg = "=" if lignes.startswith(":P")==True: msg = "=" if lignes.startswith(":F")==True: # set the motor power msg = "=" if lignes.startswith(":E")==True: # update inc0 axis = int(lignes[2]) hexa = lignes[3:9] deci = eqmod.hexa2int(hexa) if axis==1: E1_e1_target = j1 + deci else: j1_target = E1_e1_target j2_target = j2 + deci # --- get the new HA,Dec incsimus = ["" for kaxis in range(Mountaxis.AXIS_MAX)] incsimus[Mountaxis.BASE] = j1_target incsimus[Mountaxis.POLAR] = j2_target celb, celp, pierside = mount_eqmod.enc2cel(incsimus, output_format=mount_eqmod.OUTPUT_SHORT, save=mount_eqmod.SAVE_NONE) ha, dec = mount_eqmod.cel2hadec(celb, celp, "deg", "deg") print("incb={} ha={:.4f} pierside={}".format(incb, ha, pierside)) print("incp={} dec={:.4f} pierside={}".format(incp, dec, pierside)) print("j1_target={} ha={:.4f} pierside={}".format(j1_target, ha, pierside)) print("j2_target={} dec={:.4f} pierside={}".format(j2_target, dec, pierside)) mount_astromecca_axisb.update_inc0(incb, ha, pierside) mount_astromecca_axisp.update_inc0(incp, dec, pierside) mount_eqmod_axisb.update_inc0(j1_target, ha, pierside) mount_eqmod_axisp.update_inc0(j2_target, dec, pierside) msg = "=" if lignes.startswith(":G")==True: axis = int(lignes[2]) digits = lignes[3:5] if axis==1: lastG1 = digits else: lastG2 = digits msg = "=" if lignes.startswith(":I")==True: axis = int(lignes[2]) hexa = lignes[3:9] deci = eqmod.hexa2int(hexa) if axis==1: lastI1 = deci else: lastI2 = deci msg = "=" if lignes.startswith(":H")==True: axis = int(lignes[2]) hexa = lignes[3:9] deci = eqmod.hexa2int(hexa) if axis==1: lastH1 = deci else: lastH2 = deci print("H={} deci={}".format(hexa,deci)) msg = "=" if lignes.startswith(":M")==True: axis = int(lignes[2]) hexa = lignes[3:9] deci = eqmod.hexa2int(hexa) if axis==1: lastM1 = deci else: lastM2 = deci print("M={} deci={}".format(hexa,deci)) msg = "=" if lignes.startswith(":J")==True: # --- select axis axis = int(lignes[2]) if axis==1: lastG = lastG1 lastH = lastH1 lastI = lastI1 multg = int(g1) else: lastG = lastG2 lastH = lastH2 lastI = lastI2 multg = int(g2) # --- motion_type, motion_sense = eqmod.decode_G(lastG) if axis==1: motion_type1 = motion_type motion_sense1 = motion_sense else: motion_type2 = motion_type motion_sense2 = motion_sense # --- choice according the motion type (GOTO=OFFSET or MOVE=CONTINUOUS) if motion_type == eqmod.G_OFFSET_FAST or motion_type == eqmod.G_OFFSET_SLOW: mount_astromecca.log.print("GOTO") # --- GOTO if motion_type == eqmod.G_OFFSET_FAST: # --- fast GOTO deg_per_sec = 10.0 if motion_type == eqmod.G_OFFSET_SLOW: # --- slow GOTO deg_per_sec = 1.0 if axis==1: # --- Convert velocity in inc/sec inc_per_sec_slew = abs(deg_per_sec * mount_astromecca.axisb.inc_per_deg) # --- Compute the target position in EQMOD inc j1_target = j1 + lastH1 if j1_target >= j1: motion_sense1 = eqmod.G_SENSE_POSITIVE if motion_type == eqmod.G_OFFSET_FAST: current_motion1 = eqmod.MOTION_JOG_FAST_POSITIVE else: current_motion1 = eqmod.MOTION_JOG_SLOW_POSITIVE else: motion_sense1 = eqmod.G_SENSE_NEGATIVE if motion_type == eqmod.G_OFFSET_FAST: current_motion1 = eqmod.MOTION_JOG_FAST_NEGATIVE else: current_motion1 = eqmod.MOTION_JOG_SLOW_NEGATIVE # --- get the corresponding EQMOD rot rot, pierside = mount_eqmod_axisb.inc2rot(j1_target) # --- get the corresponding SCX11 inc inc = mount_astromecca.axisb.rot2inc(rot) # --- get the corresponding SCX11 inc mount_astromecca.inc_goto(Mountaxis.BASE, inc, inc_per_sec_slew) motion_type1 = motion_type current_moving1 = eqmod.MOTION_MOVING #print("GOTO motion_type1={} current_motion1={} current_moving1={} current_motor1={}".format(motion_type1,current_motion1,current_moving1,current_motor1)) if axis==2: # --- Convert velocity in inc/sec inc_per_sec_slew = abs(deg_per_sec * mount_astromecca.axisp.inc_per_deg) # --- Compute the target position in EQMOD inc j2_target = j2 + lastH2 if j2_target >= j2: motion_sense2 = eqmod.G_SENSE_POSITIVE if motion_type == eqmod.G_OFFSET_FAST: current_motion2 = eqmod.MOTION_JOG_FAST_POSITIVE else: current_motion2 = eqmod.MOTION_JOG_SLOW_POSITIVE else: motion_sense2 = eqmod.G_SENSE_NEGATIVE if motion_type == eqmod.G_OFFSET_FAST: current_motion2 = eqmod.MOTION_JOG_FAST_NEGATIVE else: current_motion2 = eqmod.MOTION_JOG_SLOW_NEGATIVE # --- get the corresponding EQMOD rot rot, pierside = mount_eqmod_axisp.inc2rot(j2_target) # --- get the corresponding SCX11 inc inc = mount_astromecca.axisp.rot2inc(rot) # --- get the corresponding SCX11 inc mount_astromecca.inc_goto(Mountaxis.POLAR, inc, inc_per_sec_slew) motion_type2 = motion_type current_moving2 = eqmod.MOTION_MOVING #print("GOTO motion_type2={} current_motion2={} current_moving2={} current_motor2={}".format(motion_type2,current_motion2,current_moving2,current_motor2)) if motion_type == eqmod.G_CONTINUOUS_FAST or motion_type == eqmod.G_CONTINUOUS_SLOW: # --- MOVE mount_astromecca.log.print("MOVE") if motion_type == eqmod.G_CONTINUOUS_FAST: # --- fast MOVE mult = multg if motion_type == eqmod.G_CONTINUOUS_SLOW: # --- slow MOVE mult = 1.0 if motion_sense == eqmod.G_SENSE_NEGATIVE: sense = -1 else: sense = 1 # --- velocity if axis==1: ha_drift_deg_per_sec = b1/lastI1/a1*360*mult*sense dec_drift_deg_per_sec = 0 motion_sense1 = motion_sense if motion_sense == eqmod.G_SENSE_POSITIVE: if motion_type == eqmod.G_CONTINUOUS_FAST: current_motion1 = eqmod.MOTION_JOG_FAST_POSITIVE else: current_motion1 = eqmod.MOTION_JOG_SLOW_POSITIVE else: if motion_type == eqmod.G_CONTINUOUS_FAST: current_motion1 = eqmod.MOTION_JOG_FAST_NEGATIVE else: current_motion1 = eqmod.MOTION_JOG_SLOW_NEGATIVE current_moving1 = eqmod.MOTION_MOVING else: ha_drift_deg_per_sec = 0 dec_drift_deg_per_sec = b2/lastI2/a2*360*mult*sense motion_sense2 = motion_sense if motion_sense == eqmod.G_SENSE_POSITIVE: if motion_type == eqmod.G_CONTINUOUS_FAST: current_motion2 = eqmod.MOTION_JOG_FAST_POSITIVE else: current_motion2 = eqmod.MOTION_JOG_SLOW_POSITIVE else: if motion_type == eqmod.G_CONTINUOUS_FAST: current_motion2 = eqmod.MOTION_JOG_FAST_NEGATIVE else: current_motion2 = eqmod.MOTION_JOG_SLOW_NEGATIVE current_moving2 = eqmod.MOTION_MOVING mount_astromecca.hadec_move( ha_drift_deg_per_sec, dec_drift_deg_per_sec) time.sleep(0.2) msg = "=" # --- send the response to the client deci = 0 if msg !="": if len(msg)==7: deci = eqmod.hexa2int(msg[1:]) eqmod_chan.put_chan(msg) if lignes.startswith(":j")==True: display = 0 else: display = 1 if display==1: mount_eqmod.log.print("recu={} envoyé={} decimal={}".format(lignes, msg, deci)) elif display==2: mount_eqmod.log.print("recu={} envoyé={} decimal={} incb={} incp={}".format(lignes, msg, deci, incb, incp)) #dt00 = time.time()-t00 #print("recu={} envoyé={} decimal={:.1f} dt00={:.2f}".format(lignes, msg, deci,dt00)) time.sleep(0.01) #time.sleep(1) #print("lignes={}".format(lignes)) except (KeyboardInterrupt, SystemExit): mount_eqmod.pad_delete() except: raise if example == 3: port_serial_ascom='/dev/ttyUSB0' # loop between ASCOM(PC=COM10) -> PC=COM1 import serial try: fid.close() except: pass fid = serial.Serial( port=port_serial_ascom, baudrate = 9600, parity = serial.PARITY_NONE, stopbits = serial.STOPBITS_ONE, bytesize = serial.EIGHTBITS, timeout = 0 ) while True: # --- read the ASCOM commands lignes = "" try: lignes = fid.readlines() if lignes != []: print("lignes = {}".format(lignes)) except: pass