Blame view

src/device_controller/concrete_component/sbig/sbig_simulator.py 4.55 KB
12c6569f   Etienne Pallier   Ajout des nouvell...
1
2
3
#!/usr/bin/env python3

import sys
cd72879f   Etienne Pallier   simulateur DS_Gem...
4
from device_controller.abstract_component.device_simulator import DeviceSimulator, UnknownCommandException
8d0fc742   Etienne Pallier   Grosse refactoris...
5
6
7
from device_controller.channels.server_udp_or_tcp import get_SocketServer_UDP_TCP


cd72879f   Etienne Pallier   simulateur DS_Gem...
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class Memo:
    """In-memory store of simulated values, keyed by a short variable name.

    The values live in one class-level dict, so every caller reads and
    mutates the same shared state.
    """

    # Initial content of the store; set() overwrites or adds entries in place.
    _variables = {
        "C" : '20/10/19',
        "L" : '20:20:36',
        "g" : '+20',
        "t" : '+25:00:00',
    }

    @classmethod
    def get(cls, var):
        """Return the value stored under `var` (KeyError if `var` is unknown)."""
        store = cls._variables
        return store[var]

    @classmethod
    def set(cls, var, value):
        """Store `value` under `var`, replacing any previous value."""
        cls._variables.update({var: value})
8d0fc742   Etienne Pallier   Grosse refactoris...
24

76c1ad1f   Etienne Pallier   Protocol et Comma...
25

76c1ad1f   Etienne Pallier   Protocol et Comma...
26
class DS_SBIG(DeviceSimulator):
    """SBIG device simulator built on top of DeviceSimulator.

    The only behaviour defined here is which socket-server factory is handed
    to the parent's ``serve_forever``. The ``_protoc`` and ``_gen2nat_cmds``
    attributes read elsewhere in this module are not assigned in this class
    (presumably provided through DeviceSimulator -- TODO confirm).
    """

    # @override
    @classmethod
    def serve_forever(cls, PORT):
        """Delegate to the parent ``serve_forever`` with the SBIG server factory.

        PORT is forwarded unchanged; the second argument is always
        ``get_SocketServer_UDP_TCP_sbig``.
        """
        server_factory = get_SocketServer_UDP_TCP_sbig
        super().serve_forever(PORT, server_factory)


# SHORTCUTS
#_protoc = getp()
#_gen2nat_cmds = DS_SBIG._gen2nat_cmds


8d0fc742   Etienne Pallier   Grosse refactoris...
60

76c1ad1f   Etienne Pallier   Protocol et Comma...
61
62
63
64
65
66
'''
@property
def protoc(): return getp()
'''
def getp():
    """Shortcut: return the ``_protoc`` attribute currently set on DS_SBIG."""
    protoc = DS_SBIG._protoc
    return protoc
def getc():
    """Shortcut: return the ``_gen2nat_cmds`` attribute currently set on DS_SBIG."""
    gen2nat_cmds = DS_SBIG._gen2nat_cmds
    return gen2nat_cmds
8d0fc742   Etienne Pallier   Grosse refactoris...
67
68
69
70
71

# @override superclass method
def make_answer_for_request(request_bytes:bytes) -> bytes:
    """Build the simulated answer for one raw request.

    The request is decoded as UTF-8 and read as ``<stamp><command><terminator>``:
    the last character is dropped as the terminator, the first
    ``getp().STAMP_LENGTH`` characters are the stamp, the rest is the command.

    3 types of commands:
    - TYPE1: getp().COMMAND5 or getp().COMMAND6 (noted as '06' or '050000')
    - TYPE2: ':cde#' (mainly ':??#' - i.e : ':GD#', ':GR#', ...)
    - TYPE3: 'b?#' (bC# = Cold Start, bW# = selecting Warm Start, bR# = selecting Warm Restart)

    Args:
        request_bytes: raw request as received from the client.

    Returns:
        ``stamp + answer + '#' + getp().TERMINATOR`` encoded as UTF-8.

    Raises:
        UnknownCommandException: if the request is too short, or the command
            is neither TYPE1 nor a well-formed TYPE2/TYPE3 command. The
            offending text is passed to the exception.
    """
    print("(SIM) Request received is", request_bytes)

    # Convert to string
    request = request_bytes.decode("utf-8")

    protoc = getp()
    stamp_length = protoc.STAMP_LENGTH

    # Shortest acceptable frame: stamp + 2-char command + 1-char terminator
    if len(request) < stamp_length+2+1:
        raise UnknownCommandException(request)

    # Remove TERMINATOR (the slice assumes it is exactly one character)
    request = request[0:-1]

    # Split leading stamp from the command
    stamp = request[0:stamp_length]
    command = request[stamp_length:]
    print("(SIM) Command received is", repr(command))

    # Anything that is not TYPE1 must be a well-formed TYPE2 or TYPE3 command.
    # Every rejection carries the command, like the length check above.
    if command not in (protoc.COMMAND5, protoc.COMMAND6):
        if len(command) < 3:
            raise UnknownCommandException(command)
        if command[-1] != '#':
            raise UnknownCommandException(command)
        if command[0] != ':' and command not in ('bC#','bW#','bR#'):
            raise UnknownCommandException(command)

    # Two characters following the leading ':' (e.g. 'GR' for ':GR#')
    command_start = command[1:3]

    if command == protoc.COMMAND6:
        answer = "G"
    elif command == protoc.COMMAND5:
        answer = protoc.COMMAND5
    elif command == ':GR#':
        answer = "25:01:49"
    elif command == ':GD#':
        answer = "+22:29"
    elif command == ':SG+00#':
        answer = "1"
    elif command == ':GG#':
        answer = "+00"
    elif command_start == 'Gv':
        answer = 'T'
    elif command_start in ('GC','GL', 'Gg', 'Gt'):
        # Getters: reply with the value memorised under the command's 2nd letter
        answer = Memo.get(command_start[1])
        print("****** (SIM) answer for native cmd", command_start, "is:", getc().get_simulated_answer_for_native_cmd(command_start))
        # Gemini telescope replaces "*" with ":"
        if command_start in ('Gg','Gt'):
            answer = answer.replace('*',':')
    else:
        # Remove ending '#'
        command = command[0:-1]
        # Setters: memorise the payload so the matching getter returns it later
        if command_start in ('SC', 'SL', 'Sg', 'St'):
            Memo.set(command_start[1],command[3:])
        if command[0] == ':':
            command = command[1:]
        # Default reply: echo the command (without ':' and '#') in upper case
        answer = command.upper()

    full_answer_in_bytes = bytes(stamp + answer + '#' + protoc.TERMINATOR, "utf-8")
    print("(SIM) Answer sent is", full_answer_in_bytes)
    return full_answer_in_bytes



def get_SocketServer_UDP_TCP_sbig(myhost:str="localhost", myport:int=11112, PROTOCOL:str="TCP"):
    """Return the result of get_SocketServer_UDP_TCP wired to the SBIG handler.

    myhost, myport and PROTOCOL are forwarded unchanged; the fourth argument
    is always this module's make_answer_for_request.
    """
    request_handler = make_answer_for_request
    server = get_SocketServer_UDP_TCP(myhost, myport, PROTOCOL, request_handler)
    return server
12c6569f   Etienne Pallier   Ajout des nouvell...

12c6569f   Etienne Pallier   Ajout des nouvell...

12c6569f   Etienne Pallier   Ajout des nouvell...