Commit f1bb8c76fd6ee7426469796aaad56e1792d11b5e

Authored by hitier
1 parent f079f974

Move auth tests to db cases

app/commands/commands.py
1 1 import csv
2 2 import os
3 3 import sys
  4 +
4 5 import click
5 6 import random
6 7  
7 8 from flask import current_app
8   -from sqlalchemy.exc import OperationalError, IntegrityError
9   -from sqlalchemy.orm.exc import NoResultFound
  9 +from sqlalchemy.exc import IntegrityError
10 10 from sqlalchemy.sql import func
11   -from sqlalchemy.ext.automap import automap_base
12   -from sqlalchemy.orm import Session
13   -from sqlalchemy import create_engine
14 11  
15 12 from app.models import db, Agent, Service, Project, Capacity, Period, Charge, AgentStatus, Company, AgentBap, \
16   - AgentGrade, Category, CategoryLabel
  13 + AgentGrade, Category, Label, ProjectLabel
  14 +# TODO: rename to methods and add get_roles()
17 15 from app.auth.models import User, _nameToRole, _roleToName
18 16  
19 17 from . import bp
... ... @@ -75,24 +73,38 @@ def feed_from_irap(csv_file_name):
75 73  
76 74 # Get the columns values
77 75 #
78   - projects = []
79 76 agents = []
80 77 services = []
81 78 baps = []
82 79 grades = []
83 80 companies = []
84 81 statuses = []
  82 +
  83 + # build a category dict of lists
  84 + # key being the category name,
  85 + # list being filled with corresponding labels
85 86 categories = {k: [] for k in categorie_keys}
  87 +
  88 + #
  89 + categorized_projects = {}
  90 +
86 91 for r in rows:
87   - projects.append(r[project_key])
88 92 services.append(r[service_key])
89 93 baps.append(r[bap_key])
90 94 grades.append(r[grade_key])
91 95 companies.append(r[company_key])
92 96 statuses.append(r[status_key])
93   - # build a dict of lists, key being the category name
  97 +
  98 + # categorized_projects
  99 + project = r[project_key]
  100 + categorized_projects[project] = []
  101 +
  102 + # fill in the category-labels dict
94 103 for k in categorie_keys:
95 104 categories[k].append(r[k])
  105 + categorized_projects[project].append(r[k])
  106 +
  107 + # build agents list of dicts
96 108 agents.append({
97 109 'firstname': r[firstname_key],
98 110 'secondname': r[secondname_key],
... ... @@ -114,13 +126,16 @@ def feed_from_irap(csv_file_name):
114 126 # 2- then keep only uniq item with set()
115 127 # 3- at last alpha sort with sorted()
116 128 #
117   - projects = sorted(set(filter(None, projects)))
118 129 services = sorted(set(filter(None, services)))
119 130 baps = sorted(set(filter(None, baps)))
120 131 grades = sorted(set(filter(None, grades)))
121 132 companies = sorted(set(filter(None, companies)))
122 133 statuses = sorted(set(filter(None, statuses)))
123 134  
  135 + # Do the same for the projects, that are keys of categorized_projects
  136 + #
  137 + projects = sorted(set(categorized_projects.keys()))
  138 +
124 139 # Do the same for the category labels stored as a dict
125 140 #
126 141 # k is category name
... ... @@ -145,14 +160,14 @@ def feed_from_irap(csv_file_name):
145 160 db.session.add(n_b)
146 161 db.session.commit()
147 162  
148   - # Feed projects from column
  163 + # Feed grades from column
149 164 #
150 165 for g in grades:
151 166 n_g = AgentGrade(name=g)
152 167 db.session.add(n_g)
153 168 db.session.commit()
154 169  
155   - # Feed projects from column
  170 + # Feed companies from column
156 171 #
157 172 for c in companies:
158 173 n_c = Company(name=c)
... ... @@ -191,23 +206,35 @@ def feed_from_irap(csv_file_name):
191 206 db.session.commit()
192 207  
193 208 # Add one default capacity
  209 + #
194 210 db.session.add(Capacity(name="Agent"))
195 211 db.session.commit()
196 212  
197 213 # Feed categories and labels
198 214 #
199 215 for c, l in categories.items():
200   - print("Adding category ", c)
  216 + current_app.logger.debug("Adding category " + c)
201 217 n_c = Category(name=c)
202 218 db.session.add(n_c)
203 219 # db.session.commit()
204 220 for label in l:
205   - # print("Adding label {} for id {}".format(label, n_c.id))
206   - n_l = CategoryLabel(name=label, category=n_c)
  221 + current_app.logger.debug("Adding label {} for id {}".format(label, n_c.id))
  222 + n_l = Label(name=label, category=n_c)
207 223 db.session.add(n_l)
208 224 db.session.commit()
209 225  
210   - # Feed agents from columns
  226 + # Feed project's labels
  227 + #
  228 + for p, labels in categorized_projects.items():
  229 + n_p = Project.query.filter(Project.name == p).one()
  230 + for l in labels:
  231 + n_l = Label.query.filter(Label.name == l).one()
  232 + n_c = n_l.category
  233 + n_pl = ProjectLabel(project=n_p, category=n_c, label=n_l)
  234 + db.session.add(n_pl)
  235 + db.session.commit()
  236 +
  237 + # Feed agents from agents list previously filled
211 238 #
212 239 for a in agents:
213 240 status = AgentStatus.query.filter(AgentStatus.name == a['status']).one_or_none()
... ... @@ -235,6 +262,9 @@ def feed_from_irap(csv_file_name):
235 262 db.session.add(n_a)
236 263 db.session.commit()
237 264  
  265 + # Feed agents from agents list previously filled
  266 + #
  267 +
238 268 # Now feed the charges.
239 269 #
240 270 # At least one for each csv row
... ... @@ -441,8 +471,8 @@ def feed_random_charges(agent):
441 471 def user_add(email, name, login, password, role):
442 472 """ Add a new user in db."""
443 473 user = User.query.filter(User.name == name).one_or_none()
444   - if (user):
445   - current_app.logger.error(f"user already exists {name}")
  474 + if user:
  475 + current_app.logger.warn(f"user already exists {name}")
446 476 return
447 477 user = User(email=email, name=name, login=login, password=password, role=role)
448 478 db.session.add(user)
... ...
tests/backend_tests.py
... ... @@ -60,45 +60,3 @@ class DbMgrTestCase(BaseTestCase):
60 60 # Waiting for 17 periods + headers line
61 61 self.assertEqual(18, len(stacked_charges))
62 62  
63   -
64   -class AuthModelTestCase(BaseTestCase):
65   -
66   - def skip_if_no_sqlitememory(self):
67   - if 'memory' not in self.app.config['SQLALCHEMY_DATABASE_URI']:
68   - self.skipTest("Needs in memory sqlite")
69   -
70   - def get_admin(self):
71   - return User.query.filter(User.name == 'admin').one()
72   -
73   - def setUp(self):
74   - BaseTestCase.setUp(self)
75   - self.skip_if_no_sqlitememory()
76   - db.create_all()
77   - admin = User(email='admin@nowhere.org', name='admin', login='admin', role='admin')
78   - db.session.add(admin)
79   - db.session.commit()
80   -
81   - def test_in_memory(self):
82   - self.app.logger.info("In memory Sqlite DB for tests")
83   - self.assertTrue(True)
84   -
85   - def test_setrole(self):
86   - admin = self.get_admin()
87   - admin.set_role("ADMIN")
88   - db.session.commit()
89   - admin = self.get_admin()
90   - self.assertTrue(admin is not None)
91   - self.assertTrue(admin.has_role("ADMIN"))
92   - self.assertFalse(admin.has_role("SERVICE"))
93   -
94   - def test_setrole_valueerror(self):
95   - admin = self.get_admin()
96   - with self.assertRaises(ValueError) as ve:
97   - admin.set_role("NOSUCHROLE")
98   -
99   - def test_setcheckpassword(self):
100   - admin = self.get_admin()
101   - admin.set_password("hahaha")
102   - db.session.commit()
103   - admin2 = self.get_admin()
104   - self.assertTrue(admin2.check_password("hahaha"))
... ...
tests/db_tests.py
1 1 import unittest
2 2  
3   -from app import create_app, db
  3 +from app import create_app, db, User
4 4 from app.models import Project
5 5 from pdc_config import TestConfig
6 6 from tests.common_db_feed import feed_projects
... ... @@ -26,3 +26,46 @@ class DbBaseTestCase(unittest.TestCase):
26 26 def test_first(self):
27 27 projects = Project.query.all()
28 28 self.assertEqual(3, len(projects))
  29 +
  30 +
  31 +class AuthModelTestCase(DbBaseTestCase):
  32 +
  33 + def skip_if_no_sqlitememory(self):
  34 + if 'memory' not in self.app.config['SQLALCHEMY_DATABASE_URI']:
  35 + self.skipTest("Needs in memory sqlite")
  36 +
  37 + def get_admin(self):
  38 + return User.query.filter(User.name == 'admin').one()
  39 +
  40 + def setUp(self):
  41 + DbBaseTestCase.setUp(self)
  42 + self.skip_if_no_sqlitememory()
  43 + db.create_all()
  44 + admin = User(email='admin@nowhere.org', name='admin', login='admin', role='admin')
  45 + db.session.add(admin)
  46 + db.session.commit()
  47 +
  48 + def test_in_memory(self):
  49 + self.app.logger.info("In memory Sqlite DB for tests")
  50 + self.assertTrue(True)
  51 +
  52 + def test_setrole(self):
  53 + admin = self.get_admin()
  54 + admin.set_role("ADMIN")
  55 + db.session.commit()
  56 + admin = self.get_admin()
  57 + self.assertTrue(admin is not None)
  58 + self.assertTrue(admin.has_role("ADMIN"))
  59 + self.assertFalse(admin.has_role("SERVICE"))
  60 +
  61 + def test_setrole_valueerror(self):
  62 + admin = self.get_admin()
  63 + with self.assertRaises(ValueError) as ve:
  64 + admin.set_role("NOSUCHROLE")
  65 +
  66 + def test_setcheckpassword(self):
  67 + admin = self.get_admin()
  68 + admin.set_password("hahaha")
  69 + db.session.commit()
  70 + admin2 = self.get_admin()
  71 + self.assertTrue(admin2.check_password("hahaha"))
... ...