Interval.py
6.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
from common.models import *
from seq_submit.models import Sequence, ScheduleHasSequences
# from decimal import Decimal
from utils.Logger import Logger
from utils.JDManipulator import *
from utils.highPrecision import *
"""
Simple class that represents an interval of time
Julian days should be used
"""
class Interval:
def __init__(self, start, end):
self._start = Decimal(start)
self._end = Decimal(end)
self.duration = self._end - self._start
def __str__(self):
print("[" + str(self.start) + " - " + str(self.end) + "]")
def _get_start(self):
return self._start
def _set_start(self, start):
if start > self._end:
raise ValueError(
"Cannot set start (%d): must be lower than end (%d)" % (start, self._end))
self._start = start
self.duration = self._end - self._start
def _get_end(self):
return self._end
def _set_end(self, end):
if end < self._start:
raise ValueError(
"Cannot set end (%d): must be bigger than start (%d)" % (end, self._start))
self._end = end
self.duration = self._end - self._start
start = property(fget=_get_start, fset=_set_start)
end = property(fget=_get_end, fset=_set_end)
'''
Class that allows to manage intervals
Only the Scheduler inherit from IntervalManagement
'''
class IntervalManagement(Logger):
def __init__(self, name: str = "IntervalManagement", file: str = "IntervalManagement"):
super().__init__(name, file)
self.max_overhead_seconds = 10
self.max_overhead = Decimal(self.max_overhead_seconds / JD_VALUE)
self.intervals = []
def getMatchingIntervals(self, sequence: Sequence):
matching_intervals = []
for interval in self.intervals:
overlap = min(sequence.jd2, interval.end) - max(sequence.jd1, interval.start) - self.max_overhead
if overlap > sequence.duration or is_nearby_equal(overlap, sequence.duration):
matching_intervals.append(interval)
return matching_intervals
def getPreferredInterval(self, sequence: Sequence, matching_intervals) -> Interval:
if sequence.t_prefered == 0 or len(matching_intervals) == 1:
return matching_intervals[0]
else:
for index, interval in enumerate(matching_intervals):
if is_between(interval.start, sequence.t_prefered, interval.end):
return interval
elif sequence.t_prefered < interval.start:
if index == 0:
pref_int = interval
elif interval.start + self.max_overhead - sequence.t_prefered < sequence.t_prefered - matching_intervals[index - 1].end:
pref_int = interval
else:
pref_int = matching_intervals[index - 1]
return pref_int
return matching_intervals[-1]
def getSequencePositionInInterval(self, sequence: Sequence, interval: Interval) -> str:
if is_between(interval.start, sequence.t_prefered, interval.end):
if is_nearby_less_or_equal(sequence.t_prefered - Decimal(0.5) * sequence.duration, interval.start):
position_in_interval = "START"
elif is_nearby_sup_or_equal(sequence.t_prefered + Decimal(0.5) * sequence.duration, interval.end):
position_in_interval = "END"
else:
position_in_interval = "PREFERRED"
else:
if sequence.t_prefered < interval.start:
position_in_interval = "START"
else:
position_in_interval = "END"
return position_in_interval
def insertSequenceInInterval(self, sequence: Sequence, shs: ScheduleHasSequences,
interval: Interval, position: str) -> int:
if position not in ["START", "END", "PREFERRED"]:
return 1
if position == "END":
shs.tep = min(interval.end, sequence.jd2)
shs.tsp = shs.tep - sequence.duration
shs.deltaTL = shs.tsp - max(interval.start + self.max_overhead, sequence.jd1)
shs.deltaTR = Decimal(0)
elif position == "PREFERRED":
shs.tsp = max(sequence.jd1, sequence.t_prefered - Decimal(0.5) * sequence.duration)
if shs.tsp - interval.start < self.max_overhead:
shs.tsp = interval.start + self.max_overhead
shs.tep = shs.tsp + sequence.duration
shs.deltaTL = shs.tsp - max(interval.start + self.max_overhead, sequence.jd1)
shs.deltaTR = min(interval.end, sequence.jd2) - shs.tep
else:
shs.tsp = max(interval.start + self.max_overhead, sequence.jd1)
shs.tep = shs.tsp + sequence.duration
shs.deltaTL = 0
shs.deltaTR = min(interval.end, sequence.jd2) - shs.tep
return 0
def placeSequence(self, sequence: Sequence, shs: ScheduleHasSequences, matching_invervals):
if len(matching_invervals) == 0:
return None
interval = self.getPreferredInterval(sequence, matching_invervals)
if not interval:
return None
sequence_pos = self.getSequencePositionInInterval(sequence, interval)
self.insertSequenceInInterval(sequence, shs, interval, sequence_pos)
self.cutInterval(sequence, shs, interval)
return interval
def cutInterval(self, sequence: Sequence, shs: ScheduleHasSequences, interval: Interval) -> int:
inter_before = Interval(interval.start, shs.tsp - self.max_overhead)
inter_after = Interval(shs.tep, interval.end)
self.intervals.remove(interval)
if inter_before.duration > 0:
self.intervals.append(inter_before)
if inter_after.duration > 0:
self.intervals.append(inter_after)
self.intervals.sort(key=lambda interval: interval.start, reverse=False)
return 0
def getPotentialIntervals(self, sequence: Sequence):
potential_intervals = []
for interval in self.intervals:
overlap = min(sequence.jd2, interval.end) - max(sequence.jd1, interval.start) - self.max_overhead
if overlap > 0:
potential_intervals.append(interval)
return potential_intervals