From f1bb8c76fd6ee7426469796aaad56e1792d11b5e Mon Sep 17 00:00:00 2001 From: Richard Hitier Date: Thu, 29 Apr 2021 13:46:04 +0200 Subject: [PATCH] Move auth tests to db cases --- app/commands/commands.py | 66 ++++++++++++++++++++++++++++++++++++++++++++++++------------------ tests/backend_tests.py | 42 ------------------------------------------ tests/db_tests.py | 45 ++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 92 insertions(+), 61 deletions(-) diff --git a/app/commands/commands.py b/app/commands/commands.py index b8c5216..6901343 100644 --- a/app/commands/commands.py +++ b/app/commands/commands.py @@ -1,19 +1,17 @@ import csv import os import sys + import click import random from flask import current_app -from sqlalchemy.exc import OperationalError, IntegrityError -from sqlalchemy.orm.exc import NoResultFound +from sqlalchemy.exc import IntegrityError from sqlalchemy.sql import func -from sqlalchemy.ext.automap import automap_base -from sqlalchemy.orm import Session -from sqlalchemy import create_engine from app.models import db, Agent, Service, Project, Capacity, Period, Charge, AgentStatus, Company, AgentBap, \ - AgentGrade, Category, CategoryLabel + AgentGrade, Category, Label, ProjectLabel +# TODO: rename to methods and add get_roles() from app.auth.models import User, _nameToRole, _roleToName from . import bp @@ -75,24 +73,38 @@ def feed_from_irap(csv_file_name): # Get the columns values # - projects = [] agents = [] services = [] baps = [] grades = [] companies = [] statuses = [] + + # build a category dict of lists + # key being the category name, + # list being filled with corresponding labels categories = {k: [] for k in categorie_keys} + + # + categorized_projects = {} + for r in rows: - projects.append(r[project_key]) services.append(r[service_key]) baps.append(r[bap_key]) grades.append(r[grade_key]) companies.append(r[company_key]) statuses.append(r[status_key]) - # build a dict of lists, key being the category name + + # categorized_projects + project = r[project_key] + categorized_projects[project] = [] + + # fill in the category-labels dict for k in categorie_keys: categories[k].append(r[k]) + categorized_projects[project].append(r[k]) + + # build agents list of dicts agents.append({ 'firstname': r[firstname_key], 'secondname': r[secondname_key], @@ -114,13 +126,16 @@ def feed_from_irap(csv_file_name): # 2- then keep only uniq item with set() # 3- at last alpha sort with sorted() # - projects = sorted(set(filter(None, projects))) services = sorted(set(filter(None, services))) baps = sorted(set(filter(None, baps))) grades = sorted(set(filter(None, grades))) companies = sorted(set(filter(None, companies))) statuses = sorted(set(filter(None, statuses))) + # Do the same for the projects, that are keys of categorized_projects + # + projects = sorted(set(categorized_projects.keys())) + # Do the same for the category labels stored as a dict # # k is category name @@ -145,14 +160,14 @@ def feed_from_irap(csv_file_name): db.session.add(n_b) db.session.commit() - # Feed projects from column + # Feed grades from column # for g in grades: n_g = AgentGrade(name=g) db.session.add(n_g) db.session.commit() - # Feed projects from column + # Feed companies from column # for c in companies: n_c = Company(name=c) @@ -191,23 +206,35 @@ def feed_from_irap(csv_file_name): db.session.commit() # Add one default capacity + # db.session.add(Capacity(name="Agent")) db.session.commit() # Feed categories and labels # for c, l in categories.items(): - print("Adding category ", c) + current_app.logger.debug("Adding category " + c) n_c = Category(name=c) db.session.add(n_c) # db.session.commit() for label in l: - # print("Adding label {} for id {}".format(label, n_c.id)) - n_l = CategoryLabel(name=label, category=n_c) + current_app.logger.debug("Adding label {} for id {}".format(label, n_c.id)) + n_l = Label(name=label, category=n_c) db.session.add(n_l) db.session.commit() - # Feed agents from columns + # Feed project's labels + # + for p, labels in categorized_projects.items(): + n_p = Project.query.filter(Project.name == p).one() + for l in labels: + n_l = Label.query.filter(Label.name == l).one() + n_c = n_l.category + n_pl = ProjectLabel(project=n_p, category=n_c, label=n_l) + db.session.add(n_pl) + db.session.commit() + + # Feed agents from agents list previously filled # for a in agents: status = AgentStatus.query.filter(AgentStatus.name == a['status']).one_or_none() @@ -235,6 +262,9 @@ def feed_from_irap(csv_file_name): db.session.add(n_a) db.session.commit() + # Feed agents from agents list previously filled + # + # Now feed the charges. # # At least one for each csv row @@ -441,8 +471,8 @@ def feed_random_charges(agent): def user_add(email, name, login, password, role): """ Add a new user in db.""" user = User.query.filter(User.name == name).one_or_none() - if (user): - current_app.logger.error(f"user already exists {name}") + if user: + current_app.logger.warn(f"user already exists {name}") return user = User(email=email, name=name, login=login, password=password, role=role) db.session.add(user) diff --git a/tests/backend_tests.py b/tests/backend_tests.py index a74a134..f41d8b8 100644 --- a/tests/backend_tests.py +++ b/tests/backend_tests.py @@ -60,45 +60,3 @@ class DbMgrTestCase(BaseTestCase): # Waiting for 17 periods + headers line self.assertEqual(18, len(stacked_charges)) - -class AuthModelTestCase(BaseTestCase): - - def skip_if_no_sqlitememory(self): - if 'memory' not in self.app.config['SQLALCHEMY_DATABASE_URI']: - self.skipTest("Needs in memory sqlite") - - def get_admin(self): - return User.query.filter(User.name == 'admin').one() - - def setUp(self): - BaseTestCase.setUp(self) - self.skip_if_no_sqlitememory() - db.create_all() - admin = User(email='admin@nowhere.org', name='admin', login='admin', role='admin') - db.session.add(admin) - db.session.commit() - - def test_in_memory(self): - self.app.logger.info("In memory Sqlite DB for tests") - self.assertTrue(True) - - def test_setrole(self): - admin = self.get_admin() - admin.set_role("ADMIN") - db.session.commit() - admin = self.get_admin() - self.assertTrue(admin is not None) - self.assertTrue(admin.has_role("ADMIN")) - self.assertFalse(admin.has_role("SERVICE")) - - def test_setrole_valueerror(self): - admin = self.get_admin() - with self.assertRaises(ValueError) as ve: - admin.set_role("NOSUCHROLE") - - def test_setcheckpassword(self): - admin = self.get_admin() - admin.set_password("hahaha") - db.session.commit() - admin2 = self.get_admin() - self.assertTrue(admin2.check_password("hahaha")) diff --git a/tests/db_tests.py b/tests/db_tests.py index 18772be..06420b3 100644 --- a/tests/db_tests.py +++ b/tests/db_tests.py @@ -1,6 +1,6 @@ import unittest -from app import create_app, db +from app import create_app, db, User from app.models import Project from pdc_config import TestConfig from tests.common_db_feed import feed_projects @@ -26,3 +26,46 @@ class DbBaseTestCase(unittest.TestCase): def test_first(self): projects = Project.query.all() self.assertEqual(3, len(projects)) + + +class AuthModelTestCase(DbBaseTestCase): + + def skip_if_no_sqlitememory(self): + if 'memory' not in self.app.config['SQLALCHEMY_DATABASE_URI']: + self.skipTest("Needs in memory sqlite") + + def get_admin(self): + return User.query.filter(User.name == 'admin').one() + + def setUp(self): + DbBaseTestCase.setUp(self) + self.skip_if_no_sqlitememory() + db.create_all() + admin = User(email='admin@nowhere.org', name='admin', login='admin', role='admin') + db.session.add(admin) + db.session.commit() + + def test_in_memory(self): + self.app.logger.info("In memory Sqlite DB for tests") + self.assertTrue(True) + + def test_setrole(self): + admin = self.get_admin() + admin.set_role("ADMIN") + db.session.commit() + admin = self.get_admin() + self.assertTrue(admin is not None) + self.assertTrue(admin.has_role("ADMIN")) + self.assertFalse(admin.has_role("SERVICE")) + + def test_setrole_valueerror(self): + admin = self.get_admin() + with self.assertRaises(ValueError) as ve: + admin.set_role("NOSUCHROLE") + + def test_setcheckpassword(self): + admin = self.get_admin() + admin.set_password("hahaha") + db.session.commit() + admin2 = self.get_admin() + self.assertTrue(admin2.check_password("hahaha")) -- libgit2 0.21.2