Blame view

src/alert_manager/tasks.py 6.25 KB
5b5566ab   haribo   added celery
1
2
3
from __future__ import absolute_import

from celery.task import Task
77816f10   haribo   Workflow implemen...
4
from common.RequestBuilder import RequestBuilder
5b5566ab   haribo   added celery
5

94336356   haribo   Added starting fi...
6
from django.conf import settings
5b5566ab   haribo   added celery
7
8
from django.templatetags.static import static
import socket
94336356   haribo   Added starting fi...
9
from pyrosapp.models import *
5b5566ab   haribo   added celery
10
11

import time
94336356   haribo   Added starting fi...
12
import voeventparse
5b5566ab   haribo   added celery
13

94336356   haribo   Added starting fi...
14
15
import os
from os.path import isfile, join
77816f10   haribo   Workflow implemen...
16
17
import observation_manager

5b5566ab   haribo   added celery
18
19
20
IP = '127.0.0.1'
PORT = 31569

94336356   haribo   Added starting fi...
21
22
23
VOEVENTS_PATH = "alert_manager/events_received"


5b5566ab   haribo   added celery
24
25
26
27
class alert_listener(Task):
    '''
        Launched at server start
        Listens to VOEvents, and create appropriate request when an event is received
94336356   haribo   Added starting fi...
28
    '''
5b5566ab   haribo   added celery
29
30

    def run(self):
94336356   haribo   Added starting fi...
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
        '''
            Is called at the beginning of the task
            Calls the function to get the new events in the VOEVENTS_PATH directory
            For each event, calls a fonction to analyze it
        '''

        if settings.SIMULATION == False:
            self.old_files = [
                f for f in os.listdir(VOEVENTS_PATH) if isfile(join(VOEVENTS_PATH, f))]
            while True:
                fresh_events = self.get_fresh_events()
                for event in fresh_events:
                    self.analyze_event(event)
                time.sleep(1)
        else:
            self.run_simulation()

    def get_fresh_events(self):
        '''
            Reads in the VOEVENTS_PATH directory to see if there are new events to analyze
            :returns : A list containing the new files to analyze
        '''

        fresh_events = []
        files = [
            f for f in os.listdir(VOEVENTS_PATH) if isfile(join(VOEVENTS_PATH, f))]

        diff = list(set(files).difference(set(self.old_files)))

        for event in diff:
            print("New file found : %s" % (event, ))
            fresh_events.append(event)
        self.old_files = files
        return fresh_events

    def analyze_event(self, event):
        '''
            Opens and parse the voevent file
            Will create the request & the alert object related
        '''
        with open(os.path.join(VOEVENTS_PATH, event), 'rb') as f:
            voevent = voeventparse.load(f)
        print(voevent.Who.AuthorIVORN)
        request = self.create_related_request(voevent)
        Alert.objects.create(request=request, voevent_xml=event)

    def create_related_request(self, voevent):
        '''
            Creates a request object related to the voevent received.
            For the moment, it doesn't take care of the voevent content, and hardcode the sequences etc

            :returns : The request
        '''

        pyros_user = PyrosUser.objects.get(user__username="haribo")
        scientific_program = ScientificProgram.objects.get(name="GRB")
        request_builder = RequestBuilder()
        request_builder.start_new_request(
            pyros_user, scientific_program, True, name="req1")
        seq1 = request_builder.add_sequence(1, 0, 10, name="seq1")
        alb11 = request_builder.add_album(seq1, Device.NIR, name="alb11")
        request_builder.add_plan(
            alb11, Device.NIR_FILTER_1, 120, 5, name="plan111")
        alb12 = request_builder.add_album(seq1, Device.VIS, name="alb12")
        request_builder.add_plan(
            alb11, Device.VIS_FILTER_1, 180, 1, name="plan121")
        seq2 = request_builder.add_sequence(1, 0, 10, name="seq2")
        alb21 = request_builder.add_album(seq2, Device.NIR, name="alb21")
        request_builder.add_plan(
            alb21, Device.NIR_FILTER_2, 60, 3, name="plan211")
        return request_builder.validate_request()

    def run_simulation(self):
        '''
            Uses a socket to block until a message is received (simulation of a VOEvent reception)
        '''
5b5566ab   haribo   added celery
107
108
        print("run")
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
94336356   haribo   Added starting fi...
109
110
        self.server_socket.setsockopt(
            socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
5b5566ab   haribo   added celery
111
112
113
114
        self.server_socket.setblocking(True)
        self.server_socket.bind((IP, PORT))
        self.server_socket.listen(12)
        while True:
94336356   haribo   Added starting fi...
115
            # il faudra que tout ça soit dans un try ...
77816f10   haribo   Workflow implemen...
116
            print("Task en attente de signal")
5b5566ab   haribo   added celery
117
            self.server_socket.accept()
77816f10   haribo   Workflow implemen...
118
            print("signal reçu")
94336356   haribo   Added starting fi...
119
            self.alert_received_simulation()
5b5566ab   haribo   added celery
120

94336356   haribo   Added starting fi...
121
122
123
124
125
    def alert_received_simulation(self):
        '''
            Simulates a VOEvent message by reading sequences into a file
            Creates a request with them
        '''
77816f10   haribo   Workflow implemen...
126
127
128
129
130
        # je supprime les sequences et requetes déjà existantes
        Sequence.objects.all().delete()
        Request.objects.all().delete()

        Log.objects.create(agent='Alert manager', message='Alert received')
94336356   haribo   Added starting fi...
131

77816f10   haribo   Workflow implemen...
132
133
        for file_name in os.listdir(observation_manager.tasks.IMAGES_FOLDER):
            if file_name != "empty":
94336356   haribo   Added starting fi...
134
135
                file_path = os.path.join(
                    observation_manager.tasks.IMAGES_FOLDER, file_name)
77816f10   haribo   Workflow implemen...
136
                os.unlink(file_path)
94336356   haribo   Added starting fi...
137
138
139

        # j'ouvre le fichier des séquences de simulation, et j le lis ligne par
        # ligne
5b5566ab   haribo   added celery
140
141
        with open("alert_manager/simulation_sequences", 'r') as sequences_file:
            sequences = sequences_file.readlines()
77816f10   haribo   Workflow implemen...
142

94336356   haribo   Added starting fi...
143
144
        Log.objects.create(
            agent='Alert manager', message='Simulation sequences file read')
77816f10   haribo   Workflow implemen...
145

5b5566ab   haribo   added celery
146
        sequences = [sequence.strip('\n') for sequence in sequences]
77816f10   haribo   Workflow implemen...
147
148
        request_builder = RequestBuilder()
        request_builder.start_new_request()
5b5566ab   haribo   added celery
149
        for sequence in sequences:
94336356   haribo   Added starting fi...
150
151
152
153
154
155
156
157
158
            # TODO: envoyer les paramètres au lieu de la sequence en entier
            #             sequence_array = raw_sequence.split(" ")
            #             id_seq = sequence_array[0]
            #             priority = int(sequence_array[2])
            #             ''' transforms the duration (seconds) in days (86,400s in a day)'''
            #             duration = Decimal(float(sequence_array[4]) / 86400.0)
            #             jd1 = Decimal("%.8f" % float(sequence_array[5]))
            #             jd2 = Decimal("%.8f" % float(sequence_array[6]))

77816f10   haribo   Workflow implemen...
159
160
161
            request_builder.add_sequence(sequence)
        request_builder.validate_request()
        Log.objects.create(agent='Alert manager', message='Request built')
5b5566ab   haribo   added celery
162

94336356   haribo   Added starting fi...
163

5b5566ab   haribo   added celery
164
class change_obs_strategy(Task):
94336356   haribo   Added starting fi...
165
166
167
168
    '''
        Modifies the request for the current alert
        To be done
    '''
5b5566ab   haribo   added celery
169
170
171
172

    def run(self):
        time.sleep(5)
        print("change_obs_strategy")