Blame view

src/scheduler/Interval.py 6.2 KB
bca9a283   Jeremy   Reworked the sche...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from common.models import *
# from decimal import Decimal
from utils.Logger import Logger
from utils.JDManipulator import *
from utils.highPrecision import *

"""
    Simple class that represents an interval of time
    Julian days should be used
"""


class Interval:
    """Interval of time ``[start, end]`` whose bounds are stored as ``Decimal``.

    Julian days should be used for both bounds. ``duration`` always holds
    ``end - start`` and is refreshed every time a bound is assigned through
    the ``start`` / ``end`` properties.
    """

    def __init__(self, start, end):
        """Create the interval from two values accepted by ``Decimal``.

        No ordering check is made here: ``duration`` may be zero or negative,
        which lets callers build a candidate interval and test its duration.
        """
        self._start = Decimal(start)
        self._end = Decimal(end)
        self.duration = self._end - self._start

    def __str__(self):
        """Return the interval formatted as ``[start - end]``."""
        # __str__ must *return* the text; printing it and returning None
        # makes str(interval) raise TypeError.
        return "[" + str(self.start) + " - " + str(self.end) + "]"

    def __repr__(self):
        """Return an unambiguous representation, useful when debugging lists."""
        return "Interval(%r, %r)" % (self._start, self._end)

    def _get_start(self):
        """Return the lower bound."""
        return self._start

    def _set_start(self, start):
        """Set the lower bound and refresh ``duration``.

        Raises:
            ValueError: if ``start`` is greater than the current end.
        """
        # Coerce before touching any state: a float would otherwise pass the
        # comparison, overwrite _start and then fail on Decimal - float,
        # leaving the interval half-updated.
        start = Decimal(start)
        if start > self._end:
            # %s (not %d) so the fractional part of the Julian day is kept.
            raise ValueError(
                "Cannot set start (%s): must be lower than end (%s)" % (start, self._end))
        self._start = start
        self.duration = self._end - self._start

    def _get_end(self):
        """Return the upper bound."""
        return self._end

    def _set_end(self, end):
        """Set the upper bound and refresh ``duration``.

        Raises:
            ValueError: if ``end`` is lower than the current start.
        """
        # Same coercion rationale as in _set_start.
        end = Decimal(end)
        if end < self._start:
            raise ValueError(
                "Cannot set end (%s): must be bigger than start (%s)" % (end, self._start))
        self._end = end
        self.duration = self._end - self._start

    start = property(fget=_get_start, fset=_set_start)
    end = property(fget=_get_end, fset=_set_end)

257abe9b   Jeremy   Added comments
45
'''
    Class that manages intervals
    Only the Scheduler inherits from IntervalManagement
'''


bca9a283   Jeremy   Reworked the sche...
51
52
53
class IntervalManagement(Logger):
    """Manage the list of intervals in which sequences can be placed.

    Only the Scheduler inherits from IntervalManagement.

    Attributes set by ``__init__``:
        max_overhead_seconds: overhead reserved before a sequence, in seconds.
        max_overhead: the same overhead as a ``Decimal``, obtained by dividing
            by ``JD_VALUE``.
        intervals: list of ``Interval`` objects still available.
    """

    def __init__(self, name: str = "IntervalManagement", file: str = "IntervalManagement"):
        """Initialise the logger base class and start with no interval."""
        super().__init__(name, file)
        self.max_overhead_seconds = 10
        # NOTE(review): JD_VALUE is presumably the number of seconds in one
        # Julian day, making max_overhead a duration in days - confirm in
        # utils.JDManipulator.
        self.max_overhead = Decimal(self.max_overhead_seconds / JD_VALUE)
        self.intervals = []

    def _usable_overlap(self, sequence: Sequence, interval: Interval):
        """Return the overlap of ``[jd1, jd2]`` with ``interval``, minus the overhead."""
        return min(sequence.jd2, interval.end) - max(sequence.jd1, interval.start) - self.max_overhead

    def getMatchingIntervals(self, sequence: Sequence):
        """Return the intervals whose usable overlap can hold the whole sequence.

        An interval matches when the overlap (overhead already subtracted) is
        greater than, or nearly equal to, ``sequence.duration``.
        """
        matching_intervals = []

        for interval in self.intervals:
            overlap = self._usable_overlap(sequence, interval)
            if overlap > sequence.duration or is_nearby_equal(overlap, sequence.duration):
                matching_intervals.append(interval)
        return matching_intervals

    def getPreferredInterval(self, sequence: Sequence, matching_intervals) -> Interval:
        """Pick, among ``matching_intervals``, the one best suited to ``t_prefered``.

        Returns the first interval when there is no preferred time
        (``t_prefered == 0``) or only one candidate; otherwise the interval
        containing ``t_prefered`` or, failing that, whichever of the two
        neighbouring intervals is closer to it. Returns ``None`` when
        ``matching_intervals`` is empty.

        NOTE(review): the neighbour lookup (``index - 1``) presumably relies on
        ``matching_intervals`` being ordered by start - confirm with callers.
        """
        if not matching_intervals:
            # Nothing to choose from: report it instead of raising IndexError.
            return None
        if sequence.t_prefered == 0 or len(matching_intervals) == 1:
            return matching_intervals[0]

        for index, interval in enumerate(matching_intervals):
            if is_between(interval.start, sequence.t_prefered, interval.end):
                return interval
            if sequence.t_prefered < interval.start:
                if index == 0:
                    # Preferred time lies before every candidate.
                    return interval
                # Preferred time falls in the gap between two candidates:
                # keep the one whose usable edge is closest.
                previous = matching_intervals[index - 1]
                gap_to_next = interval.start + self.max_overhead - sequence.t_prefered
                gap_to_previous = sequence.t_prefered - previous.end
                return interval if gap_to_next < gap_to_previous else previous
        # Preferred time lies after every candidate.
        return matching_intervals[-1]

    def getSequencePositionInInterval(self, sequence: Sequence, interval: Interval) -> str:
        """Return where the sequence goes in ``interval``: "START", "END" or "PREFERRED".

        "PREFERRED" is returned only when the sequence centred on
        ``t_prefered`` stays clear of both interval bounds.
        """
        if not is_between(interval.start, sequence.t_prefered, interval.end):
            # Preferred time is outside: stick to the nearest edge.
            return "START" if sequence.t_prefered < interval.start else "END"

        half_duration = Decimal(0.5) * sequence.duration
        if is_nearby_less_or_equal(sequence.t_prefered - half_duration, interval.start):
            return "START"
        if is_nearby_sup_or_equal(sequence.t_prefered + half_duration, interval.end):
            return "END"
        return "PREFERRED"

    def insertSequenceInInterval(self, sequence: Sequence, shs: ScheduleHasSequences,
                                 interval: Interval, position: str) -> int:
        """Fill ``shs.tsp``, ``shs.tep``, ``shs.deltaTL`` and ``shs.deltaTR``.

        ``deltaTL`` / ``deltaTR`` are the distances from the placed sequence to
        the earliest possible start and the latest possible end respectively.

        Returns:
            0 on success, 1 when ``position`` is not one of
            "START", "END" or "PREFERRED" (``shs`` is then left untouched).
        """
        if position not in ("START", "END", "PREFERRED"):
            return 1

        earliest_start = max(interval.start + self.max_overhead, sequence.jd1)
        latest_end = min(interval.end, sequence.jd2)

        if position == "END":
            shs.tep = latest_end
            shs.tsp = shs.tep - sequence.duration
            shs.deltaTL = shs.tsp - earliest_start
            shs.deltaTR = Decimal(0)
        elif position == "PREFERRED":
            shs.tsp = max(sequence.jd1, sequence.t_prefered - Decimal(0.5) * sequence.duration)
            # Always leave room for the overhead after the interval start.
            if shs.tsp - interval.start < self.max_overhead:
                shs.tsp = interval.start + self.max_overhead
            shs.tep = shs.tsp + sequence.duration
            shs.deltaTL = shs.tsp - earliest_start
            shs.deltaTR = latest_end - shs.tep
        else:
            shs.tsp = earliest_start
            shs.tep = shs.tsp + sequence.duration
            # Decimal like every other delta, so later arithmetic on the
            # deltas does not depend on which branch produced them.
            shs.deltaTL = Decimal(0)
            shs.deltaTR = latest_end - shs.tep
        return 0

    def placeSequence(self, sequence: Sequence, shs: ScheduleHasSequences, matching_invervals):
        """Place the sequence in the best matching interval and cut that interval.

        Returns:
            The interval that was used, or ``None`` when no interval is
            available or the placement was rejected.
        """
        if not matching_invervals:
            return None
        interval = self.getPreferredInterval(sequence, matching_invervals)
        if not interval:
            return None
        sequence_pos = self.getSequencePositionInInterval(sequence, interval)
        # Do not cut the interval if shs was not filled.
        if self.insertSequenceInInterval(sequence, shs, interval, sequence_pos) != 0:
            return None
        self.cutInterval(sequence, shs, interval)
        return interval

    def cutInterval(self, sequence: Sequence, shs: ScheduleHasSequences, interval: Interval) -> int:
        """Replace ``interval`` by what is left of it around the placed sequence.

        The part before the sequence ends ``max_overhead`` before ``shs.tsp``;
        the part after starts at ``shs.tep``. Parts with a non-positive
        duration are dropped, and ``self.intervals`` is re-sorted by start.

        Returns:
            Always 0.

        Raises:
            ValueError: if ``interval`` is not in ``self.intervals``.
        """
        inter_before = Interval(interval.start, shs.tsp - self.max_overhead)
        inter_after = Interval(shs.tep, interval.end)
        self.intervals.remove(interval)
        for remainder in (inter_before, inter_after):
            if remainder.duration > 0:
                self.intervals.append(remainder)
        self.intervals.sort(key=lambda candidate: candidate.start)
        return 0

    def getPotentialIntervals(self, sequence: Sequence):
        """Return the intervals that overlap the sequence window beyond the overhead."""
        return [interval for interval in self.intervals
                if self._usable_overlap(sequence, interval) > 0]