test_client_gemini.py
1.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#!/usr/bin/env python3
import threading
import unittest
import sys
sys.path.append('..')
from src.server.server_udp_or_tcp import get_SocketServer_UDP_TCP
from src.client.socket_client_telescope_gemini import SocketClientTelescopeGEMINI
# Address the test server binds to and the client talks to.
# Ports 9999 and 20001 were tried earlier; 11110 is the one currently in use.
HOST = "localhost"
PORT = 11110
class TestClient(unittest.TestCase):
    """Round-trip test of the GEMINI telescope client against a local UDP server.

    The server is started in a background thread for the duration of the
    test and is shut down again during cleanup, so the test run terminates
    and the port is released.
    """

    # Maximum time (seconds) to wait for the server thread to start or stop.
    _SERVER_TIMEOUT = 5

    def setUp(self):
        """Prepare per-test state shared with the server thread."""
        self.seq = range(10)
        # Server object created by the server thread; None until it is up.
        self._server = None
        # Set by the server thread once startup has succeeded or failed.
        self._server_ready = threading.Event()

    def server(self):
        """Create the UDP server on (HOST, PORT) and serve until shut down.

        Intended to be run as a thread target. Publishes the server object
        on ``self._server`` and signals ``self._server_ready`` so the test
        thread knows when it is safe to start sending requests.
        """
        try:
            with get_SocketServer_UDP_TCP(HOST, PORT, "UDP") as myserver:
                self._server = myserver
                self._server_ready.set()
                myserver.serve_forever()
        finally:
            # Also wake the waiting test thread when server creation failed,
            # so it fails fast instead of waiting for the full timeout.
            self._server_ready.set()

    def _stop_server(self, thread):
        """Ask the server loop to exit and wait for its thread to finish."""
        if self._server is not None:
            # NOTE(review): assumes the server follows the
            # socketserver.BaseServer API (it already exposes serve_forever()
            # and the context-manager protocol) -- confirm shutdown() exists.
            self._server.shutdown()
        thread.join(timeout=self._SERVER_TIMEOUT)

    def test_run(self):
        """Send each request to the server and check the expected reply."""
        # (request sent by the client, reply expected from the server)
        q_a = [
            (':toto#', 'TOTO'),
            (':GD#', '+12:28'),
            ('6', 'G'),
        ]

        # Run the server in a background thread. daemon=True is a safety net:
        # even if the shutdown below fails, the interpreter can still exit.
        t_server = threading.Thread(target=self.server, daemon=True)
        t_server.start()
        # Registered right after start() so the server is stopped whether the
        # assertions below pass or fail.
        self.addCleanup(self._stop_server, t_server)

        # Do not send anything before the server exists: with UDP a request
        # sent too early is lost and the client would wait for a reply.
        self.assertTrue(
            self._server_ready.wait(timeout=self._SERVER_TIMEOUT),
            "server thread did not start in time",
        )
        self.assertIsNotNone(self._server, "server failed to start")

        # Run the client against the server.
        with SocketClientTelescopeGEMINI(HOST, PORT) as tsock:
            # Only useful for TCP (does nothing for UDP)
            tsock._connect_to_server()

            for question, answer in q_a:
                # 1) Send the request to the server
                tsock.send_data(question)
                # 2) Receive the reply from the server
                data_received = tsock.receive_data()
                self.assertEqual(data_received, answer)

            radec = tsock.get("RA-DEC")
            print("ra-dec is", radec)
            self.assertEqual(radec, ['15:01:48', '+12:28'])
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()