# -*- coding: utf-8 -*-
import os
import time
import socket
import numpy as np
import matplotlib.pyplot as plt
import math
# --- celme imports
modulename = 'celme'
if modulename in dir():
del celme
if modulename not in dir():
import celme
from .mountastro import Mountastro
from .mountaxis import Mountaxis
from .mountchannel import Mountchannel
from .mountutils_eqmod import Mountutils_eqmod
# #####################################################################
# #####################################################################
# #####################################################################
# Class Mountastro_Astromecca
# #####################################################################
# #####################################################################
# #####################################################################
[docs]class Mountastro_Astromecca(Mountastro,Mountchannel):
# =====================================================================
# =====================================================================
# Private methods
# =====================================================================
# =====================================================================
def error_messages(self, alarm_code):
error_codes = []
error_code = {}
error_code["Phenomenon"] = "No error"
error_code["Alarm_Code"] = "00h"
error_code["ALARM_LED_Blinks"] = "none"
error_code["ALMCLR_Effect"] = "No effect"
error_code["Protective_Function"] = "No error"
error_code["Description"] = "No error"
error_code["Action"] = "Nothing to do"
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "Motion and sequence execution stop"
error_code["Alarm_Code"] = "32h"
error_code["ALARM_LED_Blinks"] = "1"
error_code["ALMCLR_Effect"] = "Clears alarm"
error_code["Protective_Function"] = "Out of position range"
error_code["Description"] = "The PABS value exceeded the coordinate control range (-2,147,483.648 to +2,147,483.647)."
error_code["Action"] = "Check that PABS is in the range."
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "Motion and sequence execution stop"
error_code["Alarm_Code"] = "90h"
error_code["ALARM_LED_Blinks"] = "1"
error_code["ALMCLR_Effect"] = "Clears alarm"
error_code["Protective_Function"] = "Stack overflow"
error_code["Description"] = "Sequence memory stack exhausted"
error_code["Action"] = "Restructure sequences to reduce the number of nested blocks or subroutine calls"
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "Motion and sequence execution stop"
error_code["Alarm_Code"] = "94h"
error_code["ALARM_LED_Blinks"] = "1"
error_code["ALMCLR_Effect"] = "Clears alarm"
error_code["Protective_Function"] = "Sequence reference error"
error_code["Description"] = "Attempt to call a non-existing sequence as a subroutine"
error_code["Action"] = "Revise the CALL statement or rename the intended target sequence"
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "Motion and sequence execution stop"
error_code["Alarm_Code"] = "98h"
error_code["ALARM_LED_Blinks"] = "1"
error_code["ALMCLR_Effect"] = "Clears alarm"
error_code["Protective_Function"] = "Calculation overflow"
error_code["Description"] = "Sequence calculation result exceeded numerical limits"
error_code["Action"] = "Check math operators, make sure they cannot overflow"
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "Motion and sequence execution stop"
error_code["Alarm_Code"] = "99h"
error_code["ALARM_LED_Blinks"] = "1"
error_code["ALMCLR_Effect"] = "Clears alarm"
error_code["Protective_Function"] = "Parameter range error"
error_code["Description"] = "Attempt to set a parameter to a value outside its range"
error_code["Action"] = "Make sure all assignments stay within defined limits"
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "Motion and sequence execution stop"
error_code["Alarm_Code"] = "9Ah"
error_code["ALARM_LED_Blinks"] = "1"
error_code["ALMCLR_Effect"] = "Clears alarm"
error_code["Protective_Function"] = "Zero division"
error_code["Description"] = "Attempt to divide by zero"
error_code["Action"] = "Check division operations, test divisor for zero before division"
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "Motion and sequence execution stop"
error_code["Alarm_Code"] = "9Dh"
error_code["ALARM_LED_Blinks"] = "1"
error_code["ALMCLR_Effect"] = "Clears alarm"
error_code["Protective_Function"] = "PC command execution error"
error_code["Description"] = "Attempt to modify position counter PC while a motion was in process"
error_code["Action"] = "Make sure that PC is only changed when motor is stopped"
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "Motion and sequence execution stop"
error_code["Alarm_Code"] = "9Eh"
error_code["ALARM_LED_Blinks"] = "1"
error_code["ALMCLR_Effect"] = "Clears alarm"
error_code["Protective_Function"] = "User variable reference error"
error_code["Description"] = "Attempt to access a non-existing user-defined variable"
error_code["Action"] = "Make sure the target user-defined variable exists: use the correct name in sequence"
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "Motion and sequence execution stop"
error_code["Alarm_Code"] = "9Fh"
error_code["ALARM_LED_Blinks"] = "1"
error_code["ALMCLR_Effect"] = "Clears alarm"
error_code["Protective_Function"] = "Parameter write error"
error_code["Description"] = "Attempt to change a parameter under invalid condition"
error_code["Action"] = "Check if you tried to write a parameter, which is not allowed to write to, during operation."
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "Motion and sequence execution stop"
error_code["Alarm_Code"] = "A0h"
error_code["ALARM_LED_Blinks"] = "1"
error_code["ALMCLR_Effect"] = "Clears alarm"
error_code["Protective_Function"] = "Motion while in motion"
error_code["Description"] = "Attempt to execute a motion while an incompatible motion is in progress"
error_code["Action"] = "Make sure motions are not started before a previous motion is complete. Use MEND, poll SIGMOVE, or monitor the MOVE output to detect motion complete."
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "Motion and sequence execution stop"
error_code["Alarm_Code"] = "E0h"
error_code["ALARM_LED_Blinks"] = "1"
error_code["ALMCLR_Effect"] = "Clears alarm"
error_code["Protective_Function"] = "User alarm"
error_code["Description"] = "ALMSET command intentionally executed"
error_code["Action"] = "If a user alarm was not expected, check sequence programming for inappropriate ALMSET command(s)"
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "Motion and sequence execution stop"
error_code["Alarm_Code"] = "10h"
error_code["ALARM_LED_Blinks"] = "4"
error_code["ALMCLR_Effect"] = "Clears alarm"
error_code["Protective_Function"] = "Excessive Position Deviation"
error_code["Description"] = "When performing the MEND command or mechanical home seeking operation, the END signal was not output in the time set by ENDWAIT."
error_code["Action"] = "If DEND=0, check whether the overload was occurred or ENDACT (END range) was too small. If DEND=1, check whether the driver END signal is connected, the overload was occurred or the END signal range of the driver was too small."
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "Motion and sequence execution stop"
error_code["Alarm_Code"] = "60h"
error_code["ALARM_LED_Blinks"] = "7"
error_code["ALMCLR_Effect"] = "Clears alarm"
error_code["Protective_Function"] = "LS logic error"
error_code["Description"] = "Positive and negative position limit signals on simultaneously"
error_code["Action"] = "- Check limit sensors and wiring. - Check input signal configuration. - Check the logic setting for limit sensors (OTLV): Normally open (N.O.) or Normally closed (N.C.)."
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "Motion and sequence execution stop"
error_code["Alarm_Code"] = "61h"
error_code["ALARM_LED_Blinks"] = "7"
error_code["ALMCLR_Effect"] = "Clears alarm"
error_code["Protective_Function"] = "LS connected in reverse"
error_code["Description"] = "Positive or negative position limit signal detected opposite home seeking direction"
error_code["Action"] = "- Check limit sensors and wiring. - Check input signal configuration. - Check the logic setting for limit sensors (OTLV): Normally open (N.O.) or Normally closed (N.C.)."
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "Motion and sequence execution stop"
error_code["Alarm_Code"] = "62h"
error_code["ALARM_LED_Blinks"] = "7"
error_code["ALMCLR_Effect"] = "Clears alarm"
error_code["Protective_Function"] = "HOME operation failed"
error_code["Description"] = "Unstable or unexpected position limit signal detected while seeking home position"
error_code["Action"] = "- Check limit sensors and wiring. - Check input signal configuration. - Check the logic setting for limit sensors (OTLV): Normally open (N.O.) or Normally closed (N.C.)."
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "Motion and sequence execution stop"
error_code["Alarm_Code"] = "63h"
error_code["ALARM_LED_Blinks"] = "7"
error_code["ALMCLR_Effect"] = "Clears alarm"
error_code["Protective_Function"] = "HOMELS not found"
error_code["Description"] = "No HOME input detected between position limit signals while seeking home position"
error_code["Action"] = "Check HOME sensor wiring and connections"
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "Motion and sequence execution stop"
error_code["Alarm_Code"] = "64h"
error_code["ALARM_LED_Blinks"] = "7"
error_code["ALMCLR_Effect"] = "Clears alarm"
error_code["Protective_Function"] = "TIM, SENSOR signal error"
error_code["Description"] = "TIM position or SENSOR input expected with HOME input: not found"
error_code["Action"] = "Selected mechanical home seeking operation (see HOMETYP) requires a valid SENSOR input and/or a valid TIM position while HOME input active. Make sure HOME and other required input(s) can be active at the same location."
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "Motion and sequence execution stop"
error_code["Alarm_Code"] = "6Ah"
error_code["ALARM_LED_Blinks"] = "7"
error_code["ALMCLR_Effect"] = "Clears alarm"
error_code["Protective_Function"] = "LS detected during home offset motion"
error_code["Description"] = "Positive or negative position limit signal detected while moving to OFFSET position after homing"
error_code["Action"] = "Make sure that the OFFSET distance, measured from the HOME signal position, does not trigger a limit sensor"
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "Motion and sequence execution stop"
error_code["Alarm_Code"] = "6Eh"
error_code["ALARM_LED_Blinks"] = "7"
error_code["ALMCLR_Effect"] = "Clears alarm"
error_code["Protective_Function"] = "Driver alarm"
error_code["Description"] = "Driver alarm signal is active"
error_code["Action"] = "Check the driver and see the operating manual of the driver."
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "Motion and sequence execution stop"
error_code["Alarm_Code"] = "6Fh"
error_code["ALARM_LED_Blinks"] = "7"
error_code["ALMCLR_Effect"] = "Clears alarm"
error_code["Protective_Function"] = "Driver connection error"
error_code["Description"] = "The command was canceled due to no response from the driver during executing the command or before executing the command"
error_code["Action"] = "Be sure to check if driver and the CM10/SCX11 are connected securely."
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "Motion and sequence execution stop"
error_code["Alarm_Code"] = "70h"
error_code["ALARM_LED_Blinks"] = "7"
error_code["ALMCLR_Effect"] = "Clears alarm"
error_code["Protective_Function"] = "Motion parameter error"
error_code["Description"] = "Attempt to execute motion with incompatible motion parameters"
error_code["Action"] = "- Make sure current is enabled (CURRENT=1). - Home seeking: make sure required inputs are configured. - Linked indexing: make sure all linked segments execute in the same direction."
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "Motion and sequence execution stop. Motor may or may not have holding torque, depending on ALMACT."
error_code["Alarm_Code"] = "68h"
error_code["ALARM_LED_Blinks"] = "6"
error_code["ALMCLR_Effect"] = "Clears alarm"
error_code["Protective_Function"] = "Panic stop"
error_code["Description"] = "System executed a panic stop because of a PSTOP input or command"
error_code["Action"] = "If a panic stop was unexpected: - Check PSTOP input configuration. - Check sequence programming for inappropriate PSTOP command(s)."
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "Motion and sequence execution stop. Motor may or may not have holding torque, depending on ALMACT."
error_code["Alarm_Code"] = "66h"
error_code["ALARM_LED_Blinks"] = "7"
error_code["ALMCLR_Effect"] = "Clears alarm"
error_code["Protective_Function"] = "Hardware over travel"
error_code["Description"] = "Positive or negative position limit signal detected"
error_code["Action"] = "- Check motion parameters. - Make sure home position is correct. - Check limit sensors and wiring. - Check input signal configuration. - Check the logic setting for limit sensors (OTLV): Normally open (N.O.) or Normally closed (N.C.)."
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "Motion and sequence execution stop. Motor may or may not have holding torque, depending on ALMACT."
error_code["Alarm_Code"] = "67h"
error_code["ALARM_LED_Blinks"] = "7"
error_code["ALMCLR_Effect"] = "Clears alarm"
error_code["Protective_Function"] = "Software over travel"
error_code["Description"] = "Position outside of programmed positive and negative position limits"
error_code["Action"] = "- Check motion parameters. - Check software position limits. - Make sure home position is correct."
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "The motor lacks holding torque."
error_code["Alarm_Code"] = "41h"
error_code["ALARM_LED_Blinks"] = "9"
error_code["ALMCLR_Effect"] = "No effect"
error_code["Protective_Function"] = "EEPROM error"
error_code["Description"] = "User data in non-volatile EEPROM memory is corrupt"
error_code["Action"] = "Contact Oriental Motor to arrange for inspection or repair."
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "The motor lacks holding torque."
error_code["Alarm_Code"] = "F0h"
error_code["ALARM_LED_Blinks"] = "ON"
error_code["ALMCLR_Effect"] = "No effect"
error_code["Protective_Function"] = "System error"
error_code["Description"] = "System detected unexpected internal logic state"
error_code["Action"] = "Contact Oriental Motor to arrange for inspection or repair."
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "The motor lacks holding torque."
error_code["Alarm_Code"] = "F1h"
error_code["ALARM_LED_Blinks"] = "ON"
error_code["ALMCLR_Effect"] = "No effect"
error_code["Protective_Function"] = "Memory error"
error_code["Description"] = "Internal memory access error"
error_code["Action"] = "Contact Oriental Motor to arrange for inspection or repair."
error_codes.append(error_code)
error_code = {}
error_code["Phenomenon"] = "The motor lacks holding torque."
error_code["Alarm_Code"] = "F2h"
error_code["ALARM_LED_Blinks"] = "ON"
error_code["ALMCLR_Effect"] = "No effect"
error_code["Protective_Function"] = "Sequence internal error"
error_code["Description"] = "Sequence code invalid or corrupt"
error_code["Action"] = "Contact Oriental Motor to arrange for inspection or repair."
error_codes.append(error_code)
found = False
for error_code in error_codes:
if (error_code["Alarm_Code"] == alarm_code) or (error_code["Alarm_Code"] == alarm_code+'h'):
found = True
break
return found, error_code
def _my_open_chan(self):
# --- Concrete method
# --- Overloading method according the language protocol
fid = self._fid_chan
if fid==None:
return self.ERR_CHAN_NOT_OPENED
# --- Active echo, message returned are not verbose, clear all alarms
for kaxis in range(Mountaxis.AXIS_MAX):
current_axis = self.axis[kaxis]
if current_axis == None:
continue
if current_axis.real == False:
continue
if current_axis.language_protocol == "SCX11":
cmd = self._talks[kaxis]
# --- Check echo
err, res = self.putread_chan(cmd + " ; ECHO",1)
if res!="1":
self.putread_chan(cmd + " ; ECHO 1")
# --- Check echo
err, res = self.putread_chan(cmd + " ; VERBOSE",1)
if res!="0":
self.putread_chan(cmd + " ; VERBOSE 0")
# --- Check alarms
err, res = self.putread_chan(cmd + " ; ALM",1)
#self.log.print("kaxis={} err={} res={}".format(kaxis,err,res))
#res = "00"
almclr = False
if res!="00":
found, error_code = self.error_messages(res)
almclr = True
msg = error_code["Protective_Function"]
msg += error_code["Description"]
self.log.print("ALARM {} axis {} because: {}".format(res,kaxis,msg))
if res=="66" or res=="67":
almclr = False
msg = error_code["Action"]
if almclr==False:
self.log.print("Cannot clear alarm {} axis {}. Action to do = {}".format(res,kaxis,msg))
else:
self.log.print("Clear alarm {} axis {} ({}) ".format(res,kaxis,msg))
err, res = self.putread_chan(cmd + " ; ALMCLR ; ABORT ; ALMCLR")
# --- activate the software limits LIMN and LIMP
# --- and download the absolute position from the controler
err, res = self.putread_chan(cmd + " ; HOMETYP 12 ; PC = 0 ; EHOME")
time.sleep(0.5)
err, res = self.putread_chan(cmd + " ; RUN getabs")
#err, res = self.putread_chan(cmd + " ; RUN init1")
#self.log.print("kaxis={} err={} res={}".format(kaxis,err,res))
time.sleep(0.5)
# --- First read incs because the first time it is always zero
err, res = self.putread_chan(cmd + " ; PC")
#self.log.print("kaxis={} err={} res={}".format(kaxis,err,res))
err, res = self.putread_chan(cmd + " ; TA=0.5 ; TD=0.5")
err, res = self.putread_chan(cmd + " ; N_BACKLASH -1000")
fid.flush()
time.sleep(3) # long time for the axis dec
return self.NO_ERROR
def _my_read_encs(self, incsimus):
# --- Concrete method
# --- Overloading method according the language protocol
increals = incsimus
# --- Loop over all the possible axis types
for kaxis in range(Mountaxis.AXIS_MAX):
current_axis = self.axis[kaxis]
if current_axis == None:
continue
if current_axis.real == False:
continue
# --- This axis is valid and real. We read the position
if current_axis.language_protocol == "SCX11":
cmd = self._talks[kaxis] + " ; PC"
err, res = self.putread_chan(cmd,1)
if err==self.NO_ERROR:
try:
increals[kaxis] = float(res)
except:
self.log.print("Problem reading PC, axis={} res={}".format(kaxis,res))
return increals
def _update_motion_states(self):
"""
In case of slewing started, perhaps the mount is already tracking.
This method allows to switch the ASCOM states according the mount.
"""
# --- Concrete method
# --- Overloading method according the language protocol
#
# --- Loop over all the possible axis types
axes_motion_state_reals = []
for kaxis in range(Mountaxis.AXIS_MAX):
current_axis = self.axis[kaxis]
if current_axis == None:
axes_motion_state_reals.append(Mountaxis.MOTION_STATE_UNKNOWN)
continue
if current_axis.real == False:
axes_motion_state_reals.append(Mountaxis.MOTION_STATE_UNKNOWN)
continue
# --- This axis is valid and real. We read the position
if current_axis.language_protocol == "SCX11":
cmd = self._talks[kaxis] + " ; SIGMOVE"
err, res = self.putread_chan(cmd,1)
if err==self.NO_ERROR:
try:
sigmove = int(res)
except:
self.log.print("Problem reading SIGMOVE, axis={} res={}".format(kaxis,res))
cmd = self._talks[kaxis] + " ; Z"
err, res = self.putread_chan(cmd,1)
z = -1
if err==self.NO_ERROR:
try:
z = float(res)
except:
self.log.print("Problem reading Z, axis={} res={}".format(kaxis,res))
#print("z={} sigmove={} current_axis.motion_state={}".format(z, sigmove, current_axis.motion_state))
current_axis.motion_state = Mountaxis.MOTION_STATE_UNKNOWN
if z==1 and sigmove==1:
current_axis.motion_state = Mountaxis.MOTION_STATE_SLEWING
elif z==2 and sigmove==1:
current_axis.motion_state = Mountaxis.MOTION_STATE_DRIFTING
elif z==3 and sigmove==1:
current_axis.motion_state = Mountaxis.MOTION_STATE_MOVING
elif z==0 or sigmove==0:
current_axis.motion_state = Mountaxis.MOTION_STATE_NOMOTION
cmd = self._talks[kaxis] + " ; Z=0"
err, res = self.putread_chan(cmd,1)
# ---
axes_motion_state_reals.append(current_axis.motion_state)
return axes_motion_state_reals
def _my_hadec_drift(self,hadec_speeddrift_ha_deg_per_sec:float, hadec_speeddrift_dec_deg_per_sec:float):
# --- Concrete method
# --- Overloading method according the language protocol
err = self.NO_ERROR
res = 0
ha_target = 0 # any value is correct
dec_target = 0 # any value is correct
# --- Compute incs of the target and the drifts (for any mount_type)
celb, celp, celr, dcelb, dcelp, dcelr = self.astro2cel("HADEC", ha_target, dec_target, hadec_speeddrift_ha_deg_per_sec, hadec_speeddrift_dec_deg_per_sec)
# --- Loop over all the possible axis types
for kaxis in range(Mountaxis.AXIS_MAX):
current_axis = self.axis[kaxis]
if current_axis == None:
continue
if current_axis.real == False:
continue
# --- This axis is valid and real. We read the position
if current_axis.language_protocol == "SCX11":
# === Target position and drift
if kaxis == Mountaxis.BASE:
dcel = dcelb
elif kaxis == Mountaxis.POLAR:
dcel = dcelp
elif kaxis == Mountaxis.ROTATOR:
dcel = dcelr
# === Drift Velocity inc/sec
inc_per_sec_drift = dcel * current_axis.senseinc * current_axis.inc_per_deg
#print("Inital drift={} inc/sec".format(inc_per_sec_drift))
inc_per_sec_drift *= self.mult_inc_per_sec_drift
if self.site.latitude>=0:
inc_per_sec_drift *= -1
# ---
cmd = self._talks[kaxis]
cmd += " ; MSTOP ; ABORT"
self.log.print("hadec_drift {} cmd >> {}".format(current_axis._axis_type, cmd))
err, res = self.putread_chan(cmd)
time.sleep(0.05)
if inc_per_sec_drift!=0:
cmd = self._talks[kaxis]
cmd += " ; N_DRFVEL={:.3f}".format(inc_per_sec_drift)
self.log.print("hadec_drift {} cmd >> {}".format(current_axis._axis_type, cmd))
err, res = self.putread_chan(cmd)
cmd = self._talks[kaxis]
if kaxis == Mountaxis.BASE:
cmd += " ; N_PECPHASE=0; N_PEC1VEL=0.0 ; N_PEC2VEL=0.0"
#cmd += " ; N_PECPHASE=-80; N_PEC1VEL=0.120; N_PEC2VEL=0.080"
else:
cmd += " ; N_PECPHASE=0; N_PEC1VEL=0.0 ; N_PEC2VEL=0.0"
cmd += " ; RUN drf1"
self.log.print("hadec_drift {} cmd >> {}".format(current_axis._axis_type, cmd))
err, res = self.putread_chan(cmd)
current_axis.motion_state = Mountaxis.MOTION_STATE_DRIFTING
return err, res
def _my_hadec_goto(self, ha_target, dec_target, pierside_target):
# --- Concrete method
# --- Overloading method according the language protocol
err = self.NO_ERROR
res = 0
# --- Compute incs of the target and the drifts (for any mount_type)
celb, celp, celr, dcelb, dcelp, dcelr = self.astro2cel("HADEC", ha_target, dec_target, self._hadec_speeddrift_ha_deg_per_sec, self._hadec_speeddrift_dec_deg_per_sec)
incb, incp = self.cel2enc(celb, celp, pierside_target, self.OUTPUT_SHORT, self.SAVE_NONE)
incr = 0
#print("ha_target={} dec_target={} pierside_target={}".format(ha_target,dec_target,pierside_target))
#print("celb={} celp={}".format(celb,celp))
#print("incb={} incp={}".format(incb,incp))
# --- Loop over all the possible axis types
for kaxis in range(Mountaxis.AXIS_MAX):
current_axis = self.axis[kaxis]
if current_axis == None:
continue
if current_axis.real == False:
continue
# --- This axis is valid and real. We read the position
if current_axis.language_protocol == "SCX11":
# === Target position and drift
if kaxis == Mountaxis.BASE:
inc = incb
dcel = dcelb
elif kaxis == Mountaxis.POLAR:
inc = incp
dcel = dcelp
elif kaxis == Mountaxis.ROTATOR:
inc = incr
dcel = dcelr
# === Target slew
dslw = current_axis.slew_deg_per_sec
# === Slew Velocity inc/sec
inc_per_sec_slew = dslw * current_axis.senseinc * current_axis.inc_per_deg
# === Drift Velocity inc/sec
inc_per_sec_drift = dcel * current_axis.senseinc * current_axis.inc_per_deg
#print("Inital drift={} inc/sec".format(inc_per_sec_drift))
inc_per_sec_drift *= self.mult_inc_per_sec_drift
if self.site.latitude>=0:
inc_per_sec_drift *= -1
# ---
cmd = self._talks[kaxis]
cmd += " ; MSTOP ; ABORT"
self.log.print("hadec_goto {} cmd >> {}".format(current_axis._axis_type, cmd))
err, res = self.putread_chan(cmd)
time.sleep(0.05)
if inc_per_sec_drift==0:
cmd = self._talks[kaxis]
cmd += " ; N_GOTOVEL={:.3f}".format(abs(inc_per_sec_slew))
cmd += " ; N_GOTOPOS={:.0f}".format(inc)
cmd += " ; RUN goto1"
self.log.print("hadec_goto {} cmd >> {}".format(current_axis._axis_type, cmd))
err, res = self.putread_chan(cmd)
else:
cmd = self._talks[kaxis]
cmd += " ; N_GOTOVEL={:.3f}".format(abs(inc_per_sec_slew))
cmd += " ; N_GOTOPOS={:.0f}".format(inc)
cmd += " ; N_DRFVEL={:.3f}".format(inc_per_sec_drift)
self.log.print("hadec_goto {} cmd >> {}".format(current_axis._axis_type, cmd))
err, res = self.putread_chan(cmd)
cmd = self._talks[kaxis]
if kaxis == Mountaxis.BASE:
cmd += " ; N_PECPHASE=0; N_PEC1VEL=0.0 ; N_PEC2VEL=0.0"
#cmd += " ; N_PECPHASE=-80; N_PEC1VEL=0.120; N_PEC2VEL=0.080"
else:
cmd += " ; N_PECPHASE=0; N_PEC1VEL=0.0 ; N_PEC2VEL=0.0"
cmd += " ; RUN goto_drf1"
self.log.print("hadec_goto {} cmd >> {}".format(current_axis._axis_type, cmd))
err, res = self.putread_chan(cmd)
current_axis.motion_state = Mountaxis.MOTION_STATE_SLEWING
return err, res
def _my_hadec_move(self, ha_move_deg_per_sec, dec_move_deg_per_sec):
# --- Concrete method
# --- Overloading method according the language protocol
err = self.NO_ERROR
res = 0
ha_target = 0 # any value is correct
dec_target = 0 # any value is correct
# --- Compute incs of the target and the drifts (for any mount_type)
celb, celp, celr, dcelb, dcelp, dcelr = self.astro2cel("HADEC", ha_target, dec_target, ha_move_deg_per_sec, dec_move_deg_per_sec)
# ---
# --- Loop over all the possible axis types
for kaxis in range(Mountaxis.AXIS_MAX):
current_axis = self.axis[kaxis]
if current_axis == None:
continue
if current_axis.real == False:
continue
# --- This axis is valid and real. We read the position
if current_axis.language_protocol == "SCX11":
# === Target position and drift
if kaxis == Mountaxis.BASE:
dmov = dcelb
elif kaxis == Mountaxis.POLAR:
dmov = dcelp
elif kaxis == Mountaxis.ROTATOR:
dmov = dcelr
# === Move Velocity inc/sec
inc_per_sec_move = dmov * current_axis.senseinc * current_axis.inc_per_deg
#print("Inital inc_per_sec_move={} inc/sec".format(inc_per_sec_move))
if self.site.latitude>=0:
inc_per_sec_move *= -1
# ---
cmd = self._talks[kaxis]
cmd += " ; MSTOP ; ABORT"
self.log.print("hadec_move {} cmd >> {}".format(current_axis._axis_type, cmd))
err, res = self.putread_chan(cmd)
time.sleep(0.05)
# ---
cmd = self._talks[kaxis]
#cmd += " ; Z = 0" ; # means no drift
cmd += " ; N_GOTOVEL={:.3f}".format(inc_per_sec_move)
cmd += " ; RUN gotolimit1"
self.log.print("hadec_move {} cmd >> {}".format(current_axis._axis_type, cmd))
err, res = self.putread_chan(cmd)
current_axis.motion_state = Mountaxis.MOTION_STATE_MOVING
return err, res
def _my_hadec_move_stop(self):
# --- Concrete method
# --- Overloading method according the language protocol
err = self.NO_ERROR
res = 0
ha_target = 0 # any value is correct
dec_target = 0 # any value is correct
# --- Compute incs of the target and the drifts (for any mount_type)
celb, celp, celr, dcelb, dcelp, dcelr = self.astro2cel("HADEC", ha_target, dec_target, self._hadec_speeddrift_ha_deg_per_sec, self._hadec_speeddrift_dec_deg_per_sec)
# --- Loop over all the possible axis types
for kaxis in range(Mountaxis.AXIS_MAX):
current_axis = self.axis[kaxis]
if current_axis == None:
continue
if current_axis.real == False:
continue
# --- This axis is valid and real. We read the position
if current_axis.language_protocol == "SCX11":
# === Target position and drift
if kaxis == Mountaxis.BASE:
dcel = dcelb
elif kaxis == Mountaxis.POLAR:
dcel = dcelp
elif kaxis == Mountaxis.ROTATOR:
dcel = dcelr
# === Drift Velocity inc/sec
inc_per_sec_drift = dcel * current_axis.senseinc * current_axis.inc_per_deg
#print("Inital drift={} inc/sec".format(inc_per_sec_drift))
inc_per_sec_drift *= self.mult_inc_per_sec_drift
if self.site.latitude>=0:
inc_per_sec_drift *= -1
# ---
cmd = self._talks[kaxis]
cmd += " ; MSTOP ; ABORT"
self.log.print("hadec_move_stop {} cmd >> {}".format(current_axis._axis_type, cmd))
err, res = self.putread_chan(cmd)
time.sleep(0.05)
if inc_per_sec_drift!=0:
cmd = self._talks[kaxis]
cmd += " ; N_DRFVEL={:.3f}".format(inc_per_sec_drift)
self.log.print("hadec_move_stop {} cmd >> {}".format(current_axis._axis_type, cmd))
err, res = self.putread_chan(cmd)
cmd = self._talks[kaxis]
if kaxis == Mountaxis.BASE:
cmd += " ; N_PECPHASE=0; N_PEC1VEL=0.0 ; N_PEC2VEL=0.0"
#cmd += " ; N_PECPHASE=-80; N_PEC1VEL=0.120; N_PEC2VEL=0.080"
else:
cmd += " ; N_PECPHASE=0; N_PEC1VEL=0.0 ; N_PEC2VEL=0.0"
cmd += " ; RUN drf1"
current_axis.motion_state = Mountaxis.MOTION_STATE_DRIFTING
else:
current_axis.motion_state = Mountaxis.MOTION_STATE_NOMOTION
self.log.print("hadec_move_stop {} cmd >> {}".format(current_axis._axis_type, cmd))
err, res = self.putread_chan(cmd)
return err, res
def _my_hadec_stop(self):
# --- Concrete method
# --- Overloading method according the language protocol
err = self.NO_ERROR
res = 0
# --- Loop over all the possible axis types
for kaxis in range(Mountaxis.AXIS_MAX):
current_axis = self.axis[kaxis]
if current_axis == None:
continue
if current_axis.real == False:
continue
# --- This axis is valid and real. We read the position
if current_axis.language_protocol == "SCX11":
# ---
cmd = self._talks[kaxis]
cmd += " ; MSTOP" # or ABORT at high speed ?
err, res = self.putread_chan(cmd)
current_axis.motion_state = Mountaxis.MOTION_STATE_NOMOTION
return err, res
def _my_inc_goto(self, axis_id:int, inc:float, inc_per_sec_slew:float):
# --- Concrete method
# --- Overloading method according the language protocol
cmd = ""
err = self.NO_ERROR
res = 0
kaxis = axis_id
current_axis = self.axis[kaxis]
if current_axis.real==True:
cmd = self._talks[kaxis]
if cmd != "":
cmd += " ; MSTOP ; ABORT ; VR={:.3f}".format(inc_per_sec_slew)
cmd += " ; TA=0.5 ; TD=0.5 ; MA {:.0f}".format(inc)
self.log.print("inc_goto axis={} cmd >> {}".format(axis_id,cmd))
err, res = self.putread_chan(cmd)
return err, res
def _my_goto_park(self, incb:float, incp:float, incr:float):
# --- Concrete method
# --- Overloading method according the language protocol
err = self.NO_ERROR
res = 0
self.hadec_goto(self.park_ha,self.park_dec,side=self.park_side)
return err, res
[docs] def hadec_travel(self):
"""
Exemple of using RUN lutpos1
"""
dposs = np.zeros(20)
vels = np.zeros(20)
kaxis = 0
cmd = self._talks[kaxis] + " ; PC"
err, res = self.putread_chan(cmd,1)
pc0 = float(res)
dposs = np.linspace(1000,100000,20)
poss = pc0 + dposs
vels += 1000
n = len(dposs)
for k in range(n):
v = 1000 + (10000-1000)*math.exp(-(k-n/2.)*(k-n/2.)/12.0)
vels[k] = v
#return poss, vels
# --
k = 0
cmd = self._talks[kaxis]
cmd += " ; A={:.3f}".format(abs(vels[k]))
cmd += " ; B={:.3f}".format(abs(poss[k]))
self.log.print("hadec_travel cmd >> {}".format(cmd))
err, res = self.putread_chan(cmd)
# --
k = 1
cmd = self._talks[kaxis]
cmd += " ; C={:.3f}".format(abs(vels[k]))
cmd += " ; D={:.3f}".format(abs(poss[k]))
self.log.print("hadec_travel cmd >> {}".format(cmd))
err, res = self.putread_chan(cmd)
# --
k = 2
cmd = self._talks[kaxis]
cmd += " ; E={:.3f}".format(abs(vels[k]))
cmd += " ; F={:.3f}".format(abs(poss[k]))
self.log.print("hadec_travel cmd >> {}".format(cmd))
err, res = self.putread_chan(cmd)
# --
k = 3
cmd = self._talks[kaxis]
cmd += " ; G={:.3f}".format(abs(vels[k]))
cmd += " ; H={:.3f}".format(abs(poss[k]))
self.log.print("hadec_travel cmd >> {}".format(cmd))
err, res = self.putread_chan(cmd)
# --
cmd = self._talks[kaxis]
cmd += " ; run lutpos1"
err, res = self.putread_chan(cmd)
# --
cmd = self._talks[kaxis]
cmd += " ; Y"
err, res = self.putread_chan(cmd,1)
y0 = float(res)
y = y0
self.log.print("hadec_travel y0={}".format(y0))
# --
n = len(poss)
for k in range(4,n):
cmd = self._talks[kaxis] + " ; PC"
err, res = self.putread_chan(cmd,1)
pc = float(res)
self.log.print("hadec_travel k={} y={} pc={}".format(k,y,pc))
while True:
# --
cmd = self._talks[kaxis]
cmd += " ; Y"
err, res = self.putread_chan(cmd,1)
y = float(res)
if (y!=y0):
break
y0 = y
self.log.print("hadec_travel k={} y={}".format(k,y))
cmd = self._talks[kaxis]
if (y==2):
cmd += " ; A={:.3f}".format(abs(vels[k]))
cmd += " ; B={:.3f}".format(abs(poss[k]))
elif (y==3):
cmd += " ; C={:.3f}".format(abs(vels[k]))
cmd += " ; D={:.3f}".format(abs(poss[k]))
elif (y==4):
cmd += " ; E={:.3f}".format(abs(vels[k]))
cmd += " ; F={:.3f}".format(abs(poss[k]))
elif (y==1):
cmd += " ; G={:.3f}".format(abs(vels[k]))
cmd += " ; H={:.3f}".format(abs(poss[k]))
self.log.print("hadec_travel cmd >> {}".format(cmd))
err, res = self.putread_chan(cmd)
kaxis = 0
cmd = self._talks[kaxis] + " ; SSTOP ; ABORT"
err, res = self.putread_chan(cmd,1)
self.log.print("hadec_travel Finished")
def tachymeter_read_enc(self, axis_id:int):
cmd = ""
err = self.NO_ERROR
res = 0
kaxis = axis_id
current_axis = self.axis[kaxis]
if current_axis.real==True:
cmd = self._talks[kaxis]
if cmd != "":
cmd += " ; PC"
err, res = self.putread_chan(cmd,1)
return err, res
def tachymeter_check_vrs(self, duration_sec:float, vrmin:float, vrmax:float, dvr:float):
duration_sec0 = duration_sec
axis_id = Mountaxis.BASE
current_axis = self.axis[axis_id]
vr = vrmin
while vr <= vrmax:
deg_per_sec = 1.0*vr/(current_axis.senseinc * current_axis.inc_per_deg)
arcsec_per_sec = deg_per_sec*3600
duration_sec = duration_sec0
deg = deg_per_sec * duration_sec
if deg>120:
duration_sec /= 2
v, dv, ratio = self.tachymeter_check_vr(duration_sec, vr)
msg = "==> VR={:.3f} Vobs={:.4f} +/- {:.4f} ratio={:.5f} drift={:.3f}".format(vr,v,dv,ratio,arcsec_per_sec)
self.log.print(msg)
vr +=dvr
def tachymeter_check_vr(self, duration_sec:float, vr:float=""):
axis_id = Mountaxis.BASE
# ---
if vr=="":
cmd = self._talks[axis_id] + " ; VR"
err, res = self.putread_chan(cmd,1)
if err==self.NO_ERROR:
vr = float(res)
else:
vr = 0
#self.log.print("vr = {}".format(vr))
# ---
if False:
# --- methode classique
motion = "MCN"
# ---
cmd = "MSTOP".format(vr,motion)
err, res = self.putread_chan(cmd,1)
time.sleep(1)
cmd = "VR={:.3f} ; {}".format(vr,motion)
err, res = self.putread_chan(cmd,1)
else:
# --- methode en alternance
inc_per_sec_drift = vr
#period = 10.0 # sec
#dpr = self.axis[axis_id].inc_per_motor_rev/10000
#fdpr = round((1.0+dpr),3)-1.0
#m = inc_per_sec_drift/fdpr
#print("Corrected drift={} inc/sec dpr={} fdpr={} m={}".format(inc_per_sec_drift,dpr,fdpr,m))
#mint1 = math.floor(m)
#cycrat = m - mint1
#vr1 = (mint1+0.5)*fdpr
#p1 = (1-cycrat)*period
#mint2 = mint1+1
#vr2 = (mint2+0.5)*fdpr
#p2 = cycrat*period
#print("C={:.3f}({:.3f}) D={:.3f} E={:.3f}({:.3f}) F={:.3f}".format(vr1,mint1*fdpr,p1,vr2,mint2*fdpr,p2))
# ---
cmd = self._talks[axis_id]
cmd += " ; MSTOP ; ABORT"
err, res = self.putread_chan(cmd)
time.sleep(0.1)
cmd = self._talks[axis_id]
cmd += " ; N_DRFVEL={:.3f}".format(inc_per_sec_drift)
err, res = self.putread_chan(cmd)
cmd = self._talks[axis_id]
cmd += " ; TA = 0.5 ; TD=0.5 ; RUN drf1"
err, res = self.putread_chan(cmd)
time.sleep(2)
# ---
v_av0 = 0
v_av = 0
t0 = time.time()
ts = []
incs = []
dincs = []
while True:
t = time.time()
dt_sec = (t-t0)
if dt_sec > duration_sec:
break
# ---
err, inc = self.tachymeter_read_enc(axis_id)
inc = float(inc)
if incs==[]:
inc0 = inc
dinc = inc - inc0
ts.append(dt_sec)
incs.append(inc)
dincs.append(dinc)
# ---
if len(incs)>=2:
# --- instantaneous
dinc2 = dincs[-1]
dinc1 = dincs[-2]
t2 = ts[-1]
t1 = ts[-2]
dinc = dinc2-dinc1 # inc
dt = t2-t1 # sec
if len(incs)>=3:
# --- averaged
dinc2 = dincs[-1]
dinc1 = dincs[1]
t2 = ts[-1]
t1 = ts[1]
dinc = dinc2-dinc1 # inc
dt = t2-t1 # sec
v_av = dinc/dt # inc/sec
else:
v_av = 0
dv_av = v_av - v_av0
# ---
#ligne = "{} {} {}".format(dt_sec,v,v_av)
#with open("sideral_positions.txt","a",encoding='utf-8') as fid:
# fid.write(ligne+"\n")
# ---
#self.log.print("dt_sec={:.1f} v={:.4f} v_av={:.4f} dv_av={:.4f}".format(dt_sec,v,v_av,dv_av))
time.sleep(5.0)
v_av0 = v_av
# ---
cmd = "SSTOP"
err, res = self.putread_chan(cmd,1)
# ---
# ===
if v_av==0:
ratio = 1
else:
ratio = vr/v_av
return v_av, abs(dv_av), ratio
def tachymeter_check_sideral(self, duration_min, mult=""):
self.log.print("calib_sideral_duration {} minutes".format(duration_min))
# ---
if mult=="":
mult = self.mult_inc_per_sec_drift
self.mult_inc_per_sec_drift = mult
self.log.print("mult = {}".format(mult))
# ---
angle = str(self.site.longitude)
date = celme.Date("now")
jd = date.jd() + 3./24
longuai = celme.Angle(angle).rad()
meca = celme.Mechanics()
lst = meca._mc_tsl(jd,-longuai)
ang = celme.Angle(str(lst)+"r")
lst_hms = ang.sexagesimal("H0.0")
ra = lst_hms
dec = 0
self.log.print("Pointing {} {}".format(ra,dec))
self.hadec_speeddrift("diurnal",0)
self.radec_goto(ra, dec, blocking=False)
time.sleep(10.0)
self.log.print("End of pointing")
#diurnal_drift = 360*3600/self._sideral_sec_per_day # 15.0410844 arcsec/sec
t0 = time.time()
ts = []
ras = []
dras = []
while True:
t = time.time()
dt_sec = (t-t0)
dt_min = dt_sec/60.
if dt_min > duration_min:
self.log.print("exit after dt_min={}".format(dt_min))
break
# ---
ra, dec, side = self.radec_coord()
ra_deg = celme.Angle(ra).deg()
if ras==[]:
ra_deg0 = ra_deg
dra_deg = ra_deg - ra_deg0
ts.append(dt_sec)
ras.append(ra_deg)
dras.append(dra_deg*3600)
# ---
if len(ras)>=2:
# --- instantaneous
dra2 = dras[-1]
dra1 = dras[-2]
t2 = ts[-1]
t1 = ts[-2]
dra = dra2-dra1 # arcsec
dt = t2-t1 # sec
drift = dra/dt # arcsec/sec
else:
drift = 0
if len(ras)>=3:
# --- averaged
dra2 = dras[-1]
dra1 = dras[1]
t2 = ts[-1]
t1 = ts[1]
dra = dra2-dra1 # arcsec
dt = t2-t1 # sec
drift_av = dra/dt # arcsec/sec
else:
drift_av = 0
# ---
ligne = "{} {} {}".format(dt_sec,dra_deg*3600,ra_deg)
with open("sideral_positions.txt","a",encoding='utf-8') as fid:
fid.write(ligne+"\n")
# ---
self.log.print("dt_sec={:.1f} dra={:.2f} drift={:.2f} arcsec/sec av={:2f}".format(dt_sec,dra_deg*3600,drift,drift_av))
time.sleep(5.0)
# ===
xs = np.array(ts)
ys = np.array(dras)
fig = plt.figure()
ax = fig.add_subplot(1,1,1)
h = ax.plot(xs,ys,"r-")
h[0].set_linewidth(1.5)
plt.title("Derive during sideral drift mult={}".format(mult))
plt.xlabel("time (sec)")
plt.ylabel("R.A. offset (arcsec)")
plt.grid(True)
plt.show()
outfile = "sideral_positions.png"
plt.savefig(outfile, facecolor='w', edgecolor='w')
self.hadec_stop()
# =====================================================================
# =====================================================================
# Methods for experimented users (debug, etc)
# =====================================================================
# =====================================================================
def putread(self, axis_id:int, msg, index=-1, disp=True):
cmd = ""
err = self.NO_ERROR
res = 0
kaxis = axis_id
current_axis = self.axis[kaxis]
if current_axis.real==True:
cmd = self._talks[kaxis]
if cmd != "":
cmd += " ; {}".format(msg)
if disp==True:
self.log.print("putread {} cmd >> {}".format(axis_id,cmd))
err, res = self.putread_chan(cmd, index)
return err, res
# =====================================================================
# =====================================================================
# Methods for users
# =====================================================================
# =====================================================================
# =====================================================================
# =====================================================================
# Special methods
# =====================================================================
# =====================================================================
def __init__(self, *args, **kwargs):
"""
Conversion from Uniform Python object into protocol language
Usage : Mountastro("HADEC", name="SCX11")
"""
# === Decode params
# --- Use the __init__ of the parent class Mountastro
super(Mountastro_Astromecca,self).__init__(*args, **kwargs)
# --- Special params for this mount
# --- Dicos of optional and mandatory parameters
params_optional = {}
# --- special SCX11 for TALK function
params_optional["CONTROLLER_BASE_ID"] = (int,1)
params_optional["CONTROLLER_POLAR_ID"] = (int,2)
params_optional["CONTROLLER_ROT_ID"] = (int,3)
params_mandatory = {}
# --- Decode parameters
params = self.decode_kwargs(params_optional, params_mandatory, **kwargs)
# --- Add the personnal dict to the general one
self._mount_params.update(params)
# === Configure according params
# --- talks are string that are prefix of SCX11 commands to switch in the dedicaded controler in daisy chain
self._talks = ["" for kaxis in range(Mountaxis.AXIS_MAX)]
self._talks[Mountaxis.BASE] = "TALK"+str(self._mount_params["CONTROLLER_BASE_ID"])
self._talks[Mountaxis.POLAR] = "TALK"+str(self._mount_params["CONTROLLER_POLAR_ID"])
self._talks[Mountaxis.ROT] = "TALK"+str(self._mount_params["CONTROLLER_ROT_ID"])
# --- shortcuts
axisb = self.axis[Mountaxis.BASE]
axisp = self.axis[Mountaxis.POLAR]
axisr = self.axis[Mountaxis.ROT]
# --- initialize
if axisb!=None:
axisb.real = True
axisb.ratio_wheel_puley = 6.29870
axisb.inc_per_motor_rev = 1540 # IMC parameter. System Confg -> System Parameters - Distance/Revolution
axisb.ratio_puley_motor = 100 # harmonic reducer
axisb.senseinc = -1
axisb.slew_deg_per_sec = 10
axisb.update_inc0(0,-90,axisb.PIERSIDE_POS1)
axisb.language_protocol = "SCX11"
if axisp!=None:
axisp.real = False
axisp.ratio_wheel_puley = 6.4262
axisp.inc_per_motor_rev = 1525
axisp.ratio_puley_motor = 100 # harmonic reducer
axisp.senseinc = -1
axisp.slew_deg_per_sec = 10
axisp.update_inc0(0,90,axisp.PIERSIDE_POS1)
axisp.language_protocol = "SCX11"
if axisr!=None:
axisr.real = False
axisr.ratio_wheel_puley = 6.4262
axisr.inc_per_motor_rev = 1525
axisp.ratio_puley_motor = 100
axisr.senseinc = -1
axisr.slew_deg_per_sec = 10
axisr.update_inc0(0,90,axisr.PIERSIDE_POS1)
axisr.language_protocol = "SCX11"
# ---
self.mult_inc_per_sec_drift = 1.017 * 0.9914 ; # 0.9984 # 1.00517
self.mult_inc_per_sec_drift = 1.0082+0.0082*1.4
def __del__(self):
try:
self.close_chan()
except:
pass
# #####################################################################
# #####################################################################
# #####################################################################
# Main
# #####################################################################
# #####################################################################
# #####################################################################
if __name__ == "__main__":
cwd = os.getcwd()
hostname = socket.gethostname()
example = 1
print("Example = {}".format(example))
if 'mount_astromecca' in globals():
del(mount_astromecca)
if 'eqmod_chan' in globals():
del(eqmod_chan)
#try:
# mount_astromecca.close_chan()
#except:
# pass
#try:
# eqmod_chan.close_chan()
#except:
# pass
# --- configuration depending the computer
if hostname == "titanium":
print("Configuration = {}".format(hostname))
port_serial_scx11='//./com6' ; # '//./com4'
port_serial_eqmod='//./com1'
elif hostname == "rapido2":
print("Configuration = {}".format(hostname))
port_serial_scx11='/dev/ttyAMA0'
port_serial_eqmod='/dev/ttyAMA1'
else:
print("Attention, pas de configuration pour {}".format(hostname))
port_serial_scx11='//./com1'
port_serial_eqmod='//./com2'
print("port_scx11 = {}".format(port_serial_scx11))
print("port_eqmod = {}".format(port_serial_eqmod))
if example == 1:
"""
Basic example
"""
home = celme.Home("GPS 2.0375 E 43.6443484725 136.9")
site = celme.Site(home)
# === ASTROMECCA connection
mount_astromecca = Mountastro_Astromecca("HADEC", name="Guitalens Mount", manufacturer="Astro MECCA", model="TM350", serial_number="beta001", site=site, CONTROLLER_BASE_ID=1, CONTROLLER_POLAR_ID=2)
mount_astromecca.set_channel_params("SERIAL", port=port_serial_scx11, baud_rate=115200, delay_init_chan=0.1, end_of_command_to_send="\r\n".encode('utf8'), end_of_command_to_receive="\r\n".encode('utf8'), delay_put_read=0.06)
mount_astromecca.verbose_chan = False
# --- shortcuts
mount_astromecca_axisb = mount_astromecca.axis[Mountaxis.BASE]
mount_astromecca_axisp = mount_astromecca.axis[Mountaxis.POLAR]
# --- simulation or not
mount_astromecca_axisb.real = False
mount_astromecca_axisb.ratio_wheel_puley = 6.132857; # 6.27819 ; # 6.32721 ; # D=208.0 ; d=32.5 ; f=1.5 ; (D+f/2)/(d+f/2)
mount_astromecca_axisb.inc_per_motor_rev = 1540 # DPR for -490000 to +490000
mount_astromecca_axisb.senseinc = 1
mount_astromecca_axisp.real = False
mount_astromecca_axisp.ratio_wheel_puley = 6.75 ; # 6.75 ; # 6.7462935 ; # D=208.0 ; d=30.0 ; f=1.5 ; (D+f/2)/(d+f/2)
mount_astromecca_axisp.inc_per_motor_rev = 1421
mount_astromecca_axisp.senseinc = -1
# --- Initial ha,dec for encoders
#mount_astromecca_axisb.update_inc0(10750,-90,mount_astromecca_axisb.PIERSIDE_POS1)
mount_astromecca.set_param("CONFIGURATION","Fork")
if mount_astromecca.get_param("CONFIGURATION")=="German":
# --- German mount
mount_astromecca.set_param("LABEL_REGULAR","Tube West") ; # Tube west = PIERSIDE_POS1
mount_astromecca.set_param("LABEL_FLIPED","Tube East")
mount_astromecca.set_param("CAN_REVERSE",True)
mount_astromecca.set_param("LIME_REVERSE",+30) ; # Tube west = PIERSIDE_POS1 = [-180 : lim_side_east]
mount_astromecca.set_param("LIMW_REVERSE",-30) ; # Tube east = PIERSIDE_POS2 = [lim_side_west : +180]
mount_astromecca_axisb.update_inc0(0,-90,mount_astromecca_axisb.PIERSIDE_POS1)
mount_astromecca_axisp.update_inc0(0,90,mount_astromecca_axisp.PIERSIDE_POS1)
if mount_astromecca_axisb.real == True:
mount_astromecca_axisb.update_inc0(62500,-90,mount_astromecca_axisb.PIERSIDE_POS1)
if mount_astromecca_axisp.real == True:
mount_astromecca_axisp.update_inc0(6500,90,mount_astromecca_axisp.PIERSIDE_POS1)
mount_astromecca.park_ha = 270
mount_astromecca.park_dec = 90
mount_astromecca.park_side = mount_astromecca_axisb.PIERSIDE_POS1
elif mount_astromecca.get_param("CONFIGURATION")=="Fork":
# --- Fork mount. Tube always "west" in "auto"
mount_astromecca.set_param("LABEL_REGULAR","Regular") ; # Regular = PIERSIDE_POS1
mount_astromecca.set_param("LABEL_FLIPED","Fliped")
mount_astromecca.set_param("CAN_REVERSE",False)
mount_astromecca.set_param("LIME_REVERSE",+90) ; # Tube west = PIERSIDE_POS1 = [-180 : lim_side_east]
mount_astromecca.set_param("LIMW_REVERSE",-90) ; # Tube east = PIERSIDE_POS2 = [lim_side_west : +180]
mount_astromecca_axisb.update_inc0(0,0,mount_astromecca_axisb.PIERSIDE_POS1)
mount_astromecca_axisp.update_inc0(0,90,mount_astromecca_axisp.PIERSIDE_POS1)
if mount_astromecca_axisb.real == True:
mount_astromecca_axisb.update_inc0(62500,0,mount_astromecca_axisb.PIERSIDE_POS1)
if mount_astromecca_axisp.real == True:
mount_astromecca_axisp.update_inc0(6500,90,mount_astromecca_axisp.PIERSIDE_POS1)
else:
mount_astromecca_axisp._incsimu = -239793.8
mount_astromecca.park_ha = 0
mount_astromecca.park_dec = 0
mount_astromecca.park_side = mount_astromecca_axisb.PIERSIDE_POS1
# --- first read of encoders (zero values the first time)
incsimus = ["" for kaxis in range(Mountaxis.AXIS_MAX)]
mount_astromecca.enc2cel(incsimus, save=mount_astromecca.SAVE_ALL)
# --- second read of encoders (valid values)
time.sleep(0.05)
mount_astromecca.enc2cel(incsimus, save=mount_astromecca.SAVE_ALL)
# --- Init the simulation values according the real ones
mount_astromecca_axisb.synchro_real2simu()
# --- Get the initial position
res = mount_astromecca.hadec_coord()
mount_astromecca.log.print("Initial position = {}".format(res))
# ======= Controller
mount_astromecca.speedslew(10.0,10.0)
t0 = time.time()
mount_astromecca.disp()
dt = time.time()-t0
if True:
try:
mount_astromecca.pad_create("pad_dev1")
except (KeyboardInterrupt, SystemExit):
pass
except:
raise
if example == 2:
"""
EQMOD exploration
"""
home = celme.Home("GPS 2.25 E 43.567 148")
site = celme.Site(home)
# === SCX11 connection
mount_astromecca = Mountastro_Astromecca("HADEC", name="Oriental Motor", manufacturer="Astro MECCA", model="TM350", serial_number="beta001", site=site, CONTROLLER_BASE_ID=1, CONTROLLER_POLAR_ID=2)
mount_astromecca.set_channel_params("SERIAL", port=port_serial_scx11, baud_rate=115200, delay_init_chan=0.1, end_of_command_to_send="\r\n".encode('utf8'), end_of_command_to_receive="\r\n".encode('utf8'), delay_put_read=0.06)
mount_astromecca.verbose_chan = False
# --- shortcuts
mount_astromecca_axisb = mount_astromecca.axis[Mountaxis.BASE]
mount_astromecca_axisp = mount_astromecca.axis[Mountaxis.POLAR]
# --- simulation or not
mount_astromecca_axisb.real = True
mount_astromecca_axisb.ratio_wheel_puley = 6.2
mount_astromecca_axisb.inc_per_motor_rev = 1900
mount_astromecca_axisb.senseinc = -1
mount_astromecca_axisp.real = False
mount_astromecca_axisp.ratio_wheel_puley = 5.25
mount_astromecca_axisp.inc_per_motor_rev = 1900
mount_astromecca_axisp.senseinc = -1
# --- Initial ha,dec for encoders
mount_astromecca_axisb.update_inc0(0,-90,mount_astromecca.axisb.PIERSIDE_POS1)
mount_astromecca_axisp.update_inc0(0,90,mount_astromecca.axisp.PIERSIDE_POS1)
# --- first read of encoders (zero values the first time)
incsimus = ["" for kaxis in range(Mountaxis.AXIS_MAX)]
mount_astromecca.enc2cel(incsimus, save=mount_astromecca.SAVE_ALL)
# --- second read of encoders (valid values)
time.sleep(0.05)
mount_astromecca.enc2cel(incsimus, save=mount_astromecca.SAVE_ALL)
# --- Init the simulation values according the real ones
mount_astromecca_axisb.synchro_real2simu()
# --- Get the initial position
res = mount_astromecca.hadec_coord()
mount_astromecca.log.print("Initial position = {}".format(res))
mount_astromecca.disp()
# === EQMOD simulator
mount_eqmod = Mountastro("HADEC", name="EQMOD", manufacturer="EQMOD", model="EQ 6", site=site)
# --- shortcuts
mount_eqmod_axisb = mount_eqmod.axis[Mountaxis.BASE]
mount_eqmod_axisp = mount_eqmod.axis[Mountaxis.POLAR]
# --- default values to simulate a EQ6 mount
a = 9024000 ; # microsteps / 360° : Number of microsteps for one turn over the sky
b = 64935 ; # (microsteps2 / sec) : Velocity parameter (i) = (1|g) * (b) / speedtrack(deg/s) / ((a)/360)
d = 8388608 ; # (microsteps) : initial position (j) when the mount is just switched on
s = 50133 ; # (microsteps) : Microsteps to a complete turnover of worm
inc_per_sky_rev = a
ratio_puley_motor = 1
inc_per_motor_rev = s
ratio_wheel_puley = inc_per_sky_rev/(ratio_puley_motor*inc_per_motor_rev)
# ---
mount_eqmod_axisb.ratio_wheel_puley = ratio_wheel_puley
mount_eqmod_axisb.ratio_puley_motor = ratio_puley_motor
mount_eqmod_axisb.inc_per_motor_rev = inc_per_motor_rev
mount_eqmod_axisb.update_inc0(d,-90,mount_eqmod_axisb.PIERSIDE_POS1)
# ---
mount_eqmod_axisp.ratio_wheel_puley = ratio_wheel_puley
mount_eqmod_axisp.ratio_puley_motor = ratio_puley_motor
mount_eqmod_axisp.inc_per_motor_rev = inc_per_motor_rev
mount_eqmod_axisp.update_inc0(d+a/4,90,mount_eqmod_axisp.PIERSIDE_POS1)
# ---
eqmod_chan = Mountchannel("SERIAL", port=port_serial_eqmod, baud_rate=9600, delay_init_chan=0.1, end_of_command_to_send="\r".encode('utf8'), end_of_command_to_receive="\r".encode('utf8'), delay_put_read=0.06)
mount_eqmod.verbose_chan = False
# --- A useful class for EQMOD
eqmod = Mountutils_eqmod()
# --- A useful def for interactions between SCX11 and EQMOD
def eqmod_update_coord():
res = mount_astromecca.hadec_coord()
ha, dec, pierside = res
res = mount_eqmod.hadec_hadec2enc(ha, dec, pierside, output_format=mount_eqmod.OUTPUT_LONG, save=mount_eqmod.SAVE_AS_SIMU)
return res
# --- update the EQMOD coord
eqmod_update_coord()
mount_eqmod.disp()
# ======= EQMOD
a1 = a
a2 = a
b1 = b
b2 = b
d1 = d
d2 = d
j1max = 16777216
j2max = j1max
s1 = s
s2 = s
# --- prepare the loop
lastG1 = "00"
lastG2 = "00"
lastI1 = 0
lastI2 = 0
lastH1 = 0
lastH2 = 0
lastM1 = 0
lastM2 = 0
#
motion_type1, motion_sense1 = eqmod.decode_G(lastG1)
motion_type2, motion_sense2 = eqmod.decode_G(lastG2)
if mount_astromecca.axisb.real == True:
current_motion1 = eqmod.MOTION_JOG_FAST_POSITIVE
else:
current_motion1 = eqmod.MOTION_STOPPED_POSITIVE
if mount_astromecca.axisp.real == True:
current_motion2 = eqmod.MOTION_JOG_FAST_POSITIVE
else:
current_motion2 = eqmod.MOTION_STOPPED_POSITIVE
current_motor1 = eqmod.MOTOR_ON
current_motor2 = eqmod.MOTOR_ON
# ---
E1_e1_target = -1
try:
mount_eqmod.pad_create("pad_dev1")
# --- loop to be a server
t0 = time.time()
mount_astromecca.log.print("Enter in the loop")
while True:
# --- update from SCX11
ha_sigmove = 0
dec_sigmove = 0
if mount_astromecca_axisb.real == True:
err, ha_sigmove = mount_astromecca.putread(Mountaxis.BASE,"SIGMOVE",1,False)
ha_sigmove = int(ha_sigmove)
else:
if mount_astromecca_axisb._simu_signal_move==0:
ha_sigmove = 0
else:
ha_sigmove = 1
if mount_astromecca_axisp.real == True:
err, dec_sigmove = mount_astromecca.putread(Mountaxis.POLAR,"SIGMOVE",1,False)
dec_sigmove = int(dec_sigmove)
else:
if mount_astromecca_axisp._simu_signal_move==0:
dec_sigmove = 0
else:
dec_sigmove = 1
# --- Update the current motion and moving status
if ha_sigmove==0:
current_moving1 = eqmod.MOTION_STOPPED
if current_motion1==eqmod.MOTION_JOG_FAST_POSITIVE or current_motion1==eqmod.MOTION_JOG_SLOW_POSITIVE:
current_motion1 = eqmod.MOTION_STOPPED_POSITIVE
elif current_motion1==eqmod.MOTION_JOG_FAST_NEGATIVE or current_motion1==eqmod.MOTION_JOG_SLOW_NEGATIVE:
current_motion1 = eqmod.MOTION_STOPPED_NEGATIVE
if dec_sigmove==0:
current_moving2 = eqmod.MOTION_STOPPED
if current_motion2==eqmod.MOTION_JOG_FAST_POSITIVE or current_motion2==eqmod.MOTION_JOG_SLOW_POSITIVE:
current_motion2 = eqmod.MOTION_STOPPED_POSITIVE
elif current_motion2==eqmod.MOTION_JOG_FAST_NEGATIVE or current_motion2==eqmod.MOTION_JOG_SLOW_NEGATIVE:
current_motion2 = eqmod.MOTION_STOPPED_NEGATIVE
#print("SIGMOVE motion_type1={} current_motion1={} current_moving1={} current_motor1={}".format(motion_type1,current_motion1,current_moving1,current_motor1))
#print("SIGMOVE motion_type2={} current_motion2={} current_moving2={} current_motor2={}".format(motion_type2,current_motion2,current_moving2,current_motor2))
# --- update f1 and f2
f1 = eqmod.encode_f(motion_type1,current_motion1,current_moving1,current_motor1)
f2 = eqmod.encode_f(motion_type2,current_motion2,current_moving2,current_motor2)
# (none) Fast tracking speed multiplier (set with :Gm3...)
g1 = "10"
g2 = "10"
# --- update EQMOD from the current position of SCX11
eqmod_update_coord()
incb = mount_astromecca_axisb.inc
incp = mount_astromecca_axisp.inc
j1 = mount_eqmod_axisb.incsimu
j2 = mount_eqmod_axisp.incsimu
# --- read the EQMOD commands
lignes = ""
try:
err, lignes = eqmod_chan.read_chan()
lignes = str(lignes[0])
except:
pass
if err==eqmod_chan.NO_ERROR and lignes != "":
msg = "!"
dec = ""
if lignes==':e1' or lignes==':e2':
msg = "=020400"
elif lignes==':a1':
hexa = eqmod.int2hexa(a1)
msg = "="+hexa
elif lignes==':a2':
hexa = eqmod.int2hexa(a2)
msg = "="+hexa
elif lignes==':b1':
hexa = eqmod.int2hexa(b1)
msg = "="+hexa
elif lignes==':b2':
hexa = eqmod.int2hexa(b2)
msg = "="+hexa
elif lignes==':g1':
msg = "="+g1
elif lignes==':g2':
msg = "="+g2
elif lignes==':s1':
hexa = eqmod.int2hexa(s1)
msg = "="+hexa
elif lignes==':s2':
hexa = eqmod.int2hexa(s2)
msg = "="+hexa
elif lignes==':V200':
msg = "="
elif lignes==':q1010000':
msg = "!0"
elif lignes==':O10':
msg = "!0"
elif lignes==':V27D':
msg = "="
elif lignes==':j1':
hexa = eqmod.int2hexa(j1)
msg = "="+hexa
elif lignes==':j2':
hexa = eqmod.int2hexa(j2)
msg = "="+hexa
elif lignes==':f1':
msg = "="+f1
elif lignes==':f2':
msg = "="+f2
if lignes.startswith(":K")==True:
axis = int(lignes[2])
if axis==1:
mount_astromecca.putread(Mountaxis.BASE,"SSTOP",1)
mount_astromecca_axisb.simu_motion_stop()
lastH1 = 0
lastM1 = 0
else:
mount_astromecca.putread(Mountaxis.POLAR,"SSTOP",1)
mount_astromecca_axisp.simu_motion_stop()
lastH2 = 0
lastM2 = 0
msg = "="
if lignes.startswith(":P")==True:
msg = "="
if lignes.startswith(":F")==True:
# set the motor power
msg = "="
if lignes.startswith(":E")==True:
# update inc0
axis = int(lignes[2])
hexa = lignes[3:9]
deci = eqmod.hexa2int(hexa)
if axis==1:
E1_e1_target = j1 + deci
else:
j1_target = E1_e1_target
j2_target = j2 + deci
# --- get the new HA,Dec
incsimus = ["" for kaxis in range(Mountaxis.AXIS_MAX)]
incsimus[Mountaxis.BASE] = j1_target
incsimus[Mountaxis.POLAR] = j2_target
celb, celp, pierside = mount_eqmod.enc2cel(incsimus, output_format=mount_eqmod.OUTPUT_SHORT, save=mount_eqmod.SAVE_NONE)
ha, dec = mount_eqmod.cel2hadec(celb, celp, "deg", "deg")
print("incb={} ha={:.4f} pierside={}".format(incb, ha, pierside))
print("incp={} dec={:.4f} pierside={}".format(incp, dec, pierside))
print("j1_target={} ha={:.4f} pierside={}".format(j1_target, ha, pierside))
print("j2_target={} dec={:.4f} pierside={}".format(j2_target, dec, pierside))
mount_astromecca_axisb.update_inc0(incb, ha, pierside)
mount_astromecca_axisp.update_inc0(incp, dec, pierside)
mount_eqmod_axisb.update_inc0(j1_target, ha, pierside)
mount_eqmod_axisp.update_inc0(j2_target, dec, pierside)
msg = "="
if lignes.startswith(":G")==True:
axis = int(lignes[2])
digits = lignes[3:5]
if axis==1:
lastG1 = digits
else:
lastG2 = digits
msg = "="
if lignes.startswith(":I")==True:
axis = int(lignes[2])
hexa = lignes[3:9]
deci = eqmod.hexa2int(hexa)
if axis==1:
lastI1 = deci
else:
lastI2 = deci
msg = "="
if lignes.startswith(":H")==True:
axis = int(lignes[2])
hexa = lignes[3:9]
deci = eqmod.hexa2int(hexa)
if axis==1:
lastH1 = deci
else:
lastH2 = deci
print("H={} deci={}".format(hexa,deci))
msg = "="
if lignes.startswith(":M")==True:
axis = int(lignes[2])
hexa = lignes[3:9]
deci = eqmod.hexa2int(hexa)
if axis==1:
lastM1 = deci
else:
lastM2 = deci
print("M={} deci={}".format(hexa,deci))
msg = "="
if lignes.startswith(":J")==True:
# --- select axis
axis = int(lignes[2])
if axis==1:
lastG = lastG1
lastH = lastH1
lastI = lastI1
multg = int(g1)
else:
lastG = lastG2
lastH = lastH2
lastI = lastI2
multg = int(g2)
# ---
motion_type, motion_sense = eqmod.decode_G(lastG)
if axis==1:
motion_type1 = motion_type
motion_sense1 = motion_sense
else:
motion_type2 = motion_type
motion_sense2 = motion_sense
# --- choice according the motion type (GOTO=OFFSET or MOVE=CONTINUOUS)
if motion_type == eqmod.G_OFFSET_FAST or motion_type == eqmod.G_OFFSET_SLOW:
mount_astromecca.log.print("GOTO")
# --- GOTO
if motion_type == eqmod.G_OFFSET_FAST:
# --- fast GOTO
deg_per_sec = 10.0
if motion_type == eqmod.G_OFFSET_SLOW:
# --- slow GOTO
deg_per_sec = 1.0
if axis==1:
# --- Convert velocity in inc/sec
inc_per_sec_slew = abs(deg_per_sec * mount_astromecca.axisb.inc_per_deg)
# --- Compute the target position in EQMOD inc
j1_target = j1 + lastH1
if j1_target >= j1:
motion_sense1 = eqmod.G_SENSE_POSITIVE
if motion_type == eqmod.G_OFFSET_FAST:
current_motion1 = eqmod.MOTION_JOG_FAST_POSITIVE
else:
current_motion1 = eqmod.MOTION_JOG_SLOW_POSITIVE
else:
motion_sense1 = eqmod.G_SENSE_NEGATIVE
if motion_type == eqmod.G_OFFSET_FAST:
current_motion1 = eqmod.MOTION_JOG_FAST_NEGATIVE
else:
current_motion1 = eqmod.MOTION_JOG_SLOW_NEGATIVE
# --- get the corresponding EQMOD rot
rot, pierside = mount_eqmod_axisb.inc2rot(j1_target)
# --- get the corresponding SCX11 inc
inc = mount_astromecca.axisb.rot2inc(rot)
# --- get the corresponding SCX11 inc
mount_astromecca.inc_goto(Mountaxis.BASE, inc, inc_per_sec_slew)
motion_type1 = motion_type
current_moving1 = eqmod.MOTION_MOVING
#print("GOTO motion_type1={} current_motion1={} current_moving1={} current_motor1={}".format(motion_type1,current_motion1,current_moving1,current_motor1))
if axis==2:
# --- Convert velocity in inc/sec
inc_per_sec_slew = abs(deg_per_sec * mount_astromecca.axisp.inc_per_deg)
# --- Compute the target position in EQMOD inc
j2_target = j2 + lastH2
if j2_target >= j2:
motion_sense2 = eqmod.G_SENSE_POSITIVE
if motion_type == eqmod.G_OFFSET_FAST:
current_motion2 = eqmod.MOTION_JOG_FAST_POSITIVE
else:
current_motion2 = eqmod.MOTION_JOG_SLOW_POSITIVE
else:
motion_sense2 = eqmod.G_SENSE_NEGATIVE
if motion_type == eqmod.G_OFFSET_FAST:
current_motion2 = eqmod.MOTION_JOG_FAST_NEGATIVE
else:
current_motion2 = eqmod.MOTION_JOG_SLOW_NEGATIVE
# --- get the corresponding EQMOD rot
rot, pierside = mount_eqmod_axisp.inc2rot(j2_target)
# --- get the corresponding SCX11 inc
inc = mount_astromecca.axisp.rot2inc(rot)
# --- get the corresponding SCX11 inc
mount_astromecca.inc_goto(Mountaxis.POLAR, inc, inc_per_sec_slew)
motion_type2 = motion_type
current_moving2 = eqmod.MOTION_MOVING
#print("GOTO motion_type2={} current_motion2={} current_moving2={} current_motor2={}".format(motion_type2,current_motion2,current_moving2,current_motor2))
if motion_type == eqmod.G_CONTINUOUS_FAST or motion_type == eqmod.G_CONTINUOUS_SLOW:
# --- MOVE
mount_astromecca.log.print("MOVE")
if motion_type == eqmod.G_CONTINUOUS_FAST:
# --- fast MOVE
mult = multg
if motion_type == eqmod.G_CONTINUOUS_SLOW:
# --- slow MOVE
mult = 1.0
if motion_sense == eqmod.G_SENSE_NEGATIVE:
sense = -1
else:
sense = 1
# --- velocity
if axis==1:
ha_drift_deg_per_sec = b1/lastI1/a1*360*mult*sense
dec_drift_deg_per_sec = 0
motion_sense1 = motion_sense
if motion_sense == eqmod.G_SENSE_POSITIVE:
if motion_type == eqmod.G_CONTINUOUS_FAST:
current_motion1 = eqmod.MOTION_JOG_FAST_POSITIVE
else:
current_motion1 = eqmod.MOTION_JOG_SLOW_POSITIVE
else:
if motion_type == eqmod.G_CONTINUOUS_FAST:
current_motion1 = eqmod.MOTION_JOG_FAST_NEGATIVE
else:
current_motion1 = eqmod.MOTION_JOG_SLOW_NEGATIVE
current_moving1 = eqmod.MOTION_MOVING
else:
ha_drift_deg_per_sec = 0
dec_drift_deg_per_sec = b2/lastI2/a2*360*mult*sense
motion_sense2 = motion_sense
if motion_sense == eqmod.G_SENSE_POSITIVE:
if motion_type == eqmod.G_CONTINUOUS_FAST:
current_motion2 = eqmod.MOTION_JOG_FAST_POSITIVE
else:
current_motion2 = eqmod.MOTION_JOG_SLOW_POSITIVE
else:
if motion_type == eqmod.G_CONTINUOUS_FAST:
current_motion2 = eqmod.MOTION_JOG_FAST_NEGATIVE
else:
current_motion2 = eqmod.MOTION_JOG_SLOW_NEGATIVE
current_moving2 = eqmod.MOTION_MOVING
mount_astromecca.hadec_move( ha_drift_deg_per_sec, dec_drift_deg_per_sec)
time.sleep(0.2)
msg = "="
# --- send the response to the client
deci = 0
if msg !="":
if len(msg)==7:
deci = eqmod.hexa2int(msg[1:])
eqmod_chan.put_chan(msg)
if lignes.startswith(":j")==True:
display = 0
else:
display = 1
if display==1:
mount_eqmod.log.print("recu={} envoyé={} decimal={}".format(lignes, msg, deci))
elif display==2:
mount_eqmod.log.print("recu={} envoyé={} decimal={} incb={} incp={}".format(lignes, msg, deci, incb, incp))
#dt00 = time.time()-t00
#print("recu={} envoyé={} decimal={:.1f} dt00={:.2f}".format(lignes, msg, deci,dt00))
time.sleep(0.01)
#time.sleep(1)
#print("lignes={}".format(lignes))
except (KeyboardInterrupt, SystemExit):
mount_eqmod.pad_delete()
except:
raise
if example == 3:
port_serial_ascom='/dev/ttyUSB0' # loop between ASCOM(PC=COM10) -> PC=COM1
import serial
try:
fid.close()
except:
pass
fid = serial.Serial(
port=port_serial_ascom,
baudrate = 9600,
parity = serial.PARITY_NONE,
stopbits = serial.STOPBITS_ONE,
bytesize = serial.EIGHTBITS,
timeout = 0
)
while True:
# --- read the ASCOM commands
lignes = ""
try:
lignes = fid.readlines()
if lignes != []:
print("lignes = {}".format(lignes))
except:
pass