from __future__ import absolute_import
from celery.task import Task
from alert_manager.StrategyBuilder import StrategyBuilder
from common.models import *
from django.conf import settings
import time
import voeventparse
import os
import sys
from os.path import isfile, join
from decimal import Decimal
import utils.Logger as L

log = L.setupLogger("AlertListenerLogger", "AlertListener")

# Offset added to "unix seconds / 86400" to obtain a Julian Date
# (2440587.5 is the Julian Date of the unix epoch).
TIMESTAMP_JD = 2440587.500000
# Directory polled for incoming VOEvent files (relative to the working dir).
VOEVENTS_PATH = "alert_manager/events_received"
# When True (and settings.DEBUG is on), log every polling iteration.
DEBUG_FILE = False


class AlertListener(Task):
    """
    Launched at server start.

    Listens to VOEvents, and creates the appropriate request when an event
    is received. "Listening" here means polling VOEVENTS_PATH once per second
    for files that were not present during the previous poll.
    """

    # Names of the files seen in VOEVENTS_PATH during the previous poll.
    # Rebound (never mutated in place) on the instance by run() and
    # get_fresh_events().
    old_files = []

    # TODO check monitoring and majordome status ?

    # def createTask(self):
    #     try:
    #         TaskId.objects.find(task="alert_manager").delete()
    #     except Exception as e:
    #         self.log(str(e))
    #         return 1
    #     TaskId.objects.create(task_id=self.request.id, task="alert_manager")
    #     return 0
    #
    # def setTasks(self):
    #     try:
    #         self.monitoring_task = TaskId.objects.get(task="monitoring")
    #         self.majordome_task = TaskId.objects.get(task="majordome")
    #     except Exception as e:
    #         self.monitoring_task = None
    #         self.alert_task = None
    #     return 0

    def run(self):
        """
        Task entry point: poll VOEVENTS_PATH forever and analyze new files.

        Files already present when the task starts are recorded as "old" and
        are therefore never analyzed. This method does not return.
        """
        # self.createTask()
        self.old_files = [
            f for f in os.listdir(VOEVENTS_PATH)
            if isfile(join(VOEVENTS_PATH, f))
        ]
        Log.objects.create(agent="Alert manager",
                           message="Start alert manager")
        # self.setTasks()

        while True:
            if settings.DEBUG and DEBUG_FILE:
                log.info("Checking fresh events")
            for event in self.get_fresh_events():
                self.analyze_event(event)
                if settings.DEBUG:
                    log.info("Analyzed event : " + str(event))
            time.sleep(1)

    def get_fresh_events(self):
        """
        Reads in the VOEVENTS_PATH directory to see if there are new events
        to analyze.

        A file is "new" when it was not listed during the previous call (or
        at task start). The remembered listing is replaced by the current one
        on every call.

        :returns: A list containing the names of the new files to analyze
        """
        files = [
            f for f in os.listdir(VOEVENTS_PATH)
            if isfile(join(VOEVENTS_PATH, f))
        ]
        already_seen = set(self.old_files)

        fresh_events = []
        for event in files:
            if event in already_seen:
                continue
            if settings.DEBUG:
                log.info("New file found : %s" % (event))
            Log.objects.create(agent="Alert manager",
                               message="New file found : %s" % (event,))
            fresh_events.append(event)

        self.old_files = files
        return fresh_events

    def analyze_event(self, event_file):
        """
        Opens and parses the VOEvent file, then creates the request and the
        alert object related to it.

        :param event_file: Name of the file, relative to VOEVENTS_PATH
        """
        # TODO: add a try/except to handle malformed files
        with open(os.path.join(VOEVENTS_PATH, event_file), 'rb') as f:
            voevent = voeventparse.load(f)
        self.create_related_request(voevent, event_file)

    def create_related_request(self, voevent, event_file):
        """
        Creates a request object related to the VOEvent received.

        For the moment, it doesn't take care of the VOEvent content beyond
        the alert attributes: the user ("pyros"), the scientific program
        ("GRB") and the strategy (the default one) are hardcoded.

        Nothing is created when the VOEvent cannot be parsed into an alert or
        when no default strategy exists.

        :param voevent: Object resulting of the VOEvent parsing with the
            voeventparse library
        :param event_file: Name of the file the VOEvent was loaded from
        :returns: None
        """
        pyros_user = PyrosUser.objects.get(username="pyros")
        scientific_program = ScientificProgram.objects.get(name="GRB")

        alert = self.get_alert_attributes(voevent, event_file)
        default_strategies = StrategyObs.objects.filter(is_default=True)
        if alert is None or not default_strategies.exists():
            # TODO: delete the received file? Or store it in a separate folder
            return

        alert.strat = StrategyObs.objects.filter(is_default=True)[0]
        name = "GRB-" + str(alert.trig_id)
        Log.objects.create(
            agent="Alert manager",
            message="Creating alert from file : %s ..." % (alert.strat.xml_file,))

        sb = StrategyBuilder()
        sb.create_request_from_strategy(alert.strat.xml_file, pyros_user,
                                        scientific_program, name)
        # (EP) Validate request AND run scheduling task !!!
        req = sb.validate()

        # (EP) Update each sequence with RA/DEC
        for seq in req.sequences.all():
            seq.ra = alert.burst_ra
            seq.dec = alert.burst_dec
            seq.save()

        alert.request_id = req
        # (EP) What does this do exactly ???
        # NOTE(review): this copies every instance attribute of `req` onto
        # `alert`, overwriting attributes with the same name. Presumably Alert
        # shares fields with the request model - confirm before changing.
        alert.__dict__.update(req.__dict__)
        alert.save()
        Log.objects.create(agent="Alert manager", message="Alert created.")

    def get_alert_attributes(self, voevent, event_file):
        """
        Parses the VOEvent to get all the required attributes and creates the
        (unsaved) alert object.

        Handles missing parameters and missing XML nodes: in both cases the
        error is logged (when settings.DEBUG is on) and None is returned.

        The parameters returned by voeventparse.pull_params are a dictionary
        with the <Group> names as keys, and a dictionary of their <Param>s as
        values. The nested dictionaries have the 'name' attribute as keys, and
        another dictionary as values, containing {attribute: value} pairs.
        The <Param>s without <Group> are in the dictionary with None as key.

        :param voevent: Object resulting of the VOEvent parsing with the
            voeventparse library
        :param event_file: Name of the file the VOEvent was loaded from
        :returns: The alert object, or None if a required value is missing
        """
        v_params = voeventparse.pull_params(voevent)
        alert = Alert(voevent_file=event_file)

        try:
            toplevel = v_params[None]
            alert.trig_id = toplevel["TrigID"]["value"]
            alert.editor = toplevel["Packet_Type"]["value"]
            alert.soln_status = toplevel["Soln_Status"]["value"]
            alert.pkt_ser_num = toplevel["Pkt_Ser_Num"]["value"]

            tjd = Decimal(toplevel["Burst_TJD"]["value"])
            sod = Decimal(toplevel["Burst_SOD"]["value"])
            # Burst day number + seconds of day (as a day fraction) -> JD.
            # NOTE(review): the constant offsets are kept as originally
            # written; verify them against the TJD definition.
            alert.burst_jd = tjd - 13370 - 1 + 2453371 + (sod / 86400)

            alert.defly_not_grb = int(alert.soln_status[0]) & 0xF0

            alert.author = voevent.Who.AuthorIVORN
            # NOTE(review): time.mktime interprets the parsed date as local
            # time - confirm the server timezone assumption.
            alert.jd_send = time.mktime(time.strptime(
                str(voevent.Who.Date), "%Y-%m-%dT%H:%M:%S")) / 86400 + TIMESTAMP_JD
            alert.jd_received = time.time() / 86400 + TIMESTAMP_JD

            location = voevent.WhereWhen.ObsDataLocation.ObservationLocation
            alert.astro_coord_system = location.AstroCoordSystem
            alert.burst_ra = location.AstroCoords.Position2D.Value2.C1
            alert.burst_dec = location.AstroCoords.Position2D.Value2.C2
            alert.error_radius = location.AstroCoords.Position2D.Error2Radius
        except (KeyError, AttributeError) as e:
            # A tuple is required here: `except KeyError or AttributeError`
            # evaluates to `except KeyError` and lets AttributeError escape.
            if settings.DEBUG:
                log.info("Error while parsing VOEvent " + str(e))
            return None
        return alert