"""Flask CLI commands registered on the blueprint ``bp``.

Commands defined here: ``feed_from_lesia``, ``feed_periods``,
``random_charges``, ``delete_user``, ``create_db``, ``add_user`` and
``show_all``.
"""
import random
import sys

import click
from sqlalchemy import create_engine
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session
from sqlalchemy.sql import func

from app.models import db, User, Agent, Service, Project, Capacity, Period, Charge
from db_config import mysql_uri
from . import bp

# Shared row layout for ``show_all``.  ``!s`` converts every value with
# ``str()`` first so that ``None`` columns do not raise ``TypeError``.
_USER_ROW = "{!s:<5} {!s:<15} {!s:<15} {!s:<15} {!s:<15}"


def _all_ids(model):
    """Return the list of ``id`` values of every row of ``model``."""
    return [row_id for (row_id,) in db.session.query(model.id).all()]


@bp.cli.command("feed_from_lesia")
def feed_from_lesia():
    """Copy agents, services, projects and functions from the MySQL source.

    The source schema is reflected from the database at ``mysql_uri``.  The
    rows of its ``agent``, ``gestit_services``, ``gestit_projets`` and
    ``gestit_fonctions`` tables become ``Agent``, ``Service``, ``Project``
    and ``Capacity`` rows respectively, committed once per table.
    """
    Base = automap_base()
    engine = create_engine(mysql_uri)

    # Reflect the tables; mapped classes are named after the table names.
    # NOTE(review): ``reflect=True`` is the legacy spelling (newer SQLAlchemy
    # prefers ``autoload_with=engine``) -- confirm against the pinned version.
    Base.prepare(engine, reflect=True)

    LesiaAgent = Base.classes.agent
    LesiaService = Base.classes.gestit_services
    LesiaProject = Base.classes.gestit_projets
    LesiaFonction = Base.classes.gestit_fonctions

    lesia_session = Session(engine)
    try:
        # NOTE(review): ``nom`` is mapped to ``firstname`` and ``prenom`` to
        # ``secondname``; in French ``prenom`` is the given name, so this may
        # be swapped -- confirm against the Agent model before changing.
        db.session.add_all([
            Agent(firstname=a.nom, secondname=a.prenom)
            for a in lesia_session.query(LesiaAgent).all()
        ])
        db.session.commit()

        db.session.add_all([
            Service(name=s.nom)
            for s in lesia_session.query(LesiaService).all()
        ])
        db.session.commit()

        db.session.add_all([
            Project(name=p.nom)
            for p in lesia_session.query(LesiaProject).all()
        ])
        db.session.commit()

        db.session.add_all([
            Capacity(name=f.nom)
            for f in lesia_session.query(LesiaFonction).all()
        ])
        db.session.commit()
    finally:
        # Always release the source connection, even if an import step fails.
        lesia_session.close()
        engine.dispose()


@bp.cli.command("feed_periods")
def feed_periods():
    """Create the periods ``2014_S1`` through ``2022_S2`` (two per year)."""
    for year in range(2014, 2023):
        for semester in ['S1', 'S2']:
            period_name = "{}_{}".format(year, semester)
            db.session.add(Period(name=period_name))
    db.session.commit()


@bp.cli.command("random_charges")
@click.option('--agent', '-a', 'agent', default=None, type=int)
@click.option('--count', '-n', 'count', default=100, type=int)
def random_charges(agent, count):
    """Insert randomly generated charges.

    Makes ``count`` attempts (default 100).  Each attempt picks a random
    project, service, capacity, period and a rate in 10..100 (step 10); the
    agent is ``--agent`` when given, otherwise random.  An attempt is skipped
    when it would push the agent's total rate for that period above 100.

    Raises ``click.ClickException`` when a table needed for the random
    choice is empty.
    """
    # The id lists do not change while charges are inserted, so load them
    # once instead of re-querying on every iteration.
    agent_ids = _all_ids(Agent) if agent is None else [agent]
    project_ids = _all_ids(Project)
    service_ids = _all_ids(Service)
    capacity_ids = _all_ids(Capacity)
    period_ids = _all_ids(Period)

    pools = [
        ("agent", agent_ids),
        ("project", project_ids),
        ("service", service_ids),
        ("capacity", capacity_ids),
        ("period", period_ids),
    ]
    for label, ids in pools:
        if not ids:
            raise click.ClickException(
                "cannot generate charges: no {} found".format(label))

    percent_choices = range(10, 110, 10)

    for _ in range(count):
        agent_id = agent if agent is not None else random.choice(agent_ids)
        project_id = random.choice(project_ids)
        service_id = random.choice(service_ids)
        capacity_id = random.choice(capacity_ids)
        period_id = random.choice(period_ids)
        percent = random.choice(percent_choices)

        # Check the agent's accumulated charge for the period.
        total_charge = (
            db.session.query(func.sum(Charge.charge_rate).label("total_charge"))
            .filter(Charge.agent_id == agent_id,
                    Charge.period_id == period_id)
            .scalar()
        )
        # Strictly greater than: a total of exactly 100 is allowed, which
        # matches a first charge of 100 being accepted when none exist yet.
        if total_charge is not None and (total_charge + percent) > 100:
            print("Skipping agent {} for period {}".format(agent_id, period_id))
            continue

        charge = Charge(agent_id=agent_id,
                        project_id=project_id,
                        service_id=service_id,
                        capacity_id=capacity_id,
                        period_id=period_id,
                        charge_rate=percent)
        print("adding {}_{}_{}_{}_{}_{}".format(agent_id, project_id, service_id,
                                                capacity_id, period_id, percent))
        db.session.add(charge)
        # Commit per charge so the next total query sees it.
        db.session.commit()


@bp.cli.command('delete_user')
@click.argument('user_id')
def delete_user(user_id):
    """Delete the user whose primary key is USER_ID.

    Raises ``click.ClickException`` when no such user exists.
    """
    user = User.query.get(user_id)
    if user is None:
        # Without this guard the session would be asked to delete ``None``.
        raise click.ClickException("no user with id {}".format(user_id))
    db.session.delete(user)
    db.session.commit()


@bp.cli.command('create_db')
def create_db():
    """Create all tables and a default ``admin`` user if none exists yet."""
    db.create_all()

    # Keep the command re-runnable: do not insert a second admin account.
    if User.query.filter_by(login='admin').first() is not None:
        print("admin user already exists, skipping")
        return

    # NOTE(review): hard-coded default credentials -- change the password
    # after the first login or supply it from configuration.
    admin = User(email='admin@nowhere.org', name='admin', login='admin', password='admin')
    db.session.add(admin)
    db.session.commit()


@bp.cli.command('add_user')
@click.argument('email')
@click.argument('name')
@click.argument('login')
@click.argument('password')
def add_user(email, name, login, password):
    """Create a user from EMAIL, NAME, LOGIN and PASSWORD."""
    user = User(email=email, name=name, login=login, password=password)
    db.session.add(user)
    db.session.commit()
    print("added ", name)


@bp.cli.command('show_all')
def show_all():
    """Print a table of all users: id, name, login, password and email."""
    # NOTE(review): this prints the stored ``password`` value of every user
    # to the terminal -- confirm that this is intended.
    print(_USER_ROW.format('id', 'name', 'login', 'passwd', 'email'))
    print(_USER_ROW.format('-' * 5, '-' * 15, '-' * 15, '-' * 15, '-' * 15))
    for user in User.query.all():
        print(_USER_ROW.format(
            user.id, user.name, user.login, user.password, user.email
        ))