db_mgr.py
11.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
from app.models import db, Period, Project, Category
# TODO: make this configurable, and find a better place to apply it
# (maybe 'routes.py').
# Charges are stored on a 100 % basis, i.e. as a percentage of the total
# worked time. Dividing by 100 converts them to an ETP (full-time-equivalent)
# basis; set this to 1 to keep the percent basis instead.
charge_unit = 100
def category_labels():
    """
    Return every category together with its labels, shaped for form display.

    The result holds one dict per category, for example:
        [{'name': 'Domaine',
          'labels': [{'id': 1, 'name': 'R&T'},
                     {'id': 3, 'name': 'Autres 2'},
                     ...
                     {'id': 11, 'name': 'Instrumentation Sol'}]},
         {'name': 'Pôle',
          'labels': [{'id': 12, 'name': 'Astronomie'},
                     {'id': 14, 'name': 'Solaire'},
                     ...
                     {'id': 21, 'name': 'Administration'}]}]

    :return: list of {'name': <category name>, 'labels': [{'id', 'name'}, ...]}
    """
    return [
        {
            'name': category.name,
            # each entry of category.labels exposes the actual label as `.label`
            'labels': [
                {'id': link.label.id, 'name': link.label.name}
                for link in category.labels
            ],
        }
        for category in Category.query.all()
    ]
def projects():
    """
    Build the table of all projects with their total charge for the current period.

    The first row holds the headers, each following row describes one project:
        Id, Projet, <category_1>, ..., <category_n>, Charge
    Every category cell is the list of the names of the project's labels that
    belong to that category, in the order the category lists them.
    Projects are sorted by decreasing total charge; a project without any
    charge for the period gets 0.

    :return: list of rows (lists), headers first
    """
    current_period_id = get_current_period()
    # The period id is passed as a bound parameter rather than formatted
    # into the SQL text.
    sql_txt = """
        select p.id, IFNULL(sum(tc.charge_rate), 0) as total_charge
        from project as p left join
            ( select c.project_id, c.charge_rate from charge c where c.period_id = :period_id)
            tc
        on p.id = tc.project_id
        group by p.id
        order by total_charge desc;
        """
    projects_charges = db.session.execute(
        sql_txt, {"period_id": current_period_id}
    ).fetchall()

    categories = Category.query.all()
    # Headers look like ["Id", "Projet", "Domaine", "Pôle", "Charge"]
    headers = ["Id", "Projet"] + [c.name for c in categories] + ["Charge"]
    all_projects = [headers]

    for p_id, total_charge in projects_charges:
        project = Project.query.get(int(p_id))
        # The project's labels do not depend on the category: build the
        # lookup set once per project instead of once per category.
        project_label_set = {link.label for link in project.labels}
        project_row = [project.id, project.name]
        for category in categories:
            # Labels shared by the category and the project, de-duplicated
            # while keeping the category's own ordering (a plain set
            # intersection would return them in arbitrary order).
            shared = dict.fromkeys(
                link.label
                for link in category.labels
                if link.label in project_label_set
            )
            project_row.append([label.name for label in shared])
        project_row.append(total_charge)
        all_projects.append(project_row)
    return all_projects
def agents():
    """
    Build the list of all agents with their total charge for the current period.

    Agents without any charge for the period are listed with a total of 0.
    Rows are sorted by decreasing total charge.

    :return: list of rows (id, firstname, secondname, total_charge)
    """
    current_period_id = get_current_period()
    # The period id is passed as a bound parameter rather than formatted
    # into the SQL text.
    sql_txt = """
        select a.id, a.firstname, a.secondname, IFNULL (sum(tc.charge_rate), 0) as total_charge
        from agent as a left join
            ( select c.agent_id, c.charge_rate from charge c where c.period_id = :period_id)
            tc
        on a.id = tc.agent_id
        group by a.id
        order by total_charge desc;
        """
    return db.session.execute(sql_txt, {"period_id": current_period_id}).fetchall()
def charges_by_project(project_id):
    """
    Build the flat list of charges of one project, ordered by period.

    The first row holds the headers; every following row is one charge with
    all its cells converted to strings:
        Periode, Agent, Service, Fonction, Charge

    :param project_id: id of the project to report on
    :return: list of rows (lists of str), headers first
    """
    # project_id comes from the caller: bind it instead of interpolating it
    # into the SQL text. The separator is written as a single-quoted SQL
    # string literal (double quotes denote identifiers in standard SQL).
    req_sql = """
        select p.name as period_name,
               a.firstname || ' ' || a.secondname as agent_name,
               s.name as service_name,
               c2.name as capacity_name,
               c.charge_rate as charge_rate
        from charge as c
            join period p on c.period_id = p.id
            join project p2 on c.project_id = p2.id
            join service s on c.service_id = s.id
            join agent a on a.id = c.agent_id
            join capacity c2 on c2.id = c.capacity_id
        where c.project_id = :project_id
        order by c.period_id
        """
    rows = db.session.execute(req_sql, {"project_id": project_id})

    headers = ["Periode", "Agent", "Service", "Fonction", "Charge"]
    # Stringify every cell so that the table only contains plain text
    return [headers] + [[str(cell) for cell in row] for row in rows]
def charges_by_project_stacked(project_id, category="service"):
    """
    Build the table of charges of one project, period by period.

    :param project_id: id of the project we want to return data for
    :param category: how to break the charges down, 'service' or 'capacity'
    :return: a 2-dim table with the headers as first line and data next:
                period, category_0, category_1, ....., category_n,
                per_0,  value_00,   value_01,   ....., value_0n,
                per_1,  value_10,   value_11,   ....., value_1n,
                .
                .
                per_n,  value_n0,   value_n1,   ....., value_nn,
             Values are strings: the summed charge divided by `charge_unit`
             and rounded to 2 decimals, or '0' when there is no charge.
    :raises ValueError: if category is neither 'service' nor 'capacity'

    TODO: common with charges_by_agent_stacked(agent_id): code to extract
    """
    # Table names cannot be bound as SQL parameters, so only whitelisted
    # values are ever interpolated into the statements below.
    if category not in ('service', 'capacity'):
        raise ValueError("Waiting for 'service' or 'capacity'")
    category_table = category

    # One cell per category row (left join), always in category id order so
    # that the values line up with the headers. The ids are bound parameters.
    sql_req = """
        select sum(c.charge_rate)
        from {table} t left join
            charge c on t.id = c.{table}_id
                    and c.project_id = :project_id
                    and c.period_id = :period_id
        group by t.id
        order by t.id
        """.format(table=category_table)

    categories_req = "select name from {} order by id".format(category_table)
    categories = [name for (name,) in db.session.execute(categories_req)]
    all_charges = [['period'] + categories]

    # Fetch the periods up front: other statements run while we loop on them
    periods = db.session.execute("select id, name from period order by id").fetchall()
    for period_id, period_name in periods:
        rates = db.session.execute(
            sql_req, {"project_id": project_id, "period_id": period_id}
        )
        category_charges = [period_name]
        for (category_rate,) in rates:
            # NULL (no charge) and 0 are both reported as '0'
            category_charges.append(
                str(round(category_rate / charge_unit, 2)) if category_rate else '0'
            )
        all_charges.append(category_charges)
    return all_charges
def charges_for_projects_stacked():
    """
    Build the table of total charges per project, period by period.

    The first row is the header: 'period' followed by every project name,
    ordered by project id. Each following row holds a period name and then
    exactly one value per project, in the same project order.
    Values are strings: the summed charge divided by `charge_unit` and
    rounded to 2 decimals, or '0' when the project has no charge.

    :return: 2-dim table (list of lists), headers first
    """
    # Start from `project` with a left join and order by project id: every
    # project then yields exactly one cell per period, aligned with the
    # headers. (An inner join on `charge` skips projects without charges and
    # shifts the remaining values under the wrong columns.)
    sql_req = """
        select sum(c.charge_rate)
        from project p
            left join
            charge c on p.id = c.project_id and c.period_id = :period_id
        group by p.id
        order by p.id
        """

    projects_req = "select name from project order by id"
    projects_names = [name for (name,) in db.session.execute(projects_req)]
    all_charges = [['period'] + projects_names]

    # Fetch the periods up front: other statements run while we loop on them
    periods = db.session.execute("select id, name from period order by id").fetchall()
    for period_id, period_name in periods:
        project_charges = [period_name]
        for (project_charge,) in db.session.execute(sql_req, {"period_id": period_id}):
            # NULL (no charge) and 0 are both reported as '0'
            project_charges.append(
                str(round(project_charge / charge_unit, 2)) if project_charge else '0'
            )
        all_charges.append(project_charges)
    return all_charges
def charges_by_agent_stacked(agent_id):
    """
    Build the table of charges of one agent on every project, period by period.

    The first row is the header: 'period' followed by every project name,
    ordered by project id. Each following row holds a period name and one
    value per project, in the same order. Values are strings: the summed
    charge divided by `charge_unit` and rounded to 2 decimals, or '0' when
    the agent has no charge on the project.

    TODO: common with charges_by_project_stacked(project_id, ..): code to extract

    :param agent_id: id of the agent to report on
    :return: 2-dim table (list of lists), headers first
    """
    # The statement is the same for every period: build it once and bind the
    # ids instead of formatting them into the SQL text.
    charge_by_project_req = """
        select sum(c.charge_rate)
        from project p
            left join
            charge c on p.id = c.project_id
                    and c.agent_id = :agent_id
                    and c.period_id = :period_id
        group by p.id
        order by p.id
        """

    categories_req = "select name from project order by id"
    categories = [name for (name,) in db.session.execute(categories_req)]
    all_charges = [['period'] + categories]

    # Fetch the periods up front: other statements run while we loop on them
    periods = db.session.execute("select id, name from period order by id").fetchall()
    for period_id, period_name in periods:
        rates = db.session.execute(
            charge_by_project_req, {"agent_id": agent_id, "period_id": period_id}
        )
        category_charges = [period_name]
        for (category_rate,) in rates:
            # NULL (no charge) and 0 are both reported as '0'
            category_charges.append(
                str(round(category_rate / charge_unit, 2)) if category_rate else '0'
            )
        all_charges.append(category_charges)
    return all_charges
def charges_by_agent_tabled(agent_id):
    """
    Build the flat list of charges of one agent, ordered by period.

    The first row holds the headers; every following row is one charge with
    all its cells converted to strings:
        Periode, Projet, Service, Fonction, Charge

    :param agent_id: id of the agent to report on
    :return: list of rows (lists of str), headers first
    """
    # agent_id comes from the caller: bind it instead of interpolating it
    # into the SQL text.
    req_sql = """
        select p.name as period_name,
               p2.name as project_name,
               s.name as service_name,
               c2.name as capacity_name,
               c.charge_rate as charge_rate
        from charge as c
            join period p on c.period_id = p.id
            join project p2 on c.project_id = p2.id
            join service s on c.service_id = s.id
            join agent a on a.id = c.agent_id
            join capacity c2 on c2.id = c.capacity_id
        where c.agent_id = :agent_id
        order by c.period_id
        """
    rows = db.session.execute(req_sql, {"agent_id": agent_id})

    headers = ["Periode", "Projet", "Service", "Fonction", "Charge"]
    # Stringify every cell so that the table only contains plain text
    return [headers] + [[str(cell) for cell in row] for row in rows]
def charges_by_agent(agent_id):
    """
    Build the list of the total charge of one agent for every period.

    Every period of the `period` table gets one [period_name, total] entry;
    periods in which the agent has no charge get a total of 0.
    Totals are the raw sums of `charge_rate` (not divided by `charge_unit`).

    :param agent_id: id of the agent to report on
    :return: list of [period_name, total_charge]
    """
    # agent_id comes from the caller: bind it instead of formatting it into
    # the SQL text. A triple-quoted string avoids the backslash continuation
    # inside the literal, which made the SQL depend on source indentation.
    sql_txt = """
        select p.name, sum(charge.charge_rate)
        from charge inner join period p on charge.period_id = p.id
        where charge.agent_id = :agent_id
        group by charge.period_id
        order by p.name
        """
    charges_dict = {
        period: charge
        for (period, charge) in db.session.execute(sql_txt, {"agent_id": agent_id})
    }

    periods = db.session.execute("select name from period")
    return [[name, charges_dict.get(name, 0)] for (name,) in periods]
def get_current_period():
    """
    Return the id of the period considered as the current one.

    The lookup is hard-coded for now: it selects the period whose name is
    '2021' or '2021_S1'; `.one()` requires exactly one such row to exist.

    :return: the id of the matching period
    """
    # TODO: request on dates as soon as periods are dated
    is_current = (Period.name == '2021') | (Period.name == '2021_S1')
    return Period.query.filter(is_current).one().id