# tasks.py 6.97 KB
from __future__ import absolute_import
from alert_mgmt.StrategyBuilder import StrategyBuilder
from common.models import *
from django.conf import settings
import time
import voeventparse
import os
import sys
from os.path import isfile, join
from decimal import Decimal
import utils.Logger as L
log = L.setupLogger("AlertListenerLogger", "AlertListener")


# Julian date of the Unix epoch (1970-01-01T00:00:00 UTC); added to a Unix
# timestamp expressed in days to obtain a Julian date.
TIMESTAMP_JD = 2440587.500000
# Directory polled for incoming VOEvent files (relative path, so it is resolved
# against the process's working directory).
VOEVENTS_PATH = "alert_manager/events_received"
# When True (together with settings.DEBUG), each polling iteration logs
# "Checking fresh events".
DEBUG_FILE = False

'''
    Launched at server start
    Listens for VOEvents and creates the appropriate request when an event is received
'''


class AlertListener():
    """Watch VOEVENTS_PATH for new VOEvent files and turn each one into an alert.

    run() polls the directory once per second; every file that was not present
    at the previous poll is parsed with voeventparse and handed to
    create_related_request(), which builds a request from the default
    observation strategy and saves the matching Alert.
    """

    # Names of the files already seen in VOEVENTS_PATH. run() and
    # get_fresh_events() always rebind this attribute on the instance (they
    # never mutate the list in place), so this class-level list only serves as
    # the default for an instance on which run() has not been called yet.
    old_files = []

    # TODO check monitoring and majordome status ?
    # def createTask(self):
    #     try:
    #         TaskId.objects.find(task="alert_manager").delete()
    #     except Exception as e:
    #         self.log(str(e))
    #         return 1
    #     TaskId.objects.create(task_id=self.request.id, task="alert_manager")
    #     return 0
    #
    # def setTasks(self):
    #     try:
    #         self.monitoring_task = TaskId.objects.get(task="monitoring")
    #         self.majordome_task = TaskId.objects.get(task="majordome")
    #     except Exception as e:
    #         self.monitoring_task = None
    #         self.alert_task = None
    #     return 0

    def _list_event_files(self):
        """Return the names of the regular files currently in VOEVENTS_PATH."""
        return [f for f in os.listdir(VOEVENTS_PATH) if isfile(join(VOEVENTS_PATH, f))]

    def run(self):
        """Entry point of the task: poll VOEVENTS_PATH forever.

        The files already present when the method starts are recorded as
        "old" and are therefore never analyzed. Then, once per second, every
        newly appeared file is passed to analyze_event().

        This method never returns; any exception raised while analyzing an
        event propagates and stops the loop.
        """
        # self.createTask()
        self.old_files = self._list_event_files()
        Log.objects.create(agent="Alert manager", message="Start alert manager")
        # self.setTasks()

        while True:
            print("(ALM): start new iteration")
            if settings.DEBUG and DEBUG_FILE:
                log.info("Checking fresh events")
            for event in self.get_fresh_events():
                self.analyze_event(event)
                if settings.DEBUG:
                    log.info("Analyzed event : " + str(event))
            time.sleep(1)

    def get_fresh_events(self):
        """Return the files that appeared in VOEVENTS_PATH since the last poll.

        Each new file is recorded in the Log table (and in the logger when
        settings.DEBUG is set). The current directory listing then becomes the
        new reference for the next call.

        :returns : A list containing the names of the new files to analyze
        """
        files = self._list_event_files()
        # Set difference: order of the returned names is therefore arbitrary.
        fresh_events = list(set(files).difference(self.old_files))

        for event in fresh_events:
            if settings.DEBUG:
                log.info("New file found : %s" % (event))
            Log.objects.create(agent="Alert manager", message="New file found : %s" % (event,))
        self.old_files = files
        return fresh_events

    def analyze_event(self, event_file):
        """Parse one VOEvent file and create the related request and alert.

        :param event_file: Name of the file, relative to VOEVENTS_PATH
        """
        with open(os.path.join(VOEVENTS_PATH, event_file), 'rb') as f:
            voevent = voeventparse.load(f)
            # TODO: add a try/except to handle malformed files
        self.create_related_request(voevent, event_file)

    def create_related_request(self, voevent, event_file):
        """Create and save the request and the alert related to a received VOEvent.

        For the moment the sequences are not derived from the VOEvent content:
        the request is built from the default observation strategy
        (StrategyObs with is_default=True), and only the RA/DEC of each
        sequence is taken from the alert.

        Nothing is created when the VOEvent cannot be converted into an alert
        (get_alert_attributes() returned None) or when no default strategy
        exists.

        :param voevent: Object resulting of the VOEvent parsing with voeventparse library
        :param event_file: Name of the file the VOEvent was read from
        :returns : None
        """
        # Both lookups use objects.get(), which raises if the "pyros" user or
        # the "GRB" scientific program does not exist.
        pyros_user = PyrosUser.objects.get(username="pyros")
        scientific_program = ScientificProgram.objects.get(name="GRB")

        alert = self.get_alert_attributes(voevent, event_file)

        if alert is None or not StrategyObs.objects.filter(is_default=True).exists():
            # TODO: delete the received file? Or move it to a separate folder
            return

        alert.strat = StrategyObs.objects.filter(is_default=True)[0]
        name = "GRB-" + str(alert.trig_id)

        Log.objects.create(agent="Alert manager", message="Creating alert from file : %s ..." % (alert.strat.xml_file,))
        sb = StrategyBuilder()
        sb.create_request_from_strategy(alert.strat.xml_file, pyros_user, scientific_program, name)
        # (EP) Validate request AND run scheduling task !!!
        req = sb.validate()
        # (EP) Update each sequence with RA/DEC
        for seq in req.sequences.all():
            seq.ra = alert.burst_ra
            seq.dec = alert.burst_dec
            seq.save()
        alert.request_id = req
        # (EP) What does this do exactly ???
        # Copies every instance attribute of req onto alert, overwriting any
        # attribute with the same name.
        # NOTE(review): presumably Alert extends the request model and this
        # fills in the inherited fields - confirm against common.models.
        alert.__dict__.update(req.__dict__)
        alert.save()
        Log.objects.create(agent="Alert manager", message="Alert created.")

    def get_alert_attributes(self, voevent, event_file):
        """Build an (unsaved) Alert from the content of a parsed VOEvent.

        The parameters returned by voeventparse.pull_params are a dictionary
        with the "<What>" <Group>s as keys, and a dictionary of their <Param>
        as values.
        The nested dictionaries have the 'name' attribute as keys,
        and another dictionary as values, containing {attribute : value} pairs.
        The <Param>s without <Group> are in the dictionary with 'None' as key.

        :param voevent: Object resulting of the VOEvent parsing with voeventparse library
        :param event_file: Name of the file, stored in the alert as voevent_file
        :returns : The alert object, or None when a required parameter
                   (KeyError) or a required element (AttributeError) is missing
        """
        v_params = voeventparse.pull_params(voevent)

        alert = Alert(voevent_file=event_file)
        try:
            alert.trig_id = v_params[None]["TrigID"]["value"]
            alert.editor = v_params[None]["Packet_Type"]["value"]
            alert.soln_status = v_params[None]["Soln_Status"]["value"]
            alert.pkt_ser_num = v_params[None]["Pkt_Ser_Num"]["value"]

            tjd = Decimal(v_params[None]["Burst_TJD"]["value"])
            sod = Decimal(v_params[None]["Burst_SOD"]["value"])
            # Burst date: day number from Burst_TJD shifted by constant
            # offsets, plus the seconds of day converted to a day fraction.
            alert.burst_jd = tjd - 13370 - 1 + 2453371 + (sod / 86400)
            # NOTE(review): if soln_status is a string, [0] is only its first
            # character, so masking with 0xF0 always yields 0 - confirm which
            # bits are intended here.
            alert.defly_not_grb = int(alert.soln_status[0]) & 0xF0

            alert.author = voevent.Who.AuthorIVORN
            # NOTE(review): time.mktime() interprets the parsed date as local
            # time; if Who.Date is expressed in UTC the result is shifted by
            # the local UTC offset - confirm.
            alert.jd_send = time.mktime(time.strptime(str(voevent.Who.Date), "%Y-%m-%dT%H:%M:%S")) / 86400 + TIMESTAMP_JD
            alert.jd_received = time.time() / 86400 + TIMESTAMP_JD

            location = voevent.WhereWhen.ObsDataLocation.ObservationLocation
            position = location.AstroCoords.Position2D
            alert.astro_coord_system = location.AstroCoordSystem
            alert.burst_ra = position.Value2.C1
            alert.burst_dec = position.Value2.C2
            alert.error_radius = position.Error2Radius
        # A tuple is required here: "except KeyError or AttributeError"
        # evaluates to "except KeyError" and lets AttributeError escape.
        except (KeyError, AttributeError) as e:
            if settings.DEBUG:
                log.info("Error while parsing VOEvent " + str(e))
            return None
        return alert