# Standard / third-party imports
import os
import yaml
#import json

# Django imports
from django.test import TestCase
from django.urls import reverse

# Project imports
from user_mgmt.models import PyrosUser, Period, SP_Period, SP_Period_User, ScientificProgram
from seq_submit.models import *
from common.models import *
from src.core.pyros_django.obs_config.obsconfig_class import OBSConfig
from src.core.pyros_django.seq_submit.forms import PlanForm, SequenceForm

SAVED_REQUESTS_FOLDER = "misc/saved_requests/"

# OLD TESTS
# class TestRoutineManager(TestCase):
#     fixtures = ['tests/routine_mgr_test_TZ.json']
#
#     def setUp(self):
#         #print("Routine Manager test")
#         pass
#
#     def test_import_fake_file(self):
#         self.client.login(username="test@test.test", password="test")
#         path = "/seq_submit/import_request"
#         response = self.client.post(path, {"request_file": "toto.xml"}, follow=True)
#         self.assertTrue("error" in response.context.keys(), "There should be an error of non existant file")
#         self.assertEqual(Request.objects.count(), 1, "There should still be only one request")
#
#     def test_import_invalid_file(self):
#         self.client.login(username="test@test.test", password="test")
#         path = "/seq_submit/import_request"
#         with open("manage.py") as file:
#             response = self.client.post(path, {"request_file": file}, follow=True)
#         self.assertTrue("error" in response.context.keys(), "There should be an error of invalid file")
#         self.assertEqual(Request.objects.count(), 1, "There should still be only one request")
#
#     def test_import(self):
#         self.client.login(username="test@test.test", password="test")
#         path = "/seq_submit/import_request"
#         with open(SAVED_REQUESTS_FOLDER + "request_unittest.xml") as file:
#             response = self.client.post(path, {"request_file": file}, follow=True)
#         self.assertTrue("success" in response.context.keys(), "There should be a success message")
#         self.assertEqual(Request.objects.count(), 2, "There should still be two request")
#
#     def test_export_incomplete(self):
#         req = Request.objects.get()
#         req.complete = False
#         req.save()
#         self.client.login(username="test@test.test", password="test")
#         path = "/seq_submit/export_request/" + str(req.id)
#         response = self.client.get(path, follow=True)
#         self.assertTrue("error" in response.context.keys(), "There should be an error of incomplete request")
#
#     def test_export(self):
#         req = Request.objects.get()
#         self.client.login(username="test@test.test", password="test")
#         path = "/seq_submit/export_request/" + str(req.id)
#         response = self.client.get(path, follow=True)
#         file_path = SAVED_REQUESTS_FOLDER + "request" + str(req.id) + ".xml"
#         self.assertTrue(os.path.isfile(file_path), "There should be a new file %s" % (file_path,))
#         os.remove(file_path)


class SequencesTests(TestCase):
    """Tests of the sequence views: list, detail, delete, create, copy, submit, export and import.

    Every test logs in through the ``login_validation`` view as one of the
    users loaded in ``setUp`` (``usr1`` .. ``usr8``), exercises a view with the
    Django test client, and logs out again.
    """

    fixtures = ['tests/sequences_test_TZ.json']

    # Password given to every user in setUp and sent back by login_as_user.
    PASSWORD = "password123"
    # YAML file written by the export test and read back by the import test.
    IMPORT_FILE_PATH = './misc/fixtures/tests/test_import_seq.yml'

    def setUp(self) -> None:
        """Re-attach fixture data to fresh periods and prepare the eight test users."""
        # create a new period for the tests that will be the exploitation period
        self.period1 = Period.objects.create()
        # replace for sp_period and sequence their attached period so we don't
        # use the fixed period of the fixture
        for sp_period in SP_Period.objects.all():
            sp_period.period = self.period1
            sp_period.save()
        for sequence in Sequence.objects.all():
            sequence.period = self.period1
            sequence.save()
        Period.objects.get(id=1).delete()
        self.period2 = Period.objects.create(
            start_date=Period.objects.all().first().end_date)
        self.period3 = Period.objects.create(
            start_date=self.period2.end_date, submission_duration=550)

        self.usr1 = PyrosUser.objects.get(username="observer")
        self.usr2 = PyrosUser.objects.get(username="unit_pi")
        self.usr3 = PyrosUser.objects.get(username="tac")
        self.usr4 = PyrosUser.objects.get(username="operator")
        self.usr5 = PyrosUser.objects.get(username="observer2")
        self.usr6 = PyrosUser.objects.get(username="tac2")
        self.usr7 = PyrosUser.objects.get(username="observer3")
        self.usr8 = PyrosUser.objects.get(username="observer4")
        # give every user the same known password so login_as_user can log in
        all_users = (self.usr1, self.usr2, self.usr3, self.usr4,
                     self.usr5, self.usr6, self.usr7, self.usr8)
        for user in all_users:
            user.set_password(self.PASSWORD)
            user.save()

        # add usr1 and usr5 to SP debris
        sp_debris_period = ScientificProgram.objects.get(
            name="debris").SP_Periods.first()
        SP_Period_User.objects.create(
            SP_Period=sp_debris_period, user=self.usr1)
        SP_Period_User.objects.create(
            SP_Period=sp_debris_period, user=self.usr5)

        # replacing who submitted sequences
        for seq in Sequence.objects.filter(id__lt=4):
            seq.pyros_user = self.usr1
            seq.save()
        for seq in Sequence.objects.filter(id__gte=4):
            seq.pyros_user = self.usr5
            seq.save()

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def logout(self):
        """Log the test client out through the ``user_logout`` view."""
        self.client.post(reverse("user_logout"))
        print("Log out")

    def login_as_user(self, number):
        """Log the test client in as ``self.usr<number>``.

        Args:
            number: index of the user created in ``setUp`` (1 to 8).

        Raises:
            ValueError: if no ``usr<number>`` attribute exists.
        """
        user = getattr(self, f"usr{number}", None)
        if user is None:
            raise ValueError(f"Unknown test user number: {number!r}")
        # the user object itself is posted as "email", as the tests always did
        self.client.post(reverse("login_validation"), {
            "email": user, "password": self.PASSWORD})
        print(f'Log in as {self.client.session["user"]}')

    def _get_sequence_action(self, action, seq_id=1):
        """GET the ``action_sequence`` view for ``seq_id`` and return the response."""
        return self.client.get(reverse("action_sequence", kwargs={
            "action": action, "seq_id": seq_id}))

    def _delete_sequence(self, seq_id=1):
        """Request deletion of a sequence; return (response, count before, count after)."""
        nb_of_seq_pre_delete = Sequence.objects.all().count()
        response = self._get_sequence_action("delete", seq_id)
        nb_of_seq_post_delete = Sequence.objects.all().count()
        return response, nb_of_seq_pre_delete, nb_of_seq_post_delete

    def _assert_first_sequence_detail_shown(self):
        """Check that the detail page of sequence 1 shows its name, submitter and tolerance."""
        response = self._get_sequence_action("view")
        seq = Sequence.objects.get(id=1)
        self.assertEqual(response.status_code, 200)
        self.assertContains(response, seq.name)
        self.assertContains(response, self.usr1.username)
        self.assertContains(response, seq.tolerance_after)

    def _create_and_save_sequence(self, user):
        """Create a sequence through the views and save it with the form's initial values.

        The client must already be logged in as ``user``.

        Returns:
            tuple ``(seq_id, config, layout, post_data)`` where ``post_data`` is
            the dict that was posted to ``sequence_validate`` (reusable for a
            later update of the same sequence).
        """
        # create base of seq
        self.client.get(reverse("create_sequence"))
        created_seq = Sequence.objects.filter(
            pyros_user=user).order_by("-created").first()
        self.assertIsNotNone(
            created_seq, "create_sequence did not create any sequence for this user")
        seq_id = created_seq.id

        config = OBSConfig(os.environ["PATH_TO_OBSCONF_FILE"])
        sp_of_user = user.get_scientific_programs()
        sp_list = ScientificProgram.objects.observable_programs().filter(id__in=sp_of_user)
        sequence_form = SequenceForm(
            instance=created_seq,
            data_from_config=config.getEditableMountAttributes(config.unit_name),
            layouts=config.get_layouts(config.unit_name),
            sp_list=sp_list)
        # use the first layout declared in the observatory configuration
        layout = list(config.get_layouts(config.unit_name)
                      ["layouts"].values())[0]["name"]

        # edit the new sequence
        post_data = {
            "action": "save",
        }
        for field_name, field in sequence_form.fields.items():
            post_data[field_name] = field.initial
        post_data["scientific_program"] = sp_list[0].id
        post_data["name"] = created_seq.name
        post_data["start_date"] = created_seq.start_date.strftime(
            "%Y-%m-%d %H:%M:%S")
        post_data["layout"] = layout
        response = self.client.post(reverse("sequence_validate", kwargs={
            "seq_id": seq_id}), data=post_data)

        # need to reinstantiate value to get updated values
        created_seq = Sequence.objects.get(id=seq_id)
        self.assertEqual(response.status_code, 302)
        self.assertEqual(user, created_seq.pyros_user)
        self.assertIsNotNone(created_seq.config_attributes["layout"])
        self.assertEqual(created_seq.config_attributes["layout"], layout)
        return seq_id, config, layout, post_data

    def _create_albums_and_first_plan(self, seq_id, layout):
        """Create the albums of a saved sequence and add a plan to the first one.

        Returns:
            the first album of the sequence.
        """
        # create one or more album depending layout config
        response = self.client.get(reverse('create_albums', kwargs={
            "seq_id": seq_id, "layout": layout}))
        self.assertEqual(response.status_code, 200)
        # check if there is at least one album
        self.assertNotEqual(
            Sequence.objects.get(id=seq_id).albums.all().count(), 0)

        # We're creating a plan linked to the previous album we created
        album = Album.objects.filter(
            sequence=Sequence.objects.get(id=seq_id)).first()
        post_data = {
            "name": album.name,
            "desc": "test",
            "action": "save_and_add"
        }
        # follow = True because we have more than one redirection, so the
        # status code corresponds to the LAST url (action_plan)
        response = self.client.post(reverse("album_validate", kwargs={
            "alb_id": album.id}), data=post_data, follow=True)
        self.assertEqual(response.status_code, 200)
        # check if there is at least one plan
        self.assertNotEqual(Sequence.objects.get(
            id=seq_id).albums.all().first().plans.all().count(), 0)
        return album

    def _configure_plan(self, config, album):
        """Save config attributes on the single plan of ``album`` through ``plan_validate``."""
        # the plan created with the album doesn't have config_attributes yet
        plan = Plan.objects.get(album=album)
        first_channel = list(config.get_channels(config.unit_name).keys())[0]
        plan_form = PlanForm(
            data_from_config=config.getEditableChannelAttributes(
                config.unit_name, first_channel),
            edited_plan=None)
        post_data = {}
        for field_name, field in plan_form.fields.items():
            # choice fields get their first choice, the others their initial value
            if vars(field).get("_choices"):
                post_data[field_name] = str(field._choices[0][0])
            else:
                post_data[field_name] = str(field.initial)
        post_data["nb_images"] = "1"
        post_data["action"] = "save"
        response = self.client.post(reverse("plan_validate", kwargs={
            "plan_id": plan.id}), data=post_data)
        self.assertEqual(response.status_code, 302)
        self.assertEqual(Plan.objects.filter(album=album).count(), 1)

    def _create_complete_sequence(self, user):
        """Create a sequence with its albums and one configured plan; return its id."""
        seq_id, config, layout, _ = self._create_and_save_sequence(user)
        album = self._create_albums_and_first_plan(seq_id, layout)
        self._configure_plan(config, album)
        return seq_id

    # ------------------------------------------------------------------
    # List view
    # ------------------------------------------------------------------

    def test_SEQ_SP_PI_can_view_list(self):
        self.login_as_user(1)
        response = self.client.get(reverse("sequences_list"))
        self.assertEqual(response.status_code, 200)
        # user1 can see everything because he is SP_PI on this scientific program
        for seq in Sequence.objects.filter(scientific_program=ScientificProgram.objects.get(name="debris")):
            self.assertContains(response, seq.name)
        self.logout()

    def test_SEQ_Observer_can_view_list(self):
        self.login_as_user(5)
        response = self.client.get(reverse("sequences_list"))
        self.assertEqual(response.status_code, 200)
        # user5 can see all sequences of his scientific program
        for seq in Sequence.objects.filter(scientific_program=ScientificProgram.objects.get(name="debris")):
            self.assertContains(response, seq.name)
        self.logout()

    def test_SEQ_Unit_PI_can_view_list(self):
        self.login_as_user(2)
        response = self.client.get(reverse("sequences_list"))
        self.assertEqual(response.status_code, 200)
        # user2 can see all sequences
        for seq in Sequence.objects.all():
            self.assertContains(response, seq.name)
        self.logout()

    # ------------------------------------------------------------------
    # Detail view
    # ------------------------------------------------------------------

    def test_SEQ_SP_PI_can_view_detail(self):
        self.login_as_user(1)
        self._assert_first_sequence_detail_shown()
        self.logout()

    def test_SEQ_Observer_can_view_detail(self):
        self.login_as_user(5)
        response = self._get_sequence_action("view")
        # can see this sequence because he's in this SP
        self.assertEqual(response.status_code, 200)
        self.logout()

    def test_SEQ_Unit_PI_can_view_detail(self):
        self.login_as_user(2)
        self._assert_first_sequence_detail_shown()
        self.logout()

    # ------------------------------------------------------------------
    # Deletion
    # ------------------------------------------------------------------

    def test_SEQ_SP_PI_can_delete(self):
        self.login_as_user(1)
        response, nb_pre_delete, nb_post_delete = self._delete_sequence()
        self.assertEqual(response.status_code, 302)
        self.assertNotEqual(nb_pre_delete, nb_post_delete)
        self.logout()

    def test_SEQ_Observer_can_delete(self):
        self.login_as_user(5)
        response, nb_pre_delete, nb_post_delete = self._delete_sequence()
        # sequence 1 was submitted by usr1 (see setUp); the test expects usr5
        # to be able to delete it anyway
        self.assertEqual(response.status_code, 302)
        self.assertNotEqual(nb_pre_delete, nb_post_delete)
        self.logout()

    def test_SEQ_Observer_cannot_delete_recorded_seq(self):
        self.login_as_user(1)
        # a sequence whose recording is finished must not be deletable
        seq = Sequence.objects.get(id=1)
        seq.status = Sequence.REC_FINISHED
        seq.save()
        response, nb_pre_delete, nb_post_delete = self._delete_sequence()
        self.assertEqual(response.status_code, 302)
        self.assertEqual(nb_pre_delete, nb_post_delete)
        self.logout()

    def test_SEQ_Unit_PI_can_delete(self):
        self.login_as_user(2)
        response, nb_pre_delete, nb_post_delete = self._delete_sequence()
        self.assertEqual(response.status_code, 302)
        self.assertNotEqual(nb_pre_delete, nb_post_delete)
        self.logout()

    # ------------------------------------------------------------------
    # Creation, copy, submission
    # ------------------------------------------------------------------

    def test_SEQ_SP_PI_can_create_seq(self):
        self.login_as_user(1)
        self._create_complete_sequence(self.usr1)
        self.logout()

    def test_SEQ_Observer_can_create_seq_copy(self):
        self.login_as_user(5)
        sequence_to_be_copied = Sequence.objects.filter(
            pyros_user=self.usr5).first()
        nb_of_sequence_pre_create = Sequence.objects.all().count()
        response = self.client.post(reverse("copy_sequence", kwargs={
            "seq_id": sequence_to_be_copied.id}))
        self.assertEqual(Sequence.objects.all().count(),
                         nb_of_sequence_pre_create + 1)
        last_created_seq = Sequence.objects.all().order_by("-created").first()
        self.assertEqual(self.usr5, last_created_seq.pyros_user)
        self.assertEqual(response.status_code, 302)
        # check if there is at least one album
        self.assertNotEqual(last_created_seq.albums.all().count(), 0)
        # check if there is at least one plan
        self.assertNotEqual(
            last_created_seq.albums.all().first().plans.all().count(), 0)
        self.logout()

    def test_SEQ_SP_PI_can_update_and_submit_seq(self):
        self.login_as_user(1)
        seq_id, _, layout, update_post_data = self._create_and_save_sequence(
            self.usr1)
        self._create_albums_and_first_plan(seq_id, layout)

        # update seq and submit
        update_post_data["action"] = "save_and_submit"
        update_post_data["tolerance_before"] = "5s"
        response = self.client.post(reverse("sequence_validate", kwargs={
            "seq_id": seq_id}), data=update_post_data)
        updated_seq = Sequence.objects.get(id=seq_id)
        self.assertEqual(response.status_code, 302)
        self.assertEqual(updated_seq.status, Sequence.TOBEPLANNED)
        self.assertEqual(updated_seq.tolerance_before, "5s")
        self.logout()

    # ------------------------------------------------------------------
    # Export / import
    # ------------------------------------------------------------------

    def test_SEQ_Observer_can_export_copy(self):
        self.login_as_user(1)
        # create a complete sequence (albums + configured plan) to export
        seq = Sequence.objects.get(
            id=self._create_complete_sequence(self.usr1))
        response = self.client.get(reverse("export_sequence", kwargs={
            "seq_id": seq.id, "type": "copy"}))
        self.assertEqual(response.status_code, 200)
        self.assertEqual(response.get('Content-Disposition'),
                         f"attachment; filename=\"{seq.name}_copy.yml\"")
        content = response.content.decode('utf-8')
        yaml_content = yaml.safe_load(stream=content)
        sequence_yaml = yaml_content.get("sequence")
        # check that we have sequence section
        self.assertIsNotNone(sequence_yaml)
        albums = sequence_yaml.get("ALBUMS")
        # check if we have albums section
        self.assertIsNotNone(albums)
        plans = albums[0].get("Album").get("Plans")
        # check if we have plans section
        self.assertIsNotNone(plans)
        # check if we have a value for SP
        self.assertIsNotNone(sequence_yaml.get(
            "scientific_program").get("value"))
        # dump dict into yaml file to test import in the next test
        yaml_content["sequence"]["name"]["value"] = "test_import_sequence"
        with open(self.IMPORT_FILE_PATH, 'w+', encoding='utf8') as outfile:
            yaml.safe_dump(yaml_content, outfile)
        self.logout()

    def test_SEQ_Observer_can_import(self):
        # NOTE: this test reads the file written by
        # test_SEQ_Observer_can_export_copy, so that file must already exist.
        self.login_as_user(1)
        with open(self.IMPORT_FILE_PATH, encoding='utf8') as seq_yml:
            response = self.client.post(reverse("import_sequence"), {
                'name': 'test_import_seq.yml', 'sequence_file': seq_yml})
        self.assertEqual(response.status_code, 200)
        content_as_json = response.json()
        self.assertEqual(content_as_json.get("succeed"), True)
        # check name of the imported sequence
        self.assertEqual(Sequence.objects.get(
            id=content_as_json["sequence_id"]).name, "test_import_sequence")
        self.logout()