Blame view

src/majordome/tests.py 3.31 KB
3df2d31a   haribo   #3430 : dates are...
1
from django.test import TestCase
03814884   Etienne Pallier   launch majordome ...
2
from common.models import *
d48f6550   Jeremy   Fix little bug on...
3
from .tasks import Majordome
03814884   Etienne Pallier   launch majordome ...
4
5
6
7
8
9
10
11
12
#from django.contrib.auth.models import User
import threading
#from threading import Thread
from django.shortcuts import get_object_or_404
import time
#import sys
import subprocess
import os

3df2d31a   haribo   #3430 : dates are...
13
14

# Create your tests here.
d48f6550   Jeremy   Fix little bug on...
15

03814884   Etienne Pallier   launch majordome ...
16
17
18
19
20
21

class MajordomeTests(TestCase):
    """Smoke test that launches the Majordome agent as a child process.

    The agent is started with the interpreter designated by ``venv_bin``,
    left running for a few seconds, then killed.
    """

    # Fixture name handed to the Django test runner.
    fixtures = ['tests/majordome_test']

    def setUp(self):
        """Record the paths used to launch the agent."""
        # Directory containing this test module (symlinks resolved).
        self.path_dir_file = os.path.dirname(os.path.realpath(__file__))
        # Relative path: it is resolved against the working directory in
        # effect when the child process is spawned (see test_all).
        self.venv_bin = '../../private/venv_py3_pyros/bin/python'

    def getConfigFromDB(self):
        """Return the ``Config`` row with ``id=1`` via ``get_object_or_404``."""
        return get_object_or_404(Config, id=1)

    def execProcessAsync(self, command):
        """Start *command* through the shell without waiting for it.

        Args:
            command: Shell command line as a single string. It is passed
                to the shell unmodified, so only call this with trusted,
                hard-coded commands.

        Returns:
            The ``subprocess.Popen`` handle of the started process.
        """
        p = subprocess.Popen(command, shell=True)
        return p

    def execProcessFromVenvAsync(self, command: str):
        """Start *command* directly (no shell) without waiting for it.

        Args:
            command: Command line; it is split on whitespace, the first
                token being the executable and the rest its arguments.

        Returns:
            The ``subprocess.Popen`` handle of the started process.
        """
        args = command.split()
        p = subprocess.Popen(args)
        return p

    def test_all(self):
        """Start the Majordome agent, let it run for 7 seconds, then kill it."""
        print("global mode is", self.getConfigFromDB().global_mode)
        print("global mode is", self.getConfigFromDB().global_mode)

        time.sleep(2)

        # Launch the Majordome agent from inside its own directory.
        agent = "majordome"
        previous_cwd = os.getcwd()
        os.chdir(agent)
        try:
            p = self.execProcessFromVenvAsync(
                self.venv_bin + ' start_agent_' + agent + '_from_test.py'
            )
            try:
                print("hello1")
                print("hello2")
                time.sleep(7)
            finally:
                # Kill agent Majordome even if the test body raised, and
                # reap it so no zombie process is left behind.
                p.kill()
                p.wait()
        finally:
            # Restore the working directory so the chdir above does not
            # leak out of this test.
            os.chdir(previous_cwd)

        print("hello")