commands.py 13.1 KB
import csv
import os
import sys
import click
import random

from flask import current_app
from sqlalchemy.exc import OperationalError, IntegrityError
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.sql import func
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session
from sqlalchemy import create_engine

from app.models import db, Agent, Service, Project, Capacity, Period, Charge
from app.auth.models import User, _nameToRole, _roleToName

from . import bp


@bp.cli.command('create_db')
def create_db():
    """
    Create the database structure. Database should be empty.

    configure the proper database uri in the db_config.py file.
    """
    db.create_all()
    admin = User(email='admin@nowhere.org', name='admin', login='admin', role='admin')
    admin.set_password('admin')
    sqlite_uri = db.engine.url.__str__() if 'sqlite' in db.engine.url.__str__() else None
    try:
        db.session.add(admin)
        db.session.commit()
    except IntegrityError:
        current_app.logger.error("User admin already exists, database should be empty at create")
        if sqlite_uri:
            current_app.logger.error("see " + sqlite_uri)
        sys.exit(-1)

    if sqlite_uri:
        current_app.logger.info("Created sqlite db: " + sqlite_uri)


@bp.cli.command("feed_from_irap")
@click.option('--csv-file', '-f', 'csv_file_name', help="the csv file path to feed from")
def feed_from_irap(csv_file_name):
    """
    Use an Irap csv charges files and feed db with

    :param csv_file_name:
    :return:
    """

    rows = []

    with open(csv_file_name, newline='') as csvfile:
        csvreader = csv.DictReader(csvfile, delimiter=',', quotechar='"')
        for row in csvreader:
            # print('\n'.join(row.keys()))
            # break
            # Remove any leading/trailing spaces
            row = {k: v.strip() for k, v in row.items()}
            rows.append(row)

    firstname_key = 'NOM'
    secondname_key = 'prénom'
    project_key = 'PROJETS'
    service_key = 'Groupe métier'
    # typology_title = 'TYPOLOGIE'
    # thematic_title = 'thématique'

    # Get the columns values
    #
    projects = [r[project_key] for r in rows]
    projects = sorted(set(projects))
    agents = [(r[firstname_key], r[secondname_key].strip()) for r in rows]
    agents = sorted(set(agents))
    services = [r[service_key] for r in rows]
    services = sorted(set(services))

    # thematics = [r[thematic_title] for r in rows]
    # thematics = sorted(set(thematics))
    # typologies = [r[typology_title] for r in rows]
    # typologies = sorted(set(typologies))

    # Feed agents from column
    #
    for a in agents:
        n_a = Agent(firstname=a[0], secondname=a[1])
        db.session.add(n_a)
    db.session.commit()

    # Feed projects from column
    #
    for p in projects:
        n_p = Project(name=p)
        db.session.add(n_p)
    db.session.commit()

    # Feed services from column
    #
    for s in services:
        n_s = Service(name=s)
        db.session.add(n_s)
    db.session.commit()

    # Feed periods names
    # Todo: are statically built,
    #       should come from year column name.
    #
    for p in range(2011, 2030):
        n_p = Period(name=f"{p}")
        db.session.add(n_p)
    db.session.commit()

    # Add one default capacity
    db.session.add(Capacity(name="Travailleur"))
    db.session.commit()

    # Now feed the charges.
    #
    # At least one for each csv row
    # At most one for each year
    #
    for r in rows:
        p = Project.query.filter(Project.name == r[project_key]).one()
        a = Agent.query.filter(Agent.firstname == r[firstname_key], Agent.secondname == r[secondname_key]).one()
        s = Service.query.filter(Service.name == r[service_key]).one()
        c = Capacity.query.first()
        for period_name in range(2011, 2030):
            t = Period.query.filter(Period.name == period_name).one()
            charge = r[f"{period_name}"]
            try:
                charge = int(100 * float(charge))
            except ValueError:
                charge = 0
            if charge == 0:
                continue
            n_c = Charge(agent_id=a.id,
                         project_id=p.id,
                         service_id=s.id,
                         capacity_id=c.id,
                         period_id=t.id,
                         charge_rate=charge)
            db.session.add(n_c)
    db.session.commit()


@bp.cli.command("feed_from_lesia")
def feed_from_lesia():
    """
    Feed db with agents from a lesia like mysql database.

    Remember to configure the proper database uri in the db_config.py file.
    """
    from .lesia_db import lesia_agent, lesia_session, lesia_service, lesia_project, \
        lesia_fonction, lesia_periods, lesia_affectation

    agents = lesia_session.query(lesia_agent).all()
    for a in agents:
        n_a = Agent(id=a.IDagent, firstname=a.nom, secondname=a.prenom)
        db.session.add(n_a)
    db.session.commit()

    services = lesia_session.query(lesia_service).all()
    for s in services:
        n_s = Service(id=s.id, name=s.nom, abbr=s.abbreviation)
        db.session.add(n_s)
    db.session.commit()

    projects = lesia_session.query(lesia_project).all()
    for p in projects:
        n_p = Project(id=p.id, name=p.nom)
        db.session.add(n_p)
    db.session.commit()

    fonctions = lesia_session.query(lesia_fonction).all()
    for f in fonctions:
        n_c = Capacity(id=f.id, name=f.nom)
        db.session.add(n_c)
    db.session.commit()

    periods = lesia_session.query(lesia_periods)
    for p in periods:
        n_p = Period(name=p.id_semestre)
        db.session.add(n_p)
    db.session.commit()

    affectations = lesia_session.query(lesia_affectation)
    for f in affectations:
        p = Period.query.filter(Period.name == f.semestre_id).one()
        n_c = Charge(agent_id=f.agent_id,
                     project_id=f.projet_id,
                     service_id=f.service_id,
                     capacity_id=f.fonction_id,
                     period_id=p.id,
                     charge_rate=f.charge)
        db.session.add(n_c)
    db.session.commit()


@bp.cli.command("fake_lesia_names")
def fake_lesia_names():
    """
    Extract fake name from resources files to change names in db.
    Mainly after a lesia import, for confidential reasons

    Changes nams in tables:

        - services
        - capacities
        - projects
    :return:
    """

    current_app.logger.info("Faking names from resources files")
    # get  resources files
    #
    # 1- projects
    #
    fake_projects_file = os.path.join(current_app.config['PDC_RESOURCES_DIR'], 'fake-db-names', 'fake-projects.txt')
    with open(fake_projects_file, newline='') as csvfile:
        spamreader = csv.reader(csvfile, delimiter=';', quotechar='|')
        fake_projects_names = [', '.join(row) for row in spamreader]
    fake_projects_names_iterator = iter(fake_projects_names)

    # 2- functions/capacities
    #
    fake_capacities_file = os.path.join(current_app.config['PDC_RESOURCES_DIR'], 'fake-db-names',
                                        'fake-capacities.txt')
    with open(fake_capacities_file, newline='') as csvfile:
        spamreader = csv.reader(csvfile, delimiter=';', quotechar='|')
        fake_capacities_names = [row for [row] in spamreader]
    fake_capacities_names_iterator = iter(fake_capacities_names)

    # 3- services
    #
    fake_services_file = os.path.join(current_app.config['PDC_RESOURCES_DIR'], 'fake-db-names',
                                      'fake-services.txt')
    with open(fake_services_file, newline='') as csvfile:
        spamreader = csv.reader(csvfile, delimiter=';', quotechar='|')
        fake_services_names = [row for row in spamreader]
    fake_services_names_iterator = iter(fake_services_names)

    # Skip columns names
    #
    next(fake_projects_names_iterator)
    next(fake_capacities_names_iterator)
    next(fake_services_names_iterator)

    for s in Service.query.all():
        next_service = next(fake_services_names_iterator)
        s.name = next_service[0]
        s.abbr = next_service[1]

    for p in Project.query.all():
        p.name = next(fake_projects_names_iterator)

    for c in Capacity.query.all():
        c.name = next(fake_capacities_names_iterator)

    db.session.commit()


@bp.cli.command("feed_periods")
@click.option('--begin-year', '-b', 'begin_year', default=2005, help="the start year to begin periods with")
@click.option('--end-year', '-e', 'end_year', default=2025, help="the last year to end periods with")
def feed_periods(begin_year, end_year):
    """ Fill in the periods names in the database. """
    for y in range(begin_year, end_year):
        for s in ['S1', 'S2']:
            period_name = "{}_{}".format(y, s)
            p = Period(name=period_name)
            db.session.add(p)
    try:
        db.session.commit()
    except IntegrityError:
        current_app.logger.error("Periods already exist")


@bp.cli.command("feed_random_charges")
@click.option('--agent', '-a', 'agent', default=None, help="the agent id you want to charge")
def feed_random_charges(agent):
    """ Randomly fill in the agents charges. """
    for i in range(0, 100):
        if agent is None:
            agent_id = random.choice([i for (i,) in db.session.query(Agent.id).all()])
        else:
            agent_id = int(agent)
        project_id = random.choice([i for (i,) in db.session.query(Project.id).all()])
        service_id = random.choice([i for (i,) in db.session.query(Service.id).all()])
        capacity_id = random.choice([i for (i,) in db.session.query(Capacity.id).all()])
        period_id = random.choice([i for (i,) in db.session.query(Period.id).all()])
        percent = random.choice(range(10, 110, 10))
        # check max agent charge for the period
        total_charge = db.session.query(func.sum(Charge.charge_rate).label("total_charge")) \
            .filter(Charge.agent_id == agent_id,
                    Charge.period_id == period_id
                    ).scalar()
        if total_charge is not None and (total_charge + percent) >= 100:
            print("Skipping agent {} for period {}".format(agent_id, period_id))
            continue
        charge = Charge(agent_id=agent_id,
                        project_id=project_id,
                        service_id=service_id,
                        capacity_id=capacity_id,
                        period_id=period_id,
                        charge_rate=percent)
        print("adding {}_{}_{}_{}_{}_{}".format(agent_id, project_id, service_id, capacity_id, period_id, percent))
        db.session.add(charge)
        db.session.commit()


@bp.cli.command('user_add')
@click.argument('email')
@click.argument('name')
@click.argument('login')
@click.argument('password')
@click.argument('role')
def user_add(email, name, login, password, role):
    """ Add a new user in db."""
    user = User.query.filter(User.name == name).one_or_none()
    if (user):
        current_app.logger.error(f"user already exists {name}")
        return
    user = User(email=email, name=name, login=login, password=password, role=role)
    db.session.add(user)
    db.session.commit()
    current_app.logger.info(f"added {name}")


@bp.cli.command('user_update')
@click.option('--name', '-n', 'name', default=None, help="the name to set for that user")
@click.option('--role', '-r', 'role', default=None, help="the role to set for that user")
@click.option('--email', '-e', 'email', default=None, help="the email to set for that user")
@click.option('--password', '-p', 'password', default=None, help="the password to set for that user")
@click.argument('user_id')
def user_update(user_id, name, role, email, password):
    """Update the user by given id and given parameters."""
    user = User.query.get(user_id)
    if not user:
        current_app.logger.error(f"such user_id doesnt exists {user_id}")
        return
    if name:
        user.name = name
        print(f"User --{user.name}-- name updated to {user.name}")
    if role:
        user.set_role(role)
        print(f"User --{user.name}-- role updated to {_roleToName[user.role]}")
    if email:
        user.email=email
        print(f"User --{user.name}-- email updated to {user.email}")
    if password:
        print(f"User --{user.name}-- password updated")
        user.set_password(password)
    if not ( name or role or email or password):
        print(f"No update for user --{user.name}--")
    db.session.commit()


@bp.cli.command('user_delete')
@click.argument('user_id')
def user_delete(user_id):
    """Delete the user by given id (see user_show_all")."""
    user = User.query.get(user_id)
    db.session.delete(user)
    db.session.commit()


@bp.cli.command('show_roles')
def show_roles():
    """ List all available roles for a user"""
    print("\n".join(list(_nameToRole)))


@bp.cli.command('user_show_all')
def user_show_all():
    """ Show all users in db."""
    print("{:<5} {:<15} {:<15} {:<15}".format('id', 'name', 'login', 'email'))
    print("{:<5} {:<15} {:<15} {:<15}".format('-' * 5, '-' * 15, '-' * 15, '-' * 15))
    for user in User.query.all():
        print(user.login)
        print("{:<5} {:<15} {:<15} {:<15}".format(
            user.id,
            user.name,
            user.login,
            user.email
        ))