# -*- coding: utf-8 -*-
"""Launcher script for the mount control software (MCS).

The script detects whether it is started from Spyder or from the command
line, builds the configuration accordingly, executes the mount configuration
file and then optionally starts a remote server, a test client and the
virtual pad.

Command line examples::

    python3 mount_controle_software.py -pad
    python3 mount_controle_software.py -server -transport TCP -port 1111 -client -client_port 1111 -pad
    python3 mount_controle_software.py -server -transport SERIAL -port //./COM1 -client -client_port //./COM8 -pad
"""
import argparse
import os
import socket
import sys
import time

import mountastro
# NOTE(review): celme is not referenced directly in this script; presumably
# it must be importable for the exec'd configuration file -- confirm before
# removing.
import celme

# #####################################################################
# #####################################################################
# #####################################################################
# Main
# #####################################################################
# #####################################################################
# #####################################################################

if __name__ == "__main__":

    # Everything below deliberately stays at module level: the mount
    # configuration file is exec'd in this namespace and `mount1` (never
    # assigned in this script) is expected to be created by it. The objects
    # also stay reachable from an interactive session afterwards
    # (e.g. remote_server.stop()).

    RUN_CMDLINE = 0
    RUN_SPYDER = 1
    SEPARATOR = "=" * 30

    # --- Detect the origin of launch
    if any('SPYDER' in name for name in os.environ):
        runner = RUN_SPYDER
        runner_msg = "Spyder"
    else:
        runner = RUN_CMDLINE
        runner_msg = "command line"
    hostname = socket.gethostname()
    print("Running from {} on hostname {}".format(runner_msg, hostname))

    # --- Default configuration variables shared by both launch modes.
    # test_client is defined here so that a command line launch without
    # -client no longer raises a NameError further down.
    config_path = "./"
    config_mount = "config_mcs_default.py"
    connect_real_mount = False
    port_serial_controller = "/dev/ttySC0"
    server_active = False
    port_serial_remote_server = "Not defined"
    test_client = False
    port_serial_remote_client = "Not defined"

    # --- Set the configuration variables (Spyder launch)
    if runner == RUN_SPYDER:
        # The original values were swapped (protocol "TCP", transport "MCS");
        # every later test compares the transport with SERIAL/TCP and the
        # protocol with LX200/MCS/ASCOM.
        remote_protocol = "MCS"
        remote_transport = "TCP"
        use_pad = True
        # --- some special computers for dev
        if hostname in ("titanium", "rapido2"):
            config_mount = "config_mcs_tm350_guitalens.py"
        # --- Delete the previous objects if they are instantiated
        # (a previous run in the same interactive session leaves them in
        # globals()).
        if 'remote_client' in globals():
            del remote_client
        if 'remote_server' in globals():
            del remote_server
            time.sleep(1)
        if 'mount1' in globals():
            del mount1
            time.sleep(0.5)

    # --- Decode the command line arguments and set configuration variables
    if runner == RUN_CMDLINE:
        remote_protocol = "LX200"
        remote_transport = "SERIAL"
        use_pad = False
        # ---
        parser = argparse.ArgumentParser(description='Launch MCS')
        # --- Config
        parser.add_argument('-config_path', action='store', help='Path where configuration files will be read.')
        parser.add_argument('-config_mount', action='store', help='Configuration files for the mount.')
        # --- Controller
        parser.add_argument('-mount_real', action='store_true', help='Connect to a real mount controller. You must specify the -mount_port.')
        parser.add_argument('-mount_port', action='store', help='Port identifier for the connection with a real mount controller -mount_real.')
        # --- Server
        parser.add_argument('-server', action='store_true', help='To activate a server. You must specify the -langage, -transport, -port.')
        parser.add_argument('-langage', action='store', type=str, help='Server langage protocol (LX200, ASCOM, ASTROMECCA).')
        # Only SERIAL and TCP are handled below; any other value used to end
        # in a NameError on host_tcp_remote_server, so reject it up front.
        parser.add_argument('-transport', action='store', type=str, choices=("SERIAL", "TCP"), help='Server transport protocol (SERIAL, TCP).')
        parser.add_argument('-port', action='store', type=str, help='Port identifier for the server to listen a client.')
        # --- Client
        parser.add_argument('-client', action='store_true', help='To test a client. You must specify the -client_port')
        parser.add_argument('-client_port', action='store', help='Port identifier for the client to dialog with a server.')
        # --- Pad
        parser.add_argument('-pad', action='store_true', help='Display the virtual pad.')
        # ===
        args = parser.parse_args()
        print("Command line args = {}".format(args))
        # ===
        # --- Config
        if args.config_path is not None:
            config_path = args.config_path
        if args.config_mount is not None:
            config_mount = args.config_mount
        # --- Controller
        connect_real_mount = args.mount_real
        if args.mount_port is not None:
            port_serial_controller = args.mount_port
        # --- Server
        server_active = args.server
        if args.langage is not None:
            remote_protocol = args.langage
        if args.transport is not None:
            remote_transport = args.transport
        if args.port is not None:
            port_serial_remote_server = args.port
        # --- Client
        test_client = args.client
        if args.client_port is not None:
            port_serial_remote_client = args.client_port
        # --- Pad
        use_pad = args.pad

    # --- Configuration of the communication ports
    if runner == RUN_CMDLINE:
        if remote_transport == "SERIAL" or connect_real_mount:
            # --- serial port configurations
            tools = mountastro.Mounttools()
            available_serial_ports = tools.get_available_serial_ports()
            if connect_real_mount and port_serial_controller not in available_serial_ports:
                print("Mount port {} is not in the available list {}".format(port_serial_controller, available_serial_ports))
                sys.exit(2)
            # The server port is a serial port only for the SERIAL transport;
            # a TCP server next to a real mount must not be checked against
            # the serial port list.
            if server_active and remote_transport == "SERIAL":
                if port_serial_remote_server not in available_serial_ports:
                    print("Server listening port {} is not in the available list {}".format(port_serial_remote_server, available_serial_ports))
                    sys.exit(2)

    if runner == RUN_SPYDER:
        # --- serial port configurations
        if remote_transport == "SERIAL":
            tools = mountastro.Mounttools()
            available_serial_ports = tools.get_available_serial_ports()
            nport = len(available_serial_ports)
            # --- At minima the server port exists
            iserver = 0
            iclient = 0
            imount = 0
            if nport < 1:
                print("Not enough serial ports for the server")
                sys.exit()
            port_serial_remote_server = available_serial_ports[iserver]
            # --- Other ports (client test and/or real mount)
            if test_client:
                port_serial_remote_client = available_serial_ports[iclient]
            if connect_real_mount:
                port_serial_controller = available_serial_ports[imount]
            # === Special computers
            if hostname == "titanium":
                print("Configuration = {}".format(hostname))
                port_serial_remote_server = available_serial_ports[0]
                port_serial_remote_client = available_serial_ports[0]
                port_serial_controller = available_serial_ports[0]
            elif hostname in ("rapido2", "astromecca"):
                print("Configuration = {}".format(hostname))
                port_serial_controller = '/dev/ttySC0'
                port_serial_remote_server = '/dev/ttySC1'
                port_serial_remote_client = '/dev/ttySC0'
            else:
                print("No predefined configuration for {}".format(hostname))

    # --- Launch the program
    # --- TCP port configurations
    if remote_transport == "TCP":
        host_tcp_remote_server = socket.gethostname()
        # NOTE(review): the TCP port is hard-coded; the -port option is only
        # used for the SERIAL transport.
        port_tcp_remote_server = 1111
        # === Special computers
        if hostname == "titanium":
            print("Configuration = {}".format(hostname))
            host_tcp_remote_server = '192.168.0.4'
        elif hostname == "rapido2":
            print("Configuration = {}".format(hostname))
        else:
            print("No predefined configuration for {}".format(hostname))

    # --- Summary of the configuration
    if server_active:
        print("Server transport protocol = {}".format(remote_transport))
        print("Server langage protocol = {}".format(remote_protocol))
        if remote_transport == "SERIAL":
            print("port_serial_remote_server = {}".format(port_serial_remote_server))
            if test_client:
                print("port_serial_remote_client = {}".format(port_serial_remote_client))
        else:
            print("host_tcp_remote_server = {}".format(host_tcp_remote_server))
            print("port_tcp_remote_server = {}".format(port_tcp_remote_server))
    if connect_real_mount:
        print("port_serial_mount = {}".format(port_serial_controller))

    # ======= Read the configuration file of the mount parameters
    # os.path.join also works when -config_path has no trailing separator.
    config_mount_file = os.path.join(config_path, config_mount)
    print("Read the configuration mount file = {}".format(config_mount_file))
    # SECURITY NOTE: the configuration file is executed as Python code in
    # this namespace; only point -config_path / -config_mount to trusted
    # files.
    with open(config_mount_file) as config_file:
        config_source = config_file.read()
    # Compiling with the file name gives tracebacks that point to the
    # configuration file.
    exec(compile(config_source, config_mount_file, "exec"))

    # ======= Transport protocol parameter
    if remote_protocol == "LX200" and remote_transport == "SERIAL":
        bauds = 9600
    else:
        bauds = 115200

    # ======= Server
    if server_active:
        print(SEPARATOR)
        if remote_transport == "SERIAL":
            remote_port = port_serial_remote_server
        else:
            remote_port = port_tcp_remote_server
        msg = "{} server on port {} for protocol {}".format(remote_transport, remote_port, remote_protocol)
        print(msg)
        remote_server = mountastro.MountremoteServer(mount1, remote_transport, port=remote_port, protocol=remote_protocol, verbose_level=1, baud=bauds)
        remote_server.start()
        time.sleep(1)
        # The server is non blocking
        # Now you have the hand to send commands to the server
        # To illustrate that, we send a message via a client
        print("Type remote_server.stop() to stop the server")

    # ======= Client
    if test_client:
        print(SEPARATOR)
        if remote_transport == "SERIAL":
            remote_port = port_serial_remote_client
        else:
            remote_port = port_tcp_remote_server
        msg = "{} client on port {} for protocol {}".format(remote_transport, remote_port, remote_protocol)
        print(msg)
        if remote_transport == "SERIAL":
            remote_client = mountastro.MountremoteClient(remote_transport, port=remote_port, baud=bauds, protocol=remote_protocol, verbose_level=1)
        else:
            remote_client = mountastro.MountremoteClient(remote_transport, hostname=host_tcp_remote_server, port=remote_port, protocol=remote_protocol, verbose_level=1)
        remote_client.open_chan()
        print(SEPARATOR)
        # Two demonstration commands per protocol; other protocols send
        # nothing.
        test_commands = {
            "LX200": (':GR', '#:GD#'),
            # The first MCS request was missing its final closing brace.
            "MCS": ("{'req': {'do':{'exec' : 'self.radec_coord()'}}}", "{'req': {'get': 'radec'}}"),
            "ASCOM": ('RightAscension', 'Declination'),
        }
        for index, command in enumerate(test_commands.get(remote_protocol, ())):
            if index > 0:
                print(SEPARATOR)
            result = remote_client.putread_chan(command)
            print("Command = {}\nResult = {}".format(command, result))
        print(SEPARATOR)

    # ======= pad
    if use_pad:
        try:
            print("Create the pad. Tk Event loop activated.")
            mount1.pad_create("pad_dev1")
            print("Pad deleted. Tk Event loop stopped")
        except (KeyboardInterrupt, SystemExit):
            # Leaving the pad with Ctrl-C or an exit request is a normal way
            # to end; any other exception propagates unchanged.
            pass

    # To stop the server:
    # remote_server.stop()