#!/usr/bin/env python3 # cf https://docs.python.org/3/library/socketserver.html#socketserver-tcpserver-example """Socket Server implementation To be used as a minimalist telescope simulator to which a socket client (SocketClient) can connect """ # Standard library imports import socketserver # Third party imports # None # Local application imports # None class UnknownCommandException(Exception): pass ###STAMP = '01000000' #STAMP = '0100000000000000' #HOST, PORT = "localhost", 11110 #HOST, PORT = "localhost", 9999 #HOST, PORT = "localhost", 20001 # stamp is 8 digits long STAMP_LENGTH = 8 # COMMON CONSTANTS WITH CLIENT TERMINATOR = '\x00' COMMAND5 = '050000' #COMMAND6_SIMPLE = '6' COMMAND6 = '\x00\x06\x00' class Memo: @classmethod def get(cls, var): return cls._variables[var] @classmethod def set(cls, var, value): cls._variables[var] = value _variables = { "C" : '10/01/18', "L" : '10:20:35', "g" : '+10', "t" : '+45:00:00', } def get_SocketServer_UDP_TCP(myhost:str="localhost", myport:int=11110, PROTOCOL:str="TCP"): socketserver_type = socketserver.UDPServer if PROTOCOL=="UDP" else socketserver.TCPServer socketserver_handler_type = socketserver.DatagramRequestHandler if PROTOCOL=="UDP" else socketserver.StreamRequestHandler """ A classic version of request handler: """ class _MyUDPorTCPHandler_classic(socketserver.BaseRequestHandler): # The request handler class for our server. # It is instantiated once per connection to the server, and must # override the handle() method to implement communication to the client def handle(self): # Get received data # - TCP if PROTOCOL == "TCP": # For TCP, self.request is the TCP socket connected to the client data_rcv = self.request.recv(1024).strip() # - UDP else: # For UDP, self.request consists of a pair of data and client socket data_rcv = self.request[0].strip() socket_client = self.request[1] print("{} wrote:".format(self.client_address[0])) print(data_rcv) # Send back the same data, but upper-cased # - TCP if PROTOCOL == "TCP": self.request.sendall(data_rcv.upper()) # - UDP # Since there is no connection, the client address must be given explicitly when sending data back via sendto(). else: socket_client.sendto(data_rcv.upper(), self.client_address) """ An alternative request handler class that makes use of streams (file-like objects that simplify communication by providing the standard file interface): """ #class MyTCPHandler(socketserver.StreamRequestHandler): #class MyUDPHandler(socketserver.DatagramRequestHandler): class _MyUDPorTCPHandler(socketserver_handler_type): def get_useful_data(self, data_received): return data_received def make_answer_for_request(self, request_bytes:bytes): #if request == STAMP + ":GD#": return STAMP + "+12:28" #if request == b'0100000000000000:GD#': return bytes(STAMP + "+12:28", "utf-8") print("Request received is", request_bytes) ''' 3 types of commands: - TYPE1: '06' or '050000' - TYPE2: ':cde#' (mainly ':??#' - i.e : ':GD#', ':GR#', ...) - TYPE3: 'b?#' (bC# = Cold Start, bW# = selecting Warm Start, bR# = selecting Warm Restart) ''' # Convert to string request = request_bytes.decode("utf-8") if len(request) < STAMP_LENGTH+2+1: raise UnknownCommandException(request) # Remove TERMINATOR request = request[0:-1] # Remove leading stamp stamp = request[0:STAMP_LENGTH] command = request[STAMP_LENGTH:] print("Command received is", repr(command)) # TYPE1 if command not in (COMMAND5, COMMAND6): # TYPE2 or 3 if len(command) < 3: raise UnknownCommandException() if not (command[-1]=='#'): raise UnknownCommandException() if not (command[0]==':') and command not in ('bC#','bW#','bR#'): raise UnknownCommandException() command_start = command[1:3] if command == COMMAND6: answer = "G" elif command == COMMAND5: answer = COMMAND5 #gr_request = STAMP + ':GR#' + END #return bytes(stamp + "15:01:48#" + TERMINATOR, "utf-8") elif command == ':GR#': answer = "15:01:48" #gd_request = STAMP + ':GD#' + END #if request == bytes(gd_request, "utf-8"): return bytes(STAMP + "+12:28#", "utf-8") #elif useful_request == 'GD': answer = "+12:28" elif command == ':GD#': answer = "+12:28" elif command == ':SG+00#': answer = "1" elif command == ':GG#': answer = "+00" elif command_start == 'Gv': answer = 'T' elif command_start in ('GC','GL', 'Gg', 'Gt'): answer = Memo.get(command_start[1]) # Gemini telescope replaces "*" with ":" if command_start in ('Gg','Gt'): answer = answer.replace('*',':') else: # Remove ending '#' command = command[0:-1] if command_start in ('SC', 'SL', 'Sg', 'St'): Memo.set(command_start[1],command[3:]) if command[0] == ':': command = command[1:] answer = command.upper() full_answer_in_bytes = bytes(stamp + answer + '#' + TERMINATOR, "utf-8") #print("request str upper is", str(request).upper()) print("Answer sent is", full_answer_in_bytes) return full_answer_in_bytes def handle(self): # 1) RECEIVE REQUEST from client # self.rfile is a file-like object created by the handler; # we can now use e.g. readline() instead of raw recv() calls data_received = self.rfile.readline().strip() # data is "bytes" type data_useful = self.get_useful_data(data_received) #print("data type is", type(data)) print("\nFrom {}, received: {}".format(self.client_address[0], data_useful)) # 2) SEND REPLY to client # Likewise, self.wfile is a file-like object used to write back to the client self.wfile.write(self.make_answer_for_request(data_useful)) # inutile, ca plante car data est deja en bytes: #self.wfile.write(data.upper().encode()) # Return a "classic" handler: #return socketserver_type((HOST, PORT), _MyUDPorTCPHandler_classic) # or a more "modern" handler: return socketserver_type((myhost, myport), _MyUDPorTCPHandler) if __name__ == "__main__": #with socketserver_type((HOST, PORT), MyUDPorTCPHandler_classic) as myserver: ''' with socketserver_type((HOST, PORT), MyUDPorTCPHandler) as myserver: myserver.serve_forever() ''' with get_SocketServer_UDP_TCP("localhost", 11110, "UDP") as myserver: myserver.serve_forever()