commands.py 11.6 KB
import csv
import os
import sys
import click
import random

from flask import current_app
from sqlalchemy.exc import OperationalError, IntegrityError
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.sql import func
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session
from sqlalchemy import create_engine

from app.models import db, Agent, Service, Project, Capacity, Period, Charge
from app.auth.models import User, _nameToRole

from . import bp


@bp.cli.command("feed_from_irap")
@click.option('--csv-file', '-f', 'csv_file_name', help="the csv file path to feed from")
def feed_from_irap(csv_file_name):
    """
    Use an Irap csv charges files and feed db with

    :param csv_file_name:
    :return:
    """

    rows = []

    with open(csv_file_name, newline='') as csvfile:
        csvreader = csv.DictReader(csvfile, delimiter=',', quotechar='"')
        for row in csvreader:
            # print('\n'.join(row.keys()))
            # break
            rows.append(row)

    # todo: service_title
    #       project_tile
    #       typology_tile
    #       thematic_tile
    #       agent_tile

    projects = [r['PROJETS'].strip() for r in rows]
    projects = sorted(set(projects))
    agents = [(r['NOM'].strip(), r['prénom'].strip()) for r in rows]
    agents = sorted(set(agents))
    thematics = [r['thématique'].strip() for r in rows]
    thematics = sorted(set(thematics))
    typologies = [r['TYPOLOGIE'].strip() for r in rows]
    typologies = sorted(set(typologies))
    services = [r['Groupe métier'].strip() for r in rows]
    services = sorted(set(services))

    for a in agents:
        n_a = Agent(firstname=a[0], secondname=a[1])
        db.session.add(n_a)
    db.session.commit()
    # a = Agent.query.filter(Agent.firstname == 'ESPAIGNOL').one()
    # print(f">{a.secondname}<")
    # sys.exit()

    for p in projects:
        n_p = Project(name=p)
        db.session.add(n_p)
    db.session.commit()

    for s in services:
        n_s = Service(name=s)
        db.session.add(n_s)
    db.session.commit()

    for p in range(2011, 2030):
        n_p = Period(name=f"{p}")
        db.session.add(n_p)
    db.session.commit()

    for r in rows:
        p = Project.query.filter(Project.name == r['PROJETS']).one()
        try:
            a = Agent.query.filter(Agent.firstname == r['NOM'], Agent.secondname == r['prénom']).one()
        except NoResultFound:
            print(f"Not found >{a.firstname}< >{a.secondname}<")
            continue
        s = Service.query.filter(Service.name == r['Groupe métier']).one()
        for period_name in range(2011, 2030):
            t = Period.query.filter(Period.name == period_name).one()
            charge = r[f"{period_name}"]
            # print(f">{charge}<")
            try:
                charge = float(charge)
                charge = int(100*charge)
            except ValueError:
                # print(f"Wrong charge {charge}")
                charge = 0
            n_c = Charge(agent_id=a.id,
                         project_id=p.id,
                         service_id=s.id,
                         capacity_id=0,
                         period_id=t.id,
                         charge_rate=charge)
            db.session.add(n_c)
    db.session.commit()


@bp.cli.command("fake_lesia_names")
def fake_lesia_names():
    """
    Extract fake name from resources files to change names in db.
    Mainly after a lesia import, for confidential reasons

    Changes nams in tables:

        - services
        - capacities
        - projects
    :return:
    """

    current_app.logger.info("Faking names from resources files")
    # get  resources files
    #
    # 1- projects
    #
    fake_projects_file = os.path.join(current_app.config['PDC_RESOURCES_DIR'], 'fake-db-names', 'fake-projects.txt')
    with open(fake_projects_file, newline='') as csvfile:
        spamreader = csv.reader(csvfile, delimiter=';', quotechar='|')
        fake_projects_names = [', '.join(row) for row in spamreader]
    fake_projects_names_iterator = iter(fake_projects_names)

    # 2- functions/capacities
    #
    fake_capacities_file = os.path.join(current_app.config['PDC_RESOURCES_DIR'], 'fake-db-names',
                                        'fake-capacities.txt')
    with open(fake_capacities_file, newline='') as csvfile:
        spamreader = csv.reader(csvfile, delimiter=';', quotechar='|')
        fake_capacities_names = [row for [row] in spamreader]
    fake_capacities_names_iterator = iter(fake_capacities_names)

    # 3- services
    #
    fake_services_file = os.path.join(current_app.config['PDC_RESOURCES_DIR'], 'fake-db-names',
                                      'fake-services.txt')
    with open(fake_services_file, newline='') as csvfile:
        spamreader = csv.reader(csvfile, delimiter=';', quotechar='|')
        fake_services_names = [row for row in spamreader]
    fake_services_names_iterator = iter(fake_services_names)

    # Skip columns names
    #
    next(fake_projects_names_iterator)
    next(fake_capacities_names_iterator)
    next(fake_services_names_iterator)

    for s in Service.query.all():
        next_service = next(fake_services_names_iterator)
        s.name = next_service[0]
        s.abbr = next_service[1]

    for p in Project.query.all():
        p.name = next(fake_projects_names_iterator)

    for c in Capacity.query.all():
        c.name = next(fake_capacities_names_iterator)

    db.session.commit()


@bp.cli.command("feed_from_lesia")
def feed_from_lesia():
    """
    Feed db with agents from a lesia like mysql database.

    Remember to configure the proper database uri in the db_config.py file.
    """
    from .lesia_db import lesia_agent, lesia_session, lesia_service, lesia_project, \
        lesia_fonction, lesia_periods, lesia_affectation

    agents = lesia_session.query(lesia_agent).all()
    for a in agents:
        n_a = Agent(id=a.IDagent, firstname=a.nom, secondname=a.prenom)
        db.session.add(n_a)
    db.session.commit()

    services = lesia_session.query(lesia_service).all()
    for s in services:
        n_s = Service(id=s.id, name=s.nom, abbr=s.abbreviation)
        db.session.add(n_s)
    db.session.commit()

    projects = lesia_session.query(lesia_project).all()
    for p in projects:
        n_p = Project(id=p.id, name=p.nom)
        db.session.add(n_p)
    db.session.commit()

    fonctions = lesia_session.query(lesia_fonction).all()
    for f in fonctions:
        n_c = Capacity(id=f.id, name=f.nom)
        db.session.add(n_c)
    db.session.commit()

    periods = lesia_session.query(lesia_periods)
    for p in periods:
        n_p = Period(name=p.id_semestre)
        db.session.add(n_p)
    db.session.commit()

    affectations = lesia_session.query(lesia_affectation)
    for f in affectations:
        p = Period.query.filter(Period.name == f.semestre_id).one()
        n_c = Charge(agent_id=f.agent_id,
                     project_id=f.projet_id,
                     service_id=f.service_id,
                     capacity_id=f.fonction_id,
                     period_id=p.id,
                     charge_rate=f.charge)
        db.session.add(n_c)
    db.session.commit()


@bp.cli.command("feed_periods")
@click.option('--begin-year', '-b', 'begin_year', default=2005, help="the start year to begin periods with")
@click.option('--end-year', '-e', 'end_year', default=2025, help="the last year to end periods with")
def feed_periods(begin_year, end_year):
    """ Fill in the periods names in the database. """
    for y in range(begin_year, end_year):
        for s in ['S1', 'S2']:
            period_name = "{}_{}".format(y, s)
            p = Period(name=period_name)
            db.session.add(p)
    try:
        db.session.commit()
    except IntegrityError:
        current_app.logger.error("Periods already exist")


@bp.cli.command("feed_random_charges")
@click.option('--agent', '-a', 'agent', default=None, help="the agent id you want to charge")
def feed_random_charges(agent):
    """ Randomly fill in the agents charges. """
    for i in range(0, 100):
        if agent is None:
            agent_id = random.choice([i for (i,) in db.session.query(Agent.id).all()])
        else:
            agent_id = int(agent)
        project_id = random.choice([i for (i,) in db.session.query(Project.id).all()])
        service_id = random.choice([i for (i,) in db.session.query(Service.id).all()])
        capacity_id = random.choice([i for (i,) in db.session.query(Capacity.id).all()])
        period_id = random.choice([i for (i,) in db.session.query(Period.id).all()])
        percent = random.choice(range(10, 110, 10))
        # check max agent charge for the period
        total_charge = db.session.query(func.sum(Charge.charge_rate).label("total_charge")) \
            .filter(Charge.agent_id == agent_id,
                    Charge.period_id == period_id
                    ).scalar()
        if total_charge is not None and (total_charge + percent) >= 100:
            print("Skipping agent {} for period {}".format(agent_id, period_id))
            continue
        charge = Charge(agent_id=agent_id,
                        project_id=project_id,
                        service_id=service_id,
                        capacity_id=capacity_id,
                        period_id=period_id,
                        charge_rate=percent)
        print("adding {}_{}_{}_{}_{}_{}".format(agent_id, project_id, service_id, capacity_id, period_id, percent))
        db.session.add(charge)
        db.session.commit()


@bp.cli.command('user_delete')
@click.argument('user_id')
def user_delete(user_id):
    """Delete the user by given id (see user_show_all")."""
    user = User.query.get(user_id)
    db.session.delete(user)
    db.session.commit()


@bp.cli.command('create_db')
def create_db():
    """
    Create the database structure. Database should be empty.

    configure the proper database uri in the db_config.py file.
    """
    db.create_all()
    admin = User(email='admin@nowhere.org', name='admin', login='admin', role='admin')
    admin.set_password('admin')
    sqlite_uri = db.engine.url.__str__() if 'sqlite' in db.engine.url.__str__() else None
    try:
        db.session.add(admin)
        db.session.commit()
    except IntegrityError:
        current_app.logger.error("User admin already exists, database should be empty at create")
        if sqlite_uri:
            current_app.logger.error("see " + sqlite_uri)
        sys.exit(-1)

    if sqlite_uri:
        current_app.logger.info("Created sqlite db: " + sqlite_uri)


@bp.cli.command('user_add')
@click.argument('email')
@click.argument('name')
@click.argument('login')
@click.argument('password')
@click.argument('role')
def user_add(email, name, login, password, role):
    """ Add a new user in db."""
    user = User.query.filter(User.name == name).one_or_none()
    if (user):
        current_app.logger.error(f"user already exists {name}")
        return
    user = User(email=email, name=name, login=login, password=password, role=role)
    db.session.add(user)
    db.session.commit()
    current_app.logger.info(f"added {name}")


@bp.cli.command('show_roles')
def show_roles():
    """ List all available roles for a user"""
    print("\n".join(list(_nameToRole)))


@bp.cli.command('user_show_all')
def user_show_all():
    """ Show all users in db."""
    print("{:<5} {:<15} {:<15} {:<15} {:<15}".format('id', 'name', 'login', 'passwd', 'email'))
    print("{:<5} {:<15} {:<15} {:<15} {:<15}".format('-' * 5, '-' * 15, '-' * 15, '-' * 15, '-' * 15))
    for user in User.query.all():
        print(user.login)
        print("{:<5} {:<15} {:<15} {:<15} {:<15}".format(
            user.id,
            user.name,
            user.login,
            user.password,
            user.email
        ))