Blame view

src/monitoring/tasks.py 10.7 KB
5b5566ab   haribo   added celery
1
from __future__ import absolute_import
a190892c   haribo   Date: 05/07/2016
2
from django.conf import settings
ddf59dd4   haribo   Remaniement :
3
from common.models import *
81408f10   Etienne Pallier   Added some useful...
4
5

# (EP) OLD task base class
ac74403f   Etienne Pallier   bugfixed my wrong...
6
7
8
from celery.task import Task
# NEW task base class, but DOES NOT WORK (because celery3 and not 4 ?) !!!
#from celery import Task
81408f10   Etienne Pallier   Added some useful...
9

ddf59dd4   haribo   Remaniement :
10
from devices.PLC import PLCController
4cb0ff36   Jeremy   Update
11
from utils.JDManipulator import *
6c2793c2   jeremy   Update
12
import json
bca9a283   Jeremy   Reworked the sche...
13
import utils.Logger as L
c72eb17a   Jeremy   Update celery task
14
import majordome.tasks
975e2c98   Quentin Durand   Auto stash before...
15
from celery import app, task
c72eb17a   Jeremy   Update celery task
16
import alert_manager.tasks
975e2c98   Quentin Durand   Auto stash before...
17
from celery import shared_task
4cb0ff36   Jeremy   Update
18
log = L.setupLogger("MonitoringTaskLogger", "Monitoring")
c5a3b4a0   haribo   Date: 05/07/2016
19

1a2dc19f   Etienne Pallier   nombreux bugfixes...
20
21
22
# EP
#DEBUG_FILE = False
DEBUG_FILE = True
ce470283   Jeremy   Plc simulator fin...
23

ce470283   Jeremy   Plc simulator fin...
24

65149de7   Jeremy   Update
25
26
'''
    Infinite task created at the program's start.
4cb0ff36   Jeremy   Update
27
    Checks the plc status, parse it, analyse it, store it in db
65149de7   Jeremy   Update
28
'''
257abe9b   Jeremy   Added comments
29
30


ef60c3ec   Jeremy   Majordome and mon...
31
32
33
class Monitoring(Task):
    timers = {}
    functions = {}
5b5566ab   haribo   added celery
34

3ed0a02b   Quentin Durand   Declaring variabl...
35
36
37
38
39
40
41
42
43
44
    state = None
    plcController = None
    majordome = None
    alert_manager = None
    alert_task = None
    majordome_task = None
    timer_status = None
    timer_tasks = None
    status_plc = None

5b5566ab   haribo   added celery
45
    def run(self):
ee2c4543   Etienne Pallier   Bugfix Monitoring...
46
        print ("AGENT ENV: startup...")
c72eb17a   Jeremy   Update celery task
47
        self.createTask()
1a2dc19f   Etienne Pallier   nombreux bugfixes...
48
49
50

        # (TRY TO) Connect to PLC
        # (non blocking if could not connect)
65149de7   Jeremy   Update
51
        self.setContext()
ee2c4543   Etienne Pallier   Bugfix Monitoring...
52
        print ("AGENT ENV: config PLC is (ip={}, port={})".format(self.plcController.ip, self.plcController.port))
1a2dc19f   Etienne Pallier   nombreux bugfixes...
53
54
55
        
        # (EP) self.setTime()
        self.setTimers()
ee2c4543   Etienne Pallier   Bugfix Monitoring...
56
        print("AGENT ENV: my timers (check env status every {}s, check other agents every {}s)".format(self.timer_status, self.timer_tasks))
1a2dc19f   Etienne Pallier   nombreux bugfixes...
57
        
c72eb17a   Jeremy   Update celery task
58
        self.setTasks()
ee2c4543   Etienne Pallier   Bugfix Monitoring...
59
        print("AGENT ENV: Other Agents id read from DB (majordome={}, alert={})".format(self.majordome_task, self.alert_task))
1a2dc19f   Etienne Pallier   nombreux bugfixes...
60
        
4cb0ff36   Jeremy   Update
61
        self.loop()
a190892c   haribo   Date: 05/07/2016
62

1a2dc19f   Etienne Pallier   nombreux bugfixes...
63

c72eb17a   Jeremy   Update celery task
64
65
    def createTask(self):
        try:
1a2dc19f   Etienne Pallier   nombreux bugfixes...
66
67
68
            # (EP) NO find() method available from Django ORM, c'est du n'importe quoi ce code (lastminute.com ?) !!!! 
            #TaskId.objects.find(task="monitoring").delete()
            TaskId.objects.filter(task="monitoring").delete()
c72eb17a   Jeremy   Update celery task
69
70
        except Exception as e:
            log.info(str(e))
1a2dc19f   Etienne Pallier   nombreux bugfixes...
71
72
            # (EP) return always if no monitoring task to delete, and so monitoring task id is never saved !!!
            #return 1
c72eb17a   Jeremy   Update celery task
73
74
75
76
77
78
79
80
81
82
83
84
        TaskId.objects.create(task_id=self.request.id, task="monitoring")
        return 0

    def setTasks(self):
        try:
            self.majordome_task = TaskId.objects.get(task="majordome")
            self.alert_task = TaskId.objects.get(task="alert_manager")
        except Exception as e:
            self.majordome_task = None
            self.alert_task = None
        return 0

65149de7   Jeremy   Update
85
    def setContext(self):
1a2dc19f   Etienne Pallier   nombreux bugfixes...
86
        self.plcController = PLCController()
4cb0ff36   Jeremy   Update
87
        self.state = "RUNNING"
65149de7   Jeremy   Update
88
        return (0)
c5a3b4a0   haribo   Date: 05/07/2016
89

1a2dc19f   Etienne Pallier   nombreux bugfixes...
90
    def setTimers(self):
ef60c3ec   Jeremy   Majordome and mon...
91
        self.timer_status = 2
c72eb17a   Jeremy   Update celery task
92
93
        self.timer_tasks = 5
        self.timers = {"timer_status": self.timer_status,
1a2dc19f   Etienne Pallier   nombreux bugfixes...
94
95
96
97
98
                       "timer_tasks": self.timer_tasks}
        # (EP)         "tasks": self.timer_tasks}
        # (EP) self.functions = {"timer_status": self.handleTimerStatus,
        self.functions = {"timer_status": self.handleStatus,
                          "timer_tasks": self.handleTasks}
4cb0ff36   Jeremy   Update
99
        return (0)
21598bc6   haribo   Date: 01/08/2016
100

975e2c98   Quentin Durand   Auto stash before...
101

bca9a283   Jeremy   Reworked the sche...
102
103
104
    def logDB(self, message: str):
        Log.objects.create(agent='Monitoring', message=message)

975e2c98   Quentin Durand   Auto stash before...
105
106
107
108
109
        '''
            Function called by the main loop to check if the majordome and alert_manager are running
        '''


c72eb17a   Jeremy   Update celery task
110
    def handleTasks(self):
1a2dc19f   Etienne Pallier   nombreux bugfixes...
111
112
113
        # Reset timer total duration
        # (EP) self.timers["tasks"] = self.timer_tasks
        self.timers["timer_tasks"] = self.timer_tasks
c72eb17a   Jeremy   Update celery task
114
115
116
        # TODO check majordome and alert_manager status
        if self.majordome_task is None:
            try:
ed1bf194   Etienne Pallier   bugfix task major...
117
118
119
                # (EP) bugfix !!!
                #self.monitoring_task = TaskId.objects.get(task="majordome")
                self.majordome_task = TaskId.objects.get(task="majordome")
c72eb17a   Jeremy   Update celery task
120
            except Exception as e:
552f1515   Etienne Pallier   New global variab...
121
                if settings.USE_CELERY:
ee2c4543   Etienne Pallier   Bugfix Monitoring...
122
123
124
125
126
127
128
129
130
                    try:
                        # Mind this will raise an Exception if RABBITMQ is not started !
                        majordome.tasks.Majordome.apply_async()
                        if settings.DEBUG and DEBUG_FILE:
                            log.info(str(e))
                    except Exception as e2:
                        print("YOU MUST START RABBITMQ before running PyROS !")
                        exit()
                else: print("USE_CELERY is false => do not create Majordome task")
1a2dc19f   Etienne Pallier   nombreux bugfixes...
131
                    
c72eb17a   Jeremy   Update celery task
132
133
134
135
        if self.alert_task is None:
            try:
                self.alert_task = TaskId.objects.get(task="alert_manager")
            except Exception as e:
ee2c4543   Etienne Pallier   Bugfix Monitoring...
136
                # Mind this will raise an Exception if RABBITMQ is not started !
552f1515   Etienne Pallier   New global variab...
137
                if settings.USE_CELERY:
ee2c4543   Etienne Pallier   Bugfix Monitoring...
138
139
140
141
142
143
144
145
                    try:
                        alert_manager.tasks.AlertListener.apply_async()
                        if settings.DEBUG and DEBUG_FILE:
                            log.info(str(e))
                    except Exception as e2:
                        print("YOU MUST START RABBITMQ before running PyROS !")
                        exit()
                else: print("USE_CELERY is false => do not create AlertListener task")
15e15006   Etienne Pallier   Nouvelles modifs ...
146
147
148
        # EP non car 0 = false
        #return 0
        return True
c72eb17a   Jeremy   Update celery task
149

257abe9b   Jeremy   Added comments
150
    '''
6c2793c2   jeremy   Update
151
152
        Check if all devices info are consitants
    '''
975e2c98   Quentin Durand   Auto stash before...
153

6c2793c2   jeremy   Update
154
155
156
157
158
159
160
161
162
    def isStatusValid(self):
        return True

    '''
        Extract content from the status dictionary
    '''
    def extractFromDict(self, status):
        synthesis = {}
        devices = status["device"]
927b8599   Quentin Durand   Moving plc_mode a...
163
164
        #is_safe_str = status["is_safe"]
        #mode = status["mode"]
6c2793c2   jeremy   Update
165
166
167
168
        for device in devices:
            for value in device["values"]:
                synthesis[value["name"]] = value["value"]
                synthesis[value["name"] + "_unit"] = value["unit"]
927b8599   Quentin Durand   Moving plc_mode a...
169
        return synthesis
6c2793c2   jeremy   Update
170

fe5613f5   jeremy   Update plc protocol
171
    # TODO ATTENTION SI DEUX DEVICES ONT LE MEME NOM
927b8599   Quentin Durand   Moving plc_mode a...
172
    def saveContent(self, content):
fe5613f5   jeremy   Update plc protocol
173
174
175
176
177
178
        devices = content["device"]
        for device in devices:
            status = PlcDeviceStatus()
            try:
                database_device = PlcDevice.objects.get(name=device["name"])
            except Exception as e:
9f3db1a3   Quentin Durand   plc_mode in progr...
179
180
               # plc = Plc.objects.first()
                database_device = PlcDevice.objects.create(name=device["name"])
fe5613f5   jeremy   Update plc protocol
181
182
183
            status.device = database_device
            for value in device["values"]:
                status.setValue(value["name"], value["value"], value["unit"])
f117fdd9   theopuhl   PLC STATE + MODE
184

927b8599   Quentin Durand   Moving plc_mode a...
185
186
            #status.setValue("mode", mode)
            #status.setValue("is_safe", is_safe_str)
fe5613f5   jeremy   Update plc protocol
187
188
            status.save()

6c2793c2   jeremy   Update
189
    '''
257abe9b   Jeremy   Added comments
190
191
        Parse status returned by plc
    '''
ce470283   Jeremy   Plc simulator fin...
192
193
194
    def parseStatus(self, status_plc):
        try:
            status = {}
fe5613f5   jeremy   Update plc protocol
195
196
197
            dict = json.loads(status_plc)[0]
            if dict["name"] == "STATUS":
                if self.isStatusValid():
927b8599   Quentin Durand   Moving plc_mode a...
198
199
                    status = self.extractFromDict(dict)
                    self.saveContent(dict)
fe5613f5   jeremy   Update plc protocol
200
201
202
                else:
                    # TODO HANDLE ERROR HERE
                    pass
ce470283   Jeremy   Plc simulator fin...
203
204
205
206
207
208
209
210
        except Exception as e:
            if DEBUG_FILE and settings.DEBUG:
                log.info(str(e))
            self.status_plc = {}
            return 1
        self.status_plc = status
        return 0

257abe9b   Jeremy   Added comments
211
    '''
fe5613f5   jeremy   Update plc protocol
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
        Check if string is ouside
    '''
    def isOutside(self, key):
        if key.find('Outside') != -1 or key.find('Rain') != -1\
                or key.find("Wind") != -1 or key.find("Sky") != -1:
            return True
        elif key == "Pressure":
            return True
        elif key == "DewPoint":
            return True
        return False

    def isInside(self, key):
        if key.find('Inside') != -1 or key.find('Door') != -1:
            return True
        elif key == "status":
            return True
        elif key == "current":
            return True
        return False

    '''
257abe9b   Jeremy   Added comments
234
235
        Save status returned by plc
    '''
ce470283   Jeremy   Plc simulator fin...
236
237
238
239
    def saveStatus(self):
        outside = WeatherWatch()
        inside = SiteWatch()
        for key, value in self.status_plc.items():
fe5613f5   jeremy   Update plc protocol
240
            if self.isOutside(key):
ce470283   Jeremy   Plc simulator fin...
241
                outside.setAttribute(key, value)
fe5613f5   jeremy   Update plc protocol
242
            elif self.isInside(key):
ce470283   Jeremy   Plc simulator fin...
243
244
245
246
247
                inside.setAttribute(key, value)
        outside.setGlobalStatus()
        inside.setGlobalStatus()
        outside.save()
        inside.save()
15e15006   Etienne Pallier   Nouvelles modifs ...
248
        #return 0
ce470283   Jeremy   Plc simulator fin...
249

257abe9b   Jeremy   Added comments
250
251
252
    '''
        Function called by the main loop to handle the plc status
    '''
1a2dc19f   Etienne Pallier   nombreux bugfixes...
253
254
    def handleStatus(self):
        # Reset timer total duration
4cb0ff36   Jeremy   Update
255
        self.timers["timer_status"] = self.timer_status
1a2dc19f   Etienne Pallier   nombreux bugfixes...
256
257
258
259
        status_plc = self.plcController.getStatus()
        
        # Error while READING status ?
        if (self.plcController.isError(status_plc)):
ce470283   Jeremy   Plc simulator fin...
260
            if (settings.DEBUG and DEBUG_FILE):
1a2dc19f   Etienne Pallier   nombreux bugfixes...
261
                log.info("Invalid PLC status returned (while reading) : " + str(status_plc))
15e15006   Etienne Pallier   Nouvelles modifs ...
262
263
264
            # EP Non car 1 = true
            #return (1)
            return False
1a2dc19f   Etienne Pallier   nombreux bugfixes...
265
        
ed1bf194   Etienne Pallier   bugfix task major...
266
        # (EP) if parseStatus() = THERE WAS AN ERROR !!!
1a2dc19f   Etienne Pallier   nombreux bugfixes...
267
        # Error while PARSING status ?
ce470283   Jeremy   Plc simulator fin...
268
269
        if self.parseStatus(status_plc):
            if (settings.DEBUG and DEBUG_FILE):
1a2dc19f   Etienne Pallier   nombreux bugfixes...
270
                log.info("Invalid PLC status returned (while parsing) : " + str(status_plc))
15e15006   Etienne Pallier   Nouvelles modifs ...
271
272
273
274
            # EP Non car 1 = true
            #return (1)
            return False
        
c7583f6e   Etienne Pallier   Cleaner isolation...
275
276
        print("Status received from PLC (read and parsed ok):")
        print(status_plc)
15e15006   Etienne Pallier   Nouvelles modifs ...
277
        #return self.saveStatus()
20ed1e7a   Etienne Pallier   bugfixed time sav...
278
279
        #print(datetime.datetime.now())

15e15006   Etienne Pallier   Nouvelles modifs ...
280
281
        self.saveStatus()
        return True
21598bc6   haribo   Date: 01/08/2016
282

257abe9b   Jeremy   Added comments
283
284
285
    '''
        Main loop
    '''
4cb0ff36   Jeremy   Update
286
    def loop(self):
c7583f6e   Etienne Pallier   Cleaner isolation...
287
        i=0
4cb0ff36   Jeremy   Update
288
        while (self.state != "SHUTDOWN"):
c7583f6e   Etienne Pallier   Cleaner isolation...
289
290
291
292
            print("-")
            print("-")
            print("-")
            print ("AGENT Monitoring (ENV): iteration "+str(i)+", (my state is " + self.state+") :")
4cb0ff36   Jeremy   Update
293
294
295
296
            minimal_timer = min(self.timers, key=self.timers.get)
            time.sleep(self.timers[minimal_timer])
            self.timers = {key: value - self.timers[minimal_timer] for key, value in self.timers.items()}
            for timer_name, timer_value in self.timers.items():
4cb0ff36   Jeremy   Update
297
                if (timer_value <= 0):
ef60c3ec   Jeremy   Majordome and mon...
298
                    if (timer_name in self.functions):
1a2dc19f   Etienne Pallier   nombreux bugfixes...
299
                        # Handle the timer function
4cb0ff36   Jeremy   Update
300
                        self.functions[timer_name]()
1a2dc19f   Etienne Pallier   nombreux bugfixes...
301
302
                        if (settings.DEBUG and DEBUG_FILE):
                            log.info("Timer : " + str(timer_name) + " executed by monitoring")
4cb0ff36   Jeremy   Update
303
                    else:
ce470283   Jeremy   Plc simulator fin...
304
                        if (settings.DEBUG and DEBUG_FILE):
4cb0ff36   Jeremy   Update
305
                            log.info("Timer : " + str(timer_name) + "is not known by the monitoring")
bca9a283   Jeremy   Reworked the sche...
306
                        self.logDB("Timer " + str(timer_name) + " unknown")
1a2dc19f   Etienne Pallier   nombreux bugfixes...
307
308
309
                # EP : pas au bon endroit !!!
                #if (settings.DEBUG and DEBUG_FILE):
                    #log.info("Timer : " + str(timer_name) + " executed by monitoring")
c7583f6e   Etienne Pallier   Cleaner isolation...
310
            i+=1
ef60c3ec   Jeremy   Majordome and mon...
311
312
        return (0)

1a2dc19f   Etienne Pallier   nombreux bugfixes...
313
314


ef60c3ec   Jeremy   Majordome and mon...
315
316
317
if (__name__ == "__main__"):
    m = Monitoring()
    m.run()