#!/usr/bin/env python3
"""Tests for the Gemini telescope controller against a local simulator server.

``test_run_unittests`` starts the simulator server in a background thread on
``HOST:PORT`` (UDP), then drives ``gemini.TelescopeControllerGEMINI`` against
it. ``test_run_doctests`` runs the doctests embedded in the controller module.
"""

import doctest
import sys
import threading
import unittest

# The project imports below rely on these relative path entries, so they must
# be appended before those imports are executed.
sys.path.append('..')
sys.path.append('../..')

from devices_controller.devices_controller_concrete.device_simulator_common.server_udp_or_tcp import get_SocketServer_UDP_TCP
import devices_controller.devices_controller_concrete.device_controller_Gemini.telescope_controller_gemini as gemini

# Address the simulator server binds to and the client talks to.
HOST, PORT = "localhost", 11110

# Upper bound (seconds) on waiting for the server thread to start or stop, so
# a broken server makes the test fail instead of hanging forever.
SERVER_TIMEOUT = 5.0


class TestClient(unittest.TestCase):
    """Exercise the Gemini telescope client against the simulator server."""

    def setUp(self):
        """Reset the per-test state used to track the background server."""
        # NOTE(review): ``seq`` is not read by any test in this file; it is
        # kept only so the attribute set by the original setUp still exists.
        self.seq = range(10)
        # Filled in by ``server()`` once the server object exists.
        self._server = None
        # Set by ``server()`` right before it starts serving requests.
        self._server_ready = threading.Event()

    def server(self):
        """Create the UDP simulator server and serve requests until stopped.

        Meant to run as a thread target: ``serve_forever()`` blocks until the
        server is shut down from another thread (see ``_stop_server``).
        """
        with get_SocketServer_UDP_TCP(HOST, PORT, "UDP") as myserver:
            # Publish the instance so the test thread can stop it later, and
            # signal that the server has been created.
            self._server = myserver
            self._server_ready.set()
            myserver.serve_forever()

    def _stop_server(self, thread):
        """Ask the background server to stop and wait for its thread to end.

        Args:
            thread: the ``threading.Thread`` running ``self.server``.
        """
        # Assumes the simulator follows the ``socketserver.BaseServer`` API
        # (``shutdown()`` makes ``serve_forever()`` return) -- TODO confirm.
        # The lookup is guarded so an object without that method does not
        # turn the cleanup itself into an error.
        shutdown = getattr(self._server, "shutdown", None)
        if callable(shutdown):
            shutdown()
        thread.join(timeout=SERVER_TIMEOUT)

    def test_run_doctests(self):
        """Run the controller module's doctests and fail if any of them fail."""
        results = doctest.testmod(gemini)
        self.assertEqual(
            results.failed, 0,
            f"{results.failed} of {results.attempted} doctest(s) failed",
        )

    def test_run_unittests(self):
        """Send requests to the simulator and check every reply."""
        # (question sent to the server, answer expected back)
        q_a = [
            (':toto#', 'TOTO'),
            (':GD#', '+12:28'),
            ('6', 'G'),
        ]

        # Run the server in a daemon thread: even if stopping it fails, the
        # thread can no longer keep the test process alive.
        t_server = threading.Thread(target=self.server, daemon=True)
        t_server.start()
        # Registered right away so the server is stopped on failure as well.
        self.addCleanup(self._stop_server, t_server)

        # Do not send anything before the server exists; otherwise the first
        # request may be lost and the client would wait for a reply forever.
        if not self._server_ready.wait(timeout=SERVER_TIMEOUT):
            self.fail(f"simulator server did not start within {SERVER_TIMEOUT}s")

        with gemini.TelescopeControllerGEMINI(HOST, PORT) as tele_client:
            # Only useful for TCP (does nothing for UDP)
            tele_client._connect_to_device()

            for question, answer in q_a:
                # 1) Send the request to the server
                tele_client.send_data(question)
                # 2) Receive the reply and compare it with the expected answer
                data_received = tele_client.receive_data()
                self.assertEqual(data_received, answer)

            radec = tele_client.get_radec()
            print("ra-dec is", radec)
            self.assertEqual(radec.txt, '15:01:48,+12:28')


if __name__ == '__main__':
    unittest.main()