# tasks.py 6.54 KB
from __future__ import absolute_import

from celery.task import Task
from common.RequestBuilder import RequestBuilder

from django.conf import settings
from django.templatetags.static import static
import socket
from pyrosapp.models import *

import time
import voeventparse

import os
from os.path import isfile, join
import observation_manager
import majordome.TaskManager
from decimal import Decimal

# Address and port the simulation socket binds to (used by alert_listener.run_simulation)
IP = '127.0.0.1'
PORT = 31569

# Directory scanned for VOEvent files; being a relative path, it is resolved
# against the working directory of the process running the task
VOEVENTS_PATH = "alert_manager/events_received"


class alert_listener(Task):
    '''
        Launched at server start
        Listens to VOEvents, and creates the appropriate request when an event is received

        Two modes, selected by settings.SIMULATION :
            - real mode : polls the VOEVENTS_PATH directory once per second for new files
            - simulation mode : blocks on a TCP socket, each incoming connection simulates an alert
    '''

    def run(self):
        '''
            Is called at the beginning of the task
            In simulation mode, delegates to run_simulation()
            Otherwise, remembers the files already present in the VOEVENTS_PATH directory,
            then loops forever : for each new event file found, calls a function to analyze it

            Never returns normally (both modes are infinite loops)
        '''

        if settings.SIMULATION:
            self.run_simulation()
            return

        # Files already present at startup are considered as already handled
        self.old_files = self._list_event_files()
        while True:
            for event in self.get_fresh_events():
                self.analyze_event(event)
            time.sleep(1)

    def _list_event_files(self):
        '''
            Lists the regular files (sub-directories are ignored) of the VOEVENTS_PATH directory
            :returns : A list of file names, without the directory part
        '''

        return [
            f for f in os.listdir(VOEVENTS_PATH) if isfile(join(VOEVENTS_PATH, f))]

    def get_fresh_events(self):
        '''
            Reads in the VOEVENTS_PATH directory to see if there are new events to analyze
            A file is new when it was not in the listing of the previous call (self.old_files)
            Updates self.old_files with the current listing

            :returns : A list containing the new files to analyze, sorted by name
        '''

        files = self._list_event_files()

        # Sorted so that the processing order does not depend on set iteration order
        fresh_events = sorted(set(files).difference(self.old_files))

        for event in fresh_events:
            print("New file found : %s" % (event,))
        self.old_files = files
        return fresh_events

    def analyze_event(self, event):
        '''
            Opens and parses the voevent file
            Deletes the pending 'execute_sequence' and 'execute_plan' tasks,
            then creates the request & the alert object related

            :param event : Name of the file (inside VOEVENTS_PATH) containing the VOEvent
        '''
        with open(os.path.join(VOEVENTS_PATH, event), 'rb') as f:
            voevent = voeventparse.load(f)
        print(voevent.Who.AuthorIVORN)
        majordome.TaskManager.delete_pending_tasks(['execute_sequence', 'execute_plan'])
        request = self.create_related_request(voevent)
        # Only the file name is stored in the alert, not the XML content
        Alert.objects.create(request=request, voevent_xml=event)

    def create_related_request(self, voevent):
        '''
            Creates a request object related to the voevent received.
            For the moment, it doesn't take care of the voevent content, and hardcodes
            the user, the scientific program, the sequences, albums and plans

            :param voevent : The parsed VOEvent (currently unused)
            :returns : The result of RequestBuilder.validate_request()
        '''

        pyros_user = PyrosUser.objects.get(user__username="haribo")
        scientific_program = ScientificProgram.objects.get(name="GRB")
        request_builder = RequestBuilder()
        request_builder.start_new_request(
            pyros_user, scientific_program, True, name="req1")

        # First sequence : one NIR album and one VIS album, one plan each
        seq1 = request_builder.add_sequence(1, 0, 10, name="seq1")
        alb11 = request_builder.add_album(seq1, Device.NIR, name="alb11")
        request_builder.add_plan(
            alb11, Device.NIR_FILTER_1, 120, 5, name="plan111")
        alb12 = request_builder.add_album(seq1, Device.VIS, name="alb12")
        request_builder.add_plan(
            alb12, Device.VIS_FILTER_1, 180, 1, name="plan121")

        # Second sequence : a single NIR album with one plan
        seq2 = request_builder.add_sequence(1, 0, 10, name="seq2")
        alb21 = request_builder.add_album(seq2, Device.NIR, name="alb21")
        request_builder.add_plan(
            alb21, Device.NIR_FILTER_2, 60, 3, name="plan211")
        return request_builder.validate_request()

    def run_simulation(self):
        '''
            Uses a socket to block until a message is received (simulation of a VOEvent reception)
            Listens on (IP, PORT) ; each accepted connection triggers alert_received_simulation()
            No data is read from the connection : the connection itself is the signal

            Never returns normally ; the listening socket is closed if the loop is interrupted
        '''
        print("run")
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.server_socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_socket.setblocking(True)
            self.server_socket.bind((IP, PORT))
            self.server_socket.listen(12)
            while True:
                print("Task en attente de signal")
                connection, _ = self.server_socket.accept()
                # Nothing is exchanged on the connection, release it right away
                connection.close()
                print("signal reçu")
                self.alert_received_simulation()
        finally:
            # Free the port whatever interrupts the loop
            self.server_socket.close()

    def alert_received_simulation(self):
        '''
            IMPORTANT : To do simulation, settings.SIMULATION must be set True
            Simulates a VOEvent message by reading sequences into a file
            Creates a request with them

            Before building the request, wipes the existing plans, albums, sequences, alerts,
            requests and schedule links, and empties the observation manager images folder

            Each non-blank line of "alert_manager/simulation_sequences" is split on single spaces ;
            the fields read are : 0 = name, 2 = priority, 4 = duration (seconds), 5 = jd1, 6 = jd2
        '''

        # Delete the already existing sequences and requests
        majordome.TaskManager.delete_pending_tasks(['execute_sequence', 'execute_plan'])
        Plan.objects.all().delete()
        Album.objects.all().delete()
        Sequence.objects.all().delete()
        Alert.objects.all().delete()
        Request.objects.all().delete()
        ScheduleHasSequences.objects.all().delete()

        Log.objects.create(agent='Alert manager', message='Alert received')

        # Remove every image except the entry named "empty"
        # (presumably a placeholder keeping the folder in place - to be confirmed)
        images_folder = observation_manager.tasks.IMAGES_FOLDER
        for file_name in os.listdir(images_folder):
            if file_name != "empty":
                os.unlink(os.path.join(images_folder, file_name))

        # Open the simulation sequences file and read it line by line
        with open("alert_manager/simulation_sequences", 'r') as sequences_file:
            sequences = sequences_file.readlines()

        Log.objects.create(
            agent='Alert manager', message='Simulation sequences file read')

        sequences = [sequence.strip('\n') for sequence in sequences]
        request_builder = RequestBuilder()
        # get() without filter : expects exactly one user and one scientific program in database
        request_builder.start_new_request(PyrosUser.objects.get(), ScientificProgram.objects.get(), True)
        for sequence in sequences:
            # A blank line holds no sequence ; skipping it avoids an IndexError on the fields below
            if not sequence.strip():
                continue
            sequence_array = sequence.split(" ")
            id_seq = sequence_array[0]
            priority = int(sequence_array[2])
            # Transforms the duration (seconds) in days (86,400s in a day)
            duration = Decimal(float(sequence_array[4]) / 86400.0)
            jd1 = Decimal("%.8f" % float(sequence_array[5]))
            jd2 = Decimal("%.8f" % float(sequence_array[6]))

            request_builder.add_sequence(priority, jd1, jd2, name=id_seq, duration=duration)

        request_builder.validate_request()
        Alert.objects.create(request=request_builder.request, strategyobs=StrategyObs.objects.all()[0])
        Log.objects.create(agent='Alert manager', message='Request built')