import csv import os import sys import click import random from flask import current_app from sqlalchemy.exc import OperationalError, IntegrityError from sqlalchemy.orm.exc import NoResultFound from sqlalchemy.sql import func from sqlalchemy.ext.automap import automap_base from sqlalchemy.orm import Session from sqlalchemy import create_engine from app.models import db, Agent, Service, Project, Capacity, Period, Charge from app.auth.models import User, _nameToRole, _roleToName from . import bp @bp.cli.command('create_db') def create_db(): """ Create the database structure. Database should be empty. configure the proper database uri in the db_config.py file. """ db.create_all() admin = User(email='admin@nowhere.org', name='admin', login='admin', role='admin') admin.set_password('admin') sqlite_uri = db.engine.url.__str__() if 'sqlite' in db.engine.url.__str__() else None try: db.session.add(admin) db.session.commit() except IntegrityError: current_app.logger.error("User admin already exists, database should be empty at create") if sqlite_uri: current_app.logger.error("see " + sqlite_uri) sys.exit(-1) if sqlite_uri: current_app.logger.info("Created sqlite db: " + sqlite_uri) @bp.cli.command("feed_from_irap") @click.option('--csv-file', '-f', 'csv_file_name', help="the csv file path to feed from") def feed_from_irap(csv_file_name): """ Use an Irap csv charges files and feed db with :param csv_file_name: :return: """ rows = [] with open(csv_file_name, newline='') as csvfile: csvreader = csv.DictReader(csvfile, delimiter=',', quotechar='"') for row in csvreader: # print('\n'.join(row.keys())) # break # Remove any leading/trailing spaces row = {k: v.strip() for k, v in row.items()} rows.append(row) firstname_key = 'NOM' secondname_key = 'prénom' project_key = 'PROJETS' service_key = 'Groupe métier' # typology_title = 'TYPOLOGIE' # thematic_title = 'thématique' # Get the columns values # projects = [r[project_key] for r in rows] projects = sorted(set(projects)) agents = [(r[firstname_key], r[secondname_key].strip()) for r in rows] agents = sorted(set(agents)) services = [r[service_key] for r in rows] services = sorted(set(services)) # thematics = [r[thematic_title] for r in rows] # thematics = sorted(set(thematics)) # typologies = [r[typology_title] for r in rows] # typologies = sorted(set(typologies)) # Feed agents from column # for a in agents: n_a = Agent(firstname=a[0], secondname=a[1]) db.session.add(n_a) db.session.commit() # Feed projects from column # for p in projects: n_p = Project(name=p) db.session.add(n_p) db.session.commit() # Feed services from column # for s in services: n_s = Service(name=s) db.session.add(n_s) db.session.commit() # Feed periods names # Todo: are statically built, # should come from year column name. # for p in range(2011, 2030): n_p = Period(name=f"{p}") db.session.add(n_p) db.session.commit() # Add one default capacity db.session.add(Capacity(name="Travailleur")) db.session.commit() # Now feed the charges. # # At least one for each csv row # At most one for each year # for r in rows: p = Project.query.filter(Project.name == r[project_key]).one() a = Agent.query.filter(Agent.firstname == r[firstname_key], Agent.secondname == r[secondname_key]).one() s = Service.query.filter(Service.name == r[service_key]).one() c = Capacity.query.first() for period_name in range(2011, 2030): t = Period.query.filter(Period.name == period_name).one() charge = r[f"{period_name}"] # Charge are stored as percent in db, but as fraction of ETP in irap csv # we make the conversion here. try: charge = int(100 * float(charge)) except ValueError: charge = 0 if charge == 0: continue n_c = Charge(agent_id=a.id, project_id=p.id, service_id=s.id, capacity_id=c.id, period_id=t.id, charge_rate=charge) db.session.add(n_c) db.session.commit() @bp.cli.command("feed_from_lesia") def feed_from_lesia(): """ Feed db with agents from a lesia like mysql database. Remember to configure the proper database uri in the db_config.py file. """ from .lesia_db import lesia_agent, lesia_session, lesia_service, lesia_project, \ lesia_fonction, lesia_periods, lesia_affectation agents = lesia_session.query(lesia_agent).all() for a in agents: n_a = Agent(id=a.IDagent, firstname=a.nom, secondname=a.prenom) db.session.add(n_a) db.session.commit() services = lesia_session.query(lesia_service).all() for s in services: n_s = Service(id=s.id, name=s.nom, abbr=s.abbreviation) db.session.add(n_s) db.session.commit() projects = lesia_session.query(lesia_project).all() for p in projects: n_p = Project(id=p.id, name=p.nom) db.session.add(n_p) db.session.commit() fonctions = lesia_session.query(lesia_fonction).all() for f in fonctions: n_c = Capacity(id=f.id, name=f.nom) db.session.add(n_c) db.session.commit() periods = lesia_session.query(lesia_periods) for p in periods: n_p = Period(name=p.id_semestre) db.session.add(n_p) db.session.commit() affectations = lesia_session.query(lesia_affectation) for f in affectations: p = Period.query.filter(Period.name == f.semestre_id).one() n_c = Charge(agent_id=f.agent_id, project_id=f.projet_id, service_id=f.service_id, capacity_id=f.fonction_id, period_id=p.id, charge_rate=f.charge) db.session.add(n_c) db.session.commit() @bp.cli.command("fake_lesia_names") def fake_lesia_names(): """ Extract fake name from resources files to change names in db. Mainly after a lesia import, for confidential reasons Changes nams in tables: - services - capacities - projects :return: """ current_app.logger.info("Faking names from resources files") # get resources files # # 1- projects # fake_projects_file = os.path.join(current_app.config['PDC_RESOURCES_DIR'], 'fake-db-names', 'fake-projects.txt') with open(fake_projects_file, newline='') as csvfile: spamreader = csv.reader(csvfile, delimiter=';', quotechar='|') fake_projects_names = [', '.join(row) for row in spamreader] fake_projects_names_iterator = iter(fake_projects_names) # 2- functions/capacities # fake_capacities_file = os.path.join(current_app.config['PDC_RESOURCES_DIR'], 'fake-db-names', 'fake-capacities.txt') with open(fake_capacities_file, newline='') as csvfile: spamreader = csv.reader(csvfile, delimiter=';', quotechar='|') fake_capacities_names = [row for [row] in spamreader] fake_capacities_names_iterator = iter(fake_capacities_names) # 3- services # fake_services_file = os.path.join(current_app.config['PDC_RESOURCES_DIR'], 'fake-db-names', 'fake-services.txt') with open(fake_services_file, newline='') as csvfile: spamreader = csv.reader(csvfile, delimiter=';', quotechar='|') fake_services_names = [row for row in spamreader] fake_services_names_iterator = iter(fake_services_names) # Skip columns names # next(fake_projects_names_iterator) next(fake_capacities_names_iterator) next(fake_services_names_iterator) for s in Service.query.all(): next_service = next(fake_services_names_iterator) s.name = next_service[0] s.abbr = next_service[1] for p in Project.query.all(): p.name = next(fake_projects_names_iterator) for c in Capacity.query.all(): c.name = next(fake_capacities_names_iterator) db.session.commit() @bp.cli.command("feed_periods") @click.option('--begin-year', '-b', 'begin_year', default=2005, help="the start year to begin periods with") @click.option('--end-year', '-e', 'end_year', default=2025, help="the last year to end periods with") def feed_periods(begin_year, end_year): """ Fill in the periods names in the database. """ for y in range(begin_year, end_year): for s in ['S1', 'S2']: period_name = "{}_{}".format(y, s) p = Period(name=period_name) db.session.add(p) try: db.session.commit() except IntegrityError: current_app.logger.error("Periods already exist") @bp.cli.command("feed_random_charges") @click.option('--agent', '-a', 'agent', default=None, help="the agent id you want to charge") def feed_random_charges(agent): """ Randomly fill in the agents charges. """ for i in range(0, 100): if agent is None: agent_id = random.choice([i for (i,) in db.session.query(Agent.id).all()]) else: agent_id = int(agent) project_id = random.choice([i for (i,) in db.session.query(Project.id).all()]) service_id = random.choice([i for (i,) in db.session.query(Service.id).all()]) capacity_id = random.choice([i for (i,) in db.session.query(Capacity.id).all()]) period_id = random.choice([i for (i,) in db.session.query(Period.id).all()]) percent = random.choice(range(10, 110, 10)) # check max agent charge for the period total_charge = db.session.query(func.sum(Charge.charge_rate).label("total_charge")) \ .filter(Charge.agent_id == agent_id, Charge.period_id == period_id ).scalar() if total_charge is not None and (total_charge + percent) >= 100: print("Skipping agent {} for period {}".format(agent_id, period_id)) continue charge = Charge(agent_id=agent_id, project_id=project_id, service_id=service_id, capacity_id=capacity_id, period_id=period_id, charge_rate=percent) print("adding {}_{}_{}_{}_{}_{}".format(agent_id, project_id, service_id, capacity_id, period_id, percent)) db.session.add(charge) db.session.commit() @bp.cli.command('user_add') @click.argument('email') @click.argument('name') @click.argument('login') @click.argument('password') @click.argument('role') def user_add(email, name, login, password, role): """ Add a new user in db.""" user = User.query.filter(User.name == name).one_or_none() if (user): current_app.logger.error(f"user already exists {name}") return user = User(email=email, name=name, login=login, password=password, role=role) db.session.add(user) db.session.commit() current_app.logger.info(f"added {name}") @bp.cli.command('user_update') @click.option('--name', '-n', 'name', default=None, help="the name to set for that user") @click.option('--role', '-r', 'role', default=None, help="the role to set for that user") @click.option('--email', '-e', 'email', default=None, help="the email to set for that user") @click.option('--password', '-p', 'password', default=None, help="the password to set for that user") @click.argument('user_id') def user_update(user_id, name, role, email, password): """Update the user by given id and given parameters.""" user = User.query.get(user_id) if not user: current_app.logger.error(f"such user_id doesnt exists {user_id}") return if name: user.name = name print(f"User --{user.name}-- name updated to {user.name}") if role: user.set_role(role) print(f"User --{user.name}-- role updated to {_roleToName[user.role]}") if email: user.email=email print(f"User --{user.name}-- email updated to {user.email}") if password: print(f"User --{user.name}-- password updated") user.set_password(password) if not ( name or role or email or password): print(f"No update for user --{user.name}--") db.session.commit() @bp.cli.command('user_delete') @click.argument('user_id') def user_delete(user_id): """Delete the user by given id (see user_show_all").""" user = User.query.get(user_id) db.session.delete(user) db.session.commit() @bp.cli.command('show_roles') def show_roles(): """ List all available roles for a user""" print("\n".join(list(_nameToRole))) @bp.cli.command('user_show_all') def user_show_all(): """ Show all users in db.""" print("{:<5} {:<15} {:<15} {:<15}".format('id', 'name', 'login', 'email')) print("{:<5} {:<15} {:<15} {:<15}".format('-' * 5, '-' * 15, '-' * 15, '-' * 15)) for user in User.query.all(): print(user.login) print("{:<5} {:<15} {:<15} {:<15}".format( user.id, user.name, user.login, user.email ))