Commit b8338b91bd7fbaefc51f20c4f809619c62b2dd30
Exists in
master
and in
4 other branches
Import Irap CSV
Showing
8 changed files
with
433 additions
and
93 deletions
Show diff stats
CHANGELOG.md
VERSION.txt
app/commands/commands.py
1 | 1 | import csv |
2 | 2 | import os |
3 | 3 | import sys |
4 | + | |
4 | 5 | import click |
5 | 6 | import random |
6 | 7 | |
7 | 8 | from flask import current_app |
8 | -from sqlalchemy.exc import OperationalError, IntegrityError | |
9 | -from sqlalchemy.orm.exc import NoResultFound | |
9 | +from sqlalchemy.exc import IntegrityError | |
10 | 10 | from sqlalchemy.sql import func |
11 | -from sqlalchemy.ext.automap import automap_base | |
12 | -from sqlalchemy.orm import Session | |
13 | -from sqlalchemy import create_engine | |
14 | 11 | |
15 | -from app.models import db, Agent, Service, Project, Capacity, Period, Charge | |
12 | +from app.models import db, Agent, Service, Project, Capacity, Period, Charge, AgentStatus, Company, AgentBap, \ | |
13 | + AgentGrade, Category, Label, ProjectLabel | |
14 | +# TODO: rename to methods and add get_roles() | |
16 | 15 | from app.auth.models import User, _nameToRole, _roleToName |
17 | 16 | |
18 | 17 | from . import bp |
... | ... | @@ -57,38 +56,129 @@ def feed_from_irap(csv_file_name): |
57 | 56 | with open(csv_file_name, newline='') as csvfile: |
58 | 57 | csvreader = csv.DictReader(csvfile, delimiter=',', quotechar='"') |
59 | 58 | for row in csvreader: |
60 | - # print('\n'.join(row.keys())) | |
61 | - # break | |
62 | 59 | # Remove any leading/trailing spaces |
63 | 60 | row = {k: v.strip() for k, v in row.items()} |
64 | 61 | rows.append(row) |
65 | 62 | |
66 | 63 | firstname_key = 'NOM' |
67 | - secondname_key = 'prénom' | |
64 | + secondname_key = 'PRÉNOM' | |
65 | + status_key = 'STATUT' | |
66 | + virtual_key = 'VIRTUEL' | |
67 | + company_key = 'SOCIÉTÉ' | |
68 | + bap_key = 'BAP' | |
69 | + grade_key = 'GRADE CORPS' | |
68 | 70 | project_key = 'PROJETS' |
69 | - service_key = 'Groupe métier' | |
70 | - # typology_title = 'TYPOLOGIE' | |
71 | - # thematic_title = 'thématique' | |
71 | + service_key = 'GROUPE MÉTIER' | |
72 | + categorie_keys = ['TYPOLOGIE', 'THÉMATIQUE'] | |
72 | 73 | |
73 | 74 | # Get the columns values |
74 | 75 | # |
75 | - projects = [r[project_key] for r in rows] | |
76 | - projects = sorted(set(projects)) | |
77 | - agents = [(r[firstname_key], r[secondname_key].strip()) for r in rows] | |
78 | - agents = sorted(set(agents)) | |
79 | - services = [r[service_key] for r in rows] | |
80 | - services = sorted(set(services)) | |
81 | - | |
82 | - # thematics = [r[thematic_title] for r in rows] | |
83 | - # thematics = sorted(set(thematics)) | |
84 | - # typologies = [r[typology_title] for r in rows] | |
85 | - # typologies = sorted(set(typologies)) | |
86 | - | |
87 | - # Feed agents from column | |
76 | + agents = [] | |
77 | + services = [] | |
78 | + baps = [] | |
79 | + grades = [] | |
80 | + companies = [] | |
81 | + statuses = [] | |
82 | + | |
83 | + # build a category dict of lists | |
84 | + # key being the category name, | |
85 | + # list being filled with corresponding labels | |
86 | + categories = {k: [] for k in categorie_keys} | |
87 | + | |
88 | 88 | # |
89 | - for a in agents: | |
90 | - n_a = Agent(firstname=a[0], secondname=a[1]) | |
91 | - db.session.add(n_a) | |
89 | + categorized_projects = {} | |
90 | + | |
91 | + for r in rows: | |
92 | + services.append(r[service_key]) | |
93 | + baps.append(r[bap_key]) | |
94 | + grades.append(r[grade_key]) | |
95 | + companies.append(r[company_key]) | |
96 | + statuses.append(r[status_key]) | |
97 | + | |
98 | + # categorized_projects | |
99 | + project = r[project_key] | |
100 | + categorized_projects[project] = [] | |
101 | + | |
102 | + # fill in the category-labels dict | |
103 | + for k in categorie_keys: | |
104 | + categories[k].append(r[k]) | |
105 | + categorized_projects[project].append(r[k]) | |
106 | + | |
107 | + # build agents list of dicts | |
108 | + agents.append({ | |
109 | + 'firstname': r[firstname_key], | |
110 | + 'secondname': r[secondname_key], | |
111 | + 'status': r[status_key], | |
112 | + 'virtual': r[virtual_key], | |
113 | + 'company': r[company_key], | |
114 | + 'bap': r[bap_key], | |
115 | + 'grade': r[grade_key]}) | |
116 | + | |
117 | + # Uppercase the small tables | |
118 | + # | |
119 | + baps = [x.upper() for x in baps] | |
120 | + grades = [x.upper() for x in grades] | |
121 | + statuses = [x.upper() for x in statuses] | |
122 | + | |
123 | + # Now, sort the lists | |
124 | + # | |
125 | + # 1- first remove empty string with filter() | |
126 | + # 2- then keep only uniq item with set() | |
127 | + # 3- at last alpha sort with sorted() | |
128 | + # | |
129 | + services = sorted(set(filter(None, services))) | |
130 | + baps = sorted(set(filter(None, baps))) | |
131 | + grades = sorted(set(filter(None, grades))) | |
132 | + companies = sorted(set(filter(None, companies))) | |
133 | + statuses = sorted(set(filter(None, statuses))) | |
134 | + | |
135 | + # Do the same for the projects, that are keys of categorized_projects | |
136 | + # | |
137 | + projects = sorted(set(categorized_projects.keys())) | |
138 | + | |
139 | + # Do the same for the category labels stored as a dict | |
140 | + # | |
141 | + # k is category name | |
142 | + # v is labels list for that category | |
143 | + for k in categorie_keys: | |
144 | + labels = sorted(set(filter(None, categories[k]))) | |
145 | + categories[k] = labels | |
146 | + | |
147 | + # At least, as agents is a list of dicts, sorting is a bit tricky | |
148 | + # | |
149 | + # the first one liner will store the last agent's line only | |
150 | + # then we alpha sort on the name | |
151 | + # on both, the discrimination is based on the name couple: (firstname, secondname) | |
152 | + # | |
153 | + agents = list({(a['firstname'], a['secondname']): a for a in agents}.values()) | |
154 | + agents = sorted(agents, key=lambda a: (a['firstname'], a['secondname'])) | |
155 | + | |
156 | + # Feed baps from column | |
157 | + # | |
158 | + for b in baps: | |
159 | + n_b = AgentBap(name=b) | |
160 | + db.session.add(n_b) | |
161 | + db.session.commit() | |
162 | + | |
163 | + # Feed grades from column | |
164 | + # | |
165 | + for g in grades: | |
166 | + n_g = AgentGrade(name=g) | |
167 | + db.session.add(n_g) | |
168 | + db.session.commit() | |
169 | + | |
170 | + # Feed companies from column | |
171 | + # | |
172 | + for c in companies: | |
173 | + n_c = Company(name=c) | |
174 | + db.session.add(n_c) | |
175 | + db.session.commit() | |
176 | + | |
177 | + # Feed statuses from column | |
178 | + # | |
179 | + for s in statuses: | |
180 | + n_s = AgentStatus(name=s) | |
181 | + db.session.add(n_s) | |
92 | 182 | db.session.commit() |
93 | 183 | |
94 | 184 | # Feed projects from column |
... | ... | @@ -107,7 +197,8 @@ def feed_from_irap(csv_file_name): |
107 | 197 | |
108 | 198 | # Feed periods names |
109 | 199 | # Todo: are statically built, |
110 | - # should come from year column name. | |
200 | + # should come from year column name. | |
201 | + # also see later | |
111 | 202 | # |
112 | 203 | for p in range(2011, 2030): |
113 | 204 | n_p = Period(name=f"{p}") |
... | ... | @@ -115,9 +206,65 @@ def feed_from_irap(csv_file_name): |
115 | 206 | db.session.commit() |
116 | 207 | |
117 | 208 | # Add one default capacity |
118 | - db.session.add(Capacity(name="Travailleur")) | |
209 | + # | |
210 | + db.session.add(Capacity(name="Agent")) | |
211 | + db.session.commit() | |
212 | + | |
213 | + # Feed categories and labels | |
214 | + # | |
215 | + for c, l in categories.items(): | |
216 | + current_app.logger.debug("Adding category " + c) | |
217 | + n_c = Category(name=c) | |
218 | + db.session.add(n_c) | |
219 | + # db.session.commit() | |
220 | + for label in l: | |
221 | + current_app.logger.debug("Adding label {} for id {}".format(label, n_c.id)) | |
222 | + n_l = Label(name=label, category=n_c) | |
223 | + db.session.add(n_l) | |
224 | + db.session.commit() | |
225 | + | |
226 | + # Feed project's labels | |
227 | + # | |
228 | + for p, labels in categorized_projects.items(): | |
229 | + n_p = Project.query.filter(Project.name == p).one() | |
230 | + for l in labels: | |
231 | + n_l = Label.query.filter(Label.name == l).one() | |
232 | + n_c = n_l.category | |
233 | + n_pl = ProjectLabel(project=n_p, category=n_c, label=n_l) | |
234 | + db.session.add(n_pl) | |
119 | 235 | db.session.commit() |
120 | 236 | |
237 | + # Feed agents from agents list previously filled | |
238 | + # | |
239 | + for a in agents: | |
240 | + status = AgentStatus.query.filter(AgentStatus.name == a['status']).one_or_none() | |
241 | + if status is None and a['status']: | |
242 | + status = AgentStatus(name=a['status']) | |
243 | + company = Company.query.filter(Company.name == a['company']).one_or_none() | |
244 | + if company is None and a['company']: | |
245 | + company = Company(name=a['company']) | |
246 | + bap = AgentBap.query.filter(AgentBap.name == a['bap']).one_or_none() | |
247 | + if bap is None and a['bap']: | |
248 | + bap = AgentBap(name=a['bap']) | |
249 | + grade = AgentGrade.query.filter(AgentGrade.name == a['grade']).one_or_none() | |
250 | + if grade is None and a['grade']: | |
251 | + grade = AgentBap(name=a['grade']) | |
252 | + virtual = 1 if a['virtual'] else 0 | |
253 | + permanent = 1 if a['status'] == 'PERM' else 0 | |
254 | + n_a = Agent(firstname=a['firstname'], | |
255 | + secondname=a['secondname'], | |
256 | + status_id=status.id if status else None, | |
257 | + company_id=company.id if company else None, | |
258 | + bap_id=bap.id if bap else None, | |
259 | + grade_id=grade.id if grade else None, | |
260 | + virtual=virtual, | |
261 | + permanent=permanent) | |
262 | + db.session.add(n_a) | |
263 | + db.session.commit() | |
264 | + | |
265 | + # Feed agents from agents list previously filled | |
266 | + # | |
267 | + | |
121 | 268 | # Now feed the charges. |
122 | 269 | # |
123 | 270 | # At least one for each csv row |
... | ... | @@ -128,6 +275,7 @@ def feed_from_irap(csv_file_name): |
128 | 275 | a = Agent.query.filter(Agent.firstname == r[firstname_key], Agent.secondname == r[secondname_key]).one() |
129 | 276 | s = Service.query.filter(Service.name == r[service_key]).one() |
130 | 277 | c = Capacity.query.first() |
278 | + # TODO: period names should come from db request | |
131 | 279 | for period_name in range(2011, 2030): |
132 | 280 | t = Period.query.filter(Period.name == period_name).one() |
133 | 281 | charge = r[f"{period_name}"] |
... | ... | @@ -323,8 +471,8 @@ def feed_random_charges(agent): |
323 | 471 | def user_add(email, name, login, password, role): |
324 | 472 | """ Add a new user in db.""" |
325 | 473 | user = User.query.filter(User.name == name).one_or_none() |
326 | - if (user): | |
327 | - current_app.logger.error(f"user already exists {name}") | |
474 | + if user: | |
475 | + current_app.logger.warn(f"user already exists {name}") | |
328 | 476 | return |
329 | 477 | user = User(email=email, name=name, login=login, password=password, role=role) |
330 | 478 | db.session.add(user) |
... | ... | @@ -351,12 +499,12 @@ def user_update(user_id, name, role, email, password): |
351 | 499 | user.set_role(role) |
352 | 500 | print(f"User --{user.name}-- role updated to {_roleToName[user.role]}") |
353 | 501 | if email: |
354 | - user.email=email | |
502 | + user.email = email | |
355 | 503 | print(f"User --{user.name}-- email updated to {user.email}") |
356 | 504 | if password: |
357 | 505 | print(f"User --{user.name}-- password updated") |
358 | 506 | user.set_password(password) |
359 | - if not ( name or role or email or password): | |
507 | + if not (name or role or email or password): | |
360 | 508 | print(f"No update for user --{user.name}--") |
361 | 509 | db.session.commit() |
362 | 510 | ... | ... |
app/models.py
1 | 1 | from flask_sqlalchemy import SQLAlchemy |
2 | +from sqlalchemy.orm import relationship | |
2 | 3 | |
3 | 4 | db = SQLAlchemy() |
4 | 5 | |
5 | 6 | |
6 | -class Agent(db.Model): | |
7 | +# | |
8 | +# Categorized projects | |
9 | +# | |
10 | + | |
11 | +class Project(db.Model): | |
7 | 12 | id = db.Column(db.Integer, primary_key=True) |
8 | - firstname = db.Column(db.String(100)) | |
9 | - secondname = db.Column(db.String(100)) | |
13 | + name = db.Column(db.String) | |
14 | + labels = relationship("ProjectLabel", back_populates="project") | |
10 | 15 | |
11 | 16 | |
12 | -class Project(db.Model): | |
17 | +class Category(db.Model): | |
13 | 18 | id = db.Column(db.Integer, primary_key=True) |
14 | - name = db.Column(db.String(100), unique=True) | |
19 | + name = db.Column(db.String) | |
20 | + labels = relationship("Label", back_populates="category") | |
21 | + projects = relationship("ProjectLabel", back_populates="category") | |
15 | 22 | |
16 | 23 | |
17 | -class Service(db.Model): | |
24 | +class Label(db.Model): | |
18 | 25 | id = db.Column(db.Integer, primary_key=True) |
19 | - name = db.Column(db.String(100), unique=True) | |
20 | - abbr = db.Column(db.String(50), unique=True) | |
26 | + name = db.Column(db.String, unique=True) | |
27 | + category_id = db.Column(db.Integer, db.ForeignKey('category.id')) | |
28 | + category = relationship("Category", back_populates="labels") | |
29 | + projects = relationship("ProjectLabel", back_populates="label") | |
30 | + | |
31 | + | |
32 | +class ProjectLabel(db.Model): | |
33 | + project_id = db.Column(db.Integer, db.ForeignKey('project.id'), primary_key=True) | |
34 | + category_id = db.Column(db.Integer, db.ForeignKey('category.id'), primary_key=True) | |
35 | + label_id = db.Column(db.Integer, db.ForeignKey('label.id')) | |
36 | + | |
37 | + project = relationship("Project", back_populates="labels") | |
38 | + category = relationship("Category", back_populates="projects") | |
39 | + label = relationship("Label", back_populates="projects") | |
40 | + | |
41 | + | |
42 | +# | |
43 | +# Agents | |
44 | +# | |
45 | + | |
46 | +class AgentBap(db.Model): | |
47 | + id = db.Column(db.Integer, primary_key=True) | |
48 | + name = db.Column(db.String(16)) | |
21 | 49 | |
22 | 50 | |
23 | -class Function(db.Model): | |
51 | +class AgentGrade(db.Model): | |
52 | + id = db.Column(db.Integer, primary_key=True) | |
53 | + name = db.Column(db.String(16)) | |
54 | + | |
55 | + | |
56 | +class AgentStatus(db.Model): | |
57 | + id = db.Column(db.Integer, primary_key=True) | |
58 | + name = db.Column(db.String(16)) | |
59 | + | |
60 | + | |
61 | +class Company(db.Model): | |
62 | + id = db.Column(db.Integer, primary_key=True) | |
63 | + name = db.Column(db.String(16)) | |
64 | + | |
65 | + | |
66 | +class Agent(db.Model): | |
67 | + id = db.Column(db.Integer, primary_key=True) | |
68 | + firstname = db.Column(db.String(100)) | |
69 | + secondname = db.Column(db.String(100)) | |
70 | + company_id = db.Column(db.Integer, db.ForeignKey('company.id')) | |
71 | + grade_id = db.Column(db.Integer, db.ForeignKey('agent_grade.id')) | |
72 | + status_id = db.Column(db.Integer, db.ForeignKey('agent_status.id')) | |
73 | + bap_id = db.Column(db.Integer, db.ForeignKey('agent_bap.id')) | |
74 | + virtual = db.Column(db.Integer) # integer boolean | |
75 | + permanent = db.Column(db.Integer) # integer boolean | |
76 | + | |
77 | + | |
78 | +class Service(db.Model): | |
24 | 79 | id = db.Column(db.Integer, primary_key=True) |
25 | 80 | name = db.Column(db.String(100), unique=True) |
81 | + abbr = db.Column(db.String(50), unique=True) | |
26 | 82 | |
27 | 83 | |
28 | 84 | class Capacity(db.Model): | ... | ... |
tests/backend_tests.py
1 | +import os | |
2 | +import sys | |
1 | 3 | import unittest |
4 | +from shutil import copyfile | |
2 | 5 | |
3 | 6 | from pdc_config import TestConfig |
4 | 7 | from app import create_app, db_mgr, db |
... | ... | @@ -7,18 +10,29 @@ from app.auth.models import User |
7 | 10 | |
8 | 11 | class BaseTestCase(unittest.TestCase): |
9 | 12 | def setUp(self): |
10 | - # configure data base | |
13 | + # initialise app | |
11 | 14 | self.app = create_app(TestConfig) |
15 | + | |
16 | + # copy resource demo db to test file | |
17 | + appdir = os.path.join(os.path.dirname(os.path.abspath(__file__)), os.pardir) | |
18 | + sqlite_file_name = os.path.abspath(os.path.join(appdir, 'resources', 'lesia-btp.sqlite')) | |
19 | + if not os.path.isdir(self.app.instance_path): | |
20 | + os.mkdir(self.app.instance_path) | |
21 | + self.db_path = os.path.join(self.app.instance_path, 'test.db') | |
22 | + copyfile(sqlite_file_name, self.db_path) | |
23 | + | |
24 | + # force db path to newly create file | |
25 | + self.app.config.update( | |
26 | + SQLALCHEMY_DATABASE_URI='sqlite:///' + self.db_path | |
27 | + ) | |
28 | + | |
29 | + # update flask context | |
12 | 30 | self.app_context = self.app.app_context() |
13 | 31 | self.app_context.push() |
14 | - # TODO: shall we always copy db from resources sqlite file ? | |
15 | - # that would allow us to run all tests in sqlite:memory: | |
16 | - # db.create_all() | |
17 | - # admin = User(email='admin@nowhere.org', name='admin', login='admin', password='admin', role='admin') | |
18 | - # db.session.add(admin) | |
19 | - # db.session.commit() | |
20 | 32 | |
21 | 33 | def tearDown(self): |
34 | + if os.path.isfile(self.db_path): | |
35 | + os.remove(self.db_path) | |
22 | 36 | self.app_context.pop() |
23 | 37 | |
24 | 38 | def test_always_true(self): |
... | ... | @@ -46,45 +60,3 @@ class DbMgrTestCase(BaseTestCase): |
46 | 60 | # Waiting for 17 periods + headers line |
47 | 61 | self.assertEqual(18, len(stacked_charges)) |
48 | 62 | |
49 | - | |
50 | -class AuthModelTestCase(BaseTestCase): | |
51 | - | |
52 | - def skip_if_no_sqlitememory(self): | |
53 | - if 'memory' not in self.app.config['SQLALCHEMY_DATABASE_URI']: | |
54 | - self.skipTest("Needs in memory sqlite") | |
55 | - | |
56 | - def get_admin(self): | |
57 | - return User.query.filter(User.name == 'admin').one() | |
58 | - | |
59 | - def setUp(self): | |
60 | - BaseTestCase.setUp(self) | |
61 | - self.skip_if_no_sqlitememory() | |
62 | - db.create_all() | |
63 | - admin = User(email='admin@nowhere.org', name='admin', login='admin', role='admin') | |
64 | - db.session.add(admin) | |
65 | - db.session.commit() | |
66 | - | |
67 | - def test_in_memory(self): | |
68 | - self.app.logger.info("In memory Sqlite DB for tests") | |
69 | - self.assertTrue(True) | |
70 | - | |
71 | - def test_setrole(self): | |
72 | - admin = self.get_admin() | |
73 | - admin.set_role("ADMIN") | |
74 | - db.session.commit() | |
75 | - admin = self.get_admin() | |
76 | - self.assertTrue(admin is not None) | |
77 | - self.assertTrue(admin.has_role("ADMIN")) | |
78 | - self.assertFalse(admin.has_role("SERVICE")) | |
79 | - | |
80 | - def test_setrole_valueerror(self): | |
81 | - admin = self.get_admin() | |
82 | - with self.assertRaises(ValueError) as ve: | |
83 | - admin.set_role("NOSUCHROLE") | |
84 | - | |
85 | - def test_setcheckpassword(self): | |
86 | - admin = self.get_admin() | |
87 | - admin.set_password("hahaha") | |
88 | - db.session.commit() | |
89 | - admin2 = self.get_admin() | |
90 | - self.assertTrue(admin2.check_password("hahaha")) | ... | ... |
... | ... | @@ -0,0 +1,49 @@ |
1 | +import unittest | |
2 | + | |
3 | +from app import create_app, db, User | |
4 | +from app.commands.commands import feed_from_irap, show_roles, user_add, user_show_all | |
5 | +from pdc_config import TestConfig | |
6 | + | |
7 | +from app.commands import bp as cli_bp | |
8 | + | |
9 | + | |
10 | +class CliBaseTestCase(unittest.TestCase): | |
11 | + | |
12 | + def setUp(self): | |
13 | + # Get rid of a strange 'ValueError: I/O operation' when app.logger outputs | |
14 | + TestConfig.PDC_LOGS_LEVEL = 'ERROR' | |
15 | + # Force sqlite in memory db | |
16 | + TestConfig.SQLALCHEMY_DATABASE_URI = 'sqlite:///:memory:' | |
17 | + # Create and register app | |
18 | + self.app = create_app(TestConfig) | |
19 | + self.app.register_blueprint(cli_bp) | |
20 | + self.app_context = self.app.app_context() | |
21 | + self.app_context.push() | |
22 | + db.create_all() | |
23 | + | |
24 | + def tearDown(self): | |
25 | + pass | |
26 | + | |
27 | + def test_hello3(self): | |
28 | + runner = self.app.test_cli_runner() | |
29 | + result = runner.invoke(user_show_all) | |
30 | + print("\n") | |
31 | + print(result.output) | |
32 | + | |
33 | + def test_hello2(self): | |
34 | + runner = self.app.test_cli_runner() | |
35 | + result = runner.invoke(show_roles) | |
36 | + print(result.output) | |
37 | + | |
38 | + def test_add_user(self): | |
39 | + runner = self.app.test_cli_runner() | |
40 | + | |
41 | + all_users = User.query.all() | |
42 | + self.assertEqual(0, len(all_users)) | |
43 | + # invoke the command directly | |
44 | + arguments = ['geo@ici.fr', 'Joseph Hitier', 'joh', 'passwd', 'PUBLIC'] | |
45 | + result = runner.invoke(user_add, arguments) | |
46 | + print(result.output) | |
47 | + all_users = User.query.all() | |
48 | + print(len(all_users)) | |
49 | + # self.assertEqual(1, len(all_users)) | ... | ... |
... | ... | @@ -0,0 +1,39 @@ |
1 | +from app.models import Category, db, Label, Project, ProjectLabel | |
2 | + | |
3 | +categorized_labels = {'pole': ['Spatial', 'Sol'], 'domaine': ['soleil-terre', 'atmosphere', 'r&t', 'géologie']} | |
4 | +projects_categories = {'ChemCam': {'pole': 'Spatial', 'domaine': 'géologie'}, | |
5 | + 'Pilot': {'pole': 'Spatial', 'domaine': 'atmosphere'}, | |
6 | + 'Ambre': {'pole': 'Spatial', 'domaine': 'soleil-terre'}} | |
7 | + | |
8 | + | |
9 | +def feed_projects(): | |
10 | + for c_name, labels in categorized_labels.items(): | |
11 | + n_c = Category(name=c_name) | |
12 | + db.session.add(n_c) | |
13 | + for l_name in labels: | |
14 | + n_l = Label(name=l_name, category=n_c) | |
15 | + db.session.add(n_l) | |
16 | + | |
17 | + db.session.commit() | |
18 | + | |
19 | + project_names = projects_categories.keys() | |
20 | + | |
21 | + for p_name in set(project_names): | |
22 | + n_p = Project(name=p_name) | |
23 | + db.session.add(n_p) | |
24 | + db.session.commit() | |
25 | + | |
26 | + for p, categories in projects_categories.items(): | |
27 | + n_p = db.session.query(Project).filter(Project.name == p).one() | |
28 | + for c_name, l_name in categories.items(): | |
29 | + n_c = db.session.query(Category).filter(Category.name == c_name).one() | |
30 | + n_l = db.session.query(Label).filter(Label.name == l_name).one() | |
31 | + n_pc = ProjectLabel(project=n_p, category=n_c, label=n_l) | |
32 | + db.session.add(n_pc) | |
33 | + | |
34 | + db.session.commit() | |
35 | + | |
36 | + | |
37 | +def show_categories(): | |
38 | + for c in db.session.query(Category).all(): | |
39 | + print(c.name + ": ", ",".join([l.name for l in c.categorized_labels])) | ... | ... |
... | ... | @@ -0,0 +1,71 @@ |
1 | +import unittest | |
2 | + | |
3 | +from app import create_app, db, User | |
4 | +from app.models import Project | |
5 | +from pdc_config import TestConfig | |
6 | +from tests.common_db_feed import feed_projects | |
7 | + | |
8 | + | |
9 | +class DbBaseTestCase(unittest.TestCase): | |
10 | + def setUp(self): | |
11 | + self.app = create_app(TestConfig) | |
12 | + # force db uri to sqlite memory | |
13 | + self.app.config.update( | |
14 | + SQLALCHEMY_DATABASE_URI='sqlite:///:memory:' | |
15 | + ) | |
16 | + self.app_context = self.app.app_context() | |
17 | + self.app_context.push() | |
18 | + db.create_all() | |
19 | + feed_projects() | |
20 | + | |
21 | + def tearDown(self): | |
22 | + db.session.remove() | |
23 | + db.drop_all() | |
24 | + self.app_context.pop() | |
25 | + | |
26 | + def test_first(self): | |
27 | + projects = Project.query.all() | |
28 | + self.assertEqual(3, len(projects)) | |
29 | + | |
30 | + | |
31 | +class AuthModelTestCase(DbBaseTestCase): | |
32 | + | |
33 | + def skip_if_no_sqlitememory(self): | |
34 | + if 'memory' not in self.app.config['SQLALCHEMY_DATABASE_URI']: | |
35 | + self.skipTest("Needs in memory sqlite") | |
36 | + | |
37 | + def get_admin(self): | |
38 | + return User.query.filter(User.name == 'admin').one() | |
39 | + | |
40 | + def setUp(self): | |
41 | + DbBaseTestCase.setUp(self) | |
42 | + self.skip_if_no_sqlitememory() | |
43 | + db.create_all() | |
44 | + admin = User(email='admin@nowhere.org', name='admin', login='admin', role='admin') | |
45 | + db.session.add(admin) | |
46 | + db.session.commit() | |
47 | + | |
48 | + def test_in_memory(self): | |
49 | + self.app.logger.info("In memory Sqlite DB for tests") | |
50 | + self.assertTrue(True) | |
51 | + | |
52 | + def test_setrole(self): | |
53 | + admin = self.get_admin() | |
54 | + admin.set_role("ADMIN") | |
55 | + db.session.commit() | |
56 | + admin = self.get_admin() | |
57 | + self.assertTrue(admin is not None) | |
58 | + self.assertTrue(admin.has_role("ADMIN")) | |
59 | + self.assertFalse(admin.has_role("SERVICE")) | |
60 | + | |
61 | + def test_setrole_valueerror(self): | |
62 | + admin = self.get_admin() | |
63 | + with self.assertRaises(ValueError) as ve: | |
64 | + admin.set_role("NOSUCHROLE") | |
65 | + | |
66 | + def test_setcheckpassword(self): | |
67 | + admin = self.get_admin() | |
68 | + admin.set_password("hahaha") | |
69 | + db.session.commit() | |
70 | + admin2 = self.get_admin() | |
71 | + self.assertTrue(admin2.check_password("hahaha")) | ... | ... |