# mount_control_software.py
# -*- coding: utf-8 -*-
import time
import socket
    
import mountastro
import celme
import sys
import os

# #####################################################################
# #####################################################################
# #####################################################################
# Main
# #####################################################################
# #####################################################################
# #####################################################################
            
if __name__ == "__main__":

    # Identifiers for the two supported ways of launching this script.
    RUN_CMDLINE = 0
    RUN_SPYDER  = 1

    # --- Detect the origin of launch
    # An environment variable whose name contains 'SPYDER' is taken as the
    # sign that the script was started from the Spyder IDE.
    if any('SPYDER' in env_name for env_name in os.environ):
        runner, runner_msg = RUN_SPYDER, "Spyder"
    else:
        runner, runner_msg = RUN_CMDLINE, "command line"

    # The host name selects machine-specific settings further down.
    hostname = socket.gethostname()
    print("Running from {} on hostname {}".format(runner_msg, hostname))


    # --- Set the configuration variables
    # There is no command line when running from Spyder: the configuration is
    # hard-coded here and meant to be edited by the developer.
    if runner == RUN_SPYDER:
        config_path = "./"
        config_mount = "config_mcs_default.py"
        connect_real_mount = False
        port_serial_controller = "/dev/ttySC0"
        server_active = False
        # Further down, remote_transport is compared with "SERIAL"/"TCP" and
        # remote_protocol with "LX200"/"MCS"/"ASCOM": "TCP" is therefore the
        # transport and "MCS" the protocol.
        remote_protocol = "MCS"
        remote_transport = "TCP"
        port_serial_remote_server = "Not defined"
        test_client = False
        port_serial_remote_client = "Not defined"
        use_pad = True

        # --- some special computers for dev
        if hostname in ("titanium", "rapido2"):
            config_mount = "config_mcs_tm350_guitalens.py"

        # --- Delete the previous objects if they are instantiated
        # Objects left in the global namespace by a previous run are removed
        # before new ones are created.
        if 'remote_client' in globals():
            del remote_client
        if 'remote_server' in globals():
            del remote_server
            # presumably lets the old server release its port -- TODO confirm
            time.sleep(1)
        if 'mount1' in globals():
            del mount1
        time.sleep(0.5)
    
    # --- Decode the command line arguments and set configuration variables
    if runner == RUN_CMDLINE:
        # Usage examples:
        #   python3 mount_control_software.py -pad
        #   python3 mount_control_software.py -server -transport TCP -port 1111 -client -client_port 1111 -pad
        #   python3 mount_control_software.py -server -transport SERIAL -port //./COM1 -client -client_port //./COM8 -pad

        # --- Set the default configuration variables
        # Every variable read later in the script gets a default here, so an
        # option omitted on the command line never leaves a name undefined
        # (test_client in particular is read even when -client is absent).
        config_path = "./"
        config_mount = "config_mcs_default.py"
        connect_real_mount = False
        port_serial_controller = "/dev/ttySC0"
        server_active = False
        remote_protocol = "LX200"
        remote_transport = "SERIAL"
        port_serial_remote_server = "Not defined"
        test_client = False
        port_serial_remote_client = "Not defined"
        use_pad = False
        # ---
        import argparse
        parser = argparse.ArgumentParser(description='Launch MCS')
        # --- Config
        parser.add_argument('-config_path', action='store', help='Path where configuration files will be read.')
        parser.add_argument('-config_mount', action='store', help='Configuration files for the mount.')
        # --- Controller
        parser.add_argument('-mount_real', action='store_true', help='Connect to a real mount controller. You must specify the -mount_port.')
        parser.add_argument('-mount_port', action='store', help='Port identifier for the connection with a real mount controller -mount_real.')
        # --- Server
        parser.add_argument('-server', action='store_true', help='To activate a server. You must specify the -langage, -transport, -port.')
        parser.add_argument('-langage', action='store', type=str, help='Server langage protocol (LX200, ASCOM, ASTROMECCA).')
        parser.add_argument('-transport', action='store', type=str, help='Server transport protocol (SERIAL, TCP).')
        parser.add_argument('-port', action='store', type=str, help='Port identifier for the server to listen a client.')
        # --- Client
        parser.add_argument('-client', action='store_true', help='To test a client. You must specify the -client_port')
        parser.add_argument('-client_port', action='store', help='Port identifier for the client to dialog with a server.')
        # --- Pad
        parser.add_argument('-pad', action='store_true', help='Display the virtual pad.')
        # ===
        args = parser.parse_args()
        print("Command line args = {}".format(args))
        # ===
        # Valued options are None when absent: the default is then kept.
        # Flag options (store_true) are plain booleans.
        # --- Config
        if args.config_path is not None:
            config_path = str(args.config_path)
        if args.config_mount is not None:
            config_mount = str(args.config_mount)
        # --- Controller
        if args.mount_real:
            connect_real_mount = True
        if args.mount_port is not None:
            port_serial_controller = str(args.mount_port)
        # --- Server
        if args.server:
            server_active = True
        if args.langage is not None:
            remote_protocol = str(args.langage)
        if args.transport is not None:
            remote_transport = str(args.transport)
        if args.port is not None:
            port_serial_remote_server = str(args.port)
        # --- Client
        if args.client:
            test_client = True
        if args.client_port is not None:
            port_serial_remote_client = str(args.client_port)
        # --- Pad
        if args.pad:
            use_pad = True
                
    # --- Configuration of the communication ports
    # Command line case: the ports are given by the user, they are only
    # checked against the serial ports reported by mountastro.
    if runner == RUN_CMDLINE:
        if remote_transport == "SERIAL" or connect_real_mount:
            # --- serial port configurations
            tools = mountastro.Mounttools()
            available_serial_ports = tools.get_available_serial_ports()
            nport = len(available_serial_ports)

            if connect_real_mount and port_serial_controller not in available_serial_ports:
                print("Mount port {} is not in the available list {}".format(port_serial_controller,available_serial_ports))
                sys.exit(2)

            # The server port is a serial port only with the SERIAL transport;
            # with TCP it must not be looked up in the serial port list.
            if server_active and remote_transport == "SERIAL" and port_serial_remote_server not in available_serial_ports:
                print("Server listening port {} is not in the available list {}".format(port_serial_remote_server,available_serial_ports))
                sys.exit(2)
                    
    # Spyder case: the serial ports are picked automatically from the list
    # reported by mountastro, then overridden for some known computers.
    if runner == RUN_SPYDER:
        # --- serial port configurations
        if remote_transport == "SERIAL":
            tools = mountastro.Mounttools()
            available_serial_ports = tools.get_available_serial_ports()
            nport = len(available_serial_ports)
            # Indexes, in available_serial_ports, of the port given to each role
            iserver = 0
            iclient = 0
            imount = 0
            # --- At minima the server port exists
            if nport < 1:
                print("Not enough serial ports for the server")
                # sys.exit rather than the interactive exit() helper, with a
                # non-zero status since this is an error.
                sys.exit(1)
            port_serial_remote_server = available_serial_ports[iserver]
            # --- Other ports
            # The client and the real mount are independent options: each one
            # gets its port when it is enabled.
            if test_client:
                port_serial_remote_client = available_serial_ports[iclient]
            if connect_real_mount:
                port_serial_controller = available_serial_ports[imount]
            # === Special computers
            if hostname == "titanium":
                print("Configuration = {}".format(hostname))
                port_serial_remote_server = available_serial_ports[0]
                port_serial_remote_client = available_serial_ports[0]
                port_serial_controller = available_serial_ports[0]
            elif hostname in ("rapido2", "astromecca"):
                print("Configuration = {}".format(hostname))
                port_serial_controller = '/dev/ttySC0'
                port_serial_remote_server = '/dev/ttySC1'
                port_serial_remote_client = '/dev/ttySC0'
            else:
                print("No predefined configuration for {}".format(hostname))

    # --- Launch the program
    # Reads the mount configuration file, then starts, according to the
    # configuration variables set above: the remote server, a test client
    # and the virtual pad.
    if runner in (RUN_CMDLINE, RUN_SPYDER):

        # --- TCP port configurations
        if remote_transport == "TCP":
            host_tcp_remote_server = socket.gethostname()
            port_tcp_remote_server = 1111
            # A numeric server port given in the configuration (-port on the
            # command line) replaces the default TCP port.
            if str(port_serial_remote_server).isdigit():
                port_tcp_remote_server = int(port_serial_remote_server)
            # === Special computers
            if hostname == "titanium":
                print("Configuration = {}".format(hostname))
                host_tcp_remote_server = '192.168.0.4'
            elif hostname == "rapido2":
                print("Configuration = {}".format(hostname))
            else:
                print("No predefined configuration for {}".format(hostname))

        # --- Summary of the configuration
        if server_active:
            print("Server transport protocol = {}".format(remote_transport))
            print("Server langage protocol = {}".format(remote_protocol))
            if remote_transport == "SERIAL":
                print("port_serial_remote_server = {}".format(port_serial_remote_server))
                if test_client:
                    print("port_serial_remote_client = {}".format(port_serial_remote_client))
            else:
                print("host_tcp_remote_server = {}".format(host_tcp_remote_server))
                print("port_tcp_remote_server = {}".format(port_tcp_remote_server))
        if connect_real_mount:
            print("port_serial_mount         = {}".format(port_serial_controller))

        # ======= Read the configuration file of the mount parameters
        # os.path.join accepts a config_path with or without trailing separator.
        config_mount_file = os.path.join(config_path, config_mount)
        print("Read the configuration mount file = {}".format(config_mount_file))
        # The configuration file is Python code executed in this script's
        # global namespace. It is presumably what creates `mount1`, which is
        # used below but never assigned in this file.
        # NOTE: exec runs arbitrary code; only use trusted configuration files.
        with open(config_mount_file) as config_file:
            # compile() with the file name gives meaningful tracebacks.
            exec(compile(config_file.read(), config_mount_file, "exec"))

        # ======= Transport protocol parameter
        if remote_protocol == "LX200" and remote_transport == "SERIAL":
            bauds = 9600
        else:
            bauds = 115200

        # ======= Server
        if server_active:
            print("="*30)
            if remote_transport == "SERIAL":
                remote_port = port_serial_remote_server
            else:
                remote_port = port_tcp_remote_server
            msg = "{} server on port {} for protocol {}".format(remote_transport, remote_port, remote_protocol)
            print(msg)
            remote_server = mountastro.MountremoteServer(mount1, remote_transport, port=remote_port, protocol=remote_protocol, verbose_level=1, baud=bauds)
            remote_server.start()
            time.sleep(1)

            # The server is non blocking
            # Now you have the hand to send commands to the server
            # To illustrate that, we send a message via a client

            print("Type remote_server.stop() to stop the server")

        # ======= Client
        # Opens a client channel and sends two sample requests written in the
        # selected protocol.
        if test_client:

            print("="*30)
            if remote_transport == "SERIAL":
                remote_port = port_serial_remote_client
            else:
                remote_port = port_tcp_remote_server
            msg = "{} client on port {} for protocol {}".format(remote_transport, remote_port, remote_protocol)
            print(msg)
            if remote_transport == "SERIAL":
                remote_client = mountastro.MountremoteClient(remote_transport, port=remote_port, baud=bauds, protocol=remote_protocol, verbose_level=1)
            else:
                remote_client = mountastro.MountremoteClient(remote_transport, hostname=host_tcp_remote_server, port=remote_port, protocol=remote_protocol, verbose_level=1)
            remote_client.open_chan()
            print("="*30)
            if remote_protocol == "LX200":
                command = ':GR'
                result = remote_client.putread_chan(command)
                print("Command = {}\nResult = {}".format(command,result))
                print("="*30)
                command = '#:GD#'
                result = remote_client.putread_chan(command)
                print("Command = {}\nResult = {}".format(command,result))
            elif remote_protocol == "MCS":
                # The request is a dictionary literal: braces must be balanced.
                command = "{'req': {'do':{'exec' : 'self.radec_coord()'}}}"
                result = remote_client.putread_chan(command)
                print("Command = {}\nResult = {}".format(command,result))
                print("="*30)
                command = "{'req': {'get': 'radec'}}"
                result = remote_client.putread_chan(command)
                print("Command = {}\nResult = {}".format(command,result))
            elif remote_protocol == "ASCOM":
                command = 'RightAscension'
                result = remote_client.putread_chan(command)
                print("Command = {}\nResult = {}".format(command,result))
                print("="*30)
                command = 'Declination'
                result = remote_client.putread_chan(command)
                print("Command = {}\nResult = {}".format(command,result))
            print("="*30)

        # ======= pad
        if use_pad:
            try:
                print("Create the pad. Tk Event loop activated.")
                mount1.pad_create("pad_dev1")
                print("Pad deleted. Tk Event loop stopped")
            except (KeyboardInterrupt, SystemExit):
                # Deliberate: an interruption or an exit request raised while
                # the pad is displayed ends the script quietly. Any other
                # exception propagates normally.
                pass

        # To stop the server:
        # remote_server.stop()