Blame view

src/alert_manager/tasks.py 6.68 KB
5b5566ab   haribo   added celery
1
2
3
from __future__ import absolute_import

from celery.task import Task
77816f10   haribo   Workflow implemen...
4
from common.RequestBuilder import RequestBuilder
5b5566ab   haribo   added celery
5

94336356   haribo   Added starting fi...
6
from django.conf import settings
5b5566ab   haribo   added celery
7
8
from django.templatetags.static import static
import socket
94336356   haribo   Added starting fi...
9
from pyrosapp.models import *
5b5566ab   haribo   added celery
10
11

import time
94336356   haribo   Added starting fi...
12
import voeventparse
5b5566ab   haribo   added celery
13

94336356   haribo   Added starting fi...
14
15
import os
from os.path import isfile, join
77816f10   haribo   Workflow implemen...
16
import observation_manager
bb45cd4a   haribo   Date: 25/05/2016
17
import majordome.TaskManager
7a79e25b   haribo   Date: 19/05/2016
18
19
from decimal import Decimal

5b5566ab   haribo   added celery
20
21
22
IP = '127.0.0.1'
PORT = 31569

94336356   haribo   Added starting fi...
23
24
25
VOEVENTS_PATH = "alert_manager/events_received"


5b5566ab   haribo   added celery
26
27
28
29
class alert_listener(Task):
    '''
        Launched at server start
        Listens to VOEvents, and create appropriate request when an event is received
94336356   haribo   Added starting fi...
30
    '''
5b5566ab   haribo   added celery
31
32

    def run(self):
94336356   haribo   Added starting fi...
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
        '''
            Is called at the beginning of the task
            Calls the function to get the new events in the VOEVENTS_PATH directory
            For each event, calls a fonction to analyze it
        '''

        if settings.SIMULATION == False:
            self.old_files = [
                f for f in os.listdir(VOEVENTS_PATH) if isfile(join(VOEVENTS_PATH, f))]
            while True:
                fresh_events = self.get_fresh_events()
                for event in fresh_events:
                    self.analyze_event(event)
                time.sleep(1)
        else:
            self.run_simulation()

    def get_fresh_events(self):
        '''
            Reads in the VOEVENTS_PATH directory to see if there are new events to analyze
            :returns : A list containing the new files to analyze
        '''

        fresh_events = []
        files = [
            f for f in os.listdir(VOEVENTS_PATH) if isfile(join(VOEVENTS_PATH, f))]

        diff = list(set(files).difference(set(self.old_files)))

        for event in diff:
7a79e25b   haribo   Date: 19/05/2016
63
            print("New file found : %s" % (event,))
94336356   haribo   Added starting fi...
64
65
66
67
68
69
70
71
72
73
74
75
            fresh_events.append(event)
        self.old_files = files
        return fresh_events

    def analyze_event(self, event):
        '''
            Opens and parse the voevent file
            Will create the request & the alert object related
        '''
        with open(os.path.join(VOEVENTS_PATH, event), 'rb') as f:
            voevent = voeventparse.load(f)
        print(voevent.Who.AuthorIVORN)
94336356   haribo   Added starting fi...
76
77
78
79
80
81
82
83
84
85
86
87
88
        request = self.create_related_request(voevent)
        Alert.objects.create(request=request, voevent_xml=event)

    def create_related_request(self, voevent):
        '''
            Creates a request object related to the voevent received.
            For the moment, it doesn't take care of the voevent content, and hardcode the sequences etc

            :returns : The request
        '''

        pyros_user = PyrosUser.objects.get(user__username="haribo")
        scientific_program = ScientificProgram.objects.get(name="GRB")
eecfb779   haribo   Date: 26/05/2016
89
90
        v_params = voeventparse.pull_params(voevent)
        voevent_id = v_params[None]['TrigID']['value']
94336356   haribo   Added starting fi...
91
92
        request_builder = RequestBuilder()
        request_builder.start_new_request(
eecfb779   haribo   Date: 26/05/2016
93
94
95
            pyros_user, scientific_program, True, name=voevent_id)
        seq1 = request_builder.add_sequence(1, 0, 10, name=voevent_id + "_1")
        alb11 = request_builder.add_album(seq1, Device.NIR, name=voevent_id + "_11")
94336356   haribo   Added starting fi...
96
        request_builder.add_plan(
eecfb779   haribo   Date: 26/05/2016
97
98
            alb11, Device.NIR_FILTER_1, 120, 5, name=voevent_id + "_111")
        alb12 = request_builder.add_album(seq1, Device.VIS, name=voevent_id + "_12")
94336356   haribo   Added starting fi...
99
        request_builder.add_plan(
eecfb779   haribo   Date: 26/05/2016
100
101
102
            alb12, Device.VIS_FILTER_1, 180, 1, name=voevent_id + "_121")
        seq2 = request_builder.add_sequence(1, 0, 10, name=voevent_id + "_2")
        alb21 = request_builder.add_album(seq2, Device.NIR, name=voevent_id + "_21")
94336356   haribo   Added starting fi...
103
        request_builder.add_plan(
eecfb779   haribo   Date: 26/05/2016
104
            alb21, Device.NIR_FILTER_2, 60, 3, name=voevent_id + "_211")
94336356   haribo   Added starting fi...
105
106
107
108
109
110
        return request_builder.validate_request()

    def run_simulation(self):
        '''
            Uses a socket to block until a message is received (simulation of a VOEvent reception)
        '''
5b5566ab   haribo   added celery
111
112
        print("run")
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
94336356   haribo   Added starting fi...
113
114
        self.server_socket.setsockopt(
            socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
5b5566ab   haribo   added celery
115
116
117
118
        self.server_socket.setblocking(True)
        self.server_socket.bind((IP, PORT))
        self.server_socket.listen(12)
        while True:
94336356   haribo   Added starting fi...
119
            # il faudra que tout ça soit dans un try ...
77816f10   haribo   Workflow implemen...
120
            print("Task en attente de signal")
5b5566ab   haribo   added celery
121
            self.server_socket.accept()
77816f10   haribo   Workflow implemen...
122
            print("signal reçu")
94336356   haribo   Added starting fi...
123
            self.alert_received_simulation()
5b5566ab   haribo   added celery
124

94336356   haribo   Added starting fi...
125
126
    def alert_received_simulation(self):
        '''
7a79e25b   haribo   Date: 19/05/2016
127
            IMPORTANT : To do simulation, settings.SIMULATION must be set True
94336356   haribo   Added starting fi...
128
129
130
            Simulates a VOEvent message by reading sequences into a file
            Creates a request with them
        '''
7a79e25b   haribo   Date: 19/05/2016
131

77816f10   haribo   Workflow implemen...
132
        # je supprime les sequences et requetes déjà existantes
bb45cd4a   haribo   Date: 25/05/2016
133
134
135
        majordome.TaskManager.delete_pending_tasks(['execute_sequence', 'execute_plan'])
        Plan.objects.all().delete()
        Album.objects.all().delete()
77816f10   haribo   Workflow implemen...
136
        Sequence.objects.all().delete()
bb45cd4a   haribo   Date: 25/05/2016
137
        Alert.objects.all().delete()
77816f10   haribo   Workflow implemen...
138
        Request.objects.all().delete()
bb45cd4a   haribo   Date: 25/05/2016
139
        ScheduleHasSequences.objects.all().delete()
77816f10   haribo   Workflow implemen...
140
141

        Log.objects.create(agent='Alert manager', message='Alert received')
94336356   haribo   Added starting fi...
142

77816f10   haribo   Workflow implemen...
143
144
        for file_name in os.listdir(observation_manager.tasks.IMAGES_FOLDER):
            if file_name != "empty":
94336356   haribo   Added starting fi...
145
146
                file_path = os.path.join(
                    observation_manager.tasks.IMAGES_FOLDER, file_name)
77816f10   haribo   Workflow implemen...
147
                os.unlink(file_path)
94336356   haribo   Added starting fi...
148

7a79e25b   haribo   Date: 19/05/2016
149
        # j'ouvre le fichier des séquences de simulation, et je le lis ligne par ligne
5b5566ab   haribo   added celery
150
151
        with open("alert_manager/simulation_sequences", 'r') as sequences_file:
            sequences = sequences_file.readlines()
77816f10   haribo   Workflow implemen...
152

94336356   haribo   Added starting fi...
153
154
        Log.objects.create(
            agent='Alert manager', message='Simulation sequences file read')
77816f10   haribo   Workflow implemen...
155

5b5566ab   haribo   added celery
156
        sequences = [sequence.strip('\n') for sequence in sequences]
77816f10   haribo   Workflow implemen...
157
        request_builder = RequestBuilder()
eecfb779   haribo   Date: 26/05/2016
158
        request_builder.start_new_request(PyrosUser.objects.get(), ScientificProgram.objects.get(), True, name="Simulation_request")
5b5566ab   haribo   added celery
159
        for sequence in sequences:
7a79e25b   haribo   Date: 19/05/2016
160
161
162
163
164
165
166
167
            sequence_array = sequence.split(" ")
            id_seq = sequence_array[0]
            priority = int(sequence_array[2])
            ''' transforms the duration (seconds) in days (86,400s in a day)'''
            duration = Decimal(float(sequence_array[4]) / 86400.0)
            jd1 = Decimal("%.8f" % float(sequence_array[5]))
            jd2 = Decimal("%.8f" % float(sequence_array[6]))

eecfb779   haribo   Date: 26/05/2016
168
            request_builder.add_sequence(priority, jd1, jd2, name="Simulation_" + id_seq, duration=duration)
7a79e25b   haribo   Date: 19/05/2016
169

77816f10   haribo   Workflow implemen...
170
        request_builder.validate_request()
bb45cd4a   haribo   Date: 25/05/2016
171
        Alert.objects.create(request=request_builder.request, strategyobs=StrategyObs.objects.all()[0])
77816f10   haribo   Workflow implemen...
172
        Log.objects.create(agent='Alert manager', message='Request built')