# commands.py 6.49 KB
import sys
import click
import random

from flask import current_app
from sqlalchemy.exc import OperationalError, IntegrityError
from sqlalchemy.sql import func
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session
from sqlalchemy import create_engine

from app.models import db, Agent, Service, Project, Capacity, Period, Charge
from app.auth.models import User

from . import bp


@bp.cli.command("lesia_to_csv")
def lesia_to_csv():
    """
    Extract some data from the lesia db to a csv file for later db creation.

    Data meant to be exported:
        - agents
        - services
        - capacities
        - projects

    NOTE: not implemented yet -- the command has no body besides this
    docstring, so running it does nothing.

    :return: None
    """


@bp.cli.command("feed_from_lesia")
def feed_from_lesia():
    """
    Feed db with the content of a lesia like mysql database.

    Agents, services, projects, capacities (lesia "fonctions"), periods and
    charges (lesia "affectations") are copied in that order, with one commit
    per table. Source ids are reused as primary keys, except for periods,
    which only receive a name.

    configure the proper database uri in the db_config.py file.
    """
    # Imported here rather than at module level, as in the original command.
    from .lesia_db import (
        lesia_affectation,
        lesia_agent,
        lesia_fonction,
        lesia_periods,
        lesia_project,
        lesia_service,
        lesia_session,
    )

    # NOTE(review): "nom" is stored as firstname and "prenom" as secondname;
    # if those columns are surname / given name this mapping may be swapped
    # -- confirm against the Agent model.
    db.session.add_all([
        Agent(id=row.IDagent, firstname=row.nom, secondname=row.prenom)
        for row in lesia_session.query(lesia_agent).all()
    ])
    db.session.commit()

    db.session.add_all([
        Service(id=row.id, name=row.nom, abbr=row.abbreviation)
        for row in lesia_session.query(lesia_service).all()
    ])
    db.session.commit()

    db.session.add_all([
        Project(id=row.id, name=row.nom)
        for row in lesia_session.query(lesia_project).all()
    ])
    db.session.commit()

    db.session.add_all([
        Capacity(id=row.id, name=row.nom)
        for row in lesia_session.query(lesia_fonction).all()
    ])
    db.session.commit()

    db.session.add_all([
        Period(name=row.id_semestre)
        for row in lesia_session.query(lesia_periods)
    ])
    db.session.commit()

    for affectation in lesia_session.query(lesia_affectation):
        # Charges point at the local period row, looked up by its name;
        # .one() raises unless exactly one period matches.
        period = Period.query.filter(
            Period.name == affectation.semestre_id
        ).one()
        db.session.add(Charge(
            agent_id=affectation.agent_id,
            project_id=affectation.projet_id,
            service_id=affectation.service_id,
            capacity_id=affectation.fonction_id,
            period_id=period.id,
            charge_rate=affectation.charge,
        ))
    db.session.commit()


@bp.cli.command("feed_periods")
@click.option('--begin-year', '-b', 'begin_year', default=2005, help="the start year to begin periods with")
@click.option('--end-year', '-e', 'end_year', default=2025, help="the last year to end periods with")
def feed_periods(begin_year, end_year):
    """
    Fill in the periods names in the database.

    Two periods, named "<year>_S1" and "<year>_S2", are added for every year
    of range(begin_year, end_year) and committed in a single transaction.

    :param begin_year: first year periods are created for
    :param end_year: upper bound of the year range; range() excludes it, so
        no period is created for end_year itself
    """
    # NOTE(review): the --end-year help text reads as an inclusive bound
    # while range() stops one year earlier -- confirm which one is intended.
    for y in range(begin_year, end_year):
        for s in ['S1', 'S2']:
            period_name = "{}_{}".format(y, s)
            p = Period(name=period_name)
            db.session.add(p)
    try:
        db.session.commit()
    except IntegrityError:
        # After a failed flush the session refuses further work until it is
        # rolled back; this also discards the pending periods.
        db.session.rollback()
        current_app.logger.error("Periods already exist")


@bp.cli.command("feed_random_charges")
@click.option('--agent', '-a', 'agent', default=None, help="the agent id you want to charge")
def feed_random_charges(agent):
    """
    Randomly fill in the agents charges.

    Makes 100 attempts. Each attempt draws a random project, service,
    capacity, period and a rate in 10..100 (step 10), then stores the charge
    unless it would push the agent above 100 for that period. Every stored
    charge is committed immediately.

    Exits with status -1 when one of the tables to draw from is empty.

    :param agent: id of the only agent to charge; when None a random agent
        is drawn for each attempt
    """
    def _all_ids(column):
        # Flatten the one-column result rows into a plain list of ids.
        return [value for (value,) in db.session.query(column).all()]

    # These id lists never change while the command runs (only charges are
    # inserted), so they are fetched once instead of on every attempt.
    fixed_agent_id = int(agent) if agent is not None else None
    agent_ids = _all_ids(Agent.id) if fixed_agent_id is None else [fixed_agent_id]
    project_ids = _all_ids(Project.id)
    service_ids = _all_ids(Service.id)
    capacity_ids = _all_ids(Capacity.id)
    period_ids = _all_ids(Period.id)

    # random.choice() raises IndexError on an empty sequence; fail with a
    # readable message instead.
    if not all((agent_ids, project_ids, service_ids, capacity_ids, period_ids)):
        current_app.logger.error(
            "Cannot generate charges: agents, projects, services, capacities "
            "and periods must all contain at least one row")
        sys.exit(-1)

    for _ in range(100):
        if fixed_agent_id is None:
            agent_id = random.choice(agent_ids)
        else:
            agent_id = fixed_agent_id
        project_id = random.choice(project_ids)
        service_id = random.choice(service_ids)
        capacity_id = random.choice(capacity_ids)
        period_id = random.choice(period_ids)
        percent = random.choice(range(10, 110, 10))
        # check max agent charge for the period
        total_charge = db.session.query(func.sum(Charge.charge_rate).label("total_charge")) \
            .filter(Charge.agent_id == agent_id,
                    Charge.period_id == period_id
                    ).scalar()
        # SUM() is NULL (None) when the agent has no charge on that period.
        # A total of exactly 100 is accepted, the same way a single charge
        # of 100 always was; only totals above 100 are skipped.
        if (total_charge or 0) + percent > 100:
            print("Skipping agent {} for period {}".format(agent_id, period_id))
            continue
        charge = Charge(agent_id=agent_id,
                        project_id=project_id,
                        service_id=service_id,
                        capacity_id=capacity_id,
                        period_id=period_id,
                        charge_rate=percent)
        print("adding {}_{}_{}_{}_{}_{}".format(agent_id, project_id, service_id, capacity_id, period_id, percent))
        db.session.add(charge)
        # Committed per charge so the next total_charge query sees it.
        db.session.commit()


@bp.cli.command('user_delete')
@click.argument('user_id')
def user_delete(user_id):
    """
    Delete the user with the given id (see the user_show_all command).

    Logs an error and exits with status -1 when no user has that id.

    :param user_id: primary key of the user to delete, as given on the
        command line
    """
    user = User.query.get(user_id)
    if user is None:
        # Query.get() returns None for an unknown primary key, and passing
        # None to Session.delete() fails with an unmapped-instance error.
        current_app.logger.error("No user with id %s", user_id)
        sys.exit(-1)
    db.session.delete(user)
    db.session.commit()


@bp.cli.command('create_db')
def create_db():
    """
    Create the database structure and a default admin user.

    The database should be empty: if inserting the admin user violates an
    integrity constraint, an error is logged and the command exits with
    status -1.

    configure the proper database uri in the db_config.py file.
    """
    db.create_all()

    # NOTE(review): the default account is created with the literal password
    # 'admin' -- make sure it is changed after the first login.
    admin = User(email='admin@nowhere.org', name='admin', login='admin', password='admin', role='admin')

    # The uri is only reported for sqlite databases.
    engine_url = str(db.engine.url)
    sqlite_uri = engine_url if 'sqlite' in engine_url else None

    try:
        db.session.add(admin)
        db.session.commit()
    except IntegrityError:
        current_app.logger.error("User admin already exists, database should be empty at create")
        if sqlite_uri:
            current_app.logger.error("see " + sqlite_uri)
        sys.exit(-1)
    else:
        if sqlite_uri:
            current_app.logger.info("Created sqlite db: " + sqlite_uri)


@bp.cli.command('user_add')
@click.argument('email')
@click.argument('name')
@click.argument('login')
@click.argument('password')
def user_add(email, name, login, password):
    """
    Add a new user in db.

    :param email: email address of the new user
    :param name: name of the new user
    :param login: login of the new user
    :param password: password of the new user, handed to the User model
        as given
    """
    user = User(email=email, name=name, login=login, password=password)
    db.session.add(user)
    db.session.commit()
    # The message needs a %s placeholder: extra logging arguments are merged
    # with "msg % args", and a message without one triggers a formatting
    # error instead of the expected log line.
    current_app.logger.info("added %s", name)


@bp.cli.command('user_show_all')
def user_show_all():
    """
    Show all users in db as a fixed-width table.

    Columns are id, name, login, passwd and email, one line per user.
    """
    # "!s" converts every value with str() before padding, so a None column
    # (or any value without format-spec support) no longer raises TypeError.
    row_format = "{!s:<5} {!s:<15} {!s:<15} {!s:<15} {!s:<15}"

    print(row_format.format('id', 'name', 'login', 'passwd', 'email'))
    print(row_format.format('-' * 5, '-' * 15, '-' * 15, '-' * 15, '-' * 15))
    # NOTE(review): the stored password value is printed as-is -- confirm
    # this is acceptable for the deployments that use this command.
    for user in User.query.all():
        # The login is part of the row; it is no longer also printed alone
        # on its own line, which broke the table layout.
        print(row_format.format(
            user.id,
            user.name,
            user.login,
            user.password,
            user.email
        ))