tests.py
3.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
from django.test import TestCase
from common.models import *
from .tasks import Majordome
#from django.contrib.auth.models import User
import threading
#from threading import Thread
from django.shortcuts import get_object_or_404
import time
#import sys
import subprocess
import os
from django.conf import settings as djangosettings
# Create your tests here.
class MajordomeTests(TestCase):
    """Smoke test that launches the Majordome agent as a separate process.

    The test loads the ``tests/majordome_test`` fixture, starts the agent
    start-up script with the project's virtualenv interpreter, lets it run
    for a few seconds, writes ``majordome_state = "STOP"`` to the Config row
    and finally terminates the process.
    """

    fixtures = ['tests/majordome_test']

    def setUp(self):
        """Record this module's directory and the interpreter used for the agent."""
        self.path_dir_file = os.path.dirname(os.path.realpath(__file__))
        # Relative path: it is resolved against the working directory in
        # effect when the process is spawned (test_all changes into the
        # agent directory before launching).
        self.venv_bin = '../../private/venv_py3_pyros/bin/python'

    def getConfigFromDB(self):
        """Return the Config row with id=1.

        Raises:
            Http404: if no such row exists (behaviour of get_object_or_404).
        """
        return get_object_or_404(Config, id=1)

    def execProcessAsync(self, command):
        """Start ``command`` through the shell without waiting for it.

        Args:
            command: full command line, interpreted by the shell. Only pass
                trusted, test-authored strings since ``shell=True`` is used.

        Returns:
            The ``subprocess.Popen`` handle of the started process.
        """
        return subprocess.Popen(command, shell=True)

    def execProcessFromVenvAsync(self, command: str):
        """Start ``command`` directly (no shell) without waiting for it.

        Args:
            command: program followed by its arguments, separated by
                whitespace. The string is split on whitespace, so individual
                arguments cannot themselves contain spaces.

        Returns:
            The ``subprocess.Popen`` handle of the started process.
        """
        args = command.split()
        return subprocess.Popen(args)

    def test_all(self):
        """Start the Majordome agent, let it run, request STOP, then kill it."""
        print("global mode is", self.getConfigFromDB().global_mode)
        print("global mode is", self.getConfigFromDB().global_mode)
        print("DB1 used is:", djangosettings.DATABASES['default']['NAME'])
        time.sleep(2)

        # Launch the Majordome agent from inside its own directory, because
        # both the interpreter path and the script name are relative to it.
        agent = "majordome"
        previous_cwd = os.getcwd()
        os.chdir(agent)
        p = None
        try:
            p = self.execProcessFromVenvAsync(
                self.venv_bin + ' start_agent_' + agent + '_from_test.py')
            print("hello1")
            print("hello2")
            time.sleep(10)

            # Ask the agent to stop through the shared configuration row.
            # NOTE(review): django.test.TestCase runs each test inside a
            # transaction; confirm the agent process can actually observe
            # this uncommitted change.
            self.config = self.getConfigFromDB()
            self.config.majordome_state = "STOP"
            self.config.save()
        finally:
            # Always terminate and reap the agent, even if the test body
            # failed, so no orphan or zombie process is left behind.
            if p is not None:
                p.kill()
                p.wait()
            # Restore the working directory so later tests are unaffected.
            os.chdir(previous_cwd)

        print("hello3")
        time.sleep(5)
        print("hello4")
        # tearDown() is invoked automatically by the test runner; calling it
        # here as well would run the teardown logic twice.