tests.py 3.67 KB
from django.test import TestCase
from common.models import *
from .tasks import Majordome
#from django.contrib.auth.models import User
import threading
#from threading import Thread
from django.shortcuts import get_object_or_404
import time
#import sys
import subprocess
import os
from django.conf import settings as djangosettings


# Create your tests here.


class MajordomeTests(TestCase):

    fixtures = ['tests/majordome_test']



    def setUp(self):
        '''
        self.m = Majordome()
        self.m.run()
        '''
        #Config.objects.create()
        self.path_dir_file = os.path.dirname(os.path.realpath(__file__))
        #self.venv_bin = self.path_dir_file + os.sep + config["path"] + os.sep + config["env"] + os.sep + self.bin_dir + os.sep + self.bin_name
        self.venv_bin = '../../private/venv_py3_pyros/bin/python'
        

    def getConfigFromDB(self):
        return get_object_or_404(Config, id=1)

    def execProcessAsync(self, command):
        #self.printFullTerm(Colors.BLUE, "Executing command [" + command + "]")
        p = subprocess.Popen(command, shell=True)
        '''
        self.subproc.append((p, command))
        self.printFullTerm(Colors.GREEN, "Process launched successfully")
        self.addExecuted(self.current_command, command)
        '''
        return p

    def execProcessFromVenvAsync(self, command: str):
        args = command.split()
        #self.printFullTerm(Colors.BLUE, "Executing command from venv [" + str(' '.join(args[1:])) + "]")
        p = subprocess.Popen(args)
        #self.subproc.append((p, ' '.join(args[1:])))
        #self.printFullTerm(Colors.GREEN, "Process launched successfully")
        #self.addExecuted(self.current_command, str(' '.join(args[1:])))
        return p



    def test_all(self):
        '''
        self.config = get_object_or_404(Config, id=1)
        print("config id is", self.config.id)
        print("config latitude is", self.config.latitude)
        print("config global mode is", self.config.global_mode)
        print("config row_data_save_frequency is", self.config.row_data_save_frequency)
        '''
        
        #print_lock = threading.Lock()


        #Majordome().run()
        '''
        time.sleep(2)
        thread_majordome = Majordome()
        thread_majordome.start()
        #thread_majordome.run()
        '''
        
        #with print_lock:
        print("global mode is", self.getConfigFromDB().global_mode)
        #sys.stdout.write("global mode")
        #sys.stdout.flush()
        print("global mode is", self.getConfigFromDB().global_mode)
        print("DB1 used is:", djangosettings.DATABASES['default']['NAME'])

        time.sleep(2)
        #'''

        # Launch the Majordome agent
        #'''
        agent="majordome"
        os.chdir(agent)
        #print("Current directory : " + str(os.getcwd()))
        ######p = self.execProcessFromVenvAsync(self.venv_bin + ' start_agent_'+agent+'_from_test.py')
        #p = self.execProcessFromVenvAsync('./start_agent_'+agent+'.py')
        #p.wait()
        print("hello1")
        print("hello2")
        time.sleep(10)

        '''
        # Kill agent Majordome
        self.config = self.getConfigFromDB()
        self.config.majordome_state = "STOP"
        self.config.save()
        p.kill()
        '''
        
        #self.execProcessAsync("ps aux | grep \"start_agent_majordome.py\" | awk '{ print $2 }' | xargs kill")
        #self.execProcessAsync("ps aux | grep start_agent_majordome.py | awk '{ print $2 }' | xargs kill")
        #'''
        
        print("hello3")
        time.sleep(5)
        print("hello4")
        
        self.tearDown()
        #thread_majordome.join()
        #thread_majordome.killExecutingSequence()