Device.py
5.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
from common.models import *
import socket
import configparser
import sys
import json
from pydoc import locate
CONFIG_FILE = "../simulators/config/socket_config.ini"
GRAMMAR_FILE = "../simulators/config/grammar.json"
class DeviceController():
'''
Generic object for the communication with all the devices (need inheritance)
'''
def __init__(self, device_name):
'''
The messages and their parameters will be defined in the objects that inherits from this class
'''
config = configparser.ConfigParser()
config.read(CONFIG_FILE)
self.ip = config.get(device_name, "ip")
self.port = int(config.get(device_name, "port"))
self.set_msgs = []
self.get_msgs = []
self.do_msgs = []
self.enums = {}
with open(GRAMMAR_FILE) as grammar_file:
grammar = json.load(grammar_file)
enums = grammar["Enums"]
for enum in enums:
self.enums[enum] = enums[enum]
if device_name[:6] == "Camera":
device = grammar["Camera"]
for item in device["set"]:
param_type = item["param_type"]
if len(param_type) < 4 or param_type[:4] != "Enum":
''' This transforms 'int' string to <int> type '''
param_type = locate(param_type)
self.set_msgs.append((item["name"], item["nb_param"], param_type))
for item in device["get"]:
self.get_msgs.append(item)
for item in device["do"]:
param_type = item["param_type"]
if len(param_type) < 4 or param_type[:4] != "Enum":
param_type = locate(param_type)
self.do_msgs.append((item["name"], item["nb_param"], param_type))
device = grammar[device_name]
for item in device["set"]:
param_type = item["param_type"]
if len(param_type) < 4 or param_type[:4] != "Enum":
param_type = locate(param_type)
self.set_msgs.append((item["name"], item["nb_param"], param_type))
for item in device["get"]:
self.get_msgs.append(item)
for item in device["do"]:
param_type = item["param_type"]
if len(param_type) < 4 or param_type[:4] != "Enum":
param_type = locate(param_type)
self.do_msgs.append((item["name"], item["nb_param"], param_type))
def init_socket(self):
if self.ip is None or self.port is None:
raise ValueError("IP and/or PORT not initialized")
# TODO: gérer un fail de connexion
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
self.sock.connect((self.ip, self.port))
except ConnectionError:
print("Connection failed to ip : " + str(self.ip) + " Port : " + str(self.port), file=sys.stderr)
return (1)
return (0)
def set(self, cmd, *args):
'''
Send a SET message to the device
'''
msg = [msg for msg in self.set_msgs if msg[0] == cmd]
if len(msg) == 0:
raise ValueError("Invalid message argument %s" % (cmd,))
msg = msg[0]
if len(args) not in msg[1]:
raise ValueError("Bad number of arguments")
if type(msg[2]) is str:
if msg[2] not in self.enums.keys():
raise TypeError("Enum %s doesn't exist. Please check the grammar.json file." % (msg[2],))
enum = self.enums[msg[2]]
for arg in args:
if arg not in enum:
raise TypeError("Bad value '%s' for enum '%s'" % (arg, msg[2]))
else:
for arg in args:
if type(arg) != msg[2]:
raise TypeError("Bad type of argument: expected %s" % (msg[2],))
# if self.init_socket() == 1:
# self._message_queue
message = "SET " + cmd + " " + " ".join([str(arg) for arg in args])
self.sock.send(bytes(message, "UTF-8"))
# print("set ", cmd)
def get(self, cmd):
'''
Send a GET message to the device
'''
if cmd not in self.get_msgs:
raise ValueError("Invalid message argument %s" % (cmd,))
self.init_socket()
message = "GET " + cmd
self.sock.send(bytes(message, "UTF-8"))
# print("get ", cmd)
# TODO: gérer le timeout
ret = self.sock.recv(1024).decode()
# print("return : ", ret)
return ret
def do(self, cmd, *args):
'''
Send a DO message to the device
'''
msg = [msg for msg in self.do_msgs if msg[0] == cmd]
if len(msg) == 0:
raise ValueError("Invalid message argument %s" % (cmd,))
msg = msg[0]
if len(args) not in msg[1]:
raise ValueError("Bad number of arguments")
if type(msg[2]) is str:
if msg[2] not in self.enums.keys():
raise TypeError("Enum %s doesn't exist. Please check the grammar.json file." % (msg[2],))
enum = self.enums[msg[2]]
for arg in args:
if arg not in enum:
raise TypeError("Bad value '%s' for enum '%s'" % (arg, msg[2]))
else:
for arg in args:
if type(arg) != msg[2]:
raise TypeError("Bad type of argument: expected %s" % (msg[2],))
self.init_socket()
message = "DO " + cmd + " " + " ".join([str(arg) for arg in args])
self.sock.send(bytes(message, "UTF-8"))
# print("do ", cmd)