from django.test import TestCase

# NOTE: the import order below is the original one. The wildcard import comes
# first, so every explicit import after it takes precedence over any
# same-named symbol that common.models may export.
from common.models import *
from .tasks import Majordome

# from django.contrib.auth.models import User
import threading

# from threading import Thread
from django.shortcuts import get_object_or_404
import time

# import sys
import subprocess
import os
from django.conf import settings as djangosettings

# Create your tests here.


class MajordomeTests(TestCase):
    """Smoke test around the Majordome agent.

    The code that actually launches and kills the agent is currently
    disabled (kept below as comments), so the test only reads the
    configuration row loaded from the fixture, prints a few values and
    sleeps while the working directory is switched to the agent directory.
    """

    fixtures = ["tests/majordome_test"]

    def setUp(self):
        """Compute the paths used to launch the agent from a virtualenv."""
        # Disabled: run the Majordome directly in-process.
        # self.m = Majordome()
        # self.m.run()

        # Config.objects.create()

        # Directory that contains this test module.
        self.path_dir_file = os.path.dirname(os.path.realpath(__file__))
        # self.venv_bin = self.path_dir_file + os.sep + config["path"] + os.sep + config["env"] + os.sep + self.bin_dir + os.sep + self.bin_name
        # Relative path to the Python interpreter of the project virtualenv.
        self.venv_bin = "../../private/venv_py3_pyros/bin/python"

    def getConfigFromDB(self):
        """Return the ``Config`` row with ``id=1``.

        ``Config`` is not imported explicitly in this file; presumably it
        comes from the ``common.models`` wildcard import (to be confirmed).

        Raises:
            django.http.Http404: if no such row exists.
        """
        return get_object_or_404(Config, id=1)

    def execProcessAsync(self, command: str) -> subprocess.Popen:
        """Start ``command`` through the shell without waiting for it.

        Args:
            command: full shell command line; it is interpreted by the
                shell, so pipes and redirections are honoured.

        Returns:
            The ``subprocess.Popen`` handle of the started process.
        """
        # self.printFullTerm(Colors.BLUE, "Executing command [" + command + "]")
        process = subprocess.Popen(command, shell=True)
        # self.subproc.append((p, command))
        # self.printFullTerm(Colors.GREEN, "Process launched successfully")
        # self.addExecuted(self.current_command, command)
        return process

    def execProcessFromVenvAsync(self, command: str) -> subprocess.Popen:
        """Start ``command`` directly (no shell) without waiting for it.

        Args:
            command: command line that is split on whitespace; the first
                token is the executable, the remaining ones its arguments.

        Returns:
            The ``subprocess.Popen`` handle of the started process.
        """
        args = command.split()
        # self.printFullTerm(Colors.BLUE, "Executing command from venv [" + str(' '.join(args[1:])) + "]")
        process = subprocess.Popen(args)
        # self.subproc.append((p, ' '.join(args[1:])))
        # self.printFullTerm(Colors.GREEN, "Process launched successfully")
        # self.addExecuted(self.current_command, str(' '.join(args[1:])))
        return process

    def test_all(self):
        """Print the configuration state, then idle in the agent directory.

        The working directory is always restored before the test returns,
        even when the body fails, because the other tests expect to run
        from src/.
        """
        # Disabled: dump of the configuration row.
        # self.config = get_object_or_404(Config, id=1)
        # print("config id is", self.config.id)
        # print("config latitude is", self.config.latitude)
        # print("config global mode is", self.config.global_mode)
        # print("config row_data_save_frequency is", self.config.row_data_save_frequency)

        # print_lock = threading.Lock()

        # Majordome().run()

        # Disabled: run the Majordome in a thread of this process.
        # time.sleep(2)
        # thread_majordome = Majordome()
        # thread_majordome.start()
        # # thread_majordome.run()

        # with print_lock: print("global mode is", self.getConfigFromDB().global_mode)
        # sys.stdout.write("global mode")
        # sys.stdout.flush()
        print("global mode is", self.getConfigFromDB().global_mode)
        print("DB1 used is:", djangosettings.DATABASES["default"]["NAME"])
        time.sleep(2)

        # Launch the Majordome agent (the launch itself is currently disabled).
        agent = "majordome"
        original_cwd = os.getcwd()
        os.chdir(agent)
        try:
            # print("Current directory : " + str(os.getcwd()))
            ######p = self.execProcessFromVenvAsync(self.venv_bin + ' start_agent_'+agent+'_from_test.py')
            # p = self.execProcessFromVenvAsync('./start_agent_'+agent+'.py')
            # p.wait()
            print("hello1")
            print("hello2")
            time.sleep(10)

            # Disabled: kill agent Majordome.
            # self.config = self.getConfigFromDB()
            # self.config.majordome_state = "STOP"
            # self.config.save()
            # p.kill()

            # self.execProcessAsync("ps aux | grep \"start_agent_majordome.py\" | awk '{ print $2 }' | xargs kill")
            # self.execProcessAsync("ps aux | grep start_agent_majordome.py | awk '{ print $2 }' | xargs kill")

            print("hello3")
            time.sleep(5)
            print("hello4")
        finally:
            # (EP) !!! VERY IMPORTANT, because otherwise the other tests won't
            # pass as they are expecting to run from src/ !!!
            os.chdir(original_cwd)

        # No explicit self.tearDown() here: the test framework already calls
        # it after every test method, so calling it again would run it twice.

        # thread_majordome.join()
        # thread_majordome.killExecutingSequence()