import csv
import os
import random
import sys
from datetime import datetime

import click
from flask import current_app
from sqlalchemy.exc import IntegrityError
from sqlalchemy.sql import func

from app.models import db, Agent, Service, Project, Capacity, Period, Charge, AgentStatus, Company, AgentBap, \
    AgentGrade, Category, Label, ProjectLabel, CategoryLabel, AgentHistory, AgentResponsability, AgentSkill
# TODO: rename to methods and add get_roles()
from app.auth.models import User, _nameToRole, _roleToName
from . import bp

# Years for which the irap csv file holds one charge column.
# TODO: should come from the csv year column names.
_IRAP_YEARS = range(2011, 2030)


def feed_statical():
    """
    Fill in db with default statical lists (agent responsabilities and skills).

    TODO: feed from config file
    """
    responsabilities = [
        {"abbr": "CP", "name": "Chef de projet"},
        {"abbr": "CS", "name": "Chef de service"},
        {"abbr": "CSad", "name": "Chef de service adjoint"},
        {"abbr": "IngeSys", "name": "Ingénieur Systèmes"},
        {"abbr": "RS", "name": "Responsable scientifique"}
    ]
    for r in responsabilities:
        db.session.add(AgentResponsability(abbr=r['abbr'], name=r['name']))
    db.session.commit()

    skills = ["AIT", "AIT Optique", "Assistant Prévention & Sécurité", "Assistant Prévention et Sécurité",
              "Assurance produit", "Automatismes", "C", "C++", "chef de service", "Configuration FPGA",
              "cryogénie", "CSS", "détection fuite Hélium", "Electronique", "Electronique AIT/AIV",
              "essais en environnement", "essais vide-thermique", "Excel", "fabrication mécanique", "FPGA",
              "Gestion de projet", "graphisme", "Haskell", "HTML", "IDL", "instrumentation spatiale", "Java",
              "Javascript", "LabView", "Latex", "logiciels embarqués", "Machine Learning", "mécanique BE",
              "optique instrumentale", "optronique", "photographie", "Photoshop", "PHP", "Powerpoint",
              "Pro du café", "processeurs LEON", "Python", "Qualité système", "Ruby", "Ruby on Rails", "Scrum",
              "SpaceWire", "temps-réel", "Vide", "Word"]
    # dict.fromkeys() keeps the order and guards against a skill listed twice
    for s in dict.fromkeys(skills):
        db.session.add(AgentSkill(name=s))
    db.session.commit()


@bp.cli.command('feed_default')
def feed_default():
    """ Fill in db with the default statical lists. """
    feed_statical()


@bp.cli.command('create_db')
def create_db():
    """
    Create the database structure. Database should be empty.
    configure the proper database uri in the db_config.py file.
    """
    db.create_all()
    admin = User(email='admin@nowhere.org', name='admin', login='admin', role='admin')
    admin.set_password('admin')
    db_uri = str(db.engine.url)
    sqlite_uri = db_uri if 'sqlite' in db_uri else None
    try:
        db.session.add(admin)
        db.session.commit()
    except IntegrityError:
        # leave the session clean before giving up
        db.session.rollback()
        current_app.logger.error("User admin already exists, database should be empty at create")
        if sqlite_uri:
            current_app.logger.error("see " + sqlite_uri)
        sys.exit(-1)
    if sqlite_uri:
        print("Created sqlite db: " + sqlite_uri)


def _read_irap_rows(csv_file_name):
    """ Read the irap csv file and return its rows as dicts of stripped strings. """
    rows = []
    with open(csv_file_name, newline='') as csvfile:
        csvreader = csv.DictReader(csvfile, delimiter=',', quotechar='"')
        for row in csvreader:
            # Remove any leading/trailing spaces.
            # A short row yields None values, a long one stores its extra cells
            # under the None key: the former become '', the latter are dropped.
            rows.append({k: (v or '').strip() for k, v in row.items() if k is not None})
    return rows


def _uniq_sorted(values):
    """ Return the alpha sorted list of the distinct, non empty, items of values. """
    return sorted(set(filter(None, values)))


def _feed_names(model, names):
    """ Add one row of the given model for each name, then commit. """
    for name in names:
        db.session.add(model(name=name))
    db.session.commit()


def _get_or_create(model, name):
    """
    Return the row of the given model having that name, None when name is empty.
    A missing row is created and flushed, so that its id is available to the caller.
    """
    if not name:
        return None
    instance = model.query.filter(model.name == name).one_or_none()
    if instance is None:
        instance = model(name=name)
        db.session.add(instance)
        db.session.flush()
    return instance


def _to_percent(etp_fraction):
    """
    Convert a charge given as a fraction of ETP (irap csv) to the integer percent stored in db.
    Anything that is not a finite number gives 0.
    """
    try:
        # round() and not int(): 100 * 0.29 is 28.999999999999996 as a float
        return round(100 * float(etp_fraction))
    except (TypeError, ValueError, OverflowError):
        return 0


@bp.cli.command("feed_from_irap")
@click.option('--csv-file', '-f', 'csv_file_name', required=True, help="the csv file path to feed from")
def feed_from_irap(csv_file_name):
    """
    Use an Irap csv charges files and feed db with

    :param csv_file_name: path of the csv file to read
    """
    rows = _read_irap_rows(csv_file_name)

    firstname_key = 'NOM'
    secondname_key = 'PRÉNOM'
    status_key = 'STATUT'
    virtual_key = 'VIRTUEL'
    company_key = 'SOCIÉTÉ'
    bap_key = 'BAP'
    grade_key = 'GRADE CORPS'
    project_key = 'PROJETS'
    service_key = 'GROUPE MÉTIER'
    categorie_keys = ['TYPOLOGIE', 'THÉMATIQUE']

    # Get the columns values
    #
    services = []
    baps = []
    grades = []
    companies = []
    statuses = []
    labels = []
    # Agents are indexed by the name couple (firstname, secondname):
    # when an agent shows up on several rows, the last row wins.
    agents = {}
    # Build a category dict of lists
    # key being the category name,
    # list being filled with corresponding labels
    categorie_labels = {k: [] for k in categorie_keys}
    # Projects' labels is a dict of lists
    # indexed by project name
    # containing labels for that project (last row wins)
    #
    project_labels = {}

    #
    # Parse the rows and fill in various lists
    #
    for r in rows:
        # The small tables are stored uppercased; the agent keeps the same
        # uppercased values so that it can be matched against them later on.
        bap_name = r[bap_key].upper()
        grade_name = r[grade_key].upper()
        status_name = r[status_key].upper()

        services.append(r[service_key])
        baps.append(bap_name)
        grades.append(grade_name)
        companies.append(r[company_key])
        statuses.append(status_name)

        # fill in both the labels list and the category-labels dict
        row_labels = [r[k] for k in categorie_keys]
        for k, label in zip(categorie_keys, row_labels):
            labels.append(label)
            categorie_labels[k].append(label)
        # the project and its labels; empty cells are not labels
        project_labels[r[project_key]] = [label for label in row_labels if label]

        agents[(r[firstname_key], r[secondname_key])] = {
            'firstname': r[firstname_key],
            'secondname': r[secondname_key],
            'status': status_name,
            'virtual': r[virtual_key],
            'company': r[company_key],
            'bap': bap_name,
            'grade': grade_name}

    # Now, sort the lists
    # removing empty strings and duplicates
    #
    services = _uniq_sorted(services)
    baps = _uniq_sorted(baps)
    grades = _uniq_sorted(grades)
    companies = _uniq_sorted(companies)
    statuses = _uniq_sorted(statuses)
    labels = _uniq_sorted(labels)
    # Projects are the keys of project_labels (an empty name is kept,
    # as charges are looked up by project name later on)
    projects = sorted(project_labels)
    # Do the same for the labels inside each category
    for c in categorie_keys:
        categorie_labels[c] = _uniq_sorted(categorie_labels[c])
    # Agents are alpha sorted on the name couple
    agents = sorted(agents.values(), key=lambda a: (a['firstname'], a['secondname']))

    #
    # We are done with collecting data
    #
    # Now we write to database
    #
    _feed_names(AgentBap, baps)
    _feed_names(AgentGrade, grades)
    _feed_names(Company, companies)
    _feed_names(AgentStatus, statuses)
    _feed_names(Project, projects)
    _feed_names(Label, labels)
    _feed_names(Category, categorie_keys)
    _feed_names(Service, services)

    # Feed periods names
    # Todo: are statically built,
    #       should come from year column name.
    #
    _feed_names(Period, [f"{year}" for year in _IRAP_YEARS])

    # Add one default capacity
    #
    db.session.add(Capacity(name="Agent"))
    db.session.commit()

    # Feed categories and labels
    #
    for category, category_labels in categorie_labels.items():
        print(f"Category: {category}")
        n_c = Category.query.filter_by(name=category).one()
        for label in category_labels:
            print(label)
            n_l = Label.query.filter(Label.name == label).one()
            current_app.logger.debug(f"Adding label {label} to category {category}")
            db.session.add(CategoryLabel(label=n_l, category=n_c))
    db.session.commit()

    # Feed project's labels
    #
    for project, p_labels in project_labels.items():
        print(f"Project {project}")
        n_p = Project.query.filter(Project.name == project).one()
        for label in p_labels:
            n_l = Label.query.filter(Label.name == label).one()
            db.session.add(ProjectLabel(project=n_p, label=n_l))
    db.session.commit()

    # Feed agents from agents list previously filled
    #
    for a in agents:
        status = _get_or_create(AgentStatus, a['status'])
        company = _get_or_create(Company, a['company'])
        bap = _get_or_create(AgentBap, a['bap'])
        grade = _get_or_create(AgentGrade, a['grade'])
        n_a = Agent(firstname=a['firstname'],
                    secondname=a['secondname'],
                    status_id=status.id if status else None,
                    company_id=company.id if company else None,
                    bap_id=bap.id if bap else None,
                    grade_id=grade.id if grade else None,
                    virtual=1 if a['virtual'] else 0,
                    permanent=1 if a['status'] == 'PERM' else 0)
        db.session.add(n_a)
    db.session.commit()

    # (agent id, period id) couples that already got a status history row
    history_seen = set()

    # Feed status history
    # This is done for the current period only
    # as this csv format doesnt hold any history information
    #
    current_year = str(datetime.today().year)
    current_period = Period.query.filter(Period.name == current_year).one_or_none()
    if current_period is None:
        current_app.logger.warning(f"No period for current year {current_year}, skipping agents current status")
    else:
        for a in Agent.query.all():
            db.session.add(AgentHistory(period_id=current_period.id, agent_id=a.id, status_id=a.status_id))
            history_seen.add((a.id, current_period.id))
        db.session.commit()

    # Now feed both charges and status history
    #
    # One charge for each csv row and each year holding a non null value
    # At most one status history for each agent and each year
    #
    # TODO: period names should come from db request
    periods = {year: Period.query.filter(Period.name == str(year)).one() for year in _IRAP_YEARS}
    c = Capacity.query.first()
    for r in rows:
        p = Project.query.filter(Project.name == r[project_key]).one()
        a = Agent.query.filter(Agent.firstname == r[firstname_key],
                               Agent.secondname == r[secondname_key]).one()
        s = Service.query.filter(Service.name == r[service_key]).one()
        status_value = r[status_key].upper()
        st = None
        if status_value:
            st = AgentStatus.query.filter(AgentStatus.name == status_value).one_or_none()
        for year, t in periods.items():
            # Charge are stored as percent in db, but as fraction of ETP in irap csv
            # we make the conversion here (a missing year column means no charge).
            charge_value = _to_percent(r.get(str(year)))
            if charge_value == 0:
                continue
            # The charge is stored whatever the status is
            db.session.add(Charge(agent_id=a.id,
                                  project_id=p.id,
                                  service_id=s.id,
                                  capacity_id=c.id,
                                  period_id=t.id,
                                  charge_rate=charge_value))
            if st is None or (a.id, t.id) in history_seen:
                continue
            print(f"Adding history p: {t.name}, a: {a.firstname}, s: {st.name}")
            db.session.add(AgentHistory(period_id=t.id, agent_id=a.id, status_id=st.id))
            history_seen.add((a.id, t.id))
    db.session.commit()


@bp.cli.command("feed_from_lesia")
def feed_from_lesia():
    """
    Feed db with agents from a lesia like mysql database.

    Remember to configure the proper database uri in the db_config.py file.
    """
    from .lesia_db import lesia_agent, lesia_session, lesia_service, lesia_project, \
        lesia_fonction, lesia_periods, lesia_affectation, lesia_domains, lesia_poles, \
        lesia_domainprojects, lesia_agentsemestres, lesia_statutagent

    # Feed all lesia 'domaine' names and link to new category "Domaine"
    # (the category is added on its own, so that it exists even without any domain)
    #
    domain_category = Category(name="Domaine")
    db.session.add(domain_category)
    for d in lesia_session.query(lesia_domains):
        db.session.add(CategoryLabel(category=domain_category, label=Label(name=d.nom)))
    db.session.commit()

    # Feed all lesia 'pôle' names and link to new category "Pôle"
    #
    pole_category = Category(name="Pôle")
    db.session.add(pole_category)
    for p in lesia_session.query(lesia_poles):
        db.session.add(CategoryLabel(category=pole_category, label=Label(name=p.nom)))
    db.session.commit()

    # Feed lesia project with proper "pôle"
    # (as this information is stored in gestit_projets)
    #
    for p in lesia_session.query(lesia_project).all():
        # add project
        n_p = Project(id=p.id, name=p.nom)
        db.session.add(n_p)
        # get corresponding lesia pole name
        pole_name = lesia_session.query(lesia_poles).filter(lesia_poles.id == p.pole_id).one().nom
        # search corresponding Label and store in ProjectLabel table
        n_l = Label.query.filter(Label.name == pole_name).one()
        db.session.add(ProjectLabel(project=n_p, label=n_l))
    db.session.commit()

    # Get projects domain information and store in ProjectLabel
    #
    for dp in lesia_session.query(lesia_domainprojects):
        project_name = lesia_session.query(lesia_project).filter(lesia_project.id == dp.projet_id).one().nom
        domain_name = lesia_session.query(lesia_domains).filter(lesia_domains.id == dp.domaine_id).one().nom
        n_p = Project.query.filter(Project.name == project_name).one()
        n_l = Label.query.filter(Label.name == domain_name).one()
        db.session.add(ProjectLabel(project=n_p, label=n_l))
        # Some projects have 2 domain labels in lesia db
        # That is not allowed any more in the new model.
        # Hence one commit for each link: the faulty one only is dropped.
        #
        try:
            db.session.commit()
        except IntegrityError:
            db.session.rollback()
            current_app.logger.error(
                "Error adding project to category/label: {} {} {}".format(n_p.name, n_l.category.name, n_l.name))

    for a in lesia_session.query(lesia_agent).all():
        db.session.add(Agent(id=a.IDagent, firstname=a.nom, secondname=a.prenom))
    db.session.commit()

    for s in lesia_session.query(lesia_service).all():
        db.session.add(Service(id=s.id, name=s.nom, abbr=s.abbreviation))
    db.session.commit()

    for f in lesia_session.query(lesia_fonction).all():
        db.session.add(Capacity(id=f.id, name=f.nom))
    db.session.commit()

    for p in lesia_session.query(lesia_periods):
        db.session.add(Period(name=p.id_semestre))
    db.session.commit()

    # Lesia charges point to the semester by name, ours by period id
    for f in lesia_session.query(lesia_affectation):
        p = Period.query.filter(Period.name == f.semestre_id).one()
        db.session.add(Charge(agent_id=f.agent_id,
                              project_id=f.projet_id,
                              service_id=f.service_id,
                              capacity_id=f.fonction_id,
                              period_id=p.id,
                              charge_rate=f.charge))
    db.session.commit()

    for sa in lesia_session.query(lesia_statutagent):
        print(f"Adding Status {sa.nomstatut}")
        db.session.add(AgentStatus(id=sa.IDstatut, name=sa.nomstatut))
    db.session.commit()

    for las in lesia_session.query(lesia_agentsemestres):
        _p = Period.query.filter(Period.name == las.semestre_id).one()
        db.session.add(AgentHistory(period_id=_p.id, agent_id=las.agent_id, status_id=las.statut_id))
    db.session.commit()


def _read_fake_rows(file_name):
    """
    Read a ';' separated resource file from the PDC_DB_DATA_DIR directory.

    :param file_name: name of the resource file
    :return: the list of rows (lists of strings), without the columns names line nor the empty lines
    """
    file_path = os.path.join(current_app.config['PDC_DB_DATA_DIR'], file_name)
    with open(file_path, newline='') as csvfile:
        rows = [row for row in csv.reader(csvfile, delimiter=';', quotechar='|') if row]
    # Skip columns names
    return rows[1:]


@bp.cli.command("fake_lesia_names")
def fake_lesia_names():
    """
    Extract fake name from resources files to change names in db.

    Mainly after a lesia import, for confidential reasons
    Changes names in tables:
    - services
    - capacities
    - projects
    """
    current_app.logger.info("Faking names from resources files")

    # get resources files
    #
    # 1- projects
    fake_projects_names = [', '.join(row) for row in _read_fake_rows('fake-projects.txt')]
    # 2- functions/capacities (name is the first column)
    fake_capacities_names = [row[0] for row in _read_fake_rows('fake-capacities.txt')]
    # 3- services (name and abbreviation columns)
    fake_services_names = _read_fake_rows('fake-services.txt')

    services = Service.query.all()
    projects = Project.query.all()
    capacities = Capacity.query.all()

    # Check there are enough fake names before changing anything
    for kind, db_rows, fakes in (("services", services, fake_services_names),
                                 ("projects", projects, fake_projects_names),
                                 ("capacities", capacities, fake_capacities_names)):
        if len(fakes) < len(db_rows):
            current_app.logger.error(f"Not enough fake {kind} names: {len(fakes)} for {len(db_rows)} rows in db")
            sys.exit(-1)

    for s, next_service in zip(services, fake_services_names):
        s.name = next_service[0]
        s.abbr = next_service[1]
    for p, fake_name in zip(projects, fake_projects_names):
        p.name = fake_name
    for c, fake_name in zip(capacities, fake_capacities_names):
        c.name = fake_name

    db.session.commit()


@bp.cli.command("feed_periods")
@click.option('--begin-year', '-b', 'begin_year', default=2005, help="the start year to begin periods with")
@click.option('--end-year', '-e', 'end_year', default=2025, help="the last year to end periods with")
def feed_periods(begin_year, end_year):
    """
    Fill in the periods names in the database.

    Two periods (S1 and S2) are added for each year
    from begin year to end year, the latter being excluded.
    """
    for y in range(begin_year, end_year):
        for s in ['S1', 'S2']:
            db.session.add(Period(name="{}_{}".format(y, s)))
    try:
        db.session.commit()
    except IntegrityError:
        # a failed commit leaves the session unusable until rolled back
        db.session.rollback()
        current_app.logger.error("Periods already exist")


@bp.cli.command("feed_random_charges")
@click.option('--agent', '-a', 'agent', default=None, help="the agent id you want to charge")
def feed_random_charges(agent):
    """
    Randomly fill in the agents charges.

    One hundred tries are made; a try is skipped when it would
    bring the agent's total charge for the period over 100 percent.
    """
    # The ids dont change while feeding: request them once only
    agent_ids = [i for (i,) in db.session.query(Agent.id).all()] if agent is None else [int(agent)]
    project_ids = [i for (i,) in db.session.query(Project.id).all()]
    service_ids = [i for (i,) in db.session.query(Service.id).all()]
    capacity_ids = [i for (i,) in db.session.query(Capacity.id).all()]
    period_ids = [i for (i,) in db.session.query(Period.id).all()]
    if not (agent_ids and project_ids and service_ids and capacity_ids and period_ids):
        current_app.logger.error("Agents, projects, services, capacities and periods are needed to feed charges")
        return

    for _ in range(0, 100):
        agent_id = random.choice(agent_ids)
        project_id = random.choice(project_ids)
        service_id = random.choice(service_ids)
        capacity_id = random.choice(capacity_ids)
        period_id = random.choice(period_ids)
        percent = random.choice(range(10, 110, 10))
        # check max agent charge for the period
        # (the session autoflush makes the charges added so far part of the sum)
        total_charge = db.session.query(func.sum(Charge.charge_rate).label("total_charge")) \
            .filter(Charge.agent_id == agent_id,
                    Charge.period_id == period_id
                    ).scalar()
        # a total of exactly 100 percent is a valid charge
        if total_charge is not None and (total_charge + percent) > 100:
            print("Skipping agent {} for period {}".format(agent_id, period_id))
            continue
        charge = Charge(agent_id=agent_id,
                        project_id=project_id,
                        service_id=service_id,
                        capacity_id=capacity_id,
                        period_id=period_id,
                        charge_rate=percent)
        print("adding {}_{}_{}_{}_{}_{}".format(agent_id, project_id, service_id, capacity_id, period_id, percent))
        db.session.add(charge)
    db.session.commit()


@bp.cli.command('user_add')
@click.argument('email')
@click.argument('name')
@click.argument('login')
@click.argument('password')
@click.argument('role')
def user_add(email, name, login, password, role):
    """ Add a new user in db."""
    user = User.query.filter(User.name == name).one_or_none()
    if user:
        current_app.logger.warning(f"user already exists {name}")
        return
    user = User(email=email, name=name, login=login, role=role)
    # same way as create_db and user_update: the password goes through set_password()
    user.set_password(password)
    db.session.add(user)
    db.session.commit()
    current_app.logger.info(f"added {name}")


@bp.cli.command('user_update')
@click.option('--name', '-n', 'name', default=None, help="the name to set for that user")
@click.option('--role', '-r', 'role', default=None, help="the role to set for that user")
@click.option('--email', '-e', 'email', default=None, help="the email to set for that user")
@click.option('--password', '-p', 'password', default=None, help="the password to set for that user")
@click.argument('user_id')
def user_update(user_id, name, role, email, password):
    """Update the user by given id and given parameters."""
    user = User.query.get(user_id)
    if not user:
        current_app.logger.error(f"such user_id doesnt exists {user_id}")
        return
    if name:
        # keep the former name, to show what has changed
        old_name = user.name
        user.name = name
        print(f"User --{old_name}-- name updated to {user.name}")
    if role:
        user.set_role(role)
        print(f"User --{user.name}-- role updated to {_roleToName[user.role]}")
    if email:
        user.email = email
        print(f"User --{user.name}-- email updated to {user.email}")
    if password:
        print(f"User --{user.name}-- password updated")
        user.set_password(password)
    if not (name or role or email or password):
        print(f"No update for user --{user.name}--")
    db.session.commit()


@bp.cli.command('user_delete')
@click.argument('user_id')
def user_delete(user_id):
    """Delete the user by given id (see user_show_all)."""
    user = User.query.get(user_id)
    if not user:
        current_app.logger.error(f"such user_id doesnt exists {user_id}")
        return
    db.session.delete(user)
    db.session.commit()


@bp.cli.command('show_roles')
def show_roles():
    """ List all available roles for a user"""
    print("\n".join(_nameToRole))


@bp.cli.command('user_show_all')
def user_show_all():
    """ Show all users in db."""
    row_format = "{:<5} {:<15} {:<15} {:<15}"
    print(row_format.format('id', 'name', 'login', 'email'))
    print(row_format.format('-' * 5, '-' * 15, '-' * 15, '-' * 15))
    for user in User.query.all():
        # an unset field cannot be formatted with an alignment spec: show it as empty
        print(row_format.format(
            user.id,
            user.name or '',
            user.login or '',
            user.email or ''
        ))