import csv import os import sys import click import random from flask import current_app from sqlalchemy.exc import OperationalError, IntegrityError from sqlalchemy.sql import func from sqlalchemy.ext.automap import automap_base from sqlalchemy.orm import Session from sqlalchemy import create_engine from app.models import db, Agent, Service, Project, Capacity, Period, Charge from app.auth.models import User, _nameToRole from . import bp @bp.cli.command("fake_lesia_names") def fake_lesia_names(): """ Extract fake name from resources files to change names in db. Mainly after a lesia import, for confidential reasons Changes nams in tables: - services - capacities - projects :return: """ current_app.logger.info("Faking names from resources files") # get resources files # # 1- projects # fake_projects_file = os.path.join(current_app.config['PDC_RESOURCES_DIR'], 'fake-db-names', 'fake-projects.txt') with open(fake_projects_file, newline='') as csvfile: spamreader = csv.reader(csvfile, delimiter=';', quotechar='|') fake_projects_names = [', '.join(row) for row in spamreader] fake_projects_names_iterator = iter(fake_projects_names) # 2- functions/capacities # fake_capacities_file = os.path.join(current_app.config['PDC_RESOURCES_DIR'], 'fake-db-names', 'fake-capacities.txt') with open(fake_capacities_file, newline='') as csvfile: spamreader = csv.reader(csvfile, delimiter=';', quotechar='|') fake_capacities_names = [row for [row] in spamreader] fake_capacities_names_iterator = iter(fake_capacities_names) # 3- services # fake_services_file = os.path.join(current_app.config['PDC_RESOURCES_DIR'], 'fake-db-names', 'fake-services.txt') with open(fake_services_file, newline='') as csvfile: spamreader = csv.reader(csvfile, delimiter=';', quotechar='|') fake_services_names = [row for row in spamreader] fake_services_names_iterator = iter(fake_services_names) # Skip columns names # next(fake_projects_names_iterator) next(fake_capacities_names_iterator) next(fake_services_names_iterator) for s in Service.query.all(): next_service = next(fake_services_names_iterator) s.name = next_service[0] s.abbr = next_service[1] for p in Project.query.all(): p.name = next(fake_projects_names_iterator) for c in Capacity.query.all(): c.name = next(fake_capacities_names_iterator) db.session.commit() @bp.cli.command("feed_from_lesia") def feed_from_lesia(): """ Feed db with agents from a lesia like mysql database. Remember to configure the proper database uri in the db_config.py file. """ from .lesia_db import lesia_agent, lesia_session, lesia_service, lesia_project, \ lesia_fonction, lesia_periods, lesia_affectation agents = lesia_session.query(lesia_agent).all() for a in agents: n_a = Agent(id=a.IDagent, firstname=a.nom, secondname=a.prenom) db.session.add(n_a) db.session.commit() services = lesia_session.query(lesia_service).all() for s in services: n_s = Service(id=s.id, name=s.nom, abbr=s.abbreviation) db.session.add(n_s) db.session.commit() projects = lesia_session.query(lesia_project).all() for p in projects: n_p = Project(id=p.id, name=p.nom) db.session.add(n_p) db.session.commit() fonctions = lesia_session.query(lesia_fonction).all() for f in fonctions: n_c = Capacity(id=f.id, name=f.nom) db.session.add(n_c) db.session.commit() periods = lesia_session.query(lesia_periods) for p in periods: n_p = Period(name=p.id_semestre) db.session.add(n_p) db.session.commit() affectations = lesia_session.query(lesia_affectation) for f in affectations: p = Period.query.filter(Period.name == f.semestre_id).one() n_c = Charge(agent_id=f.agent_id, project_id=f.projet_id, service_id=f.service_id, capacity_id=f.fonction_id, period_id=p.id, charge_rate=f.charge) db.session.add(n_c) db.session.commit() @bp.cli.command("feed_periods") @click.option('--begin-year', '-b', 'begin_year', default=2005, help="the start year to begin periods with") @click.option('--end-year', '-e', 'end_year', default=2025, help="the last year to end periods with") def feed_periods(begin_year, end_year): """ Fill in the periods names in the database. """ for y in range(begin_year, end_year): for s in ['S1', 'S2']: period_name = "{}_{}".format(y, s) p = Period(name=period_name) db.session.add(p) try: db.session.commit() except IntegrityError: current_app.logger.error("Periods already exist") @bp.cli.command("feed_random_charges") @click.option('--agent', '-a', 'agent', default=None, help="the agent id you want to charge") def feed_random_charges(agent): """ Randomly fill in the agents charges. """ for i in range(0, 100): if agent is None: agent_id = random.choice([i for (i,) in db.session.query(Agent.id).all()]) else: agent_id = int(agent) project_id = random.choice([i for (i,) in db.session.query(Project.id).all()]) service_id = random.choice([i for (i,) in db.session.query(Service.id).all()]) capacity_id = random.choice([i for (i,) in db.session.query(Capacity.id).all()]) period_id = random.choice([i for (i,) in db.session.query(Period.id).all()]) percent = random.choice(range(10, 110, 10)) # check max agent charge for the period total_charge = db.session.query(func.sum(Charge.charge_rate).label("total_charge")) \ .filter(Charge.agent_id == agent_id, Charge.period_id == period_id ).scalar() if total_charge is not None and (total_charge + percent) >= 100: print("Skipping agent {} for period {}".format(agent_id, period_id)) continue charge = Charge(agent_id=agent_id, project_id=project_id, service_id=service_id, capacity_id=capacity_id, period_id=period_id, charge_rate=percent) print("adding {}_{}_{}_{}_{}_{}".format(agent_id, project_id, service_id, capacity_id, period_id, percent)) db.session.add(charge) db.session.commit() @bp.cli.command('user_delete') @click.argument('user_id') def user_delete(user_id): """Delete the user by given id (see user_show_all").""" user = User.query.get(user_id) db.session.delete(user) db.session.commit() @bp.cli.command('create_db') def create_db(): """ Create the database structure. Database should be empty. configure the proper database uri in the db_config.py file. """ db.create_all() admin = User(email='admin@nowhere.org', name='admin', login='admin', role='admin') admin.set_password('admin') sqlite_uri = db.engine.url.__str__() if 'sqlite' in db.engine.url.__str__() else None try: db.session.add(admin) db.session.commit() except IntegrityError: current_app.logger.error("User admin already exists, database should be empty at create") if sqlite_uri: current_app.logger.error("see " + sqlite_uri) sys.exit(-1) if sqlite_uri: current_app.logger.info("Created sqlite db: " + sqlite_uri) @bp.cli.command('user_add') @click.argument('email') @click.argument('name') @click.argument('login') @click.argument('password') @click.argument('role') def user_add(email, name, login, password, role): """ Add a new user in db.""" user = User.query.filter(User.name == name).one_or_none() if (user): current_app.logger.error(f"user already exists {name}") return user = User(email=email, name=name, login=login, password=password, role=role) db.session.add(user) db.session.commit() current_app.logger.info(f"added {name}") @bp.cli.command('show_roles') def show_roles(): """ List all available roles for a user""" print("\n".join(list(_nameToRole))) @bp.cli.command('user_show_all') def user_show_all(): """ Show all users in db.""" print("{:<5} {:<15} {:<15} {:<15} {:<15}".format('id', 'name', 'login', 'passwd', 'email')) print("{:<5} {:<15} {:<15} {:<15} {:<15}".format('-' * 5, '-' * 15, '-' * 15, '-' * 15, '-' * 15)) for user in User.query.all(): print(user.login) print("{:<5} {:<15} {:<15} {:<15} {:<15}".format( user.id, user.name, user.login, user.password, user.email ))