tests.py
3.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
from django.test import TestCase
from common.models import *
from .tasks import Majordome
# from django.contrib.auth.models import User
import threading
# from threading import Thread
from django.shortcuts import get_object_or_404
import time
# import sys
import subprocess
import os
from django.conf import settings as djangosettings
# Create your tests here.
class MajordomeTests(TestCase):
fixtures = ["tests/majordome_test"]
def setUp(self):
"""
self.m = Majordome()
self.m.run()
"""
# Config.objects.create()
self.path_dir_file = os.path.dirname(os.path.realpath(__file__))
# self.venv_bin = self.path_dir_file + os.sep + config["path"] + os.sep + config["env"] + os.sep + self.bin_dir + os.sep + self.bin_name
self.venv_bin = "../../private/venv_py3_pyros/bin/python"
def getConfigFromDB(self):
return get_object_or_404(Config, id=1)
def execProcessAsync(self, command):
# self.printFullTerm(Colors.BLUE, "Executing command [" + command + "]")
p = subprocess.Popen(command, shell=True)
"""
self.subproc.append((p, command))
self.printFullTerm(Colors.GREEN, "Process launched successfully")
self.addExecuted(self.current_command, command)
"""
return p
def execProcessFromVenvAsync(self, command: str):
args = command.split()
# self.printFullTerm(Colors.BLUE, "Executing command from venv [" + str(' '.join(args[1:])) + "]")
p = subprocess.Popen(args)
# self.subproc.append((p, ' '.join(args[1:])))
# self.printFullTerm(Colors.GREEN, "Process launched successfully")
# self.addExecuted(self.current_command, str(' '.join(args[1:])))
return p
def test_all(self):
"""
self.config = get_object_or_404(Config, id=1)
print("config id is", self.config.id)
print("config latitude is", self.config.latitude)
print("config global mode is", self.config.global_mode)
print("config row_data_save_frequency is", self.config.row_data_save_frequency)
"""
# print_lock = threading.Lock()
# Majordome().run()
"""
time.sleep(2)
thread_majordome = Majordome()
thread_majordome.start()
#thread_majordome.run()
"""
# with print_lock:
print("global mode is", self.getConfigFromDB().global_mode)
# sys.stdout.write("global mode")
# sys.stdout.flush()
print("global mode is", self.getConfigFromDB().global_mode)
print("DB1 used is:", djangosettings.DATABASES["default"]["NAME"])
time.sleep(2)
#'''
# Launch the Majordome agent
#'''
agent = "majordome"
os.chdir(agent)
# print("Current directory : " + str(os.getcwd()))
######p = self.execProcessFromVenvAsync(self.venv_bin + ' start_agent_'+agent+'_from_test.py')
# p = self.execProcessFromVenvAsync('./start_agent_'+agent+'.py')
# p.wait()
print("hello1")
print("hello2")
time.sleep(10)
"""
# Kill agent Majordome
self.config = self.getConfigFromDB()
self.config.majordome_state = "STOP"
self.config.save()
p.kill()
"""
# self.execProcessAsync("ps aux | grep \"start_agent_majordome.py\" | awk '{ print $2 }' | xargs kill")
# self.execProcessAsync("ps aux | grep start_agent_majordome.py | awk '{ print $2 }' | xargs kill")
#'''
print("hello3")
time.sleep(5)
print("hello4")
# (EP) !!! VERY IMPORTANT, because otherwise the other tests won't pass as they are expecting to run from src/ !!!
os.chdir("..")
self.tearDown()
# thread_majordome.join()
# thread_majordome.killExecutingSequence()