cameraVISSimulator.py
3.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import os
import sys
# When True, the simulator prints debug traces to stdout; when False it stays silent.
DEBUG_FILE = False
# Relative path (from this script's directory) of the directory added to the import path.
PACKAGE_PARENT = '..'
# Absolute directory containing this script, with symlinks resolved by realpath.
SCRIPT_DIR = os.path.dirname(os.path.realpath(os.path.join(os.getcwd(), os.path.expanduser(__file__))))
# Must run before the `utils` imports below: it makes the parent directory importable.
# NOTE(review): presumably `utils` lives in that parent directory - confirm.
sys.path.append(os.path.normpath(os.path.join(SCRIPT_DIR, PACKAGE_PARENT)))
from utils.Device import Device
from utils.StatusManager import StatusManager
class CameraVISSimulator(Device, StatusManager):
    """Simulator for the "CameraVIS" device, driven by text commands on its sockets.

    Messages are space-separated: ``<TYPE> <CMD> [ARG ...]``.

    * ``GET STATUS`` is answered with ``self.create_status()``.
    * ``GET <other>`` is answered with ``"Invalid cmd for GET: <other>"``.
    * ``DO CAPTURE <folder> <nb_images> <duration>`` writes ``nb_images``
      placeholder files named ``vis_image_<i>`` into ``<folder>`` and is
      answered with ``"CAPTURING"``.
    * Anything else is logged as ignored and gets no answer.
    """

    # Class-level defaults. They are not read anywhere in this class;
    # NOTE(review): presumably consumed by Device / StatusManager - confirm.
    status = {"status": "VALID"}
    exposure_time = 5
    shutter_time = 3
    cooler_timer = 5

    def __init__(self, argv):
        """Initialise the base classes and register the device/status names.

        :param argv: argument list forwarded to the base initialiser and to
            ``setStatusManager`` (the script passes ``sys.argv``).
        """
        super().__init__(argv)
        self.setDevice("CameraVIS")
        self.setStatusManager("cameraVISSimulator", argv)

    def cameraVISPrint(self, string: str) -> None:
        """Print *string* with the simulator prefix, only when DEBUG_FILE is set."""
        if DEBUG_FILE:
            print("cameraVISSimulator : " + string)

    def _doCapture(self, args) -> str:
        """Simulate a capture by writing placeholder image files.

        :param args: command arguments ``[folder, nb_images, duration, ...]``;
            anything after the third element is ignored.
        :return: ``"CAPTURING"`` on success, or an ``"Invalid args for
            CAPTURE: ..."`` message when the folder is missing/empty or
            ``nb_images`` is not an integer.
        """
        nb_images = None
        if len(args) >= 3 and args[0]:
            try:
                nb_images = int(args[1])
            except ValueError:
                pass
        if nb_images is None:
            # A malformed request must not crash the simulator loop: report
            # it to the peer, as is done for an invalid GET command.
            answer = "Invalid args for CAPTURE: " + " ".join(args)
            self.cameraVISPrint(answer)
            return answer

        folder_name, duration = args[0], args[2]
        content = folder_name + " " + str(nb_images) + " " + str(duration)
        for index in range(nb_images):
            image_path = os.path.join(folder_name, "vis_image_" + str(index))
            try:
                with open(image_path, "w") as image_file:
                    image_file.write(content)
            except (OSError, ValueError) as err:
                # Best effort: a file that cannot be written is only logged
                # and the remaining images are still attempted.
                self.cameraVISPrint(str(err))
        return "CAPTURING"

    def _processMessage(self, data: str) -> str:
        """Decode one raw message and return the answer to send back.

        :param data: message text as read from the socket.
        :return: the answer, or ``""`` when nothing must be sent.
        """
        cut_data = data.split(" ")
        if len(cut_data) < 2:
            self.cameraVISPrint("Received invalid message on socket : " + data)
            return ""

        cmd_type, cmd, args = cut_data[0], cut_data[1], cut_data[2:]
        if cmd_type == "GET":
            if cmd == "STATUS":
                return self.create_status()
            answer = "Invalid cmd for GET: " + cmd
            self.cameraVISPrint(answer)
            return answer
        if cmd_type == "DO" and cmd == "CAPTURE":
            return self._doCapture(args)

        # Unknown command type, or unknown DO command: log it, send nothing.
        self.cameraVISPrint("Ignored message " + data)
        return ""

    def handleConnection(self):
        """Read one message from every connection and send back its answer.

        :return: always ``0``.
        """
        for co in self.connections:
            data = self.readBytes(128, co)
            answer = self._processMessage(data)
            if len(answer) > 0:
                self.sendMessage(answer, co)
        return 0

    def loop(self):
        """Run the simulation ticks until the tick counter exceeds ``self.ended``.

        Each tick calls ``updateStatus`` with the tick number and, when
        ``isConnected()`` is true, serves the pending connections.
        NOTE(review): ``self.ended`` and ``self.config_file`` are presumably
        set by ``parse()`` - confirm.

        :return: always ``0``.
        """
        if self.ended == 0:
            self.cameraVISPrint("No entry for cameraVIS found in config file : " + self.config_file)
            return 0

        tick = 0
        while True:
            self.updateStatus(tick)
            if self.isConnected():
                self.handleConnection()
            tick += 1
            if tick > self.ended:
                return 0

    def run(self):
        """Parse the configuration, set up the socket, then run the main loop.

        :return: always ``0``.
        """
        if DEBUG_FILE:
            print("cameraVIS simulator running")
        self.parse()
        self.configSocket()
        self.loop()
        return 0
if __name__ == "__main__":
    # Script entry point: build the simulator from the command line and run it.
    sim = CameraVISSimulator(sys.argv)
    sim.run()
# from Camera import Camera
#
# class CameraVIS(Camera):
# def __init__(self):
# super().__init__("CameraVIS")
#
# cam = CameraVIS()
# cam.loop()