from django.core.paginator import Paginator
from src.pyros_logger import log
from pprint import pprint
import mimetypes
from django.http import HttpResponse
from django.utils.encoding import smart_str
from wsgiref.util import FileWrapper
from collections import OrderedDict
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from django.shortcuts import get_object_or_404, render, redirect
from common.models import *
from django.db.models import Q
from django.contrib.auth.decorators import login_required
from src.core.pyros_django.dashboard.decorator import level_required
import ast
import os
import datetime
from src.core.pyros_django.obsconfig.obsconfig_class import OBSConfig
from .forms import RequestForm, SequenceForm, AlbumForm, PlanForm, uneditablePlanForm
from .validators import check_plan_validity, check_album_validity, check_sequence_validity, check_request_validity
from .functions import check_sequence_file_validity
from .RequestSerializer import RequestSerializer
import scheduler
from django.contrib import messages
from django.http import JsonResponse
""" logger """
from django.conf import settings
# "import utils.Logger as l
# "log = l.setupLogger("routine_manager-views", "routine_manager-views")
import yaml
import json

""" XML Export / Import utils """

SAVED_REQUESTS_FOLDER = "misc/saved_requests/"

# NOTE: most views below render their template with ``locals()``; the local
# variable names are therefore part of the template context and must not be
# renamed or removed without updating the templates.


def _parse_plan_post_data(post_data):
    """Extract ``(nb_images, config_attributes)`` from a plan form submission.

    ``post_data`` is a mutable copy of ``request.POST`` from which the
    non-attribute keys (CSRF token, action) have already been removed. The
    ``nb_images`` entry is popped from it by this function.

    Each remaining string value is first read as "linked values", i.e.
    ``subkey:literal;subkey:literal``. When it does not have that shape, the
    whole value is evaluated as a single Python literal instead.

    Raises:
        KeyError: if ``nb_images`` is missing.
        ValueError, SyntaxError: if a value is neither linked values nor a
            valid Python literal, or if ``nb_images`` is not an integer.
    """
    nb_images = post_data.pop("nb_images")[0]
    config_attributes = {}
    for key, value in post_data.items():
        if not isinstance(value, str):
            continue
        try:
            # linked values
            linked_values = {}
            for splitted_value in value.split(";"):
                subkey, subvalue = splitted_value.split(":")
                linked_values[subkey] = ast.literal_eval(subvalue)
            config_attributes[key] = linked_values
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
            # Not linked values: evaluate the value as one plain literal.
            config_attributes[key] = ast.literal_eval(value)
    return int(nb_images), config_attributes


def _import_failure(message):
    """Build the JSON payload returned when a sequence import cannot start.

    NOTE(review): the "succeed" key mirrors what ``import_sequence`` reads from
    ``check_sequence_file_validity``; the "errors" key is presumed from that
    view's docstring -- confirm against the front-end consumer.
    """
    return {"succeed": False, "errors": [message]}


@login_required
@level_required("Admin", "Unit-PI", "Observer")
def sequences_list(request, status=0, message="", import_response=None):
    """Retrieve and display the sequences list (routine manager main page).

    Admin and Unit-PI roles see every sequence; other users only see the
    sequences of their scientific programs. The list is paginated with
    ``settings.NB_ELEMENT_PER_PAGE`` elements per page.
    """
    if status == "-1":
        error = True
    elif status == "1":
        success = True
    # TODO: uncomment for alert filter
    # requests_objs = Request.objects.filter(pyros_user=request.user).filter(is_alert=False).order_by("-updated")
    sp_of_user = request.user.get_scientific_program()
    sequences_objs = Sequence.objects.filter(
        scientific_program__in=sp_of_user).order_by("-updated")
    sequences = []
    if request.session.get("role") in ("Admin", "Unit-PI"):
        sequences_objs = Sequence.objects.all().order_by("-updated")
    paginator = Paginator(sequences_objs, settings.NB_ELEMENT_PER_PAGE)
    page_number = request.GET.get("page")
    page_obj = paginator.get_page(page_number)
    for seq in page_obj:
        # The bound ``count`` methods are passed uncalled to the template.
        nb_album = Album.objects.filter(sequence=seq).count
        nb_plan = Plan.objects.filter(
            album__in=Album.objects.filter(sequence=seq).values("id")).count
        sequences.append(
            {"seq": seq, "nb_album": nb_album, "nb_plan": nb_plan})
    print("REQ/views: ICI:")
    # opens template src/routine_manager/templates/routine_manager/sequences_list.html with all local variables
    return render(request, "routine_manager/sequences_list.html", locals())


@login_required
def action_request(request, req_id, action, status=0, message=""):
    """Apply an action (view, edit, delete) on a given request."""
    req = Request.objects.get(id=req_id)
    depth_level = 1
    if status == "-1":
        error = True
    elif status == "1":
        success = True
    if req.submitted == True and action == "edit":
        # A submitted request is only shown read-only.
        error = True
        message = "You can't edit a submitted request"
        action = "view"
    if check_request_validity(req) == True:
        return redirect(unsubmit_request, req_id)
    if action == "edit":
        form = RequestForm(instance=req)
        edit = True
        return render(request, "routine_manager/view_request.html", locals())
    elif action == "view":
        form = RequestForm(instance=req, readonly=True)
        edit = False
        return render(request, "routine_manager/view_request.html", locals())
    elif action == "delete":
        req.delete()
        return redirect(sequences_list, status='1', message="Successfully deleted the request")
    return redirect(sequences_list)


@login_required
def request_validate(request, req_id):
    """Handle the submitted request form.

    Possible actions : Cancel, Save, Save and add sequence, Delete
    """
    action = request.POST.get("action")
    req = Request.objects.get(id=req_id)
    form = RequestForm(instance=req, data=request.POST)
    if action == "cancel":
        # A request still carrying its creation name was never saved: drop it.
        if req.name == "New request":
            req.delete()
        return redirect(sequences_list, status=1, message="Cancelled request modification")
    elif action == "delete":
        return redirect(action_request, req_id, "delete")
    elif form.is_valid():
        req = form.save()
        if action == "save":
            return redirect(action_request, req_id=req_id, action="edit", status=1, message="Request saved")
        if action == "save_and_add":
            return redirect(create_sequence, req_id=req_id)
    else:
        error = True
        message = "Please check your field's validity"
        depth_level = 1
        edit = True
        return render(request, "routine_manager/view_request.html", locals())


@login_required
@level_required("Admin", "Unit-PI", "Observer")
def action_sequence(request, seq_id, action, status=0, message=""):
    """Apply an action (view, edit, delete) on a given sequence.

    The observatory configuration is read from the ``PATH_TO_OBSCONF_FILE`` and
    ``unit_name`` environment variables.
    """
    unit_name = os.environ["unit_name"]
    config = OBSConfig(os.environ["PATH_TO_OBSCONF_FILE"], unit_name)
    seq_id = int(seq_id)
    seq = Sequence.objects.get(id=seq_id)
    depth_level = 1
    if status == "-1":
        error = True
    elif status == "1":
        success = True
    if seq.status != Sequence.DRAFT and action == "edit":
        # Editing a non-draft sequence is currently allowed; the former
        # restriction is kept below for reference.
        pass
        # error = True
        # message = "You can't edit a submitted request"
        # messages.add_message(
        #     request, messages.WARNING, "You can't edit a submitted request")
        # action = "view"
    horizon_line = config.getHorizonLine(config.unit_name)
    # NOTE(review): only the "(values : ...)" caption is visible in this
    # string; confirm whether SVG markup is expected around it.
    horizon_line_svg = f'\n(values : {horizon_line})\n'
    if request.session.get("role") in ("Admin", "Unit-PI"):
        sp_list = ScientificProgram.objects.observable_programs()
    else:
        sp_of_user = request.user.get_scientific_program()
        sp_list = ScientificProgram.objects.observable_programs().filter(id__in=sp_of_user)
    # ``sp_of_user`` is only bound for non Admin/Unit-PI roles; the role test
    # short-circuits before it is read for the other roles.
    if seq.pyros_user != None and seq.scientific_program != None and request.session.get("role") == "Observer" and seq.scientific_program not in sp_of_user:
        messages.add_message(
            request, messages.INFO, "You can't acces to this page because this sequence is linked to a scientific program that you aren't part of")
        return redirect(sequences_list)
    if action == "edit":
        if len(sp_list) == 0:
            messages.add_message(
                request, messages.INFO, "Can't submit a Sequence : there is no scientific program to be assigned with")
            return redirect(sequences_list)
        form = SequenceForm(instance=seq, data_from_config=config.getEditableAttributesOfMount(
            config.unit_name), layouts=config.get_layouts(config.unit_name), sp_list=sp_list)
        edit = True
        return render(request, "routine_manager/view_sequence.html", locals())
    elif action == "view":
        form = SequenceForm(instance=seq, data_from_config=config.getEditableAttributesOfMount(
            config.unit_name), layouts=config.get_layouts(config.unit_name), sp_list=sp_list, readonly=True)
        edit = False
        return render(request, "routine_manager/view_sequence.html", locals())
    elif action == "delete":
        # Only draft or to-be-planned sequences are deleted.
        if seq.status in (Sequence.DRAFT, Sequence.TOBEPLANNED):
            log.info(
                f"User {request.user} did action delete sequence {seq.name}")
            seq.delete()
        return redirect(sequences_list)
    return redirect(sequences_list)


@login_required
@level_required("Admin", "Unit-PI", "Observer")
def unsubmit_sequence(request, seq_id):
    """Put a to-be-planned sequence back to draft and show it read-only."""
    seq = Sequence.objects.get(id=int(seq_id))
    status = seq.status
    message = ""
    if status == Sequence.TOBEPLANNED:
        seq.status = Sequence.DRAFT
        seq.save()
        message = "Sequence unsubmitted"
    else:
        message = "The sequence isn't submitted."
    messages.add_message(request, messages.INFO, message)
    return redirect(action_sequence, seq_id, "view")


@login_required
@level_required("Admin", "Unit-PI", "Observer")
def sequence_validate(request, seq_id):
    """Handle the submitted sequence form.

    Possible actions : Cancel, Delete, Check validity, Save, Save and do
    nothing, Save and add albums, Save and submit
    """
    unit_name = os.environ["unit_name"]
    config = OBSConfig(os.environ["PATH_TO_OBSCONF_FILE"], unit_name)
    seq_id = int(seq_id)
    action = request.POST.get("action")
    seq = Sequence.objects.get(id=seq_id)
    form = SequenceForm(instance=seq, data=request.POST, data_from_config=config.getEditableAttributesOfMount(
        config.unit_name), layouts=config.get_layouts(config.unit_name))
    seq.last_modified_by = request.user
    seq.save()
    if action == "cancel":
        seq.delete()
        return redirect(sequences_list)
    elif action == "delete":
        return redirect(action_sequence, seq_id, "delete")
    elif action == "check_validity":
        seq.save()
        is_seq_valid = check_sequence_validity(seq)
        for album in Album.objects.filter(sequence=seq):
            for plan in Plan.objects.filter(album=album):
                check_plan_validity(plan)
        message = ""
        if is_seq_valid == True:
            message = "The sequence is valid and can be submitted."
        else:
            message = ("The sequence isn't valid. Check if your sequence have, at least, an album and that album has one plan at minimum. "
                       "Each album needs at least one plan.")
        messages.add_message(request, messages.INFO, message)
        return redirect(action_sequence, seq_id, "edit")
    elif form.is_valid():
        seq = form.save()
        if action == "save":
            message = "Sequence saved"
            messages.add_message(request, messages.INFO, message)
            return redirect(sequences_list, status=1, message="Sequence saved")
        elif action == "save_and_do_nothing":
            return redirect(action_sequence, seq_id=seq_id, action="edit", status=1, message="Sequence saved")
        if action == "save_and_add":
            # Create one album per entry of the layout chosen for the sequence.
            albums = config.getLayoutByName(
                unit_name=config.unit_name, name_of_layout=seq.config_attributes["layout"])["ALBUMS"]
            for album_name in albums:
                album_desc = config.getAlbumByName(
                    config.unit_name, album_name).get("description", "")
                create_album(request, seq_id,
                             album_name=album_name, desc=album_desc)
            messages.add_message(request, messages.INFO,
                                 "Created albums according to chosen layouts")
            return redirect(action_sequence, seq.id, "edit")
        if action == "save_and_submit":
            if check_sequence_validity(seq):
                seq.status = Sequence.TOBEPLANNED
                seq.save()
                message = "Sequence submitted"
                messages.add_message(request, messages.INFO, message)
                log.info(
                    f"User {request.user} did action submit sequence {seq} for period {seq.period} ")
                return redirect(sequences_list)
            else:
                message = "Can't submit sequence because it's incomplete (Need at least 1 album with 1 plan)"
                messages.add_message(request, messages.ERROR, message)
                status = 1
                edit = True
                return render(request, "routine_manager/view_sequence.html", locals())
    else:
        error = True
        message = "Please check your field's validity"
        depth_level = 1
        edit = True
        return render(request, "routine_manager/view_sequence.html", locals())


@login_required
@level_required("Admin", "Unit-PI", "Observer")
def action_album(request, alb_id, action, status=0, message=""):
    """Apply an action (view, edit, delete) on a given album."""
    unit_name = os.environ["unit_name"]
    config = OBSConfig(os.environ["PATH_TO_OBSCONF_FILE"], unit_name)
    alb_id = int(alb_id)
    alb = Album.objects.get(id=alb_id)
    seq_id = alb.sequence.id
    depth_level = 2
    seq = alb.sequence
    if status == "-1":
        error = True
    elif status == "1":
        success = True
    if alb.sequence.status != Sequence.DRAFT and action == "edit":
        # Albums of a non-draft sequence are only shown read-only.
        error = True
        message = "You can't edit a submitted request"
        action = "view"
    if action == "edit":
        # The album form is built read-only in edit mode as well.
        form = AlbumForm(instance=alb, readonly=True)
        edit = True
        return render(request, "routine_manager/view_album.html", locals())
    elif action == "view":
        form = AlbumForm(instance=alb, readonly=True)
        edit = False
        return render(request, "routine_manager/view_album.html", locals())
    elif action == "delete":
        log.info(
            f"User {request.user} did action delete Album {alb.name} of Sequence {seq}")
        alb.delete()
        return redirect(action_sequence, seq_id=alb.sequence.id, action="edit", status='1', message="Successfully deleted the album")
    return redirect(sequences_list)


@login_required
@level_required("Admin", "Unit-PI", "Observer")
def album_validate(request, alb_id):
    """Handle the submitted album form.

    Possible actions : Cancel, Save, Save and add plan, Delete
    """
    unit_name = os.environ["unit_name"]
    config = OBSConfig(os.environ["PATH_TO_OBSCONF_FILE"], unit_name)
    alb_id = int(alb_id)
    action = request.POST.get("action")
    alb = Album.objects.get(id=alb_id)
    form = AlbumForm(instance=alb, data=request.POST)
    alb.sequence.last_modified_by = request.user
    alb.sequence.save()
    if action == "cancel":
        alb.delete()
        return redirect(action_sequence, seq_id=alb.sequence.id, action="edit", status='1', message="Cancelled album modification")
    elif action == "delete":
        return redirect(action_album, alb_id, "delete")
    elif form.is_valid():
        alb = form.save()
        if action == "save":
            return redirect(action_sequence, seq_id=alb.sequence.id, action="edit")
        if action == "save_and_add":
            return redirect(create_plan, alb_id=alb_id)
    else:
        error = True
        message = "Please check your field's validity"
        depth_level = 2
        edit = True
        seq_id = alb.sequence.id
        action = "edit"
        return render(request, "routine_manager/view_album.html", locals())


@login_required
@level_required("Admin", "Unit-PI", "Observer")
def action_plan(request, plan_id, action, status=0, message=""):
    """Apply an action (view, edit, delete) on a given plan.

    The editable attributes come from the first channel of the plan's album in
    the observatory configuration.
    """
    unit_name = os.environ["unit_name"]
    config = OBSConfig(os.environ["PATH_TO_OBSCONF_FILE"], unit_name)
    plan_id = int(plan_id)
    plan = Plan.objects.get(id=plan_id)
    album_name = plan.album.name
    channel = config.getAlbumByName(
        config.unit_name, album_name)["CHANNELS"][0]
    data_from_config = config.getEditableAttributesOfChannel(
        config.unit_name, channel)
    seq_id = plan.album.sequence.id
    seq = plan.album.sequence
    alb_id = plan.album.id
    depth_level = 3
    # get index of plan within the album. Adding one for human comprehension
    index_of_plan_in_album = list(plan.album.plans.all(
    ).values_list("id", flat=True)).index(plan.id) + 1
    if status == "-1":
        error = True
    elif status == "1":
        success = True
    if plan.album.sequence.status != Sequence.DRAFT and action == "edit":
        # Plans of a non-draft sequence are only shown read-only.
        error = True
        message = "You can't edit a submitted sequence"
        action = "view"
    if action == "edit":
        form = PlanForm(edited_plan=plan, data_from_config=data_from_config)
        uneditableForm = uneditablePlanForm(
            data_from_config=config.getUneditableAttributesOfChannel(config.unit_name, channel))
        edit = True
        return render(request, "routine_manager/view_plan.html", locals())
    elif action == "view":
        form = PlanForm(edited_plan=plan,
                        data_from_config=data_from_config, readonly=True)
        uneditableForm = uneditablePlanForm(
            data_from_config=config.getUneditableAttributesOfChannel(config.unit_name, channel))
        edit = False
        return render(request, "routine_manager/view_plan.html", locals())
    elif action == "delete":
        log.info(
            f"User {request.user} did action delete Plan {plan.id} of Sequence {seq}")
        plan.delete()
        # Re-check the parents now that the plan is gone.
        check_sequence_validity(seq)
        check_album_validity(plan.album)
        return redirect(action_album, alb_id=plan.album.id, action="edit", status='1', message="Successfully deleted the plan")
    return redirect(sequences_list)


@login_required
@level_required("Admin", "Unit-PI", "Observer")
def plan_validate(request, plan_id):
    """Handle the submitted plan form.

    Possible actions : Cancel, Save, Delete

    On "save" the plan attributes are rebuilt directly from the POST data (see
    ``_parse_plan_post_data``) rather than through ``PlanForm``.
    """
    unit_name = os.environ["unit_name"]
    config = OBSConfig(os.environ["PATH_TO_OBSCONF_FILE"], unit_name)
    plan_id = int(plan_id)
    action = request.POST.get("action")
    plan = Plan.objects.get(id=plan_id)
    seq = plan.album.sequence
    seq.last_modified_by = request.user
    seq.save()
    album_name = plan.album.name
    channel = config.getAlbumByName(
        config.unit_name, album_name)["CHANNELS"][0]
    data_from_config = config.getEditableAttributesOfChannel(
        config.unit_name, channel)
    form = PlanForm(edited_plan=plan, data_from_config=data_from_config)
    if action == "cancel":
        plan.delete()
        return redirect(action_album, alb_id=plan.album.id, action="edit", status='1', message="Cancelled plan modification")
    elif action == "delete":
        return redirect(action_plan, plan_id, "delete")
    elif form.is_valid():
        # unused
        # NOTE(review): the form is built without POST data, so this branch
        # looks unreachable; confirm before relying on it.
        plan = form.save()
        if action == "save":
            action = "edit"
            status = 1
            message = "Plan saved"
            return redirect(action_plan, locals())
    elif action == "save":
        if request.POST:
            post_data = request.POST.copy()
            # Drop the keys that are not plan attributes before parsing.
            post_data.pop("csrfmiddlewaretoken", None)
            post_data.pop("action")
            plan.nb_images, plan.config_attributes = _parse_plan_post_data(
                post_data)
            plan.save()
        return redirect(action_album, plan.album.id, "edit")
    else:
        error = True
        message = "Please check your field's validity"
        edit = True
        seq_id = plan.album.sequence.id
        alb_id = plan.album.id
        depth_level = 3
        return render(request, "routine_manager/view_plan.html", locals())


@login_required
def create_request(request):
    """Create a new request and redirect to the editing action on it."""
    req = Request.objects.create(pyros_user=request.user, name="New request", is_alert=False,
                                 autodeposit=False, complete=False, submitted=False)
    return redirect(action_request, req.id, "edit")


@login_required
@level_required("Admin", "Unit-PI", "Observer")
def create_sequence(request):
    """Create a new draft sequence and redirect to the editing action on it.

    The sequence is named ``seq_<UTC timestamp>``. Nothing is created when the
    user has no observable scientific program.
    """
    date_of_sequence = datetime.datetime.utcnow()
    date_of_sequence = date_of_sequence.strftime("%Y%m%dT%H%M%S")
    sp_of_user = request.user.get_scientific_program()
    sp_list = ScientificProgram.objects.observable_programs().filter(id__in=sp_of_user)
    if len(sp_list) == 0:
        messages.add_message(
            request, messages.INFO, "Can't submit a Sequence : there is no scientific program to be assigned with")
        return redirect(sequences_list)
    seq = Sequence.objects.create(
        pyros_user=request.user, name=f"seq_{str(date_of_sequence)}", status=Sequence.DRAFT)
    log.info(f"User {request.user} did action create Sequence {seq.name}")
    return redirect(action_sequence, seq.id, "edit")


@login_required
@level_required("Admin", "Unit-PI", "Observer")
def create_album(request, seq_id, album_name, desc):
    """Create a new album in the given sequence.

    Called directly by other views; it returns nothing.
    """
    seq = Sequence.objects.get(id=seq_id)
    alb = Album.objects.create(
        sequence=seq, name=album_name, desc=desc, complete=False)
    log.info(
        f"User {request.user} did action create Album {alb.id} of Sequence {seq.name}")


@login_required
@level_required("Admin", "Unit-PI", "Observer")
def create_plan(request, alb_id):
    """Create a new plan in the album and redirect to the editing action on it."""
    alb = Album.objects.get(id=alb_id)
    plan = Plan.objects.create(album=alb, nb_images=1)
    log.info(
        f"User {request.user} did action create Plan {plan.id} of Sequence {plan.album.sequence.name}")
    return redirect(action_plan, plan.id, "edit")


@login_required
def submit_request(request, req_id, redir):
    """Submit a request and its sequences for scheduling.

    ``redir`` selects the page shown afterwards: "action_request" goes back to
    the request view, anything else to the sequences list.
    """
    if settings.DEBUG:
        log.info("From submit_request")
    req = Request.objects.get(id=req_id)
    error = False
    if req.complete == False:
        error = True
        message = "A request must be complete to be submitted"
    elif req.submitted == True:
        error = True
        message = "The request is already submitted"
    if error == True:
        return redirect(action_request, req_id=req_id, action="view", status=-1, message=message)
    for seq in req.sequences.all():
        seq.status = Sequence.TOBEPLANNED
        seq.save()
    req.submitted = True
    req.save()
    # TODO: change the first_schedule ...
    # if settings.USE_CELERY:
    #     print("WITH CELERY")
    #     scheduler.tasks.scheduling.delay(first_schedule=True, alert=False)
    # else:
    print("Change 1st schedule")
    scheduler.tasks.scheduling().run(first_schedule=True, alert=False)
    message = "The request was submitted"
    if redir == "action_request":
        return redirect(action_request, req_id=req_id, action="view", status=1, message=message)
    else:
        return redirect(sequences_list, status=1, message=message)


@login_required
def unsubmit_request(request, req_id):
    """Unsubmit a request and remove its sequences from scheduling.

    Sequences that are to-be-planned, planned, invalid or unplannable are set
    back to ``Sequence.COMPLETE``.
    """
    if settings.DEBUG:
        log.info("From unsubmit_request")
    req = Request.objects.get(id=req_id)
    # TODO: uncomment for production
    # if req.sequences.filter(Q(status=Sequence.EXECUTED) | Q(status=Sequence.EXECUTING)).exists():
    #     message = "You can't unsubmit a request with executed sequences"
    #     return redirect(action_request, req_id=req_id, action="view", status=-1, message=message)
    req.submitted = False
    req.save()
    sequences = req.sequences.filter(Q(status=Sequence.TOBEPLANNED) | Q(status=Sequence.PLANNED) | Q(status=Sequence.INVALID)
                                     | Q(status=Sequence.UNPLANNABLE))
    for seq in sequences:
        seq.status = Sequence.COMPLETE
        seq.save()
    # TODO: uncomment
    # scheduler.tasks.scheduling.delay(first_schedule=True, alert=False) # TODO: change the first_schedule ...
    if req.complete == True:
        message = "The request was unsubmitted"
    else:
        message = "The request was unsubmitted because it is incomplete"
    return redirect(action_request, req_id=req_id, action="edit", status=1, message=message)


@login_required
def export_request(request, req_id):
    """Serialize the request to an XML file and send it as a download.

    The file is written to ``SAVED_REQUESTS_FOLDER``. Incomplete requests are
    not exported; the user is redirected to the request view instead.
    """
    req = Request.objects.get(id=req_id)
    if req.complete == False:
        message = "Request must be completely valid to be serialized"
        return redirect(action_request, req_id=req_id, action="view", status=-1, message=message)
    file_name = SAVED_REQUESTS_FOLDER + "request" + str(req.id) + ".xml"
    rs = RequestSerializer()
    rs.serialize(req, file_name)
    wrapper = FileWrapper(open(file_name, "r"))
    content_type = mimetypes.guess_type(file_name)[0]
    response = HttpResponse(wrapper, content_type=content_type)
    response['Content-Length'] = os.path.getsize(file_name)
    response['Content-Disposition'] = 'attachment; filename=%s' % smart_str(
        os.path.basename(file_name))
    return response


@login_required
@csrf_exempt
def import_request(request):
    """Parse an uploaded XML file and create a request in DB.

    Nothing is created if there is a single error in the file. The outcome is
    reported through the ``status`` and ``message`` passed to the sequences
    list.
    """
    if settings.DEBUG:
        log.info(request)
    if request.method == "POST":
        file = request.FILES.get("request_file")
        if file is None:
            status = -1
            message = "File does not exist"
        elif file.size > 1000000:
            status = -1
            message = "File is too big (more than 1 000 000 bytes)"
        else:
            rs = RequestSerializer()
            # ``unserialize`` returns an empty message on success.
            message = rs.unserialize(file.read(), request.user)
            if message != "":
                status = -1
            else:
                status = 1
                message = "Request imported successfully. Please check it before submitting for scheduling."
        print("message is", message)
    else:
        status = -1
        message = "Internal error"
    # Now, display the list of requests (updated with this new request)
    return redirect(sequences_list, status=status, message=message)


@login_required
@csrf_exempt
@level_required("Admin", "Unit-PI", "Observer")
def import_sequence(request):
    """Parse an uploaded YAML file and create a sequence from it.

    Always returns a JSON response. On success it is the dictionary returned by
    ``check_sequence_file_validity``; when the import cannot even start (wrong
    HTTP method, missing, oversized or unparsable file) it is a failure payload
    carrying the reason.
    """
    if request.method != "POST":
        return JsonResponse(_import_failure("Internal error"))
    file = request.FILES.get("sequence_file")
    if file is None:
        return JsonResponse(_import_failure("File does not exist"))
    if file.size > 1000000:
        return JsonResponse(_import_failure(
            "File is too big (more than 1 000 000 bytes)"))
    try:
        # safe_load only builds plain Python objects from the uploaded file.
        yaml_content = yaml.safe_load(file.read())
    except yaml.YAMLError as exc:
        return JsonResponse(_import_failure(f"Invalid YAML file: {exc}"))
    import_response = check_sequence_file_validity(yaml_content, request)
    if import_response["succeed"]:
        message = "Request imported successfully. Please check it before submitting for scheduling."
    else:
        message = "Failed to import, check errors"
    print("message is", message)
    return JsonResponse(import_response)


@login_required
@level_required("Admin", "Unit-PI", "Observer")
def export_sequence(request, seq_id: int, type: str):
    """Export a sequence, its albums and its plans as a YAML download.

    Every required sequence field and every plan field is described by a
    dictionary (value, value type, and possible values or bounds). When
    ``type`` is "template" the values are exported as ``None``.
    """
    sp_of_user = request.user.get_scientific_program()
    sp_list = ScientificProgram.objects.observable_programs().filter(id__in=sp_of_user)
    seq = Sequence.objects.get(id=seq_id)
    unit_name = os.environ["unit_name"]
    config = OBSConfig(os.environ["PATH_TO_OBSCONF_FILE"], unit_name)
    sequence_form = SequenceForm(instance=seq, data_from_config=config.getEditableAttributesOfMount(
        config.unit_name), layouts=config.get_layouts(config.unit_name), sp_list=sp_list)
    yaml_dict = {
        "sequence": {}
    }
    for field in sequence_form.fields.keys():
        form_field = sequence_form.fields[field]
        if form_field.required == False:
            continue
        field_dict = {"editable": True}
        if field == "scientific_program":
            # Exported as an index into the list of program names.
            sp_list = list(sp_list.values_list("name", flat=True))
            field_dict["value"] = sp_list.index(seq.scientific_program.name)
            field_dict["values"] = sp_list
            field_dict["value_type"] = "int"
        elif field == "start_date":
            field_dict["value"] = seq.start_date.strftime("%d/%m/%Y %H:%M:%S")
            field_dict["value_type"] = "str"
        else:
            field_dict["value_type"] = form_field.widget.input_type
            # Model attributes first, then the configuration attributes.
            if seq.__dict__.get(field) == None:
                value = seq.config_attributes.get(field)
            else:
                value = seq.__dict__.get(str(field))
            field_dict["value"] = value
            if form_field.__dict__.get("_choices"):
                # Choice fields are exported as an index into their choices.
                field_dict["value_type"] = "int"
                field_dict["values"] = [
                    choice[0] for choice in form_field.__dict__.get("_choices")]
                field_dict["value"] = field_dict["values"].index(value)
        if field == "layout":
            field_dict["editable"] = False
        if type == "template":
            field_dict["value"] = None
        if form_field.__dict__.get("min_value"):
            field_dict["min_value"] = form_field.__dict__.get("min_value")
        if form_field.__dict__.get("max_value"):
            field_dict["max_value"] = form_field.__dict__.get("max_value")
        yaml_dict["sequence"][field] = field_dict
    album_list = []
    for album in seq.albums.all():
        album_dict = {
            "Album": {
                "name": album.name
            }
        }
        plans_list = []
        for plan in album.plans.all():
            plan_form = PlanForm(data_from_config=config.getEditableAttributesOfChannel(
                config.unit_name, list(config.get_channels(config.unit_name).keys())[0]), edited_plan=plan)
            plan_dict = {"Plan": {}}
            for field in plan_form.fields.keys():
                form_field = plan_form.fields[field]
                field_dict = {}
                if field == "nb_images":
                    field_dict["value"] = plan.nb_images
                    field_dict["value_type"] = "int"
                    field_dict["min_value"] = 1
                elif form_field.__dict__.get("_choices"):
                    value = plan.config_attributes.get(field)
                    field_dict["value_type"] = "int"
                    field_dict["values"] = [
                        choice[0] for choice in form_field.__dict__.get("_choices")]
                    # Rebuild the "subkey:value;subkey:value" form of the
                    # stored dictionary to find it among the choices.
                    container_value_to_str = ";".join(
                        f"{subkey}:{subvalue}" for subkey, subvalue in value.items())
                    field_dict["value"] = field_dict["values"].index(
                        container_value_to_str)
                else:
                    value = plan.config_attributes.get(field)
                    field_dict["value"] = value
                    if form_field.__dict__.get("min_value"):
                        field_dict["min_value"] = form_field.__dict__.get(
                            "min_value")
                    if form_field.__dict__.get("max_value"):
                        field_dict["max_value"] = form_field.__dict__.get(
                            "max_value")
                    field_dict["value_type"] = form_field.widget.input_type
                if type == "template":
                    field_dict["value"] = None
                plan_dict["Plan"][field] = field_dict
            plans_list.append(plan_dict)
        album_dict["Album"]["Plans"] = plans_list
        album_list.append(album_dict)
    yaml_dict["sequence"]["ALBUMS"] = album_list
    response = HttpResponse(
        yaml.dump(yaml_dict, sort_keys=False), content_type='application/yaml')
    response['Content-Disposition'] = 'attachment; filename="{}_{}.yml"'.format(
        seq.name, type)
    return response


@login_required
@level_required("Admin", "Unit-PI", "Observer")
def create_albums(request, seq_id, layout):
    """Create one album per entry of the named layout in the given sequence."""
    seq = Sequence.objects.get(id=seq_id)
    unit_name = os.environ["unit_name"]
    config = OBSConfig(os.environ["PATH_TO_OBSCONF_FILE"], unit_name)
    albums = config.getLayoutByName(
        unit_name=config.unit_name, name_of_layout=layout)["ALBUMS"]
    for album_name in albums:
        album_desc = config.getAlbumByName(
            config.unit_name, album_name).get("description", "")
        create_album(request, seq_id, album_name=album_name, desc=album_desc)
    messages.add_message(request, messages.INFO,
                         "Created album(s) according to chosen layout")
    return HttpResponse("ok")


@login_required
@level_required("Admin", "Unit-PI", "Observer")
def delete_albums(request, seq_id):
    """Delete every album of the sequence and mark the sequence incomplete."""
    seq = Sequence.objects.get(id=seq_id)
    # ``albums`` holds the result of the bulk delete, sent back as the body.
    albums = Album.objects.filter(sequence=seq).delete()
    seq.complete = False
    seq.save()
    return HttpResponse(albums)


@login_required
@level_required("Admin", "Unit-PI", "Observer")
def edit_plan(request, id):
    """Update a plan from POST data (if any) and render its edit form."""
    plan = get_object_or_404(Plan, pk=id)
    config = OBSConfig(
        os.environ["PATH_TO_OBSCONF_FILE"], os.environ["unit_name"])
    if request.POST:
        post_data = request.POST.copy()
        # The token is not a plan attribute; it may be absent.
        post_data.pop("csrfmiddlewaretoken", None)
        plan.nb_images, plan.config_attributes = _parse_plan_post_data(
            post_data)
        plan.save()
    form = PlanForm(data_from_config=config.getEditableAttributesOfChannel(
        config.unit_name, list(config.get_channels(config.unit_name).keys())[0]), edited_plan=plan)
    uneditableForm = uneditablePlanForm(data_from_config=config.getUneditableAttributesOfChannel(
        config.unit_name, list(config.get_channels(config.unit_name).keys())[0]))
    return render(request, "routine_manager/testcreateplan.html", {"form": form, "uneditableForm": uneditableForm})


@login_required
@level_required("Admin", "Unit-PI", "Observer")
def copy_sequence(request, seq_id):
    """Duplicate a sequence with its albums and plans, then edit the copy.

    The copy is a draft owned by the current user and named
    ``seq_<UTC timestamp>``. Copies are made by clearing the primary key of a
    loaded instance and saving it again.
    """
    sequence_to_be_copied = Sequence.objects.get(id=seq_id)
    # Second instance of the same row: used to walk the original albums once
    # the first one has become the copy.
    original_sequence = Sequence.objects.get(id=seq_id)
    new_seq = sequence_to_be_copied
    date_of_sequence = datetime.datetime.utcnow()
    date_of_sequence = date_of_sequence.strftime("%Y%m%dT%H%M%S")
    name = f"seq_{str(date_of_sequence)}"
    new_seq.name = name
    new_seq.pk = None
    new_seq._state.adding = True
    new_seq.pyros_user = request.user
    new_seq.complete = False
    new_seq.status = Sequence.DRAFT
    new_seq.save()
    for album in original_sequence.albums.all():
        # Fetched through a fresh Album instance because ``album`` itself is
        # turned into the copy below.
        original_plans_of_album = Album.objects.get(id=album.id).plans.all()
        new_alb = album
        new_alb.complete = False
        new_alb.pk = None
        new_alb.sequence = new_seq
        new_alb._state.adding = True
        new_alb.save()
        for plan in original_plans_of_album:
            new_plan = plan
            new_plan.album = new_alb
            new_plan.complete = False
            new_plan.pk = None
            new_plan._state.adding = True
            new_plan.save()
    return redirect(action_sequence, new_seq.id, "edit")