Blame view

src/device_controller/test/test_client_gemini.py 2.63 KB
ac0e5c70   Etienne Pallier   nouveau devices_c...
1
2
3
4
5
6
7
8
9
#!/usr/bin/env python3

import threading

import unittest

import sys
sys.path.append('..')
sys.path.append('../..')
a4c26d27   Etienne Pallier   bugfix test clien...
10
##from device_controller.concrete_component.device_simulator_common.server_udp_or_tcp import get_SocketServer_UDP_TCP
9ee212a8   Etienne Pallier   Renommage des fic...
11
#from src_socket.client.socket_client_telescope_gemini import SocketClientTelescopeGemini
57621753   Etienne Pallier   NOUVELLE ARCHI un...
12
import device_controller.concrete_component.gemini.gemini_controller as gemini 
ac0e5c70   Etienne Pallier   nouveau devices_c...
13
14
15
16
17
18
19
20
21
22
23

# Host and port handed to gemini.DC_Gemini by the client test below.
# The commented-out assignments are ports that were used previously.
#HOST, PORT = "localhost", 9999
#HOST, PORT = "localhost", 20001
HOST, PORT = "localhost", 11110

class TestClient(unittest.TestCase):
    """Tests for the Gemini device controller, seen from the client side.

    ``test_run_doctests`` runs the doctests embedded in the ``gemini`` module.
    ``test_run_unittests`` opens a ``gemini.DC_Gemini`` on ``(HOST, PORT)``,
    exchanges a few raw commands with it and checks the replies.

    NOTE(review): nothing in this class starts a server, so
    ``test_run_unittests`` presumably needs a device or simulator already
    answering on ``(HOST, PORT)`` -- confirm against the test runner setup.
    """

    def setUp(self):
        """Create the per-test fixture."""
        # NOTE(review): ``seq`` is not read by any test in this class; it is
        # kept in case other code relies on the attribute.
        self.seq = range(10)

    # TODO: run the server from the test itself, in a background thread
    # started before the client connects, and find a way to stop that thread
    # once the test is over.
    # cf https://www.oreilly.com/library/view/python-cookbook-2nd/0596007973/ch09s03.html
    # Disabled sketch (needs get_SocketServer_UDP_TCP to be imported again):
    #
    #     def server(self):
    #         with get_SocketServer_UDP_TCP(HOST, PORT, "UDP") as myserver:
    #             myserver.serve_forever()
    #
    #     t_server = threading.Thread(target=self.server)
    #     t_server.start()

    def test_run_doctests(self):
        """Run the doctests of the ``gemini`` module and fail if any fails."""
        import doctest

        # testmod() only *reports* failures (it does not raise), so its
        # result has to be checked explicitly for this test to mean anything.
        results = doctest.testmod(gemini)
        self.assertEqual(
            results.failed, 0,
            "{} of {} doctest(s) failed in {}".format(
                results.failed, results.attempted, gemini.__name__))

    def test_run_unittests(self):
        """Send raw commands through ``DC_Gemini`` and check each reply.

        After the command/reply exchange, also checks the text form of the
        coordinates returned by the mount component's ``get_radec()``.
        """
        # (command sent, reply expected) pairs, exchanged in this order.
        q_a = [
            (':toto#', 'NO ANSWER IMPLEMENTED FOR COMMAND :toto#'),
            (':GD#', '+12:29'),
            # Command SIX (6)
            ('6', 'G'),
        ]

        # RUN CLIENT to connect to server
        with gemini.DC_Gemini(HOST, PORT) as tele_client:
            gemini_mount = tele_client.get_dc_component_for_type("DC_Mount")

            # Only useful for TCP (does nothing for UDP)
            tele_client._connect_to_device()

            for question, answer in q_a:
                # 1) SEND REQUEST data to server
                tele_client.send_data(question)

                # 2) RECEIVE REPLY data from server
                data_received = tele_client.receive_data()
                self.assertEqual(
                    data_received, answer,
                    "unexpected reply to command {!r}".format(question))

            radec = gemini_mount.get_radec()
            print("ra-dec is", radec)
            self.assertEqual(radec.txt, '15:01:49,+12:29')


# Run the test cases of this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()