# test_client_gemini.py (2.27 KB)
#!/usr/bin/env python3
# Tests for the GEMINI telescope controller client: the module's doctests plus
# a request/reply exchange against a UDP server started on HOST:PORT.
import threading
import unittest
import sys
# Extend the import search path with the two parent directories (relative to
# the current working directory) before the project imports below.
# NOTE(review): presumably needed so `devices_channel` resolves when the tests
# are launched from this file's own directory -- confirm.
sys.path.append('..')
sys.path.append('../..')
from devices_channel.server.server_udp_or_tcp import get_SocketServer_UDP_TCP
#from src_socket.client.socket_client_telescope_gemini import SocketClientTelescopeGEMINI
import devices_channel.client.telescope_controller_gemini as gemini
# Host/port shared by the test server and the client under test.
# Previously tried values are kept below for reference.
#HOST, PORT = "localhost", 9999
#HOST, PORT = "localhost", 20001
HOST, PORT = "localhost", 11110
class TestClient(unittest.TestCase):
    """Tests for the GEMINI telescope controller client.

    ``test_run_doctests`` runs the doctests embedded in the ``gemini`` module.
    ``test_run_unittests`` starts a UDP server on ``(HOST, PORT)`` in a
    background thread, sends a few requests through
    ``gemini.TelescopeControllerGEMINI`` and checks the replies.
    """

    def setUp(self):
        """Create the per-test fixture."""
        self.seq = range(10)

    def server(self):
        """Create the UDP test server on ``(HOST, PORT)`` and serve requests.

        This call stays inside ``serve_forever()`` and therefore blocks the
        calling thread; it is kept as a standalone helper for running the
        server by hand.
        """
        with get_SocketServer_UDP_TCP(HOST, PORT, "UDP") as myserver:
            myserver.serve_forever()

    def test_run_doctests(self):
        """Run the ``gemini`` module doctests and fail if any of them fails."""
        import doctest
        results = doctest.testmod(gemini)
        # testmod() only *reports* failures; without this assertion the test
        # would pass even when doctests are broken.
        self.assertEqual(
            results.failed, 0,
            "%d of %d doctest(s) failed" % (results.failed, results.attempted),
        )

    def test_run_unittests(self):
        """Exchange request/reply pairs with the test server and check them."""
        # (request sent by the client, reply expected from the server)
        q_a = [
            (':toto#', 'TOTO'),
            (':GD#', '+12:28'),
            ('6', 'G'),
        ]

        # Build the server in this thread so it exists before the client
        # sends anything; only the serving loop runs in the background.
        with get_SocketServer_UDP_TCP(HOST, PORT, "UDP") as myserver:
            # daemon=True: a stuck server thread must never keep the test
            # process alive.
            t_server = threading.Thread(
                target=myserver.serve_forever, daemon=True)
            t_server.start()
            try:
                with gemini.TelescopeControllerGEMINI(HOST, PORT) as tele_client:
                    # Only useful for TCP (does nothing for UDP)
                    tele_client._connect_to_device()

                    for question, answer in q_a:
                        # 1) SEND REQUEST data to server
                        tele_client.send_data(question)
                        # 2) RECEIVE REPLY data from server
                        data_received = tele_client.receive_data()
                        self.assertEqual(
                            data_received, answer,
                            "unexpected reply to %r" % (question,))

                    radec = tele_client.get_radec()
                    print("ra-dec is", radec)
                    self.assertEqual(radec.txt, '15:01:48,+12:28')
            finally:
                # Stop the serving loop so the test run can terminate.
                # NOTE(review): assumes the object returned by
                # get_SocketServer_UDP_TCP follows the socketserver.BaseServer
                # API (serve_forever/shutdown) -- confirm.
                myserver.shutdown()
                t_server.join(timeout=5)
# Script entry point: run every test in this module with the unittest runner.
if __name__ == "__main__":
    unittest.main()