Blame view

src/device_controller/concrete_component/sbig/sbig_simulator.py 5.18 KB
12c6569f   Etienne Pallier   Ajout des nouvell...
1
2
3
#!/usr/bin/env python3

import sys
ed7f9eb6   Etienne Pallier   update sbig_contr...
4

51c0662b   Etienne Pallier   Lecture du mode D...
5
from device_controller.abstract_component.device_simulator import printd, DeviceSimulator, UnknownCommandException
3e61fc36   Etienne Pallier   Grosse refactoris...
6
7

# Only if we override the get_server() method to change the server type
8d0fc742   Etienne Pallier   Grosse refactoris...
8
9
10
from device_controller.channels.server_udp_or_tcp import get_SocketServer_UDP_TCP


76c1ad1f   Etienne Pallier   Protocol et Comma...
11

76c1ad1f   Etienne Pallier   Protocol et Comma...
12
class DS_SBIG(DeviceSimulator):
76c1ad1f   Etienne Pallier   Protocol et Comma...
13
14
15
16

    #_protoc = None
    #_gen2nat_cmds = {}

3e61fc36   Etienne Pallier   Grosse refactoris...
17
    '''
cd72879f   Etienne Pallier   simulateur DS_Gem...
18
    # @override
76c1ad1f   Etienne Pallier   Protocol et Comma...
19
20
21
    @classmethod
    def serve_forever(cls, PORT):
        super().serve_forever(PORT, get_SocketServer_UDP_TCP_sbig)
3e61fc36   Etienne Pallier   Grosse refactoris...
22
    '''
76c1ad1f   Etienne Pallier   Protocol et Comma...
23

cd72879f   Etienne Pallier   simulateur DS_Gem...
24
    '''
76c1ad1f   Etienne Pallier   Protocol et Comma...
25
26
27
28
    @classmethod
    def set_protoc_and_cmds(cls, protoc, gen2nat_cmds, DEBUG=False):
        cls._protoc = protoc
        cls._gen2nat_cmds = gen2nat_cmds
51c0662b   Etienne Pallier   Lecture du mode D...
29
        printd("****** (SIM SBIG) my cmds are:", cls._gen2nat_cmds, "******")
cd72879f   Etienne Pallier   simulateur DS_Gem...
30
    '''
76c1ad1f   Etienne Pallier   Protocol et Comma...
31
32
33
34
35

    '''
    def __init__(self, protoc, gen2nat_cmds, DEBUG=False):
        self._protoc = protoc
        self._gen2nat_cmds = gen2nat_cmds
51c0662b   Etienne Pallier   Lecture du mode D...
36
        printd("****** (SIM SBIG) my cmds are:", self._gen2nat_cmds, "******")
76c1ad1f   Etienne Pallier   Protocol et Comma...
37
38
39
40
41
42

    def serve_forever(self, PORT):
        super().serve_forever(PORT, get_SocketServer_UDP_TCP_sbig)
    '''


ed7f9eb6   Etienne Pallier   update sbig_contr...
43
    # @override superclass method (if named make_answer_for_request)
3e61fc36   Etienne Pallier   Grosse refactoris...
44
    @classmethod
ed7f9eb6   Etienne Pallier   update sbig_contr...
45
46
    #def make_answer_for_request(cls, request_bytes:bytes): 
    def make_answer_for_request_OTHER(cls, request_bytes:bytes):
3e61fc36   Etienne Pallier   Grosse refactoris...
47
48
        #if request == STAMP + ":GD#": return STAMP + "+12:28"
        #if request == b'0100000000000000:GD#': return bytes(STAMP + "+12:28", "utf-8")
51c0662b   Etienne Pallier   Lecture du mode D...
49
        printd("(SIM SBIG) Request received is", request_bytes)
3e61fc36   Etienne Pallier   Grosse refactoris...
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
    
        '''
        3 types of commands:
        - TYPE1: '06' or '050000'
        - TYPE2: ':cde#' (mainly ':??#' - i.e : ':GD#', ':GR#', ...)
        - TYPE3: 'b?#' (bC# = Cold Start, bW# = selecting Warm Start, bR# = selecting Warm Restart)
        '''
        
        # Convert to string
        request = request_bytes.decode("utf-8")
        if len(request) < cls.protoc.STAMP_LENGTH+2+1: raise UnknownCommandException(request)
        
        # Remove TERMINATOR
        request = request[0:-1]
        
        # Remove leading stamp
        stamp = request[0:cls.protoc.STAMP_LENGTH]
        command = request[cls.protoc.STAMP_LENGTH:]
51c0662b   Etienne Pallier   Lecture du mode D...
68
        printd("(SIM SBIG) Command received is", repr(command))
3e61fc36   Etienne Pallier   Grosse refactoris...
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
        
        # TYPE1
        if command not in (cls.protoc.COMMAND5, cls.protoc.COMMAND6):
            # TYPE2 or 3
            if len(command) < 3: raise UnknownCommandException()
            if not (command[-1]=='#'): raise UnknownCommandException()
            if not (command[0]==':') and command not in ('bC#','bW#','bR#'): raise UnknownCommandException()
        
        command_start = command[1:3]
        if command == cls.protoc.COMMAND6: answer = "G"
        elif command == cls.protoc.COMMAND5: answer = cls.protoc.COMMAND5
        
        #gr_request = STAMP + ':GR#' + END
        #return bytes(stamp + "15:01:48#" + TERMINATOR, "utf-8")
        elif command == ':GR#': answer = "25:01:49"
        
        #gd_request = STAMP + ':GD#' + END
        #if request == bytes(gd_request, "utf-8"): return bytes(STAMP + "+12:28#", "utf-8")
        #elif useful_request == 'GD': answer = "+12:28"
        elif command == ':GD#': answer = "+22:29"
        elif command == ':SG+00#': answer = "1"
        elif command == ':GG#': answer = "+00"
        elif command_start == 'Gv': answer = 'T'
        elif command_start in ('GC','GL', 'Gg', 'Gt'):
            answer = Memo.get(command_start[1])
            ##answer = getc().get_answer_for_native_command(command_start)
51c0662b   Etienne Pallier   Lecture du mode D...
95
            printd("****** (SIM SBIG) answer for native cmd", command_start, "is:", cls.get_simulated_answer_for_native_cmd(command_start))
3e61fc36   Etienne Pallier   Grosse refactoris...
96
97
98
99
100
101
102
103
104
105
106
            # Gemini telescope replaces "*" with ":"
            if command_start in ('Gg','Gt'): answer = answer.replace('*',':')
        else:
            # Remove ending '#'
            command = command[0:-1]
            ##if command_start in ('SC', 'SL', 'Sg', 'St'): Memo.set(command_start[1],command[3:])
            if command_start in ('SC', 'SL', 'Sg', 'St'): Memo.set(command_start[1],command[3:])
            if command[0] == ':': command = command[1:]  
            answer = command.upper()

        full_answer_in_bytes = bytes(stamp + answer + '#' + cls.protoc.TERMINATOR, "utf-8")
51c0662b   Etienne Pallier   Lecture du mode D...
107
108
        #printd("request str upper is", str(request).upper())
        printd("(SIM SBIG) Answer sent is", full_answer_in_bytes)
3e61fc36   Etienne Pallier   Grosse refactoris...
109
110
111
112
113
114
        return full_answer_in_bytes


    # @override superclass to change the simulator server channel type (socket, serial, ...)
    @classmethod
    def get_server(cls, myhost:str="localhost", myport:int=11112, PROTOCOL:str="TCP"):
51c0662b   Etienne Pallier   Lecture du mode D...
115
        printd("\n****************** SBIG get_server() ********************************\n")
3e61fc36   Etienne Pallier   Grosse refactoris...
116
        return get_SocketServer_UDP_TCP(myhost, myport, PROTOCOL, cls.make_answer_for_request)
8d0fc742   Etienne Pallier   Grosse refactoris...
117

76c1ad1f   Etienne Pallier   Protocol et Comma...
118
'''
3e61fc36   Etienne Pallier   Grosse refactoris...
119
120
121
def get_SocketServer_UDP_TCP_sbig(myhost:str="localhost", myport:int=11112, PROTOCOL:str="TCP"):
    #return get_SocketServer_UDP_TCP(myhost, myport, PROTOCOL)
    return get_SocketServer_UDP_TCP(myhost, myport, PROTOCOL, make_answer_for_request)
76c1ad1f   Etienne Pallier   Protocol et Comma...
122
'''
8d0fc742   Etienne Pallier   Grosse refactoris...
123

8d0fc742   Etienne Pallier   Grosse refactoris...
124

3e61fc36   Etienne Pallier   Grosse refactoris...
125
126
127
# SHORTCUTS
#_protoc = cls.protoc
#_gen2nat_cmds = DS_SBIG._gen2nat_cmds
8d0fc742   Etienne Pallier   Grosse refactoris...
128
129
130



3e61fc36   Etienne Pallier   Grosse refactoris...
131
132
133
134
'''
@property
def protoc(): return cls.protoc
'''
12c6569f   Etienne Pallier   Ajout des nouvell...
135

3e61fc36   Etienne Pallier   Grosse refactoris...
136
137
138
139
'''
def cls.protoc: return DS_SBIG._protoc
def getc(): return DS_SBIG._gen2nat_cmds
'''
12c6569f   Etienne Pallier   Ajout des nouvell...

12c6569f   Etienne Pallier   Ajout des nouvell...