# plcSimulator.py
import os
import sys

# When True, the simulator prints its debug/trace messages to stdout.
DEBUG_FILE = False
# Relative path from this script's directory to the directory added to sys.path.
PACKAGE_PARENT = '..'
# Absolute directory of this script: __file__ is expanded ("~"), anchored to the
# current working directory if relative, and resolved through symlinks.
SCRIPT_DIR = os.path.dirname(os.path.realpath(os.path.join(os.getcwd(), os.path.expanduser(__file__))))
# Make the parent directory importable; this must run before the `utils`
# imports that follow (assumes the `utils` package lives there).
sys.path.append(os.path.normpath(os.path.join(SCRIPT_DIR, PACKAGE_PARENT)))

from utils.Device import Device
from utils.StatusManager import StatusManager

class PLCSimulator(Device, StatusManager):
    """Simulated PLC exposing weather-station and telescope-room status.

    A connected client may send ``GET GlobalStatus``; the simulator answers
    with one ``key=value`` line per entry of ``status``.

    The helpers used here but not defined in this class (``setDevice``,
    ``setStatusManager``, ``parse``, ``configSocket``, ``readBytes``,
    ``sendMessage``, ``isConnected``, ``updateStatus``) and the attributes
    ``connections``, ``ended`` and ``config_file`` are expected to come from
    the ``Device`` / ``StatusManager`` base classes, which are not visible
    in this file.
    """

    # Class-level dict: shared by every instance unless an instance rebinds it.
    status = {
        "WeatherStationRainSensor": 0,
        "WeatherStationCloudWatcher": 0,
        "WeatherStationWindSensor": 10,
        "WeatherStationWindDir": 'E',
        "WeatherStationTemperatureSensor": 20,
        "WeatherStationHumiditySensor": 50,
        "WeatherStationPressureSensor": 1013,
        "TelescopeRoomShutter": 0,
        "TelescopeRoomLights": "off",
        "TelescopeRoomDoor1": "close",
        "TelescopeRoomDoor2": "close",
        "TelescopeRoomTemperatureSensor": 20,
        "TelescopeRoomHumiditySensor": 50,
        "TelescopeRoomPressure": 1013,
    }

    def __init__(self, argv):
        """Initialise the base classes and register this simulator as "PLC".

        Args:
            argv: Command-line argument list, forwarded to the base-class
                initialiser and to ``setStatusManager``.
        """
        super().__init__(argv)
        self.setDevice("PLC")
        self.setStatusManager("plcSimulator", argv)

    def plcPrint(self, string: str):
        """Print ``string`` with a "PLCSimulator : " prefix when DEBUG_FILE is set."""
        if DEBUG_FILE:
            print("PLCSimulator : " + string)

    def create_status(self):
        """Return the status report: one ``key=value`` line per status entry.

        Every line, including the last one, is terminated by a newline.
        An empty status dict yields an empty string.
        """
        return "".join(
            key + '=' + str(value) + '\n'
            for key, value in self.status.items()
        )

    def handleConnections(self):
        """Read one message from each connection and answer ``GET`` requests.

        Messages have the form ``<type> <command> [args...]``.
        ``GET GlobalStatus`` is answered with the status report, any other
        ``GET`` command is answered with an error string, and every other
        message type is only logged.

        Returns:
            Always ``0``.
        """
        for co in self.connections:
            data = self.readBytes(128, co)
            # Split on any whitespace run so that a trailing newline or
            # repeated spaces cannot end up inside the command name.
            fields = data.split()
            if len(fields) < 2:
                self.plcPrint("Received invalid message on socket : " + data)
                continue
            # Fields after the command would be its arguments; no command
            # currently takes any, so they are not extracted.
            cmd_type, cmd = fields[0], fields[1]

            answer = ""
            if cmd_type == "GET":
                if cmd == "GlobalStatus":
                    answer = self.create_status()
                else:
                    answer = "Invalid cmd for GET: " + cmd
                    self.plcPrint(answer)
            else:
                self.plcPrint("Ignored message " + data)

            # Only reply when there is something to say.
            if answer:
                self.sendMessage(answer, co)
        return 0

    def loop(self):
        """Run the simulation for steps ``0`` through ``self.ended`` inclusive.

        Each step calls ``updateStatus`` with the step index and then, if
        ``isConnected()`` reports a client, serves the pending connections.
        An ``ended`` value of 0 is treated as "no entry in the config file"
        and aborts immediately.

        Returns:
            Always ``0``.
        """
        if self.ended == 0:
            self.plcPrint("No entry for PLC found in config file : " + self.config_file)
            return 0

        i = 0
        # The step always runs at least once before the bound is checked.
        while True:
            self.updateStatus(i)
            if self.isConnected():
                self.handleConnections()
            i += 1
            if i > self.ended:
                return 0

    def run(self):
        """Parse the configuration, set up the socket and run the main loop.

        Returns:
            Always ``0``.
        """
        if DEBUG_FILE:
            print("PLC simulator running")
        self.parse()
        self.configSocket()
        self.loop()
        return 0

if __name__ == "__main__":
    # Script entry point: build the simulator from the command line and run it.
    simulator = PLCSimulator(sys.argv)
    simulator.run()