# plcSimulator.py
import os
import sys
# When True, PLCSimulator prints its debug traces to stdout; when False they
# are silently dropped.
DEBUG_FILE = False
# Append the parent directory of this script to sys.path. This has to run
# before the `utils` imports below -- presumably the `utils` package lives in
# that parent directory (verify against the repository layout).
PACKAGE_PARENT = '..'
SCRIPT_DIR = os.path.dirname(os.path.realpath(os.path.join(os.getcwd(), os.path.expanduser(__file__))))
sys.path.append(os.path.normpath(os.path.join(SCRIPT_DIR, PACKAGE_PARENT)))
from utils.Device import Device
from utils.StatusManager import StatusManager
class PLCSimulator(Device, StatusManager):
    """Simulated PLC answering status requests over its connections.

    Builds on the project's ``Device`` and ``StatusManager`` bases (imported
    from ``utils``; their behaviour is not visible in this file). The only
    request handled here is ``GET GlobalStatus``, which is answered with the
    content of ``status`` serialised as ``key=value`` lines.
    """

    # Initial values, keyed by status name, reported by ``GET GlobalStatus``.
    # NOTE(review): this dict is a class attribute, so it is shared by every
    # instance unless a base class rebinds ``self.status`` -- confirm.
    status = {
        "WeatherStationRainSensor": 0,
        "WeatherStationCloudWatcher": 0,
        "WeatherStationWindSensor": 10,
        "WeatherStationWindDir": 'E',
        "WeatherStationTemperatureSensor": 20,
        "WeatherStationHumiditySensor": 50,
        "WeatherStationPressureSensor": 1013,
        "TelescopeRoomShutter": 0,
        "TelescopeRoomLights": "off",
        "TelescopeRoomDoor1": "close",
        "TelescopeRoomDoor2": "close",
        "TelescopeRoomTemperatureSensor": 20,
        "TelescopeRoomHumiditySensor": 50,
        "TelescopeRoomPressure": 1013,
    }

    def __init__(self, argv):
        """Set the simulator up as the "PLC" device.

        Args:
            argv: Argument list, forwarded unchanged to the base initialiser
                and to ``setStatusManager``.
        """
        super().__init__(argv)
        self.setDevice("PLC")
        self.setStatusManager("plcSimulator", argv)

    def plcPrint(self, string: str) -> None:
        """Print ``string`` with a ``PLCSimulator : `` prefix if DEBUG_FILE is set."""
        if DEBUG_FILE:
            print("PLCSimulator : " + string)

    def create_status(self) -> str:
        """Return ``status`` as text, one ``key=value`` line per entry.

        Every line, including the last one, ends with a newline. An empty
        ``status`` yields an empty string.
        """
        return "".join(
            key + '=' + str(value) + '\n' for key, value in self.status.items()
        )

    def handleConnections(self) -> int:
        """Read one request from each connection and answer it.

        A request is ``<type> <command> [args...]``. Only ``GET GlobalStatus``
        gets the status report; any other ``GET`` command is answered with an
        error text, and any other request type is ignored. Requests with
        fewer than two fields are skipped.

        Returns:
            Always 0.
        """
        for co in self.connections:
            # Assumes readBytes returns a str (the original code already
            # relied on this by calling ``split(" ")`` on it) -- TODO confirm.
            data = self.readBytes(128, co)
            # Split on any run of whitespace rather than on a single space,
            # so a trailing newline or a doubled space does not end up inside
            # the command name and cause a valid request to be rejected.
            fields = data.split()
            if len(fields) < 2:
                self.plcPrint("Received invalid message on socket : " + data)
                continue
            cmd_type, cmd = fields[0], fields[1]
            if cmd_type != "GET":
                self.plcPrint("Ignored message " + data)
                continue
            if cmd == "GlobalStatus":
                answer = self.create_status()
            else:
                answer = "Invalid cmd for GET: " + cmd
                self.plcPrint(answer)
            # An empty status report is not sent at all.
            if answer:
                self.sendMessage(answer, co)
        return 0

    def loop(self) -> int:
        """Run the simulation until the step counter exceeds ``self.ended``.

        Each step calls ``updateStatus`` with the current counter and, when
        ``isConnected()`` is true, serves the pending requests. Nothing is
        run when ``self.ended`` is 0.

        Returns:
            Always 0.
        """
        if self.ended == 0:
            # NOTE(review): the message says "telescope" although this device
            # is registered as "PLC" -- looks copy-pasted; text left unchanged.
            self.plcPrint("Not entry for telescope found in config file : " + self.config_file)
            return 0
        step = 0
        while True:
            self.updateStatus(step)
            if self.isConnected():
                self.handleConnections()
            step += 1
            if step > self.ended:
                return 0

    def run(self) -> int:
        """Parse the configuration, set up the socket and run the main loop.

        Returns:
            Always 0; the result of ``loop`` is not propagated.
        """
        if DEBUG_FILE:
            print("PLC simulator running")
        self.parse()
        self.configSocket()
        self.loop()
        return 0
if __name__ == "__main__":
    # Script entry point: build the simulator from the command-line arguments
    # and run it until its main loop returns.
    simulator = PLCSimulator(sys.argv)
    simulator.run()