PLCsimulator.py
3.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import socket
import sys
import os
import json
PACKAGE_PARENT = '..'
SCRIPT_DIR = os.path.dirname(os.path.realpath(os.path.join(os.getcwd(), os.path.expanduser(__file__))))
sys.path.append(os.path.normpath(os.path.join(SCRIPT_DIR, PACKAGE_PARENT)))
from device.Device import Device
class PLCSimulator(Device):
status = { "presence" : "No", "pressure" : 1013.25, "wind_dir" : "N", "cloud" : 0, "rain" : 0}
config_file = "../config/user/conf.json"
request_path = "../resources"
conn = None
addr = None
sims = []
ended = 0
def __init__(self, argv: list(str)):
if (len(argv) > 1):
self.config_file = argv[1]
else:
self.plcPrint("Using the default configuration file : conf.json")
super().__init__("PLC")
def plcPrint(self, string: str):
print("PLCSimulator : " + string)
def configSocket(self):
self.sock.settimeout(1)
def parse(self):
try:
json_data = open(self.config_file, 'r')
except IOError:
self.plcPrint("Configuration file not found")
return (1)
json_content = json.loads(json_data.read())
for dic in json_content:
for key, value in dic.items():
if (key == "plcSimulator"):
self.sims.append(dic)
self.ended = max(dic["time"], self.ended)
return (0)
def create_status(self):
status = json.dumps(self.status)
return (status)
def handleConnection(self):
data = self.conn.recv(128).decode()
answer = ""
cut_data = data.split(" ")
if (len(cut_data) < 2):
self.plcPrint("Received invalid message on socket : " + data)
return (0)
cmd_type = cut_data[0]
cmd = cut_data[1]
args = cut_data[2:]
if (cmd_type == "GET"):
if (cmd == "STATUS"):
answer = self.create_status()
else:
answer = "Invalid cmd for GET: " + cmd
self.plcPrint(answer)
else:
self.plcPrint("Ignored message " + data)
if (len(answer) > 0):
self.conn.send(bytes(answer, "UTF-8"))
def updateStatus(self, i: int):
for dic in self.sims:
if (int(dic["time"]) == i):
for key, value in dic["plcSimulator"]:
self.status[key] = value
self.plcPrint("Element added to status : %s with value : "%(key) + value)
return (0)
def loop(self):
i = 0
while (True):
self.addr = None
self.conn = None
try:
self.conn, self.addr = self.sock.accept()
except socket.error as e:
# self.plcPrint("No connection established")
pass
i += 1
self.updateStatus(i)
if (self.addr and self.conn):
self.handleConnection()
self.conn.close()
return (0)
def run(self):
print("PLC simulator running")
self.parse()
self.configSocket()
self.loop()
return (0)
if __name__ == "__main__":
sim = PLCSimulator(sys.argv)
sim.run()