plcSimulator.py
2.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import sys
import os
DEBUG_FILE = False
PACKAGE_PARENT = '..'
SCRIPT_DIR = os.path.dirname(os.path.realpath(os.path.join(os.getcwd(), os.path.expanduser(__file__))))
sys.path.append(os.path.normpath(os.path.join(SCRIPT_DIR, PACKAGE_PARENT)))
from utils.Device import Device
from utils.StatusManager import StatusManager
class PLCSimulator(Device, StatusManager):
status = {"status" : "VALID", "presence" : "No", "pressure" : 1013.25, "wind_dir" : "N", "cloud" : 0, "rain" : 0}
def __init__(self, argv):
super().__init__(argv)
self.setDevice("PLC")
self.setStatusManager("plcSimulator", argv)
def plcPrint(self, string: str):
if DEBUG_FILE:
print("PLCSimulator : " + string)
def handleConnections(self):
for co in self.connections:
data = self.readBytes(128, co)
answer = ""
cut_data = data.split(" ")
if (len(cut_data) < 2):
self.plcPrint("Received invalid message on socket : " + data)
continue
cmd_type = cut_data[0]
cmd = cut_data[1]
args = cut_data[2:]
if (cmd_type == "GET"):
if (cmd == "STATUS"):
answer = self.create_status()
else:
answer = "Invalid cmd for GET: " + cmd
self.plcPrint(answer)
else:
self.plcPrint("Ignored message " + data)
if (len(answer) > 0):
self.sendMessage(answer, co)
return (0)
def loop(self):
i = 0
if (self.ended == 0):
self.plcPrint("Not entry for telescope found in config file : " + self.config_file)
return (0)
while (True):
self.updateStatus(i)
if (self.isConnected()):
self.handleConnections()
i += 1
if (i > self.ended):
return (0)
return (1)
def run(self):
if DEBUG_FILE:
print("PLC simulator running")
self.parse()
self.configSocket()
self.loop()
return (0)
if __name__ == "__main__":
sim = PLCSimulator(sys.argv)
sim.run()