import random
import sys

import click
from flask import current_app
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session
from sqlalchemy.sql import func

from app.models import db, Agent, Service, Project, Capacity, Period, Charge
from app.auth.models import User

from . import bp


@bp.cli.command("feed_from_lesia")
def feed_from_lesia():
    """
    Feed db with agents from a lesia like mysql database.

    Configure that database uri in the db_config.py file
    (read here from the LESIA_AGENTS_DB_URI config key).

    Copies, in order: agents, services, projects, capacities, periods and
    finally the charges, committing after each table. Exits with status -1
    when the source database cannot be reflected.
    """
    Base = automap_base()
    engine = create_engine(current_app.config['LESIA_AGENTS_DB_URI'])

    # Reflect the tables of the source database.
    try:
        Base.prepare(engine, reflect=True)
    except OperationalError:
        # TODO: use logging facility instead
        print("Please, configure the mysql database (see db_config.py)",
              file=sys.stderr)
        sys.exit(-1)

    # Mapped classes are now created with names by default
    # matching that of the table name.
    lesia_agent = Base.classes.agent
    lesia_service = Base.classes.gestit_services
    lesia_project = Base.classes.gestit_projets
    lesia_fonction = Base.classes.gestit_fonctions
    lesia_affectation = Base.classes.gestit_affectations
    lesia_periods = Base.classes.gestit_semestres

    lesia_session = Session(engine)
    try:
        # NOTE(review): "nom" is French for family name and "prenom" for
        # given name, so firstname/secondname may be swapped here -- confirm
        # against the Agent model before changing.
        for src in lesia_session.query(lesia_agent).all():
            db.session.add(Agent(id=src.IDagent,
                                 firstname=src.nom,
                                 secondname=src.prenom))
        db.session.commit()

        for src in lesia_session.query(lesia_service).all():
            db.session.add(Service(id=src.id,
                                   name=src.nom,
                                   abbr=src.abbreviation))
        db.session.commit()

        for src in lesia_session.query(lesia_project).all():
            db.session.add(Project(id=src.id, name=src.nom))
        db.session.commit()

        for src in lesia_session.query(lesia_fonction).all():
            db.session.add(Capacity(id=src.id, name=src.nom))
        db.session.commit()

        # Periods get a fresh local id; only the source semester id is kept,
        # as the period name.
        for src in lesia_session.query(lesia_periods):
            db.session.add(Period(name=src.id_semestre))
        db.session.commit()

        # Charges reference the local period id, resolved through the period
        # name. Each distinct name is looked up once; .one() still raises if
        # the period is missing or duplicated.
        period_ids = {}
        for src in lesia_session.query(lesia_affectation):
            period_name = src.semestre_id
            if period_name not in period_ids:
                period = Period.query.filter(
                    Period.name == period_name).one()
                period_ids[period_name] = period.id
            db.session.add(Charge(agent_id=src.agent_id,
                                  project_id=src.projet_id,
                                  service_id=src.service_id,
                                  capacity_id=src.fonction_id,
                                  period_id=period_ids[period_name],
                                  charge_rate=src.charge))
        db.session.commit()
    finally:
        # Always release the connection to the source database.
        lesia_session.close()


@bp.cli.command("feed_periods")
def feed_periods():
    """ Fill in the periods name in the database.

    Creates one period per semester, named "<year>_S1" and "<year>_S2",
    for the years 2014 to 2022 included.
    """
    for year in range(2014, 2023):
        for semester in ['S1', 'S2']:
            period_name = "{}_{}".format(year, semester)
            db.session.add(Period(name=period_name))
    db.session.commit()


@bp.cli.command("feed_random_charges")
@click.option('--agent', '-a', 'agent', default=None,
              help="the agent id you want to charge")
def feed_random_charges(agent):
    """ Randomly fill in the agents charges.

    Makes 100 attempts to add a charge with a random project, service,
    capacity, period and rate (10 to 100, by steps of 10). The agent is the
    one given with --agent, or a random one otherwise. An attempt is skipped
    when it would push the agent above 100 for the chosen period.
    """
    def ids_of(model):
        """Return every primary key stored for `model`."""
        return [row_id for (row_id,) in db.session.query(model.id).all()]

    fixed_agent_id = None if agent is None else int(agent)

    # These tables are not modified by the loop below, so read them once.
    agent_ids = ids_of(Agent) if fixed_agent_id is None else [fixed_agent_id]
    project_ids = ids_of(Project)
    service_ids = ids_of(Service)
    capacity_ids = ids_of(Capacity)
    period_ids = ids_of(Period)

    # random.choice() raises a bare IndexError on an empty list; report
    # which tables must be fed first instead.
    pools = [("agent", agent_ids), ("project", project_ids),
             ("service", service_ids), ("capacity", capacity_ids),
             ("period", period_ids)]
    empty = [label for label, ids in pools if not ids]
    if empty:
        raise click.ClickException(
            "cannot generate charges, empty table(s): {}".format(
                ", ".join(empty)))

    for _ in range(100):
        if fixed_agent_id is None:
            agent_id = random.choice(agent_ids)
        else:
            agent_id = fixed_agent_id
        project_id = random.choice(project_ids)
        service_id = random.choice(service_ids)
        capacity_id = random.choice(capacity_ids)
        period_id = random.choice(period_ids)
        percent = random.choice(range(10, 110, 10))

        # Check max agent charge for the period. The sum is None when the
        # agent has no charge yet for that period.
        total_charge = db.session.query(
            func.sum(Charge.charge_rate).label("total_charge")
        ).filter(
            Charge.agent_id == agent_id,
            Charge.period_id == period_id
        ).scalar()
        # Reaching exactly 100 is allowed (a single 100 charge already was);
        # only going above it is rejected.
        if (total_charge or 0) + percent > 100:
            print("Skipping agent {} for period {}".format(agent_id,
                                                           period_id))
            continue

        charge = Charge(agent_id=agent_id,
                        project_id=project_id,
                        service_id=service_id,
                        capacity_id=capacity_id,
                        period_id=period_id,
                        charge_rate=percent)
        print("adding {}_{}_{}_{}_{}_{}".format(agent_id, project_id,
                                                service_id, capacity_id,
                                                period_id, percent))
        db.session.add(charge)
        # Commit each charge so the next total_charge query sees it.
        db.session.commit()


@bp.cli.command('user_delete')
@click.argument('user_id')
def user_delete(user_id):
    """Delete the user by given id (see user_show_all)."""
    user = User.query.get(user_id)
    if user is None:
        # Deleting None would fail with an SQLAlchemy traceback.
        raise click.ClickException("no user with id {}".format(user_id))
    db.session.delete(user)
    db.session.commit()


@bp.cli.command('create_db')
def create_db():
    """ Create the database structure.

    Also adds a default 'admin' user unless a user with that login exists.
    """
    db.create_all()
    # Running the command again must not insert a second admin account.
    if User.query.filter_by(login='admin').first() is None:
        # NOTE(review): default credentials are hard-coded here; they
        # should be changed right after the first login.
        admin = User(email='admin@nowhere.org',
                     name='admin',
                     login='admin',
                     password='admin',
                     role='admin')
        db.session.add(admin)
    db.session.commit()


@bp.cli.command('user_add')
@click.argument('email')
@click.argument('name')
@click.argument('login')
@click.argument('password')
def user_add(email, name, login, password):
    """ Add a new user in db."""
    user = User(email=email,
                name=name,
                login=login,
                password=password)
    db.session.add(user)
    db.session.commit()
    print("added ", name)


@bp.cli.command('user_show_all')
def user_show_all():
    """ Show all users in db.

    Prints one table row per user: id, name, login, password and email.
    """
    # "!s" converts each value with str() first, so None and integers can
    # be padded like any other text.
    row = "{!s:<5} {!s:<15} {!s:<15} {!s:<15} {!s:<15}"
    print(row.format('id', 'name', 'login', 'passwd', 'email'))
    print(row.format('-' * 5, '-' * 15, '-' * 15, '-' * 15, '-' * 15))
    for user in User.query.all():
        # NOTE(review): this prints whatever is stored in User.password;
        # confirm that it is not a clear-text secret.
        print(row.format(user.id,
                         user.name,
                         user.login,
                         user.password,
                         user.email))