import csv import os import sys import click import random from flask import current_app from sqlalchemy.exc import IntegrityError from sqlalchemy.sql import func from app.models import db, Agent, Service, Project, Capacity, Period, Charge, AgentStatus, Company, AgentBap, \ AgentGrade, Category, Label, ProjectLabel, CategoryLabel # TODO: rename to methods and add get_roles() from app.auth.models import User, _nameToRole, _roleToName from . import bp @bp.cli.command('create_db') def create_db(): """ Create the database structure. Database should be empty. configure the proper database uri in the db_config.py file. """ db.create_all() admin = User(email='admin@nowhere.org', name='admin', login='admin', role='admin') admin.set_password('admin') sqlite_uri = db.engine.url.__str__() if 'sqlite' in db.engine.url.__str__() else None try: db.session.add(admin) db.session.commit() except IntegrityError: current_app.logger.error("User admin already exists, database should be empty at create") if sqlite_uri: current_app.logger.error("see " + sqlite_uri) sys.exit(-1) if sqlite_uri: print("Created sqlite db: " + sqlite_uri) @bp.cli.command("feed_from_irap") @click.option('--csv-file', '-f', 'csv_file_name', help="the csv file path to feed from") def feed_from_irap(csv_file_name): """ Use an Irap csv charges files and feed db with :param csv_file_name: :return: """ rows = [] with open(csv_file_name, newline='') as csvfile: csvreader = csv.DictReader(csvfile, delimiter=',', quotechar='"') for row in csvreader: # Remove any leading/trailing spaces row = {k: v.strip() for k, v in row.items()} rows.append(row) firstname_key = 'NOM' secondname_key = 'PRÉNOM' status_key = 'STATUT' virtual_key = 'VIRTUEL' company_key = 'SOCIÉTÉ' bap_key = 'BAP' grade_key = 'GRADE CORPS' project_key = 'PROJETS' service_key = 'GROUPE MÉTIER' categorie_keys = ['TYPOLOGIE', 'THÉMATIQUE'] # Get the columns values # agents = [] services = [] baps = [] grades = [] companies = [] statuses = [] labels = [] # Build a category dict of lists # key being the category name, # list being filled with corresponding labels categorie_labels = {k: [] for k in categorie_keys} # Projects' labels is a dict of lists # indexed by project name # containing labels for that project # project_labels = {} # # Parse the rows and fill in various lists # for r in rows: services.append(r[service_key]) baps.append(r[bap_key]) grades.append(r[grade_key]) companies.append(r[company_key]) statuses.append(r[status_key]) # the projet and its labels project_name = r[project_key] project_labels[project_name] = [] # now fill in both # the labels list, # the category-labels dict, # and the project-labels dict for k in categorie_keys: labels.append(r[k]) categorie_labels[k].append(r[k]) project_labels[project_name].append(r[k]) # create the agents list of dicts agents.append({ 'firstname': r[firstname_key], 'secondname': r[secondname_key], 'status': r[status_key], 'virtual': r[virtual_key], 'company': r[company_key], 'bap': r[bap_key], 'grade': r[grade_key]}) # Uppercase the small tables # baps = [x.upper() for x in baps] grades = [x.upper() for x in grades] statuses = [x.upper() for x in statuses] # Now, sort the lists # # 1- first remove empty string with filter() # 2- then keep only uniq item with set() # 3- at last alpha sort with sorted() # services = sorted(set(filter(None, services))) baps = sorted(set(filter(None, baps))) grades = sorted(set(filter(None, grades))) companies = sorted(set(filter(None, companies))) statuses = sorted(set(filter(None, statuses))) labels = sorted(set(filter(None, labels))) # Do the same for the projects, that are keys of project_labels # projects = sorted(set(project_labels.keys())) # Do the same for the labels inside each category # c is the category name containing the labels list # for c in categorie_keys: c_labels = sorted(set(filter(None, categorie_labels[c]))) categorie_labels[c] = c_labels # At least, as agents is a list of dicts, sorting is a bit tricky # # the first one liner will store the last agent's line only # then we alpha sort on the name # on both, the discrimination is based on the name couple: (firstname, secondname) # agents = list({(a['firstname'], a['secondname']): a for a in agents}.values()) agents = sorted(agents, key=lambda a: (a['firstname'], a['secondname'])) # # We are done with collecting data # # Now we write to database # # Feed baps from column # for b in baps: n_b = AgentBap(name=b) db.session.add(n_b) db.session.commit() # Feed grades from column # for g in grades: n_g = AgentGrade(name=g) db.session.add(n_g) db.session.commit() # Feed companies from column # for c in companies: n_c = Company(name=c) db.session.add(n_c) db.session.commit() # Feed statuses from column # for s in statuses: n_s = AgentStatus(name=s) db.session.add(n_s) db.session.commit() # Feed projects from column # for p in projects: n_p = Project(name=p) db.session.add(n_p) db.session.commit() # Feed labels from column # for _l in labels: n_l = Label(name=_l) db.session.add(n_l) db.session.commit() # Feed categories from initial list # for _c in categorie_keys: n_c = Category(name=_c) db.session.add(n_c) db.session.commit() # Feed services from column # for s in services: n_s = Service(name=s) db.session.add(n_s) db.session.commit() # Feed periods names # Todo: are statically built, # should come from year column name. # also see later # for p in range(2011, 2030): n_p = Period(name=f"{p}") db.session.add(n_p) db.session.commit() # Add one default capacity # db.session.add(Capacity(name="Agent")) db.session.commit() # Feed categories and labels # for category, labels in categorie_labels.items(): print(category) n_c = Category.query.filter_by(name=category).one() for label in labels: print(label) n_l = Label.query.filter(Label.name == label).one() current_app.logger.debug(f"Adding label {label} to category {category}") n_cl = CategoryLabel(label=n_l, category=n_c) db.session.add(n_cl) db.session.commit() # Feed project's labels # for project, labels in project_labels.items(): print(f"Project {project}") n_p = Project.query.filter(Project.name == project).one() for label in labels: n_l = Label.query.filter(Label.name == label).one() n_pl = ProjectLabel(project=n_p, label=n_l) db.session.add(n_pl) db.session.commit() # Feed agents from agents list previously filled # for a in agents: status = AgentStatus.query.filter(AgentStatus.name == a['status']).one_or_none() if status is None and a['status']: status = AgentStatus(name=a['status']) company = Company.query.filter(Company.name == a['company']).one_or_none() if company is None and a['company']: company = Company(name=a['company']) bap = AgentBap.query.filter(AgentBap.name == a['bap']).one_or_none() if bap is None and a['bap']: bap = AgentBap(name=a['bap']) grade = AgentGrade.query.filter(AgentGrade.name == a['grade']).one_or_none() if grade is None and a['grade']: grade = AgentBap(name=a['grade']) virtual = 1 if a['virtual'] else 0 permanent = 1 if a['status'] == 'PERM' else 0 n_a = Agent(firstname=a['firstname'], secondname=a['secondname'], status_id=status.id if status else None, company_id=company.id if company else None, bap_id=bap.id if bap else None, grade_id=grade.id if grade else None, virtual=virtual, permanent=permanent) db.session.add(n_a) db.session.commit() # Feed agents from agents list previously filled # # Now feed the charges. # # At least one for each csv row # At most one for each year # for r in rows: p = Project.query.filter(Project.name == r[project_key]).one() a = Agent.query.filter(Agent.firstname == r[firstname_key], Agent.secondname == r[secondname_key]).one() s = Service.query.filter(Service.name == r[service_key]).one() c = Capacity.query.first() # TODO: period names should come from db request for period_name in range(2011, 2030): t = Period.query.filter(Period.name == period_name).one() charge = r[f"{period_name}"] # Charge are stored as percent in db, but as fraction of ETP in irap csv # we make the conversion here. try: charge = int(100 * float(charge)) except ValueError: charge = 0 if charge == 0: continue n_c = Charge(agent_id=a.id, project_id=p.id, service_id=s.id, capacity_id=c.id, period_id=t.id, charge_rate=charge) db.session.add(n_c) db.session.commit() @bp.cli.command("feed_from_lesia") def feed_from_lesia(): """ Feed db with agents from a lesia like mysql database. Remember to configure the proper database uri in the db_config.py file. """ from .lesia_db import lesia_agent, lesia_session, lesia_service, lesia_project, \ lesia_fonction, lesia_periods, lesia_affectation, lesia_domains, lesia_poles, \ lesia_domainprojects # Feed all lesia 'domaine' names and link to new category "Domaine" # domain_category = Category(name="Domaine") domains = lesia_session.query(lesia_domains) for d in domains: n_l = Label(name=d.nom) n_cl = CategoryLabel(category=domain_category, label=n_l) db.session.add(n_cl) db.session.commit() # Feed all lesia 'pôle' names and link to new category "Pôle" # pole_category = Category(name="Pôle") poles = lesia_session.query(lesia_poles) for p in poles: n_l = Label(name=p.nom) n_cl = CategoryLabel(category=pole_category, label=n_l) db.session.add(n_cl) db.session.commit() # Feed lesia project with proper "pôle" # (as this information is stored in gestit_projets) # projects = lesia_session.query(lesia_project).all() for p in projects: # add project n_p = Project(id=p.id, name=p.nom) db.session.add(n_p) # get corresponding lesia pole name pole_name = lesia_session.query(lesia_poles).filter(lesia_poles.id == p.pole_id).one().nom # search corresponding Label and store in ProjectLabel table n_l = Label.query.filter(Label.name == pole_name).one() n_pl = ProjectLabel(project=n_p, label=n_l) db.session.add(n_pl) db.session.commit() # Get projects domain information and store in ProjectLabel # domain_projects = lesia_session.query(lesia_domainprojects) for dp in domain_projects: project_name = lesia_session.query(lesia_project).filter(lesia_project.id == dp.projet_id).one().nom domain_name = lesia_session.query(lesia_domains).filter(lesia_domains.id == dp.domaine_id).one().nom n_p = Project.query.filter(Project.name == project_name).one() n_l = Label.query.filter(Label.name == domain_name).one() n_pl = ProjectLabel(project=n_p, label=n_l) db.session.add(n_pl) # Some projects have 2 domain labels in lesia db # That is not allowed any more in the new model. # try: db.session.commit() except IntegrityError: db.session.rollback() current_app.logger.error( "Error adding project to category/label: {} {} {}".format(n_p.name, n_l.category.name, n_l.name)) continue agents = lesia_session.query(lesia_agent).all() for a in agents: n_a = Agent(id=a.IDagent, firstname=a.nom, secondname=a.prenom) db.session.add(n_a) db.session.commit() services = lesia_session.query(lesia_service).all() for s in services: n_s = Service(id=s.id, name=s.nom, abbr=s.abbreviation) db.session.add(n_s) db.session.commit() fonctions = lesia_session.query(lesia_fonction).all() for f in fonctions: n_c = Capacity(id=f.id, name=f.nom) db.session.add(n_c) db.session.commit() periods = lesia_session.query(lesia_periods) for p in periods: n_p = Period(name=p.id_semestre) db.session.add(n_p) db.session.commit() affectations = lesia_session.query(lesia_affectation) for f in affectations: p = Period.query.filter(Period.name == f.semestre_id).one() n_c = Charge(agent_id=f.agent_id, project_id=f.projet_id, service_id=f.service_id, capacity_id=f.fonction_id, period_id=p.id, charge_rate=f.charge) db.session.add(n_c) db.session.commit() @bp.cli.command("fake_lesia_names") def fake_lesia_names(): """ Extract fake name from resources files to change names in db. Mainly after a lesia import, for confidential reasons Changes nams in tables: - services - capacities - projects :return: """ current_app.logger.info("Faking names from resources files") # get resources files # # 1- projects # fake_projects_file = os.path.join(current_app.config['PDC_RESOURCES_DIR'], 'fake-db-names', 'fake-projects.txt') with open(fake_projects_file, newline='') as csvfile: spamreader = csv.reader(csvfile, delimiter=';', quotechar='|') fake_projects_names = [', '.join(row) for row in spamreader] fake_projects_names_iterator = iter(fake_projects_names) # 2- functions/capacities # fake_capacities_file = os.path.join(current_app.config['PDC_RESOURCES_DIR'], 'fake-db-names', 'fake-capacities.txt') with open(fake_capacities_file, newline='') as csvfile: spamreader = csv.reader(csvfile, delimiter=';', quotechar='|') fake_capacities_names = [row for [row] in spamreader] fake_capacities_names_iterator = iter(fake_capacities_names) # 3- services # fake_services_file = os.path.join(current_app.config['PDC_RESOURCES_DIR'], 'fake-db-names', 'fake-services.txt') with open(fake_services_file, newline='') as csvfile: spamreader = csv.reader(csvfile, delimiter=';', quotechar='|') fake_services_names = [row for row in spamreader] fake_services_names_iterator = iter(fake_services_names) # Skip columns names # next(fake_projects_names_iterator) next(fake_capacities_names_iterator) next(fake_services_names_iterator) for s in Service.query.all(): next_service = next(fake_services_names_iterator) s.name = next_service[0] s.abbr = next_service[1] for p in Project.query.all(): p.name = next(fake_projects_names_iterator) for c in Capacity.query.all(): c.name = next(fake_capacities_names_iterator) db.session.commit() @bp.cli.command("feed_periods") @click.option('--begin-year', '-b', 'begin_year', default=2005, help="the start year to begin periods with") @click.option('--end-year', '-e', 'end_year', default=2025, help="the last year to end periods with") def feed_periods(begin_year, end_year): """ Fill in the periods names in the database. """ for y in range(begin_year, end_year): for s in ['S1', 'S2']: period_name = "{}_{}".format(y, s) p = Period(name=period_name) db.session.add(p) try: db.session.commit() except IntegrityError: current_app.logger.error("Periods already exist") @bp.cli.command("feed_random_charges") @click.option('--agent', '-a', 'agent', default=None, help="the agent id you want to charge") def feed_random_charges(agent): """ Randomly fill in the agents charges. """ for _ in range(0, 100): if agent is None: agent_id = random.choice([i for (i,) in db.session.query(Agent.id).all()]) else: agent_id = int(agent) project_id = random.choice([i for (i,) in db.session.query(Project.id).all()]) service_id = random.choice([i for (i,) in db.session.query(Service.id).all()]) capacity_id = random.choice([i for (i,) in db.session.query(Capacity.id).all()]) period_id = random.choice([i for (i,) in db.session.query(Period.id).all()]) percent = random.choice(range(10, 110, 10)) # check max agent charge for the period total_charge = db.session.query(func.sum(Charge.charge_rate).label("total_charge")) \ .filter(Charge.agent_id == agent_id, Charge.period_id == period_id ).scalar() if total_charge is not None and (total_charge + percent) >= 100: print("Skipping agent {} for period {}".format(agent_id, period_id)) continue charge = Charge(agent_id=agent_id, project_id=project_id, service_id=service_id, capacity_id=capacity_id, period_id=period_id, charge_rate=percent) print("adding {}_{}_{}_{}_{}_{}".format(agent_id, project_id, service_id, capacity_id, period_id, percent)) db.session.add(charge) db.session.commit() @bp.cli.command('user_add') @click.argument('email') @click.argument('name') @click.argument('login') @click.argument('password') @click.argument('role') def user_add(email, name, login, password, role): """ Add a new user in db.""" user = User.query.filter(User.name == name).one_or_none() if user: current_app.logger.warn(f"user already exists {name}") return user = User(email=email, name=name, login=login, password=password, role=role) db.session.add(user) db.session.commit() current_app.logger.info(f"added {name}") @bp.cli.command('user_update') @click.option('--name', '-n', 'name', default=None, help="the name to set for that user") @click.option('--role', '-r', 'role', default=None, help="the role to set for that user") @click.option('--email', '-e', 'email', default=None, help="the email to set for that user") @click.option('--password', '-p', 'password', default=None, help="the password to set for that user") @click.argument('user_id') def user_update(user_id, name, role, email, password): """Update the user by given id and given parameters.""" user = User.query.get(user_id) if not user: current_app.logger.error(f"such user_id doesnt exists {user_id}") return if name: user.name = name print(f"User --{user.name}-- name updated to {user.name}") if role: user.set_role(role) print(f"User --{user.name}-- role updated to {_roleToName[user.role]}") if email: user.email = email print(f"User --{user.name}-- email updated to {user.email}") if password: print(f"User --{user.name}-- password updated") user.set_password(password) if not (name or role or email or password): print(f"No update for user --{user.name}--") db.session.commit() @bp.cli.command('user_delete') @click.argument('user_id') def user_delete(user_id): """Delete the user by given id (see user_show_all").""" user = User.query.get(user_id) db.session.delete(user) db.session.commit() @bp.cli.command('show_roles') def show_roles(): """ List all available roles for a user""" print("\n".join(list(_nameToRole))) @bp.cli.command('user_show_all') def user_show_all(): """ Show all users in db.""" print("{:<5} {:<15} {:<15} {:<15}".format('id', 'name', 'login', 'email')) print("{:<5} {:<15} {:<15} {:<15}".format('-' * 5, '-' * 15, '-' * 15, '-' * 15)) for user in User.query.all(): print(user.login) print("{:<5} {:<15} {:<15} {:<15}".format( user.id, user.name, user.login, user.email ))