Blame view

src/device_controller/concrete_component/sbig/sbig_simulator.py 5 KB
12c6569f   Etienne Pallier   Ajout des nouvell...
1
2
3
4
#!/usr/bin/env python3

import sys

8d0fc742   Etienne Pallier   Grosse refactoris...
5
6
7
8
9
10
11
12
13
14
#from device_controller.concrete_component.sbig.server_udp_or_tcp_sbig import get_SocketServer_UDP_TCP_sbig
from device_controller.abstract_component.device_simulator import DeviceSimulator

#HOST = "localhost"





#!/usr/bin/env python3
12c6569f   Etienne Pallier   Ajout des nouvell...
15

4783e5b5   Etienne Pallier   GROS RENOMMAGE de...
16
#from device_controller.concrete_component.device_simulator_common.server_udp_or_tcp import get_SocketServer_UDP_TCP
8d0fc742   Etienne Pallier   Grosse refactoris...
17
18
19
20
21
22
23
24
25
26
from device_controller.channels.server_udp_or_tcp import get_SocketServer_UDP_TCP



###STAMP = '01000000'
#STAMP = '0100000000000000'
#HOST, PORT = "localhost", 11110
#HOST, PORT = "localhost", 9999
#HOST, PORT = "localhost", 20001

8d0fc742   Etienne Pallier   Grosse refactoris...
27

8d0fc742   Etienne Pallier   Grosse refactoris...
28
29
30
31
32
33


class UnknownCommandException(Exception):
    """Raised when a received request cannot be recognised as a command."""


76c1ad1f   Etienne Pallier   Protocol et Comma...
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# See https://stackoverflow.com/questions/10085996/shutdown-socketserver-serve-forever-in-one-thread-python-application

#class DeviceSimulatorSBIG(DeviceSimulator):
class DS_SBIG(DeviceSimulator):
    """SBIG device simulator, configured and started at class level.

    Both methods are classmethods: the protocol object and the command
    table are stored on the class itself (``_protoc`` / ``_gen2nat_cmds``)
    and are read back through the module-level ``getp()`` / ``getc()``
    helpers.
    """

    @classmethod
    def serve_forever(cls, PORT):
        """Delegate to the parent ``serve_forever`` with the SBIG server factory.

        Args:
            PORT: forwarded unchanged to the parent implementation.
        """
        server_factory = get_SocketServer_UDP_TCP_sbig
        super().serve_forever(PORT, server_factory)

    @classmethod
    def set_protoc_and_cmds(cls, protoc, gen2nat_cmds, DEBUG=False):
        """Store the protocol object and the command table on the class.

        Args:
            protoc: object stored as ``cls._protoc``.
            gen2nat_cmds: object stored as ``cls._gen2nat_cmds``.
            DEBUG: accepted but not used by this method.
        """
        cls._protoc, cls._gen2nat_cmds = protoc, gen2nat_cmds
        print("****** (SIM) my cmds are:", cls._gen2nat_cmds, "******")


# SHORTCUTS
#_protoc = getp()
#_gen2nat_cmds = DS_SBIG._gen2nat_cmds


8d0fc742   Etienne Pallier   Grosse refactoris...
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
class Memo:
    """Class-level key/value store shared by every caller.

    All state lives in the ``_variables`` dict on the class itself, so the
    class is used directly through its classmethods and never instantiated.
    """

    # Initial content of the store, keyed by a one-character name.
    _variables = dict(
        C='20/10/19',
        L='20:20:36',
        g='+10',
        t='+45:00:00',
    )

    @classmethod
    def get(cls, var):
        """Return the value stored under ``var``.

        Raises:
            KeyError: if ``var`` has never been stored.
        """
        store = cls._variables
        return store[var]

    @classmethod
    def set(cls, var, value):
        """Store ``value`` under ``var``, replacing any previous value."""
        store = cls._variables
        store[var] = value

76c1ad1f   Etienne Pallier   Protocol et Comma...
86
87
88
89
90
91
'''
@property
def protoc(): return getp()
'''
def getp():
    """Return the protocol object currently stored on ``DS_SBIG``."""
    simulator_cls = DS_SBIG
    return simulator_cls._protoc
def getc():
    """Return the command table currently stored on ``DS_SBIG``."""
    simulator_cls = DS_SBIG
    return simulator_cls._gen2nat_cmds
8d0fc742   Etienne Pallier   Grosse refactoris...
92
93
94
95
96

# @override superclass method
def make_answer_for_request(request_bytes:bytes) -> bytes:
    """Build the simulator's reply to one raw request.

    The decoded request is treated as ``<stamp><command><terminator>``:
    the stamp is the first ``getp().STAMP_LENGTH`` characters and the last
    character is dropped as the terminator.

    3 types of commands:
    - TYPE1: the protocol's ``COMMAND5`` / ``COMMAND6``
      (described in the original notes as '050000' and '06')
    - TYPE2: ':cde#' (mainly ':??#' - i.e : ':GD#', ':GR#', ...)
    - TYPE3: 'b?#' (bC# = Cold Start, bW# = selecting Warm Start,
      bR# = selecting Warm Restart)

    Args:
        request_bytes: the raw request, UTF-8 encoded.

    Returns:
        ``stamp + answer + '#' + TERMINATOR`` encoded as UTF-8.

    Raises:
        UnknownCommandException: if the request is shorter than
            stamp + 2 characters + terminator (carries the request), or if
            the command is neither TYPE1 nor a well-formed TYPE2/TYPE3
            command (carries the offending command).
    """
    print("(SIM) Request received is", request_bytes)

    # Convert to string
    request = request_bytes.decode("utf-8")

    # Read the protocol once so the whole request is handled against the
    # same protocol object.
    protoc = getp()
    stamp_length = protoc.STAMP_LENGTH

    # Shortest acceptable request: stamp + 2-character command + terminator.
    if len(request) < stamp_length + 2 + 1:
        raise UnknownCommandException(request)

    # Remove TERMINATOR (the last character is dropped unconditionally)
    request = request[:-1]

    # Split the leading stamp from the command
    stamp = request[:stamp_length]
    command = request[stamp_length:]
    print("(SIM) Command received is", repr(command))

    # TYPE1 commands are taken as-is; anything else must look like TYPE2 or 3.
    if command not in (protoc.COMMAND5, protoc.COMMAND6):
        if len(command) < 3:
            raise UnknownCommandException(command)
        if command[-1] != '#':
            raise UnknownCommandException(command)
        if command[0] != ':' and command not in ('bC#', 'bW#', 'bR#'):
            raise UnknownCommandException(command)

    # Two characters following the leading ':' (e.g. 'GC' for ':GC#').
    command_start = command[1:3]

    # Commands that always get the same canned answer.
    fixed_answers = {
        ':GR#': "25:01:49",
        ':GD#': "+22:29",
        ':SG+00#': "1",
        ':GG#': "+00",
    }

    if command == protoc.COMMAND6:
        answer = "G"
    elif command == protoc.COMMAND5:
        answer = protoc.COMMAND5
    elif command in fixed_answers:
        answer = fixed_answers[command]
    elif command_start == 'Gv':
        answer = 'T'
    elif command_start in ('GC', 'GL', 'Gg', 'Gt'):
        # Getter: read back the value memorised under the second letter.
        answer = Memo.get(command_start[1])
        # Gemini telescope replaces "*" with ":"
        if command_start in ('Gg', 'Gt'):
            answer = answer.replace('*', ':')
    else:
        # Remove ending '#'
        command = command[:-1]
        # Setter: memorise everything after the ':S?' prefix.
        if command_start in ('SC', 'SL', 'Sg', 'St'):
            Memo.set(command_start[1], command[3:])
        if command[0] == ':':
            command = command[1:]
        # Default answer: the command itself, upper-cased.
        answer = command.upper()

    full_answer_in_bytes = bytes(stamp + answer + '#' + protoc.TERMINATOR, "utf-8")
    print("(SIM) Answer sent is", full_answer_in_bytes)
    return full_answer_in_bytes



def get_SocketServer_UDP_TCP_sbig(myhost: str = "localhost", myport: int = 11112, PROTOCOL: str = "TCP"):
    """Return a socket server wired to the SBIG request handler.

    Forwards ``myhost``, ``myport`` and ``PROTOCOL`` to
    ``get_SocketServer_UDP_TCP`` together with ``make_answer_for_request``
    as the fourth argument, and returns whatever that call returns.
    """
    answer_builder = make_answer_for_request
    server = get_SocketServer_UDP_TCP(myhost, myport, PROTOCOL, answer_builder)
    return server
12c6569f   Etienne Pallier   Ajout des nouvell...

12c6569f   Etienne Pallier   Ajout des nouvell...

12c6569f   Etienne Pallier   Ajout des nouvell...