Blame view

src/monitoring/tasks.py 11.7 KB
5b5566ab   haribo   added celery
1
from __future__ import absolute_import
a190892c   haribo   Date: 05/07/2016
2
from django.conf import settings
ddf59dd4   haribo   Remaniement :
3
from common.models import *
81408f10   Etienne Pallier   Added some useful...
4
5

# (EP) OLD task base class
ac74403f   Etienne Pallier   bugfixed my wrong...
6
7
8
from celery.task import Task
# NEW task base class, but DOES NOT WORK (because celery3 and not 4 ?) !!!
#from celery import Task
81408f10   Etienne Pallier   Added some useful...
9

eabb9310   Patrick Maeght   scan captors list...
10
from src.monitoring.plc_checker import PlcChecker
ddf59dd4   haribo   Remaniement :
11
from devices.PLC import PLCController
4cb0ff36   Jeremy   Update
12
from utils.JDManipulator import *
6c2793c2   jeremy   Update
13
import json
bca9a283   Jeremy   Reworked the sche...
14
import utils.Logger as L
c72eb17a   Jeremy   Update celery task
15
import majordome.tasks
975e2c98   Quentin Durand   Auto stash before...
16
from celery import app, task
c72eb17a   Jeremy   Update celery task
17
import alert_manager.tasks
975e2c98   Quentin Durand   Auto stash before...
18
from celery import shared_task
4cb0ff36   Jeremy   Update
19
log = L.setupLogger("MonitoringTaskLogger", "Monitoring")
c5a3b4a0   haribo   Date: 05/07/2016
20

1a2dc19f   Etienne Pallier   nombreux bugfixes...
21
22
23
# EP
#DEBUG_FILE = False
DEBUG_FILE = True
ce470283   Jeremy   Plc simulator fin...
24

ce470283   Jeremy   Plc simulator fin...
25

65149de7   Jeremy   Update
26
27
'''
    Infinite task created at the program's start.
4cb0ff36   Jeremy   Update
28
    Checks the plc status, parse it, analyse it, store it in db
65149de7   Jeremy   Update
29
'''
257abe9b   Jeremy   Added comments
30
31


ef60c3ec   Jeremy   Majordome and mon...
32
33
34
class Monitoring(Task):
    timers = {}
    functions = {}
5b5566ab   haribo   added celery
35

3ed0a02b   Quentin Durand   Declaring variabl...
36
37
38
39
40
41
42
43
44
    state = None
    plcController = None
    majordome = None
    alert_manager = None
    alert_task = None
    majordome_task = None
    timer_status = None
    timer_tasks = None
    status_plc = None
eabb9310   Patrick Maeght   scan captors list...
45
    plc_checker = PlcChecker()
3ed0a02b   Quentin Durand   Declaring variabl...
46

5b5566ab   haribo   added celery
47
    def run(self):
ee2c4543   Etienne Pallier   Bugfix Monitoring...
48
        print ("AGENT ENV: startup...")
c72eb17a   Jeremy   Update celery task
49
        self.createTask()
1a2dc19f   Etienne Pallier   nombreux bugfixes...
50
51
52

        # (TRY TO) Connect to PLC
        # (non blocking if could not connect)
65149de7   Jeremy   Update
53
        self.setContext()
ee2c4543   Etienne Pallier   Bugfix Monitoring...
54
        print ("AGENT ENV: config PLC is (ip={}, port={})".format(self.plcController.ip, self.plcController.port))
1a2dc19f   Etienne Pallier   nombreux bugfixes...
55
56
57
        
        # (EP) self.setTime()
        self.setTimers()
ee2c4543   Etienne Pallier   Bugfix Monitoring...
58
        print("AGENT ENV: my timers (check env status every {}s, check other agents every {}s)".format(self.timer_status, self.timer_tasks))
1a2dc19f   Etienne Pallier   nombreux bugfixes...
59
        
c72eb17a   Jeremy   Update celery task
60
        self.setTasks()
ee2c4543   Etienne Pallier   Bugfix Monitoring...
61
        print("AGENT ENV: Other Agents id read from DB (majordome={}, alert={})".format(self.majordome_task, self.alert_task))
1a2dc19f   Etienne Pallier   nombreux bugfixes...
62
        
4cb0ff36   Jeremy   Update
63
        self.loop()
a190892c   haribo   Date: 05/07/2016
64

1a2dc19f   Etienne Pallier   nombreux bugfixes...
65

c72eb17a   Jeremy   Update celery task
66
67
    def createTask(self):
        try:
1a2dc19f   Etienne Pallier   nombreux bugfixes...
68
69
70
            # (EP) NO find() method available from Django ORM, c'est du n'importe quoi ce code (lastminute.com ?) !!!! 
            #TaskId.objects.find(task="monitoring").delete()
            TaskId.objects.filter(task="monitoring").delete()
c72eb17a   Jeremy   Update celery task
71
72
        except Exception as e:
            log.info(str(e))
1a2dc19f   Etienne Pallier   nombreux bugfixes...
73
74
            # (EP) return always if no monitoring task to delete, and so monitoring task id is never saved !!!
            #return 1
c72eb17a   Jeremy   Update celery task
75
76
77
78
79
80
81
82
83
84
85
86
        TaskId.objects.create(task_id=self.request.id, task="monitoring")
        return 0

    def setTasks(self):
        try:
            self.majordome_task = TaskId.objects.get(task="majordome")
            self.alert_task = TaskId.objects.get(task="alert_manager")
        except Exception as e:
            self.majordome_task = None
            self.alert_task = None
        return 0

65149de7   Jeremy   Update
87
    def setContext(self):
1a2dc19f   Etienne Pallier   nombreux bugfixes...
88
        self.plcController = PLCController()
4cb0ff36   Jeremy   Update
89
        self.state = "RUNNING"
65149de7   Jeremy   Update
90
        return (0)
c5a3b4a0   haribo   Date: 05/07/2016
91

1a2dc19f   Etienne Pallier   nombreux bugfixes...
92
    def setTimers(self):
ef60c3ec   Jeremy   Majordome and mon...
93
        self.timer_status = 2
c72eb17a   Jeremy   Update celery task
94
95
        self.timer_tasks = 5
        self.timers = {"timer_status": self.timer_status,
1a2dc19f   Etienne Pallier   nombreux bugfixes...
96
97
98
99
100
                       "timer_tasks": self.timer_tasks}
        # (EP)         "tasks": self.timer_tasks}
        # (EP) self.functions = {"timer_status": self.handleTimerStatus,
        self.functions = {"timer_status": self.handleStatus,
                          "timer_tasks": self.handleTasks}
4cb0ff36   Jeremy   Update
101
        return (0)
21598bc6   haribo   Date: 01/08/2016
102

975e2c98   Quentin Durand   Auto stash before...
103

bca9a283   Jeremy   Reworked the sche...
104
105
106
    def logDB(self, message: str):
        Log.objects.create(agent='Monitoring', message=message)

975e2c98   Quentin Durand   Auto stash before...
107
108
109
110
111
        '''
            Function called by the main loop to check if the majordome and alert_manager are running
        '''


c72eb17a   Jeremy   Update celery task
112
    def handleTasks(self):
1a2dc19f   Etienne Pallier   nombreux bugfixes...
113
114
115
        # Reset timer total duration
        # (EP) self.timers["tasks"] = self.timer_tasks
        self.timers["timer_tasks"] = self.timer_tasks
c72eb17a   Jeremy   Update celery task
116
117
118
        # TODO check majordome and alert_manager status
        if self.majordome_task is None:
            try:
ed1bf194   Etienne Pallier   bugfix task major...
119
120
121
                # (EP) bugfix !!!
                #self.monitoring_task = TaskId.objects.get(task="majordome")
                self.majordome_task = TaskId.objects.get(task="majordome")
c72eb17a   Jeremy   Update celery task
122
            except Exception as e:
552f1515   Etienne Pallier   New global variab...
123
                if settings.USE_CELERY:
ee2c4543   Etienne Pallier   Bugfix Monitoring...
124
125
126
127
128
129
130
131
132
                    try:
                        # Mind this will raise an Exception if RABBITMQ is not started !
                        majordome.tasks.Majordome.apply_async()
                        if settings.DEBUG and DEBUG_FILE:
                            log.info(str(e))
                    except Exception as e2:
                        print("YOU MUST START RABBITMQ before running PyROS !")
                        exit()
                else: print("USE_CELERY is false => do not create Majordome task")
1a2dc19f   Etienne Pallier   nombreux bugfixes...
133
                    
c72eb17a   Jeremy   Update celery task
134
135
136
137
        if self.alert_task is None:
            try:
                self.alert_task = TaskId.objects.get(task="alert_manager")
            except Exception as e:
ee2c4543   Etienne Pallier   Bugfix Monitoring...
138
                # Mind this will raise an Exception if RABBITMQ is not started !
552f1515   Etienne Pallier   New global variab...
139
                if settings.USE_CELERY:
ee2c4543   Etienne Pallier   Bugfix Monitoring...
140
141
142
143
144
145
146
147
                    try:
                        alert_manager.tasks.AlertListener.apply_async()
                        if settings.DEBUG and DEBUG_FILE:
                            log.info(str(e))
                    except Exception as e2:
                        print("YOU MUST START RABBITMQ before running PyROS !")
                        exit()
                else: print("USE_CELERY is false => do not create AlertListener task")
15e15006   Etienne Pallier   Nouvelles modifs ...
148
149
150
        # EP non car 0 = false
        #return 0
        return True
c72eb17a   Jeremy   Update celery task
151

257abe9b   Jeremy   Added comments
152
    '''
6c2793c2   jeremy   Update
153
154
        Check if all devices info are consitants
    '''
975e2c98   Quentin Durand   Auto stash before...
155

6c2793c2   jeremy   Update
156
157
158
159
160
161
162
163
    def isStatusValid(self):
        return True

    '''
        Extract content from the status dictionary
    '''
    def extractFromDict(self, status):
        synthesis = {}
d18e437e   Patrick Maeght   New json in
164
        devices = status["devices"]
f47ec727   Quentin Durand   hotfix monitoring
165
        #print(devices)
927b8599   Quentin Durand   Moving plc_mode a...
166
167
        #is_safe_str = status["is_safe"]
        #mode = status["mode"]
6c2793c2   jeremy   Update
168
        for device in devices:
e519f61d   Patrick Maeght   test valid device...
169
170
171
172
            if device['valid'] == "yes":
                for value in device["device_values"]:
                    synthesis[value["name"]] = value["value"]
                    synthesis[value["name"] + "_unit"] = value["unit"]
927b8599   Quentin Durand   Moving plc_mode a...
173
        return synthesis
6c2793c2   jeremy   Update
174

fe5613f5   jeremy   Update plc protocol
175
    # TODO ATTENTION SI DEUX DEVICES ONT LE MEME NOM
e519f61d   Patrick Maeght   test valid device...
176
    # TODO SAVE SYNTHESIS
927b8599   Quentin Durand   Moving plc_mode a...
177
    def saveContent(self, content):
d18e437e   Patrick Maeght   New json in
178
        devices = content[0]["devices"]
e519f61d   Patrick Maeght   test valid device...
179
        print("saveContent")
fe5613f5   jeremy   Update plc protocol
180
        for device in devices:
e519f61d   Patrick Maeght   test valid device...
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
            if device['valid'] == "yes":
                status = PlcDeviceStatus()
                try:
                    database_device = PlcDevice.objects.get(name=device["device_name"])
                except Exception as e:
                   # plc = Plc.objects.first()
                    database_device = PlcDevice.objects.create(name=device["name"])
                status.device = database_device
                for value in device["device_values"]:
                    status.setValue(value["name"], value["value"], value["unit"])
                    if   value["name"] == "OutsideTemp":
                        print(value["name"], value["value"])

                #status.setValue("mode", mode)
                #status.setValue("is_safe", is_safe_str)
                status.save()
fe5613f5   jeremy   Update plc protocol
197

6c2793c2   jeremy   Update
198
    '''
257abe9b   Jeremy   Added comments
199
200
        Parse status returned by plc
    '''
ce470283   Jeremy   Plc simulator fin...
201
202
203
    def parseStatus(self, status_plc):
        try:
            status = {}
f47ec727   Quentin Durand   hotfix monitoring
204
            dict = json.loads(status_plc)
d18e437e   Patrick Maeght   New json in
205
            if dict[0]["entity_name"] == "PLC_STATUS":
fe5613f5   jeremy   Update plc protocol
206
                if self.isStatusValid():
f47ec727   Quentin Durand   hotfix monitoring
207
                    status = self.extractFromDict(dict[0])
927b8599   Quentin Durand   Moving plc_mode a...
208
                    self.saveContent(dict)
fe5613f5   jeremy   Update plc protocol
209
210
211
                else:
                    # TODO HANDLE ERROR HERE
                    pass
ce470283   Jeremy   Plc simulator fin...
212
        except Exception as e:
f47ec727   Quentin Durand   hotfix monitoring
213

ce470283   Jeremy   Plc simulator fin...
214
215
216
217
218
219
220
            if DEBUG_FILE and settings.DEBUG:
                log.info(str(e))
            self.status_plc = {}
            return 1
        self.status_plc = status
        return 0

257abe9b   Jeremy   Added comments
221
    '''
fe5613f5   jeremy   Update plc protocol
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
        Check if string is ouside
    '''
    def isOutside(self, key):
        if key.find('Outside') != -1 or key.find('Rain') != -1\
                or key.find("Wind") != -1 or key.find("Sky") != -1:
            return True
        elif key == "Pressure":
            return True
        elif key == "DewPoint":
            return True
        return False

    def isInside(self, key):
        if key.find('Inside') != -1 or key.find('Door') != -1:
            return True
        elif key == "status":
            return True
        elif key == "current":
            return True
        return False

    '''
257abe9b   Jeremy   Added comments
244
245
        Save status returned by plc
    '''
ce470283   Jeremy   Plc simulator fin...
246
247
248
249
    def saveStatus(self):
        outside = WeatherWatch()
        inside = SiteWatch()
        for key, value in self.status_plc.items():
fe5613f5   jeremy   Update plc protocol
250
            if self.isOutside(key):
ce470283   Jeremy   Plc simulator fin...
251
                outside.setAttribute(key, value)
fe5613f5   jeremy   Update plc protocol
252
            elif self.isInside(key):
ce470283   Jeremy   Plc simulator fin...
253
254
255
256
257
                inside.setAttribute(key, value)
        outside.setGlobalStatus()
        inside.setGlobalStatus()
        outside.save()
        inside.save()
15e15006   Etienne Pallier   Nouvelles modifs ...
258
        #return 0
ce470283   Jeremy   Plc simulator fin...
259

eabb9310   Patrick Maeght   scan captors list...
260
261
    def parseNewStatus(self,status_plc ):
        # """ PM 20181009 parse new status for config
ff6e8ece   Patrick Maeght   check captors con...
262
        if status_plc.find('PLC_STATUS') >= 0:
eabb9310   Patrick Maeght   scan captors list...
263
264
            self.plc_checker.chk_config(status_plc)

257abe9b   Jeremy   Added comments
265
266
267
    '''
        Function called by the main loop to handle the plc status
    '''
1a2dc19f   Etienne Pallier   nombreux bugfixes...
268
269
    def handleStatus(self):
        # Reset timer total duration
4cb0ff36   Jeremy   Update
270
        self.timers["timer_status"] = self.timer_status
1a2dc19f   Etienne Pallier   nombreux bugfixes...
271
        status_plc = self.plcController.getStatus()
eabb9310   Patrick Maeght   scan captors list...
272
273
        # PM 20181009 parse new status for config
        self.parseNewStatus(status_plc)
f47ec727   Quentin Durand   hotfix monitoring
274

1a2dc19f   Etienne Pallier   nombreux bugfixes...
275
276
        # Error while READING status ?
        if (self.plcController.isError(status_plc)):
f47ec727   Quentin Durand   hotfix monitoring
277

ce470283   Jeremy   Plc simulator fin...
278
            if (settings.DEBUG and DEBUG_FILE):
1a2dc19f   Etienne Pallier   nombreux bugfixes...
279
                log.info("Invalid PLC status returned (while reading) : " + str(status_plc))
1b9ba1f6   Quentin Durand   Little update on ...
280
                Log.objects.create(message="Invalid PLC status returned (while reading) : ", agent="Monitoring")
15e15006   Etienne Pallier   Nouvelles modifs ...
281
282
            # EP Non car 1 = true
            #return (1)
f47ec727   Quentin Durand   hotfix monitoring
283

15e15006   Etienne Pallier   Nouvelles modifs ...
284
            return False
f47ec727   Quentin Durand   hotfix monitoring
285

ed1bf194   Etienne Pallier   bugfix task major...
286
        # (EP) if parseStatus() = THERE WAS AN ERROR !!!
1a2dc19f   Etienne Pallier   nombreux bugfixes...
287
        # Error while PARSING status ?
ce470283   Jeremy   Plc simulator fin...
288
289
        if self.parseStatus(status_plc):
            if (settings.DEBUG and DEBUG_FILE):
1a2dc19f   Etienne Pallier   nombreux bugfixes...
290
                log.info("Invalid PLC status returned (while parsing) : " + str(status_plc))
1b9ba1f6   Quentin Durand   Little update on ...
291
                Log.objects.create(message="Invalid PLC status returned (while parsing) : ", agent="Monitoring")
15e15006   Etienne Pallier   Nouvelles modifs ...
292
293
294
            # EP Non car 1 = true
            #return (1)
            return False
c7583f6e   Etienne Pallier   Cleaner isolation...
295
296
        print("Status received from PLC (read and parsed ok):")
        print(status_plc)
15e15006   Etienne Pallier   Nouvelles modifs ...
297
        #return self.saveStatus()
20ed1e7a   Etienne Pallier   bugfixed time sav...
298
299
        #print(datetime.datetime.now())

1b9ba1f6   Quentin Durand   Little update on ...
300
        Log.objects.create(message="Status received from PLC (read and parsed ok):", agent="Monitoring")
15e15006   Etienne Pallier   Nouvelles modifs ...
301
302
        self.saveStatus()
        return True
21598bc6   haribo   Date: 01/08/2016
303

257abe9b   Jeremy   Added comments
304
305
306
    '''
        Main loop
    '''
4cb0ff36   Jeremy   Update
307
    def loop(self):
f47ec727   Quentin Durand   hotfix monitoring
308
        i = 0
4cb0ff36   Jeremy   Update
309
        while (self.state != "SHUTDOWN"):
c7583f6e   Etienne Pallier   Cleaner isolation...
310
311
312
313
            print("-")
            print("-")
            print("-")
            print ("AGENT Monitoring (ENV): iteration "+str(i)+", (my state is " + self.state+") :")
4cb0ff36   Jeremy   Update
314
315
316
317
            minimal_timer = min(self.timers, key=self.timers.get)
            time.sleep(self.timers[minimal_timer])
            self.timers = {key: value - self.timers[minimal_timer] for key, value in self.timers.items()}
            for timer_name, timer_value in self.timers.items():
4cb0ff36   Jeremy   Update
318
                if (timer_value <= 0):
ef60c3ec   Jeremy   Majordome and mon...
319
                    if (timer_name in self.functions):
1a2dc19f   Etienne Pallier   nombreux bugfixes...
320
                        # Handle the timer function
4cb0ff36   Jeremy   Update
321
                        self.functions[timer_name]()
1a2dc19f   Etienne Pallier   nombreux bugfixes...
322
323
                        if (settings.DEBUG and DEBUG_FILE):
                            log.info("Timer : " + str(timer_name) + " executed by monitoring")
4cb0ff36   Jeremy   Update
324
                    else:
ce470283   Jeremy   Plc simulator fin...
325
                        if (settings.DEBUG and DEBUG_FILE):
4cb0ff36   Jeremy   Update
326
                            log.info("Timer : " + str(timer_name) + "is not known by the monitoring")
bca9a283   Jeremy   Reworked the sche...
327
                        self.logDB("Timer " + str(timer_name) + " unknown")
1a2dc19f   Etienne Pallier   nombreux bugfixes...
328
329
330
                # EP : pas au bon endroit !!!
                #if (settings.DEBUG and DEBUG_FILE):
                    #log.info("Timer : " + str(timer_name) + " executed by monitoring")
c7583f6e   Etienne Pallier   Cleaner isolation...
331
            i+=1
ef60c3ec   Jeremy   Majordome and mon...
332
333
        return (0)

1a2dc19f   Etienne Pallier   nombreux bugfixes...
334
335


ef60c3ec   Jeremy   Majordome and mon...
336
337
338
if (__name__ == "__main__"):
    m = Monitoring()
    m.run()