"""Tests for the Majordome agent."""

# NOTE: the original import order is kept on purpose. The star import from
# common.models comes before the explicit imports below, so if it happens to
# export a name such as ``os`` or ``time`` the explicit import still wins.
from django.test import TestCase
from common.models import *
from .tasks import Majordome
import threading
from django.shortcuts import get_object_or_404
import time
import subprocess
import os
from django.conf import settings as djangosettings


class MajordomeTests(TestCase):
    """Scaffold test case for the Majordome agent.

    The database is populated from the ``tests/majordome_test`` fixture.
    The steps that actually start and stop the agent are currently disabled
    (kept as comments in ``test_all``), so the test only reads the
    configuration, changes into the agent directory, waits, and changes back.
    """

    fixtures = ['tests/majordome_test']

    def setUp(self):
        """Record the directory of this file and the venv interpreter path."""
        # Disabled: running the agent in-process.
        #   self.m = Majordome()
        #   self.m.run()
        self.path_dir_file = os.path.dirname(os.path.realpath(__file__))
        # Relative path; it is resolved against the current working directory
        # at the time a command using it is executed.
        self.venv_bin = '../../private/venv_py3_pyros/bin/python'

    def getConfigFromDB(self):
        """Return the ``Config`` row with ``id=1``.

        Uses ``get_object_or_404``, so ``Http404`` is raised when the row
        does not exist.
        """
        return get_object_or_404(Config, id=1)

    def execProcessAsync(self, command):
        """Start ``command`` through the shell without waiting for it.

        :param command: shell command line, passed as-is to the shell.
        :return: the ``subprocess.Popen`` handle of the started process.
        """
        # shell=True is required for the pipelines this helper was written
        # for (see the disabled "ps aux | grep ... | xargs kill" call in
        # test_all). Only pass trusted, hard-coded command strings.
        return subprocess.Popen(command, shell=True)

    def execProcessFromVenvAsync(self, command: str):
        """Start ``command`` without a shell and without waiting for it.

        :param command: command line; it is split on whitespace, so the
            first word is the executable and the rest are its arguments.
            Quoted arguments containing spaces are not supported.
        :return: the ``subprocess.Popen`` handle of the started process.
        """
        argv = command.split()
        return subprocess.Popen(argv)

    def test_all(self):
        """Read the configuration, then idle inside the agent directory.

        The working directory is changed to the agent directory for the
        duration of the test and is always restored afterwards, even when
        a step in between raises.
        """
        print("global mode is", self.getConfigFromDB().global_mode)
        print("global mode is", self.getConfigFromDB().global_mode)
        print("DB1 used is:", djangosettings.DATABASES['default']['NAME'])
        time.sleep(2)

        agent = "majordome"
        # Remember where we started so the directory can be restored exactly.
        starting_dir = os.getcwd()
        os.chdir(agent)
        try:
            # Disabled: launch the Majordome agent.
            #   p = self.execProcessFromVenvAsync(self.venv_bin + ' start_agent_'+agent+'_from_test.py')
            #   p = self.execProcessFromVenvAsync('./start_agent_'+agent+'.py')
            #   p.wait()
            print("hello1")
            print("hello2")
            time.sleep(10)

            # Disabled: stop the Majordome agent.
            #   self.config = self.getConfigFromDB()
            #   self.config.majordome_state = "STOP"
            #   self.config.save()
            #   p.kill()
            #   self.execProcessAsync("ps aux | grep start_agent_majordome.py | awk '{ print $2 }' | xargs kill")
            print("hello3")
            time.sleep(5)
            print("hello4")
        finally:
            # (EP) VERY IMPORTANT: the other tests expect to run from src/,
            # so the working directory must be restored even if this test
            # fails part-way through.
            os.chdir(starting_dir)
        # tearDown() is not called here: the unittest runner invokes it
        # automatically after every test method.