import sys
import os

pwd = os.environ['PROJECT_ROOT_PATH']
if pwd not in sys.path:
    sys.path.append(pwd)

from src.core.pyros_django.agent.Agent import Agent, build_agent
from common.models import Period, SP_Period, PyrosUser, SP_Period_Guest, SP_PeriodWorkflow, ScientificProgram,SP_Period_User, ScienceTheme
from django.shortcuts import reverse
from django.conf import settings
from django.core.mail import send_mail
from dateutil.relativedelta import relativedelta
from django.db.models import Q
from django.utils import timezone
from django.test.utils import setup_test_environment
import numpy as np


class AgentSP(Agent):
    """Agent driving the scientific-program (SP) workflow of the next period.

    Each routine iteration calls ``automatic_period_workflow``, which runs the
    SUBMISSION, EVALUATION, VALIDATION and NOTIFICATION steps for the next
    period. A step is recorded in ``SP_PeriodWorkflow`` once performed so that
    it is not executed a second time for the same period.
    """

    # Period currently handled by the agent; refreshed on every workflow run.
    period = None

    # old config init
    # def __init__(self, config_filename=None, RUN_IN_THREAD=True,use_db_test=False):
    #     ##if name is None: name = self.__class__.__name__
    #     if use_db_test:
    #         print("USE DB TEST")
    #         setup_test_environment()
    #     self.TEST_COMMANDS_LIST = [""]
    #     super().__init__(None, RUN_IN_THREAD)
    #     next_period = Period.objects.next_period()
    #     period = next_period

    # new init with obsconfig
    def __init__(self, use_db_test=False):
        """Create the agent and remember the next period.

        Args:
            use_db_test: when true, call Django's ``setup_test_environment()``
                before initialising the base agent.
        """
        ##if name is None: name = self.__class__.__name__
        if use_db_test:
            print("USE DB TEST")
            setup_test_environment()
        super().__init__()
        # Store on the instance: the original bound a throwaway local here,
        # leaving ``self.period`` at its class default (None).
        self.period = Period.objects.next_period()

    # @override
    def _init(self):
        """Delegate to the base agent's ``_init``."""
        super()._init()

    def associate_tac_sp_auto(self, themes, tac_users, scientific_programs):
        """Suggest two referees for every SP that has exactly two candidate TACs.

        A TAC/theme matrix and a theme/SP matrix are multiplied to count, for
        each scientific program, how many of ``tac_users`` referee its theme.
        When that count is exactly 2, ``referee1`` and ``referee2`` of the
        corresponding ``SP_Period`` (for the next period) are set and saved.

        Args:
            themes: iterable of science themes (``.id`` and ``.name`` are read).
            tac_users: iterable of TAC users exposing
                ``get_referee_themes_as_str()``.
            scientific_programs: iterable of scientific programs
                (``.science_theme`` is read).
        """
        print("Associating tac to sp")
        matrix_tac_themes = np.zeros([len(tac_users), len(themes)])
        matrix_themes_sp = np.zeros([len(themes), len(scientific_programs)])

        for row, tac_user in enumerate(tac_users):
            # NOTE(review): substring test on the value returned by
            # get_referee_themes_as_str(); presumably a string listing theme
            # names - confirm that one theme name cannot be contained in another.
            referee_themes = tac_user.get_referee_themes_as_str()
            for col, theme in enumerate(themes):
                if theme.name in referee_themes:
                    matrix_tac_themes[row, col] = 1

        for row, theme in enumerate(themes):
            for col, sp in enumerate(scientific_programs):
                if theme.id == sp.science_theme.id:
                    matrix_themes_sp[row, col] = 1

        # Entry (t, s) is 1 when TAC t referees the theme of program s.
        matrix_tac_sp = np.dot(matrix_tac_themes, matrix_themes_sp)
        nb_tac_per_sp = np.sum(matrix_tac_sp, axis=0)

        next_period = Period.objects.next_period()
        for index, sp in enumerate(scientific_programs):
            # Column ``index`` belongs to ``sp``. The original read ``index - 1``,
            # i.e. the previous program's count (and the last one for index 0).
            if nb_tac_per_sp[index] != 2:
                continue
            # We auto assign the tac users to scientific programs
            print(sp)
            sp_period = SP_Period.objects.get(scientific_program=sp, period=next_period)
            available_tac_users = PyrosUser.objects.filter(referee_themes=sp.science_theme)
            print("available tacs :")
            print(available_tac_users)
            # Fetch both referees with a single query so that they are
            # guaranteed to be two distinct rows.
            referees = list(available_tac_users[:2])
            if len(referees) < 2:
                # The database query can disagree with the matrix count, which
                # is built from ``tac_users``; skip rather than raise IndexError.
                print(f"Not enough TAC users available for {sp}, skipping auto assignation")
                continue
            sp_period.referee1, sp_period.referee2 = referees
            sp_period.save()
        #return matrix_tac_sp

    def change_sp_status(self, scientific_programs, new_status):
        """Set ``status`` to ``new_status`` on every object that differs, and save it.

        Args:
            scientific_programs: iterable of objects exposing ``status`` and
                ``save()`` (callers in this class pass ``SP_Period`` querysets).
            new_status: status value to apply.
        """
        print(f"---- CHANGE STATUS FOR {scientific_programs} TO {new_status}------- ")
        for sp in scientific_programs:
            if sp.status != new_status:
                sp.status = new_status
                sp.save()

    def send_mail_to_tac_for_evaluation(self, tac_users, next_period):
        """Mail every TAC user that the evaluation period is open.

        Args:
            tac_users: queryset of TAC users (their ``email`` field is read).
            next_period: period mentioned in the message body.
        """
        domain = settings.DEFAULT_DOMAIN
        url = f"{domain}{reverse('list_submitted_scientific_program')}"
        mail_subject = '[PyROS CC] The evaluation period is now opened'
        mail_message = (f"Hi,\n\nYou can now evaluate scientific programs for the next period ({next_period}).\n"
                        f"Click on the following link {url} to evaluate your assignated scientific programs."
                        "\n\nCordially,\n\nPyROS Control Center")
        # flat=True yields plain strings; without it each item is a 1-tuple,
        # which is not a valid recipient for send_mail.
        email_list = tac_users.values_list("email", flat=True)
        for email in email_list:
            send_mail(
                mail_subject,
                mail_message,
                from_email=None,
                recipient_list=[email],
                fail_silently=False,
            )

    def send_mail_to_observers_for_notification(self, sp_periods):
        """Mail the guests of each given SP_Period an invitation to register.

        Args:
            sp_periods: iterable of ``SP_Period`` objects whose guests
                (``SP_Period_Guest`` rows) must be notified.
        """
        domain = settings.DEFAULT_DOMAIN
        for sp_period in sp_periods:
            scientific_program = sp_period.scientific_program
            sp_pi = scientific_program.sp_pi
            url = f"{domain}{reverse('sp_register',args=(scientific_program.pk,sp_period.period.pk))}"
            mail_subject = '[PyROS CC] New registration to a scientific program'
            # A sentence separator was missing after "submit sequences".
            mail_message = (f"Hi,\n\nYou were invited to join a scientific program that as been submitted using PyROS.\n"
                            f"The name of the scientific program is {scientific_program.name} and his PI is {sp_pi.first_name} {sp_pi.last_name}.\n"
                            f"To accept this invitation, click on the following link : {url}\n"
                            f"Once you have joined the scientific program, you can start to submit sequences.\n"
                            "You might be asked to login first and will be redirected to the scientific program page.\n"
                            "If the redirection doesn't work, click again on the link after you've logged in.\n"
                            "If you don't own an PyROS account, go on the website in order to create an account with the same mail adress that you are using to read this mail."
                            "\n\nCordially,\n\nPyROS Control Center")
            # values("user") yields dicts, which are not valid recipients;
            # read the raw column value instead.
            # NOTE(review): presumably SP_Period_Guest.user stores the guest's
            # e-mail address - confirm against the model definition.
            invited_observers_of_sp = SP_Period_Guest.objects.filter(
                SP_Period=sp_period).values_list("user", flat=True)
            for invited_observer in invited_observers_of_sp:
                send_mail(
                    mail_subject,
                    mail_message,
                    from_email=None,
                    recipient_list=[invited_observer],
                    fail_silently=False,
                )

    def send_mail_to_unit_users_for_tac_assignation(self):
        """Mail the unit users asking them to review the suggested TAC assignations.

        Reads ``self.period.submission_end_date`` for the message body, so
        ``self.period`` must be set before calling.
        """
        domain = settings.DEFAULT_DOMAIN
        url = f"{domain}{reverse('list_drafted_scientific_program')}"
        mail_subject = '[PyROS CC] TAC assignation to scientific programs for the next period'
        mail_message = (f"Hi,\n\nYou can assign TAC users to scientific programs by choosing them in the {url} page.\n"
                        "PyROS has suggested TAC to some of the scientific programs but you can change those assignations.\n"
                        f"The TAC assignation will be effective and couldn't be modified at the {self.period.submission_end_date}.\n"
                        "\n\nCordially,\n\nPyROS Control Center")
        unit_users = PyrosUser.objects.unit_users().values_list("email", flat=True)
        send_mail(
            mail_subject,
            mail_message,
            from_email=None,
            recipient_list=list(unit_users),
            fail_silently=False,
        )
        print("--------- SEND MAIL TO UNIT USERS ----------")

    def automatic_period_workflow(self):
        """Run whichever workflow steps of the next period are due today.

        For each of SUBMISSION, EVALUATION, VALIDATION and NOTIFICATION the
        step is considered only when no ``SP_PeriodWorkflow`` row exists yet
        for ``self.period``; once performed, such a row is created.
        """
        today = timezone.now().date()
        next_period = Period.objects.next_period()
        # check if next_period has changed
        if self.period != next_period:
            self.period = next_period
        if next_period is None:
            # NOTE(review): assumes next_period() may return None when no
            # period is defined; the steps below dereference it.
            return

        auto_validated_sp = ScientificProgram.objects.filter(is_auto_validated=True)
        auto_validated_sp_periods = SP_Period.objects.filter(
            scientific_program__in=auto_validated_sp, period=next_period)
        # SP_Period rows of the next period, auto validated programs removed.
        # This queryset is never reassigned: every step derives its own
        # filtered view from it.
        next_sp = SP_Period.objects.filter(period=next_period).exclude(
            scientific_program__in=auto_validated_sp)
        # get all tac users
        tac_users = PyrosUser.objects.filter(user_level__name="TAC")

        # submission workflow
        if not SP_PeriodWorkflow.objects.filter(action=SP_PeriodWorkflow.SUBMISSION, period=self.period).exists():
            print("routine automatic period workflow SUBMISSION")
            # if the next_period is actually in the "submission" subperiod
            if next_period in Period.objects.submission_periods():
                # we have to assign TAC to SP
                themes = ScienceTheme.objects.all()
                # get id of scientific programs still missing a referee
                sp_id = (next_sp.exclude(scientific_program__is_auto_validated=True)
                         .filter(Q(referee1=None) | Q(referee2=None))
                         .values("scientific_program"))
                # get scientific programs
                sp = ScientificProgram.objects.filter(id__in=sp_id).order_by("name")
                # ten days before the end of the submission period, suggest
                # TAC users and ask the unit users to review the assignation
                if next_period.submission_end_date + relativedelta(days=-10) == today:
                    self.associate_tac_sp_auto(themes, tac_users, sp)
                    # send mail to unit pi to tell him to associate TAC to SP
                    self.send_mail_to_unit_users_for_tac_assignation()
                    SP_PeriodWorkflow.objects.create(period=self.period, action=SP_PeriodWorkflow.SUBMISSION)

        if not SP_PeriodWorkflow.objects.filter(action=SP_PeriodWorkflow.EVALUATION, period=self.period).exists():
            print("routine automatic period workflow EVALUATION")
            if next_period in Period.objects.evaluation_periods() and next_period.submission_end_date == today:
                drafted_sp = next_sp.filter(status=SP_Period.STATUSES_DRAFT)
                self.change_sp_status(drafted_sp, SP_Period.STATUSES_SUBMITTED)
                self.send_mail_to_tac_for_evaluation(tac_users, next_period)
                # for auto validated sp, we have to change their status
                self.change_sp_status(auto_validated_sp_periods, SP_Period.STATUSES_ACCEPTED)
                for sp_period in auto_validated_sp_periods:
                    sp_period.is_valid = SP_Period.IS_VALID_ACCEPTED
                    sp_period.save()
                SP_PeriodWorkflow.objects.create(period=self.period, action=SP_PeriodWorkflow.EVALUATION)

        if not SP_PeriodWorkflow.objects.filter(action=SP_PeriodWorkflow.VALIDATION, period=self.period).exists():
            print("routine automatic period workflow VALIDATION")
            if next_period.unit_pi_validation_start_date == today:
                submitted_sp = next_sp.filter(status=SP_Period.STATUSES_SUBMITTED)
                self.change_sp_status(submitted_sp, SP_Period.STATUSES_EVALUATED)
                SP_PeriodWorkflow.objects.create(period=self.period, action=SP_PeriodWorkflow.VALIDATION)

        if not SP_PeriodWorkflow.objects.filter(action=SP_PeriodWorkflow.NOTIFICATION, period=self.period).exists():
            print("routine automatic period workflow NOTIFICATION")
            if next_period in Period.objects.notification_periods():
                all_next_sp = SP_Period.objects.filter(period=next_period)
                next_sp_accepted = all_next_sp.filter(is_valid=SP_Period.IS_VALID_ACCEPTED)
                self.change_sp_status(next_sp_accepted, SP_Period.STATUSES_ACCEPTED)
                next_sp_rejected = all_next_sp.filter(is_valid=SP_Period.IS_VALID_REJECTED)
                self.change_sp_status(next_sp_rejected, SP_Period.STATUSES_REJECTED)
                # Same constant as the "accepted" filter above (the original
                # compared is_valid to True here).
                next_sp_to_be_notified = next_sp.filter(
                    status=SP_Period.STATUSES_ACCEPTED,
                    is_valid=SP_Period.IS_VALID_ACCEPTED,
                )
                self.send_mail_to_observers_for_notification(next_sp_to_be_notified)
                SP_PeriodWorkflow.objects.create(period=self.period, action=SP_PeriodWorkflow.NOTIFICATION)

    def routine_process_body(self):
        """Print the current workflow records and users, then run the workflow."""
        print("routine automatic period workflow")
        print(SP_PeriodWorkflow.objects.all())
        print(PyrosUser.objects.all())
        for sp_period_workflow in SP_PeriodWorkflow.objects.all():
            print(sp_period_workflow.period)
            print(sp_period_workflow.action)
        self.automatic_period_workflow()


if __name__ == "__main__":
    # with thread
    RUN_IN_THREAD = True
    # with process
    #RUN_IN_THREAD=False
    print("ARGV OF AGENT SP :", sys.argv)
    if len(sys.argv) > 1 and sys.argv[1] == "test":
        # Test mode: build the agent directly and run it.
        print("i'm in test")
        agentSP = AgentSP(use_db_test=True)
        agentSP.run()
        #agent = build_agent(agentSP, RUN_IN_THREAD=True)
    else:
        agent = build_agent(AgentSP)
        # Former start-up sequence, kept for reference:
        # TEST_MODE, configfile = extract_parameters()
        # agent = AgentM("AgentM", configfile, RUN_IN_THREAD)
        # agent.setSimulatorMode(TEST_MODE)
        print(agent)
        agent.run()