Blame view

src/device_controller/concrete_component/sbig/sbig_simulator.py 4.23 KB
12c6569f   Etienne Pallier   Ajout des nouvell...
1
2
3
4
#!/usr/bin/env python3

import sys

8d0fc742   Etienne Pallier   Grosse refactoris...
5
6
7
8
9
10
11
12
13
14
#from device_controller.concrete_component.sbig.server_udp_or_tcp_sbig import get_SocketServer_UDP_TCP_sbig
from device_controller.abstract_component.device_simulator import DeviceSimulator

#HOST = "localhost"





#!/usr/bin/env python3
12c6569f   Etienne Pallier   Ajout des nouvell...
15

4783e5b5   Etienne Pallier   GROS RENOMMAGE de...
16
#from device_controller.concrete_component.device_simulator_common.server_udp_or_tcp import get_SocketServer_UDP_TCP
8d0fc742   Etienne Pallier   Grosse refactoris...
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
from device_controller.channels.server_udp_or_tcp import get_SocketServer_UDP_TCP



###STAMP = '01000000'
#STAMP = '0100000000000000'
#HOST, PORT = "localhost", 11110
#HOST, PORT = "localhost", 9999
#HOST, PORT = "localhost", 20001

# Every request starts with a stamp that is 8 characters (digits) long
STAMP_LENGTH = 8

# Constants shared with the client
# TERMINATOR is appended to every answer and is expected as the last character of every request
TERMINATOR = '\x00'
COMMAND5 = '050000'
#COMMAND6_SIMPLE = '6'
COMMAND6 = '\x00\x06\x00'


class UnknownCommandException(Exception):
    """Raised when a received request cannot be recognised as a command."""


class Memo:
    """Class-level store of named values, shared by every caller.

    The store starts with four one-letter entries ("C", "L", "g", "t");
    new keys may be added at runtime through set().
    """

    # Initial content of the store, keyed by variable name
    # (presumably date, time and site coordinates -- TODO confirm)
    _variables = {
        "C" : '20/10/19',
        "L" : '20:20:36',
        "g" : '+10',
        "t" : '+45:00:00',
    }

    @classmethod
    def get(cls, var):
        """Return the value stored under var; raise KeyError if var is absent."""
        store = cls._variables
        return store[var]

    @classmethod
    def set(cls, var, value):
        """Store value under var, replacing any previous value."""
        store = cls._variables
        store[var] = value




# @override superclass method
def make_answer_for_request(request_bytes:bytes) -> bytes:
    """Build the simulator's answer to one raw request.

    A request is laid out as: stamp (STAMP_LENGTH characters) + command +
    one trailing character (the TERMINATOR slot).

    3 types of commands:
    - TYPE1: COMMAND6 or COMMAND5 ('050000')
    - TYPE2: ':cde#' (mainly ':??#' - i.e : ':GD#', ':GR#', ...)
    - TYPE3: 'b?#' (bC# = Cold Start, bW# = selecting Warm Start, bR# = selecting Warm Restart)

    Answers:
    - COMMAND6 is answered with "G", COMMAND5 is echoed back.
    - ':GR#', ':GD#', ':SG+00#' and ':GG#' get fixed canned values.
    - ':Gv...' is answered with 'T'.
    - ':GC', ':GL', ':Gg', ':Gt' return the value held in Memo.
    - ':SC', ':SL', ':Sg', ':St' store their argument in Memo and, like any
      other well-formed command, are echoed back in upper case without the
      leading ':' and the trailing '#'.

    Args:
        request_bytes: the raw request, UTF-8 encoded.

    Returns:
        The UTF-8 encoded answer: stamp + answer + '#' + TERMINATOR.

    Raises:
        UnknownCommandException: if the request is too short or the command
            matches none of the 3 types; the offending text is passed as the
            exception argument.
        UnicodeDecodeError: if request_bytes is not valid UTF-8.
    """
    print("Request received is", request_bytes)

    # Convert to string
    request = request_bytes.decode("utf-8")
    # Minimum accepted size: stamp + 2 characters + 1 trailing character
    if len(request) < STAMP_LENGTH + 2 + 1:
        raise UnknownCommandException(request)

    # Remove TERMINATOR (the last character is dropped without being checked)
    request = request[:-1]

    # Split into leading stamp and command
    stamp = request[:STAMP_LENGTH]
    command = request[STAMP_LENGTH:]
    print("Command received is", repr(command))

    # TYPE1 commands need no further validation; TYPE2 and TYPE3 must be
    # at least 3 characters long, end with '#', and either start with ':'
    # or be one of the three 'b?#' commands
    if command not in (COMMAND5, COMMAND6):
        well_formed = (
            len(command) >= 3
            and command.endswith('#')
            and (command.startswith(':') or command in ('bC#', 'bW#', 'bR#'))
        )
        if not well_formed:
            # Report WHICH command was rejected, as the length check above does
            raise UnknownCommandException(command)

    # Commands whose answer never changes
    fixed_answers = {
        COMMAND6: "G",
        COMMAND5: COMMAND5,
        ':GR#': "25:01:49",
        ':GD#': "+22:29",
        ':SG+00#': "1",
        ':GG#': "+00",
    }

    # The two characters following the leading ':' identify the command family
    command_start = command[1:3]
    if command in fixed_answers:
        answer = fixed_answers[command]
    elif command_start == 'Gv':
        answer = 'T'
    elif command_start in ('GC', 'GL', 'Gg', 'Gt'):
        answer = Memo.get(command_start[1])
        # Gemini telescope replaces "*" with ":"
        if command_start in ('Gg', 'Gt'):
            answer = answer.replace('*', ':')
    else:
        # Remove ending '#'
        body = command[:-1]
        # Setter commands memorise their argument (everything after ':Sx')
        if command_start in ('SC', 'SL', 'Sg', 'St'):
            Memo.set(command_start[1], body[3:])
        if body[0] == ':':
            body = body[1:]
        answer = body.upper()

    full_answer_in_bytes = bytes(stamp + answer + '#' + TERMINATOR, "utf-8")
    print("Answer sent is", full_answer_in_bytes)
    return full_answer_in_bytes



def get_SocketServer_UDP_TCP_sbig(myhost:str="localhost", myport:int=11112, PROTOCOL:str="TCP"):
    """Return whatever get_SocketServer_UDP_TCP builds for the SBIG simulator.

    The host, port and protocol are forwarded unchanged, together with
    make_answer_for_request as the fourth argument (presumably the callback
    used to answer each request -- confirm against get_SocketServer_UDP_TCP).
    """
    answer_maker = make_answer_for_request
    return get_SocketServer_UDP_TCP(myhost, myport, PROTOCOL, answer_maker)
12c6569f   Etienne Pallier   Ajout des nouvell...
129

12c6569f   Etienne Pallier   Ajout des nouvell...
130
131
132
133


# See https://stackoverflow.com/questions/10085996/shutdown-socketserver-serve-forever-in-one-thread-python-application

8d0fc742   Etienne Pallier   Grosse refactoris...
134
135
#class TelescopeGeminiSimulator:
class DeviceSimulatorSBIG(DeviceSimulator):
    """SBIG flavour of DeviceSimulator, served through get_SocketServer_UDP_TCP_sbig."""

    @classmethod
    def serve_forever(cls, PORT):
        """Delegate to the parent serve_forever with the SBIG server factory.

        PORT is forwarded unchanged as the first argument; the second argument
        is the get_SocketServer_UDP_TCP_sbig function itself (not a server
        instance).
        """
        super().serve_forever(PORT, get_SocketServer_UDP_TCP_sbig)
12c6569f   Etienne Pallier   Ajout des nouvell...